Spark Select The First Row Of Each Group (PySpark) – Check 3 Cool Cases Of Usage


In this post I will show you how to select the first row of each group in Spark. It's a very common task when you work with data. The Spark DataFrame API is almost identical in Scala and Python, so the Scala snippets below translate nearly line for line to PySpark as well.



Introduction

To select the first row of each group in a DataFrame in Apache Spark, you can use window functions. A window specification, built with the Window object, defines how the rows are divided into groups and how they are ordered within each group. The first function from org.apache.spark.sql.functions, applied over such a window, returns the first value of a column within each group. It adds that value to every row of the group; it does not drop any rows.

Here's an example that uses the first function to attach the id of the first row of each group to every row of a DataFrame:

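import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
// toDF needs the session implicits: spark-shell imports them automatically,
// in an application add `import spark.implicits._` after creating the SparkSession.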

val df = Seq(
  (1, "Alice", 10),
  (2, "Alice", 20),
  (3, "Bob", 30),
  (4, "Bob", 40),
  (5, "Charlie", 50)
).toDF("id", "name", "score")

val windowSpec = Window.partitionBy("name").orderBy("score")
val firstRowDf = df.withColumn("first_row", first("id").over(windowSpec))

firstRowDf.show()

This will output the following DataFrame:

+---+-------+-----+---------+
| id|   name|score|first_row|
+---+-------+-----+---------+
|  1|  Alice|   10|        1|
|  2|  Alice|   20|        1|
|  3|    Bob|   30|        3|
|  4|    Bob|   40|        3|
|  5|Charlie|   50|        5|
+---+-------+-----+---------+

Note that the first function returns the first value of the id column in each group, based on the order specified by the orderBy clause in the window specification, and that all five rows are still present. In this case the rows are ordered by score in ascending order, so first_row holds the id of the row with the lowest score in each group. If you want a different row to count as the first one, specify a different orderBy clause in the window specification. To keep only one row per group, the rows have to be numbered with row_number and filtered, which is what the cases later in this post do.
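To actually keep one row per group on the sample data above, the rows of each partition can be numbered and filtered. The following is a minimal sketch, not the only way to do it; the local SparkSession, the app name and the helper column name rn are assumptions made for illustration. It can be pasted into spark-shell or placed in the body of a main method.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Assumption: a local session, used only for this illustration.
val spark = SparkSession.builder().master("local[*]").appName("first-row-sketch").getOrCreate()
import spark.implicits._

val df = Seq(
  (1, "Alice", 10),
  (2, "Alice", 20),
  (3, "Bob", 30),
  (4, "Bob", 40),
  (5, "Charlie", 50)
).toDF("id", "name", "score")

// Number the rows of each name group by ascending score ...
val byName = Window.partitionBy("name").orderBy(col("score").asc)

// ... and keep only the row numbered 1 in every group ("rn" is a hypothetical helper column).
val firstRowPerGroup = df
  .withColumn("rn", row_number().over(byName))
  .where(col("rn") === 1)
  .drop("rn")

firstRowPerGroup.orderBy("id").show()
```

With this sample data only the rows with id 1, 3 and 5 remain, one per name.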

Spark Select The First Row Of Each Group – Car Data

Let's consider a case where you have to analyse car data. For each car you have the following information:

  • Car
  • Cylinders
  • Displacement
  • Horsepower
  • Weight
  • Acceleration
  • Model
  • Origin

The file with the full data can be found in my GitLab repository!

Spark Window Functions

To select rows within specific groups (partitions), ordered by some columns or values, we can use Spark's built-in Window object. The Window object allows us to provide the columns used to group the data and then to order the values within each group.

To do this we use the partitionBy method of the Window object, together with orderBy, and then number the rows of each partition with the row_number function. Below is the Python (PySpark) implementation of the Spark row_number function.

@since(1.6)
def row_number():
    """
    Window function: returns a sequential number starting at 1 within a window partition.
    """
    return _invoke_function("row_number")
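To make the docstring concrete, here is a small Scala sketch of what row_number produces before any filtering. It reuses the sample rows from the introduction; the names scores, w and rn are hypothetical. Because row_number gives tied rows distinct numbers in no guaranteed order, the sketch adds id as a second sort column so the numbering is deterministic.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Assumption: a local session, used only for this illustration.
val spark = SparkSession.builder().master("local[*]").appName("row-number-sketch").getOrCreate()
import spark.implicits._

val scores = Seq(
  (1, "Alice", 10),
  (2, "Alice", 20),
  (3, "Bob", 30),
  (4, "Bob", 40),
  (5, "Charlie", 50)
).toDF("id", "name", "score")

// Highest score first within each name; id breaks ties deterministically.
val w = Window.partitionBy("name").orderBy(col("score").desc, col("id").asc)

// Every row is kept and receives its position (1, 2, ...) inside its partition.
val numbered = scores.withColumn("rn", row_number().over(w))

numbered.orderBy("name", "rn").show()
```

Here the rows with id 2, 4 and 5 get rn = 1, and the rows with id 1 and 3 get rn = 2.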

Read The Car Data

Before we use the window functions, we must read the cars data from a CSV file into a Spark DataFrame.

  import org.apache.spark.sql.SparkSession

  val spark: SparkSession = SparkSession
    .builder()
    .master("local[*]")
    .appName("BigData-ETL.com")
    .getOrCreate()

  val carsData = spark.read
    .option("delimiter", ";")
    .option("header", "true")
    .option("inferSchema", true)
    .csv("src/main/resources/cars.csv")
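If you would rather not rely on inferSchema, which needs an extra pass over the file, a schema can be passed explicitly. The sketch below is only an illustration under assumptions: it writes a hypothetical cut-down file (four columns, three rows taken from the result tables in this post) to a temporary path, and the column types are guesses based on how the values are printed.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField, StructType}

// Assumption: a local session, used only for this illustration.
val spark = SparkSession.builder().master("local[*]").appName("read-csv-sketch").getOrCreate()

// Hypothetical cut-down file, not the real cars.csv.
val sample =
  """car;cylinders;horsepower;origin
    |Peugeot 604sl;6;133.0;Europe
    |Datsun 280-ZX;6;132.0;Japan
    |Pontiac Grand Prix;8;230.0;US
    |""".stripMargin
val path = Files.createTempFile("cars-sample", ".csv")
Files.write(path, sample.getBytes(StandardCharsets.UTF_8))

// Assumed column types for the four columns used here.
val carsSchema = StructType(Seq(
  StructField("car", StringType),
  StructField("cylinders", IntegerType),
  StructField("horsepower", DoubleType),
  StructField("origin", StringType)
))

// Same reader options as above, with an explicit schema instead of inferSchema.
val carsSample = spark.read
  .option("delimiter", ";")
  .option("header", "true")
  .schema(carsSchema)
  .csv(path.toString)

carsSample.printSchema()
```

An explicit schema also makes the job fail or produce nulls in a predictable way when a column does not have the expected type, instead of silently inferring a different one.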

Case #1 – Select The Car Which Has The Highest Horsepower By Origin

In case #1 we want to choose the car with the highest horsepower for each origin. The data contains three origins:

  • US
  • Europe
  • Japan

To find these cars we select the "car", "horsepower" and "origin" columns and then use the Spark row_number and Window functions to keep only the record with the highest horsepower within each origin group.

To keep only that one record per group we use the .where(col("row") === 1) filter. Without it you get every record, numbered within its group.

  val carWithTheMostHorsepowerPerOrigin = carsData.select("car", "horsepower", "origin")
    .withColumn(
      "row",
      row_number().over(
        Window.partitionBy("origin").orderBy(col("horsepower").desc)
      )
    )
    .where(col("row") === 1)
    .drop("row")

  carWithTheMostHorsepowerPerOrigin.show()

The code above produces the following result:

+------------------+----------+------+
|               car|horsepower|origin|
+------------------+----------+------+
|     Peugeot 604sl|     133.0|Europe|
|     Datsun 280-ZX|     132.0| Japan|
|Pontiac Grand Prix|     230.0|    US|
+------------------+----------+------+
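The same kind of result can also be reached without a window, by aggregating a struct. This is a different technique from the one used in this post, shown here only as a hedged alternative sketch: structs are compared field by field, so max(struct(horsepower, car)) returns the pair with the highest horsepower. The in-memory rows are copied from the result tables in this post and are not the full dataset; the names cars, top and topPerOrigin are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, max, struct}

// Assumption: a local session, used only for this illustration.
val spark = SparkSession.builder().master("local[*]").appName("max-struct-sketch").getOrCreate()
import spark.implicits._

// Rows copied from the result tables in this post (not the full cars.csv).
val cars = Seq(
  ("Citroen DS-21 Pallas", 115.0, "Europe", 4),
  ("Audi 5000", 103.0, "Europe", 5),
  ("Peugeot 604sl", 133.0, "Europe", 6),
  ("Mazda RX-4", 110.0, "Japan", 3),
  ("Datsun 200SX", 100.0, "Japan", 4),
  ("Datsun 280-ZX", 132.0, "Japan", 6),
  ("Plymouth Sapporo", 105.0, "US", 4),
  ("Buick Regal Sport Coupe (turbo)", 165.0, "US", 6),
  ("Pontiac Grand Prix", 230.0, "US", 8)
).toDF("car", "horsepower", "origin", "cylinders")

// The struct is ordered by horsepower first, then by car name as a tie-breaker.
val topPerOrigin = cars
  .groupBy("origin")
  .agg(max(struct(col("horsepower"), col("car"))).as("top"))
  .select(col("top.car").as("car"), col("top.horsepower").as("horsepower"), col("origin"))

topPerOrigin.orderBy("origin").show(false)
```

The window approach keeps whole rows and extends naturally to "top N per group", while the struct aggregation only carries the columns you put into the struct.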

Case #2 – Select The Car Which Has The Highest Amount of Cylinders With The Lowest Weight By Origin

In case #2 we pick, for each origin, the vehicle with the most cylinders and, among those, the lowest weight.

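  val carWithTheMostCylindersAndTheLowestWeightByOrigin = carsData.select("car", "cylinders", "weight", "origin")
    .withColumn(
      "row",
      row_number().over(
        // Most cylinders first; among equal cylinder counts, the lightest car first.
        Window.partitionBy("origin").orderBy(col("cylinders").desc, col("weight").asc)
      )
    )
    .where(col("row") === 1)
    .drop("row")

  carWithTheMostCylindersAndTheLowestWeightByOrigin.show(false)

The result below was produced with a window ordered by cylinders only, so weight did not affect which row was picked; with the weight ordering shown above, the selected cars may differ: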

+-------------------------+---------+------+------+
|car                      |cylinders|weight|origin|
+-------------------------+---------+------+------+
|Mercedes-Benz 280s       |6        |3820  |Europe|
|Toyota Mark II           |6        |2807  |Japan |
|Chevrolet Chevelle Malibu|8        |3504  |US    |
+-------------------------+---------+------+------+
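The effect of a two-key ordering (cylinders descending, then weight ascending) is easiest to see on a handful of rows. In the sketch below only the Mercedes-Benz and Chevrolet rows come from the table above; the cars whose names start with "Hypothetical" and their values are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Assumption: a local session, used only for this illustration.
val spark = SparkSession.builder().master("local[*]").appName("two-key-order-sketch").getOrCreate()
import spark.implicits._

val sampleCars = Seq(
  ("Mercedes-Benz 280s", 6, 3820, "Europe"),
  ("Hypothetical Euro Six", 6, 3100, "Europe"),   // made-up row
  ("Hypothetical Euro Four", 4, 2000, "Europe"),  // made-up row
  ("Chevrolet Chevelle Malibu", 8, 3504, "US"),
  ("Hypothetical US Eight", 8, 3200, "US")        // made-up row
).toDF("car", "cylinders", "weight", "origin")

// First sort key: most cylinders. Second sort key: lowest weight among those.
val w = Window.partitionBy("origin").orderBy(col("cylinders").desc, col("weight").asc)

val picked = sampleCars
  .withColumn("row", row_number().over(w))
  .where(col("row") === 1)
  .drop("row")

picked.orderBy("origin").show(false)
```

In this sample the lighter six-cylinder car wins for Europe even though a four-cylinder car is lighter still, because weight is only consulted among rows with the same cylinder count.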

Case #3 – Select The Car Which Has The Highest Horsepower By Origin And Cylinders Amount

In the last case, #3, we want to choose the car with the highest horsepower for each combination of origin and number of cylinders. As mentioned above, we have three origins: US, Europe and Japan. This means partitioning the Spark window by multiple columns, which partitionBy supports directly.

The available cylinder counts are: 3, 4, 5, 6 and 8.

Let's run the snippet of code to get these values!

  val carWithTheMostHorsepowerByOriginAndCylinders = carsData.select("car", "horsepower", "origin", "cylinders")
    .withColumn(
      "row",
      row_number().over(
        Window.partitionBy("origin", "cylinders").orderBy(col("horsepower").desc)
      )
    )
    .where(col("row") === 1)
    .drop("row")

  carWithTheMostHorsepowerByOriginAndCylinders.show(false)

The output is:

+-------------------------------+----------+------+---------+
|car                            |horsepower|origin|cylinders|
+-------------------------------+----------+------+---------+
|Citroen DS-21 Pallas           |115.0     |Europe|4        |
|Audi 5000                      |103.0     |Europe|5        |
|Peugeot 604sl                  |133.0     |Europe|6        |
|Mazda RX-4                     |110.0     |Japan |3        |
|Datsun 200SX                   |100.0     |Japan |4        |
|Datsun 280-ZX                  |132.0     |Japan |6        |
|Plymouth Sapporo               |105.0     |US    |4        |
|Buick Regal Sport Coupe (turbo)|165.0     |US    |6        |
|Pontiac Grand Prix             |230.0     |US    |8        |
+-------------------------------+----------+------+---------+
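The same partitioning by two columns can also be written in Spark SQL, which some readers may find easier to read. The following is a sketch under assumptions: the temporary view name cars, the alias rn and the two rows whose names start with "Hypothetical" are made up for illustration, while the other rows are copied from the table above.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session, used only for this illustration.
val spark = SparkSession.builder().master("local[*]").appName("sql-window-sketch").getOrCreate()
import spark.implicits._

val sampleCars = Seq(
  ("Peugeot 604sl", 133.0, "Europe", 6),
  ("Hypothetical Sedan A", 120.0, "Europe", 6),  // made-up row
  ("Buick Regal Sport Coupe (turbo)", 165.0, "US", 6),
  ("Pontiac Grand Prix", 230.0, "US", 8),
  ("Hypothetical Coupe B", 150.0, "US", 8)       // made-up row
).toDF("car", "horsepower", "origin", "cylinders")

sampleCars.createOrReplaceTempView("cars")

// ROW_NUMBER() restarts at 1 for every (origin, cylinders) combination.
val topByOriginAndCylinders = spark.sql(
  """SELECT car, horsepower, origin, cylinders
    |FROM (
    |  SELECT *,
    |         ROW_NUMBER() OVER (PARTITION BY origin, cylinders ORDER BY horsepower DESC) AS rn
    |  FROM cars
    |) numbered
    |WHERE rn = 1""".stripMargin)

topByOriginAndCylinders.orderBy("origin", "cylinders").show(false)
```

For this sample the query keeps Peugeot 604sl, Buick Regal Sport Coupe (turbo) and Pontiac Grand Prix, one row per origin and cylinder count.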

Code On GitLab

The whole project can be found in my GitLab repository!

Summary

In this tutorial you have learned how to use the Spark Window and row_number functions. We read the cars data from a CSV file into a DataFrame and then ran some data analysis on it.
