In this post, I will introduce you to three methods for breaking DAG lineage in Apache Spark. You may not even be aware of one of them! Check whether you know all three: depending on the conditions and requirements, they can save you a lot of time!
What is a DAG?
A DAG (Directed Acyclic Graph) in Apache Spark is a graph representation of how our Spark job will be executed. It consists of two main elements: vertices, which represent RDDs, and edges, which represent the operations applied to those RDDs.
DAG lineage is the sequence of these operations (edges) on an RDD. When you call any Spark action, the DAG is handed to the DAG Scheduler, which splits it into stages and tasks.
Why do we need to break DAG Lineage?
Sometimes the DAG grows so large that it no longer fits in memory, or the number of stages and tasks becomes very large. In that case we may get one of the following errors:
java.lang.StackOverflowError or java.lang.OutOfMemoryError
Where to see the DAG graph?
To see the DAG lineage, it is best to go to the Spark UI, where in the Jobs section you can click the DAG Visualization link.
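Besides the Spark UI, the lineage can also be printed from code. The sketch below is only an illustration under a few assumptions: it runs in local mode, and the object name InspectLineage, the helper addSteps and the column name value are made up for this example. It uses the standard explain(true) and toDebugString calls.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object InspectLineage {
  // Hypothetical helper: stacks `steps` projections on top of the input plan.
  def addSteps(df: DataFrame, steps: Int): DataFrame = {
    var current = df
    for (_ <- 1 to steps) {
      current = current.withColumn("value", col("value") + 1)
    }
    current
  }

  def main(args: Array[String]): Unit = {
    // Local session for illustration only; on a cluster the master comes from spark-submit.
    val session = SparkSession.builder()
      .master("local[*]")
      .appName("inspect-lineage")
      .getOrCreate()

    val df = addSteps(session.range(0, 100).toDF("value"), 5)

    // Prints the parsed, analyzed, optimized and physical plans.
    df.explain(true)

    // Prints the RDD-level lineage behind the DataFrame.
    println(df.rdd.toDebugString)

    session.stop()
  }
}
```

The more steps are added in the loop, the longer the printed plans become, which is the growth that the methods below are meant to cut off.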
How to break DAG lineage?
You can use the following three methods to break DAG lineage. Each of them has its pros and cons. Which one you choose, and which works best for you, often depends on the environment (e.g. the cluster) in which you run your Spark jobs.
#1: Checkpoint
The first method is checkpoint. When you run your jobs on a Hadoop cluster, the DataFrame is stored on HDFS during the checkpoint. If an executor is killed during processing, another executor can read the data back from HDFS, which makes this method fault tolerant.
val dfWithBrokenLineage = previousDf.checkpoint()
Keep in mind that if the cluster you run your jobs on has a slow network, you should consider another method. Also remember to reduce the number of shuffle partitions before the checkpoint: a large number of them results in many small files on HDFS, which can make the checkpoint operation slow because the NameNode becomes a bottleneck.
The Spark API documentation describes checkpoint as follows: "Eagerly checkpoint a Dataset and return the new Dataset. Checkpointing can be used to truncate the logical plan of this Dataset, which is especially useful in iterative algorithms where the plan may grow exponentially. It will be saved to files inside the checkpoint directory set with SparkContext#setCheckpointDir."
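Putting the pieces of this method together, a minimal sketch might look like the one below. It is an assumption-laden illustration, not a recommended setup: the checkpoint directory /tmp/spark-checkpoints is a placeholder (on a Hadoop cluster it would be an HDFS path), the helper checkpointCompact and the partition counts are invented for the example, and coalesce is just one possible way to lower the partition count before the checkpoint.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object ReliableCheckpoint {
  // Hypothetical helper: shrinks the partition count, then checkpoints eagerly,
  // so that fewer files are written to the checkpoint directory.
  def checkpointCompact(df: DataFrame, numPartitions: Int): DataFrame =
    df.coalesce(numPartitions).checkpoint()

  def main(args: Array[String]): Unit = {
    // Local session for illustration only.
    val session = SparkSession.builder()
      .master("local[*]")
      .appName("reliable-checkpoint")
      .getOrCreate()

    // The directory must be set before checkpoint() is called.
    // Placeholder path (assumption); use an HDFS directory on a real cluster.
    session.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")

    // Deliberately over-partitioned input to mimic the output of a wide shuffle.
    val previousDf = session.range(0, 1000).toDF("value").repartition(200)

    val dfWithBrokenLineage = checkpointCompact(previousDf, 8)

    // The plan now starts from the checkpointed data instead of the original steps.
    dfWithBrokenLineage.explain(true)

    session.stop()
  }
}
```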
#2: Local checkpoint
The second method is localCheckpoint. It works like checkpoint, but the data is saved to the executor's local filesystem instead of being transferred to HDFS. If an executor is killed during processing, its checkpointed data is lost as well, and Spark may not be able to recreate the DataFrame from the DAG.
val dfWithBrokenLineage = previousDf.localCheckpoint()
The Spark API documentation describes localCheckpoint as follows: "Locally checkpoints a Dataset and return the new Dataset. Checkpointing can be used to truncate the logical plan of this Dataset, which is especially useful in iterative algorithms where the plan may grow exponentially. Local checkpoints are written to executor storage and despite potentially faster they are unreliable and may compromise job completion."
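The documentation mentions iterative algorithms, so here is a rough sketch of how localCheckpoint could be used inside a loop. The helper iterate, its interval parameter and the column name value are hypothetical and exist only for this example. How often to checkpoint is a tuning decision that depends on the job.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object IterativeLocalCheckpoint {
  // Hypothetical helper: applies `iterations` transformations and truncates
  // the plan with localCheckpoint() every `interval` iterations.
  def iterate(start: DataFrame, iterations: Int, interval: Int): DataFrame = {
    var df = start
    for (i <- 1 to iterations) {
      df = df.withColumn("value", col("value") + 1)
      if (i % interval == 0) {
        // No checkpoint directory is needed; data stays in executor storage.
        df = df.localCheckpoint()
      }
    }
    df
  }

  def main(args: Array[String]): Unit = {
    // Local session for illustration only.
    val session = SparkSession.builder()
      .master("local[*]")
      .appName("iterative-local-checkpoint")
      .getOrCreate()

    val result = iterate(session.range(0, 10).toDF("value"), iterations = 20, interval = 5)

    // The plan only contains the steps added since the last local checkpoint.
    result.explain(true)

    session.stop()
  }
}
```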
#3: ReCreate DataFrame / DataSet
The third option is simply to create a new DataFrame / Dataset based on the current one. This breaks the DAG lineage without causing any network traffic and without burdening HDFS or the NameNode. From a performance perspective, I did not notice much impact on execution time.
// For DataFrame
val dfWithBrokenLineage = session.createDataFrame(previousDf.rdd, previousDf.schema)
// For Dataset
val dsWithBrokenLineage = session.createDataset(previousDs.rdd)
The Spark API documentation describes createDataFrame as follows: ":: DeveloperApi :: Creates a DataFrame from an RDD containing Rows using the given schema. It is important to make sure that the structure of every Row of the provided RDD matches the provided schema. Otherwise, there will be runtime exception."
And createDataset: ":: Experimental :: Creates a Dataset from an RDD of a given type. This method requires an encoder (to convert a JVM object of type T to and from the internal Spark SQL representation) that is generally created automatically through implicits from a SparkSession, or can be created explicitly by calling static methods on Encoders."
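To see what the third method does to the plan, the sketch below prints the plan before and after the DataFrame is recreated. The helper breakLineage and the sample data are assumptions made for this example. The only call taken from the post is createDataFrame(rdd, schema). For the Dataset variant, an implicit encoder would normally be brought in with import session.implicits._.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object RecreateDataFrame {
  // Hypothetical helper wrapping the one-liner from the post.
  def breakLineage(session: SparkSession, df: DataFrame): DataFrame =
    session.createDataFrame(df.rdd, df.schema)

  def main(args: Array[String]): Unit = {
    // Local session for illustration only.
    val session = SparkSession.builder()
      .master("local[*]")
      .appName("recreate-dataframe")
      .getOrCreate()

    // Build a DataFrame whose plan contains ten stacked projections.
    var previousDf = session.range(0, 100).toDF("value")
    for (_ <- 1 to 10) {
      previousDf = previousDf.withColumn("value", col("value") + 1)
    }

    println("Plan before:")
    previousDf.explain(true)

    val dfWithBrokenLineage = breakLineage(session, previousDf)

    println("Plan after:")
    dfWithBrokenLineage.explain(true)

    session.stop()
  }
}
```

Comparing the two printed plans is a quick way to check whether the logical plan was shortened in your Spark version.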
That's all on the topic of breaking DAG lineage in Apache Spark. Enjoy!