Skip to content
100 changes: 72 additions & 28 deletions dev/diffs/3.4.3.diff
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ index a6b295578d6..91acca4306f 100644

test("SPARK-35884: Explain Formatted") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 2796b1cf154..d628f44e4ee 100644
index 2796b1cf154..53dcfde932e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.TestingUDT.{IntervalUDT, NullData, NullUDT}
Expand All @@ -534,41 +534,70 @@ index 2796b1cf154..d628f44e4ee 100644
import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.datasources.FilePartition
@@ -499,7 +500,8 @@ class FileBasedDataSourceSuite extends QueryTest
}
@@ -516,21 +517,24 @@ class FileBasedDataSourceSuite extends QueryTest
checkAnswer(sql(s"select A from $tableName"), data.select("A"))

// RuntimeException is triggered at executor side, which is then wrapped as
- // SparkException at driver side
- val e1 = intercept[SparkException] {
- sql(s"select b from $tableName").collect()
+ // SparkException at driver side. Comet native readers throw RuntimeException
+ // directly without the SparkException wrapper.
+ def getDuplicateFieldError(query: String): RuntimeException = {
+ try {
+ sql(query).collect()
+ fail("Expected an exception").asInstanceOf[RuntimeException]
+ } catch {
+ case e: SparkException =>
+ e.getCause.asInstanceOf[RuntimeException]
+ case e: RuntimeException => e
+ }
}
- assert(
- e1.getCause.isInstanceOf[RuntimeException] &&
- e1.getCause.getMessage.contains(
- """Found duplicate field(s) "b": [b, B] in case-insensitive mode"""))
- val e2 = intercept[SparkException] {
- sql(s"select B from $tableName").collect()
- }
- assert(
- e2.getCause.isInstanceOf[RuntimeException] &&
- e2.getCause.getMessage.contains(
- """Found duplicate field(s) "b": [b, B] in case-insensitive mode"""))
+ val e1 = getDuplicateFieldError(s"select b from $tableName")
+ assert(e1.getMessage.contains(
+ """Found duplicate field(s) "b": [b, B] in case-insensitive mode"""))
+ val e2 = getDuplicateFieldError(s"select B from $tableName")
+ assert(e2.getMessage.contains(
+ """Found duplicate field(s) "b": [b, B] in case-insensitive mode"""))
}

Seq("parquet", "orc").foreach { format =>
- test(s"Spark native readers should respect spark.sql.caseSensitive - ${format}") {
+ test(s"Spark native readers should respect spark.sql.caseSensitive - ${format}",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3760")) {
withTempDir { dir =>
val tableName = s"spark_25132_${format}_native"
val tableDir = dir.getCanonicalPath + s"/$tableName"
@@ -815,6 +817,7 @@ class FileBasedDataSourceSuite extends QueryTest
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
@@ -815,6 +819,7 @@ class FileBasedDataSourceSuite extends QueryTest
assert(bJoinExec.isEmpty)
val smJoinExec = collect(joinedDF.queryExecution.executedPlan) {
case smJoin: SortMergeJoinExec => smJoin
+ case smJoin: CometSortMergeJoinExec => smJoin
}
assert(smJoinExec.nonEmpty)
}
@@ -875,6 +878,7 @@ class FileBasedDataSourceSuite extends QueryTest
@@ -875,6 +880,7 @@ class FileBasedDataSourceSuite extends QueryTest

val fileScan = df.queryExecution.executedPlan collectFirst {
case BatchScanExec(_, f: FileScan, _, _, _, _, _, _, _) => f
+ case CometBatchScanExec(BatchScanExec(_, f: FileScan, _, _, _, _, _, _, _), _, _) => f
}
assert(fileScan.nonEmpty)
assert(fileScan.get.partitionFilters.nonEmpty)
@@ -916,6 +920,7 @@ class FileBasedDataSourceSuite extends QueryTest
@@ -916,6 +922,7 @@ class FileBasedDataSourceSuite extends QueryTest

val fileScan = df.queryExecution.executedPlan collectFirst {
case BatchScanExec(_, f: FileScan, _, _, _, _, _, _, _) => f
+ case CometBatchScanExec(BatchScanExec(_, f: FileScan, _, _, _, _, _, _, _), _, _) => f
}
assert(fileScan.nonEmpty)
assert(fileScan.get.partitionFilters.isEmpty)
@@ -1100,6 +1105,9 @@ class FileBasedDataSourceSuite extends QueryTest
@@ -1100,6 +1107,9 @@ class FileBasedDataSourceSuite extends QueryTest
val filters = df.queryExecution.executedPlan.collect {
case f: FileSourceScanLike => f.dataFilters
case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters
Expand Down Expand Up @@ -2003,7 +2032,7 @@ index 07e2849ce6f..3e73645b638 100644
ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString
)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 104b4e416cd..d865077684f 100644
index 104b4e416cd..b8af360fa14 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -38,6 +38,7 @@ import org.apache.parquet.schema.MessageType
Expand Down Expand Up @@ -2083,17 +2112,32 @@ index 104b4e416cd..d865077684f 100644
val schema = StructType(Seq(
StructField("a", IntegerType, nullable = false)
))
@@ -1934,7 +1950,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
}
}
@@ -1950,11 +1966,21 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
""".stripMargin)

withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
- val e = intercept[SparkException] {
+ // Spark native readers wrap the error in SparkException.
+ // Comet native readers throw RuntimeException directly.
+ val msg = try {
sql(s"select a from $tableName where b > 0").collect()
+ fail("Expected an exception")
+ } catch {
+ case e: SparkException =>
+ assert(e.getCause.isInstanceOf[RuntimeException])
+ e.getCause.getMessage
+ case e: RuntimeException =>
+ e.getMessage
}
- assert(e.getCause.isInstanceOf[RuntimeException] && e.getCause.getMessage.contains(
- """Found duplicate field(s) "B": [B, b] in case-insensitive mode"""))
+ assert(msg.contains(
+ """Found duplicate field(s) "B": [B, b] in case-insensitive mode"""),
+ s"Unexpected error message: $msg")
}

- test("SPARK-25207: exception when duplicate fields in case-insensitive mode") {
+ test("SPARK-25207: exception when duplicate fields in case-insensitive mode",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3760")) {
withTempPath { dir =>
val count = 10
val tableName = "spark_25207"
@@ -1985,7 +2002,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
@@ -1985,7 +2011,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
}
}

Expand All @@ -2103,7 +2147,7 @@ index 104b4e416cd..d865077684f 100644
// block 1:
// null count min max
// page-0 0 0 99
@@ -2045,7 +2063,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
@@ -2045,7 +2072,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
}
}

Expand All @@ -2113,7 +2157,7 @@ index 104b4e416cd..d865077684f 100644
withTempPath { dir =>
val path = dir.getCanonicalPath
spark.range(100).selectExpr("id * 2 AS id")
@@ -2277,7 +2296,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
@@ -2277,7 +2305,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
assert(pushedParquetFilters.exists(_.getClass === filterClass),
s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.")

Expand All @@ -2126,7 +2170,7 @@ index 104b4e416cd..d865077684f 100644
} else {
assert(selectedFilters.isEmpty, "There is filter pushed down")
}
@@ -2337,7 +2360,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
@@ -2337,7 +2369,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
assert(pushedParquetFilters.exists(_.getClass === filterClass),
s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.")

Expand Down
85 changes: 58 additions & 27 deletions dev/diffs/3.5.8.diff
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ index a206e97c353..fea1149b67d 100644

test("SPARK-35884: Explain Formatted") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 93275487f29..601cb6647fe 100644
index 93275487f29..77a27d1c40a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -23,6 +23,7 @@ import java.nio.file.{Files, StandardOpenOption}
Expand Down Expand Up @@ -522,41 +522,64 @@ index 93275487f29..601cb6647fe 100644
checkErrorMatchPVals(
exception = intercept[SparkException] {
testIgnoreMissingFiles(options)
@@ -639,7 +643,8 @@ class FileBasedDataSourceSuite extends QueryTest
}

Seq("parquet", "orc").foreach { format =>
- test(s"Spark native readers should respect spark.sql.caseSensitive - ${format}") {
+ test(s"Spark native readers should respect spark.sql.caseSensitive - ${format}",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3760")) {
withTempDir { dir =>
val tableName = s"spark_25132_${format}_native"
val tableDir = dir.getCanonicalPath + s"/$tableName"
@@ -955,6 +960,7 @@ class FileBasedDataSourceSuite extends QueryTest
@@ -656,18 +660,25 @@ class FileBasedDataSourceSuite extends QueryTest
checkAnswer(sql(s"select A from $tableName"), data.select("A"))

// RuntimeException is triggered at executor side, which is then wrapped as
- // SparkException at driver side
+ // SparkException at driver side. Comet native readers throw
+ // SparkRuntimeException directly without the SparkException wrapper.
+ def getDuplicateFieldError(query: String): SparkRuntimeException = {
+ try {
+ sql(query).collect()
+ fail("Expected an exception").asInstanceOf[SparkRuntimeException]
+ } catch {
+ case e: SparkException =>
+ e.getCause.asInstanceOf[SparkRuntimeException]
+ case e: SparkRuntimeException => e
+ }
+ }
checkError(
- exception = intercept[SparkException] {
- sql(s"select b from $tableName").collect()
- }.getCause.asInstanceOf[SparkRuntimeException],
+ exception = getDuplicateFieldError(s"select b from $tableName"),
errorClass = "_LEGACY_ERROR_TEMP_2093",
parameters = Map("requiredFieldName" -> "b", "matchedOrcFields" -> "[b, B]")
)
checkError(
- exception = intercept[SparkException] {
- sql(s"select B from $tableName").collect()
- }.getCause.asInstanceOf[SparkRuntimeException],
+ exception = getDuplicateFieldError(s"select B from $tableName"),
errorClass = "_LEGACY_ERROR_TEMP_2093",
parameters = Map("requiredFieldName" -> "b", "matchedOrcFields" -> "[b, B]")
)
@@ -955,6 +966,7 @@ class FileBasedDataSourceSuite extends QueryTest
assert(bJoinExec.isEmpty)
val smJoinExec = collect(joinedDF.queryExecution.executedPlan) {
case smJoin: SortMergeJoinExec => smJoin
+ case smJoin: CometSortMergeJoinExec => smJoin
}
assert(smJoinExec.nonEmpty)
}
@@ -1015,6 +1021,7 @@ class FileBasedDataSourceSuite extends QueryTest
@@ -1015,6 +1027,7 @@ class FileBasedDataSourceSuite extends QueryTest

val fileScan = df.queryExecution.executedPlan collectFirst {
case BatchScanExec(_, f: FileScan, _, _, _, _) => f
+ case CometBatchScanExec(BatchScanExec(_, f: FileScan, _, _, _, _), _, _) => f
}
assert(fileScan.nonEmpty)
assert(fileScan.get.partitionFilters.nonEmpty)
@@ -1056,6 +1063,7 @@ class FileBasedDataSourceSuite extends QueryTest
@@ -1056,6 +1069,7 @@ class FileBasedDataSourceSuite extends QueryTest

val fileScan = df.queryExecution.executedPlan collectFirst {
case BatchScanExec(_, f: FileScan, _, _, _, _) => f
+ case CometBatchScanExec(BatchScanExec(_, f: FileScan, _, _, _, _), _, _) => f
}
assert(fileScan.nonEmpty)
assert(fileScan.get.partitionFilters.isEmpty)
@@ -1240,6 +1248,9 @@ class FileBasedDataSourceSuite extends QueryTest
@@ -1240,6 +1254,9 @@ class FileBasedDataSourceSuite extends QueryTest
val filters = df.queryExecution.executedPlan.collect {
case f: FileSourceScanLike => f.dataFilters
case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters
Expand Down Expand Up @@ -1959,7 +1982,7 @@ index 07e2849ce6f..3e73645b638 100644
ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString
)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 8e88049f51e..f19c12c98e6 100644
index 8e88049f51e..f9d515edee1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -1095,7 +1095,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
Expand Down Expand Up @@ -2030,17 +2053,25 @@ index 8e88049f51e..f19c12c98e6 100644
val schema = StructType(Seq(
StructField("a", IntegerType, nullable = false)
))
@@ -1952,8 +1966,17 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
val e = intercept[SparkException] {
@@ -1949,11 +1965,24 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
""".stripMargin)

withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
- val e = intercept[SparkException] {
+ // Spark native readers wrap the error in SparkException(FAILED_READ_FILE).
+ // Comet native readers throw SparkRuntimeException directly.
+ val msg = try {
sql(s"select a from $tableName where b > 0").collect()
+ fail("Expected an exception")
+ } catch {
+ case e: SparkException =>
+ assert(e.getCause.isInstanceOf[RuntimeException])
+ e.getCause.getMessage
+ case e: RuntimeException =>
+ e.getMessage
}
- assert(e.getCause.isInstanceOf[RuntimeException] && e.getCause.getMessage.contains(
- """Found duplicate field(s) "B": [B, b] in case-insensitive mode"""))
+ assert(e.getCause.isInstanceOf[RuntimeException])
+ val msg = e.getCause.getMessage
+ // native_datafusion converts DataFusion's "Unable to get field named" error
+ // to _LEGACY_ERROR_TEMP_2093 but with a lowercase field name ("b" vs "B")
+ // because DataFusion resolves field names case-insensitively
+ assert(
+ msg.contains(
+ """Found duplicate field(s) "B": [B, b] in case-insensitive mode""") ||
Expand All @@ -2050,7 +2081,7 @@ index 8e88049f51e..f19c12c98e6 100644
}

withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
@@ -1984,7 +2007,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
@@ -1984,7 +2013,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
}
}

Expand All @@ -2060,7 +2091,7 @@ index 8e88049f51e..f19c12c98e6 100644
// block 1:
// null count min max
// page-0 0 0 99
@@ -2044,7 +2068,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
@@ -2044,7 +2074,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
}
}

Expand All @@ -2070,7 +2101,7 @@ index 8e88049f51e..f19c12c98e6 100644
withTempPath { dir =>
val path = dir.getCanonicalPath
spark.range(100).selectExpr("id * 2 AS id")
@@ -2276,7 +2301,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
@@ -2276,7 +2307,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
assert(pushedParquetFilters.exists(_.getClass === filterClass),
s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.")

Expand All @@ -2083,7 +2114,7 @@ index 8e88049f51e..f19c12c98e6 100644
} else {
assert(selectedFilters.isEmpty, "There is filter pushed down")
}
@@ -2336,7 +2365,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
@@ -2336,7 +2371,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
assert(pushedParquetFilters.exists(_.getClass === filterClass),
s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.")

Expand Down
Loading
Loading