From 7cd9a123b454b6f3a62413fb0d9bce9d81844fb9 Mon Sep 17 00:00:00 2001
From: yikaifei
Date: Tue, 5 Nov 2024 14:12:37 +0800
Subject: [PATCH] Replace attr with different exprId but the same name.

---
 .../sql/execution/ColumnarBuildSideRelation.scala | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)

diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala
index feaf72f64fb2..d5c848dd9700 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala
@@ -25,7 +25,7 @@ import org.apache.gluten.utils.ArrowAbiUtil
 import org.apache.gluten.vectorized.{ColumnarBatchSerializerJniWrapper, NativeColumnarToRowJniWrapper}

 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.execution.joins.BuildSideRelation
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.utils.SparkArrowUtil
@@ -103,6 +103,18 @@ case class ColumnarBuildSideRelation(output: Seq[Attribute], batches: Array[Arra

     var closed = false

+    val exprIds = output.map(_.exprId)
+    val projExpr = key.transformDown {
+      case attr: AttributeReference if !exprIds.contains(attr.exprId) =>
+        val i = output.count(_.name == attr.name)
+        if (i != 1) {
+          throw new IllegalArgumentException(s"Only one attr with the same name is supported: $key")
+        } else {
+          output.find(_.name == attr.name).get
+        }
+    }
+    val proj = UnsafeProjection.create(Seq(projExpr), output)
+
     // Convert columnar to Row.
     val jniWrapper = NativeColumnarToRowJniWrapper.create(runtime)
     val c2rId = jniWrapper.nativeColumnarToRowInit()
@@ -141,7 +153,6 @@ case class ColumnarBuildSideRelation(output: Seq[Attribute], batches: Array[Arra
                 ColumnarBatches.getNativeHandle(batch),
                 0)
             batch.close()
-            val proj = UnsafeProjection.create(Seq(key), output)

             new Iterator[InternalRow] {
               var rowId = 0