Question

How to get the name of column with second highest value in pyspark dataframe

I have a PySpark dataframe looking like this:

id  trx_holiday  trx_takeout  trx_pet  max_value  MAX
 1         12.5          5.5      9.5       12.5  trx_holiday
 2          3.0         14.0      6.7       14.0  trx_takeout

I want to create a second column MAX_2 that contains the category on which the client spent the second most.

I want to tweak the code below (see: how to get the name of column with maximum value in pyspark dataframe) by excluding the column named in the MAX column from the withColumn statement that creates the max_value column:

cond = "psf.when" + ".when".join(["(psf.col('" + c + "') == psf.col('max_value'), psf.lit('" + c + "'))" for c in df.columns]) 

df = df.withColumn("max_value", psf.greatest(*[c for c in columns])) \
       .withColumn("MAX", when(cond, 1).otherwise(0))

Ideally, the new dataframe should look like this:

id  trx_holiday  trx_takeout  trx_pet  max_value  MAX          max_value_2  MAX_2
 1         12.5          5.5      9.5       12.5  trx_holiday          9.5  trx_pet
 2          3.0         14.0      6.7       14.0  trx_takeout          6.7  trx_pet

Solution


To achieve the desired output, you should identify the maximum value and its corresponding column name, and then identify the second maximum value by excluding the column found in the first step. Something like this:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, array, sort_array, struct, lit

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Max and Second Max Columns") \
    .getOrCreate()

# Sample data
data = [
    (1, 12.5, 5.5, 9.5),
    (2, 3.0, 14.0, 6.7)
]

# Define schema
columns = ["id", "trx_holiday", "trx_takeout", "trx_pet"]

# Create DataFrame
df = spark.createDataFrame(data, columns)

# Step 1: Build an array of (value, name) structs, sort it in descending order,
# and pick the top two entries for the max and second max
df = df.withColumn(
    "sorted_array",
    sort_array(array(
        struct(col("trx_holiday").alias("value"), lit("trx_holiday").alias("name")),
        struct(col("trx_takeout").alias("value"), lit("trx_takeout").alias("name")),
        struct(col("trx_pet").alias("value"), lit("trx_pet").alias("name"))
    ), asc=False)
).withColumn("max_value", col("sorted_array")[0]["value"]) \
 .withColumn("MAX", col("sorted_array")[0]["name"]) \
 .withColumn("max_value_2", col("sorted_array")[1]["value"]) \
 .withColumn("MAX_2", col("sorted_array")[1]["name"]) \
 .drop("sorted_array")

# Show the result
df.show()
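
With the sample data above, df.show() should print something along these lines (matching the desired output in the question):

# +---+-----------+-----------+-------+---------+-----------+-----------+-------+
# | id|trx_holiday|trx_takeout|trx_pet|max_value|        MAX|max_value_2|  MAX_2|
# +---+-----------+-----------+-------+---------+-----------+-----------+-------+
# |  1|       12.5|        5.5|    9.5|     12.5|trx_holiday|        9.5|trx_pet|
# |  2|        3.0|       14.0|    6.7|     14.0|trx_takeout|        6.7|trx_pet|
# +---+-----------+-----------+-------+---------+-----------+-----------+-------+
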
2024-07-04
Hernando Abella

Solution


You can extend your current approach as follows:

from pyspark.sql import SparkSession
from pyspark.sql import functions as psf
from pyspark.sql.functions import col, when

spark = SparkSession.builder.getOrCreate()

data = [
    (1, 12.5, 5.5, 9.5, 12.5, 'trx_holiday'),
    (2, 3.0, 14.0, 6.7, 14.0, 'trx_takeout')
]

schema = ["id", "trx_holiday", "trx_takeout", "trx_pet", "max_value", "MAX"]

df = spark.createDataFrame(data, schema=schema)

df = df.withColumn(
    "max_value_2", 
    psf.greatest(*[when(col(c) < col("max_value"), col(c)) for c in df.columns[1:4]])
    ).withColumn(
    "MAX_2", 
    psf.coalesce(*[when(col(c) == col("max_value_2"), c) for c in df.columns[1:4]])
)

df.show()

# +---+-----------+-----------+-------+---------+-----------+-----------+-------+
# | id|trx_holiday|trx_takeout|trx_pet|max_value|        MAX|max_value_2|  MAX_2|
# +---+-----------+-----------+-------+---------+-----------+-----------+-------+
# |  1|       12.5|        5.5|    9.5|     12.5|trx_holiday|        9.5|trx_pet|
# |  2|        3.0|       14.0|    6.7|     14.0|trx_takeout|        6.7|trx_pet|
# +---+-----------+-----------+-------+---------+-----------+-----------+-------+

As you already have the maximum value in the max_value column, you can use it to keep only the values strictly less than it and apply the greatest function over those to fetch the "second greatest" value overall. Here's the relevant part:

psf.greatest(*[when(col(c) < col("max_value"), col(c)) for c in df.columns[1:4]])
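
For the first sample row this works out as follows (a quick worked example of the expression above):

# Row 1: max_value = 12.5, so the when expressions yield [NULL, 5.5, 9.5]
# (12.5 is not strictly less than itself); greatest skips NULLs and returns 9.5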

Here df.columns[1:4] means we take only the trx_* columns, leaving out id, max_value and MAX, as they are irrelevant for this comparison.
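
For the schema defined above, the slice resolves to:

# df.columns       -> ['id', 'trx_holiday', 'trx_takeout', 'trx_pet', 'max_value', 'MAX']
# df.columns[1:4]  -> ['trx_holiday', 'trx_takeout', 'trx_pet']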

Finally, computing the MAX_2 column is just a comparison plus a coalesce to fetch the matching column name:

psf.coalesce(*[when(col(c) == col("max_value_2"), c) for c in df.columns[1:4]])

NOTE: This is based on the assumption that the values are distinct row-wise over the trx_* columns, since the coalesce function only returns the first non-null value.
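
If ties across the trx_* columns are a concern, one variation (just a sketch in the same style, not part of the answer above) is to skip the column whose name is stored in MAX instead of comparing values, which is closer to what the question originally asked for:

df = df.withColumn(
    "max_value_2",
    # ignore the column named in MAX and take the greatest of the remaining trx_* values
    psf.greatest(*[when(col("MAX") != c, col(c)) for c in df.columns[1:4]])
).withColumn(
    "MAX_2",
    # first trx_* column (other than the one named in MAX) whose value equals max_value_2
    psf.coalesce(*[when((col("MAX") != c) & (col(c) == col("max_value_2")), c) for c in df.columns[1:4]])
)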

2024-07-08
Vikas Sharma

Solution


Try something like the following (unpivot, then join):

from pyspark.sql import Window
from pyspark.sql.functions import *

# Create an index/id/pkey column (only needed if the dataframe does not already have one)
df.show()
df = df.withColumn('id', monotonically_increasing_id())

# Unpivot/melt the dataframe into (txn_type, amount) rows; including max_value duplicates
# the maximum, so dense_rank puts the second-highest trx_* amount at rank 2
stack_expr = "stack(4,'trx_holiday',trx_holiday,'trx_takeout',trx_takeout,'trx_pet',trx_pet,'max_value',max_value) as (txn_type,amount)"
df1 = df.selectExpr("id", stack_expr)

# Apply a window per id, ordered by amount descending, and keep the rank-2 row
window_spec = Window.partitionBy('id').orderBy(desc('amount'))
df1 = df1.withColumn('dense_rank', dense_rank().over(window_spec))
df1 = df1.filter(df1.dense_rank == 2).select('id', col('txn_type').alias('MAX_2'))

# Now join df1 back onto df; joining on the column name avoids a duplicate id column
df = df.join(df1, on='id', how='left')
df.show()
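
For reference, after the selectExpr step the unpivoted df1 for the first sample row looks roughly like this (shown with the question's original id for readability); dense_rank then assigns rank 1 to both 12.5 entries and rank 2 to trx_pet, which is the row the filter keeps:

# +---+-----------+------+
# | id|   txn_type|amount|
# +---+-----------+------+
# |  1|trx_holiday|  12.5|
# |  1|trx_takeout|   5.5|
# |  1|    trx_pet|   9.5|
# |  1|  max_value|  12.5|
# +---+-----------+------+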
2024-07-26
Rakesh Kushwaha