Spark Create DataFrame From RDD, File And RDBMS – 4 Data Sources

In Spark, the createDataFrame() and toDF() methods are used to manually build a DataFrame. You can create a Spark DataFrame from:

  • already existing RDD, DataFrame or DataSet
  • an RDD of Product (e.g. case classes, tuples)
  • a local Seq of Product
  • an RDD containing Rows using the given schema
  • a JavaRDD containing Rows using the given schema
  • a java.util.List containing Rows using the given schema

In this tutorial I will present some examples of how to create a DataFrame in Spark.

What Is DataFrame And DataSet?

A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs. The DataFrame API is available in Scala, Java, Python, and R. In Scala and Java, a DataFrame is represented by a Dataset of Rows. In the Scala API, DataFrame is simply a type alias of Dataset[Row], while in the Java API, users need to use Dataset<Row> to represent a DataFrame.

https://spark.apache.org/docs/latest/sql-programming-guide.html

A Dataset is a distributed collection of data. Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine. A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.). The Dataset API is available in Scala and Java. Python does not have the support for the Dataset API. But due to Python’s dynamic nature, many of the benefits of the Dataset API are already available (i.e. you can access the field of a row by name naturally row.columnName). The case for R is similar.

https://spark.apache.org/docs/latest/sql-programming-guide.html
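
To make the distinction concrete, here is a minimal sketch of the typed Dataset API in Scala. The Person case class and the values are our own illustration (not from the Spark docs), and spark is assumed to be an existing SparkSession:

  // Illustration only: the case class should be defined at top level
  // so that Spark can derive an Encoder for it.
  case class Person(name: String, age: Int)

  import spark.implicits._

  val peopleDS = Seq(Person("Paul", 30), Person("Tom", 34)).toDS()
  peopleDS.filter(_.age > 30).show() // typed lambda, checked at compile time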

Spark Create DataFrame From Existing RDD

As the first example we will consider creating a DataFrame from an existing RDD object. If you want to read more, see this article: How to create RDD using parallelize() method?

Let’s do just that: first create an RDD, and then a DataFrame from it.

  val data = Seq(("Paul", 30), ("Tom", 34), ("George", 55))
  val dataAsRDD = spark.sparkContext.parallelize(data)

  // Let's add strong typing to the variable:
  // val dataAsRDD: RDD[(String, Int)] = spark.sparkContext.parallelize(data)

Spark Create DataFrame Using toDF() Method

In the first example we will use the toDF() function to create a Spark DataFrame from an existing RDD. In other words: how do you convert an existing RDD to a DataFrame?

First we must import spark.implicits._, which provides (Scala-specific) implicit methods for converting common Scala objects into DataFrames.

  import spark.implicits._
  val toDfResults = dataAsRDD.toDF()
  toDfResults.show()
  toDfResults.printSchema()

And the result is:

+------+---+
|    _1| _2|
+------+---+
|  Paul| 30|
|   Tom| 34|
|George| 55|
+------+---+

root
 |-- _1: string (nullable = true)
 |-- _2: integer (nullable = false)

Notice that the column names are generic: _1 and _2. We don’t want such names, and we can explicitly provide the column names to toDF() as arguments.

  val toDfResultsWithColNames = dataAsRDD.toDF("name", "age")
  toDfResultsWithColNames.show()
  toDfResultsWithColNames.printSchema()

And now the result contains the column names:

+------+---+
|  name|age|
+------+---+
|  Paul| 30|
|   Tom| 34|
|George| 55|
+------+---+

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)

Alternatively you can create a Seq() of column names and pass it to toDF() using the `: _*` varargs expansion. The result will be the same.

  val columns = Seq("name", "age")
  val toDfResultsWithColNames = dataAsRDD.toDF(columns: _*)
  toDfResultsWithColNames.show()
  toDfResultsWithColNames.printSchema()

Spark Create DataFrame Using createDataFrame() Method From SparkSession

Since Spark 2.0.0 you can use the createDataFrame() method from the SparkSession class, which takes an object as a parameter – in our case an RDD – and creates a DataFrame. In other words, it creates a DataFrame from an RDD of Product (e.g. case classes or tuples).

val createDataFrameResults = spark.createDataFrame(dataAsRDD).toDF(columns: _*)
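
As a hedged sketch, here is the same call with a case class instead of tuples (the Person case class is our own illustration). With case classes, createDataFrame() takes the column names from the field names, so no toDF() renaming is needed:

  // Illustration only: Person is a hypothetical case class, defined at top level.
  case class Person(name: String, age: Int)

  val personRDD = spark.sparkContext.parallelize(Seq(Person("Paul", 30), Person("Tom", 34)))
  val dfFromCaseClass = spark.createDataFrame(personRDD)
  dfFromCaseClass.printSchema() // columns: name, age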

Spark Create DataFrame With Schema

The createDataFrame() function allows you to provide the schema you would like to apply to the data. As in the previous example we will convert an RDD to a DataFrame, but this time we will also provide the schema.

To create the schema in Spark we will use the StructType class, whose objects can be constructed with StructType(fields: Seq[StructField]).

For a StructType object, one or multiple StructFields can be extracted by names. If multiple StructFields are extracted, a StructType object will be returned. If a provided name does not have a matching field, it will be ignored. For the case of extracting a single StructField, a null will be returned.
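
As a short, hedged illustration of that extraction behaviour (the schema below is our own example):

  import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

  val exampleSchema = StructType(Seq(
    StructField("name", StringType, nullable = false),
    StructField("age", IntegerType, nullable = true)
  ))
  val singleField: StructField = exampleSchema("name")          // extract one field by name
  val subSchema: StructType = exampleSchema(Set("name", "age")) // extract multiple fields as a StructType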

But first we have to convert the existing RDD[T] to RDD[Row], because the schema-based variant of createDataFrame() accepts only an RDD of Rows: Spark cannot match arbitrary tuple fields to schema columns on its own.

  val schema = StructType(Array(
    StructField("name", StringType, false),
    StructField("age", IntegerType, true)
  ))
  val rowRDD: RDD[Row] = dataAsRDD.map(element => Row(element._1, element._2))
  val createDataFrameWithSchema = spark.createDataFrame(rowRDD, schema)
  createDataFrameWithSchema.printSchema()

And the result is:

root
 |-- name: string (nullable = false)
 |-- age: integer (nullable = true)

Alternatively you can recreate the DataFrame from an existing DataFrame. It’s very useful when you would like to break the DAG lineage.

  println("Recreate DataFrame to break DAG lineage")
  val recreateDataFrame = spark.createDataFrame(createDataFrameWithSchema.rdd, schema)

Spark Create DataFrame From Files External Sources

Since Spark 2.0.0 we have multiple built-in functions to read data from external data sources such as JSON, CSV, XML and text files, RDBMS tables, and even Kafka, Cassandra, Snowflake and more.

We will go through several external data sources and I will show you how to create a DataFrame from each of them! In all cases the spark variable is an instance of the SparkSession class.

Spark Create DataFrame from JSON File

Loads a JSON file (one JSON object per line) and returns the result as a DataFrame. To infer the schema, it traverses the full dataset once.

val dfFromJson =  spark.read.json("path/to/dataAs.json")
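
If your JSON records span multiple lines instead of one object per line, you can enable the multiLine option (a sketch; the file path is hypothetical):

val dfFromMultilineJson = spark.read
  .option("multiLine", "true")
  .json("path/to/multilineDataAs.json")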

Spark Create DataFrame from CSV File

Loads CSV files and returns the result as a DataFrame. If inferSchema is enabled, this function will run over the input once to infer the schema. To avoid going through all of the data in advance, disable the inferSchema option or provide the schema explicitly using the schema() method.
The CSV-specific settings for reading CSV files can be found under the Data Source Option section of the documentation for the version you use.

See also our article on how to read data from CSV files efficiently using Spark!

val dfFromCSV =  spark.read.csv("path/to/dataAs.csv")
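
As a sketch of both approaches described above (the file path and column names are hypothetical):

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Option 1: let Spark infer the schema (runs over the input once to infer it)
val dfInferred = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("path/to/dataAs.csv")

// Option 2: provide the schema explicitly to avoid the extra pass over the data
val csvSchema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)
))
val dfExplicit = spark.read
  .option("header", "true")
  .schema(csvSchema)
  .csv("path/to/dataAs.csv")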

Spark Create DataFrame from XML File

To read an XML file you need to use an external library (spark-xml from Databricks), which allows you to use the SparkSession object to read such data.

Maven

<!-- https://mvnrepository.com/artifact/com.databricks/spark-xml -->
<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-xml_2.12</artifactId>
    <version>0.15.0</version>
</dependency>

Gradle

// https://mvnrepository.com/artifact/com.databricks/spark-xml
implementation group: 'com.databricks', name: 'spark-xml_2.12', version: '0.15.0'

And then read the XML data:

val dfFromXml = spark.read
      .format("com.databricks.spark.xml")
      .option("rowTag", "car")
      .xml("path/to/carDataAs.xml")

Spark Create DataFrame from TEXT File

The text() function loads text files and returns a DataFrame whose schema starts with a string column named “value”, followed by partitioned columns if there are any.

val dfFromText =  spark.read.text("path/to/dataAs.txt")

Alternatively you can use textFile(), which loads text files and returns a Dataset of String instead of a DataFrame.

val dsFromText =  spark.read.textFile("path/to/dataAs.txt")
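
Since the result is a Dataset[String], you can apply typed transformations to the lines before converting them to a DataFrame. A minimal sketch:

import spark.implicits._
val upperCased = spark.read.textFile("path/to/dataAs.txt")
  .map(_.toUpperCase)
  .toDF("value")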

Spark Create DataFrame From RDBMS External Sources

In Spark you can create a DataFrame from any RDBMS table. Spark does not contain all the required libraries for all RDBMS types, so you first need to add the right one to your project.

To read from other RDBMS types you basically need to provide the correct driver name, and the corresponding library must be included in your project. Please find below a table with the driver names for the most common external sources:

RDBMS       Driver Name
MySQL       com.mysql.jdbc.Driver
MariaDB     org.mariadb.jdbc.Driver
Oracle      oracle.jdbc.driver.OracleDriver
Teradata    com.ncr.teradata.TeraDriver
DB2         com.ibm.db2.jcc.DB2Driver
Snowflake   net.snowflake.client.jdbc.SnowflakeDriver
Cassandra   org.apache.spark.sql.cassandra (Spark connector format, not a JDBC driver)

Spark Create DataFrame From MariaDB Table

  val dfFromMariaDB = spark.read.format("jdbc")
    .option("url", "jdbc:mariadb://hostname:port/db")
    .option("driver", "org.mariadb.jdbc.Driver")
    .option("dbtable", "your_table_name")
    .option("user", "your_username")
    .option("password", "your_password")
    .load()

Or you can use the alternative approach:

  import java.util.Properties

  val connectionProperties = new Properties()
  connectionProperties.put("user", "your_username")
  connectionProperties.put("password", "your_password")
  connectionProperties.put("driver", "org.mariadb.jdbc.Driver")

  val dfFromMariaDbAlt = spark.read
    .jdbc("jdbc:mariadb://hostname:port/db_name", "your_table_name", connectionProperties)
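
For larger tables you can parallelize the JDBC read by partitioning on a numeric column. Below is a hedged sketch; the partition column name and bounds are hypothetical and must match your table:

  val dfPartitioned = spark.read.format("jdbc")
    .option("url", "jdbc:mariadb://hostname:port/db_name")
    .option("driver", "org.mariadb.jdbc.Driver")
    .option("dbtable", "your_table_name")
    .option("user", "your_username")
    .option("password", "your_password")
    .option("partitionColumn", "id") // hypothetical numeric column
    .option("lowerBound", "1")
    .option("upperBound", "100000")
    .option("numPartitions", "8")
    .load()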

Full Code

package com.bigdataetl.sparktutorial.sql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object CreateDataFrame extends App {

  val spark: SparkSession = SparkSession
    .builder()
    .master("local[*]")
    .appName("BigData-ETL.com")
    .getOrCreate()

  val data = Seq(("Paul", 30), ("Tom", 34), ("George", 55))
  val dataAsRDD = spark.sparkContext.parallelize(data)
  // val dataAsRDD: RDD[(String, Int)] = spark.sparkContext.parallelize(data)

  println("Create DataFrame Using toDF() Method")
  import spark.implicits._
  val toDfResults = dataAsRDD.toDF()
  toDfResults.show()
  toDfResults.printSchema()

//  val toDfResultsWithColNames = dataAsRDD.toDF("name", "age")
  val columns = Seq("name", "age")
  val toDfResultsWithColNames = dataAsRDD.toDF(columns: _*)
  toDfResultsWithColNames.show()
  toDfResultsWithColNames.printSchema()


  println("Create DataFrame Using createDataFrame() Method From SparkSession")
  val createDataFrameResults = spark.createDataFrame(dataAsRDD).toDF(columns: _*)
  createDataFrameResults.printSchema()

  println("Create DataFrame With Schema")
  val schema = StructType(Array(
    StructField("name", StringType, false),
    StructField("age", IntegerType, true)
  ))
  val rowRDD: RDD[Row] = dataAsRDD.map(element => Row(element._1, element._2))
  val createDataFrameWithSchema = spark.createDataFrame(rowRDD, schema)
  createDataFrameWithSchema.printSchema()

  println("Recreate DataFrame to break DAG lineage")
  val recreateDataFrame = spark.createDataFrame(createDataFrameWithSchema.rdd, schema)
  recreateDataFrame.printSchema()

  val dfFromCSV =  spark.read.csv("path/to/dataAs.csv")
  val dfFromJson =  spark.read.json("path/to/dataAs.json")
  val dsFromText =  spark.read.textFile("path/to/dataAs.txt")
}

GitLab Repository

As usual, please find the full code on our GitLab repository!

Summary

In this article you had the chance to learn how to create a Spark DataFrame from various sources such as files, RDDs, and RDBMS tables.
