Spark Best Practices
This topic describes best practices for running Spark jobs.
Spark Configuration Recommendations
Set --max-executors. Other parameters ideally do not need to be set, because the default values are sufficient. Try to avoid setting too many job-level parameters.
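For example, you might pass the following as a Spark Submit argument (the value 10 is only illustrative; tune it for your workload):
--max-executors 10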
Specifying Dependent Jars for Spark Jobs
You can specify dependent jars using these two options:
In the QDS UI’s Analyze query composer for a Spark command, add a Spark Submit argument such as the following to add jars at the job level:
--jars s3://bucket/dir/x.jar,s3n://bucket/dir2/y.jar --packages "com.datastax.spark:spark-cassandra-connector_2.10:1.4.0-M1"
Replace s3:// or s3n:// with wasb://, adl://, or abfs[s]:// for Azure; or oci:// for Oracle OCI.
Another option for specifying jars is to download them to /usr/lib/spark/lib via the node bootstrap script. Here is an AWS example of specifying jars in the node bootstrap script:
hdfs dfs -get s3://bucket/path/app.jar /usr/lib/spark/lib/
hdfs dfs -get s3://bucket/path/dep1.jar /usr/lib/spark/lib/
hdfs dfs -get s3://bucket/path/dep2.jar /usr/lib/spark/lib/
Replace s3:// with wasb://, adl://, or abfs[s]:// for Azure; or oci:// for Oracle OCI.
Improving the Speed of Data Writes to Cloud Directories
DirectFileOutputCommitter (DFOC) helps improve the speed of data writes to Cloud directories. It is enabled by default on all Spark clusters and supported Spark versions.
The basic write operations in Spark such as saveAsTextFile and save() accept the mapreduce DFOC parameters.
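For example, a plain write such as the following picks up whichever committer is configured at the cluster or job level (a minimal sketch; the bucket and paths are illustrative):
import org.apache.spark.sql.SparkSession

// Sketch: this write uses whatever output committer the DFOC settings below configure.
// The input and output locations are placeholders.
val spark = SparkSession.builder().getOrCreate()
val df = spark.read.json("s3://bucket/input/")
df.write.mode("overwrite").format("parquet").save("s3://bucket/output/")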
Note
Using DFOC also requires setting spark.speculation to false; it is set to false by default.
The following parameters are DFOC settings configured as cluster-level Spark configuration.
Note
For the syntax for setting a Spark cluster override, see Overriding the Spark Default Configuration.
spark.hadoop.mapred.output.committer.class org.apache.hadoop.mapred.DirectFileOutputCommitter
spark.hadoop.mapreduce.use.directfileoutputcommitter true
spark.hadoop.spark.sql.parquet.output.committer.class org.apache.spark.sql.parquet.DirectParquetOutputCommitter
To disable DFOC on a particular Spark cluster, you can set the configuration override as below.
spark.hadoop.mapreduce.use.directfileoutputcommitter false
spark.hadoop.mapred.output.committer.class org.apache.hadoop.mapred.FileOutputCommitter
spark.hadoop.spark.sql.parquet.output.committer.class org.apache.parquet.hadoop.ParquetOutputCommitter
The following parameters are DFOC settings configured by default in a Spark job.
Note
For the syntax for setting a Spark job configuration, see Autoscaling within a Spark Job.
spark.hadoop.mapred.output.committer.class=org.apache.hadoop.mapred.DirectFileOutputCommitter
spark.hadoop.mapreduce.use.directfileoutputcommitter=true
spark.hadoop.spark.sql.parquet.output.committer.class=org.apache.spark.sql.parquet.DirectParquetOutputCommitter
Note
When using DFOC with Spark, if a task fails after writing files partially, subsequent reattempts might fail with a FileAlreadyExistsException (because of the partial files left behind), and so the job fails. You can set the spark.hadoop.mapreduce.output.textoutputformat.overwrite and spark.qubole.outputformat.overwriteFileInWrite flags to true to prevent such job failures.
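For example, at the job level these flags can be set as follows:
spark.hadoop.mapreduce.output.textoutputformat.overwrite=true
spark.qubole.outputformat.overwriteFileInWrite=true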
To disable DFOC for a particular Spark job, you can set the following configuration in the Spark job.
spark.hadoop.mapreduce.use.directfileoutputcommitter=false
spark.hadoop.mapred.output.committer.class=org.apache.hadoop.mapred.FileOutputCommitter
spark.hadoop.spark.sql.parquet.output.committer.class=org.apache.parquet.hadoop.ParquetOutputCommitter
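For example, these settings can also be passed as Spark Submit options using the standard --conf syntax:
--conf spark.hadoop.mapreduce.use.directfileoutputcommitter=false --conf spark.hadoop.mapred.output.committer.class=org.apache.hadoop.mapred.FileOutputCommitter --conf spark.hadoop.spark.sql.parquet.output.committer.class=org.apache.parquet.hadoop.ParquetOutputCommitter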
For more information on how to override the default Spark cluster configuration, see Configuring a Spark Cluster.
Multipart Upload Based File Output Committer in Spark on Qubole (AWS)
Multipart Upload Based File Output Committer (MFOC) in Spark on Qubole leverages Multipart Upload design offered by S3. MFOC improves the task commit performance when compared to FOC v1 and v2, and provides better result consistency in terms of result file visibility compared to DFOC, which is the default FOC in Spark on Qubole.
MFOC is not enabled by default. You can enable this feature from the Control Panel >> Account Features page. For more information about enabling features, see Managing Account Features.
MFOC is supported in the following use cases:
When Spark writes data on S3 for a pure data source table with static partitioning.
When Spark writes data on S3 for a pure data source table with dynamic partitioning enabled.
MFOC is not supported in the following use cases:
Writing to Hive data source tables.
Writing to non-S3 cloud stores.
Handling Skew in the Join
To handle skew in the join keys, you can specify the hint ` /*+ SKEW ('<table_name>') */ ` for a join, describing the column and the values on which skew is expected. Based on that information, the engine automatically ensures that the skewed values are handled appropriately.
You can specify the hint in the following formats:
Format 1: ` /*+ SKEW('<tableName>') */ `
This indicates that all the columns in the given table are skewed and the values on which they are skewed are not known. With this hint, the Spark optimizer tries to identify the values on which the column involved in the join is skewed: when it identifies that a column is involved in the join, it samples the data in the table.
Example: Suppose a query uses a table t1 in which all columns involved in the join are skewed, but the skew values are unknown. In this case, you can specify the skew hint as ` /*+ SKEW('t1') */ `.
Format 2: ` /*+ SKEW('<tableName>', (<COLUMN-HINT>), (<ANOTHER-COLUMN-HINT>)) */ `
<COLUMN-HINT> can be either a column name (for example, column1) or a column name and a list of values on which the column is skewed (for example, column1, ('a', 'b', 'c')). The Spark optimizer identifies the skew values from the hint, so sampling the data is not required.
Example: Suppose a table t1 has four columns: c1, c2, c3, c4. c1 is skewed on the values 'a' and 'b'; c2 and c3 are also skewed, but the skew values are unknown; c4 is not skewed. In this case, you can specify the hint as ` /*+ SKEW('t1', ('c1', ('a', 'b')), ('c2'), ('c3')) */ `.
Example Query
SELECT /*+ SKEW('t1', ('c1', ('a', 'b')), ('c2'), ('c3')) */ *
FROM
(SELECT t2.c1 as temp_col1 from t1 join t2 on t1.c1 = t2.c1) temp_table1 JOIN
(SELECT t3.c2 as temp_col2 from t1 join t3 on t1.c2 = t3.c2) temp_table2
WHERE temp_table1.temp_col1 = temp_table2.temp_col2
Optimizing Query Execution with Adaptive Query Execution
Spark on Qubole supports Adaptive Query Execution on Spark 2.4.3 and later versions; with it, query execution is optimized at runtime based on runtime statistics.
At runtime, adaptive execution can change a shuffle join to a broadcast join if the size of one table is below the broadcast threshold. Adaptive execution in Spark on Qubole also supports handling skew in the input data, optimizing such joins with Qubole skew join optimization. In general, adaptive execution reduces the effort involved in tuning SQL query parameters and improves execution performance by selecting a better execution plan and parallelism at runtime.
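As a minimal sketch, adaptive execution can be enabled for a session with the open-source flag spark.sql.adaptive.enabled (assumed here; Spark on Qubole may expose additional settings on top of it):
import org.apache.spark.sql.SparkSession

// Sketch: enable adaptive query execution for this session.
// spark.sql.adaptive.enabled is the standard Spark flag; any Qubole-specific
// knobs are not shown here.
val spark = SparkSession.builder()
  .appName("adaptive-execution-example")
  .config("spark.sql.adaptive.enabled", "true")
  .getOrCreate()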
Handling Eventual Consistency Failures in Spark FileOutputCommitter Jobs (AWS)
Spark does not honor DFOC when appending Parquet files, and is therefore forced to use FileOutputCommitter. To handle eventual consistency errors during FileOutputCommitter data writes to S3, and to increase the speed of data writes, Qubole provides the configurable option spark.hadoop.mapreduce.use.parallelmergepaths for Spark jobs. Set the option to true and the FileOutputCommitter version to 1.
The equivalent parameter to set in Hadoop jobs with Parquet data is mapreduce.use.parallelmergepaths. For more information, see Handling Eventual Consistency Failures in Hadoop FileOutputCommitter Jobs (AWS).
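For example, a job-level configuration might look like the following (a sketch; mapreduce.fileoutputcommitter.algorithm.version is the standard Hadoop key for selecting the committer version and is assumed here):
spark.hadoop.mapreduce.use.parallelmergepaths=true
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=1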
Configuring the Spark External Shuffle Service
The Spark external shuffle service is an auxiliary service that runs as part of the YARN NodeManager on each worker node in a Spark cluster. When enabled, it maintains the shuffle files generated by all Spark executors that ran on that node.
Spark executors write the shuffle data and manage it. If the Spark external shuffle service is enabled, the shuffle service manages the shuffle data, instead of the executors. This helps in downscaling the executors, because the shuffle data is not lost when the executors are removed. It also helps improve the behavior of the Spark application in case of error because the shuffle data does not need to be re-processed when an executor crashes.
In open-source Spark, Spark job-level autoscaling (also known as Spark Dynamic Allocation) works in tandem with the external shuffle service and the shuffle service is mandatory for autoscaling to work. See Spark Shuffle Behavior for more information. In Spark on Qubole, on the other hand, the external shuffle service is optional and Qubole-based Spark job-level autoscaling works whether or not the shuffle service is enabled. (If the external shuffle service is disabled, the executors are not removed until the shuffle data goes away.)
Qubole provides the Spark external shuffle service in Spark 1.5.1 and later supported Spark versions.
The external shuffle service is enabled by default in Spark 1.6.2 and later versions. To disable it, set spark.shuffle.service.enabled to false.
The Spark external shuffle service is not enabled by default in Spark 1.5.1 and Spark 1.6.0. To enable it for one of these versions, configure it as follows:
Override Hadoop Configuration Variables
Before starting a Spark cluster, pass the following Hadoop overrides to start the Spark external shuffle service:
yarn.nodemanager.aux-services=mapreduce_shuffle,spark_shuffle
yarn.nodemanager.aux-services.spark_shuffle.class=org.apache.spark.network.yarn.YarnShuffleService
See Advanced Configuration: Modifying Hadoop Cluster Settings for setting the Hadoop override configuration variables in the QDS UI.
Spark Configuration Variable
Set the configuration to enable the external shuffle service on a Spark application, a Spark cluster, or a Spark notebook.
Enabling External Shuffle Service on a Spark Cluster
Set the following configuration in the Override Spark Configuration Variables text box of the cluster configuration page:
spark-defaults.conf: spark.shuffle.service.enabled true
See Configuring a Spark Cluster for more information.
Note
If you set spark.shuffle.service.enabled to false, the Spark application does not use the external shuffle service.
Enabling External Shuffle Service on a Spark Command
Configure the following setting as a Spark-submit option in the command/query composer while composing a Spark application:
--conf spark.shuffle.service.enabled=true
See Composing Spark Commands in the Analyze Page for more information.
For example, sqlContext.sql("select count(*) from default_qubole_memetracker").collect() generates a lot of shuffle data, so set --conf spark.shuffle.service.enabled=true in bin/spark-shell.
Enabling External Shuffle Service on a Spark Notebook
Add spark.shuffle.service.enabled as an interpreter setting with the value true in a Spark notebook’s Interpreter. Bind the Spark interpreter settings to the notebook you use if they are not bound already. See Running Spark Applications in Notebooks and Understanding Spark Notebooks and Interpreters for more information.
External shuffle service logs are part of the NodeManager logs, located at /media/ephemeral0/logs/yarn/yarn-nodemanager*.log. NodeManager logs are present on each worker node in the cluster.
Continuously Running Spark Streaming Applications
You can continuously run Spark streaming applications by setting the following parameters:
Set yarn.resourcemanager.app.timeout.minutes=-1 as a Hadoop override at the Spark cluster level.
To prevent all Spark streaming applications on a specific cluster from being timed out, set spark.qubole.idle.timeout -1 as a Spark configuration variable in the Override Spark Configuration Variables text field of the Spark cluster configuration UI page. See Configuring a Spark Cluster for more information.
Using UDFs in Spark SQL
A UDF (user-defined function) is a way of adding a function to Spark SQL. It operates on distributed DataFrames and works row by row unless it is created as a user-defined aggregation function. Open-source Spark provides two alternative methods:
Using Hive functions
Using Scala functions
The following example uses Hive functions to add a UDF and use it in Spark SQL.
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.hadoop.hive.ql.exec.UDF
import org.apache.hadoop.io.Text

// A simple Hive UDF that prepends "Hello " to its input.
class SimpleUDFExample extends UDF {
  def evaluate(input: Text): Text = {
    if (input == null) return null
    new Text("Hello " + input.toString)
  }
}

object sqltest {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf())
    val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
    // Register the Hive UDF under the name "hello" and use it in a query.
    sqlContext.sql("create temporary function hello as 'SimpleUDFExample'")
    val result = sqlContext.sql("""
      select hello(name) from products_avro order by month, name, price
    """)
    result.collect.foreach(println)
  }
}
For an example using Scala functions, see UDF Registration.
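As a rough sketch of the Scala-function approach (assuming Spark 2.x, and reusing the hello function and products_avro table from the Hive example above), a function can be registered directly with spark.udf.register:
import org.apache.spark.sql.SparkSession

// Sketch: register a Scala function as a Spark SQL UDF and use it in a query.
val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
spark.udf.register("hello", (name: String) => if (name == null) null else "Hello " + name)
spark.sql("select hello(name) from products_avro").show()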
Improving Query Performance with Amazon S3 Select
Amazon S3 Select is integrated with Spark on Qubole to read S3-backed tables created on CSV and JSON files for improved performance.
This feature provides the following capabilities:
Automatic conversion: Spark on Qubole automatically converts Spark native tables or Spark datasets in CSV and JSON formats to S3 Select optimized format for faster and efficient data access.
Manual creation of tables: You can use S3 Select datasource to create tables on specific CSV and JSON data to improve performance.
To enable S3 Select automatic conversion, set the following flags to true at the query level or cluster level:
For Hive tables: spark.sql.qubole.convertHiveToS3Select
For Spark tables: spark.sql.qubole.convertSparkToS3Select
Enabling the Flags on a Cluster
Navigate to the Clusters page.
Click the Edit button next to the required Spark cluster.
On the Edit Cluster Settings page, navigate to Advanced Configuration.
Under the SPARK SETTINGS section, enter the following values in the Override Spark Configuration field:
spark.sql.qubole.convertHiveToS3Select=true
spark.sql.qubole.convertSparkToS3Select=true
Click Update to save the changes.
Enabling the Flags for a Query
Depending on the UI you use for running the query, perform the appropriate action:
Analyze
Enter the following values in the Spark Default Submit Command Line Options field.
spark.sql.qubole.convertHiveToS3Select=true
spark.sql.qubole.convertSparkToS3Select=true
Notebooks
Enter the following values in the paragraph:
sql("SET spark.sql.qubole.convertHiveToS3Select=true") sql("SET spark.sql.qubole.convertSparkToS3Select=true")
Note
If you want to enable S3 Select automatic conversion on your account, create a ticket with Qubole Support.
Creating Tables or Dataframes using S3 Select Datasource
If you want to use S3 Select only for some tables, you can use the S3 Select datasource to create tables on CSV and JSON data for improved performance by using the following commands.
SQL
CREATE TABLE s3select USING "org.apache.spark.sql.execution.datasources.s3select" LOCATION 's3://bucket/filename' OPTIONS()
Scala
spark.read.format("org.apache.spark.sql.execution.datasources.s3select").load()
Generic Options
Option Name | Default Value | Other Possible Values
---|---|---
format | CSV | JSON
compression | none | GZIP, BZIP2
CSV Specific Options
Option Name | Default Value
---|---
header | false
delimiter |
lineDelimiter |
quote |
escape |
comment |
JSON Specific Options
Option Name | Default Value | Other Possible Values
---|---|---
jsonType | Document | Line
The following example shows how to read data using S3 Select.
val df = spark.read.format("org.apache.spark.sql.execution.datasources.s3select").option("fileFormat", "csv").load("s3://bucket/table/")
df.select($"id").take(10)
Limitations using S3 Select
Compressed tables or files are not supported for automatic conversion of CSV and JSON data.
File splitting is not supported.
S3 Select does not support complex types such as arrays and objects in JSON.
If the serde or compression of the data changes across partitions, S3 Select fails, because it is costly to check the serde of every partition or the compression of every file.