29 changes: 28 additions & 1 deletion python/pyspark/sql/readwriter.py
@@ -439,6 +439,30 @@ def parquet(self, *paths, **options):
modifiedAfter (batch only) : an optional timestamp to only include files with
modification times occurring after the specified time. The provided timestamp
must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
datetimeRebaseMode : str, optional
the rebasing mode for the values of the ``DATE``, ``TIMESTAMP_MICROS``,
``TIMESTAMP_MILLIS`` logical types from the Julian to Proleptic Gregorian calendar.

* ``EXCEPTION``: Spark fails reads of ancient dates/timestamps
that are ambiguous between the two calendars.
* ``CORRECTED``: loading of dates/timestamps without rebasing.
* ``LEGACY``: perform rebasing of ancient dates/timestamps from the Julian
to Proleptic Gregorian calendar.

If None is set, the value of the SQL config
``spark.sql.legacy.parquet.datetimeRebaseModeInRead`` is used by default.
int96RebaseMode : str, optional
the rebasing mode for ``INT96`` timestamps from the Julian to
Proleptic Gregorian calendar.

* ``EXCEPTION``: Spark fails reads of ancient ``INT96`` timestamps
that are ambiguous between the two calendars.
* ``CORRECTED``: loading of ``INT96`` timestamps without rebasing.
* ``LEGACY``: perform rebasing of ancient ``INT96`` timestamps from the Julian
to Proleptic Gregorian calendar.

If None is set, the value of the SQL config
``spark.sql.legacy.parquet.int96RebaseModeInRead`` is used by default.

Examples
--------
@@ -451,9 +475,12 @@ def parquet(self, *paths, **options):
modifiedBefore = options.get('modifiedBefore', None)
modifiedAfter = options.get('modifiedAfter', None)
recursiveFileLookup = options.get('recursiveFileLookup', None)
datetimeRebaseMode = options.get('datetimeRebaseMode', None)
int96RebaseMode = options.get('int96RebaseMode', None)
self._set_opts(mergeSchema=mergeSchema, pathGlobFilter=pathGlobFilter,
recursiveFileLookup=recursiveFileLookup, modifiedBefore=modifiedBefore,
modifiedAfter=modifiedAfter)
modifiedAfter=modifiedAfter, datetimeRebaseMode=datetimeRebaseMode,
int96RebaseMode=int96RebaseMode)

return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))

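For reference, a minimal usage sketch of the new batch read options from PySpark; the session, path and mode values below are placeholders and not taken from this PR:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# The rebase modes are per-read options: they apply only to this read and take
# precedence over the spark.sql.legacy.parquet.*RebaseModeInRead configs for it.
df = spark.read.parquet(
    "/tmp/ancient_dates.parquet",       # placeholder path
    datetimeRebaseMode="CORRECTED",     # DATE/TIMESTAMP_MICROS/TIMESTAMP_MILLIS: no rebasing
    int96RebaseMode="LEGACY",           # INT96 timestamps: rebase from the Julian calendar
)
```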
30 changes: 28 additions & 2 deletions python/pyspark/sql/streaming.py
@@ -667,7 +667,8 @@ def orc(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=N
else:
raise TypeError("path can be only a single string")

def parquet(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=None):
def parquet(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=None,
datetimeRebaseMode=None, int96RebaseMode=None):
"""
Loads a Parquet file stream, returning the result as a :class:`DataFrame`.

@@ -688,6 +689,30 @@ def parquet(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLook
recursively scan a directory for files. Using this option
disables
`partition discovery <https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#partition-discovery>`_. # noqa
datetimeRebaseMode : str, optional
the rebasing mode for the values of the ``DATE``, ``TIMESTAMP_MICROS``,
``TIMESTAMP_MILLIS`` logical types from the Julian to Proleptic Gregorian calendar.

* ``EXCEPTION``: Spark fails reads of ancient dates/timestamps
that are ambiguous between the two calendars.
* ``CORRECTED``: loading of dates/timestamps without rebasing.
* ``LEGACY``: perform rebasing of ancient dates/timestamps from the Julian
to Proleptic Gregorian calendar.

If None is set, the value of the SQL config
``spark.sql.legacy.parquet.datetimeRebaseModeInRead`` is used by default.
int96RebaseMode : str, optional
the rebasing mode for ``INT96`` timestamps from the Julian to
Proleptic Gregorian calendar.

* ``EXCEPTION``: Spark fails reads of ancient ``INT96`` timestamps
that are ambiguous between the two calendars.
* ``CORRECTED``: loading of ``INT96`` timestamps without rebasing.
* ``LEGACY``: perform rebasing of ancient ``INT96`` timestamps from the Julian
to Proleptic Gregorian calendar.

If None is set, the value of the SQL config
``spark.sql.legacy.parquet.int96RebaseModeInRead`` is used by default.

Examples
--------
@@ -698,7 +723,8 @@ def parquet(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLook
True
"""
self._set_opts(mergeSchema=mergeSchema, pathGlobFilter=pathGlobFilter,
recursiveFileLookup=recursiveFileLookup)
recursiveFileLookup=recursiveFileLookup,
datetimeRebaseMode=datetimeRebaseMode, int96RebaseMode=int96RebaseMode)
if isinstance(path, str):
return self._df(self._jreader.parquet(path))
else:
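A corresponding streaming sketch, again with placeholder paths; streaming file sources need an explicit schema, taken here from a one-off batch read:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Streaming reads require a schema up front; infer it once from a batch read.
static_schema = spark.read.parquet("/tmp/ancient_dates_dir").schema

stream_df = (
    spark.readStream
    .schema(static_schema)
    .parquet(
        "/tmp/ancient_dates_dir",        # placeholder directory being monitored
        datetimeRebaseMode="CORRECTED",
        int96RebaseMode="CORRECTED",
    )
)
```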
23 changes: 23 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -825,6 +825,29 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)</li>
* <li>`recursiveFileLookup`: recursively scan a directory for files. Using this option
* disables partition discovery</li>
* <li>`datetimeRebaseMode` (default is the value specified in the SQL config
* `spark.sql.legacy.parquet.datetimeRebaseModeInRead`): the rebasing mode for the values
* of the `DATE`, `TIMESTAMP_MICROS`, `TIMESTAMP_MILLIS` logical types from the Julian to
* Proleptic Gregorian calendar:
* <ul>
* <li>`EXCEPTION` : Spark fails reads of ancient dates/timestamps that are ambiguous
* between the two calendars</li>
* <li>`CORRECTED` : loading of dates/timestamps without rebasing</li>
* <li>`LEGACY` : perform rebasing of ancient dates/timestamps from the Julian to Proleptic
* Gregorian calendar</li>
* </ul>
* </li>
* <li>`int96RebaseMode` (default is the value specified in the SQL config
* `spark.sql.legacy.parquet.int96RebaseModeInRead`): the rebasing mode for `INT96` timestamps
* from the Julian to Proleptic Gregorian calendar:
* <ul>
* <li>`EXCEPTION` : Spark fails reads of ancient `INT96` timestamps that are ambiguous
* between the two calendars</li>
* <li>`CORRECTED` : loading of timestamps without rebasing</li>
* <li>`LEGACY` : perform rebasing of ancient `INT96` timestamps from the Julian to Proleptic
* Gregorian calendar</li>
* </ul>
* </li>
* </ul>
*
* @since 1.4.0
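Since these are ordinary data source options, they can also be set through the generic option API documented above; a sketch reusing `spark` from the earlier example (the path is still a placeholder):

```python
# Equivalent to the keyword form shown earlier: the Parquet source looks the
# options up by name, so .option(...) works for both batch and streaming reads.
df = (
    spark.read
    .option("datetimeRebaseMode", "LEGACY")
    .option("int96RebaseMode", "EXCEPTION")
    .parquet("/tmp/ancient_dates.parquet")   # placeholder path
)
```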
@@ -251,6 +251,9 @@ class ParquetFileFormat
val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
val isCaseSensitive = sqlConf.caseSensitiveAnalysis
val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)
val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead

(file: PartitionedFile) => {
assert(file.partitionValues.numFields == partitionSchema.size)
@@ -301,10 +304,10 @@

val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
footerFileMetaData.getKeyValueMetaData.get,
SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))
datetimeRebaseModeInRead)
val int96RebaseMode = DataSourceUtils.int96RebaseMode(
footerFileMetaData.getKeyValueMetaData.get,
SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ))
int96RebaseModeInRead)

val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
val hadoopAttemptContext =
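For orientation: the hunk above only changes where the fallback mode comes from (the datasource option, then the SQL config); the per-file writer metadata still decides first. Below is a rough sketch of that ordering as I read it, not Spark's actual `DataSourceUtils` code; the metadata keys correspond to the `SPARK_LEGACY_DATETIME` marker imported by the test suite further down, and the version check is deliberately simplified. `int96RebaseMode` follows the same pattern with its own writer marker (`SPARK_LEGACY_INT96`).

```python
def effective_datetime_rebase_mode(file_key_value_metadata: dict, fallback_mode: str) -> str:
    """Simplified illustration; not Spark's actual implementation."""
    writer_version = file_key_value_metadata.get("org.apache.spark.version")
    if writer_version is None:
        # No Spark writer info (e.g. files produced by other systems): only here does
        # the mode from the datetimeRebaseMode option / SQL config actually apply.
        return fallback_mode
    if writer_version < "3.0.0" or "org.apache.spark.legacyDateTime" in file_key_value_metadata:
        # Spark 2.4 files, or 3.x files written with legacy rebasing, need rebasing.
        return "LEGACY"
    # Newer files already store Proleptic Gregorian values; no rebasing needed.
    return "CORRECTED"
```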
@@ -69,6 +69,19 @@ class ParquetOptions(
.get(MERGE_SCHEMA)
.map(_.toBoolean)
.getOrElse(sqlConf.isParquetSchemaMergingEnabled)

/**
* The rebasing mode for DATE, TIMESTAMP_MICROS and TIMESTAMP_MILLIS values in reads.
*/
def datetimeRebaseModeInRead: String = parameters
.get(DATETIME_REBASE_MODE)
.getOrElse(sqlConf.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))
/**
* The rebasing mode for INT96 timestamp values in reads.
*/
def int96RebaseModeInRead: String = parameters
.get(INT96_REBASE_MODE)
.getOrElse(sqlConf.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ))
}


@@ -89,4 +102,16 @@ object ParquetOptions {
def getParquetCompressionCodecName(name: String): String = {
shortParquetCompressionCodecNames(name).name()
}

// The option controls rebasing of the DATE and TIMESTAMP values between
// Julian and Proleptic Gregorian calendars. It affects the behaviour of the Parquet
// datasource similarly to the SQL config `spark.sql.legacy.parquet.datetimeRebaseModeInRead`,
// and can be set to the same values: `EXCEPTION`, `LEGACY` or `CORRECTED`.
val DATETIME_REBASE_MODE = "datetimeRebaseMode"

// The option controls rebasing of the INT96 timestamp values between Julian and Proleptic
// Gregorian calendars. It affects the behaviour of the Parquet datasource similarly to
// the SQL config `spark.sql.legacy.parquet.int96RebaseModeInRead`.
// The valid option values are: `EXCEPTION`, `LEGACY` or `CORRECTED`.
val INT96_REBASE_MODE = "int96RebaseMode"
}
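The `getOrElse` fallbacks above are what make a per-read option win over the session config. A small sketch of that precedence from the Python API, reusing `spark` and the placeholder path from the earlier examples:

```python
# Session-level default: fail on ambiguous ancient values.
spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "EXCEPTION")

# This particular read still succeeds on such values, because the datasource option
# is consulted before the SQL config fallback (and only for files whose writer
# metadata does not already settle the rebase question).
df = (
    spark.read
    .option("datetimeRebaseMode", "CORRECTED")
    .parquet("/tmp/ancient_dates.parquet")
)
```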
@@ -52,14 +52,16 @@ import org.apache.spark.util.SerializableConfiguration
* @param readDataSchema Required schema of Parquet files.
* @param partitionSchema Schema of partitions.
* @param filters Filters to be pushed down in the batch scan.
* @param parquetOptions The Parquet datasource options set for the read.
*/
case class ParquetPartitionReaderFactory(
sqlConf: SQLConf,
broadcastedConf: Broadcast[SerializableConfiguration],
dataSchema: StructType,
readDataSchema: StructType,
partitionSchema: StructType,
filters: Array[Filter]) extends FilePartitionReaderFactory with Logging {
filters: Array[Filter],
parquetOptions: ParquetOptions) extends FilePartitionReaderFactory with Logging {
Contributor

Does it really work? The two fields of ParquetOptions are transient and become null after (de)serialization.

Contributor

Ah, never mind: we read the confs and put them in vals.

private val isCaseSensitive = sqlConf.caseSensitiveAnalysis
private val resultSchema = StructType(partitionSchema.fields ++ readDataSchema.fields)
private val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
@@ -74,6 +76,8 @@ case class ParquetPartitionReaderFactory(
private val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
private val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
private val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
private val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
private val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead

override def supportColumnarReads(partition: InputPartition): Boolean = {
sqlConf.parquetVectorizedReaderEnabled && sqlConf.wholeStageEnabled &&
@@ -174,10 +178,10 @@ case class ParquetPartitionReaderFactory(
}
val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
footerFileMetaData.getKeyValueMetaData.get,
SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))
datetimeRebaseModeInRead)
val int96RebaseMode = DataSourceUtils.int96RebaseMode(
footerFileMetaData.getKeyValueMetaData.get,
SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ))
int96RebaseModeInRead)
val reader = buildReaderFunc(
split,
file.partitionValues,
@@ -16,6 +16,8 @@
*/
package org.apache.spark.sql.execution.datasources.v2.parquet

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetInputFormat
@@ -24,7 +26,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.connector.read.PartitionReaderFactory
import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
import org.apache.spark.sql.execution.datasources.parquet.{ParquetReadSupport, ParquetWriteSupport}
import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions, ParquetReadSupport, ParquetWriteSupport}
import org.apache.spark.sql.execution.datasources.v2.FileScan
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
@@ -76,8 +78,15 @@ case class ParquetScan(

val broadcastedConf = sparkSession.sparkContext.broadcast(
new SerializableConfiguration(hadoopConf))
ParquetPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf,
dataSchema, readDataSchema, readPartitionSchema, pushedFilters)
val sqlConf = sparkSession.sessionState.conf
ParquetPartitionReaderFactory(
sqlConf,
broadcastedConf,
dataSchema,
readDataSchema,
readPartitionSchema,
pushedFilters,
new ParquetOptions(options.asCaseSensitiveMap.asScala.toMap, sqlConf))
}

override def equals(obj: Any): Boolean = obj match {
@@ -493,6 +493,29 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
* It does not change the behavior of partition discovery.</li>
* <li>`recursiveFileLookup`: recursively scan a directory for files. Using this option
* disables partition discovery</li>
* <li>`datetimeRebaseMode` (default is the value specified in the SQL config
* `spark.sql.legacy.parquet.datetimeRebaseModeInRead`): the rebasing mode for the values
* of the `DATE`, `TIMESTAMP_MICROS`, `TIMESTAMP_MILLIS` logical types from the Julian to
* Proleptic Gregorian calendar:
* <ul>
* <li>`EXCEPTION` : Spark fails reads of ancient dates/timestamps that are ambiguous
* between the two calendars</li>
* <li>`CORRECTED` : loading of dates/timestamps without rebasing</li>
* <li>`LEGACY` : perform rebasing of ancient dates/timestamps from the Julian to Proleptic
* Gregorian calendar</li>
* </ul>
* </li>
* <li>`int96RebaseMode` (default is the value specified in the SQL config
* `spark.sql.legacy.parquet.int96RebaseModeInRead`): the rebasing mode for `INT96` timestamps
* from the Julian to Proleptic Gregorian calendar:
* <ul>
* <li>`EXCEPTION` : Spark fails reads of ancient `INT96` timestamps that are ambiguous
* between the two calendars</li>
* <li>`CORRECTED` : loading of timestamps without rebasing</li>
* <li>`LEGACY` : perform rebasing of ancient `INT96` timestamps from the Julian to Proleptic
* Gregorian calendar</li>
* </ul>
* </li>
* </ul>
*
* @since 2.0.0
@@ -24,8 +24,8 @@ import org.apache.spark.{SparkConf, SparkException, SparkUpgradeException}
import org.apache.spark.sql.{QueryTest, Row, SPARK_LEGACY_DATETIME, SPARK_LEGACY_INT96}
import org.apache.spark.sql.catalyst.util.DateTimeTestUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.{LegacyBehaviorPolicy, ParquetOutputTimestampType}
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy.{CORRECTED, EXCEPTION, LEGACY}
import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType
import org.apache.spark.sql.test.SharedSparkSession

abstract class ParquetRebaseDatetimeSuite
@@ -97,6 +97,27 @@ abstract class ParquetRebaseDatetimeSuite
}
}

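// Translates a read-side rebase SQL config key into the equivalent Parquet datasource
// option carrying the same mode.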
private def inReadConfToOptions(
conf: String,
mode: LegacyBehaviorPolicy.Value): Map[String, String] = conf match {
case SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ.key =>
Map(ParquetOptions.INT96_REBASE_MODE -> mode.toString)
case _ => Map(ParquetOptions.DATETIME_REBASE_MODE -> mode.toString)
}

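// Runs `f` once per mode with the mode set through the SQL config `conf`, and then again
// with the config pinned to EXCEPTION while the mode is passed as a datasource option,
// so both ways of selecting the rebase behaviour are exercised.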
private def runInMode(
conf: String,
modes: Seq[LegacyBehaviorPolicy.Value])(f: Map[String, String] => Unit): Unit = {
modes.foreach { mode =>
withSQLConf(conf -> mode.toString) { f(Map.empty) }
}
withSQLConf(conf -> EXCEPTION.toString) {
modes.foreach { mode =>
f(inReadConfToOptions(conf, mode))
}
}
}

test("SPARK-31159: compatibility with Spark 2.4 in reading dates/timestamps") {
val N = 8
// test reading the existing 2.4 files and new 3.0 files (with rebase on/off) together.
@@ -132,9 +153,9 @@
}
// For Parquet files written by Spark 3.0, we know the writer info and don't need the
// config to guide the rebase behavior.
withSQLConf(inReadConf -> LEGACY.toString) {
runInMode(inReadConf, Seq(LEGACY)) { options =>
checkAnswer(
spark.read.format("parquet").load(path2_4, path3_0, path3_0_rebase),
spark.read.format("parquet").options(options).load(path2_4, path3_0, path3_0_rebase),
(0 until N).flatMap { i =>
val (dictS, plainS) = rowFunc(i)
Seq.tabulate(3) { _ =>
@@ -235,12 +256,10 @@ abstract class ParquetRebaseDatetimeSuite
withAllParquetReaders {
// The file metadata indicates if it needs rebase or not, so we can always get the
// correct result regardless of the "rebase mode" config.
Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode =>
withSQLConf(inReadConf -> mode.toString) {
checkAnswer(
spark.read.parquet(path),
Seq.tabulate(N)(_ => Row(Timestamp.valueOf(tsStr))))
}
runInMode(inReadConf, Seq(LEGACY, CORRECTED, EXCEPTION)) { options =>
checkAnswer(
spark.read.options(options).parquet(path),
Seq.tabulate(N)(_ => Row(Timestamp.valueOf(tsStr))))
}

// Force to not rebase to prove the written datetime values are rebased
@@ -275,12 +294,12 @@ abstract class ParquetRebaseDatetimeSuite
withAllParquetReaders {
// The file metadata indicates if it needs rebase or not, so we can always get the
// correct result regardless of the "rebase mode" config.
Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode =>
withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> mode.toString) {
Member Author

Fixed the wrong SQL conf: LEGACY_AVRO_REBASE_MODE_IN_READ

checkAnswer(
spark.read.parquet(path),
Seq.tabulate(N)(_ => Row(Date.valueOf("1001-01-01"))))
}
runInMode(
SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key,
Seq(LEGACY, CORRECTED, EXCEPTION)) { options =>
checkAnswer(
spark.read.options(options).parquet(path),
Seq.tabulate(N)(_ => Row(Date.valueOf("1001-01-01"))))
}

// Force to not rebase to prove the written datetime values are rebased and we will get