74 changes: 19 additions & 55 deletions dev/diffs/4.0.1.diff
@@ -535,7 +535,7 @@ index 81713c777bc..b5f92ed9742 100644
assert(exchanges.size == 2)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index 2c24cc7d570..5a1fe7017c3 100644
index 2c24cc7d570..8c214e7d05c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
@@ -605,15 +605,7 @@ index 2c24cc7d570..5a1fe7017c3 100644
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
val df = sql(
@@ -1330,6 +1347,7 @@ abstract class DynamicPartitionPruningSuiteBase
}

test("Subquery reuse across the whole plan",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3313"),
DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
@@ -1424,7 +1442,8 @@ abstract class DynamicPartitionPruningSuiteBase
@@ -1424,7 +1441,8 @@ abstract class DynamicPartitionPruningSuiteBase
}
}

@@ -623,15 +615,15 @@ index 2c24cc7d570..5a1fe7017c3 100644
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
val df = sql(
""" WITH v as (
@@ -1578,6 +1597,7 @@ abstract class DynamicPartitionPruningSuiteBase
@@ -1578,6 +1596,7 @@ abstract class DynamicPartitionPruningSuiteBase

val subqueryBroadcastExecs = collectWithSubqueries(df.queryExecution.executedPlan) {
case s: SubqueryBroadcastExec => s
+ case s: CometSubqueryBroadcastExec => s
}
assert(subqueryBroadcastExecs.size === 1)
subqueryBroadcastExecs.foreach { subqueryBroadcastExec =>
@@ -1730,6 +1750,10 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
@@ -1730,6 +1749,10 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
case s: BatchScanExec =>
// we use f1 col for v2 tables due to schema pruning
s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1")))
@@ -668,7 +660,7 @@ index 9c90e0105a4..fadf2f0f698 100644

test("SPARK-35884: Explain Formatted") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 9c529d14221..a046f1ed1ca 100644
index 9c529d14221..ab2850b5d68 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -33,6 +33,8 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha
@@ -680,20 +672,16 @@ index 9c529d14221..a046f1ed1ca 100644
import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.datasources.FilePartition
@@ -203,7 +205,11 @@ class FileBasedDataSourceSuite extends QueryTest
@@ -203,7 +205,7 @@ class FileBasedDataSourceSuite extends QueryTest
}

allFileBasedDataSources.foreach { format =>
- testQuietly(s"Enabling/disabling ignoreMissingFiles using $format") {
+ val ignoreMissingTags: Seq[org.scalatest.Tag] = if (format == "parquet") {
+ Seq(IgnoreCometNativeDataFusion(
+ "https://github.com/apache/datafusion-comet/issues/3314"))
+ } else Seq.empty
+ test(s"Enabling/disabling ignoreMissingFiles using $format", ignoreMissingTags: _*) { quietly {
+ test(s"Enabling/disabling ignoreMissingFiles using $format") { quietly {
def testIgnoreMissingFiles(options: Map[String, String]): Unit = {
withTempDir { dir =>
val basePath = dir.getCanonicalPath
@@ -263,7 +269,7 @@ class FileBasedDataSourceSuite extends QueryTest
@@ -263,7 +265,7 @@ class FileBasedDataSourceSuite extends QueryTest
}
}
}
@@ -702,7 +690,7 @@ index 9c529d14221..a046f1ed1ca 100644
}

Seq("json", "orc").foreach { format =>
@@ -668,18 +674,25 @@ class FileBasedDataSourceSuite extends QueryTest
@@ -668,18 +670,25 @@ class FileBasedDataSourceSuite extends QueryTest
checkAnswer(sql(s"select A from $tableName"), data.select("A"))

// RuntimeException is triggered at executor side, which is then wrapped as
@@ -735,31 +723,31 @@ index 9c529d14221..a046f1ed1ca 100644
condition = "_LEGACY_ERROR_TEMP_2093",
parameters = Map("requiredFieldName" -> "b", "matchedOrcFields" -> "[b, B]")
)
@@ -967,6 +980,7 @@ class FileBasedDataSourceSuite extends QueryTest
@@ -967,6 +976,7 @@ class FileBasedDataSourceSuite extends QueryTest
assert(bJoinExec.isEmpty)
val smJoinExec = collect(joinedDF.queryExecution.executedPlan) {
case smJoin: SortMergeJoinExec => smJoin
+ case smJoin: CometSortMergeJoinExec => smJoin
}
assert(smJoinExec.nonEmpty)
}
@@ -1027,6 +1041,7 @@ class FileBasedDataSourceSuite extends QueryTest
@@ -1027,6 +1037,7 @@ class FileBasedDataSourceSuite extends QueryTest

val fileScan = df.queryExecution.executedPlan collectFirst {
case BatchScanExec(_, f: FileScan, _, _, _, _) => f
+ case CometBatchScanExec(BatchScanExec(_, f: FileScan, _, _, _, _), _, _) => f
}
assert(fileScan.nonEmpty)
assert(fileScan.get.partitionFilters.nonEmpty)
@@ -1068,6 +1083,7 @@ class FileBasedDataSourceSuite extends QueryTest
@@ -1068,6 +1079,7 @@ class FileBasedDataSourceSuite extends QueryTest

val fileScan = df.queryExecution.executedPlan collectFirst {
case BatchScanExec(_, f: FileScan, _, _, _, _) => f
+ case CometBatchScanExec(BatchScanExec(_, f: FileScan, _, _, _, _), _, _) => f
}
assert(fileScan.nonEmpty)
assert(fileScan.get.partitionFilters.isEmpty)
@@ -1252,6 +1268,9 @@ class FileBasedDataSourceSuite extends QueryTest
@@ -1252,6 +1264,9 @@ class FileBasedDataSourceSuite extends QueryTest
val filters = df.queryExecution.executedPlan.collect {
case f: FileSourceScanLike => f.dataFilters
case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters
@@ -1817,20 +1805,6 @@ index 47679ed7865..9ffbaecb98e 100644
}.length == hashAggCount)
assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s }.length == sortAggCount)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
index 77a988f340e..263208a67d9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
@@ -1061,7 +1061,8 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils {
}
}

- test("alter temporary view should follow current storeAnalyzedPlanForView config") {
+ test("alter temporary view should follow current storeAnalyzedPlanForView config",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3314")) {
withTable("t") {
Seq(2, 3, 1).toDF("c1").write.format("parquet").saveAsTable("t")
withView("v1") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
index aed11badb71..1a365b5aacf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
@@ -2814,7 +2788,7 @@ index 4474ec1fd42..05fa0257c82 100644
checkAnswer(
// "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index bba71f1c48d..faee9b4ce83 100644
index bba71f1c48d..35247c13ad9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat
@@ -2835,17 +2809,7 @@ index bba71f1c48d..faee9b4ce83 100644
val providedSchema = StructType(Seq(StructField("time", TimestampNTZType, false)))

Seq("INT96", "TIMESTAMP_MICROS", "TIMESTAMP_MILLIS").foreach { tsType =>
@@ -318,7 +320,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
}
}

- test("Enabling/disabling ignoreCorruptFiles") {
+ test("Enabling/disabling ignoreCorruptFiles",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3314")) {
def testIgnoreCorruptFiles(options: Map[String, String]): Unit = {
withTempDir { dir =>
val basePath = dir.getCanonicalPath
@@ -996,7 +999,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
@@ -996,7 +998,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
Seq(Some("A"), Some("A"), None).toDF().repartition(1)
.write.parquet(path.getAbsolutePath)
val df = spark.read.parquet(path.getAbsolutePath)
@@ -2858,7 +2822,7 @@ index bba71f1c48d..faee9b4ce83 100644
}
}
}
@@ -1042,7 +1049,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
@@ -1042,7 +1048,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96")
}

@@ -2868,7 +2832,7 @@ index bba71f1c48d..faee9b4ce83 100644
def readParquet(schema: String, path: File): DataFrame = {
spark.read.schema(schema).parquet(path.toString)
}
@@ -1060,7 +1068,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
@@ -1060,7 +1067,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
checkAnswer(readParquet(schema2, path), df)
}

@@ -2878,7 +2842,7 @@ index bba71f1c48d..faee9b4ce83 100644
val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)"
checkAnswer(readParquet(schema1, path), df)
val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)"
@@ -1084,7 +1093,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
@@ -1084,7 +1092,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, CAST('1.2' AS BINARY) d")
df.write.parquet(path.toString)

@@ -2888,7 +2852,7 @@ index bba71f1c48d..faee9b4ce83 100644
checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00"))
checkAnswer(readParquet("a DECIMAL(11, 2)", path), sql("SELECT 1.00"))
checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null))
@@ -1131,7 +1141,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
@@ -1131,7 +1140,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
}
}

@@ -180,9 +180,11 @@ class CometExecIterator(
case parquetError() =>
// See org.apache.spark.sql.errors.QueryExecutionErrors.failedToReadDataError
// See org.apache.parquet.hadoop.ParquetFileReader for error message.
// _LEGACY_ERROR_TEMP_2254 has no message placeholders; Spark 4 strictly checks
// message parameters and raises INTERNAL_ERROR if any are passed.
throw new SparkException(
errorClass = "_LEGACY_ERROR_TEMP_2254",
messageParameters = Map("message" -> e.getMessage),
messageParameters = Map.empty,
cause = new SparkException("File is not a Parquet file.", e))
case _ =>
throw e
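
Note on the CometExecIterator change above: _LEGACY_ERROR_TEMP_2254's message template has no placeholders, so under Spark 4's strict parameter check any supplied messageParameters surface as INTERNAL_ERROR instead of the intended Parquet error; passing Map.empty and carrying the original message on the cause preserves it. Below is a minimal standalone Scala sketch of how a caller would still reach that message; the readOrWarn helper and the scan callback are assumptions for illustration, not code from this PR.

import org.apache.spark.SparkException

// Hypothetical caller, not part of the PR: shows how the wrapped error surfaces.
def readOrWarn(scan: () => Unit): Unit = {
  try {
    scan() // e.g. a Comet native scan that hits a non-Parquet file
  } catch {
    case e: SparkException if e.getErrorClass == "_LEGACY_ERROR_TEMP_2254" =>
      // The error class itself carries no parameters (its template has no
      // placeholders); the original Parquet message is still on the cause chain.
      println(s"Not a Parquet file: ${e.getCause.getMessage}")
  }
}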