Commits (62)
dad709c  Support LeafRunnableCommand as sub query (beliefer, May 12, 2021)
0338a23  Support LeafRunnableCommand as sub query (beliefer, May 12, 2021)
40cea6b  Support LeafRunnableCommand as sub query (beliefer, May 12, 2021)
a82ed76  Unify the behavior eagerly execute the commands (beliefer, May 14, 2021)
bf296fb  Update code (beliefer, May 17, 2021)
babe0d0  Merge branch 'master' into SPARK-35378 (beliefer, May 17, 2021)
803a12a  Update code (beliefer, May 17, 2021)
4c9b3cf  Merge branch 'SPARK-35378' of github.com:beliefer/spark into SPARK-35378 (beliefer, May 17, 2021)
e3b8454  Update code (beliefer, May 17, 2021)
6516cc4  Update code (beliefer, May 17, 2021)
ddbb5bb  Optimize code (beliefer, May 18, 2021)
bde1062  Optimize code (beliefer, May 18, 2021)
7a7bc55  Update code (beliefer, May 18, 2021)
2ad5391  Update code (beliefer, May 19, 2021)
33f0297  Optimize code (beliefer, May 19, 2021)
309fb0f  Update code (beliefer, May 20, 2021)
8e9277d  Update code (beliefer, May 20, 2021)
d006b2a  Update code (beliefer, May 20, 2021)
fc3afe3  Update code (beliefer, May 20, 2021)
fde3c31  Update code (beliefer, May 21, 2021)
b58c0ea  Update code (beliefer, May 21, 2021)
4dcc759  Update code (beliefer, May 21, 2021)
f47dda6  Update code (beliefer, May 22, 2021)
2a61f23  Update code (beliefer, May 25, 2021)
cd7d39c  Update code (beliefer, May 25, 2021)
6c74474  Merge branch 'master' into SPARK-35378 (beliefer, May 25, 2021)
26c2341  Update code (beliefer, May 25, 2021)
d11b00d  Update code (beliefer, May 26, 2021)
a61c267  Update code (beliefer, May 26, 2021)
8fe6c06  Update code (beliefer, May 26, 2021)
b20b000  Update code (beliefer, May 26, 2021)
b60e04e  Update code (beliefer, May 26, 2021)
94ba930  Update code (beliefer, May 26, 2021)
0ed9485  Update code (beliefer, May 26, 2021)
911e081  Update code (beliefer, May 26, 2021)
ecfe9ba  update code (beliefer, May 26, 2021)
cfeadd1  Update code (beliefer, May 27, 2021)
6a80674  Update code (beliefer, May 29, 2021)
c81b082  Update code (beliefer, May 29, 2021)
ee1e84a  Update code (beliefer, May 29, 2021)
9c95570  Update code (beliefer, May 31, 2021)
219abb4  Merge branch 'SPARK-35378' of github.com:beliefer/spark into SPARK-35378 (beliefer, May 31, 2021)
f39f920  Update code (beliefer, May 31, 2021)
1d10b61  Update code (beliefer, May 31, 2021)
2bcdddd  Update code (beliefer, May 31, 2021)
5d9f7ee  Update code (beliefer, Jun 1, 2021)
6011bbe  Update code (beliefer, Jun 2, 2021)
0905d84  Update code (beliefer, Jun 2, 2021)
3f6cb85  Update code (beliefer, Jun 2, 2021)
1d821e0  Update code (beliefer, Jun 4, 2021)
ddbc5c4  Update code (beliefer, Jun 4, 2021)
d545d9b  Update code (beliefer, Jun 4, 2021)
4b730d4  Update code (beliefer, Jun 4, 2021)
de55034  Update code (beliefer, Jun 4, 2021)
1a3ce51  Update code (beliefer, Jun 5, 2021)
ccf8ba3  Update code (beliefer, Jun 7, 2021)
35ea747  Merge branch 'master' into SPARK-35378 (beliefer, Jun 7, 2021)
2b2caf7  Update code (beliefer, Jun 7, 2021)
1db52b9  Update code (beliefer, Jun 7, 2021)
83d2710  Update QueryExecution.scala (cloud-fan, Jun 8, 2021)
8054799  Update code (beliefer, Jun 8, 2021)
d15e166  Update code (beliefer, Jun 8, 2021)
2 changes: 2 additions & 0 deletions docs/sql-migration-guide.md
@@ -95,6 +95,8 @@ license: |

- In Spark 3.2, `FloatType` is mapped to `FLOAT` in MySQL. Prior to this, it used to be mapped to `REAL`, which is by default a synonym to `DOUBLE PRECISION` in MySQL.

- In Spark 3.2, the query executions triggered by `DataFrameWriter` are always named `command` when being sent to `QueryExecutionListener`. In Spark 3.1 and earlier, the name is one of `save`, `insertInto`, `saveAsTable`, `create`, `append`, `overwrite`, `overwritePartitions`, `replace`.

## Upgrading from Spark SQL 3.0 to 3.1

- In Spark 3.1, statistical aggregation function includes `std`, `stddev`, `stddev_samp`, `variance`, `var_samp`, `skewness`, `kurtosis`, `covar_samp`, `corr` will return `NULL` instead of `Double.NaN` when `DivideByZero` occurs during expression evaluation, for example, when `stddev_samp` applied on a single element set. In Spark version 3.0 and earlier, it will return `Double.NaN` in such case. To restore the behavior before Spark 3.1, you can set `spark.sql.legacy.statisticalAggregate` to `true`.
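The behavior change described in the new migration note can be observed with a small QueryExecutionListener. The sketch below is not part of this diff; the output path, app name, and the sleep are placeholders for a quick local check.

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

object ListenerNameCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("listener-name-check").getOrCreate()

    // Record every funcName reported to the listener.
    val seenNames = ArrayBuffer.empty[String]
    spark.listenerManager.register(new QueryExecutionListener {
      override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
        seenNames.synchronized { seenNames += funcName }
      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
    })

    // With this change the write below is reported as "command";
    // on Spark 3.1 and earlier it was reported as "save".
    spark.range(3).write.mode("overwrite").parquet("/tmp/listener_name_check")

    Thread.sleep(2000) // listener events are delivered asynchronously; a short wait suffices for a demo
    println(seenNames.synchronized { seenNames.mkString(", ") })
    spark.stop()
  }
}
```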
23 changes: 11 additions & 12 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -31,7 +31,6 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.catalog.{CatalogPlugin, CatalogV2Implicits, CatalogV2Util, Identifier, SupportsCatalogOptions, Table, TableCatalog, TableProvider, V1Table}
import org.apache.spark.sql.connector.catalog.TableCapability._
import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform, Transform}
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, DataSourceUtils, LogicalRelation}
import org.apache.spark.sql.execution.datasources.v2._
@@ -311,13 +310,13 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
val relation = DataSourceV2Relation.create(table, catalog, ident, dsOptions)
checkPartitioningMatchesV2Table(table)
if (mode == SaveMode.Append) {
runCommand(df.sparkSession, "save") {
runCommand(df.sparkSession) {
AppendData.byName(relation, df.logicalPlan, finalOptions)
}
} else {
// Truncate the table. TableCapabilityCheck will throw a nice exception if this
// isn't supported
runCommand(df.sparkSession, "save") {
runCommand(df.sparkSession) {
OverwriteByExpression.byName(
relation, df.logicalPlan, Literal(true), finalOptions)
}
@@ -332,7 +331,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {

val location = Option(dsOptions.get("path")).map(TableCatalog.PROP_LOCATION -> _)

runCommand(df.sparkSession, "save") {
runCommand(df.sparkSession) {
CreateTableAsSelect(
catalog,
ident,
@@ -379,7 +378,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
val optionsWithPath = getOptionsWithPath(path)

// Code path for data source v1.
runCommand(df.sparkSession, "save") {
runCommand(df.sparkSession) {
DataSource(
sparkSession = df.sparkSession,
className = source,
@@ -475,13 +474,13 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
}
}

runCommand(df.sparkSession, "insertInto") {
runCommand(df.sparkSession) {
command
}
}

private def insertInto(tableIdent: TableIdentifier): Unit = {
runCommand(df.sparkSession, "insertInto") {
runCommand(df.sparkSession) {
InsertIntoStatement(
table = UnresolvedRelation(tableIdent),
partitionSpec = Map.empty[String, Option[String]],
@@ -631,7 +630,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
external = false)
}

runCommand(df.sparkSession, "saveAsTable") {
runCommand(df.sparkSession) {
command
}
}
@@ -698,7 +697,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
partitionColumnNames = partitioningColumns.getOrElse(Nil),
bucketSpec = getBucketSpec)

runCommand(df.sparkSession, "saveAsTable")(
runCommand(df.sparkSession)(
CreateTable(tableDesc, mode, Some(df.logicalPlan)))
}

@@ -856,10 +855,10 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* Wrap a DataFrameWriter action to track the QueryExecution and time cost, then report to the
* user-registered callback functions.
*/
private def runCommand(session: SparkSession, name: String)(command: LogicalPlan): Unit = {
private def runCommand(session: SparkSession)(command: LogicalPlan): Unit = {
val qe = session.sessionState.executePlan(command)
// call `QueryExecution.toRDD` to trigger the execution of commands.
SQLExecution.withNewExecutionId(qe, Some(name))(qe.toRdd)
// call `QueryExecution.commandExecuted` to trigger the execution of commands.
qe.commandExecuted
}

private def lookupV2Provider(): Option[TableProvider] = {
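The rewritten `runCommand` above only touches `qe.commandExecuted`, so the old per-call `SQLExecution.withNewExecutionId(qe, Some(name))(qe.toRdd)` wrapper is gone: execution tracking now happens where the command is eagerly run inside `QueryExecution`, which is also why listeners always see the name `command`. The snippet below is a self-contained sketch of the underlying run-once-and-cache idea; the class and member names are made up for illustration and are not Spark internals.

```scala
// Illustrative model of the "execute on first access, cache afterwards" behavior that a
// lazy val such as QueryExecution.commandExecuted provides. Names here are hypothetical.
final class EagerCommandExecution(runCommand: () => Seq[String]) {
  // First access runs the command (and its side effects); later accesses reuse the result.
  lazy val commandExecuted: Seq[String] = runCommand()
}

object EagerCommandExecutionDemo {
  def main(args: Array[String]): Unit = {
    var runs = 0
    val qe = new EagerCommandExecution(() => { runs += 1; Seq("done") })

    qe.commandExecuted // triggers the command
    qe.commandExecuted // cached; the command does not run a second time
    assert(runs == 1, s"expected a single execution, got $runs")
    println(s"command executed $runs time(s), result = ${qe.commandExecuted.mkString(",")}")
  }
}
```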
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
@@ -107,7 +107,7 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
}

override def create(): Unit = {
runCommand("create") {
runCommand(
CreateTableAsSelectStatement(
tableName,
logicalPlan,
@@ -121,8 +121,7 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
options.toMap,
None,
ifNotExists = false,
external = false)
}
external = false))
}

override def replace(): Unit = {
@@ -146,7 +145,7 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
@throws(classOf[NoSuchTableException])
def append(): Unit = {
val append = AppendData.byName(UnresolvedRelation(tableName), logicalPlan, options.toMap)
runCommand("append")(append)
runCommand(append)
}

/**
@@ -163,7 +162,7 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
def overwrite(condition: Column): Unit = {
val overwrite = OverwriteByExpression.byName(
UnresolvedRelation(tableName), logicalPlan, condition.expr, options.toMap)
runCommand("overwrite")(overwrite)
runCommand(overwrite)
}

/**
@@ -183,21 +182,21 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
def overwritePartitions(): Unit = {
val dynamicOverwrite = OverwritePartitionsDynamic.byName(
UnresolvedRelation(tableName), logicalPlan, options.toMap)
runCommand("overwritePartitions")(dynamicOverwrite)
runCommand(dynamicOverwrite)
}

/**
* Wrap an action to track the QueryExecution and time cost, then report to the user-registered
* callback functions.
*/
private def runCommand(name: String)(command: LogicalPlan): Unit = {
private def runCommand(command: LogicalPlan): Unit = {
val qe = sparkSession.sessionState.executePlan(command)
// call `QueryExecution.toRDD` to trigger the execution of commands.
SQLExecution.withNewExecutionId(qe, Some(name))(qe.toRdd)
SQLExecution.withNewExecutionId(qe, Some("command"))(qe.toRdd)
}

private def internalReplace(orCreate: Boolean): Unit = {
runCommand("replace") {
runCommand(
ReplaceTableAsSelectStatement(
tableName,
logicalPlan,
@@ -210,8 +209,7 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
None,
options.toMap,
None,
orCreate = orCreate)
}
orCreate = orCreate))
}
}

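Because every DataFrameWriterV2 action is now reported under the single name `command`, a listener that used to branch on `funcName` ("create", "append", "overwrite", "overwritePartitions", "replace") can inspect the analyzed plan instead. The sketch below shows one way to do that; the catalyst plan classes it matches are internal APIs and the set of cases is only illustrative.

```scala
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, OverwriteByExpression, OverwritePartitionsDynamic}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

// Distinguishes V2 write commands by the shape of the analyzed plan rather than by funcName.
class CommandKindListener extends QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
    val kind = qe.analyzed match {
      case _: AppendData                 => "append"
      case _: OverwriteByExpression      => "overwrite"
      case _: OverwritePartitionsDynamic => "overwritePartitions"
      case other                         => other.nodeName // fall back to the plan node name
    }
    println(s"funcName=$funcName, commandKind=$kind")
  }

  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
}
```

Register it with `spark.listenerManager.register(new CommandKindListener)` before running `df.writeTo(...)` actions.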
11 changes: 1 addition & 10 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -221,16 +221,7 @@ class Dataset[T] private[sql](
}

@transient private[sql] val logicalPlan: LogicalPlan = {
// For various commands (like DDL) and queries with side effects, we force query execution
// to happen right away to let these side effects take place eagerly.
val plan = queryExecution.analyzed match {
case c: Command =>
LocalRelation(c.output, withAction("command", queryExecution)(_.executeCollect()))
case u @ Union(children, _, _) if children.forall(_.isInstanceOf[Command]) =>
LocalRelation(u.output, withAction("command", queryExecution)(_.executeCollect()))
case _ =>
queryExecution.analyzed
}
val plan = queryExecution.commandExecuted
if (sparkSession.sessionState.conf.getConf(SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED)) {
val dsIds = plan.getTagValue(Dataset.DATASET_ID_TAG).getOrElse(new HashSet[Long])
dsIds.add(id)
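The eager-execution behavior that the removed pattern match implemented is preserved: `commandExecuted` still runs commands (and unions of commands) when the Dataset is constructed, not when an action is invoked. A minimal way to observe this, assuming a local session with the default catalog; the table name is a placeholder.

```scala
import org.apache.spark.sql.SparkSession

object EagerCommandBehavior {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("eager-command-behavior").getOrCreate()

    // No .collect()/.show() is needed: building the DataFrame for a command already runs it,
    // because Dataset.logicalPlan is now derived from queryExecution.commandExecuted.
    spark.sql("CREATE TABLE eager_demo(id INT) USING parquet")
    assert(spark.catalog.tableExists("eager_demo"))

    spark.sql("DROP TABLE eager_demo")
    spark.stop()
  }
}
```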
99 changes: 99 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/CommandResultExec.scala
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.execution.metric.SQLMetrics

/**
* Physical plan node for holding data from a command.
*
* `commandPhysicalPlan` is just used to display the plan tree for EXPLAIN.
* `rows` may not be serializable and ideally we should not send `rows` to the executors.
* Thus marking them as transient.
*/
case class CommandResultExec(
output: Seq[Attribute],
[Review comment] yaooqinn (Member), May 26, 2021: override def output: Seq[Attribute] = commandPhysicalPlan.output?
[Author reply] beliefer (Contributor): Because we obtain the output in QueryExecution first, we don't need to define it again here.
@transient commandPhysicalPlan: SparkPlan,
@transient rows: Seq[InternalRow]) extends LeafExecNode with InputRDDCodegen {

override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

override def innerChildren: Seq[QueryPlan[_]] = Seq(commandPhysicalPlan)

@transient private lazy val unsafeRows: Array[InternalRow] = {
if (rows.isEmpty) {
Array.empty
} else {
val proj = UnsafeProjection.create(output, output)
rows.map(r => proj(r).copy()).toArray
}
}

@transient private lazy val rdd: RDD[InternalRow] = {
if (rows.isEmpty) {
sqlContext.sparkContext.emptyRDD
} else {
val numSlices = math.min(
unsafeRows.length, sqlContext.sparkSession.leafNodeDefaultParallelism)
sqlContext.sparkContext.parallelize(unsafeRows, numSlices)
}
}

override def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
rdd.map { r =>
numOutputRows += 1
r
}
}

override protected def stringArgs: Iterator[Any] = {
if (unsafeRows.isEmpty) {
Iterator("<empty>", output)
} else {
Iterator(output)
}
}

override def executeCollect(): Array[InternalRow] = {
longMetric("numOutputRows").add(rows.size)
rows.toArray
}

override def executeTake(limit: Int): Array[InternalRow] = {
val taken = unsafeRows.take(limit)
longMetric("numOutputRows").add(taken.size)
taken
}

override def executeTail(limit: Int): Array[InternalRow] = {
val taken: Seq[InternalRow] = unsafeRows.takeRight(limit)
longMetric("numOutputRows").add(taken.size)
taken.toArray
}

// Input is already UnsafeRows.
override protected val createUnsafeProjection: Boolean = false

override def inputRDD: RDD[InternalRow] = rdd
}
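A quick way to see the new node, assuming a build with this change applied: the executed plan of a command-backed Dataset should now be rooted at `CommandResultExec`, with the original command plan preserved as `commandPhysicalPlan` for EXPLAIN. This is only a sketch for local experimentation.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.CommandResultExec

object CommandResultExecCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("command-result-check").getOrCreate()

    val qe = spark.sql("SHOW TABLES").queryExecution
    qe.executedPlan match {
      case c: CommandResultExec =>
        // The wrapped physical command stays visible as an inner child for EXPLAIN.
        println(s"root = CommandResultExec, inner command = ${c.commandPhysicalPlan.nodeName}")
      case other =>
        println(s"unexpected root node: ${other.nodeName}")
    }
    spark.stop()
  }
}
```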
sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
@@ -44,36 +44,42 @@ object HiveResult {
TimeFormatters(dateFormatter, timestampFormatter)
}

private def stripRootCommandResult(executedPlan: SparkPlan): SparkPlan = executedPlan match {
case CommandResultExec(_, plan, _) => plan
case other => other
}

/**
* Returns the result as a hive compatible sequence of strings. This is used in tests and
* `SparkSQLDriver` for CLI applications.
*/
def hiveResultString(executedPlan: SparkPlan): Seq[String] = executedPlan match {
case ExecutedCommandExec(_: DescribeCommandBase) =>
formatDescribeTableOutput(executedPlan.executeCollectPublic())
case _: DescribeTableExec =>
formatDescribeTableOutput(executedPlan.executeCollectPublic())
// SHOW TABLES in Hive only output table names while our v1 command outputs
// database, table name, isTemp.
case command @ ExecutedCommandExec(s: ShowTablesCommand) if !s.isExtended =>
command.executeCollect().map(_.getString(1))
// SHOW TABLES in Hive only output table names while our v2 command outputs
// namespace and table name.
case command : ShowTablesExec =>
command.executeCollect().map(_.getString(1))
// SHOW VIEWS in Hive only outputs view names while our v1 command outputs
// namespace, viewName, and isTemporary.
case command @ ExecutedCommandExec(_: ShowViewsCommand) =>
command.executeCollect().map(_.getString(1))
case other =>
val timeFormatters = getTimeFormatters
val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq
// We need the types so we can output struct field names
val types = executedPlan.output.map(_.dataType)
// Reformat to match hive tab delimited output.
result.map(_.zip(types).map(e => toHiveString(e, false, timeFormatters)))
.map(_.mkString("\t"))
}
def hiveResultString(executedPlan: SparkPlan): Seq[String] =
stripRootCommandResult(executedPlan) match {
case ExecutedCommandExec(_: DescribeCommandBase) =>
formatDescribeTableOutput(executedPlan.executeCollectPublic())
case _: DescribeTableExec =>
formatDescribeTableOutput(executedPlan.executeCollectPublic())
// SHOW TABLES in Hive only output table names while our v1 command outputs
// database, table name, isTemp.
case ExecutedCommandExec(s: ShowTablesCommand) if !s.isExtended =>
executedPlan.executeCollect().map(_.getString(1))
// SHOW TABLES in Hive only output table names while our v2 command outputs
// namespace and table name.
case _ : ShowTablesExec =>
executedPlan.executeCollect().map(_.getString(1))
// SHOW VIEWS in Hive only outputs view names while our v1 command outputs
// namespace, viewName, and isTemporary.
case ExecutedCommandExec(_: ShowViewsCommand) =>
executedPlan.executeCollect().map(_.getString(1))
case other =>
val timeFormatters = getTimeFormatters
val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq
// We need the types so we can output struct field names
val types = executedPlan.output.map(_.dataType)
// Reformat to match hive tab delimited output.
result.map(_.zip(types).map(e => toHiveString(e, false, timeFormatters)))
.map(_.mkString("\t"))
}

private def formatDescribeTableOutput(rows: Array[Row]): Seq[String] = {
rows.map {
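With `stripRootCommandResult` in place, `hiveResultString` still recognizes the underlying command even though the root of the executed plan is now a `CommandResultExec`. A small usage sketch follows; the table name is a placeholder, and `HiveResult` is an internal helper used by tests and `SparkSQLDriver`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.HiveResult

object HiveResultCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("hive-result-check").getOrCreate()
    spark.sql("CREATE TABLE hr_demo(id INT) USING parquet")

    // After unwrapping the root CommandResultExec, the ShowTables command is matched and only
    // table names are emitted, matching Hive's SHOW TABLES output.
    val plan = spark.sql("SHOW TABLES").queryExecution.executedPlan
    HiveResult.hiveResultString(plan).foreach(println)

    spark.sql("DROP TABLE hr_demo")
    spark.stop()
  }
}
```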