Spark Create DataFrame From RDD, File And RDBMS – 4 Data Sources


In Spark, the createDataFrame() and toDF() methods are used to manually build a DataFrame. You can create a DataFrame in Spark from:

  • an already existing RDD, DataFrame or Dataset
  • an RDD of Product (e.g. case classes, tuples)
  • a local Seq of Product
  • an RDD containing Rows, using a given schema
  • a JavaRDD containing Rows, using a given schema
  • a java.util.List containing Rows, using a given schema

In this tutorial I will present some examples of how to create a DataFrame in Spark.

Introduction

Apache Spark

Apache Spark" is a popular open-source tool used for analyzing large amounts of data, known as big data". It was created to be fast, simple to use, and flexible. It is written in Scala", a programming language that can be run on the Java" Virtual Machine.

The Spark community of developers has created numerous libraries and extensions for tasks like machine learning, stream processing, and graph processing. Spark can operate on its own cluster, in the cloud, or on top of a Hadoop cluster. Its ability to process data in memory rather than on disk sets it apart from other big data processing engines and makes it faster.

RDD

An RDD" (Resilient Distributed Dataset) is a distributed collection of data in Apache Spark" that is designed to be fault-tolerant and immutable. RDDs are the fundamental data structures of Spark and are used to perform distributed operations on data. RDDs are immutable, which means that once they are created, they cannot be modified. Instead, any operations on an RDD create a new RDD".

RDDs are fault-tolerant because they can recover from failures by recomputing lost data based on their lineage, i.e. the sequence of transformations that were applied to create them. This allows Spark to recover from failures without losing data or having to restart from the beginning.

RDDs can be created from data in external storage systems, such as HDFS, or by transforming existing RDDs using operations such as map, filter, and reduce. They can also be persisted in memory or on disk for faster access. RDDs are a key part of the Spark API and are widely used in Spark applications for distributed data processing and analytics.
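To illustrate, here is a minimal sketch (assuming a SparkSession named spark is already available, as in the examples later in this article) that creates an RDD, applies map and filter transformations, and caches the result in memory:

  import org.apache.spark.storage.StorageLevel

  val numbers = spark.sparkContext.parallelize(Seq(1, 2, 3, 4, 5))
  // Each transformation returns a new, immutable RDD
  val doubled = numbers.map(_ * 2)
  val large = doubled.filter(_ > 4)
  // Persist the RDD in memory for faster repeated access
  large.persist(StorageLevel.MEMORY_ONLY)
  println(large.reduce(_ + _)) // 6 + 8 + 10 = 24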

What Is DataFrame And DataSet?

In Apache Spark", a DataFrame" is a large, distributed collection of data organized into rows and columns, similar to a table in a traditional database or a data frame in R or Python". It can be created from various sources, like an RDD", a Hive" table, or a data source.

A Dataset is a strongly typed collection of distributed data that can be used with functional programming in Scala and Java; in fact, a DataFrame is simply a Dataset of Row objects (Dataset[Row]). A Dataset offers extra functionality compared to a DataFrame, such as the ability to apply typed, functional transformations to the data.

Both DataFrames and Datasets can be used for various data processing and analytics in Spark, including filtering, grouping, aggregating, and joining data. They are a key part of the Spark API and are commonly used in Spark applications.
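As a quick sketch of the difference (the Person case class below is just an illustrative example, not part of this tutorial's data set), a DataFrame can be turned into a strongly typed Dataset with as[T]:

  import spark.implicits._

  case class Person(name: String, age: Long)

  // Untyped DataFrame: columns are addressed by name
  val df = Seq(("Paul", 30L), ("Tom", 34L)).toDF("name", "age")
  // Strongly typed Dataset: each row is a Person object
  val ds = df.as[Person]
  ds.filter(_.age > 30).show()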

Spark Create DataFrame From Existing RDD

As the first example we will consider creating a DataFrame from an existing RDD object. If you want to read more, see this article: How to create RDD using parallelize() method?

We will start by creating an RDD and then converting it to a DataFrame.

  val data = Seq(("Paul", 30), ("Tom", 34), ("George", 55))
  val dataAsRDD = spark.sparkContext.parallelize(data)

 // Alternatively, with an explicit type annotation:
 // val dataAsRDD: RDD[(String, Int)] = spark.sparkContext.parallelize(data)

Spark Create DataFrame Using toDF() Method

In the first example we will use the toDF() function to create a DataFrame from an existing RDD. In other words: how do you convert an existing RDD to a DataFrame?

First we must import spark.implicits._, which provides (Scala-specific) implicit methods for converting common Scala objects into DataFrames.

  import spark.implicits._
  val toDfResults = dataAsRDD.toDF()
  toDfResults.show()
  toDfResults.printSchema()

And the result is:

+------+---+
|    _1| _2|
+------+---+
|  Paul| 30|
|   Tom| 34|
|George| 55|
+------+---+

root
 |-- _1: string (nullable = true)
 |-- _2: integer (nullable = false)

See that the column names are strange: _1 and _2. We don’t want such names, so we can explicitly provide the column names as arguments to toDF().

  val toDfResultsWithColNames = dataAsRDD.toDF("name", "age")
  toDfResultsWithColNames.show()
  toDfResultsWithColNames.printSchema()

And now the result contains the column names:

+------+---+
|  name|age|
+------+---+
|  Paul| 30|
|   Tom| 34|
|George| 55|
+------+---+

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)

Alternatively you can create a Seq() of column names and pass it as an argument to the toDF() function. The result will be the same.

  val columns = Seq("name", "age")
  val toDfResultsWithColNames = dataAsRDD.toDF(columns: _*)
  toDfResultsWithColNames.show()
  toDfResultsWithColNames.printSchema()

Spark Create DataFrame Using createDataFrame() Method From SparkSession

Since Spark 2.0.0 you can use the createDataFrame() method from the SparkSession class, which takes an object as a parameter – in our case an RDD – and creates a DataFrame. In other words, it creates a DataFrame from an RDD of Product (e.g. case classes, tuples).

val createDataFrameResults = spark.createDataFrame(dataAsRDD).toDF(columns: _*)
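createDataFrame() also accepts a local Seq of Product directly, so the same DataFrame can be built without creating an RDD first – a small sketch reusing the data and columns variables from above:

  val dfFromSeq = spark.createDataFrame(data).toDF(columns: _*)
  dfFromSeq.printSchema()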

Spark Create DataFrame With Schema

The createDataFrame() function also allows you to provide the schema you would like to apply to the data. As in the previous example we will convert an RDD to a DataFrame, but this time we will also provide the schema.

To create the schema in Spark we will use the StructType class, whose objects can be constructed with StructType(fields: Seq[StructField]).

For a StructType object, one or multiple StructFields can be extracted by names. If multiple StructFields are extracted, a StructType object will be returned. If a provided name does not have a matching field, it will be ignored. For the case of extracting a single StructField, a null will be returned.

But first we have to convert the existing RDD[T] to an RDD[Row], because the createDataFrame() variant that takes an explicit schema expects an RDD of Rows.

  val schema = StructType(Array(
    StructField("name", StringType, false),
    StructField("age", IntegerType, true)
  ))
  val rowRDD: RDD[Row] = dataAsRDD.map(element => Row(element._1, element._2))
  val createDataFrameWithSchema = spark.createDataFrame(rowRDD, schema)
  createDataFrameWithSchema.printSchema()

And the result is:

root
 |-- name: string (nullable = false)
 |-- age: integer (nullable = true)

Alternatively, you can recreate a DataFrame from an existing DataFrame. This is very useful when you would like to break the DAG lineage.

  println("Recreate DataFrame to break DAG lineage")
  val recreateDataFrame = spark.createDataFrame(createDataFrameWithSchema.rdd, schema)

Spark Create DataFrame From External File Sources

Since Spark 2.0.0 we have multiple built-in functions to read data from many external data sources like JSON, CSV, XML, TEXT files, RDBMS tables and even Kafka, Cassandra, Snowflake and more.

We will go through several external data sources and I will show you how to create a DataFrame from each of them. In all cases the spark variable is an object of the SparkSession class.

Spark Create DataFrame from JSON File

Loads a JSON" file (one item per line), resulting in a DataFrame". To identify the schema, it traverses the full dataset once.

val dfFromJson =  spark.read.json("path/to/dataAs.json")
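If your JSON objects span multiple lines instead of one object per line, you can enable Spark's standard multiLine read option – a short sketch (the file path is just a placeholder):

  val dfFromMultiLineJson = spark.read
    .option("multiLine", "true")
    .json("path/to/dataAsMultiLine.json")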

Spark Create DataFrame from CSV File

Loads CSV files and delivers the results in the form of a DataFrame. If inferSchema is enabled, this function will run over the input once to infer the input schema. To avoid going through all of the data, disable the inferSchema option or explicitly provide the schema using schema().
The CSV-specific settings for reading CSV files may be found under the Data Source Option in the version you use.

See also article how to read data from CSV efficiently using Spark!

val dfFromCSV =  spark.read.csv("path/to/dataAs.csv")
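Here is a sketch of the typical CSV options – reading with a header row plus schema inference, and reading with an explicit schema (reusing the schema variable defined earlier); the file path is a placeholder:

  // With a header row and automatic schema inference (scans the data once)
  val dfFromCSVInferred = spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("path/to/dataAs.csv")

  // With an explicit schema - no extra pass over the data is needed
  val dfFromCSVWithSchema = spark.read
    .option("header", "true")
    .schema(schema)
    .csv("path/to/dataAs.csv")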

Spark Create DataFrame from XML File

To read XML" file you need to use external library which will allow you to use SparkSession" object to read such data.

Maven

<!-- https://mvnrepository.com/artifact/com.databricks/spark-xml -->
<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-xml_2.12</artifactId>
    <version>0.15.0</version>
</dependency>

Gradle

// https://mvnrepository.com/artifact/com.databricks/spark-xml
implementation group: 'com.databricks', name: 'spark-xml_2.12', version: '0.15.0'

And then read the XML" data:

val dfFromXml = spark.read
      .format("com.databricks.spark.xml")
      .option("rowTag", "car")
      .xml("path/to/carDataAs.xml")

Spark Create DataFrame from TEXT File

The .text() function loads text files and provides a DataFrame with a schema that includes a string column named “value” and, if present, partitioned columns.

val dfFromText =  spark.read.text("path/to/dataAs.txt")

Optionally you can use textFile(), which also loads text files but returns a Dataset of String instead of a DataFrame.

val dsFromText =  spark.read.textFile("path/to/dataAs.txt")
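Because textFile() returns a Dataset[String], you can apply typed transformations to each line right away – a small sketch (the file path is a placeholder):

  import spark.implicits._
  // Each element of the Dataset is one line of the text file
  val lineLengths = dsFromText.map(_.length)
  lineLengths.show()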

Spark Create DataFrame From RDBMS External Sources

In Spark you can create a DataFrame from any RDBMS table. Spark does not contain the required JDBC drivers for all RDBMS types, so you first need to add the appropriate one to your project.

To read from another RDBMS type you basically need to provide the correct driver name, and the corresponding library must be included in your project. Below is a table with the driver names for the most common external sources.

RDBMS       Driver Name
MySQL       com.mysql.jdbc.Driver
MariaDB     org.mariadb.jdbc.Driver
Oracle      oracle.jdbc.driver.OracleDriver
Teradata    com.ncr.teradata.TeraDriver
DB2         com.ibm.db2.jcc.DB2Driver
Snowflake   net.snowflake.client.jdbc.SnowflakeDriver
Cassandra   org.apache.spark.sql.cassandra (Spark data source format, not a JDBC driver)
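For example, to use the MariaDB driver from the example below, the JDBC connector has to be on the classpath. A sketch of the Maven dependency (check mvnrepository.com for the version that fits your setup):

<!-- https://mvnrepository.com/artifact/org.mariadb.jdbc/mariadb-java-client -->
<dependency>
    <groupId>org.mariadb.jdbc</groupId>
    <artifactId>mariadb-java-client</artifactId>
    <version>2.7.4</version>
</dependency>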

Spark Create DataFrame From MariaDB Table

val dfFromMariaDB = spark.read.format("jdbc")
   .option("url", "jdbc:mariadb://hostname:port/db_name")
   .option("driver", "org.mariadb.jdbc.Driver")
   .option("dbtable", "your_table_name")
   .option("user", "your_username")
   .option("password", "your_password")
   .load()

Or you can use the alternative approach:

  import java.util.Properties

  val connectionProperties = new Properties()
  connectionProperties.put("user", "your_username")
  connectionProperties.put("password", "your_password")
  connectionProperties.put("driver", "org.mariadb.jdbc.Driver")

  val dfFromMariaDBAlt = spark.read
    .jdbc("jdbc:mariadb://hostname:port/db_name", "your_table_name", connectionProperties)

Full Code

package com.bigdataetl.sparktutorial.sql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object CreateDataFrame extends App {

  val spark: SparkSession = SparkSession
    .builder()
    .master("local[*]")
    .appName("BigData-ETL.com")
    .getOrCreate()

  val data = Seq(("Paul", 30), ("Tom", 34), ("George", 55))
  val dataAsRDD = spark.sparkContext.parallelize(data)
//  val dataAsRDD: RDD[(String, Int)] = spark.sparkContext.parallelize(data)

  println("Create DataFrame Using toDF() Method")
  import spark.implicits._
  val toDfResults = dataAsRDD.toDF()
  toDfResults.show()
  toDfResults.printSchema()

//  val toDfResultsWithColNames = dataAsRDD.toDF("name", "age")
  val columns = Seq("name", "age")
  val toDfResultsWithColNames = dataAsRDD.toDF(columns: _*)
  toDfResultsWithColNames.show()
  toDfResultsWithColNames.printSchema()


  println("Create DataFrame Using createDataFrame() Method From SparkSession")
  val createDataFrameResults = spark.createDataFrame(dataAsRDD).toDF(columns: _*)
  createDataFrameResults.printSchema()

  println("Create DataFrame With Schema")
  val schema = StructType(Array(
    StructField("name", StringType, false),
    StructField("age", IntegerType, true)
  ))
  val rowRDD: RDD[Row] = dataAsRDD.map(element => Row(element._1, element._2))
  val createDataFrameWithSchema = spark.createDataFrame(rowRDD, schema)
  createDataFrameWithSchema.printSchema()

  println("Recreate DataFrame to break DAG lineage")
  val recreateDataFrame = spark.createDataFrame(createDataFrameWithSchema.rdd, schema)
  recreateDataFrame.printSchema()

  val dfFromCSV =  spark.read.csv("path/to/dataAs.csv")
  val dfFromJson =  spark.read.json("path/to/dataAs.json")
  val dsFromText =  spark.read.textFile("path/to/dataAs.txt")
}

GitLab Repository

As usual, please find the full code on our GitLab repository!

Summary

In this article you had a chance to learn how to create a DataFrame in Spark from various sources like files, RDDs, or an RDBMS.
