In this post I will show you how to select the first row of each group in Spark. It’s a very common task when you work with data. The power of the Spark API is that the same code works in Scala and Python, so the same snippets of code will work with PySpark as well.
Spark Free Tutorials
This post is a part of the Spark Free Tutorial series. Check the rest of the Spark tutorials, which you can find on the right sidebar of this page! Stay tuned!
Table of Contents
Introduction
To select the first row of each group in a DataFrame in Apache Spark, you can use the first function from org.apache.spark.sql.functions together with a window specification built with the Window class. The window specification defines how the rows are divided into groups and how they are ordered, and first then returns the first value within each group.
Here’s an example of how to use the first function to select the first row of each group in a DataFrame:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val df = Seq(
  (1, "Alice", 10),
  (2, "Alice", 20),
  (3, "Bob", 30),
  (4, "Bob", 40),
  (5, "Charlie", 50)
).toDF("id", "name", "score")

val windowSpec = Window.partitionBy("name").orderBy("score")
val firstRowDf = df.withColumn("first_row", first("id").over(windowSpec))
firstRowDf.show()
This will output the following DataFrame:
+---+-------+-----+---------+
| id|   name|score|first_row|
+---+-------+-----+---------+
|  1|  Alice|   10|        1|
|  2|  Alice|   20|        1|
|  3|    Bob|   30|        3|
|  4|    Bob|   40|        3|
|  5|Charlie|   50|        5|
+---+-------+-----+---------+
Note that the first function returns the first row of each group based on the order specified by the orderBy clause in the window specification. In this case, the rows are ordered by score in ascending order, so the first row of each group is the row with the lowest score. If you want to select the first row of each group based on a different order, you can specify a different orderBy clause in the window specification.
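To make the window semantics concrete outside Spark, here is a plain-Python sketch of the same logic on the toy data above: partition by name, order by score, and take the id of the first row in each partition. This is only an illustration of the semantics, not Spark code:

```python
# Emulate Window.partitionBy("name").orderBy("score") + first("id"):
# for each name, remember the id of the lowest-score row.
rows = [
    (1, "Alice", 10), (2, "Alice", 20),
    (3, "Bob", 30), (4, "Bob", 40),
    (5, "Charlie", 50),
]

first_id = {}
for id_, name, score in sorted(rows, key=lambda r: (r[1], r[2])):
    # after sorting by score, the first row seen per name is the group's first row
    first_id.setdefault(name, id_)

# attach the "first_row" value to every original row, like withColumn does
result = [(id_, name, score, first_id[name]) for id_, name, score in rows]
print(result)
```

Every row of a group carries the same first_row value, which matches the output of the Spark snippet above.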
Spark Select The First Row Of Each Group – Car Data
Let’s consider the case when you have to analyse car data. You have information about cars such as:
- Car
- Cylinders
- Displacement
- Horsepower
- Weight
- Acceleration
- Model
- Origin
The file with the full data can be found in my GitLab repository!
Spark Window Functions
To select rows within a specific group (partition) and order them by some columns/values, we can use the built-in Spark Window functionality. The Window object allows us to provide the columns which we will use to group the data and then order the values within those groups.
To do this we must use the partitionBy method of the Window object. For reference, here is the Python implementation of the Spark row_number function:
@since(1.6)
def row_number():
    """
    Window function: returns a sequential number starting at 1 within a window partition.
    """
    return _invoke_function("row_number")
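For intuition, row_number can be emulated in plain Python: sort the rows within each partition and assign sequential numbers starting at 1. The sketch below illustrates the semantics only; the key functions and sample values are illustrative, not part of any Spark API:

```python
from itertools import groupby

def row_numbers(rows, partition_key, order_key):
    """Emulate row_number() over a window: number rows 1, 2, 3, ...
    within each partition, in the given order."""
    out = []
    ordered = sorted(rows, key=lambda r: (partition_key(r), order_key(r)))
    for _, group in groupby(ordered, key=partition_key):
        for n, row in enumerate(group, start=1):
            out.append((row, n))
    return out

# (origin, horsepower) pairs; order by horsepower descending within each origin
data = [("US", 230.0), ("US", 165.0), ("Japan", 132.0), ("Japan", 110.0)]
numbered = row_numbers(data, partition_key=lambda r: r[0], order_key=lambda r: -r[1])
print(numbered)
```

Filtering this result to the entries numbered 1 gives exactly the "first row of each group".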
Read The Car Data
Before we use these functions we must read the cars data from the CSV file into a Spark DataFrame.
import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession
  .builder()
  .master("local[*]")
  .appName("BigData-ETL.com")
  .getOrCreate()

val carsData = spark.read
  .option("delimiter", ";")
  .option("header", "true")
  .option("inferSchema", true)
  .csv("src/main/resources/cars.csv")
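The options above assume a semicolon-delimited file with a header row. Outside Spark, the same file shape can be parsed with Python's standard csv module; the sample below uses two rows taken from the results later in this post, with illustrative weight values and assumed column names:

```python
import csv
import io

# A tiny sample in the same shape as the cars.csv file assumed above:
# semicolon-delimited, with a header row. Column names and weights are
# assumptions for illustration.
sample = """Car;Cylinders;Horsepower;Weight;Origin
Peugeot 604sl;6;133.0;3140;Europe
Datsun 280-ZX;6;132.0;2910;Japan
"""

with io.StringIO(sample) as f:
    reader = csv.DictReader(f, delimiter=";")
    cars = list(reader)

print(cars[0]["Car"], cars[0]["Origin"])
```

Note that csv.DictReader keeps every field as a string; Spark's inferSchema option is what turns horsepower into a numeric column automatically.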
Case #1 – Select The Car Which Has The Highest Horsepower By Origin
In case #1 we want to choose the car that dominates in horsepower by origin. As you can see, we have multiple origins:
- US
- Europe
- Japan
When we look into the data we see that to select these cars we must select the “car”, “horsepower” and “origin” columns, and then, using the Spark row_number and Window functions, keep only the records for which the horsepower is the highest within each origin group.
To keep only one record per group we must use the .where(col("row") === 1) syntax. If you do not do this, you will get all the numbered records within each group.
val carWithTheMostHorsepowerPerOrigin = carsData
  .select("car", "horsepower", "origin")
  .withColumn(
    "row",
    row_number().over(
      Window.partitionBy("origin").orderBy(col("horsepower").desc)
    )
  )
  .where(col("row") === 1)
  .drop("row")

carWithTheMostHorsepowerPerOrigin.show()
The result of the above code is:
+------------------+----------+------+
|               car|horsepower|origin|
+------------------+----------+------+
|     Peugeot 604sl|     133.0|Europe|
|     Datsun 280-ZX|     132.0| Japan|
|Pontiac Grand Prix|     230.0|    US|
+------------------+----------+------+
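The same "top one per group" logic can be cross-checked in plain Python: sort by horsepower descending and keep the first row seen per origin. The sketch below uses the three winners from the output above plus runner-up rows that appear later in this post:

```python
cars = [
    ("Peugeot 604sl", 133.0, "Europe"),
    ("Citroen DS-21 Pallas", 115.0, "Europe"),
    ("Datsun 280-ZX", 132.0, "Japan"),
    ("Datsun 200SX", 100.0, "Japan"),
    ("Pontiac Grand Prix", 230.0, "US"),
    ("Buick Regal Sport Coupe (turbo)", 165.0, "US"),
]

best = {}
# after sorting by horsepower desc, the first row seen per origin
# is exactly the row that row_number() would label 1
for car, hp, origin in sorted(cars, key=lambda r: -r[1]):
    best.setdefault(origin, (car, hp))

print(best)
```

The result matches the Spark output: Peugeot 604sl for Europe, Datsun 280-ZX for Japan and Pontiac Grand Prix for the US.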
Case #2 – Select The Car Which Has The Highest Amount of Cylinders With The Lowest Weight By Origin
In case #2 we will pick, for each origin, the vehicle with the most cylinders, using the lowest weight as a tie-breaker.
val carWithTheMostCylindersAndTheLowestWeightByOrigin = carsData
  .select("car", "cylinders", "weight", "origin")
  .withColumn(
    "row",
    row_number().over(
      Window.partitionBy("origin").orderBy(col("cylinders").desc, col("weight").asc)
    )
  )
  .where(col("row") === 1)
  .drop("row")

carWithTheMostCylindersAndTheLowestWeightByOrigin.show(false)
The result is:
+-------------------------+---------+------+------+
|car                      |cylinders|weight|origin|
+-------------------------+---------+------+------+
|Mercedes-Benz 280s       |6        |3820  |Europe|
|Toyota Mark II           |6        |2807  |Japan |
|Chevrolet Chevelle Malibu|8        |3504  |US    |
+-------------------------+---------+------+------+
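The two-level ordering (cylinders descending, then weight ascending) can be sketched in plain Python with a composite sort key. The rows come from the result above, plus one illustrative extra row to show the tie-breaking:

```python
cars = [
    ("Mercedes-Benz 280s", 6, 3820, "Europe"),
    ("Toyota Mark II", 6, 2807, "Japan"),
    ("Chevrolet Chevelle Malibu", 8, 3504, "US"),
    ("Toyota Corolla", 4, 2245, "Japan"),  # illustrative extra row
]

best = {}
# composite key: cylinders descending (negated), weight ascending;
# the first row seen per origin wins
for car, cyl, weight, origin in sorted(cars, key=lambda r: (-r[1], r[2])):
    best.setdefault(origin, car)

print(best)
```

A tuple sort key is the plain-Python analogue of passing several expressions to orderBy.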
Case #3 – Select The Car Which Has The Highest Horsepower By Origin And Cylinders Amount
In the last case, #3, we want to choose the car that dominates in horsepower by origin and cylinder count. As I mentioned above, we have three origins: US, Europe and Japan. This time we need to pass multiple columns to the Spark window partitionBy.
The available cylinder counts are: 3, 4, 5, 6 and 8.
Let’s run the snippet of code to get these values!
val carWithTheMostHorsepowerByOriginAndCylinders = carsData
  .select("car", "horsepower", "origin", "cylinders")
  .withColumn(
    "row",
    row_number().over(
      Window.partitionBy("origin", "cylinders").orderBy(col("horsepower").desc)
    )
  )
  .where(col("row") === 1)
  .drop("row")

carWithTheMostHorsepowerByOriginAndCylinders.show(false)
The output is:
+-------------------------------+----------+------+---------+
|car                            |horsepower|origin|cylinders|
+-------------------------------+----------+------+---------+
|Citroen DS-21 Pallas           |115.0     |Europe|4        |
|Audi 5000                      |103.0     |Europe|5        |
|Peugeot 604sl                  |133.0     |Europe|6        |
|Mazda RX-4                     |110.0     |Japan |3        |
|Datsun 200SX                   |100.0     |Japan |4        |
|Datsun 280-ZX                  |132.0     |Japan |6        |
|Plymouth Sapporo               |105.0     |US    |4        |
|Buick Regal Sport Coupe (turbo)|165.0     |US    |6        |
|Pontiac Grand Prix             |230.0     |US    |8        |
+-------------------------------+----------+------+---------+
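Partitioning by multiple columns simply means a composite group key. In plain Python the (origin, cylinders) pair becomes the dictionary key; the sketch below uses the Japan rows from the output above plus one illustrative runner-up:

```python
cars = [
    ("Mazda RX-4", 110.0, "Japan", 3),
    ("Datsun 200SX", 100.0, "Japan", 4),
    ("Datsun 280-ZX", 132.0, "Japan", 6),
    ("Toyota Corona", 95.0, "Japan", 4),  # illustrative runner-up
]

best = {}
# group key is the (origin, cylinders) pair; order by horsepower desc
for car, hp, origin, cyl in sorted(cars, key=lambda r: -r[1]):
    best.setdefault((origin, cyl), car)

print(best)
```

Each distinct (origin, cylinders) combination yields its own winner, which is why the Spark output above has one row per origin and cylinder count.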
Code On GitLab
The whole project can be found in my GitLab repository!
Summary
In this tutorial you have learned how to use the Spark Window and row_number functions. We read the cars data from CSV into a DataFrame and then did some data analysis on it.
Could You Please Share This Post?
I appreciate It And Thank YOU! :)
Have A Nice Day!