In this short Apache Spark tutorial I will show you how to use the DataFrame API to increase the performance of a Spark application while loading large, semi-structured data sets such as CSV, XML and JSON.
Introduction
Apache Spark DataFrame
An Apache Spark" DataFrame is a distributed collection of data organized into rows and columns that is similar to a table in a traditional relational database or a data frame in R or Python". DataFrames are a key data type in Spark, and are used to store and manipulate large amounts of structured data in a distributed manner.
DataFrames can be created from a variety of sources, including structured data files, tables in Hive, and external databases. They can also be created programmatically using APIs in Python, R, Scala, and Java.
DataFrames provide a number of operations and transformations that can be applied to manipulate and analyze the data. These operations include filtering, grouping, aggregating, joining, and sorting, as well as a wide range of built-in functions for data manipulation and analysis.
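As a quick illustration of these operations, here is a minimal, self-contained Scala sketch. The sample data, column names, and the local[*] master are made up purely for this example:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder()
  .appName("dataframe-basics")
  .master("local[*]")   // local mode, just for this illustration
  .getOrCreate()
import spark.implicits._

// Create a DataFrame programmatically from a local collection
val people = Seq(
  ("Alice", "Smith", 34, "IT"),
  ("Bob", "Jones", 28, "HR"),
  ("Carol", "Brown", 45, "IT")
).toDF("name", "surname", "age", "department")

// Filter, group, aggregate and sort in a single chain
val avgAgePerDept = people
  .filter($"age" > 30)
  .groupBy($"department")
  .agg(avg($"age").as("avg_age"))
  .orderBy(desc("avg_age"))

avgAgePerDept.show()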
DataFrames are an important part of the Spark ecosystem, and are widely used in a variety of applications, including data engineering, data analytics, and machine learning. They are designed to be highly scalable and efficient, and are well-suited for large-scale data processing tasks.
PySpark InferSchema
In PySpark", the inferSchema
option is used to specify whether the schema of a DataFrame" should be automatically inferred when reading in data from a file or other data source. By default", this option is set to True
, which means that PySpark" will attempt to infer the schema of the data by analyzing the data itself.
For example, consider the following code that reads in a CSV file and creates a DataFrame:
df = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .load("data.csv")
In this case, PySpark" will analyze the data in the data.csv
file and try to infer the data types of each column" based on the values in the file.
This analysis step can be expensive, because Spark may need an extra pass over the data just to determine the column types. That is why we would rather provide the schema ourselves instead of shifting this responsibility to Spark / PySpark and losing performance.
Use the DataFrame API Efficiently When Reading Data
Define Static Schema of Data
In Spark DataFrame" API, you can define a static data schema. To achieve this, you must provide an object of class StructType that contains a list of StructField. Below I have presented two ways in which the data schema can be defined. (Apache Spark Use DataFrame" efficiently during reading data)
// Required types for building the schema
import org.apache.spark.sql.types._

// The first way
val schema = new StructType()
  .add("id", IntegerType, false)
  .add("name", StringType, true)
  .add("surname", StringType, true)
  .add("age", IntegerType, true)

// or the second way
val schema = StructType.fromDDL("id integer, name string, surname string, age integer")
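The second, DDL-based form is particularly convenient: the schema is just a plain string, so it can easily be kept outside the application code, which is exactly what Tip #2 below is about.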
#1 Tip: Disable the InferSchema Option And Use a Defined Schema
In the following example you can find a snippet of code that uses the previously created schema and shows how to disable the inferSchema option. With inferSchema disabled, Spark will not analyze the data set to deduce its schema; it will use only the one you provided.
// CSV file
val data = spark.read.format("csv")
  .option("header", "false")
  .option("inferSchema", "false")
  .schema(schema)
  .load("/path/to/csv_data")

// JSON file
val data = spark.read.schema(schema).json("/path/to/json_data")

// XML file (spark-xml package)
val data = spark.read.format("com.databricks.spark.xml")
  .option("rowTag", "book")
  .schema(schema)
  .load("/path/to/xml_data")
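You can quickly check that Spark applied exactly the schema you supplied rather than an inferred one, for example:

// Should list the columns and types from the StructType defined above
data.printSchema()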
#2 Tip: Store Schemas Outside Application Code
A good approach is not to store schemas directly in your code: every time the data schema changes, you have to modify the code, build a new package, and deploy it. In this tutorial, I'll show you how you can store schemas in files on HDFS and in a Hive table.
Loading a Schema From a File on HDFS
// Load the schema from a file on HDFS
val schemaAsTextFromFile = spark.sparkContext
  .textFile("/data/schemas/schema_from_hdfs.txt")
  .toLocalIterator.toList(0)
val schema = StructType.fromDDL(schemaAsTextFromFile)
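For this to work, the file on HDFS must contain the schema as a single line in DDL form. The content below is just an example, matching the schema used earlier in this tutorial:

id integer, name string, surname string, age integer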
Loading a Schema From a Hive Table
First, we create a new database and a table in which we will store the schemas.
CREATE DATABASE bigdata_etl;

CREATE TABLE bigdata_etl.schemas (
  id INT,
  dataset_name STRING,
  schema STRING
)
STORED AS ORC;
Insert a row into the Hive table with the specified data schema.
-- Insert a row with the schema for our data set
INSERT INTO bigdata_etl.schemas (id, dataset_name, schema)
VALUES (1, 'test_dataset', 'id integer, name string, surname string, age integer');
Now, in the Spark application code, we can load the data from the Hive table, filter out the row containing the appropriate data schema, and based on it create a StructType object, which we can then use when loading the data.
val schemasFromHive = spark.sqlContext.table("bigdata_etl.schemas")
val dataSetSchema = schemasFromHive
  .filter(schemasFromHive("dataset_name") === "test_dataset")
  .select("schema")
  .head.mkString
val schema = StructType.fromDDL(dataSetSchema)
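With the schema reconstructed from the Hive table, reading the actual data files looks the same as in Tip #1; the path below is just a placeholder:

// Use the schema loaded from the Hive table when reading the raw files
val data = spark.read
  .option("header", "false")
  .schema(schema)
  .csv("/path/to/csv_data")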
#3 Tip: Use External Tables in Hive
You can also use an external table in Hive to improve the execution time of a Spark application when reading data from files. The following example creates an external table in Hive. The LOCATION clause is key: it points to the directory on HDFS where the data (in this example, in CSV format) is stored. Because the schema is defined on the table in Hive, Spark will not attempt to infer it from the files stored in that location.
CREATE EXTERNAL TABLE bigdata_etl.test_dataset (
  `id` INT,
  `name` STRING,
  `surname` STRING,
  `age` INT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/path/to/csv_data';
Then, in the code in Spark, you can just read the data from that table.
val testDataSetDF = spark.sqlContext.table("bigdata_etl.test_dataset")
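Because the schema comes from the Hive metastore, you can confirm that no inference took place by simply inspecting it:

// Column names and types are taken from the table definition in Hive
testDataSetDF.printSchema()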
Summary
In this tutorial", I’ve presented you how you can improve your code when you use the DataFrame" API. In this tutorial", I took some designs from a blog belonging to my friend, where a similar concept was presented to people who use the Python" API. Here you will find a reference to this entry (https://szczeles.github.io/Reading-JSON-CSV-and-XML-files-efficiently-in-Apache-Spark/). (Apache Spark Use DataFrame" efficiently during reading data)
Could you please share this post?
I appreciate it, and thank you! :)
Have a nice day!