From 806a06c0a8a479780f17bfa0ec3c536c1f088332 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 1 Jul 2025 14:46:11 -0600 Subject: [PATCH 01/25] Remove auto scan fallback for Spark 4.0.0 --- .../src/main/scala/org/apache/comet/rules/CometScanRule.scala | 4 ---- 1 file changed, 4 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index 592069fcc6..3a51230d36 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -261,10 +261,6 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] { val fallbackReasons = new ListBuffer[String]() - if (CometSparkSessionExtensions.isSpark40Plus) { - fallbackReasons += s"$SCAN_NATIVE_ICEBERG_COMPAT is not implemented for Spark 4.0.0" - } - // native_iceberg_compat only supports local filesystem and S3 if (!scanExec.relation.inputFiles .forall(path => path.startsWith("file://") || path.startsWith("s3a://"))) { From d97cbb29aacd4886d861a2d8e70ac3dadadf6cf0 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 1 Jul 2025 15:05:54 -0600 Subject: [PATCH 02/25] format --- spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index 3a51230d36..5b2997756c 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ -import org.apache.comet.{CometConf, CometSparkSessionExtensions, DataTypeSupport} +import org.apache.comet.{CometConf, DataTypeSupport} import 
org.apache.comet.CometConf._ import org.apache.comet.CometSparkSessionExtensions.{isCometLoaded, isCometScanEnabled, withInfo, withInfos} import org.apache.comet.parquet.{CometParquetScan, SupportsComet} From 0f3b37ccbd3f9500ee986eda3ba46c385e11a5a3 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 2 Jul 2025 08:31:21 -0600 Subject: [PATCH 03/25] fix --- .../apache/comet/rules/CometScanRule.scala | 20 ++++++++------ .../apache/comet/shims/CometTypeShim.scala | 25 ++++++++++++++++++ .../apache/comet/shims/CometTypeShim.scala | 26 +++++++++++++++++++ 3 files changed, 63 insertions(+), 8 deletions(-) create mode 100644 spark/src/main/spark-3.x/org/apache/comet/shims/CometTypeShim.scala create mode 100644 spark/src/main/spark-4.0/org/apache/comet/shims/CometTypeShim.scala diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index 5b2997756c..3686d36e7a 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -38,11 +38,12 @@ import org.apache.comet.{CometConf, DataTypeSupport} import org.apache.comet.CometConf._ import org.apache.comet.CometSparkSessionExtensions.{isCometLoaded, isCometScanEnabled, withInfo, withInfos} import org.apache.comet.parquet.{CometParquetScan, SupportsComet} +import org.apache.comet.shims.CometTypeShim /** * Spark physical optimizer rule for replacing Spark scans with Comet scans. 
*/ -case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] { +case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with CometTypeShim { private lazy val showTransformations = CometConf.COMET_EXPLAIN_TRANSFORMATIONS.get() @@ -278,21 +279,24 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] { case _ => false } - def hasMapsContainingStructs(dataType: DataType): Boolean = { + def hasUnsupportedType(dataType: DataType): Boolean = { dataType match { - case s: StructType => s.exists(field => hasMapsContainingStructs(field.dataType)) - case a: ArrayType => hasMapsContainingStructs(a.elementType) - case m: MapType => isComplexType(m.keyType) || isComplexType(m.valueType) + case s: StructType => s.exists(field => hasUnsupportedType(field.dataType)) + case a: ArrayType => hasUnsupportedType(a.elementType) + case m: MapType => + // maps containing complex types are not supported + isComplexType(m.keyType) || isComplexType(m.valueType) + case dt => isStringCollationType(dt) case _ => false } } val knownIssues = - scanExec.requiredSchema.exists(field => hasMapsContainingStructs(field.dataType)) || - partitionSchema.exists(field => hasMapsContainingStructs(field.dataType)) + scanExec.requiredSchema.exists(field => hasUnsupportedType(field.dataType)) || + partitionSchema.exists(field => hasUnsupportedType(field.dataType)) if (knownIssues) { - fallbackReasons += "There are known issues with maps containing structs when using " + + fallbackReasons += "Schema contains data types that are not supported by " + s"$SCAN_NATIVE_ICEBERG_COMPAT" } diff --git a/spark/src/main/spark-3.x/org/apache/comet/shims/CometTypeShim.scala b/spark/src/main/spark-3.x/org/apache/comet/shims/CometTypeShim.scala new file mode 100644 index 0000000000..14e0881e0e --- /dev/null +++ b/spark/src/main/spark-3.x/org/apache/comet/shims/CometTypeShim.scala @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.comet.shims + +import org.apache.spark.sql.types.DataType + +trait CometTypeShim { + def isStringCollationType(dt: DataType): Boolean = false +} \ No newline at end of file diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/CometTypeShim.scala b/spark/src/main/spark-4.0/org/apache/comet/shims/CometTypeShim.scala new file mode 100644 index 0000000000..7baa35204f --- /dev/null +++ b/spark/src/main/spark-4.0/org/apache/comet/shims/CometTypeShim.scala @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.comet.shims + +import org.apache.spark.sql.internal.types.StringTypeWithCollation +import org.apache.spark.sql.types.DataType + +trait CometTypeShim { + def isStringCollationType(dt: DataType): Boolean = dt.instanceOf[StringTypeWithCollation] +} \ No newline at end of file From 4ce9e35d33bd1c91ff8eb3117bca2b2937b1856a Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 2 Jul 2025 08:41:51 -0600 Subject: [PATCH 04/25] fix --- .../main/spark-4.0/org/apache/comet/shims/CometTypeShim.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/CometTypeShim.scala b/spark/src/main/spark-4.0/org/apache/comet/shims/CometTypeShim.scala index 7baa35204f..a2d64b3ea8 100644 --- a/spark/src/main/spark-4.0/org/apache/comet/shims/CometTypeShim.scala +++ b/spark/src/main/spark-4.0/org/apache/comet/shims/CometTypeShim.scala @@ -22,5 +22,5 @@ import org.apache.spark.sql.internal.types.StringTypeWithCollation import org.apache.spark.sql.types.DataType trait CometTypeShim { - def isStringCollationType(dt: DataType): Boolean = dt.instanceOf[StringTypeWithCollation] + def isStringCollationType(dt: DataType): Boolean = dt.isInstanceOf[StringTypeWithCollation] } \ No newline at end of file From ec029c547706b0566629a8f9404effd5abfe1a7f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 2 Jul 2025 10:03:22 -0600 Subject: [PATCH 05/25] update diff --- dev/diffs/4.0.0.diff | 48 +++++++++++++++++++++++++++++--------------- 1 file changed, 32 insertions(+), 16 deletions(-) diff --git a/dev/diffs/4.0.0.diff b/dev/diffs/4.0.0.diff index 96deaa789e..dfa6411ecc 100644 --- a/dev/diffs/4.0.0.diff +++ b/dev/diffs/4.0.0.diff @@ -1,5 +1,5 @@ diff --git a/pom.xml b/pom.xml -index 443d46a4302..3b8483173f1 100644 +index 443d46a4302..63ec4784625 100644 --- a/pom.xml +++ b/pom.xml @@ -148,6 +148,8 @@ @@ -3309,27 
+3309,26 @@ index 86c4e49f6f6..2e639e5f38d 100644 val tblTargetName = "tbl_target" val tblSourceQualified = s"default.$tblSourceName" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala -index f0f3f94b811..486a436afb2 100644 +index f0f3f94b811..31cf48f19ba 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala -@@ -33,7 +33,7 @@ import org.scalatest.{BeforeAndAfterAll, Suite, Tag} +@@ -27,13 +27,14 @@ import scala.jdk.CollectionConverters._ + import scala.language.implicitConversions + import scala.util.control.NonFatal + ++import org.apache.comet.CometConf + import org.apache.hadoop.fs.Path + import org.scalactic.source.Position + import org.scalatest.{BeforeAndAfterAll, Suite, Tag} import org.scalatest.concurrent.Eventually import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.{AnalysisException, Row} -+import org.apache.spark.sql.{AnalysisException, IgnoreComet, Row} ++import org.apache.spark.sql.{AnalysisException, IgnoreComet, IgnoreCometNativeDataFusion, IgnoreCometNativeIcebergCompat, IgnoreCometNativeScan, Row} import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.catalyst.catalog.SessionCatalog.DEFAULT_DATABASE -@@ -42,6 +42,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTestBase - import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan - import org.apache.spark.sql.catalyst.util._ - import org.apache.spark.sql.classic.{ClassicConversions, ColumnConversions, ColumnNodeToExpressionConverter, DataFrame, Dataset, SparkSession, SQLImplicits} -+import org.apache.spark.sql.comet._ - import org.apache.spark.sql.execution.FilterExec - import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution - import 
org.apache.spark.sql.execution.datasources.DataSourceUtils -@@ -128,7 +129,11 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with +@@ -128,7 +129,28 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with } } } else { @@ -3337,12 +3336,29 @@ index f0f3f94b811..486a436afb2 100644 + if (isCometEnabled && testTags.exists(_.isInstanceOf[IgnoreComet])) { + ignore(testName + " (disabled when Comet is on)", testTags: _*)(testFun) + } else { -+ super.test(testName, testTags: _*)(testFun) ++ val cometScanImpl = CometConf.COMET_NATIVE_SCAN_IMPL.get(conf) ++ val isNativeIcebergCompat = cometScanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT || ++ cometScanImpl == CometConf.SCAN_AUTO ++ val isNativeDataFusion = cometScanImpl == CometConf.SCAN_NATIVE_DATAFUSION || ++ cometScanImpl == CometConf.SCAN_AUTO ++ if (isCometEnabled && isNativeIcebergCompat && ++ testTags.exists(_.isInstanceOf[IgnoreCometNativeIcebergCompat])) { ++ ignore(testName + " (disabled for NATIVE_ICEBERG_COMPAT)", testTags: _*)(testFun) ++ } else if (isCometEnabled && isNativeDataFusion && ++ testTags.exists(_.isInstanceOf[IgnoreCometNativeDataFusion])) { ++ ignore(testName + " (disabled for NATIVE_DATAFUSION)", testTags: _*)(testFun) ++ } else if (isCometEnabled && (isNativeDataFusion || isNativeIcebergCompat) && ++ testTags.exists(_.isInstanceOf[IgnoreCometNativeScan])) { ++ ignore(testName + " (disabled for NATIVE_DATAFUSION and NATIVE_ICEBERG_COMPAT)", ++ testTags: _*)(testFun) ++ } else { ++ super.test(testName, testTags: _*)(testFun) ++ } + } } } -@@ -248,8 +253,33 @@ private[sql] trait SQLTestUtilsBase +@@ -248,8 +270,33 @@ private[sql] trait SQLTestUtilsBase override protected def converter: ColumnNodeToExpressionConverter = self.spark.converter } @@ -3376,7 +3392,7 @@ index f0f3f94b811..486a436afb2 100644 super.withSQLConf(pairs: _*)(f) } -@@ -451,6 +481,8 @@ private[sql] trait SQLTestUtilsBase +@@ -451,6 +498,8 @@ private[sql] trait 
SQLTestUtilsBase val schema = df.schema val withoutFilters = df.queryExecution.executedPlan.transform { case FilterExec(_, child) => child From 161ce4f1e2c67a6d25cc2ecab067e8be3f8ee367 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 2 Jul 2025 10:49:25 -0600 Subject: [PATCH 06/25] fix diff --- dev/diffs/4.0.0.diff | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/dev/diffs/4.0.0.diff b/dev/diffs/4.0.0.diff index dfa6411ecc..54b7a197d4 100644 --- a/dev/diffs/4.0.0.diff +++ b/dev/diffs/4.0.0.diff @@ -700,10 +700,10 @@ index 9c529d14221..069b7c5adeb 100644 } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala new file mode 100644 -index 00000000000..5eb3fa17ca8 +index 00000000000..5691536c114 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala -@@ -0,0 +1,43 @@ +@@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with @@ -732,6 +732,8 @@ index 00000000000..5eb3fa17ca8 + * Tests with this tag will be ignored when Comet is enabled (e.g., via `ENABLE_COMET`). 
+ */ +case class IgnoreComet(reason: String) extends Tag("DisableComet") ++case class IgnoreCometNativeIcebergCompat(reason: String) extends Tag("DisableComet") ++case class IgnoreCometNativeDataFusion(reason: String) extends Tag("DisableComet") +case class IgnoreCometNativeScan(reason: String) extends Tag("DisableComet") + +/** From 7a5cb5f92dbca9aeb088f5ac34f5e74759e82a7e Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 2 Jul 2025 12:06:49 -0600 Subject: [PATCH 07/25] fix diff --- dev/diffs/4.0.0.diff | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/dev/diffs/4.0.0.diff b/dev/diffs/4.0.0.diff index 54b7a197d4..0208c582c6 100644 --- a/dev/diffs/4.0.0.diff +++ b/dev/diffs/4.0.0.diff @@ -3311,7 +3311,7 @@ index 86c4e49f6f6..2e639e5f38d 100644 val tblTargetName = "tbl_target" val tblSourceQualified = s"default.$tblSourceName" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala -index f0f3f94b811..31cf48f19ba 100644 +index f0f3f94b811..d64e4e54e22 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -27,13 +27,14 @@ import scala.jdk.CollectionConverters._ @@ -3330,7 +3330,15 @@ index f0f3f94b811..31cf48f19ba 100644 import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.catalyst.catalog.SessionCatalog.DEFAULT_DATABASE -@@ -128,7 +129,28 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with +@@ -42,6 +43,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTestBase + import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan + import org.apache.spark.sql.catalyst.util._ + import org.apache.spark.sql.classic.{ClassicConversions, ColumnConversions, ColumnNodeToExpressionConverter, DataFrame, Dataset, 
SparkSession, SQLImplicits} ++import org.apache.spark.sql.comet.{CometFilterExec, CometProjectExec} + import org.apache.spark.sql.execution.FilterExec + import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution + import org.apache.spark.sql.execution.datasources.DataSourceUtils +@@ -128,7 +130,28 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with } } } else { @@ -3360,7 +3368,7 @@ index f0f3f94b811..31cf48f19ba 100644 } } -@@ -248,8 +270,33 @@ private[sql] trait SQLTestUtilsBase +@@ -248,8 +271,33 @@ private[sql] trait SQLTestUtilsBase override protected def converter: ColumnNodeToExpressionConverter = self.spark.converter } @@ -3394,7 +3402,7 @@ index f0f3f94b811..31cf48f19ba 100644 super.withSQLConf(pairs: _*)(f) } -@@ -451,6 +498,8 @@ private[sql] trait SQLTestUtilsBase +@@ -451,6 +499,8 @@ private[sql] trait SQLTestUtilsBase val schema = df.schema val withoutFilters = df.queryExecution.executedPlan.transform { case FilterExec(_, child) => child From 9d8bd5f4f899e184248d446d35ac207eaa604117 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 2 Jul 2025 13:51:02 -0600 Subject: [PATCH 08/25] fix --- .../main/scala/org/apache/spark/sql/comet/util/Utils.scala | 6 +++++- .../spark-3.x/org/apache/comet/shims/CometTypeShim.scala | 0 .../spark-4.0/org/apache/comet/shims/CometTypeShim.scala | 0 3 files changed, 5 insertions(+), 1 deletion(-) rename {spark => common}/src/main/spark-3.x/org/apache/comet/shims/CometTypeShim.scala (100%) rename {spark => common}/src/main/spark-4.0/org/apache/comet/shims/CometTypeShim.scala (100%) diff --git a/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala b/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala index 3041ea2c80..14f4411eae 100644 --- a/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala +++ b/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala @@ -39,9 +39,10 @@ import org.apache.spark.sql.types._ import 
org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream} +import org.apache.comet.shims.CometTypeShim import org.apache.comet.vector.CometVector -object Utils { +object Utils extends CometTypeShim { def getConfPath(confFileName: String): String = { sys.env .get("COMET_CONF_DIR") @@ -125,6 +126,9 @@ object Utils { case FloatType => new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE) case DoubleType => new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE) case StringType => ArrowType.Utf8.INSTANCE + case dt if isStringCollationType(dt) => + // TODO collation information is lost with this transformation + ArrowType.Utf8.INSTANCE case BinaryType => ArrowType.Binary.INSTANCE case DecimalType.Fixed(precision, scale) => new ArrowType.Decimal(precision, scale, 128) case DateType => new ArrowType.Date(DateUnit.DAY) diff --git a/spark/src/main/spark-3.x/org/apache/comet/shims/CometTypeShim.scala b/common/src/main/spark-3.x/org/apache/comet/shims/CometTypeShim.scala similarity index 100% rename from spark/src/main/spark-3.x/org/apache/comet/shims/CometTypeShim.scala rename to common/src/main/spark-3.x/org/apache/comet/shims/CometTypeShim.scala diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/CometTypeShim.scala b/common/src/main/spark-4.0/org/apache/comet/shims/CometTypeShim.scala similarity index 100% rename from spark/src/main/spark-4.0/org/apache/comet/shims/CometTypeShim.scala rename to common/src/main/spark-4.0/org/apache/comet/shims/CometTypeShim.scala From 395eebd163249e85f324febad4e87de10f9e7477 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 8 Jul 2025 16:07:20 -0600 Subject: [PATCH 09/25] improve error message --- .../src/main/scala/org/apache/spark/sql/comet/util/Utils.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala 
b/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala index 14f4411eae..a023083829 100644 --- a/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala +++ b/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala @@ -142,7 +142,8 @@ object Utils extends CometTypeShim { case TimestampNTZType => new ArrowType.Timestamp(TimeUnit.MICROSECOND, null) case _ => - throw new UnsupportedOperationException(s"Unsupported data type: ${dt.catalogString}") + throw new UnsupportedOperationException( + s"Unsupported data type: [${dt.getClass.getName}] ${dt.catalogString}") } /** Maps field from Spark to Arrow. NOTE: timeZoneId required for TimestampType */ From c4cf271834a389814e26ea4048cb23569f00632f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 9 Jul 2025 13:38:31 -0600 Subject: [PATCH 10/25] fix --- .../src/main/scala/org/apache/spark/sql/comet/util/Utils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala b/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala index a023083829..85c1ccd1f1 100644 --- a/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala +++ b/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala @@ -125,7 +125,7 @@ object Utils extends CometTypeShim { case LongType => new ArrowType.Int(8 * 8, true) case FloatType => new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE) case DoubleType => new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE) - case StringType => ArrowType.Utf8.INSTANCE + case _: StringType => ArrowType.Utf8.INSTANCE case dt if isStringCollationType(dt) => // TODO collation information is lost with this transformation ArrowType.Utf8.INSTANCE From fd4cdb6944b37257b516a1e864399b9ac96c858c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 9 Jul 2025 16:01:14 -0600 Subject: [PATCH 11/25] fix --- .../src/main/scala/org/apache/comet/rules/CometScanRule.scala | 3 +++ 1 file 
changed, 3 insertions(+)

diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index 3686d36e7a..ec81cad7d2 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -287,6 +287,9 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
           // maps containing complex types are not supported
           isComplexType(m.keyType) || isComplexType(m.valueType)
-        case dt => isStringCollationType(dt)
+        case _: StringType =>
+          // we only support `case object StringType` and not other instances of `class StringType`
+          dataType != StringType
+        case dt => isStringCollationType(dt)
         case _ => false
       }
     }

From 3c6db947272387ce7a12e8375d587169c2370bf1 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Tue, 19 Aug 2025 16:39:37 -0600
Subject: [PATCH 12/25] format

---
 spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index e42e7bf625..e6f514be60 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -37,7 +37,6 @@ import org.apache.spark.sql.types._
 
 import org.apache.comet.{CometConf, DataTypeSupport}
 import org.apache.comet.CometConf._
 import org.apache.comet.CometSparkSessionExtensions.{isCometLoaded, isCometScanEnabled, withInfo, withInfos}
-import org.apache.comet.DataTypeSupport.isComplexType
 import org.apache.comet.parquet.{CometParquetScan, SupportsComet}
 import org.apache.comet.shims.CometTypeShim

From cd95aed975ad9412c75e114dbab1c7f34e8f6b9f Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 20 Aug 2025 13:26:57 -0600
Subject: [PATCH 13/25] possible fix

---
 .../src/main/scala/org/apache/comet/rules/CometScanRule.scala | 4 +++-
 1 file changed, 3 
insertions(+), 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index e6f514be60..bfe02aa515 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -347,7 +347,7 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com } -case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport { +case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport with CometTypeShim { // this class is intended to be used with a specific scan impl assert(scanImpl != CometConf.SCAN_AUTO) @@ -365,6 +365,8 @@ case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport { false case _: StructType | _: ArrayType | _: MapType if scanImpl == CometConf.SCAN_NATIVE_COMET => false + case dt => isStringCollationType(dt) + false case _ => super.isTypeSupported(dt, name, fallbackReasons) } From a78874b9ad40b139ff036c44d3021b50473fb645 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 20 Aug 2025 13:47:51 -0600 Subject: [PATCH 14/25] fix build? 
---
 .github/workflows/benchmark.yml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml
index 30a7ea0cc0..8699a5eafe 100644
--- a/.github/workflows/benchmark.yml
+++ b/.github/workflows/benchmark.yml
@@ -42,7 +42,7 @@ env:
 jobs:
   prepare:
     name: Build native and prepare data
-    runs-on: ubuntu-latest
+    runs-on: ubuntu-22.04
     container:
       image: amd64/rust
     env:
@@ -100,7 +100,7 @@ jobs:
 
   benchmark:
     name: Run TPCDSQuerySuite
-    runs-on: ubuntu-latest
+    runs-on: ubuntu-22.04
     needs: [prepare]
     container:
       image: amd64/rust

From dbc3df500619c16c261d6fd12d054c6e07517168 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 20 Aug 2025 13:50:38 -0600
Subject: [PATCH 15/25] format

---
 .../src/main/scala/org/apache/comet/rules/CometScanRule.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index bfe02aa515..7ffcbbda53 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -365,7 +365,8 @@ case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport with C
         false
       case _: StructType | _: ArrayType | _: MapType if scanImpl == CometConf.SCAN_NATIVE_COMET =>
         false
-    case dt => isStringCollationType(dt)
+      case dt if isStringCollationType(dt) =>
+        // string types with collation are not supported; all other types fall through to super
         false
     case _ =>
       super.isTypeSupported(dt, name, fallbackReasons)

From d7f89092d2eef986945256251ce7ad576e2adf3a Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 20 Aug 2025 14:46:13 -0600
Subject: [PATCH 16/25] debug

---
 .github/workflows/benchmark.yml | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml
index 8699a5eafe..bf8005835e 100644
--- a/.github/workflows/benchmark.yml
+++ b/.github/workflows/benchmark.yml
@@ -42,7 
+42,7 @@ env: jobs: prepare: name: Build native and prepare data - runs-on: ubuntu-22.04 + runs-on: ubuntu-latest container: image: amd64/rust env: @@ -91,6 +91,10 @@ jobs: if: steps.cache-tpcds-sf-1.outputs.cache-hit != 'true' run: | apt-get install -y yacc bison flex + gcc --version + yacc --version + bison --version + flex --version cd tpcds-kit/tools && make OS=LINUX - name: Generate TPC-DS (SF=1) table data if: steps.cache-tpcds-sf-1.outputs.cache-hit != 'true' @@ -100,7 +104,7 @@ jobs: benchmark: name: Run TPCDSQuerySuite - runs-on: ubuntu-22.04 + runs-on: ubuntu-latest needs: [prepare] container: image: amd64/rust From f8b96198c7a3882b772a85947febe2fe7d7eee58 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 20 Aug 2025 15:09:33 -0600 Subject: [PATCH 17/25] fix? --- .github/workflows/benchmark.yml | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index bf8005835e..189002f9bb 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -90,11 +90,9 @@ jobs: - name: Build tpcds-kit if: steps.cache-tpcds-sf-1.outputs.cache-hit != 'true' run: | - apt-get install -y yacc bison flex + apt-get install -y yacc bison flex gcc-12 g++-12 + sudo update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-12 120 --slave /usr/bin/g++ g++ /usr/bin/g++-12 gcc --version - yacc --version - bison --version - flex --version cd tpcds-kit/tools && make OS=LINUX - name: Generate TPC-DS (SF=1) table data if: steps.cache-tpcds-sf-1.outputs.cache-hit != 'true' From 47ae97ef4391daaf3879f52222a1f98d58ce7dfc Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 20 Aug 2025 15:33:49 -0600 Subject: [PATCH 18/25] fix? 
--- .github/workflows/benchmark.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index 189002f9bb..4228be86bb 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -91,7 +91,7 @@ jobs: if: steps.cache-tpcds-sf-1.outputs.cache-hit != 'true' run: | apt-get install -y yacc bison flex gcc-12 g++-12 - sudo update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-12 120 --slave /usr/bin/g++ g++ /usr/bin/g++-12 + update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-12 120 --slave /usr/bin/g++ g++ /usr/bin/g++-12 gcc --version cd tpcds-kit/tools && make OS=LINUX - name: Generate TPC-DS (SF=1) table data From 466b452c1c07b402190b7d4954fb1bba1854d563 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 20 Aug 2025 19:39:40 -0600 Subject: [PATCH 19/25] update Spark CollationSuite to expect Comet plans --- dev/diffs/4.0.0.diff | 30 ++++++++++++++++++++++++++---- 1 file changed, 26 insertions(+), 4 deletions(-) diff --git a/dev/diffs/4.0.0.diff b/dev/diffs/4.0.0.diff index 6d400ebf83..9c5a2c4aed 100644 --- a/dev/diffs/4.0.0.diff +++ b/dev/diffs/4.0.0.diff @@ -1280,10 +1280,10 @@ index 2e33f6505ab..e1e93ab3bad 100644 withTable("t1", "t2") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala -index 11e9547dfc5..df5678c8d82 100644 +index 11e9547dfc5..1a21a30fcac 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala -@@ -20,7 +20,7 @@ package org.apache.spark.sql.collation +@@ -20,10 +20,11 @@ package org.apache.spark.sql.collation import scala.jdk.CollectionConverters.MapHasAsJava import org.apache.spark.SparkException @@ -1292,7 +1292,21 @@ index 11e9547dfc5..df5678c8d82 100644 import 
org.apache.spark.sql.catalyst.ExtendedAnalysisException import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util.CollationFactory -@@ -1505,7 +1505,9 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper { ++import org.apache.spark.sql.comet.{CometBroadcastHashJoinExec, CometHashJoinExec, CometSortMergeJoinExec} + import org.apache.spark.sql.connector.{DatasourceV2SQLBase, FakeV2ProviderWithCustomSchema} + import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTable} + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper +@@ -55,7 +56,9 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper { + assert( + collectFirst(queryPlan) { + case _: SortMergeJoinExec => assert(isSortMergeForced) ++ case _: CometSortMergeJoinExec => assert(isSortMergeForced) + case _: HashJoin => assert(!isSortMergeForced) ++ case _: CometHashJoinExec | _: CometBroadcastHashJoinExec => assert(!isSortMergeForced) + }.nonEmpty + ) + } +@@ -1505,7 +1508,9 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper { } } @@ -1303,7 +1317,15 @@ index 11e9547dfc5..df5678c8d82 100644 val t1 = "T_1" val t2 = "T_2" -@@ -1815,7 +1817,9 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper { +@@ -1611,6 +1616,7 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper { + } else { + assert(!collectFirst(queryPlan) { + case b: BroadcastHashJoinExec => b.leftKeys.head ++ case b: CometBroadcastHashJoinExec => b.leftKeys.head + }.head.isInstanceOf[ArrayTransform]) + } + } +@@ -1815,7 +1821,9 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper { } } From 24b216f9fb33522b96ff44b6c17364521aa34071 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 20 Aug 2025 19:45:46 -0600 Subject: [PATCH 20/25] update Spark CollationSuite to expect Comet plans --- dev/diffs/4.0.0.diff | 12 
++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/dev/diffs/4.0.0.diff b/dev/diffs/4.0.0.diff index 9c5a2c4aed..5357ce65b7 100644 --- a/dev/diffs/4.0.0.diff +++ b/dev/diffs/4.0.0.diff @@ -1280,7 +1280,7 @@ index 2e33f6505ab..e1e93ab3bad 100644 withTable("t1", "t2") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala -index 11e9547dfc5..1a21a30fcac 100644 +index 11e9547dfc5..be9ae40ab3d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala @@ -20,10 +20,11 @@ package org.apache.spark.sql.collation @@ -1325,7 +1325,15 @@ index 11e9547dfc5..1a21a30fcac 100644 }.head.isInstanceOf[ArrayTransform]) } } -@@ -1815,7 +1821,9 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper { +@@ -1676,6 +1682,7 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper { + } else { + assert(!collectFirst(queryPlan) { + case b: BroadcastHashJoinExec => b.leftKeys.head ++ case b: CometBroadcastHashJoinExec => b.leftKeys.head + }.head.isInstanceOf[ArrayTransform]) + } + } +@@ -1815,7 +1822,9 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper { } } From e14f6be056ebc92b5857ebafe24ce04f018081dc Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 21 Aug 2025 08:39:26 -0600 Subject: [PATCH 21/25] fix one regression --- dev/diffs/4.0.0.diff | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/dev/diffs/4.0.0.diff b/dev/diffs/4.0.0.diff index 5357ce65b7..7a3f335cd6 100644 --- a/dev/diffs/4.0.0.diff +++ b/dev/diffs/4.0.0.diff @@ -2668,10 +2668,23 @@ index 22839d3f0d2..7e66d100e90 100644 checkAnswer( // "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY. 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -index bba71f1c48d..4f33ce4b3f2 100644 +index bba71f1c48d..38c60ee2584 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -@@ -1060,7 +1060,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -996,7 +996,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS + Seq(Some("A"), Some("A"), None).toDF().repartition(1) + .write.parquet(path.getAbsolutePath) + val df = spark.read.parquet(path.getAbsolutePath) +- checkAnswer(stripSparkFilter(df.where("NOT (value <=> 'A')")), df) ++ // Similar to Spark's vectorized reader, Comet doesn't do row-level filtering but relies ++ // on Spark to apply the data filters after columnar batches are returned ++ if (!isCometEnabled) { ++ checkAnswer(stripSparkFilter(df.where("NOT (value <=> 'A')")), df) ++ } + } + } + } +@@ -1060,7 +1064,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS checkAnswer(readParquet(schema2, path), df) } @@ -2681,7 +2694,7 @@ index bba71f1c48d..4f33ce4b3f2 100644 val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)" checkAnswer(readParquet(schema1, path), df) val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)" -@@ -1084,7 +1085,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1084,7 +1089,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, CAST('1.2' AS BINARY) d") df.write.parquet(path.toString) From b061418fe7eec8b580a8af33ae4d79819bd7cc49 Mon Sep 17 00:00:00 
2001 From: Andy Grove Date: Thu, 21 Aug 2025 08:51:22 -0600 Subject: [PATCH 22/25] ignore tests in VariantShreddingSuite --- dev/diffs/4.0.0.diff | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/dev/diffs/4.0.0.diff b/dev/diffs/4.0.0.diff index 7a3f335cd6..7bf594733c 100644 --- a/dev/diffs/4.0.0.diff +++ b/dev/diffs/4.0.0.diff @@ -1279,6 +1279,21 @@ index 2e33f6505ab..e1e93ab3bad 100644 } withTable("t1", "t2") { +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala +index fee375db10a..8c2c24e2c5f 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala +@@ -33,7 +33,9 @@ import org.apache.spark.sql.types._ + import org.apache.spark.types.variant._ + import org.apache.spark.unsafe.types.{UTF8String, VariantVal} + +-class VariantShreddingSuite extends QueryTest with SharedSparkSession with ParquetTest { ++class VariantShreddingSuite extends QueryTest with SharedSparkSession with ParquetTest ++ // TODO enable tests once https://github.com/apache/datafusion-comet/issues/2209 is fixed ++ with IgnoreCometSuite { + def parseJson(s: String): VariantVal = { + val v = VariantBuilder.parseJson(s, false) + new VariantVal(v.getValue, v.getMetadata) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala index 11e9547dfc5..be9ae40ab3d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala From fd395db3eddf94da20745921987081d089e3b1a9 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 21 Aug 2025 08:55:11 -0600 Subject: [PATCH 23/25] ignore test that uses non utf-8 strings --- dev/diffs/4.0.0.diff | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/dev/diffs/4.0.0.diff 
b/dev/diffs/4.0.0.diff index 7bf594733c..4a4b958c48 100644 --- a/dev/diffs/4.0.0.diff +++ b/dev/diffs/4.0.0.diff @@ -3593,6 +3593,20 @@ index 4b27082e188..09f591dfed3 100644 Utils.withContextClassLoader(Utils.getSparkClassLoader) { withUserDefinedFunction(udfInfo.funcName -> false) { val sparkClassLoader = Thread.currentThread().getContextClassLoader +diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala +index cc7bb193731..06555d48da7 100644 +--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ++++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala +@@ -818,7 +818,8 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter + } + } + +- test("SPARK-30201 HiveOutputWriter standardOI should use ObjectInspectorCopyOption.DEFAULT") { ++ test("SPARK-30201 HiveOutputWriter standardOI should use ObjectInspectorCopyOption.DEFAULT", ++ IgnoreComet("Comet does not support reading non UTF-8 strings")) { + withTable("t1", "t2") { + withTempDir { dir => + val file = new File(dir, "test.hex") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala index b67370f6eb9..746b3974b29 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala From a2a862e19884b6df2ea95ddb1785d070b4f643b0 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 25 Aug 2025 14:07:35 -0600 Subject: [PATCH 24/25] add comments --- .../main/scala/org/apache/spark/sql/comet/util/Utils.scala | 4 +--- .../src/main/scala/org/apache/comet/rules/CometScanRule.scala | 2 ++ 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala b/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala index 
85c1ccd1f1..a72208db27 100644 --- a/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala +++ b/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala @@ -126,9 +126,7 @@ object Utils extends CometTypeShim { case FloatType => new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE) case DoubleType => new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE) case _: StringType => ArrowType.Utf8.INSTANCE - case dt if isStringCollationType(dt) => - // TODO collation information is lost with this transformation - ArrowType.Utf8.INSTANCE + case dt if isStringCollationType(dt) => ArrowType.Utf8.INSTANCE case BinaryType => ArrowType.Binary.INSTANCE case DecimalType.Fixed(precision, scale) => new ArrowType.Decimal(precision, scale, 128) case DateType => new ArrowType.Date(DateUnit.DAY) diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index b08f647174..9c2d7ff326 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -366,6 +366,8 @@ case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport with C case _: StructType | _: ArrayType | _: MapType if scanImpl == CometConf.SCAN_NATIVE_COMET => false case dt if isStringCollationType(dt) => + // we don't need specific support for collation in scans, but this + // is a convenient place to force the whole query to fall back to Spark for now false case s: StructType if s.fields.isEmpty => false From df9d42f96d679dd32fbd6ef1a3d15bca408fba95 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 25 Aug 2025 14:09:18 -0600 Subject: [PATCH 25/25] use DataTypeSupport.isComplexType --- .../main/scala/org/apache/comet/rules/CometScanRule.scala | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala 
b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index 9c2d7ff326..e1511b289b 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -37,6 +37,7 @@ import org.apache.spark.sql.types._ import org.apache.comet.{CometConf, DataTypeSupport} import org.apache.comet.CometConf._ import org.apache.comet.CometSparkSessionExtensions.{isCometLoaded, isCometScanEnabled, withInfo, withInfos} +import org.apache.comet.DataTypeSupport.isComplexType import org.apache.comet.parquet.{CometParquetScan, SupportsComet} import org.apache.comet.shims.CometTypeShim @@ -298,11 +299,6 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com val partitionSchemaSupported = typeChecker.isSchemaSupported(partitionSchema, fallbackReasons) - def isComplexType(dt: DataType): Boolean = dt match { - case _: StructType | _: ArrayType | _: MapType => true - case _ => false - } - def hasUnsupportedType(dataType: DataType): Boolean = { dataType match { case s: StructType => s.exists(field => hasUnsupportedType(field.dataType))