From f0698475ce81aad0de0f5ac49b58e206ce544bee Mon Sep 17 00:00:00 2001 From: Yang Zhang Date: Thu, 11 Jul 2024 11:51:15 +0800 Subject: [PATCH 1/3] stage --- .../backendsapi/velox/VeloxIteratorApi.scala | 77 +++++++++++-------- .../gluten/substrait/rel/RawSplitInfo.java | 72 +++++++++++++++++ .../GlutenWholeStageColumnarRDD.scala | 14 ++++ 3 files changed, 129 insertions(+), 34 deletions(-) create mode 100644 gluten-core/src/main/java/org/apache/gluten/substrait/rel/RawSplitInfo.java diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala index d8355e1c419f..20a37a9079b2 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala @@ -22,7 +22,7 @@ import org.apache.gluten.execution._ import org.apache.gluten.metrics.IMetrics import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.gluten.substrait.plan.PlanNode -import org.apache.gluten.substrait.rel.{LocalFilesBuilder, LocalFilesNode, SplitInfo} +import org.apache.gluten.substrait.rel.{LocalFilesBuilder, RawSplitInfo, SplitInfo} import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat import org.apache.gluten.utils._ import org.apache.gluten.utils.iterator.Iterators @@ -30,7 +30,6 @@ import org.apache.gluten.vectorized._ import org.apache.spark.{SparkConf, TaskContext} import org.apache.spark.internal.Logging -import org.apache.spark.softaffinity.SoftAffinity import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter} import org.apache.spark.sql.connector.read.InputPartition @@ -58,30 +57,7 @@ class VeloxIteratorApi extends IteratorApi with Logging { properties: Map[String, String]): SplitInfo = { partition match { case f: FilePartition => - val ( - paths, - starts, - lengths, - fileSizes, - modificationTimes, - partitionColumns, - metadataColumns) = - constructSplitInfo(partitionSchema, f.files, metadataColumnNames) - val preferredLocations = - SoftAffinity.getFilePartitionLocations(f) - LocalFilesBuilder.makeLocalFiles( - f.index, - paths, - starts, - lengths, - fileSizes, - modificationTimes, - partitionColumns, - metadataColumns, - fileFormat, - preferredLocations.toList.asJava, - mapAsJavaMap(properties) - ) + new RawSplitInfo(f, partitionSchema, fileFormat, metadataColumnNames.asJava) case _ => throw new UnsupportedOperationException(s"Unsupported input partition.") } @@ -97,15 +73,47 @@ class VeloxIteratorApi extends IteratorApi with Logging { splitInfos.zipWithIndex.map { case (splitInfos, index) => - GlutenPartition( + GlutenRawPartition( index, planByteArray, - splitInfos.map(_.asInstanceOf[LocalFilesNode].toProtobuf.toByteArray).toArray, - splitInfos.flatMap(_.preferredLocations().asScala).toArray + splitInfos.map(_.asInstanceOf[RawSplitInfo]) ) } } + private def toSplitInfoByteArray(splitInfos: Seq[RawSplitInfo]): Array[Array[Byte]] = { + splitInfos.map { + splitInfo => + val ( + paths, + starts, + lengths, + fileSizes, + modificationTimes, + partitionColumns, + metadataColumns) = + constructSplitInfo( + splitInfo.getPartitionSchema, + splitInfo.getFilePartition.files, + splitInfo.getMetadataColumn.asScala) + LocalFilesBuilder + .makeLocalFiles( + splitInfo.getFilePartition.index, + paths, + starts, + lengths, + fileSizes, + modificationTimes, + partitionColumns, + metadataColumns, + splitInfo.getFileFormat, + new JArrayList[String]() + ) + .toProtobuf + .toByteArray + }.toArray + } + private def constructSplitInfo( schema: StructType, files: Array[PartitionedFile], @@ -179,8 +187,8 @@ class VeloxIteratorApi extends IteratorApi with Logging { partitionIndex: Int, inputIterators: Seq[Iterator[ColumnarBatch]] = Seq()): Iterator[ColumnarBatch] = { assert( - inputPartition.isInstanceOf[GlutenPartition], - "Velox backend only accept GlutenPartition.") + inputPartition.isInstanceOf[GlutenRawPartition], + "Velox backend only accept GlutenRawPartition.") val columnarNativeIterators = new JArrayList[GeneralInIterator](inputIterators.map { @@ -188,9 +196,10 @@ class VeloxIteratorApi extends IteratorApi with Logging { }.asJava) val transKernel = NativePlanEvaluator.create() - val splitInfoByteArray = inputPartition - .asInstanceOf[GlutenPartition] - .splitInfosByteArray + val splitInfoByteArray = toSplitInfoByteArray( + inputPartition + .asInstanceOf[GlutenRawPartition] + .splitInfos) val resIter: GeneralOutIterator = transKernel.createKernelWithBatchIterator( inputPartition.plan, diff --git a/gluten-core/src/main/java/org/apache/gluten/substrait/rel/RawSplitInfo.java b/gluten-core/src/main/java/org/apache/gluten/substrait/rel/RawSplitInfo.java new file mode 100644 index 000000000000..38f85a8e8328 --- /dev/null +++ b/gluten-core/src/main/java/org/apache/gluten/substrait/rel/RawSplitInfo.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.substrait.rel; + +import org.apache.gluten.exception.GlutenException; + +import com.google.protobuf.MessageOrBuilder; +import org.apache.spark.softaffinity.SoftAffinity; +import org.apache.spark.sql.execution.datasources.FilePartition; +import org.apache.spark.sql.types.StructType; + +import java.util.Arrays; +import java.util.List; + +public final class RawSplitInfo implements SplitInfo { + + private final FilePartition filePartition; + private final StructType partitionSchema; + private final LocalFilesNode.ReadFileFormat fileFormat; + private final List metadataColumn; + + public RawSplitInfo( + FilePartition filePartition, + StructType partitionSchema, + LocalFilesNode.ReadFileFormat fileFormat, + List metadataColumn) { + this.filePartition = filePartition; + this.partitionSchema = partitionSchema; + this.fileFormat = fileFormat; + this.metadataColumn = metadataColumn; + } + + public FilePartition getFilePartition() { + return filePartition; + } + + public StructType getPartitionSchema() { + return partitionSchema; + } + + public LocalFilesNode.ReadFileFormat getFileFormat() { + return fileFormat; + } + + public List getMetadataColumn() { + return metadataColumn; + } + + @Override + public List preferredLocations() { + return Arrays.asList(SoftAffinity.getFilePartitionLocations(filePartition)); + } + + @Override + public MessageOrBuilder toProtobuf() { + throw new GlutenException("RawSpiltInfo.toProtobuf should not be called"); + } +} diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala index 25293e918327..f675f6360a55 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala @@ -19,6 +19,7 @@ package org.apache.gluten.execution import org.apache.gluten.GlutenConfig import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.metrics.{GlutenTimeMetric, IMetrics} +import org.apache.gluten.substrait.rel.RawSplitInfo import org.apache.spark.{Partition, SparkContext, SparkException, TaskContext} import org.apache.spark.rdd.RDD @@ -45,6 +46,19 @@ case class GlutenPartition( override def preferredLocations(): Array[String] = locations } +case class GlutenRawPartition( + index: Int, + plan: Array[Byte], + splitInfos: Seq[RawSplitInfo], + files: Array[String] = + Array.empty[String] // touched files, for implementing UDF input_file_names +) extends BaseGlutenPartition { + + import scala.collection.JavaConverters._ + override def preferredLocations(): Array[String] = + splitInfos.flatMap(_.preferredLocations().asScala).toArray +} + case class FirstZippedPartitionsPartition( index: Int, inputPartition: InputPartition, From 551582dfc883faf40b34b5d26f563fee5177d98f Mon Sep 17 00:00:00 2001 From: Yang Zhang Date: Wed, 24 Jul 2024 14:05:14 +0800 Subject: [PATCH 2/3] support raw split info --- .../gluten/backendsapi/velox/VeloxIteratorApi.scala | 10 ++++++++-- .../org/apache/gluten/substrait/rel/RawSplitInfo.java | 10 +++++++++- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala index 20a37a9079b2..bf1ed0f87901 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala @@ -57,7 +57,12 @@ class VeloxIteratorApi extends IteratorApi with Logging { properties: Map[String, String]): SplitInfo = { partition match { case f: FilePartition => - new RawSplitInfo(f, partitionSchema, fileFormat, metadataColumnNames.asJava) + new RawSplitInfo( + f, + partitionSchema, + fileFormat, + metadataColumnNames.asJava, + properties.asJava) case _ => throw new UnsupportedOperationException(s"Unsupported input partition.") } @@ -107,7 +112,8 @@ class VeloxIteratorApi extends IteratorApi with Logging { partitionColumns, metadataColumns, splitInfo.getFileFormat, - new JArrayList[String]() + new JArrayList[String](), + splitInfo.getProperties ) .toProtobuf .toByteArray diff --git a/gluten-core/src/main/java/org/apache/gluten/substrait/rel/RawSplitInfo.java b/gluten-core/src/main/java/org/apache/gluten/substrait/rel/RawSplitInfo.java index 38f85a8e8328..962723d12d9a 100644 --- a/gluten-core/src/main/java/org/apache/gluten/substrait/rel/RawSplitInfo.java +++ b/gluten-core/src/main/java/org/apache/gluten/substrait/rel/RawSplitInfo.java @@ -25,6 +25,7 @@ import java.util.Arrays; import java.util.List; +import java.util.Map; public final class RawSplitInfo implements SplitInfo { @@ -32,16 +33,19 @@ public final class RawSplitInfo implements SplitInfo { private final StructType partitionSchema; private final LocalFilesNode.ReadFileFormat fileFormat; private final List metadataColumn; + private final Map properties; public RawSplitInfo( FilePartition filePartition, StructType partitionSchema, LocalFilesNode.ReadFileFormat fileFormat, - List metadataColumn) { + List metadataColumn, + Map properties) { this.filePartition = filePartition; this.partitionSchema = partitionSchema; this.fileFormat = fileFormat; this.metadataColumn = metadataColumn; + this.properties = properties; } public FilePartition getFilePartition() { @@ -60,6 +64,10 @@ public List getMetadataColumn() { return metadataColumn; } + public Map getProperties() { + return properties; + } + @Override public List preferredLocations() { return Arrays.asList(SoftAffinity.getFilePartitionLocations(filePartition)); From 6772c26098babc47667420a1a6605e5848f91764 Mon Sep 17 00:00:00 2001 From: Yang Zhang Date: Thu, 25 Jul 2024 10:34:07 +0800 Subject: [PATCH 3/3] fix iceberg issue. --- .../backendsapi/velox/VeloxIteratorApi.scala | 21 ++++++++++--------- .../GlutenWholeStageColumnarRDD.scala | 4 ++-- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala index bf1ed0f87901..b6aa2ea4d580 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala @@ -22,7 +22,7 @@ import org.apache.gluten.execution._ import org.apache.gluten.metrics.IMetrics import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.gluten.substrait.plan.PlanNode -import org.apache.gluten.substrait.rel.{LocalFilesBuilder, RawSplitInfo, SplitInfo} +import org.apache.gluten.substrait.rel.{LocalFilesBuilder, LocalFilesNode, RawSplitInfo, SplitInfo} import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat import org.apache.gluten.utils._ import org.apache.gluten.utils.iterator.Iterators @@ -81,14 +81,14 @@ class VeloxIteratorApi extends IteratorApi with Logging { GlutenRawPartition( index, planByteArray, - splitInfos.map(_.asInstanceOf[RawSplitInfo]) + splitInfos ) } } - private def toSplitInfoByteArray(splitInfos: Seq[RawSplitInfo]): Array[Array[Byte]] = { + private def toSplitInfoByteArray(splitInfos: Seq[SplitInfo]): Array[Array[Byte]] = { splitInfos.map { - splitInfo => + case rawSplitInfo: RawSplitInfo => val ( paths, starts, @@ -98,12 +98,12 @@ class VeloxIteratorApi extends IteratorApi with Logging { partitionColumns, metadataColumns) = constructSplitInfo( - splitInfo.getPartitionSchema, - splitInfo.getFilePartition.files, - splitInfo.getMetadataColumn.asScala) + rawSplitInfo.getPartitionSchema, + rawSplitInfo.getFilePartition.files, + rawSplitInfo.getMetadataColumn.asScala) LocalFilesBuilder .makeLocalFiles( - splitInfo.getFilePartition.index, + rawSplitInfo.getFilePartition.index, paths, starts, lengths, @@ -111,12 +111,13 @@ class VeloxIteratorApi extends IteratorApi with Logging { modificationTimes, partitionColumns, metadataColumns, - splitInfo.getFileFormat, + rawSplitInfo.getFileFormat, new JArrayList[String](), - splitInfo.getProperties + rawSplitInfo.getProperties ) .toProtobuf .toByteArray + case localFilesNode: LocalFilesNode => localFilesNode.toProtobuf.toByteArray }.toArray } diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala index f675f6360a55..e401a83d5085 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala @@ -19,7 +19,7 @@ package org.apache.gluten.execution import org.apache.gluten.GlutenConfig import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.metrics.{GlutenTimeMetric, IMetrics} -import org.apache.gluten.substrait.rel.RawSplitInfo +import org.apache.gluten.substrait.rel.SplitInfo import org.apache.spark.{Partition, SparkContext, SparkException, TaskContext} import org.apache.spark.rdd.RDD @@ -49,7 +49,7 @@ case class GlutenPartition( case class GlutenRawPartition( index: Int, plan: Array[Byte], - splitInfos: Seq[RawSplitInfo], + splitInfos: Seq[SplitInfo], files: Array[String] = Array.empty[String] // touched files, for implementing UDF input_file_names ) extends BaseGlutenPartition {