Merged
31 commits
806a06c
Remove auto scan fallback for Spark 4.0.0
andygrove Jul 1, 2025
d97cbb2
format
andygrove Jul 1, 2025
0f3b37c
fix
andygrove Jul 2, 2025
4ce9e35
fix
andygrove Jul 2, 2025
ec029c5
update diff
andygrove Jul 2, 2025
31d7078
Merge remote-tracking branch 'apache/main' into auto-scan-4.0.0
andygrove Jul 2, 2025
161ce4f
fix diff
andygrove Jul 2, 2025
7a5cb5f
fix diff
andygrove Jul 2, 2025
9d8bd5f
fix
andygrove Jul 2, 2025
fabf461
Merge remote-tracking branch 'apache/main' into auto-scan-4.0.0
andygrove Jul 8, 2025
395eebd
improve error message
andygrove Jul 8, 2025
c4cf271
fix
andygrove Jul 9, 2025
fd4cdb6
fix
andygrove Jul 9, 2025
398d49d
upmerge
andygrove Aug 19, 2025
3c6db94
format
andygrove Aug 19, 2025
cd95aed
possible fix
andygrove Aug 20, 2025
a78874b
fix build?
andygrove Aug 20, 2025
dbc3df5
format
andygrove Aug 20, 2025
d7f8909
debug
andygrove Aug 20, 2025
f8b9619
fix?
andygrove Aug 20, 2025
47ae97e
fix?
andygrove Aug 20, 2025
d641025
Merge branch 'fix-build-tpcds' into auto-scan-4.0.0
andygrove Aug 20, 2025
75c1bd5
upmerge
andygrove Aug 20, 2025
466b452
update Spark CollationSuite to expect Comet plans
andygrove Aug 21, 2025
24b216f
update Spark CollationSuite to expect Comet plans
andygrove Aug 21, 2025
e14f6be
fix one regression
andygrove Aug 21, 2025
b061418
ignore tests in VariantShreddingSuite
andygrove Aug 21, 2025
fd395db
ignore test that uses non utf-8 strings
andygrove Aug 21, 2025
84a71f2
Merge remote-tracking branch 'apache/main' into auto-scan-4.0.0
andygrove Aug 25, 2025
a2a862e
add comments
andygrove Aug 25, 2025
df9d42f
use DataTypeSupport.isComplexType
andygrove Aug 25, 2025
@@ -39,9 +39,10 @@ import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream}

+import org.apache.comet.shims.CometTypeShim
import org.apache.comet.vector.CometVector

-object Utils {
+object Utils extends CometTypeShim {
def getConfPath(confFileName: String): String = {
sys.env
.get("COMET_CONF_DIR")
@@ -124,7 +125,8 @@ object Utils {
case LongType => new ArrowType.Int(8 * 8, true)
case FloatType => new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)
case DoubleType => new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)
-      case StringType => ArrowType.Utf8.INSTANCE
+      case _: StringType => ArrowType.Utf8.INSTANCE
+      case dt if isStringCollationType(dt) => ArrowType.Utf8.INSTANCE
case BinaryType => ArrowType.Binary.INSTANCE
case DecimalType.Fixed(precision, scale) => new ArrowType.Decimal(precision, scale, 128)
case DateType => new ArrowType.Date(DateUnit.DAY)
@@ -138,7 +140,8 @@
case TimestampNTZType =>
new ArrowType.Timestamp(TimeUnit.MICROSECOND, null)
case _ =>
-        throw new UnsupportedOperationException(s"Unsupported data type: ${dt.catalogString}")
+        throw new UnsupportedOperationException(
+          s"Unsupported data type: [${dt.getClass.getName}] ${dt.catalogString}")
}

/** Maps field from Spark to Arrow. NOTE: timeZoneId required for TimestampType */
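Note on the hunk above: Utils now mixes in CometTypeShim, so toArrowType can map collated Spark string types to the same Arrow UTF-8 type as plain strings. A minimal sketch of the resulting string dispatch, for illustration only (abridged from the patch; the collation branch can only fire on Spark builds whose shim overrides isStringCollationType):

import org.apache.arrow.vector.types.pojo.ArrowType
import org.apache.spark.sql.types._

// Abridged sketch of the string handling in Utils.toArrowType after this change
def toArrowStringType(dt: DataType): ArrowType = dt match {
  case _: StringType => ArrowType.Utf8.INSTANCE // also matches collated StringType instances
  case other if Utils.isStringCollationType(other) => ArrowType.Utf8.INSTANCE // shim hook; always false on Spark 3.x
  case other =>
    throw new UnsupportedOperationException(
      s"Unsupported data type: [${other.getClass.getName}] ${other.catalogString}")
}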
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.comet.shims
+
+import org.apache.spark.sql.types.DataType
+
+trait CometTypeShim {
+  def isStringCollationType(dt: DataType): Boolean = false
+}
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.comet.shims
+
+import org.apache.spark.sql.internal.types.StringTypeWithCollation
+import org.apache.spark.sql.types.DataType
+
+trait CometTypeShim {
+  def isStringCollationType(dt: DataType): Boolean = dt.isInstanceOf[StringTypeWithCollation]
+}
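The two new files above are the same trait compiled for different Spark versions (their paths are collapsed in this view, but this is Comet's usual per-Spark-version shim layout): the default shim reports no string-collation support, while the Spark 4.0 shim recognizes StringTypeWithCollation. Any object that mixes the trait in picks up the behaviour of whichever shim the build selects. A hypothetical caller, for illustration only:

import org.apache.comet.shims.CometTypeShim
import org.apache.spark.sql.types.{DataType, StringType}

object StringTypeCheck extends CometTypeShim {
  // True when Arrow can store the type as UTF-8: plain strings everywhere,
  // plus collated string types on builds whose shim overrides the default.
  def isArrowUtf8(dt: DataType): Boolean =
    dt.isInstanceOf[StringType] || isStringCollationType(dt)
}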
134 changes: 116 additions & 18 deletions dev/diffs/4.0.0.diff
@@ -700,10 +700,10 @@ index 9c529d14221..069b7c5adeb 100644
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala
new file mode 100644
-index 00000000000..5eb3fa17ca8
+index 00000000000..5691536c114
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala
-@@ -0,0 +1,43 @@
+@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
@@ -732,6 +732,8 @@ index 00000000000..5eb3fa17ca8
+ * Tests with this tag will be ignored when Comet is enabled (e.g., via `ENABLE_COMET`).
+ */
+case class IgnoreComet(reason: String) extends Tag("DisableComet")
++case class IgnoreCometNativeIcebergCompat(reason: String) extends Tag("DisableComet")
++case class IgnoreCometNativeDataFusion(reason: String) extends Tag("DisableComet")
+case class IgnoreCometNativeScan(reason: String) extends Tag("DisableComet")
+
+/**
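The two tags added above extend the existing IgnoreComet mechanism so that individual Spark tests can be skipped only for specific Comet scan implementations; the SQLTestUtils change later in this diff rewrites a tagged test into ignore(...) when the matching scan implementation is active. A usage sketch (the tag comes from this PR, the test itself is invented):

test("decimal scan matches Spark",
  IgnoreCometNativeDataFusion("pending support in the native_datafusion scan")) {
  // assertions that currently only hold for the JVM-based scan implementations
}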
@@ -1277,11 +1279,26 @@ index 2e33f6505ab..e1e93ab3bad 100644
}

withTable("t1", "t2") {
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala
+index fee375db10a..8c2c24e2c5f 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala
+@@ -33,7 +33,9 @@ import org.apache.spark.sql.types._
+ import org.apache.spark.types.variant._
+ import org.apache.spark.unsafe.types.{UTF8String, VariantVal}
+
+-class VariantShreddingSuite extends QueryTest with SharedSparkSession with ParquetTest {
++class VariantShreddingSuite extends QueryTest with SharedSparkSession with ParquetTest
++  // TODO enable tests once https://github.com/apache/datafusion-comet/issues/2209 is fixed
++  with IgnoreCometSuite {
+ def parseJson(s: String): VariantVal = {
+ val v = VariantBuilder.parseJson(s, false)
+ new VariantVal(v.getValue, v.getMetadata)
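IgnoreCometSuite is the suite-level counterpart of the IgnoreComet tag: mixing it in skips an entire suite while Comet is enabled (via ENABLE_COMET). The VariantShreddingSuite change above follows that pattern; a sketch with an invented class name:

class SomeVariantSuite extends QueryTest with SharedSparkSession with ParquetTest
  with IgnoreCometSuite {
  // every test here is ignored while ENABLE_COMET is set
}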
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala
-index 11e9547dfc5..df5678c8d82 100644
+index 11e9547dfc5..be9ae40ab3d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala
-@@ -20,7 +20,7 @@ package org.apache.spark.sql.collation
+@@ -20,10 +20,11 @@ package org.apache.spark.sql.collation
import scala.jdk.CollectionConverters.MapHasAsJava

import org.apache.spark.SparkException
@@ -1290,7 +1307,21 @@ index 11e9547dfc5..df5678c8d82 100644
import org.apache.spark.sql.catalyst.ExtendedAnalysisException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.CollationFactory
@@ -1505,7 +1505,9 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
+import org.apache.spark.sql.comet.{CometBroadcastHashJoinExec, CometHashJoinExec, CometSortMergeJoinExec}
import org.apache.spark.sql.connector.{DatasourceV2SQLBase, FakeV2ProviderWithCustomSchema}
import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTable}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper
@@ -55,7 +56,9 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
assert(
collectFirst(queryPlan) {
case _: SortMergeJoinExec => assert(isSortMergeForced)
+ case _: CometSortMergeJoinExec => assert(isSortMergeForced)
case _: HashJoin => assert(!isSortMergeForced)
+ case _: CometHashJoinExec | _: CometBroadcastHashJoinExec => assert(!isSortMergeForced)
}.nonEmpty
)
}
@@ -1505,7 +1508,9 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
}
}

Expand All @@ -1301,7 +1332,23 @@ index 11e9547dfc5..df5678c8d82 100644
val t1 = "T_1"
val t2 = "T_2"

@@ -1815,7 +1817,9 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
@@ -1611,6 +1616,7 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
} else {
assert(!collectFirst(queryPlan) {
case b: BroadcastHashJoinExec => b.leftKeys.head
+ case b: CometBroadcastHashJoinExec => b.leftKeys.head
}.head.isInstanceOf[ArrayTransform])
}
}
@@ -1676,6 +1682,7 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
} else {
assert(!collectFirst(queryPlan) {
case b: BroadcastHashJoinExec => b.leftKeys.head
+ case b: CometBroadcastHashJoinExec => b.leftKeys.head
}.head.isInstanceOf[ArrayTransform])
}
}
@@ -1815,7 +1822,9 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
}
}

@@ -2636,10 +2683,23 @@ index 22839d3f0d2..7e66d100e90 100644
checkAnswer(
// "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
-index bba71f1c48d..4f33ce4b3f2 100644
+index bba71f1c48d..38c60ee2584 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
-@@ -1060,7 +1060,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
+@@ -996,7 +996,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
Seq(Some("A"), Some("A"), None).toDF().repartition(1)
.write.parquet(path.getAbsolutePath)
val df = spark.read.parquet(path.getAbsolutePath)
- checkAnswer(stripSparkFilter(df.where("NOT (value <=> 'A')")), df)
+ // Similar to Spark's vectorized reader, Comet doesn't do row-level filtering but relies
+ // on Spark to apply the data filters after columnar batches are returned
+ if (!isCometEnabled) {
+ checkAnswer(stripSparkFilter(df.where("NOT (value <=> 'A')")), df)
+ }
}
}
}
+@@ -1060,7 +1064,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
checkAnswer(readParquet(schema2, path), df)
}

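The isCometEnabled guard added above reflects a real behavioural difference rather than a missing feature: like Spark's vectorized Parquet reader, Comet returns whole columnar batches and leaves data-filter evaluation to Spark, so stripSparkFilter (which removes Spark's FilterExec) cannot observe reader-level row filtering under Comet. Sketch of the guarded assertion, assuming Spark's test helpers:

val df = spark.read.parquet(path.getAbsolutePath)
if (!isCometEnabled) {
  // Meaningful only for readers that filter rows themselves; Comet and the
  // vectorized reader hand unfiltered batches back to Spark's FilterExec.
  checkAnswer(stripSparkFilter(df.where("NOT (value <=> 'A')")), df)
}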
Expand All @@ -2649,7 +2709,7 @@ index bba71f1c48d..4f33ce4b3f2 100644
val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)"
checkAnswer(readParquet(schema1, path), df)
val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)"
-@@ -1084,7 +1085,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
+@@ -1084,7 +1089,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, CAST('1.2' AS BINARY) d")
df.write.parquet(path.toString)

@@ -3330,40 +3390,64 @@ index 86c4e49f6f6..2e639e5f38d 100644
val tblTargetName = "tbl_target"
val tblSourceQualified = s"default.$tblSourceName"
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
-index f0f3f94b811..486a436afb2 100644
+index f0f3f94b811..d64e4e54e22 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
-@@ -33,7 +33,7 @@ import org.scalatest.{BeforeAndAfterAll, Suite, Tag}
+@@ -27,13 +27,14 @@ import scala.jdk.CollectionConverters._
import scala.language.implicitConversions
import scala.util.control.NonFatal

++import org.apache.comet.CometConf
import org.apache.hadoop.fs.Path
import org.scalactic.source.Position
import org.scalatest.{BeforeAndAfterAll, Suite, Tag}
import org.scalatest.concurrent.Eventually

import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.{AnalysisException, IgnoreComet, Row}
+import org.apache.spark.sql.{AnalysisException, IgnoreComet, IgnoreCometNativeDataFusion, IgnoreCometNativeIcebergCompat, IgnoreCometNativeScan, Row}
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.catalog.SessionCatalog.DEFAULT_DATABASE
-@@ -42,6 +42,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTestBase
+@@ -42,6 +43,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTestBase
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.classic.{ClassicConversions, ColumnConversions, ColumnNodeToExpressionConverter, DataFrame, Dataset, SparkSession, SQLImplicits}
-+import org.apache.spark.sql.comet._
++import org.apache.spark.sql.comet.{CometFilterExec, CometProjectExec}
import org.apache.spark.sql.execution.FilterExec
import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution
import org.apache.spark.sql.execution.datasources.DataSourceUtils
-@@ -128,7 +129,11 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with
+@@ -128,7 +130,28 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with
}
}
} else {
- super.test(testName, testTags: _*)(testFun)
+ if (isCometEnabled && testTags.exists(_.isInstanceOf[IgnoreComet])) {
+ ignore(testName + " (disabled when Comet is on)", testTags: _*)(testFun)
+ } else {
+ super.test(testName, testTags: _*)(testFun)
+ val cometScanImpl = CometConf.COMET_NATIVE_SCAN_IMPL.get(conf)
+ val isNativeIcebergCompat = cometScanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT ||
+ cometScanImpl == CometConf.SCAN_AUTO
+ val isNativeDataFusion = cometScanImpl == CometConf.SCAN_NATIVE_DATAFUSION ||
+ cometScanImpl == CometConf.SCAN_AUTO
+ if (isCometEnabled && isNativeIcebergCompat &&
+ testTags.exists(_.isInstanceOf[IgnoreCometNativeIcebergCompat])) {
+ ignore(testName + " (disabled for NATIVE_ICEBERG_COMPAT)", testTags: _*)(testFun)
+ } else if (isCometEnabled && isNativeDataFusion &&
+ testTags.exists(_.isInstanceOf[IgnoreCometNativeDataFusion])) {
+ ignore(testName + " (disabled for NATIVE_DATAFUSION)", testTags: _*)(testFun)
+ } else if (isCometEnabled && (isNativeDataFusion || isNativeIcebergCompat) &&
+ testTags.exists(_.isInstanceOf[IgnoreCometNativeScan])) {
+ ignore(testName + " (disabled for NATIVE_DATAFUSION and NATIVE_ICEBERG_COMPAT)",
+ testTags: _*)(testFun)
+ } else {
+ super.test(testName, testTags: _*)(testFun)
+ }
+ }
}
}

-@@ -248,8 +253,33 @@ private[sql] trait SQLTestUtilsBase
+@@ -248,8 +271,33 @@ private[sql] trait SQLTestUtilsBase
override protected def converter: ColumnNodeToExpressionConverter = self.spark.converter
}

Expand Down Expand Up @@ -3397,7 +3481,7 @@ index f0f3f94b811..486a436afb2 100644
super.withSQLConf(pairs: _*)(f)
}

-@@ -451,6 +481,8 @@ private[sql] trait SQLTestUtilsBase
+@@ -451,6 +499,8 @@ private[sql] trait SQLTestUtilsBase
val schema = df.schema
val withoutFilters = df.queryExecution.executedPlan.transform {
case FilterExec(_, child) => child
Expand Down Expand Up @@ -3509,6 +3593,20 @@ index 4b27082e188..09f591dfed3 100644
Utils.withContextClassLoader(Utils.getSparkClassLoader) {
withUserDefinedFunction(udfInfo.funcName -> false) {
val sparkClassLoader = Thread.currentThread().getContextClassLoader
+diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
+index cc7bb193731..06555d48da7 100644
+--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
++++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
+@@ -818,7 +818,8 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
+ }
+ }
+
+- test("SPARK-30201 HiveOutputWriter standardOI should use ObjectInspectorCopyOption.DEFAULT") {
++ test("SPARK-30201 HiveOutputWriter standardOI should use ObjectInspectorCopyOption.DEFAULT",
++ IgnoreComet("Comet does not support reading non UTF-8 strings")) {
+ withTable("t1", "t2") {
+ withTempDir { dir =>
+ val file = new File(dir, "test.hex")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
index b67370f6eb9..746b3974b29 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala