diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 661be2b9cfa08..c56c947e3da5b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -44,7 +44,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.vectorized.{ConstantColumnVector, OffHeapColumnVector, OnHeapColumnVector}
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}
@@ -182,8 +182,10 @@ class ParquetFileFormat
     val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
     val isCaseSensitive = sqlConf.caseSensitiveAnalysis
     val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)
-    val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
-    val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead
+    val datetimeRebaseModeInRead = LegacyBehaviorPolicy.withName(
+      parquetOptions.datetimeRebaseModeInRead)
+    val int96RebaseModeInRead = LegacyBehaviorPolicy.withName(
+      parquetOptions.int96RebaseModeInRead)
 
     // Should always be set by FileSourceScanExec creating this.
     // Check conf before checking option, to allow working around an issue by changing conf.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
index eaedd99d8628c..dd5669bda07c9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
@@ -24,7 +24,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat
 import org.apache.spark.sql.catalyst.{DataSourceOptions, FileSourceOptions}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
+import org.apache.spark.sql.internal.SQLConf
 
 /**
  * Options for the Parquet data source.
@@ -74,16 +74,15 @@ class ParquetOptions(
   /**
    * The rebasing mode for the DATE and TIMESTAMP_MICROS, TIMESTAMP_MILLIS values in reads.
    */
-  def datetimeRebaseModeInRead: LegacyBehaviorPolicy.Value = parameters
-    .get(DATETIME_REBASE_MODE)
-    .map(LegacyBehaviorPolicy.withName)
-    .getOrElse(sqlConf.getConf(SQLConf.PARQUET_REBASE_MODE_IN_READ))
+  def datetimeRebaseModeInRead: String = parameters
+    .get(DATETIME_REBASE_MODE)
+    .getOrElse(sqlConf.getConf(SQLConf.PARQUET_REBASE_MODE_IN_READ).toString)
 
   /**
    * The rebasing mode for INT96 timestamp values in reads.
    */
-  def int96RebaseModeInRead: LegacyBehaviorPolicy.Value = parameters
-    .get(INT96_REBASE_MODE).map(LegacyBehaviorPolicy.withName)
-    .getOrElse(sqlConf.getConf(SQLConf.PARQUET_INT96_REBASE_MODE_IN_READ))
+  def int96RebaseModeInRead: String = parameters
+    .get(INT96_REBASE_MODE)
+    .getOrElse(sqlConf.getConf(SQLConf.PARQUET_INT96_REBASE_MODE_IN_READ).toString)
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
index 70ae8068a03a0..4674320e8498a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.execution.WholeStageCodegenExec
 import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils, DataSourceUtils, PartitionedFile, RecordReaderIterator}
 import org.apache.spark.sql.execution.datasources.parquet._
 import org.apache.spark.sql.execution.datasources.v2._
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -81,8 +81,10 @@ case class ParquetPartitionReaderFactory(
   private val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
   private val pushDownStringPredicate = sqlConf.parquetFilterPushDownStringPredicate
   private val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
-  private val datetimeRebaseModeInRead = options.datetimeRebaseModeInRead
-  private val int96RebaseModeInRead = options.int96RebaseModeInRead
+  private val datetimeRebaseModeInRead = LegacyBehaviorPolicy.withName(
+    options.datetimeRebaseModeInRead)
+  private val int96RebaseModeInRead = LegacyBehaviorPolicy.withName(
+    options.int96RebaseModeInRead)
 
   private val parquetReaderCallback = new ParquetReaderCallback()
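For reviewers, a minimal standalone sketch of the round-trip this patch introduces (the option map, key literal, and "CORRECTED" value below are illustrative inputs, not part of the change): ParquetOptions now exposes the rebase modes as plain Strings, and each reader-side call site parses them back into the enum via LegacyBehaviorPolicy.withName, which, like the old code path, fails fast on unrecognized names.

// Sketch only: mirrors the String -> enum round-trip from this diff.
// The "datetimeRebaseMode" key and "CORRECTED" value are example inputs.
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}

object RebaseModeRoundTrip {
  def main(args: Array[String]): Unit = {
    // What ParquetOptions.datetimeRebaseModeInRead now computes: a String,
    // taken from the per-read option if set, else from the session conf.
    val parameters = CaseInsensitiveMap(Map("datetimeRebaseMode" -> "CORRECTED"))
    val asString: String = parameters
      .get("datetimeRebaseMode")
      .getOrElse(new SQLConf().getConf(SQLConf.PARQUET_REBASE_MODE_IN_READ).toString)

    // What ParquetFileFormat / ParquetPartitionReaderFactory now do at the call site.
    val mode: LegacyBehaviorPolicy.Value = LegacyBehaviorPolicy.withName(asString)
    assert(mode == LegacyBehaviorPolicy.CORRECTED)
    // LegacyBehaviorPolicy.withName("BOGUS") would throw NoSuchElementException.
  }
}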