Configuring an Airflow Cluster

Configure an Airflow cluster as described under Configuring the Cluster.

This page also provides information on the following topics:

Configuring the Cluster

Navigate to the Clusters page. Click New to add a new cluster. Select Airflow as the cluster type. See Managing Clusters for detailed instructions on configuring a QDS cluster. For Airflow, note the following:

  • Airflow Version: The default version is 1.8.2. AWS also supports version 1.7.0. Version 1.8.2 is compatible with MySQL 5.6 or higher.

  • Python Version: Qubole supports Python version 2.7 and 3.5 on Airflow clusters. Python version 3.5 is supported with Airflow version 1.8.2 or later. The default Python version is 2.7. However, this field is not visible to you unless you create a ticket with Qubole Support and get this field enabled on the QDS account.

    When you create an Airflow cluster with Python version 3.5, it gets automatically attached to a Package Management environment.

  • Data Store: Select the data store from the drop-down list. Currently, the MySQL and Amazon Aurora-MySQL data stores are supported on Airflow clusters.

  • Fernet Key: Encryption key (32 url-safe base64 encoded bytes) for sensitive information inside the Airflow database, such as user passwords and connections. QDS auto-generates a Fernet key if you do not specify it here.

  • Node Type: An Airflow cluster is actually a single node, so there are no Master or Worker nodes. Select the instance type from the drop-down list.

  • Autoscaling is not supported in Airflow clusters, and, for AWS, only On-Demand clusters are supported.

Under Advanced Configuration, do the tasks described under:

To add more workers in an Airflow cluster, see Configuring a Multi-node Airflow Cluster.

Configuring EC2 Settings (AWS)

Configuring AWS EC2 settings is the same as configuring them for any other type of QDS cluster. See Advanced configuration: Modifying EC2 Settings (AWS) for more information.

Configuring Airflow Settings

Qubole provides an Airflow Recommended Configuration, as shown in the QDS UI under the Advanced tab. You can override this configuration by adding new values in the Override Airflow Configuration Variables text box. See also Using or Overriding Default Airflow Settings.

Configuring Security Groups

Configure security settings in the same way as for any other type of QDS cluster. See Managing Clusters for more information. The security group must be the security group that you create while setting up a data store. See Setting up a Data Store (AWS) for more information.

Starting an Airflow Cluster

You can start a cluster by clicking the Start button on the Clusters page. See Understanding Cluster Operations for more information.

Afer starting an Airflow cluster, you can find Airflow DAGs and logs, and the configuration file, under usr/lib/airflow.

Populating a Default or Custom Authentication Token in Airflow

After the Airflow cluster is successfully started, a default QDS connection (qubole_default) is created (if it does not exist), with the required host. The host parameter is set to the Qubole API endpoint for your Cloud, with an empty password. A password is the QDS authentication token of a QDS account user. You can decide the default authentication token and populate it using the Airflow Webserver Connection Dashboard.

You can create custom Qubole Airflow connections through the Airflow Webserver Connection Dashboard for different users. You can use them in the Qubole Operator to submit commands in the corresponding accounts.

You can use a custom connection (for example, my_qubole_connection) in the Airflow DAG script by setting the qubole_conn_id parameter in the Qubole Operator. If this parameter is not set, the Qubole Operator uses the qubole_default connection. The following sample code shows how to set the qubole_conn_id parameter.

qubole_task = QuboleOperator(
                                  task_id='hive_show_table',
                                  command_type='hivecmd',
                                  query='show tables',
                                  qubole_conn_id='my_qubole_connection', #*takes qubole_default as default connection*
                                  cluster_label='default',
                                  dag=dag
                                 )

Terminating an Airflow Cluster

An Airflow cluster does not automatically stop when it is left unused. Click the stop button to terminate the cluster. See Understanding Cluster Operations for more information.

Editing an Airflow Cluster

Click the edit button to modify the configuration. See Understanding Cluster Operations for more information. No configuration is pushable in a running Airflow cluster.

Using or Overriding Default Airflow Settings

By default Qubole has set CeleryExecutor as the executor mode. CeleryExecutor allows you to scale the pipeline vertically in the same machine by increasing the number of workers. See also Configuring a Multi-node Airflow Cluster. Celery needs a message broker and backend to store state and results.

Qubole ships rabbitmq pre-installed inside an Airflow cluster, and sets it as the default message broker for Airflow. For the result backend, Qubole uses the configured Airflow datastore for storing Celery data. If you want to use your own message broker and backend, you can configure celery.broker_url and celery.celery_result_backend in the Override Airflow Configuration Variables cluster configuration field.

User Level Privileges

In Qubole, Airflow clusters offer these two simple authorization methods:

  • User - A user who can view all the tabs except the Admin tabs on the Airflow UI.
  • Admin - An admin can view all tabs. A user with the Update access on that cluster is considered as Admin while other users with no Update access are considered as User.

Configuring a Multi-node Airflow Cluster

Currently, Airflow clusters contain only a single node by default. If you want more workers, you can scale vertically by selecting a larger instance type and adding more workers, using the cluster configuration override parameter celery.celeryd_concurrency. You can do this while the cluster is running; choose Update and Push on the Clusters page to implement the change.

To scale horizontally, you can use a workaround to add more workers to the existing cluster.

Create a new user in rabbitmq running on the first cluster, which you can do through a shell command on the Analyze page:

sudo /usr/sbin/rabbitmqctl add_user new_user new_password;
sudo /usr/sbin/rabbitmqctl set_user_tags new_user administrator;
sudo /usr/sbin/rabbitmqctl set_permissions -p / new_user ".*" ".*" ".*"

After running the above shell command, go to the Clusters page, clone the parent Airflow cluster, and override the broker details for new cluster as follows:

celery.broker_url=amqp://new_user:new_password@<master_dns_of_first_cluster>//

Once the new cluster is up and running, stop the Airflow scheduler running on the new cluster.

sudo monit stop scheduler

Note the following:

  • The parent Airflow cluster and its cloned cluster must use the same data store and Fernet key
  • You must sync the DAG files on the new cluster.
  • You must allow inbound TCP requests from the cloned cluster over 5672 and 15672 ports to the parent Airflow cluster.

Qubole plans to add multi-node Airflow cluster support in the future.