Connecting to Redshift Data Source from Spark

Spark on Qubole supports the Spark Redshift connector, a library that lets you load data from Amazon Redshift tables into Spark SQL DataFrames and write data back to Redshift tables. Amazon S3 is used to transfer data in and out of Redshift efficiently, and the Redshift JDBC driver is used to automatically trigger the appropriate COPY and UNLOAD commands on Redshift. The connector therefore requires AWS credentials with read and write access to an S3 bucket (specified through the tempdir configuration parameter). This is supported on Scala and Python.
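
When the forward_spark_s3_credentials option is set to true (as in the examples later on this page), the connector forwards to Redshift the S3 credentials that Spark itself is configured with. The following is a minimal sketch of setting those credentials explicitly in a Scala notebook; it assumes an s3a-backed temporary directory, and the access key and secret key values are placeholders. On QDS, account-level S3 credentials may already be configured, in which case this step is not needed.

// Minimal sketch: make S3 credentials available to Spark so that
// forward_spark_s3_credentials can forward them to Redshift for COPY/UNLOAD.
// Assumes an s3a-backed tempdir; the key values below are placeholders.
val hadoopConf = spark.sparkContext.hadoopConfiguration
hadoopConf.set("fs.s3a.access.key", "<AWS_ACCESS_KEY_ID>")
hadoopConf.set("fs.s3a.secret.key", "<AWS_SECRET_ACCESS_KEY>")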

The Spark Redshift connector is supported on Spark 2.4 and later versions, and the supported AWS Redshift JDBC jar version is com.amazon.redshift.jdbc42-1.2.36.1060.

Note

This feature is not enabled for all users by default. Create a ticket with Qubole Support to enable this feature on the QDS account.

Usage

You can use the Spark Redshift connector to load data from and write data back to Redshift tables in the following ways:

  • Creating a Redshift Data Store and using the Catalog Name for the configuration.

  • Adding the Redshift configuration details inline.

You can use the Data Sources API in Scala, Python, R, or SQL.

Creating a Redshift Data Store

  1. Navigate to the Explore UI.

  2. Click on the drop-down list near the top left of the page (it defaults to Qubole Hive) and choose +Add Data Store.

  3. Select Redshift from the Database Type drop-down list.

  4. Enter the appropriate values in the following fields:

    • Data Store Name

    • Catalog Name

    • Database Name

    • Host Address

    • Port

    • Username and Password

  5. Click Save.

  6. After the data store is created, restart the cluster for the changes to take effect.

The following figure shows the Connect to a data store page with sample values for adding a Redshift data store.

../../../_images/ds-redshift.png

The following Scala example shows how to connect to and read data from the Redshift data store.

val catalogName = "test-catalog"
val tempS3Dir = "s3://path/for/temp/data"
val sQuery = """SELECT * FROM event"""
val df = spark.read
  .option("forward_spark_s3_credentials", "true")
  .option("query", sQuery)
  .redshift(catalogName, tempS3Dir)
df.show()

Examples with Redshift Configuration Details Inline

The following example notebooks show how to use the Spark Redshift connector with configuration details inline.

// Spark Redshift connector Example Notebook - Scala

import org.apache.spark.sql.DataFrame

val jdbcURL = "jdbc:redshift://redshifthost:5439/database?user=username&password=pass"
val tempS3Dir = "s3://path/for/temp/data"

// Read a Redshift table using the DataFrame API
val df: DataFrame = spark.read
    .format("com.qubole.spark.redshift")
    .option("url", jdbcURL)
    .option("dbtable", "tbl")
    .option("forward_spark_s3_credentials", "true")
    .option("tempdir", tempS3Dir)
    .load()

// Load Redshift query results into a Spark DataFrame
val dfQuery: DataFrame = spark.read
    .format("com.qubole.spark.redshift")
    .option("url", jdbcURL)
    .option("query", "select col1, count(*) from tbl group by col1")
    .option("forward_spark_s3_credentials", "true")
    .option("tempdir", tempS3Dir)
    .load()

// Write data to Redshift

// Create a new Redshift table from the given DataFrame
// df = <DataFrame that is to be written to Redshift>
df.write
  .format("com.qubole.spark.redshift")
  .option("url", jdbcURL)
  .option("dbtable", "tbl_write")
  .option("forward_spark_s3_credentials", "true")
  .option("tempdir", tempS3Dir)
  .mode("error")
  .save()

// Overwrite data in an existing Redshift table
df.write
  .format("com.qubole.spark.redshift")
  .option("url", jdbcURL)
  .option("dbtable", "tbl_write")
  .option("forward_spark_s3_credentials", "true")
  .option("tempdir", tempS3Dir)
  .mode("overwrite")
  .save()

// Authentication
// Using IAM role-based authentication instead of keys
df.write
  .format("com.qubole.spark.redshift")
  .option("url", jdbcURL)
  .option("dbtable", "tbl")
  .option("aws_iam_role", "<IAM_ROLE_ARN>") // placeholder for the IAM role ARN
  .option("tempdir", tempS3Dir)
  .mode("error")
  .save()
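
The DataFrame writer also accepts Spark's standard save modes. The following is a minimal sketch of appending rows to an existing Redshift table instead of creating or overwriting it; it reuses the jdbcURL, tempS3Dir, and tbl_write names from the Scala examples above.

// Append rows to an existing Redshift table (sketch; reuses names defined above)
df.write
  .format("com.qubole.spark.redshift")
  .option("url", jdbcURL)
  .option("dbtable", "tbl_write")
  .option("forward_spark_s3_credentials", "true")
  .option("tempdir", tempS3Dir)
  .mode("append")
  .save()
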
# Spark Redshift connector Example Notebook - PySpark

jdbcURL = "jdbc:redshift://redshifthost:5439/database?user=username&password=pass"
tempS3Dir = "s3://path/for/temp/data"

# Read a Redshift table using the DataFrame API
df = spark.read \
      .format("com.qubole.spark.redshift") \
      .option("url", jdbcURL) \
      .option("dbtable", "tbl") \
      .option("forward_spark_s3_credentials", "true") \
      .option("tempdir", tempS3Dir) \
      .load()

# Load Redshift query results into a Spark DataFrame
df = spark.read \
      .format("com.qubole.spark.redshift") \
      .option("url", jdbcURL) \
      .option("query", "select col1, count(*) from tbl group by col1") \
      .option("forward_spark_s3_credentials", "true") \
      .option("tempdir", tempS3Dir) \
      .load()


# Create a new Redshift table from the given DataFrame
# df = <DataFrame that is to be written to Redshift>
df.write \
  .format("com.qubole.spark.redshift") \
  .option("url", jdbcURL) \
  .option("dbtable", "tbl_write") \
  .option("forward_spark_s3_credentials", "true") \
  .option("tempdir", tempS3Dir) \
  .mode("error") \
  .save()

# Overwrite data in an existing Redshift table
df.write \
  .format("com.qubole.spark.redshift") \
  .option("url", jdbcURL) \
  .option("dbtable", "tbl_write") \
  .option("forward_spark_s3_credentials", "true") \
  .option("tempdir", tempS3Dir) \
  .mode("overwrite") \
  .save()

# Using IAM role-based authentication instead of keys
df.write \
  .format("com.qubole.spark.redshift") \
  .option("url", jdbcURL) \
  .option("dbtable", "tbl") \
  .option("aws_iam_role", "<IAM_ROLE_ARN>") \
  .option("tempdir", tempS3Dir) \
  .mode("error") \
  .save()
# Spark Redshift connector Example Notebook - SparkR

jdbcURL <- "jdbc:redshift://redshifthost:5439/database?user=username&password=pass"
tempS3Dir <- "s3://path/for/temp/data"

# Read a Redshift table using the DataFrame API
df <- read.df(
   NULL,
   "com.qubole.spark.redshift",
   forward_spark_s3_credentials = "true",
   tempdir = tempS3Dir,
   dbtable = "tbl",
   url = jdbcURL)

# Load Redshift query results into a Spark DataFrame
df <- read.df(
   NULL,
   "com.qubole.spark.redshift",
   forward_spark_s3_credentials = "true",
   tempdir = tempS3Dir,
   query = "select col1, count(*) from tbl group by col1",
   url = jdbcURL)


# Create a new Redshift table from the given DataFrame
# df = <DataFrame that is to be written to Redshift>
write.df(df,
   NULL,
   "com.qubole.spark.redshift",
   forward_spark_s3_credentials = "true",
   tempdir = tempS3Dir,
   dbtable = "tbl_write",
   url = jdbcURL)

# Overwrite data in an existing Redshift table
write.df(df,
   NULL,
   "com.qubole.spark.redshift",
   mode = "overwrite",
   forward_spark_s3_credentials = "true",
   tempdir = tempS3Dir,
   dbtable = "tbl_write",
   url = jdbcURL)


# Using IAM role-based authentication instead of keys
df <- read.df(
   NULL,
   "com.qubole.spark.redshift",
   tempdir = tempS3Dir,
   dbtable = "tbl",
   url = jdbcURL,
   aws_iam_role = "<IAM_ROLE_ARN>")

Note

The SQL API supports only the creation of new tables; overwriting or appending is not supported.

-- Spark Redshift connector Example Notebook - SQL

-- Read from Redshift

-- Read a Redshift table
CREATE TABLE tbl
USING com.qubole.spark.redshift
OPTIONS (
  dbtable 'tbl',
  forward_spark_s3_credentials 'true',
  tempdir 's3://path/for/temp/data',
  url 'jdbc:redshift://redshifthost:5439/database?user=username&password=pass'
)

-- Load Redshift query results into a Spark table
CREATE TABLE tbl
USING com.qubole.spark.redshift
OPTIONS (
  query 'select x, count(*) from table_in_redshift group by x',
  forward_spark_s3_credentials 'true',
  tempdir 's3://path/for/temp/data',
  url 'jdbc:redshift://redshifthost:5439/database?user=username&password=pass'
)

-- Writing to Redshift

-- Create a new table in Redshift; throws an error if a table with the same name already exists
CREATE TABLE tbl_write
USING com.qubole.spark.redshift
OPTIONS (
  dbtable 'tbl_write',
  forward_spark_s3_credentials 'true',
  tempdir 's3://path/for/temp/data',
  url 'jdbc:redshift://redshifthost:5439/database?user=username&password=pass'
)
AS SELECT * FROM tabletosave;

-- Using IAM Role based authentication instead of keys
CREATE TABLE tbl
USING com.qubole.spark.redshift
OPTIONS (
  dbtable 'table_in_redshift',
  tempdir 's3://path/for/temp/data',
  url 'jdbc:redshift://redshifthost:5439/database?user=username&password=pass',
  aws_iam_role '<IAM_ROLE_ARN>'
)

For more information, see https://github.com/databricks/spark-redshift.