What Is Spark Streaming?

Spark Streaming allows you to use Spark for stream processing. You write a streaming job much as you would write a batch job; at execution time, Spark divides the input stream into a series of small batches and processes them as a sequence of short jobs. Input can come from sources such as HDFS, Kafka, Kinesis, and Flume; typical output destinations are a file system, a database, or a dashboard.
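As a sketch of this model, the following minimal Scala example counts words arriving on a socket in 10-second micro-batches. The socket source, port, and batch interval are illustrative; substitute Kafka, Kinesis, or another source as needed.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCountStream {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WordCountStream")
    val ssc  = new StreamingContext(conf, Seconds(10))   // micro-batch interval

    val lines  = ssc.socketTextStream("localhost", 9999) // one input DStream
    val counts = lines.flatMap(_.split(" "))
                      .map(word => (word, 1))
                      .reduceByKey(_ + _)
    counts.print()                                       // output per batch

    ssc.start()             // start receiving and processing
    ssc.awaitTermination()  // run until stopped or failed
  }
}
```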

Running Spark Streaming on QDS

You can run Spark Streaming jobs on a Qubole Spark cluster. Here are some points to consider:

  • Qubole recommends running Spark on YARN in client mode for Spark Streaming; this is the QDS default.

  • Do not run a Spark Streaming job directly from the Analyze page in QDS, because Analyze jobs are subject to a 36-hour time limit; run it from a Notebook instead.

    • When working in a Notebook, wrap all the code in an object so that generated temporary class names do not break checkpointing.
    • To ensure your streaming application runs indefinitely when launched from a Notebook, set:
      • yarn.resourcemanager.app.timeout.minutes=-1 as a Hadoop override
      • spark.qubole.idle.timeout=-1 as both an Interpreter override and a Spark override
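A minimal sketch of the object wrapping described above; the object name and socket source are illustrative:

```scala
import org.apache.spark.streaming.StreamingContext

// Keeping notebook streaming code inside a named object avoids the
// notebook's generated (temporary) class names ending up in checkpoints.
object StreamingJob {
  def run(ssc: StreamingContext): Unit = {
    val lines = ssc.socketTextStream("localhost", 9999) // example source
    lines.count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```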
  • If your application needs to receive multiple streams of data in parallel, create multiple input DStreams. This will create multiple receivers which will simultaneously receive multiple data streams. (But note the points about sizing and resources that follow.)
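Assuming an existing StreamingContext named ssc, multiple input DStreams might be created and combined like this (the ports are illustrative):

```scala
// Each socketTextStream creates its own receiver; the receivers run in
// parallel, each occupying one executor core.
val ports   = Seq(9999, 9998, 9997)
val streams = ports.map(port => ssc.socketTextStream("localhost", port))

// Union the parallel streams into a single DStream for downstream processing.
val unified = ssc.union(streams)
unified.count().print()
```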

  • QDS supports a basic version of the open-source streaming dynamic allocation for Spark Streaming applications; it is available only in Spark 2.0 and later. Executors are added or removed based on the processing time required by the tasks, but executors running receivers are never removed. Autoscaling is disabled by default for Spark Streaming applications; set spark.streaming.dynamicAllocation.enabled=true to enable it.

    Note

    Be sure to set spark.dynamicAllocation.enabled=false when you set spark.streaming.dynamicAllocation.enabled=true; Spark throws an error if both settings are enabled.

    These practices are recommended for better autoscaling:

    • It is best to start with a fairly large cluster and number of executors and scale down if necessary. (An executor maps to a YARN container.)
    • The number of executors should be at least equal to the number of receivers.
    • Set the number of cores per executor such that the executor has some spare capacity over and above what’s needed to run the receiver.
    • The total number of cores must be greater than the number of receivers; otherwise the application will not be able to process the data it receives.
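Putting the dynamic-allocation settings together, a Spark override might look like the following. The min/max and interval values are illustrative; the tuning property names are from the open-source streaming allocation manager.

```
spark.streaming.dynamicAllocation.enabled=true
spark.dynamicAllocation.enabled=false
# optional tuning (illustrative values)
spark.streaming.dynamicAllocation.minExecutors=2
spark.streaming.dynamicAllocation.maxExecutors=10
spark.streaming.dynamicAllocation.scalingInterval=60
```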
  • Setting spark.streaming.backpressure.enabled to true allows Spark Streaming to control the receiving rate (on the basis of current batch scheduling delays and processing times) so that the system receives data only as fast as it can process it.
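As a config sketch; the second property caps each receiver's rate for the first batch, before the backpressure feedback loop has data to work with, and its value here is illustrative:

```
spark.streaming.backpressure.enabled=true
# optional: limit the first batch's ingest rate per receiver (records/sec)
spark.streaming.backpressure.initialRate=1000
```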

  • For the best performance, consider using the Kryo Serializer to convert between serialized and deserialized representations of Spark data. This is not the Spark default, but you can change it explicitly: set the spark.serializer property to org.apache.spark.serializer.KryoSerializer.
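A sketch of enabling Kryo in code, with optional class registration so Kryo can write compact class IDs instead of full class names; MyEvent is an illustrative type:

```scala
import org.apache.spark.SparkConf

case class MyEvent(id: Long, payload: String) // illustrative event type

val conf = new SparkConf()
  .setAppName("StreamingWithKryo")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // optional: register frequently serialized classes for smaller output
  .registerKryoClasses(Array(classOf[MyEvent]))
```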

  • Developers can reduce memory consumption by un-caching DStreams when they are no longer needed.
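A sketch, assuming a lines DStream that is reused by more than one action: persist() caches each batch's RDD so it is computed only once, and Spark Streaming's automatic cleanup (the spark.streaming.unpersist setting, true by default) evicts old batch RDDs once they are no longer needed.

```scala
// Cache the parsed stream because two separate actions consume it below;
// without persist() each action would recompute the parsing for every batch.
val parsed = lines.map(line => line.split(","))  // illustrative parsing
parsed.persist()

parsed.count().print()               // action 1
parsed.filter(_.length > 2).print()  // action 2
```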