Understanding the Spark Job Server

Qubole provides a Spark Job Server that enables sharing of Resilient Distributed Datasets (RDDs) in a Spark application among multiple Spark jobs. This enables use cases where you spin up a Spark application, run a job to load the RDDs, then use those RDDs for low-latency data access across multiple query jobs. For example, you can cache multiple data tables in memory, then run Spark SQL queries against those cached datasets for interactive ad-hoc analysis.
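
For instance, here is a minimal sketch of that caching pattern in PySpark, assuming a table named web_logs already exists in your account (the table name is a placeholder) and that, as in the examples later in this section, the Spark and SQL contexts (sc, sqlContext) are already available inside the running app:

# cache-tables.py (hypothetical sketch): submit this once to the app to load the data into memory
sqlContext.cacheTable("web_logs")                            # web_logs is a placeholder table name
sqlContext.sql("SELECT count(*) FROM web_logs").collect()    # the first scan materializes the cache

Any Spark SQL command submitted to the same app afterwards reads from the in-memory cache instead of rescanning the underlying storage.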

You can also use the Job Server to reduce the end-to-end latency of small, unrelated Spark jobs. In our tests, the Job Server brought the end-to-end latency of very small Spark jobs down from more than a minute to less than 10 seconds. The main reason for this improvement is that with the Job Server, a Spark application is already running, and you simply submit your SQL query or Scala/Python snippet to it. Without the Job Server, each SQL query or Scala/Python snippet submitted to Qubole’s API starts its own application, because the API was designed to run standalone applications.

The following section describes how you can interact with the Spark Job Server using Qubole’s Python SDK. Spark Job Server support was added in SDK version 1.8.0, so you must update the SDK to that version or later.

Qubole’s Spark Job Server is backed by Apache Zeppelin. The main abstraction in the Spark Job Server API is an app, which stores the configuration for the Spark application. In Zeppelin terms, an app is a combination of a notebook and an interpreter.

How to use the Spark Job Server

Note

Upgrade qds-sdk-py to the latest version before creating an app.

Create a new Spark app and test it as shown in the following example.

cd venvs/qds-sdk-py/qds-sdk-py/bin

# Upgrade the QDS Python SDK
pip install --upgrade qds-sdk

# List clusters
qds.py --token=API-TOKEN --url=https://api.qubole.com/api --vv cluster list

# Create an app
qds.py --token=API-TOKEN --url=https://api.qubole.com/api --vv app create --name spark1-app --config spark.executor.memory=3g

# List apps
qds.py --token=API-TOKEN --url=https://api.qubole.com/api --vv app list

# Test SQL
qds.py --token=API-TOKEN --url=https://api.qubole.com/api --vv sparkcmd run --sql 'SELECT count(*) FROM default_qubole_memetracker' --cluster-label spark --app-id 343

Example: Running a Scala Job for Calculating the Pi Value

The following example shows how to split a Scala job into two jobs (p1.scala and p2.scala). The Spark Job Server uses the result from the p1.scala job to print the Pi value in the second job, p2.scala.

// Run this job first: the Spark Job Server loads the RDDs, which remain available for low-latency access across subsequent jobs.
p1.scala
import scala.math.random
import org.apache.spark._

// Monte Carlo estimate of Pi: count random points that land inside the unit circle
val slices = 6
val n = 100000 * slices
val count = sc.parallelize(1 to n, slices).map { i =>
  val x = random * 2 - 1
  val y = random * 2 - 1
  if (x*x + y*y < 1) 1 else 0
}.reduce(_ + _)
// Run this job next: the Job Server reuses the values of count and n computed in the p1.scala job.
p2.scala
println("Pi is roughly " + 4.0 * count / n)

Call the two jobs as shown below.

# Calling p1.scala
qds.py --token=API-TOKEN --url=https://api.qubole.com/api --vv sparkcmd run --script_location=scala-scripts/p1.scala --cluster-label spark --app-id 346

# Calling p2.scala
qds.py --token=API-TOKEN --url=https://api.qubole.com/api --vv sparkcmd run --script_location=scala-scripts/p2.scala --cluster-label spark --app-id 346

When an app is created, it is in the DOWN state and is not associated with any cluster, so it can be run on any cluster. An app’s state changes to UP when you submit a command to it and specify a cluster label on which to run the command. As long as the app is UP, it remains associated with the cluster on which it was started. You submit a command to an app by specifying the globally unique app ID (returned when the app is created) and the label of the cluster where the app is running or is yet to run. The following command is an example.

$ qds.py sparkcmd run --script_location=some-spark-snippet.py --cluster-label spark --app-id 3

When a command is run on an app, Qubole handles the possible combinations of cluster and app states as follows:

  • The cluster and the app are both DOWN: Qubole starts the cluster, then starts the app on the running cluster and submits the snippet to the app.
  • The cluster is running and only the app is DOWN: Qubole starts the app on the running cluster and submits the snippet to the app.
  • The cluster and the app are both UP: Qubole submits the snippet to the app.
  • The app is UP but on a different cluster: an error message is displayed.

You can continue to submit multiple commands to the app and get results quickly. For example, you can also submit the following command.

$ qds.py sparkcmd run --sql 'select count(*) from default_qubole_memetracker' --cluster-label spark --app-id 3

When you are done submitting commands, you can mark the app as DOWN using the following command:

$ qds.py app stop 3

The app is restarted automatically when you submit another command to it.

When a cluster is terminated, all apps associated with it are automatically marked as DOWN.

You can list all the apps in an account using the following command:

$ qds.py app list

Response

[
    {
        "status": "DOWN",
        "kind": "spark",
        "note_id": null,
        "name": "app-with-beefy-executors",
        "interpreter_id": null,
        "created_at": "2015-10-30T23:42:15Z",
        "qbol_user_id": 1157,
        "cluster_label": null,
        "config": "{\"spark.executor.memory\":\"20g\",\"spark.executor.cores\":\"4\"}",
        "id": 3
    },
    {
        "status": "UP",
        "kind": "spark",
        "note_id": "2B4S9FQKS1446057961459",
        "name": "concurrent-sql",
        "interpreter_id": "2B5NE7CKT1446057961437",
        "created_at": "2015-10-31T18:45:05Z",
        "qbol_user_id": 1157,
        "cluster_label": "spark",
        "config": "{\"zeppelin.spark.concurrentSQL\":\"true\"}",
        "id": 5
    }
]

You can view a particular app using the following command.

$ qds.py app show 3

Response

{
    "status": "DOWN",
    "kind": "spark",
    "note_id": null,
    "name": "app-with-beefy-executors",
    "interpreter_id": null,
    "created_at": "2015-10-30T23:42:15Z",
    "qbol_user_id": 1157,
    "cluster_label": null,
    "config": "{\"spark.executor.memory\":\"20g\",\"spark.executor.cores\":\"4\"}",
    "id": 3
}

You can list, stop, and delete a Spark App as shown below.

# List the Spark apps
qds.py --token=API-TOKEN --url=https://api.qubole.com/api --vv app list

# Stop a Spark app by specifying its ID
qds.py --token=API-TOKEN --url=https://api.qubole.com/api --vv app stop 343

# Delete a Spark app by specifying its ID
qds.py --token=API-TOKEN --url=https://api.qubole.com/api --vv app delete 343

Performing ELB Log Analysis

The following example shows how you can use the Spark Job Server to analyze Elastic Load Balancer (ELB) access logs:

  1. Create a new app that allows concurrent execution of SQL queries.

    $ qds.py app create --name elb-log-analysis-demo --config zeppelin.spark.concurrentSQL=true
    
  2. Submit the elb-parsing-definitions.py script to this app ID. (A hypothetical sketch of what this script and the one in the next step might contain appears at the end of this example.)

    $ qds.py sparkcmd run --script_location=elb-parsing-definitions.py --cluster-label spark --app-id 3
    
  3. Submit the elb-log-location.py script to this app ID. This script specifies the ELB log location and registers the cached data as a temporary table. You can execute this step multiple times to cache different data locations in memory as different tables.

    $ qds.py sparkcmd run --script_location=elb-log-location.py --cluster-label spark --app-id 3
    
  4. Now that the data is in memory, it can be analyzed by running the following queries:

    $ qds.py sparkcmd run --sql 'select ssl_protocol, count(*) as cnt from logs group by ssl_protocol order by cnt desc' --cluster-label spark --app-id 3
    
    $ qds.py sparkcmd run --sql 'select * from logs where backend_status_code="500"' --cluster-label spark --app-id 3
    
    $ qds.py sparkcmd run --sql 'select * from logs order by backend_processing_time desc limit 10' --cluster-label spark --app-id 3
    

All these queries would return quickly because they use in-memory data.

Note, however, that you can also run any other, unrelated query or program, and it too would return quickly because it would execute against an already running app. For example, you can run the following command:

$ qds.py sparkcmd run --sql 'select count(*) from default_qubole_memetracker' --cluster-label spark --app-id 3

The above query would return quickly as well.
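
The contents of elb-parsing-definitions.py and elb-log-location.py are not shown above. The following is a minimal, hypothetical sketch of what the two scripts could contain, assuming classic AWS ELB access logs stored on S3 and the column names used in the queries above (ssl_protocol, backend_status_code, backend_processing_time); the S3 path, the regular expression, and the selected fields are illustrative placeholders, not part of the original example. Because both scripts run in the same app, the definitions from the first script remain available to the second, just as in the p1.scala/p2.scala example.

# elb-parsing-definitions.py (hypothetical sketch)
# Defines a parser that turns one raw ELB access-log line into a Row with named fields.
import re
from pyspark.sql import Row

# Simplified pattern for the 15-field ELB access-log format; real logs may need a more robust parser.
ELB_LINE = re.compile(
    r'^(\S+) (\S+) (\S+) (\S+) (\S+) (\S+) (\S+) (\S+) (\S+) (\S+) (\S+) '
    r'"([^"]*)" "([^"]*)" (\S+) (\S+)$'
)

def parse_elb_line(line):
    m = ELB_LINE.match(line)
    if m is None:
        return None
    f = m.groups()
    return Row(
        timestamp=f[0],
        backend_processing_time=float(f[5]),
        backend_status_code=f[8],
        request=f[11],
        ssl_protocol=f[14],
    )


# elb-log-location.py (hypothetical sketch)
# Points at an ELB log location, parses it with the definitions above, caches the result,
# and registers it as the temporary table "logs" used by the queries in step 4.
raw = sc.textFile("s3://your-bucket/path/to/elb-logs/")          # placeholder location
parsed = raw.map(parse_elb_line).filter(lambda r: r is not None)
logs = sqlContext.createDataFrame(parsed)
logs.cache()
logs.registerTempTable("logs")
logs.count()   # force materialization so subsequent queries read from memory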