From 3b0a28b9626b2fac65688cf3a2702a96131e4307 Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Mon, 8 Aug 2016 13:10:38 +0200 Subject: [PATCH 1/5] Improve inline table processing. --- .../sql/catalyst/optimizer/Optimizer.scala | 18 ++++-- .../sql/catalyst/parser/AstBuilder.scala | 55 +++++++++---------- .../ConvertToLocalRelationSuite.scala | 23 +++++++- .../sql/catalyst/parser/PlanParserSuite.scala | 29 ++++++---- 4 files changed, 78 insertions(+), 47 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 75130007b963a..843575832fd9c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -22,7 +22,7 @@ import scala.collection.immutable.HashSet import scala.collection.mutable.ArrayBuffer import org.apache.spark.api.java.function.FilterFunction -import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf} +import org.apache.spark.sql.catalyst.{CatalystConf, InternalRow, SimpleCatalystConf} import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.expressions._ @@ -1563,17 +1563,23 @@ object DecimalAggregates extends Rule[LogicalPlan] { } /** - * Converts local operations (i.e. ones that don't require data exchange) on LocalRelation to - * another LocalRelation. - * - * This is relatively simple as it currently handles only a single case: Project. + * Converts local operations (i.e. ones that don't require data exchange) on LocalRelation or + * OneRowRelation to a new LocalRelation. */ object ConvertToLocalRelation extends Rule[LogicalPlan] { - def apply(plan: LogicalPlan): LogicalPlan = plan transform { + def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { case Project(projectList, LocalRelation(output, data)) if !projectList.exists(hasUnevaluableExpr) => val projection = new InterpretedProjection(projectList, output) LocalRelation(projectList.map(_.toAttribute), data.map(projection)) + case Project(projectList, OneRowRelation) if !projectList.exists(hasUnevaluableExpr) => + val row = InternalRow.fromSeq(projectList.map(_.eval())) + LocalRelation(projectList.map(_.toAttribute), Seq(row)) + case u @ Union(children) if children.forall(_.isInstanceOf[LocalRelation]) => + val data = children.flatMap { + case LocalRelation(_, rows) => rows + } + LocalRelation(u.output, data) } private def hasUnevaluableExpr(expr: Expression): Boolean = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 679adf2717b51..5e1789cd3f3f2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -656,40 +656,37 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging { * Create an inline table (a virtual table in Hive parlance). */ override def visitInlineTable(ctx: InlineTableContext): LogicalPlan = withOrigin(ctx) { - // Get the backing expressions. 
- val expressions = ctx.expression.asScala.map { eCtx => - val e = expression(eCtx) - assert(e.foldable, "All expressions in an inline table must be constants.", eCtx) - e - } - - // Validate and evaluate the rows. - val (structType, structConstructor) = expressions.head.dataType match { - case st: StructType => - (st, (e: Expression) => e) - case dt => - val st = CreateStruct(Seq(expressions.head)).dataType - (st, (e: Expression) => CreateStruct(Seq(e))) - } - val rows = expressions.map { - case expression => - val safe = Cast(structConstructor(expression), structType) - safe.eval().asInstanceOf[InternalRow] + // Create expressions. + val rows = ctx.expression.asScala.map { e => + expression(e) match { + case CreateStruct(children) => children + case child => Seq(child) + } } - // Construct attributes. - val baseAttributes = structType.toAttributes.map(_.withNullability(true)) - val attributes = if (ctx.identifierList != null) { - val aliases = visitIdentifierList(ctx.identifierList) - assert(aliases.size == baseAttributes.size, - "Number of aliases must match the number of fields in an inline table.", ctx) - baseAttributes.zip(aliases).map(p => p._1.withName(p._2)) + // Resolve aliases. + val numExpectedColumns = rows.head.size + val aliases = if (ctx.identifierList != null) { + val names = visitIdentifierList(ctx.identifierList) + assert(names.size == numExpectedColumns, + s"Number of aliases '${names.size}' must match the number of fields " + + s"'$numExpectedColumns' in an inline table", ctx) + names } else { - baseAttributes + Seq.tabulate(numExpectedColumns)(i => s"col${i + 1}") } - // Create plan and add an alias if a name has been defined. - LocalRelation(attributes, rows).optionalMap(ctx.identifier)(aliasPlan) + // Create the UNION. + val union = Union(rows.zipWithIndex.map { case (expressions, index) => + assert(expressions.size == numExpectedColumns, + s"Number of values '${expressions.size}' in row '${index + 1}' does not match the " + + s"expected number of values '$numExpectedColumns' in a row", ctx) + val namedExpressions = expressions.zip(aliases).map { + case (expression, name) => Alias(expression, name)() + } + Project(namedExpressions, OneRowRelation) + }) + union.optionalMap(ctx.identifier)(aliasPlan) } /** diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConvertToLocalRelationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConvertToLocalRelationSuite.scala index 049a19b86f7cd..354f914cf6642 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConvertToLocalRelationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConvertToLocalRelationSuite.scala @@ -21,8 +21,9 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.catalyst.plans.PlanTest -import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, OneRowRelation, Union} import org.apache.spark.sql.catalyst.rules.RuleExecutor @@ -52,4 +53,24 @@ class ConvertToLocalRelationSuite extends PlanTest { comparePlans(optimized, correctAnswer) } + test("Project on OneRowRelation should be turned into a single LocalRelation") { + val testRelation = 
OneRowRelation.select(Literal(1).as("a")) + val correctAnswer = LocalRelation( + LocalRelation('a.int.withNullability(false)).output, + InternalRow(1) :: Nil) + val optimized = Optimize.execute(testRelation.analyze) + comparePlans(optimized, correctAnswer) + } + + test("Union of LocalRelations should be turned into a single LocalRelation") { + val testRelation = Union( + OneRowRelation.select(Literal(1).as("a")) :: + OneRowRelation.select(Literal(2).as("a")) :: + OneRowRelation.select(Literal(3).as("a")) :: Nil) + val correctAnswer = LocalRelation( + LocalRelation('a.int.withNullability(false)).output, + InternalRow(1) :: InternalRow(2) :: InternalRow(3) :: Nil) + val optimized = Optimize.execute(testRelation.analyze) + comparePlans(optimized, correctAnswer) + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala index fbe236e196268..36294eda25208 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.parser import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.FunctionIdentifier -import org.apache.spark.sql.catalyst.analysis.UnresolvedGenerator +import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedGenerator} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ @@ -424,19 +424,26 @@ class PlanParserSuite extends PlanTest { } test("inline table") { - assertEqual("values 1, 2, 3, 4", LocalRelation.fromExternalRows( - Seq('col1.int), - Seq(1, 2, 3, 4).map(x => Row(x)))) + def project(values: Seq[Any], names: Seq[String]): Project = { + val expressions = names match { + case Seq() => + values.zipWithIndex.map(x => Alias(Literal(x._1), s"col${x._2 + 1}")()) + case aliases if aliases.size == values.size => + values.zip(aliases).map(x => Alias(Literal(x._1), x._2)()) + } + Project(expressions, OneRowRelation) + } + + assertEqual( + "values 1, 2, 3, 4", + Union(Seq(1, 2, 3, 4).map(x => project(Seq(x), Seq.empty)))) assertEqual( "values (1, 'a'), (2, 'b'), (3, 'c') as tbl(a, b)", - LocalRelation.fromExternalRows( - Seq('a.int, 'b.string), - Seq((1, "a"), (2, "b"), (3, "c")).map(x => Row(x._1, x._2))).as("tbl")) - intercept("values (a, 'a'), (b, 'b')", - "All expressions in an inline table must be constants.") + Union(Seq(Seq(1, "a"), Seq(2, "b"), Seq(3, "c")).map(project(_, Seq("a", "b")))).as("tbl")) intercept("values (1, 'a'), (2, 'b') as tbl(a, b, c)", - "Number of aliases must match the number of fields in an inline table.") - intercept[ArrayIndexOutOfBoundsException](parsePlan("values (1, 'a'), (2, 'b', 5Y)")) + "Number of aliases", "must match the number of fields", "in an inline table") + intercept("values (1, 'a'), (2, 'b', 5Y)", + "Number of values", "in row", "does not match the expected number of values", "in a row") } test("simple select query with !> and !<") { From 3f3aa9334de6c82ace7358cb3d4775ccec0dacc9 Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Mon, 8 Aug 2016 15:04:21 +0200 Subject: [PATCH 2/5] Use project instead of direct expression evaluation. 
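
A note on the approach, as a minimal sketch (illustrative only, not part
of the diff; assumes only the catalyst classes already used in PATCH 1):
instead of calling eval() on every expression separately, the rule now
builds one InterpretedProjection over the whole project list and applies
it to EmptyRow, yielding the InternalRow that backs the new LocalRelation.

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.{Alias, EmptyRow, InterpretedProjection, Literal}

    // A constant project list, e.g. what the parser produces for VALUES (1, 'a').
    val projectList = Seq(Alias(Literal(1), "col1")(), Alias(Literal("a"), "col2")())

    // One projection over all expressions; EmptyRow is enough because a
    // OneRowRelation exposes no input columns.
    val projection = new InterpretedProjection(projectList, Seq.empty)
    val row: InternalRow = projection(EmptyRow)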
--- .../sql/catalyst/optimizer/Optimizer.scala | 4 +-- .../sql/hive/execution/HiveExplainSuite.scala | 27 ++++++++++--------- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 843575832fd9c..2e599265e00bc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1573,8 +1573,8 @@ object ConvertToLocalRelation extends Rule[LogicalPlan] { val projection = new InterpretedProjection(projectList, output) LocalRelation(projectList.map(_.toAttribute), data.map(projection)) case Project(projectList, OneRowRelation) if !projectList.exists(hasUnevaluableExpr) => - val row = InternalRow.fromSeq(projectList.map(_.eval())) - LocalRelation(projectList.map(_.toAttribute), Seq(row)) + val projection = new InterpretedProjection(projectList, Seq.empty) + LocalRelation(projectList.map(_.toAttribute), Seq(projection(EmptyRow))) case u @ Union(children) if children.forall(_.isInstanceOf[LocalRelation]) => val data = children.flatMap { case LocalRelation(_, rows) => rows diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala index 98afd99a203ac..c4e6493105253 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala @@ -104,20 +104,23 @@ class HiveExplainSuite extends QueryTest with SQLTestUtils with TestHiveSingleto } test("EXPLAIN CODEGEN command") { - checkKeywordsExist(sql("EXPLAIN CODEGEN SELECT 1"), - "WholeStageCodegen", - "Generated code:", - "/* 001 */ public Object generate(Object[] references) {", - "/* 002 */ return new GeneratedIterator(references);", - "/* 003 */ }" - ) + withTempView("x") { + spark.range(0, 1).createOrReplaceTempView("x") + checkKeywordsExist(sql("EXPLAIN CODEGEN SELECT 1 FROM X"), + "WholeStageCodegen", + "Generated code:", + "/* 001 */ public Object generate(Object[] references) {", + "/* 002 */ return new GeneratedIterator(references);", + "/* 003 */ }" + ) - checkKeywordsNotExist(sql("EXPLAIN CODEGEN SELECT 1"), - "== Physical Plan ==" - ) + checkKeywordsNotExist(sql("EXPLAIN CODEGEN SELECT 1 FROM X"), + "== Physical Plan ==" + ) - intercept[ParseException] { - sql("EXPLAIN EXTENDED CODEGEN SELECT 1") + intercept[ParseException] { + sql("EXPLAIN EXTENDED CODEGEN SELECT 1 FROM X") + } } } } From 9a827ce1453a58caa898d1c1a565cec4fedb6327 Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Mon, 8 Aug 2016 17:56:08 +0200 Subject: [PATCH 3/5] Fix tests using Union with foldable children. 
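
Background for these test changes (hypothetical spark-shell sketch, not
part of the diff; assumes spark.implicits._ is in scope): data built via
toDS()/toDF() becomes a LocalRelation, so a Union of such inputs is now
collapsed by ConvertToLocalRelation before planning and the tests stop
exercising the operators they target. spark.range produces a Range plan
that the rule leaves alone.

    // Folded into a single LocalRelation by the optimizer.
    val folded = Seq("a", "b").toDS().union(Seq("c").toDS())

    // The Union over Range survives optimization, so the planner test
    // below still sees a Union node to plan.
    val kept = spark.range(1, 3).union(spark.range(3, 6))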
--- .../scala/org/apache/spark/sql/DataFrameSuite.scala | 4 ++-- .../apache/spark/sql/execution/SparkPlannerSuite.scala | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 62cfd24041b3d..5db1d89e7d0bb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -1192,8 +1192,8 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { } test("SPARK-10740: handle nondeterministic expressions correctly for set operations") { - val df1 = (1 to 20).map(Tuple1.apply).toDF("i") - val df2 = (1 to 10).map(Tuple1.apply).toDF("i") + val df1 = spark.range(1, 20).select('id.cast("int").as("i")) + val df2 = spark.range(1, 10).select('id.cast("int").as("i")) // When generating expected results at here, we need to follow the implementation of // Rand expression. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlannerSuite.scala index aecfd3062147c..b03f54a275390 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlannerSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution import org.apache.spark.sql.Strategy import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LocalRelation, LogicalPlan, ReturnAnswer, Union} +import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.test.SharedSQLContext class SparkPlannerSuite extends SharedSQLContext { @@ -40,9 +40,9 @@ class SparkPlannerSuite extends SharedSQLContext { case Union(children) => planned += 1 UnionExec(children.map(planLater)) :: planLater(NeverPlanned) :: Nil - case LocalRelation(output, data) => + case r: Range => planned += 1 - LocalTableScanExec(output, data) :: planLater(NeverPlanned) :: Nil + RangeExec(r) :: planLater(NeverPlanned) :: Nil case NeverPlanned => fail("QueryPlanner should not go down to this branch.") case _ => Nil @@ -52,9 +52,9 @@ class SparkPlannerSuite extends SharedSQLContext { try { spark.experimental.extraStrategies = TestStrategy :: Nil - val ds = Seq("a", "b", "c").toDS().union(Seq("d", "e", "f").toDS()) + val ds = spark.range(1, 3).union(spark.range(3, 6)) - assert(ds.collect().toSeq === Seq("a", "b", "c", "d", "e", "f")) + assert(ds.collect().toSeq === Seq(1, 2, 3, 4, 5)) assert(planned === 4) } finally { spark.experimental.extraStrategies = Nil From 392eb0ab82dba29c53dea098f767a7960d6a9b6d Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Mon, 8 Aug 2016 23:12:50 +0200 Subject: [PATCH 4/5] Add fixed one row relation --- .../plans/logical/basicLogicalOperators.scala | 22 +++++++++++-------- .../expressions/ExpressionEvalHelper.scala | 4 ++-- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index eb612c4c12c75..718b728397afb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -756,16 +756,20 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan) /** * A relation with one row. This is used in "SELECT ..." without a from clause. */ -case object OneRowRelation extends LeafNode { +abstract class AbstractOneRowRelation extends LeafNode { override def maxRows: Option[Long] = Some(1) override def output: Seq[Attribute] = Nil - - /** - * Computes [[Statistics]] for this plan. The default implementation assumes the output - * cardinality is the product of of all child plan's cardinality, i.e. applies in the case - * of cartesian joins. - * - * [[LeafNode]]s must override this. - */ override lazy val statistics: Statistics = Statistics(sizeInBytes = 1) } + +/** + * A relation with one row. This relation might be eliminated during optimization in favor of a + * LocalRelation. + */ +case object OneRowRelation extends AbstractOneRowRelation + +/** + * A one row relation that should not be rewritten (during optimization). This is only for + * testing purposes. + */ +case object FixedOneRowRelation extends AbstractOneRowRelation diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala index d6a9672d1f186..48e1f9f454596 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala @@ -25,7 +25,7 @@ import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.optimizer.SimpleTestOptimizer -import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project} +import org.apache.spark.sql.catalyst.plans.logical.{FixedOneRowRelation, Project} import org.apache.spark.sql.catalyst.util.MapData import org.apache.spark.sql.types.DataType import org.apache.spark.util.Utils @@ -165,7 +165,7 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks { expression: Expression, expected: Any, inputRow: InternalRow = EmptyRow): Unit = { - val plan = Project(Alias(expression, s"Optimized($expression)")() :: Nil, OneRowRelation) + val plan = Project(Alias(expression, s"Optimized($expression)")() :: Nil, FixedOneRowRelation) val optimizedPlan = SimpleTestOptimizer.execute(plan) checkEvaluationWithoutCodegen(optimizedPlan.expressions.head, expected, inputRow) } From daa01a2753d4b4af4623a3c50961b651ed96cd4c Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Tue, 9 Aug 2016 15:57:05 +0200 Subject: [PATCH 5/5] Create an InlineTable LogicalPlan --- .../sql/catalyst/analysis/Analyzer.scala | 4 ++ .../sql/catalyst/analysis/CheckAnalysis.scala | 20 +++++++ .../sql/catalyst/analysis/TypeCoercion.scala | 36 ++++++++----- .../sql/catalyst/optimizer/Optimizer.scala | 12 ++--- .../sql/catalyst/parser/AstBuilder.scala | 9 ++-- .../plans/logical/basicLogicalOperators.scala | 54 +++++++++++++++---- .../expressions/ExpressionEvalHelper.scala | 4 +- .../ConvertToLocalRelationSuite.scala | 21 +++----- .../sql/catalyst/parser/PlanParserSuite.scala | 22 ++++---- .../org/apache/spark/sql/DataFrameSuite.scala | 4 +- .../sql/execution/SparkPlannerSuite.scala | 10 ++-- .../sql/hive/execution/HiveExplainSuite.scala | 27 
+++++-----
 12 files changed, 136 insertions(+), 87 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 660f523698e7f..11a7dd2d4ca3e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2102,6 +2102,10 @@ object CleanupAliases extends Rule[LogicalPlan] {
         Window(cleanedWindowExprs, partitionSpec.map(trimAliases),
           orderSpec.map(trimAliases(_).asInstanceOf[SortOrder]), child)

+      case InlineTable(rows) =>
+        val cleanedRows = rows.map(_.map(trimNonTopLevelAliases(_).asInstanceOf[NamedExpression]))
+        InlineTable(cleanedRows)
+
       // Operators that operate on objects should only have expressions from encoders, which should
       // never have extra aliases.
       case o: ObjectConsumer => o
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 41b7e62d8ccea..cd390eebe7a77 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -298,6 +298,26 @@ trait CheckAnalysis extends PredicateHelper {
          }
        }

+      case InlineTable(rows) if rows.length > 1 =>
+        val expectedDataTypes = rows.head.map(_.dataType)
+        rows.zipWithIndex.tail.foreach { case (row, ri) =>
+          // Check the number of columns.
+          if (row.length != expectedDataTypes.length) {
+            failAnalysis(
+              s"An inline table must have the same number of columns in every row. " +
+              s"Row '${ri + 1}' has '${row.length}' columns while " +
+              s"'${expectedDataTypes.length}' columns were expected.")
+          }
+          // Check the data types.
+          row.map(_.dataType).zip(expectedDataTypes).zipWithIndex.collect {
+            case ((dt1, dt2), ci) if dt1 != dt2 =>
+              failAnalysis(
+                s"Data type '$dt1' of column '${rows.head(ci).name}' at row '${ri + 1}' " +
+                s"does not match the expected data type '$dt2' for that column. 
" + + s"Expressions for an inline table's column must have the same data type.") + } + } + case _ => // Fallbacks to the following checks } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala index 021952e7166f9..8ce44dc0c39b5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala @@ -243,6 +243,14 @@ object TypeCoercion { s.children.forall(_.output.length == s.children.head.output.length) && !s.resolved => val newChildren: Seq[LogicalPlan] = buildNewChildrenWithWiderTypes(s.children) s.makeCopy(Array(newChildren)) + + case s @ InlineTable(rows) if !s.resolved && s.expressionsResolved && s.validDimensions => + val targetTypes = getWidestTypes(rows.map(_.map(_.dataType))) + if (targetTypes.nonEmpty) { + s.copy(rows = rows.map(widenTypes(_, targetTypes))) + } else { + s + } } /** Build new children with the widest types for each attribute among all the children */ @@ -251,12 +259,11 @@ object TypeCoercion { // Get a sequence of data types, each of which is the widest type of this specific attribute // in all the children - val targetTypes: Seq[DataType] = - getWidestTypes(children, attrIndex = 0, mutable.Queue[DataType]()) + val targetTypes: Seq[DataType] = getWidestTypes(children.map(_.output.map(_.dataType))) if (targetTypes.nonEmpty) { // Add an extra Project if the targetTypes are different from the original types. - children.map(widenTypes(_, targetTypes)) + children.map(child => Project(widenTypes(child.output, targetTypes), child)) } else { // Unable to find a target type to widen, then just return the original set. children @@ -265,30 +272,31 @@ object TypeCoercion { /** Get the widest type for each attribute in all the children */ @tailrec private def getWidestTypes( - children: Seq[LogicalPlan], - attrIndex: Int, - castedTypes: mutable.Queue[DataType]): Seq[DataType] = { + dataTypes: Seq[Seq[DataType]], + attrIndex: Int = 0, + castedTypes: mutable.Queue[DataType] = mutable.Queue.empty): Seq[DataType] = { // Return the result after the widen data types have been found for all the children - if (attrIndex >= children.head.output.length) return castedTypes.toSeq + if (attrIndex >= dataTypes.head.length) return castedTypes // For the attrIndex-th attribute, find the widest type - findWiderCommonType(children.map(_.output(attrIndex).dataType)) match { + findWiderCommonType(dataTypes.map(_(attrIndex))) match { // If unable to find an appropriate widen type for this column, return an empty Seq case None => Seq.empty[DataType] // Otherwise, record the result in the queue and find the type for the next column case Some(widenType) => castedTypes.enqueue(widenType) - getWidestTypes(children, attrIndex + 1, castedTypes) + getWidestTypes(dataTypes, attrIndex + 1, castedTypes) } } - /** Given a plan, add an extra project on top to widen some columns' data types. */ - private def widenTypes(plan: LogicalPlan, targetTypes: Seq[DataType]): LogicalPlan = { - val casted = plan.output.zip(targetTypes).map { - case (e, dt) if e.dataType != dt => Alias(Cast(e, dt), e.name)() + /** Cast the expressions to the given dataTypes (if we need to). 
*/
+  private def widenTypes(
+      expressions: Seq[NamedExpression],
+      targetTypes: Seq[DataType]): Seq[NamedExpression] = {
+    expressions.zip(targetTypes).map {
+      case (e, dt) if e.dataType != dt => Alias(Cast(e, dt), e.name)()
       case (e, _) => e
     }
-    Project(casted, plan)
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 2e599265e00bc..04f304376a76d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1572,14 +1572,12 @@ object ConvertToLocalRelation extends Rule[LogicalPlan] {
       if !projectList.exists(hasUnevaluableExpr) =>
       val projection = new InterpretedProjection(projectList, output)
       LocalRelation(projectList.map(_.toAttribute), data.map(projection))
-    case Project(projectList, OneRowRelation) if !projectList.exists(hasUnevaluableExpr) =>
-      val projection = new InterpretedProjection(projectList, Seq.empty)
-      LocalRelation(projectList.map(_.toAttribute), Seq(projection(EmptyRow)))
-    case u @ Union(children) if children.forall(_.isInstanceOf[LocalRelation]) =>
-      val data = children.flatMap {
-        case LocalRelation(_, rows) => rows
+    case table: InlineTable =>
+      val data = table.rows.map { row =>
+        val projection = new InterpretedProjection(row)
+        projection(EmptyRow)
       }
-      LocalRelation(u.output, data)
+      LocalRelation(table.output, data)
   }

   private def hasUnevaluableExpr(expr: Expression): Boolean = {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 5e1789cd3f3f2..a99f08b092bbd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -676,17 +676,16 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
       Seq.tabulate(numExpectedColumns)(i => s"col${i + 1}")
     }

-    // Create the UNION.
-    val union = Union(rows.zipWithIndex.map { case (expressions, index) =>
+    // Create the inline table.
+ val table = InlineTable(rows.zipWithIndex.map { case (expressions, index) => assert(expressions.size == numExpectedColumns, s"Number of values '${expressions.size}' in row '${index + 1}' does not match the " + s"expected number of values '$numExpectedColumns' in a row", ctx) - val namedExpressions = expressions.zip(aliases).map { + expressions.zip(aliases).map { case (expression, name) => Alias(expression, name)() } - Project(namedExpressions, OneRowRelation) }) - union.optionalMap(ctx.identifier)(aliasPlan) + table.optionalMap(ctx.identifier)(aliasPlan) } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 718b728397afb..68443baf582db 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.plans.logical import scala.collection.mutable.ArrayBuffer -import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation +import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, UnresolvedAttribute} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.plans._ @@ -756,20 +756,54 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan) /** * A relation with one row. This is used in "SELECT ..." without a from clause. */ -abstract class AbstractOneRowRelation extends LeafNode { +case object OneRowRelation extends LeafNode { override def maxRows: Option[Long] = Some(1) override def output: Seq[Attribute] = Nil + + /** + * Computes [[Statistics]] for this plan. The default implementation assumes the output + * cardinality is the product of of all child plan's cardinality, i.e. applies in the case + * of cartesian joins. + * + * [[LeafNode]]s must override this. + */ override lazy val statistics: Statistics = Statistics(sizeInBytes = 1) } /** - * A relation with one row. This relation might be eliminated during optimization in favor of a - * LocalRelation. + * An inline table that holds a number of foldable expressions, which can be materialized into + * rows. This is semantically the same as a Union of one row relations. */ -case object OneRowRelation extends AbstractOneRowRelation +case class InlineTable(rows: Seq[Seq[NamedExpression]]) extends LeafNode { + lazy val expressionsResolved: Boolean = rows.forall(_.forall(_.resolved)) -/** - * A one row relation that should not be rewritten (during optimization). This is only for - * testing purposes. 
- */ -case object FixedOneRowRelation extends AbstractOneRowRelation + lazy val validDimensions: Boolean = { + val size = rows.headOption.map(_.size).getOrElse(0) + rows.tail.forall(_.size == size) + } + + override lazy val resolved: Boolean = { + def allRowsCompatible: Boolean = { + val expectedDataTypes = rows.headOption.toSeq.flatMap(_.map(_.dataType)) + rows.tail.forall { row => + row.map(_.dataType).zip(expectedDataTypes).forall { + case (dt1, dt2) => dt1 == dt2 + } + } + } + expressionsResolved && validDimensions && allRowsCompatible + } + + override def maxRows: Option[Long] = Some(rows.size) + + override def output: Seq[Attribute] = rows.transpose.map { + case column if column.forall(_.resolved) => + column.head.toAttribute.withNullability(column.exists(_.nullable)) + case column => + UnresolvedAttribute(column.head.name) + } + + override lazy val statistics: Statistics = { + Statistics(output.map(_.dataType.defaultSize).sum * rows.size) + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala index 48e1f9f454596..d6a9672d1f186 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala @@ -25,7 +25,7 @@ import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.optimizer.SimpleTestOptimizer -import org.apache.spark.sql.catalyst.plans.logical.{FixedOneRowRelation, Project} +import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project} import org.apache.spark.sql.catalyst.util.MapData import org.apache.spark.sql.types.DataType import org.apache.spark.util.Utils @@ -165,7 +165,7 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks { expression: Expression, expected: Any, inputRow: InternalRow = EmptyRow): Unit = { - val plan = Project(Alias(expression, s"Optimized($expression)")() :: Nil, FixedOneRowRelation) + val plan = Project(Alias(expression, s"Optimized($expression)")() :: Nil, OneRowRelation) val optimizedPlan = SimpleTestOptimizer.execute(plan) checkEvaluationWithoutCodegen(optimizedPlan.expressions.head, expected, inputRow) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConvertToLocalRelationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConvertToLocalRelationSuite.scala index 354f914cf6642..118f5627ff374 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConvertToLocalRelationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConvertToLocalRelationSuite.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.catalyst.plans.PlanTest -import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, OneRowRelation, Union} +import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.RuleExecutor @@ -53,20 +53,11 @@ class ConvertToLocalRelationSuite extends PlanTest { comparePlans(optimized, correctAnswer) } - test("Project on OneRowRelation should be turned 
into a single LocalRelation") { - val testRelation = OneRowRelation.select(Literal(1).as("a")) - val correctAnswer = LocalRelation( - LocalRelation('a.int.withNullability(false)).output, - InternalRow(1) :: Nil) - val optimized = Optimize.execute(testRelation.analyze) - comparePlans(optimized, correctAnswer) - } - - test("Union of LocalRelations should be turned into a single LocalRelation") { - val testRelation = Union( - OneRowRelation.select(Literal(1).as("a")) :: - OneRowRelation.select(Literal(2).as("a")) :: - OneRowRelation.select(Literal(3).as("a")) :: Nil) + test("InlineTable should be turned into a single LocalRelation") { + val testRelation = InlineTable( + Seq(Literal(1).as("a")) :: + Seq(Literal(2).as("a")) :: + Seq(Literal(3).as("a")) :: Nil) val correctAnswer = LocalRelation( LocalRelation('a.int.withNullability(false)).output, InternalRow(1) :: InternalRow(2) :: InternalRow(3) :: Nil) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala index 36294eda25208..c9ba4d3a8e9f1 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala @@ -17,9 +17,8 @@ package org.apache.spark.sql.catalyst.parser -import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.FunctionIdentifier -import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedGenerator} +import org.apache.spark.sql.catalyst.analysis.UnresolvedGenerator import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ @@ -424,22 +423,21 @@ class PlanParserSuite extends PlanTest { } test("inline table") { - def project(values: Seq[Any], names: Seq[String]): Project = { - val expressions = names match { - case Seq() => - values.zipWithIndex.map(x => Alias(Literal(x._1), s"col${x._2 + 1}")()) - case aliases if aliases.size == values.size => - values.zip(aliases).map(x => Alias(Literal(x._1), x._2)()) + def rows(names: String*)(values: Any*): Seq[Seq[NamedExpression]] = { + def row(values: Seq[Any]): Seq[NamedExpression] = values.zip(names).map { + case (value, name) => Alias(Literal(value), name)() + } + values.map { + case elements: Seq[Any] => row(elements) + case element => row(Seq(element)) } - Project(expressions, OneRowRelation) } - assertEqual( "values 1, 2, 3, 4", - Union(Seq(1, 2, 3, 4).map(x => project(Seq(x), Seq.empty)))) + InlineTable(rows("col1")(1, 2, 3, 4))) assertEqual( "values (1, 'a'), (2, 'b'), (3, 'c') as tbl(a, b)", - Union(Seq(Seq(1, "a"), Seq(2, "b"), Seq(3, "c")).map(project(_, Seq("a", "b")))).as("tbl")) + InlineTable(rows("a", "b")(Seq(1, "a"), Seq(2, "b"), Seq(3, "c"))).as("tbl")) intercept("values (1, 'a'), (2, 'b') as tbl(a, b, c)", "Number of aliases", "must match the number of fields", "in an inline table") intercept("values (1, 'a'), (2, 'b', 5Y)", diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 5db1d89e7d0bb..62cfd24041b3d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -1192,8 +1192,8 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { } test("SPARK-10740: handle nondeterministic expressions correctly 
for set operations") { - val df1 = spark.range(1, 20).select('id.cast("int").as("i")) - val df2 = spark.range(1, 10).select('id.cast("int").as("i")) + val df1 = (1 to 20).map(Tuple1.apply).toDF("i") + val df2 = (1 to 10).map(Tuple1.apply).toDF("i") // When generating expected results at here, we need to follow the implementation of // Rand expression. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlannerSuite.scala index b03f54a275390..aecfd3062147c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlannerSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution import org.apache.spark.sql.Strategy import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LocalRelation, LogicalPlan, ReturnAnswer, Union} import org.apache.spark.sql.test.SharedSQLContext class SparkPlannerSuite extends SharedSQLContext { @@ -40,9 +40,9 @@ class SparkPlannerSuite extends SharedSQLContext { case Union(children) => planned += 1 UnionExec(children.map(planLater)) :: planLater(NeverPlanned) :: Nil - case r: Range => + case LocalRelation(output, data) => planned += 1 - RangeExec(r) :: planLater(NeverPlanned) :: Nil + LocalTableScanExec(output, data) :: planLater(NeverPlanned) :: Nil case NeverPlanned => fail("QueryPlanner should not go down to this branch.") case _ => Nil @@ -52,9 +52,9 @@ class SparkPlannerSuite extends SharedSQLContext { try { spark.experimental.extraStrategies = TestStrategy :: Nil - val ds = spark.range(1, 3).union(spark.range(3, 6)) + val ds = Seq("a", "b", "c").toDS().union(Seq("d", "e", "f").toDS()) - assert(ds.collect().toSeq === Seq(1, 2, 3, 4, 5)) + assert(ds.collect().toSeq === Seq("a", "b", "c", "d", "e", "f")) assert(planned === 4) } finally { spark.experimental.extraStrategies = Nil diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala index c4e6493105253..98afd99a203ac 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala @@ -104,23 +104,20 @@ class HiveExplainSuite extends QueryTest with SQLTestUtils with TestHiveSingleto } test("EXPLAIN CODEGEN command") { - withTempView("x") { - spark.range(0, 1).createOrReplaceTempView("x") - checkKeywordsExist(sql("EXPLAIN CODEGEN SELECT 1 FROM X"), - "WholeStageCodegen", - "Generated code:", - "/* 001 */ public Object generate(Object[] references) {", - "/* 002 */ return new GeneratedIterator(references);", - "/* 003 */ }" - ) + checkKeywordsExist(sql("EXPLAIN CODEGEN SELECT 1"), + "WholeStageCodegen", + "Generated code:", + "/* 001 */ public Object generate(Object[] references) {", + "/* 002 */ return new GeneratedIterator(references);", + "/* 003 */ }" + ) - checkKeywordsNotExist(sql("EXPLAIN CODEGEN SELECT 1 FROM X"), - "== Physical Plan ==" - ) + checkKeywordsNotExist(sql("EXPLAIN CODEGEN SELECT 1"), + "== Physical Plan ==" + ) - intercept[ParseException] { - sql("EXPLAIN EXTENDED CODEGEN SELECT 1 FROM X") - } + intercept[ParseException] { + sql("EXPLAIN EXTENDED CODEGEN SELECT 1") } } }
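
To recap the final design with a minimal sketch (illustrative only, using
the InlineTable node and catalyst expression classes defined in this
series): VALUES (1, 'a'), (2, 'b') AS tbl(a, b) now parses into an
InlineTable of aliased foldable expressions; CheckAnalysis verifies the
row shape, TypeCoercion widens each column to a common type, and
ConvertToLocalRelation finally evaluates every row into a LocalRelation.

    import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
    import org.apache.spark.sql.catalyst.plans.logical.InlineTable

    // Roughly what AstBuilder.visitInlineTable builds for the query above;
    // output attributes are derived from the first row.
    val table = InlineTable(Seq(
      Seq(Alias(Literal(1), "a")(), Alias(Literal("a"), "b")()),
      Seq(Alias(Literal(2), "a")(), Alias(Literal("b"), "b")())))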