From 4012fd9e086d35813b6d9f218dada22b23d3661b Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Wed, 21 Dec 2022 15:29:32 +0800
Subject: [PATCH] SPARK-41660: only propagate metadata columns if they are used

---
 .../spark/sql/catalyst/analysis/Analyzer.scala   | 15 +++++++++------
 .../sql/connector/MetadataColumnSuite.scala      | 17 +++++++++++++++++
 2 files changed, 26 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index e959e7208a42f..c21ff7bd90f74 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -978,7 +978,7 @@ class Analyzer(override val catalogManager: CatalogManager)
       if (metaCols.isEmpty) {
         node
       } else {
-        val newNode = addMetadataCol(node)
+        val newNode = addMetadataCol(node, attr => metaCols.exists(_.exprId == attr.exprId))
         // We should not change the output schema of the plan. We should project away the extra
         // metadata columns if necessary.
         if (newNode.sameOutput(node)) {
@@ -1012,15 +1012,18 @@
       })
     }
 
-    private def addMetadataCol(plan: LogicalPlan): LogicalPlan = plan match {
-      case s: ExposesMetadataColumns => s.withMetadataColumns()
-      case p: Project =>
+    private def addMetadataCol(
+        plan: LogicalPlan,
+        isRequired: Attribute => Boolean): LogicalPlan = plan match {
+      case s: ExposesMetadataColumns if s.metadataOutput.exists(isRequired) =>
+        s.withMetadataColumns()
+      case p: Project if p.metadataOutput.exists(isRequired) =>
         val newProj = p.copy(
           projectList = p.projectList ++ p.metadataOutput,
-          child = addMetadataCol(p.child))
+          child = addMetadataCol(p.child, isRequired))
         newProj.copyTagsFrom(p)
         newProj
-      case _ => plan.withNewChildren(plan.children.map(addMetadataCol))
+      case _ => plan.withNewChildren(plan.children.map(addMetadataCol(_, isRequired)))
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala
index 8454b9f85ecdd..9abf0fd59e6b4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.connector
 
 import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.functions.struct
 
 class MetadataColumnSuite extends DatasourceV2SQLBase {
@@ -232,4 +233,20 @@
       )
     }
   }
+
+  test("SPARK-41660: only propagate metadata columns if they are used") {
+    withTable(tbl) {
+      prepareTable()
+      val df = sql(s"SELECT t2.id FROM $tbl t1 JOIN $tbl t2 USING (id)")
+      val scans = df.logicalPlan.collect {
+        case d: DataSourceV2Relation => d
+      }
+      assert(scans.length == 2)
+      scans.foreach { scan =>
+        // The query only accesses the join's hidden columns, so the scan nodes should not
+        // expose their metadata columns.
+        assert(scan.output.map(_.name) == Seq("id", "data"))
+      }
+    }
+  }
 }
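
For context, here is a minimal, dependency-free sketch of the technique this patch applies. All names below (Attr, Node, Scan, Join, metaOutput) are made up for illustration and are not Spark's real API; only the shape of the fix is the same: collect the metadata attributes the query actually references, then thread an isRequired predicate down the plan so a node exposes its metadata columns only when one of them is needed.

// Hypothetical sketch, not part of the patch. Before the fix, the recursion
// unconditionally widened every metadata-exposing node; after the fix, a node
// widens its output only if the query requires one of its metadata columns.
object MetadataPruningSketch {
  final case class Attr(name: String, exprId: Long)

  sealed trait Node {
    def output: Seq[Attr]
    def metaOutput: Seq[Attr]
    def children: Seq[Node]
    def withChildren(newChildren: Seq[Node]): Node
  }

  // Stands in for an ExposesMetadataColumns node such as DataSourceV2Relation.
  final case class Scan(output: Seq[Attr], metaOutput: Seq[Attr]) extends Node {
    def children: Seq[Node] = Nil
    def withChildren(newChildren: Seq[Node]): Node = this
    // Analogous to withMetadataColumns(): append metadata columns to the output.
    def withMetadataColumns(): Scan = copy(output = output ++ metaOutput)
  }

  final case class Join(left: Node, right: Node) extends Node {
    def output: Seq[Attr] = left.output ++ right.output
    def metaOutput: Seq[Attr] = left.metaOutput ++ right.metaOutput
    def children: Seq[Node] = Seq(left, right)
    def withChildren(newChildren: Seq[Node]): Node =
      Join(newChildren(0), newChildren(1))
  }

  // The core of the fix: propagate metadata columns only where required.
  def addMetadataCol(plan: Node, isRequired: Attr => Boolean): Node = plan match {
    case s: Scan if s.metaOutput.exists(isRequired) => s.withMetadataColumns()
    case other => other.withChildren(other.children.map(addMetadataCol(_, isRequired)))
  }

  def main(args: Array[String]): Unit = {
    val t1 = Scan(Seq(Attr("id", 1), Attr("data", 2)), Seq(Attr("_partition", 3)))
    val t2 = Scan(Seq(Attr("id", 4), Attr("data", 5)), Seq(Attr("_partition", 6)))
    val plan = Join(t1, t2)

    // The query references only t1's metadata column (exprId 3), so t2's scan
    // keeps its original two-column output, mirroring what the new test asserts.
    val requiredIds = Set(3L)
    val resolved = addMetadataCol(plan, attr => requiredIds(attr.exprId))
    println(resolved)
  }
}

Matching on exprId rather than column name mirrors the real change, where the predicate is built from the resolved metadata attributes (metaCols.exists(_.exprId == attr.exprId)), so identically named columns from different scans are kept distinct.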