Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 41 additions & 2 deletions .github/workflows/spark_sql_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ env:
RUST_VERSION: stable

jobs:
spark-sql-native-comet:
spark-sql-auto-scan:
strategy:
matrix:
os: [ubuntu-24.04]
Expand Down Expand Up @@ -75,7 +75,46 @@ jobs:
run: |
cd apache-spark
rm -rf /root/.m2/repository/org/apache/parquet # somehow parquet cache requires cleanups
ENABLE_COMET=true ENABLE_COMET_SHUFFLE=true build/sbt ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}"
ENABLE_COMET=true ENABLE_COMET_SHUFFLE=true build/sbt -Dsbt.log.noformat=true ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}"
env:
LC_ALL: "C.UTF-8"

spark-sql-native-native-comet:
strategy:
matrix:
os: [ ubuntu-24.04 ]
java-version: [ 11 ]
spark-version: [ { short: '3.4', full: '3.4.3' }, { short: '3.5', full: '3.5.6' } ]
module:
- { name: "catalyst", args1: "catalyst/test", args2: "" }
- { name: "sql/core-1", args1: "", args2: sql/testOnly * -- -l org.apache.spark.tags.ExtendedSQLTest -l org.apache.spark.tags.SlowSQLTest }
- { name: "sql/core-2", args1: "", args2: "sql/testOnly * -- -n org.apache.spark.tags.ExtendedSQLTest" }
- { name: "sql/core-3", args1: "", args2: "sql/testOnly * -- -n org.apache.spark.tags.SlowSQLTest" }
- { name: "sql/hive-1", args1: "", args2: "hive/testOnly * -- -l org.apache.spark.tags.ExtendedHiveTest -l org.apache.spark.tags.SlowHiveTest" }
- { name: "sql/hive-2", args1: "", args2: "hive/testOnly * -- -n org.apache.spark.tags.ExtendedHiveTest" }
- { name: "sql/hive-3", args1: "", args2: "hive/testOnly * -- -n org.apache.spark.tags.SlowHiveTest" }
fail-fast: false
name: spark-sql-native-comet-${{ matrix.module.name }}/${{ matrix.os }}/spark-${{ matrix.spark-version.full }}/java-${{ matrix.java-version }}
runs-on: ${{ matrix.os }}
container:
image: amd64/rust
steps:
- uses: actions/checkout@v4
- name: Setup Rust & Java toolchain
uses: ./.github/actions/setup-builder
with:
rust-version: ${{env.RUST_VERSION}}
jdk-version: ${{ matrix.java-version }}
- name: Setup Spark
uses: ./.github/actions/setup-spark-builder
with:
spark-version: ${{ matrix.spark-version.full }}
spark-short-version: ${{ matrix.spark-version.short }}
- name: Run Spark tests
run: |
cd apache-spark
rm -rf /root/.m2/repository/org/apache/parquet # somehow parquet cache requires cleanups
ENABLE_COMET=true ENABLE_COMET_SHUFFLE=true COMET_PARQUET_SCAN_IMPL=native_comet build/sbt -Dsbt.log.noformat=true ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}"
env:
LC_ALL: "C.UTF-8"

Expand Down
81 changes: 0 additions & 81 deletions .github/workflows/spark_sql_test_native_auto.yml

This file was deleted.

2 changes: 1 addition & 1 deletion common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ object CometConf extends ShimCometConf {
.checkValues(
Set(SCAN_NATIVE_COMET, SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT, SCAN_AUTO))
.createWithDefault(sys.env
.getOrElse("COMET_PARQUET_SCAN_IMPL", SCAN_NATIVE_COMET)
.getOrElse("COMET_PARQUET_SCAN_IMPL", SCAN_AUTO)
.toLowerCase(Locale.ROOT))

val COMET_PARQUET_PARALLEL_IO_ENABLED: ConfigEntry[Boolean] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

import org.apache.comet.{CometConf, DataTypeSupport}
import org.apache.comet.{CometConf, CometSparkSessionExtensions, DataTypeSupport}
import org.apache.comet.CometConf._
import org.apache.comet.CometSparkSessionExtensions.{isCometLoaded, isCometScanEnabled, withInfo, withInfos}
import org.apache.comet.parquet.{CometParquetScan, SupportsComet}
Expand Down Expand Up @@ -261,6 +261,10 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {

val fallbackReasons = new ListBuffer[String]()

if (CometSparkSessionExtensions.isSpark40Plus) {
fallbackReasons += s"$SCAN_NATIVE_ICEBERG_COMPAT is not implemented for Spark 4.0.0"
}
Comment on lines +264 to +266
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can revisit this after #1830 is merged


// native_iceberg_compat only supports local filesystem and S3
if (!scanExec.relation.inputFiles
.forall(path => path.startsWith("file://") || path.startsWith("s3a://"))) {
Copy link
Copy Markdown
Contributor

@hsiang-c hsiang-c Jun 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

S3AFileSystem, used by the HadoopFileIO class in Iceberg, recognizes the s3a scheme.

However, there is an S3FileIO class in Iceberg that recognizes the s3, s3a, and s3n schemes. We might have to handle more schemes in the future.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It also supports HDFS if that feature is enabled.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wouldn't bother with s3:// and s3n:// urls. Those are defunct afaik.

Expand Down
12 changes: 9 additions & 3 deletions spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2211,6 +2211,8 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}

test("get_struct_field - select primitive fields") {
val scanImpl = CometConf.COMET_NATIVE_SCAN_IMPL.get()
assume(!(scanImpl == CometConf.SCAN_AUTO && CometSparkSessionExtensions.isSpark40Plus))
withTempPath { dir =>
// create input file with Comet disabled
withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
Expand All @@ -2225,7 +2227,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
val df = spark.read.parquet(dir.toString()).select("nested1.id")
// Comet's original scan does not support structs.
// The plan will have a Comet Scan only if scan impl is native_full or native_recordbatch
if (!CometConf.COMET_NATIVE_SCAN_IMPL.get().equals(CometConf.SCAN_NATIVE_COMET)) {
if (!scanImpl.equals(CometConf.SCAN_NATIVE_COMET)) {
checkSparkAnswerAndOperator(df)
} else {
checkSparkAnswer(df)
Expand All @@ -2234,6 +2236,8 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}

test("get_struct_field - select subset of struct") {
val scanImpl = CometConf.COMET_NATIVE_SCAN_IMPL.get()
assume(!(scanImpl == CometConf.SCAN_AUTO && CometSparkSessionExtensions.isSpark40Plus))
withTempPath { dir =>
// create input file with Comet disabled
withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
Expand All @@ -2255,7 +2259,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
val df = spark.read.parquet(dir.toString())
// Comet's original scan does not support structs.
// The plan will have a Comet Scan only if scan impl is native_full or native_recordbatch
if (!CometConf.COMET_NATIVE_SCAN_IMPL.get().equals(CometConf.SCAN_NATIVE_COMET)) {
if (scanImpl != CometConf.SCAN_NATIVE_COMET) {
checkSparkAnswerAndOperator(df.select("nested1.id"))
checkSparkAnswerAndOperator(df.select("nested1.nested2"))
checkSparkAnswerAndOperator(df.select("nested1.nested2.id"))
Expand All @@ -2270,6 +2274,8 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}

test("get_struct_field - read entire struct") {
val scanImpl = CometConf.COMET_NATIVE_SCAN_IMPL.get()
assume(!(scanImpl == CometConf.SCAN_AUTO && CometSparkSessionExtensions.isSpark40Plus))
withTempPath { dir =>
// create input file with Comet disabled
withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
Expand All @@ -2291,7 +2297,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
val df = spark.read.parquet(dir.toString()).select("nested1.id")
// Comet's original scan does not support structs.
// The plan will have a Comet Scan only if scan impl is native_full or native_recordbatch
if (!CometConf.COMET_NATIVE_SCAN_IMPL.get().equals(CometConf.SCAN_NATIVE_COMET)) {
if (scanImpl != CometConf.SCAN_NATIVE_COMET) {
checkSparkAnswerAndOperator(df)
} else {
checkSparkAnswer(df)
Expand Down
Loading