diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index d6b68a45e77ae..ba2c2759e2daf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -194,7 +194,10 @@ class Analyzer(override val catalogManager: CatalogManager) override protected def isPlanIntegral( previousPlan: LogicalPlan, currentPlan: LogicalPlan): Boolean = { - !Utils.isTesting || LogicalPlanIntegrity.checkIfExprIdsAreGloballyUnique(currentPlan) + import org.apache.spark.sql.catalyst.util._ + !Utils.isTesting || (LogicalPlanIntegrity.checkIfExprIdsAreGloballyUnique(currentPlan) && + (!LogicalPlanIntegrity.canGetOutputAttrs(currentPlan) || + !currentPlan.output.exists(_.qualifiedAccessOnly))) } override def isView(nameParts: Seq[String]): Boolean = v1SessionCatalog.isView(nameParts) @@ -984,7 +987,6 @@ class Analyzer(override val catalogManager: CatalogManager) * projecting away metadata columns prematurely. */ object AddMetadataColumns extends Rule[LogicalPlan] { - import org.apache.spark.sql.catalyst.util._ def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDownWithPruning( @@ -1039,7 +1041,8 @@ class Analyzer(override val catalogManager: CatalogManager) s.withMetadataColumns() case p: Project if p.metadataOutput.exists(a => requiredAttrIds.contains(a.exprId)) => val newProj = p.copy( - projectList = p.projectList ++ p.metadataOutput, + // Do not leak the qualified-access-only restriction to normal plan outputs. + projectList = p.projectList ++ p.metadataOutput.map(_.markAsAllowAnyAccess()), child = addMetadataCol(p.child, requiredAttrIds)) newProj.copyTagsFrom(p) newProj diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index 4a4028dc4c429..5e20f12747b82 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -425,6 +425,12 @@ case class UnresolvedStar(target: Option[Seq[String]]) extends Star with Unevalu // If there is a table specified, use hidden input attributes as well val hiddenOutput = input.metadataOutput.filter(_.qualifiedAccessOnly) + // Remove the qualified-access-only restriction immediately. The expanded attributes will be + // put in a logical plan node and becomes normal attributes. They can still keep the special + // attribute metadata to indicate that they are from metadata columns, but they should not + // keep any restrictions that may break column resolution for normal attributes. + // See SPARK-42084 for more details. + .map(_.markAsAllowAnyAccess()) val expandedAttributes = (hiddenOutput ++ input.output).filter( matchedQualifier(_, target.get, resolver)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index ed820c8056163..d18cfea16296f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark import org.apache.spark.sql.catalyst.trees.TreePattern import org.apache.spark.sql.catalyst.trees.TreePattern._ -import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, METADATA_COL_ATTR_KEY} import org.apache.spark.sql.types._ import org.apache.spark.util.collection.BitSet import org.apache.spark.util.collection.ImmutableBitSet @@ -190,10 +190,7 @@ case class Alias(child: Expression, name: String)( override def toAttribute: Attribute = { if (resolved) { - val a = AttributeReference(name, child.dataType, child.nullable, metadata)(exprId, qualifier) - // Alias has its own qualifier. It doesn't make sense to still restrict the hidden columns - // of natural/using join to be accessed by qualified name only. - if (a.qualifiedAccessOnly) a.markAsAllowAnyAccess() else a + AttributeReference(name, child.dataType, child.nullable, metadata)(exprId, qualifier) } else { UnresolvedAttribute.quoted(name) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala index 44813ac7b614e..74f0875c28539 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala @@ -337,7 +337,12 @@ package object expressions { } def name = UnresolvedAttribute(nameParts).name - prunedCandidates match { + // We may have resolved the attributes from metadata columns. The resolved attributes will be + // put in a logical plan node and becomes normal attributes. They can still keep the special + // attribute metadata to indicate that they are from metadata columns, but they should not + // keep any restrictions that may break column resolution for normal attributes. + // See SPARK-42084 for more details. + prunedCandidates.map(_.markAsAllowAnyAccess()) match { case Seq(a) if nestedFields.nonEmpty => // One match, but we also need to extract the requested nested field. // The foldLeft adds ExtractValues for every remaining parts of the identifier, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 7640d9234c71f..d3df6f0dd989b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -211,7 +211,7 @@ abstract class OrderPreservingUnaryNode extends UnaryNode { object LogicalPlanIntegrity { - private def canGetOutputAttrs(p: LogicalPlan): Boolean = { + def canGetOutputAttrs(p: LogicalPlan): Boolean = { p.resolved && !p.expressions.exists { e => e.exists { // We cannot call `output` in plans with a `ScalarSubquery` expr having no column, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 586e344df5ee6..343fa3517c650 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -1664,15 +1664,7 @@ case class SubqueryAlias( override def output: Seq[Attribute] = { val qualifierList = identifier.qualifier :+ alias - child.output.map { attr => - // `SubqueryAlias` sets a new qualifier for its output columns. It doesn't make sense to still - // restrict the hidden columns of natural/using join to be accessed by qualified name only. - if (attr.qualifiedAccessOnly) { - attr.markAsAllowAnyAccess().withQualifier(qualifierList) - } else { - attr.withQualifier(qualifierList) - } - } + child.output.map(_.withQualifier(qualifierList)) } override def metadataOutput: Seq[Attribute] = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala index 1fcd3f7662b50..6466afac619a1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala @@ -213,11 +213,17 @@ package object util extends Logging { .build() ) - def markAsAllowAnyAccess(): Attribute = attr.withMetadata( - new MetadataBuilder() - .withMetadata(attr.metadata) - .remove(QUALIFIED_ACCESS_ONLY) - .build() - ) + def markAsAllowAnyAccess(): Attribute = { + if (qualifiedAccessOnly) { + attr.withMetadata( + new MetadataBuilder() + .withMetadata(attr.metadata) + .remove(QUALIFIED_ACCESS_ONLY) + .build() + ) + } else { + attr + } + } } }