One of the most common operations we can execute on a DataFrame is the Spark select function. In this tutorial I will show you how to select columns from a DataFrame or Dataset.
The Spark select() function comes from the Spark SQL library – it is part of the Dataset class in the Spark code. We can select one or multiple columns from a DataFrame, and the selected columns can additionally be aliased with new names.
The select() function is a transformation which returns a new DataFrame with the selected columns.
The select() function has multiple variations. We can pass as the argument:
- a set of Columns
- columns as a set of Strings

// Set of Columns
def select(cols: Column*): ...

// Columns as Set of Strings
def select(col: String, cols: String*): DataFrame =
  select((col +: cols).map(Column(_)) : _*)
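For example, both variants can be called in the same way (a quick sketch, assuming a DataFrame called df with the columns name and age):

import org.apache.spark.sql.functions.col

// Variant 1: columns passed as Column objects
df.select(col("name"), col("age"))

// Variant 2: the same columns passed as Strings
df.select("name", "age")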
Introduction
Apache Spark
Apache Spark" is an open-source data processing engine designed for large-scale data processing. It is particularly well-suited for distributed computing, and can run on a variety of platforms, including standalone clusters, Hadoop" clusters, and the cloud.
Some of the key features of Apache Spark" include:
- In-memory computing: Spark stores data in memory, which allows for fast processing of data.
- Resilient Distributed Datasets (RDDs): Spark’s core data structure, which allows for fault-tolerant processing of data.
- A wide variety of data sources: Spark can read data from a variety of sources, including HDFS, Cassandra, and HBase.
- Multiple programming languages: Spark supports programming in Java, Scala, Python, and R.
- Support for a wide range of data processing tasks: Spark can be used for batch processing, stream processing, machine learning, and interactive data analysis.
Spark DataFrame
In Apache Spark", a DataFrame" is a distributed collection of data organized into named columns. It is similar to a table in a relational database" or a data frame in R/Python, but with richer optimizations under the hood.
DataFrames can be created from a variety of sources, including structured data files, tables in Hive", external databases, or existing RDDs. They also support a wide range of transformations and actions, such as filtering, aggregation, and joining with other data sources.
One of the key benefits of using DataFrames is that they allow you to use the familiar SQL" syntax to manipulate the data. You can use SQL" commands to filter, group, and aggregate data, as well as to join data" from multiple DataFrames.
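As a minimal sketch (assuming a DataFrame df with the columns origin and horsepower, and an active SparkSession called spark), the SQL syntax can be used like this:

// Register the DataFrame as a temporary view so it can be queried with SQL
df.createOrReplaceTempView("cars")

// Filter, group and aggregate with plain SQL
spark.sql("SELECT origin, avg(horsepower) AS avg_hp FROM cars WHERE horsepower > 100 GROUP BY origin").show()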
Spark DataSet
n Apache Spark", a Dataset
is a data structure that represents a distributed collection of strongly-typed data. It is similar to a DataFrame
, but the data is statically-typed, which means that every item in the dataset" has the same type. This allows the Dataset
to be optimized for performance and enables the use of a wider range of functions on the data.
You can use the select
method on a Dataset
to retrieve a specific set of columns. The syntax for the select
method is the same for DataFrame
‘s and Dataset
‘s.
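Here is a quick sketch of select on a typed Dataset (the Car case class and the data are only illustrative; spark.implicits._ is assumed to be imported):

import org.apache.spark.sql.Dataset

case class Car(car: String, horsepower: Int, origin: String)

val carsDs: Dataset[Car] = Seq(Car("BMW 2002", 113, "Europe")).toDS()

// select() works the same way as on a DataFrame; note that it returns an untyped DataFrame (Dataset[Row])
carsDs.select("car", "horsepower").show(false)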
SQL Select
In SQL", the SELECT
statement is used to retrieve data from a database.
The SELECT
clause specifies the columns that you want to retrieve. You can specify one or more columns, separated by commas. The FROM
clause specifies the table that you want to retrieve data from. The WHERE
clause is optional and is used to specify a condition that rows must satisfy in order to be included in the result set.
In SQL", Data Query Language (DQL) is a set of commands that is used to retrieve data from a database. It is different from Data Definition Language (DDL) and Data Modification Language (DML), which are used to define and modify the structure and data in a database, respectively.
The main DQL command is SELECT
, which is used to retrieve data from one or more tables in a database.
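For illustration only (assuming a temporary view called cars has been registered, for example with createOrReplaceTempView), the three clauses map directly onto a query like this:

// SELECT picks the columns, FROM names the table, WHERE filters the rows
spark.sql("SELECT car, origin FROM cars WHERE horsepower > 120").show()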
Spark Select
In Apache Spark", the select
function allows you to retrieve a specific set of columns from a DataFrame
. It takes a variable number of column" names as arguments and returns a new DataFrame
with only the specified columns. The select
function can be used to select columns based on their names, or to apply SQL" functions or expressions to the data in the DataFrame
.
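Beyond plain column names, select() also accepts Column expressions built with the functions from org.apache.spark.sql.functions. A short sketch (assuming a DataFrame df with the columns car and horsepower):

import org.apache.spark.sql.functions.{col, upper}

// Select one column as-is, apply a function to it and alias the result,
// and compute a simple arithmetic expression on another column
df.select(col("car"), upper(col("car")).alias("car_upper"), (col("horsepower") * 2).alias("hp_x2")).show(false)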
How To Select Columns From DataFrame
Example Data
To do some exercises, first of all we need to prepare some data. We will create a DataFrame from a Seq of Tuples.
Check the tutorial where I described how to create a DataFrame in Spark. One of the possibilities is to convert a Scala object to a DataFrame.
val spark: SparkSession = SparkSession
  .builder()
  .master("local[*]")
  .appName("BigData-ETL.com")
  .getOrCreate()

import spark.implicits._

val carsData = Seq(
  ("Ford Torino",140,3449,"US"),
  ("Chevrolet Monte Carlo",150,3761,"US"),
  ("BMW 2002",113,2234,"Europe")
)
val columns = Seq("car","horsepower","weight","origin")
val carsDf = carsData.toDF(columns:_*)
carsDf.show(false)
And the result is:
+---------------------+----------+------+------+
|car                  |horsepower|weight|origin|
+---------------------+----------+------+------+
|Ford Torino          |140       |3449  |US    |
|Chevrolet Monte Carlo|150       |3761  |US    |
|BMW 2002             |113       |2234  |Europe|
+---------------------+----------+------+------+
Spark Select Single Column
As I mentioned, to select a single column in Spark you need to use the select() function. Spark will not change the existing DataFrame, but creates a new one with the selected column. It's due to the fact that Spark DataFrames are immutable.
// DataFrame with one 'car' column
carsDf.select("car").show(false)
// or
carsDf.select(col("car")).show(false)
And the result is:
+---------------------+
|car                  |
+---------------------+
|Ford Torino          |
|Chevrolet Monte Carlo|
|BMW 2002             |
+---------------------+
Select Multiple Columns Spark
Selecting multiple columns is similarly simple as selecting one column. We can just provide comma-separated columns to the select() function:
// DataFrame with multiple columns
carsDf.select("car", "origin", "horsepower").show(false)

val subColumns = Seq("origin", "horsepower")
carsDf.select("car", subColumns: _*).show(false)
carsDf.select(subColumns.map(c => col(c)): _*).show(false)
And the result is:
+---------------------+------+----------+
|car                  |origin|horsepower|
+---------------------+------+----------+
|Ford Torino          |US    |140       |
|Chevrolet Monte Carlo|US    |150       |
|BMW 2002             |Europe|113       |
+---------------------+------+----------+

// carsDf.select(subColumns.map(c => col(c)): _*).show(false)
+------+----------+
|origin|horsepower|
+------+----------+
|US    |140       |
|US    |150       |
|Europe|113       |
+------+----------+
Spark Select By Position Index In DataFrame
To select a column by its position index in a DataFrame or Dataset you can use the columns function and pick an element from the returned Array[String] by index:
carsDf.select(carsDf.columns(2)).show()
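The same idea extends to several positions at once. A small sketch that picks the columns at indexes 0 and 2 of carsDf (col comes from org.apache.spark.sql.functions):

// Pick several columns by their position in the schema
val wantedIndexes = Seq(0, 2)
carsDf.select(wantedIndexes.map(i => col(carsDf.columns(i))): _*).show(false)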
Spark Select Columns From Sequence
In the previous example I showed you how you can convert any Scala collection to a Seq of Spark Columns. It allows you to select all columns from a List, Seq or Array of Strings:
val colList = List("origin", "horsepower")
carsDf.select(colList.map(c => col(c)): _*).show(false)

val colSeq = Seq("origin", "horsepower")
carsDf.select(colSeq.map(c => col(c)): _*).show(false)

val colArray = Array("origin", "horsepower")
carsDf.select(colArray.map(c => col(c)): _*).show(false)
Spark Select N Columns From DataFrame
Sometimes we would like to select only the first few columns from a DataFrame, without specifying the columns explicitly.
The DataFrame class provides the columns method, which returns all column names as an array.
val myDfColumns: Array[String] = carsDf.columns
println(myDfColumns.mkString(","))
Which provides the following output:
car,horsepower,weight,origin
If we would like to select only the first two columns from the DataFrame, we can use the slice method from the Scala ArrayOps class, which returns a new Array[String] between the from and until boundaries (indexes):
carsDf.select(carsDf.columns.slice(0,2).map(c => col(c)): _*).show()
carsDf.select(carsDf.columns.slice(1,3).map(c => col(c)): _*).show()
Spark Select Columns By Regular Expression
We can limit the columns in the select statement by a regular expression, which we provide to the colRegex() function. It selects a column based on the column name specified as a regex and returns it as a Column.
In the following example we want to select the columns whose names start with the "horse" string:
carsDf.select(carsDf.colRegex("`^horse.*`")).show()
Spark Select Columns From DataFrame – Other Examples
Now I will show you how you can use pure Scala functions (not Spark functions) to play with a Spark DataFrame or Dataset. We will use the columns function to filter columns by some expressions.
Spark Select Columns Which Contains
Let's filter only those columns which contain the "r" string in the column name:
carsDf.select(carsDf.columns.filter(_.contains("r")).map(c => col(c)): _*).show()
Spark Select Columns Which Starts With
Let's filter only those columns which start with the "c" string:
carsDf.select(carsDf.columns.filter(_.startsWith("c")).map(c => col(c)): _*).show()
Spark Select Columns Which Ends With
Let's filter only those columns which end with the "t" string:
carsDf.select(carsDf.columns.filter(_.endsWith("t")).map(c => col(c)): _*).show()
Spark Select Column From DataFrame Using SelectExpr Function
Since version 2.0.0 Spark provides the powerful function selectExpr(), which selects a set of SQL expressions. This is a variant of select that accepts SQL expressions.
In the example below I will select the car column and alias it with a new column name: car_name.
carsDf.selectExpr("car as car_name").show()
Which provides results:
+--------------------+
|            car_name|
+--------------------+
|         Ford Torino|
|Chevrolet Monte C...|
|            BMW 2002|
+--------------------+
The general approach is to provide an expression which will be resolved by Spark internally.
// The following are equivalent:
ds.selectExpr("colA", "colB as newName", "abs(colC)")
ds.select(expr("colA"), expr("colB as newName"), expr("abs(colC)"))
Spark Union And UnionByName In DataFrame
Spark Select With Union DataFrame
In Apache Spark", you can use the union
method to combine the rows of two DataFrame
s or Dataset
s. The union
method returns a new DataFrame
or Dataset
that contains the union of the rows in the two input DataFrame
s or Dataset
s.
Here is an example of using the union
method on DataFrame
s in Python":
df1 = spark.read.json("/path/to/data1.json")
df2 = spark.read.json("/path/to/data2.json")

union_df = df1.union(df2)
This example creates two DataFrames called df1 and df2 by reading JSON files, and then creates a new DataFrame called union_df that contains the union of the rows in df1 and df2.
Here is an example of using the union method on Datasets in Scala:
case class Person(name: String, age: Int)

val ds1: Dataset[Person] = Seq(Person("AAA", 10), Person("BBB", 20)).toDS
val ds2: Dataset[Person] = Seq(Person("CCC", 30), Person("DDD", 40)).toDS

val unionDs = ds1.union(ds2)
This example creates two Datasets called ds1 and ds2 of Person objects, and then creates a new Dataset called unionDs that contains the union of the rows in ds1 and ds2.
Spark Select UnionByName DataFrame
In Apache Spark", you can use the unionByName
method to combine the rows of two DataFrame
s or Dataset
s, while ensuring that the resulting DataFrame
or Dataset
has the same schema. The unionByName
method returns a new DataFrame
or Dataset
that contains the union of the rows in the two input DataFrame
s or Dataset
s, with the columns in the same order as the first DataFrame
or Dataset
.
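A short sketch of unionByName on two DataFrames whose columns are in a different order (the data and column names are only illustrative; spark.implicits._ is assumed to be imported):

val dfA = Seq((1, "Ford Torino")).toDF("id", "car")
val dfB = Seq(("BMW 2002", 2)).toDF("car", "id")

// Columns are matched by name, so the different column order does not matter
dfA.unionByName(dfB).show(false)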
Spark Union Vs UnionByName
In Apache Spark", the union
and unionByName
methods allow you to combine the rows of two DataFrame
s or Dataset
s. The main difference between the two methods is how they handle schema differences between the input DataFrame
s or Dataset
s.
The union
method simply combines the rows of the two input DataFrame
s or Dataset
s and returns a new DataFrame
or Dataset
with the combined rows. It does not check for schema differences between the input DataFrame
s or Dataset
s, so it is possible for the resulting DataFrame
or Dataset
to have a different schema than the input DataFrame
s or Dataset
s.
The unionByName
method, on the other hand, combines the rows of the two input DataFrame
s or Dataset
s and returns a new DataFrame
or Dataset
with the combined rows and the same schema as the first input DataFrame
or Dataset
. If the input DataFrame
s or Dataset
s have different schemas, the unionByName
method will rename the columns in the second DataFrame
or Dataset
to match the column" names in the first DataFrame
or Dataset
.
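The difference is easy to see on two DataFrames with the same columns in a different order (a minimal sketch with illustrative data; spark.implicits._ is assumed to be imported):

val left  = Seq(("Ford Torino", "US")).toDF("car", "origin")
val right = Seq(("Europe", "BMW 2002")).toDF("origin", "car")

// union() matches columns by position, so here the values end up under the wrong column names
left.union(right).show(false)

// unionByName() matches columns by name and keeps the schema of the first DataFrame
left.unionByName(right).show(false)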
Full Code
package com.bigdataetl.sparktutorial.sql

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object SelectMethod extends App {

  val spark: SparkSession = SparkSession
    .builder()
    .master("local[*]")
    .appName("BigData-ETL.com")
    .getOrCreate()

  import spark.implicits._

  val carsData = Seq(
    ("Ford Torino",140,3449,"US"),
    ("Chevrolet Monte Carlo",150,3761,"US"),
    ("BMW 2002",113,2234,"Europe")
  )
  val columns = Seq("car","horsepower","weight","origin")
  val carsDf = carsData.toDF(columns:_*)
  carsDf.show(false)

  println("Spark Select Single Column")
  carsDf.select("car").show(false)
  carsDf.select(col("car")).show(false)

  println("Spark Select Multiple Columns")
  carsDf.select("car", "origin", "horsepower").show(false)
  val subColumns = Seq("origin", "horsepower")
  carsDf.select("car", subColumns: _*).show(false)
  carsDf.select(subColumns.map(c => col(c)): _*).show(false)

  println("Spark Select By Position Index In DataFrame")
  carsDf.select(carsDf.columns(2)).show()

  println("Spark Select Columns From Sequence")
  val colList = List("origin", "horsepower")
  carsDf.select(colList.map(c => col(c)): _*).show(false)
  val colSeq = Seq("origin", "horsepower")
  carsDf.select(colSeq.map(c => col(c)): _*).show(false)
  val colArray = Array("origin", "horsepower")
  carsDf.select(colArray.map(c => col(c)): _*).show(false)

  println("Spark Select N Columns From DataFrame")
  val myDfColumns: Array[String] = carsDf.columns
  println(myDfColumns.mkString(","))
  carsDf.select(carsDf.columns.slice(0,2).map(c => col(c)): _*).show()
  carsDf.select(carsDf.columns.slice(1,3).map(c => col(c)): _*).show()

  println("Spark Select Columns By Regular Expression")
  carsDf.select(carsDf.colRegex("`^horse.*`")).show()

  println("Spark Select Columns Which Contains")
  carsDf.select(carsDf.columns.filter(_.contains("r")).map(c => col(c)): _*).show()

  println("Spark Select Columns Which Starts With")
  carsDf.select(carsDf.columns.filter(_.startsWith("c")).map(c => col(c)): _*).show()

  println("Spark Select Columns Which Ends With")
  carsDf.select(carsDf.columns.filter(_.endsWith("t")).map(c => col(c)): _*).show()

  println("Spark Select Column From DataFrame Using SelectExpr Function")
  carsDf.selectExpr("car as car_name").show()
}
GitLab Repository
As usual, please find the full code on our GitLab repository!
Summary
In this tutorial" you have learned how to select one, multiple columns from DataFrame" using various methods which can be helpful in many situation when you have to deal with more complex code written in Spark.
Additionally I have described the differences between union
an unionByName
in Spark.