diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff
index d538d1161d..72c41e4f82 100644
--- a/dev/diffs/3.5.8.diff
+++ b/dev/diffs/3.5.8.diff
@@ -2065,79 +2065,6 @@ index 07e2849ce6f..3e73645b638 100644
      val extraOptions = Map[String, String](
        ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString
      )
-diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala
-index 5e01d3f447c..284d6657d4f 100644
---- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala
-+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala
-@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet
- import scala.collection.JavaConverters._
- 
- import org.apache.spark.SparkException
--import org.apache.spark.sql.{QueryTest, Row}
-+import org.apache.spark.sql.{IgnoreCometNativeDataFusion, QueryTest, Row}
- import org.apache.spark.sql.internal.SQLConf
- import org.apache.spark.sql.test.SharedSparkSession
- import org.apache.spark.sql.types.{ArrayType, IntegerType, MapType, Metadata, MetadataBuilder, StringType, StructType}
-@@ -30,7 +30,8 @@ class ParquetFieldIdIOSuite extends QueryTest with ParquetTest with SharedSparkS
-   private def withId(id: Int): Metadata =
-     new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, id).build()
- 
--  test("Parquet reads infer fields using field ids correctly") {
-+  test("Parquet reads infer fields using field ids correctly",
-+    IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3316")) {
-     withTempDir { dir =>
-       val readSchema =
-         new StructType()
-@@ -78,7 +79,8 @@ class ParquetFieldIdIOSuite extends QueryTest with ParquetTest with SharedSparkS
-     }
-   }
- 
--  test("absence of field ids") {
-+  test("absence of field ids",
-+    IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3316")) {
-     withTempDir { dir =>
-       val readSchema =
-         new StructType()
-@@ -107,7 +109,8 @@ class ParquetFieldIdIOSuite extends QueryTest with ParquetTest with SharedSparkS
-     }
-   }
- 
--  test("SPARK-38094: absence of field ids: reading nested schema") {
-+  test("SPARK-38094: absence of field ids: reading nested schema",
-+    IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3316")) {
-     withTempDir { dir =>
-       // now with nested schema/complex type
-       val readSchema =
-@@ -136,7 +139,8 @@ class ParquetFieldIdIOSuite extends QueryTest with ParquetTest with SharedSparkS
-     }
-   }
- 
--  test("multiple id matches") {
-+  test("multiple id matches",
-+    IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3316")) {
-     withTempDir { dir =>
-       val readSchema =
-         new StructType()
-@@ -163,7 +167,8 @@ class ParquetFieldIdIOSuite extends QueryTest with ParquetTest with SharedSparkS
-     }
-   }
- 
--  test("read parquet file without ids") {
-+  test("read parquet file without ids",
-+    IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3316")) {
-     withTempDir { dir =>
-       val readSchema =
-         new StructType()
-@@ -196,7 +201,8 @@ class ParquetFieldIdIOSuite extends QueryTest with ParquetTest with SharedSparkS
-     }
-   }
- 
--  test("global read/write flag should work correctly") {
-+  test("global read/write flag should work correctly",
-+    IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3316")) {
-     withTempDir { dir =>
-       val readSchema =
-         new StructType()
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 index 8e88049f51e..49f2001dc6b 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index 4be2fe5011..29555a61ef 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getExistenceDefa
 import org.apache.spark.sql.comet.{CometBatchScanExec, CometScanExec}
 import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.HadoopFsRelation
+import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
@@ -201,6 +202,11 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
         withInfo(scanExec, "Native DataFusion scan does not support row index generation")
         return None
       }
+      if (session.sessionState.conf.getConf(SQLConf.PARQUET_FIELD_ID_READ_ENABLED) &&
+        ParquetUtils.hasFieldIds(scanExec.requiredSchema)) {
+        withInfo(scanExec, "Native DataFusion scan does not support Parquet field ID matching")
+        return None
+      }
       if (!isSchemaSupported(scanExec, SCAN_NATIVE_DATAFUSION, r)) {
         return None
       }