How to Break DAG Lineage in Apache Spark – Do You Know These 3 Methods?


In this post, I will introduce you to 3 methods for breaking the DAG lineage in Apache Spark. It's quite possible you weren't aware of at least one of them! Check whether you know all 3 methods which, depending on your conditions and requirements, can save you a lot of time!

What is DAG?

A DAG (Directed Acyclic Graph) in Apache Spark is a graph representation of how our Spark job will be executed. It consists of two main elements:

  • Vertices – represent RDDs
  • Edges – operations that will be executed on the RDDs


DAG lineage is the sequence of these operations (edges) applied to RDDs. When you call any Spark action, the DAG is handed to the DAG Scheduler, which creates the stages and tasks.
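To see laziness in action, consider a minimal sketch: transformations only extend the DAG, and nothing runs until an action is called (the values below are illustrative):

```scala
// Transformations only extend the DAG; nothing is computed yet.
val rdd = spark.sparkContext.parallelize(1 to 100)
val doubled  = rdd.map(_ * 2)          // edge: map
val filtered = doubled.filter(_ > 50)  // edge: filter

// An action hands the DAG to the DAG Scheduler,
// which turns it into stages and tasks and runs them.
val result = filtered.count()
```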

Why do we need to break DAG Lineage?

Sometimes the DAG grows so large that it no longer fits in memory, or the number of stages and tasks becomes very large, which can cause errors such as:

java.lang.StackOverflowError

or 

java.lang.OutOfMemoryError
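A typical way the lineage grows out of control is an iterative job that keeps reassigning transformations to the same DataFrame in a loop, so the logical plan grows with every iteration. A hedged sketch (the input path and column names are hypothetical):

```scala
// Hypothetical iterative job: each iteration appends to the logical plan,
// so the DAG lineage keeps growing until planning or task serialization fails.
var df = spark.read.parquet("/data/input")

for (i <- 1 to 1000) {
  // lineage grows by one more projection every loop iteration
  df = df.withColumn(s"feature_$i", df("value") * i)
}

df.count()  // the action must process the entire accumulated plan
```

Breaking the lineage every N iterations (with one of the methods below) keeps the plan small.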

Where to see the DAG graph?

To see the DAG lineage, open the Spark UI; in the Jobs section, click the DAG Visualization link.


How to break DAG lineage?

You can use the following three methods to break DAG lineage. Each has its pros and cons. Which one works best for you often depends on the environment (e.g. the cluster) in which you run your Spark jobs.

#1: Checkpoint

The first method is checkpoint. When you run your jobs on a Hadoop cluster, the DataFrame is written to HDFS during the checkpoint. If an executor is killed during processing, the data will be read from HDFS by another executor, so this method is fault tolerant.

val dfWithBrokenLineage = previousDf.checkpoint()

If the cluster you run your jobs on has a slow network, consider using another method. Additionally, remember to reduce the number of shuffle partitions before the checkpoint: a large number of them produces many small files on HDFS, which can make the checkpoint operation slow, as the NameNode can become a bottleneck.

Eagerly checkpoint a Dataset and return the new Dataset. Checkpointing can be used to truncate the logical plan of this Dataset, which is especially useful in iterative algorithms where the plan may grow exponentially. It will be saved to files inside the checkpoint directory set with SparkContext#setCheckpointDir.
Since: 2.1.0
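Putting this together, a minimal sketch that sets the checkpoint directory and reduces the partition count before checkpointing (the HDFS path and partition count are assumptions, tune them to your cluster):

```scala
// The checkpoint directory must be set before calling checkpoint()
spark.sparkContext.setCheckpointDir("hdfs:///tmp/spark-checkpoints")  // hypothetical path

val dfWithBrokenLineage = previousDf
  .coalesce(64)   // fewer partitions => fewer small files on HDFS
  .checkpoint()   // eager by default: materializes to HDFS and truncates the plan
```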

#2: LocalCheckpoint

It works like checkpoint, but the data is not transferred to HDFS; instead it is saved to the executors' local storage. If an executor is killed during processing, its data is lost, and Spark may not be able to recreate this DataFrame from the DAG.

val dfWithBrokenLineage = previousDf.localCheckpoint()

Locally checkpoints a Dataset and return the new Dataset. Checkpointing can be used to truncate the logical plan of this Dataset, which is especially useful in iterative algorithms where the plan may grow exponentially. Local checkpoints are written to executor storage and despite potentially faster they are unreliable and may compromise job completion.

Since 2.3.0

#3: Re-create the DataFrame / Dataset

The third option is simply to create a new DataFrame / Dataset based on the current one. This breaks the DAG lineage without generating any network traffic or putting load on HDFS and the NameNode. From a performance perspective, I didn't notice much impact on execution time.

// For DataFrame
val dfWithBrokenLineage = session.createDataFrame(previousDf.rdd, previousDf.schema)

// For Dataset (requires an implicit Encoder in scope, e.g. via session.implicits._)
val dsWithBrokenLineage = session.createDataset(previousDs.rdd)

:: DeveloperApi :: Creates a DataFrame from an RDD containing Rows using the given schema. It is important to make sure that the structure of every Row of the provided RDD matches the provided schema. Otherwise, there will be runtime exception.

:: Experimental :: Creates a Dataset from an RDD of a given type. This method requires an encoder (to convert a JVM object of type T to and from the internal Spark SQL representation) that is generally created automatically through implicits from a SparkSession, or can be created explicitly by calling static methods on Encoders.
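As the quoted documentation notes, createDataset needs an encoder, which for case classes usually comes from the session's implicits. A small sketch showing the typed variant (Person is a hypothetical case class):

```scala
case class Person(name: String, age: Int)

import session.implicits._  // brings the implicit Encoder[Person] into scope

val previousDs: Dataset[Person] = Seq(Person("Ann", 30)).toDS()

// Re-creating the Dataset from its RDD breaks the accumulated lineage
val dsWithBrokenLineage = session.createDataset(previousDs.rdd)
```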


That's all on breaking DAG lineage in Apache Spark. Enjoy!

Apache Spark Databricks

If you are more interested in Spark, I recommend reading about Apache Spark Databricks.


If you enjoyed this post, please leave a comment below and share it on Facebook, Twitter, LinkedIn or another social media site.
Thanks in advance!
