From 87ce47f9002c6a8a113fa0104f3b4081ffb053b0 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 5 Feb 2026 16:29:57 -0700 Subject: [PATCH 1/2] fix: fall back to Spark when Parquet field ID matching is enabled in native_datafusion When `spark.sql.parquet.fieldId.read.enabled` is true, native_datafusion reads columns by name/position rather than Parquet field IDs, producing wrong results. Detect this config and fall back to Spark. Closes #3316 Co-Authored-By: Claude Opus 4.6 --- dev/diffs/3.5.8.diff | 73 ------------------- .../apache/comet/rules/CometScanRule.scala | 4 + 2 files changed, 4 insertions(+), 73 deletions(-) diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff index 10f579da64..27a3d05ab8 100644 --- a/dev/diffs/3.5.8.diff +++ b/dev/diffs/3.5.8.diff @@ -2065,79 +2065,6 @@ index 07e2849ce6f..3e73645b638 100644 val extraOptions = Map[String, String]( ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString ) -diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala -index 5e01d3f447c..284d6657d4f 100644 ---- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala -+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala -@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet - import scala.collection.JavaConverters._ - - import org.apache.spark.SparkException --import org.apache.spark.sql.{QueryTest, Row} -+import org.apache.spark.sql.{IgnoreCometNativeDataFusion, QueryTest, Row} - import org.apache.spark.sql.internal.SQLConf - import org.apache.spark.sql.test.SharedSparkSession - import org.apache.spark.sql.types.{ArrayType, IntegerType, MapType, Metadata, MetadataBuilder, StringType, StructType} -@@ -30,7 +30,8 @@ class ParquetFieldIdIOSuite extends QueryTest with ParquetTest with SharedSparkS - private def withId(id: Int): Metadata = - new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, id).build() - -- test("Parquet reads infer fields using field ids correctly") { -+ test("Parquet reads infer fields using field ids correctly", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3316")) { - withTempDir { dir => - val readSchema = - new StructType() -@@ -78,7 +79,8 @@ class ParquetFieldIdIOSuite extends QueryTest with ParquetTest with SharedSparkS - } - } - -- test("absence of field ids") { -+ test("absence of field ids", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3316")) { - withTempDir { dir => - val readSchema = - new StructType() -@@ -107,7 +109,8 @@ class ParquetFieldIdIOSuite extends QueryTest with ParquetTest with SharedSparkS - } - } - -- test("SPARK-38094: absence of field ids: reading nested schema") { -+ test("SPARK-38094: absence of field ids: reading nested schema", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3316")) { - withTempDir { dir => - // now with nested schema/complex type - val readSchema = -@@ -136,7 +139,8 @@ class ParquetFieldIdIOSuite extends QueryTest with ParquetTest with SharedSparkS - } - } - -- test("multiple id matches") { -+ test("multiple id matches", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3316")) { - withTempDir { dir => - val readSchema = - new StructType() -@@ -163,7 +167,8 @@ class ParquetFieldIdIOSuite extends QueryTest with ParquetTest with SharedSparkS - } - } - -- test("read parquet file without ids") { -+ test("read parquet file without ids", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3316")) { - withTempDir { dir => - val readSchema = - new StructType() -@@ -196,7 +201,8 @@ class ParquetFieldIdIOSuite extends QueryTest with ParquetTest with SharedSparkS - } - } - -- test("global read/write flag should work correctly") { -+ test("global read/write flag should work correctly", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3316")) { - withTempDir { dir => - val readSchema = - new StructType() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMetadataStructRowIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMetadataStructRowIndexSuite.scala index c10e1799702..ba6629abfd9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMetadataStructRowIndexSuite.scala diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index 4be2fe5011..70e67fb8d0 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -201,6 +201,10 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com withInfo(scanExec, "Native DataFusion scan does not support row index generation") return None } + if (session.sessionState.conf.getConf(SQLConf.PARQUET_FIELD_ID_READ_ENABLED)) { + withInfo(scanExec, "Native DataFusion scan does not support Parquet field ID matching") + return None + } if (!isSchemaSupported(scanExec, SCAN_NATIVE_DATAFUSION, r)) { return None } From 6520b071ce329f77c219ab838f27cc2fd93daccd Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 5 Feb 2026 18:11:04 -0700 Subject: [PATCH 2/2] fix: only fall back when schema actually contains Parquet field IDs Spark's TestSQLContext overrides PARQUET_FIELD_ID_READ_ENABLED to true for all tests, so checking only the config caused every native_datafusion scan to fall back in CI. Now also check ParquetUtils.hasFieldIds() on the required schema, matching Spark's own pattern in ParquetRowConverter. Co-Authored-By: Claude Opus 4.6 --- .../src/main/scala/org/apache/comet/rules/CometScanRule.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index 70e67fb8d0..29555a61ef 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getExistenceDefa import org.apache.spark.sql.comet.{CometBatchScanExec, CometScanExec} import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan} import org.apache.spark.sql.execution.datasources.HadoopFsRelation +import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan @@ -201,7 +202,8 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com withInfo(scanExec, "Native DataFusion scan does not support row index generation") return None } - if (session.sessionState.conf.getConf(SQLConf.PARQUET_FIELD_ID_READ_ENABLED)) { + if (session.sessionState.conf.getConf(SQLConf.PARQUET_FIELD_ID_READ_ENABLED) && + ParquetUtils.hasFieldIds(scanExec.requiredSchema)) { withInfo(scanExec, "Native DataFusion scan does not support Parquet field ID matching") return None }