diff --git a/backends-velox/src-celeborn/main/resources/META-INF/services/org.apache.spark.shuffle.gluten.celeborn.CelebornColumnarBatchSerializerFactory b/backends-velox/src-celeborn/main/resources/META-INF/services/org.apache.spark.shuffle.gluten.celeborn.CelebornColumnarBatchSerializerFactory new file mode 100644 index 000000000000..eb150c494252 --- /dev/null +++ b/backends-velox/src-celeborn/main/resources/META-INF/services/org.apache.spark.shuffle.gluten.celeborn.CelebornColumnarBatchSerializerFactory @@ -0,0 +1 @@ +org.apache.spark.shuffle.VeloxCelebornColumnarBatchSerlizerFactory diff --git a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerlizerFactory.scala b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerlizerFactory.scala new file mode 100644 index 000000000000..a8f27d761651 --- /dev/null +++ b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerlizerFactory.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.shuffle + +import org.apache.gluten.backendsapi.velox.VeloxBackend + +import org.apache.spark.shuffle.gluten.celeborn.CelebornColumnarBatchSerializerFactory + +class VeloxCelebornColumnarBatchSerlizerFactory extends CelebornColumnarBatchSerializerFactory { + override def backendName(): String = VeloxBackend.BACKEND_NAME + + override def columnarBatchSerializerClass(): String = + "org.apache.spark.shuffle.CelebornColumnarBatchSerializer" +} diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala index 12e2c806074a..815ed97dd288 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala @@ -23,10 +23,11 @@ import org.apache.gluten.execution._ import org.apache.gluten.expression._ import org.apache.gluten.expression.aggregate.{HLLAdapter, VeloxBloomFilterAggregate, VeloxCollectList, VeloxCollectSet} import org.apache.gluten.extension.columnar.FallbackTags +import org.apache.gluten.shuffle.NeedCustomColumnarBatchSerializer import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.gluten.vectorized.{ColumnarBatchSerializer, ColumnarBatchSerializeResult} -import org.apache.spark.{ShuffleDependency, SparkException} +import org.apache.spark.{ShuffleDependency, SparkEnv, SparkException} import org.apache.spark.api.python.{ColumnarArrowEvalPythonExec, PullOutArrowEvalPythonPreProjectHelper} import org.apache.spark.memory.SparkMemoryUtil import org.apache.spark.rdd.RDD @@ -553,6 +554,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi { partitioning: Partitioning, output: Seq[Attribute]): ShuffleWriterType = { val conf = GlutenConfig.get + // todo: remove isUseCelebornShuffleManager here if (conf.isUseCelebornShuffleManager) { if (conf.celebornShuffleWriterType == ReservedKeys.GLUTEN_SORT_SHUFFLE_WRITER) { if (conf.useCelebornRssSort) { @@ -629,25 +631,27 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi { val deserializeTime = metrics("deserializeTime") val readBatchNumRows = metrics("avgReadBatchNumRows") val decompressTime = metrics("decompressTime") - if (GlutenConfig.get.isUseCelebornShuffleManager) { - val clazz = ClassUtils.getClass("org.apache.spark.shuffle.CelebornColumnarBatchSerializer") - val constructor = - clazz.getConstructor( - classOf[StructType], - classOf[SQLMetric], - classOf[SQLMetric], - classOf[ShuffleWriterType]) - constructor - .newInstance(schema, readBatchNumRows, numOutputRows, shuffleWriterType) - .asInstanceOf[Serializer] - } else { - new ColumnarBatchSerializer( - schema, - readBatchNumRows, - numOutputRows, - deserializeTime, - decompressTime, - shuffleWriterType) + SparkEnv.get.shuffleManager match { + case serializer: NeedCustomColumnarBatchSerializer => + val className = serializer.columnarBatchSerializerClass() + val clazz = ClassUtils.getClass(className) + val constructor = + clazz.getConstructor( + classOf[StructType], + classOf[SQLMetric], + classOf[SQLMetric], + classOf[ShuffleWriterType]) + constructor + .newInstance(schema, readBatchNumRows, numOutputRows, shuffleWriterType) + .asInstanceOf[Serializer] + case _ => + new ColumnarBatchSerializer( + schema, + readBatchNumRows, + numOutputRows, + deserializeTime, + decompressTime, + shuffleWriterType) } } diff --git a/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornColumnarBatchSerializerFactory.java b/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornColumnarBatchSerializerFactory.java new file mode 100644 index 000000000000..b15f0d6dbfc2 --- /dev/null +++ b/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornColumnarBatchSerializerFactory.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.shuffle.gluten.celeborn; + +public interface CelebornColumnarBatchSerializerFactory { + String backendName(); + + String columnarBatchSerializerClass(); +} diff --git a/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java b/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java index c963e4be208b..a04c19d6e2f8 100644 --- a/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java +++ b/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java @@ -19,6 +19,7 @@ import org.apache.gluten.backendsapi.BackendsApiManager; import org.apache.gluten.config.GlutenConfig; import org.apache.gluten.exception.GlutenException; +import org.apache.gluten.shuffle.NeedCustomColumnarBatchSerializer; import org.apache.gluten.shuffle.SupportsColumnarShuffle; import com.google.common.base.Preconditions; @@ -42,7 +43,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; -public class CelebornShuffleManager implements ShuffleManager, SupportsColumnarShuffle { +public class CelebornShuffleManager + implements ShuffleManager, SupportsColumnarShuffle, NeedCustomColumnarBatchSerializer { private static final Logger logger = LoggerFactory.getLogger(CelebornShuffleManager.class); @@ -63,6 +65,8 @@ public class CelebornShuffleManager implements ShuffleManager, SupportsColumnarS private static final CelebornShuffleWriterFactory writerFactory; + private static final CelebornColumnarBatchSerializerFactory columnarBatchSerializerFactory; + static { final ServiceLoader loader = ServiceLoader.load(CelebornShuffleWriterFactory.class); @@ -81,6 +85,31 @@ public class CelebornShuffleManager implements ShuffleManager, SupportsColumnarS "No Celeborn shuffle writer factory found for backend " + backendName); } writerFactory = factoryMap.get(backendName); + + final ServiceLoader + celebornColumnarBatchSerializerFactoriesLoader = + ServiceLoader.load(CelebornColumnarBatchSerializerFactory.class); + final List columnarBatchSerializerFactoryList = + Arrays.stream( + Iterators.toArray( + celebornColumnarBatchSerializerFactoriesLoader.iterator(), + CelebornColumnarBatchSerializerFactory.class)) + .collect(Collectors.toList()); + // for now, we ignore check since CH backend has not support this feature yet. + // Preconditions.checkState( + // !columnarBatchSerializerFactoryList.isEmpty(), + // "No factory found for Celeborn columnar batch serializer"); + final Map columanrBatchSerilizerFactoryMap = + columnarBatchSerializerFactoryList.stream() + .collect(Collectors.toMap(CelebornColumnarBatchSerializerFactory::backendName, f -> f)); + + // for now, we ignore check since CH backend has not support this feature yet. + // if (!columanrBatchSerilizerFactoryMap.containsKey(backendName)) { + // throw new UnsupportedOperationException( + // "No Celeborn columnar batch serializer writer factory found for backend " + + // backendName); + // } + columnarBatchSerializerFactory = columanrBatchSerilizerFactoryMap.get(backendName); } private final SparkConf conf; @@ -408,4 +437,9 @@ public ShuffleReader getReader( .getReader( handle, startMapIndex, endMapIndex, startPartition, endPartition, context, metrics); } + + @Override + public String columnarBatchSerializerClass() { + return columnarBatchSerializerFactory.columnarBatchSerializerClass(); + } } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/shuffle/SupportsColumnarShuffle.scala b/gluten-substrait/src/main/scala/org/apache/gluten/shuffle/SupportsColumnarShuffle.scala index a5b2719a6d9c..ac13b7e87413 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/shuffle/SupportsColumnarShuffle.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/shuffle/SupportsColumnarShuffle.scala @@ -17,3 +17,7 @@ package org.apache.gluten.shuffle trait SupportsColumnarShuffle + +trait NeedCustomColumnarBatchSerializer { + def columnarBatchSerializerClass(): String +}