Spark Best Practices

This topic describes best practices for running Spark jobs.

Spark Configuration Recommendations

  • Set --max-executors. The default values are usually sufficient for the other parameters, so you should not need to set them.
  • Try to avoid setting too many job-level parameters.

Ensuring Jobs Get their Fair Share of Resources

To prevent jobs from blocking each other, use the YARN Fair Scheduler to ensure that each job gets its fair share of resources. For example, if each interpreter can use a maximum of 100 executors, and ten users are running one interpreter each, the YARN Fair Scheduler can ensure that each user’s job gets about ten executors. As the number of running jobs rises or falls, the number of executors each job gets falls or rises inversely: 100 jobs get one executor each, and a single job gets all 100 executors.
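The arithmetic in this example can be sketched in Scala. This is a simplified model that assumes every job has equal weight and that the pool is split evenly; the names FairShareSketch and executorsPerJob are hypothetical, and the real YARN Fair Scheduler balances memory and vcores across queues rather than counting executors.

```scala
object FairShareSketch {
  // Simplified model: the executor pool is divided evenly among running jobs.
  // Integer division rounds down, so more jobs than executors yields 0 here.
  def executorsPerJob(poolSize: Int, runningJobs: Int): Int = {
    require(poolSize > 0, "poolSize must be positive")
    require(runningJobs > 0, "runningJobs must be positive")
    poolSize / runningJobs
  }

  def main(args: Array[String]): Unit = {
    // Reproduces the three cases from the text: 1, 10, and 100 running jobs.
    for (jobs <- Seq(1, 10, 100)) {
      val share = executorsPerJob(100, jobs)
      println(s"$jobs running job(s): $share executor(s) each")
    }
  }
}
```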

To set up fair sharing, configure notebooks and interpreters for the running Spark cluster as follows. (For information about using notebooks and interpreters, see Running Spark Applications in Notebooks.)

Note

Do not do this if you have enabled interpreter user mode; in that case, QDS configures the Fair Scheduler for you.

  1. Assign an interpreter for each user and note. This ensures that each job is submitted to YARN as a separate application.
  2. Edit the interpreter to use a fairly low value for spark.executor.instances, so that an interpreter that is not in use does not hold on to too many executors.

Specifying Dependent Jars for Spark Jobs

You can specify dependent jars using these two options:

  • In the QDS UI’s Analyze query composer for a Spark command, add a Spark Submit argument such as the following to add jars at the job level:

    --jars s3://bucket/dir/x.jar,s3n://bucket/dir2/y.jar --packages "com.datastax.spark:spark-cassandra-connector_2.10:1.4.0-M1"

    Replace s3:// or s3n:// with wasb://, adl://, or abfs[s]:// for Azure; or oci:// for Oracle OCI.

    The following figure shows dependent jars added as a Spark Submit command-line option for AWS:

    [Figure: SparkSubmitOptions.png]
  • Alternatively, download the jars to /usr/lib/spark/lib via the node bootstrap script. The following AWS example specifies jars in the node bootstrap script.

    hdfs dfs -get s3://bucket/path/app.jar /usr/lib/spark/lib/
    hdfs dfs -get s3://bucket/path/dep1.jar /usr/lib/spark/lib/
    hdfs dfs -get s3://bucket/path/dep2.jar /usr/lib/spark/lib/
    

Replace s3:// with wasb://, adl://, or abfs[s]:// for Azure; or oci:// for Oracle OCI.

Improving the Speed of Data Writes to Cloud Directories

DirectFileOutputCommitter (DFOC) helps improve the speed of data writes to Cloud directories. It is enabled by default on all Spark clusters and supported Spark versions.

The basic write operations in Spark such as saveAsTextFile and save() accept the mapreduce DFOC parameters.

Note

Using DFOC also requires setting spark.speculation to false. It is set to false by default.

The following parameters are DFOC settings configured as cluster-level Spark configuration.

Note

For the syntax for setting a Spark cluster override, see Overriding the Spark Default Configuration.

spark.hadoop.mapred.output.committer.class org.apache.hadoop.mapred.DirectFileOutputCommitter
spark.hadoop.mapreduce.use.directfileoutputcommitter true
spark.hadoop.spark.sql.parquet.output.committer.class org.apache.spark.sql.parquet.DirectParquetOutputCommitter

To disable DFOC on a particular Spark cluster, you can set the configuration override as below.

spark.hadoop.mapreduce.use.directfileoutputcommitter false
spark.hadoop.mapred.output.committer.class org.apache.hadoop.mapred.FileOutputCommitter
spark.hadoop.spark.sql.parquet.output.committer.class org.apache.parquet.hadoop.ParquetOutputCommitter

The following parameters are DFOC settings configured by default in a Spark job.

Note

For the syntax for setting a Spark job configuration, see Autoscaling within a Spark Job.

spark.hadoop.mapred.output.committer.class=org.apache.hadoop.mapred.DirectFileOutputCommitter
spark.hadoop.mapreduce.use.directfileoutputcommitter=true
spark.hadoop.spark.sql.parquet.output.committer.class=org.apache.spark.sql.parquet.DirectParquetOutputCommitter

Note

When you use DFOC with Spark, a task that fails after partially writing files leaves those partial files behind, so subsequent attempts might fail with FileAlreadyExistsException and cause the job to fail. To prevent such job failures, set the spark.hadoop.mapreduce.output.textoutputformat.overwrite and spark.qubole.outputformat.overwriteFileInWrite flags to true.

To disable DFOC for a particular Spark job, you can set the following configuration in the Spark job.

spark.hadoop.mapreduce.use.directfileoutputcommitter=false
spark.hadoop.mapred.output.committer.class=org.apache.hadoop.mapred.FileOutputCommitter
spark.hadoop.spark.sql.parquet.output.committer.class=org.apache.parquet.hadoop.ParquetOutputCommitter

For more information on how to override the default Spark cluster configuration, see Configuring a Spark Cluster.
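The cluster-level override and the job-level setting shown in this section differ only in the separator between key and value. The following minimal Scala sketch renders the DFOC-disabling settings in both forms. The object and method names are hypothetical; the property names and values are copied from this section.

```scala
object DfocSettings {
  // Settings that switch a cluster or job back to the standard committers.
  val disableDfoc: Seq[(String, String)] = Seq(
    "spark.hadoop.mapreduce.use.directfileoutputcommitter" -> "false",
    "spark.hadoop.mapred.output.committer.class" ->
      "org.apache.hadoop.mapred.FileOutputCommitter",
    "spark.hadoop.spark.sql.parquet.output.committer.class" ->
      "org.apache.parquet.hadoop.ParquetOutputCommitter"
  )

  // Cluster-level override form: "key value", one setting per line.
  def asClusterOverride(settings: Seq[(String, String)]): String =
    settings.map { case (k, v) => s"$k $v" }.mkString("\n")

  // Job-level form: "key=value", one setting per line.
  def asJobSettings(settings: Seq[(String, String)]): String =
    settings.map { case (k, v) => s"$k=$v" }.mkString("\n")
}
```

Calling DfocSettings.asClusterOverride(DfocSettings.disableDfoc) produces text for the cluster override box, and asJobSettings produces the job-level equivalent.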

Multipart Upload Based File Output Committer in Spark on Qubole (AWS)

Multipart Upload Based File Output Committer (MFOC) in Spark on Qubole uses the Multipart Upload design offered by S3. MFOC improves task commit performance compared to FOC v1 and v2, and provides more consistent visibility of result files than DFOC, which is the default FOC in Spark on Qubole.

MFOC is not enabled by default. You can enable this feature from the Control Panel >> Account Features page. For more information about enabling features, see Managing Account Features.

MFOC is supported in the following use cases:

  • When Spark writes data on S3 for a pure data source table with static partitioning.
  • When Spark writes data on S3 for a pure data source table with dynamic partitioning enabled.

MFOC is not supported in the following use cases:

  • Writing to Hive data source tables.
  • Writing to non-S3 cloud stores.

The advantages and disadvantages of using MFOC are as follows.

Advantages:

  • Because MFOC is based on the AWS Multipart Upload feature, the visibility of output files is tied more closely to the success or failure of tasks and jobs.
  • When the job succeeds, the finalize command of the Multipart Upload makes the uploaded files visible; they are not visible until then.
  • The cleanup of failed jobs is controlled through the Bucket Lifecycle Policy.
  • MFOC is faster than the File Output Commit protocol v1 and v2.

Disadvantages:

  • Under high load, uploads using Multipart Upload can be slow or might fail. In such cases, adjust a few tuning parameters to smooth the upload. See Troubleshooting information.
  • MFOC is slower than Qubole’s prescribed DirectWrite. The measured difference is less than 10% in an internal benchmark of a billion-row insert operation.

Handling Skew in the Join

To handle skew in the join keys, you can specify the hint ` /*+ SKEW ('<table_name>') */ ` for a join that describes the column and the values upon which skew is expected. Based on that information, the engine automatically ensures that the skewed values are handled appropriately.

You can specify the hint in the following formats:

  • Format 1:

    /*+ SKEW('<tableName>') */
    

    This indicates that all the columns in the given table are skewed and that the values on which they are skewed are not known. With this hint, the Spark optimizer tries to identify the values on which the column involved in the join is skewed: when it identifies that a column is involved in the join, it samples the data in the table.

    Example: Suppose a query uses a table t1 in which all columns involved in the join are skewed, but the skew values are unknown. In this case, you can specify the skew hint as ` /*+ SKEW('t1') */ `.

  • Format 2:

    /*+ SKEW ('<tableName>', (<COLUMN-HINT>), (<ANOTHER-COLUMN-HINT>)) */
    

    <COLUMN-HINT> can be either a column name (example, column1) or a column name and list of values on which the column is skewed (example - column1, ('a', 'b', 'c')). The Spark optimizer identifies the skew values from the hint. As a result, the sampling of data is not required.

    Example: Suppose there is a table t1 with four columns: c1, c2, c3, and c4. Column c1 is skewed on the values ‘a’ and ‘b’, c2 and c3 are also skewed but their skew values are unknown, and c4 is not skewed. In this case, you can specify the hint as ` /*+ SKEW('t1', ('c1', ('a', 'b')), ('c2'), ('c3')) */ `.

Example Query

SELECT /*+ SKEW('t1', ('c1', ('a', 'b')), ('c2'), ('c3')) */ *
FROM
  (SELECT t2.c1 AS temp_col1 FROM t1 JOIN t2 ON t1.c1 = t2.c1) temp_table1
  JOIN
  (SELECT t3.c2 AS temp_col2 FROM t1 JOIN t3 ON t1.c2 = t3.c2) temp_table2
WHERE temp_table1.temp_col1 = temp_table2.temp_col2
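Writing the hint by hand is error-prone when a table has several column hints. The following is a minimal Scala sketch of a helper that assembles the hint text for either format described in this section. SkewHint, ColumnHint, and build are hypothetical names, not part of Spark or QDS, and the helper does not escape quote characters inside names or values.

```scala
object SkewHint {
  // One column hint: a column name plus its known skew values.
  // An empty values list means the column is skewed on unknown values.
  final case class ColumnHint(column: String, values: Seq[String] = Seq.empty)

  private def quote(s: String): String = "'" + s + "'"

  private def render(hint: ColumnHint): String = {
    val column = quote(hint.column)
    if (hint.values.isEmpty) s"($column)"
    else {
      val values = hint.values.map(quote).mkString(", ")
      s"($column, ($values))"
    }
  }

  // No column hints gives Format 1; one or more gives Format 2.
  def build(table: String, hints: Seq[ColumnHint] = Seq.empty): String = {
    val parts = (quote(table) +: hints.map(render)).mkString(", ")
    s"/*+ SKEW($parts) */"
  }
}
```

For the t1 example, SkewHint.build("t1", Seq(ColumnHint("c1", Seq("a", "b")), ColumnHint("c2"), ColumnHint("c3"))) yields the same hint text used in the example query.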

Optimizing Query Execution with Adaptive Query Execution

Spark on Qubole supports Adaptive Query Execution on Spark 2.4.3 and later versions. With it, query execution is optimized at runtime based on runtime statistics.

At runtime, the adaptive execution mode can change a shuffle join to a broadcast join if the size of one table is less than the broadcast threshold. Adaptive execution in Spark on Qubole also handles skew in input data and optimizes joins using the Qubole skew join optimization. In general, adaptive execution reduces the effort involved in tuning SQL query parameters, and improves execution performance by selecting a better execution plan and parallelism at runtime.
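The shuffle-to-broadcast switch can be pictured with a toy decision function. This Scala sketch is not Spark's planner: JoinChoiceSketch and choose are hypothetical names, and the 10 MB threshold is assumed from the open-source default of spark.sql.autoBroadcastJoinThreshold, which may differ on your cluster.

```scala
object JoinChoiceSketch {
  sealed trait JoinStrategy
  case object BroadcastJoin extends JoinStrategy
  case object ShuffleJoin extends JoinStrategy

  // Assumption: 10 MB, the open-source default broadcast threshold.
  val DefaultThresholdBytes: Long = 10L * 1024 * 1024

  // Broadcast when the smaller side, as measured at runtime, is under the threshold.
  def choose(leftBytes: Long,
             rightBytes: Long,
             thresholdBytes: Long = DefaultThresholdBytes): JoinStrategy =
    if (math.min(leftBytes, rightBytes) < thresholdBytes) BroadcastJoin
    else ShuffleJoin
}
```

The point of adaptive execution is that the sizes passed to such a decision come from runtime statistics rather than from estimates made before the query starts.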

Handling Eventual Consistency Failures in Spark FileOutputCommitter Jobs (AWS)

Spark does not honor DFOC when appending Parquet files, so it is forced to use FileOutputCommitter. To handle eventual consistency errors during FileOutputCommitter data writes to S3, and to increase the speed of data writes, Qubole provides the configurable option spark.hadoop.mapreduce.use.parallelmergepaths for Spark jobs. Set the option to true and the FileOutputCommitter version to 1.

The equivalent parameter to set in Hadoop jobs with Parquet data is mapreduce.use.parallelmergepaths. For more information, see Handling Eventual Consistency Failures in Hadoop FileOutputCommitter Jobs (AWS).
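As a sketch, the two Spark settings can be passed as --conf options on the Spark submit command line. The object name below is hypothetical. The version property is an assumption: mapreduce.fileoutputcommitter.algorithm.version is the open-source Hadoop property for the committer version, and this sketch assumes it is passed with the spark.hadoop. prefix like the other settings in this topic.

```scala
object ParallelMergePathsConf {
  val settings: Seq[(String, String)] = Seq(
    // Option named in this section.
    "spark.hadoop.mapreduce.use.parallelmergepaths" -> "true",
    // Assumption: standard Hadoop property for the FileOutputCommitter version.
    "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version" -> "1"
  )

  // Expands each setting into a "--conf key=value" pair of arguments.
  def toSubmitArgs(conf: Seq[(String, String)]): Seq[String] =
    conf.flatMap { case (k, v) => Seq("--conf", s"$k=$v") }

  def main(args: Array[String]): Unit =
    println(toSubmitArgs(settings).mkString(" "))
}
```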

Configuring the Spark External Shuffle Service

The Spark external shuffle service is an auxiliary service that runs as part of the YARN NodeManager on each worker node in a Spark cluster. When enabled, it maintains the shuffle files generated by all Spark executors that ran on that node.

Spark executors write the shuffle data and manage it. If the Spark external shuffle service is enabled, the shuffle service manages the shuffle data, instead of the executors. This helps in downscaling the executors, because the shuffle data is not lost when the executors are removed. It also helps improve the behavior of the Spark application in case of error because the shuffle data does not need to be re-processed when an executor crashes.

In open-source Spark, Spark job-level autoscaling (also known as Spark Dynamic Allocation) works in tandem with the external shuffle service and the shuffle service is mandatory for autoscaling to work. See Spark Shuffle Behavior for more information. In Spark on Qubole, on the other hand, the external shuffle service is optional and Qubole-based Spark job-level autoscaling works whether or not the shuffle service is enabled. (If the external shuffle service is disabled, the executors are not removed until the shuffle data goes away.)
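The open-source rule described above can be expressed as a small configuration check. This is a minimal Scala sketch with hypothetical names (ShuffleServiceCheck, validForOpenSourceSpark). It models only the rule as stated here, treats unset flags as false, and ignores any alternatives that newer Spark releases may offer; it does not apply to Spark on Qubole, where the shuffle service is optional.

```scala
object ShuffleServiceCheck {
  // Reads a boolean flag from the configuration; an unset flag counts as false.
  private def flag(conf: Map[String, String], key: String): Boolean =
    conf.get(key).exists(_.trim.equalsIgnoreCase("true"))

  // Valid unless dynamic allocation is on while the shuffle service is off.
  def validForOpenSourceSpark(conf: Map[String, String]): Boolean =
    !flag(conf, "spark.dynamicAllocation.enabled") ||
      flag(conf, "spark.shuffle.service.enabled")
}
```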

Qubole provides the Spark external shuffle service in Spark 1.5.1 and later supported Spark versions.

The external shuffle service is enabled by default in Spark 1.6.2 and later versions. To disable it, set spark.shuffle.service.enabled to false.

Spark external shuffle service is not enabled by default in Spark 1.5.1 and Spark 1.6.0. To enable it for one of these versions, configure it as follows:

  • Override Hadoop Configuration Variables

    Before starting a Spark cluster, pass the following Hadoop overrides to start Spark external shuffle service:

    yarn.nodemanager.aux-services=mapreduce_shuffle,spark_shuffle
    yarn.nodemanager.aux-services.spark_shuffle.class=org.apache.spark.network.yarn.YarnShuffleService
    

    See Advanced Configuration: Modifying Hadoop Cluster Settings for setting the Hadoop override configuration variables in the QDS UI.

  • Spark Configuration Variable

    Set the configuration to enable external shuffle service on a Spark application, a Spark cluster or a Spark notebook.

    Enabling External Shuffle Service on a Spark Cluster

    Set the following configuration in the Override Spark Configuration Variables text box of the cluster configuration page:

    spark-defaults.conf:
    
    spark.shuffle.service.enabled    true
    

    See Configuring a Spark Cluster for more information.

    Note

    If you set spark.shuffle.service.enabled to false, then the Spark application does not use the external shuffle service.

    Enabling External Shuffle Service on a Spark Command

    Configure the following setting as a Spark-submit option in the command/query composer while composing a Spark application:

    --conf spark.shuffle.service.enabled=true

    See Composing Spark Commands in the Analyze Page for more information.

    For example, sqlContext.sql("select count(*) from default_qubole_memetracker").collect() generates a lot of shuffle data, so set --conf spark.shuffle.service.enabled=true when starting bin/spark-shell.

    Enabling External Shuffle Service on a Spark Notebook

    Add spark.shuffle.service.enabled as an interpreter setting and add its Value as true in a Spark notebook’s Interpreter. Bind the Spark Interpreter settings to the notebook that you use if it is not bound already. See Running Spark Applications in Notebooks and Understanding Spark Notebooks and Interpreters for more information.

External shuffle service logs are part of the NodeManager logs located at /media/ephemeral0/logs/yarn/yarn-nodemanager*.log. NodeManager logs are present on each worker node in the cluster.

Continuously Running Spark Streaming Applications

You can continuously run Spark streaming applications by setting the following parameters:

  • Set yarn.resourcemanager.app.timeout.minutes=-1 as a Hadoop override at the Spark cluster level.
  • To prevent all Spark streaming applications on a specific cluster from timing out, set spark.qubole.idle.timeout -1 as a Spark configuration variable in the Override Spark Configuration Variables text field of the Spark cluster configuration UI page. See Configuring a Spark Cluster for more information.

Using UDFs in Spark SQL

A UDF (user-defined function) is a way of adding a function to Spark SQL. It operates on distributed DataFrames and works row by row unless it is created as a user-defined aggregation function. Open-source Spark provides two alternative methods:

  • Using Hive functions
  • Using Scala functions

The following example uses Hive functions to add a UDF and use it in Spark SQL.

import org.apache.hadoop.hive.ql.exec.UDF
import org.apache.hadoop.io.Text
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

// Hive UDF: evaluate() is called once for each input row.
class SimpleUDFExample extends UDF {
  def evaluate(input: Text): Text =
    if (input == null) null else new Text("Hello " + input.toString)
}

object sqltest {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf())
    val sqlContext = new HiveContext(sc)
    // Register the class as a temporary Hive function named "hello".
    sqlContext.sql("create temporary function hello as 'SimpleUDFExample'")
    val result = sqlContext.sql("""
        select hello(name) from products_avro order by month, name, price
       """)
    result.collect.foreach(println)
  }
}

For an example using Scala functions, see UDF Registration.
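For comparison, a Scala-function UDF with the same behavior as the Hive example might look like the following minimal sketch. HelloUdf is a hypothetical name. The registration calls are shown as comments because they need a running sqlContext, such as the one created in the Hive example.

```scala
object HelloUdf {
  // Same behavior as SimpleUDFExample.evaluate, written as a plain Scala function.
  // The null check keeps rows with a null name from producing "Hello null".
  val hello: String => String =
    (name: String) => if (name == null) null else "Hello " + name

  // With a sqlContext in scope, the function could be registered and used as:
  //   sqlContext.udf.register("hello", hello)
  //   sqlContext.sql("select hello(name) from products_avro").collect.foreach(println)
}
```

Because the function is a plain Scala value, it can be unit tested without starting Spark.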

Improving Query Performance with Amazon S3 Select

Amazon S3 Select is integrated with Spark on Qubole to read S3-backed tables created on CSV and JSON files for improved performance.

This feature provides the following capabilities:

  • Automatic conversion: Spark on Qubole automatically converts Spark native tables or Spark datasets in CSV and JSON formats to the S3 Select optimized format for faster, more efficient data access.
  • Manual creation of tables: You can use the S3 Select datasource to create tables on specific CSV and JSON data to improve performance.

To enable S3 Select automatic conversion, set the following flags to true at query level or cluster level:

  • For Hive tables: spark.sql.qubole.convertHiveToS3Select
  • For Spark tables: spark.sql.qubole.convertSparkToS3Select

Enabling the Flags on a Cluster

  1. Navigate to the Clusters page.

  2. Click the Edit button next to the required Spark cluster.

  3. On the Edit Cluster Settings page, navigate to Advanced Configuration.

  4. Under the SPARK SETTINGS section, enter the following values in the Override Spark Configuration field:

    spark.sql.qubole.convertHiveToS3Select=true
    spark.sql.qubole.convertSparkToS3Select=true
    
  5. Click Update to save the changes.

Enabling the Flags for a Query

Depending on the UI you use for running the query, perform the appropriate action:

  • Analyze

    Enter the following values in the Spark Default Submit Command Line Options field.

    spark.sql.qubole.convertHiveToS3Select=true
    spark.sql.qubole.convertSparkToS3Select=true
    
  • Notebooks

    Enter the following values in the paragraph:

    sql("SET spark.sql.qubole.convertHiveToS3Select=true")
    sql("SET spark.sql.qubole.convertSparkToS3Select=true")
    

Note

If you want to enable S3 Select automatic conversion on your account, create a ticket with Qubole Support.

Creating Tables or Dataframes using S3 Select Datasource

If you want to use S3 Select only for some tables, you can use the S3 Select datasource to create tables on CSV and JSON data for improved performance by using the following commands.

SQL

CREATE TABLE s3select USING "org.apache.spark.sql.execution.datasources.s3select" LOCATION s3://bucket/filename OPTIONS()

Scala

spark.read.format("org.apache.spark.sql.execution.datasources.s3select").load()

Generic Options

Option Name    Default Value    Other Possible Values
format         CSV              JSON
compression    none             GZIP, BZIP2

CSV Specific Options

Option Name      Default Value
header           false
delimiter        ,
lineDelimiter    \n
quote            '"'
escape           '\'
comment          "#"

JSON Specific Options

Option Name    Default Value    Other Possible Values
jsonType       Document         Line

The following example shows how to read data using S3 Select.

val df = spark.read.format("org.apache.spark.sql.execution.datasources.s3select").option("fileFormat", "csv").load("s3://bucket/table/")
df.select($"id").take(10)

Limitations using S3 Select

  • Compressed tables or files are not supported for automatic conversion of CSV and JSON data.
  • File splitting is not supported.
  • S3 Select does not support complex types such as arrays and objects in JSON.
  • If the SerDe or compression of the data varies across partitions, S3 Select fails, because checking the SerDe of every partition or the compression of every file is too costly.