From d5e06591c7e199b744c2435d93644d628abd2200 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 26 Apr 2026 07:35:23 -0600 Subject: [PATCH] fix: enable Spark 4 SQL tests previously ignored for issues #3313 and #3314 Issue 3313 was already resolved by recent non-AQE DPP work (#4011 and #4053). The test "Subquery reuse across the whole plan" now passes. Issue 3314 covered three tests. Two of them (ignoreMissingFiles parquet, alter temporary view) now pass because Spark 4's ShimSparkErrorConverter translates native FileNotFound into the expected FAILED_READ_FILE.FILE_NOT_EXIST. The third test (ParquetV1QuerySuite "Enabling/disabling ignoreCorruptFiles") still failed because CometExecIterator wraps native Parquet errors using _LEGACY_ERROR_TEMP_2254 with a "message" parameter, but Spark 4 strict- checks that error class has no placeholders and raises INTERNAL_ERROR during construction, masking the underlying "is not a Parquet file" cause that the test asserts on. Drop the message parameter so the SparkException can be constructed, allowing the cause-chain to surface as expected. Regenerate dev/diffs/4.0.1.diff to remove the four IgnoreCometNativeDataFusion tags. --- dev/diffs/4.0.1.diff | 74 +++++-------------- .../org/apache/comet/CometExecIterator.scala | 4 +- 2 files changed, 22 insertions(+), 56 deletions(-) diff --git a/dev/diffs/4.0.1.diff b/dev/diffs/4.0.1.diff index 0da594b58d..700b7aa6ab 100644 --- a/dev/diffs/4.0.1.diff +++ b/dev/diffs/4.0.1.diff @@ -535,7 +535,7 @@ index 81713c777bc..b5f92ed9742 100644 assert(exchanges.size == 2) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala -index 2c24cc7d570..5a1fe7017c3 100644 +index 2c24cc7d570..8c214e7d05c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen @@ -605,15 +605,7 @@ index 2c24cc7d570..5a1fe7017c3 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { val df = sql( -@@ -1330,6 +1347,7 @@ abstract class DynamicPartitionPruningSuiteBase - } - - test("Subquery reuse across the whole plan", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3313"), - DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) { - withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true", - SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", -@@ -1424,7 +1442,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1424,7 +1441,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -623,7 +615,7 @@ index 2c24cc7d570..5a1fe7017c3 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { val df = sql( """ WITH v as ( -@@ -1578,6 +1597,7 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1578,6 +1596,7 @@ abstract class DynamicPartitionPruningSuiteBase val subqueryBroadcastExecs = collectWithSubqueries(df.queryExecution.executedPlan) { case s: SubqueryBroadcastExec => s @@ -631,7 +623,7 @@ index 2c24cc7d570..5a1fe7017c3 100644 } assert(subqueryBroadcastExecs.size === 1) subqueryBroadcastExecs.foreach { subqueryBroadcastExec => -@@ -1730,6 +1750,10 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat +@@ -1730,6 
+1749,10 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat case s: BatchScanExec => // we use f1 col for v2 tables due to schema pruning s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1"))) @@ -668,7 +660,7 @@ index 9c90e0105a4..fadf2f0f698 100644 test("SPARK-35884: Explain Formatted") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala -index 9c529d14221..a046f1ed1ca 100644 +index 9c529d14221..ab2850b5d68 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -33,6 +33,8 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha @@ -680,20 +672,16 @@ index 9c529d14221..a046f1ed1ca 100644 import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.datasources.FilePartition -@@ -203,7 +205,11 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -203,7 +205,7 @@ class FileBasedDataSourceSuite extends QueryTest } allFileBasedDataSources.foreach { format => - testQuietly(s"Enabling/disabling ignoreMissingFiles using $format") { -+ val ignoreMissingTags: Seq[org.scalatest.Tag] = if (format == "parquet") { -+ Seq(IgnoreCometNativeDataFusion( -+ "https://github.com/apache/datafusion-comet/issues/3314")) -+ } else Seq.empty -+ test(s"Enabling/disabling ignoreMissingFiles using $format", ignoreMissingTags: _*) { quietly { ++ test(s"Enabling/disabling ignoreMissingFiles using $format") { quietly { def testIgnoreMissingFiles(options: Map[String, String]): Unit = { withTempDir { dir => val basePath = dir.getCanonicalPath -@@ -263,7 +269,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -263,7 +265,7 @@ class FileBasedDataSourceSuite extends QueryTest } } } @@ -702,7 +690,7 @@ index 9c529d14221..a046f1ed1ca 100644 } Seq("json", "orc").foreach { format => -@@ -668,18 +674,25 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -668,18 +670,25 @@ class FileBasedDataSourceSuite extends QueryTest checkAnswer(sql(s"select A from $tableName"), data.select("A")) // RuntimeException is triggered at executor side, which is then wrapped as @@ -735,7 +723,7 @@ index 9c529d14221..a046f1ed1ca 100644 condition = "_LEGACY_ERROR_TEMP_2093", parameters = Map("requiredFieldName" -> "b", "matchedOrcFields" -> "[b, B]") ) -@@ -967,6 +980,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -967,6 +976,7 @@ class FileBasedDataSourceSuite extends QueryTest assert(bJoinExec.isEmpty) val smJoinExec = collect(joinedDF.queryExecution.executedPlan) { case smJoin: SortMergeJoinExec => smJoin @@ -743,7 +731,7 @@ index 9c529d14221..a046f1ed1ca 100644 } assert(smJoinExec.nonEmpty) } -@@ -1027,6 +1041,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1027,6 +1037,7 @@ class FileBasedDataSourceSuite extends QueryTest val fileScan = df.queryExecution.executedPlan collectFirst { case BatchScanExec(_, f: FileScan, _, _, _, _) => f @@ -751,7 +739,7 @@ index 9c529d14221..a046f1ed1ca 100644 } assert(fileScan.nonEmpty) assert(fileScan.get.partitionFilters.nonEmpty) -@@ -1068,6 +1083,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1068,6 +1079,7 @@ class FileBasedDataSourceSuite extends QueryTest val fileScan = df.queryExecution.executedPlan collectFirst { case BatchScanExec(_, 
f: FileScan, _, _, _, _) => f @@ -759,7 +747,7 @@ index 9c529d14221..a046f1ed1ca 100644 } assert(fileScan.nonEmpty) assert(fileScan.get.partitionFilters.isEmpty) -@@ -1252,6 +1268,9 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1252,6 +1264,9 @@ class FileBasedDataSourceSuite extends QueryTest val filters = df.queryExecution.executedPlan.collect { case f: FileSourceScanLike => f.dataFilters case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters @@ -1817,20 +1805,6 @@ index 47679ed7865..9ffbaecb98e 100644 }.length == hashAggCount) assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s }.length == sortAggCount) } -diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala -index 77a988f340e..263208a67d9 100644 ---- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala -+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala -@@ -1061,7 +1061,8 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils { - } - } - -- test("alter temporary view should follow current storeAnalyzedPlanForView config") { -+ test("alter temporary view should follow current storeAnalyzedPlanForView config", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3314")) { - withTable("t") { - Seq(2, 3, 1).toDF("c1").write.format("parquet").saveAsTable("t") - withView("v1") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala index aed11badb71..1a365b5aacf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala @@ -2814,7 +2788,7 @@ index 4474ec1fd42..05fa0257c82 100644 checkAnswer( // "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY. 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -index bba71f1c48d..faee9b4ce83 100644 +index bba71f1c48d..35247c13ad9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat @@ -2835,17 +2809,7 @@ index bba71f1c48d..faee9b4ce83 100644 val providedSchema = StructType(Seq(StructField("time", TimestampNTZType, false))) Seq("INT96", "TIMESTAMP_MICROS", "TIMESTAMP_MILLIS").foreach { tsType => -@@ -318,7 +320,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS - } - } - -- test("Enabling/disabling ignoreCorruptFiles") { -+ test("Enabling/disabling ignoreCorruptFiles", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3314")) { - def testIgnoreCorruptFiles(options: Map[String, String]): Unit = { - withTempDir { dir => - val basePath = dir.getCanonicalPath -@@ -996,7 +999,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -996,7 +998,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS Seq(Some("A"), Some("A"), None).toDF().repartition(1) .write.parquet(path.getAbsolutePath) val df = spark.read.parquet(path.getAbsolutePath) @@ -2858,7 +2822,7 @@ index bba71f1c48d..faee9b4ce83 100644 } } } -@@ -1042,7 +1049,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1042,7 +1048,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96") } @@ -2868,7 +2832,7 @@ index bba71f1c48d..faee9b4ce83 100644 def readParquet(schema: String, path: File): DataFrame = { spark.read.schema(schema).parquet(path.toString) } -@@ -1060,7 +1068,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1060,7 +1067,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS checkAnswer(readParquet(schema2, path), df) } @@ -2878,7 +2842,7 @@ index bba71f1c48d..faee9b4ce83 100644 val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)" checkAnswer(readParquet(schema1, path), df) val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)" -@@ -1084,7 +1093,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1084,7 +1092,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, CAST('1.2' AS BINARY) d") df.write.parquet(path.toString) @@ -2888,7 +2852,7 @@ index bba71f1c48d..faee9b4ce83 100644 checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00")) checkAnswer(readParquet("a DECIMAL(11, 2)", path), sql("SELECT 1.00")) checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null)) -@@ -1131,7 +1141,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1131,7 +1140,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS } } diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala index c7a0e33c4b..a93564811c 
100644 --- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala +++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala @@ -180,9 +180,11 @@ class CometExecIterator( case parquetError() => // See org.apache.spark.sql.errors.QueryExecutionErrors.failedToReadDataError // See org.apache.parquet.hadoop.ParquetFileReader for error message. + // _LEGACY_ERROR_TEMP_2254 has no message placeholders; Spark 4 strict-checks + // parameters and raises INTERNAL_ERROR if any are passed. throw new SparkException( errorClass = "_LEGACY_ERROR_TEMP_2254", - messageParameters = Map("message" -> e.getMessage), + messageParameters = Map.empty, cause = new SparkException("File is not a Parquet file.", e)) case _ => throw e
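As a companion illustration of the error-wrapping behaviour described in the commit message, the minimal Scala sketch below shows why the placeholder-free _LEGACY_ERROR_TEMP_2254 class can only be constructed with an empty parameter map on Spark 4, and how the "is not a Parquet file" detail the ignoreCorruptFiles test asserts on still surfaces through the cause chain. The names simulatedNativeError and wrapParquetError (and the sample file name) are hypothetical, illustration-only helpers; SparkException, the error class, the empty parameter map, and the nested "File is not a Parquet file." cause are taken directly from the patch.

    import org.apache.spark.SparkException

    // Hypothetical stand-in for the error surfaced by the native Parquet reader;
    // the file name is illustrative only.
    val simulatedNativeError =
      new RuntimeException("part-00000.snappy.parquet is not a Parquet file")

    // Mirrors the post-patch wrapping in CometExecIterator: _LEGACY_ERROR_TEMP_2254
    // has no message placeholders, so Spark 4 accepts only an empty parameter map.
    def wrapParquetError(e: Throwable): SparkException =
      new SparkException(
        errorClass = "_LEGACY_ERROR_TEMP_2254",
        messageParameters = Map.empty,
        cause = new SparkException("File is not a Parquet file.", e))

    // The Spark test walks the cause chain, so the native message must remain
    // reachable via getCause rather than being folded into the top-level message.
    val wrapped = wrapParquetError(simulatedNativeError)
    assert(wrapped.getCause.getCause.getMessage.contains("is not a Parquet file"))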