From 0d5d1c331fb8681ba5f299e31d5f6ca7139ca873 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Wed, 20 Aug 2025 00:43:28 +0800
Subject: [PATCH 1/3] revert API changes of rebase methods in AvroOptions and
 DataSourceUtils

---
 .../scala/org/apache/spark/sql/avro/AvroOptions.scala |  8 ++++----
 .../sql/execution/datasources/DataSourceUtils.scala   |  8 ++++----
 .../datasources/parquet/ParquetFileFormat.scala       |  8 +++-----
 .../v2/parquet/ParquetPartitionReaderFactory.scala    |  8 +++-----
 .../streaming/state/RocksDBStateEncoder.scala         | 11 ++++++++---
 5 files changed, 22 insertions(+), 21 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
index ab3607d1bd7a7..da42333fad0fb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{DataSourceOptions, FileSourceOptions}
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, FailFastMode, ParseMode}
 import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
+import org.apache.spark.sql.internal.SQLConf
 
 /**
  * Options for Avro Reader and Writer stored in case insensitive manner.
@@ -129,9 +129,9 @@ private[sql] class AvroOptions(
   /**
    * The rebasing mode for the DATE and TIMESTAMP_MICROS, TIMESTAMP_MILLIS values in reads.
    */
-  val datetimeRebaseModeInRead: LegacyBehaviorPolicy.Value = parameters
-    .get(DATETIME_REBASE_MODE).map(LegacyBehaviorPolicy.withName)
-    .getOrElse(SQLConf.get.getConf(SQLConf.AVRO_REBASE_MODE_IN_READ))
+  val datetimeRebaseModeInRead: String = parameters
+    .get(DATETIME_REBASE_MODE)
+    .getOrElse(SQLConf.get.getConf(SQLConf.AVRO_REBASE_MODE_IN_READ).toString)
 
   val useStableIdForUnionType: Boolean =
     parameters.get(STABLE_ID_FOR_UNION_TYPE).map(_.toBoolean).getOrElse(false)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
index 10cfe9f145f6e..d43c9eab0a5ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
@@ -128,7 +128,7 @@
 
   private def getRebaseSpec(
       lookupFileMeta: String => String,
-      modeByConfig: LegacyBehaviorPolicy.Value,
+      modeByConfig: String,
       minVersion: String,
       metadataKey: String): RebaseSpec = {
     val policy = if (Utils.isTesting &&
@@ -146,7 +146,7 @@
         } else {
           LegacyBehaviorPolicy.CORRECTED
         }
-      }.getOrElse(modeByConfig)
+      }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig))
     }
     policy match {
       case LegacyBehaviorPolicy.LEGACY =>
@@ -157,7 +157,7 @@
 
   def datetimeRebaseSpec(
       lookupFileMeta: String => String,
-      modeByConfig: LegacyBehaviorPolicy.Value): RebaseSpec = {
+      modeByConfig: String): RebaseSpec = {
     getRebaseSpec(
       lookupFileMeta,
       modeByConfig,
@@ -167,7 +167,7 @@
 
   def int96RebaseSpec(
       lookupFileMeta: String => String,
-      modeByConfig: LegacyBehaviorPolicy.Value): RebaseSpec = {
+      modeByConfig: String): RebaseSpec = {
     getRebaseSpec(
       lookupFileMeta,
       modeByConfig,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index c56c947e3da5b..661be2b9cfa08 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -44,7 +44,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.vectorized.{ConstantColumnVector, OffHeapColumnVector, OnHeapColumnVector}
-import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}
@@ -182,10 +182,8 @@ class ParquetFileFormat
     val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
     val isCaseSensitive = sqlConf.caseSensitiveAnalysis
     val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)
-    val datetimeRebaseModeInRead = LegacyBehaviorPolicy.withName(
-      parquetOptions.datetimeRebaseModeInRead)
-    val int96RebaseModeInRead = LegacyBehaviorPolicy.withName(
-      parquetOptions.int96RebaseModeInRead)
+    val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
+    val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead
 
     // Should always be set by FileSourceScanExec creating this.
     // Check conf before checking option, to allow working around an issue by changing conf.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
index 4674320e8498a..70ae8068a03a0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.execution.WholeStageCodegenExec
 import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils, DataSourceUtils, PartitionedFile, RecordReaderIterator}
 import org.apache.spark.sql.execution.datasources.parquet._
 import org.apache.spark.sql.execution.datasources.v2._
-import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -81,10 +81,8 @@ case class ParquetPartitionReaderFactory(
   private val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
   private val pushDownStringPredicate = sqlConf.parquetFilterPushDownStringPredicate
   private val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
-  private val datetimeRebaseModeInRead = LegacyBehaviorPolicy.withName(
-    options.datetimeRebaseModeInRead)
-  private val int96RebaseModeInRead = LegacyBehaviorPolicy.withName(
-    options.int96RebaseModeInRead)
+  private val datetimeRebaseModeInRead = options.datetimeRebaseModeInRead
+  private val int96RebaseModeInRead = options.int96RebaseModeInRead
   private val parquetReaderCallback = new ParquetReaderCallback()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala
index f49c79f96b9ce..d17a130a15468 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
 import org.apache.spark.sql.execution.streaming.checkpointing.CheckpointFileManager
 import org.apache.spark.sql.execution.streaming.operators.stateful.transformwithstate.StateStoreColumnFamilySchemaUtils
 import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider.{SCHEMA_ID_PREFIX_BYTES, STATE_ENCODING_NUM_VERSION_BYTES, STATE_ENCODING_VERSION}
+import org.apache.spark.sql.internal.LegacyBehaviorPolicy
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.Platform
 
@@ -805,9 +806,13 @@ class AvroStateEncoder(
   private def getAvroDeserializer(schema: StructType): AvroDeserializer = {
     val avroType = SchemaConverters.toAvroTypeWithDefaults(schema)
     val avroOptions = AvroOptions(Map.empty)
-    new AvroDeserializer(avroType, schema,
-      avroOptions.datetimeRebaseModeInRead, avroOptions.useStableIdForUnionType,
-      avroOptions.stableIdPrefixForUnionType, avroOptions.recursiveFieldMaxDepth)
+    new AvroDeserializer(
+      avroType,
+      schema,
+      LegacyBehaviorPolicy.withName(avroOptions.datetimeRebaseModeInRead),
+      avroOptions.useStableIdForUnionType,
+      avroOptions.stableIdPrefixForUnionType,
+      avroOptions.recursiveFieldMaxDepth)
   }
 
   /**

From eeba8adbd212f2dcbdb6ee77c615157d8a6b73ad Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Wed, 20 Aug 2025 11:52:35 +0800
Subject: [PATCH 2/3] Update AvroDeserializer.scala

---
 .../scala/org/apache/spark/sql/avro/AvroDeserializer.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 65fafb5a34c6e..f66b5bd988c2b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -58,7 +58,7 @@
   def this(
       rootAvroType: Schema,
       rootCatalystType: DataType,
-      datetimeRebaseMode: LegacyBehaviorPolicy.Value,
+      datetimeRebaseMode: String,
       useStableIdForUnionType: Boolean,
       stableIdPrefixForUnionType: String,
       recursiveFieldMaxDepth: Int) = {
@@ -66,7 +66,7 @@
       rootAvroType,
       rootCatalystType,
       positionalFieldMatch = false,
-      RebaseSpec(datetimeRebaseMode),
+      RebaseSpec(LegacyBehaviorPolicy.withName(datetimeRebaseMode)),
       new NoopFilters,
       useStableIdForUnionType,
       stableIdPrefixForUnionType,

From 55134f06a7cfc517515391eba6a4aa13917bbf74 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Wed, 20 Aug 2025 11:54:02 +0800
Subject: [PATCH 3/3] Update RocksDBStateEncoder.scala

---
 .../streaming/state/RocksDBStateEncoder.scala | 11 +++--------
 1 file changed, 3 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala
index d17a130a15468..f49c79f96b9ce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala
@@ -39,7 +39,6 @@ import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
 import org.apache.spark.sql.execution.streaming.checkpointing.CheckpointFileManager
 import org.apache.spark.sql.execution.streaming.operators.stateful.transformwithstate.StateStoreColumnFamilySchemaUtils
 import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider.{SCHEMA_ID_PREFIX_BYTES, STATE_ENCODING_NUM_VERSION_BYTES, STATE_ENCODING_VERSION}
-import org.apache.spark.sql.internal.LegacyBehaviorPolicy
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.Platform
 
@@ -806,13 +805,9 @@ class AvroStateEncoder(
   private def getAvroDeserializer(schema: StructType): AvroDeserializer = {
     val avroType = SchemaConverters.toAvroTypeWithDefaults(schema)
     val avroOptions = AvroOptions(Map.empty)
-    new AvroDeserializer(
-      avroType,
-      schema,
-      LegacyBehaviorPolicy.withName(avroOptions.datetimeRebaseModeInRead),
-      avroOptions.useStableIdForUnionType,
-      avroOptions.stableIdPrefixForUnionType,
-      avroOptions.recursiveFieldMaxDepth)
+    new AvroDeserializer(avroType, schema,
+      avroOptions.datetimeRebaseModeInRead, avroOptions.useStableIdForUnionType,
+      avroOptions.stableIdPrefixForUnionType, avroOptions.recursiveFieldMaxDepth)
   }
 
   /**
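Editor's note: taken together, the three patches restore one pattern. The option classes and the `AvroDeserializer` convenience constructor expose the rebase mode as a plain `String`, and each internal consumer parses it back into the enum with `LegacyBehaviorPolicy.withName` at the point of use. The sketch below illustrates that round trip in isolation; `AvroOptionsSketch`, `RebaseModeDemo`, and the `LegacyBehaviorPolicy` object here are simplified stand-ins written for this note, not Spark's actual implementations.

```scala
// Stand-in for org.apache.spark.sql.internal.LegacyBehaviorPolicy: a plain
// scala.Enumeration, so withName(...) parses a String back into a value.
object LegacyBehaviorPolicy extends Enumeration {
  val EXCEPTION, CORRECTED, LEGACY = Value
}

// Mirrors the reverted surface: the option value stays a String, falling back
// to a (hypothetical) session default when the caller sets nothing.
class AvroOptionsSketch(parameters: Map[String, String], sessionDefault: String) {
  val datetimeRebaseModeInRead: String =
    parameters.getOrElse("datetimeRebaseMode", sessionDefault)
}

object RebaseModeDemo extends App {
  val options = new AvroOptionsSketch(Map.empty, sessionDefault = "CORRECTED")

  // Internal consumers re-parse on demand, exactly where the diffs insert
  // LegacyBehaviorPolicy.withName(...). An unrecognized mode fails here, at
  // first use, rather than at option-construction time.
  val policy = LegacyBehaviorPolicy.withName(options.datetimeRebaseModeInRead)
  assert(policy == LegacyBehaviorPolicy.CORRECTED)
  println(s"datetime rebase mode: $policy")
}
```

The trade-off is the usual one for string-typed options: an invalid value surfaces as a runtime `NoSuchElementException` at the first `withName` call instead of a compile-time error, which appears to be the price of keeping the internal enumeration out of these externally visible signatures.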