Spark Best Practices

This topic describes best practices for running Spark jobs.

Spark Configuration Recommendations

  • Set --max-executors. Other parameters usually do not need to be set, because the defaults are sufficient.
  • Avoid setting too many job-level parameters.

Ensuring Jobs Get Their Fair Share of Resources

To prevent jobs from blocking each other, use the YARN Fair Scheduler so that each job gets its fair share of resources. For example, if each interpreter can use a maximum of 100 executors and ten users are each running one interpreter, the YARN Fair Scheduler can ensure that each user’s job gets about ten executors. As the number of running jobs rises or falls, the number of executors each job gets falls or rises accordingly: 100 jobs get one executor each, and a single job gets all 100 executors.
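To make the arithmetic in the example above concrete, here is a small Scala sketch that approximates each job's share. It assumes a pool of 100 executors (as in the example) split evenly across running jobs and capped at a per-interpreter maximum. The real Fair Scheduler shares memory and vcores and rebalances over time, so treat the results as approximate. FairShareSketch and fairShare are hypothetical names used only for illustration.

```scala
object FairShareSketch {
  /** Approximate executors per job when `totalExecutors` are shared evenly
    * among `runningJobs`, capped at `maxPerInterpreter`.
    * Hypothetical helper: YARN's Fair Scheduler actually shares memory and
    * vcores, not a fixed executor count. */
  def fairShare(totalExecutors: Int, runningJobs: Int, maxPerInterpreter: Int): Int = {
    require(totalExecutors >= 0, "totalExecutors must be non-negative")
    require(runningJobs > 0, "at least one job must be running")
    require(maxPerInterpreter > 0, "maxPerInterpreter must be positive")
    // Integer division: any remainder stays unassigned in this simplified model.
    math.min(totalExecutors / runningJobs, maxPerInterpreter)
  }

  def main(args: Array[String]): Unit = {
    // The scenarios described in the text, assuming 100 executors in total.
    for (jobs <- Seq(1, 10, 100))
      println(s"$jobs job(s) -> ${fairShare(100, jobs, 100)} executor(s) each")
  }
}
```

With 1, 10, and 100 running jobs this yields 100, 10, and 1 executors per job, matching the figures in the text.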

To set this up, configure notebooks and interpreters for the running Spark cluster as follows. (For information about using notebooks and interpreters, see Running Spark Applications in Notebooks.)

Note

Do not do this if you have enabled interpreter user mode; in that case, QDS configures the Fair Scheduler for you.

  1. Assign a separate interpreter to each user and notebook. This ensures that each job is submitted to YARN as a separate application.
  2. Edit each interpreter to use a fairly low value for spark.executor.instances, so that an idle interpreter does not hold on to too many executors.

Specifying Dependent Jars for Spark Jobs

You can specify dependent jars using these two options:

  • In the Analyze UI query composer for a Spark command, add a Spark Submit argument such as the following to add jars at the job level:

    --jars s3://bucket/dir/x.jar,s3n://bucket/dir2/y.jar --packages "com.datastax.spark:spark-cassandra-connector_2.10:1.4.0-M1"

    Replace s3:// or s3n:// with wasb:// or adl:// for Azure, or with oraclebmc:// for Oracle OCI.

    The following figure shows dependent jars added as a Spark Submit command-line option for AWS:

    [Figure: dependent jars added as a Spark Submit command-line option (SparkSubmitOptions.png)]
  • Another option is to download the jars to /usr/lib/spark/lib in the node bootstrap script. Here is an AWS example:

    hdfs dfs -get s3://bucket/path/app.jar /usr/lib/spark/lib/
    hdfs dfs -get s3://bucket/path/dep1.jar /usr/lib/spark/lib/
    hdfs dfs -get s3://bucket/path/dep2.jar /usr/lib/spark/lib/
    
    Replace s3:// with wasb:// or adl:// for Azure, or with oraclebmc:// for Oracle OCI.

Improving the Speed of Data Writes to Cloud Directories (AWS and Azure)

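DirectFileOutputCommitter (DFOC) improves the speed of data writes to cloud directories by writing output files directly to their final location, which avoids the slow rename step that a standard output committer performs on cloud storage. DFOC is enabled by default on all Spark clusters and all supported Spark versions.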

Basic Spark write operations, such as saveAsTextFile and save(), accept the MapReduce DFOC parameters.

Note

Using DFOC also requires setting spark.speculation to false. It is set to false by default.

The following DFOC parameters are set in the cluster-level Spark configuration:

Note

For the syntax for setting a Spark cluster override, see Overriding the Spark Default Configuration.

spark.hadoop.mapred.output.committer.class org.apache.hadoop.mapred.DirectFileOutputCommitter
spark.hadoop.mapreduce.use.directfileoutputcommitter true
spark.hadoop.spark.sql.parquet.output.committer.class org.apache.spark.sql.parquet.DirectParquetOutputCommitter

To disable DFOC on a particular Spark cluster, set the following configuration overrides:

spark.hadoop.mapreduce.use.directfileoutputcommitter false
spark.hadoop.mapred.output.committer.class org.apache.hadoop.mapred.FileOutputCommitter
spark.hadoop.spark.sql.parquet.output.committer.class org.apache.parquet.hadoop.ParquetOutputCommitter

The following DFOC parameters are set by default in a Spark job:

Note

For the syntax for setting a Spark job configuration, see Auto-scaling within a Spark Job.

spark.hadoop.mapred.output.committer.class=org.apache.hadoop.mapred.DirectFileOutputCommitter
spark.hadoop.mapreduce.use.directfileoutputcommitter=true
spark.hadoop.spark.sql.parquet.output.committer.class=org.apache.spark.sql.parquet.DirectParquetOutputCommitter

Note

When you use DFOC with Spark, a task that fails after partially writing files leaves those partial files behind, so subsequent attempts of that task might fail with FileAlreadyExistsException, causing the job to fail. To prevent such failures, set both spark.hadoop.mapreduce.output.textoutputformat.overwrite and spark.qubole.outputformat.overwriteFileInWrite to true.

To disable DFOC for a particular Spark job, set the following configuration in the job:

spark.hadoop.mapreduce.use.directfileoutputcommitter=false
spark.hadoop.mapred.output.committer.class=org.apache.hadoop.mapred.FileOutputCommitter
spark.hadoop.spark.sql.parquet.output.committer.class=org.apache.parquet.hadoop.ParquetOutputCommitter
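If a job builds its SparkConf in application code instead of passing settings through the command composer, the two job-level choices in this section (keeping DFOC with the overwrite flags, or disabling DFOC) can be sketched as follows. This is a minimal sketch that assumes the property names and values listed above; the object and member names are hypothetical, and spark.qubole.outputformat.overwriteFileInWrite is a Qubole-specific property that has no effect outside QDS.

```scala
import org.apache.spark.SparkConf

object DfocJobSettings {
  // Option 1: keep DFOC enabled (the default) but allow a retried task to
  // overwrite partial files left behind by a failed attempt.
  val keepDfocWithOverwrite: Map[String, String] = Map(
    "spark.hadoop.mapreduce.output.textoutputformat.overwrite" -> "true",
    "spark.qubole.outputformat.overwriteFileInWrite" -> "true" // Qubole-specific
  )

  // Option 2: disable DFOC for this job and use the standard committers.
  val disableDfoc: Map[String, String] = Map(
    "spark.hadoop.mapreduce.use.directfileoutputcommitter" -> "false",
    "spark.hadoop.mapred.output.committer.class" ->
      "org.apache.hadoop.mapred.FileOutputCommitter",
    "spark.hadoop.spark.sql.parquet.output.committer.class" ->
      "org.apache.parquet.hadoop.ParquetOutputCommitter"
  )

  // Copy one option into a SparkConf. Apply it before creating the SparkContext:
  // the context takes a copy of its SparkConf at startup, so later changes are ignored.
  def withSettings(conf: SparkConf, settings: Map[String, String]): SparkConf =
    conf.setAll(settings)
}
```

For example, DfocJobSettings.withSettings(new SparkConf(), DfocJobSettings.disableDfoc) returns a configuration from which the job can create its SparkContext. Pick one option per job; the two are alternatives.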

For more information on how to override the default Spark cluster configuration, see Configuring a Spark Cluster.

Handling Eventual Consistency Failures in Spark FileOutputCommitter Jobs (AWS)

Spark does not honor DFOC when appending Parquet files, so it must use FileOutputCommitter instead. To handle eventual consistency errors during FileOutputCommitter data writes to S3, and to increase the speed of data writes, Qubole provides the configurable option spark.hadoop.mapreduce.use.parallelmergepaths for Spark jobs. Set this option to true and set the FileOutputCommitter version to 1.

The equivalent parameter to set in Hadoop jobs with Parquet data is mapreduce.use.parallelmergepaths. For more information, see Handling Eventual Consistency Failures in Hadoop FileOutputCommitter Jobs (AWS).
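For a Spark job that appends Parquet data to S3, the settings described in this section might be applied programmatically as in the following sketch. It assumes that the FileOutputCommitter version is selected with the standard Hadoop property mapreduce.fileoutputcommitter.algorithm.version, passed through Spark with the spark.hadoop. prefix. The parallelmergepaths option is Qubole-specific and has no effect on other Spark distributions. The object name is hypothetical.

```scala
import org.apache.spark.SparkConf

object ParallelMergePathsSettings {
  val settings: Map[String, String] = Map(
    // Qubole option for handling eventual-consistency errors in
    // FileOutputCommitter writes to S3.
    "spark.hadoop.mapreduce.use.parallelmergepaths" -> "true",
    // Assumption: standard Hadoop property selecting FileOutputCommitter algorithm version 1.
    "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version" -> "1"
  )

  // Apply the settings before the SparkContext is created.
  def configure(conf: SparkConf): SparkConf = conf.setAll(settings)
}
```

The job would then create its SparkContext from the returned SparkConf and append as usual, for example with df.write.mode("append").parquet(...).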

Configuring the Spark External Shuffle Service

The Spark external shuffle service is an auxiliary service that runs as part of the YARN NodeManager on each slave node in a Spark cluster. When enabled, it maintains the shuffle files generated by all Spark executors that ran on that node.

Spark executors write shuffle data. Without the external shuffle service, the executors also manage that data; when the service is enabled, it manages the shuffle data instead. This helps when downscaling executors, because the shuffle data is not lost when executors are removed. It also makes Spark applications more resilient to errors, because the shuffle data does not need to be recomputed when an executor crashes.

In open-source Spark, Spark job-level auto-scaling (also known as Spark Dynamic Allocation) works in tandem with the external shuffle service and the shuffle service is mandatory for auto-scaling to work. See Spark Shuffle Behavior for more information. In Qubole Spark, on the other hand, the external shuffle service is optional and Qubole-based Spark job-level auto-scaling works whether or not the shuffle service is enabled. (If the external shuffle service is disabled, the executors are not removed until the shuffle data goes away.)

Qubole provides the Spark external shuffle service in Spark 1.5.1 and later supported Spark versions.

The external shuffle service is enabled by default in Spark 1.6.2 and later versions. To disable it, set spark.shuffle.service.enabled to false.

The Spark external shuffle service is not enabled by default in Spark 1.5.1 and Spark 1.6.0. To enable it for either of these versions, configure it as follows:

  • Override Hadoop Configuration Variables

    Before starting a Spark cluster, pass the following Hadoop overrides to start the Spark external shuffle service:

    yarn.nodemanager.aux-services=mapreduce_shuffle,spark_shuffle
    yarn.nodemanager.aux-services.spark_shuffle.class=org.apache.spark.network.yarn.YarnShuffleService
    

    See Advanced configuration: Modifying Hadoop Cluster Settings for setting the Hadoop override configuration variables in the QDS UI.

  • Spark Configuration Variable

    You can enable the external shuffle service on a Spark cluster, a Spark command, or a Spark notebook, as described below.

    Enabling External Shuffle Service on a Spark Cluster

    Set the following configuration in the Override Spark Configuration Variables text box of the cluster configuration page:

    spark-defaults.conf:
    
    spark.shuffle.service.enabled    true
    

    See Configuring a Spark Cluster for more information.

    Note

    If you set spark.shuffle.service.enabled to false, then the Spark application does not use the external shuffle service.

    Enabling External Shuffle Service on a Spark Command

    Configure the following setting as a Spark-submit option in the command/query composer while composing a Spark application:

    --conf spark.shuffle.service.enabled=true

    See Composing a Spark Command for more information.

    For example, sqlContext.sql("select count(*) from default_qubole_memetracker").collect() generates a lot of shuffle data, so set --conf spark.shuffle.service.enabled=true when starting bin/spark-shell.

    Enabling External Shuffle Service on a Spark Notebook

    In a Spark notebook’s interpreter, add spark.shuffle.service.enabled as an interpreter setting and set its value to true. If the Spark interpreter is not already bound to the notebook that you use, bind it. See Running Spark Applications in Notebooks and Understanding Spark Notebooks and its Interpreters for more information.
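When a Spark application builds its own SparkConf instead of relying on the cluster, command, or notebook settings described above, the same property can be set in code. The following is a minimal sketch that also reads the effective value back from the running SparkContext. It assumes the shuffle service is already running on each node (enabled by default in Spark 1.6.2 and later, or started through the Hadoop overrides on 1.5.1 and 1.6.0). ShuffleServiceCheck and the application name are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ShuffleServiceCheck {
  // Build the configuration before the SparkContext starts; changing the
  // SparkConf afterwards has no effect on the running application.
  def buildConf(): SparkConf =
    new SparkConf()
      .setAppName("shuffle-service-check") // hypothetical application name
      .set("spark.shuffle.service.enabled", "true")

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(buildConf())
    try {
      // Read back the effective value; defaults to false if the key is unset.
      val enabled = sc.getConf.getBoolean("spark.shuffle.service.enabled", false)
      println(s"External shuffle service enabled: $enabled")
    } finally {
      sc.stop()
    }
  }
}
```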

External shuffle service logs are part of the NodeManager logs located at /media/ephemeral0/logs/yarn/yarn-nodemanager*.log. NodeManager logs are present on each slave node in the cluster.

Continuously Running Spark Streaming Applications

You can continuously run Spark streaming applications by setting the following parameters:

  • Set yarn.resourcemanager.app.timeout.minutes=-1 as a Hadoop override at the Spark cluster level.
  • To prevent all Spark streaming applications on a specific cluster from timing out, set spark.qubole.idle.timeout to -1 as a Spark configuration variable in the Override Spark Configuration Variables text field of the Spark cluster configuration UI page. See Configuring a Spark Cluster for more information.

Using UDFs in Spark SQL

A UDF (user-defined function) is a way of adding a function to Spark SQL. It operates on distributed DataFrames and works row by row unless it is created as a user-defined aggregate function (UDAF). Open-source Spark provides two alternative ways to define a UDF:

  • Using Hive functions
  • Using Scala functions

The following example uses a Hive function to add a UDF and use it in Spark SQL.

import org.apache.hadoop.hive.ql.exec.UDF
import org.apache.hadoop.io.Text
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

// A Hive UDF: Hive locates the evaluate() method by reflection.
class SimpleUDFExample extends UDF {
  def evaluate(input: Text): Text = {
    // Propagate SQL NULLs instead of throwing a NullPointerException.
    if (input == null) null
    else new Text("Hello " + input.toString)
  }
}

object sqltest {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf())
    val sqlContext = new HiveContext(sc)

    // Register the Hive UDF class under the SQL name "hello" for this session.
    sqlContext.sql("create temporary function hello as 'SimpleUDFExample'")

    val result = sqlContext.sql("""
        select hello(name) from products_avro order by month, name, price
       """)
    result.collect().foreach(println)

    sc.stop()
  }
}

For an example using Scala functions, see UDF Registration.
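For comparison with the Hive-function example above, the following is a minimal sketch of the Scala-function approach, registering an ordinary Scala function with sqlContext.udf.register. It is not the example from UDF Registration. It assumes the same products_avro table and a Spark version that still provides HiveContext (Spark 1.x or 2.x); the names ScalaUdfExample and hello are illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object ScalaUdfExample {
  // Plain Scala function; Spark passes a SQL NULL as a null String.
  def hello(name: String): String =
    if (name == null) null else "Hello " + name

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf())
    val sqlContext = new HiveContext(sc)

    // Register the function under the SQL name "hello" for this context.
    sqlContext.udf.register("hello", (name: String) => hello(name))

    sqlContext.sql("select hello(name) from products_avro order by month, name, price")
      .collect()
      .foreach(println)

    sc.stop()
  }
}
```

Unlike the Hive approach, no UDF class or "create temporary function" statement is needed; keeping the logic in a separate method also makes it easy to unit-test without a cluster.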