diff --git a/.github/actions/setup-spark-builder/action.yaml b/.github/actions/setup-spark-builder/action.yaml
index b0bae362e9..8f2882efe4 100644
--- a/.github/actions/setup-spark-builder/action.yaml
+++ b/.github/actions/setup-spark-builder/action.yaml
@@ -67,3 +67,23 @@ runs:
       run: |
         # Native library should already be in native/target/release/
         ./mvnw install -Prelease -DskipTests -Pspark-${{inputs.spark-short-version}}
+
+    - name: Purge partial Maven cache entries
+      shell: bash
+      run: |
+        # Comet's Maven phase resolves the dependency graph and downloads POMs
+        # for transitive artifacts whose JARs it never actually needs. When sbt
+        # then resolves Spark's deps, Coursier sees the POM in mavenLocal,
+        # declares the artifact "found locally", and fails on the missing JAR
+        # without falling back to Maven Central. Delete those partial entries
+        # so sbt re-fetches the full artifact remotely.
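+        # Illustrative only (hypothetical artifact): a partial entry looks like
+        #   .../repository/com/example/foo/1.0/foo-1.0.pom  <- downloaded by Maven
+        #   .../repository/com/example/foo/1.0/foo-1.0.jar  <- never fetched
+        # and is what later makes Coursier report a missing-artifact error.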
+        for repo in "$HOME/.m2/repository" /root/.m2/repository; do
+          [ -d "$repo" ] || continue
+          find "$repo" -name '*.pom' | while read -r pom; do
+            jar="${pom%.pom}.jar"
+            [ -f "$jar" ] && continue
+            grep -q 'jar\|bundle' "$pom" 2>/dev/null || continue
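+            # The grep above is a heuristic: POMs that never mention "jar" or
+            # "bundle" (typically parent/BOM POMs with pom packaging) are kept,
+            # since they legitimately have no companion JAR.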
+            rm -f "$pom" "${pom}.sha1" "${pom%.pom}.pom.lastUpdated" \
+              "$(dirname "$pom")/_remote.repositories"
+          done
+        done
diff --git a/.github/workflows/spark_sql_test.yml b/.github/workflows/spark_sql_test.yml
index 569a4a3ab9..980629174f 100644
--- a/.github/workflows/spark_sql_test.yml
+++ b/.github/workflows/spark_sql_test.yml
@@ -126,7 +126,6 @@ jobs:
     needs: build-native
     strategy:
       matrix:
-        os: [ubuntu-24.04]
         module:
           - {name: "catalyst", args1: "catalyst/test", args2: ""}
           - {name: "sql_core-1", args1: "", args2: sql/testOnly * -- -l org.apache.spark.tags.ExtendedSQLTest -l org.apache.spark.tags.SlowSQLTest}
@@ -142,12 +141,17 @@
           - {spark-short: '3.5', spark-full: '3.5.8', java: 11, scan-impl: 'auto'}
           - {spark-short: '4.0', spark-full: '4.0.2', java: 17, scan-impl: 'auto'}
           - {spark-short: '4.0', spark-full: '4.0.2', java: 21, scan-impl: 'auto'}
+          - {spark-short: '4.1', spark-full: '4.1.1', java: 17, scan-impl: 'auto'}
       fail-fast: false
     name: spark-sql-${{ matrix.config.scan-impl }}-${{ matrix.module.name }}/spark-${{ matrix.config.spark-full }}
-    runs-on: ${{ matrix.os }}
+    # Hive tests stay on the standard GitHub-hosted runner: HiveSparkSubmitSuite
+    # relies on an Ivy 'local-m2-cache' resolver that the runs-on.com
+    # ubuntu24-full-x64 image does not provide, so spark-submit fails there.
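+    # Illustrative evaluation of the expression below: any 'sql_hive*' module gets
+    # 'ubuntu-24.04'; other modules get the runs-on.com runner spec when the
+    # repository owner is 'apache', and 'ubuntu-latest' otherwise (e.g. on forks).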
+    runs-on: ${{ startsWith(matrix.module.name, 'sql_hive') && 'ubuntu-24.04' || (github.repository_owner == 'apache' && format('runs-on={0},family=m8a+m7a+c8a,cpu=16,image=ubuntu24-full-x64,extras=s3-cache,disk=large,tag=datafusion-comet', github.run_id) || 'ubuntu-latest') }}
     container:
       image: amd64/rust
     steps:
+      - uses: runs-on/action@742bf56072eb4845a0f94b3394673e4903c90ff0 # v2.1.0
       - uses: actions/checkout@v6
       - name: Setup Rust & Java toolchain
         uses: ./.github/actions/setup-builder
@@ -170,7 +174,7 @@
           cd apache-spark
           rm -rf /root/.m2/repository/org/apache/parquet # somehow parquet cache requires cleanups
           NOLINT_ON_COMPILE=true ENABLE_COMET=true ENABLE_COMET_ONHEAP=true COMET_PARQUET_SCAN_IMPL=${{ matrix.config.scan-impl }} ENABLE_COMET_LOG_FALLBACK_REASONS=${{ github.event.inputs.collect-fallback-logs || 'false' }} \
-            build/sbt -Dsbt.log.noformat=true ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}"
+            build/sbt -Dsbt.log.noformat=true -mem 6144 ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}"
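+            # -mem 6144 sets the sbt JVM's memory to 6144 MB (sbt's -mem flag takes
+            # megabytes), presumably to give the heavier Spark SQL suites more headroom.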
           if [ "${{ github.event.inputs.collect-fallback-logs }}" = "true" ]; then
             find . -type f -name "unit-tests.log" -print0 | xargs -0 grep -h "Comet cannot accelerate" | sed 's/.*Comet cannot accelerate/Comet cannot accelerate/' | sort -u > fallback.log
           fi
diff --git a/dev/diffs/4.1.1.diff b/dev/diffs/4.1.1.diff
new file mode 100644
index 0000000000..8c2949c293
--- /dev/null
+++ b/dev/diffs/4.1.1.diff
@@ -0,0 +1,4326 @@
+diff --git a/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala b/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
+index 6df8bc85b51..dabb75e2b75 100644
+--- a/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
++++ b/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
+@@ -268,6 +268,11 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
+ }
+
+ test("Upload from all decommissioned executors") {
++ // Comet replaces Spark's shuffle with its own native shuffle, which is incompatible with
++ // the fallback storage migration path used by BlockManagerDecommissioner.
++ val cometEnv = System.getenv("ENABLE_COMET")
++ assume(cometEnv == null || cometEnv == "0" || cometEnv == "false",
++ "Skipped when Comet is enabled: incompatible with Comet native shuffle storage")
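++    // Note: assume() cancels (skips) the test rather than failing it, so when
++    // ENABLE_COMET is set the test is reported as canceled, and plain Spark
++    // runs without that variable are unaffected.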
+ sc = new SparkContext(getSparkConf(2, 2))
+ withSpark(sc) { sc =>
+ TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
+@@ -298,6 +303,11 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
+ }
+
+ test("Upload multi stages") {
++ // Comet replaces Spark's shuffle with its own native shuffle, which is incompatible with
++ // the fallback storage migration path used by BlockManagerDecommissioner.
++ val cometEnv = System.getenv("ENABLE_COMET")
++ assume(cometEnv == null || cometEnv == "0" || cometEnv == "false",
++ "Skipped when Comet is enabled: incompatible with Comet native shuffle storage")
+ sc = new SparkContext(getSparkConf())
+ withSpark(sc) { sc =>
+ TestUtils.waitUntilExecutorsUp(sc, 1, 60000)
+@@ -332,6 +342,11 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
+
+ CompressionCodec.shortCompressionCodecNames.keys.foreach { codec =>
+ test(s"$codec - Newly added executors should access old data from remote storage") {
++ // Comet replaces Spark's shuffle with its own native shuffle, which is incompatible with
++ // the fallback storage migration path used by BlockManagerDecommissioner.
++ val cometEnv = System.getenv("ENABLE_COMET")
++ assume(cometEnv == null || cometEnv == "0" || cometEnv == "false",
++ "Skipped when Comet is enabled: incompatible with Comet native shuffle storage")
+ sc = new SparkContext(getSparkConf(2, 0).set(IO_COMPRESSION_CODEC, codec))
+ withSpark(sc) { sc =>
+ TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
+diff --git a/pom.xml b/pom.xml
+index dc757d78812..18841e95f3d 100644
+--- a/pom.xml
++++ b/pom.xml
+@@ -152,6 +152,8 @@
+ 4.0.3
+ 2.5.3
+ 2.0.8
++    <spark.version.short>4.1</spark.version.short>
++    <comet.version>0.16.0-SNAPSHOT</comet.version>
+
+
+ org.apache.datasketches
+diff --git a/sql/core/pom.xml b/sql/core/pom.xml
+index d2d07a08aa9..d89f80e5b68 100644
+--- a/sql/core/pom.xml
++++ b/sql/core/pom.xml
+       <groupId>org.apache.spark</groupId>
+       <artifactId>spark-tags_${scala.binary.version}</artifactId>
+     </dependency>
+
++    <dependency>
++      <groupId>org.apache.datafusion</groupId>
++      <artifactId>comet-spark-spark${spark.version.short}_${scala.binary.version}</artifactId>
++    </dependency>
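++    <!-- No explicit <version>: presumably supplied by the root pom, which this same
++         patch extends with a comet.version property (see the pom.xml hunk above). -->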
+
+