PySpark / Spark Pivot And UnPivot DataFrame Or DataSet – Let’s Learn These 2 Cool Functions!


In this post I will show you how to use Pivot and UnPivot with a Spark DataFrame or DataSet!

Introduction

Spark Pivot SQL

The PIVOT clause in SQL allows you to rotate rows into columns in a table, thereby converting data from a long to a wide format. The PIVOT clause is combined with an aggregate function (e.g., SUM, COUNT, AVG) to summarise the data and construct new columns based on the values in one or more columns.

It’s worth noting that not all databases support the PIVOT clause. Some databases have similar functionality, such as the GROUP BY and CASE statements, or the CROSSTAB function in PostgreSQL.
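Conceptually, a pivot groups rows by an identifier and spreads the values of one column into new columns, aggregating the cell contents. The following is a minimal plain-Python sketch of that reshaping (not Spark or SQL, and the function name `pivot` here is just illustrative):

```python
from collections import defaultdict

def pivot(rows, id_key, pivot_key, value_key, aggfunc=sum):
    """Rotate rows into columns: one output row per id_key value,
    one output column per distinct pivot_key value."""
    groups = defaultdict(lambda: defaultdict(list))
    for row in rows:
        groups[row[id_key]][row[pivot_key]].append(row[value_key])
    # Aggregate each cell's collected values into a single number
    return {
        ident: {col: aggfunc(vals) for col, vals in cols.items()}
        for ident, cols in groups.items()
    }

rows = [
    {"name": "a", "category": "x", "value": 5},
    {"name": "a", "category": "x", "value": 2},
    {"name": "a", "category": "y", "value": 4},
]
print(pivot(rows, "name", "category", "value"))
# {'a': {'x': 7, 'y': 4}}
```

Swapping `aggfunc` (e.g. `max` instead of `sum`) changes how multiple rows landing in the same cell are combined, which mirrors choosing SUM vs MAX in the SQL PIVOT clause.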

Spark UnPivot SQL

In SQL, the UNPIVOT clause allows you to rotate columns into rows in a table, effectively transforming the data from a wide format to a long format. The UNPIVOT clause is used to “unpivot” the data, so that each row contains one value and one column for the attribute that corresponds to that value.
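The inverse reshaping can be sketched the same way: each input row produces one output row per unpivoted column. Again a plain-Python illustration, with `attribute` and `value` as assumed names for the two new columns:

```python
def unpivot(rows, id_keys, value_keys, var_name="attribute", value_name="value"):
    """Rotate columns into rows: one output row per (input row, value column)."""
    out = []
    for row in rows:
        for key in value_keys:
            record = {k: row[k] for k in id_keys}  # carry the identifiers through
            record[var_name] = key                 # former column name
            record[value_name] = row[key]          # former cell value
            out.append(record)
    return out

wide = [{"id": 1, "x": 5, "y": 3}]
print(unpivot(wide, ["id"], ["x", "y"]))
# [{'id': 1, 'attribute': 'x', 'value': 5}, {'id': 1, 'attribute': 'y', 'value': 3}]
```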

Spark Pivot And UnPivot DataFrame Or DataSet

To reshape a DataFrame or DataSet in PySpark and Spark, utilise the pivot and unpivot functions.

The pivot method converts a DataFrame or DataSet from a long format to a wide format: one or more columns act as identifier (grouping) columns, and the distinct values of the pivot column become new columns filled in by an aggregate function. In PySpark it is called on the result of groupBy and takes two arguments:

  • pivot_col: the column whose distinct values become the new columns
  • values: an optional list of pivot values; if omitted, Spark computes the distinct values itself, which costs an extra pass over the data

The aggregation function (e.g. sum, count, avg) is then applied with agg or a shortcut such as sum.

Here’s an example of using the pivot method to reshape a DataFrame in PySpark:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [(1, "a", "x", 5), (2, "b", "x", 3), (3, "c", "y", 4)],
    ["id", "name", "category", "value"],
)

# Group by the identifier columns, pivot on "category", and aggregate "value"
pivot_df = df.groupBy("id", "name").pivot("category").sum("value")
pivot_df.show()

The unpivot method transforms a DataFrame or DataSet from a wide format to a long format. Since Spark 3.4 it is available directly as DataFrame.unpivot (also aliased as melt) and takes four arguments:

  • ids: the identifier column(s) to keep as-is
  • values: the column(s) to unpivot into rows
  • variableColumnName: the name of the new column holding the former column names
  • valueColumnName: the name of the new column holding the values

On older Spark versions, the same result can be achieved with the stack SQL function.

Here’s an example of using the unpivot method to reshape a DataFrame in PySpark:

df = spark.createDataFrame(
    [(1, "a", 5, 3), (2, "b", 4, 6)],
    ["id", "name", "x", "y"],
)

# stack(2, ...) emits two rows per input row: one per unpivoted column
unpivot_df = df.selectExpr(
    "id", "name",
    "stack(2, 'x', x, 'y', y) as (category, value)",
)

# Equivalent on Spark 3.4+:
# unpivot_df = df.unpivot(["id", "name"], ["x", "y"], "category", "value")

Note that pivot is a built-in Spark operation on grouped data, and unpivot has been built in since Spark 3.4; on earlier versions you can use the stack expression shown above, or third-party libraries such as spark-daria.

To summarise, the pivot and unpivot methods enable you to transform a DataFrame or DataSet from a long to a wide format and vice versa. The pivot method spreads the distinct values of the pivot column into new columns and fills them with an aggregate. The unpivot method stacks several columns into a single attribute/value column pair, either via DataFrame.unpivot (Spark 3.4+) or the stack expression.

Spark Pivot And UnPivot Functions – More Examples

Here are more examples of using the pivot and unpivot methods for a Spark DataFrame with the Scala API. Example of using pivot with multiple value columns:

import org.apache.spark.sql.functions.sum

val df = Seq(
    (1, "a", "x", 5, 10),
    (2, "b", "x", 3, 20),
    (3, "c", "y", 4, 15)
).toDF("id", "name", "category", "value1", "value2")

// Passing Seq("x", "y") explicitly avoids an extra pass to compute the pivot values
val pivotDf = df.groupBy("id", "name").pivot("category", Seq("x", "y"))
    .agg(sum("value1"), sum("value2"))

pivotDf.show()

This will create a new DataFrame with columns for the sum of value1 and value2 for each category, one row per id and name. The output will look like this:

+---+----+-------------+-------------+-------------+-------------+
| id|name|x_sum(value1)|x_sum(value2)|y_sum(value1)|y_sum(value2)|
+---+----+-------------+-------------+-------------+-------------+
|  3|   c|         null|         null|            4|           15|
|  2|   b|            3|           20|         null|         null|
|  1|   a|            5|           10|         null|         null|
+---+----+-------------+-------------+-------------+-------------+

Example of using unpivot with multiple columns:

val df = Seq(
    (1, "a", 5, 10, 15),
    (2, "b", 3, 20, 25),
    (3, "c", 4, 15, 35)
).toDF("id", "name", "value1", "value2", "value3")

val unpivotDf = df.selectExpr("id", "name", "stack(3, 'value1', value1, 'value2', value2, 'value3', value3) as (attribute, value)")

unpivotDf.show()

This will create a new DataFrame with one row per id, name, and original value column, and two new columns, attribute and value, holding the former column names and their values. The output will look like this:

+---+----+---------+-----+
| id|name|attribute|value|
+---+----+---------+-----+
|  1|   a|   value1|    5|
|  1|   a|   value2|   10|
|  1|   a|   value3|   15|
|  2|   b|   value1|    3|
|  2|   b|   value2|   20|
|  2|   b|   value3|   25|
|  3|   c|   value1|    4|
|  3|   c|   value2|   15|
|  3|   c|   value3|   35|
+---+----+---------+-----+
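As the two output tables suggest, unpivot and pivot are inverses of each other when each group/column cell holds exactly one value. A small plain-Python round trip (dict-based rows, helper names purely illustrative) makes that concrete:

```python
def unpivot(rows, id_key, value_keys):
    """Wide -> long: one (attribute, value) row per original value column."""
    return [
        {"id": row[id_key], "attribute": key, "value": row[key]}
        for row in rows
        for key in value_keys
    ]

def pivot(rows, id_key):
    """Long -> wide: spread (attribute, value) pairs back into columns."""
    wide = {}
    for row in rows:
        rec = wide.setdefault(row[id_key], {id_key: row[id_key]})
        rec[row["attribute"]] = row["value"]
    return list(wide.values())

wide = [
    {"id": 1, "value1": 5, "value2": 10},
    {"id": 2, "value1": 3, "value2": 20},
]
long_rows = unpivot(wide, "id", ["value1", "value2"])
assert pivot(long_rows, "id") == wide  # round trip restores the wide table
```

With aggregation in the pivot step (as in the Spark examples above), the round trip is only lossless if no two long rows collapse into the same cell.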

Code On GitLab

As usual you can find full code on our GitLab!

Summary

The pivot and unpivot functions in PySpark and Spark allow you to reshape a DataFrame or DataSet. The pivot method converts a DataFrame or DataSet from a long format to a wide format by using one or more columns as identifier columns, turning the distinct values of the pivot column into new columns, and filling them in with an aggregate function.

The unpivot method, on the other hand, converts a DataFrame or DataSet from a wide to a long format by stacking several columns into an attribute/value column pair.

pivot is a built-in Spark operation; unpivot is built in since Spark 3.4, and on earlier versions the stack expression (or a third-party library such as spark-daria) provides the same functionality.

