diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index 6a328f4be2..6c7209e5f2 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -316,6 +316,10 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {
 }
 
 case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport {
+
+  // this class is intended to be used with a specific scan impl
+  assert(scanImpl != CometConf.SCAN_AUTO)
+
   override def isTypeSupported(
       dt: DataType,
       name: String,
diff --git a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
index 9976ecd748..ccece3dde9 100644
--- a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
@@ -232,43 +232,47 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp
   }
 
   test("array_contains - test all types (native Parquet reader)") {
-    withTempDir { dir =>
-      val path = new Path(dir.toURI.toString, "test.parquet")
-      val filename = path.toString
-      val random = new Random(42)
-      withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
-        ParquetGenerator.makeParquetFile(
-          random,
-          spark,
-          filename,
-          100,
-          DataGenOptions(
-            allowNull = true,
-            generateNegativeZero = true,
-            generateArray = true,
-            generateStruct = true,
-            generateMap = false))
-      }
-      val table = spark.read.parquet(filename)
-      table.createOrReplaceTempView("t1")
-      val complexTypeFields =
-        table.schema.fields.filter(field => isComplexType(field.dataType))
-      val primitiveTypeFields =
-        table.schema.fields.filterNot(field => isComplexType(field.dataType))
-      for (field <- primitiveTypeFields) {
-        val fieldName = field.name
-        val typeName = field.dataType.typeName
-        sql(s"SELECT array($fieldName, $fieldName) as a, $fieldName as b FROM t1")
-          .createOrReplaceTempView("t2")
-        checkSparkAnswerAndOperator(sql("SELECT array_contains(a, b) FROM t2"))
-        checkSparkAnswerAndOperator(
-          sql(s"SELECT array_contains(a, cast(null as $typeName)) FROM t2"))
-      }
-      for (field <- complexTypeFields) {
-        val fieldName = field.name
-        sql(s"SELECT array($fieldName, $fieldName) as a, $fieldName as b FROM t1")
-          .createOrReplaceTempView("t3")
-        checkSparkAnswer(sql("SELECT array_contains(a, b) FROM t3"))
+    // TODO test fails if scan is auto
+    // https://github.com/apache/datafusion-comet/issues/2173
+    withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) {
+      withTempDir { dir =>
+        val path = new Path(dir.toURI.toString, "test.parquet")
+        val filename = path.toString
+        val random = new Random(42)
+        withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
+          ParquetGenerator.makeParquetFile(
+            random,
+            spark,
+            filename,
+            100,
+            DataGenOptions(
+              allowNull = true,
+              generateNegativeZero = true,
+              generateArray = true,
+              generateStruct = true,
+              generateMap = false))
+        }
+        val table = spark.read.parquet(filename)
+        table.createOrReplaceTempView("t1")
+        val complexTypeFields =
+          table.schema.fields.filter(field => isComplexType(field.dataType))
+        val primitiveTypeFields =
+          table.schema.fields.filterNot(field => isComplexType(field.dataType))
+        for (field <- primitiveTypeFields) {
+          val fieldName = field.name
+          val typeName = field.dataType.typeName
+          sql(s"SELECT array($fieldName, $fieldName) as a, $fieldName as b FROM t1")
+            .createOrReplaceTempView("t2")
+          checkSparkAnswerAndOperator(sql("SELECT array_contains(a, b) FROM t2"))
+          checkSparkAnswerAndOperator(
+            sql(s"SELECT array_contains(a, cast(null as $typeName)) FROM t2"))
+        }
+        for (field <- complexTypeFields) {
+          val fieldName = field.name
+          sql(s"SELECT array($fieldName, $fieldName) as a, $fieldName as b FROM t1")
+            .createOrReplaceTempView("t3")
+          checkSparkAnswer(sql("SELECT array_contains(a, b) FROM t3"))
+        }
       }
     }
   }
@@ -406,9 +410,11 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp
   }
 
   test("array_intersect") {
-    // https://github.com/apache/datafusion-comet/issues/1441
-    assume(!usingDataSourceExec)
-    withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {
+    // TODO test fails if scan is auto
+    // https://github.com/apache/datafusion-comet/issues/2174
+    withSQLConf(
+      CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET,
+      CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {
       Seq(true, false).foreach { dictionaryEnabled =>
         withTempDir { dir =>
diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
index b3fa3db8cf..fbf38e2e03 100644
--- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
@@ -944,28 +944,31 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   // Complex Types
 
   test("cast StructType to StringType") {
-    // https://github.com/apache/datafusion-comet/issues/1441
-    assume(!usingDataSourceExec)
-    Seq(true, false).foreach { dictionaryEnabled =>
-      withTempDir { dir =>
-        val path = new Path(dir.toURI.toString, "test.parquet")
-        makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled = dictionaryEnabled, 10000)
-        withParquetTable(path.toString, "tbl") {
-          // primitives
-          checkSparkAnswerAndOperator(
-            "SELECT CAST(struct(_1, _2, _3, _4, _5, _6, _7, _8) as string) FROM tbl")
-          checkSparkAnswerAndOperator("SELECT CAST(struct(_9, _10, _11, _12) as string) FROM tbl")
-          // decimals
-          // TODO add _16 when https://github.com/apache/datafusion-comet/issues/1068 is resolved
-          checkSparkAnswerAndOperator("SELECT CAST(struct(_15, _17) as string) FROM tbl")
-          // dates & timestamps
-          checkSparkAnswerAndOperator("SELECT CAST(struct(_18, _19, _20) as string) FROM tbl")
-          // named struct
-          checkSparkAnswerAndOperator(
-            "SELECT CAST(named_struct('a', _1, 'b', _2) as string) FROM tbl")
-          // nested struct
-          checkSparkAnswerAndOperator(
-            "SELECT CAST(named_struct('a', named_struct('b', _1, 'c', _2)) as string) FROM tbl")
+    // TODO test fails if scan is auto
+    // https://github.com/apache/datafusion-comet/issues/2175
+    withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) {
+      Seq(true, false).foreach { dictionaryEnabled =>
+        withTempDir { dir =>
+          val path = new Path(dir.toURI.toString, "test.parquet")
+          makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled = dictionaryEnabled, 10000)
+          withParquetTable(path.toString, "tbl") {
+            // primitives
+            checkSparkAnswerAndOperator(
+              "SELECT CAST(struct(_1, _2, _3, _4, _5, _6, _7, _8) as string) FROM tbl")
+            checkSparkAnswerAndOperator(
+              "SELECT CAST(struct(_9, _10, _11, _12) as string) FROM tbl")
+            // decimals
+            // TODO add _16 when https://github.com/apache/datafusion-comet/issues/1068 is resolved
+            checkSparkAnswerAndOperator("SELECT CAST(struct(_15, _17) as string) FROM tbl")
+            // dates & timestamps
+            checkSparkAnswerAndOperator("SELECT CAST(struct(_18, _19, _20) as string) FROM tbl")
+            // named struct
+            checkSparkAnswerAndOperator(
+              "SELECT CAST(named_struct('a', _1, 'b', _2) as string) FROM tbl")
+            // nested struct
+            checkSparkAnswerAndOperator(
+              "SELECT CAST(named_struct('a', named_struct('b', _1, 'c', _2)) as string) FROM tbl")
+          }
         }
       }
     }
diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
index 217cd322dd..ed250e141c 100644
--- a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
@@ -374,7 +374,10 @@ class CometFuzzTestSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
       pos: Position): Unit = {
     Seq("native", "jvm").foreach { shuffleMode =>
-      Seq("native_comet", "native_datafusion", "native_iceberg_compat").foreach { scanImpl =>
+      Seq(
+        CometConf.SCAN_NATIVE_COMET,
+        CometConf.SCAN_NATIVE_DATAFUSION,
+        CometConf.SCAN_NATIVE_ICEBERG_COMPAT).foreach { scanImpl =>
         super.test(testName + s" ($scanImpl, $shuffleMode shuffle)", testTags: _*) {
           withSQLConf(
             CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanImpl,
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
index b5f074cc8d..b0b87509fb 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
@@ -587,8 +587,8 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar
   }
 
   test("fix: native Unsafe row accessors return incorrect results") {
-    // https://github.com/apache/datafusion-comet/issues/1538
-    assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
+    // TODO byte/short issue
+    assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() == CometConf.SCAN_NATIVE_COMET)
     Seq(10, 201).foreach { numPartitions =>
       withTempDir { dir =>
         val path = new Path(dir.toURI.toString, "test.parquet")
diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
index 2574e12e13..f21b59ac1a 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
@@ -86,67 +86,82 @@ abstract class ParquetReadSuite extends CometTestBase {
   }
 
   test("unsupported Spark types") {
-    // for native iceberg compat, CometScanExec supports some types that native_comet does not.
-    // note that native_datafusion does not use CometScanExec so we need not include that in
-    // the check
-    val isDataFusionScan = usingDataSourceExec(conf)
-    Seq(
-      NullType -> false,
-      BooleanType -> true,
-      ByteType -> true,
-      ShortType -> true,
-      IntegerType -> true,
-      LongType -> true,
-      FloatType -> true,
-      DoubleType -> true,
-      BinaryType -> true,
-      StringType -> true,
-      // Timestamp here arbitrary for picking a concrete data type to from ArrayType
-      // Any other type works
-      ArrayType(TimestampType) -> isDataFusionScan,
-      StructType(
-        Seq(
-          StructField("f1", DecimalType.SYSTEM_DEFAULT),
-          StructField("f2", StringType))) -> isDataFusionScan,
-      MapType(keyType = LongType, valueType = DateType) -> isDataFusionScan,
-      StructType(
-        Seq(StructField("f1", ByteType), StructField("f2", StringType))) -> isDataFusionScan,
-      MapType(keyType = IntegerType, valueType = BinaryType) -> isDataFusionScan)
-      .foreach { case (dt, expected) =>
-        val fallbackReasons = new ListBuffer[String]()
-        assert(
-          CometScanTypeChecker(CometConf.COMET_NATIVE_SCAN_IMPL.get())
-            .isTypeSupported(dt, "", fallbackReasons) == expected)
-        // usingDataFusionParquetExec does not support CometBatchScanExec yet
-        if (!isDataFusionScan) {
-          assert(CometBatchScanExec.isTypeSupported(dt, "", fallbackReasons) == expected)
+    // TODO this test is not correctly implemented for scan implementations other than SCAN_NATIVE_COMET
+    // https://github.com/apache/datafusion-comet/issues/2188
+    withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) {
+      // for native iceberg compat, CometScanExec supports some types that native_comet does not.
+      // note that native_datafusion does not use CometScanExec so we need not include that in
+      // the check
+      val isDataFusionScan = usingDataSourceExec(conf)
+      Seq(
+        NullType -> false,
+        BooleanType -> true,
+        ByteType -> true,
+        ShortType -> true,
+        IntegerType -> true,
+        LongType -> true,
+        FloatType -> true,
+        DoubleType -> true,
+        BinaryType -> true,
+        StringType -> true,
+        // Timestamp here is arbitrary, picking a concrete data type to form the ArrayType
+        // Any other type works
+        ArrayType(TimestampType) -> isDataFusionScan,
+        StructType(
+          Seq(
+            StructField("f1", DecimalType.SYSTEM_DEFAULT),
+            StructField("f2", StringType))) -> isDataFusionScan,
+        MapType(keyType = LongType, valueType = DateType) -> isDataFusionScan,
+        StructType(
+          Seq(StructField("f1", ByteType), StructField("f2", StringType))) -> isDataFusionScan,
+        MapType(keyType = IntegerType, valueType = BinaryType) -> isDataFusionScan)
+        .foreach { case (dt, expected) =>
+          val fallbackReasons = new ListBuffer[String]()
+          // TODO CometScanTypeChecker should only be used for ParquetReadV1Suite
+          assert(
+            CometScanTypeChecker(CometConf.COMET_NATIVE_SCAN_IMPL.get())
+              .isTypeSupported(dt, "", fallbackReasons) == expected)
+          // usingDataFusionParquetExec does not support CometBatchScanExec yet
+          // TODO CometBatchScanExec should only be used for ParquetReadV2Suite
+          if (!isDataFusionScan) {
+            assert(CometBatchScanExec.isTypeSupported(dt, "", fallbackReasons) == expected)
+          }
         }
-      }
+    }
   }
 
   test("unsupported Spark schema") {
-    val schemaDDLs =
-      Seq("f1 int, f2 boolean", "f1 int, f2 array<int>", "f1 map<int, string>, f2 array<int>")
-        .map(s => StructType.fromDDL(s))
-
-    // Arrays support for iceberg compat native and for Parquet V1
-    val cometScanExecSupported =
-      if (usingDataSourceExec(conf) && this.isInstanceOf[ParquetReadV1Suite])
-        Seq(true, true, true)
-      else Seq(true, false, false)
-
-    val cometBatchScanExecSupported = Seq(true, false, false)
-    val fallbackReasons = new ListBuffer[String]()
-
-    schemaDDLs.zip(cometScanExecSupported).foreach { case (schema, expected) =>
-      assert(
-        CometScanTypeChecker(CometConf.COMET_NATIVE_SCAN_IMPL.get())
-          .isSchemaSupported(StructType(schema), fallbackReasons) == expected)
-    }
+    // TODO this test is not correctly implemented for scan implementations other than SCAN_NATIVE_COMET
+    // https://github.com/apache/datafusion-comet/issues/2188
+    withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) {
+      val schemaDDLs =
+        Seq(
+          "f1 int, f2 boolean",
+          "f1 int, f2 array<int>",
+          "f1 map<int, string>, f2 array<int>")
+          .map(s => StructType.fromDDL(s))
+
+      // Arrays are supported for native_iceberg_compat and for Parquet V1
+      val cometScanExecSupported =
+        if (usingDataSourceExec(conf) && this.isInstanceOf[ParquetReadV1Suite])
+          Seq(true, true, true)
+        else Seq(true, false, false)
+
+      val cometBatchScanExecSupported = Seq(true, false, false)
+      val fallbackReasons = new ListBuffer[String]()
+
+      // TODO CometScanTypeChecker should only be used for ParquetReadV1Suite
+      schemaDDLs.zip(cometScanExecSupported).foreach { case (schema, expected) =>
+        assert(
+          CometScanTypeChecker(CometConf.COMET_NATIVE_SCAN_IMPL.get())
+            .isSchemaSupported(StructType(schema), fallbackReasons) == expected)
+      }
 
-    schemaDDLs.zip(cometBatchScanExecSupported).foreach { case (schema, expected) =>
-      assert(
-        CometBatchScanExec.isSchemaSupported(StructType(schema), fallbackReasons) == expected)
+      // TODO CometBatchScanExec should only be used for ParquetReadV2Suite
+      schemaDDLs.zip(cometBatchScanExecSupported).foreach { case (schema, expected) =>
+        assert(
+          CometBatchScanExec.isSchemaSupported(StructType(schema), fallbackReasons) == expected)
+      }
     }
   }
@@ -354,8 +369,6 @@ abstract class ParquetReadSuite extends CometTestBase {
   }
 
   test("test multiple pages with different sizes and nulls") {
-    // https://github.com/apache/datafusion-comet/issues/1441
-    assume(!usingDataSourceExec)
     def makeRawParquetFile(
         path: Path,
         dictionaryEnabled: Boolean,
@@ -425,40 +438,42 @@ abstract class ParquetReadSuite extends CometTestBase {
       expected
     }
 
-    Seq(64, 128, 256, 512, 1024, 4096, 5000).foreach { pageSize =>
-      withTempDir { dir =>
-        val path = new Path(dir.toURI.toString, "part-r-0.parquet")
-        val expected = makeRawParquetFile(path, dictionaryEnabled = false, 10000, pageSize)
-        readParquetFile(path.toString) { df =>
-          checkAnswer(
-            df,
-            expected.map {
-              case None =>
-                Row(null, null, null, null, null, null, null, null, null, null, null, null, null,
-                  null)
-              case Some(i) =>
-                val flba_field = Array.fill(3)(i % 10 + 48) // char '0' is 48 in ascii
-                Row(
-                  i % 2 == 0,
-                  i.toByte,
-                  i.toShort,
-                  i,
-                  i.toLong,
-                  i.toFloat,
-                  i.toDouble,
-                  i.toString * 48,
-                  (-i).toByte,
-                  (-i).toShort,
-                  java.lang.Integer.toUnsignedLong(-i),
-                  new BigDecimal(UnsignedLong.fromLongBits((-i).toLong).bigIntegerValue()),
-                  i.toString,
-                  flba_field)
-            })
-        }
-        readParquetFile(path.toString) { df =>
-          assert(
-            df.filter("_8 IS NOT NULL AND _4 % 256 == 255").count() ==
-              expected.flatten.count(_ % 256 == 255))
+    withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) {
+      Seq(64, 128, 256, 512, 1024, 4096, 5000).foreach { pageSize =>
+        withTempDir { dir =>
+          val path = new Path(dir.toURI.toString, "part-r-0.parquet")
+          val expected = makeRawParquetFile(path, dictionaryEnabled = false, 10000, pageSize)
+          readParquetFile(path.toString) { df =>
+            checkAnswer(
+              df,
+              expected.map {
+                case None =>
+                  Row(null, null, null, null, null, null, null, null, null, null, null, null,
+                    null, null)
+                case Some(i) =>
+                  val flba_field = Array.fill(3)(i % 10 + 48) // char '0' is 48 in ascii
+                  Row(
+                    i % 2 == 0,
+                    i.toByte,
+                    i.toShort,
+                    i,
+                    i.toLong,
+                    i.toFloat,
+                    i.toDouble,
+                    i.toString * 48,
+                    (-i).toByte,
+                    (-i).toShort,
+                    java.lang.Integer.toUnsignedLong(-i),
+                    new BigDecimal(UnsignedLong.fromLongBits((-i).toLong).bigIntegerValue()),
+                    i.toString,
+                    flba_field)
+              })
+          }
+          readParquetFile(path.toString) { df =>
+            assert(
+              df.filter("_8 IS NOT NULL AND _4 % 256 == 255").count() ==
+                expected.flatten.count(_ % 256 == 255))
+          }
         }
       }
     }
@@ -1330,8 +1345,6 @@ abstract class ParquetReadSuite extends CometTestBase {
   }
 
   test("scan metrics") {
-    // https://github.com/apache/datafusion-comet/issues/1441
-    assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_ICEBERG_COMPAT)
     val cometScanMetricNames =
       Seq(
         "ParquetRowGroups",
@@ -1351,30 +1364,34 @@ abstract class ParquetReadSuite extends CometTestBase {
         "time_elapsed_scanning_until_data")
 
     withParquetTable((0 until 10000).map(i => (i, i.toDouble)), "tbl") {
-      val df = sql("SELECT * FROM tbl WHERE _1 > 0")
-      val scans = df.queryExecution.executedPlan collect {
-        case s: CometScanExec => s
-        case s: CometBatchScanExec => s
-        case s: CometNativeScanExec => s
-      }
-      assert(scans.size == 1, s"Expect one scan node but found ${scans.size}")
-      val metrics = scans.head.metrics
+      // TODO need to implement metrics for SCAN_NATIVE_ICEBERG_COMPAT
+      // https://github.com/apache/datafusion-comet/issues/1882
+      withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) {
+        val df = sql("SELECT * FROM tbl WHERE _1 > 0")
+        val scans = df.queryExecution.executedPlan collect {
+          case s: CometScanExec => s
+          case s: CometBatchScanExec => s
+          case s: CometNativeScanExec => s
+        }
+        assert(scans.size == 1, s"Expect one scan node but found ${scans.size}")
+        val metrics = scans.head.metrics
 
-      val metricNames = scans.head match {
-        case _: CometNativeScanExec => cometNativeScanMetricNames
-        case _ => cometScanMetricNames
-      }
+        val metricNames = scans.head match {
+          case _: CometNativeScanExec => cometNativeScanMetricNames
+          case _ => cometScanMetricNames
+        }
 
-      metricNames.foreach { metricName =>
-        assert(metrics.contains(metricName), s"metric $metricName was not found")
-      }
+        metricNames.foreach { metricName =>
+          assert(metrics.contains(metricName), s"metric $metricName was not found")
+        }
 
-      df.collect()
+        df.collect()
 
-      metricNames.foreach { metricName =>
-        assert(
-          metrics(metricName).value > 0,
-          s"Expect metric value for $metricName to be positive")
+        metricNames.foreach { metricName =>
+          assert(
+            metrics(metricName).value > 0,
+            s"Expect metric value for $metricName to be positive")
+        }
       }
     }
   }
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
index cf11bdf590..35fb0681a8 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
@@ -82,10 +82,6 @@ abstract class CometTestBase
     conf.set(CometConf.COMET_RESPECT_PARQUET_FILTER_PUSHDOWN.key, "true")
     conf.set(CometConf.COMET_SPARK_TO_ARROW_ENABLED.key, "true")
    conf.set(CometConf.COMET_NATIVE_SCAN_ENABLED.key, "true")
-    // set the scan impl to SCAN_NATIVE_COMET because many tests are implemented
-    // with the assumption that this is the default and would need updating if we
-    // change the default
-    conf.set(CometConf.COMET_NATIVE_SCAN_IMPL.key, CometConf.SCAN_NATIVE_COMET)
     conf.set(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key, "true")
     conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "2g")
     conf.set(CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key, "true")
diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
index cf887a1013..aa80251490 100644
--- a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
@@ -300,6 +300,7 @@ trait CometPlanStabilitySuite extends DisableAdaptiveExecutionSuite with TPCDSBa
     conf.set(CometConf.COMET_ENABLED.key, "true")
     conf.set(CometConf.COMET_EXEC_ENABLED.key, "true")
     conf.set(CometConf.COMET_NATIVE_SCAN_ENABLED.key, "true")
+    conf.set(CometConf.COMET_NATIVE_SCAN_IMPL.key, CometConf.SCAN_NATIVE_COMET)
     conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "1g")
     conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true")
diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/ParquetDatetimeRebaseSuite.scala b/spark/src/test/scala/org/apache/spark/sql/comet/ParquetDatetimeRebaseSuite.scala
index a988467076..bdb4a9d4b1 100644
--- a/spark/src/test/scala/org/apache/spark/sql/comet/ParquetDatetimeRebaseSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/comet/ParquetDatetimeRebaseSuite.scala
@@ -40,6 +40,7 @@ abstract class ParquetDatetimeRebaseSuite extends CometTestBase {
   test("reading ancient dates before 1582") {
     Seq(true, false).foreach { exceptionOnRebase =>
       withSQLConf(
+        CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET,
         CometConf.COMET_EXCEPTION_ON_LEGACY_DATE_TIMESTAMP.key -> exceptionOnRebase.toString) {
 
         Seq("2_4_5", "2_4_6", "3_2_0").foreach { sparkVersion =>
@@ -65,6 +66,7 @@ abstract class ParquetDatetimeRebaseSuite extends CometTestBase {
     assume(!usingDataSourceExec(conf))
     Seq(true, false).foreach { exceptionOnRebase =>
       withSQLConf(
+        CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET,
         CometConf.COMET_EXCEPTION_ON_LEGACY_DATE_TIMESTAMP.key -> exceptionOnRebase.toString) {
 
         Seq("2_4_5", "2_4_6", "3_2_0").foreach { sparkVersion =>
@@ -91,6 +93,7 @@ abstract class ParquetDatetimeRebaseSuite extends CometTestBase {
     assume(!usingDataSourceExec(conf))
     Seq(true, false).foreach { exceptionOnRebase =>
       withSQLConf(
+        CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET,
        CometConf.COMET_EXCEPTION_ON_LEGACY_DATE_TIMESTAMP.key -> exceptionOnRebase.toString) {
 
         Seq("2_4_5", "2_4_6", "3_2_0").foreach { sparkVersion =>
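
Note for reviewers: the recurring pattern in this change is to drop the global `SCAN_NATIVE_COMET` default from `CometTestBase` and instead pin the scan implementation locally in each test known to fail under `auto`, with a TODO and tracking issue next to the pin. Below is a minimal sketch of that pattern, assuming a suite that extends `CometTestBase`; the test name and table contents are illustrative and not part of this diff, while `withSQLConf`, `withParquetTable`, `checkSparkAnswerAndOperator`, and the `CometConf` keys are the same helpers used by the tests above.

```scala
// Hypothetical test showing the per-test pinning pattern used throughout this PR.
test("example: pin the scan impl for a known-failing case") {
  // TODO remove this pin once the relevant tracking issue is resolved
  // (see e.g. https://github.com/apache/datafusion-comet/issues/2173)
  withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) {
    // create a small Parquet-backed table and compare Comet's result
    // and operators against Spark's, running under native_comet
    // regardless of the suite-wide or CI-provided scan default
    withParquetTable((0 until 100).map(i => (i, i.toDouble)), "tbl") {
      checkSparkAnswerAndOperator(sql("SELECT _1, _2 FROM tbl WHERE _1 > 10"))
    }
  }
}
```

Compared with the removed `conf.set(CometConf.COMET_NATIVE_SCAN_IMPL.key, ...)` in `CometTestBase`, this keeps the real default scan implementation (including `auto`) exercised by every test that does not explicitly opt out, at the cost of a per-test pin wherever a known issue remains open.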