In this tutorial" we will dive into the topic, which is called: Spark Where And Filter DataFrame" Or DataSet". filter() or where() methods are used when you want to filter some rows from DataFrame or DataSet" based on some condition.
Introduction
Apache Spark
Apache Spark" is a distributed data processing platform that is designed to be fast, easy to use, and flexible. It is widely used for a variety of data processing tasks, including batch processing, stream processing, machine learning", and interactive data analytics.
Some of the key features and characteristics of Apache Spark" include:
- Speed: Spark is designed to be much faster than other data processing platforms, thanks to its ability to cache data in memory and its use of a distributed processing model. This makes it well-suited for tasks that require fast processing of large volumes of data.
- Ease of use: Spark includes a simple and expressive programming model, as well as a wide range of high-level APIs for common data processing tasks. This makes it easy for developers to build and deploy data processing pipelines without having to worry about the low-level details of distributed computing.
- Flexibility: Spark is designed to be highly flexible and can be used for a wide range of data processing tasks, including batch processing, stream processing, machine learning, and interactive data analytics. It can be run on a wide range of platforms, including standalone clusters, cloud platforms, and Hadoop clusters.
- Scalability: Spark can scale out to large clusters of machines, making it well-suited for processing very large volumes of data. It also includes features for fault tolerance and recovery, which help to ensure that data processing jobs can be completed even in the event of machine or network failures.
Overall, Apache Spark" is a powerful and widely-used data processing platform that is well-suited for a wide range of data processing tasks.
Spark DataFrame And DataSet
In Apache Spark", a DataFrame" is a distributed collection of data organized into named columns. It is similar to a table in a traditional relational database or a data frame in R or Python", but is distributed across a cluster of machines. DataFrames can be created from a variety of sources, including structured data files, tables in Hive", or external databases.
A DataSet" is a more type-safe version of a DataFrame". In a DataSet", the data is strongly typed, which means that each column" has a specific data type (such as an integer or a string). This can make it easier to work with data in a DataSet", as the type-safety can help to catch errors at compile time rather than at runtime. However, the tradeoff is that Datasets can be more difficult to work with than DataFrames, as they require the user" to specify the data types of each column.
Some of the key differences between DataFrames and Datasets in Spark" include:
- Type-safety: As mentioned above, DataFrames are not type-safe, while Datasets are. A DataFrame is a collection of generic Row objects whose column types are only checked at runtime, while a Dataset has a concrete element type, so type errors can be caught at compile time.
- Performance: Both run on the same engine, but performance can differ: column-based DataFrame expressions can be optimized aggressively by Catalyst, whereas typed Dataset operations that use arbitrary lambdas are opaque to the optimizer. The actual difference depends on the specific workload and the size of the data.
- Compatibility: DataFrames are compatible with a wider range of APIs and functions than Datasets, as they are more flexible in terms of the data types that they can contain. This means that it may be easier to work with DataFrames in some cases, especially if you need to use functions or libraries that are not compatible with Datasets.
Overall, both DataFrames and Datasets are useful abstractions in Spark that can make it easier to work with large volumes of data. The choice between the two will depend on your specific needs and requirements, and may depend on factors such as performance, type-safety, and compatibility with other APIs and libraries.
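To make the difference concrete, here is a minimal sketch (the Car case class, the sample values, and the application name are made up for this illustration): the typed Dataset lets the compiler check the field access, while the equivalent DataFrame expression is only validated at runtime.

import org.apache.spark.sql.SparkSession

// Hypothetical case class used only for this illustration
case class Car(car: String, horsepower: Int)

object DataFrameVsDataset {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("df-vs-ds").getOrCreate()
    import spark.implicits._

    val ds = Seq(Car("Ford Torino", 140), Car("BMW 2002", 113)).toDS() // Dataset[Car]
    val df = ds.toDF()                                                 // DataFrame = Dataset[Row]

    // Typed API: the compiler knows that horsepower exists and is an Int
    ds.filter(c => c.horsepower > 120).show()

    // Untyped API: a misspelled column name would only fail at runtime
    df.filter(df("horsepower") > 120).show()

    spark.stop()
  }
}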
Spark Actions And Transformations
In Apache Spark", actions and transformations are operations that can be applied to data in a distributed DataSet".
Actions are operations that trigger the execution of a Spark job and return a result. Some examples of actions include:
- count: Returns the number of rows in a Dataset.
- collect: Returns the entire Dataset as an array on the driver node.
- take: Returns the first N rows of a Dataset.
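A minimal sketch of these actions (assuming an existing SparkSession called spark):

import spark.implicits._

val numbers = Seq(1, 2, 3, 4, 5).toDS()

println(numbers.count())          // 5 - the number of rows
println(numbers.take(2).toList)   // List(1, 2) - the first N rows, returned to the driver
println(numbers.collect().toList) // List(1, 2, 3, 4, 5) - the whole Dataset on the driver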
Transformations are operations that create a new Dataset from an existing one. Transformations are lazy, which means that they are not executed until an action is called on the transformed Dataset. Some examples of transformations include:
- filter / where: Returns a new Dataset that contains only the rows that meet a specified condition.
- map: Returns a new Dataset by applying a function to each row in the original Dataset.
- groupBy: Returns a new Dataset that groups the rows of the original Dataset by a specified key.
PySpark
PySpark is the Python" interface to Spark, and it allows users to write Spark programs using the Python programming language.
PySpark" is well-suited for a wide range of data processing tasks, including batch processing, stream processing, machine learning", and interactive data analytics. It can be run on a variety of platforms, including standalone clusters, cloud platforms, and Hadoop" clusters.
Some of the key features of PySpark" include:
- Ease of use: PySpark" has a simple and expressive programming" model, as well as a wide range of high-level APIs for common data processing tasks. This makes it easy for developers to build and deploy data processing pipelines without having to worry about the low-level details of distributed computing.
- Flexibility: PySpark" can be used for a wide range of data processing tasks, including batch processing, stream processing, machine learning", and interactive data analytics. It can also be used to process data in a variety of formats, including structured data, semi-structured data, and unstructured data.
- Scalability: PySpark" can scale out to large clusters of machines, making it well-suited for processing very large volumes of data. It also includes features for fault tolerance and recovery, which help to ensure that data processing jobs can be completed even in the event of machine or network failures.
Spark Scala API vs Spark Python API (PySpark) Filter / Where
Scala Spark and PySpark" are both implementations of the Apache Spark" data processing platform. Scala Spark uses the Scala" programming language, while PySpark" uses Python.
Both Scala Spark and PySpark" use the Spark SQL" engine, which includes a component called the Catalyst optimizer". The Catalyst optimizer" is a query optimization engine that is responsible for optimizing the logical and physical plans that are generated by the Spark SQL" engine. It uses a variety of optimization techniques, such as rule-based optimization and cost-based optimization, to transform the input query into an efficient execution plan.
Overall, the main difference between Scala Spark and PySpark" is the programming language that is used. Scala Spark is based on the Scala language, which is a statically-typed language that is known for its conciseness and performance. PySpark" is based on Python, which is a dynamically-typed language that is known for its simplicity and readability. Both languages have their own strengths and weaknesses, and the choice between Scala Spark and PySpark" will depend on the specific needs and preferences of the user".
SQL Filter Data
In SQL", the WHERE
clause is used to filter rows from a table based on a boolean condition. The WHERE
clause is typically used in a SELECT
, UPDATE
, or DELETE
statement, and it allows you to specify a condition that determines which rows should be included or affected by the statement.
SELECT * FROM customers WHERE age > 30;
This SELECT statement will return all rows from the customers table where the value in the age column is greater than 30.
You can also use multiple conditions in the WHERE clause by using logical operators such as AND and OR. For example, to select only the rows where the age column is greater than 30 and the gender column is 'male', you can use the following SELECT statement:
SELECT * FROM customers WHERE age > 30 AND gender = 'male';
Overall, the WHERE clause is a powerful tool for filtering rows from a table based on a boolean condition. It is an essential part of the SQL language and is widely used in data analysis and data management tasks.
Spark Where And Filter DataFrame Or DataSet
When we work with data, one of the most common operations is to filter some rows from a table and then run additional calculations only on that subset of the data.
In Spark there are two methods to filter data, and they are equivalent to each other. In other words, the where() method is an alias for the filter() function. We have these two methods to choose from because for people with a SQL background where() feels more natural, while for people coming from the programming world filter() is the obvious choice.
If we dig into the Spark source code, we can see that calling the where method simply executes the filter method:
def where(condition: Column): Dataset[T] = filter(condition)
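A quick way to convince yourself is to compare the physical plans of both calls, which are identical. This is a small sketch that assumes the carsDf DataFrame created later in the Example Data section:

import org.apache.spark.sql.functions.col

// Both calls print exactly the same physical plan
carsDf.filter(col("origin") === "US").explain()
carsDf.where(col("origin") === "US").explain()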
As usual, we will focus on examples of Spark code written in the Scala programming language.
Spark filter() Function On DataFrame Or DataSet
As I described earlier, the where() function is an alias for the filter() method, so we will focus only on the syntaxes of the filter() function. Spark comes with 4 possible syntaxes:
// Filters rows using the given condition.
def filter(condition: Column): Dataset[T]

// Filters rows using the given SQL expression.
def filter(conditionExpr: String): Dataset[T]

// Returns a new DataFrame or Dataset that only contains elements where func returns true.
def filter(func: T => Boolean): Dataset[T]
def filter(func: FilterFunction[T]): Dataset[T]
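The first two signatures are demonstrated in the sections below. The function-based signatures are typically used with a typed Dataset; here is a minimal sketch (the Car case class is hypothetical and introduced only for this example, and an existing SparkSession called spark is assumed):

case class Car(car: String, horsepower: Int, weight: Int, origin: String)

import spark.implicits._

val carsDs = Seq(
  Car("Ford Torino", 140, 3449, "US"),
  Car("BMW 2002", 113, 2234, "Europe")
).toDS()

// filter(func: T => Boolean): the predicate works on the typed object, not on columns
carsDs.filter(car => car.origin == "US" && car.horsepower >= 120).show()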
Example Data
As usual, we start by creating the input DataFrame that we will use in the following examples.
val carsData = Seq(
  ("Ford Torino", 140, 3449, "US"),
  ("Chevrolet Monte Carlo", 150, 3761, "US"),
  ("BMW 2002", 113, 2234, "Europe"),
  ("Volkswagen Model 111", 97, 1834, "Europe"),
  ("Datsun 510 (sw)", 97, 2288, "Japan")
)
val columns = Seq("car", "horsepower", "weight", "origin")
val carsDf = carsData.toDF(columns: _*)
The carsDf schema and data are:
root
 |-- car: string (nullable = true)
 |-- horsepower: integer (nullable = false)
 |-- weight: integer (nullable = false)
 |-- origin: string (nullable = true)

+---------------------+----------+------+------+
|car                  |horsepower|weight|origin|
+---------------------+----------+------+------+
|Ford Torino          |140       |3449  |US    |
|Chevrolet Monte Carlo|150       |3761  |US    |
|BMW 2002             |113       |2234  |Europe|
|Volkswagen Model 111 |97        |1834  |Europe|
|Datsun 510 (sw)      |97        |2288  |Japan |
+---------------------+----------+------+------+
Spark filter() DataFrame Using Column Condition
In the first example we will focus on filtering data by a condition of type Column. Scroll up a little to see the first syntax.
In Spark we can refer to a DataFrame column in 4 ways:
- col(col_name)
- $"col_name"
- dataFrameObject("col_name")
- 'col_name – Symbol literals are deprecated in Scala 2.13. Use Symbol("origin") instead
Please find below the 8 equivalent syntaxes which give the same result: 4 for the filter() and 4 for the where() function:
carsDf.filter(col("origin") === lit("US")).show() carsDf.filter($"origin" === lit("US")).show() carsDf.filter(carsDf("origin") === lit("US")).show() carsDf.filter('origin === lit("US")).show() carsDf.where(col("origin") === lit("US")).show() carsDf.where($"origin" === lit("US")).show() carsDf.where(carsDf("origin") === lit("US")).show() carsDf.where('origin === lit("US")).show()
Spark Filter DataFrame Rows Using The Given SQL Expression
The previous example was aimed more at people who come from the programming world. For people to whom SQL is the more natural way to operate on data, the second method will be more self-explanatory.
In this example we will filter data by a SQL expression which is passed to the filter() or where() method.
carsDf.filter("origin == 'US'").show() carsDf.where("origin == 'US'").show()
This prints the two filtered rows from our test DataFrame:
+---------------------+----------+------+------+
|car                  |horsepower|weight|origin|
+---------------------+----------+------+------+
|Ford Torino          |140       |3449  |US    |
|Chevrolet Monte Carlo|150       |3761  |US    |
+---------------------+----------+------+------+
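For completeness, the same SQL-style filter can also be expressed as a full SQL query with spark.sql() after registering the DataFrame as a temporary view. This is a sketch using the carsDf from the Example Data section and a hypothetical view name cars:

// Register the DataFrame as a temporary view and filter it with plain SQL
carsDf.createOrReplaceTempView("cars")
spark.sql("SELECT * FROM cars WHERE origin = 'US'").show()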
Spark Filter DataFrame By Multiple Column Conditions
Till now we have learned the basics. Now we can collect all of that knowledge and go a step further. In this example we will filter a DataFrame or Dataset by multiple column conditions. In Spark we can accomplish this easily with the && (and) and || (or) operators.
Check the following simple examples:
carsDf .filter(col("origin") === lit("US") && col("horsepower") >= lit(150)) .show() carsDf .where(col("origin") === lit("US") && col("horsepower") >= lit(150)) .show()
Spark Filter Nested Struct In DataFrame Or DataSet
Now we will go through a more complex example where we need to filter a nested struct in a DataFrame or Dataset. Before we start, we need to create the test data which will be used in the code snippets.
val arrayStructData = Seq(
  Row(Row("Denver", "New York"), List("5th Avenue", "7th Avenue"), "US"),
  Row(Row("Los Angeles", "San Francisco"), List("Street1", "Street2"), "US"),
  Row(
    Row("Paris", "Warsaw"),
    List("Al. Jerozolimskie", "S2", "Street3"),
    "EU"
  )
)

val arrayStructureSchema = new StructType()
  .add(
    "country",
    new StructType()
      .add("city1", StringType)
      .add("city2", StringType)
  )
  .add("streets", ArrayType(StringType))
  .add("country_code", StringType)

val cityData = spark.createDataFrame(
  spark.sparkContext.parallelize(arrayStructData),
  arrayStructureSchema
)

cityData.printSchema()
cityData.show()
Which gives the following results:
root
 |-- country: struct (nullable = true)
 |    |-- city1: string (nullable = true)
 |    |-- city2: string (nullable = true)
 |-- streets: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- country_code: string (nullable = true)

+--------------------+--------------------+------------+
|             country|             streets|country_code|
+--------------------+--------------------+------------+
|  {Denver, New York}|[5th Avenue, 7th ...|          US|
|{Los Angeles, San...|  [Street1, Street2]|          US|
|     {Paris, Warsaw}|[Al. Jerozolimski...|          EU|
+--------------------+--------------------+------------+
To filter nested columns in a DataFrame, use the following syntax to filter on a struct field in a Dataset or DataFrame:
cityData
  .filter(cityData("country.city2") === "Warsaw")
  .show(false)
And the outcome is:
+---------------+--------------------------------+------------+
|country        |streets                         |country_code|
+---------------+--------------------------------+------------+
|{Paris, Warsaw}|[Al. Jerozolimskie, S2, Street3]|EU          |
+---------------+--------------------------------+------------+
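The nested field can also be referenced with the col() function using a dotted path, or with getField(), and both give the same result. A short sketch on the same cityData:

// Equivalent ways to reach the nested struct field
cityData.filter(col("country.city2") === "Warsaw").show(false)
cityData.filter(col("country").getField("city2") === "Warsaw").show(false)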
Spark Filter Array Column In DataFrame Or DataSet
Using Spark you can easily filter array data stored in columns as well. Since version 1.5.0 Spark provides the array_contains() method, which allows us to filter an array column in a DataFrame or Dataset: it returns null if the array is null, true if the array contains the value, and false otherwise.
def array_contains(column: Column, value: Any): Column = withExpr {
  ArrayContains(column.expr, lit(value).expr)
}
In the next example we reuse the previous cityData DataFrame, which has the streets column of type Array.
cityData
  .filter(array_contains(col("streets"), "7th Avenue"))
  .show(false)
Which leads to the following results:
+------------------+------------------------+------------+
|country           |streets                 |country_code|
+------------------+------------------------+------------+
|{Denver, New York}|[5th Avenue, 7th Avenue]|US          |
+------------------+------------------------+------------+
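Since array_contains() returns an ordinary Column, it can be combined with other predicates or negated just like any other condition. A short sketch on the same cityData:

// Rows whose streets array contains "Street1" AND whose country_code is "US"
cityData
  .filter(array_contains(col("streets"), "Street1") && col("country_code") === "US")
  .show(false)

// Rows whose streets array does NOT contain "7th Avenue"
cityData
  .filter(!array_contains(col("streets"), "7th Avenue"))
  .show(false)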
Full Code
package com.bigdataetl.sparktutorial.sql

import org.apache.spark.sql.functions.{array_contains, col, lit}
import org.apache.spark.sql.types.{ArrayType, StringType, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object SparkFilterAndWhere extends App {

  val spark: SparkSession = SparkSession
    .builder()
    .master("local[*]")
    .appName("BigData-ETL.com")
    .getOrCreate()

  import spark.implicits._

  val carsData = Seq(
    ("Ford Torino", 140, 3449, "US"),
    ("Chevrolet Monte Carlo", 150, 3761, "US"),
    ("BMW 2002", 113, 2234, "Europe"),
    ("Volkswagen Model 111", 97, 1834, "Europe"),
    ("Datsun 510 (sw)", 97, 2288, "Japan")
  )
  val columns = Seq("car", "horsepower", "weight", "origin")
  val carsDf = carsData.toDF(columns: _*)

  carsDf.printSchema()
  carsDf.show(false)

  println("Spark filter() DataFrame Using Column Condition")
  carsDf.filter(col("origin") === lit("US")).show()
  carsDf.filter($"origin" === lit("US")).show()
  carsDf.filter(carsDf("origin") === lit("US")).show()
  carsDf.filter('origin === lit("US")).show()
  carsDf.where(col("origin") === lit("US")).show()
  carsDf.where($"origin" === lit("US")).show()
  carsDf.where(carsDf("origin") === lit("US")).show()
  carsDf.where('origin === lit("US")).show()

  println("Spark Filter DataFrame Rows Using The Given SQL Expression")
  carsDf.filter("origin == 'US'").show()
  carsDf.where("origin == 'US'").show(false)

  println("Spark Filter DataFrame By Multiple Column Conditions")
  carsDf
    .filter(col("origin") === lit("US") && col("horsepower") >= lit(150))
    .show()
  carsDf
    .where(col("origin") === lit("US") && col("horsepower") >= lit(150))
    .show()

  println("Spark Filter Nested Struct In DataFrame Or DataSet")
  val arrayStructData = Seq(
    Row(Row("Denver", "New York"), List("5th Avenue", "7th Avenue"), "US"),
    Row(Row("Los Angeles", "San Francisco"), List("Street1", "Street2"), "US"),
    Row(
      Row("Paris", "Warsaw"),
      List("Al. Jerozolimskie", "S2", "Street3"),
      "EU"
    )
  )

  val arrayStructureSchema = new StructType()
    .add(
      "country",
      new StructType()
        .add("city1", StringType)
        .add("city2", StringType)
    )
    .add("streets", ArrayType(StringType))
    .add("country_code", StringType)

  val cityData = spark.createDataFrame(
    spark.sparkContext.parallelize(arrayStructData),
    arrayStructureSchema
  )

  cityData.printSchema()
  cityData.show(false)

  cityData
    .filter(cityData("country.city2") === "Warsaw")
    .show(false)

  println("Spark Filter Array Column In DataFrame Or DataSet")
  cityData
    .filter(array_contains(col("streets"), "7th Avenue"))
    .show(false)
}
GitLab Repository
As usual, please find the full code in our GitLab repository!
Summary
The WHERE clause in SQL" is used to filter rows from a table based on a boolean condition, and it is an important part of the SQL" language that is widely used in data analysis and data management tasks. It allows users to specify a condition that determines which rows should be included or affected by a statement, and it can be used in conjunction with logical operators such as AND and OR to create complex filtering criteria.
Thank you for visiting my site. I hope that after reading this post you know how to filter a DataFrame and that the various Spark filter syntaxes now look more familiar than before!