Spark Structured Streaming

New Features

  • SPAR-3615: For Spark structured streaming, users can create a Snowflake data store in the QDS UI and use the corresponding catalog name (instead of passing a username and password) in the QuEST UI or Notebooks UI. The dependent Snowflake JAR has been upgraded to support Snowflake structured streaming.

    Users can use the following API for Snowflake structured streaming:

    val query = df
    .writeStream
    .option("sfDatabase", <DATABASE NAME>)
    .option("streaming_stage", <TEMP TABLE NAME>)
    .option("checkpointLocation", <CHECKPOINT LOCATION>)
    .snowflake(<QUBOLE DBTAP CATALOG NAME>, <WAREHOUSE NAME>, <TARGET TABLE NAME>)
    .start()
    

    This is supported on Spark 2.3.3, 2.4.0, 2.4.3, and later versions.

Enhancements

  • SPAR-3747: Memory leak issues in the RocksDB-based state store are fixed, and the RocksDB configuration is better tuned to improve read performance. The previous RocksDB state store provider class is deprecated. Users should use the following Spark configuration to enable the RocksDB-based state store:

    spark.sql.streaming.stateStore.providerClass = org.apache.spark.sql.execution.streaming.state.RocksDbStateStoreProvider
    

    This is supported on Spark 2.4.0 and later versions.
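
    For example, a minimal sketch of setting this configuration when creating a Spark session (the application name is a placeholder):

    import org.apache.spark.sql.SparkSession

    // Enable the RocksDB-based state store provider for all streaming
    // queries in this session ("RocksDbStateStoreExample" is hypothetical)
    val spark = SparkSession.builder()
    .appName("RocksDbStateStoreExample")
    .config("spark.sql.streaming.stateStore.providerClass",
      "org.apache.spark.sql.execution.streaming.state.RocksDbStateStoreProvider")
    .getOrCreate()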

  • SPAR-3760: Users must specify the SQS queue region through the region option, as shown in the following example:

      val inputDf = spark
      .readStream
      .format("s3-sqs")
      .schema(schema)
      .option("sqsUrl", queueUrl)
      .option("region", region)                      // region of the SQS queue, for example "us-east-1"
      .option("fileFormat", "json")
      .option("sqsFetchIntervalSeconds", "2")        // interval between fetches from SQS
      .option("sqsLongPollingWaitTimeSeconds", "5")  // SQS long-polling wait time
      .option("maxFilesPerTrigger", "50")
      .option("ignoreFileDeletion", "true")
      .load()
    
    This is supported on Spark 2.4.0, 2.4.3, and later versions.
    
  • SPAR-3555: Users can write a structured streaming query that appends data to a Spark data source or Hive table, and can then query that table to view the latest data in real time. Users can use the following API:

    dataset
    .writeStream
    .appendToTable("tableName")
    .option("checkpointLocation", checkpointDir)
    .start()
    

    This is supported on Spark 2.3.2, 2.4.0, 2.4.3, and later versions.
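
    While the stream is running, the table can be queried to see the latest appended data; a minimal sketch, assuming a SparkSession named spark and the table name used above:

    // Query the target table while the stream is running;
    // each completed trigger makes newly appended data visible
    spark.sql("SELECT * FROM tableName").show()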

  • SPAR-3591: Users can pass custom S3 object metadata, x-amz-meta-metadata(key1=val1,key2=val2), while creating a new streaming job with S3 as the sink by setting the option fs.s3a.user.metadata to key1=val1,key2=val2. The streaming application creates new files with this metadata.

    This is supported on Spark 2.3.2 and later versions.
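
    A minimal sketch, assuming a streaming DataFrame df and hypothetical S3 paths:

    val query = df
    .writeStream
    .format("json")
    // Attach custom object metadata to every file the sink creates;
    // S3 stores the pairs as x-amz-meta-* headers
    .option("fs.s3a.user.metadata", "key1=val1,key2=val2")
    .option("checkpointLocation", "s3://<BUCKET>/checkpoint/")  // hypothetical path
    .start("s3://<BUCKET>/output/")                             // hypothetical path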

For a list of bug fixes between versions R56 and R57, see Changelog for api.qubole.com.