Spark enhancements for elasticity and resiliency on Amazon EMR
To avoid some of these issues and help customers take full advantage of Amazon EMR’s elasticity features with Spark, Amazon EMR has customizations to open-source Spark that make it more resilient to node loss. Recomputation is minimized, and jobs can recover faster from node failures and EC2 instance termination. These improvements are in Amazon EMR release version 5.9.0 and later.
This blog post provides an overview of the issues with how open-source Spark handles node loss and the improvements in Amazon EMR to address the issues.
How Spark handles node loss
When a node goes down during an active Spark job, the following risks arise:
- Tasks that are actively running on the node might fail to complete and have to run on another node.
- Cached RDDs (resilient distributed datasets) on the node might be lost. While this does impact performance, it does not cause failures or impact the stability of the application.
- Shuffle output files in memory, or those written to disk on the node, would be lost. Because Amazon EMR enables the External Shuffle Service by default, the shuffle output is written to disk. Losing shuffle files can bring the application to a halt until they are recomputed on another active node, because future tasks might depend on them. For more information about shuffle operations, see Shuffle operations.
To recover from node loss, Spark should be able to do the following:
- If actively running tasks are lost, they must be scheduled on another node. In addition, computing for the unscheduled remaining tasks must resume.
- Shuffle output that was computed on the lost node must be recomputed by re-executing the tasks that produced those shuffle blocks.
The following is the sequence of events for Spark to recover when a node is lost:
- Spark considers actively running tasks on the node as failed and reruns them on another active node.
- If the node had shuffle output files that are needed by future tasks, the target executors on other active nodes get a FetchFailedException while trying to fetch missing shuffle blocks from the failed node.
- When the FetchFailedException happens, the target executors retry fetching the blocks from the failed node for a time determined by the spark.shuffle.io.maxRetries and spark.shuffle.io.retryWait configuration values. After all the retry attempts are exhausted, the failure is propagated to the driver.
- When the driver receives the FetchFailedException, it marks the currently running shuffle stage during which the failure occurred as failed and stops its execution. It also marks the shuffle output on the node or executors from which shuffle blocks could not be fetched as unavailable/lost, so that they can be recomputed. This triggers the previous Map stage to re-attempt recomputing those missing shuffle blocks.
- After the missing shuffle output is computed, a re-attempt of the failed shuffle stage is triggered to resume the job from where it stopped. It then runs tasks that failed or had not been scheduled yet.
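To get a feel for how long the fetch retries in this sequence can delay recovery, the following minimal sketch multiplies the two retry settings. The function name is hypothetical, and the default values of 3 retries and 5 seconds are assumed from the open-source Spark documentation, so check them against your Spark version. The estimate covers only the waits between retries, not connection timeouts.

```python
def max_fetch_retry_delay_seconds(max_retries: int = 3,
                                  retry_wait_seconds: float = 5.0) -> float:
    """Estimate the time one executor waits between fetch retries.

    max_retries        -> spark.shuffle.io.maxRetries (assumed default: 3)
    retry_wait_seconds -> spark.shuffle.io.retryWait  (assumed default: 5s)
    """
    if max_retries < 0 or retry_wait_seconds < 0:
        raise ValueError("retry settings must be non-negative")
    # Each retry is preceded by one wait of retry_wait_seconds.
    return max_retries * retry_wait_seconds


if __name__ == "__main__":
    print(max_fetch_retry_delay_seconds())        # assumed defaults
    print(max_fetch_retry_delay_seconds(10, 30))  # a more patient setup
```

Raising either setting makes executors more tolerant of transient network problems, but it also lengthens the time before the driver learns that a node is really gone.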
Issues with Spark’s handling of node loss
Spark’s recovery process helps it recover from random executor and node failures that can occur in any cloud environment. However, the recovery process begins only after the node has already failed and Spark gets a FetchFailedException while trying to fetch shuffle blocks. This causes some of the issues described in this section.
Amazon EMR can begin the recovery early, because it knows when and which nodes are going down because of a manual resize, an EC2-triggered Spot instance termination, or an automatic scaling event. It can inform Spark immediately about these nodes, so that Spark can take proactive actions to gracefully handle the loss of nodes and start recovery early. However, Spark currently does not have any mechanism through which it can be notified that a node is going down, such as YARN decommissioning. Therefore, it cannot take immediate and relevant actions to help recover faster. As a result, here are some of the issues with Spark’s recovery:
- The node goes down in the middle of the Map stage, as shown in the following diagram:
In this scenario, the shuffle stage is scheduled unnecessarily, and the application must wait for the FetchFailedException before recomputing the lost shuffle. This takes a lot of time. Instead, it would be better if all lost shuffles could be immediately recomputed in the Map stage before even proceeding to the shuffle stage.
- The node goes down in the middle of a shuffle stage, as shown in the following diagram:
If there were a way to immediately inform Spark about node loss, instead of it depending on FetchFailedException and retrying the fetch, that would save on recovery time.
- The Spark driver starts recomputation when it gets the first FetchFailedException. It considers the shuffle files on the lost node as missing. However, if multiple nodes go down at the same time, in its first re-attempt of the previous Map stage, the Spark driver recomputes only the shuffle output for the first node from which it received a FetchFailedException. In the short time between receiving the first fetch failure and starting the re-attempt, it is possible that the driver receives fetch failures from other failed nodes. As a result, it can recompute shuffles for multiple lost nodes in the same re-attempt, but there is no guarantee. In most cases, even though nodes go down at the same time, Spark requires multiple re-attempts of the map and shuffle stages to recompute all of the lost shuffle output. This can easily cause a job to be blocked for a significant amount of time. Ideally, Spark could recompute in only one retry the shuffle output on all nodes that were lost around the same time.
- As long as it can reach a node that is about to go down, Spark can continue to schedule more tasks on it. This causes more shuffle outputs to be computed, which may eventually need to be recomputed. Ideally, these tasks can be redirected to healthy nodes to prevent recomputation and improve recovery time.
- Spark has a limit on the number of consecutive failed attempts allowed for a stage before it aborts a job. This is configurable with spark.stage.maxConsecutiveAttempts. When a node fails and a FetchFailedException occurs, Spark marks the running shuffle stage as failed and triggers a re-attempt after computing the missing shuffle outputs. Frequent scaling of nodes during shuffle stages can easily cause stage failures to reach the threshold and abort the jobs. Ideally, when a stage fails for valid reasons such as a manual scale in, an automatic scaling event, or an EC2-triggered Spot instance termination, there should be a way to tell Spark not to count this toward spark.stage.maxConsecutiveAttempts for that stage.
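The cost of discovering lost nodes one fetch failure at a time can be illustrated with a toy model. This is only a sketch under simplifying assumptions, not Spark's scheduler: the function name and the discovered_per_attempt parameter are hypothetical, and the model assumes the driver learns about a fixed number of lost nodes before each re-attempt.

```python
from typing import Sequence


def count_reattempts(lost_nodes: Sequence[str],
                     discovered_per_attempt: int) -> int:
    """Count the map-stage re-attempts needed to recompute all lost output.

    Toy assumption: before each re-attempt the driver has received fetch
    failures for at most `discovered_per_attempt` of the lost nodes, and
    it recomputes shuffle output only for those nodes.
    """
    if discovered_per_attempt < 1:
        raise ValueError("discovered_per_attempt must be at least 1")
    remaining = list(lost_nodes)
    attempts = 0
    while remaining:
        # Output for nodes the driver has not heard about yet stays missing.
        remaining = remaining[discovered_per_attempt:]
        attempts += 1
    return attempts


if __name__ == "__main__":
    nodes = ["node-1", "node-2", "node-3", "node-4"]
    # Worst case: one lost node discovered per re-attempt.
    print(count_reattempts(nodes, discovered_per_attempt=1))
    # Ideal case: every lost node is known before the first re-attempt.
    print(count_reattempts(nodes, discovered_per_attempt=len(nodes)))
```

In the worst case the number of re-attempts grows with the number of lost nodes, which is also how a large scale-in can push a stage toward the spark.stage.maxConsecutiveAttempts limit.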
How Amazon EMR resolves these issues
This section describes the three main enhancements that Amazon EMR has made to its version of Spark to resolve the issues described in the previous section.
Integrate with YARN’s decommissioning mechanism
Spark on Amazon EMR uses YARN as the underlying manager for cluster resources. Amazon EMR has its own implementation of a graceful decommissioning mechanism for YARN that provides a way to gracefully shut down YARN node managers by not scheduling new containers on a node in the Decommissioning state. Amazon EMR does this by waiting for the existing tasks on running containers to complete, or time out, before the node is decommissioned. This decommissioning mechanism has recently been contributed back to open source Hadoop.
We integrated Spark with YARN’s decommissioning mechanism so that the Spark driver is notified when a node goes through Decommissioning or Decommissioned states in YARN. This is shown in the following diagram:
This notification allows the driver to take appropriate actions and start the recovery early, because all nodes go through the decommissioning process before being removed.
Extend Spark’s blacklisting mechanism
YARN’s decommissioning mechanism works well for Hadoop MapReduce jobs by not launching any more containers on decommissioning nodes. This prevents more Hadoop MapReduce tasks from being scheduled on that node. However, this does not work well for Spark jobs because in Spark each executor is assigned a YARN container that is long-lived and keeps receiving tasks.
Preventing new containers from being launched only prevents more executors from being assigned to the node. Executors that are already active continue to receive new tasks until the node goes down, and those tasks can fail and have to be rerun. Also, if these tasks write shuffle output, that output is lost as well. This increases the recomputation and the time that it takes for recovery.
To address this, Amazon EMR extends Spark’s blacklisting mechanism to blacklist a node when the Spark driver receives a YARN decommissioning signal for it. This is shown in the following diagram:
This prevents new tasks from being scheduled on the blacklisted node. Instead, they are scheduled on healthy nodes. As soon as tasks already running on the node are complete, the node can be safely decommissioned without the risk of task failures or losses. This also speeds up the recovery process by not producing more shuffle output on a node that is going down, which reduces the number of shuffle outputs to be recomputed. If the node comes out of the Decommissioning state and is active again, Amazon EMR removes the node from the blacklist so that new tasks can be scheduled on it.
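The effect of blacklisting on task placement can be sketched with a small toy scheduler. This is not the Amazon EMR or Spark implementation. The class and method names are hypothetical, and round-robin placement is assumed only to keep the example short.

```python
from typing import Dict, Iterable, List, Set


class DecommissionAwareScheduler:
    """Toy scheduler that skips nodes marked as decommissioning."""

    def __init__(self, nodes: Iterable[str]) -> None:
        self._nodes: List[str] = list(nodes)
        self._blacklisted: Set[str] = set()

    def on_decommissioning(self, node: str) -> None:
        # Stand-in for the driver receiving a YARN decommissioning signal.
        self._blacklisted.add(node)

    def on_recommissioned(self, node: str) -> None:
        # The node is active again, so it may receive tasks once more.
        self._blacklisted.discard(node)

    def assign(self, tasks: Iterable[str]) -> Dict[str, str]:
        """Place tasks round-robin on nodes that are not blacklisted."""
        healthy = [n for n in self._nodes if n not in self._blacklisted]
        if not healthy:
            raise RuntimeError("no healthy nodes available")
        return {task: healthy[i % len(healthy)]
                for i, task in enumerate(tasks)}


if __name__ == "__main__":
    scheduler = DecommissionAwareScheduler(["node-a", "node-b"])
    scheduler.on_decommissioning("node-b")
    print(scheduler.assign(["task-1", "task-2"]))  # only node-a is used
    scheduler.on_recommissioned("node-b")
    print(scheduler.assign(["task-1", "task-2"]))  # both nodes are used
```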
This blacklisting extension is enabled by default in Amazon EMR with the spark.blacklist.decommissioning.enabled property set to true. You can control the time for which the node is blacklisted using the spark.blacklist.decommissioning.timeout property, which is set to 1 hour by default, equal to the default value for yarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs. We recommend setting spark.blacklist.decommissioning.timeout to a value equal to or greater than yarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs to make sure that Amazon EMR blacklists the node for the entire decommissioning period.
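One way to keep the two timeouts consistent is to generate the cluster configuration from code and check the recommendation before using it. The following sketch assumes the spark-defaults and yarn-site configuration classifications. The helper names are hypothetical, and the small parser accepts only values such as 3600s, 60m, or 1h, which is narrower than what Spark itself accepts.

```python
import json
import re

_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600}

YARN_TIMEOUT_KEY = (
    "yarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs"
)


def to_seconds(value: str) -> int:
    """Convert a simple time string such as '1h' or '90m' to seconds."""
    match = re.fullmatch(r"(\d+)([smh])", value.strip())
    if match is None:
        raise ValueError(f"unsupported time value: {value!r}")
    return int(match.group(1)) * _UNIT_SECONDS[match.group(2)]


def build_configurations(blacklist_timeout: str,
                         yarn_timeout_secs: int) -> list:
    """Build a configuration list, enforcing blacklist >= YARN timeout."""
    if to_seconds(blacklist_timeout) < yarn_timeout_secs:
        raise ValueError(
            "spark.blacklist.decommissioning.timeout should be equal to "
            "or greater than the YARN graceful decommission timeout"
        )
    return [
        {
            "Classification": "spark-defaults",
            "Properties": {
                "spark.blacklist.decommissioning.enabled": "true",
                "spark.blacklist.decommissioning.timeout": blacklist_timeout,
            },
        },
        {
            "Classification": "yarn-site",
            "Properties": {YARN_TIMEOUT_KEY: str(yarn_timeout_secs)},
        },
    ]


if __name__ == "__main__":
    print(json.dumps(build_configurations("1h", 3600), indent=2))
```

The printed JSON can then be supplied as the cluster's configuration when it is created. Calling build_configurations("30m", 3600) raises an error, because the node could leave the blacklist before decommissioning finishes.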
Actions for decommissioned nodes
After a node enters the Decommissioning state, no new tasks are scheduled on it. When its active containers become idle (or the timeout expires), the node is decommissioned. When the Spark driver receives the decommissioned signal, it can take the following additional actions to start the recovery process sooner rather than waiting for a fetch failure to occur:
- All of the shuffle outputs on the decommissioned node are unregistered, thus marking them as unavailable. Amazon EMR enables this by default with the setting spark.resourceManager.cleanupExpiredHost set to true. This has the following advantages:
  - If a node is lost in the middle of a map stage and gets decommissioned, Spark initiates recovery and recomputes the lost shuffle outputs on the decommissioned node before proceeding to the next stage. This prevents fetch failures in the shuffle stage, because Spark has all of the shuffle blocks computed and available at the end of the map stage, which significantly speeds up recovery.
  - If a node is lost in the middle of a shuffle stage, the target executors trying to get shuffle blocks from the lost node immediately notice that the shuffle output is unavailable. They then send the failure to the driver instead of retrying and failing multiple times to fetch the blocks. The driver then immediately fails the stage and starts recomputing the lost shuffle output. This reduces the time spent trying to fetch shuffle blocks from lost nodes.
  - The most significant advantage of unregistering shuffle outputs is when a cluster is scaled in by a large number of nodes. Because all of the nodes go down around the same time, they all get decommissioned around the same time, and their shuffle outputs are unregistered. When Spark schedules the first re-attempt to compute the missing blocks, it notices all of the missing blocks from decommissioned nodes and recovers in only one attempt. This speeds up the recovery process significantly over the open-source Spark implementation, where stages might be rescheduled multiple times to recompute missing shuffles from all nodes, and prevents jobs from being stuck for hours failing and recomputing.
- When a stage fails because of fetch failures from a node being decommissioned, by default, Amazon EMR does not count the stage failure toward the maximum number of failures allowed for a stage as set by spark.stage.maxConsecutiveAttempts. This is determined by the setting spark.stage.attempt.ignoreOnDecommissionFetchFailure being set to true. This prevents a job from failing if a stage fails multiple times because of node failures for valid reasons such as a manual resize, an automatic scaling event, or an EC2-triggered Spot instance termination.
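The difference that ignoring decommission-related failures makes can be shown with a toy counter. This is a simplified model, not Spark's actual bookkeeping. The function name and the cause labels are hypothetical, the default limit of 4 is assumed from the open-source Spark documentation for spark.stage.maxConsecutiveAttempts, and the input is treated as one unbroken run of consecutive failures for a single stage.

```python
from typing import Iterable


def stage_aborts(failure_causes: Iterable[str],
                 max_consecutive_attempts: int = 4,
                 ignore_decommission_failures: bool = True) -> bool:
    """Return True if the run of stage failures would abort the job.

    failure_causes: one label per consecutive failed attempt, where
        "decommission" marks a fetch failure from a decommissioned node.
    ignore_decommission_failures: models the setting
        spark.stage.attempt.ignoreOnDecommissionFetchFailure.
    """
    counted = 0
    for cause in failure_causes:
        if cause == "decommission" and ignore_decommission_failures:
            # Not counted toward the limit when the setting is true.
            continue
        counted += 1
        if counted >= max_consecutive_attempts:
            return True
    return False


if __name__ == "__main__":
    scale_in_failures = ["decommission"] * 5
    print(stage_aborts(scale_in_failures))  # failures are ignored
    print(stage_aborts(scale_in_failures,
                       ignore_decommission_failures=False))
```

With the setting enabled, repeated scale-in events during a shuffle stage do not exhaust the attempt limit, while failures from other causes are still counted.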
This post described how Spark handles node loss and some of the issues that can occur if a cluster is scaled in during an active Spark job. It also showed the customizations that Amazon EMR has built on Spark, and the configurations available to make Spark on Amazon EMR more resilient, so that you can take full advantage of the elasticity features offered by Amazon EMR.