diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala index d8355e1c419f..b6aa2ea4d580 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala @@ -22,7 +22,7 @@ import org.apache.gluten.execution._ import org.apache.gluten.metrics.IMetrics import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.gluten.substrait.plan.PlanNode -import org.apache.gluten.substrait.rel.{LocalFilesBuilder, LocalFilesNode, SplitInfo} +import org.apache.gluten.substrait.rel.{LocalFilesBuilder, LocalFilesNode, RawSplitInfo, SplitInfo} import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat import org.apache.gluten.utils._ import org.apache.gluten.utils.iterator.Iterators @@ -30,7 +30,6 @@ import org.apache.gluten.vectorized._ import org.apache.spark.{SparkConf, TaskContext} import org.apache.spark.internal.Logging -import org.apache.spark.softaffinity.SoftAffinity import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter} import org.apache.spark.sql.connector.read.InputPartition @@ -58,30 +57,12 @@ class VeloxIteratorApi extends IteratorApi with Logging { properties: Map[String, String]): SplitInfo = { partition match { case f: FilePartition => - val ( - paths, - starts, - lengths, - fileSizes, - modificationTimes, - partitionColumns, - metadataColumns) = - constructSplitInfo(partitionSchema, f.files, metadataColumnNames) - val preferredLocations = - SoftAffinity.getFilePartitionLocations(f) - LocalFilesBuilder.makeLocalFiles( - f.index, - paths, - starts, - lengths, - fileSizes, - modificationTimes, - partitionColumns, - metadataColumns, + new RawSplitInfo( + f, + partitionSchema, fileFormat, - 
preferredLocations.toList.asJava, - mapAsJavaMap(properties) - ) + metadataColumnNames.asJava, + properties.asJava) case _ => throw new UnsupportedOperationException(s"Unsupported input partition.") } @@ -97,15 +78,49 @@ class VeloxIteratorApi extends IteratorApi with Logging { splitInfos.zipWithIndex.map { case (splitInfos, index) => - GlutenPartition( + GlutenRawPartition( index, planByteArray, - splitInfos.map(_.asInstanceOf[LocalFilesNode].toProtobuf.toByteArray).toArray, - splitInfos.flatMap(_.preferredLocations().asScala).toArray + splitInfos ) } } + private def toSplitInfoByteArray(splitInfos: Seq[SplitInfo]): Array[Array[Byte]] = { + splitInfos.map { + case rawSplitInfo: RawSplitInfo => + val ( + paths, + starts, + lengths, + fileSizes, + modificationTimes, + partitionColumns, + metadataColumns) = + constructSplitInfo( + rawSplitInfo.getPartitionSchema, + rawSplitInfo.getFilePartition.files, + rawSplitInfo.getMetadataColumn.asScala) + LocalFilesBuilder + .makeLocalFiles( + rawSplitInfo.getFilePartition.index, + paths, + starts, + lengths, + fileSizes, + modificationTimes, + partitionColumns, + metadataColumns, + rawSplitInfo.getFileFormat, + new JArrayList[String](), + rawSplitInfo.getProperties + ) + .toProtobuf + .toByteArray + case localFilesNode: LocalFilesNode => localFilesNode.toProtobuf.toByteArray + }.toArray + } + private def constructSplitInfo( schema: StructType, files: Array[PartitionedFile], @@ -179,8 +194,8 @@ class VeloxIteratorApi extends IteratorApi with Logging { partitionIndex: Int, inputIterators: Seq[Iterator[ColumnarBatch]] = Seq()): Iterator[ColumnarBatch] = { assert( - inputPartition.isInstanceOf[GlutenPartition], - "Velox backend only accept GlutenPartition.") + inputPartition.isInstanceOf[GlutenRawPartition], + "Velox backend only accept GlutenRawPartition.") val columnarNativeIterators = new JArrayList[GeneralInIterator](inputIterators.map { @@ -188,9 +203,10 @@ class VeloxIteratorApi extends IteratorApi with Logging { }.asJava) 
val transKernel = NativePlanEvaluator.create() - val splitInfoByteArray = inputPartition - .asInstanceOf[GlutenPartition] - .splitInfosByteArray + val splitInfoByteArray = toSplitInfoByteArray( + inputPartition + .asInstanceOf[GlutenRawPartition] + .splitInfos) val resIter: GeneralOutIterator = transKernel.createKernelWithBatchIterator( inputPartition.plan, diff --git a/gluten-core/src/main/java/org/apache/gluten/substrait/rel/RawSplitInfo.java b/gluten-core/src/main/java/org/apache/gluten/substrait/rel/RawSplitInfo.java new file mode 100644 index 000000000000..962723d12d9a --- /dev/null +++ b/gluten-core/src/main/java/org/apache/gluten/substrait/rel/RawSplitInfo.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.gluten.substrait.rel;
+
+import org.apache.gluten.exception.GlutenException;
+
+import com.google.protobuf.MessageOrBuilder;
+import org.apache.spark.softaffinity.SoftAffinity;
+import org.apache.spark.sql.execution.datasources.FilePartition;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public final class RawSplitInfo implements SplitInfo {
+
+  private final FilePartition filePartition;
+  private final StructType partitionSchema;
+  private final LocalFilesNode.ReadFileFormat fileFormat;
+  private final List<String> metadataColumn;
+  private final Map<String, String> properties;
+
+  public RawSplitInfo(
+      FilePartition filePartition,
+      StructType partitionSchema,
+      LocalFilesNode.ReadFileFormat fileFormat,
+      List<String> metadataColumn,
+      Map<String, String> properties) {
+    this.filePartition = filePartition;
+    this.partitionSchema = partitionSchema;
+    this.fileFormat = fileFormat;
+    this.metadataColumn = metadataColumn;
+    this.properties = properties;
+  }
+
+  public FilePartition getFilePartition() {
+    return filePartition;
+  }
+
+  public StructType getPartitionSchema() {
+    return partitionSchema;
+  }
+
+  public LocalFilesNode.ReadFileFormat getFileFormat() {
+    return fileFormat;
+  }
+
+  public List<String> getMetadataColumn() {
+    return metadataColumn;
+  }
+
+  public Map<String, String> getProperties() {
+    return properties;
+  }
+
+  @Override
+  public List<String> preferredLocations() {
+    return Arrays.asList(SoftAffinity.getFilePartitionLocations(filePartition));
+  }
+
+  @Override
+  public MessageOrBuilder toProtobuf() {
+    throw new GlutenException("RawSplitInfo.toProtobuf should not be called");
+  }
+}
diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala
index 25293e918327..e401a83d5085 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala @@ -19,6 +19,7 @@ package org.apache.gluten.execution import org.apache.gluten.GlutenConfig import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.metrics.{GlutenTimeMetric, IMetrics} +import org.apache.gluten.substrait.rel.SplitInfo import org.apache.spark.{Partition, SparkContext, SparkException, TaskContext} import org.apache.spark.rdd.RDD @@ -45,6 +46,19 @@ case class GlutenPartition( override def preferredLocations(): Array[String] = locations } +case class GlutenRawPartition( + index: Int, + plan: Array[Byte], + splitInfos: Seq[SplitInfo], + files: Array[String] = + Array.empty[String] // touched files, for implementing UDF input_file_names +) extends BaseGlutenPartition { + + import scala.collection.JavaConverters._ + override def preferredLocations(): Array[String] = + splitInfos.flatMap(_.preferredLocations().asScala).toArray +} + case class FirstZippedPartitionsPartition( index: Int, inputPartition: InputPartition,