diff --git a/common/src/main/spark-4.1/org/apache/comet/shims/CometTypeShim.scala b/common/src/main/spark-4.1/org/apache/comet/shims/CometTypeShim.scala
deleted file mode 100644
index f75fde7de9..0000000000
--- a/common/src/main/spark-4.1/org/apache/comet/shims/CometTypeShim.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet.shims
-
-import org.apache.spark.sql.execution.datasources.VariantMetadata
-import org.apache.spark.sql.types.{DataType, StringType, StructType}
-
-trait CometTypeShim {
- // A `StringType` carries collation metadata in Spark 4.0. Only non-default (non-UTF8_BINARY)
- // collations have semantics Comet's byte-level hashing/sorting/equality cannot honor. The
- // default `StringType` object is `StringType(UTF8_BINARY_COLLATION_ID)`, so comparing
- // `collationId` against that instance's id picks out non-default collations without needing
- // `private[sql]` helpers on `StringType`.
- def isStringCollationType(dt: DataType): Boolean = dt match {
- case st: StringType => st.collationId != StringType.collationId
- case _ => false
- }
-
- // Spark 4.0's `PushVariantIntoScan` rewrites `VariantType` columns into a `StructType` whose
- // fields each carry `__VARIANT_METADATA_KEY` metadata, then pushes `variant_get` paths down as
- // ordinary struct field accesses. Comet's native scans don't understand the on-disk Parquet
- // variant shredding layout, so reading such a struct natively returns nulls. Detect the marker
- // and force scan fallback.
- def isVariantStruct(s: StructType): Boolean = VariantMetadata.isVariantStruct(s)
-}
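
The spark-4.1 copy deleted above is identical to the spark-4.0 version that the rename hunks below move to spark-4.x. A minimal standalone sketch of the collation check (same logic as the shim; assumes a Spark 4.x classpath, not part of the patch):

    import org.apache.spark.sql.types.{DataType, StringType}

    object CollationCheckSketch {
      // Same predicate as CometTypeShim.isStringCollationType above.
      def isStringCollationType(dt: DataType): Boolean = dt match {
        case st: StringType => st.collationId != StringType.collationId
        case _ => false
      }

      def main(args: Array[String]): Unit = {
        // The default StringType is UTF8_BINARY, so this prints false; a column
        // declared with a non-default collation would carry a different
        // collationId and return true, triggering Comet's fallback.
        println(isStringCollationType(StringType))
      }
    }
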
diff --git a/common/src/main/spark-4.1/org/apache/comet/shims/ShimBatchReader.scala b/common/src/main/spark-4.1/org/apache/comet/shims/ShimBatchReader.scala
deleted file mode 100644
index 8ce51b31b6..0000000000
--- a/common/src/main/spark-4.1/org/apache/comet/shims/ShimBatchReader.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet.shims
-
-import org.apache.spark.paths.SparkPath
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.datasources.PartitionedFile
-
-object ShimBatchReader {
- def newPartitionedFile(partitionValues: InternalRow, file: String): PartitionedFile =
- PartitionedFile(
- partitionValues,
- SparkPath.fromUrlString(file),
- -1, // -1 means we read the entire file
- -1,
- Array.empty[String],
- 0,
- 0)
-}
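
ShimBatchReader likewise matches the spark-4.0 copy byte for byte. A hedged usage sketch (the file path is hypothetical; InternalRow.empty stands in for partition values):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.comet.shims.ShimBatchReader

    object BatchReaderSketch extends App {
      // start/length of -1 tell Comet's native reader to scan the whole file.
      val file = ShimBatchReader.newPartitionedFile(
        InternalRow.empty,
        "file:///tmp/part-00000.parquet") // hypothetical path
      println(s"start=${file.start} length=${file.length}")
    }
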
diff --git a/common/src/main/spark-4.1/org/apache/comet/shims/ShimCometConf.scala b/common/src/main/spark-4.1/org/apache/comet/shims/ShimCometConf.scala
deleted file mode 100644
index 0eb57c52b4..0000000000
--- a/common/src/main/spark-4.1/org/apache/comet/shims/ShimCometConf.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet.shims
-
-trait ShimCometConf {
- protected val COMET_SCHEMA_EVOLUTION_ENABLED_DEFAULT = true
-}
diff --git a/common/src/main/spark-4.1/org/apache/comet/shims/ShimFileFormat.scala b/common/src/main/spark-4.1/org/apache/comet/shims/ShimFileFormat.scala
deleted file mode 100644
index 1702db135a..0000000000
--- a/common/src/main/spark-4.1/org/apache/comet/shims/ShimFileFormat.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet.shims
-
-import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
-import org.apache.spark.sql.execution.datasources.parquet.ParquetRowIndexUtil
-import org.apache.spark.sql.types.StructType
-
-object ShimFileFormat {
- // A name for a temporary column that holds row indexes computed by the file format reader
- // until they can be placed in the _metadata struct.
- val ROW_INDEX_TEMPORARY_COLUMN_NAME = ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
-
- def findRowIndexColumnIndexInSchema(sparkSchema: StructType): Int =
- ParquetRowIndexUtil.findRowIndexColumnIndexInSchema(sparkSchema)
-}
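
A small sketch of the row-index lookup the shim delegates to (assuming, per the Spark utility's contract, that -1 is returned when the column is absent):

    import org.apache.spark.sql.types.{LongType, StructField, StructType}
    import org.apache.comet.shims.ShimFileFormat

    object RowIndexSketch extends App {
      val schema = StructType(Seq(
        StructField("id", LongType),
        StructField(ShimFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME, LongType)))
      // Expected: 1, the position of the temporary row-index column.
      println(ShimFileFormat.findRowIndexColumnIndexInSchema(schema))
    }
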
diff --git a/common/src/main/spark-4.1/org/apache/spark/sql/comet/shims/ShimTaskMetrics.scala b/common/src/main/spark-4.1/org/apache/spark/sql/comet/shims/ShimTaskMetrics.scala
deleted file mode 100644
index b6a1b56d97..0000000000
--- a/common/src/main/spark-4.1/org/apache/spark/sql/comet/shims/ShimTaskMetrics.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.sql.comet.shims
-
-import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.util.AccumulatorV2
-
-object ShimTaskMetrics {
-
- def getTaskAccumulator(taskMetrics: TaskMetrics): Option[AccumulatorV2[_, _]] =
- taskMetrics._externalAccums.lastOption
-}
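
Note that ShimTaskMetrics lives under org.apache.spark.sql.comet.shims rather than org.apache.comet.shims: TaskMetrics._externalAccums is package-private to org.apache.spark (an assumption consistent with the path above), so code outside that package could not read the field.
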
diff --git a/common/src/main/spark-4.0/org/apache/comet/shims/CometTypeShim.scala b/common/src/main/spark-4.x/org/apache/comet/shims/CometTypeShim.scala
similarity index 100%
rename from common/src/main/spark-4.0/org/apache/comet/shims/CometTypeShim.scala
rename to common/src/main/spark-4.x/org/apache/comet/shims/CometTypeShim.scala
diff --git a/common/src/main/spark-4.0/org/apache/comet/shims/ShimBatchReader.scala b/common/src/main/spark-4.x/org/apache/comet/shims/ShimBatchReader.scala
similarity index 100%
rename from common/src/main/spark-4.0/org/apache/comet/shims/ShimBatchReader.scala
rename to common/src/main/spark-4.x/org/apache/comet/shims/ShimBatchReader.scala
diff --git a/common/src/main/spark-4.0/org/apache/comet/shims/ShimCometConf.scala b/common/src/main/spark-4.x/org/apache/comet/shims/ShimCometConf.scala
similarity index 100%
rename from common/src/main/spark-4.0/org/apache/comet/shims/ShimCometConf.scala
rename to common/src/main/spark-4.x/org/apache/comet/shims/ShimCometConf.scala
diff --git a/common/src/main/spark-4.0/org/apache/comet/shims/ShimFileFormat.scala b/common/src/main/spark-4.x/org/apache/comet/shims/ShimFileFormat.scala
similarity index 100%
rename from common/src/main/spark-4.0/org/apache/comet/shims/ShimFileFormat.scala
rename to common/src/main/spark-4.x/org/apache/comet/shims/ShimFileFormat.scala
diff --git a/common/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimTaskMetrics.scala b/common/src/main/spark-4.x/org/apache/spark/sql/comet/shims/ShimTaskMetrics.scala
similarity index 100%
rename from common/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimTaskMetrics.scala
rename to common/src/main/spark-4.x/org/apache/spark/sql/comet/shims/ShimTaskMetrics.scala
diff --git a/pom.xml b/pom.xml
index 795a3311c9..d9afe8fd1c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -673,8 +673,8 @@ under the License.
1.15.2
4.13.6
2.0.16
- <shims.majorVerSrc>spark-4.0</shims.majorVerSrc>
- <shims.minorVerSrc>not-needed-yet</shims.minorVerSrc>
+ <shims.majorVerSrc>spark-4.x</shims.majorVerSrc>
+ <shims.minorVerSrc>spark-4.0</shims.minorVerSrc>
17
${java.version}
@@ -697,8 +697,8 @@ under the License.
1.16.0
4.13.6
2.0.17
- <shims.majorVerSrc>spark-4.1</shims.majorVerSrc>
- <shims.minorVerSrc>not-needed-yet</shims.minorVerSrc>
+ <shims.majorVerSrc>spark-4.x</shims.majorVerSrc>
+ <shims.minorVerSrc>spark-4.1</shims.minorVerSrc>
17
${java.version}
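
Both Spark 4 profiles now share one major-version shim tree: shims.majorVerSrc points at the common spark-4.x sources, while shims.minorVerSrc, previously the not-needed-yet placeholder, carries whatever remains version-specific for 4.0 and 4.1 respectively.
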
diff --git a/spark/src/main/spark-4.1/org/apache/comet/shims/ShimCometBroadcastExchangeExec.scala b/spark/src/main/spark-4.1/org/apache/comet/shims/ShimCometBroadcastExchangeExec.scala
deleted file mode 100644
index e88cbf1f53..0000000000
--- a/spark/src/main/spark-4.1/org/apache/comet/shims/ShimCometBroadcastExchangeExec.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet.shims
-
-import org.apache.spark.SparkContext
-import org.apache.spark.network.util.JavaUtils
-import org.apache.spark.sql.execution.exchange.BroadcastExchangeLike
-import org.apache.spark.sql.internal.SQLConf
-
-import org.apache.comet.shims.ShimCometBroadcastExchangeExec.SPARK_MAX_BROADCAST_TABLE_SIZE
-
-trait ShimCometBroadcastExchangeExec {
-
- def setJobGroupOrTag(sc: SparkContext, broadcastExchange: BroadcastExchangeLike): Unit = {
- // Setup a job tag here so later it may get cancelled by tag if necessary.
- sc.addJobTag(broadcastExchange.jobTag)
- sc.setInterruptOnCancel(true)
- }
-
- def cancelJobGroup(sc: SparkContext, broadcastExchange: BroadcastExchangeLike): Unit = {
- sc.cancelJobsWithTag(broadcastExchange.jobTag)
- }
-
- def maxBroadcastTableBytes(conf: SQLConf): Long = {
- JavaUtils.byteStringAsBytes(conf.getConfString(SPARK_MAX_BROADCAST_TABLE_SIZE, "8GB"))
- }
-
-}
-
-object ShimCometBroadcastExchangeExec {
- val SPARK_MAX_BROADCAST_TABLE_SIZE = "spark.sql.maxBroadcastTableSize"
-}
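
A quick sketch of the size parsing used by maxBroadcastTableBytes above (Spark treats these suffixes as binary units):

    import org.apache.spark.network.util.JavaUtils

    object BroadcastSizeSketch extends App {
      // "8GB" parses as 8 GiB.
      println(JavaUtils.byteStringAsBytes("8GB")) // 8589934592
    }
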
diff --git a/spark/src/main/spark-4.1/org/apache/comet/shims/ShimCometShuffleExchangeExec.scala b/spark/src/main/spark-4.1/org/apache/comet/shims/ShimCometShuffleExchangeExec.scala
deleted file mode 100644
index a8ae94cd7c..0000000000
--- a/spark/src/main/spark-4.1/org/apache/comet/shims/ShimCometShuffleExchangeExec.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet.shims
-
-import org.apache.spark.ShuffleDependency
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.types.DataTypeUtils
-import org.apache.spark.sql.comet.execution.shuffle.{CometShuffleExchangeExec, ShuffleType}
-import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
-import org.apache.spark.sql.types.StructType
-
-trait ShimCometShuffleExchangeExec {
- def apply(s: ShuffleExchangeExec, shuffleType: ShuffleType): CometShuffleExchangeExec = {
- CometShuffleExchangeExec(
- s.outputPartitioning,
- s.child,
- s,
- s.shuffleOrigin,
- shuffleType,
- s.advisoryPartitionSize)
- }
-
- protected def fromAttributes(attributes: Seq[Attribute]): StructType =
- DataTypeUtils.fromAttributes(attributes)
-
- protected def getShuffleId(shuffleDependency: ShuffleDependency[Int, _, _]): Int =
- shuffleDependency.shuffleId
-}
diff --git a/spark/src/main/spark-4.1/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala b/spark/src/main/spark-4.1/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala
deleted file mode 100644
index cac636c45c..0000000000
--- a/spark/src/main/spark-4.1/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet.shims
-
-import org.apache.spark.sql.SparkSessionExtensions
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.connector.expressions.aggregate.Aggregation
-import org.apache.spark.sql.execution.{QueryExecution, SparkPlan}
-import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
-import org.apache.spark.sql.internal.SQLConf
-
-trait ShimCometSparkSessionExtensions {
- protected def getPushedAggregate(scan: ParquetScan): Option[Aggregation] = scan.pushedAggregate
-
- protected def supportsExtendedExplainInfo(qe: QueryExecution): Boolean = true
-
- protected val EXTENDED_EXPLAIN_PROVIDERS_KEY = SQLConf.EXTENDED_EXPLAIN_PROVIDERS.key
-
- def injectQueryStageOptimizerRuleShim(
- extensions: SparkSessionExtensions,
- rule: Rule[SparkPlan]): Unit = {
- extensions.injectQueryStageOptimizerRule(_ => rule)
- }
-}
diff --git a/spark/src/main/spark-4.1/org/apache/comet/shims/ShimSQLConf.scala b/spark/src/main/spark-4.1/org/apache/comet/shims/ShimSQLConf.scala
deleted file mode 100644
index bdb2739460..0000000000
--- a/spark/src/main/spark-4.1/org/apache/comet/shims/ShimSQLConf.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet.shims
-
-import org.apache.spark.sql.internal.LegacyBehaviorPolicy
-
-trait ShimSQLConf {
- protected val LEGACY = LegacyBehaviorPolicy.LEGACY
- protected val CORRECTED = LegacyBehaviorPolicy.CORRECTED
-}
diff --git a/spark/src/main/spark-4.1/org/apache/comet/shims/ShimSubqueryBroadcast.scala b/spark/src/main/spark-4.1/org/apache/comet/shims/ShimSubqueryBroadcast.scala
deleted file mode 100644
index 73d9e53c4a..0000000000
--- a/spark/src/main/spark-4.1/org/apache/comet/shims/ShimSubqueryBroadcast.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet.shims
-
-import org.apache.spark.sql.execution.{SubqueryAdaptiveBroadcastExec, SubqueryBroadcastExec}
-
-trait ShimSubqueryBroadcast {
-
- /**
- * Gets the build key indices from SubqueryAdaptiveBroadcastExec. Spark 3.x has `index: Int`,
- * Spark 4.x has `indices: Seq[Int]`.
- */
- def getSubqueryBroadcastIndices(sab: SubqueryAdaptiveBroadcastExec): Seq[Int] = {
- sab.indices
- }
-
- /** The analogous version shim for SubqueryBroadcastExec. */
- def getSubqueryBroadcastExecIndices(sub: SubqueryBroadcastExec): Seq[Int] = {
- sub.indices
- }
-}
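
For contrast, the Spark 3.x counterpart of this shim (not part of this diff; shape inferred from the comment above, compiles only against a Spark 3.x classpath) wraps the scalar field so callers stay version-agnostic:

    import org.apache.spark.sql.execution.SubqueryAdaptiveBroadcastExec

    trait ShimSubqueryBroadcastSpark3 {
      // Spark 3.x declares `index: Int` rather than `indices: Seq[Int]`.
      def getSubqueryBroadcastIndices(sab: SubqueryAdaptiveBroadcastExec): Seq[Int] =
        Seq(sab.index)
    }
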
diff --git a/spark/src/main/spark-4.1/org/apache/spark/comet/shims/ShimCometDriverPlugin.scala b/spark/src/main/spark-4.1/org/apache/spark/comet/shims/ShimCometDriverPlugin.scala
deleted file mode 100644
index 4e48744fc4..0000000000
--- a/spark/src/main/spark-4.1/org/apache/spark/comet/shims/ShimCometDriverPlugin.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.comet.shims
-
-import org.apache.spark.SparkConf
-import org.apache.spark.internal.config.EXECUTOR_MIN_MEMORY_OVERHEAD
-
-trait ShimCometDriverPlugin {
- protected def getMemoryOverheadMinMib(sparkConf: SparkConf): Long =
- sparkConf.get(EXECUTOR_MIN_MEMORY_OVERHEAD)
-}
diff --git a/spark/src/main/spark-4.1/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala b/spark/src/main/spark-4.1/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
deleted file mode 100644
index 3d9b963a93..0000000000
--- a/spark/src/main/spark-4.1/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.sql.comet.shims
-
-import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, FileSourceConstantMetadataAttribute, Literal}
-import org.apache.spark.sql.execution.{FileSourceScanExec, PartitionedFileUtil, ScalarSubquery}
-import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
-import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.StructType
-
-trait ShimCometScanExec extends ShimStreamSourceAwareSparkPlan {
- def wrapped: FileSourceScanExec
-
- lazy val fileConstantMetadataColumns: Seq[AttributeReference] =
- wrapped.fileConstantMetadataColumns
-
- protected def newFileScanRDD(
- fsRelation: HadoopFsRelation,
- readFunction: PartitionedFile => Iterator[InternalRow],
- filePartitions: Seq[FilePartition],
- readSchema: StructType,
- options: ParquetOptions): FileScanRDD = {
- new FileScanRDD(
- fsRelation.sparkSession,
- readFunction,
- filePartitions,
- readSchema,
- fileConstantMetadataColumns,
- fsRelation.fileFormat.fileConstantMetadataExtractors,
- options)
- }
-
- // see SPARK-39634
- protected def isNeededForSchema(sparkSchema: StructType): Boolean = false
-
- protected def getPartitionedFile(
- f: FileStatusWithMetadata,
- p: PartitionDirectory): PartitionedFile =
- PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values, 0, f.getLen)
-
- protected def splitFiles(
- sparkSession: SparkSession,
- file: FileStatusWithMetadata,
- filePath: Path,
- isSplitable: Boolean,
- maxSplitBytes: Long,
- partitionValues: InternalRow): Seq[PartitionedFile] =
- PartitionedFileUtil.splitFiles(file, filePath, isSplitable, maxSplitBytes, partitionValues)
-
- protected def getPushedDownFilters(
- relation: HadoopFsRelation,
- dataFilters: Seq[Expression]): Seq[Filter] = {
- translateToV1Filters(relation, dataFilters, _.toLiteral)
- }
-
- // From Spark FileSourceScanLike
- private def translateToV1Filters(
- relation: HadoopFsRelation,
- dataFilters: Seq[Expression],
- scalarSubqueryToLiteral: ScalarSubquery => Literal): Seq[Filter] = {
- val scalarSubqueryReplaced = dataFilters.map(_.transform {
- // Replace scalar subquery to literal so that `DataSourceStrategy.translateFilter` can
- // support translating it.
- case scalarSubquery: ScalarSubquery => scalarSubqueryToLiteral(scalarSubquery)
- })
-
- val supportNestedPredicatePushdown = DataSourceUtils.supportNestedPredicatePushdown(relation)
- // `dataFilters` should not include any constant metadata col filters
- // because the metadata struct has been flattened in FileSourceStrategy
- // and thus metadata col filters are invalid to be pushed down. Metadata that is generated
- // during the scan can be used for filters.
- scalarSubqueryReplaced
- .filterNot(_.references.exists {
- case FileSourceConstantMetadataAttribute(_) => true
- case _ => false
- })
- .flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown))
- }
-
-}
diff --git a/spark/src/main/spark-4.1/org/apache/spark/sql/comet/shims/ShimCometShuffleWriteProcessor.scala b/spark/src/main/spark-4.1/org/apache/spark/sql/comet/shims/ShimCometShuffleWriteProcessor.scala
deleted file mode 100644
index 41ccdd0402..0000000000
--- a/spark/src/main/spark-4.1/org/apache/spark/sql/comet/shims/ShimCometShuffleWriteProcessor.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.sql.comet.shims
-
-import org.apache.spark.shuffle.ShuffleWriteProcessor
-
-trait ShimCometShuffleWriteProcessor extends ShuffleWriteProcessor {}
diff --git a/spark/src/main/spark-4.1/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala b/spark/src/main/spark-4.1/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
deleted file mode 100644
index 46bdd2ec03..0000000000
--- a/spark/src/main/spark-4.1/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
+++ /dev/null
@@ -1,346 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.sql.comet.shims
-
-import java.io.FileNotFoundException
-
-import scala.util.matching.Regex
-
-import org.apache.spark.QueryContext
-import org.apache.spark.SparkException
-import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.UTF8String
-
-object ShimSparkErrorConverter {
- val ObjectLocationPattern: Regex = "Object at location (.+?) not found".r
-}
-
-/**
- * Spark 4.0-specific implementation for converting error types to proper Spark exceptions.
- */
-trait ShimSparkErrorConverter {
-
- private def parseFloatLiteral(value: String): Float = {
- value.toLowerCase match {
- case "inf" | "+inf" | "infinity" | "+infinity" => Float.PositiveInfinity
- case "-inf" | "-infinity" => Float.NegativeInfinity
- case "nan" | "+nan" | "-nan" => Float.NaN
- case _ => value.toFloat
- }
- }
-
- private def parseDoubleLiteral(value: String): Double = {
- val normalized = value.toLowerCase.stripSuffix("d")
- normalized match {
- case "inf" | "+inf" | "infinity" | "+infinity" => Double.PositiveInfinity
- case "-inf" | "-infinity" => Double.NegativeInfinity
- case "nan" | "+nan" | "-nan" => Double.NaN
- case _ => normalized.toDouble
- }
- }
-
- /**
- * Convert error type string and parameters to appropriate Spark exception. Version-specific
- * implementations call the correct QueryExecutionErrors.* methods.
- *
- * @param errorType
- * The error type from JSON (e.g., "DivideByZero")
- * @param errorClass
- * The Spark error class (e.g., "DIVIDE_BY_ZERO")
- * @param params
- * Error parameters from JSON
- * @param context
- * QueryContext array with SQL text and position information
- * @param summary
- * Formatted summary string showing error location
- * @return
- * Throwable (specific exception type from QueryExecutionErrors), or None if unknown
- */
- def convertErrorType(
- errorType: String,
- _errorClass: String,
- params: Map[String, Any],
- context: Array[QueryContext],
- _summary: String): Option[Throwable] = {
-
- errorType match {
-
- case "DivideByZero" =>
- Some(QueryExecutionErrors.divideByZeroError(context.headOption.orNull))
-
- case "RemainderByZero" =>
- // Spark 4.0 removed remainderByZeroError, so we build a generic SparkException instead
- Some(
- new SparkException(
- errorClass = "REMAINDER_BY_ZERO",
- messageParameters = params.map { case (k, v) => (k, v.toString) },
- cause = null))
-
- case "IntervalDividedByZero" =>
- Some(QueryExecutionErrors.intervalDividedByZeroError(context.headOption.orNull))
-
- case "BinaryArithmeticOverflow" =>
- Some(
- QueryExecutionErrors.binaryArithmeticCauseOverflowError(
- params("value1").toString.toShort,
- params("symbol").toString,
- params("value2").toString.toShort,
- params("functionName").toString))
-
- case "ArithmeticOverflow" =>
- val fromType = params("fromType").toString
- Some(
- QueryExecutionErrors
- .arithmeticOverflowError(fromType + " overflow", "", context.headOption.orNull))
-
- case "IntegralDivideOverflow" =>
- Some(QueryExecutionErrors.overflowInIntegralDivideError(context.headOption.orNull))
-
- case "DecimalSumOverflow" =>
- Some(QueryExecutionErrors.overflowInSumOfDecimalError(context.headOption.orNull, ""))
-
- case "NumericValueOutOfRange" =>
- val decimal = Decimal(params("value").toString)
- Some(
- QueryExecutionErrors.cannotChangeDecimalPrecisionError(
- decimal,
- params("precision").toString.toInt,
- params("scale").toString.toInt,
- context.headOption.orNull))
-
- case "DatetimeOverflow" =>
- // Spark 4.0 doesn't have datetimeOverflowError
- Some(
- new SparkException(
- errorClass = "DATETIME_OVERFLOW",
- messageParameters = params.map { case (k, v) => (k, v.toString) },
- cause = null))
-
- case "InvalidArrayIndex" =>
- Some(
- QueryExecutionErrors.invalidArrayIndexError(
- params("indexValue").toString.toInt,
- params("arraySize").toString.toInt,
- context.headOption.orNull))
-
- case "InvalidElementAtIndex" =>
- Some(
- QueryExecutionErrors.invalidElementAtIndexError(
- params("indexValue").toString.toInt,
- params("arraySize").toString.toInt,
- context.headOption.orNull))
-
- case "InvalidIndexOfZero" =>
- Some(QueryExecutionErrors.invalidIndexOfZeroError(context.headOption.orNull))
-
- case "InvalidBitmapPosition" =>
- Some(
- QueryExecutionErrors.invalidBitmapPositionError(
- params("bitPosition").toString.toLong,
- params("bitmapNumBytes").toString.toLong))
-
- case "DuplicatedMapKey" =>
- Some(QueryExecutionErrors.duplicateMapKeyFoundError(params("key")))
-
- case "NullMapKey" =>
- Some(QueryExecutionErrors.nullAsMapKeyNotAllowedError())
-
- case "MapKeyValueDiffSizes" =>
- Some(QueryExecutionErrors.mapDataKeyArrayLengthDiffersFromValueArrayLengthError())
-
- case "ExceedMapSizeLimit" =>
- Some(QueryExecutionErrors.exceedMapSizeLimitError(params("size").toString.toInt))
-
- case "CollectionSizeLimitExceeded" =>
- Some(
- QueryExecutionErrors.createArrayWithElementsExceedLimitError(
- "array",
- params("numElements").toString.toLong))
-
- case "NotNullAssertViolation" =>
- Some(
- QueryExecutionErrors.foundNullValueForNotNullableFieldError(
- params("fieldName").toString))
-
- case "ValueIsNull" =>
- Some(
- QueryExecutionErrors.fieldCannotBeNullError(
- params.getOrElse("rowIndex", 0).toString.toInt,
- params("fieldName").toString))
-
- case "CannotParseTimestamp" =>
- Some(
- QueryExecutionErrors.ansiDateTimeParseError(
- new Exception(params("message").toString),
- params("suggestedFunc").toString))
-
- case "InvalidFractionOfSecond" =>
- Some(QueryExecutionErrors.invalidFractionOfSecondError(params("value").toString.toDouble))
-
- case "CastInvalidValue" =>
- val str = UTF8String.fromString(params("value").toString)
- val targetType = getDataType(params("toType").toString)
- Some(
- QueryExecutionErrors
- .invalidInputInCastToNumberError(targetType, str, context.headOption.orNull))
-
- case "InvalidInputInCastToDatetime" =>
- val str = UTF8String.fromString(params("value").toString)
- val targetType = getDataType(params("toType").toString)
- Some(
- QueryExecutionErrors
- .invalidInputInCastToDatetimeError(str, targetType, context.headOption.orNull))
-
- case "CastOverFlow" =>
- val fromType = getDataType(params("fromType").toString)
- val toType = getDataType(params("toType").toString)
- val valueStr = params("value").toString
-
- // Convert string value to appropriate type for toSQLValue
- val typedValue: Any = fromType match {
- case _: DecimalType =>
- // Parse decimal string (may have "BD" suffix from BigDecimal.toString)
- val cleanStr = if (valueStr.endsWith("BD")) valueStr.dropRight(2) else valueStr
- Decimal(cleanStr)
- case ByteType =>
- // Strip "T" suffix for TINYINT literals
- val cleanStr = if (valueStr.endsWith("T")) valueStr.dropRight(1) else valueStr
- cleanStr.toByte
- case ShortType =>
- // Strip "S" suffix for SMALLINT literals
- val cleanStr = if (valueStr.endsWith("S")) valueStr.dropRight(1) else valueStr
- cleanStr.toShort
- case IntegerType => valueStr.toInt
- case LongType =>
- // Strip "L" suffix for BIGINT literals
- val cleanStr = if (valueStr.endsWith("L")) valueStr.dropRight(1) else valueStr
- cleanStr.toLong
- case FloatType => parseFloatLiteral(valueStr)
- case DoubleType => parseDoubleLiteral(valueStr)
- case StringType => UTF8String.fromString(valueStr)
- case _ => valueStr // Fallback to string
- }
-
- Some(QueryExecutionErrors.castingCauseOverflowError(typedValue, fromType, toType))
-
- case "CannotParseDecimal" =>
- Some(QueryExecutionErrors.cannotParseDecimalError())
-
- case "InvalidUtf8String" =>
- val hexStr = UTF8String.fromString(params("hexString").toString)
- Some(QueryExecutionErrors.invalidUTF8StringError(hexStr))
-
- case "UnexpectedPositiveValue" =>
- Some(
- QueryExecutionErrors.unexpectedValueForStartInFunctionError(
- params("parameterName").toString))
-
- case "UnexpectedNegativeValue" =>
- Some(
- QueryExecutionErrors.unexpectedValueForLengthInFunctionError(
- params("parameterName").toString,
- params("actualValue").toString.toInt))
-
- case "InvalidRegexGroupIndex" =>
- Some(
- QueryExecutionErrors.invalidRegexGroupIndexError(
- params("functionName").toString,
- params("groupCount").toString.toInt,
- params("groupIndex").toString.toInt))
-
- case "DatatypeCannotOrder" =>
- Some(
- QueryExecutionErrors.orderedOperationUnsupportedByDataTypeError(
- params("dataType").toString))
-
- case "ScalarSubqueryTooManyRows" =>
- Some(QueryExecutionErrors.multipleRowScalarSubqueryError(context.headOption.orNull))
-
- case "IntervalArithmeticOverflowWithSuggestion" =>
- Some(
- QueryExecutionErrors.withSuggestionIntervalArithmeticOverflowError(
- params.get("functionName").map(_.toString).getOrElse(""),
- context.headOption.orNull))
-
- case "IntervalArithmeticOverflowWithoutSuggestion" =>
- Some(
- QueryExecutionErrors.withoutSuggestionIntervalArithmeticOverflowError(
- context.headOption.orNull))
-
- case "DuplicateFieldCaseInsensitive" =>
- Some(
- QueryExecutionErrors.foundDuplicateFieldInCaseInsensitiveModeError(
- params("requiredFieldName").toString,
- params("matchedOrcFields").toString))
-
- case "FileNotFound" =>
- val msg = params("message").toString
- val path = ShimSparkErrorConverter.ObjectLocationPattern
- .findFirstMatchIn(msg)
- .map(_.group(1))
- .getOrElse(msg)
- Some(
- QueryExecutionErrors
- .fileNotExistError(path, new FileNotFoundException(s"File $path does not exist")))
-
- case _ =>
- // Unknown error type - return None to trigger fallback
- None
- }
- }
-
- private def getDataType(typeName: String): DataType = {
- typeName.toUpperCase match {
- case "BYTE" | "TINYINT" => ByteType
- case "SHORT" | "SMALLINT" => ShortType
- case "INT" | "INTEGER" => IntegerType
- case "LONG" | "BIGINT" => LongType
- case "FLOAT" | "REAL" => FloatType
- case "DOUBLE" => DoubleType
- case "DECIMAL" => DecimalType.SYSTEM_DEFAULT
- case "STRING" | "VARCHAR" => StringType
- case "BINARY" => BinaryType
- case "BOOLEAN" => BooleanType
- case "DATE" => DateType
- case "TIMESTAMP" => TimestampType
- case _ =>
- try {
- DataType.fromDDL(typeName)
- } catch {
- case _: Exception =>
- // fromDDL rejects types that are syntactically invalid in SQL DDL, such as
- // DECIMAL(p,s) with a negative scale (valid when allowNegativeScaleOfDecimal=true).
- // Parse those manually rather than silently falling back to StringType.
- if (typeName.toUpperCase.startsWith("DECIMAL(") && typeName.endsWith(")")) {
- val inner = typeName.substring("DECIMAL(".length, typeName.length - 1)
- val parts = inner.split(",")
- if (parts.length == 2) {
- try {
- DataTypes.createDecimalType(parts(0).trim.toInt, parts(1).trim.toInt)
- } catch {
- case _: Exception => StringType
- }
- } else StringType
- } else StringType
- }
- }
- }
-}
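
A hedged sketch of how a caller might use the converter above to rethrow a native divide-by-zero as the matching Spark exception (instantiating the trait anonymously; the error-type string mirrors the match arm):

    import org.apache.spark.QueryContext
    import org.apache.spark.sql.comet.shims.ShimSparkErrorConverter

    object ErrorConverterSketch extends App {
      val converter = new ShimSparkErrorConverter {}
      converter
        .convertErrorType("DivideByZero", "DIVIDE_BY_ZERO", Map.empty,
          Array.empty[QueryContext], "")
        .foreach(e => throw e)
    }
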
diff --git a/spark/src/main/spark-4.1/org/apache/spark/sql/comet/shims/ShimStreamSourceAwareSparkPlan.scala b/spark/src/main/spark-4.1/org/apache/spark/sql/comet/shims/ShimStreamSourceAwareSparkPlan.scala
deleted file mode 100644
index 93552fc00f..0000000000
--- a/spark/src/main/spark-4.1/org/apache/spark/sql/comet/shims/ShimStreamSourceAwareSparkPlan.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.sql.comet.shims
-
-import org.apache.spark.sql.connector.read.streaming.SparkDataStream
-import org.apache.spark.sql.execution.StreamSourceAwareSparkPlan
-
-trait ShimStreamSourceAwareSparkPlan extends StreamSourceAwareSparkPlan {
- override def getStream: Option[SparkDataStream] = None
-}
diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometBroadcastExchangeExec.scala b/spark/src/main/spark-4.x/org/apache/comet/shims/ShimCometBroadcastExchangeExec.scala
similarity index 100%
rename from spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometBroadcastExchangeExec.scala
rename to spark/src/main/spark-4.x/org/apache/comet/shims/ShimCometBroadcastExchangeExec.scala
diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometShuffleExchangeExec.scala b/spark/src/main/spark-4.x/org/apache/comet/shims/ShimCometShuffleExchangeExec.scala
similarity index 100%
rename from spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometShuffleExchangeExec.scala
rename to spark/src/main/spark-4.x/org/apache/comet/shims/ShimCometShuffleExchangeExec.scala
diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala b/spark/src/main/spark-4.x/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala
similarity index 100%
rename from spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala
rename to spark/src/main/spark-4.x/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala
diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/ShimSQLConf.scala b/spark/src/main/spark-4.x/org/apache/comet/shims/ShimSQLConf.scala
similarity index 100%
rename from spark/src/main/spark-4.0/org/apache/comet/shims/ShimSQLConf.scala
rename to spark/src/main/spark-4.x/org/apache/comet/shims/ShimSQLConf.scala
diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/ShimSubqueryBroadcast.scala b/spark/src/main/spark-4.x/org/apache/comet/shims/ShimSubqueryBroadcast.scala
similarity index 100%
rename from spark/src/main/spark-4.0/org/apache/comet/shims/ShimSubqueryBroadcast.scala
rename to spark/src/main/spark-4.x/org/apache/comet/shims/ShimSubqueryBroadcast.scala
diff --git a/spark/src/main/spark-4.0/org/apache/spark/comet/shims/ShimCometDriverPlugin.scala b/spark/src/main/spark-4.x/org/apache/spark/comet/shims/ShimCometDriverPlugin.scala
similarity index 100%
rename from spark/src/main/spark-4.0/org/apache/spark/comet/shims/ShimCometDriverPlugin.scala
rename to spark/src/main/spark-4.x/org/apache/spark/comet/shims/ShimCometDriverPlugin.scala
diff --git a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala b/spark/src/main/spark-4.x/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
similarity index 100%
rename from spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
rename to spark/src/main/spark-4.x/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
diff --git a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometShuffleWriteProcessor.scala b/spark/src/main/spark-4.x/org/apache/spark/sql/comet/shims/ShimCometShuffleWriteProcessor.scala
similarity index 100%
rename from spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometShuffleWriteProcessor.scala
rename to spark/src/main/spark-4.x/org/apache/spark/sql/comet/shims/ShimCometShuffleWriteProcessor.scala
diff --git a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala b/spark/src/main/spark-4.x/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
similarity index 100%
rename from spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
rename to spark/src/main/spark-4.x/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
diff --git a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimStreamSourceAwareSparkPlan.scala b/spark/src/main/spark-4.x/org/apache/spark/sql/comet/shims/ShimStreamSourceAwareSparkPlan.scala
similarity index 100%
rename from spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimStreamSourceAwareSparkPlan.scala
rename to spark/src/main/spark-4.x/org/apache/spark/sql/comet/shims/ShimStreamSourceAwareSparkPlan.scala
diff --git a/spark/src/test/spark-4.1/org/apache/comet/exec/CometShuffle4_0Suite.scala b/spark/src/test/spark-4.1/org/apache/comet/exec/CometShuffle4_0Suite.scala
deleted file mode 100644
index 3d0ec5006d..0000000000
--- a/spark/src/test/spark-4.1/org/apache/comet/exec/CometShuffle4_0Suite.scala
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet.exec
-
-import java.util.Collections
-
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.connector.catalog.{Column, Identifier, InMemoryCatalog, InMemoryTableCatalog}
-import org.apache.spark.sql.connector.expressions.Expressions.identity
-import org.apache.spark.sql.connector.expressions.Transform
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{FloatType, LongType, StringType, TimestampType}
-
-class CometShuffle4_0Suite extends CometColumnarShuffleSuite {
- override protected val asyncShuffleEnable: Boolean = false
-
- protected val adaptiveExecutionEnabled: Boolean = true
-
- override def beforeAll(): Unit = {
- super.beforeAll()
- spark.conf.set("spark.sql.catalog.testcat", classOf[InMemoryCatalog].getName)
- }
-
- override def afterAll(): Unit = {
- spark.sessionState.conf.unsetConf("spark.sql.catalog.testcat")
- super.afterAll()
- }
-
- private val emptyProps: java.util.Map[String, String] = {
- Collections.emptyMap[String, String]
- }
- private val items: String = "items"
- private val itemsColumns: Array[Column] = Array(
- Column.create("id", LongType),
- Column.create("name", StringType),
- Column.create("price", FloatType),
- Column.create("arrive_time", TimestampType))
-
- private val purchases: String = "purchases"
- private val purchasesColumns: Array[Column] = Array(
- Column.create("item_id", LongType),
- Column.create("price", FloatType),
- Column.create("time", TimestampType))
-
- protected def catalog: InMemoryCatalog = {
- val catalog = spark.sessionState.catalogManager.catalog("testcat")
- catalog.asInstanceOf[InMemoryCatalog]
- }
-
- private def createTable(
- table: String,
- columns: Array[Column],
- partitions: Array[Transform],
- catalog: InMemoryTableCatalog = catalog): Unit = {
- catalog.createTable(Identifier.of(Array("ns"), table), columns, partitions, emptyProps)
- }
-
- private def selectWithMergeJoinHint(t1: String, t2: String): String = {
- s"SELECT /*+ MERGE($t1, $t2) */ "
- }
-
- private def createJoinTestDF(
- keys: Seq[(String, String)],
- extraColumns: Seq[String] = Nil,
- joinType: String = ""): DataFrame = {
- val extraColList = if (extraColumns.isEmpty) "" else extraColumns.mkString(", ", ", ", "")
- sql(s"""
- |${selectWithMergeJoinHint("i", "p")}
- |id, name, i.price as purchase_price, p.price as sale_price $extraColList
- |FROM testcat.ns.$items i $joinType JOIN testcat.ns.$purchases p
- |ON ${keys.map(k => s"i.${k._1} = p.${k._2}").mkString(" AND ")}
- |ORDER BY id, purchase_price, sale_price $extraColList
- |""".stripMargin)
- }
-
- test("Fallback to Spark for unsupported partitioning") {
- val items_partitions = Array(identity("id"))
- createTable(items, itemsColumns, items_partitions)
-
- sql(
- s"INSERT INTO testcat.ns.$items VALUES " +
- "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
- "(3, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
- "(4, 'cc', 15.5, cast('2020-02-01' as timestamp))")
-
- createTable(purchases, purchasesColumns, Array.empty)
- sql(
- s"INSERT INTO testcat.ns.$purchases VALUES " +
- "(1, 42.0, cast('2020-01-01' as timestamp)), " +
- "(3, 19.5, cast('2020-02-01' as timestamp)), " +
- "(5, 26.0, cast('2023-01-01' as timestamp)), " +
- "(6, 50.0, cast('2023-02-01' as timestamp))")
-
- Seq(true, false).foreach { shuffle =>
- withSQLConf(
- SQLConf.V2_BUCKETING_ENABLED.key -> "true",
- "spark.sql.sources.v2.bucketing.shuffle.enabled" -> shuffle.toString,
- SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true",
- SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> "true") {
- val df = createJoinTestDF(Seq("id" -> "item_id"))
- checkSparkAnswer(df)
- }
- }
- }
-}
diff --git a/spark/src/test/spark-4.1/org/apache/comet/iceberg/RESTCatalogHelper.scala b/spark/src/test/spark-4.1/org/apache/comet/iceberg/RESTCatalogHelper.scala
deleted file mode 100644
index bd53804b8d..0000000000
--- a/spark/src/test/spark-4.1/org/apache/comet/iceberg/RESTCatalogHelper.scala
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet.iceberg
-
-import java.io.File
-import java.nio.file.Files
-
- /** Helper trait for setting up a REST catalog with Jetty 11 (jakarta.servlet) for Spark 4.0 */
-trait RESTCatalogHelper {
-
- /** Helper to set up a REST catalog with an embedded Jetty server (Spark 4.0 / Jetty 11) */
- def withRESTCatalog(f: (String, org.eclipse.jetty.server.Server, File) => Unit): Unit =
- withRESTCatalog()(f)
-
- /**
- * Helper to set up REST catalog with optional credential vending.
- *
- * @param vendedCredentials
- * Storage credentials to inject into loadTable responses, simulating REST catalog credential
- * vending. When non-empty, these are added to every LoadTableResponse.config().
- * @param warehouseLocation
- * Override the warehouse location (e.g., for S3). Defaults to a local temp directory.
- */
- def withRESTCatalog(
- vendedCredentials: Map[String, String] = Map.empty,
- warehouseLocation: Option[String] = None)(
- f: (String, org.eclipse.jetty.server.Server, File) => Unit): Unit = {
- import org.apache.iceberg.inmemory.InMemoryCatalog
- import org.apache.iceberg.CatalogProperties
- import org.apache.iceberg.rest.{RESTCatalogAdapter, RESTCatalogServlet}
- import org.eclipse.jetty.server.Server
- import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}
- import org.eclipse.jetty.server.handler.gzip.GzipHandler
-
- val warehouseDir = Files.createTempDirectory("comet-rest-catalog-test").toFile
- val effectiveWarehouse = warehouseLocation.getOrElse(warehouseDir.getAbsolutePath)
-
- val backendCatalog = new InMemoryCatalog()
- backendCatalog.initialize(
- "in-memory",
- java.util.Map.of(CatalogProperties.WAREHOUSE_LOCATION, effectiveWarehouse))
-
- val adapter = new RESTCatalogAdapter(backendCatalog)
- if (vendedCredentials.nonEmpty) {
- import scala.jdk.CollectionConverters._
- adapter.setVendedCredentials(vendedCredentials.asJava)
- }
- val servlet = new RESTCatalogServlet(adapter)
-
- val servletContext = new ServletContextHandler(ServletContextHandler.NO_SESSIONS)
- servletContext.setContextPath("/")
- val servletHolder = new ServletHolder(servlet.asInstanceOf[jakarta.servlet.Servlet])
- servletHolder.setInitParameter("jakarta.ws.rs.Application", "ServiceListPublic")
- servletContext.addServlet(servletHolder, "/*")
- servletContext.setVirtualHosts(null)
- servletContext.insertHandler(new GzipHandler())
-
- val httpServer = new Server(0) // random port
- httpServer.setHandler(servletContext)
-
- try {
- httpServer.start()
- val restUri = httpServer.getURI.toString.stripSuffix("/")
- f(restUri, httpServer, warehouseDir)
- } finally {
- try {
- httpServer.stop()
- httpServer.join()
- } catch {
- case _: Exception => // ignore cleanup errors
- }
- try {
- backendCatalog.close()
- } catch {
- case _: Exception => // ignore cleanup errors
- }
- def deleteRecursively(file: File): Unit = {
- if (file.isDirectory) {
- file.listFiles().foreach(deleteRecursively)
- }
- file.delete()
- }
- deleteRecursively(warehouseDir)
- }
- }
-}
diff --git a/spark/src/test/spark-4.1/org/apache/comet/shims/ShimCometTPCHQuerySuite.scala b/spark/src/test/spark-4.1/org/apache/comet/shims/ShimCometTPCHQuerySuite.scala
deleted file mode 100644
index 80811d701f..0000000000
--- a/spark/src/test/spark-4.1/org/apache/comet/shims/ShimCometTPCHQuerySuite.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet.shims
-
-import org.apache.spark.sql.SQLQueryTestHelper
-
-trait ShimCometTPCHQuerySuite extends SQLQueryTestHelper {}
diff --git a/spark/src/test/spark-4.1/org/apache/iceberg/rest/RESTCatalogServlet.java b/spark/src/test/spark-4.1/org/apache/iceberg/rest/RESTCatalogServlet.java
deleted file mode 100644
index b54dacac48..0000000000
--- a/spark/src/test/spark-4.1/org/apache/iceberg/rest/RESTCatalogServlet.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg.rest;
-
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.Reader;
-import java.io.UncheckedIOException;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Optional;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import jakarta.servlet.http.HttpServlet;
-import jakarta.servlet.http.HttpServletRequest;
-import jakarta.servlet.http.HttpServletResponse;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.iceberg.exceptions.RESTException;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.io.CharStreams;
-import org.apache.iceberg.rest.RESTCatalogAdapter.HTTPMethod;
-import org.apache.iceberg.rest.RESTCatalogAdapter.Route;
-import org.apache.iceberg.rest.responses.ErrorResponse;
-import org.apache.iceberg.util.Pair;
-
-import static java.lang.String.format;
-
-/**
- * The RESTCatalogServlet provides a servlet implementation used in combination with a
- * RESTCatalogAdapter to proxy the REST spec to any Catalog implementation.
- * Modified version of Iceberg's org/apache/iceberg/rest/RESTCatalogServlet.java
- */
-public class RESTCatalogServlet extends HttpServlet {
- private static final Logger LOG = LoggerFactory.getLogger(RESTCatalogServlet.class);
-
- private final RESTCatalogAdapter restCatalogAdapter;
- private final Map<String, String> responseHeaders =
- ImmutableMap.of("Content-Type", "application/json");
-
- public RESTCatalogServlet(RESTCatalogAdapter restCatalogAdapter) {
- this.restCatalogAdapter = restCatalogAdapter;
- }
-
- @Override
- protected void doGet(HttpServletRequest request, HttpServletResponse response)
- throws IOException {
- execute(ServletRequestContext.from(request), response);
- }
-
- @Override
- protected void doHead(HttpServletRequest request, HttpServletResponse response)
- throws IOException {
- execute(ServletRequestContext.from(request), response);
- }
-
- @Override
- protected void doPost(HttpServletRequest request, HttpServletResponse response)
- throws IOException {
- execute(ServletRequestContext.from(request), response);
- }
-
- @Override
- protected void doDelete(HttpServletRequest request, HttpServletResponse response)
- throws IOException {
- execute(ServletRequestContext.from(request), response);
- }
-
- protected void execute(ServletRequestContext context, HttpServletResponse response)
- throws IOException {
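- // Optimistically mark success; the error consumer from handle() below resets the
- // status if the adapter reports a failure.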
- response.setStatus(HttpServletResponse.SC_OK);
- responseHeaders.forEach(response::setHeader);
-
- if (context.error().isPresent()) {
- response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
- RESTObjectMapper.mapper().writeValue(response.getWriter(), context.error().get());
- return;
- }
-
- try {
- Object responseBody =
- restCatalogAdapter.execute(
- context.method(),
- context.path(),
- context.queryParams(),
- context.body(),
- context.route().responseClass(),
- context.headers(),
- handle(response));
-
- if (responseBody != null) {
- RESTObjectMapper.mapper().writeValue(response.getWriter(), responseBody);
- }
- } catch (RESTException e) {
- LOG.error("Error processing REST request", e);
- response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
- } catch (Exception e) {
- LOG.error("Unexpected exception when processing REST request", e);
- response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
- }
- }
-
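- // Error callback handed to the adapter: it mirrors the ErrorResponse's code onto the
- // servlet response; Consumer cannot throw checked IOException, hence the unchecked wrap.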
- protected Consumer<ErrorResponse> handle(HttpServletResponse response) {
- return (errorResponse) -> {
- response.setStatus(errorResponse.code());
- try {
- RESTObjectMapper.mapper().writeValue(response.getWriter(), errorResponse);
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- };
- }
-
- public static class ServletRequestContext {
- private HTTPMethod method;
- private Route route;
- private String path;
- private Map<String, String> headers;
- private Map<String, String> queryParams;
- private Object body;
-
- private ErrorResponse errorResponse;
-
- private ServletRequestContext(ErrorResponse errorResponse) {
- this.errorResponse = errorResponse;
- }
-
- private ServletRequestContext(
- HTTPMethod method,
- Route route,
- String path,
- Map<String, String> headers,
- Map<String, String> queryParams,
- Object body) {
- this.method = method;
- this.route = route;
- this.path = path;
- this.headers = headers;
- this.queryParams = queryParams;
- this.body = body;
- }
-
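- // Parses the raw request into a routed context. An unmatched route yields a context
- // whose only content is a 400 ErrorResponse, which execute() short-circuits on.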
- static ServletRequestContext from(HttpServletRequest request) throws IOException {
- HTTPMethod method = HTTPMethod.valueOf(request.getMethod());
- String path = request.getRequestURI().substring(1);
- Pair<Route, Map<String, String>> routeContext = Route.from(method, path);
-
- if (routeContext == null) {
- return new ServletRequestContext(
- ErrorResponse.builder()
- .responseCode(400)
- .withType("BadRequestException")
- .withMessage(format("No route for request: %s %s", method, path))
- .build());
- }
-
- Route route = routeContext.first();
- Object requestBody = null;
- if (route.requestClass() != null) {
- requestBody =
- RESTObjectMapper.mapper().readValue(request.getReader(), route.requestClass());
- } else if (route == Route.TOKENS) {
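- // The OAuth tokens endpoint takes form-encoded input rather than JSON, so decode it by hand.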
- try (Reader reader = new InputStreamReader(request.getInputStream())) {
- requestBody = RESTUtil.decodeFormData(CharStreams.toString(reader));
- }
- }
-
- Map<String, String> queryParams =
- request.getParameterMap().entrySet().stream()
- .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue()[0]));
- Map<String, String> headers =
- Collections.list(request.getHeaderNames()).stream()
- .collect(Collectors.toMap(Function.identity(), request::getHeader));
-
- return new ServletRequestContext(method, route, path, headers, queryParams, requestBody);
- }
-
- public HTTPMethod method() {
- return method;
- }
-
- public Route route() {
- return route;
- }
-
- public String path() {
- return path;
- }
-
- public Map<String, String> headers() {
- return headers;
- }
-
- public Map<String, String> queryParams() {
- return queryParams;
- }
-
- public Object body() {
- return body;
- }
-
- public Optional<ErrorResponse> error() {
- return Optional.ofNullable(errorResponse);
- }
- }
-}
diff --git a/spark/src/test/spark-4.1/org/apache/spark/comet/shims/ShimTestUtils.scala b/spark/src/test/spark-4.1/org/apache/spark/comet/shims/ShimTestUtils.scala
deleted file mode 100644
index 923ae68f2e..0000000000
--- a/spark/src/test/spark-4.1/org/apache/spark/comet/shims/ShimTestUtils.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.comet.shims
-
-import java.io.File
-
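-// Lives under org.apache.spark because TestUtils is private[spark]; the shim re-exports
-// listDirectory so version-specific source trees control how shared suites reach it.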
-object ShimTestUtils {
- def listDirectory(path: File): Array[String] =
- org.apache.spark.TestUtils.listDirectory(path)
-}
diff --git a/spark/src/test/spark-4.1/org/apache/spark/sql/CometToPrettyStringSuite.scala b/spark/src/test/spark-4.1/org/apache/spark/sql/CometToPrettyStringSuite.scala
deleted file mode 100644
index e7f1757bf6..0000000000
--- a/spark/src/test/spark-4.1/org/apache/spark/sql/CometToPrettyStringSuite.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.sql
-
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.expressions.{Alias, ToPrettyString}
-import org.apache.spark.sql.catalyst.plans.logical.Project
-import org.apache.spark.sql.classic.Dataset
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.internal.SQLConf.BinaryOutputStyle
-import org.apache.spark.sql.types.DataTypes
-
-import org.apache.comet.CometFuzzTestBase
-import org.apache.comet.expressions.{CometCast, CometEvalMode}
-import org.apache.comet.serde.Compatible
-
-class CometToPrettyStringSuite extends CometFuzzTestBase {
-
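- // Exercises ToPrettyString on every column of the fuzz test's Parquet file under each
- // BinaryOutputStyle. Where CometCast reports the cast-to-string as Compatible, the
- // native operator must run; otherwise we only check that results match Spark.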
- test("ToPrettyString") {
- val styles = List(
- BinaryOutputStyle.UTF8,
- BinaryOutputStyle.BASIC,
- BinaryOutputStyle.BASE64,
- BinaryOutputStyle.HEX,
- BinaryOutputStyle.HEX_DISCRETE)
- styles.foreach(s =>
- withSQLConf(SQLConf.BINARY_OUTPUT_STYLE.key -> s.toString) {
- val df = spark.read.parquet(filename)
- df.createOrReplaceTempView("t1")
- val table = spark.sessionState.catalog.lookupRelation(TableIdentifier("t1"))
-
- for (field <- df.schema.fields) {
- val col = field.name
- val prettyExpr = Alias(ToPrettyString(UnresolvedAttribute(col)), s"pretty_$col")()
- val plan = Project(Seq(prettyExpr), table)
- val analyzed = spark.sessionState.analyzer.execute(plan)
- val result: DataFrame = Dataset.ofRows(spark, analyzed)
- val supportLevel = CometCast.isSupported(
- field.dataType,
- DataTypes.StringType,
- Some(spark.sessionState.conf.sessionLocalTimeZone),
- CometEvalMode.TRY)
- supportLevel match {
- case _: Compatible => checkSparkAnswerAndOperator(result)
- case _ => checkSparkAnswer(result)
- }
- }
- })
- }
-}
diff --git a/spark/src/test/spark-4.1/org/apache/spark/sql/ShimCometTestBase.scala b/spark/src/test/spark-4.1/org/apache/spark/sql/ShimCometTestBase.scala
deleted file mode 100644
index 5ad4543220..0000000000
--- a/spark/src/test/spark-4.1/org/apache/spark/sql/ShimCometTestBase.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.sql
-
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.catalyst.expressions.{Expression, MakeDecimal}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.classic.{Dataset, ExpressionColumnNode, SparkSession}
-
-trait ShimCometTestBase {
- type SparkSessionType = SparkSession
-
- def createSparkSessionWithExtensions(conf: SparkConf): SparkSessionType = {
- SparkSession
- .builder()
- .config(conf)
- .master("local[1]")
- .withExtensions(new org.apache.comet.CometSparkSessionExtensions)
- .getOrCreate()
- }
-
- def datasetOfRows(spark: SparkSession, plan: LogicalPlan): DataFrame = {
- Dataset.ofRows(spark, plan)
- }
-
- def getColumnFromExpression(expr: Expression): Column = {
- new Column(ExpressionColumnNode.apply(expr))
- }
-
- def extractLogicalPlan(df: DataFrame): LogicalPlan = {
- df.queryExecution.analyzed
- }
-
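- // nullOnOverflow = true gives non-ANSI semantics: an unscaled value that does not fit
- // (precision, scale) yields null instead of raising an error.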
- def createMakeDecimalColumn(child: Expression, precision: Int, scale: Int): Column = {
- new Column(ExpressionColumnNode.apply(MakeDecimal(child, precision, scale, true)))
- }
-}
diff --git a/spark/src/test/spark-4.1/org/apache/spark/sql/comet/shims/ShimCometTPCDSQuerySuite.scala b/spark/src/test/spark-4.1/org/apache/spark/sql/comet/shims/ShimCometTPCDSQuerySuite.scala
deleted file mode 100644
index f03608d3e3..0000000000
--- a/spark/src/test/spark-4.1/org/apache/spark/sql/comet/shims/ShimCometTPCDSQuerySuite.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.sql.comet.shims
-
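-// Empty on purpose: the shared TPCDS suite needs no version-specific members on Spark 4.1.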
-trait ShimCometTPCDSQuerySuite {}
diff --git a/spark/src/test/spark-4.0/org/apache/comet/exec/CometShuffle4_0Suite.scala b/spark/src/test/spark-4.x/org/apache/comet/exec/CometShuffle4_0Suite.scala
similarity index 100%
rename from spark/src/test/spark-4.0/org/apache/comet/exec/CometShuffle4_0Suite.scala
rename to spark/src/test/spark-4.x/org/apache/comet/exec/CometShuffle4_0Suite.scala
diff --git a/spark/src/test/spark-4.0/org/apache/comet/iceberg/RESTCatalogHelper.scala b/spark/src/test/spark-4.x/org/apache/comet/iceberg/RESTCatalogHelper.scala
similarity index 100%
rename from spark/src/test/spark-4.0/org/apache/comet/iceberg/RESTCatalogHelper.scala
rename to spark/src/test/spark-4.x/org/apache/comet/iceberg/RESTCatalogHelper.scala
diff --git a/spark/src/test/spark-4.0/org/apache/comet/shims/ShimCometTPCHQuerySuite.scala b/spark/src/test/spark-4.x/org/apache/comet/shims/ShimCometTPCHQuerySuite.scala
similarity index 100%
rename from spark/src/test/spark-4.0/org/apache/comet/shims/ShimCometTPCHQuerySuite.scala
rename to spark/src/test/spark-4.x/org/apache/comet/shims/ShimCometTPCHQuerySuite.scala
diff --git a/spark/src/test/spark-4.0/org/apache/iceberg/rest/RESTCatalogServlet.java b/spark/src/test/spark-4.x/org/apache/iceberg/rest/RESTCatalogServlet.java
similarity index 100%
rename from spark/src/test/spark-4.0/org/apache/iceberg/rest/RESTCatalogServlet.java
rename to spark/src/test/spark-4.x/org/apache/iceberg/rest/RESTCatalogServlet.java
diff --git a/spark/src/test/spark-4.0/org/apache/spark/comet/shims/ShimTestUtils.scala b/spark/src/test/spark-4.x/org/apache/spark/comet/shims/ShimTestUtils.scala
similarity index 100%
rename from spark/src/test/spark-4.0/org/apache/spark/comet/shims/ShimTestUtils.scala
rename to spark/src/test/spark-4.x/org/apache/spark/comet/shims/ShimTestUtils.scala
diff --git a/spark/src/test/spark-4.0/org/apache/spark/sql/CometToPrettyStringSuite.scala b/spark/src/test/spark-4.x/org/apache/spark/sql/CometToPrettyStringSuite.scala
similarity index 100%
rename from spark/src/test/spark-4.0/org/apache/spark/sql/CometToPrettyStringSuite.scala
rename to spark/src/test/spark-4.x/org/apache/spark/sql/CometToPrettyStringSuite.scala
diff --git a/spark/src/test/spark-4.0/org/apache/spark/sql/ShimCometTestBase.scala b/spark/src/test/spark-4.x/org/apache/spark/sql/ShimCometTestBase.scala
similarity index 100%
rename from spark/src/test/spark-4.0/org/apache/spark/sql/ShimCometTestBase.scala
rename to spark/src/test/spark-4.x/org/apache/spark/sql/ShimCometTestBase.scala
diff --git a/spark/src/test/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometTPCDSQuerySuite.scala b/spark/src/test/spark-4.x/org/apache/spark/sql/comet/shims/ShimCometTPCDSQuerySuite.scala
similarity index 100%
rename from spark/src/test/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometTPCDSQuerySuite.scala
rename to spark/src/test/spark-4.x/org/apache/spark/sql/comet/shims/ShimCometTPCDSQuerySuite.scala