3 changes: 3 additions & 0 deletions .gitignore
@@ -100,3 +100,6 @@ dist/
metastore_db/

.ipynb_checkpoints

# For Spark warehouse
gluten-ut/*/spark-warehouse/
4 changes: 4 additions & 0 deletions backends-clickhouse/pom.xml
@@ -273,6 +273,10 @@
<artifactId>protobuf-java</artifactId>
<groupId>com.google.protobuf</groupId>
</exclusion>
<exclusion>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
@@ -504,6 +504,8 @@ class ClickHouseTestSettings extends BackendTestSettings {
"session window groupBy with multiple keys statement - keys overlapped with sessions")
.excludeCH("SPARK-36465: filter out events with negative/zero gap duration")
.excludeCH("SPARK-36724: Support timestamp_ntz as a type of time column for SessionWindow")
.excludeCH(
"SPARK-49836 using window fn with window as parameter should preserve parent operator")
enableSuite[GlutenDataFrameSetOperationsSuite]
.exclude("SPARK-37371: UnionExec should support columnar if all children support columnar")
// Result depends on the implementation for nondeterministic expression rand.
@@ -1659,6 +1661,10 @@ class ClickHouseTestSettings extends BackendTestSettings {
.excludeCH("full outer join with unique keys using SortMergeJoin (whole-stage-codegen off)")
.excludeCH("full outer join with unique keys using SortMergeJoin (whole-stage-codegen on)")
.excludeCH("SPARK-32717: AQEOptimizer should respect excludedRules configuration")
.excludeCH("SPARK-46037: ShuffledHashJoin build left with left outer join, "
+ "codegen off (whole-stage-codegen off)")
.excludeCH("SPARK-46037: ShuffledHashJoin build left with left outer join, "
+ "codegen off (whole-stage-codegen on)")
enableSuite[GlutenOuterJoinSuiteForceShjOn]
.excludeCH("basic left outer join using ShuffledHashJoin (whole-stage-codegen off)")
.excludeCH("basic left outer join using ShuffledHashJoin (whole-stage-codegen on)")
@@ -1684,6 +1690,10 @@ class ClickHouseTestSettings extends BackendTestSettings {
.excludeCH("full outer join with unique keys using ShuffledHashJoin (whole-stage-codegen on)")
.excludeCH("full outer join with unique keys using SortMergeJoin (whole-stage-codegen off)")
.excludeCH("full outer join with unique keys using SortMergeJoin (whole-stage-codegen on)")
.excludeCH("SPARK-46037: ShuffledHashJoin build left with left outer join, "
+ "codegen off (whole-stage-codegen off)")
.excludeCH("SPARK-46037: ShuffledHashJoin build left with left outer join, "
+ "codegen off (whole-stage-codegen on)")
enableSuite[GlutenParametersSuite]
enableSuite[GlutenParquetCodecSuite]
// codec not supported in native
6 changes: 4 additions & 2 deletions pom.xml
@@ -278,6 +278,7 @@
</activation>
<properties>
<java.version>1.8</java.version>
<iceberg.version>1.5.0</iceberg.version>
Contributor


Does it work well on the CH backend? For the Velox backend, the Iceberg version must be 1.8.0.

Member


For Velox backend tests, we have already upgraded to JDK 17. I think the intention here is to enable JDK 8 + Spark 3.5.5.

Contributor


If we try to build with JDK 8 and Spark 3.5.5, Iceberg should not be included, so the changes here don't seem necessary to me.
Besides, we need to define iceberg.version in the Spark profiles instead of the JDK profiles, so that we can use different Iceberg versions for different Spark versions.
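As a sketch of that suggestion (illustrative only: the profile ids and the Spark/Iceberg pairings below are hypothetical, not taken from this PR), iceberg.version would move into the per-Spark profiles so each Spark line pins its own compatible Iceberg release:

```xml
<!-- Hypothetical sketch: iceberg.version pinned per Spark profile -->
<!-- rather than per JDK profile; version pairings are illustrative. -->
<profile>
  <id>spark-3.4</id>
  <properties>
    <spark.version>3.4.4</spark.version>
    <!-- whichever Iceberg release this Spark line is validated against -->
    <iceberg.version>1.5.0</iceberg.version>
  </properties>
</profile>
<profile>
  <id>spark-3.5</id>
  <properties>
    <spark.version>3.5.5</spark.version>
    <iceberg.version>1.8.0</iceberg.version>
  </properties>
</profile>
```

With this layout, activating a Spark profile selects the Iceberg version, and the JDK profiles only control java.version and JDK-specific properties.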

Contributor Author


If the Iceberg and Spark versions are bound together, then after upgrading Spark to 3.5.5 it will no longer be possible to use JDK 8, even though Spark 3.5.5 itself still supports JDK 8.
Using different JDK and Spark versions is what lets us support different Iceberg versions.

Contributor

@jackylee-ch jackylee-ch Mar 18, 2025


I don't follow. I have run Iceberg 1.5.0 with vanilla Spark 3.5.5 before, and it doesn't work. So if we want to use vanilla Spark 3.5.5 with JDK 8, Iceberg cannot be involved.

Member


> If we try to build with JDK 8 and Spark 3.5.5, Iceberg should not be included. The changes here don't seem necessary to me. Besides, we need to define iceberg.version in the Spark profiles instead of the JDK profiles so that we can use different Iceberg versions for different Spark versions.

+1
It seems we should not bind the JDK version to the Iceberg version, as this will also impact Spark-344.

Contributor


@jackylee-ch I made a quick patch to test Iceberg 1.5 + Spark 3.5.5 + JDK 8: #9001

It looks like all the unit tests passed: https://github.com/apache/incubator-gluten/actions/runs/13859404046/job/38785108628

It seems you were using #8890 to test it; sorry about that, as I have removed -Piceberg from the spark-3.5.5 + JDK 8 GA workflow.

BTW, the current PR has already hit the problem I described before: vanilla Spark core dumps in VeloxTPCHIcebergSuite.
https://github.com/apache/incubator-gluten/actions/runs/13916861429/job/38941504564?pr=9031

Member


@jackylee-ch Thanks for pointing that out; I hadn't realized this. So it looks like the Velox backend does not support the combination JDK 8 + Spark 3.5.5 + Iceberg 1.5.

Not sure if this also applies to the CH backend. In case the CH backend can work with this combination, could we add a dedicated profile so it won't impact the Velox backend?

Cc: @baibaichen

Contributor

@jackylee-ch jackylee-ch Mar 18, 2025


Unfortunately, this core dump occurs in a vanilla Spark 3.5.5 environment with Iceberg 1.5.0, and it is unrelated to the Velox backend. Similar problems are also encountered with the CH backend.
@jlfsdtc you can double-check this.

Contributor Author


> If we try to build with JDK 8 and Spark 3.5.5, Iceberg should not be included. The changes here don't seem necessary to me. Besides, we need to define iceberg.version in the Spark profiles instead of the JDK profiles so that we can use different Iceberg versions for different Spark versions.
>
> +1 It seems we should not bind the JDK version to the Iceberg version, as this will also impact Spark-344.

Ah, I missed that. Spark-344 is bound to a different Iceberg version.

</properties>
</profile>
<profile>
@@ -288,6 +289,7 @@
<properties>
<java.version>11</java.version>
<caffeine.version>3.1.8</caffeine.version>
<iceberg.version>1.8.0</iceberg.version>
</properties>
</profile>
<profile>
@@ -298,6 +300,7 @@
<properties>
<java.version>17</java.version>
<caffeine.version>3.1.8</caffeine.version>
<iceberg.version>1.8.0</iceberg.version>
</properties>
</profile>
<profile>
@@ -347,8 +350,7 @@
<properties>
<sparkbundle.version>3.5</sparkbundle.version>
<sparkshim.artifactId>spark-sql-columnar-shims-spark35</sparkshim.artifactId>
<spark.version>3.5.2</spark.version>
<iceberg.version>1.5.0</iceberg.version>
<spark.version>3.5.5</spark.version>
<delta.package.name>delta-spark</delta.package.name>
<delta.version>3.2.0</delta.version>
<delta.binary.version>32</delta.binary.version>
@@ -149,7 +149,8 @@ class Spark35Shims extends SparkShims {
override def filesGroupedToBuckets(
selectedPartitions: Array[PartitionDirectory]): Map[Int, Array[PartitionedFile]] = {
selectedPartitions
.flatMap(p => p.files.map(f => PartitionedFileUtil.getPartitionedFile(f, p.values)))
.flatMap(
p => p.files.map(f => PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values)))
.groupBy {
f =>
BucketingUtils
@@ -418,6 +419,7 @@ class Spark35Shims extends SparkShims {
PartitionedFileUtil.splitFiles(
sparkSession,
FileStatusWithMetadata(file, metadata),
filePath,
isSplitable,
maxSplitBytes,
partitionValues)
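The extra argument threaded through these shims mirrors the upstream API change this diff is adapting to: between Spark 3.5.2 and 3.5.5, `PartitionedFileUtil.getPartitionedFile` (and `splitFiles`) take the file path as an explicit parameter instead of deriving it from the file status. A minimal sketch of the call-site difference, using simplified stand-in types (`FileStatusLike` and `PartitionedFileLike` are hypothetical, not the real Spark classes):

```scala
// Simplified stand-ins for Spark's internal types; the real API lives in
// org.apache.spark.sql.execution.PartitionedFileUtil.
case class FileStatusLike(path: String, length: Long)
case class PartitionedFileLike(path: String, start: Long, length: Long)

// Spark 3.5.2-style helper: the path is derived inside the call.
def getPartitionedFileV352(f: FileStatusLike, partitionValues: Seq[Any]): PartitionedFileLike =
  PartitionedFileLike(f.path, 0L, f.length)

// Spark 3.5.5-style helper: the caller passes the path explicitly,
// which is why the shim now threads f.getPath through.
def getPartitionedFileV355(
    f: FileStatusLike,
    filePath: String,
    partitionValues: Seq[Any]): PartitionedFileLike =
  PartitionedFileLike(filePath, 0L, f.length)

// Both shapes produce the same PartitionedFile; only the call site changes.
val f = FileStatusLike("/data/part-00000.parquet", 128L)
assert(getPartitionedFileV352(f, Nil) == getPartitionedFileV355(f, f.path, Nil))
```

Because only the signature changed, the shim can support both Spark versions by adjusting the call site, with no behavioral difference in the produced splits.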
@@ -20,7 +20,7 @@ import org.apache.gluten.sql.shims.{SparkShimDescriptor, SparkShims}
import org.apache.gluten.sql.shims.spark35.SparkShimProvider.DESCRIPTOR

object SparkShimProvider {
val DESCRIPTOR = SparkShimDescriptor(3, 5, 2)
val DESCRIPTOR = SparkShimDescriptor(3, 5, 5)
}

class SparkShimProvider extends org.apache.gluten.sql.shims.SparkShimProvider {
@@ -181,7 +181,8 @@ abstract class AbstractFileSourceScanExec(
logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
val filesGroupedToBuckets =
selectedPartitions
.flatMap(p => p.files.map(f => PartitionedFileUtil.getPartitionedFile(f, p.values)))
.flatMap(
p => p.files.map(f => PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values)))
.groupBy {
f =>
BucketingUtils
@@ -264,14 +265,14 @@
partition =>
partition.files.flatMap {
file =>
if (shouldProcess(file.getPath)) {
val isSplitable = relation.fileFormat.isSplitable(
relation.sparkSession,
relation.options,
file.getPath)
val filePath = file.getPath
if (shouldProcess(filePath)) {
val isSplitable =
relation.fileFormat.isSplitable(relation.sparkSession, relation.options, filePath)
PartitionedFileUtil.splitFiles(
sparkSession = relation.sparkSession,
file = file,
filePath,
isSplitable = isSplitable,
maxSplitBytes = maxSplitBytes,
partitionValues = partition.values