Using the Supported Capabilities
Hive ACID Data Source for Spark on Qubole supports READ, WRITE, UPDATE, and DELETE capabilities on Hive ACID tables.
You must review the Prerequisites and the Supported Dataframe APIs before using the supported capabilities.
The following sections explain how to use the supported capabilities with examples:
Prerequisites
Before you begin, you must ensure that the following requirements are met:
To run SQL statements, the com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension extension is added to spark.sql.extensions.
The latest binary version of the ACID Datasource is added to Spark, for example by using the --jars <path of the jar> option.
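As a rough illustration of the first prerequisite, the following sketch registers the extension while building a session in a standalone application. The application name is a placeholder, and the sketch assumes the datasource jar is already on the classpath. Because spark.sql.extensions is generally read when the session is created, it usually has to be set before that point rather than on a running session.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: registers the Hive ACID extension when the session is created.
// Assumes the ACID Datasource jar was added to Spark when it was launched.
val spark = SparkSession.builder()
  .appName("hive-acid-example") // hypothetical application name
  .config("spark.sql.extensions",
    "com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension")
  .enableHiveSupport() // Hive ACID tables are resolved through the Hive metastore
  .getOrCreate()
```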
Supported Dataframe APIs
Hive ACID Data Source for Spark on Qubole supports the following APIs:
Note
The DatastreamReader API is not supported currently.
DataframeWriter API
To write into an existing table, use insertInto() with the following syntax:
df.write.insertInto("<hive-acid-table-name>")
To append data to a table, use save() with the following syntax:
df.write.format("HiveAcid").option("table", "<hive-acid-table-name>").mode("append").save()
To overwrite an existing table, use save() with the following syntax:
df.write.format("HiveAcid").option("table", "<hive-acid-table-name>").mode("overwrite").save()
To insert into a table, use save() with the following syntax:
val df = spark.read.parquet("tbldata.parquet")
df.write.format("HiveAcid").option("table", "acid.acidtbl").mode("append").save()
To perform insert overwrite, use save() with the following syntax:
val df = spark.read.parquet("tbldata.parquet")
df.write.format("HiveAcid").option("table", "acid.acidtbl").mode("overwrite").save()
To insert into a table by using the implicit hiveacid() method, use the following syntax:
import com.qubole.spark.hiveacid._
val df = spark.read.parquet("tbldata.parquet")
df.write.hiveacid("acid.acidtbl", "append")
Note
save(path) is not supported on Hive ACID tables. saveAsTable() is also not supported on Hive ACID tables currently.
DataframeReader API
To read an entire Hive ACID table in a dataframe, use table() with the following syntax:
val df = spark.read.table("<hive-acid-table-name>")
To load an entire Hive ACID table, use load() with the following syntax:
val df = spark.read.format("HiveAcid").options(Map("table" -> "acid.acidtbl")).load()
df.select("status", "rank").filter($"rank" > "20").show()
To read a Hive ACID table via implicit, use the following syntax:
import com.qubole.spark.hiveacid._
val df = spark.read.hiveacid("acid.acidtbl")
df.select("status", "rank").filter($"rank" > "20").show()
DatastreamWriter API
ACID tables support streaming writes and can be used as a streaming sink.
Streaming writes occur under transactional guarantees, which allows concurrent writes to the same table from other streaming or batch writers.
To get exactly-once semantics, specify spark.acid.streaming.log.metadataDir, which stores the latest batchId processed.
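The idea behind recording the latest batchId can be illustrated with a small, self-contained simulation. This is not the datasource's implementation: BatchLog and commitIfNew are hypothetical names, and the log is kept in memory here, whereas the datasource persists its state under metadataDir. The sketch only shows why remembering the last committed batch lets a replayed batch be skipped instead of written twice.

```scala
// Illustrative only: an in-memory stand-in for the batch log kept under metadataDir.
final class BatchLog {
  private var latestBatchId: Long = -1L // no batch committed yet

  /** Runs `write` only if `batchId` is newer than the last committed batch.
    * Returns true when the write ran, false when the batch was skipped. */
  def commitIfNew(batchId: Long)(write: => Unit): Boolean =
    if (batchId <= latestBatchId) {
      false // replayed batch: skip it so rows are not written twice
    } else {
      write
      latestBatchId = batchId
      true
    }
}
```

If a query restarts and replays batch 0 after it was already committed, the second call to commitIfNew(0) returns false and the write is not repeated.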
Note
Specify a different metadataDir for each concurrent streaming write to the same table.
Use the following syntax:
import org.apache.spark.sql.streaming.OutputMode

val query = newDf
  .writeStream
  .format("HiveAcid")
  .options(Map(
    "table" -> "<database>.<hive-acid-tableName>",
    "spark.acid.streaming.log.metadataDir" -> "/pathTo/metadataDir"))
  .outputMode(OutputMode.Append)
  .option("checkpointLocation", "/pathTo/checkpointDir")
  .start()
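When several streaming queries write to the same table, one way to keep their metadata directories distinct is to derive each directory from a per-query name. The helper below is a hypothetical sketch: AcidSinkOptions, forQuery, the metadataRoot layout, and the query names are assumptions for illustration, not part of the datasource. Only the two option keys come from the syntax above.

```scala
// Hypothetical helper: builds the sink options for one streaming query so that
// each query writing to the same table gets its own metadata directory.
object AcidSinkOptions {
  def forQuery(table: String, metadataRoot: String, queryName: String): Map[String, String] =
    Map(
      "table" -> table,
      // One sub-directory per query keeps the batch logs from colliding.
      "spark.acid.streaming.log.metadataDir" -> s"$metadataRoot/$queryName"
    )
}

// Two concurrent queries targeting the same table (names are placeholders).
val ordersOptions = AcidSinkOptions.forQuery("acid.acidtbl", "/pathTo/metadataDir", "orders")
val clicksOptions = AcidSinkOptions.forQuery("acid.acidtbl", "/pathTo/metadataDir", "clicks")
```

Each resulting map can be passed to .options(...) on the corresponding writeStream call.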
Note
appendToTable() is not supported currently.
Read ACID Tables
The following example shows how to read from an ACID table by using SQL syntax.
Note
To run SQL statements, add com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension to spark.sql.extensions.
spark.sql("SELECT status, rank from acid.acidtbl where rank > 20")
For information about reading from an ACID table by using the Dataframe API, see Reading by using Dataframe API.
Batch Write into ACID Tables
The following example shows how to perform an insert by using SQL syntax.
Note
To run SQL statements, add com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension to spark.sql.extensions.
spark.sql("INSERT INTO acid.acidtbl select * from sample_data")
The following example shows how to perform an insert overwrite by using SQL syntax.
spark.sql("INSERT OVERWRITE TABLE acid.acidtbl select * from sample_data")
For information about writing by using the Dataframe API, see Writing by using Dataframe API.
Stream Write into ACID Tables
For information about streaming writes, see Streaming Writes by using Dataframe API.
Update
SQL Syntax
UPDATE tablename SET column = updateExp [, column = updateExp ...] [WHERE expression]
column must be a column of the table being updated.
updateExp is an expression that Spark supports in the SELECT clause. Subqueries are not supported.
The WHERE clause specifies the rows to be updated.
Note
Partitioning columns cannot be updated and bucketed tables are not supported currently.
Example
The following example shows how to update by using SQL syntax.
spark.sql("UPDATE acid.acidtbl set rank = rank - 1, status = true where rank > 20 and rank < 25 and status = false")
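When the columns and expressions are assembled in code, a small helper can keep the statement aligned with the syntax above. The following is a minimal sketch: AcidUpdate and updateSql are hypothetical names, and the helper only concatenates strings, so it assumes the table name, expressions, and predicate come from trusted input.

```scala
// Hypothetical helper that assembles an UPDATE statement of the form
// UPDATE tablename SET column = updateExp [, ...] [WHERE expression].
object AcidUpdate {
  def updateSql(
      table: String,
      assignments: Seq[(String, String)], // column -> updateExp
      where: Option[String] = None): String = {
    require(assignments.nonEmpty, "at least one column = updateExp pair is required")
    val setClause = assignments.map { case (column, expr) => s"$column = $expr" }.mkString(", ")
    val whereClause = where.map(w => s" WHERE $w").getOrElse("")
    s"UPDATE $table SET $setClause$whereClause"
  }
}

val statement = AcidUpdate.updateSql(
  "acid.acidtbl",
  Seq("rank" -> "rank - 1", "status" -> "true"),
  Some("rank > 20 and rank < 25 and status = false"))
```

The resulting string can then be passed to spark.sql(statement) in a session that has the extension enabled.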
Delete
SQL Syntax
DELETE FROM tablename [WHERE expression]
The WHERE clause specifies the rows to be deleted from the table.
Note
Bucketed tables are not supported currently.
Example
The following example shows how to delete by using SQL syntax.
Note
To run SQL statements, add com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension to spark.sql.extensions.
spark.sql("DELETE from acid.acidtbl where rank = 1000")