Upgrade Considerations for Spark 3.3.2

When upgrading to Spark 3.3.2, you should review the following considerations and perform the necessary steps.

Spark core

Features

Upgrades

log4j

Since Spark 3.3, Spark has migrated its log4j dependency from 1.x to 2.x because log4j 1.x is no longer supported by the community. Users should rewrite their original log4j properties files using log4j2 syntax (XML, JSON, YAML, or properties format). The conf/log4j.properties.template file included in the Spark distribution has been replaced by conf/log4j2.properties.template, which uses the log4j2 properties format.
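For example, a log4j 1.x console configuration could be expressed in the log4j2 properties format roughly as follows. This is a minimal sketch modeled on the shipped conf/log4j2.properties.template; appender names and the pattern are illustrative and should be adapted to your existing settings:

    rootLogger.level = info
    rootLogger.appenderRef.stdout.ref = console

    appender.console.type = Console
    appender.console.name = console
    appender.console.target = SYSTEM_ERR
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n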

read file without a scheme

spark.scheduler.allocation.file supports reading remote files using the Hadoop filesystem, which means that if the path has no scheme, Spark respects the Hadoop configuration when reading it.
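A minimal sketch of pointing the fair-scheduler allocation file at a remote location. The HDFS path is hypothetical; if the path had no scheme, the filesystem would instead be chosen by the Hadoop configuration (for example fs.defaultFS):

    from pyspark.sql import SparkSession

    # Hypothetical remote allocation file; an explicit scheme selects the filesystem.
    spark = (SparkSession.builder
             .config("spark.scheduler.mode", "FAIR")
             .config("spark.scheduler.allocation.file", "hdfs:///conf/fairscheduler.xml")
             .getOrCreate())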

RDD/Partitions

spark.hadoopRDD.ignoreEmptySplits is set to true by default, which means Spark will not create empty partitions for empty input splits.

compression codec

spark.eventLog.compression.codec is set to zstd by default, which means Spark no longer falls back to spark.io.compression.codec.

storage replication proactive

spark.storage.replication.proactive is enabled by default, which means Spark tries to replenish cached RDD block replicas that are lost due to executor failures.
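If the previous behavior is needed, the new defaults described in the three items above can be overridden at session start. A minimal sketch (configuration keys as named above; lz4 is only one example of a supported codec):

    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             # Re-create empty partitions for empty input splits.
             .config("spark.hadoopRDD.ignoreEmptySplits", "false")
             # Pin the event log codec explicitly instead of relying on the new zstd default.
             .config("spark.eventLog.compression.codec", "lz4")
             # Disable proactive replenishment of lost cached RDD block replicas.
             .config("spark.storage.replication.proactive", "false")
             .getOrCreate())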

deprecated property

spark.launcher.childConectionTimeout is deprecated but still works. Use spark.launcher.childConnectionTimeout instead.

Apache Mesos

Support for Apache Mesos as a resource manager is deprecated.

Kubernetes driver

Spark deletes the K8s driver service resource when the application terminates on its own.

SparkContext in executors

An exception is thrown when creating a SparkContext in executors. You can allow it by setting the configuration spark.executor.allowSparkContext to true.
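A minimal sketch of opting in; the configuration name is as given above, and leaving it unset keeps the safer default of raising an exception:

    from pyspark.sql import SparkSession

    # Allow user code running inside executors (e.g. within tasks) to create a SparkContext.
    spark = (SparkSession.builder
             .config("spark.executor.allowSparkContext", "true")
             .getOrCreate())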

Hadoop classpath

Spark does not propagate the Hadoop classpath from yarn.application.classpath and mapreduce.application.classpath into the Spark application submitted to YARN when the Spark distribution includes built-in Hadoop. This prevents failures caused by different transitive dependencies picked up from the Hadoop cluster, such as Guava and Jackson.

SparkR (R on Spark)

Features

Upgrades

methods for parquet

The deprecated methods parquetFile, saveAsParquetFile, jsonFile, and jsonRDD have been removed. Use read.parquet, write.parquet, and read.json instead (a change introduced in Spark 3.0).

automatic download and install

SparkR no longer automatically downloads and installs the Spark distribution in the user's cache. Instead, it asks for permission before downloading and installing it.

PySpark

Features

Upgrades

Error Handling in PySpark Methods

PySpark methods from the sql, ml, and spark_on_pandas modules now raise a TypeError instead of a ValueError when applied to parameters of an inappropriate type.

Simplified Traceback for Python UDFs

The traceback for Python UDFs, pandas UDFs, and pandas function APIs is simplified by default. The internal Python worker’s traceback is no longer printed, which helps reduce noise in error logs.
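A small sketch that makes the difference visible. The failing UDF below is hypothetical; with the simplified traceback (the default), the error surfaced to the driver points at the UDF body rather than at the internal Python worker frames:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import udf

    spark = SparkSession.builder.getOrCreate()

    @udf("int")
    def always_fails(x):
        raise ValueError("boom")  # hypothetical failure, used only to trigger a traceback

    try:
        spark.range(1).select(always_fails("id")).collect()
    except Exception as e:
        # With the simplified traceback, the message omits the worker-internal frames.
        print(e)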

Pinned Thread Mode Enabled by Default

Pinned thread mode is enabled by default. This mode maps each Python thread to a corresponding JVM thread. Previously, a single JVM thread could be reused for multiple Python threads, which caused JVM thread-local data to be shared across multiple Python threads.
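A minimal sketch under pinned thread mode, using pyspark.InheritableThread so that JVM thread-local state such as the job group stays per Python thread. The job groups and workload here are hypothetical:

    import os
    # Pinned thread mode is the default; the variable is set explicitly only to make
    # the assumption visible, and it must be set before the JVM is launched.
    os.environ.setdefault("PYSPARK_PIN_THREAD", "true")

    from pyspark import InheritableThread
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext

    def run_job(group):
        # With pinned thread mode, each Python thread has its own JVM thread,
        # so the job group set here is not clobbered by the other thread.
        sc.setJobGroup(group, "hypothetical job for group " + group)
        spark.range(5).count()

    threads = [InheritableThread(target=run_job, args=(g,)) for g in ("a", "b")]
    for t in threads:
        t.start()
    for t in threads:
        t.join()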

SQL, Datasets and DataFrame

Features

Upgrades

Statistical Functions and Aggregate Behavior

Functions like std, stddev, variance, covar_samp, etc., now return NULL instead of Double.NaN when a DivideByZero occurs (e.g., when applying stddev_samp on a single-element set). To revert to the old behavior, set spark.sql.legacy.statisticalAggregate = true.
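A short sketch of the new default and the legacy switch; the inline table and values are illustrative:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # A single-element group: stddev_samp divides by (n - 1) = 0 and now yields NULL.
    spark.sql("SELECT stddev_samp(col) FROM VALUES (1.0) AS t(col)").show()

    # Revert to the previous behavior (Double.NaN) if required.
    spark.conf.set("spark.sql.legacy.statisticalAggregate", "true")
    spark.sql("SELECT stddev_samp(col) FROM VALUES (1.0) AS t(col)").show()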

Error handling for statistical functions has been enhanced so that NULL handling is predictable: covar_samp returns NULL for insufficient data instead of erroneous results, and edge cases in variance calculations are treated consistently.

Aggregate operations on larger datasets also benefit from further performance enhancements and improved execution plans for statistical functions.

Return Types and Handling of Data Types

grouping_id() now returns long instead of int. Revert it by setting spark.sql.legacy.integerGroupingId = true.

Support for CHAR and VARCHAR types in the table schema has been introduced. Table scan/insertion will respect the CHAR/VARCHAR semantic. If CHAR/VARCHAR types are used in places other than the table schema, an exception will be thrown. The CAST operation remains an exception that treats CHAR/VARCHAR as string, similar to earlier versions. Revert it by setting spark.sql.legacy.charVarcharAsString = true.
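A sketch covering both of the type changes above. The table name t_chars is hypothetical, and the legacy flags shown are the ones named above:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # grouping_id() is now a bigint (long) column rather than an int column.
    df = spark.sql("SELECT a, grouping_id() AS gid FROM VALUES (1) AS t(a) GROUP BY CUBE(a)")
    df.printSchema()
    # spark.conf.set("spark.sql.legacy.integerGroupingId", "true")    # restores the int type

    # CHAR/VARCHAR semantics are enforced in table schemas: CHAR(4) values are padded
    # to the declared length, and over-length inserts fail.
    spark.sql("CREATE TABLE t_chars (c CHAR(4), v VARCHAR(4)) USING parquet")
    spark.sql("INSERT INTO t_chars VALUES ('ab', 'abc')")
    spark.sql("SELECT c, length(c), v FROM t_chars").show()
    # spark.conf.set("spark.sql.legacy.charVarcharAsString", "true")  # old STRING behavior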

Handling of complex data structures has been improved, making it more efficient to work with nested types, arrays, maps, and structs.

Datetime Functions

Functions like from_unixtime, unix_timestamp, etc., now fail if the specified datetime pattern is invalid (previously they returned NULL).
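For example, in the sketch below the second pattern is deliberately malformed (an unterminated quoted literal) and is expected to raise an error rather than produce NULL:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import from_unixtime, lit

    spark = SparkSession.builder.getOrCreate()
    df = spark.range(1)

    # Valid pattern: returns a formatted string.
    df.select(from_unixtime(lit(0), "yyyy-MM-dd HH:mm:ss")).show()

    # Malformed pattern: now fails instead of returning NULL.
    try:
        df.select(from_unixtime(lit(0), "yyyy-MM-dd 'oops")).show()
    except Exception as e:
        print(type(e).__name__)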

Enhanced consistency in error handling across all datetime functions. This ensures that invalid datetime inputs are handled uniformly, improving predictability and reliability in datetime operations.

Path Options and DataFrame Operations

The path option cannot coexist with specified path parameters in methods like DataFrameReader.load() and DataFrameWriter.save(). Set spark.sql.legacy.pathOptionBehavior.enabled = true to revert it.
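A sketch of the conflict and the legacy switch; the output paths are hypothetical:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.range(3)

    # Passing both the "path" option and a path argument now raises an error.
    try:
        df.write.format("parquet").option("path", "/tmp/out_a").save("/tmp/out_b")
    except Exception as e:
        print(type(e).__name__)

    # Revert to the pre-change behavior described by the legacy flag above.
    spark.conf.set("spark.sql.legacy.pathOptionBehavior.enabled", "true")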

Enhanced error messages for clarity regarding path option issues.

JSON and CSV Functions

schema_of_json and schema_of_csv now return quoted field names.

The JSON data source infers TimestampType from string values when the option inferTimestamp is set to true.
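A sketch of both points; the JSON input path is hypothetical:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import schema_of_json, lit

    spark = SparkSession.builder.getOrCreate()

    # Field names in the inferred schema string are now quoted.
    spark.range(1).select(schema_of_json(lit('{"a": 1}'))).show(truncate=False)

    # Opt in to inferring TimestampType from strings when reading JSON.
    df = (spark.read
          .option("inferTimestamp", "true")
          .json("/data/events.json"))  # hypothetical path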

Optimizations for JSON and CSV handling improve overall performance.

Handling of NULL Values and Complex Types

NULL elements in structs, arrays, and maps are converted to the string "null" when cast to string, instead of being rendered as empty strings. Revert it by setting spark.sql.legacy.castComplexTypesToString.enabled = true.
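A short sketch of the change and the legacy switch:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # The NULL element inside the array is rendered as "null" in the string cast.
    spark.sql("SELECT CAST(array(1, NULL, 3) AS STRING)").show(truncate=False)

    # Revert to the old rendering (empty string for NULL elements) if required.
    spark.conf.set("spark.sql.legacy.castComplexTypesToString.enabled", "true")
    spark.sql("SELECT CAST(array(1, NULL, 3) AS STRING)").show(truncate=False)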

Consistent behavior in NULL handling across all complex types.

Error Handling and Exception Management

Specific exceptions are thrown for Hive external catalog operations (PartitionsAlreadyExistException and NoSuchPartitionsException). An IllegalArgumentException is now thrown for incomplete interval literals (e.g., INTERVAL '1', INTERVAL '1 DAY 2'), which were previously treated as NULL.

Enhanced exception handling across various operations.

View Management, Table Management and SQL Configurations

Permanent views capture runtime SQL configs and store them as view properties. Temporary views created via CACHE TABLE … AS SELECT behave similarly, affecting the parsing and analysis phases. Refreshing a table triggers an uncache operation for all other caches that reference the table, even if the table itself is not cached. This behavior differs from Spark 3.0, where the operation was triggered only if the table itself was cached.

Performance improvements in view resolution.

Timestamp Handling

Loading and saving timestamps before 1900-01-01 00:00:00Z as INT96 fails. To revert to the old behavior, set spark.sql.legacy.parquet.int96RebaseModeInRead (for loading) or spark.sql.legacy.parquet.int96RebaseModeInWrite (for saving) to LEGACY.

Ongoing enhancements for timestamp handling focus on improving overall robustness and compatibility, further refining how timestamps are processed and managed.

When the date or timestamp pattern is not specified, Spark converts an input string to a date/timestamp using the CAST expression approach. This change affects CSV and JSON data sources and the parsing of partition values. Spark recognizes various date and timestamp patterns, including:

Date patterns:

  • [+-]yyyy*

  • [+-]yyyy*-[m]m

  • [+-]yyyy*-[m]m-[d]d

  • Additional variations with T*.

Timestamp patterns: various combinations of year, month, day, hour, minute, second, and an optional zone identifier.

Hive Compatibility

Migration to Hive 2.3 is necessary because the built-in Hive 1.2 has been removed. Refer to HIVE-15167 for details.

Database Output Schema

The output schema of SHOW DATABASES has changed from databaseName: string to namespace: string. Restore the old schema by setting spark.sql.legacy.keepCommandOutputSchema = true.
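A short sketch of the new schema and the legacy switch:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # The single output column is now called "namespace".
    spark.sql("SHOW DATABASES").printSchema()

    # Restore the old "databaseName" column if required.
    spark.conf.set("spark.sql.legacy.keepCommandOutputSchema", "true")
    spark.sql("SHOW DATABASES").printSchema()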

Null Partition Handling

PARTITION(col=null) is parsed as a null literal in the partition specification. Previously, it was parsed as a string literal if the partition column was of string type. Restore the legacy behavior by setting spark.sql.legacy.parseNullPartitionSpecAsStringLiteral = true.
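A sketch of the new parsing; the partitioned table t_part is hypothetical and exists only to illustrate the change:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    spark.sql("CREATE TABLE t_part (v INT, p STRING) USING parquet PARTITIONED BY (p)")
    spark.sql("INSERT INTO t_part PARTITION(p = null) VALUES (1)")

    # p = null is now a real null partition value, not the string literal 'null'.
    spark.sql("SELECT v, p FROM t_part WHERE p IS NULL").show()

    # Restore parsing it as the string literal 'null' for string columns if required:
    # spark.conf.set("spark.sql.legacy.parseNullPartitionSpecAsStringLiteral", "true")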

Structured Streaming

Features

Upgrades

Stateful Operators

Queries with stateful operations that can emit rows older than the current watermark plus the allowed late record delay (such rows are treated as late rows in downstream stateful operations and may be discarded) now fail with an AnalysisException by default, to address potential correctness issues. Earlier Spark versions only printed a warning message for such cases. Disable this check by setting spark.sql.streaming.statefulOperator.checkCorrectness.enabled = false.
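A minimal sketch of opting out; keep the default (enabled) unless the late-row correctness risk is understood:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Disable the stricter correctness check described above.
    spark.conf.set("spark.sql.streaming.statefulOperator.checkCorrectness.enabled", "false")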

All stateful operators now require hash partitioning with exact grouping keys. In earlier versions, all stateful operators except stream-stream joins allowed looser partitioning, which could lead to correctness issues. The old behavior is still supported for backward compatibility when using a checkpoint built from older versions (see SPARK-38204).

Kafka Offset Fetching Mechanism

deprecated offset fetching

A new configuration, spark.sql.streaming.kafka.useDeprecatedOffsetFetching (default: true), was introduced to allow users to enable a new offset fetching mechanism using Kafka’s AdminClient. By setting this option to false, Spark will no longer use KafkaConsumer for offset fetching, which could previously cause infinite waits in the driver.
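A sketch of opting in to the AdminClient-based offset fetching. The broker address and topic are hypothetical, and the Kafka connector package (spark-sql-kafka) must be on the classpath:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Stop using KafkaConsumer for offset fetching in the driver.
    spark.conf.set("spark.sql.streaming.kafka.useDeprecatedOffsetFetching", "false")

    stream = (spark.readStream
              .format("kafka")
              .option("kafka.bootstrap.servers", "broker:9092")  # hypothetical broker
              .option("subscribe", "events")                     # hypothetical topic
              .load())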

For more information about upgrading to Spark 3.3, see: