Using StreamX (AWS)

StreamX captures data from Kafka logs and preserves it in a Cloud store (currently Amazon S3), where it can be processed by engines such as Hive or Spark.

Note

StreamX is in Beta. Create a Qubole Support ticket if you want to try out this new capability.

Why Use StreamX?

StreamX is a managed, scalable, and reliable open-source service, built on the Kafka Connect framework and running in a dedicated Qubole Data Service (QDS) cluster. It provides ready access to usable streaming ingest with minimal configuration or maintenance.

Major features include:

  • Output in Avro or Parquet
  • Output can be partitioned (by Kafka partition or by time) and written to multiple paths in the cloud store

How to Use StreamX

Proceed as follows to configure and start StreamX:

  1. If you have not already done so, install Kafka and start a Kafka cluster.

  2. Use these instructions to create a persistent AWS security group, and open ports 2181, 8081, and 9092 in the security configuration of your Kafka cluster to allow members of that group to reach ZooKeeper, the schema registry, and the Kafka brokers, respectively (see the AWS CLI sketch after these steps).

  3. In the QDS UI, navigate to Clusters and choose New.

  4. Choose StreamX as the cluster type.

  5. Accept the default Kafka version or use the drop-down to change it.

  6. Specify the Kafka brokers as a comma-separated list of DNS names in the form <fully-qualified-domain-name>:<port>.

  7. Choose the instance types for master and slave nodes from the drop-downs.

  8. Specify the number of nodes in the cluster. (StreamX clusters do not support auto-scaling.)

  9. Assuming that your Kafka cluster is running on AWS, select the same AWS Region and Availability Zone for the QDS cluster as for the Kafka cluster.

  10. Optionally specify the number, type, and size of EBS reserved volumes to be mounted to each instance as additional storage.

  11. Provide a file name to be appended to the path for the node bootstrap file, or accept the default. Click Next to proceed.

    Note

    Automatic cluster termination is disabled in StreamX clusters, but you can stop the cluster manually.

  12. If your Kafka cluster is running in an AWS VPC, specify the same VPC for the QDS cluster.

  13. Optionally specify parameter values to override the Kafka Connect configuration defaults (the UI provides an example).

  14. If your Kafka cluster is SSL-enabled, choose the SSL protocol and provide the location and password of the SSL truststore configured on the Kafka cluster.

    Note

    The truststore path must be in the mounted directory; for example, /media/ephemeral0/streamx/<custom_path>. You can use a node bootstrap script to download the truststore file to this location (see the sketch under Using the Directory Mounted to a StreamX Docker Container below).

    • In addition, if the Kafka cluster requires client authentication, check the Client Authentication box and provide the keystore information.
  15. Provide the name of the Persistent Security Group you created in step 2.

  16. When you are satisfied with the configuration, click Create. (For more information on optional fields, see Configuring Clusters.)
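
For step 2, you can add the required ingress rules in the AWS console or with the AWS CLI. The following is a minimal sketch, assuming the AWS CLI is configured; <kafka-sg-id> and <persistent-sg-id> are placeholders for the security group IDs of your Kafka cluster and of the persistent security group:

# Allow the persistent security group to reach ZooKeeper (2181), the schema
# registry (8081), and the Kafka brokers (9092) on the Kafka cluster.
for port in 2181 8081 9092; do
  aws ec2 authorize-security-group-ingress \
    --group-id <kafka-sg-id> \
    --protocol tcp \
    --port "$port" \
    --source-group <persistent-sg-id>
done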

Using the Directory Mounted to a StreamX Docker Container

To allow updates to files on the StreamX cluster hosts to be reflected in a Docker container, and vice versa, place the files in /media/ephemeral0/streamx. You can use a node bootstrap script to download files to this location.
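
For example, a node bootstrap script might stage the SSL truststore from step 14 in this directory. The following is a minimal sketch, assuming the AWS CLI is available on the cluster nodes; the bucket and file names are placeholders:

# Copy shared files into the directory mounted to the StreamX Docker container.
mkdir -p /media/ephemeral0/streamx/ssl
aws s3 cp s3://<your-bucket>/kafka/kafka.client.truststore.jks /media/ephemeral0/streamx/ssl/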

Logging in to the StreamX Cluster

If you need to log in to a StreamX cluster, you can do so by means of SSH. Follow these instructions.
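
For example, to connect directly to the master node (a sketch; the key file and user name below are assumptions, so use the values that apply to your account and cluster):

ssh -i ~/.ssh/<cluster-key>.pem ec2-user@<master-public-dns>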

Running a StreamX Job

After launching a StreamX cluster, you can submit jobs directly to the master node. StreamX listens on port 8083.

To submit a job, you can either:

  • Connect to the master node via SSH and run the job directly (using curl); or
  • Create an SSH tunnel from your local machine to the master node and submit jobs through the tunnel (see the sketch below).
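
The tunnel option can look like the following minimal sketch; the user name, key file, and host name are assumptions, so substitute the values for your cluster:

# Forward local port 8083 to the Kafka Connect REST port on the master node.
ssh -i ~/.ssh/<cluster-key>.pem -N -L 8083:localhost:8083 ec2-user@<master-public-dns>

# In another terminal, submit requests from your local machine, for example:
curl -i -X GET 'http://localhost:8083/connectors/'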

The following examples show how to create a job, list jobs, check job status, and delete a job. See also Scenarios below.

Example 1: submit a Kafka-S3 copy job

curl -i -X POST \
   -H "Content-Type:application/json" \
   -d \
'{"name":"a1",
 "config":
{
"connector.class":"com.qubole.streamx.s3.S3SinkConnector",
"tasks.max":"1",
"flush.size":"3",
"format.class":"com.qubole.streamx.SourceFormat",
"s3.url":"s3://streamx/demo/",
"hadoop.conf.dir":"/usr/local/streamx/config/hadoop-conf/",
"topics":"click-stream"
}}' \
 'http://localhost:8083/connectors'

Explanation of the configuration parameters:

Key              Value                                   Description
connector.class  com.qubole.streamx.s3.S3SinkConnector   The connector class for a StreamX job.
tasks.max        1 to (number of Kafka partitions)       The number of parallel tasks that can consume data from a given set of topics.
flush.size       (number)                                The number of messages to write before flushing them to a file in S3.
format.class     com.qubole.streamx.SourceFormat         The format of the data.
s3.url           (S3 URL)                                The S3 location where the data is written.
hadoop.conf.dir  /usr/local/streamx/config/hadoop-conf/  The location of the hdfs-site.xml file that contains the access keys for S3. Hard-code this location exactly.
topics           <topic1>,<topic2>                       Comma-separated list of topics.
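
After the job has flushed a few files, you can verify the output from the cluster with a listing such as the following (a sketch; it assumes the cluster's Hadoop configuration can read the target bucket, and uses the s3.url and topic from Example 1):

hadoop fs -ls s3://streamx/demo/topics/click-stream/partition=0/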

Example 2: list all jobs

curl -i -X GET \
   -H "Content-Type:application/json" \
 'http://localhost:8083/connectors/'

This returns the list of Connect jobs as a JSON array of job names.

Example 3: get the status of a job

curl -i -X GET \
   -H "Content-Type:application/json" \
 'http://localhost:8083/connectors/<job name>/status'

Example 4: delete a job

curl -i -X DELETE \
   -H "Content-Type:application/json" \
 'http://localhost:8083/connectors/<job name>'

Scenarios

Scenario 1: retain the Kafka-based partitioning and create a new file in S3 every flush.size messages:

curl -i -X POST \
   -H "Content-Type:application/json" \
   -d \
'{"name":"a12",
 "config":
{
"connector.class":"com.qubole.streamx.s3.S3SinkConnector",
"tasks.max":"1",
"flush.size":"3",
  "format.class":"com.qubole.streamx.SourceFormat",
"s3.url":"s3://streamx/demo/",
"hadoop.conf.dir":"/usr/local/streamx/config/hadoop-conf/",
  "topics":"clickstream"
}}' \
 'http://localhost:8083/connectors'

Output directory structure in S3:

-rwxrwxrwx   1          7 2017-07-11 00:50 /demo/topics/clickstream/partition=0/rb44-1+0+0000000234+0000000236
-rwxrwxrwx   1         11 2017-07-11 00:50 /demo/topics/clickstream/partition=0/rb44-1+0+0000000237+0000000239
-rwxrwxrwx   1         11 2017-07-11 00:50 /demo/topics/clickstream/partition=0/rb44-1+0+0000000240+0000000242
-rwxrwxrwx   1          8 2017-07-11 00:50 /demo/topics/clickstream/partition=0/rb44-1+0+0000000243+0000000245

Scenario 2: write files to S3 using a time-based rotation policy:

curl -i -X POST \
   -H "Content-Type:application/json" \
   -d \
'{"name":"t1",
 "config":
{
"connector.class":"com.qubole.streamx.s3.S3SinkConnector",
"tasks.max":"1",
"flush.size":"3",
  "format.class":"com.qubole.streamx.SourceFormat",
  "s3.url":"s3://streamx/demo/",
"hadoop.conf.dir":"/usr/local/streamx/config/hadoop-conf/",
  "topics":"t1",
"rotate.interval.ms":"60000",
"partitioner.class":"io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner",
"partition.duration.ms":"3600000",
"path.format":"YYYY/MM/dd/HH/",
"locale":"en",
  "timezone":"GMT"}}' \
 'http://localhost:8083/connectors'

Output directory structure in S3:

-rwxrwxrwx   1         14 2017-07-11 00:57 /demo/topics/t1/2017/07/11/00/t1+0+0000000024+0000000026
-rwxrwxrwx   1         10 2017-07-11 00:57 /demo/topics/t1/2017/07/11/00/t1+0+0000000027+0000000029
-rwxrwxrwx   1          9 2017-07-11 00:57 /demo/topics/t1/2017/07/11/00/t1+0+0000000030+0000000032