Qubole Operator Examples

For Qubole Operator API information, see Understanding the Qubole Operator API.

For a real-world ETL use case built with the Qubole Operator, see the Readme.

The following examples illustrate the use of the Qubole Operator.

# Importing Qubole Operator in DAG
from airflow.contrib.operators.qubole_operator import QuboleOperator

# Hive Command - Inline query. Bonus: attaching command tags and a Qubole connection ID.
QuboleOperator(
    task_id='hive_inline',
    command_type='hivecmd',
    query='show tables',
    cluster_label='default',
    # Tags to attach to the Qubole command (a single string or a list, see the
    # next example); dag_id, task_id and run_id are also attached automatically.
    tags='airflow_example_run',
    # Connection ID used to submit commands to QDS; 'qubole_default' is used if not set.
    qubole_conn_id='qubole_default',
    dag=dag)

# Hive Command - S3 script location. Bonus: Qubole macros and email notifications.
QuboleOperator(
    task_id='hive_s3_location',
    command_type="hivecmd",
    script_location="s3://public-qubole/qbol-library/scripts/show_table.hql",
    # Send an email when the command completes (success or failure), using the
    # notification settings configured in the Qubole account.
    notify=True,
    # Qubole macros applied to the S3 script. Escape the macro values: each
    # value is itself a double-quoted string inside the JSON list.
    macros='[{"date": "\\"{{ ds }}\\""}, {"name" : "\\"abc\\""}]',
    tags=['tag1', 'tag2'],
    dag=dag)

# Hive Command - Add Hive resources (S3 variant)
QuboleOperator(
    task_id='hive_add_jar',
    command_type='hivecmd',
    query='ADD JAR s3://paid-qubole/jars/json-serde/json-serde-1.1.7.jar',
    cluster_label='default',
    dag=dag)
# For Azure: an alternative to the S3 variant above. It reuses the task_id
# 'hive_add_jar', so add only one of the two to a DAG.
QuboleOperator(
    task_id='hive_add_jar',
    command_type='hivecmd',
    query='ADD JAR wasb://default-datasets@paidqubole.blob.core.windows.net/jars/json-serde/json-serde-1.1.7.jar',
    cluster_label='default',
    dag=dag)

# Shell Command - S3 script location with arguments
QuboleOperator(
    task_id='shell_cmd',
    command_type="shellcmd",
    script_location="s3://public-qubole/qbol-library/scripts/shellx.sh",
    parameters="param1 param2",
    dag=dag)

# Shell Command - Inline script with files to copy into the working directory.
# Uses its own task_id so it can live in the same DAG as the example above.
QuboleOperator(
    task_id='shell_inline_cmd',
    command_type="shellcmd",
    script="hadoop dfs -lsr s3://paid-qubole/",
    # Comma-separated list of files to copy into the working directory.
    files=("s3://paid-qubole/ShellDemo/data/excite-small.sh,"
           "s3://paid-qubole/ShellDemo/data/excite-big.sh"),
    dag=dag)
# For Azure: an alternative to the S3 inline example directly above. It uses
# the same task_id, so add only one of the two to a DAG.
QuboleOperator(
    task_id='shell_inline_cmd',
    command_type="shellcmd",
    script="hadoop dfs -lsr wasb://default-datasets@paidqubole.blob.core.windows.net",
    files=("wasb://default-datasets@paidqubole.blob.core.windows.net/ShellDemo/data/excite-small.sh,"
           "wasb://default-datasets@paidqubole.blob.core.windows.net/ShellDemo/data/excite-big.sh"),
    dag=dag)

# Pig Command - S3 script location with script parameters.
QuboleOperator(
    dag=dag,
    task_id='pig_cmd',
    command_type='pigcmd',
    script_location='s3://public-qubole/qbol-library/scripts/script1-hadoop-s3-small.pig',
    # One string of key=value pairs, separated by spaces (not commas).
    parameters='key1=value1 key2=value2',
)

# Presto Command - inline SQL query.
QuboleOperator(
    dag=dag,
    task_id='presto_cmd',
    command_type='prestocmd',
    query=(
        'select * from default.default_qubole_airline_origin_destination '
        'limit 5'
    ),
)

# Hadoop Command - run a custom jar, with the jar path and its arguments given
# inline as one sub_command string (split below only for readability).
QuboleOperator(
    dag=dag,
    task_id='hadoop_jar_cmd',
    command_type='hadoopcmd',
    cluster_label='default',
    sub_command=(
        'jar s3://paid-qubole/HadoopAPIExamples/jars/hadoop-0.20.1-dev-streaming.jar'
        ' -mapper wc -numReduceTasks 0'
        ' -input s3://paid-qubole/HadoopAPITests/data/3.tsv'
        ' -output s3://paid-qubole/HadoopAPITests/data/3_wc'
    ),
)
# For Azure: an alternative to the S3 example above. It reuses the task_id
# 'hadoop_jar_cmd', so add only one of the two to a DAG.
QuboleOperator(
    dag=dag,
    task_id='hadoop_jar_cmd',
    command_type='hadoopcmd',
    cluster_label='default',
    sub_command=(
        'jar wasb://default-datasets@paidqubole.blob.core.windows.net/HadoopAPIExamples/jars/hadoop-0.20.1-dev-streaming.jar'
        ' -mapper wc -numReduceTasks 0'
        ' -input wasb://default-datasets@paidqubole.blob.core.windows.net/HadoopAPITests/data/3.tsv'
        ' -output wasb://default-datasets@paidqubole.blob.core.windows.net/HadoopAPITests/data/3_wc'
    ),
)


# DbTap Query - run a query against the data store registered in Qubole as DbTap 2064.
# Note the keyword spelling: this command takes `db_tap_id`, while the db
# export/import examples take `dbtap_id`.
QuboleOperator(
    dag=dag,
    task_id='db_query',
    command_type='dbtapquerycmd',
    db_tap_id='2064',
    query='show tables',
)

# Db Export Command in Mode 1 - Simple Mode
QuboleOperator(
    task_id='db_export',
    command_type='dbexportcmd',
    mode=1,
    hive_table='default_qubole_airline_origin_destination',
    db_table='exported_airline_origin_destination',
    partition_spec='dt=20110104-02',
    dbtap_id="2064",
    dag=dag)

# Db Export Command in Mode 2 - Advanced Mode (S3 variant)
QuboleOperator(
    task_id='db_export_mode_2',
    command_type='dbexportcmd',
    mode=2,
    db_table='mydb.mydata',
    dbtap_id="10942",
    export_dir="s3://mybucket/mydata.csv",
    # Field delimiter given as an escaped hex code (0x9 is the ASCII tab).
    fields_terminated_by="\\0x9",
    dag=dag)
# For Azure: an alternative to the S3 variant above. It reuses the task_id
# 'db_export_mode_2', so add only one of the two to a DAG. Replace the
# <container_name> and <storage_account_name> placeholders before use.
QuboleOperator(
    task_id='db_export_mode_2',
    command_type='dbexportcmd',
    mode=2,
    db_table='mydb.mydata',
    dbtap_id="10942",
    export_dir="wasb://<container_name>@<storage_account_name>.blob.core.windows.net/mydata.csv",
    fields_terminated_by="\\0x9",
    dag=dag)



# Db Import Command in Mode 1 - Simple Mode.
# Reads rows of `db_table` (from DbTap 2064) that match `where_clause` and
# imports them into `hive_table`, using 2 parallel workers.
QuboleOperator(
    dag=dag,
    task_id='db_import',
    command_type='dbimportcmd',
    dbtap_id='2064',
    mode=1,
    db_table='exported_airline_origin_destination',
    where_clause='id < 10',
    hive_table='default_qubole_airline_origin_destination',
    parallelism=2,
)

# Scala source for the Spark example below: the classic SparkPi program, which
# approximates pi by random sampling. It is submitted inline via `program`.
prog = '''
import scala.math.random

import org.apache.spark._

/** Computes an approximation to pi */
object SparkPi {
def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Spark Pi")
    val spark = new SparkContext(conf)
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
    val count = spark.parallelize(1 until n, slices).map { i =>
    val x = random * 2 - 1
    val y = random * 2 - 1
    if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / n)
    spark.stop()
    }
}
'''
# Spark Command - Scala Program
QuboleOperator(
    task_id='spark_cmd',
    command_type="sparkcmd",
    program=prog,
    language='scala',
    arguments='--class SparkPi',  # entry-point object defined in `prog`
    dag=dag)

# Spark Command - Run a Notebook.
# Uses its own task_id so it can live in the same DAG as the Scala example above.
QuboleOperator(
    task_id='spark_notebook_cmd',
    command_type="sparkcmd",
    note_id="36995",
    qubole_conn_id='qubole_prod',
    # Notebook arguments passed as a JSON string.
    arguments='{"name":"hello world"}',
    dag=dag)


# Db Import Command in Mode 2 - Advanced Mode
QuboleOperator(
    task_id="db_import_mode_2",
    command_type="dbimportcmd",
    mode=2,
    # Keep `$CONDITIONS` verbatim; the import tool fills it in for each parallel
    # split (presumably Sqoop-style free-form query import -- confirm in Qubole docs).
    extract_query="select id, dt from mydb.mydata where $CONDITIONS and id < 10",
    boundary_query="select min(id), max(id) from mydata",
    split_column="id",
    dbtap_id="9531",
    hive_table="myhivedb.mydata",
    parallelism=1,
    # db_table is not needed in mode 2, but due to a bug it cannot be omitted,
    # so pass an empty string.
    db_table="",
    dag=dag)