
Using the textFile() and wholeTextFiles() methods provided by Spark core's SparkContext class, we can read multiple text files into a single RDD in Spark. The same approach works for CSV files, for reading all files in a directory, and for reading files that match a specified pattern.

Spark Methods

As mentioned above, we can use two methods: textFile() and wholeTextFiles(), both of which come from the SparkContext implementation.

textFile() Method

textFile() – Reads a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and returns an RDD of Strings. The text files need to be UTF-8 encoded.

Params:

  • path – path to the text file on a supported file system
  • minPartitions – a suggested minimum number of partitions for the resulting RDD

Returns:

  • RDD of lines of the text file
  def textFile(
      path: String,
      minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
    assertNotStopped()
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair => pair._2.toString).setName(path)
  }
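
For reference, minPartitions is only a hint for the lower bound on the number of partitions; Spark may create more. A minimal sketch (reusing the test data paths from the examples below):

  // Ask Spark for at least 4 partitions when reading; the actual count may be higher.
  val partitionedRDD = spark.sparkContext
    .textFile("src/main/resources/ReadMultipleTextFiles/txt/subDir1/*", minPartitions = 4)
  println(s"Number of partitions: ${partitionedRDD.getNumPartitions}")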

wholeTextFiles() Method

wholeTextFiles() – Read a collection of text files from HDFS, a local file system (accessible on all nodes), or any other file system URI that is supported by Hadoop. Every file is read as a single record and returned as a key-value pair, with the path of each file serving as the key and the content of each file serving as the value. The text files need to be UTF-8 encoded.

Params:

  • path – directory of the input data files; the path can be a comma-separated list of input paths
  • minPartitions – a suggested value for the minimal number of splits of the input data

Returns:

  • RDD representing tuples of file path and the corresponding file content
  def wholeTextFiles(
      path: String,
      minPartitions: Int = defaultMinPartitions): RDD[(String, String)] = withScope {
    assertNotStopped()
    val job = NewHadoopJob.getInstance(hadoopConfiguration)
    // Use setInputPaths so that wholeTextFiles aligns with hadoopFile/textFile in taking
    // comma separated files as input. (see SPARK-7155)
    NewFileInputFormat.setInputPaths(job, path)
    val updateConf = job.getConfiguration
    new WholeTextFileRDD(
      this,
      classOf[WholeTextFileInputFormat],
      classOf[Text],
      classOf[Text],
      updateConf,
      minPartitions).map(record => (record._1.toString, record._2.toString)).setName(path)
  }
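
Because the key of each tuple is the full file path, you will often want to strip it down to just the file name. A small sketch of one way to do that (reusing the test data paths from the examples below):

  // Keep only the file name (the part after the last '/') as the key.
  val byFileName = spark.sparkContext
    .wholeTextFiles("src/main/resources/ReadMultipleTextFiles/txt/subDir1/*")
    .map { case (path, content) => (path.substring(path.lastIndexOf('/') + 1), content) }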

Example Data

Before we show the examples, we need to prepare some test data. For this post we created TXT and CSV files. The TXT files are placed in sub-directories to present more variations of how data can be read in Spark using the textFile() and wholeTextFiles() methods.

The example data can be found on our GitLab repository!
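
Based on the outputs shown in the examples below, the TXT test data is laid out roughly like this (the subDir2 file names are an assumption reconstructed from the example outputs; the CSV files are not shown):

  src/main/resources/ReadMultipleTextFiles/txt/
  ├── subDir1/
  │   ├── file_001.txt   (txt_data_1,1,subDir1)
  │   ├── file_002.txt   (txt_data_2,2,subDir1)
  │   └── file_003.txt   (txt_data_3,3,subDir1)
  └── subDir2/
      ├── file_001.txt   (txt_data_1,1,subDir2)
      ├── file_002.txt   (txt_data_2,2,subDir2)
      └── file_003.txt   (txt_data_3,3,subDir2)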

Read Multiple Text Files Into Single RDD by Spark

Spark Read All Text Files From A Directory Into A Single RDD

By passing the path of a directory to the textFile() method, Spark reads all text files in it and creates a single RDD. If you provide a wrong path, or there are no files in the path, you will get an error like this:

Exception in thread "main" org.apache.hadoop.mapred.InvalidInputException: Input Pattern file:.../bigdata-etl/apache-spark/spark-tutorials/src/main/resources/ReadMultipleTextFiles/txt/subDirX/* matches 0 files
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:304)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:244)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:332)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:208)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:292)
	at scala.Option.getOrElse(Option.scala:201)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:292)
	at scala.Option.getOrElse(Option.scala:201)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:288)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2293)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
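
If you prefer to fail gracefully instead of catching this exception, you can check the glob pattern yourself before reading. A minimal sketch using the Hadoop FileSystem API (the pattern and variable names are illustrative):

  import org.apache.hadoop.fs.{FileSystem, Path}

  val pattern = new Path("src/main/resources/ReadMultipleTextFiles/txt/subDir1/*")
  val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
  // globStatus returns null or an empty array when nothing matches the pattern.
  val matches = fs.globStatus(pattern)
  if (matches == null || matches.isEmpty) {
    println(s"No files match $pattern - skipping the read")
  } else {
    val dataAsRDD = spark.sparkContext.textFile(pattern.toString)
    println(s"Read ${dataAsRDD.count()} lines")
  }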

Ok, let’s now read some data from TXT files under subDir1:

Please keep in mind that on localhost, when the executor runs on the same machine as the driver, you don't have to call the collect() method before foreach.

In a cluster application the foreach runs on the executors, so to see the output on the driver you must bring the data back first by calling collect() before foreach: dataAsRDD.collect().foreach(data => println(data))

  val dataAsRDD = spark.sparkContext.textFile("src/main/resources/ReadMultipleTextFiles/txt/subDir1/*")
  dataAsRDD.foreach(data => println(data))

It produces the following output:

txt_data_1,1,subDir1
txt_data_2,2,subDir1
txt_data_3,3,subDir1

Now we will read the same data using the wholeTextFiles() method. This method returns an RDD[Tuple2] object where:

  • _1 in the tuple is the file path
  • _2 in the tuple is the file content
  val dataAsRDDByWhole = spark.sparkContext.wholeTextFiles("src/main/resources/ReadMultipleTextFiles/txt/subDir1/*")
  dataAsRDDByWhole.foreach(f => {
    println(s"In file:[${f._1}], found data:[${f._2}]")
  })

It produces the following output:

In file:[file:src/main/resources/ReadMultipleTextFiles/txt/subDir1/file_003.txt], found data:[txt_data_3,3,subDir1]
In file:[file:src/main/resources/ReadMultipleTextFiles/txt/subDir1/file_001.txt], found data:[txt_data_1,1,subDir1]
In file:[file:src/main/resources/ReadMultipleTextFiles/txt/subDir1/file_002.txt], found data:[txt_data_2,2,subDir1]
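
If you need the same one-line-per-record shape that textFile() gives you, you can split the content of each tuple yourself. A small sketch:

  // Turn each (path, content) pair into individual lines, like textFile() would.
  val linesFromWhole = dataAsRDDByWhole.flatMap { case (_, content) => content.split("\n") }
  linesFromWhole.foreach(line => println(line))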

Spark Read Several Text Files Into A Single RDD

If you know exactly the names of the files you want to read, just list them all separated by commas and Spark will read the multiple text files into a single RDD.

  val dataAsRDD2 = spark.sparkContext.textFile("src/main/resources/ReadMultipleTextFiles/txt/subDir1/file_001.txt,src/main/resources/ReadMultipleTextFiles/txt/subDir1/file_002.txt")
  dataAsRDD2.foreach(data => println(data))

It produces the following output:

txt_data_1,1,subDir1
txt_data_2,2,subDir1

Spark Read All Files Which Match The Pattern Into A Single RDD

As in the previous examples, you can use the textFile() method and pass a wildcard pattern matching the input files which must be read.

  val dataAsRDD3 = spark.sparkContext.textFile("src/main/resources/ReadMultipleTextFiles/txt/subDir1/file_*.txt")
  dataAsRDD3.foreach(data => println(data))

It produces the following output:

txt_data_2,2,subDir1
txt_data_1,1,subDir1
txt_data_3,3,subDir1

Spark Read Files From Multiple Directories

Based on the knowledge from the previous examples, we can easily adapt the textFile() method to read data from multiple directories. To do this we use the wildcard (*) and provide multiple comma-separated paths which point to the specific directories.

  val dataAsRDD4 = spark.sparkContext.textFile("src/main/resources/ReadMultipleTextFiles/txt/subDir1/*,src/main/resources/ReadMultipleTextFiles/txt/subDir2/*")
  dataAsRDD4.foreach(data => println(data))

It produces the following output:

txt_data_2,2,subDir2
txt_data_3,3,subDir1
txt_data_2,2,subDir1
txt_data_3,3,subDir2
txt_data_1,1,subDir1
txt_data_1,1,subDir2
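
The CSV files mentioned in the test data section can be read exactly the same way, because textFile() does not care about the file extension. A minimal sketch that also splits each line into fields (the csv directory path is hypothetical; adjust it to wherever your CSV files actually live):

  // Hypothetical path: adjust to the actual location of the CSV test files.
  val csvLines = spark.sparkContext.textFile("src/main/resources/ReadMultipleTextFiles/csv/*")
  val csvFields = csvLines.map(line => line.split(","))
  csvFields.foreach(fields => println(fields.mkString(" | ")))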

Full Code

package com.bigdataetl.sparktutorial.rdd

import org.apache.spark.sql.SparkSession

object ReadMultipleTextFiles extends App {

  val spark: SparkSession = SparkSession
    .builder()
    .master("local[*]")
    .appName("BigData-ETL.com")
    .getOrCreate()

  println("Spark Read All Text Files From A Directory Into A Single RDD")
  val dataAsRDD = spark.sparkContext.textFile("src/main/resources/ReadMultipleTextFiles/txt/subDir1/*")
  dataAsRDD.foreach(data => println(data))


  println("Read the same data using the wholeTextFiles() method. This methods returns the RDD[Tuple2] object")
  val dataAsRDDByWhole = spark.sparkContext.wholeTextFiles("src/main/resources/ReadMultipleTextFiles/txt/subDir1/*")
  dataAsRDDByWhole.foreach(f => {
    println(s"In file:[${f._1}], found data:[${f._2}]")
  })

  println("Spark Read Several Text Files Into A Single RDD")
  val dataAsRDD2 = spark.sparkContext.textFile("src/main/resources/ReadMultipleTextFiles/txt/subDir1/file_001.txt,src/main/resources/ReadMultipleTextFiles/txt/subDir1/file_002.txt")
  dataAsRDD2.foreach(data => println(data))

  println("Spark Read All Files Which Match The Pattern To Single RDD")
  val dataAsRDD3 = spark.sparkContext.textFile("src/main/resources/ReadMultipleTextFiles/txt/subDir1/file_*.txt")
  dataAsRDD3.foreach(data => println(data))


  println("Spark Read Files From Multiple Directories")
  val dataAsRDD4 = spark.sparkContext.textFile("src/main/resources/ReadMultipleTextFiles/txt/subDir1/*,src/main/resources/ReadMultipleTextFiles/txt/subDir2/*")
  dataAsRDD4.foreach(data => println(data))
}

GitLab Repository

As usual, please find the full code on our GitLab repository!

Summary

In this post we showed how to read multiple text files into a single RDD in Spark using the textFile() and wholeTextFiles() methods of SparkContext: reading all files from a directory, reading an explicit comma-separated list of files, reading files that match a wildcard pattern, and reading files from multiple directories.
