Spark Where And Filter DataFrame Or DataSet – Check 5 Easy And Complex Examples!

In this tutorial we will dive into the topic: Spark where and filter on a DataFrame or Dataset. The filter() and where() methods are used when you want to filter rows from a DataFrame or Dataset based on some condition.

Spark Where And Filter DataFrame Or DataSet

When we work with data, one of the most common operations is to filter some rows from tables and then execute additional calculations only on that subset of the data.

In Spark, to filter data we can use two methods which are equivalent to each other. In other words, where() is an alias for the filter() function.

To understand this we can dive into the Spark source code, where we see that when we call the where() method, the filter() method is executed.

We have these two methods to choose from because for people whose origin is SQL the where() method is more natural, while for people who come from the programming world the filter() method is the obvious choice.

def where(condition: Column): Dataset[T] = filter(condition)
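
A quick way to convince yourself of this equivalence is to compare the query plans of both calls, which come out identical. A small sketch, assuming some DataFrame df with an origin column:

  df.where(col("origin") === "US").explain()
  df.filter(col("origin") === "US").explain()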

As usual we will focus on examples of Spark code written in the Scala programming language.

Spark filter() Function On DataFrame Or DataSet

As I described earlier, the where() function is an alias for the filter() method, so we will focus only on the syntaxes of the filter() function. Spark provides 4 possible signatures:

// Filters rows using the given condition.
def filter(condition: Column): Dataset[T] 

// Filters rows using the given SQL expression.
def filter(conditionExpr: String): Dataset[T]

// Returns a new DataFrame or Dataset that only contains elements where func returns true.
def filter(func: T => Boolean): Dataset[T] 
def filter(func: FilterFunction[T]): Dataset[T] 
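
The last two signatures are used with typed Datasets: filter(func: T => Boolean) takes a plain Scala predicate, while filter(func: FilterFunction[T]) is mainly intended for the Java API. Below is a minimal sketch of the first variant, assuming a hypothetical Car case class (not part of the examples that follow) and that import spark.implicits._ is in scope:

  // Hypothetical case class used only to illustrate the typed filter(func: T => Boolean) variant
  case class Car(car: String, horsepower: Int, weight: Int, origin: String)

  val carsDs = Seq(
    Car("Ford Torino", 140, 3449, "US"),
    Car("BMW 2002", 113, 2234, "Europe")
  ).toDS()

  // The predicate is an ordinary Scala function evaluated per element
  carsDs.filter(car => car.origin == "US" && car.horsepower >= 140).show()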

Example Data

As usual at the beginning we will create the input DataFrame which we will use in the next examples.

  val carsData = Seq(
    ("Ford Torino", 140, 3449, "US"),
    ("Chevrolet Monte Carlo", 150, 3761, "US"),
    ("BMW 2002", 113, 2234, "Europe"),
    ("Volkswagen Model 111", 97, 1834, "Europe"),
    ("Datsun 510 (sw)", 97, 2288, "Japan")
  )
  val columns = Seq("car", "horsepower", "weight", "origin")
  val carsDf = carsData.toDF(columns: _*)

The carsDf schema and data are:

root
 |-- car: string (nullable = true)
 |-- horsepower: integer (nullable = false)
 |-- weight: integer (nullable = false)
 |-- origin: string (nullable = true)

+---------------------+----------+------+------+
|car                  |horsepower|weight|origin|
+---------------------+----------+------+------+
|Ford Torino          |140       |3449  |US    |
|Chevrolet Monte Carlo|150       |3761  |US    |
|BMW 2002             |113       |2234  |Europe|
|Volkswagen Model 111 |97        |1834  |Europe|
|Datsun 510 (sw)      |97        |2288  |Japan |
+---------------------+----------+------+------+

Spark filter() DataFrame Using Column Condition

In the first example we will focus on filtering data by a condition of type Column (the first signature listed above).

In Spark we can refer to a DataFrame column in 4 ways: with the col("origin") function, the $"origin" interpolator, the carsDf("origin") apply syntax, or the Scala Symbol syntax 'origin.

Please find the 8 equivalent calls below which all give the same result: 4 for the filter() and 4 for the where() function:

  carsDf.filter(col("origin") === lit("US")).show()
  carsDf.filter($"origin" === lit("US")).show()
  carsDf.filter(carsDf("origin") === lit("US")).show()
  carsDf.filter('origin === lit("US")).show()

  carsDf.where(col("origin") === lit("US")).show()
  carsDf.where($"origin" === lit("US")).show()
  carsDf.where(carsDf("origin") === lit("US")).show()
  carsDf.where('origin === lit("US")).show()

Spark Filter DataFrame Rows Using The Given SQL Expression

The previous example was aimed more at people who come from the programming world. For people to whom SQL is the more natural way to operate on data, the second method will be more self-explanatory.

In this example we will filter data by a SQL expression passed to the filter() or where() method.

  carsDf.filter("origin == 'US'").show()
  carsDf.where("origin == 'US'").show()

Which prints the two filtered rows from our test DataFrame:

+---------------------+----------+------+------+
|car                  |horsepower|weight|origin|
+---------------------+----------+------+------+
|Ford Torino          |140       |3449  |US    |
|Chevrolet Monte Carlo|150       |3761  |US    |
+---------------------+----------+------+------+
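
The string passed to filter() or where() accepts the full Spark SQL predicate syntax, so operators such as IN, LIKE or BETWEEN work here as well. A short sketch on the same carsDf:

  carsDf.filter("origin IN ('US', 'Japan')").show()
  carsDf.filter("car LIKE 'Ford%'").show()
  carsDf.where("horsepower BETWEEN 100 AND 145").show()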

Spark Filter DataFrame By Multiple Column Conditions

So far we have learned the basics. Now we can collect all that knowledge and go a step further. In this example we will filter a DataFrame or Dataset by multiple column conditions. In Spark we can accomplish this easily with the && (and) and || (or) operators.

Check the following simple examples:

  carsDf
    .filter(col("origin") === lit("US") && col("horsepower") >= lit(150))
    .show()

  carsDf
    .where(col("origin") === lit("US") && col("horsepower") >= lit(150))
    .show()
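
If you prefer the SQL-expression form, the same multiple-column condition can be written as a single string, and the || (or) operator combines Column conditions in the same way as &&. A short sketch on the same carsDf:

  // SQL-expression equivalent of the Column-based filter above
  carsDf.filter("origin = 'US' AND horsepower >= 150").show()

  // || (or) combines Column conditions analogously to &&
  carsDf
    .filter(col("origin") === lit("Japan") || col("horsepower") >= lit(150))
    .show()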

Spark Filter Nested Struct In DataFrame Or DataSet

Now we will go through a more complex example where we need to filter a nested struct in a DataFrame or Dataset. Before we start we need to create the test data which will be used in the code snippets.

  val arrayStructData = Seq(
    Row(Row("Denver", "New York"), List("5th Avenue", "7th Avenue"), "US"),
    Row(Row("Los Angeles", "San Francisco"), List("Street1", "Street2"), "US"),
    Row(
      Row("Paris", "Warsaw"),
      List("Al. Jerozolimskie", "S2", "Street3"),
      "EU"
    )
  )

  val arrayStructureSchema = new StructType()
    .add(
      "country",
      new StructType()
        .add("city1", StringType)
        .add("city2", StringType)
    )
    .add("streets", ArrayType(StringType))
    .add("country_code", StringType);

  val cityData = spark.createDataFrame(
    spark.sparkContext.parallelize(arrayStructData),
    arrayStructureSchema
  )
  cityData.printSchema()
  cityData.show()

Which gives results:

root
 |-- country: struct (nullable = true)
 |    |-- city1: string (nullable = true)
 |    |-- city2: string (nullable = true)
 |-- streets: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- country_code: string (nullable = true)

+--------------------+--------------------+------------+
|             country|             streets|country_code|
+--------------------+--------------------+------------+
|  {Denver, New York}|[5th Avenue, 7th ...|          US|
|{Los Angeles, San...|  [Street1, Street2]|          US|
|     {Paris, Warsaw}|[Al. Jerozolimski...|          EU|
+--------------------+--------------------+------------+

To filter on nested columns, use the following syntax to filter by a struct field in a DataFrame or Dataset:

  cityData
    .filter(cityData("country.city2") === "Warsaw")
    .show(false)

And the outcome is:

+---------------+--------------------------------+------------+
|country        |streets                         |country_code|
+---------------+--------------------------------+------------+
|{Paris, Warsaw}|[Al. Jerozolimskie, S2, Street3]|EU          |
+---------------+--------------------------------+------------+
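
The same filter can also be written with the col() function and a dotted path to the struct field:

  cityData
    .filter(col("country.city2") === "Warsaw")
    .show(false)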

Spark Filter Array Column In DataFrame Or DataSet

Using Spark you can easily filter data stored in array columns as well. Since version 1.5.0 Spark provides the array_contains() function, which allows us to filter an array column in a DataFrame or Dataset. It returns null if the array is null, true if the array contains the value, and false otherwise.

  def array_contains(column: Column, value: Any): Column = withExpr {
    ArrayContains(column.expr, lit(value).expr)
  }

In the next example we reuse the previous cityData DataFrame, which has the streets column of array type.

  cityData
    .filter(array_contains(col("streets"), "7th Avenue"))
    .show(false)

Which leads to the following results:

+------------------+------------------------+------------+
|country           |streets                 |country_code|
+------------------+------------------------+------------+
|{Denver, New York}|[5th Avenue, 7th Avenue]|US          |
+------------------+------------------------+------------+
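
To keep only the rows whose array does not contain the given value, negate the condition with the ! operator (or the not() function from org.apache.spark.sql.functions). A short sketch on the same cityData:

  cityData
    .filter(!array_contains(col("streets"), "7th Avenue"))
    .show(false)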

Full Code

package com.bigdataetl.sparktutorial.sql

import org.apache.spark.sql.functions.{array_contains, col, lit}
import org.apache.spark.sql.types.{ArrayType, StringType, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object SparkFilterAndWhere extends App {

  val spark: SparkSession = SparkSession
    .builder()
    .master("local[*]")
    .appName("BigData-ETL.com")
    .getOrCreate()

  import spark.implicits._

  val carsData = Seq(
    ("Ford Torino", 140, 3449, "US"),
    ("Chevrolet Monte Carlo", 150, 3761, "US"),
    ("BMW 2002", 113, 2234, "Europe"),
    ("Volkswagen Model 111", 97, 1834, "Europe"),
    ("Datsun 510 (sw)", 97, 2288, "Japan")
  )
  val columns = Seq("car", "horsepower", "weight", "origin")
  val carsDf = carsData.toDF(columns: _*)

  carsDf.printSchema()
  carsDf.show(false)

  println("Spark filter() DataFrame Using Column Condition")
  carsDf.filter(col("origin") === lit("US")).show()
  carsDf.filter($"origin" === lit("US")).show()
  carsDf.filter(carsDf("origin") === lit("US")).show()
  carsDf.filter('origin === lit("US")).show()

  carsDf.where(col("origin") === lit("US")).show()
  carsDf.where($"origin" === lit("US")).show()
  carsDf.where(carsDf("origin") === lit("US")).show()
  carsDf.where('origin === lit("US")).show()

  println("Spark Filter DataFrame Rows Using The Given SQL Expression")
  carsDf.filter("origin == 'US'").show()
  carsDf.where("origin == 'US'").show(false)

  println("Spark Filter DataFrame By Multiple Column Conditions")
  carsDf
    .filter(col("origin") === lit("US") && col("horsepower") >= lit(150))
    .show()

  carsDf
    .where(col("origin") === lit("US") && col("horsepower") >= lit(150))
    .show()

  println("Spark Filter Nested Struct In DataFrame Or DataSet")

  val arrayStructData = Seq(
    Row(Row("Denver", "New York"), List("5th Avenue", "7th Avenue"), "US"),
    Row(Row("Los Angeles", "San Francisco"), List("Street1", "Street2"), "US"),
    Row(
      Row("Paris", "Warsaw"),
      List("Al. Jerozolimskie", "S2", "Street3"),
      "EU"
    )
  )

  val arrayStructureSchema = new StructType()
    .add(
      "country",
      new StructType()
        .add("city1", StringType)
        .add("city2", StringType)
    )
    .add("streets", ArrayType(StringType))
    .add("country_code", StringType);

  val cityData = spark.createDataFrame(
    spark.sparkContext.parallelize(arrayStructData),
    arrayStructureSchema
  )
  cityData.printSchema()
  cityData.show(false)

  cityData
    .filter(cityData("country.city2") === "Warsaw")
    .show(false)

  println("Spark Filter Array Column In DataFrame Or DataSet")
  cityData
    .filter(array_contains(col("streets"), "7th Avenue"))
    .show(false)
}

GitLab Repository

As usual, please find the full code on our GitLab repository!

Summary

Thank you for visiting my site. I hope that after reading this post you know how to filter a DataFrame and that the various Spark filter() syntaxes now look more familiar than before!
