From 1687dd37327104f4cc4eb8c37a15aff741165908 Mon Sep 17 00:00:00 2001 From: Terry Wang Date: Wed, 16 Jul 2025 21:13:54 +0800 Subject: [PATCH 1/6] [wip] try to decouple rss dependency from GlutenConfig --- ...celeborn.CelebornColumnarSerializerFactory | 1 + ...CelebornColumnarBatchSerlizerFactory.scala | 26 ++++++++++++++++++ .../velox/VeloxSparkPlanExecApi.scala | 10 +++---- ...elebornColumnarBatchSerializerFactory.java | 23 ++++++++++++++++ .../celeborn/CelebornShuffleManager.java | 27 ++++++++++++++++++- .../shuffle/SupportsColumnarShuffle.scala | 4 +++ 6 files changed, 85 insertions(+), 6 deletions(-) create mode 100644 backends-velox/src-celeborn/main/resources/META-INF/services/org.apache.spark.shuffle.gluten.celeborn.CelebornColumnarSerializerFactory create mode 100644 backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerlizerFactory.scala create mode 100644 gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornColumnarBatchSerializerFactory.java diff --git a/backends-velox/src-celeborn/main/resources/META-INF/services/org.apache.spark.shuffle.gluten.celeborn.CelebornColumnarSerializerFactory b/backends-velox/src-celeborn/main/resources/META-INF/services/org.apache.spark.shuffle.gluten.celeborn.CelebornColumnarSerializerFactory new file mode 100644 index 000000000000..eb150c494252 --- /dev/null +++ b/backends-velox/src-celeborn/main/resources/META-INF/services/org.apache.spark.shuffle.gluten.celeborn.CelebornColumnarSerializerFactory @@ -0,0 +1 @@ +org.apache.spark.shuffle.VeloxCelebornColumnarBatchSerlizerFactory diff --git a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerlizerFactory.scala b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerlizerFactory.scala new file mode 100644 index 000000000000..51450fbaca74 --- /dev/null +++ b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerlizerFactory.scala @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.shuffle + +import org.apache.gluten.backendsapi.velox.VeloxBackend +import org.apache.spark.shuffle.gluten.celeborn.CelebornColumnarBatchSerializerFactory + +class VeloxCelebornColumnarBatchSerlizerFactory extends CelebornColumnarBatchSerializerFactory { + override def backendName(): String = VeloxBackend.BACKEND_NAME + + override def columnarBatchSerializerClass(): String = "org.apache.spark.shuffle.CelebornColumnarBatchSerializer" +} diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala index 12e2c806074a..f8a0993748dd 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala @@ -24,9 +24,8 @@ import org.apache.gluten.expression._ import org.apache.gluten.expression.aggregate.{HLLAdapter, VeloxBloomFilterAggregate, VeloxCollectList, VeloxCollectSet} import org.apache.gluten.extension.columnar.FallbackTags import org.apache.gluten.sql.shims.SparkShimLoader -import org.apache.gluten.vectorized.{ColumnarBatchSerializer, ColumnarBatchSerializeResult} - -import org.apache.spark.{ShuffleDependency, SparkException} +import org.apache.gluten.vectorized.{ColumnarBatchSerializeResult, ColumnarBatchSerializer} +import org.apache.spark.{ShuffleDependency, SparkEnv, SparkException} import org.apache.spark.api.python.{ColumnarArrowEvalPythonExec, PullOutArrowEvalPythonPreProjectHelper} import org.apache.spark.memory.SparkMemoryUtil import org.apache.spark.rdd.RDD @@ -54,11 +53,10 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.task.TaskResources - import org.apache.commons.lang3.ClassUtils +import org.apache.gluten.shuffle.NeedCustomColumnarBatchSerializer import javax.ws.rs.core.UriBuilder - import java.util.Locale class VeloxSparkPlanExecApi extends SparkPlanExecApi { @@ -553,6 +551,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi { partitioning: Partitioning, output: Seq[Attribute]): ShuffleWriterType = { val conf = GlutenConfig.get + //todo: remove isUseCelebornShuffleManager here if (conf.isUseCelebornShuffleManager) { if (conf.celebornShuffleWriterType == ReservedKeys.GLUTEN_SORT_SHUFFLE_WRITER) { if (conf.useCelebornRssSort) { @@ -629,6 +628,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi { val deserializeTime = metrics("deserializeTime") val readBatchNumRows = metrics("avgReadBatchNumRows") val decompressTime = metrics("decompressTime") + //todo: remove isUseCelebornShuffleManager here if (GlutenConfig.get.isUseCelebornShuffleManager) { val clazz = ClassUtils.getClass("org.apache.spark.shuffle.CelebornColumnarBatchSerializer") val constructor = diff --git a/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornColumnarBatchSerializerFactory.java b/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornColumnarBatchSerializerFactory.java new file mode 100644 index 000000000000..b15f0d6dbfc2 --- /dev/null +++ b/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornColumnarBatchSerializerFactory.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.shuffle.gluten.celeborn; + +public interface CelebornColumnarBatchSerializerFactory { + String backendName(); + + String columnarBatchSerializerClass(); +} diff --git a/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java b/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java index c963e4be208b..b053e015baaa 100644 --- a/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java +++ b/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java @@ -19,6 +19,7 @@ import org.apache.gluten.backendsapi.BackendsApiManager; import org.apache.gluten.config.GlutenConfig; import org.apache.gluten.exception.GlutenException; +import org.apache.gluten.shuffle.NeedCustomColumnarBatchSerializer; import org.apache.gluten.shuffle.SupportsColumnarShuffle; import com.google.common.base.Preconditions; @@ -42,7 +43,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; -public class CelebornShuffleManager implements ShuffleManager, SupportsColumnarShuffle { +public class CelebornShuffleManager implements ShuffleManager, SupportsColumnarShuffle, NeedCustomColumnarBatchSerializer { private static final Logger logger = LoggerFactory.getLogger(CelebornShuffleManager.class); @@ -63,6 +64,8 @@ public class CelebornShuffleManager implements ShuffleManager, SupportsColumnarS private static final CelebornShuffleWriterFactory writerFactory; + private static final CelebornColumnarBatchSerializerFactory columnarBatchSerializerFactory; + static { final ServiceLoader loader = ServiceLoader.load(CelebornShuffleWriterFactory.class); @@ -81,6 +84,23 @@ public class CelebornShuffleManager implements ShuffleManager, SupportsColumnarS "No Celeborn shuffle writer factory found for backend " + backendName); } writerFactory = factoryMap.get(backendName); + + final ServiceLoader celebornColumnarBatchSerializerFactoriesLoader = + ServiceLoader.load(CelebornColumnarBatchSerializerFactory.class); + final List columnarBatchSerializerFactoryList = + Arrays.stream(Iterators.toArray(celebornColumnarBatchSerializerFactoriesLoader.iterator(), CelebornColumnarBatchSerializerFactory.class)) + .collect(Collectors.toList()); + Preconditions.checkState( + !columnarBatchSerializerFactoryList.isEmpty(), "No factory found for Celeborn columnar batch serializer"); + final Map columanrBatchSerilizerFactoryMap = + columnarBatchSerializerFactoryList.stream() + .collect(Collectors.toMap(CelebornColumnarBatchSerializerFactory::backendName, f -> f)); + + if (!columanrBatchSerilizerFactoryMap.containsKey(backendName)) { + throw new UnsupportedOperationException( + "No Celeborn columnar batch serializer writer factory found for backend " + backendName); + } + columnarBatchSerializerFactory = columanrBatchSerilizerFactoryMap.get(backendName); } private final SparkConf conf; @@ -408,4 +428,9 @@ public ShuffleReader getReader( .getReader( handle, startMapIndex, endMapIndex, startPartition, endPartition, context, metrics); } + + @Override + public String columnarBatchSerializerClass() { + return columnarBatchSerializerFactory.columnarBatchSerializerClass(); + } } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/shuffle/SupportsColumnarShuffle.scala b/gluten-substrait/src/main/scala/org/apache/gluten/shuffle/SupportsColumnarShuffle.scala index a5b2719a6d9c..ac13b7e87413 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/shuffle/SupportsColumnarShuffle.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/shuffle/SupportsColumnarShuffle.scala @@ -17,3 +17,7 @@ package org.apache.gluten.shuffle trait SupportsColumnarShuffle + +trait NeedCustomColumnarBatchSerializer { + def columnarBatchSerializerClass(): String +} From 8d704bd34376763f3ccdd0174775f78fa70e51b3 Mon Sep 17 00:00:00 2001 From: Terry Wang Date: Wed, 13 Aug 2025 15:31:17 +0800 Subject: [PATCH 2/6] fix style problem --- .../gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala index f8a0993748dd..fc1f06f821ae 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala @@ -551,7 +551,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi { partitioning: Partitioning, output: Seq[Attribute]): ShuffleWriterType = { val conf = GlutenConfig.get - //todo: remove isUseCelebornShuffleManager here + // todo: remove isUseCelebornShuffleManager here if (conf.isUseCelebornShuffleManager) { if (conf.celebornShuffleWriterType == ReservedKeys.GLUTEN_SORT_SHUFFLE_WRITER) { if (conf.useCelebornRssSort) { @@ -628,7 +628,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi { val deserializeTime = metrics("deserializeTime") val readBatchNumRows = metrics("avgReadBatchNumRows") val decompressTime = metrics("decompressTime") - //todo: remove isUseCelebornShuffleManager here + // todo: remove isUseCelebornShuffleManager here if (GlutenConfig.get.isUseCelebornShuffleManager) { val clazz = ClassUtils.getClass("org.apache.spark.shuffle.CelebornColumnarBatchSerializer") val constructor = From 0f116e38c259590f1258436c35c9535fa62add12 Mon Sep 17 00:00:00 2001 From: Terry Wang Date: Wed, 13 Aug 2025 16:04:46 +0800 Subject: [PATCH 3/6] fix style --- ...CelebornColumnarBatchSerlizerFactory.scala | 4 +++- .../velox/VeloxSparkPlanExecApi.scala | 7 ++++-- .../celeborn/CelebornShuffleManager.java | 22 ++++++++++++------- 3 files changed, 22 insertions(+), 11 deletions(-) diff --git a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerlizerFactory.scala b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerlizerFactory.scala index 51450fbaca74..a8f27d761651 100644 --- a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerlizerFactory.scala +++ b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerlizerFactory.scala @@ -17,10 +17,12 @@ package org.apache.spark.shuffle import org.apache.gluten.backendsapi.velox.VeloxBackend + import org.apache.spark.shuffle.gluten.celeborn.CelebornColumnarBatchSerializerFactory class VeloxCelebornColumnarBatchSerlizerFactory extends CelebornColumnarBatchSerializerFactory { override def backendName(): String = VeloxBackend.BACKEND_NAME - override def columnarBatchSerializerClass(): String = "org.apache.spark.shuffle.CelebornColumnarBatchSerializer" + override def columnarBatchSerializerClass(): String = + "org.apache.spark.shuffle.CelebornColumnarBatchSerializer" } diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala index fc1f06f821ae..2c5d22165034 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala @@ -23,8 +23,10 @@ import org.apache.gluten.execution._ import org.apache.gluten.expression._ import org.apache.gluten.expression.aggregate.{HLLAdapter, VeloxBloomFilterAggregate, VeloxCollectList, VeloxCollectSet} import org.apache.gluten.extension.columnar.FallbackTags +import org.apache.gluten.shuffle.NeedCustomColumnarBatchSerializer import org.apache.gluten.sql.shims.SparkShimLoader -import org.apache.gluten.vectorized.{ColumnarBatchSerializeResult, ColumnarBatchSerializer} +import org.apache.gluten.vectorized.{ColumnarBatchSerializer, ColumnarBatchSerializeResult} + import org.apache.spark.{ShuffleDependency, SparkEnv, SparkException} import org.apache.spark.api.python.{ColumnarArrowEvalPythonExec, PullOutArrowEvalPythonPreProjectHelper} import org.apache.spark.memory.SparkMemoryUtil @@ -53,10 +55,11 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.task.TaskResources + import org.apache.commons.lang3.ClassUtils -import org.apache.gluten.shuffle.NeedCustomColumnarBatchSerializer import javax.ws.rs.core.UriBuilder + import java.util.Locale class VeloxSparkPlanExecApi extends SparkPlanExecApi { diff --git a/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java b/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java index b053e015baaa..4401657f1519 100644 --- a/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java +++ b/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java @@ -43,7 +43,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; -public class CelebornShuffleManager implements ShuffleManager, SupportsColumnarShuffle, NeedCustomColumnarBatchSerializer { +public class CelebornShuffleManager + implements ShuffleManager, SupportsColumnarShuffle, NeedCustomColumnarBatchSerializer { private static final Logger logger = LoggerFactory.getLogger(CelebornShuffleManager.class); @@ -85,20 +86,25 @@ public class CelebornShuffleManager implements ShuffleManager, SupportsColumnarS } writerFactory = factoryMap.get(backendName); - final ServiceLoader celebornColumnarBatchSerializerFactoriesLoader = + final ServiceLoader + celebornColumnarBatchSerializerFactoriesLoader = ServiceLoader.load(CelebornColumnarBatchSerializerFactory.class); final List columnarBatchSerializerFactoryList = - Arrays.stream(Iterators.toArray(celebornColumnarBatchSerializerFactoriesLoader.iterator(), CelebornColumnarBatchSerializerFactory.class)) - .collect(Collectors.toList()); + Arrays.stream( + Iterators.toArray( + celebornColumnarBatchSerializerFactoriesLoader.iterator(), + CelebornColumnarBatchSerializerFactory.class)) + .collect(Collectors.toList()); Preconditions.checkState( - !columnarBatchSerializerFactoryList.isEmpty(), "No factory found for Celeborn columnar batch serializer"); + !columnarBatchSerializerFactoryList.isEmpty(), + "No factory found for Celeborn columnar batch serializer"); final Map columanrBatchSerilizerFactoryMap = - columnarBatchSerializerFactoryList.stream() - .collect(Collectors.toMap(CelebornColumnarBatchSerializerFactory::backendName, f -> f)); + columnarBatchSerializerFactoryList.stream() + .collect(Collectors.toMap(CelebornColumnarBatchSerializerFactory::backendName, f -> f)); if (!columanrBatchSerilizerFactoryMap.containsKey(backendName)) { throw new UnsupportedOperationException( - "No Celeborn columnar batch serializer writer factory found for backend " + backendName); + "No Celeborn columnar batch serializer writer factory found for backend " + backendName); } columnarBatchSerializerFactory = columanrBatchSerilizerFactoryMap.get(backendName); } From 2c49e5ebdfe0c57474e3d856e147b8b9c67d5843 Mon Sep 17 00:00:00 2001 From: Terry Wang Date: Wed, 13 Aug 2025 17:04:16 +0800 Subject: [PATCH 4/6] fix not work --- .../velox/VeloxSparkPlanExecApi.scala | 41 ++++++++++--------- 1 file changed, 21 insertions(+), 20 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala index 2c5d22165034..815ed97dd288 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala @@ -631,26 +631,27 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi { val deserializeTime = metrics("deserializeTime") val readBatchNumRows = metrics("avgReadBatchNumRows") val decompressTime = metrics("decompressTime") - // todo: remove isUseCelebornShuffleManager here - if (GlutenConfig.get.isUseCelebornShuffleManager) { - val clazz = ClassUtils.getClass("org.apache.spark.shuffle.CelebornColumnarBatchSerializer") - val constructor = - clazz.getConstructor( - classOf[StructType], - classOf[SQLMetric], - classOf[SQLMetric], - classOf[ShuffleWriterType]) - constructor - .newInstance(schema, readBatchNumRows, numOutputRows, shuffleWriterType) - .asInstanceOf[Serializer] - } else { - new ColumnarBatchSerializer( - schema, - readBatchNumRows, - numOutputRows, - deserializeTime, - decompressTime, - shuffleWriterType) + SparkEnv.get.shuffleManager match { + case serializer: NeedCustomColumnarBatchSerializer => + val className = serializer.columnarBatchSerializerClass() + val clazz = ClassUtils.getClass(className) + val constructor = + clazz.getConstructor( + classOf[StructType], + classOf[SQLMetric], + classOf[SQLMetric], + classOf[ShuffleWriterType]) + constructor + .newInstance(schema, readBatchNumRows, numOutputRows, shuffleWriterType) + .asInstanceOf[Serializer] + case _ => + new ColumnarBatchSerializer( + schema, + readBatchNumRows, + numOutputRows, + deserializeTime, + decompressTime, + shuffleWriterType) } } From bb8d9d57b60aac51efbb6d1011fedc4958d2a2b2 Mon Sep 17 00:00:00 2001 From: Terry Wang Date: Wed, 13 Aug 2025 17:17:11 +0800 Subject: [PATCH 5/6] skip check for ch backend --- .../gluten/celeborn/CelebornShuffleManager.java | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java b/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java index 4401657f1519..a04c19d6e2f8 100644 --- a/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java +++ b/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java @@ -95,17 +95,20 @@ public class CelebornShuffleManager celebornColumnarBatchSerializerFactoriesLoader.iterator(), CelebornColumnarBatchSerializerFactory.class)) .collect(Collectors.toList()); - Preconditions.checkState( - !columnarBatchSerializerFactoryList.isEmpty(), - "No factory found for Celeborn columnar batch serializer"); + // for now, we ignore check since CH backend has not support this feature yet. + // Preconditions.checkState( + // !columnarBatchSerializerFactoryList.isEmpty(), + // "No factory found for Celeborn columnar batch serializer"); final Map columanrBatchSerilizerFactoryMap = columnarBatchSerializerFactoryList.stream() .collect(Collectors.toMap(CelebornColumnarBatchSerializerFactory::backendName, f -> f)); - if (!columanrBatchSerilizerFactoryMap.containsKey(backendName)) { - throw new UnsupportedOperationException( - "No Celeborn columnar batch serializer writer factory found for backend " + backendName); - } + // for now, we ignore check since CH backend has not support this feature yet. + // if (!columanrBatchSerilizerFactoryMap.containsKey(backendName)) { + // throw new UnsupportedOperationException( + // "No Celeborn columnar batch serializer writer factory found for backend " + + // backendName); + // } columnarBatchSerializerFactory = columanrBatchSerilizerFactoryMap.get(backendName); } From aa5738076dc6ad288f7f6697a1dc4c2618241385 Mon Sep 17 00:00:00 2001 From: Terry Wang Date: Wed, 13 Aug 2025 19:59:04 +0800 Subject: [PATCH 6/6] fix spi error --- ...huffle.gluten.celeborn.CelebornColumnarBatchSerializerFactory} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename backends-velox/src-celeborn/main/resources/META-INF/services/{org.apache.spark.shuffle.gluten.celeborn.CelebornColumnarSerializerFactory => org.apache.spark.shuffle.gluten.celeborn.CelebornColumnarBatchSerializerFactory} (100%) diff --git a/backends-velox/src-celeborn/main/resources/META-INF/services/org.apache.spark.shuffle.gluten.celeborn.CelebornColumnarSerializerFactory b/backends-velox/src-celeborn/main/resources/META-INF/services/org.apache.spark.shuffle.gluten.celeborn.CelebornColumnarBatchSerializerFactory similarity index 100% rename from backends-velox/src-celeborn/main/resources/META-INF/services/org.apache.spark.shuffle.gluten.celeborn.CelebornColumnarSerializerFactory rename to backends-velox/src-celeborn/main/resources/META-INF/services/org.apache.spark.shuffle.gluten.celeborn.CelebornColumnarBatchSerializerFactory