Qubole Operator Examples

For Qubole Operator API information, see Qubole Operator API.

For an example of a real ETL use case that uses the Qubole Operator, see Readme.

The following examples illustrate the use of the Qubole Operator.

# Importing Qubole Operator in DAG
from airflow.contrib.operators.qubole_operator import QuboleOperator

# Hive Command - inline query.
# Bonus: attaching command tags and choosing the Qubole connection ID.
QuboleOperator(
    task_id='hive_inline',
    command_type='hivecmd',
    query='show tables',
    cluster_label='default',
    # Tags attached to the Qubole command. Three tags (dag_id, task_id and
    # run_id) are attached automatically in addition to these.
    tags='airflow_example_run',
    # Connection ID used to submit commands inside QDS; if it is not set,
    # **qubole_default** is used.
    qubole_conn_id='qubole_default',
    dag=dag,
)


# Hive Command - script stored in S3.
# Bonus: Qubole macros and email notifications.
QuboleOperator(
    task_id="hive_s3_location",
    command_type="hivecmd",
    script_location="s3://public-qubole/qbol-library/scripts/show_table.hql",
    # Sends an email when the command completes (success or failure),
    # following the notification settings of the Qubole account.
    notify=True,
    # Qubole macros applied to the S3 script. The macro values are escaped.
    macros='[{"date": "\\"\\""}, {"name" : "\\"abc\\""}]',
    tags=["tag1", "tag2"],
    dag=dag,
)


# Hive Command - adding a Hive resource (a JAR stored in S3).
QuboleOperator(
    task_id="hive_add_jar",
    command_type="hivecmd",
    query="ADD JAR s3://paid-qubole/jars/json-serde/json-serde-1.1.7.jar",
    cluster_label="default",
    dag=dag,
)
# For Azure: the same command with the JAR stored in Azure Blob storage.
# Replace <container_name> and <storage_account_name> with your own values.
QuboleOperator(
    task_id='hive_add_jar',
    command_type='hivecmd',
    query='ADD JAR wasb://<container_name>@<storage_account_name>.blob.core.windows.net/jars/json-serde/json-serde-1.1.7.jar',
    cluster_label='default',
    dag=dag,
)

# Jupyter Notebook Command
QuboleOperator(
    task_id='jupyter_cmd',
    command_type="jupytercmd",
    cluster_label='default',
    # Placeholder: replace with the notebook path on QDS. To get the path,
    # right-click the notebook in Jupyter and click Copy Path.
    path='<path/to/jupyternotebook/on/qds>',
    arguments='{"name":"hello world"}',
    dag=dag,
)

# Shell Command - script stored in S3, run with arguments.
QuboleOperator(
    task_id='shell_cmd',
    command_type='shellcmd',
    script_location='s3://public-qubole/qbol-library/scripts/shellx.sh',
    parameters='param1 param2',
    dag=dag,
)

# Shell Command - inline script, with files to copy into the working directory.
QuboleOperator(
    task_id='shell_cmd',
    command_type='shellcmd',
    script='hadoop dfs -lsr s3://paid-qubole/',
    # Comma-separated list of files.
    files=(
        's3://paid-qubole/ShellDemo/data/excite-small.sh,'
        's3://paid-qubole/ShellDemo/data/excite-big.sh'
    ),
    dag=dag,
)
# For Azure: the same inline Shell command against Azure Blob storage.
# Replace <container_name> and <storage_account_name> with your own values.
QuboleOperator(
    task_id='shell_cmd',
    command_type="shellcmd",
    script="hadoop dfs -lsr wasb://<container_name>@<storage_account_name>.blob.core.windows.net/",
    # Comma-separated list of files to copy into the working directory.
    files=(
        "wasb://<container_name>@<storage_account_name>.blob.core.windows.net/ShellDemo/data/excite-small.sh,"
        "wasb://<container_name>@<storage_account_name>.blob.core.windows.net/ShellDemo/data/excite-big.sh"
    ),
    dag=dag,
)

# Pig Command - script stored in S3, run with arguments.
QuboleOperator(
    task_id='pig_cmd',
    command_type='pigcmd',
    script_location='s3://public-qubole/qbol-library/scripts/script1-hadoop-s3-small.pig',
    # Note: the key=value pairs are separated by spaces.
    parameters='key1=value1 key2=value2',
    dag=dag,
)

# Presto Command - inline query.
QuboleOperator(
    task_id="presto_cmd",
    command_type="prestocmd",
    query="select * from default.default_qubole_airline_origin_destination limit 5",
    dag=dag,
)

# Hadoop Command - inline custom JAR command.
QuboleOperator(
    task_id="hadoop_jar_cmd",
    command_type="hadoopcmd",
    # One sub-command string, split across adjacent literals for readability.
    sub_command=(
        "jar s3://paid-qubole/HadoopAPIExamples/jars/hadoop-0.20.1-dev-streaming.jar "
        "-mapper wc -numReduceTasks 0 "
        "-input s3://paid-qubole/HadoopAPITests/data/3.tsv "
        "-output s3://paid-qubole/HadoopAPITests/data/3_wc"
    ),
    cluster_label="default",
    dag=dag,
)
# For Azure: the same custom JAR command against Azure Blob storage.
# Replace <container_name> and <storage_account_name> with your own values.
QuboleOperator(
    task_id='hadoop_jar_cmd',
    command_type='hadoopcmd',
    # One sub-command string, split across adjacent literals for readability.
    sub_command=(
        'jar wasb://<container_name>@<storage_account_name>.blob.core.windows.net/HadoopAPIExamples/jars/hadoop-0.20.1-dev-streaming.jar '
        '-mapper wc -numReduceTasks 0 '
        '-input wasb://<container_name>@<storage_account_name>.blob.core.windows.net/HadoopAPITests/data/3.tsv '
        '-output wasb://<container_name>@<storage_account_name>.blob.core.windows.net/HadoopAPITests/data/3_wc'
    ),
    cluster_label='default',
    dag=dag,
)


# DbTap Query - runs a query against the data store identified by db_tap_id.
QuboleOperator(
    task_id="db_query",
    command_type="dbtapquerycmd",
    query="show tables",
    db_tap_id='2064',
    dag=dag,
)

# Db Export Command, Mode 1 (Simple Mode) - exports a Hive table partition
# to a database table.
QuboleOperator(
    task_id="db_export",
    command_type="dbexportcmd",
    mode=1,
    hive_table="default_qubole_airline_origin_destination",
    db_table="exported_airline_origin_destination",
    partition_spec="dt=20110104-02",
    dbtap_id='2064',
    use_customer_cluster='true',
    customer_cluster_label='default',
    dag=dag,
)

# Db Export Command, Mode 2 (Advanced Mode) - exports from a directory
# (export_dir) instead of a Hive table.
QuboleOperator(
    task_id="db_export_mode_2",
    command_type="dbexportcmd",
    mode=2,
    db_table="mydb.mydata",
    dbtap_id='10942',
    export_dir='s3://mybucket/mydata.csv',
    fields_terminated_by="\\0x9",
    use_customer_cluster='true',
    customer_cluster_label='default',
    dag=dag,
)

# For Azure: Db Export in Mode 2 with the export directory in Azure Blob storage.
# Replace <container_name> and <storage_account_name> with your own values.
QuboleOperator(
    task_id='db_export_mode_2',
    command_type='dbexportcmd',
    mode=2,
    db_table='mydb.mydata',
    dbtap_id="10942",
    export_dir="wasb://<container_name>@<storage_account_name>/mydata.csv",
    fields_terminated_by="\\0x9",
    use_customer_cluster="true",
    customer_cluster_label="default",
    dag=dag,
)



# Db Import Command, Mode 1 (Simple Mode) - imports rows of a database table
# that match where_clause into a Hive table.
QuboleOperator(
    task_id="db_import",
    command_type="dbimportcmd",
    mode=1,
    hive_table="default_qubole_airline_origin_destination",
    db_table="exported_airline_origin_destination",
    where_clause="id < 10",
    parallelism=2,
    dbtap_id='2064',
    use_customer_cluster='true',
    customer_cluster_label='default',
    dag=dag,
)

# Scala source for the Spark command below. It is kept in a string and handed
# to QuboleOperator through the `program` argument.
prog = '''
import scala.math.random

import org.apache.spark._

/** Computes an approximation to pi */
object SparkPi {
def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Spark Pi")
    val spark = new SparkContext(conf)
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
    val count = spark.parallelize(1 until n, slices).map { i =>
    val x = random * 2 - 1
    val y = random * 2 - 1
    if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / n)
    spark.stop()
    }
}
'''
# Spark Command - Scala Program
# The inline program above is submitted with language='scala'; the arguments
# string names the SparkPi object defined in that program.
QuboleOperator(
    task_id='spark_cmd',
    command_type="sparkcmd",
    program=prog,
    language='scala',
    arguments='--class SparkPi',
    dag=dag)

# Spark Command - running a notebook, identified by its note_id.
QuboleOperator(
    task_id="spark_cmd",
    command_type="sparkcmd",
    note_id='36995',
    qubole_conn_id="qubole_prod",
    arguments='{"name":"hello world"}',
    dag=dag,
)


# Db Import Command in Mode 2 - Advanced Mode
QuboleOperator(
    task_id="db_import_mode_2",
    command_type="dbimportcmd",
    mode="2",
    extract_query="select id, dt from mydb.mydata where $CONDITIONS and id < 10",
    boundary_query="select min(id), max(id) from mydata",
    split_column="id",
    dbtap_id="9531",
    hive_table="myhivedb.mydata",
    parallelism="1",
    # db_table is not functionally needed in mode 2, but because of a bug it
    # cannot be omitted, so it is set to an empty string.
    db_table="",
    use_customer_cluster="true",
    customer_cluster_label="default",
    dag=dag,
)

How to Pass the Results of One QuboleOperator As A Parameter to Another Using get_results And xcom_pull

The following example shows how to pass the results of one QuboleOperator as a parameter to another by using get_results and xcom_pull. In this example, a QuboleOperator runs a Shell command that prints a file stored on another cluster. The result of the Shell command is then pushed to XCom by the push_command task. In the next step, a Hive command is run; it uses xcom_pull to fetch the result and uses it in the query.

from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.qubole_operator import QuboleOperator



# start_date below is two days before the moment this module is evaluated.
tod = datetime.now()
d = timedelta(days=2)

# Default arguments handed to the DAG defined from them.
# NOTE(review): 'schedule_interval' is normally a DAG-level argument; confirm
# that it has the intended effect when supplied through default_args.
default_args = dict(
    owner='qubole',
    depends_on_past=False,
    start_date=tod - d,
    retries=0,
    schedule_interval='@once',
)

def push_command(**kwargs):
    """Push the result of ``qubole_shell_command`` to XCom.

    Reads the file located by ``qubole_shell_command.get_results(ti)`` and
    stores its contents under the XCom key ``'qubole_shell_command'``.

    :param kwargs: task context; ``kwargs['ti']`` is the task instance that is
        passed to ``get_results`` and used for ``xcom_push``.
    """
    ti = kwargs['ti']
    # Use a context manager so the results file is closed as soon as it is read.
    with open(qubole_shell_command.get_results(ti), 'r') as results_file:
        qubole_operator_result = results_file.read()
    ti.xcom_push(key='qubole_shell_command', value=qubole_operator_result)


def print_command(**kwargs):
    """Print the result of ``qubole_hive_command``.

    Reads the file located by ``qubole_hive_command.get_results(ti)`` and
    prints its contents.

    :param kwargs: task context; ``kwargs['ti']`` is the task instance that is
        passed to ``get_results``.
    """
    ti = kwargs['ti']
    # Use a context manager so the results file is closed as soon as it is read.
    with open(qubole_hive_command.get_results(ti), 'r') as results_file:
        qubole_operator_result = results_file.read()
    print(qubole_operator_result)

# schedule_interval is passed to the DAG itself: Airflow takes the schedule
# from the DAG, so a 'schedule_interval' key inside default_args does not set it.
with DAG(
    dag_id="xcom_demo_dag",
    default_args=default_args,
    schedule_interval='@once',
    catchup=False,
) as dag:

    # Step 1: Shell command that prints the file whose content is passed on.
    qubole_shell_command = QuboleOperator(
        task_id='qubole_shell_command',
        command_type='shellcmd',
        cluster_label='default',
        script='cat /usr/lib/temp/xcom_demo',
        fetch_logs=True,
        dag=dag,
    )

    # Step 2: push the Shell command's result to XCom.
    # The callable is looked up before the name is rebound to the operator.
    push_command = PythonOperator(
        task_id='push_command',
        python_callable=push_command,
        provide_context=True,
        dag=dag,
    )

    # Step 4: print the result of the Hive command.
    print_command = PythonOperator(
        task_id='print_command',
        python_callable=print_command,
        provide_context=True,
        dag=dag,
    )

    # Step 3: Hive query whose template pulls the pushed value from XCom.
    qubole_hive_command = QuboleOperator(
        task_id='qubole_hive_command',
        command_type='hivecmd',
        cluster_label='default',
        query="SELECT * FROM default.salesdata WHERE shop_id = {{ task_instance.xcom_pull(key='qubole_shell_command') }}",
        dag=dag,
    )

# Execution order: shell command -> push to XCom -> Hive query -> print result.
qubole_shell_command >> push_command >> qubole_hive_command >> print_command