Connecting to Redshift Data Source from Spark

Spark on Qubole supports the Spark Redshift connector, a library that lets you load data from Amazon Redshift tables into Spark SQL DataFrames and write data back to Redshift tables. Amazon S3 is used to transfer data in and out of Redshift efficiently, and the Redshift JDBC driver is used to automatically trigger the appropriate COPY and UNLOAD commands on Redshift. As a result, the connector requires AWS credentials with read and write access to an S3 bucket (specified using the tempdir configuration parameter).
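
If you do not forward Spark's S3 credentials to Redshift with the forward_spark_s3_credentials option shown in the examples below, the cluster still needs S3 credentials that can read and write the tempdir location. The following is a minimal sketch, assuming the s3a filesystem is in use and that the placeholder values are replaced with real keys (alternatively, rely on the cluster's IAM role):

// Minimal sketch: supply S3 credentials for the tempdir bucket through the Hadoop configuration.
// Assumes the s3a filesystem; the key values below are placeholders, not real credentials.
spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "<YOUR_AWS_ACCESS_KEY_ID>")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "<YOUR_AWS_SECRET_ACCESS_KEY>")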

Note

The Spark Redshift connector is supported on Spark 2.4 and later versions.

Note

This feature is not enabled for all users by default. Create a ticket with Qubole Support to enable it on your QDS account.

Usage

You can use the Spark Redshift connector through the Data Sources API in Scala, Python, R, or SQL.

The following example notebooks show how to use the Spark Redshift connector.

Scala Example Notebook

// Spark Redshift connector Example Notebook - Scala

val jdbcURL = "jdbc:redshift://redshifthost:5439/database?user=username&password=pass"
val tempS3Dir = "s3://path/for/temp/data"

// Read a Redshift table using the DataFrame API
import org.apache.spark.sql.DataFrame

val df: DataFrame = spark.read
    .format("com.qubole.spark.redshift")
    .option("url", jdbcURL)
    .option("dbtable", "tbl")
    .option("forward_spark_s3_credentials", "true")
    .option("tempdir", tempS3Dir)
    .load()

// Load the results of a Redshift query into a Spark DataFrame
val queryDF: DataFrame = spark.read
    .format("com.qubole.spark.redshift")
    .option("url", jdbcURL)
    .option("query", "select col1, count(*) from tbl group by col1")
    .option("forward_spark_s3_credentials", "true")
    .option("tempdir", tempS3Dir)
    .load()

// Write data to Redshift

// Create a new Redshift table from the given DataFrame
// df = <dataframe that is to be written to Redshift>
df.write
  .format("com.qubole.spark.redshift")
  .option("url", jdbcURL)
  .option("dbtable", "tbl_write")
  .option("forward_spark_s3_credentials", "true")
  .option("tempdir", tempS3Dir)
  .mode("error")
  .save()

// Overwrite data in an existing Redshift table
df.write
  .format("com.qubole.spark.redshift")
  .option("url", jdbcURL)
  .option("dbtable", "tbl_write")
  .option("forward_spark_s3_credentials", "true")
  .option("tempdir", tempS3Dir)
  .mode("overwrite")
  .save()
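
// Append data to an existing Redshift table (a minimal sketch; "append" is the standard
// Spark SaveMode for adding rows to an existing table and is used here in the same way
// as the modes shown above).
df.write
  .format("com.qubole.spark.redshift")
  .option("url", jdbcURL)
  .option("dbtable", "tbl_write")
  .option("forward_spark_s3_credentials", "true")
  .option("tempdir", tempS3Dir)
  .mode("append")
  .save()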

// Authentication: use IAM role-based authentication instead of keys
df.write
  .format("com.qubole.spark.redshift")
  .option("url", jdbcURL)
  .option("dbtable", "tbl")
  .option("aws_iam_role", "<IAM_ROLE_ARN>")
  .option("tempdir", tempS3Dir)
  .mode("error")
  .save()
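
// A loaded Redshift DataFrame can be used like any other Spark DataFrame. A minimal usage
// sketch using standard Spark APIs (not connector-specific): register a temporary view and
// query it with Spark SQL.
df.createOrReplaceTempView("redshift_tbl")
spark.sql("SELECT count(*) FROM redshift_tbl").show()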

Python Example Notebook

# Spark Redshift connector Example Notebook - PySpark

jdbcURL = "jdbc:redshift://redshifthost:5439/database?user=username&password=pass"
tempS3Dir = "s3://path/for/temp/data"

# Read a Redshift table using the DataFrame API
df = spark.read \
      .format("com.qubole.spark.redshift") \
      .option("url", jdbcURL) \
      .option("dbtable", "tbl") \
      .option("forward_spark_s3_credentials", "true") \
      .option("tempdir", tempS3Dir) \
      .load()

# Load the results of a Redshift query into a Spark DataFrame
df = spark.read \
      .format("com.qubole.spark.redshift") \
      .option("url", jdbcURL) \
      .option("query", "select col1, count(*) from tbl group by col1") \
      .option("forward_spark_s3_credentials", "true") \
      .option("tempdir", tempS3Dir) \
      .load()


# Create a new Redshift table from the given DataFrame
# df = <dataframe that is to be written to Redshift>
df.write \
  .format("com.qubole.spark.redshift") \
  .option("url", jdbcURL) \
  .option("dbtable", "tbl_write") \
  .option("forward_spark_s3_credentials", "true") \
  .option("tempdir", tempS3Dir) \
  .mode("error") \
  .save()

# Overwrite data in an existing Redshift table
df.write \
  .format("com.qubole.spark.redshift") \
  .option("url", jdbcURL) \
  .option("dbtable", "tbl_write") \
  .option("forward_spark_s3_credentials", "true") \
  .option("tempdir", tempS3Dir) \
  .mode("overwrite") \
  .save()

# Use IAM role-based authentication instead of keys
df.write \
  .format("com.qubole.spark.redshift") \
  .option("url", jdbcURL) \
  .option("dbtable", "tbl") \
  .option("aws_iam_role", "<IAM_ROLE_ARN>") \
  .option("tempdir", tempS3Dir) \
  .mode("error") \
  .save()

SparkR Example Notebook

# Spark Redshift connector Example Notebook - SparkR


jdbcURL <- "jdbc:redshift://redshifthost:5439/database?user=username&password=pass"
tempS3Dir <- "s3://path/for/temp/data"

# Read a Redshift table using the DataFrame API
df <- read.df(
    NULL,
    "com.qubole.spark.redshift",
    forward_spark_s3_credentials = "true",
    tempdir = tempS3Dir,
    dbtable = "tbl",
    url = jdbcURL)

# Load the results of a Redshift query into a Spark DataFrame
df <- read.df(
    NULL,
    "com.qubole.spark.redshift",
    forward_spark_s3_credentials = "true",
    tempdir = tempS3Dir,
    query = "select col1, count(*) from tbl group by col1",
    url = jdbcURL)


# Create a new Redshift table from the given DataFrame
# df = <dataframe that is to be written to Redshift>
write.df(df,
    NULL,
    "com.qubole.spark.redshift",
    forward_spark_s3_credentials = "true",
    tempdir = tempS3Dir,
    dbtable = "tbl_write",
    url = jdbcURL)

# Overwrite data in an existing Redshift table
write.df(df,
    NULL,
    "com.qubole.spark.redshift",
    forward_spark_s3_credentials = "true",
    tempdir = tempS3Dir,
    dbtable = "tbl_write",
    url = jdbcURL,
    mode = "overwrite")


# Use IAM role-based authentication instead of keys
df <- read.df(
    NULL,
    "com.qubole.spark.redshift",
    tempdir = tempS3Dir,
    dbtable = "tbl",
    url = jdbcURL,
    aws_iam_role = "<IAM_ROLE_ARN>")

SQL Example Notebook

Note

The SQL API supports only the creation of new tables. Overwriting or appending is not supported.

-- Spark Redshift connector Example Notebook - SQL

-- Read from Redshift

-- Create a Spark table backed by a Redshift table
CREATE TABLE tbl
USING com.qubole.spark.redshift
OPTIONS (
  dbtable 'tbl',
  forward_spark_s3_credentials 'true',
  tempdir 's3://path/for/temp/data',
  url 'jdbc:redshift://redshifthost:5439/database?user=username&password=pass'
)

-- Create a Spark table from the results of a Redshift query
CREATE TABLE tbl_query
USING com.qubole.spark.redshift
OPTIONS (
  query 'select x, count(*) from table_in_redshift group by x',
  forward_spark_s3_credentials 'true',
  tempdir 's3://path/for/temp/data',
  url 'jdbc:redshift://redshifthost:5439/database?user=username&password=pass'
)

-- Writing to Redshift

-- Create a new table in Redshift; throws an error if a table with the same name already exists
CREATE TABLE tbl_write
USING com.qubole.spark.redshift
OPTIONS (
  dbtable 'tbl_write',
  forward_spark_s3_credentials 'true',
  tempdir 's3://path/for/temp/data',
  url 'jdbc:redshift://redshifthost:5439/database?user=username&password=pass'
)
AS SELECT * FROM tabletosave;

-- Use IAM role-based authentication instead of keys
CREATE TABLE tbl_iam
USING com.qubole.spark.redshift
OPTIONS (
  dbtable 'table_in_redshift',
  tempdir 's3://path/for/temp/data',
  url 'jdbc:redshift://redshifthost:5439/database?user=username&password=pass',
  aws_iam_role '<IAM_ROLE_ARN>'
)

For more information, see https://github.com/databricks/spark-redshift.