Question

Collect values as dictionary in parent column using Pyspark

I have code and data like below:

df_renamed = df.withColumnRenamed("id","steps.id").withColumnRenamed("status_1","steps.status").withColumnRenamed("severity","steps.error.severity")

df_renamed.show(truncate=False)

+----------+-------+------+-----------------------+------------+--------------------+
|apiVersion|expired|status|steps.id               |steps.status|steps.error.severity|
+----------+-------+------+-----------------------+------------+--------------------+
|2         |false  |200   |mexican-curp-validation|200         |null                |
+----------+-------+------+-----------------------+------------+--------------------+  

Now I want to transform this data as below:

+----------+-------+------+-----------------------------------------------------------------------+
|apiVersion|expired|status|steps                                                                  |
+----------+-------+------+-----------------------------------------------------------------------+
|2         |false  |200   |{"id":"mexican-curp-validation","status":200,"error":{"severity":null}}|
+----------+-------+------+-----------------------------------------------------------------------+

Here the dot notation in the column names determines the nesting of the JSON struct in the steps column. To get there, I used the code below:

cols_list = [name for name in df_renamed.columns if "." in name]
df_new = df_renamed.withColumn("steps",F.to_json(F.struct(*cols_list)))
df_new.show()   

But it gives the error below, even though the column is present:

 df_new = df_renamed.withColumn("steps",F.to_json(F.struct(*cols_list)))
  File "/Users/../IdeaProjects/pocs/venvsd/lib/python3.9/site-packages/pyspark/sql/dataframe.py", line 3036, in withColumn
    return DataFrame(self._jdf.withColumn(colName, col._jc), self.sparkSession)
  File "/Users/../IdeaProjects/pocs/venvsd/lib/python3.9/site-packages/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/Users/../IdeaProjects/pocs/venvsd/lib/python3.9/site-packages/pyspark/sql/utils.py", line 196, in deco
    raise converted from None
pyspark.sql.utils.AnalysisException: Column 'steps.id' does not exist. Did you mean one of the following? [steps.id, expired, status, steps.status, apiVersion, steps.error.severity];
'Project [apiVersion#17, expired#18, status#19, steps.id#29, steps.status#37, steps.error.severity#44, to_json(struct(id, 'steps.id, status, 'steps.status, severity, 'steps.error.severity), Some(GMT+05:30)) AS steps#82]
+- Project [apiVersion#17, expired#18, status#19, steps.id#29, steps.status#37, severity#22 AS steps.error.severity#44]
   +- Project [apiVersion#17, expired#18, status#19, steps.id#29, status_1#21 AS steps.status#37, severity#22]
      +- Project [apiVersion#17, expired#18, status#19, id#20 AS steps.id#29, status_1#21, severity#22]
         +- Relation [apiVersion#17,expired#18,status#19,id#20,status_1#21,severity#22] csv

Where am I going wrong? Any help is much appreciated.


Solution

Special characters in column names can be escaped by wrapping the name in backticks (`), as follows:

cols_list = ["`" + name + "`" for name in df_renamed.columns if "." in name]

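This way, the name passed to struct inside to_json is `steps.id` (with the backticks), which escapes the "." character. Without the backticks, Spark reads steps.id as field id of a column named steps, and that column does not exist.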

Be careful with column names that contain special characters; I would recommend avoiding them if possible.
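
One caveat: with backtick quoting alone, the struct fields keep their full dotted names, so the JSON keys should come out flat ("steps.id", "steps.status", ...) rather than nested. Below is a minimal sketch of one way to derive the nesting from the dotted names. The helper names nest_dotted_names, to_column and collapse_dotted_columns are hypothetical (they are not part of PySpark), and the sketch assumes that no dotted name is also a prefix of another one (for example "a.b" together with "a.b.c").

```python
def nest_dotted_names(names):
    """Group dotted column names into a nested dict; each leaf holds the full column name."""
    tree = {}
    for name in names:
        *parents, leaf = name.split(".")
        node = tree
        for part in parents:
            node = node.setdefault(part, {})
        node[leaf] = name
    return tree


def to_column(node, alias):
    """Turn a leaf (full column name) or a subtree (dict) into an aliased Spark Column."""
    # Imported here so the pure-Python helper above stays usable without Spark.
    from pyspark.sql import functions as F

    if isinstance(node, dict):
        children = [to_column(child, key) for key, child in node.items()]
        return F.struct(*children).alias(alias)
    # Backticks make Spark treat the dots as part of the column name.
    return F.col(f"`{node}`").alias(alias)


def collapse_dotted_columns(df):
    """Replace every group of dotted columns with one JSON string column per top-level prefix."""
    from pyspark.sql import functions as F

    dotted = [c for c in df.columns if "." in c]
    kept = [F.col(c) for c in df.columns if "." not in c]
    tree = nest_dotted_names(dotted)
    json_cols = [
        F.to_json(to_column(subtree, parent)).alias(parent)
        for parent, subtree in tree.items()
    ]
    return df.select(*kept, *json_cols)
```

Calling collapse_dotted_columns(df_renamed) on the DataFrame from the question should give a single steps column holding the nested JSON. Since to_json drops null fields by default, a null severity would show up as "error":{}.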

2024-07-08
Ahmed Nader

Solution

You can get the same output without renaming the columns using struct and to_json:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

data = [(2, False, 200, "mexican-curp-validation", 200, None)]
schema = "apiVersion int, expired string, status int, id string, status_1 int, severity string"
df = spark.createDataFrame(data, schema)
df.show(truncate=False)

# +----------+-------+------+-----------------------+--------+--------+
# |apiVersion|expired|status|id                     |status_1|severity|
# +----------+-------+------+-----------------------+--------+--------+
# |2         |false  |200   |mexican-curp-validation|200     |NULL    |
# +----------+-------+------+-----------------------+--------+--------+

cols_list = F.struct(
    F.col("id"),
    F.col("status_1").alias("status"),
    F.struct(F.col("severity")).alias("error")
)
df_transformed = df.withColumn("steps", F.to_json(cols_list)).drop("id", "status_1", "severity")
df_transformed.printSchema()
df_transformed.show(truncate=False)

# root
#  |-- apiVersion: integer (nullable = true)
#  |-- expired: string (nullable = true)
#  |-- status: integer (nullable = true)
#  |-- steps: string (nullable = true)

# +----------+-------+------+--------------------------------------------------------+
# |apiVersion|expired|status|steps                                                   |
# +----------+-------+------+--------------------------------------------------------+
# |2         |false  |200   |{"id":"mexican-curp-validation","status":200,"error":{}}|
# +----------+-------+------+--------------------------------------------------------+

severity is missing from the output because its value is None: by default, to_json omits fields whose value is null.

To confirm that the nested structure is intact, replace None with the string "None" for testing:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

data = [(2, False, 200, "mexican-curp-validation", 200, "None")]
schema = "apiVersion int, expired string, status int, id string, status_1 int, severity string"
df = spark.createDataFrame(data, schema)

cols_list = F.struct(
    F.col("id"),
    F.col("status_1").alias("status"),
    F.struct(F.col("severity")).alias("error")
)
df_transformed = df.withColumn("steps", F.to_json(cols_list)).drop("id", "status_1", "severity")

df_transformed.show(truncate=False)

# +----------+-------+------+-------------------------------------------------------------------------+
# |apiVersion|expired|status|steps                                                                    |
# +----------+-------+------+-------------------------------------------------------------------------+
# |2         |false  |200   |{"id":"mexican-curp-validation","status":200,"error":{"severity":"None"}}|
# +----------+-------+------+-------------------------------------------------------------------------+

2024-07-09
Vikas Sharma