diff --git a/dev/diffs/4.0.1.diff b/dev/diffs/4.0.1.diff index 0da594b58d..700b7aa6ab 100644 --- a/dev/diffs/4.0.1.diff +++ b/dev/diffs/4.0.1.diff @@ -535,7 +535,7 @@ index 81713c777bc..b5f92ed9742 100644 assert(exchanges.size == 2) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala -index 2c24cc7d570..5a1fe7017c3 100644 +index 2c24cc7d570..8c214e7d05c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen @@ -605,15 +605,7 @@ index 2c24cc7d570..5a1fe7017c3 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { val df = sql( -@@ -1330,6 +1347,7 @@ abstract class DynamicPartitionPruningSuiteBase - } - - test("Subquery reuse across the whole plan", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3313"), - DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) { - withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true", - SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", -@@ -1424,7 +1442,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1424,7 +1441,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -623,7 +615,7 @@ index 2c24cc7d570..5a1fe7017c3 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { val df = sql( """ WITH v as ( -@@ -1578,6 +1597,7 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1578,6 +1596,7 @@ abstract class DynamicPartitionPruningSuiteBase val subqueryBroadcastExecs = collectWithSubqueries(df.queryExecution.executedPlan) { case s: SubqueryBroadcastExec => s @@ -631,7 +623,7 @@ index 2c24cc7d570..5a1fe7017c3 100644 } assert(subqueryBroadcastExecs.size === 1) subqueryBroadcastExecs.foreach { subqueryBroadcastExec => -@@ -1730,6 +1750,10 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat +@@ -1730,6 +1749,10 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat case s: BatchScanExec => // we use f1 col for v2 tables due to schema pruning s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1"))) @@ -668,7 +660,7 @@ index 9c90e0105a4..fadf2f0f698 100644 test("SPARK-35884: Explain Formatted") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala -index 9c529d14221..a046f1ed1ca 100644 +index 9c529d14221..ab2850b5d68 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -33,6 +33,8 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha @@ -680,20 +672,16 @@ index 9c529d14221..a046f1ed1ca 100644 import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.datasources.FilePartition -@@ -203,7 +205,11 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -203,7 +205,7 @@ class FileBasedDataSourceSuite extends QueryTest } allFileBasedDataSources.foreach { format => - testQuietly(s"Enabling/disabling 
ignoreMissingFiles using $format") { -+ val ignoreMissingTags: Seq[org.scalatest.Tag] = if (format == "parquet") { -+ Seq(IgnoreCometNativeDataFusion( -+ "https://github.com/apache/datafusion-comet/issues/3314")) -+ } else Seq.empty -+ test(s"Enabling/disabling ignoreMissingFiles using $format", ignoreMissingTags: _*) { quietly { ++ test(s"Enabling/disabling ignoreMissingFiles using $format") { quietly { def testIgnoreMissingFiles(options: Map[String, String]): Unit = { withTempDir { dir => val basePath = dir.getCanonicalPath -@@ -263,7 +269,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -263,7 +265,7 @@ class FileBasedDataSourceSuite extends QueryTest } } } @@ -702,7 +690,7 @@ index 9c529d14221..a046f1ed1ca 100644 } Seq("json", "orc").foreach { format => -@@ -668,18 +674,25 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -668,18 +670,25 @@ class FileBasedDataSourceSuite extends QueryTest checkAnswer(sql(s"select A from $tableName"), data.select("A")) // RuntimeException is triggered at executor side, which is then wrapped as @@ -735,7 +723,7 @@ index 9c529d14221..a046f1ed1ca 100644 condition = "_LEGACY_ERROR_TEMP_2093", parameters = Map("requiredFieldName" -> "b", "matchedOrcFields" -> "[b, B]") ) -@@ -967,6 +980,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -967,6 +976,7 @@ class FileBasedDataSourceSuite extends QueryTest assert(bJoinExec.isEmpty) val smJoinExec = collect(joinedDF.queryExecution.executedPlan) { case smJoin: SortMergeJoinExec => smJoin @@ -743,7 +731,7 @@ index 9c529d14221..a046f1ed1ca 100644 } assert(smJoinExec.nonEmpty) } -@@ -1027,6 +1041,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1027,6 +1037,7 @@ class FileBasedDataSourceSuite extends QueryTest val fileScan = df.queryExecution.executedPlan collectFirst { case BatchScanExec(_, f: FileScan, _, _, _, _) => f @@ -751,7 +739,7 @@ index 9c529d14221..a046f1ed1ca 100644 } assert(fileScan.nonEmpty) assert(fileScan.get.partitionFilters.nonEmpty) -@@ -1068,6 +1083,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1068,6 +1079,7 @@ class FileBasedDataSourceSuite extends QueryTest val fileScan = df.queryExecution.executedPlan collectFirst { case BatchScanExec(_, f: FileScan, _, _, _, _) => f @@ -759,7 +747,7 @@ index 9c529d14221..a046f1ed1ca 100644 } assert(fileScan.nonEmpty) assert(fileScan.get.partitionFilters.isEmpty) -@@ -1252,6 +1268,9 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1252,6 +1264,9 @@ class FileBasedDataSourceSuite extends QueryTest val filters = df.queryExecution.executedPlan.collect { case f: FileSourceScanLike => f.dataFilters case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters @@ -1817,20 +1805,6 @@ index 47679ed7865..9ffbaecb98e 100644 }.length == hashAggCount) assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s }.length == sortAggCount) } -diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala -index 77a988f340e..263208a67d9 100644 ---- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala -+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala -@@ -1061,7 +1061,8 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils { - } - } - -- test("alter temporary view should follow current storeAnalyzedPlanForView config") { -+ test("alter temporary view should follow current storeAnalyzedPlanForView config", -+ 
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3314")) { - withTable("t") { - Seq(2, 3, 1).toDF("c1").write.format("parquet").saveAsTable("t") - withView("v1") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala index aed11badb71..1a365b5aacf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala @@ -2814,7 +2788,7 @@ index 4474ec1fd42..05fa0257c82 100644 checkAnswer( // "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -index bba71f1c48d..faee9b4ce83 100644 +index bba71f1c48d..35247c13ad9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat @@ -2835,17 +2809,7 @@ index bba71f1c48d..faee9b4ce83 100644 val providedSchema = StructType(Seq(StructField("time", TimestampNTZType, false))) Seq("INT96", "TIMESTAMP_MICROS", "TIMESTAMP_MILLIS").foreach { tsType => -@@ -318,7 +320,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS - } - } - -- test("Enabling/disabling ignoreCorruptFiles") { -+ test("Enabling/disabling ignoreCorruptFiles", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3314")) { - def testIgnoreCorruptFiles(options: Map[String, String]): Unit = { - withTempDir { dir => - val basePath = dir.getCanonicalPath -@@ -996,7 +999,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -996,7 +998,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS Seq(Some("A"), Some("A"), None).toDF().repartition(1) .write.parquet(path.getAbsolutePath) val df = spark.read.parquet(path.getAbsolutePath) @@ -2858,7 +2822,7 @@ index bba71f1c48d..faee9b4ce83 100644 } } } -@@ -1042,7 +1049,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1042,7 +1048,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96") } @@ -2868,7 +2832,7 @@ index bba71f1c48d..faee9b4ce83 100644 def readParquet(schema: String, path: File): DataFrame = { spark.read.schema(schema).parquet(path.toString) } -@@ -1060,7 +1068,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1060,7 +1067,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS checkAnswer(readParquet(schema2, path), df) } @@ -2878,7 +2842,7 @@ index bba71f1c48d..faee9b4ce83 100644 val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)" checkAnswer(readParquet(schema1, path), df) val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)" -@@ -1084,7 +1093,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1084,7 +1092,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, CAST('1.2' AS BINARY) d") df.write.parquet(path.toString) @@ -2888,7 
+2852,7 @@ index bba71f1c48d..faee9b4ce83 100644 checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00")) checkAnswer(readParquet("a DECIMAL(11, 2)", path), sql("SELECT 1.00")) checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null)) -@@ -1131,7 +1141,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1131,7 +1140,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS } } diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala index c7a0e33c4b..a93564811c 100644 --- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala +++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala @@ -180,9 +180,11 @@ class CometExecIterator( case parquetError() => // See org.apache.spark.sql.errors.QueryExecutionErrors.failedToReadDataError // See org.apache.parquet.hadoop.ParquetFileReader for error message. + // _LEGACY_ERROR_TEMP_2254 has no message placeholders; Spark 4 strict-checks + // parameters and raises INTERNAL_ERROR if any are passed. throw new SparkException( errorClass = "_LEGACY_ERROR_TEMP_2254", - messageParameters = Map("message" -> e.getMessage), + messageParameters = Map.empty, cause = new SparkException("File is not a Parquet file.", e)) case _ => throw e
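For reference, a minimal standalone sketch of the error-wrapping pattern in the CometExecIterator hunk above. It relies only on the SparkException(errorClass, messageParameters, cause) constructor already used in the diff; the object and helper names (ParquetErrorWrapping, wrapNotAParquetFile) are illustrative and not part of Comet or Spark. As the added comment states, _LEGACY_ERROR_TEMP_2254 has a fixed message template with no placeholders, so the wrapper passes an empty parameter map, mirroring the change from Map("message" -> e.getMessage) to Map.empty.

import org.apache.spark.SparkException

// Illustrative sketch only; the helper name is hypothetical, not Comet API.
object ParquetErrorWrapping {

  // Wrap a low-level read failure the same way the hunk above does:
  // fixed legacy error class, no message parameters, original error as cause.
  def wrapNotAParquetFile(e: Throwable): SparkException =
    new SparkException(
      errorClass = "_LEGACY_ERROR_TEMP_2254",
      // The error class has no message placeholders, so no parameters are
      // supplied (per the comment in the diff, Spark 4 rejects unexpected
      // parameters for an error class).
      messageParameters = Map.empty,
      cause = new SparkException("File is not a Parquet file.", e))

  def main(args: Array[String]): Unit = {
    val wrapped = wrapNotAParquetFile(new RuntimeException("bad magic number"))
    // SparkThrowable exposes the error class of the wrapped exception.
    assert(wrapped.getErrorClass == "_LEGACY_ERROR_TEMP_2254")
    println(wrapped.getMessage)
  }
}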