diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/IteratorApiImpl.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/IteratorApiImpl.scala index 59d2e88c736e..eef89f70466c 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/IteratorApiImpl.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/IteratorApiImpl.scala @@ -22,15 +22,13 @@ import org.apache.gluten.execution._ import org.apache.gluten.metrics.IMetrics import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.gluten.substrait.plan.PlanNode -import org.apache.gluten.substrait.rel.{LocalFilesBuilder, LocalFilesNode, SplitInfo} -import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat +import org.apache.gluten.substrait.rel.{LocalFilesBuilder, LocalFilesNode, RawSplitInfo, SplitInfo} import org.apache.gluten.utils._ import org.apache.gluten.vectorized._ import org.apache.spark.{SparkConf, SparkContext, TaskContext} import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD -import org.apache.spark.softaffinity.SoftAffinity import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter} import org.apache.spark.sql.connector.read.InputPartition @@ -44,6 +42,7 @@ import org.apache.spark.util.ExecutorManager import java.lang.{Long => JLong} import java.nio.charset.StandardCharsets import java.time.ZoneOffset +import java.util import java.util.{ArrayList => JArrayList, HashMap => JHashMap, Map => JMap} import java.util.concurrent.TimeUnit @@ -54,23 +53,11 @@ class IteratorApiImpl extends IteratorApi with Logging { override def genSplitInfo( partition: InputPartition, partitionSchema: StructType, - fileFormat: ReadFileFormat, + fileFormat: LocalFilesNode.ReadFileFormat, metadataColumnNames: Seq[String]): SplitInfo = { partition match { case f: FilePartition => - val (paths, starts, lengths, 
partitionColumns, metadataColumns) = - constructSplitInfo(partitionSchema, f.files, metadataColumnNames) - val preferredLocations = - SoftAffinity.getFilePartitionLocations(f) - LocalFilesBuilder.makeLocalFiles( - f.index, - paths, - starts, - lengths, - partitionColumns, - metadataColumns, - fileFormat, - preferredLocations.toList.asJava) + new RawSplitInfo(f, partitionSchema, fileFormat, metadataColumnNames.asJava) case _ => throw new UnsupportedOperationException(s"Unsupported input partition.") } @@ -86,15 +73,38 @@ class IteratorApiImpl extends IteratorApi with Logging { splitInfos.zipWithIndex.map { case (splitInfos, index) => - GlutenPartition( + GlutenRawPartition( index, planByteArray, - splitInfos.map(_.asInstanceOf[LocalFilesNode].toProtobuf.toByteArray).toArray, + splitInfos.map(_.asInstanceOf[RawSplitInfo]), splitInfos.flatMap(_.preferredLocations().asScala).toArray ) } } + override def toLocalFilesNodeByteArray(p: GlutenRawPartition): Array[Array[Byte]] = { + p.splitInfos.map { + splitInfo => + val (paths, starts, lengths, partitionColumns, metadataColumns) = + constructSplitInfo( + splitInfo.getPartitionSchema, + splitInfo.getFilePartition.files, + splitInfo.getMetadataColumns.asScala) + LocalFilesBuilder + .makeLocalFiles( + splitInfo.getFilePartition.index, + paths, + starts, + lengths, + partitionColumns, + metadataColumns, + splitInfo.getReadFileFormat, + new util.ArrayList[String]()) + .toProtobuf + .toByteArray + }.toArray + } + private def constructSplitInfo( schema: StructType, files: Array[PartitionedFile], @@ -158,7 +168,7 @@ class IteratorApiImpl extends IteratorApi with Logging { partitionIndex: Int, inputIterators: Seq[Iterator[ColumnarBatch]] = Seq()): Iterator[ColumnarBatch] = { assert( - inputPartition.isInstanceOf[GlutenPartition], - "Velox backend only accept GlutenPartition.") + inputPartition.isInstanceOf[GlutenRawPartition], + "Velox backend only accepts GlutenRawPartition.") val beforeBuild = System.nanoTime() @@ -168,9 +178,8 @@ class IteratorApiImpl extends IteratorApi with 
Logging { }.asJava) val transKernel = NativePlanEvaluator.create() - val splitInfoByteArray = inputPartition - .asInstanceOf[GlutenPartition] - .splitInfosByteArray + val splitInfoByteArray = toLocalFilesNodeByteArray( + inputPartition.asInstanceOf[GlutenRawPartition]) val resIter: GeneralOutIterator = transKernel.createKernelWithBatchIterator( inputPartition.plan, diff --git a/gluten-core/src/main/java/org/apache/gluten/substrait/rel/RawSplitInfo.java b/gluten-core/src/main/java/org/apache/gluten/substrait/rel/RawSplitInfo.java new file mode 100644 index 000000000000..746940b4149e --- /dev/null +++ b/gluten-core/src/main/java/org/apache/gluten/substrait/rel/RawSplitInfo.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gluten.substrait.rel; + +import org.apache.gluten.exception.GlutenException; + +import com.google.protobuf.MessageOrBuilder; +import org.apache.spark.sql.execution.datasources.FilePartition; +import org.apache.spark.sql.types.StructType; + +import java.util.Arrays; +import java.util.List; + +public class RawSplitInfo implements SplitInfo { + private final FilePartition filePartition; + private final StructType partitionSchema; + private final LocalFilesNode.ReadFileFormat readFileFormat; + + private final List<String> metadataColumns; + + public RawSplitInfo( + FilePartition filePartition, + StructType partitionSchema, + LocalFilesNode.ReadFileFormat readFileFormat, + List<String> metadataColumns) { + this.filePartition = filePartition; + this.partitionSchema = partitionSchema; + this.readFileFormat = readFileFormat; + this.metadataColumns = metadataColumns; + } + + public FilePartition getFilePartition() { + return filePartition; + } + + public List<String> getMetadataColumns() { + return metadataColumns; + } + + public StructType getPartitionSchema() { + return partitionSchema; + } + + public LocalFilesNode.ReadFileFormat getReadFileFormat() { + return readFileFormat; + } + + @Override + public List<String> preferredLocations() { + return Arrays.asList(filePartition.preferredLocations()); + } + + @Override + public MessageOrBuilder toProtobuf() { + throw new GlutenException("RawSplitInfo.toProtobuf should not be used"); + } +} diff --git a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala index d999948d7047..e6400a7d20d2 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala @@ -17,7 +17,7 @@ package org.apache.gluten.backendsapi import org.apache.gluten.GlutenNumaBindingInfo -import org.apache.gluten.execution.{BaseGlutenPartition, BasicScanExecTransformer, 
WholeStageTransformContext} +import org.apache.gluten.execution.{BaseGlutenPartition, BasicScanExecTransformer, GlutenRawPartition, WholeStageTransformContext} import org.apache.gluten.metrics.IMetrics import org.apache.gluten.substrait.plan.PlanNode import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat @@ -91,4 +91,6 @@ trait IteratorApi { numOutputRows: SQLMetric, numOutputBatches: SQLMetric, scanTime: SQLMetric): RDD[ColumnarBatch] + + def toLocalFilesNodeByteArray(p: GlutenRawPartition): Array[Array[Byte]] } diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala index da59f8ec9157..da126b4982b2 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala @@ -19,6 +19,7 @@ package org.apache.gluten.execution import org.apache.gluten.GlutenConfig import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.metrics.{GlutenTimeMetric, IMetrics} +import org.apache.gluten.substrait.rel.RawSplitInfo import org.apache.spark.{Partition, SparkContext, SparkException, TaskContext} import org.apache.spark.rdd.RDD @@ -48,6 +49,15 @@ case class GlutenPartition( override def preferredLocations(): Array[String] = locations } +case class GlutenRawPartition( + index: Int, + plan: Array[Byte], + splitInfos: Seq[RawSplitInfo], + locations: Array[String] = Array.empty[String]) + extends BaseGlutenPartition { + override def preferredLocations(): Array[String] = locations +} + case class GlutenFilePartition(index: Int, files: Array[PartitionedFile], plan: Array[Byte]) extends BaseGlutenPartition { override def preferredLocations(): Array[String] = {