Qubole Operator Examples
For Qubole Operator API information, see Qubole Operator API.
For a real ETL use case that uses the Qubole Operator, see the Readme.
The following examples illustrate the use of the Qubole Operator.
# Importing Qubole Operator in DAG
from airflow.contrib.operators.qubole_operator import QuboleOperator
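Note that each snippet passes dag=dag, so a DAG object is assumed to already exist. As a minimal sketch (the dag_id, start date, and schedule below are illustrative assumptions, not part of the original examples), such a DAG could be defined as:
from airflow import DAG
from datetime import datetime

# Hypothetical DAG that the example tasks below attach to via dag=dag; adjust to your environment.
dag = DAG(
    dag_id='qubole_operator_examples',  # illustrative name
    start_date=datetime(2020, 1, 1),    # illustrative start date
    schedule_interval=None)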
# Hive Command - Inline query, Bonus - Attaching command tags & qubole connection id
QuboleOperator(
task_id='hive_inline',
command_type='hivecmd',
query='show tables',
cluster_label='default',
tags='airflow_example_run', # Attach tags to the Qubole command; dag_id, task_id, and run_id are attached automatically
qubole_conn_id='qubole_default', # Connection ID used to submit commands to QDS; qubole_default is used if not set
dag=dag)
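The qubole_conn_id above refers to an Airflow connection that stores the Qubole credentials; the Qubole hook reads the API token from the connection's password field and the API endpoint from its host field. A hedged sketch of registering the connection programmatically (the endpoint and token are placeholders; the Airflow UI or CLI can be used instead):
from airflow import settings
from airflow.models import Connection

# Illustrative registration of the 'qubole_default' connection.
session = settings.Session()
session.add(Connection(
    conn_id='qubole_default',
    conn_type='qubole',
    host='https://api.qubole.com/api',  # Qubole API endpoint (placeholder)
    password='<qubole_api_token>'))     # Qubole API token (placeholder)
session.commit()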
# Hive Command - S3 Script location, Bonus - Qubole Macros, Email Notifications
QuboleOperator(
task_id='hive_s3_location',
command_type="hivecmd",
script_location="s3://public-qubole/qbol-library/scripts/show_table.hql",
notify=True, # Sends an email on command completion (success or failure), according to the notification settings in the Qubole account.
# Escape the macro values.
macros='[{"date": "\\"\\""}, {"name": "\\"abc\\""}]', # Applies Qubole macros to the S3 script.
tags=['tag1', 'tag2'],
dag=dag)
# Hive Command - Add Hive Resources
QuboleOperator(
task_id='hive_add_jar',
command_type='hivecmd',
query='ADD JAR s3://paid-qubole/jars/json-serde/json-serde-1.1.7.jar',
cluster_label='default',
dag=dag)
# For Azure:
QuboleOperator(
task_id='hive_add_jar',
command_type='hivecmd',
query='ADD JAR wasb://[email protected]/jars/json-serde/json-serde-1.1.7.jar',
cluster_label='default',
dag=dag)
# Jupyter Notebook Command
QuboleOperator(
task_id='jupyter_cmd',
command_type="jupytercmd",
cluster_label='default',
path=<path/to/jupyternotebook/on/qds>, # Right-click the notebook in Jupyter and select Copy Path to get this value
arguments='{"name":"hello world"}',
dag=dag)
# Shell Command - S3 Script Location with arguments
QuboleOperator(
task_id='shell_cmd',
command_type="shellcmd",
script_location="s3://public-qubole/qbol-library/scripts/shellx.sh",
parameters="param1 param2",
dag=dag)
# Shell Command - Inline query with files to copy in working directory
QuboleOperator(
task_id='shell_cmd',
command_type="shellcmd",
script="hadoop dfs -lsr s3://paid-qubole/",
files="s3://paid-qubole/ShellDemo/data/excite-small.sh,s3://paid-qubole/ShellDemo/data/excite-big.sh",
dag=dag)
# For Azure:
QuboleOperator(
task_id='shell_cmd',
command_type="shellcmd",
script="hadoop dfs -lsr wasb://[email protected]",
files="wasb://[email protected]/ShellDemo/data/excite-small.sh,wasb://[email protected]/ShellDemo/data/excite-big.sh",
# Pig Command with s3 script location and arguments
QuboleOperator(
task_id='pig_cmd',
command_type="pigcmd",
script_location="s3://public-qubole/qbol-library/scripts/script1-hadoop-s3-small.pig",
parameters="key1=value1 key2=value2", # Note these are space separated
dag=dag)
# Presto Command - Inline
QuboleOperator(
task_id='presto_cmd',
command_type='prestocmd',
query='select * from default.default_qubole_airline_origin_destination limit 5',
dag=dag)
# Hadoop Command - Inline Custom Jar command
QuboleOperator(
task_id='hadoop_jar_cmd',
command_type='hadoopcmd',
sub_command='jar s3://paid-qubole/HadoopAPIExamples/jars/hadoop-0.20.1-dev-streaming.jar -mapper wc -numReduceTasks 0 -input s3://paid-qubole/HadoopAPITests/data/3.tsv -output s3://paid-qubole/HadoopAPITests/data/3_wc',
cluster_label='default',
dag=dag)
# For Azure:
QuboleOperator(
task_id='hadoop_jar_cmd',
command_type='hadoopcmd',
sub_command='jar wasb://[email protected]/HadoopAPIExamples/jars/hadoop-0.20.1-dev-streaming.jar -mapper wc -numReduceTasks 0 -input wasb://[email protected]/HadoopAPITests/data/3.tsv -output wasb://[email protected]/HadoopAPITests/data/3_wc',
cluster_label='default',
dag=dag)
# DbTap Query
QuboleOperator(
task_id='db_query',
command_type='dbtapquerycmd',
query='show tables',
db_tap_id="2064",
dag=dag)
# Db Export Command in Mode 1 - Simple Mode
QuboleOperator(
task_id='db_export',
command_type='dbexportcmd',
mode=1,
hive_table='default_qubole_airline_origin_destination',
db_table='exported_airline_origin_destination',
partition_spec='dt=20110104-02',
dbtap_id="2064",
use_customer_cluster="true",
customer_cluster_label="default",
dag=dag)
# Db Export Command in Mode 2 - Advanced Mode
QuboleOperator(
task_id='db_export_mode_2',
command_type='dbexportcmd',
mode=2,
db_table='mydb.mydata',
dbtap_id="10942",
export_dir="s3://mybucket/mydata.csv",
fields_terminated_by="\\0x9",
use_customer_cluster="true",
customer_cluster_label="default",
dag=dag)
# For Azure:
QuboleOperator(
task_id='db_export_mode_2',
command_type='dbexportcmd',
mode=2,
db_table='mydb.mydata',
dbtap_id="10942",
export_dir="wasb://<container_name>@<storage_account_name>/mydata.csv",
fields_terminated_by="\\0x9",
use_customer_cluster="true",
customer_cluster_label="default",
dag=dag)
# Db Import Command in Mode 1 - Simple Mode
QuboleOperator(
task_id='db_import',
command_type='dbimportcmd',
mode=1,
hive_table='default_qubole_airline_origin_destination',
db_table='exported_airline_origin_destination',
where_clause='id < 10',
parallelism=2,
dbtap_id="2064",
use_customer_cluster="true",
customer_cluster_label="default",
dag=dag)
prog = '''
import scala.math.random
import org.apache.spark._

/** Computes an approximation to pi */
object SparkPi {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Spark Pi")
    val spark = new SparkContext(conf)
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
    val count = spark.parallelize(1 until n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / n)
    spark.stop()
  }
}
'''
# Spark Command - Scala Program
QuboleOperator(
task_id='spark_cmd',
command_type="sparkcmd",
program=prog,
language='scala',
arguments='--class SparkPi',
dag=dag)
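The sparkcmd command type also accepts inline programs in other languages through the language parameter. A minimal sketch of an inline PySpark program (the program body and task_id are illustrative; this example is not part of the original set):
# Spark Command - Inline PySpark program (illustrative)
prog_py = '''
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("RowCount").getOrCreate()
print(spark.range(100).count())  # count rows of a small synthetic range
spark.stop()
'''
QuboleOperator(
    task_id='spark_pyspark_cmd',
    command_type='sparkcmd',
    program=prog_py,
    language='python',
    cluster_label='default',
    dag=dag)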
# Spark Command - Run a Notebook
QuboleOperator(
task_id='spark_cmd',
command_type="sparkcmd",
note_id="36995",
qubole_conn_id='qubole_prod',
arguments='{"name":"hello world"}',
dag=dag)
# Db Import Command in Mode 2 - Advanced Mode
QuboleOperator(
task_id = "db_import_mode_2" ,
command_type = "dbimportcmd" ,
mode = "2" ,
extract_query = "select id, dt from mydb.mydata where $CONDITIONS and id < 10”,
boundary_query="select min(id), max(id) from mydata",
split_column=”id”,
dbtap_id = "9531" ,
hive_table = "myhivedb.mydata" ,
parallelism = "1",
db_table = "" , # Please note functionally this parameter is not needed in mode 2, but due to some bug this cannot be ignored, so you can set it as empty string
use_customer_cluster="true",
customer_cluster_label="default",
dag=dag)
How to Pass the Results of One QuboleOperator as a Parameter to Another Using get_results and xcom_pull
Here is an example that shows how to pass the results of one QuboleOperator as a parameter to another using get_results and xcom_pull.
In the following example, a QuboleOperator runs a Shell command that prints a file stored on another cluster. The result of this Shell command is then pushed to XCom by a Python task. Next, a Hive command is submitted; it uses xcom_pull to fetch the result and run its query.
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.qubole_operator import QuboleOperator
tod = datetime.now()
d = timedelta(days=2)
default_args = {
    'owner': 'qubole',
    'depends_on_past': False,
    'start_date': tod - d,
    'retries': 0
}
def push_command(**kwargs):
    ti = kwargs['ti']
    # get_results downloads the command's results to a local file and returns the file path.
    qubole_operator_result = open(qubole_shell_command.get_results(ti), 'r').read()
    ti.xcom_push(key='qubole_shell_command', value=qubole_operator_result)

def print_command(**kwargs):
    ti = kwargs['ti']
    qubole_operator_result = open(qubole_hive_command.get_results(ti), 'r').read()
    print(qubole_operator_result)
with DAG(dag_id="xcom_demo_dag", default_args=default_args, schedule_interval='@once', catchup=False) as dag:
    qubole_shell_command = QuboleOperator(
        task_id='qubole_shell_command',
        command_type='shellcmd',
        cluster_label='default',
        script='cat /usr/lib/temp/xcom_demo',
        fetch_logs=True,
        dag=dag)
    push_command = PythonOperator(
        task_id='push_command',
        python_callable=push_command,
        provide_context=True,
        dag=dag)
    print_command = PythonOperator(
        task_id='print_command',
        python_callable=print_command,
        provide_context=True,
        dag=dag)
    qubole_hive_command = QuboleOperator(
        task_id='qubole_hive_command',
        command_type='hivecmd',
        cluster_label='default',
        query="SELECT * FROM default.salesdata WHERE shop_id = {{ task_instance.xcom_pull(key='qubole_shell_command') }}",
        dag=dag)
    qubole_shell_command >> push_command >> qubole_hive_command >> print_command
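The value pushed by push_command is also available to any downstream task under the same XCom key. For illustration, a minimal sketch of reading it from another Python callable (the function name log_pushed_value is an addition, not part of the original example):
def log_pushed_value(**kwargs):
    ti = kwargs['ti']
    # Pull the value that push_command pushed under the 'qubole_shell_command' key.
    value = ti.xcom_pull(key='qubole_shell_command', task_ids='push_command')
    print(value)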