Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions .github/workflows/pr_build_linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,14 @@ jobs:
maven_opts: "-Pspark-3.4 -Pscala-2.12"
scan_impl: "native_comet"

- name: "Spark 3.5, JDK 11, Scala 2.12"
java_version: "17"
maven_opts: "-Pspark-3.5 -Pscala-2.12"
- name: "Spark 3.5.4, JDK 11, Scala 2.12"
java_version: "11"
maven_opts: "-Pspark-3.5 -Dspark.version=3.5.4 -Pscala-2.12"
scan_impl: "native_comet"

- name: "Spark 3.5, JDK 17, Scala 2.13"
- name: "Spark 3.5.5, JDK 17, Scala 2.13"
java_version: "17"
maven_opts: "-Pspark-3.5 -Pscala-2.13"
maven_opts: "-Pspark-3.5 -Dspark.version=3.5.5 -Pscala-2.13"
scan_impl: "native_comet"

- name: "Spark 3.5, JDK 17, Scala 2.12 native_datafusion"
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/spark_sql_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ jobs:
matrix:
os: [ubuntu-24.04]
java-version: [11]
spark-version: [{short: '3.4', full: '3.4.3'}, {short: '3.5', full: '3.5.4'}, {short: '3.5', full: '3.5.5'}, {short: '3.5', full: '3.5.6'}]
spark-version: [{short: '3.4', full: '3.4.3'}, {short: '3.5', full: '3.5.6'}]
module:
- {name: "catalyst", args1: "catalyst/test", args2: ""}
- {name: "sql/core-1", args1: "", args2: sql/testOnly * -- -l org.apache.spark.tags.ExtendedSQLTest -l org.apache.spark.tags.SlowSQLTest}
Expand Down
128 changes: 9 additions & 119 deletions dev/diffs/3.4.3.diff
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ index 56e9520fdab..917932336df 100644
spark.range(100).write.saveAsTable(s"$dbName.$table2Name")

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index a9f69ab28a1..5d9d4f2cb83 100644
index a9f69ab28a1..760ea0e9565 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -39,11 +39,12 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Attri
Expand Down Expand Up @@ -343,16 +343,6 @@ index a9f69ab28a1..5d9d4f2cb83 100644
}
assert(exchanges.size == 2)
}
@@ -3325,7 +3328,8 @@ class DataFrameSuite extends QueryTest
assert(df2.isLocal)
}

- test("SPARK-35886: PromotePrecision should be subexpr replaced") {
+ test("SPARK-35886: PromotePrecision should be subexpr replaced",
+ IgnoreComet("TODO: fix Comet for this test")) {
withTable("tbl") {
sql(
"""
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
index 433b4741979..07148eee480 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
Expand Down Expand Up @@ -1900,7 +1890,7 @@ index 07e2849ce6f..3e73645b638 100644
ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString
)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 104b4e416cd..81af723b4d0 100644
index 104b4e416cd..0bd2e24e387 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -1096,7 +1096,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
Expand Down Expand Up @@ -1952,17 +1942,7 @@ index 104b4e416cd..81af723b4d0 100644
}
}
}
@@ -1744,7 +1757,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
}
}

- test("SPARK-17091: Convert IN predicate to Parquet filter push-down") {
+ test("SPARK-17091: Convert IN predicate to Parquet filter push-down",
+ IgnoreComet("IN predicate is not yet supported in Comet, see issue #36")) {
val schema = StructType(Seq(
StructField("a", IntegerType, nullable = false)
))
@@ -1985,7 +1999,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
@@ -1985,7 +1998,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
}
}

Expand All @@ -1972,7 +1952,7 @@ index 104b4e416cd..81af723b4d0 100644
// block 1:
// null count min max
// page-0 0 0 99
@@ -2045,7 +2060,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
@@ -2045,7 +2059,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
}
}

Expand All @@ -1982,7 +1962,7 @@ index 104b4e416cd..81af723b4d0 100644
withTempPath { dir =>
val path = dir.getCanonicalPath
spark.range(100).selectExpr("id * 2 AS id")
@@ -2277,7 +2293,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
@@ -2277,7 +2292,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
assert(pushedParquetFilters.exists(_.getClass === filterClass),
s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.")

Expand All @@ -1995,7 +1975,7 @@ index 104b4e416cd..81af723b4d0 100644
} else {
assert(selectedFilters.isEmpty, "There is filter pushed down")
}
@@ -2337,7 +2357,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
@@ -2337,7 +2356,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
assert(pushedParquetFilters.exists(_.getClass === filterClass),
s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.")

Expand Down Expand Up @@ -2520,94 +2500,18 @@ index 75f440caefc..36b1146bc3a 100644
}.headOption.getOrElse {
fail(s"No FileScan in query\n${df.queryExecution}")
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateDistributionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateDistributionSuite.scala
index b597a244710..b2e8be41065 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateDistributionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateDistributionSuite.scala
@@ -21,6 +21,7 @@ import java.io.File

import org.apache.commons.io.FileUtils

+import org.apache.spark.sql.IgnoreComet
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Update
import org.apache.spark.sql.execution.streaming.{FlatMapGroupsWithStateExec, MemoryStream}
import org.apache.spark.sql.internal.SQLConf
@@ -91,7 +92,7 @@ class FlatMapGroupsWithStateDistributionSuite extends StreamTest
}

test("SPARK-38204: flatMapGroupsWithState should require StatefulOpClusteredDistribution " +
- "from children - without initial state") {
+ "from children - without initial state", IgnoreComet("TODO: fix Comet for this test")) {
// function will return -1 on timeout and returns count of the state otherwise
val stateFunc =
(key: (String, String), values: Iterator[(String, String, Long)],
@@ -243,7 +244,8 @@ class FlatMapGroupsWithStateDistributionSuite extends StreamTest
}

test("SPARK-38204: flatMapGroupsWithState should require ClusteredDistribution " +
- "from children if the query starts from checkpoint in 3.2.x - without initial state") {
+ "from children if the query starts from checkpoint in 3.2.x - without initial state",
+ IgnoreComet("TODO: fix Comet for this test")) {
// function will return -1 on timeout and returns count of the state otherwise
val stateFunc =
(key: (String, String), values: Iterator[(String, String, Long)],
@@ -335,7 +337,8 @@ class FlatMapGroupsWithStateDistributionSuite extends StreamTest
}

test("SPARK-38204: flatMapGroupsWithState should require ClusteredDistribution " +
- "from children if the query starts from checkpoint in prior to 3.2") {
+ "from children if the query starts from checkpoint in prior to 3.2",
+ IgnoreComet("TODO: fix Comet for this test")) {
// function will return -1 on timeout and returns count of the state otherwise
val stateFunc =
(key: (String, String), values: Iterator[(String, String, Long)],
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
index 6aa7d0945c7..38523536154 100644
index 6aa7d0945c7..ad26ad833e2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
@@ -25,7 +25,7 @@ import org.scalatest.exceptions.TestFailedException

import org.apache.spark.SparkException
import org.apache.spark.api.java.function.FlatMapGroupsWithStateFunction
-import org.apache.spark.sql.{DataFrame, Encoder}
+import org.apache.spark.sql.{DataFrame, Encoder, IgnoreCometSuite}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.plans.logical.FlatMapGroupsWithState
@@ -46,8 +46,9 @@ case class RunningCount(count: Long)
@@ -46,6 +46,7 @@ case class RunningCount(count: Long)

case class Result(key: Long, count: Int)

+// TODO: fix Comet to enable this suite
@SlowSQLTest
-class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest {
+class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with IgnoreCometSuite {

import testImplicits._

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateWithInitialStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateWithInitialStateSuite.scala
index 2a2a83d35e1..e3b7b290b3e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateWithInitialStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateWithInitialStateSuite.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.streaming

import org.apache.spark.SparkException
-import org.apache.spark.sql.{AnalysisException, Dataset, KeyValueGroupedDataset}
+import org.apache.spark.sql.{AnalysisException, Dataset, IgnoreComet, KeyValueGroupedDataset}
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Update
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.execution.streaming.state.FlatMapGroupsWithStateExecHelper
@@ -253,7 +253,8 @@ class FlatMapGroupsWithStateWithInitialStateSuite extends StateStoreMetricsTest
assert(e.message.contains(expectedError))
}
class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest {

- test("flatMapGroupsWithState - initial state - initial state has flatMapGroupsWithState") {
+ test("flatMapGroupsWithState - initial state - initial state has flatMapGroupsWithState",
+ IgnoreComet("TODO: fix Comet for this test")) {
val initialStateDS = Seq(("keyInStateAndData", new RunningCount(1))).toDS()
val initialState: KeyValueGroupedDataset[String, RunningCount] =
initialStateDS.groupByKey(_._1).mapValues(_._2)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index ef5b8a769fe..84fe1bfabc9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
Expand Down Expand Up @@ -2912,20 +2816,6 @@ index 52abd248f3a..7a199931a08 100644
case h: HiveTableScanExec => h.partitionPruningPred.collect {
case d: DynamicPruningExpression => d.child
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
index 1966e1e64fd..cde97a0aafe 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
@@ -656,7 +656,8 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te
Row(3, 4, 4, 3, null) :: Nil)
}

- test("single distinct multiple columns set") {
+ test("single distinct multiple columns set",
+ IgnoreComet("TODO: fix Comet for this test")) {
checkAnswer(
spark.sql(
"""
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 07361cfdce9..b4d53dbe900 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
Expand Down
Loading
Loading