Submit a Spark Command
- POST /api/v1.2/commands/
Use this API to submit a Spark command.
Required Role
The following users can make this API call:
- Users who belong to the system-user or system-admin group.
- Users who belong to a group associated with a role that allows submitting a command. See Managing Groups and Managing Roles for more information.
Parameters
Note
Parameters marked in bold below are mandatory. Others are optional and have default values.
Parameter | Description
---|---
program | Provide the complete Spark program in Scala, SQL, Command, R, or Python.
language | Specify the language of the program. The supported values are scala (Scala), py (Python), R (R), command_line (Command), and sql (SQL). The values are case-sensitive.
script_location | Specify an S3 path where the Spark query (Scala, Python, SQL, R, or Command Line) script is stored. AWS storage credentials stored in the account are used to retrieve the script file.
arguments | Specify the spark-submit command-line arguments here. You must not specify this parameter if cmdline is used.
user_program_arguments | Specify the arguments that the user program takes in.
cmdline | Alternatively, you can provide the spark-submit command line itself. If you use this option, you cannot use any other parameters mentioned here; all required information and configuration must be specified in the command line itself.
command_type | SparkCommand
label | Specify the cluster label on which this command is to be run.
retry | Denotes the number of retries for a job.
retry_delay | Denotes the time interval between retries when a job fails. The unit of measurement is minutes.
app_id | ID of an app, which is the main abstraction of the Spark Job Server API. An app is used to store the configuration for a Spark application. See Understanding the Spark Job Server for more information.
name | Add a name to the command that is useful while filtering commands from the command history. The name cannot contain & (ampersand), < (less than), > (greater than), " (double quotes), or ' (single quote) characters, or HTML tags. It can contain a maximum of 255 characters.
tags | Add a tag to a command so that it is easily identifiable and searchable from the commands list in the Commands History. Add a tag as a filter value while searching commands. A tag can contain a maximum of 255 characters. A comma-separated list of tags can be associated with a single command. While adding a tag value, enclose it in square brackets, for example ["tag1", "tag2"].
macros | Denotes the macros that are valid assignment statements containing the variables and their expressions.
pool | Use this parameter to specify the Fairscheduler pool name for the command to use.
timeout | The timeout for command execution, in seconds. The default value is 129600 seconds (36 hours). QDS checks the timeout for a command every 60 seconds; if the timeout is set to 80 seconds, the command is killed at the next check, that is, after 120 seconds. By setting this parameter, you can prevent the command from running for the full 36 hours.
Note
You can run Spark commands with a large script file and large inline content.
You can use macros in script files for Spark commands with the subtypes scala (Scala), py (Python), R (R), command_line (Command), and sql (SQL). You can also use macros in large inline content and large script files for scala (Scala), py (Python), R (R), and sql (SQL).
These features are not enabled for all users by default. Create a ticket with Qubole Support to enable them on the QDS account.
Note
If you are submitting Scala code that contains multiple lines, you must escape every new line with the escape character \n.
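For example, if you construct the JSON body by hand rather than with a helper such as json.dumps (which the Python examples below use and which performs this escaping automatically), a two-line Scala snippet is embedded as a single string with escaped new lines. The snippet below is only an illustration:
# Hand-built JSON body; the two Scala statements are joined with \n inside the string.
# json.dumps performs this escaping automatically when given a multi-line string.
body = '{"program": "val x = 1\\nprintln(x)", "language": "scala", "command_type": "SparkCommand"}'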
Examples
The examples are written in Python and use pycurl. Using curl directly is possible but awkward, because the program text needs escaping and JSON does not support literal new lines. To avoid confusion, these Python API examples are provided; they are clear and can be used directly.
Alternatively, you can use qds-sdk-py directly.
Example Python API Framework
import sys
import pycurl
import json
c = pycurl.Curl()
url = "https://api.qubole.com/api/v1.2/commands"
auth_token = <provide auth token here>
c.setopt(pycurl.URL, url)
c.setopt(pycurl.HTTPHEADER, ["X-AUTH-TOKEN: " + auth_token, "Content-Type: application/json", "Accept: application/json"])
c.setopt(pycurl.POST, 1)
(After this, select any of the following examples depending on the requirement.)
The above code snippet can be used to make API calls. The following examples use the above program as their base and show various use cases.
Example to Submit Spark Scala Program
prog = '''
import scala.math.random
import org.apache.spark._
/** Computes an approximation to pi */
object SparkPi {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Spark Pi")
    val spark = new SparkContext(conf)
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
    val count = spark.parallelize(1 until n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / n)
    spark.stop()
  }
}
'''
data=json.dumps({"program":prog,"language":"scala","arguments":"--class SparkPi", "command_type":"SparkCommand"})
c.setopt(pycurl.POSTFIELDS, data)
c.perform()
To submit a snippet to the Spark Job Server app, use the following data payload instead of the above data.
data=json.dumps({"program":prog,"language":"scala","arguments":"--class SparkPi", "command_type":"SparkCommand",
                 "label":"spark","app_id":"3"})
Where app_id is the Spark Job Server app ID. See Understanding the Spark Job Server for more information.
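The framework above discards the HTTP response. If you want to read the command ID returned by the API (for example, to check the command status later), a minimal sketch that collects the response body with pycurl is shown below; it assumes the response JSON contains id and status fields.
# Sketch: collect the response body instead of letting pycurl print it.
# Assumes the response JSON carries "id" and "status" fields.
import io

buf = io.BytesIO()
c.setopt(pycurl.WRITEFUNCTION, buf.write)
c.setopt(pycurl.POSTFIELDS, data)
c.perform()
response = json.loads(buf.getvalue().decode("utf-8"))
command_id = response.get("id")   # command ID to use when checking status later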
Example to Submit Spark Python Program
Here is the Spark Pi example in Python.
prog = '''
import sys
from random import random
from operator import add
from pyspark import SparkContext
if __name__ == "__main__":
    """
    Usage: pi [partitions]
    """
    sc = SparkContext(appName="PythonPi")
    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
    n = 100000 * partitions

    def f(_):
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 < 1 else 0

    count = sc.parallelize(xrange(1, n + 1), partitions).map(f).reduce(add)
    print "Pi is roughly %f" % (4.0 * count / n)
    sc.stop()
'''
data=json.dumps({"program":prog,"language":"python","command_type":"SparkCommand"})
c.setopt(pycurl.POSTFIELDS, data)
c.perform()
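After a command has been submitted, you typically poll until it finishes. The sketch below is illustrative only: it assumes the command status endpoint GET /api/v1.2/commands/<id>, the command ID captured in command_id as in the earlier response-handling sketch, and terminal states named done, error, and cancelled; verify these against the QDS API documentation for your account.
# Sketch: poll the command status until it reaches a terminal state.
# command_id, url, and auth_token come from the earlier snippets; the endpoint
# and state names are assumptions to verify against the QDS API documentation.
import io
import time

status_curl = pycurl.Curl()
status_curl.setopt(pycurl.URL, url + "/" + str(command_id))
status_curl.setopt(pycurl.HTTPHEADER, ["X-AUTH-TOKEN: " + auth_token, "Accept: application/json"])
while True:
    buf = io.BytesIO()
    status_curl.setopt(pycurl.WRITEFUNCTION, buf.write)
    status_curl.perform()
    state = json.loads(buf.getvalue().decode("utf-8")).get("status")
    if state in ("done", "error", "cancelled"):
        break
    time.sleep(10)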
Example to Add Spark Submit Options
Add arguments in JSON body to supply spark-submit options. You can pass remote files in an S3 location in
addition to the local files as values to the --py-files
argument.
data=json.dumps(
     {"program":prog,
      "language":"python",
      "arguments": "--num-executors 10 --max-executors 10 --executor-memory 5G --executor-cores 2",
      "command_type":"SparkCommand"})
Example to Add Arguments to User Program
Add user_program_arguments to the JSON body. Here is a sample program that takes in a user argument (the number of rows to generate).
prog = '''spark.range(args(0).toInt).collect.foreach(println)'''
data=json.dumps(
{"program":prog,
"language":"scala",
"user_program_arguments": "10",
"command_type":"SparkCommand",})
c.setopt(pycurl.POSTFIELDS, data)
c.perform()
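The optional parameters from the table above can be combined in the same payload. The following sketch is illustrative only; the label, name, tag, and timeout values are placeholders.
# Illustration only: label, name, tags, and timeout values are placeholders.
data=json.dumps(
     {"program":prog,
      "language":"scala",
      "user_program_arguments": "10",
      "command_type":"SparkCommand",
      "label":"spark",
      "name":"pi-estimate",
      "tags":["example","spark"],
      "timeout":3600})
c.setopt(pycurl.POSTFIELDS, data)
c.perform()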
Example to Use Command Line Parameter
For power users, Qubole provides the ability to supply the spark-submit command line directly. This is explained in detail here.
Note
It is not recommended to run a Spark application as a Bash command under the Shell command options, because automatic changes, such as increasing the Application Coordinator memory based on the driver memory and the availability of debug options, do not happen. These automatic changes occur when you run a Spark application through the Command Line option.
In this case, you must compile the program (in the case of Scala), create a JAR, upload the file to S3, and invoke the command line. Note that Qubole's deployment of Spark is available in the /usr/lib/spark directory:
/usr/lib/spark/bin/spark-submit [options] <app jar in s3 | python file> [app options]
Here is an example.
/usr/lib/spark/bin/spark-submit --class <classname> --max-executors 100 --num-executors 15 --driver-memory 10g
--executor-memory 3g --executor-cores 5 <jar_path_in-S3> <arguments>
Here is a REST API example to submit a Spark command in the command-line language.
curl -i -X POST -H "X-AUTH-TOKEN: $AUTH_TOKEN" -H "Content-Type: application/json" -H "Accept: application/json" \
-d '{"cmdline":"/usr/lib/spark/bin/spark-submit --class org.apache.spark.examples.SparkPi
--master yarn-client /usr/lib/spark/spark-examples-*", "language":"command_line", "command_type":"SparkCommand",
"label":"sparkcluster"}' \
"https://api.qubole.com/api/v1.2/commands"
Note
The above syntax uses https://api.qubole.com as the endpoint. Qubole provides other endpoints to access QDS that are described in Supported Qubole Endpoints on Different Cloud Providers.
Example to Submit Spark Command in SQL
You can submit a Spark command in SQL, as shown in the following example.
curl -i -X POST -H "X-AUTH-TOKEN: $AUTH_TOKEN" -H "Content-Type: application/json" -H "Accept: application/json" \
-d '{
"sql":"select * from default_qubole_memetracker limit 10;",
"language":"sql","command_type":"SparkCommand", "label":"spark"
}' \
"https://api.qubole.com/api/v1.2/commands"
When submitting a Spark command in SQL, you can specify the location of a SparkSQL script in the script_location
parameter as shown in the following example.
curl -i -X POST -H "X-AUTH-TOKEN: $AUTH_TOKEN" -H "Content-Type: application/json" -H "Accept: application/json" \
-d '{"script_location":"<S3 Path>", "language":"sql", "command_type":"SparkCommand", "label":"<cluster-label>"
}' \
"https://api.qubole.com/api/v1.2/commands"
Note
The above syntax uses https://api.qubole.com as the endpoint. Qubole provides other endpoints to access QDS that are described in Supported Qubole Endpoints on Different Cloud Providers.
Example to Submit a Spark Command in SQL to a Spark Job Server App
You can submit a Spark command in SQL to an existing Spark Job Server app.
curl -i -X POST -H "X-AUTH-TOKEN: $AUTH_TOKEN" -H "Content-Type: application/json" -H "Accept: application/json" \
-d '{
"sql":"select * from default_qubole_memetracker limit 10;",
"language":"sql","command_type":"SparkCommand", "label":"spark","app_id":"3"
}' \
"https://api.qubole.com/api/v1.2/commands"
Where app_id is the Spark Job Server app ID. See Understanding the Spark Job Server for more information.
Note
The above syntax uses https://api.qubole.com as the endpoint. Qubole provides other endpoints to access QDS that are described in Supported Qubole Endpoints on Different Cloud Providers.
Known Issue
The Spark Application UI might display an incorrect state of the application when Spot Instances are used. You can view the accurate status of the Qubole command on the Analyze or Notebooks page.
When the Spark application is running, if the coordinator node or the node that runs the driver is lost, the Spark Application UI might display an incorrect state of the application. For a running application, the event logs are periodically persisted from the HDFS location to cloud storage. If the coordinator node is removed due to Spot loss, the cloud storage might not have the latest status of the application. As a result, the Spark Application UI might show the application as still running.
To avoid this issue, it is recommended to use an on-demand coordinator node.