Submit a Spark Command

POST /api/v1.2/commands/

Use this API to submit a Spark command.

Required Role

The following users can make this API call:

  • Users who belong to the system-user or system-admin group.
  • Users who belong to a group associated with a role that allows submitting a command. See Managing Groups and Managing Roles for more information.

Parameters

Note

Parameters marked in bold below are mandatory. Others are optional and have default values.

Parameter Description
program Provide the complete Spark Program in Scala, SQL, Command, R, or Python.
language

Specify the language of the program. The supported values are scala (Scala), sql (SQL), command_line (Command), R (R), or py (Python). Required only when a program is used.

Note

Values are case-sensitive.

script_location

Specify an S3 path where the Spark query (Scala, Python, SQL, R, and Command Line) script is stored. AWS storage credentials stored in the account are used to retrieve the script file.

Note

*.cmdline or *.command_line files are supported for Spark query as Command Line.

arguments Specify the spark-submit command line arguments here. You must not specify this parameter if cmdline is used because all the required arguments and configurations are passed in the command itself.
user_program_arguments Specify the arguments that the user program takes in.
cmdline You can also provide the spark-submit command line itself. If you use this option, you cannot use any other parameters mentioned here. All required information and configuration should be specified in the command line itself.
command_type SparkCommand
label Specify the cluster label on which this command is to be run.
retry Denotes the number of retries for a job. Valid values of retry are 1, 2, and 3.
retry_delay Denotes the time interval between the retries when a job fails. The unit of measurement is minutes.
app_id ID of an app, which is the main abstraction of the Spark Job Server API. An app is used to store the configuration for a Spark application. See Understanding the Spark Job Server for more information.
name Add a name to the command that is useful while filtering commands from the command history. It does not accept the special characters & (ampersand), < (less than), > (greater than), " (double quote), and ' (single quote), or HTML tags. It can contain a maximum of 255 characters.
tags Add a tag to a command so that it is easily identifiable and searchable from the commands list in the Commands History. Add a tag as a filter value while searching commands. It can contain a maximum of 255 characters. A comma-separated list of tags can be associated with a single command. While adding a tag value, enclose it in square brackets. For example, {"tags":["<tag-value>"]}.
macros Denotes the macros, which are valid assignment statements containing variables and their expressions, in the form macros: [{"<variable>":<variable-expression>}, {..}]. You can add more than one variable. For more information, see Macros.
pool Use this parameter to specify the Fair Scheduler pool name for the command to use.
timeout The timeout for command execution, in seconds. The default value is 129600 seconds (36 hours). QDS checks the timeout for a command every 60 seconds. For example, if the timeout is set to 80 seconds, the command is killed at the next check, that is, after 120 seconds. Setting this parameter prevents a command from running for the full 36 hours.
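
Several of the constraints in the table above can be checked on the client before a request is sent. The following is a minimal sketch and not part of any Qubole SDK: the helper names check_spark_payload and effective_kill_seconds are hypothetical, and the rounding rule assumes that a command is killed at the first 60-second check at or after its timeout, which is how the 80-second example in the timeout description works out to 120 seconds.

```python
import math

FORBIDDEN_NAME_CHARS = set("&<>\"'")  # special characters the name parameter rejects
CHECK_INTERVAL_SECONDS = 60           # QDS checks the timeout every 60 seconds


def check_spark_payload(payload):
    """Return a list of problems found in a SparkCommand payload (hypothetical helper)."""
    problems = []
    if "cmdline" in payload and "arguments" in payload:
        problems.append("arguments must not be set when cmdline is used")
    if "retry" in payload and payload["retry"] not in (1, 2, 3):
        problems.append("retry must be 1, 2, or 3")
    name = payload.get("name", "")
    if len(name) > 255:
        problems.append("name is longer than 255 characters")
    if FORBIDDEN_NAME_CHARS & set(name):
        problems.append("name contains a forbidden special character")
    if "tags" in payload and not isinstance(payload["tags"], list):
        problems.append("tags must be a list, for example [\"nightly\"]")
    return problems


def effective_kill_seconds(timeout):
    """Assumed rule: the command is killed at the first check at or after the timeout."""
    return math.ceil(timeout / CHECK_INTERVAL_SECONDS) * CHECK_INTERVAL_SECONDS


# A payload that breaks two rules: arguments together with cmdline, and retry out of range.
print(check_spark_payload({"cmdline": "spark-submit app.py", "arguments": "--class X", "retry": 5}))
print(effective_kill_seconds(80))
```

The service remains the authority on what it accepts; a check like this only catches obvious mistakes earlier.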

Note

  • You can run Spark commands with large script files and large inline content.
  • You can use macros in script files for the Spark commands with subtypes scala (Scala), py (Python), R (R), command_line (Command), and sql (SQL). You can also use macros in large inline contents and large script files for scala (Scala), py (Python), R (R), and sql (SQL).

These features are not enabled for all users by default. Create a ticket with Qubole Support to enable these features on the QDS account.

Note

If you are submitting Scala code that contains multiple lines, you must escape every new line with the escape character \n.
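
In Python, json.dumps performs this escaping automatically, so a multi-line program held in a triple-quoted string can be serialized as is. A minimal sketch, in which the two-line Scala snippet is only a placeholder:

```python
import json

# Placeholder multi-line Scala program; any multi-line string behaves the same way.
prog = """val xs = sc.parallelize(1 to 10)
println(xs.sum())"""

body = json.dumps({"program": prog, "language": "scala", "command_type": "SparkCommand"})

# The serialized body contains the two-character sequence backslash + n
# in place of each raw line break, so it is valid JSON.
print(body)
```

Manual escaping is therefore only needed when the JSON body is written by hand, for example in a curl command.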

Examples

Examples are written in Python and use pycurl. Using curl directly is possible but difficult, because the program needs escaping and JSON strings cannot contain raw new lines. To avoid confusion, these Python API examples are provided; they are clear and can be used directly.

Alternatively, you can use qds-sdk-py directly.

Example Python API Framework

import sys
import pycurl
import json
c = pycurl.Curl()
url = "https://api.qubole.com/api/v1.2/commands"
# Replace the placeholder with the API token of the account.
auth_token = "<provide auth token here>"
c.setopt(pycurl.URL, url)
c.setopt(pycurl.HTTPHEADER, ["X-AUTH-TOKEN: " + auth_token, "Content-Type: application/json", "Accept: application/json"])
c.setopt(pycurl.POST, 1)

(After this, select any of the following examples depending on the requirement.)

The above code snippet can be used to make API calls. The following examples use it as their base and show various use cases.
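
If pycurl is not available, the same request can be prepared with the Python standard library. This is a sketch rather than an officially documented client: the helper name build_command_request is hypothetical, the token value is a placeholder, and the request is only constructed here, not sent.

```python
import json
import urllib.request

API_URL = "https://api.qubole.com/api/v1.2/commands"


def build_command_request(auth_token, payload, url=API_URL):
    """Build (but do not send) a POST request carrying a JSON command payload."""
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "X-AUTH-TOKEN": auth_token,
            "Content-Type": "application/json",
            "Accept": "application/json",
        },
        method="POST",
    )


req = build_command_request(
    "<provide auth token here>",
    {"program": "println(1)", "language": "scala", "command_type": "SparkCommand"},
)
print(req.get_method(), req.full_url)
# To submit, pass the request to urllib.request.urlopen(req) and read the response.
```

The payload dictionaries in the examples that follow can be passed to such a helper unchanged, since they are serialized with json.dumps in both approaches.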

Example to Submit Spark Scala Program

prog = '''
import scala.math.random

import org.apache.spark._

/** Computes an approximation to pi */
object SparkPi {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Spark Pi")
    val spark = new SparkContext(conf)
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
    val count = spark.parallelize(1 until n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / n)
    spark.stop()
  }
}
'''
data=json.dumps({"program":prog,"language":"scala","arguments":"--class SparkPi", "command_type":"SparkCommand"})

c.setopt(pycurl.POSTFIELDS, data)
c.perform()

To submit a snippet to the Spark Job Server app, use the following data payload instead of the above data.

data=json.dumps({"program":prog,"language":"scala","arguments":"--class SparkPi", "command_type":"SparkCommand",
"label":"spark","app_id":"3"})

Where app_id = Spark Job Server app ID. See Understanding the Spark Job Server for more information.

Example to Submit Spark Python Program

Here is the Spark Pi example in Python.

prog = '''
import sys
from random import random
from operator import add

from pyspark import SparkContext


if __name__ == "__main__":
    """
        Usage: pi [partitions]
    """
    sc = SparkContext(appName="PythonPi")
    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
    n = 100000 * partitions

    def f(_):
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 < 1 else 0

    count = sc.parallelize(xrange(1, n + 1), partitions).map(f).reduce(add)
    print "Pi is roughly %f" % (4.0 * count / n)

    sc.stop()
'''
data=json.dumps({"program":prog,"language":"python","command_type":"SparkCommand"})

c.setopt(pycurl.POSTFIELDS, data)
c.perform()

Example to Add Spark Submit Options

Add arguments in JSON body to supply spark-submit options. You can pass remote files in an S3 location in addition to the local files as values to the --py-files argument.

data=json.dumps(
{"program":prog,
"language":"python",
"arguments": "--num-executors 10 --max-executors 10 --executor-memory 5G --executor-cores 2",
"command_type":"SparkCommand"})

Example to Add Arguments to User Program

Add user_program_arguments in the JSON body. Here is a sample program that takes in one argument (the size of the range to print).

prog = '''spark.range(args(0).toInt).collect.foreach (println)'''

data=json.dumps(
{"program":prog,
"language":"scala",
"user_program_arguments": "10",
"command_type":"SparkCommand"})

c.setopt(pycurl.POSTFIELDS, data)
c.perform()

Example to Use Command Line Parameter

For power users, Qubole provides the ability to provide the spark-submit command line directly. This is explained in detail here.

Note

Running a Spark application as a Bash command under the Shell command options is not recommended, because automatic changes, such as an increase in the Application Coordinator memory based on the driver memory and the availability of debug options, do not happen. Such automatic changes occur when you run a Spark application through the Command Line option.

In this case, you must compile the program (in case of Scala), create a jar, upload the file to S3 and invoke the command line. Note that Qubole’s deployment of Spark is available at the /usr/lib/spark directory:

/usr/lib/spark/bin/spark-submit [options] <app jar in s3 | python file> [app options]

Here is an example.

/usr/lib/spark/bin/spark-submit --class <classname> --max-executors 100 --num-executors 15 --driver-memory 10g \
--executor-memory 3g --executor-cores 5 <jar_path_in-S3> <arguments>

Here is a REST API example to submit a Spark command in the command-line language.

curl  -i -X POST -H "X-AUTH-TOKEN: $AUTH_TOKEN" -H "Content-Type: application/json" -H "Accept: application/json" \
-d '{"cmdline":"/usr/lib/spark/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-client /usr/lib/spark/spark-examples-*",
     "language":"command_line", "command_type":"SparkCommand", "label":"sparkcluster"}' \
      "https://api.qubole.com/api/v1.2/commands"

Note

The above syntax uses https://api.qubole.com as the endpoint. Qubole provides other endpoints to access QDS that are described in Supported Qubole Endpoints on Different Cloud Providers.
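
The same request body can also be produced in Python, which keeps the cmdline value on a single line and leaves the JSON quoting to json.dumps. A sketch that reuses the values from the curl example above:

```python
import json

# Join the pieces with single spaces so the JSON string contains no line breaks.
cmdline = " ".join([
    "/usr/lib/spark/bin/spark-submit",
    "--class org.apache.spark.examples.SparkPi",
    "--master yarn-client",
    "/usr/lib/spark/spark-examples-*",
])

data = json.dumps({
    "cmdline": cmdline,
    "language": "command_line",
    "command_type": "SparkCommand",
    "label": "sparkcluster",
})
print(data)
```

The resulting data string can be passed to pycurl.POSTFIELDS in the same way as in the earlier examples.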

Example to Submit Spark Command in SQL

You can submit a Spark command in SQL, as in the following example.

curl  -i -X POST -H "X-AUTH-TOKEN: $AUTH_TOKEN" -H "Content-Type: application/json" -H "Accept: application/json" \
-d '{
      "sql":"select * from default_qubole_memetracker limit 10;",
      "language":"sql","command_type":"SparkCommand", "label":"spark"
    }' \
"https://api.qubole.com/api/v1.2/commands"

When submitting a Spark command in SQL, you can specify the location of a SparkSQL script in the script_location parameter as shown in the following example.

curl  -i -X POST -H "X-AUTH-TOKEN: $AUTH_TOKEN" -H "Content-Type: application/json" -H "Accept: application/json" \
-d '{"script_location":"<S3 Path>", "language":"sql", "command_type":"SparkCommand", "label":"<cluster-label>"
    }' \
"https://api.qubole.com/api/v1.2/commands"

Note

The above syntax uses https://api.qubole.com as the endpoint. Qubole provides other endpoints to access QDS that are described in Supported Qubole Endpoints on Different Cloud Providers.

Example to Submit a Spark Command in SQL to a Spark Job Server App

You can submit a Spark command in SQL to an existing Spark Job Server app.

curl  -i -X POST -H "X-AUTH-TOKEN: $AUTH_TOKEN" -H "Content-Type: application/json" -H "Accept: application/json" \
-d '{
      "sql":"select * from default_qubole_memetracker limit 10;",
      "language":"sql","command_type":"SparkCommand", "label":"spark","app_id":"3"
    }' \
"https://api.qubole.com/api/v1.2/commands"

Where app_id = Spark Job Server app ID. See Understanding the Spark Job Server for more information.

Note

The above syntax uses https://api.qubole.com as the endpoint. Qubole provides other endpoints to access QDS that are described in Supported Qubole Endpoints on Different Cloud Providers.

Known Issue

The Spark Application UI might display an incorrect state of the application when Spot Instances are used. You can view the accurate status of the Qubole command in the Analyze or Notebooks page.

When the Spark application is running, if the coordinator node or the node that runs the driver is lost, then the Spark Application UI might display an incorrect state of the application. The event logs are persisted to cloud storage from the HDFS location periodically for a running application. If the coordinator node is removed due to spot loss, then the cloud storage might not have the latest status of the application. As a result, the Spark Application UI might show the application in the running state.

To avoid this issue, it is recommended to use an on-demand coordinator node.