Spark Select – How To Select Columns From DataFrame – Check 11 Great Examples!


One of the most common operations we can execute on a DataFrame is the Spark select function. In this tutorial I will show you how to select columns from a DataFrame or Dataset.

The Spark select() function comes from the Spark SQL library – it is part of the Dataset class in the Spark code base. We can select one or multiple columns from a DataFrame, and the selected columns can additionally be aliased with new names (see the sketch after the signatures below).

The Spark select() function is a transformation which returns a new DataFrame containing the selected columns.

The select() function has several variants. As arguments we can pass:

  • a varargs sequence of Column objects
  • column names as a varargs sequence of Strings

// Sequence of Columns
def select(cols: Column*): DataFrame

// Column names as a sequence of Strings
def select(col: String, cols: String*): DataFrame = select((col +: cols).map(Column(_)) : _*)
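
As a quick, minimal sketch of the two overloads (and of the aliasing mentioned above). Note that carsDf and its columns are only defined later, in the Example Data section:

import org.apache.spark.sql.functions.col

// String overload: select(col: String, cols: String*)
carsDf.select("car", "origin")

// Column overload: select(cols: Column*), here with one column aliased to a new name
carsDf.select(col("car").alias("car_name"), col("origin"))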

Introduction

Apache Spark

Apache Spark" is an open-source data processing engine designed for large-scale data processing. It is particularly well-suited for distributed computing, and can run on a variety of platforms, including standalone clusters, Hadoop" clusters, and the cloud.

Some of the key features of Apache Spark include:

  • In-memory computing: Spark stores data in memory, which allows for fast processing of data.
  • Resilient Distributed Datasets (RDDs): Spark’s core data structure, which allows for fault-tolerant processing of data.
  • A wide variety of data sources: Spark can read data from a variety of sources, including HDFS, Cassandra, and HBase.
  • Multiple programming languages: Spark supports programming in Java, Scala, Python, and R.
  • Support for a wide range of data processing tasks: Spark can be used for batch processing, stream processing, machine learning, and interactive data analysis.

Spark DataFrame

In Apache Spark", a DataFrame" is a distributed collection of data organized into named columns. It is similar to a table in a relational database" or a data frame in R/Python, but with richer optimizations under the hood.

DataFrames can be created from a variety of sources, including structured data files, tables in Hive, external databases, or existing RDDs. They also support a wide range of transformations and actions, such as filtering, aggregation, and joining with other data sources.

One of the key benefits of using DataFrames is that they allow you to use the familiar SQL syntax to manipulate the data. You can use SQL commands to filter, group, and aggregate data, as well as to join data from multiple DataFrames.
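
For example, a DataFrame can be registered as a temporary view and queried with plain SQL. This is only a minimal sketch; the peopleDf DataFrame and the people view name are illustrative and not part of the example data used later:

// Register the DataFrame as a temporary view and query it with SQL
peopleDf.createOrReplaceTempView("people")

val adultsByCity = spark.sql(
  """SELECT city, COUNT(*) AS adults
    |FROM people
    |WHERE age >= 18
    |GROUP BY city""".stripMargin)

adultsByCity.show()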

Spark DataSet

n Apache Spark", a Dataset is a data structure that represents a distributed collection of strongly-typed data. It is similar to a DataFrame, but the data is statically-typed, which means that every item in the dataset" has the same type. This allows the Dataset to be optimized for performance and enables the use of a wider range of functions on the data.

You can use the select method on a Dataset to retrieve a specific set of columns. The syntax for the select method is the same for DataFrames and Datasets.
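
Here is a minimal sketch of select on a typed Dataset; the Car case class is only illustrative and mirrors the carsDf data used later in this tutorial:

import spark.implicits._
import org.apache.spark.sql.functions.col

case class Car(car: String, horsepower: Int, weight: Int, origin: String)

val carsDs = Seq(Car("Ford Torino", 140, 3449, "US")).toDS()

// Same select syntax as on a DataFrame; the result is an untyped DataFrame (Dataset[Row])
carsDs.select(col("car"), col("origin")).show(false)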

SQL Select

In SQL", the SELECT statement is used to retrieve data from a database.

The SELECT clause specifies the columns that you want to retrieve. You can specify one or more columns, separated by commas. The FROM clause specifies the table that you want to retrieve data from. The WHERE clause is optional and is used to specify a condition that rows must satisfy in order to be included in the result set.

In SQL", Data Query Language (DQL) is a set of commands that is used to retrieve data from a database. It is different from Data Definition Language (DDL) and Data Modification Language (DML), which are used to define and modify the structure and data in a database, respectively.

The main DQL command is SELECT, which is used to retrieve data from one or more tables in a database.

Spark Select

In Apache Spark", the select function allows you to retrieve a specific set of columns from a DataFrame. It takes a variable number of column" names as arguments and returns a new DataFrame with only the specified columns. The select function can be used to select columns based on their names, or to apply SQL" functions or expressions to the data in the DataFrame.

How To Select Columns From DataFrame

Example Data

To do some exercises, first of all we need to prepare some data. We will create a DataFrame from a Seq of tuples. Check the tutorial where I described how to create a DataFrame in Spark. One of the possibilities is to convert a Scala object to a DataFrame.

  val spark: SparkSession = SparkSession
    .builder()
    .master("local[*]")
    .appName("BigData-ETL.com")
    .getOrCreate()

  import spark.implicits._

  val carsData = Seq(
    ("Ford Torino",140,3449,"US"),
    ("Chevrolet Monte Carlo",150,3761,"US"),
    ("BMW 2002",113,2234,"Europe")
  )
  val columns = Seq("car","horsepower","weight","origin")
  val carsDf = carsData.toDF(columns:_*)
  carsDf.show(false)

And the result is:

+---------------------+----------+------+------+
|car                  |horsepower|weight|origin|
+---------------------+----------+------+------+
|Ford Torino          |140       |3449  |US    |
|Chevrolet Monte Carlo|150       |3761  |US    |
|BMW 2002             |113       |2234  |Europe|
+---------------------+----------+------+------+

Spark Select Single Column

As I mentioned, to select a single column in Spark you need to use the select() function. Spark does not change the existing DataFrame; it creates a new one with the selected column. This is because DataFrames are immutable.

  // DataFrame with one 'car' column
  carsDf.select("car").show(false)

  // or
  carsDf.select(col("car")).show(false)

And the result is:

+---------------------+
|car                  |
+---------------------+
|Ford Torino          |
|Chevrolet Monte Carlo|
|BMW 2002             |
+---------------------+

Select Multiple Columns Spark

Selecting multiple columns is just as simple as selecting one. We can simply provide comma-separated columns to the select() function:

  // DataFrame with multiple columns
  carsDf.select("car", "origin", "horsepower").show(false)

  val subColumns = Seq("origin", "horsepower")
  carsDf.select("car", subColumns: _*).show(false)

  carsDf.select(subColumns.map(c => col(c)): _*).show(false)

And the result is:

+---------------------+------+----------+
|car                  |origin|horsepower|
+---------------------+------+----------+
|Ford Torino          |US    |140       |
|Chevrolet Monte Carlo|US    |150       |
|BMW 2002             |Europe|113       |
+---------------------+------+----------+

// carsDf.select(subColumns.map(c => col(c)): _*).show(false)
+------+----------+
|origin|horsepower|
+------+----------+
|US    |140       |
|US    |150       |
|Europe|113       |
+------+----------+

Spark Select By Position Index In DataFrame

To select a column by position index in a DataFrame or Dataset you can use the columns method and pick an element from the returned Array[String] by index:

carsDf.select(carsDf.columns(2)).show()

Spark Select Columns From Sequence

In the previous example I showed you how to convert Scala collections to a sequence of Spark Columns. The same approach lets you select columns from a List, Seq or Array of Strings:

  val colList = List("origin", "horsepower")
  carsDf.select(colList.map(c => col(c)): _*).show(false)

  val colSeq = Seq("origin", "horsepower")
  carsDf.select(colSeq.map(c => col(c)): _*).show(false)
  
  val colArray = Array("origin", "horsepower")
  carsDf.select(colArray.map(c => col(c)): _*).show(false)

Spark Select N Columns From DataFrame

Sometimes we would like to select only the first few columns from a DataFrame without specifying the columns explicitly.

DataFrame" class provides columns methods which returns all column names as an array.

  val myDfColumns: Array[String] = carsDf.columns
  println(myDfColumns.mkString(","))

Which provides the following output:

car,horsepower,weight,origin

If we would like to select only the first two columns from the DataFrame, we can use the slice method from the Scala ArrayOps class, which returns a new Array[String] bounded by the from and until indexes:

carsDf.select(carsDf.columns.slice(0,2).map(c => col(c)): _*).show()

carsDf.select(carsDf.columns.slice(1,3).map(c => col(c)): _*).show()

Spark Select Columns By Regular Expression

We can limit the columns in a select statement with a regular expression passed to the colRegex() function. It selects columns based on the column name specified as a regex and returns the result as a Column.

In the following example we want to select the columns whose names start with the "horse" string.

carsDf.select(carsDf.colRegex("`^horse.*`")).show()

Spark Select Columns From DataFrame – Other Examples

Now I will show you how you can use pure Scala functions (not Spark functions) to work with a Spark DataFrame or Dataset. We will use the columns method to filter columns with various predicates.

Spark Select Columns Which Contains

Let's select only the columns whose names contain the "r" string:

carsDf.select(carsDf.columns.filter(_.contains("r")).map(c => col(c)): _*).show()

Spark Select Columns Which Starts With

Let's select only the columns whose names start with the "c" string:

carsDf.select(carsDf.columns.filter(_.startsWith("c")).map(c => col(c)): _*).show()

Spark Select Columns Which Ends With

Let's select only the columns whose names end with the "t" string:

carsDf.select(carsDf.columns.filter(_.endsWith("t")).map(c => col(c)): _*).show()

Spark Select Column From DataFrame Using SelectExpr Function

Since version 2.0.0 Spark provides the powerful selectExpr() function, a variant of select that accepts a set of SQL expressions.

In the example below I select the car column and alias it with the new column name car_name:

carsDf.selectExpr("car as car_name").show()

Which provides results:

+--------------------+
|            car_name|
+--------------------+
|         Ford Torino|
|Chevrolet Monte C...|
|            BMW 2002|
+--------------------+

The general approach is to provide expressions which are resolved by Spark internally:

   // The following are equivalent:
   ds.selectExpr("colA", "colB as newName", "abs(colC)")
   ds.select(expr("colA"), expr("colB as newName"), expr("abs(colC)"))

Spark Union And UnionByName In DataFrame

Spark Select With Union DataFrame

In Apache Spark", you can use the union method to combine the rows of two DataFrames or Datasets. The union method returns a new DataFrame or Dataset that contains the union of the rows in the two input DataFrames or Datasets.

Here is an example of using the union method on DataFrames in Python:

df1 = spark.read.json("/path/to/data1.json")
df2 = spark.read.json("/path/to/data2.json")
union_df = df1.union(df2)

This example creates two DataFrames called df1 and df2 by reading JSON files, and then creates a new DataFrame called union_df that contains the union of the rows in df1 and df2.

Here is an example of using the union method on Datasets in Scala:

import org.apache.spark.sql.Dataset
import spark.implicits._ // needed for the .toDS conversion

case class Person(name: String, age: Int)
val ds1: Dataset[Person] = Seq(Person("AAA", 10), Person("BBB", 20)).toDS
val ds2: Dataset[Person] = Seq(Person("CCC", 30), Person("DDD", 40)).toDS
val unionDs = ds1.union(ds2)

This example creates two Datasets called ds1 and ds2 of Person objects, and then creates a new Dataset called unionDs that contains the union of the rows in ds1 and ds2.

Spark Select UnionByName DataFrame

In Apache Spark", you can use the unionByName method to combine the rows of two DataFrames or Datasets, while ensuring that the resulting DataFrame or Dataset has the same schema. The unionByName method returns a new DataFrame or Dataset that contains the union of the rows in the two input DataFrames or Datasets, with the columns in the same order as the first DataFrame or Dataset.

Spark Union Vs UnionByName

In Apache Spark", the union and unionByName methods allow you to combine the rows of two DataFrames or Datasets. The main difference between the two methods is how they handle schema differences between the input DataFrames or Datasets.

The union method combines the rows of the two inputs by column position and returns a new DataFrame or Dataset with the combined rows. It does not match columns by name, so if the two inputs have the same columns in a different order, values can silently end up under the wrong columns in the result.

The unionByName method, on the other hand, matches the columns of the second DataFrame or Dataset to the column names of the first one, so the combined rows always line up with the schema of the first input. If the set of column names differs between the two inputs, unionByName raises an error; in newer Spark versions you can pass allowMissingColumns = true to fill the missing columns with nulls instead.
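
A short sketch of the difference, reusing the illustrative df1 and df2 from the previous example (same columns, different order):

// union matches columns by position, so "Europe" lands in the car column of the second row
df1.union(df2).show(false)

// unionByName matches columns by name, so every value stays in the right column
df1.unionByName(df2).show(false)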

Full Code

package com.bigdataetl.sparktutorial.sql

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object SelectMethod extends App {

  val spark: SparkSession = SparkSession
    .builder()
    .master("local[*]")
    .appName("BigData-ETL.com")
    .getOrCreate()

  import spark.implicits._

  val carsData = Seq(
    ("Ford Torino",140,3449,"US"),
    ("Chevrolet Monte Carlo",150,3761,"US"),
    ("BMW 2002",113,2234,"Europe")
  )
  val columns = Seq("car","horsepower","weight","origin")
  val carsDf = carsData.toDF(columns:_*)
  carsDf.show(false)

  println("Spark Select Single Column")
  carsDf.select("car").show(false)
  carsDf.select(col("car")).show(false)

  println("Spark Select Multiple Columns")
  carsDf.select("car", "origin", "horsepower").show(false)

  val subColumns = Seq("origin", "horsepower")
  carsDf.select("car", subColumns: _*).show(false)
  carsDf.select(subColumns.map(c => col(c)): _*).show(false)

  println("Spark Select By Position Index In DataFrame")
  carsDf.select(carsDf.columns(2)).show()

  println("Spark Select Columns From Sequence")
  val colList = List("origin", "horsepower")
  carsDf.select(colList.map(c => col(c)): _*).show(false)

  val colSeq = Seq("origin", "horsepower")
  carsDf.select(colSeq.map(c => col(c)): _*).show(false)

  val colArray = Array("origin", "horsepower")
  carsDf.select(colArray.map(c => col(c)): _*).show(false)

  println("Spark Select N Columns From DataFrame")
  val myDfColumns: Array[String] = carsDf.columns
  println(myDfColumns.mkString(","))

  carsDf.select(carsDf.columns.slice(0,2).map(c => col(c)): _*).show()
  carsDf.select(carsDf.columns.slice(1,3).map(c => col(c)): _*).show()

  println("Spark Select Columns By Regular Expression")
  carsDf.select(carsDf.colRegex("`^horse.*`")).show()

  println("Spark Select Columns Which Contains")
  carsDf.select(carsDf.columns.filter(_.contains("r")).map(c => col(c)): _*).show()

  println("Spark Select Columns Which Starts With")
  carsDf.select(carsDf.columns.filter(_.startsWith("c")).map(c => col(c)): _*).show()

  println("Spark Select Columns Which Ends With")
  carsDf.select(carsDf.columns.filter(_.endsWith("t")).map(c => col(c)): _*).show()

  println("Spark Select Column From DataFrame Using SelectExpr Function")
  carsDf.selectExpr("car as car_name").show()
}

GitLab Repository

As usual, please find the full code on our GitLab repository!

Summary

In this tutorial" you have learned how to select one, multiple columns from DataFrame" using various methods which can be helpful in many situation when you have to deal with more complex code written in Spark.

Additionally, I have described the differences between union and unionByName in Spark.

