Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,10 @@ class Analyzer(override val catalogManager: CatalogManager)
override protected def isPlanIntegral(
previousPlan: LogicalPlan,
currentPlan: LogicalPlan): Boolean = {
!Utils.isTesting || LogicalPlanIntegrity.checkIfExprIdsAreGloballyUnique(currentPlan)
import org.apache.spark.sql.catalyst.util._
!Utils.isTesting || (LogicalPlanIntegrity.checkIfExprIdsAreGloballyUnique(currentPlan) &&
(!LogicalPlanIntegrity.canGetOutputAttrs(currentPlan) ||
!currentPlan.output.exists(_.qualifiedAccessOnly)))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes sure that only metadata column resolution will apply the qualified-access-only restriction.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may not be straightforward to read/understand what it means. Maybe put your comment in the code.

}

override def isView(nameParts: Seq[String]): Boolean = v1SessionCatalog.isView(nameParts)
Expand Down Expand Up @@ -984,7 +987,6 @@ class Analyzer(override val catalogManager: CatalogManager)
* projecting away metadata columns prematurely.
*/
object AddMetadataColumns extends Rule[LogicalPlan] {

import org.apache.spark.sql.catalyst.util._

def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDownWithPruning(
Expand Down Expand Up @@ -1039,7 +1041,8 @@ class Analyzer(override val catalogManager: CatalogManager)
s.withMetadataColumns()
case p: Project if p.metadataOutput.exists(a => requiredAttrIds.contains(a.exprId)) =>
val newProj = p.copy(
projectList = p.projectList ++ p.metadataOutput,
// Do not leak the qualified-access-only restriction to normal plan outputs.
projectList = p.projectList ++ p.metadataOutput.map(_.markAsAllowAnyAccess()),
child = addMetadataCol(p.child, requiredAttrIds))
newProj.copyTagsFrom(p)
newProj
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,12 @@ case class UnresolvedStar(target: Option[Seq[String]]) extends Star with Unevalu

// If there is a table specified, use hidden input attributes as well
val hiddenOutput = input.metadataOutput.filter(_.qualifiedAccessOnly)
// Remove the qualified-access-only restriction immediately. The expanded attributes will be
// put in a logical plan node and becomes normal attributes. They can still keep the special
// attribute metadata to indicate that they are from metadata columns, but they should not
// keep any restrictions that may break column resolution for normal attributes.
// See SPARK-42084 for more details.
.map(_.markAsAllowAnyAccess())
val expandedAttributes = (hiddenOutput ++ input.output).filter(
matchedQualifier(_, target.get, resolver))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
import org.apache.spark.sql.catalyst.trees.TreePattern
import org.apache.spark.sql.catalyst.trees.TreePattern._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, METADATA_COL_ATTR_KEY}
import org.apache.spark.sql.types._
import org.apache.spark.util.collection.BitSet
import org.apache.spark.util.collection.ImmutableBitSet
Expand Down Expand Up @@ -190,10 +190,7 @@ case class Alias(child: Expression, name: String)(

override def toAttribute: Attribute = {
if (resolved) {
val a = AttributeReference(name, child.dataType, child.nullable, metadata)(exprId, qualifier)
// Alias has its own qualifier. It doesn't make sense to still restrict the hidden columns
// of natural/using join to be accessed by qualified name only.
if (a.qualifiedAccessOnly) a.markAsAllowAnyAccess() else a
AttributeReference(name, child.dataType, child.nullable, metadata)(exprId, qualifier)
} else {
UnresolvedAttribute.quoted(name)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,12 @@ package object expressions {
}

def name = UnresolvedAttribute(nameParts).name
prunedCandidates match {
// We may have resolved the attributes from metadata columns. The resolved attributes will be
// put in a logical plan node and becomes normal attributes. They can still keep the special
// attribute metadata to indicate that they are from metadata columns, but they should not
// keep any restrictions that may break column resolution for normal attributes.
// See SPARK-42084 for more details.
prunedCandidates.map(_.markAsAllowAnyAccess()) match {
case Seq(a) if nestedFields.nonEmpty =>
// One match, but we also need to extract the requested nested field.
// The foldLeft adds ExtractValues for every remaining parts of the identifier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ abstract class OrderPreservingUnaryNode extends UnaryNode {

object LogicalPlanIntegrity {

private def canGetOutputAttrs(p: LogicalPlan): Boolean = {
def canGetOutputAttrs(p: LogicalPlan): Boolean = {
p.resolved && !p.expressions.exists { e =>
e.exists {
// We cannot call `output` in plans with a `ScalarSubquery` expr having no column,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1664,15 +1664,7 @@ case class SubqueryAlias(

override def output: Seq[Attribute] = {
val qualifierList = identifier.qualifier :+ alias
child.output.map { attr =>
// `SubqueryAlias` sets a new qualifier for its output columns. It doesn't make sense to still
// restrict the hidden columns of natural/using join to be accessed by qualified name only.
if (attr.qualifiedAccessOnly) {
attr.markAsAllowAnyAccess().withQualifier(qualifierList)
} else {
attr.withQualifier(qualifierList)
}
}
child.output.map(_.withQualifier(qualifierList))
}

override def metadataOutput: Seq[Attribute] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,11 +213,17 @@ package object util extends Logging {
.build()
)

def markAsAllowAnyAccess(): Attribute = attr.withMetadata(
new MetadataBuilder()
.withMetadata(attr.metadata)
.remove(QUALIFIED_ACCESS_ONLY)
.build()
)
def markAsAllowAnyAccess(): Attribute = {
if (qualifiedAccessOnly) {
attr.withMetadata(
new MetadataBuilder()
.withMetadata(attr.metadata)
.remove(QUALIFIED_ACCESS_ONLY)
.build()
)
} else {
attr
}
}
}
}