@@ -313,13 +313,32 @@ trait CheckAnalysis extends PredicateHelper {
|${s.catalogTable.identifier}
""".stripMargin)

// TODO: We need to consolidate this kind of checks for InsertIntoTable
// with the rule of PreWriteCheck defined in extendedCheckRules.
case InsertIntoTable(s: SimpleCatalogRelation, _, _, _, _) =>
failAnalysis(
s"""
|Hive support is required to insert into the following tables:
|${s.catalogTable.identifier}
""".stripMargin)

case InsertIntoTable(t, _, _, _, _)
Contributor

Why do we move these checks from PreWriteCheck to here?
if !t.isInstanceOf[LeafNode] ||
t == OneRowRelation ||
t.isInstanceOf[LocalRelation] =>
failAnalysis(s"Inserting into an RDD-based table is not allowed.")

case i @ InsertIntoTable(table, partitions, query, _, _) =>
val numStaticPartitions = partitions.values.count(_.isDefined)
if (table.output.size != (query.output.size + numStaticPartitions)) {
failAnalysis(
s"$table requires that the data to be inserted have the same number of " +
s"columns as the target table: target table has ${table.output.size} " +
s"column(s) but the inserted data has " +
s"${query.output.size + numStaticPartitions} column(s), including " +
s"$numStaticPartitions partition column(s) having constant value(s).")
}

case o if !o.resolved =>
failAnalysis(
s"unresolved operator ${operator.simpleString}")
@@ -22,15 +22,15 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.{CatalystConf, CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SimpleCatalogRelation}
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.DataSourceScanExec.PUSHED_FILTERS
@@ -43,8 +43,127 @@ import org.apache.spark.unsafe.types.UTF8String
* Replaces generic operations with specific variants that are designed to work with Spark
* SQL Data Sources.
*/
private[sql] object DataSourceAnalysis extends Rule[LogicalPlan] {
private[sql] case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {

def resolver: Resolver = {
if (conf.caseSensitiveAnalysis) {
caseSensitiveResolution
} else {
caseInsensitiveResolution
}
}

// The access modifier is used to expose this method to tests.
private[sql] def convertStaticPartitions(
Copy link
Contributor Author

This is the new rule that uses dynamic partition insert to evaluate insert statements with static partitions when the target table is a HadoopFsRelation.

Contributor

Why is this rule only applied to HadoopFsRelation? It would be fine to change Hive to write using this rule as well, and we would need fewer relation-specific rules.

This isn't a huge issue, but I'm concerned about the proliferation of fixes for either Hive or data sources that are never applied to the other. We should be consolidating the implementation wherever possible.

Contributor

Yeah, that's part of this: https://issues.apache.org/jira/browse/SPARK-15691

Let me know if you want to help out.

sourceAttributes: Seq[Attribute],
providedPartitions: Map[String, Option[String]],
targetAttributes: Seq[Attribute],
targetPartitionSchema: StructType): Seq[NamedExpression] = {

assert(providedPartitions.exists(_._2.isDefined))

val staticPartitions = providedPartitions.flatMap {
case (partKey, Some(partValue)) => (partKey, partValue) :: Nil
case (_, None) => Nil
}

// The sum of the number of static partition columns and columns provided in the SELECT
// clause needs to match the number of columns of the target table.
if (staticPartitions.size + sourceAttributes.size != targetAttributes.size) {
Contributor

Looks like we already have this check somewhere?

Contributor

in PreprocessTableInsertion

throw new AnalysisException(
s"The data to be inserted needs to have the same number of " +
s"columns as the target table: target table has ${targetAttributes.size} " +
s"column(s) but the inserted data has ${sourceAttributes.size + staticPartitions.size} " +
s"column(s), which contain ${staticPartitions.size} partition column(s) having " +
s"assigned constant values.")
}

if (providedPartitions.size != targetPartitionSchema.fields.size) {
throw new AnalysisException(
s"The data to be inserted needs to have the same number of " +
s"partition columns as the target table: target table " +
s"has ${targetPartitionSchema.fields.size} partition column(s) but the inserted " +
s"data has ${providedPartitions.size} partition columns specified.")
}

staticPartitions.foreach {
case (partKey, partValue) =>
if (!targetPartitionSchema.fields.exists(field => resolver(field.name, partKey))) {
throw new AnalysisException(
s"$partKey is not a partition column. Partition columns are " +
s"${targetPartitionSchema.fields.map(_.name).mkString("[", ",", "]")}")
}
}

val partitionList = targetPartitionSchema.fields.map { field =>
val potentialSpecs = staticPartitions.filter {
case (partKey, partValue) => resolver(field.name, partKey)
}
if (potentialSpecs.size == 0) {
None
} else if (potentialSpecs.size == 1) {
val partValue = potentialSpecs.head._2
Some(Alias(Cast(Literal(partValue), field.dataType), "_staticPart")())
} else {
throw new AnalysisException(
s"Partition column ${field.name} have multiple values specified, " +
s"${potentialSpecs.mkString("[", ", ", "]")}. Please only specify a single value.")
}
}

// We first drop all leading static partitions using dropWhile and check whether
// any static partition appears after the dynamic partitions.
partitionList.dropWhile(_.isDefined).collectFirst {
case Some(_) =>
throw new AnalysisException(
s"The ordering of partition columns is " +
s"${targetPartitionSchema.fields.map(_.name).mkString("[", ",", "]")}. " +
"All partition columns having constant values need to appear before other " +
"partition columns that do not have an assigned constant value.")
}

assert(partitionList.take(staticPartitions.size).forall(_.isDefined))
val projectList =
sourceAttributes.take(targetAttributes.size - targetPartitionSchema.fields.size) ++
partitionList.take(staticPartitions.size).map(_.get) ++
sourceAttributes.takeRight(targetPartitionSchema.fields.size - staticPartitions.size)

projectList
}
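
Since convertStaticPartitions is exposed to tests, it can be exercised directly. A rough sketch of a possible test body follows; the attribute names and the SimpleCatalystConf construction are assumptions for illustration, not taken from this PR:

import org.apache.spark.sql.catalyst.SimpleCatalystConf
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.execution.datasources.DataSourceAnalysis
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Assumes this code lives under org.apache.spark.sql so the private[sql] members are visible.
// Target table: columns (a, b, c), partitioned by (b, c).
// The query provides a and c; b gets the static value "2".
val rule = DataSourceAnalysis(SimpleCatalystConf(caseSensitiveAnalysis = true))
val sourceAttributes = Seq(
  AttributeReference("a", IntegerType)(),
  AttributeReference("c", IntegerType)())
val targetAttributes = Seq("a", "b", "c").map(name => AttributeReference(name, IntegerType)())
val targetPartitionSchema = StructType(Seq(
  StructField("b", IntegerType),
  StructField("c", IntegerType)))

// Expected shape of the result: [a, cast("2" as int) as _staticPart, c].
val projectList = rule.convertStaticPartitions(
  sourceAttributes = sourceAttributes,
  providedPartitions = Map("b" -> Some("2"), "c" -> None),
  targetAttributes = targetAttributes,
  targetPartitionSchema = targetPartitionSchema)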

override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// If the InsertIntoTable command is for a partitioned HadoopFsRelation and
// the user has specified static partitions, we add a Project operator on top of the query
// to include those constant column values in the query result.
//
// Example:
// Let's say that we have a table "t", which is created by
// CREATE TABLE t (a INT, b INT, c INT) USING parquet PARTITIONED BY (b, c)
// The statement of "INSERT INTO TABLE t PARTITION (b=2, c) SELECT 1, 3"
// will be converted to "INSERT INTO TABLE t PARTITION (b, c) SELECT 1, 2, 3".
//
// Basically, we will put those partition columns having an assigned value back
// to the SELECT clause. The output of the SELECT clause is organized as
// normal_columns static_partitioning_columns dynamic_partitioning_columns.
// static_partitioning_columns are partitioning columns having assigned
// values in the PARTITION clause (e.g. b in the above example).
// dynamic_partitioning_columns are partitioning columns that are not assigned
// values in the PARTITION clause (e.g. c in the above example).
case insert @ logical.InsertIntoTable(
relation @ LogicalRelation(t: HadoopFsRelation, _, _), parts, query, overwrite, false)
if query.resolved && parts.exists(_._2.isDefined) =>

val projectList = convertStaticPartitions(
sourceAttributes = query.output,
providedPartitions = parts,
targetAttributes = relation.output,
targetPartitionSchema = t.partitionSchema)

// We will remove all assigned values to static partitions because they have been
// moved to the projectList.
insert.copy(partition = parts.map(p => (p._1, None)), child = Project(projectList, query))


case i @ logical.InsertIntoTable(
l @ LogicalRelation(t: HadoopFsRelation, _, _), part, query, overwrite, false)
if query.resolved && t.schema.asNullable == query.schema.asNullable =>
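
The example from the comment in apply can be run end to end. A minimal sketch, again with assumed table and session names:

import org.apache.spark.sql.SparkSession

object StaticPartitionInsertSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("static-partition-insert").master("local").getOrCreate()
    spark.sql("CREATE TABLE t (a INT, b INT, c INT) USING parquet PARTITIONED BY (b, c)")
    // The new rule rewrites this into the equivalent of
    // INSERT INTO TABLE t PARTITION (b, c) SELECT 1, 2, 3,
    // adding the static value b = 2 as a literal column in a Project on top of the query.
    spark.sql("INSERT INTO TABLE t PARTITION (b = 2, c) SELECT 1, 3")
    spark.sql("SELECT a, b, c FROM t WHERE b = 2").show()
    spark.stop()
  }
}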
@@ -206,13 +206,6 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
// The relation in l is not an InsertableRelation.
failAnalysis(s"$l does not allow insertion.")

case logical.InsertIntoTable(t, _, _, _, _) =>
if (!t.isInstanceOf[LeafNode] || t == OneRowRelation || t.isInstanceOf[LocalRelation]) {
failAnalysis(s"Inserting into an RDD-based table is not allowed.")
} else {
// OK
}

case c: CreateTableUsingAsSelect =>
// When the SaveMode is Overwrite, we need to check if the table is an input table of
// the query. If so, we will throw an AnalysisException to let users know it is not allowed.
@@ -113,7 +113,7 @@ private[sql] class SessionState(sparkSession: SparkSession) {
override val extendedResolutionRules =
PreprocessTableInsertion(conf) ::
new FindDataSourceTable(sparkSession) ::
DataSourceAnalysis ::
DataSourceAnalysis(conf) ::
(if (conf.runSQLonFile) new ResolveDataSource(sparkSession) :: Nil else Nil)

override val extendedCheckRules = Seq(datasources.PreWriteCheck(conf, catalog))
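
Because the rule now receives the session's conf, matching of partition-column names in the PARTITION clause follows spark.sql.caseSensitive. A small sketch of the intended effect, with assumed names:

import org.apache.spark.sql.SparkSession

object CaseSensitivePartitionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("case-sensitive-partitions").master("local").getOrCreate()
    spark.sql("CREATE TABLE t2 (a INT, b INT) USING parquet PARTITIONED BY (b)")
    // With case-insensitive analysis (the default), "B" in the PARTITION clause
    // resolves to partition column "b"; with spark.sql.caseSensitive=true the same
    // statement should instead fail with a "not a partition column" error.
    spark.conf.set("spark.sql.caseSensitive", "false")
    spark.sql("INSERT INTO TABLE t2 PARTITION (B = 1) SELECT 10")
    spark.stop()
  }
}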