Using the Supported Capabilities¶
Hive ACID Data Source for Spark on Qubole supports READ, WRITE, UPDATE, and DELETE capabilities on Hive ACID tables.
Review the Prerequisites and the Supported Dataframe APIs before using these capabilities.
The following sections explain how to use the supported capabilities, with examples.
Prerequisites¶
Before you begin, ensure that the following requirements are met:
- Hive ACID Data Source for Spark is set up.
- To run SQL statements, the com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension extension is added to spark.sql.extensions.
- The latest binary version of the ACID Datasource is added to Spark, for example by using the --jars <path of the jar> option.
Supported Dataframe APIs¶
Hive ACID Data Source for Spark on Qubole supports the DataframeWriter, DataframeReader, and DatastreamWriter APIs, described in the following sections.
Note
The DatastreamReader API is currently not supported.
DataframeWriter API¶
To write into an existing table, use insertInto() with the following syntax:
df.write.insertInto("<hive-acid-table-name>")
To append data to a table, use save() with the following syntax:
df.write.format("HiveAcid").option("table", "<hive-acid-table-name>").mode("append").save()
To overwrite an existing table, use save() with the following syntax:
df.write.format("HiveAcid").option("table", "<hive-acid-table-name>").mode("overwrite").save()
For example, the following code reads a Parquet file and appends its rows to acid.acidtbl by using save():
val df = spark.read.parquet("tbldata.parquet")
df.write.format("HiveAcid").option("table", "acid.acidtbl").mode("append").save()
To perform an insert overwrite, which replaces the contents of acid.acidtbl with the Parquet data, use save() with the overwrite mode:
val df = spark.read.parquet("tbldata.parquet")
df.write.format("HiveAcid").option("table", "acid.acidtbl").mode("overwrite").save()
To insert into a table by using the hiveacid implicit, use the following syntax:
import com.qubole.spark.hiveacid._
val df = spark.read.parquet("tbldata.parquet")
df.write.hiveacid("acid.acidtbl", "append")
Note
save(path) is not supported on Hive ACID tables, and saveAsTable() is currently not supported either.
DataframeReader API¶
To read an entire Hive ACID table into a DataFrame, use table() with the following syntax:
val df = spark.read.table("<hive-acid-table-name>")
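To load an entire Hive ACID table, use load() with the following syntax. The $"rank" column syntax requires import spark.implicits._, which spark-shell imports automatically:
import spark.implicits._
val df = spark.read.format("HiveAcid").options(Map("table" -> "acid.acidtbl")).load()
df.select("status", "rank").filter($"rank" > 20).show()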
To read a Hive ACID table by using the hiveacid implicit, use the following syntax:
import com.qubole.spark.hiveacid._
import spark.implicits._
val df = spark.read.hiveacid("acid.acidtbl")
df.select("status", "rank").filter($"rank" > 20).show()
DatastreamWriter API¶
Hive ACID tables support streaming writes and can be used as a streaming sink.
Streaming writes run under transactional guarantees, so other streaming or batch writes to the same table can run concurrently.
For exactly-once semantics, specify spark.acid.streaming.log.metadataDir, the directory in which the latest processed batchId is stored.
Note
Specify a different metadataDir for each concurrent streaming write to the same table.
Use the following syntax, where newDf is a streaming DataFrame:
import org.apache.spark.sql.streaming.OutputMode

val query = newDf.writeStream
  .format("HiveAcid")
  .options(Map(
    "table" -> "<database>.<hive-acid-tableName>",
    "spark.acid.streaming.log.metadataDir" -> "/pathTo/metadataDir"))
  .outputMode(OutputMode.Append)
  .option("checkpointLocation", "/pathTo/checkpointDir")
  .start()
Note
appendToTable() is currently not supported.
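The metadataDir note above implies that every concurrent query needs its own directory. The following is a minimal sketch, in plain Scala so that it runs without Spark, that derives per-query sink options from a shared root. Only the option keys table and spark.acid.streaming.log.metadataDir come from this page; the AcidStreamingOptions object, metadataRoot, and queryName are hypothetical, caller-chosen names.

```scala
object AcidStreamingOptions {
  // Option key used in the streaming example on this page.
  val MetadataDirKey = "spark.acid.streaming.log.metadataDir"

  /** Builds HiveAcid sink options for one streaming query.
    * Each queryName gets its own sub-directory under metadataRoot, so
    * concurrent queries writing to the same table do not share a batchId log.
    */
  def sinkOptions(table: String, metadataRoot: String, queryName: String): Map[String, String] = {
    require(queryName.nonEmpty && !queryName.contains("/"),
      "queryName must be a non-empty path segment")
    Map(
      "table" -> table,
      MetadataDirKey -> s"${metadataRoot.stripSuffix("/")}/$queryName"
    )
  }
}
```

For example, pass AcidStreamingOptions.sinkOptions("acid.acidtbl", "/pathTo/metadataDir", "ingestA") to .options(...) in the writeStream call above. Give each query its own checkpointLocation as well, because Structured Streaming expects a separate checkpoint directory per query.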
Read ACID Tables¶
The following example reads from an ACID table by using SQL syntax.
Note
To run SQL statements, add com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension to spark.sql.extensions.
spark.sql("SELECT status, rank FROM acid.acidtbl WHERE rank > 20").show()
For information about reading from ACID tables by using the Dataframe API, see DataframeReader API.
Batch Write into ACID Tables¶
The following example inserts rows into an ACID table by using SQL syntax.
Note
To run SQL statements, add com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension to spark.sql.extensions.
spark.sql("INSERT INTO acid.acidtbl select * from sample_data")
The following example performs an insert overwrite, replacing the contents of the table, by using SQL syntax.
spark.sql("INSERT OVERWRITE TABLE acid.acidtbl select * from sample_data")
For information about writing by using the Dataframe API, see DataframeWriter API.
Stream Write into ACID Tables¶
For information about streaming writes, see DatastreamWriter API.
Update¶
SQL Syntax¶
UPDATE tablename SET column = updateExp [, column = updateExp ...] [WHERE expression]
- column must be a column of the table being updated.
- updateExp is an expression that Spark supports in the SELECT clause. Subqueries are not supported.
- The WHERE clause specifies the rows to be updated.
Note
Partitioning columns cannot be updated, and bucketed tables are currently not supported.
Example¶
The following example shows how to update by using SQL syntax.
spark.sql("UPDATE acid.acidtbl set rank = rank - 1, status = true where rank > 20 and rank < 25 and status = false")
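When the SET list is generated by code, the UPDATE grammar above can be assembled programmatically. The following is a minimal sketch in plain Scala, so that it runs without Spark. The AcidUpdate object and its partitionColumns parameter are hypothetical and not provided by the data source; the caller must supply the table's partitioning columns, and the helper rejects assignments to them before the statement reaches spark.sql, matching the note above. Expressions are inserted verbatim, so they should come from trusted code, not user input.

```scala
object AcidUpdate {
  /** Builds: UPDATE table SET col = exp [, col = exp ...] [WHERE cond]
    * Throws IllegalArgumentException when there are no assignments or when
    * an assignment targets one of the caller-supplied partitioning columns.
    */
  def statement(
      table: String,
      assignments: Seq[(String, String)],
      where: Option[String] = None,
      partitionColumns: Set[String] = Set.empty): String = {
    require(assignments.nonEmpty, "UPDATE needs at least one column = updateExp pair")

    // Spark resolves column names case-insensitively by default, so compare lowercased names.
    val partitionLower = partitionColumns.map(_.toLowerCase)
    val illegal = assignments.map(_._1).filter(c => partitionLower.contains(c.toLowerCase))
    require(illegal.isEmpty, s"partitioning columns cannot be updated: ${illegal.mkString(", ")}")

    val setClause = assignments.map { case (col, exp) => s"$col = $exp" }.mkString(", ")
    val whereClause = where.map(cond => s" WHERE $cond").getOrElse("")
    s"UPDATE $table SET $setClause$whereClause"
  }
}
```

For example, spark.sql(AcidUpdate.statement("acid.acidtbl", Seq("rank" -> "rank - 1", "status" -> "true"), Some("rank > 20 and rank < 25 and status = false"))) issues a statement equivalent to the example above.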
Delete¶
SQL Syntax¶
DELETE FROM tablename [WHERE expression]
The WHERE clause specifies the rows to delete from the table.
Note
Bucketed tables are not supported currently.
Example¶
The following example shows how to delete by using SQL syntax.
Note
To run SQL statements, add com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension to spark.sql.extensions.
spark.sql("DELETE FROM acid.acidtbl WHERE rank = 1000")
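Because the WHERE clause is optional in the grammar above, a DELETE without it follows standard SQL semantics and targets every row in the table. The following is a minimal sketch in plain Scala that builds the statement. As a design choice of this sketch, not a restriction of the data source, it refuses a missing or blank condition unless the caller opts in explicitly. The AcidDelete object and its allowFullDelete parameter are hypothetical.

```scala
object AcidDelete {
  /** Builds: DELETE FROM table [WHERE cond]
    * A missing or blank condition would target every row, so the helper
    * emits such a statement only when allowFullDelete is true.
    */
  def statement(table: String, where: Option[String], allowFullDelete: Boolean = false): String =
    where.map(_.trim).filter(_.nonEmpty) match {
      case Some(cond) => s"DELETE FROM $table WHERE $cond"
      case None =>
        require(allowFullDelete,
          "refusing to build DELETE without WHERE; pass allowFullDelete = true")
        s"DELETE FROM $table"
    }
}
```

For example, spark.sql(AcidDelete.statement("acid.acidtbl", Some("rank = 1000"))) runs the same deletion as the example above.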