Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .github/workflows/pr_build_linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ jobs:
- name: "Spark 4.0, JDK 17"
java_version: "17"
maven_opts: "-Pspark-4.0"
# Spark 4.1 is intentionally absent: the lint job invokes -Psemanticdb,
# but semanticdb-scalac_2.13.17 is not yet published, so we cannot
# currently run scalafix against the spark-4.1 profile.
fail-fast: false
steps:
- uses: runs-on/action@742bf56072eb4845a0f94b3394673e4903c90ff0 # v2.1.0
Expand Down Expand Up @@ -289,6 +292,11 @@ jobs:
java_version: "17"
maven_opts: "-Pspark-4.0"
scan_impl: "auto"

- name: "Spark 4.1, JDK 17"
java_version: "17"
maven_opts: "-Pspark-4.1"
scan_impl: "auto"
suite:
- name: "fuzz"
value: |
Expand Down
6 changes: 6 additions & 0 deletions .github/workflows/pr_build_macos.yml
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@ jobs:
java_version: "17"
maven_opts: "-Pspark-4.0 -Pscala-2.13"

- name: "Spark 4.1, JDK 17, Scala 2.13"
java_version: "17"
# The spark-4.1 profile already pins Scala to 2.13.17 to match Spark 4.1.1's
# runtime; the scala-2.13 profile would override it back to 2.13.16 and break.
maven_opts: "-Pspark-4.1"

suite:
- name: "fuzz"
value: |
Expand Down
2 changes: 1 addition & 1 deletion common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,7 @@ object CometConf extends ShimCometConf {
s"format when `${COMET_SPARK_TO_ARROW_ENABLED.key}` is true.")
.stringConf
.toSequence
.createWithDefault(Seq("Range,InMemoryTableScan,RDDScan"))
.createWithDefault(Seq("Range,InMemoryTableScan,RDDScan,OneRowRelation"))

val COMET_CASE_CONVERSION_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.caseConversion.enabled")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,7 @@ case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport with C
// fields plus per-field VariantMetadata, expecting the scan to honor Parquet variant
// shredding semantics. Comet's native scans don't, so fall back to Spark.
fallbackReasons +=
s"$scanImpl scan does not support shredded Variant reads (column $name)"
s"Unsupported $name of type VariantType (shredded; not supported by $scanImpl scan)"
false
case s: StructType if s.fields.isEmpty =>
false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext, TaskAttemptID, TaskID, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.spark.TaskContext
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
Expand Down Expand Up @@ -177,7 +177,9 @@ case class CometNativeWriteExec(
committer.setupTask(taskContext)

// Get the work directory for temp files
val workPath = committer.newTaskTempFile(taskContext, None, "")
// Spark 4.1 made the legacy newTaskTempFile(taskContext, dir, ext: String) overload throw
// by default; the FileNameSpec overload is the supported one and is available in Spark 3.4+.
val workPath = committer.newTaskTempFile(taskContext, None, FileNameSpec("", ""))
val workDir = new Path(workPath).getParent.toString

(Some(workDir), Some((committer, taskContext)), null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ query expect_error(DIVIDE_BY_ZERO)
SELECT a div b FROM ansi_div_zero

-- column % 0 (remainder) should throw
query expect_error(DIVIDE_BY_ZERO)
-- Spark 4.0 raises DIVIDE_BY_ZERO but Spark 4.1 raises REMAINDER_BY_ZERO, so match the shared substring BY_ZERO to pass on both versions.
query expect_error(BY_ZERO)
SELECT a % b FROM ansi_div_zero

-- literal / 0 should throw
Expand All @@ -127,7 +128,7 @@ query expect_error(DIVIDE_BY_ZERO)
SELECT 1 div 0

-- literal % 0 should throw
query expect_error(DIVIDE_BY_ZERO)
query expect_error(BY_ZERO)
SELECT 1 % 0

-- ============================================================================
Expand All @@ -143,7 +144,7 @@ query expect_error(DIVIDE_BY_ZERO)
SELECT c div d FROM ansi_div_zero

-- long column % 0 should throw
query expect_error(DIVIDE_BY_ZERO)
query expect_error(BY_ZERO)
SELECT c % d FROM ansi_div_zero

-- ============================================================================
Expand Down
10 changes: 5 additions & 5 deletions spark/src/test/resources/sql-tests/expressions/misc/variant.sql
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,16 @@ INSERT INTO test_variant VALUES
(3, parse_json('null')),
(4, NULL)

query expect_fallback(Unsupported v of type VariantType)
query expect_fallback(type VariantType)
SELECT id, v FROM test_variant ORDER BY id

query expect_fallback(Unsupported v of type VariantType)
query expect_fallback(type VariantType)
SELECT variant_get(v, '$.a', 'int') AS a FROM test_variant ORDER BY id

query expect_fallback(Unsupported v of type VariantType)
query expect_fallback(type VariantType)
SELECT id FROM test_variant WHERE variant_get(v, '$.a', 'int') = 1

query expect_fallback(Unsupported v of type VariantType)
query expect_fallback(type VariantType)
SELECT COUNT(*) FROM test_variant WHERE v IS NOT NULL

statement
Expand All @@ -51,5 +51,5 @@ INSERT INTO test_variant_struct VALUES
(1, named_struct('v', parse_json('{"x": 10}'))),
(2, named_struct('v', parse_json('{"x": 20}')))

query expect_fallback(Unsupported v of type VariantType)
query expect_fallback(type VariantType)
SELECT id, s FROM test_variant_struct ORDER BY id
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
CometNativeColumnarToRow
+- CometTakeOrderedAndProject
+- CometProject
+- CometBroadcastHashJoin
:- CometProject
: +- CometBroadcastHashJoin
: :- CometProject
: : +- CometBroadcastHashJoin
: : :- CometFilter
: : : +- CometHashAggregate
: : : +- CometExchange
: : : +- CometHashAggregate
: : : +- CometProject
: : : +- CometBroadcastHashJoin
: : : :- CometFilter
: : : : +- CometNativeScan parquet spark_catalog.default.store_returns
: : : : +- CometSubqueryBroadcast
: : : : +- CometBroadcastExchange
: : : : +- CometProject
: : : : +- CometFilter
: : : : +- CometNativeScan parquet spark_catalog.default.date_dim
: : : +- CometBroadcastExchange
: : : +- CometProject
: : : +- CometFilter
: : : +- CometNativeScan parquet spark_catalog.default.date_dim
: : +- CometBroadcastExchange
: : +- CometFilter
: : +- CometHashAggregate
: : +- CometExchange
: : +- CometHashAggregate
: : +- CometHashAggregate
: : +- CometExchange
: : +- CometHashAggregate
: : +- CometProject
: : +- CometBroadcastHashJoin
: : :- CometFilter
: : : +- CometNativeScan parquet spark_catalog.default.store_returns
: : : +- ReusedSubquery
: : +- CometBroadcastExchange
: : +- CometProject
: : +- CometFilter
: : +- CometNativeScan parquet spark_catalog.default.date_dim
: +- CometBroadcastExchange
: +- CometProject
: +- CometFilter
: +- CometNativeScan parquet spark_catalog.default.store
+- CometBroadcastExchange
+- CometProject
+- CometFilter
+- CometNativeScan parquet spark_catalog.default.customer

Comet accelerated 47 out of 49 eligible operators (95%). Final plan contains 1 transitions between Spark and Comet.
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
CometNativeColumnarToRow
+- CometTakeOrderedAndProject
+- CometProject
+- CometBroadcastHashJoin
:- CometProject
: +- CometBroadcastHashJoin
: :- CometProject
: : +- CometBroadcastHashJoin
: : :- CometFilter
: : : +- CometHashAggregate
: : : +- CometExchange
: : : +- CometHashAggregate
: : : +- CometProject
: : : +- CometBroadcastHashJoin
: : : :- CometFilter
: : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.store_returns
: : : : +- SubqueryBroadcast
: : : : +- BroadcastExchange
: : : : +- CometNativeColumnarToRow
: : : : +- CometProject
: : : : +- CometFilter
: : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim
: : : +- CometBroadcastExchange
: : : +- CometProject
: : : +- CometFilter
: : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim
: : +- CometBroadcastExchange
: : +- CometFilter
: : +- CometHashAggregate
: : +- CometExchange
: : +- CometHashAggregate
: : +- CometHashAggregate
: : +- CometExchange
: : +- CometHashAggregate
: : +- CometProject
: : +- CometBroadcastHashJoin
: : :- CometFilter
: : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.store_returns
: : : +- ReusedSubquery
: : +- CometBroadcastExchange
: : +- CometProject
: : +- CometFilter
: : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim
: +- CometBroadcastExchange
: +- CometProject
: +- CometFilter
: +- CometScan [native_iceberg_compat] parquet spark_catalog.default.store
+- CometBroadcastExchange
+- CometProject
+- CometFilter
+- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer

Comet accelerated 46 out of 49 eligible operators (93%). Final plan contains 2 transitions between Spark and Comet.
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
TakeOrderedAndProject
+- HashAggregate
+- Exchange
+- HashAggregate
+- Project
+- BroadcastHashJoin
:- Project
: +- BroadcastHashJoin
: :- Project
: : +- Filter
: : +- BroadcastHashJoin
: : :- BroadcastHashJoin [COMET: Unsupported join type ExistenceJoin(exists#1)]
: : : :- CometNativeColumnarToRow
: : : : +- CometBroadcastHashJoin
: : : : :- CometFilter
: : : : : +- CometNativeScan parquet spark_catalog.default.customer
: : : : +- CometBroadcastExchange
: : : : +- CometProject
: : : : +- CometBroadcastHashJoin
: : : : :- CometNativeScan parquet spark_catalog.default.store_sales
: : : : : +- CometSubqueryBroadcast
: : : : : +- CometBroadcastExchange
: : : : : +- CometProject
: : : : : +- CometFilter
: : : : : +- CometNativeScan parquet spark_catalog.default.date_dim
: : : : +- CometBroadcastExchange
: : : : +- CometProject
: : : : +- CometFilter
: : : : +- CometNativeScan parquet spark_catalog.default.date_dim
: : : +- BroadcastExchange
: : : +- CometNativeColumnarToRow
: : : +- CometProject
: : : +- CometBroadcastHashJoin
: : : :- CometNativeScan parquet spark_catalog.default.web_sales
: : : : +- ReusedSubquery
: : : +- CometBroadcastExchange
: : : +- CometProject
: : : +- CometFilter
: : : +- CometNativeScan parquet spark_catalog.default.date_dim
: : +- BroadcastExchange
: : +- CometNativeColumnarToRow
: : +- CometProject
: : +- CometBroadcastHashJoin
: : :- CometNativeScan parquet spark_catalog.default.catalog_sales
: : : +- ReusedSubquery
: : +- CometBroadcastExchange
: : +- CometProject
: : +- CometFilter
: : +- CometNativeScan parquet spark_catalog.default.date_dim
: +- BroadcastExchange
: +- CometNativeColumnarToRow
: +- CometProject
: +- CometFilter
: +- CometNativeScan parquet spark_catalog.default.customer_address
+- BroadcastExchange
+- CometNativeColumnarToRow
+- CometProject
+- CometFilter
+- CometNativeScan parquet spark_catalog.default.customer_demographics

Comet accelerated 35 out of 54 eligible operators (64%). Final plan contains 5 transitions between Spark and Comet.
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
TakeOrderedAndProject
+- HashAggregate
+- Exchange
+- HashAggregate
+- Project
+- BroadcastHashJoin
:- Project
: +- BroadcastHashJoin
: :- Project
: : +- Filter
: : +- BroadcastHashJoin
: : :- BroadcastHashJoin [COMET: Unsupported join type ExistenceJoin(exists#1)]
: : : :- CometNativeColumnarToRow
: : : : +- CometBroadcastHashJoin
: : : : :- CometFilter
: : : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer
: : : : +- CometBroadcastExchange
: : : : +- CometProject
: : : : +- CometBroadcastHashJoin
: : : : :- CometScan [native_iceberg_compat] parquet spark_catalog.default.store_sales
: : : : : +- SubqueryBroadcast
: : : : : +- BroadcastExchange
: : : : : +- CometNativeColumnarToRow
: : : : : +- CometProject
: : : : : +- CometFilter
: : : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim
: : : : +- CometBroadcastExchange
: : : : +- CometProject
: : : : +- CometFilter
: : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim
: : : +- BroadcastExchange
: : : +- CometNativeColumnarToRow
: : : +- CometProject
: : : +- CometBroadcastHashJoin
: : : :- CometScan [native_iceberg_compat] parquet spark_catalog.default.web_sales
: : : : +- ReusedSubquery
: : : +- CometBroadcastExchange
: : : +- CometProject
: : : +- CometFilter
: : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim
: : +- BroadcastExchange
: : +- CometNativeColumnarToRow
: : +- CometProject
: : +- CometBroadcastHashJoin
: : :- CometScan [native_iceberg_compat] parquet spark_catalog.default.catalog_sales
: : : +- ReusedSubquery
: : +- CometBroadcastExchange
: : +- CometProject
: : +- CometFilter
: : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim
: +- BroadcastExchange
: +- CometNativeColumnarToRow
: +- CometProject
: +- CometFilter
: +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer_address
+- BroadcastExchange
+- CometNativeColumnarToRow
+- CometProject
+- CometFilter
+- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer_demographics

Comet accelerated 34 out of 54 eligible operators (62%). Final plan contains 6 transitions between Spark and Comet.
Loading
Loading