From 8f26e806c5e862c6aaad22d4e962f308c3295201 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 28 Jan 2026 15:00:33 -0700
Subject: [PATCH 1/3] enable native_datafusion in auto scan mode

---
 .../src/main/scala/org/apache/comet/rules/CometScanRule.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index 4291e3fb65..e4e047453f 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -165,8 +165,8 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
     COMET_NATIVE_SCAN_IMPL.get() match {
       case SCAN_AUTO =>
-        // TODO add support for native_datafusion in the future
-        nativeIcebergCompatScan(session, scanExec, r, hadoopConf)
+        nativeDataFusionScan(session, scanExec, r, hadoopConf)
+          .orElse(nativeIcebergCompatScan(session, scanExec, r, hadoopConf))
           .getOrElse(scanExec)
       case SCAN_NATIVE_DATAFUSION =>
         nativeDataFusionScan(session, scanExec, r, hadoopConf).getOrElse(scanExec)
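The SCAN_AUTO branch above now tries the available implementations in priority
order by chaining Options. A minimal sketch of this orElse fallback pattern
(the object and helper names below are illustrative stand-ins, not the real
Comet APIs):

    // Sketch: Option-based fallback chain, as in the SCAN_AUTO branch above.
    // tryDataFusion/tryIcebergCompat stand in for nativeDataFusionScan and
    // nativeIcebergCompatScan, which return Some(plan) only when the scan
    // passes validation for that implementation.
    object AutoScanFallbackSketch {
      def tryDataFusion(scan: String): Option[String] =
        if (scan.contains("datafusion-ok")) Some(s"native_datafusion($scan)") else None

      def tryIcebergCompat(scan: String): Option[String] =
        if (scan.contains("iceberg-ok")) Some(s"native_iceberg_compat($scan)") else None

      def main(args: Array[String]): Unit =
        for (scan <- Seq("datafusion-ok scan", "iceberg-ok scan", "plain scan")) {
          // Mirrors: nativeDataFusionScan(...).orElse(nativeIcebergCompatScan(...)).getOrElse(scanExec)
          val chosen = tryDataFusion(scan).orElse(tryIcebergCompat(scan)).getOrElse(scan)
          println(s"$scan -> $chosen")
        }
    }

Since Option.orElse takes its alternative by name, the iceberg-compat
validation only runs when the native_datafusion validation declines the scan.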
From c9a505201531e3160de2e8b8bdaca734ed3cdeb6 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 28 Jan 2026 17:21:31 -0700
Subject: [PATCH 2/3] Invert usingDataSourceExec test helper to
 usingLegacyNativeCometScan (#3309)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

With native_datafusion enabled in auto scan mode, test helpers that check for
specific scan config values fail because auto resolves at plan time, not
config time. Invert the logic so tests check for the legacy native_comet mode
instead, which is forward-compatible with auto and any future scan
implementations.

- Rename usingDataSourceExec → usingLegacyNativeCometScan (inverted)
- Rename usingDataSourceExecWithIncompatTypes → hasUnsignedSmallIntSafetyCheck
- Update all call sites across the affected test suites

Co-Authored-By: Claude Opus 4.5
---
 .../org/apache/comet/CometCastSuite.scala      |  4 ++--
 .../apache/comet/CometExpressionSuite.scala    |  8 ++++----
 .../apache/comet/CometFuzzAggregateSuite.scala | 18 +++++++++---------
 .../org/apache/comet/CometFuzzTestSuite.scala  | 14 +++++++-------
 .../apache/comet/CometMapExpressionSuite.scala |  4 ++--
 .../comet/exec/CometColumnarShuffleSuite.scala |  2 +-
 .../org/apache/comet/exec/CometJoinSuite.scala |  2 +-
 .../comet/parquet/ParquetReadSuite.scala       | 15 +++++++--------
 .../org/apache/spark/sql/CometTestBase.scala   | 13 ++++++-------
 .../sql/comet/ParquetDatetimeRebaseSuite.scala | 10 +++++-----
 10 files changed, 44 insertions(+), 46 deletions(-)

diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
index 8a68df3820..26bb810b76 100644
--- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
@@ -64,7 +64,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   private val timestampPattern = "0123456789/:T" + whitespaceChars

   lazy val usingParquetExecWithIncompatTypes: Boolean =
-    usingDataSourceExecWithIncompatTypes(conf)
+    hasUnsignedSmallIntSafetyCheck(conf)

   test("all valid cast combinations covered") {
     val names = testNames
@@ -1087,7 +1087,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
           |USING parquet
         """.stripMargin)
       sql("INSERT INTO TABLE tab1 SELECT named_struct('col1','1','col2','2')")
-      if (usingDataSourceExec) {
+      if (!usingLegacyNativeCometScan) {
         checkSparkAnswerAndOperator(
           "SELECT CAST(s AS struct) AS new_struct FROM tab1")
       } else {
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index fe5ea77a89..1bab18a1a1 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -1509,7 +1509,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {

   test("round") {
     // https://github.com/apache/datafusion-comet/issues/1441
-    assume(!usingDataSourceExec)
+    assume(usingLegacyNativeCometScan)
     Seq(true, false).foreach { dictionaryEnabled =>
       withTempDir { dir =>
         val path = new Path(dir.toURI.toString, "test.parquet")
@@ -1573,7 +1573,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {

   test("hex") {
     // https://github.com/apache/datafusion-comet/issues/1441
-    assume(!usingDataSourceExec)
+    assume(usingLegacyNativeCometScan)
     Seq(true, false).foreach { dictionaryEnabled =>
       withTempDir { dir =>
         val path = new Path(dir.toURI.toString, "hex.parquet")
@@ -2607,7 +2607,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   }

   test("get_struct_field with DataFusion ParquetExec - read entire struct") {
-    assume(usingDataSourceExec(conf))
+    assume(!usingLegacyNativeCometScan(conf))
     withTempPath { dir =>
       // create input file with Comet disabled
       withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
@@ -2644,7 +2644,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   }

   test("read array[int] from parquet") {
-    assume(usingDataSourceExec(conf))
+    assume(!usingLegacyNativeCometScan(conf))
     withTempPath { dir =>
       // create input file with Comet disabled
diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/CometFuzzAggregateSuite.scala
index 19812f38ce..191ebd908a 100644
--- a/spark/src/test/scala/org/apache/comet/CometFuzzAggregateSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometFuzzAggregateSuite.scala
@@ -29,7 +29,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
     for (col <- df.schema.fields.filterNot(f => isComplexType(f.dataType)).map(_.name)) {
       val sql = s"SELECT count(distinct $col) FROM t1"
       val (_, cometPlan) = checkSparkAnswer(sql)
-      if (usingDataSourceExec) {
+      if (!usingLegacyNativeCometScan) {
         assert(1 == collectNativeScans(cometPlan).length)
       }
@@ -45,7 +45,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
     for (col <- df.schema.fields.filter(f => isComplexType(f.dataType)).map(_.name)) {
       val sql = s"SELECT count(distinct $col) FROM t1"
       val (_, cometPlan) = checkSparkAnswer(sql)
-      if (usingDataSourceExec) {
+      if (!usingLegacyNativeCometScan) {
         assert(1 == collectNativeScans(cometPlan).length)
       }
     }
@@ -57,7 +57,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
     for (col <- df.schema.fields.filterNot(f => isComplexType(f.dataType)).map(_.name)) {
       val sql = s"SELECT c1, c2, c3, count(distinct $col) FROM t1 group by c1, c2, c3"
       val (_, cometPlan) = checkSparkAnswer(sql)
-      if (usingDataSourceExec) {
+      if (!usingLegacyNativeCometScan) {
         assert(1 == collectNativeScans(cometPlan).length)
       }
@@ -73,7 +73,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
     for (col <- df.schema.fields.filter(f => isComplexType(f.dataType)).map(_.name)) {
       val sql = s"SELECT c1, c2, c3, count(distinct $col) FROM t1 group by c1, c2, c3"
       val (_, cometPlan) = checkSparkAnswer(sql)
-      if (usingDataSourceExec) {
+      if (!usingLegacyNativeCometScan) {
         assert(1 == collectNativeScans(cometPlan).length)
       }
     }
@@ -87,7 +87,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
     for (col <- df.columns) {
       val sql = s"SELECT c1, c2, c3, count(distinct $col, c4, c5) FROM t1 group by c1, c2, c3"
       val (_, cometPlan) = checkSparkAnswer(sql)
-      if (usingDataSourceExec) {
+      if (!usingLegacyNativeCometScan) {
         assert(1 == collectNativeScans(cometPlan).length)
       }
     }
@@ -99,7 +99,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
     for (col <- df.columns) {
       val sql = s"SELECT $col, count(*) FROM t1 GROUP BY $col ORDER BY $col"
       val (_, cometPlan) = checkSparkAnswer(sql)
-      if (usingDataSourceExec) {
+      if (!usingLegacyNativeCometScan) {
         assert(1 == collectNativeScans(cometPlan).length)
       }
     }
@@ -112,7 +112,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
     for (col <- df.columns.drop(1)) {
       val sql = s"SELECT $groupCol, count($col) FROM t1 GROUP BY $groupCol ORDER BY $groupCol"
       val (_, cometPlan) = checkSparkAnswer(sql)
-      if (usingDataSourceExec) {
+      if (!usingLegacyNativeCometScan) {
         assert(1 == collectNativeScans(cometPlan).length)
       }
     }
@@ -126,7 +126,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
       val sql = s"SELECT $groupCol, count(${otherCol.mkString(", ")}) FROM t1 " +
         s"GROUP BY $groupCol ORDER BY $groupCol"
       val (_, cometPlan) = checkSparkAnswer(sql)
-      if (usingDataSourceExec) {
+      if (!usingLegacyNativeCometScan) {
         assert(1 == collectNativeScans(cometPlan).length)
       }
     }
@@ -138,7 +138,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
       // cannot run fully native due to HashAggregate
      val sql = s"SELECT min($col), max($col) FROM t1"
       val (_, cometPlan) = checkSparkAnswer(sql)
-      if (usingDataSourceExec) {
+      if (!usingLegacyNativeCometScan) {
         assert(1 == collectNativeScans(cometPlan).length)
       }
     }
diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
index 833314a5c6..02d13c841d 100644
--- a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
@@ -37,7 +37,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
     val df = spark.read.parquet(filename)
     df.createOrReplaceTempView("t1")
     val sql = "SELECT * FROM t1"
-    if (usingDataSourceExec) {
+    if (!usingLegacyNativeCometScan) {
       checkSparkAnswerAndOperator(sql)
     } else {
       checkSparkAnswer(sql)
@@ -59,7 +59,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
     val df = spark.read.parquet(filename)
     df.createOrReplaceTempView("t1")
     val sql = "SELECT * FROM t1 LIMIT 500"
-    if (usingDataSourceExec) {
+    if (!usingLegacyNativeCometScan) {
       checkSparkAnswerAndOperator(sql)
     } else {
       checkSparkAnswer(sql)
@@ -112,7 +112,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
         s"alter table t2 add column col2 $defaultValueType default $defaultValueString")
       // Verify that our default value matches Spark's answer
       val sql = "select col2 from t2"
-      if (usingDataSourceExec) {
+      if (!usingLegacyNativeCometScan) {
         checkSparkAnswerAndOperator(sql)
       } else {
         checkSparkAnswer(sql)
@@ -139,7 +139,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
       val sql = s"SELECT $col FROM t1 ORDER BY $col"
       // cannot run fully natively due to range partitioning and sort
       val (_, cometPlan) = checkSparkAnswer(sql)
-      if (usingDataSourceExec) {
+      if (!usingLegacyNativeCometScan) {
         assert(1 == collectNativeScans(cometPlan).length)
       }
     }
@@ -152,7 +152,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
     val sql = s"SELECT $allCols FROM t1 ORDER BY $allCols"
     // cannot run fully natively due to range partitioning and sort
     val (_, cometPlan) = checkSparkAnswer(sql)
-    if (usingDataSourceExec) {
+    if (!usingLegacyNativeCometScan) {
       assert(1 == collectNativeScans(cometPlan).length)
     }
   }
@@ -207,7 +207,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
     val df = spark.read.parquet(filename)
     val df2 = df.repartition(8, df.col("c0")).sort("c1")
     df2.collect()
-    if (usingDataSourceExec) {
+    if (!usingLegacyNativeCometScan) {
       val cometShuffles = collectCometShuffleExchanges(df2.queryExecution.executedPlan)
       val expectedNumCometShuffles = CometConf.COMET_NATIVE_SCAN_IMPL.get() match {
         case CometConf.SCAN_NATIVE_COMET =>
@@ -233,7 +233,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
       // cannot run fully native due to HashAggregate
       val sql = s"SELECT count(*) FROM t1 JOIN t2 ON t1.$col = t2.$col"
       val (_, cometPlan) = checkSparkAnswer(sql)
-      if (usingDataSourceExec) {
+      if (!usingLegacyNativeCometScan) {
         assert(2 == collectNativeScans(cometPlan).length)
       }
     }
diff --git a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala
index 9276a20348..ee77bb80f5 100644
--- a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala
@@ -31,7 +31,7 @@ import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOpti
 class CometMapExpressionSuite extends CometTestBase {

   test("read map[int, int] from parquet") {
-    assume(usingDataSourceExec(conf))
+    assume(!usingLegacyNativeCometScan(conf))
     withTempPath { dir =>
       // create input file with Comet disabled
@@ -63,7 +63,7 @@ class CometMapExpressionSuite extends CometTestBase {

   // repro for https://github.com/apache/datafusion-comet/issues/1754
   test("read map[struct, struct] from parquet") {
-    assume(usingDataSourceExec(conf))
+    assume(!usingLegacyNativeCometScan(conf))
     withTempPath { dir =>
       // create input file with Comet disabled
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
index 70479f0e34..ed204ef776 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
@@ -618,7 +618,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar
     // TODO: revisit this when we have resolution of https://github.com/apache/arrow-rs/issues/7040
     // and https://github.com/apache/arrow-rs/issues/7097
     val fieldsToTest =
-      if (usingDataSourceExec(conf)) {
+      if (!usingLegacyNativeCometScan(conf)) {
         Seq(
           $"_1",
           $"_4",
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
index d47b4e0c1a..6111b9c0d4 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
@@ -199,7 +199,7 @@ class CometJoinSuite extends CometTestBase {

   test("HashJoin struct key") {
     // https://github.com/apache/datafusion-comet/issues/1441
-    assume(!usingDataSourceExec)
+    assume(usingLegacyNativeCometScan)
     withSQLConf(
       "spark.sql.join.forceApplyShuffledHashJoin" -> "true",
       SQLConf.PREFER_SORTMERGEJOIN.key -> "false",
diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
index a05bb7c390..69d67d4066 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
@@ -92,7 +92,7 @@ abstract class ParquetReadSuite extends CometTestBase {
     // for native iceberg compat, CometScanExec supports some types that native_comet does not.
    // note that native_datafusion does not use CometScanExec so we need not include that in
     // the check
-    val isDataFusionScan = usingDataSourceExec(conf)
+    val isDataFusionScan = !usingLegacyNativeCometScan(conf)
     Seq(
       NullType -> false,
       BooleanType -> true,
@@ -143,7 +143,7 @@ abstract class ParquetReadSuite extends CometTestBase {

     // Arrays support for iceberg compat native and for Parquet V1
     val cometScanExecSupported =
-      if (usingDataSourceExec(conf) && this.isInstanceOf[ParquetReadV1Suite])
+      if (!usingLegacyNativeCometScan(conf) && this.isInstanceOf[ParquetReadV1Suite])
         Seq(true, true, true)
       else Seq(true, false, false)
@@ -185,7 +185,7 @@ abstract class ParquetReadSuite extends CometTestBase {
         i.toDouble,
         DateTimeUtils.toJavaDate(i))
     }
-    if (!usingDataSourceExecWithIncompatTypes(conf)) {
+    if (!hasUnsignedSmallIntSafetyCheck(conf)) {
       checkParquetScan(data)
     }
     checkParquetFile(data)
@@ -207,7 +207,7 @@ abstract class ParquetReadSuite extends CometTestBase {
         i.toDouble,
         DateTimeUtils.toJavaDate(i))
     }
-    if (!usingDataSourceExecWithIncompatTypes(conf)) {
+    if (!hasUnsignedSmallIntSafetyCheck(conf)) {
       checkParquetScan(data)
     }
     checkParquetFile(data)
@@ -228,7 +228,7 @@ abstract class ParquetReadSuite extends CometTestBase {
         DateTimeUtils.toJavaDate(i))
     }
     val filter = (row: Row) => row.getBoolean(0)
-    if (!usingDataSourceExecWithIncompatTypes(conf)) {
+    if (!hasUnsignedSmallIntSafetyCheck(conf)) {
       checkParquetScan(data, filter)
     }
     checkParquetFile(data, filter)
@@ -1249,8 +1249,7 @@ abstract class ParquetReadSuite extends CometTestBase {
       withParquetDataFrame(data, schema = Some(readSchema)) { df =>
         // TODO: validate with Spark 3.x and 'usingDataFusionParquetExec=true'
-        if (enableSchemaEvolution || CometConf.COMET_NATIVE_SCAN_IMPL
-            .get(conf) == CometConf.SCAN_NATIVE_DATAFUSION) {
+        if (enableSchemaEvolution || !usingLegacyNativeCometScan(conf)) {
           checkAnswer(df, data.map(Row.fromTuple))
         } else {
           assertThrows[SparkException](df.collect())
@@ -1515,7 +1514,7 @@ abstract class ParquetReadSuite extends CometTestBase {
   test("row group skipping doesn't overflow when reading into larger type") {
     // Spark 4.0 no longer fails for widening types SPARK-40876
     // https://github.com/apache/spark/commit/3361f25dc0ff6e5233903c26ee105711b79ba967
-    assume(!isSpark40Plus && !usingDataSourceExec(conf))
+    assume(!isSpark40Plus && usingLegacyNativeCometScan(conf))
     withTempPath { path =>
       Seq(0).toDF("a").write.parquet(path.toString)
       // Reading integer 'a' as a long isn't supported. Check that an exception is raised instead
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
index 8a2f8af5c2..e612d72cc4 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
@@ -591,7 +591,7 @@ abstract class CometTestBase
   }

   def getPrimitiveTypesParquetSchema: String = {
-    if (usingDataSourceExecWithIncompatTypes(conf)) {
+    if (hasUnsignedSmallIntSafetyCheck(conf)) {
       // Comet complex type reader has different behavior for uint_8, uint_16 types.
      // The issue stems from undefined behavior in the parquet spec and is tracked
       // here: https://github.com/apache/parquet-java/issues/3142
@@ -1268,14 +1268,13 @@ abstract class CometTestBase
     writer.close()
   }

-  def usingDataSourceExec: Boolean = usingDataSourceExec(SQLConf.get)
+  def usingLegacyNativeCometScan: Boolean = usingLegacyNativeCometScan(SQLConf.get)

-  def usingDataSourceExec(conf: SQLConf): Boolean =
-    Seq(CometConf.SCAN_NATIVE_ICEBERG_COMPAT, CometConf.SCAN_NATIVE_DATAFUSION).contains(
-      CometConf.COMET_NATIVE_SCAN_IMPL.get(conf))
+  def usingLegacyNativeCometScan(conf: SQLConf): Boolean =
+    CometConf.COMET_NATIVE_SCAN_IMPL.get(conf) == CometConf.SCAN_NATIVE_COMET

-  def usingDataSourceExecWithIncompatTypes(conf: SQLConf): Boolean = {
-    usingDataSourceExec(conf) &&
+  def hasUnsignedSmallIntSafetyCheck(conf: SQLConf): Boolean = {
+    !usingLegacyNativeCometScan(conf) &&
       CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.get(conf)
   }
 }
diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/ParquetDatetimeRebaseSuite.scala b/spark/src/test/scala/org/apache/spark/sql/comet/ParquetDatetimeRebaseSuite.scala
index bdb4a9d4b1..131423ddeb 100644
--- a/spark/src/test/scala/org/apache/spark/sql/comet/ParquetDatetimeRebaseSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/comet/ParquetDatetimeRebaseSuite.scala
@@ -52,7 +52,7 @@ abstract class ParquetDatetimeRebaseSuite extends CometTestBase {
         // Parquet file written by 2.4.5 should throw exception for both Spark and Comet
         // For Spark 4.0+, Parquet file written by 2.4.5 should not throw exception
         if ((exceptionOnRebase || sparkVersion == "2_4_5") && (!isSpark40Plus || sparkVersion != "2_4_5") &&
-          !usingDataSourceExec(conf)) {
+          usingLegacyNativeCometScan(conf)) {
           intercept[SparkException](df.collect())
         } else {
           checkSparkNoRebaseAnswer(df)
@@ -63,7 +63,7 @@ abstract class ParquetDatetimeRebaseSuite extends CometTestBase {
   }

   test("reading ancient timestamps before 1582") {
-    assume(!usingDataSourceExec(conf))
+    assume(usingLegacyNativeCometScan(conf))
     Seq(true, false).foreach { exceptionOnRebase =>
       withSQLConf(
         CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET,
@@ -78,7 +78,7 @@ abstract class ParquetDatetimeRebaseSuite extends CometTestBase {
         // Parquet file written by 2.4.5 should throw exception for both Spark and Comet
         // For Spark 4.0+, Parquet file written by 2.4.5 should not throw exception
         if ((exceptionOnRebase || sparkVersion == "2_4_5") && (!isSpark40Plus || sparkVersion != "2_4_5")
-          && !usingDataSourceExec(conf)) {
+          && usingLegacyNativeCometScan(conf)) {
           intercept[SparkException](df.collect())
         } else {
           checkSparkNoRebaseAnswer(df)
@@ -90,7 +90,7 @@ abstract class ParquetDatetimeRebaseSuite extends CometTestBase {
   }

   test("reading ancient int96 timestamps before 1582") {
-    assume(!usingDataSourceExec(conf))
+    assume(usingLegacyNativeCometScan(conf))
     Seq(true, false).foreach { exceptionOnRebase =>
       withSQLConf(
         CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET,
@@ -105,7 +105,7 @@ abstract class ParquetDatetimeRebaseSuite extends CometTestBase {
         // Parquet file written by 2.4.5 should throw exception for both Spark and Comet
         // For Spark 4.0+, Parquet file written by 2.4.5 should not throw exception
         if ((exceptionOnRebase || sparkVersion == "2_4_5") && (!isSpark40Plus || sparkVersion != "2_4_5")
-          && !usingDataSourceExec(conf)) {
+          && usingLegacyNativeCometScan(conf)) {
           intercept[SparkException](df.collect())
         } else {
           checkSparkNoRebaseAnswer(df)
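The inverted helper in PATCH 2/3 makes the tests robust to new scan
implementations: instead of enumerating every mode that uses DataSourceExec,
only the single legacy mode is special-cased. A hedged sketch of the
difference (string literals stand in for the CometConf constants; "auto" is
the config value that the old membership check mishandles because it resolves
to a concrete implementation at plan time, not config time):

    // Membership check vs. inverted equality check for scan implementations.
    object HelperInversionSketch {
      val ScanNativeComet = "native_comet" // legacy mode
      val ScanNativeIcebergCompat = "native_iceberg_compat"
      val ScanNativeDataFusion = "native_datafusion"

      // Old helper: silently wrong for any impl not in the list (e.g. "auto").
      def usingDataSourceExec(impl: String): Boolean =
        Seq(ScanNativeIcebergCompat, ScanNativeDataFusion).contains(impl)

      // New helper: only the legacy mode returns true, so every current and
      // future non-legacy impl is handled by !usingLegacyNativeCometScan.
      def usingLegacyNativeCometScan(impl: String): Boolean =
        impl == ScanNativeComet

      def main(args: Array[String]): Unit =
        for (impl <- Seq(ScanNativeComet, ScanNativeDataFusion, "auto"))
          println(s"$impl: old=${usingDataSourceExec(impl)} inverted=${!usingLegacyNativeCometScan(impl)}")
    }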
From 7527fa7d3db6cbc355ed1795449aeb7ca7f434fd Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 28 Jan 2026 18:08:26 -0700
Subject: [PATCH 3/3] Add fallback checks in nativeDataFusionScan for
 unsupported features

Add checks for metadata columns, Parquet field ID reads, bucketed scans, and
row index generation so that auto mode falls back to native_iceberg_compat.

Co-Authored-By: Claude Opus 4.5
---
 .../comet/serde/operator/CometNativeScan.scala | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
index b7909b67cb..7c8d4cbf95 100644
--- a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
@@ -39,6 +39,7 @@ import org.apache.comet.serde.{CometOperatorSerde, Compatible, OperatorOuterClas
 import org.apache.comet.serde.ExprOuterClass.Expr
 import org.apache.comet.serde.OperatorOuterClass.Operator
 import org.apache.comet.serde.QueryPlanSerde.{exprToProto, serializeDataType}
+import org.apache.comet.shims.ShimFileFormat

 /**
  * Validation and serde logic for `native_datafusion` scans.
  */
@@ -77,6 +78,22 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
       withInfo(scanExec, "Full native scan disabled because ignoreMissingFiles enabled")
     }

+    if (scanExec.fileConstantMetadataColumns.nonEmpty) {
+      withInfo(scanExec, "Native DataFusion scan does not support metadata columns")
+    }
+
+    if (CometParquetUtils.readFieldId(SQLConf.get)) {
+      withInfo(scanExec, "Native DataFusion scan does not support Parquet field ID based reads")
+    }
+
+    if (scanExec.bucketedScan) {
+      withInfo(scanExec, "Native DataFusion scan does not support bucketed scans")
+    }
+
+    if (ShimFileFormat.findRowIndexColumnIndexInSchema(scanExec.requiredSchema) >= 0) {
+      withInfo(scanExec, "Native DataFusion scan does not support row index generation")
+    }
+
     // the scan is supported if no fallback reasons were added to the node
     !hasExplainInfo(scanExec)
   }
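These checks follow the existing validation pattern in this file: record a
human-readable fallback reason on the plan node with withInfo, then treat the
scan as supported only if no reason was recorded. A simplified sketch of that
accumulate-then-test shape (plain Scala; not the real withInfo/hasExplainInfo
plumbing, which tags the Spark plan node itself):

    // Accumulate fallback reasons, then report support only when none exist.
    object FallbackCheckSketch {
      final case class ScanFacts(bucketedScan: Boolean, hasMetadataColumns: Boolean)

      def isSupported(scan: ScanFacts): (Boolean, Seq[String]) = {
        val reasons = Seq.newBuilder[String]
        // Each unsupported feature adds a reason instead of returning early,
        // so the eventual EXPLAIN output can show every fallback cause at once.
        if (scan.hasMetadataColumns)
          reasons += "Native DataFusion scan does not support metadata columns"
        if (scan.bucketedScan)
          reasons += "Native DataFusion scan does not support bucketed scans"
        val collected = reasons.result()
        (collected.isEmpty, collected) // supported only if no reasons were added
      }

      def main(args: Array[String]): Unit = {
        println(isSupported(ScanFacts(bucketedScan = false, hasMetadataColumns = false))) // supported
        println(isSupported(ScanFacts(bucketedScan = true, hasMetadataColumns = true)))   // two reasons
      }
    }

Because no check returns early, a scan that trips several of these conditions
reports all of them, and auto mode can then try native_iceberg_compat instead.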