Running DDL Commands

You can run DDL commands on a Snowflake data warehouse by using the Qubole Spark Scala API runSnowflakeQuery. This enables you to run statements such as CREATE TABLE AS SELECT (CTAS) on Snowflake tables through Spark.
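
For example, a CTAS statement can be issued through the API as in the following minimal sketch. The catalog and table names here are hypothetical placeholders, not values from this guide:

import org.apache.spark.sql.util.QuboleSQLUtils

// Hypothetical names: replace with your own Qubole catalog, Snowflake
// warehouse, database, and tables.
QuboleSQLUtils.runSnowflakeQuery("my_snowflake_catalog", "load_wh", "TEST_DB",
  "create table orders_copy as select * from orders")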

Prerequisites

  • You must use Spark version 2.x.x or later.
  • Before running any DDL command, you must ensure that the Snowflake virtual warehouse is running; a sketch for resuming a suspended warehouse follows this list.
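
If the warehouse is suspended, one option is to resume it through the same API; ALTER WAREHOUSE is handled by Snowflake's services layer, so it does not itself need running warehouse compute. This is a sketch, assuming a hypothetical catalog name and a role with the OPERATE privilege on the warehouse (many warehouses are also configured to auto-resume):

import org.apache.spark.sql.util.QuboleSQLUtils

// Hypothetical catalog name; resuming requires the OPERATE privilege on load_wh.
QuboleSQLUtils.runSnowflakeQuery("my_snowflake_catalog", "load_wh", "TEST_DB",
  "alter warehouse load_wh resume if suspended")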

Note

If you want to use an already running Spark cluster to run DDL commands on a newly added Snowflake data store, restart the Spark cluster so that the Snowflake JARs are installed on it.

Steps

  1. Import org.apache.spark.sql.util.QuboleSQLUtils.

Method Signature:

def runSnowflakeQuery(quboleCatalogName: String, warehouse: String, database: String, query: String, snowflakeConfigs: Map[String, String] = null): ResultSet
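
Here, quboleCatalogName is the name of the Snowflake data store (catalog) configured in Qubole, warehouse and database identify the Snowflake virtual warehouse and database to run against, query is the SQL statement to execute, and the optional snowflakeConfigs map carries additional connector options.
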
  2. Run the following command to execute the DDL statement:
QuboleSQLUtils.runSnowflakeQuery(<catalog_name>, <warehouse_name>, <db_name>, <sql_query>, <snowflake_configs>)

The API’s return type is java.sql.ResultSet.
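
If you need to pass extra options, you can supply them through the optional snowflakeConfigs map. The following sketch assumes the map accepts standard Snowflake Spark connector options such as sfRole; which keys are honored depends on your connector version:

import org.apache.spark.sql.util.QuboleSQLUtils

// Hypothetical catalog name; sfRole is a standard Snowflake Spark connector
// option, and passing it here is an assumption about the supported keys.
val rs = QuboleSQLUtils.runSnowflakeQuery("my_snowflake_catalog", "load_wh", "TEST_DB",
  "drop table if exists tmp_tbl", Map("sfRole" -> "SYSADMIN"))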

Example

The following sample scripts, in Scala and PySpark, run these DDL queries and print the results of each:

  • drop the table if it already exists
  • create the table
  • truncate the table
  • drop the table

Scala

import org.apache.spark.sql.util.QuboleSQLUtils

object SnowflakeDDLExample {
  def main(args: Array[String]) {
    // drop table if it already exists
    val drop_table_if_exists_result = QuboleSQLUtils.runSnowflakeQuery("snowflake-test_231", "load_wh", "TEST_DB", "drop table if exists table_xyz")
    // printing results of the above query
    printResults(drop_table_if_exists_result)
    // create table
    val create_table_result = QuboleSQLUtils.runSnowflakeQuery("snowflake-test_231", "load_wh", "TEST_DB", "create table table_xyz (id int, name string)")
    // printing results of the above query
    printResults(create_table_result)
    // truncate table
    val truncate_table_result = QuboleSQLUtils.runSnowflakeQuery("snowflake-test_231", "load_wh", "TEST_DB", "truncate table table_xyz")
    // printing results of the above query
    printResults(truncate_table_result)
    // drop table
    val drop_table_result = QuboleSQLUtils.runSnowflakeQuery("snowflake-test_231", "load_wh", "TEST_DB", "drop table table_xyz")
    // printing results of the above query
    printResults(drop_table_result)
  }

  def printResults(rs: java.sql.ResultSet) {
    val rsmd = rs.getMetaData()
    val numberOfColumns = rsmd.getColumnCount()
    println(s"Column count=${numberOfColumns}")
    print("Table schema: ")
    for (i <- 1 to numberOfColumns) {
      if (i > 1) print(",  ")
      val columnName = rsmd.getColumnName(i)
      print(columnName)
    }
    println("")
    while (rs.next()) {
      for (i <- 1 to numberOfColumns) {
        if (i > 1) print(",  ")
        val columnValue = rs.getString(i)
        print(columnValue)
      }
      println("")
    }
    println("")
  }
}
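
For DDL statements, Snowflake typically returns a single status row (for example, Table TABLE_XYZ successfully created.), so printResults usually prints one column, named status, and one row per query.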

PySpark

%pyspark
def printResults(rs):
    rsmd = rs.getMetaData()
    numberOfColumns = rsmd.getColumnCount()
    print("Column count= " + str(numberOfColumns))
    columnNames = "Table schema: "
    for i in range(numberOfColumns):
        if i > 0:
            columnNames = columnNames + ",  "
        columnNames = columnNames + rsmd.getColumnName(i + 1)
    print(columnNames)
    while rs.next():
        columnValues = ""
        for i in range(numberOfColumns):
            if i > 0:
                columnValues = columnValues + ",  "
            columnValues = columnValues + rs.getString(i + 1)
        print(columnValues)

# drop table if it already exists
drop_table_if_exists_result = sc._jvm.org.apache.spark.sql.util.QuboleSQLUtils.runSnowflakeQuery("snowflake-test_231", "load_wh", "TEST_DB", "drop table if exists table_xyz", None)
# printing results of the above query
printResults(drop_table_if_exists_result)

# create table
create_table_result = sc._jvm.org.apache.spark.sql.util.QuboleSQLUtils.runSnowflakeQuery("snowflake-test_231", "load_wh", "TEST_DB", "create table table_xyz (id int, name string)", None)
# printing results of the above query
printResults(create_table_result)

# truncate table
truncate_table_result = sc._jvm.org.apache.spark.sql.util.QuboleSQLUtils.runSnowflakeQuery("snowflake-test_231", "load_wh", "TEST_DB", "truncate table table_xyz", None)
# printing results of the above query
printResults(truncate_table_result)

# drop table
drop_table_result = sc._jvm.org.apache.spark.sql.util.QuboleSQLUtils.runSnowflakeQuery("snowflake-test_231", "load_wh", "TEST_DB", "drop table table_xyz", None)
# printing results of the above query
printResults(drop_table_result)
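
Note that the PySpark calls go through py4j, which does not apply Scala default arguments, so None is passed explicitly for the snowflake_configs parameter instead of relying on the default from the method signature.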