Using the Supported Capabilities

Hive ACID Data Source for Spark on Qubole supports READ, WRITE, UPDATE, and DELETE capabilities on Hive ACID tables.

Review the Prerequisites and the Supported Dataframe APIs before you use these capabilities.

The following sections explain how to use each capability, with examples:

Prerequisites

Before you begin, you must ensure that the following requirements are met:

  1. Hive ACID Data Source for Spark is set up.
  2. To run SQL statements, the com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension extension is added to spark.sql.extensions.
  3. The latest binary version of the Hive ACID Data Source is added to Spark, for example by using the --jars <path of the jar> option.

Supported Dataframe APIs

Hive ACID Data Source for Spark on Qubole supports the following APIs:

Note

The DatastreamReader API is not supported currently.

DataframeWriter API

  • To write into an existing table, use insertInto() with the following syntax:

    df.write.insertInto("<hive-acid-table-name>")
    
  • To append data to a table, use save() with the following syntax:

    df.write.format("HiveAcid").option("table", "<hive-acid-table-name>").mode("append").save()
    
  • To overwrite an existing table, use save() with the following syntax:

    df.write.format("HiveAcid").option("table", "<hive-acid-table-name>").mode("overwrite").save()
    
  • For example, to insert data from a Parquet file into a table, use save() in append mode:

    val df = spark.read.parquet("tbldata.parquet")
    df.write.format("HiveAcid").option("table", "acid.acidtbl").mode("append").save()
    
  • For example, to perform an insert overwrite with data from a Parquet file, use save() in overwrite mode:

    val df = spark.read.parquet("tbldata.parquet")
    df.write.format("HiveAcid").option("table", "acid.acidtbl").mode("overwrite").save()
    
  • To insert into a table by using the implicit hiveacid method, use the following syntax:

    import com.qubole.spark.hiveacid._
    
    val df = spark.read.parquet("tbldata.parquet")
    df.write.hiveacid("acid.acidtbl", "append")
    

Note

save(path) and saveAsTable() are currently not supported on Hive ACID tables.

DataframeReader API

  • To read an entire Hive ACID table into a DataFrame, use table() with the following syntax:

    val df = spark.read.table("<hive-acid-table-name>")
    
  • To load an entire Hive ACID table, use load() with the following syntax:

    import spark.implicits._  // enables the $"column" syntax
    
    val df = spark.read.format("HiveAcid").options(Map("table" -> "acid.acidtbl")).load()
    df.select("status", "rank").filter($"rank" > 20).show()
    
  • To read a Hive ACID table by using the implicit hiveacid method, use the following syntax:

    import com.qubole.spark.hiveacid._
    import spark.implicits._  // enables the $"column" syntax
    
    val df = spark.read.hiveacid("acid.acidtbl")
    df.select("status", "rank").filter($"rank" > 20).show()
    

DatastreamWriter API

Hive ACID tables support streaming writes and can be used as a streaming sink. Each streaming write runs under transactional guarantees, so other streaming or batch jobs can write to the same table concurrently. For exactly-once semantics, set spark.acid.streaming.log.metadataDir to a directory in which the latest processed batchId is stored.

Note

Specify a different metadataDir for each concurrent streaming write to the same table.

Use the following syntax:

import org.apache.spark.sql.streaming.OutputMode

// newDf is a streaming DataFrame, for example one returned by spark.readStream
val query = newDf
  .writeStream
  .format("HiveAcid")
  .options(Map(
    "table" -> "<database>.<hive-acid-tableName>",
    "spark.acid.streaming.log.metadataDir" -> "/pathTo/metadataDir"))
  .outputMode(OutputMode.Append)
  .option("checkpointLocation", "/pathTo/checkpointDir")
  .start()
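Because concurrent streaming writes to the same table need separate metadata directories, it can help to derive the option map from a shared base path and a per-query name. The following is a minimal sketch of that idea; the AcidStreamOptions object, its forQuery method, and the query names are hypothetical, and only the "table" and "spark.acid.streaming.log.metadataDir" option keys come from this page.

```scala
// Hypothetical helper: builds the option map for one streaming query that
// writes to a Hive ACID table. Each query gets its own metadata directory,
// derived from a shared base path and a caller-chosen query name.
object AcidStreamOptions {
  val MetadataDirKey = "spark.acid.streaming.log.metadataDir"

  def forQuery(table: String, metadataBase: String, queryName: String): Map[String, String] = {
    require(queryName.nonEmpty, "queryName must not be empty")
    // Drop a trailing slash so the joined path has a single separator.
    val base = metadataBase.stripSuffix("/")
    Map(
      "table" -> table,
      MetadataDirKey -> s"$base/$queryName"
    )
  }
}
```

The result can be passed to .options(...) in the streaming query, for example .options(AcidStreamOptions.forQuery("<database>.<hive-acid-tableName>", "/pathTo/metadataDir", "ingest-a")). Each query still needs its own checkpointLocation as well.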

Note

appendToTable() is not supported currently.

Read ACID Tables

The following example shows how to read from a Hive ACID table by using SQL syntax.

Note

To run SQL statements, add com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension to spark.sql.extensions.

spark.sql("SELECT status, rank FROM acid.acidtbl WHERE rank > 20").show()

For information about reading from a Hive ACID table by using the Dataframe API, see DataframeReader API.

Batch Write into ACID Tables

The following example shows how to insert rows by using SQL syntax.

Note

To run SQL statements, add com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension to spark.sql.extensions.

spark.sql("INSERT INTO acid.acidtbl SELECT * FROM sample_data")

The following example shows how to perform an insert overwrite by using SQL syntax.

spark.sql("INSERT OVERWRITE TABLE acid.acidtbl SELECT * FROM sample_data")

For information about writing by using the Dataframe API, see DataframeWriter API.

Stream Write into ACID Tables

For information about streaming writes, see DatastreamWriter API.

Update

SQL Syntax

UPDATE tablename SET column = updateExp [, column = updateExp ...] [WHERE expression]
  • column must be a column of the table being updated.
  • updateExp is any expression that Spark supports in the SELECT clause. Subqueries are not supported.
  • The WHERE clause specifies the rows to be updated. If it is omitted, all rows are updated.

Note

Partitioning columns cannot be updated, and bucketed tables are currently not supported.

Example

The following example shows how to update rows by using SQL syntax.

spark.sql("UPDATE acid.acidtbl SET rank = rank - 1, status = true WHERE rank > 20 AND rank < 25 AND status = false")
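When UPDATE statements are generated from application code, the grammar above and the partitioning-column restriction can be enforced before the statement reaches spark.sql. The following is a minimal sketch under that assumption; the AcidUpdate object and its statement method are hypothetical, the caller is assumed to supply the table's partition column names, and the helper does not parse or validate the expressions themselves.

```scala
// Hypothetical helper: assembles an UPDATE statement in the form
//   UPDATE tablename SET column = updateExp [, column = updateExp ...] [WHERE expression]
// It only checks that no partitioning column is assigned, because those
// cannot be updated; column names are compared case-insensitively.
object AcidUpdate {
  def statement(
      table: String,
      assignments: Seq[(String, String)],
      where: Option[String],
      partitionColumns: Set[String]): String = {
    require(assignments.nonEmpty, "at least one SET assignment is required")
    val lowerPartitions = partitionColumns.map(_.toLowerCase)
    val illegal = assignments.map(_._1).filter(c => lowerPartitions.contains(c.toLowerCase))
    require(illegal.isEmpty, s"partitioning columns cannot be updated: ${illegal.mkString(", ")}")
    val setClause = assignments.map { case (col, expr) => s"$col = $expr" }.mkString(", ")
    val whereClause = where.map(w => s" WHERE $w").getOrElse("")
    s"UPDATE $table SET $setClause$whereClause"
  }
}
```

The returned string is passed to spark.sql(...) in the same way as the example above.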

Delete

SQL Syntax

DELETE FROM tablename [WHERE expression]
  • The WHERE clause specifies the rows to be deleted. If it is omitted, all rows in the table are deleted.

Note

Bucketed tables are not supported currently.

Example

The following example shows how to delete by using SQL syntax.

Note

To run SQL statements, add com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension to spark.sql.extensions.

spark.sql("DELETE FROM acid.acidtbl WHERE rank = 1000")
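Because a DELETE without a WHERE clause removes every row, code that builds DELETE statements may want to require an explicit opt-in for that case. The following is a minimal sketch of such a guard; the AcidDelete object, its statement method, and the allowDeleteAll flag are hypothetical and are not part of the Hive ACID Data Source.

```scala
// Hypothetical helper: assembles DELETE FROM tablename [WHERE expression].
// An unconditional delete removes every row, so it must be requested explicitly.
object AcidDelete {
  def statement(table: String, where: Option[String], allowDeleteAll: Boolean = false): String =
    where match {
      case Some(cond) => s"DELETE FROM $table WHERE $cond"
      case None =>
        require(allowDeleteAll, "refusing to delete all rows without allowDeleteAll = true")
        s"DELETE FROM $table"
    }
}
```

For example, spark.sql(AcidDelete.statement("acid.acidtbl", Some("rank = 1000"))) runs the same statement as the example above.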