Autoscaling in Spark

Each Spark cluster contains a configured maximum and minimum number of nodes. A cluster starts with the minimum number of nodes and can scale up to the maximum; later, it can scale back down to the minimum, depending on the cluster workload. This topic explains how a Spark cluster and job applications autoscale, and discusses various settings to fine-tune autoscaling. See Autoscaling in Qubole Clusters for a broader discussion.

Advantages of Autoscaling

Autoscaling clusters provides the following benefits:

  • Adds nodes when the load is high

  • Contributes to good cost management as the cluster capacity is dynamically scaled up and down as required

Autoscaling Spark jobs provides the following benefits:

  • Decides on the optimum number of executors required for a Spark job based on the load

Understanding Spark Autoscaling Properties

The following table describes the autoscaling properties of Spark.

Note

Qubole supports open-source dynamic allocation properties in Spark 1.6.1 and later versions.

Property Name | Default Value | Description
spark.qubole.autoscaling.enabled | true | Enables autoscaling. Not applicable to Spark 1.6.1 and later versions.
spark.dynamicAllocation.enabled | true | Enables autoscaling. Only applicable to Spark 1.6.1 and later versions.
spark.dynamicAllocation.maxExecutors | spark.executor.instances (if not set) | The maximum number of executors to be used. Its Spark submit option is --max-executors.
spark.executor.instances | 2 (if not set) | The minimum number of executors. Its Spark submit option is --num-executors.
spark.qubole.autoscaling.stagetime | 2 * 60 * 1000 milliseconds | If expectedRuntimeOfStage is greater than this value, increase the number of executors.
spark.qubole.autoscaling.memorythreshold | 0.75 | If the fraction of memory used by the executors exceeds this value, increase the number of executors.
spark.qubole.autoscaling.memory.downscaleCachedExecutors | true | Executors with cached data are also downscaled by default. Set this to false if you do not want executors downscaled in the presence of cached data. Not applicable to Spark 1.6.1 and later versions.
spark.dynamicAllocation.cachedExecutorIdleTimeout | Infinity | Timeout in seconds. If an executor with cached data has been idle for more than this configured timeout, it is removed. Applicable only to Spark 1.6.1, 1.6.2, and later versions.

Note

The spark.qubole.max.executors parameter is deprecated; however, it continues to work. If you specify both spark.qubole.max.executors and spark.dynamicAllocation.maxExecutors, spark.dynamicAllocation.maxExecutors overrides spark.qubole.max.executors.

Spark Configuration Recommendations

These are a few points to remember related to Spark cluster and job configuration in general:

  • Set --max-executors. Ideally, you should not need to set the other parameters, as the defaults are sufficient.

  • --num-executors or spark.executor.instances acts as the minimum number of executors, with a default value of 2. This does not mean that the Spark application waits for that many executors to launch before it starts; the minimum applies only to autoscaling. For example, during application start-up:

    1. If YARN is unable to schedule resources for --num-executors or spark.executor.instances, the Spark application starts with as many executors as YARN can schedule.

    2. Once --num-executors or spark.dynamicAllocation.minExecutors executors have been allocated, the number of executors never drops below that minimum.

  • Try to avoid setting too many job-level parameters.

Note

--max-executors is the Spark submit option for spark.dynamicAllocation.maxExecutors and --num-executors is the Spark submit option for spark.executor.instances.
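For example, here is a minimal PySpark sketch that sets only the executor range for a job; the counts and application name are illustrative assumptions, not recommendations:

from pyspark.sql import SparkSession

# Illustrative values only; these must be set before the SparkSession starts.
spark = (
    SparkSession.builder
    .appName("autoscaling-example")                         # hypothetical application name
    .config("spark.executor.instances", "2")                # minimum executors (--num-executors)
    .config("spark.dynamicAllocation.maxExecutors", "10")   # maximum executors (--max-executors)
    .getOrCreate()
)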

In Spark, autoscaling can be done at both the cluster level and the job level, as described in the sections that follow.

Spark on Qubole’s capabilities include fine-grained downscaling, downscaling of cached executors after idle timeout, and support for open-source dynamic allocation configurations.

Autoscaling in Spark Clusters

A Spark cluster spins up with the configured minimum number of nodes and can scale up to the maximum depending on the load. Once the load drops, the cluster scales back down towards the minimum.

Qubole runs Spark on YARN: each Spark application is submitted as a YARN application. By default, Spark uses static allocation of resources; that is, when you submit a job, you specify exact resource requirements, the application requests containers, and YARN allocates them.

Here is an example of a Spark 2.0.0 cluster:

Property Name | Property Value
minimum nodes | 2
maximum nodes | 10
node type | (Choose a large instance type; for example, 8 cores, 30 GB memory)
spark.dynamicAllocation.enabled | true
yarn.nodemanager.resource.memory | 26680 MB
spark.yarn.executor.memoryOverhead | 1024 MB
spark.executor.memory | 12 GB
spark.executor.cores | 4

If a job with a minimum number of executors set to 4 is submitted to the cluster, YARN schedules two containers on the first worker node and the other two containers on the second worker node. The ApplicationMaster takes up an additional container.

Here is the logic to find the number of executors per node in the above example of a Spark 2.0.0 cluster.

Total memory per node = 30 GB
yarn.nodemanager.resource.memory = 26680 MB
Number of executors per node = 2

Total resource memory = number of executors per node * (spark.executor.memory + spark.yarn.executor.memoryOverhead)
                      = 2 * (12 GB + 1 GB) = 26 GB

which is approximately equal to the value of yarn.nodemanager.resource.memory (26680 MB).

Here is the logic to check whether the number of cores per executor is correct for the same cluster.

Total number of cores per node = 8
spark.executor.cores = 4 and number of executors per node = 2

Total number of cores used = spark.executor.cores * number of executors per node
                           = 4 * 2 = 8

which matches the 8 cores available on the node.
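The same arithmetic can be captured in a short Python sketch; the values below simply restate the example cluster above and are not defaults:

# Sketch of the sizing arithmetic above; the values restate the example cluster.
def executors_per_node(node_memory_gb, executor_memory_gb, overhead_gb):
    # Number of executors that fit in the node's YARN-managed memory.
    return int(node_memory_gb // (executor_memory_gb + overhead_gb))

def cores_used(executor_cores, num_executors):
    # Total cores consumed by the executors on one node.
    return executor_cores * num_executors

n = executors_per_node(26, 12, 1)   # ~26 GB usable memory, 12 GB executor, 1 GB overhead -> 2
print(n, cores_used(4, n))          # 2 executors per node, 4 cores each -> 8 cores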

Now, if you submit a new job to the same cluster in parallel, YARN does not have enough resources to run it, and this triggers Qubole’s YARN-level autoscaling: YARN figures out that two more nodes are required for the new job to run and requests the two nodes. These nodes are added to the current cluster, for a total of four nodes.

When the job completes, YARN recovers the resources. If the added nodes are idle and there is no active job, the cluster scales back to the minimum number of nodes.

Note

A node is available for downscaling only when it meets these conditions; that is, it is idle and no active job requires its resources.

Autoscaling within a Spark Job

A Spark job uses a set of resources based on the number of executors. These executors are long-running Java Virtual Machines (JVMs) that are up during a Spark job’s lifetime. Statically determining the number of executors required by a Spark application may not get the best results. When you use the autoscaling feature within a Spark application, QDS monitors job progress at runtime and decides the optimum number of executors using SLA-based autoscaling.

By default, autoscaling within a Spark Job is enabled, with the following parameter set to true:

spark.qubole.autoscaling.enabled=true in Spark 1.6.0 and earlier versions

or

spark.dynamicAllocation.enabled=true in Spark 1.6.1 and later versions (including all versions supported on Azure and Oracle OCI).

Note

These settings become active only when you configure spark.dynamicAllocation.maxExecutors.

When the first Spark job is submitted, the Spark cluster spins up with two large instances as worker nodes, the configured minimum. In the configuration described above, each node can run two executors.

Depending on the job progress, or when new jobs are submitted, the Spark job-level autoscaler decides to add or release executors at runtime. The cluster starts with four executors (running on two large instances) and can autoscale up to 20 executors (running on ten large instances). It downscales back towards the minimum of four executors as the workload declines.

Handling Spot Node Loss in Spark Clusters (AWS)

Spark on Qubole proactively identifies the nodes that are about to undergo Spot loss and stops scheduling tasks on the corresponding executors.

When the Spark AM receives the spot loss notification from the RM, it notifies the Spark driver. The driver then performs the following actions:

  1. Identifies all the executors affected by the upcoming node loss.

  2. Moves all of the affected executors to a decommissioning state, and no new tasks are scheduled on these executors.

  3. Waits for the next 120 seconds for the executors to finish the running tasks.

  4. Kills these executors after 120 seconds; instead of waiting for heartbeat timeout from those executors, it proactively identifies executor loss.

  5. Starts the failed tasks (if any) on other executors.

This feature is supported on Spark versions 2.1.0, 2.1.1, 2.2-latest, and later, and is controlled through the Spark configuration spark.qubole.spotloss.handle.

By default, the Spark configuration spark.qubole.spotloss.handle is set to true.
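If you need to turn this behavior off for a single application, the flag can be passed like any other Spark configuration. The snippet below is only a sketch; the page linked below remains the authoritative reference for disabling the feature.

from pyspark.sql import SparkSession

# Sketch only: opt this application out of proactive Spot-loss handling.
spark = (
    SparkSession.builder
    .config("spark.qubole.spotloss.handle", "false")
    .getOrCreate()
)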

For information about disabling this feature, see Handling Spot Node Loss in Spark Clusters (AWS).

For more information about spot node loss handling in Qubole, see Exploiting AWS Spot Instance Termination Notifications in Qubole.

Handling Spot Node Loss and Spot Blocks in Spark Clusters

Spark on Qubole handles Spot Node Loss and Spot Blocks in Spark clusters by using the YARN Graceful-Decommission status.

Note

This feature is not enabled by default. Create a ticket with Qubole Support to enable it.

When the Spark AM receives the spot loss (Spot Node Loss or Spot Blocks) notification from the RM, it notifies the Spark driver. The driver then performs the following actions:

  1. Identifies all the executors affected by the upcoming node loss.

  2. Moves all of the affected executors to a decommissioning state, and no new tasks are scheduled on these executors.

  3. Kills all the executors after reaching 50% of the termination time.

  4. Starts the failed tasks (if any) on other executors.

  5. Removes, for these nodes, all entries of the shuffle data from the map output tracker on the driver after reaching 90% of the termination time. This helps prevent shuffle-fetch failures due to Spot loss.

  6. Recomputes the shuffle data from the lost Spot node through stage resubmission, if required.

This feature is supported on Spark 2.4.0 and later versions, and is controlled through the Spark configuration spark.graceful_decommission_enable.

Changing from Qubole Dynamic Allocation Strategy

Qubole supports the open-source dynamic allocation strategy in addition to Qubole's own dynamic allocation strategy, which is the default (spark.dynamicAllocation.strategy=org.apache.spark.dynamicallocation.QuboleAllocationStrategy).

To change from the Qubole dynamic allocation strategy to the open-source dynamic allocation strategy, set spark.dynamicAllocation.strategy=org.apache.spark.dynamicallocation.DefaultAllocationStrategy. With this setting, you can use all open-source dynamic allocation configurations as is, such as spark.dynamicAllocation.maxExecutors, spark.dynamicAllocation.minExecutors, and spark.dynamicAllocation.initialExecutors.
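For example, here is a sketch of a job switched to the open-source strategy with an explicit executor range; the executor counts are placeholders:

from pyspark.sql import SparkSession

# Placeholder executor counts; the strategy class name is taken from the text above.
spark = (
    SparkSession.builder
    .config("spark.dynamicAllocation.strategy",
            "org.apache.spark.dynamicallocation.DefaultAllocationStrategy")
    .config("spark.dynamicAllocation.minExecutors", "2")
    .config("spark.dynamicAllocation.initialExecutors", "4")
    .config("spark.dynamicAllocation.maxExecutors", "20")
    .getOrCreate()
)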

Autoscaling Examples

The following section describes different scenarios of autoscaling in Spark.

Autoscaling Nodes Running in a Single Cluster

For Spark clusters, autoscaling is enabled by default. QDS increases the number of nodes, up to the cluster’s maximum size, if multiple big jobs are submitted to the cluster.

Conversely, QDS reduces the number of nodes, down to the cluster’s minimum size, as the workload declines.

Upscaling a Single Memory Intensive Spark Job

You can set a limit on the executor memory a job can use by setting spark.executor.memory.

For example, in the cluster described above, if the executor memory is configured to be 25 GB and the worker nodes have 30 GB of memory, only one executor can run on each node. The first Spark job starts with two executors (because the minimum number of nodes is set to two in this example).

The cluster can autoscale to a maximum of ten executors (because the maximum number of nodes is set to ten).
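Here is a sketch of that configuration, where a 25 GB executor leaves room for only one executor per 30 GB worker node; the values mirror the example, not defaults:

from pyspark.sql import SparkSession

# One 25 GB executor fits per 30 GB node; at most ten nodes, so at most ten executors.
spark = (
    SparkSession.builder
    .config("spark.executor.memory", "25g")
    .config("spark.dynamicAllocation.maxExecutors", "10")
    .getOrCreate()
)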

Running Many Jobs on a Single Cluster

You can set a limit on the maximum number of executors a job can use by setting the property spark.dynamicAllocation.maxExecutors. This configuration is usually preferred when there are many jobs in parallel and sharing the cluster resources becomes a necessity.

If the cluster resources are being fully used, new jobs either upscale the cluster if it is not yet at its maximum size, or wait until current jobs complete.

Autoscaling Executors in a Spark Job

By default, autoscaling of executors is enabled in a Spark job. The number of executors increases up to the maximum if the Spark job is long-running or memory-intensive.

Configuring Autoscaling Parameters for a Spark Job Stage Runtime

You can set a threshold for the job's expected stage runtime by setting the property spark.qubole.autoscaling.stagetime. Executors are added to the Spark job if the expected stage runtime is greater than the spark.qubole.autoscaling.stagetime value.

Note

The expected stage runtime is calculated only after the first task’s completion.
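For example, to make a job upscale more aggressively, you might lower the threshold to one minute; 60000 ms is an illustrative value, the default being 2 * 60 * 1000 ms:

from pyspark.sql import SparkSession

# Illustrative value: add executors when a stage is expected to run longer than 1 minute.
spark = (
    SparkSession.builder
    .config("spark.qubole.autoscaling.stagetime", "60000")
    .getOrCreate()
)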

Adding Executors in a Single Spark Job with Memory-intensive Executors

You can set a threshold for executor memory usage by setting the property spark.qubole.autoscaling.memorythreshold, which is used by the autoscaling memory algorithm. Executors are added to the Spark job if the memory used by the executors exceeds spark.qubole.autoscaling.memorythreshold.
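Here is a sketch of tightening the threshold so that executors are added sooner; 0.6 is an illustrative value, the default being 0.75:

from pyspark.sql import SparkSession

# Illustrative value: upscale once executors use more than 60% of their memory.
spark = (
    SparkSession.builder
    .config("spark.qubole.autoscaling.memorythreshold", "0.6")
    .getOrCreate()
)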