Submit a Spark Command

POST /api/v1.2/commands/

Use this API to submit a Spark command.

Required Role

The following roles can make this API call:

  • A user who is part of the system-user/system-admin group.
  • A user invoking this API must be part of a group associated with a role that allows submitting a command. See Managing Groups and Managing Roles for more information.

Parameters

Note

Parameters marked in bold below are mandatory. Others are optional and have default values.

Parameter Description
program Provide the complete Spark Program in Scala, SQL, Command, R, or Python.
language Specify the language of the program, Scala, SQL, Command, R, or Python. Required only when a program is used.
script_location Specify an S3 path where the SparkSQL script is stored. AWS storage credentials stored in the account are used to retrieve the script file.
arguments Specify the spark-submit command line arguments here.
user_program_arguments Specify the arguments that the user program takes in.
cmdline Alternatively, you can provide the spark-submit command line itself. If you use this option, you cannot use any of the other parameters mentioned here; all required information is captured in the command line itself.
command_type SparkCommand
label Specify the cluster label on which this command is to be run.
app_id ID of an app, which is the main abstraction of the Spark Job Server API. An app is used to store the configuration for a Spark application. See Understanding the Spark Job Server for more information.
name Add a name to the command; it is useful when filtering commands in the command history. It does not accept the special characters & (ampersand), < (less than), > (greater than), " (double quote), and ' (single quote), or HTML tags. It can contain a maximum of 255 characters.
tags Add a tag to a command so that it is easily identifiable and searchable from the commands list in the Commands History. Add a tag as a filter value while searching commands. It can contain a maximum of 255 characters. A comma-separated list of tags can be associated with a single command. While adding a tag value, enclose it in square brackets. For example, {"tags":["<tag-value>"]}.
macros Denotes the macros, which are valid assignment statements containing variables and their expressions, as: macros: [{"<variable>":<variable-expression>}, {..}]. You can add more than one variable. For more information, see Using Macros in Qubole.
pool Use this parameter to specify the Fairscheduler pool name for the command to use.
timeout The timeout for command execution, in seconds. The default value is 129600 seconds (36 hours). QDS checks the timeout for a command every 60 seconds, so if the timeout is set to 80 seconds, the command is killed at the next check, that is, after 120 seconds. Set this parameter to prevent a command from running for the full 36 hours.
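For illustration, several of the optional parameters above can be combined in one request payload. The following is a sketch only; the cluster label, name, tag, macro, pool, and timeout values shown here are placeholders, not required settings.

import json

data = json.dumps({
    "program": "<your Spark program>",        # or use script_location / cmdline instead
    "language": "scala",
    "command_type": "SparkCommand",
    "label": "spark",                         # cluster label
    "name": "daily-spark-report",             # shown in the command history
    "tags": ["example-tag"],                  # searchable in the Commands History
    "macros": [{"current_date": "Qubole_nominal_date"}],  # placeholder variable and expression
    "pool": "default",                        # Fairscheduler pool name
    "timeout": 3600                           # kill the command after about one hour
})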

Examples

The examples are written in Python and use PycURL. Using cURL directly is possible but awkward, because the program text must be escaped and JSON does not support newlines. To avoid confusion, these Python API examples are provided; they are clear and can be used directly.

Alternatively, you can use qds-sdk-py directly.
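For example, a minimal qds-sdk-py sketch might look like the following. Treat the class and method names as illustrative; the exact interface can differ across SDK versions.

from qds_sdk.qubole import Qubole
from qds_sdk.commands import SparkCommand

Qubole.configure(api_token="<provide auth token here>")

# Submit a Spark command and wait for it to finish
cmd = SparkCommand.run(program="<your Spark program>", language="scala")
print(cmd.status)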

Example Python API Framework

import sys
import pycurl
import json

c = pycurl.Curl()
url = "https://api.qubole.com/api/v1.2/commands"
auth_token = "<provide auth token here>"  # your QDS API token

# Common request setup: endpoint, authentication and JSON headers, POST method
c.setopt(pycurl.URL, url)
c.setopt(pycurl.HTTPHEADER, ["X-AUTH-TOKEN: " + auth_token,
                             "Content-Type: application/json",
                             "Accept: application/json"])
c.setopt(pycurl.POST, 1)

(After this, select any of the following examples depending on the requirement.)

The above code snippet can be used to make API calls. The following examples use it as their base and show various use cases.
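The framework above does not read the HTTP response. If you also want the command ID that QDS returns (for example, to poll the command later), you can collect the response body with a buffer. This is a minimal sketch that assumes the response JSON contains an id field:

from io import BytesIO

buf = BytesIO()
c.setopt(pycurl.WRITEFUNCTION, buf.write)   # collect the response body

# After c.perform() in any example below:
# response = json.loads(buf.getvalue().decode("utf-8"))
# print(response.get("id"))                 # the command ID assigned by QDS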

Example to Submit Spark Scala Program

prog = '''
import scala.math.random

import org.apache.spark._

/** Computes an approximation to pi */
object SparkPi {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Spark Pi")
    val spark = new SparkContext(conf)
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
    val count = spark.parallelize(1 until n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / n)
    spark.stop()
  }
}
'''
data=json.dumps({"program":prog,"language":"scala","arguments":"--class SparkPi", "command_type":"SparkCommand"})

c.setopt(pycurl.POSTFIELDS, data)
c.perform()

To submit a snippet to the Spark Job Server app, use the following data payload instead of the above data.

data=json.dumps({"program":prog,"language":"scala","arguments":"--class SparkPi", "command_type":"SparkCommand",
"label"="spark","app_id"="3"})

Here, app_id is the Spark Job Server app ID. See Understanding the Spark Job Server for more information.

Example to Submit Spark Python Program

Here is the Spark Pi example in Python.

prog = '''
import sys
from random import random
from operator import add

from pyspark import SparkContext


if __name__ == "__main__":
    """
        Usage: pi [partitions]
    """
    sc = SparkContext(appName="PythonPi")
    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
    n = 100000 * partitions

    def f(_):
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 < 1 else 0

    count = sc.parallelize(xrange(1, n + 1), partitions).map(f).reduce(add)
    print "Pi is roughly %f" % (4.0 * count / n)

    sc.stop()
'''
data=json.dumps({"program":prog,"language":"python","command_type":"SparkCommand"})

c.setopt(pycurl.POSTFIELDS, data)
c.perform()

Example to Add Spark Submit Options

Add arguments in the JSON body to supply spark-submit options. You can pass remote files in an S3 location, in addition to local files, as values to the --py-files argument; see the second example below.

data=json.dumps(
{"program":prog,
"language":"python",
"arguments": "--num-executors 10 --max-executors 10 --executor-memory 5G --executor-cores 2",
"command_type":"SparkCommand"})

Example to Add Arguments to User Program

Add user_program_arguments in the JSON body. Here is a sample program that takes arguments (an input and an output location).

prog='''
import org.apache.spark._
object testprogram {
  def main(args: Array[String]) {
      val sc = new SparkContext(new SparkConf())
      val fileToRead = args(0)   // passed as user program arguments
      val fileToWrite = args(1)
      sc.textFile(fileToRead).saveAsTextFile(fileToWrite)
      println("Wrote output to " + fileToWrite)
  }
}'''
data=json.dumps(
{"program":prog,
"language":"scala",
"arguments": "--class testprogram",
"user_program_arguments": "s3://bucket/path/to/source s3://bucket/path/to/destination",
"command_type":"SparkCommand"})

c.setopt(pycurl.POSTFIELDS, data)
c.perform()

Example to Use Command Line Parameter

For power users, Qubole provides the ability to supply the spark-submit command line directly, as explained in detail below.

Note

Running a Spark application as a Bash command under the Shell command options is not recommended, because automatic adjustments, such as increasing the Application Master memory based on the driver memory and enabling debug options, do not happen. These automatic adjustments occur only when you run a Spark application through the Command Line option.

In this case, you must compile the program (in the case of Scala), create a jar, upload the file to S3, and invoke the command line. Note that Qubole's deployment of Spark is available in the /usr/lib/spark directory:

/usr/lib/spark/bin/spark-submit [options] <app jar in s3 | python file> [app options]

Here is an example.

/usr/lib/spark/bin/spark-submit --class <classname>  --max-executors 100 --num-executors 15 --driver-memory 10g
--executor-memory 3g --executor-cores 5 <jar_path_in-S3> <arguments>

Here is a REST API example to submit a Spark command in the command-line language.

curl  -i -X POST -H "X-AUTH-TOKEN: $AUTH_TOKEN" -H "Content-Type: application/json" -H "Accept: application/json"
-d '{"cmdline":"/usr/lib/spark/bin/spark-submit --class org.apache.spark.examples.SparkPi
     --master yarn-client /usr/lib/spark/spark-examples-*", "language":"command_line", "command_type":"SparkCommand",
     "label":"sparkcluster"}' "https://api.qubole.com/api/v1.2/commands"

Note

The above syntax uses https://api.qubole.com as the endpoint. Qubole provides other endpoints to access QDS that are described in Supported Qubole Endpoints on Different Cloud Providers.
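The same cmdline payload can also be sent through the Python framework shown earlier; only the data payload changes.

data=json.dumps(
{"cmdline":"/usr/lib/spark/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-client /usr/lib/spark/spark-examples-*",
"language":"command_line",
"command_type":"SparkCommand",
"label":"sparkcluster"})

c.setopt(pycurl.POSTFIELDS, data)
c.perform()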

Example to Submit Spark Command in SQL

You can submit a Spark command in SQL. Here is an example.

curl  -i -X POST -H "X-AUTH-TOKEN: $AUTH_TOKEN" -H "Content-Type: application/json" -H "Accept: application/json"
-d '{
      "sql":"select * from default_qubole_memetracker limit 10;",
      "language":"sql","command_type":"SparkCommand", "label":"spark"
    }'
"https://api.qubole.com/api/v1.2/commands"

When submitting a Spark command in SQL, you can specify the location of a SparkSQL script in the script_location parameter as shown in the following example.

curl  -i -X POST -H "X-AUTH-TOKEN: $AUTH_TOKEN" -H "Content-Type: application/json" -H "Accept: application/json"
-d '{"script_location":"<S3 Path>", "language":"sql", "command_type":"SparkCommand", "label":"<cluster-label>"
    }'
"https://api.qubole.com/api/v1.2/commands"

Note

The above syntax uses https://api.qubole.com as the endpoint. Qubole provides other endpoints to access QDS that are described in Supported Qubole Endpoints on Different Cloud Providers.

Example to Submit a Spark Command in SQL to a Spark Job Server App

You can submit a Spark command in SQL to an existing Spark Job Server app.

curl  -i -X POST -H "X-AUTH-TOKEN: $AUTH_TOKEN" -H "Content-Type: application/json" -H "Accept: application/json"
-d '{
      "sql":"select * from default_qubole_memetracker limit 10;",
      "language":"sql","command_type":"SparkCommand", "label":"spark","app_id":"3"
    }'
"https://api.qubole.com/api/v1.2/commands"

Here, app_id is the Spark Job Server app ID. See Understanding the Spark Job Server for more information.

Note

The above syntax uses https://api.qubole.com as the endpoint. Qubole provides other endpoints to access QDS that are described in Supported Qubole Endpoints on Different Cloud Providers.

Known Issue

The Spark Application UI might display an incorrect state of the application when Spot Instances are used. You can view the accurate status of the Qubole command in the Analyze or Notebooks page.

While a Spark application is running, if the master node or the node that runs the driver is lost, the Spark Application UI might display an incorrect state of the application. The event logs of a running application are periodically persisted from the HDFS location to cloud storage. If the master node is removed due to Spot loss, the cloud storage might not have the latest status of the application. As a result, the Spark Application UI might show the application in a running state.

To avoid this issue, it is recommended to use an on-demand master node.