Since version 2.0.0, Spark has provided the withColumn DataFrame function. This method returns a new DataFrame or Dataset that includes a new column or replaces an existing column with the same name.
The column expression must refer only to attributes of this DataFrame or Dataset; adding a column that refers to some other DataFrame or Dataset is an error.
The withColumn() method introduces an internal projection. Calling it multiple times, for instance in a loop to add several columns, can generate large query plans, causing performance problems and potentially a StackOverflowException. To avoid this, use select() with multiple columns at once.
The Spark withColumn() method takes two parameters:
- colName: String => the name of the column we want to create or replace in the DataFrame
- col: Column => the Column object that provides the value for the given column name
The Spark withColumn function is a transformation, which means that Spark will wait until an action is called to execute the logic.
def withColumn(colName: String, col: Column): DataFrame
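To make the projection issue concrete, here is a minimal sketch contrasting a withColumn loop with a single select (the DataFrame df and the generated column names are assumptions for this example):

import org.apache.spark.sql.functions.{col, lit}

// Anti-pattern: every withColumn call wraps the plan in another projection.
val looped = (1 to 50).foldLeft(df) { (acc, i) =>
  acc.withColumn(s"col_$i", lit(i))
}

// Preferred: build all the new columns once and project them in a single select.
val newCols = (1 to 50).map(i => lit(i).as(s"col_$i"))
val flat = df.select((col("*") +: newCols): _*)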
Introduction
SQL (Structured Query Language)
SQL (Structured Query Language) is a programming language designed for managing and manipulating data stored in relational databases. It is the standard language for interacting with databases and is used to create, read, update, and delete data stored in tables.
SQL is a declarative language, which means that you specify what you want to do, and the database system figures out how to do it. For example, you can use SQL to select data from a table, filter the data using a WHERE clause, and sort the data using an ORDER BY clause.
SQL has a wide range of features (a short combined example follows this list), including the ability to:
- Create and modify databases and tables
- Insert, update, and delete data from tables
- Select and filter data from tables
- Join data from multiple tables
- Group and aggregate data
- Create and manipulate views
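For instance, a single hypothetical query can combine several of these features (the employees table and its columns are made up purely for illustration):

SELECT department, COUNT(*) AS headcount
FROM employees
WHERE salary > 50000
GROUP BY department
ORDER BY headcount DESC;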
SQL is used by a wide range of applications and is supported by most relational database management systems (RDBMS), including MySQL, Oracle, and Microsoft SQL Server.
Apache Spark
Apache Spark is a fast and flexible open-source data processing engine for big data analytics. It was developed at UC Berkeley’s AMPLab in 2009 and has since become one of the most widely used big data processing frameworks.
Spark allows you to process and analyze large datasets quickly and efficiently, using a wide range of data processing and machine learning algorithms. It is designed to be easy to use and flexible, with a simple API that supports a wide range of programming languages, including Python, R, Java, and Scala.
Spark has several key features that make it well suited for big data analytics:
- In-memory processing: Spark stores data in memory, which allows it to process data much faster than if it had to read and write to disk.
- Resilient Distributed Datasets (RDDs): RDDs are the basic building blocks of Spark, and they allow you to distribute data across a cluster of machines and process it in parallel.
- Lazy evaluation: Spark uses lazy evaluation, which means that it only processes data when it is needed. This allows it to optimize the execution of complex data pipelines and avoid unnecessary computations (see the sketch after this list).
- DAG execution engine: Spark has a DAG (Directed Acyclic Graph) execution engine, which allows it to optimize the execution of data pipelines and perform efficient distributed data processing.
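To see lazy evaluation in action, a minimal sketch (assuming an existing SparkSession named spark):

import spark.implicits._

val numbers = Seq(1, 2, 3, 4, 5).toDF("n")

// filter is a transformation: nothing runs yet, Spark only records the plan.
val evens = numbers.filter($"n" % 2 === 0)

// count is an action: only now is the computation actually executed.
println(evens.count()) // 2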
Overall, Apache Spark is a powerful and flexible tool for big data analytics, and it is widely used in a variety of industries, including finance, healthcare, and e-commerce.
Spark DataFrame
A DataFrame is a distributed collection of data that is organized into named columns and can be processed using Spark. It is similar to a table in a relational database or a data frame in R or Python.
You can create a DataFrame from various sources, like a collection of objects, a CSV file, or a SQL database, and then use it to filter, aggregate, and join data, or to perform SQL-like queries.
DataFrames are built on top of RDDs and provide a higher-level API for distributed data processing, and they support various data sources and formats. The DataFrame API is a key part of Spark and is commonly used for data manipulation and analysis in various applications.
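For instance, a DataFrame can be built from a local collection or read from a file; a minimal sketch (the CSV path is a made-up example, assuming an existing SparkSession named spark):

import spark.implicits._

// From a local collection of tuples.
val peopleDf = Seq(("Alice", 30), ("Bob", 25)).toDF("name", "age")

// From a CSV file with a header row (the path is an assumption for this sketch).
val csvDf = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/tmp/people.csv")

peopleDf.filter($"age" > 26).show()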
Spark DataSet
A Dataset is a distributed collection of data that is organized into named columns and is similar to a DataFrame in Spark. It was introduced in Spark 1.6 as a new API for constructing and manipulating data.
A key difference between a DataFrame and a Dataset is that a Dataset is strongly typed: each record is a JVM object of a specific type (for example a case class), so column types like Integer or String are checked at compile time. This allows for type-safe data manipulation and can help catch errors early on.
A Dataset can be created from various sources, like objects or a CSV file, and it can be transformed using operations like map and filter, or queried using the Spark SQL API. In Spark 2.0, the DataFrame and Dataset APIs were merged, with the Dataset API becoming the primary way to work with structured data in Spark.
However, the DataFrame API is still commonly used. The Dataset API is a useful tool for structured data manipulation and analysis in Spark.
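A minimal sketch of a typed Dataset (the Car case class here is invented for this example, assuming an existing SparkSession named spark):

import spark.implicits._

case class Car(name: String, horsepower: Int)

// A Dataset[Car] is typed at compile time: car.horsepower is a real Int field.
val carsDs = Seq(Car("Ford Torino", 140), Car("BMW 2002", 113)).toDS()

carsDs.filter(car => car.horsepower > 120).show()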
SQL Select Vs Spark .withColumn
In SQL, the SELECT statement is used to select a subset of columns from a table. It has the following syntax:
SELECT col1, col2, col3 FROM table;
In Spark, the .withColumn method is used to add a new column to a DataFrame or to replace an existing column.
Overall, the SELECT statement in SQL and the .withColumn method in Spark serve similar purposes, but they have some key differences. The SELECT statement is used to select a subset of columns from a table, while .withColumn can be used to add a new column to a DataFrame or to replace an existing column. .withColumn also derives a new column in place while keeping all existing columns, whereas with SELECT you have to list every column you want to keep alongside the derived one.
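As a side-by-side illustration, a minimal sketch of both approaches (the DataFrame df and its numeric column col1 are assumptions for this example):

import org.apache.spark.sql.functions.col

// SQL style: every column you want to keep must be listed alongside the derived one.
df.createOrReplaceTempView("some_table")
val viaSql = spark.sql("SELECT col1, col2, col3, col1 * 2 AS doubled FROM some_table")

// withColumn style: all existing columns are kept and the derived one is added in place.
val viaApi = df.withColumn("doubled", col("col1") * 2)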
Spark withColumn Example
Example Data
To present some examples, we first must prepare a DataFrame that we will be changing during this tutorial. So let’s create the well-known DataFrame with some data about cars.
val spark: SparkSession = SparkSession
  .builder()
  .master("local[*]")
  .appName("BigData-ETL.com")
  .getOrCreate()

import spark.implicits._

val carsData = Seq(
  ("Ford Torino", 140, 3449, "US"),
  ("Chevrolet Monte Carlo", 150, 3761, "US"),
  ("BMW 2002", 113, 2234, "Europe")
)
val columns = Seq("car", "horsepower", "weight", "origin")
val carsDf = carsData.toDF(columns: _*)
Spark Replace Column Value In DataFrame
The first example replaces an existing column in the DataFrame. The withColumn() function can be used to update the value in an existing column.
Let’s update the weight column and set it to 2000 for all the records in the DataFrame:
carsDf
  .withColumn("weight", lit(2000))
  .show()
Spark Add New Column To DataFrame
To add a new column to a DataFrame we can use the withColumn() method as well. To create a new column in a DataFrame or Dataset we must pass the colName as the first argument and the column values as the second argument.
Let’s create new columns in the DataFrame called city and continent and add the value unknown for both:
carsDf
  .withColumn("city", lit("unknown"))
  .withColumn("continent", lit("unknown"))
Spark Acquire New Column Based On Existing Column In DataFrame
To create a new column that derives its values from another column in the DataFrame, we also use the withColumn() function. It’s a very easy and very common operation.
Let’s create a new column in the DataFrame named kilowatt_power, which will be derived from the horsepower column:
carsDf
  .withColumn("kilowatt_power", col("horsepower") * lit(0.7457))
  .show()
It produces the following result:
+--------------------+----------+------+------+------------------+
|                 car|horsepower|weight|origin|    kilowatt_power|
+--------------------+----------+------+------+------------------+
|         Ford Torino|       140|  3449|    US|104.39800000000001|
|Chevrolet Monte C...|       150|  3761|    US|           111.855|
|            BMW 2002|       113|  2234|Europe|           84.2641|
+--------------------+----------+------+------+------------------+
Spark Change Column Data Type In DataFrame
The withColumn function can also be used to change a column’s data type in the DataFrame. In SQL we often write:
SELECT cast(weight as int) as weight_as_int from some_table...
In Spark we can accomplish the same result using the pure Spark API by writing an expression in the withColumn method. To change a column’s data type in the DataFrame from Integer to Double we can write:
carsDf
  .withColumn("horsepower", col("horsepower").cast(DoubleType))
  .show()
Spark Rename Column In DataFrame
To rename an existing column in a Spark DataFrame you can use the built-in function withColumnRenamed, which returns a new DataFrame or Dataset with the column renamed. This is a no-op if the schema doesn’t contain the existing name. This method takes two parameters, both of String type: existingName and newName.
In the following example we want to rename the column car to car_name:
carsDf
  .withColumnRenamed("car", "car_name")
  .printSchema()
The new schema of the DataFrame is:
root
 |-- car_name: string (nullable = true)
 |-- horsepower: integer (nullable = false)
 |-- weight: integer (nullable = false)
 |-- origin: string (nullable = true)
Spark Update, Replace or Add Multiple Columns In DataFrame
To change multiple columns in one DataFrame you can use the withColumns method from the Spark API, or create a temporary view for the table in the Spark context and then access the columns with pure SQL syntax.
In the following examples I will present both ways. It’s up to you which one you prefer. The most important thing: no matter which one you choose, from a performance perspective both work the same way, because both are analysed by the Spark Catalyst optimizer.
The first example uses the Spark API and the withColumns function:
val colsMap = Map(
  "kilowatt_power" -> col("horsepower") * lit(0.7457),
  "double_kilowatt_power" -> col("horsepower") * lit(0.7457 * 2)
)

carsDf
  .withColumns(colsMap)
  .show()
The second example uses SQL and Spark temporary views:
carsDf.createOrReplaceTempView("cars")

spark
  .sql(
    "SELECT horsepower * 0.7457 as kilowatt_power, horsepower * 0.7457 * 2 as double_kilowatt_power FROM cars"
  )
  .show()
Spark Delete Column From DataFrame
To delete or drop a column from a DataFrame or Dataset you should use the drop method from the Spark API.
The drop() method returns a new Dataset with the column dropped. This is a no-op if the schema doesn’t contain the column name. This method can only be used to drop top-level columns; the colName string is treated literally without further interpretation.
In the following example we will drop the car column from the carsDf DataFrame:
carsDf
  .drop("car")
  .show()
Spark Drop Multiple Columns From DataFrame
And now the last example: how to drop multiple columns from an existing DataFrame or Dataset:
carsDf
  .drop("car", "horsepower")
  .show()
Full Code
package com.bigdataetl.sparktutorial.sql

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.DoubleType

object SparkWithColumn extends App {

  val spark: SparkSession = SparkSession
    .builder()
    .master("local[*]")
    .appName("BigData-ETL.com")
    .getOrCreate()

  import spark.implicits._

  val carsData = Seq(
    ("Ford Torino", 140, 3449, "US"),
    ("Chevrolet Monte Carlo", 150, 3761, "US"),
    ("BMW 2002", 113, 2234, "Europe")
  )
  val columns = Seq("car", "horsepower", "weight", "origin")
  val carsDf = carsData.toDF(columns: _*)

  println("Replace Column Value In DataFrame")
  carsDf
    .withColumn("weight", lit(2000))
    .show()

  println("Add New Column To DataFrame")
  carsDf
    .withColumn("city", lit("unknown"))
    .withColumn("continent", lit("unknown"))

  println("Acquire New Column Based On Existing Column In DataFrame")
  carsDf
    .withColumn("kilowatt_power", col("horsepower") * lit(0.7457))
    .show()

  println("Change Column Data Type In DataFrame")
  carsDf
    .withColumn("horsepower", col("horsepower").cast(DoubleType))
    .show()

  println("Spark Rename Column In DataFrame")
  carsDf
    .withColumnRenamed("car", "car_name")
    .printSchema()

  println("Update, Replace or Add Multiple Columns In DataFrame")
  val colsMap = Map(
    "kilowatt_power" -> col("horsepower") * lit(0.7457),
    "double_kilowatt_power" -> col("horsepower") * lit(0.7457 * 2)
  )
  carsDf
    .withColumns(colsMap)
    .show()

  carsDf.createOrReplaceTempView("cars")
  spark
    .sql(
      "SELECT horsepower * 0.7457 as kilowatt_power, horsepower * 0.7457 * 2 as double_kilowatt_power FROM cars"
    )
    .show()

  println("Spark Delete Column From DataFrame")
  carsDf
    .drop("car")
    .show()
}
GitLab Repository
As usual, please find the full code on our GitLab repository!
Performance Of Spark .withColumn vs .select
In Apache Spark, both the withColumn and select methods can be used to transform DataFrames. However, they may have different performance characteristics depending on the specific use case.
The withColumn method is used to add a new column or replace the value of an existing column in a DataFrame. It takes two arguments: the name of the new column and a Column expression that defines its value. The expression is evaluated for each row of the DataFrame to compute the value of the new column.
The select method, on the other hand, projects a set of columns, or expressions over them, from a DataFrame in a single step.
In general, building many columns in a single select tends to be more efficient than chaining many withColumn calls, because each withColumn call adds another projection to the query plan. However, the specific performance characteristics may depend on the size and structure of the DataFrame, the complexity of the expressions, and the resources available on the Spark cluster.
It is always a good idea to measure the performance of your Spark jobs and choose the method that performs best for your specific use case.
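One quick way to compare the two approaches is to inspect the query plans with explain(); a small sketch reusing the carsDf from the earlier examples:

import org.apache.spark.sql.functions.{col, lit}

// For a single derived column the optimized plans come out essentially the same,
// because Catalyst collapses adjacent projections.
carsDf
  .withColumn("kilowatt_power", col("horsepower") * lit(0.7457))
  .explain()

carsDf
  .select(col("*"), (col("horsepower") * lit(0.7457)).as("kilowatt_power"))
  .explain()

// The overhead of withColumn shows up mainly during plan analysis when many
// calls are chained, which is why a single select is preferred for adding many columns.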
Summary
In Apache Spark, the withColumn method is used to add a new column or replace the value of an existing column in a DataFrame. It takes two arguments: the name of the new column and a Column expression that defines its value.
In terms of performance, .withColumn may be slightly slower than .select because each call adds an extra projection step to the query plan. However, the difference in performance is typically not significant unless you are working with very large datasets or chaining many calls.
Could You Please Share This Post?
I appreciate It And Thank YOU! :)
Have A Nice Day!