Read Multiple Text Files Into Single RDD By Spark – 4 Cool Examples!


Using the textFile() and wholeTextFiles() methods provided by Spark core’s SparkContext class, we can read multiple text files into a single RDD, read CSV files, read all files in a directory, and read files matching a specified pattern.

Introduction

Apache Spark" is a powerful open-source data processing engine for big data" workloads. One of the common tasks in Spark" is reading data from various sources, such as file systems (e.g. HDFS, S3, local file system), databases, and cloud storage.

To read a file in Spark, you can use the SparkSession.read method, which returns a DataFrameReader object. This object provides several methods for reading data in different formats, such as CSV, JSON, and text files.

Here is an example of how to read a CSV file in Spark:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
val df = spark.read.format("csv").option("header", "true").load("path/to/file.csv")

This will create a DataFrame df that contains the data from the CSV file. The format method specifies the file format, which in this case is CSV. The option method is used to set options for the reader, such as whether the file has a header row. Finally, the load method is used to load the file into a DataFrame.

You can also read data from other file formats, such as JSON and text files, using similar methods. For example, to read a JSON file, you can use the json format:

val df = spark.read.format("json").load("path/to/file.json")

And to read a text file, you can use the text format:

val df = spark.read.format("text").load("path/to/file.txt")

You can find more information about reading data in Spark in the official documentation: https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html

Spark Methods

As described above, we can use two methods: textFile() and wholeTextFiles(), which come from the SparkContext implementation.

textFile() Method

textFile() – An RDD of Strings is returned after reading a text file from HDFS, a local file system (accessible on all nodes), or any Hadoop-supported file system URI. The text files need to be UTF-8 encoded.

Params:

  • path – path to the text file on a supported file system
  • minPartitions – suggested minimum number of partitions for the resulting RDD

Returns:

  • RDD" of lines of the text file
  def textFile(
      path: String,
      minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
    assertNotStopped()
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair => pair._2.toString).setName(path)
  }
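
For illustration, here is a minimal sketch of calling textFile() with an explicit minPartitions hint; the path and partition count are only examples:

  val lines = spark.sparkContext.textFile(
    "src/main/resources/ReadMultipleTextFiles/txt/subDir1/*", minPartitions = 4)
  // getNumPartitions shows how the minPartitions hint influenced the resulting RDD
  println(s"Partitions: ${lines.getNumPartitions}, lines: ${lines.count()}")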

wholeTextFiles() Method

wholeTextFiles() – Reads a collection of text files from HDFS, a local file system (accessible on all nodes), or any other file system URI that is supported by Hadoop. Every file is read as a single record and returned as a key-value pair, with the path of each file serving as the key and the content of each file serving as the value. The text files need to be UTF-8 encoded.

Params:

  • path – directory of the input data files; the path can also be a comma-separated list of paths.
  • minPartitions – a suggested minimum number of partitions for the input data.

Returns:

  • RDD representing tuples of file path and the corresponding file content

  def wholeTextFiles(
      path: String,
      minPartitions: Int = defaultMinPartitions): RDD[(String, String)] = withScope {
    assertNotStopped()
    val job = NewHadoopJob.getInstance(hadoopConfiguration)
    // Use setInputPaths so that wholeTextFiles aligns with hadoopFile/textFile in taking
    // comma separated files as input. (see SPARK-7155)
    NewFileInputFormat.setInputPaths(job, path)
    val updateConf = job.getConfiguration
    new WholeTextFileRDD(
      this,
      classOf[WholeTextFileInputFormat],
      classOf[Text],
      classOf[Text],
      updateConf,
      minPartitions).map(record => (record._1.toString, record._2.toString)).setName(path)
  }
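
And a minimal sketch of wholeTextFiles(), which returns (file path, file content) pairs; here we simply count the lines per file (the path is only an example):

  val files = spark.sparkContext.wholeTextFiles("src/main/resources/ReadMultipleTextFiles/txt/subDir1/*")
  // The value of each pair is the whole file content, so we split it into lines
  val lineCounts = files.mapValues(content => content.split("\n").length)
  lineCounts.collect().foreach { case (path, count) => println(s"$path -> $count lines") }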

Example Data

Before we start with the examples, we need to prepare some test data. For this post we created TXT and CSV files. The TXT files are placed in sub-directories to present more variations of how data can be read in Spark using the textFile() and wholeTextFiles() methods.

The example data can be found on our GitLab repository!

Read Multiple Text Files Into Single RDD by Spark

Spark Read All Text Files From A Directory Into A Single RDD

By passing a directory path to the textFile() method in Spark, all text files in it are read and a single RDD is created. If you provide a wrong path, or there are no files under that path, you will get an error like:

Exception in thread "main" org.apache.hadoop.mapred.InvalidInputException: Input Pattern file:.../bigdata-etl/apache-spark/spark-tutorials/src/main/resources/ReadMultipleTextFiles/txt/subDirX/* matches 0 files
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:304)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:244)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:332)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:208)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:292)
	at scala.Option.getOrElse(Option.scala:201)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:292)
	at scala.Option.getOrElse(Option.scala:201)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:288)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2293)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)

Ok, let’s now read some data from TXT files under subDir1:

Please keep in mind that on localhost, when the executor runs on the same machine as the driver, you don't have to call the collect() method before foreach.

When writing a cluster application, you must collect the data to the driver first: dataAsRDD.collect().foreach(data => println(data))

  val dataAsRDD = spark.sparkContext.textFile("src/main/resources/ReadMultipleTextFiles/txt/subDir1/*")
  dataAsRDD.foreach(data => println(data))

It produces the following output:

txt_data_1,1,subDir1
txt_data_2,2,subDir1
txt_data_3,3,subDir1

Now we will read the same data using the wholeTextFiles() method. This method returns an RDD of tuples (RDD[(String, String)]) where:

  • _1 in the tuple is the file path
  • _2 in the tuple is the data (content)

  val dataAsRDDByWhole = spark.sparkContext.wholeTextFiles("src/main/resources/ReadMultipleTextFiles/txt/subDir1/*")
  dataAsRDDByWhole.foreach(f => {
    println(s"In file:[${f._1}], found data:[${f._2}]")
  })

It produces the following output:

In file:[file:src/main/resources/ReadMultipleTextFiles/txt/subDir1/file_003.txt], found data:[txt_data_3,3,subDir1]
In file:[file:src/main/resources/ReadMultipleTextFiles/txt/subDir1/file_001.txt], found data:[txt_data_1,1,subDir1]
In file:[file:src/main/resources/ReadMultipleTextFiles/txt/subDir1/file_002.txt], found data:[txt_data_2,2,subDir1]

Spark Read Several Text Files Into A Single RDD

If you know exactly which files you want to read, just list them all separated by commas and Spark will read multiple text files into a single RDD.

  val dataAsRDD2 = spark.sparkContext.textFile("src/main/resources/ReadMultipleTextFiles/txt/subDir1/file_001.txt,src/main/resources/ReadMultipleTextFiles/txt/subDir1/file_002.txt")
  dataAsRDD2.foreach(data => println(data))

It produces the following output:

txt_data_1,1,subDir1
txt_data_2,2,subDir1

Spark Read All Files Which Match The Pattern To Single RDD

As in the previous examples, you can use the textFile() method and pass a wildcard pattern describing the input files which must be read.

  val dataAsRDD3 = spark.sparkContext.textFile("src/main/resources/ReadMultipleTextFiles/txt/subDir1/file_*.txt")
  dataAsRDD3.foreach(data => println(data))

It produces the following output:

txt_data_2,2,subDir1
txt_data_1,1,subDir1
txt_data_3,3,subDir1

Spark Read Files From Multiple Directories

Based on the previous examples, we can easily adapt the textFile() method to read data from multiple directories. To do this we can use the wildcard (*) and provide multiple comma-separated paths which point to specific directories.

  val dataAsRDD4 = spark.sparkContext.textFile("src/main/resources/ReadMultipleTextFiles/txt/subDir1/*,src/main/resources/ReadMultipleTextFiles/txt/subDir2/*")
  dataAsRDD4.foreach(data => println(data))

It produces the following output:

txt_data_2,2,subDir2
txt_data_3,3,subDir1
txt_data_2,2,subDir1
txt_data_3,3,subDir2
txt_data_1,1,subDir1
txt_data_1,1,subDir2
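
The same approach also works for the CSV files from the example data, since textFile() simply returns the raw lines. Below is a minimal sketch; the csv directory path is an assumption about the repository layout, so adjust it to the actual location:

  // Hypothetical path: adjust to the actual location of the CSV test files in the example data
  val csvAsRDD = spark.sparkContext.textFile("src/main/resources/ReadMultipleTextFiles/csv/*")
  // Each line is a plain CSV record, so we split it into fields ourselves
  val parsedCsv = csvAsRDD.map(line => line.split(","))
  parsedCsv.foreach(fields => println(fields.mkString(" | ")))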

Full Code

package com.bigdataetl.sparktutorial.rdd

import org.apache.spark.sql.SparkSession

object ReadMultipleTextFiles extends App {

  val spark: SparkSession = SparkSession
    .builder()
    .master("local[*]")
    .appName("BigData-ETL.com")
    .getOrCreate()

  println("Spark Read All Text Files From A Directory Into A Single RDD")
  val dataAsRDD = spark.sparkContext.textFile("src/main/resources/ReadMultipleTextFiles/txt/subDir1/*")
  dataAsRDD.foreach(data => println(data))


  println("Read the same data using the wholeTextFiles() method. This methods returns the RDD[Tuple2] object")
  val dataAsRDDByWhole = spark.sparkContext.wholeTextFiles("src/main/resources/ReadMultipleTextFiles/txt/subDir1/*")
  dataAsRDDByWhole.foreach(f => {
    println(s"In file:[${f._1}], found data:[${f._2}]")
  })

  println("Spark Read Several Text Files Into A Single RDD")
  val dataAsRDD2 = spark.sparkContext.textFile("src/main/resources/ReadMultipleTextFiles/txt/subDir1/file_001.txt,src/main/resources/ReadMultipleTextFiles/txt/subDir1/file_002.txt")
  dataAsRDD2.foreach(data => println(data))

  println("Spark Read All Files Which Match The Pattern To Single RDD")
  val dataAsRDD3 = spark.sparkContext.textFile("src/main/resources/ReadMultipleTextFiles/txt/subDir1/file_*.txt")
  dataAsRDD3.foreach(data => println(data))


  println("Spark Read Files From Multiple Directories")
  val dataAsRDD4 = spark.sparkContext.textFile("src/main/resources/ReadMultipleTextFiles/txt/subDir1/*,src/main/resources/ReadMultipleTextFiles/txt/subDir2/*")
  dataAsRDD4.foreach(data => println(data))
}

GitLab Repository

As usual, please find the full code on our GitLab repository!

Summary

In this post we showed how to use the textFile() and wholeTextFiles() methods of SparkContext to read multiple text files into a single RDD: all files from a directory, several explicitly listed files, files matching a wildcard pattern, and files from multiple directories.
