Troubleshooting Spark Issues

When any Spark job or application fails, you should identify the errors and exceptions that cause the failure. You can access the Spark logs to identify errors and exceptions.

This topic provides information about the errors and exceptions that you might encounter when running Spark jobs or applications. You can resolve these errors and exceptions by following the respective workarounds.

You might want to use the Sparklens experimental open service tool that is available on http://sparklens.qubole.net to identify the potential opportunities for optimizations with respect to driver side computations, lack of parallelism, skew, etc. For more information about Sparklens, see the Sparklens blog.

Out of Memory Exceptions

Spark jobs might fail due to out of memory exceptions at the driver or executor end. When troubleshooting the out of memory exceptions, you should understand how much memory and cores the application requires, and these are the essential parameters for optimizing the Spark appication. Based on the resource requirements, you can modify the Spark application parameters to resolve the out of memory exceptions.

For more information about the resource allocation, Spark application parameters, and determining resource requirements, see An Introduction to Apache Spark Optimization in Qubole.

Driver Memory Exceptions

Exception due to Spark driver running out of memory

  • Description: When the Spark driver runs out of memory, exceptions similar to the following exception occur.

    Exception in thread "broadcast-exchange-0" java.lang.OutOfMemoryError: Not enough memory to build and broadcast the table
    to all worker nodes. As a workaround, you can either disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1
    or increase the spark driver memory by setting spark.driver.memory to a higher value
    
  • Resolution: Set an appropriate high value for the driver memory by using one of the following commands in Spark Submit Command Line Options on the Analyze page:

    • --conf spark.driver.memory= <XX>g

      OR

    • --driver-memory <XX>G

Job failures due to Application Master that launches the driver exceeds memory limits

  • Description: Spark job might fail when the Application Master (AM) that launches driver exceeds memory limit and is eventually terminated by Yarn. The following error occurs.

    Diagnostics: Container [pid=<XXXXX>,containerID=container_<XXXXXXXXXX>_<XXXX>_<XX>_<XXXXXX>] is running beyond physical memory limits.
    Current usage: <XX> GB of <XX> GB physical memory used; <XX> GB of <XX> GB virtual memory used. Killing container
    
  • Resolution: Set an appropriate high value for the driver memory by using one of the following commands in Spark Submit Command Line Options on the Analyze page:

    • --conf spark.driver.memory= <XX>g

      OR

    • --driver-memory <XX>G

    As a result, a higher value is set for the AM memory limit.

Executor Memory Exceptions

Exception due to executor running out of memory

  • Description: When the executor runs out of memory, the following exception might occur.

    Executor task launch worker for task XXXXXX ERROR Executor: Exception in task XX.X in stage X.X (TID XXXXXX)
    java.lang.OutOfMemoryError: GC overhead limit exceeded
    
  • Resolution: Set an appropriate high value for the executor memory by using one of the following commands in Spark Submit Command Line Options on the Analyze page:

    • --conf spark.executor.memory= <XX>g

      OR

    • --executor-memory <XX>G

FetchFailedException due to executor running out of memory

  • Description: When the executor runs out of memory, the following exception might occur.

    ShuffleMapStage XX (sql at SqlWrapper.scala:XX) failed in X.XXX s due to org.apache.spark.shuffle.FetchFailedException:
    failed to allocate XXXXX byte(s) of direct memory (used: XXXXX, max: XXXXX)
    
  • Resolution: From the Analyze page, perform the following steps in Spark Submit Command Line Options:

    1. Set an appropriate high value for the executor memory by using one of the following commands:

      • --conf spark.executor.memory= <XX>g

        OR

      • --executor-memory <XX>G

    2. Increase the number of shuffle partitions by using the following command: --spark.sql.shuffle.partitions

Executor container killed by YARN for exceeding memory limits

  • Description: When the container hosting the executor needs more memory for overhead tasks or executor tasks, the following error occurs.

    org.apache.spark.SparkException: Job aborted due to stage failure: Task X in stage X.X failed X times,
    most recent failure: Lost task X.X in stage X.X (TID XX, XX.XX.X.XXX, executor X): ExecutorLostFailure
    (executor X exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. XX.X GB
    of XX.X GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead
    
  • Resolution: Set a higher value for spark.yarn.executor.memoryOverhead based on the requirements of the job. The executor memory overhead value increases with the executor size (approximately by 6-10%). As a best practice, modify the executor memory value accordingly.

    Set an appropriate high value for the executor memory overhead by using the following command in Spark Submit Command Line Options on the Analyze page: --conf spark.yarn.executor.memoryOverhead=XXXX

    Note

    For Spark 2.3 and later versions, use the new parameter spark.executor.memoryOverhead instead of spark.yarn.executor.memoryOverhead.

    If increasing the executor memory overhead value or executor memory value does not resolve the issue, you can either use a larger instance or reduce the number of cores by using the following command in Spark Submit Command Line Options on the Analyze page: --executor-cores=XX. Reducing number of cores might lead to memory wastage but the job is executed.

Spark job repeatedly fails

  • Description: When the cluster is fully scaled and the cluster is not able to manage the job size, spark job might fail repeatedly.
  • Resolution: Run the Sparklens tool to analyze the job execution and optimize the configuration accordingly. For more information about Sparklens, see the Sparklens blog.

FileAlreadyExistsException in Spark jobs

  • Description: The FileAlreadyExistsException error occurs in the following scenarios:

    • Failure of the previous task might leave some files that trigger the FileAlreadyExistsException errors as shown below.

      org.apache.spark.SparkException: Task failed while writing rows
      at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
      at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:191)
      at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:190)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
      at org.apache.spark.scheduler.Task.run(Task.scala:108)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      ... 1 more
      Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: s3://xxxxxx/xxxxxxx/xxxxxx/analysis-results/datasets/model=361/dataset=encoded_unified/dataset_version=vvvvv.snappy.parquet already exists
      at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:806)
      at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:914)
      
    • When the executor runs out of memory, the individual tasks of that executor are scheduled on another executor. As a result, the FileAlreadyExistsException error occurs.

    • When any Spark executor fails, Spark retries to start the task, which might result into FileAlreadyExistsException error after the maximum number of retries.

      A sample original executor failure reason is shown below.

      Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 2.0 failed 3 times,
      most recent failure: Lost task 1.3 in stage 2.0 (TID 7, ip-192-168-1- 1.ec2.internal, executor 4): ExecutorLostFailure
      (executor 3 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits.
      14.3 GB of 14 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead
      

      Note

      In case of DirectFileOutputCommitter (DFOC) with Spark, if a task fails after writing files partially, the subsequent reattempts might fail with FileAlreadyExistsException (because of the partial files that are left behind). Therefore, the job fails. Set the spark.hadoop.mapreduce.output.textoutputformat.overwrite and spark.qubole.outputformat.overwriteFileInWrite flags to true to prevent such job failures.

  • Resolution:

    1. Identify the original executor failure reason that causes the FileAlreadyExistsException error.
    2. Verify size of the nodes in the clusters.
    3. Upgrade them to the next tier to increase the Spark executor’s memory overhead.

Spark Shell Command failure

  • Description: When a spark application is submitted through shell command on QDS, it might fail with the following error.

    Qubole > Shell Command failed, exit code unknown
    2018-08-02 12:43:18,031 WARNING shellcli.py:265 - run - Application failed or got killed
    

    In this case, the actual reason that kills the application is hidden and you might not able to find the reason on the logs directly.

  • Resolution:

    1. Navigate to the Analyze page
    2. Click on the Resources tab to analyze the errors and perform the appropriate action.
    3. Run the job again by using the Spark Submit Command Line Options on the Analyze page.

Error when the total size of results is greater than the Spark Driver Max Result Size value

  • Description: When the total size of results is greater than the Spark Driver Max Result Size value, the following error occurs.

    org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of x tasks (y MB) is bigger
    than spark.driver.maxResultSize (z MB)
    
  • Resolution: Increase the Spark Drive Max Result Size value by modifying the --conf spark.driver.maxResultSize value in Spark Submit Command Line Options on the Analyze page.

Too Large Frame error

  • Description: When the size of the shuffle data blocks exceeds the limit of 2 GB, which spark can handle, the following error occurs.

    org.apache.spark.shuffle.FetchFailedException: Too large frame: XXXXXXXXXX
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:513)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:444)
    
    
    
    Caused by: java.lang.IllegalArgumentException: Too large frame: XXXXXXXXXX
    at org.spark_project.guava.base.Preconditions.checkArgument(Preconditions.java:119)
    at org.apache.spark.network.util.TransportFrameDecoder.decodeNext(TransportFrameDecoder.java:133)
    
  • Resolution: Perform one of the following steps to resolve this error:

    Solution 1:

    1. Run the job on Spark 2.2 or higher version because Spark 2.2 or higher handles this issue in a better way when compared to other lower versions of Spark. For information, see https://issues.apache.org/jira/browse/SPARK-19659.
    2. Use the following Spark configuration:
      1. Modify the value of spark.sql.shuffle.partitions from default 200 to a value greater than 2001.
      2. Set the spark.default.parallelism value as same as the spark.sql.shuffle.partitions value.

    Solution 2:

    1. Identify the DataFrame that is causing the issue.
      1. Add a spark action(for instance, df.count()) after creating a new DataFrame.
      2. Print anything to check the DataFrame.
      3. If the print statement is not executed for a DataFrame, then the issue is with that DataFrame.
    2. After the DataFrame is identified, repartition the DataFrame by using df.repartition() and then cache it by using df.cache().
    3. If there is skewness in the data and you are using Spark version earlier than 2.2, then modify the code.

Spark jobs fail due to compilation failures

When you run a Spark program or application from the Analyze page, the code is compiled and then submitted for the execution. If there are syntax errors, or the jars or classes are missing, then the jobs might fail during compilation or runtime.

  • Description: If there are any errors in the syntax then the job might fail even before the job is submitted because of compile issues.

    The following figure shows a syntax error in a Spark program written in Scala.

    ../../_images/ScalaSyntaxError.png
  • Resolution: Check the code for any syntax errors and rectify the syntax. Rerun the program.

    The following figure shows a Spark job that ran successfully and displayed results.

    ../../_images/ScalaPositiveResult.png
  • Description: class/jar not found error occurs when a Spark program that you run uses a functionality in a jar that is not available in the Spark program’s classpath. When the program is compiled if the class/jar is not found then the error occurs during the compilation time. When the program is compiled locally and then submitted for execution, then the missing class/jar results into runtime error.

    The following figure shows examples of class not found error.

    ../../_images/SparkClassNotFoundError.png
  • Resolution: Add the dependent classes and jars and rerun the program. See Specifying Dependent Jars for Spark Jobs