Running DDL Commands
You can run DDL commands on a Snowflake data warehouse by using the new Qubole Spark Scala API runSnowflakeQuery. This enables you to run statements such as CREATE TABLE AS SELECT (CTAS) on Snowflake tables through Spark.
Prerequisites
You must use Spark version 2.x.x or later.
Before running any DDL command, you must ensure that the Snowflake virtual warehouse is running.
Note
If you want to use an already running Spark cluster to run DDL commands on a newly added Snowflake data store, restart the Spark cluster first so that the Snowflake JARs are installed on it.
Steps
Import the org.apache.spark.sql.util.QuboleSQLUtils class.
Method Signature:
def runSnowflakeQuery(quboleCatalogName: String, warehouse: String, database: String, query: String, snowflakeConfigs: Map[String, String] = null): ResultSet
Call the API to execute the DDL statement:
QuboleSQLUtils.runSnowflakeQuery(<catalog_name>, <warehouse_name>, <db_name>, <sql_query>, <snowflake_configs>)
The API's return type is java.sql.ResultSet.
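Because runSnowflakeQuery returns a JDBC-style java.sql.ResultSet rather than a Spark DataFrame, results must be consumed with a cursor: next() advances to the next row, and getMetaData()/getString() use 1-indexed columns. The following minimal Python sketch illustrates that iteration pattern; the StubResultSet and StubMetaData classes and the single status row are hypothetical stand-ins for illustration only, not part of the Qubole or Snowflake APIs.

```python
# Hypothetical stand-ins that expose the same cursor-style interface
# (getMetaData/next/getString) as java.sql.ResultSet. Real code would
# receive the ResultSet from runSnowflakeQuery instead.
class StubMetaData:
    def __init__(self, columns):
        self.columns = columns

    def getColumnCount(self):
        return len(self.columns)

    def getColumnName(self, i):
        # JDBC column indexes start at 1
        return self.columns[i - 1]


class StubResultSet:
    def __init__(self, columns, rows):
        self.meta = StubMetaData(columns)
        self.rows = rows
        self.pos = -1

    def getMetaData(self):
        return self.meta

    def next(self):
        # Advance the cursor; returns False once the rows are exhausted
        self.pos += 1
        return self.pos < len(self.rows)

    def getString(self, i):
        # 1-indexed column access, as in JDBC
        return self.rows[self.pos][i - 1]


def result_rows(rs):
    """Drain a ResultSet-like cursor into (header, rows) lists."""
    md = rs.getMetaData()
    n = md.getColumnCount()
    header = [md.getColumnName(i) for i in range(1, n + 1)]
    rows = []
    while rs.next():
        rows.append([rs.getString(i) for i in range(1, n + 1)])
    return header, rows


# Snowflake DDL statements typically return a single status row
rs = StubResultSet(["status"], [["Table TABLE_XYZ successfully created."]])
header, rows = result_rows(rs)
print(header, rows)
```

The printResults helpers in the examples below follow this same pattern, printing the column names first and then each row.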
Example
The sample Scala and Pyspark scripts below run the following DDL statements, printing the result of each query:
drop the table if it already exists
create the table
truncate the table
drop the table
Scala
import org.apache.spark.sql.util.QuboleSQLUtils

object SnowflakeDDLExample {
  def main(args: Array[String]) {
    // drop the table if it already exists
    val drop_table_if_exists_result = QuboleSQLUtils.runSnowflakeQuery("snowflake-test_231", "load_wh", "TEST_DB", "drop table if exists table_xyz")
    // print the results of the above query
    printResults(drop_table_if_exists_result)

    // create the table
    val create_table_result = QuboleSQLUtils.runSnowflakeQuery("snowflake-test_231", "load_wh", "TEST_DB", "create table table_xyz (id int, name string)")
    // print the results of the above query
    printResults(create_table_result)

    // truncate the table
    val truncate_table_result = QuboleSQLUtils.runSnowflakeQuery("snowflake-test_231", "load_wh", "TEST_DB", "truncate table table_xyz")
    // print the results of the above query
    printResults(truncate_table_result)

    // drop the table
    val drop_table_result = QuboleSQLUtils.runSnowflakeQuery("snowflake-test_231", "load_wh", "TEST_DB", "drop table table_xyz")
    // print the results of the above query
    printResults(drop_table_result)
  }

  // Print the schema and rows of a JDBC ResultSet
  def printResults(rs: java.sql.ResultSet) {
    val rsmd = rs.getMetaData()
    val numberOfColumns = rsmd.getColumnCount()
    println(s"Column count=${numberOfColumns}")
    print("Table schema: ")
    for (i <- 1 to numberOfColumns) {
      if (i > 1) print(", ")
      val columnName = rsmd.getColumnName(i)
      print(columnName)
    }
    println("")
    while (rs.next()) {
      for (i <- 1 to numberOfColumns) {
        if (i > 1) print(", ")
        val columnValue = rs.getString(i)
        print(columnValue)
      }
      println("")
    }
    println("")
  }
}
Pyspark
%pyspark

# Print the schema and rows of a JDBC ResultSet
def printResults(rs):
    rsmd = rs.getMetaData()
    numberOfColumns = rsmd.getColumnCount()
    print("Column count= " + str(numberOfColumns))
    columnNames = "Table schema: "
    for i in range(numberOfColumns):
        if i > 0:
            columnNames = columnNames + ", "
        columnNames = columnNames + rsmd.getColumnName(i + 1)
    print(columnNames)
    while rs.next():
        columnValues = ""
        for i in range(numberOfColumns):
            if i > 0:
                columnValues = columnValues + ", "
            columnValues = columnValues + rs.getString(i + 1)
        print(columnValues)

# drop the table if it already exists
drop_table_if_exists_result = sc._jvm.org.apache.spark.sql.util.QuboleSQLUtils.runSnowflakeQuery("snowflake-test_231", "load_wh", "TEST_DB", "drop table if exists table_xyz", None)
# print the results of the above query
printResults(drop_table_if_exists_result)

# create the table
create_table_result = sc._jvm.org.apache.spark.sql.util.QuboleSQLUtils.runSnowflakeQuery("snowflake-test_231", "load_wh", "TEST_DB", "create table table_xyz (id int, name string)", None)
# print the results of the above query
printResults(create_table_result)

# truncate the table
truncate_table_result = sc._jvm.org.apache.spark.sql.util.QuboleSQLUtils.runSnowflakeQuery("snowflake-test_231", "load_wh", "TEST_DB", "truncate table table_xyz", None)
# print the results of the above query
printResults(truncate_table_result)

# drop the table
drop_table_result = sc._jvm.org.apache.spark.sql.util.QuboleSQLUtils.runSnowflakeQuery("snowflake-test_231", "load_wh", "TEST_DB", "drop table table_xyz", None)
# print the results of the above query
printResults(drop_table_result)