From 9351d1dea11a510447a3337bc9b2f6a0061dad1b Mon Sep 17 00:00:00 2001 From: Jiaan Geng Date: Wed, 27 Jul 2022 19:44:20 +0800 Subject: [PATCH 01/11] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions) --- .../v2/V2ScanRelationPushDown.scala | 31 +- .../apache/spark/sql/jdbc/JDBCV2Suite.scala | 293 +++++++++++++++--- 2 files changed, 282 insertions(+), 42 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala index c7b09904df41d..e0582811fe920 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala @@ -189,12 +189,14 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper { // +- ScanBuilderHolder[group_col_0#10, agg_func_0#21, agg_func_1#22] // Later, we build the `Scan` instance and convert ScanBuilderHolder to DataSourceV2ScanRelation. // scalastyle:on - val groupOutput = normalizedGroupingExpr.zipWithIndex.map { case (e, i) => - AttributeReference(s"group_col_$i", e.dataType)() + val groupOutputMap = normalizedGroupingExpr.zipWithIndex.map { case (e, i) => + AttributeReference(s"group_col_$i", e.dataType)() -> e } - val aggOutput = finalAggExprs.zipWithIndex.map { case (e, i) => - AttributeReference(s"agg_func_$i", e.dataType)() + val groupOutput = groupOutputMap.unzip._1 + val aggOutputMap = finalAggExprs.zipWithIndex.map { case (e, i) => + AttributeReference(s"agg_func_$i", e.dataType)() -> e } + val aggOutput = aggOutputMap.unzip._1 val newOutput = groupOutput ++ aggOutput val groupByExprToOutputOrdinal = mutable.HashMap.empty[Expression, Int] normalizedGroupingExpr.zipWithIndex.foreach { case (expr, ordinal) => @@ -204,6 +206,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper { } holder.pushedAggregate = Some(translatedAgg) + holder.pushedAggregateExpectedOutputMap = (groupOutputMap ++ aggOutputMap).toMap holder.output = newOutput logInfo( s""" @@ -412,10 +415,21 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper { // push-down, and thus can't push down Top-N which needs to know the ordering column names. // TODO: we can support simple cases like GROUP BY columns directly and ORDER BY the same // columns, which we know the resulting column names: the original table columns. 
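A minimal, standalone illustration (not part of the patch) of the mapping the hunk above builds: each synthetic `group_col_i` / `agg_func_i` attribute is paired with the expression it replaces, so later steps can recover the original expression. It assumes only a spark-catalyst dependency; the column names and the wrapper object are hypothetical, and the aggregate side is simplified to a plain column reference for brevity.

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.types.{DoubleType, IntegerType}

object PushedAggOutputMapSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical columns standing in for the scanned relation's output.
    val dept = AttributeReference("DEPT", IntegerType)()
    val salary = AttributeReference("SALARY", DoubleType)()

    // Pair each synthetic output attribute with the expression it stands for,
    // mirroring the shape of groupOutputMap/aggOutputMap above.
    val groupOutputMap = Seq(AttributeReference("group_col_0", dept.dataType)() -> (dept: Expression))
    val aggOutputMap = Seq(AttributeReference("agg_func_0", salary.dataType)() -> (salary: Expression))

    val expectedOutputMap = (groupOutputMap ++ aggOutputMap).toMap
    // Resolving a synthetic attribute recovers the original expression, which is what a
    // later Top-N push-down needs when the ordering refers to the grouping columns.
    expectedOutputMap.foreach { case (attr, origin) => println(s"${attr.name} -> ${origin.sql}") }
  }
}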
- if sHolder.pushedAggregate.isEmpty && - CollapseProject.canCollapseExpressions(order, project, alwaysInline = true) => + if CollapseProject.canCollapseExpressions(order, project, alwaysInline = true) => val aliasMap = getAliasMap(project) - val newOrder = order.map(replaceAlias(_, aliasMap)).asInstanceOf[Seq[SortOrder]] + def findGroupExprForSortOrder(sortOrder: SortOrder): SortOrder = sortOrder match { + case SortOrder(attr: AttributeReference, _, _, sameOrderExpressions) => + val originAttr = sHolder.pushedAggregateExpectedOutputMap(attr) + sortOrder.withNewChildren(originAttr +: sameOrderExpressions).asInstanceOf[SortOrder] + case _ => sortOrder + } + + val aliasReplacedOrder = order.map(replaceAlias(_, aliasMap)).asInstanceOf[Seq[SortOrder]] + val newOrder = if (sHolder.pushedAggregate.isDefined) { + aliasReplacedOrder.map(findGroupExprForSortOrder) + } else { + aliasReplacedOrder + } val normalizedOrders = DataSourceStrategy.normalizeExprs( newOrder, sHolder.relation.output).asInstanceOf[Seq[SortOrder]] val orders = DataSourceStrategy.translateSortOrders(normalizedOrders) @@ -545,6 +559,9 @@ case class ScanBuilderHolder( var pushedPredicates: Seq[Predicate] = Seq.empty[Predicate] var pushedAggregate: Option[Aggregation] = None + + var pushedAggregateExpectedOutputMap: Map[AttributeReference, Expression] = + Map.empty[AttributeReference, Expression] } // A wrapper for v1 scan to carry the translated filters and the handled ones, along with diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala index 02dff0973fe12..c95e7a0433879 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala @@ -775,59 +775,45 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel checkAnswer(df5, Seq(Row(1, "cathy", 9000.00, 1200.0, false), Row(1, "amy", 10000.00, 1000.0, true))) - val df6 = spark.read - .table("h2.test.employee") - .groupBy("DEPT").sum("SALARY") - .orderBy("DEPT") - .limit(1) - checkSortRemoved(df6, false) - checkLimitRemoved(df6, false) - checkPushedInfo(df6, - "PushedAggregates: [SUM(SALARY)]", - "PushedFilters: []", - "PushedGroupByExpressions: [DEPT]") - checkAnswer(df6, Seq(Row(1, 19000.00))) - val name = udf { (x: String) => x.matches("cat|dav|amy") } val sub = udf { (x: String) => x.substring(0, 3) } - val df7 = spark.read + val df6 = spark.read .table("h2.test.employee") .select($"SALARY", $"BONUS", sub($"NAME").as("shortName")) .filter(name($"shortName")) .sort($"SALARY".desc) .limit(1) // LIMIT is pushed down only if all the filters are pushed down - checkSortRemoved(df7, false) - checkLimitRemoved(df7, false) - checkPushedInfo(df7, "PushedFilters: []") - checkAnswer(df7, Seq(Row(10000.00, 1000.0, "amy"))) + checkSortRemoved(df6, false) + checkLimitRemoved(df6, false) + checkPushedInfo(df6, "PushedFilters: [], ") + checkAnswer(df6, Seq(Row(10000.00, 1000.0, "amy"))) - val df8 = spark.read + val df7 = spark.read .table("h2.test.employee") .sort(sub($"NAME")) .limit(1) - checkSortRemoved(df8, false) - checkLimitRemoved(df8, false) - checkPushedInfo(df8, "PushedFilters: []") - checkAnswer(df8, Seq(Row(2, "alex", 12000.00, 1200.0, false))) + checkSortRemoved(df7, false) + checkLimitRemoved(df7, false) + checkPushedInfo(df7, "PushedFilters: [], ") + checkAnswer(df7, Seq(Row(2, "alex", 12000.00, 1200.0, false))) - val df9 = spark.read + val df8 = spark.read 
.table("h2.test.employee") .select($"DEPT", $"name", $"SALARY", when(($"SALARY" > 8000).and($"SALARY" < 10000), $"salary").otherwise(0).as("key")) .sort("key", "dept", "SALARY") .limit(3) - checkSortRemoved(df9) - checkLimitRemoved(df9) - checkPushedInfo(df9, - "PushedFilters: []", + checkSortRemoved(df8) + checkLimitRemoved(df8) + checkPushedInfo(df8, "PushedFilters: [], " + "PushedTopN: " + "ORDER BY [CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END " + "ASC NULLS FIRST, DEPT ASC NULLS FIRST, SALARY ASC NULLS FIRST] LIMIT 3,") - checkAnswer(df9, + checkAnswer(df8, Seq(Row(1, "amy", 10000, 0), Row(2, "david", 10000, 0), Row(2, "alex", 12000, 0))) - val df10 = spark.read + val df9 = spark.read .option("partitionColumn", "dept") .option("lowerBound", "0") .option("upperBound", "2") @@ -837,14 +823,13 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel when(($"SALARY" > 8000).and($"SALARY" < 10000), $"salary").otherwise(0).as("key")) .orderBy($"key", $"dept", $"SALARY") .limit(3) - checkSortRemoved(df10, false) - checkLimitRemoved(df10, false) - checkPushedInfo(df10, - "PushedFilters: []", + checkSortRemoved(df9, false) + checkLimitRemoved(df9, false) + checkPushedInfo(df9, "PushedFilters: [], " + "PushedTopN: " + "ORDER BY [CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END " + "ASC NULLS FIRST, DEPT ASC NULLS FIRST, SALARY ASC NULLS FIRST] LIMIT 3,") - checkAnswer(df10, + checkAnswer(df9, Seq(Row(1, "amy", 10000, 0), Row(2, "david", 10000, 0), Row(2, "alex", 12000, 0))) } @@ -873,6 +858,244 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel checkAnswer(df2, Seq(Row(2, "david", 10000.00))) } + test("scan with aggregate push-down and top N push-down") { + val df1 = spark.read + .table("h2.test.employee") + .groupBy("DEPT").sum("SALARY") + .orderBy("DEPT") + .limit(1) + checkSortRemoved(df1) + checkLimitRemoved(df1) + checkPushedInfo(df1, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1") + checkAnswer(df1, Seq(Row(1, 19000.00))) + + val df2 = sql( + """ + |SELECT dept AS my_dept, SUM(SALARY) FROM h2.test.employee + |GROUP BY dept + |ORDER BY my_dept + |LIMIT 1 + |""".stripMargin) + checkSortRemoved(df2) + checkLimitRemoved(df2) + checkPushedInfo(df2, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1") + checkAnswer(df2, Seq(Row(1, 19000.00))) + + val df3 = spark.read + .table("h2.test.employee") + .select($"SALARY", + when(($"SALARY" > 8000).and($"SALARY" < 10000), $"salary").otherwise(0).as("key")) + .groupBy("key").sum("SALARY") + .orderBy("key") + .limit(1) + checkSortRemoved(df3) + checkLimitRemoved(df3) + checkPushedInfo(df3, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: " + + "[CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END]", + "PushedFilters: []", + "PushedTopN: ORDER BY [" + + "CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END " + + "ASC NULLS FIRST] LIMIT 1") + checkAnswer(df3, Seq(Row(0, 44000.00))) + + val df4 = spark.read + .table("h2.test.employee") + .groupBy("DEPT", "IS_MANAGER").sum("SALARY") + .orderBy("DEPT", "IS_MANAGER") + .limit(1) + checkSortRemoved(df4) + checkLimitRemoved(df4) + checkPushedInfo(df4, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: 
[DEPT, IS_MANAGER]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1") + checkAnswer(df4, Seq(Row(1, false, 9000.00))) + + val df5 = sql( + """ + |SELECT dept AS my_dept, is_manager AS my_manager, SUM(SALARY) FROM h2.test.employee + |GROUP BY dept, my_manager + |ORDER BY my_dept, my_manager + |LIMIT 1 + |""".stripMargin) + checkSortRemoved(df5) + checkLimitRemoved(df5) + checkPushedInfo(df5, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT, IS_MANAGER]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1") + checkAnswer(df5, Seq(Row(1, false, 9000.00))) + + val df6 = spark.read + .table("h2.test.employee") + .select($"SALARY", $"IS_MANAGER", + when(($"SALARY" > 8000).and($"SALARY" < 10000), $"salary").otherwise(0).as("key")) + .groupBy("key", "IS_MANAGER").sum("SALARY") + .orderBy("key", "IS_MANAGER") + .limit(1) + checkSortRemoved(df6) + checkLimitRemoved(df6) + checkPushedInfo(df6, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: " + + "[CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END, " + + "IS_MANAGER]", + "PushedFilters: []", + "PushedTopN: ORDER BY [" + + "CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END " + + "ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1") + checkAnswer(df6, Seq(Row(0.00, false, 12000.00))) + + val df7 = sql( + """ + |SELECT dept, SUM(SALARY) FROM h2.test.employee + |GROUP BY dept + |ORDER BY SUM(SALARY) + |LIMIT 1 + |""".stripMargin) + checkSortRemoved(df7, false) + checkLimitRemoved(df7, false) + checkPushedInfo(df7, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []") + checkAnswer(df7, Seq(Row(6, 12000.00))) + + val df8 = sql( + """ + |SELECT dept, SUM(SALARY) AS total FROM h2.test.employee + |GROUP BY dept + |ORDER BY total + |LIMIT 1 + |""".stripMargin) + checkSortRemoved(df8, false) + checkLimitRemoved(df8, false) + checkPushedInfo(df8, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []") + checkAnswer(df8, Seq(Row(6, 12000.00))) + } + + test("scan with aggregate push-down and paging push-down") { + val df1 = spark.read + .table("h2.test.employee") + .groupBy("DEPT").sum("SALARY") + .orderBy("DEPT") + .offset(1) + .limit(1) + checkSortRemoved(df1) + checkLimitRemoved(df1) + checkPushedInfo(df1, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []", + "PushedOffset: OFFSET 1", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 2") + checkAnswer(df1, Seq(Row(2, 22000.00))) + + val df2 = sql( + """ + |SELECT dept AS my_dept, SUM(SALARY) FROM h2.test.employee + |GROUP BY dept + |ORDER BY my_dept + |LIMIT 1 + |OFFSET 1 + |""".stripMargin) + checkSortRemoved(df2) + checkLimitRemoved(df2) + checkPushedInfo(df2, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []", + "PushedOffset: OFFSET 1", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 2") + checkAnswer(df2, Seq(Row(2, 22000.00))) + + val df3 = spark.read + .table("h2.test.employee") + .select($"SALARY", + when(($"SALARY" > 8000).and($"SALARY" < 10000), $"salary").otherwise(0).as("key")) + .groupBy("key").sum("SALARY") + .orderBy("key") + .offset(1) + .limit(1) + checkSortRemoved(df3) + checkLimitRemoved(df3) + checkPushedInfo(df3, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: " + + "[CASE WHEN 
(SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END]", + "PushedFilters: []", + "PushedOffset: OFFSET 1", + "PushedTopN: ORDER BY [" + + "CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END " + + "ASC NULLS FIRST] LIMIT 2") + checkAnswer(df3, Seq(Row(9000, 9000.00))) + + val df4 = sql( + """ + |SELECT dept AS my_dept, is_manager, SUM(SALARY) FROM h2.test.employee + |GROUP BY my_dept, is_manager + |ORDER BY my_dept, is_manager + |LIMIT 1 + |OFFSET 1 + |""".stripMargin) + checkSortRemoved(df4) + checkLimitRemoved(df4) + checkPushedInfo(df4, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT, IS_MANAGER]", + "PushedFilters: []", + "PushedOffset: OFFSET 1", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 2") + checkAnswer(df4, Seq(Row(1, true, 10000.00))) + + val df5 = sql( + """ + |SELECT dept, SUM(SALARY) FROM h2.test.employee + |GROUP BY dept + |ORDER BY SUM(SALARY) + |LIMIT 1 + |OFFSET 1 + |""".stripMargin) + checkSortRemoved(df5, false) + checkLimitRemoved(df5, false) + checkPushedInfo(df5, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []") + checkAnswer(df5, Seq(Row(1, 19000.00))) + + val df6 = sql( + """ + |SELECT dept, SUM(SALARY) AS total FROM h2.test.employee + |GROUP BY dept + |ORDER BY total + |LIMIT 1 + |OFFSET 1 + |""".stripMargin) + checkSortRemoved(df6, false) + checkLimitRemoved(df6, false) + checkPushedInfo(df6, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []") + checkAnswer(df6, Seq(Row(1, 19000.00))) + } + test("scan with filter push-down") { val df = spark.table("h2.test.people").filter($"id" > 1) checkFiltersRemoved(df) From a920f4518971293188929b74c0b5dd9d5ddb6b2f Mon Sep 17 00:00:00 2001 From: Jiaan Geng Date: Wed, 27 Jul 2022 19:44:40 +0800 Subject: [PATCH 02/11] Update code --- .../sql/execution/datasources/v2/V2ScanRelationPushDown.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala index e0582811fe920..7de7926f97d44 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala @@ -413,8 +413,8 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper { case s @ Sort(order, _, operation @ PhysicalOperation(project, Nil, sHolder: ScanBuilderHolder)) // Without building the Scan, we do not know the resulting column names after aggregate // push-down, and thus can't push down Top-N which needs to know the ordering column names. - // TODO: we can support simple cases like GROUP BY columns directly and ORDER BY the same - // columns, which we know the resulting column names: the original table columns. + // In particular, we push down the simple cases like GROUP BY expressions directly and + // ORDER BY the same expressions, which we know the original table columns. 
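The comment above covers queries that group by an expression and order by that same expression, possibly through an alias (the `my_dept` tests added in PATCH 01 above). Below is a minimal, standalone sketch (not part of the patch) of the alias-resolution step the rule performs just after this comment via `getAliasMap` / `replaceAlias`; it assumes a spark-catalyst dependency, and the column name and wrapper object are hypothetical.

import org.apache.spark.sql.catalyst.expressions.{Alias, AliasHelper, Ascending, AttributeReference, SortOrder}
import org.apache.spark.sql.types.IntegerType

object ReplaceSortAliasSketch extends AliasHelper {
  def main(args: Array[String]): Unit = {
    // A hypothetical projection "DEPT AS my_dept", as in the my_dept test queries.
    val dept = AttributeReference("DEPT", IntegerType)()
    val myDept = Alias(dept, "my_dept")()

    val aliasMap = getAliasMap(Seq(myDept))
    val order = SortOrder(myDept.toAttribute, Ascending)

    // ORDER BY my_dept becomes an ordering on the underlying DEPT column, so it can
    // line up with the pushed GROUP BY expression.
    println(replaceAlias(order, aliasMap).sql)
  }
}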
if CollapseProject.canCollapseExpressions(order, project, alwaysInline = true) => val aliasMap = getAliasMap(project) def findGroupExprForSortOrder(sortOrder: SortOrder): SortOrder = sortOrder match { From b417cfcb820018b7bb1683e3c3697af392756cc0 Mon Sep 17 00:00:00 2001 From: Jiaan Geng Date: Fri, 29 Jul 2022 12:06:37 +0800 Subject: [PATCH 03/11] Update code --- .../datasources/v2/V2ScanRelationPushDown.scala | 16 ++++++++++------ .../org/apache/spark/sql/jdbc/JDBCV2Suite.scala | 2 +- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala index 7de7926f97d44..8d035eac6a8c9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2 import scala.collection.mutable -import org.apache.spark.sql.catalyst.expressions.{aggregate, Alias, And, Attribute, AttributeReference, AttributeSet, Cast, Expression, IntegerLiteral, Literal, NamedExpression, PredicateHelper, ProjectionOverSchema, SortOrder, SubqueryExpression} +import org.apache.spark.sql.catalyst.expressions.{aggregate, Alias, And, Attribute, AttributeMap, AttributeReference, AttributeSet, Cast, Expression, IntegerLiteral, Literal, NamedExpression, PredicateHelper, ProjectionOverSchema, SortOrder, SubqueryExpression} import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.optimizer.CollapseProject import org.apache.spark.sql.catalyst.planning.PhysicalOperation @@ -206,7 +206,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper { } holder.pushedAggregate = Some(translatedAgg) - holder.pushedAggregateExpectedOutputMap = (groupOutputMap ++ aggOutputMap).toMap + holder.pushedAggOutputMap = AttributeMap(groupOutputMap ++ aggOutputMap) holder.output = newOutput logInfo( s""" @@ -419,14 +419,19 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper { val aliasMap = getAliasMap(project) def findGroupExprForSortOrder(sortOrder: SortOrder): SortOrder = sortOrder match { case SortOrder(attr: AttributeReference, _, _, sameOrderExpressions) => - val originAttr = sHolder.pushedAggregateExpectedOutputMap(attr) + val originAttr = sHolder.pushedAggOutputMap(attr) sortOrder.withNewChildren(originAttr +: sameOrderExpressions).asInstanceOf[SortOrder] case _ => sortOrder } + def replaceAggOutput(sortOrder: SortOrder): SortOrder = { + sortOrder.transform { + case a: Attribute => sHolder.pushedAggOutputMap.getOrElse(a, a) + }.asInstanceOf[SortOrder] + } val aliasReplacedOrder = order.map(replaceAlias(_, aliasMap)).asInstanceOf[Seq[SortOrder]] val newOrder = if (sHolder.pushedAggregate.isDefined) { - aliasReplacedOrder.map(findGroupExprForSortOrder) + aliasReplacedOrder.map(replaceAggOutput(_)) } else { aliasReplacedOrder } @@ -560,8 +565,7 @@ case class ScanBuilderHolder( var pushedAggregate: Option[Aggregation] = None - var pushedAggregateExpectedOutputMap: Map[AttributeReference, Expression] = - Map.empty[AttributeReference, Expression] + var pushedAggOutputMap: AttributeMap[Expression] = AttributeMap.empty[Expression] } // A wrapper for v1 scan to carry the translated filters and the handled ones, along with diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala index c95e7a0433879..250e168bd6527 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala @@ -1009,7 +1009,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel val df2 = sql( """ |SELECT dept AS my_dept, SUM(SALARY) FROM h2.test.employee - |GROUP BY dept + |GROUP BY my_dept |ORDER BY my_dept |LIMIT 1 |OFFSET 1 From 3653fa11443938d9d5859770c86061912e1940d4 Mon Sep 17 00:00:00 2001 From: Jiaan Geng Date: Fri, 29 Jul 2022 15:29:49 +0800 Subject: [PATCH 04/11] Update code --- .../v2/V2ScanRelationPushDown.scala | 22 ++++++------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala index 8d035eac6a8c9..dac8a616cc089 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala @@ -417,23 +417,15 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper { // ORDER BY the same expressions, which we know the original table columns. if CollapseProject.canCollapseExpressions(order, project, alwaysInline = true) => val aliasMap = getAliasMap(project) - def findGroupExprForSortOrder(sortOrder: SortOrder): SortOrder = sortOrder match { - case SortOrder(attr: AttributeReference, _, _, sameOrderExpressions) => - val originAttr = sHolder.pushedAggOutputMap(attr) - sortOrder.withNewChildren(originAttr +: sameOrderExpressions).asInstanceOf[SortOrder] - case _ => sortOrder - } - def replaceAggOutput(sortOrder: SortOrder): SortOrder = { - sortOrder.transform { - case a: Attribute => sHolder.pushedAggOutputMap.getOrElse(a, a) - }.asInstanceOf[SortOrder] - } - - val aliasReplacedOrder = order.map(replaceAlias(_, aliasMap)).asInstanceOf[Seq[SortOrder]] + val aliasReplacedOrder = order.map(replaceAlias(_, aliasMap)) val newOrder = if (sHolder.pushedAggregate.isDefined) { - aliasReplacedOrder.map(replaceAggOutput(_)) + aliasReplacedOrder.map { + _.transform { + case a: Attribute => sHolder.pushedAggOutputMap.getOrElse(a, a) + }.asInstanceOf[SortOrder] + } } else { - aliasReplacedOrder + aliasReplacedOrder.asInstanceOf[Seq[SortOrder]] } val normalizedOrders = DataSourceStrategy.normalizeExprs( newOrder, sHolder.relation.output).asInstanceOf[Seq[SortOrder]] From c509e85444e9c8813e36b18da3726b80c886ece7 Mon Sep 17 00:00:00 2001 From: Jiaan Geng Date: Fri, 29 Jul 2022 20:48:02 +0800 Subject: [PATCH 05/11] Update code --- .../apache/spark/sql/jdbc/JDBCV2Suite.scala | 130 ++++++++++-------- 1 file changed, 70 insertions(+), 60 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala index 250e168bd6527..2c7bdedc5573c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala @@ -873,13 +873,12 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1") checkAnswer(df1, Seq(Row(1, 
19000.00))) - val df2 = sql( - """ - |SELECT dept AS my_dept, SUM(SALARY) FROM h2.test.employee - |GROUP BY dept - |ORDER BY my_dept - |LIMIT 1 - |""".stripMargin) + val df2 = spark.read + .table("h2.test.employee") + .select($"DEPT".as("my_dept"), $"SALARY") + .groupBy("my_dept").sum("SALARY") + .orderBy("my_dept") + .limit(1) checkSortRemoved(df2) checkLimitRemoved(df2) checkPushedInfo(df2, @@ -922,13 +921,12 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1") checkAnswer(df4, Seq(Row(1, false, 9000.00))) - val df5 = sql( - """ - |SELECT dept AS my_dept, is_manager AS my_manager, SUM(SALARY) FROM h2.test.employee - |GROUP BY dept, my_manager - |ORDER BY my_dept, my_manager - |LIMIT 1 - |""".stripMargin) + val df5 = spark.read + .table("h2.test.employee") + .select($"DEPT".as("my_dept"), $"IS_MANAGER".as("my_manager"), $"SALARY") + .groupBy("my_dept", "my_manager").sum("SALARY") + .orderBy("my_dept", "my_manager") + .limit(1) checkSortRemoved(df5) checkLimitRemoved(df5) checkPushedInfo(df5, @@ -958,13 +956,12 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel "ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1") checkAnswer(df6, Seq(Row(0.00, false, 12000.00))) - val df7 = sql( - """ - |SELECT dept, SUM(SALARY) FROM h2.test.employee - |GROUP BY dept - |ORDER BY SUM(SALARY) - |LIMIT 1 - |""".stripMargin) + val df7 = spark.read + .table("h2.test.employee") + .select($"DEPT", $"SALARY") + .groupBy("dept").agg(sum("SALARY")) + .orderBy(sum("SALARY")) + .limit(1) checkSortRemoved(df7, false) checkLimitRemoved(df7, false) checkPushedInfo(df7, @@ -973,13 +970,12 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel "PushedFilters: []") checkAnswer(df7, Seq(Row(6, 12000.00))) - val df8 = sql( - """ - |SELECT dept, SUM(SALARY) AS total FROM h2.test.employee - |GROUP BY dept - |ORDER BY total - |LIMIT 1 - |""".stripMargin) + val df8 = spark.read + .table("h2.test.employee") + .select($"DEPT", $"SALARY") + .groupBy("dept").agg(sum("SALARY").as("total")) + .orderBy("total") + .limit(1) checkSortRemoved(df8, false) checkLimitRemoved(df8, false) checkPushedInfo(df8, @@ -1006,14 +1002,13 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 2") checkAnswer(df1, Seq(Row(2, 22000.00))) - val df2 = sql( - """ - |SELECT dept AS my_dept, SUM(SALARY) FROM h2.test.employee - |GROUP BY my_dept - |ORDER BY my_dept - |LIMIT 1 - |OFFSET 1 - |""".stripMargin) + val df2 = spark.read + .table("h2.test.employee") + .select($"DEPT".as("my_dept"), $"SALARY") + .groupBy("my_dept").sum("SALARY") + .orderBy("my_dept") + .offset(1) + .limit(1) checkSortRemoved(df2) checkLimitRemoved(df2) checkPushedInfo(df2, @@ -1045,14 +1040,13 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel "ASC NULLS FIRST] LIMIT 2") checkAnswer(df3, Seq(Row(9000, 9000.00))) - val df4 = sql( - """ - |SELECT dept AS my_dept, is_manager, SUM(SALARY) FROM h2.test.employee - |GROUP BY my_dept, is_manager - |ORDER BY my_dept, is_manager - |LIMIT 1 - |OFFSET 1 - |""".stripMargin) + val df4 = spark.read + .table("h2.test.employee") + .select($"DEPT".as("my_dept"), $"IS_MANAGER", $"SALARY") + .groupBy("my_dept", "is_manager").sum("SALARY") + .orderBy("my_dept", "is_manager") + .offset(1) + .limit(1) checkSortRemoved(df4) checkLimitRemoved(df4) checkPushedInfo(df4, @@ 
-1063,14 +1057,13 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 2") checkAnswer(df4, Seq(Row(1, true, 10000.00))) - val df5 = sql( - """ - |SELECT dept, SUM(SALARY) FROM h2.test.employee - |GROUP BY dept - |ORDER BY SUM(SALARY) - |LIMIT 1 - |OFFSET 1 - |""".stripMargin) + val df5 = spark.read + .table("h2.test.employee") + .select($"DEPT", $"SALARY") + .groupBy("dept").agg(sum("SALARY")) + .orderBy(sum("SALARY")) + .offset(1) + .limit(1) checkSortRemoved(df5, false) checkLimitRemoved(df5, false) checkPushedInfo(df5, @@ -1079,14 +1072,13 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel "PushedFilters: []") checkAnswer(df5, Seq(Row(1, 19000.00))) - val df6 = sql( - """ - |SELECT dept, SUM(SALARY) AS total FROM h2.test.employee - |GROUP BY dept - |ORDER BY total - |LIMIT 1 - |OFFSET 1 - |""".stripMargin) + val df6 = spark.read + .table("h2.test.employee") + .select($"DEPT", $"SALARY") + .groupBy("dept").agg(sum("SALARY").as("total")) + .orderBy("total") + .offset(1) + .limit(1) checkSortRemoved(df6, false) checkLimitRemoved(df6, false) checkPushedInfo(df6, @@ -1094,6 +1086,24 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel "PushedGroupByExpressions: [DEPT]", "PushedFilters: []") checkAnswer(df6, Seq(Row(1, 19000.00))) + + val df7 = spark.read + .table("h2.test.employee") + .select($"DEPT", $"IS_MANAGER", $"SALARY") + .groupBy("dept", "is_manager").sum("SALARY") + .orderBy(when($"is_manager", $"dept").otherwise(0)) + .offset(1) + .limit(1) + checkSortRemoved(df7) + checkLimitRemoved(df7) + checkPushedInfo(df7, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT, IS_MANAGER]", + "PushedFilters: []", + "PushedOffset: OFFSET 1", + "PushedTopN: " + + "ORDER BY [CASE WHEN IS_MANAGER = true THEN DEPT ELSE 0 END ASC NULLS FIRST] LIMIT 2") + checkAnswer(df7, Seq(Row(1, false, 9000.00))) } test("scan with filter push-down") { From e3cd5cb569b393a2c2b3908e3013ef399c91ec15 Mon Sep 17 00:00:00 2001 From: Jiaan Geng Date: Tue, 2 Aug 2022 10:31:11 +0800 Subject: [PATCH 06/11] Update cod --- .../apache/spark/sql/jdbc/JDBCV2Suite.scala | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala index 2c7bdedc5573c..cd5b2ba2ea4dd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala @@ -786,7 +786,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel // LIMIT is pushed down only if all the filters are pushed down checkSortRemoved(df6, false) checkLimitRemoved(df6, false) - checkPushedInfo(df6, "PushedFilters: [], ") + checkPushedInfo(df6, "PushedFilters: []") checkAnswer(df6, Seq(Row(10000.00, 1000.0, "amy"))) val df7 = spark.read @@ -795,7 +795,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel .limit(1) checkSortRemoved(df7, false) checkLimitRemoved(df7, false) - checkPushedInfo(df7, "PushedFilters: [], ") + checkPushedInfo(df7, "PushedFilters: []") checkAnswer(df7, Seq(Row(2, "alex", 12000.00, 1200.0, false))) val df8 = spark.read @@ -806,10 +806,11 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel .limit(3) checkSortRemoved(df8) 
checkLimitRemoved(df8) - checkPushedInfo(df8, "PushedFilters: [], " + - "PushedTopN: " + - "ORDER BY [CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END " + - "ASC NULLS FIRST, DEPT ASC NULLS FIRST, SALARY ASC NULLS FIRST] LIMIT 3,") + checkPushedInfo(df8, + "PushedFilters: []", + "PushedTopN: ORDER BY " + + "[CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END" + + " ASC NULLS FIRST, DEPT ASC NULLS FIRST, SALARY ASC NULLS FIRST] LIMIT 3") checkAnswer(df8, Seq(Row(1, "amy", 10000, 0), Row(2, "david", 10000, 0), Row(2, "alex", 12000, 0))) @@ -825,10 +826,11 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel .limit(3) checkSortRemoved(df9, false) checkLimitRemoved(df9, false) - checkPushedInfo(df9, "PushedFilters: [], " + - "PushedTopN: " + - "ORDER BY [CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END " + - "ASC NULLS FIRST, DEPT ASC NULLS FIRST, SALARY ASC NULLS FIRST] LIMIT 3,") + checkPushedInfo(df9, + "PushedFilters: []", + "PushedTopN: ORDER BY " + + "[CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END " + + "ASC NULLS FIRST, DEPT ASC NULLS FIRST, SALARY ASC NULLS FIRST] LIMIT 3") checkAnswer(df9, Seq(Row(1, "amy", 10000, 0), Row(2, "david", 10000, 0), Row(2, "alex", 12000, 0))) } From e4d4eb57ac5a1b71349422f80a14f5d7d33e0d89 Mon Sep 17 00:00:00 2001 From: Jiaan Geng Date: Tue, 2 Aug 2022 11:55:48 +0800 Subject: [PATCH 07/11] Update code --- .../v2/V2ScanRelationPushDown.scala | 12 ++++--- .../apache/spark/sql/jdbc/JDBCV2Suite.scala | 33 ++++++++++--------- 2 files changed, 24 insertions(+), 21 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala index dac8a616cc089..9bda5a2acf690 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala @@ -411,14 +411,13 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper { } (operation, isPushed && !isPartiallyPushed) case s @ Sort(order, _, operation @ PhysicalOperation(project, Nil, sHolder: ScanBuilderHolder)) - // Without building the Scan, we do not know the resulting column names after aggregate - // push-down, and thus can't push down Top-N which needs to know the ordering column names. - // In particular, we push down the simple cases like GROUP BY expressions directly and - // ORDER BY the same expressions, which we know the original table columns. - if CollapseProject.canCollapseExpressions(order, project, alwaysInline = true) => + if CollapseProject.canCollapseExpressions(order, project, alwaysInline = true) => val aliasMap = getAliasMap(project) val aliasReplacedOrder = order.map(replaceAlias(_, aliasMap)) val newOrder = if (sHolder.pushedAggregate.isDefined) { + // Without building the Scan, Aggregate push-down give the expected output starts with + // `group_col_` or `agg_func_`. When Aggregate push-down working with Sort for top n + // push-down, we need replace these expected output with the origin expressions. 
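A standalone sketch (not part of the patch) of the replacement described in the comment above: sort-order attributes that refer to the holder's synthetic aggregate output are swapped for the original expressions before the ordering is translated for push-down. It assumes a spark-catalyst dependency; the column and attribute names and the wrapper object are hypothetical.

import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeMap, AttributeReference, Expression, SortOrder}
import org.apache.spark.sql.types.IntegerType

object RewritePushedSortOrderSketch {
  def main(args: Array[String]): Unit = {
    // DEPT is the original table column; group_col_0 is the synthetic attribute the
    // holder exposes after aggregate push-down.
    val dept = AttributeReference("DEPT", IntegerType)()
    val groupCol0 = AttributeReference("group_col_0", IntegerType)()
    val pushedAggOutputMap = AttributeMap[Expression](Seq(groupCol0 -> dept))

    val order = Seq(SortOrder(groupCol0, Ascending))
    // Same shape as the rewrite that follows: attributes found in the map are replaced
    // by the expressions they stand for, all others are left untouched.
    val newOrder = order.map {
      _.transform {
        case a: Attribute => pushedAggOutputMap.getOrElse(a, a)
      }.asInstanceOf[SortOrder]
    }
    newOrder.foreach(o => println(o.sql))
  }
}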
aliasReplacedOrder.map { _.transform { case a: Attribute => sHolder.pushedAggOutputMap.getOrElse(a, a) @@ -429,6 +428,9 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper { } val normalizedOrders = DataSourceStrategy.normalizeExprs( newOrder, sHolder.relation.output).asInstanceOf[Seq[SortOrder]] + // Because V2ExpressionBuilder can't translate aggregate functions, so we can't + // translate the sort with aggregate functions. + // TODO V2ExpressionBuilder could translate aggregate functions. val orders = DataSourceStrategy.translateSortOrders(normalizedOrders) if (orders.length == order.length) { val (isPushed, isPartiallyPushed) = diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala index cd5b2ba2ea4dd..53b27df4a0037 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala @@ -909,25 +909,26 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel "ASC NULLS FIRST] LIMIT 1") checkAnswer(df3, Seq(Row(0, 44000.00))) - val df4 = spark.read - .table("h2.test.employee") - .groupBy("DEPT", "IS_MANAGER").sum("SALARY") - .orderBy("DEPT", "IS_MANAGER") - .limit(1) - checkSortRemoved(df4) - checkLimitRemoved(df4) - checkPushedInfo(df4, - "PushedAggregates: [SUM(SALARY)]", - "PushedGroupByExpressions: [DEPT, IS_MANAGER]", - "PushedFilters: []", - "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1") - checkAnswer(df4, Seq(Row(1, false, 9000.00))) + withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") { + val df4 = spark.read + .table("h2.test.employee") + .groupBy("dept").sum("SALARY") + .orderBy($"dept" + 1) + .limit(1) + checkSortRemoved(df4) + checkLimitRemoved(df4) + checkPushedInfo(df4, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT + 1 ASC NULLS FIRST] LIMIT 1") + checkAnswer(df4, Seq(Row(1, 19000.00))) + } val df5 = spark.read .table("h2.test.employee") - .select($"DEPT".as("my_dept"), $"IS_MANAGER".as("my_manager"), $"SALARY") - .groupBy("my_dept", "my_manager").sum("SALARY") - .orderBy("my_dept", "my_manager") + .groupBy("DEPT", "IS_MANAGER").sum("SALARY") + .orderBy("DEPT", "IS_MANAGER") .limit(1) checkSortRemoved(df5) checkLimitRemoved(df5) From 515e95d08867b0779e294680fdc1f747f8f00732 Mon Sep 17 00:00:00 2001 From: Jiaan Geng Date: Tue, 2 Aug 2022 12:08:11 +0800 Subject: [PATCH 08/11] Update code --- .../test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala index 53b27df4a0037..960677d04b1ed 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala @@ -913,7 +913,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel val df4 = spark.read .table("h2.test.employee") .groupBy("dept").sum("SALARY") - .orderBy($"dept" + 1) + .orderBy($"dept" + 100) .limit(1) checkSortRemoved(df4) checkLimitRemoved(df4) @@ -921,7 +921,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel "PushedAggregates: [SUM(SALARY)]", "PushedGroupByExpressions: [DEPT]", "PushedFilters: []", - "PushedTopN: ORDER 
BY [DEPT + 1 ASC NULLS FIRST] LIMIT 1") + "PushedTopN: ORDER BY [DEPT + 100 ASC NULLS FIRST] LIMIT 1") checkAnswer(df4, Seq(Row(1, 19000.00))) } From ecd7dccd3e8b95cae4d327c977038c3fa9c203b8 Mon Sep 17 00:00:00 2001 From: Jiaan Geng Date: Tue, 2 Aug 2022 14:37:35 +0800 Subject: [PATCH 09/11] Update code --- .../apache/spark/sql/jdbc/JDBCV2Suite.scala | 28 +++++++++---------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala index 960677d04b1ed..69649a521361c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala @@ -909,21 +909,19 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel "ASC NULLS FIRST] LIMIT 1") checkAnswer(df3, Seq(Row(0, 44000.00))) - withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") { - val df4 = spark.read - .table("h2.test.employee") - .groupBy("dept").sum("SALARY") - .orderBy($"dept" + 100) - .limit(1) - checkSortRemoved(df4) - checkLimitRemoved(df4) - checkPushedInfo(df4, - "PushedAggregates: [SUM(SALARY)]", - "PushedGroupByExpressions: [DEPT]", - "PushedFilters: []", - "PushedTopN: ORDER BY [DEPT + 100 ASC NULLS FIRST] LIMIT 1") - checkAnswer(df4, Seq(Row(1, 19000.00))) - } + val df4 = spark.read + .table("h2.test.employee") + .groupBy("dept").sum("SALARY") + .orderBy($"dept".gt(1)) + .limit(1) + checkSortRemoved(df4) + checkLimitRemoved(df4) + checkPushedInfo(df4, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT > 1 ASC NULLS FIRST] LIMIT 1") + checkAnswer(df4, Seq(Row(1, 19000.00))) val df5 = spark.read .table("h2.test.employee") From c6b20b042cf67c20c6996d6f6d328bb29aab2a71 Mon Sep 17 00:00:00 2001 From: Jiaan Geng Date: Sat, 6 Aug 2022 13:52:23 +0800 Subject: [PATCH 10/11] Update code --- .../spark/sql/connector/expressions/Cast.java | 5 + .../v2/V2ScanRelationPushDown.scala | 8 +- .../apache/spark/sql/jdbc/JDBCV2Suite.scala | 165 +++++++----------- 3 files changed, 71 insertions(+), 107 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Cast.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Cast.java index 26b97b46fe2ef..44111913f124b 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Cast.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Cast.java @@ -42,4 +42,9 @@ public Cast(Expression expression, DataType dataType) { @Override public Expression[] children() { return new Expression[]{ expression() }; } + + @Override + public String toString() { + return "CAST(" + expression.describe() + " AS " + dataType.typeName() + ")"; + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala index 9bda5a2acf690..27daa899583e3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala @@ -415,9 +415,8 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper { val aliasMap = getAliasMap(project) val aliasReplacedOrder = order.map(replaceAlias(_, 
aliasMap)) val newOrder = if (sHolder.pushedAggregate.isDefined) { - // Without building the Scan, Aggregate push-down give the expected output starts with - // `group_col_` or `agg_func_`. When Aggregate push-down working with Sort for top n - // push-down, we need replace these expected output with the origin expressions. + // `ScanBuilderHolder` has different output columns after aggregate push-down. Here we + // replace the attributes in ordering expressions with the original table output columns. aliasReplacedOrder.map { _.transform { case a: Attribute => sHolder.pushedAggOutputMap.getOrElse(a, a) @@ -428,9 +427,6 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper { } val normalizedOrders = DataSourceStrategy.normalizeExprs( newOrder, sHolder.relation.output).asInstanceOf[Seq[SortOrder]] - // Because V2ExpressionBuilder can't translate aggregate functions, so we can't - // translate the sort with aggregate functions. - // TODO V2ExpressionBuilder could translate aggregate functions. val orders = DataSourceStrategy.translateSortOrders(normalizedOrders) if (orders.length == order.length) { val (isPushed, isPartiallyPushed) = diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala index 69649a521361c..be97c7f592068 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala @@ -877,7 +877,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel val df2 = spark.read .table("h2.test.employee") - .select($"DEPT".as("my_dept"), $"SALARY") + .select($"DEPT".cast("string").as("my_dept"), $"SALARY") .groupBy("my_dept").sum("SALARY") .orderBy("my_dept") .limit(1) @@ -885,105 +885,84 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel checkLimitRemoved(df2) checkPushedInfo(df2, "PushedAggregates: [SUM(SALARY)]", - "PushedGroupByExpressions: [DEPT]", + "PushedGroupByExpressions: [CAST(DEPT AS string)]", "PushedFilters: []", - "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1") - checkAnswer(df2, Seq(Row(1, 19000.00))) + "PushedTopN: ORDER BY [CAST(DEPT AS string) ASC NULLS FIRST] LIMIT 1") + checkAnswer(df2, Seq(Row("1", 19000.00))) val df3 = spark.read .table("h2.test.employee") - .select($"SALARY", - when(($"SALARY" > 8000).and($"SALARY" < 10000), $"salary").otherwise(0).as("key")) - .groupBy("key").sum("SALARY") - .orderBy("key") + .groupBy("dept").sum("SALARY") + .orderBy($"dept".cast("string")) .limit(1) checkSortRemoved(df3) checkLimitRemoved(df3) checkPushedInfo(df3, "PushedAggregates: [SUM(SALARY)]", - "PushedGroupByExpressions: " + - "[CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END]", + "PushedGroupByExpressions: [DEPT]", "PushedFilters: []", - "PushedTopN: ORDER BY [" + - "CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END " + - "ASC NULLS FIRST] LIMIT 1") - checkAnswer(df3, Seq(Row(0, 44000.00))) + "PushedTopN: ORDER BY [CAST(DEPT AS string) ASC NULLS FIRST] LIMIT 1") + checkAnswer(df3, Seq(Row(1, 19000.00))) val df4 = spark.read .table("h2.test.employee") - .groupBy("dept").sum("SALARY") - .orderBy($"dept".gt(1)) + .groupBy("DEPT", "IS_MANAGER").sum("SALARY") + .orderBy("DEPT", "IS_MANAGER") .limit(1) checkSortRemoved(df4) checkLimitRemoved(df4) checkPushedInfo(df4, "PushedAggregates: [SUM(SALARY)]", - "PushedGroupByExpressions: [DEPT]", + 
"PushedGroupByExpressions: [DEPT, IS_MANAGER]", "PushedFilters: []", - "PushedTopN: ORDER BY [DEPT > 1 ASC NULLS FIRST] LIMIT 1") - checkAnswer(df4, Seq(Row(1, 19000.00))) + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1") + checkAnswer(df4, Seq(Row(1, false, 9000.00))) val df5 = spark.read .table("h2.test.employee") - .groupBy("DEPT", "IS_MANAGER").sum("SALARY") - .orderBy("DEPT", "IS_MANAGER") + .select($"SALARY", $"IS_MANAGER", $"DEPT".cast("string").as("my_dept")) + .groupBy("my_dept", "IS_MANAGER").sum("SALARY") + .orderBy("my_dept", "IS_MANAGER") .limit(1) checkSortRemoved(df5) checkLimitRemoved(df5) checkPushedInfo(df5, "PushedAggregates: [SUM(SALARY)]", - "PushedGroupByExpressions: [DEPT, IS_MANAGER]", + "PushedGroupByExpressions: [CAST(DEPT AS string), IS_MANAGER]", "PushedFilters: []", - "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1") - checkAnswer(df5, Seq(Row(1, false, 9000.00))) + "PushedTopN: " + + "ORDER BY [CAST(DEPT AS string) ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1") + checkAnswer(df5, Seq(Row("1", false, 9000.00))) val df6 = spark.read .table("h2.test.employee") - .select($"SALARY", $"IS_MANAGER", - when(($"SALARY" > 8000).and($"SALARY" < 10000), $"salary").otherwise(0).as("key")) - .groupBy("key", "IS_MANAGER").sum("SALARY") - .orderBy("key", "IS_MANAGER") + .select($"DEPT", $"SALARY") + .groupBy("dept").agg(sum("SALARY")) + .orderBy(sum("SALARY")) .limit(1) checkSortRemoved(df6) checkLimitRemoved(df6) checkPushedInfo(df6, "PushedAggregates: [SUM(SALARY)]", - "PushedGroupByExpressions: " + - "[CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END, " + - "IS_MANAGER]", + "PushedGroupByExpressions: [DEPT]", "PushedFilters: []", - "PushedTopN: ORDER BY [" + - "CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END " + - "ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1") - checkAnswer(df6, Seq(Row(0.00, false, 12000.00))) + "PushedTopN: ORDER BY [SUM(SALARY) ASC NULLS FIRST] LIMIT 1") + checkAnswer(df6, Seq(Row(6, 12000.00))) val df7 = spark.read - .table("h2.test.employee") - .select($"DEPT", $"SALARY") - .groupBy("dept").agg(sum("SALARY")) - .orderBy(sum("SALARY")) - .limit(1) - checkSortRemoved(df7, false) - checkLimitRemoved(df7, false) - checkPushedInfo(df7, - "PushedAggregates: [SUM(SALARY)]", - "PushedGroupByExpressions: [DEPT]", - "PushedFilters: []") - checkAnswer(df7, Seq(Row(6, 12000.00))) - - val df8 = spark.read .table("h2.test.employee") .select($"DEPT", $"SALARY") .groupBy("dept").agg(sum("SALARY").as("total")) .orderBy("total") .limit(1) - checkSortRemoved(df8, false) - checkLimitRemoved(df8, false) - checkPushedInfo(df8, + checkSortRemoved(df7) + checkLimitRemoved(df7) + checkPushedInfo(df7, "PushedAggregates: [SUM(SALARY)]", "PushedGroupByExpressions: [DEPT]", - "PushedFilters: []") - checkAnswer(df8, Seq(Row(6, 12000.00))) + "PushedFilters: []", + "PushedTopN: ORDER BY [SUM(SALARY) ASC NULLS FIRST] LIMIT 1") + checkAnswer(df7, Seq(Row(6, 12000.00))) } test("scan with aggregate push-down and paging push-down") { @@ -1005,7 +984,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel val df2 = spark.read .table("h2.test.employee") - .select($"DEPT".as("my_dept"), $"SALARY") + .select($"DEPT".cast("string").as("my_dept"), $"SALARY") .groupBy("my_dept").sum("SALARY") .orderBy("my_dept") .offset(1) @@ -1014,97 +993,81 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with 
ExplainSuiteHel checkLimitRemoved(df2) checkPushedInfo(df2, "PushedAggregates: [SUM(SALARY)]", - "PushedGroupByExpressions: [DEPT]", + "PushedGroupByExpressions: [CAST(DEPT AS string)]", "PushedFilters: []", "PushedOffset: OFFSET 1", - "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 2") - checkAnswer(df2, Seq(Row(2, 22000.00))) + "PushedTopN: ORDER BY [CAST(DEPT AS string) ASC NULLS FIRST] LIMIT 2") + checkAnswer(df2, Seq(Row("2", 22000.00))) val df3 = spark.read .table("h2.test.employee") - .select($"SALARY", - when(($"SALARY" > 8000).and($"SALARY" < 10000), $"salary").otherwise(0).as("key")) - .groupBy("key").sum("SALARY") - .orderBy("key") + .select($"DEPT".cast("string").as("my_dept"), $"IS_MANAGER", $"SALARY") + .groupBy("my_dept", "is_manager").sum("SALARY") + .orderBy("my_dept", "is_manager") .offset(1) .limit(1) checkSortRemoved(df3) checkLimitRemoved(df3) checkPushedInfo(df3, "PushedAggregates: [SUM(SALARY)]", - "PushedGroupByExpressions: " + - "[CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END]", + "PushedGroupByExpressions: [CAST(DEPT AS string), IS_MANAGER]", "PushedFilters: []", "PushedOffset: OFFSET 1", - "PushedTopN: ORDER BY [" + - "CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END " + - "ASC NULLS FIRST] LIMIT 2") - checkAnswer(df3, Seq(Row(9000, 9000.00))) + "PushedTopN: " + + "ORDER BY [CAST(DEPT AS string) ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 2") + checkAnswer(df3, Seq(Row("1", true, 10000.00))) val df4 = spark.read .table("h2.test.employee") - .select($"DEPT".as("my_dept"), $"IS_MANAGER", $"SALARY") - .groupBy("my_dept", "is_manager").sum("SALARY") - .orderBy("my_dept", "is_manager") + .select($"DEPT", $"SALARY") + .groupBy("dept").agg(sum("SALARY")) + .orderBy(sum("SALARY")) .offset(1) .limit(1) checkSortRemoved(df4) checkLimitRemoved(df4) checkPushedInfo(df4, "PushedAggregates: [SUM(SALARY)]", - "PushedGroupByExpressions: [DEPT, IS_MANAGER]", + "PushedGroupByExpressions: [DEPT]", "PushedFilters: []", "PushedOffset: OFFSET 1", - "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 2") - checkAnswer(df4, Seq(Row(1, true, 10000.00))) + "PushedTopN: ORDER BY [SUM(SALARY) ASC NULLS FIRST] LIMIT 2") + checkAnswer(df4, Seq(Row(1, 19000.00))) val df5 = spark.read .table("h2.test.employee") .select($"DEPT", $"SALARY") - .groupBy("dept").agg(sum("SALARY")) - .orderBy(sum("SALARY")) + .groupBy("dept").agg(sum("SALARY").as("total")) + .orderBy("total") .offset(1) .limit(1) - checkSortRemoved(df5, false) - checkLimitRemoved(df5, false) + checkSortRemoved(df5) + checkLimitRemoved(df5) checkPushedInfo(df5, "PushedAggregates: [SUM(SALARY)]", "PushedGroupByExpressions: [DEPT]", - "PushedFilters: []") + "PushedFilters: []", + "PushedOffset: OFFSET 1", + "PushedTopN: ORDER BY [SUM(SALARY) ASC NULLS FIRST] LIMIT 2") checkAnswer(df5, Seq(Row(1, 19000.00))) val df6 = spark.read - .table("h2.test.employee") - .select($"DEPT", $"SALARY") - .groupBy("dept").agg(sum("SALARY").as("total")) - .orderBy("total") - .offset(1) - .limit(1) - checkSortRemoved(df6, false) - checkLimitRemoved(df6, false) - checkPushedInfo(df6, - "PushedAggregates: [SUM(SALARY)]", - "PushedGroupByExpressions: [DEPT]", - "PushedFilters: []") - checkAnswer(df6, Seq(Row(1, 19000.00))) - - val df7 = spark.read .table("h2.test.employee") .select($"DEPT", $"IS_MANAGER", $"SALARY") .groupBy("dept", "is_manager").sum("SALARY") - .orderBy(when($"is_manager", $"dept").otherwise(0)) + .orderBy(when($"is_manager", 
$"dept").otherwise(0), $"dept") .offset(1) .limit(1) - checkSortRemoved(df7) - checkLimitRemoved(df7) - checkPushedInfo(df7, + checkSortRemoved(df6) + checkLimitRemoved(df6) + checkPushedInfo(df6, "PushedAggregates: [SUM(SALARY)]", "PushedGroupByExpressions: [DEPT, IS_MANAGER]", "PushedFilters: []", "PushedOffset: OFFSET 1", - "PushedTopN: " + - "ORDER BY [CASE WHEN IS_MANAGER = true THEN DEPT ELSE 0 END ASC NULLS FIRST] LIMIT 2") - checkAnswer(df7, Seq(Row(1, false, 9000.00))) + "PushedTopN: ORDER BY [CASE WHEN IS_MANAGER = true THEN DEPT ELSE 0 END ASC NULLS FIRST, " + + "DEPT ASC NULLS FIRST] LIMIT 2") + checkAnswer(df6, Seq(Row(2, false, 12000.00))) } test("scan with filter push-down") { From 293590bf2b403c3f40abeb680b86742b41cf3771 Mon Sep 17 00:00:00 2001 From: Jiaan Geng Date: Tue, 9 Aug 2022 09:25:41 +0800 Subject: [PATCH 11/11] Update code --- .../apache/spark/sql/jdbc/JDBCV2Suite.scala | 246 ++++++++---------- 1 file changed, 113 insertions(+), 133 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala index be97c7f592068..a8c770f46cd67 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala @@ -860,214 +860,194 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel checkAnswer(df2, Seq(Row(2, "david", 10000.00))) } - test("scan with aggregate push-down and top N push-down") { + test("scan with aggregate push-down, top N push-down and offset push-down") { val df1 = spark.read .table("h2.test.employee") .groupBy("DEPT").sum("SALARY") .orderBy("DEPT") - .limit(1) - checkSortRemoved(df1) - checkLimitRemoved(df1) - checkPushedInfo(df1, + + val paging1 = df1.offset(1).limit(1) + checkSortRemoved(paging1) + checkLimitRemoved(paging1) + checkPushedInfo(paging1, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []", + "PushedOffset: OFFSET 1", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 2") + checkAnswer(paging1, Seq(Row(2, 22000.00))) + + val topN1 = df1.limit(1) + checkSortRemoved(topN1) + checkLimitRemoved(topN1) + checkPushedInfo(topN1, "PushedAggregates: [SUM(SALARY)]", "PushedGroupByExpressions: [DEPT]", "PushedFilters: []", "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1") - checkAnswer(df1, Seq(Row(1, 19000.00))) + checkAnswer(topN1, Seq(Row(1, 19000.00))) val df2 = spark.read .table("h2.test.employee") .select($"DEPT".cast("string").as("my_dept"), $"SALARY") .groupBy("my_dept").sum("SALARY") .orderBy("my_dept") - .limit(1) - checkSortRemoved(df2) - checkLimitRemoved(df2) - checkPushedInfo(df2, + + val paging2 = df2.offset(1).limit(1) + checkSortRemoved(paging2) + checkLimitRemoved(paging2) + checkPushedInfo(paging2, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [CAST(DEPT AS string)]", + "PushedFilters: []", + "PushedOffset: OFFSET 1", + "PushedTopN: ORDER BY [CAST(DEPT AS string) ASC NULLS FIRST] LIMIT 2") + checkAnswer(paging2, Seq(Row("2", 22000.00))) + + val topN2 = df2.limit(1) + checkSortRemoved(topN2) + checkLimitRemoved(topN2) + checkPushedInfo(topN2, "PushedAggregates: [SUM(SALARY)]", "PushedGroupByExpressions: [CAST(DEPT AS string)]", "PushedFilters: []", "PushedTopN: ORDER BY [CAST(DEPT AS string) ASC NULLS FIRST] LIMIT 1") - checkAnswer(df2, Seq(Row("1", 19000.00))) + checkAnswer(topN2, Seq(Row("1", 19000.00))) val df3 = spark.read .table("h2.test.employee") 
.groupBy("dept").sum("SALARY") .orderBy($"dept".cast("string")) - .limit(1) - checkSortRemoved(df3) - checkLimitRemoved(df3) - checkPushedInfo(df3, + + val paging3 = df3.offset(1).limit(1) + checkSortRemoved(paging3) + checkLimitRemoved(paging3) + checkPushedInfo(paging3, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []", + "PushedOffset: OFFSET 1", + "PushedTopN: ORDER BY [CAST(DEPT AS string) ASC NULLS FIRST] LIMIT 2") + checkAnswer(paging3, Seq(Row(2, 22000.00))) + + val topN3 = df3.limit(1) + checkSortRemoved(topN3) + checkLimitRemoved(topN3) + checkPushedInfo(topN3, "PushedAggregates: [SUM(SALARY)]", "PushedGroupByExpressions: [DEPT]", "PushedFilters: []", "PushedTopN: ORDER BY [CAST(DEPT AS string) ASC NULLS FIRST] LIMIT 1") - checkAnswer(df3, Seq(Row(1, 19000.00))) + checkAnswer(topN3, Seq(Row(1, 19000.00))) val df4 = spark.read .table("h2.test.employee") .groupBy("DEPT", "IS_MANAGER").sum("SALARY") .orderBy("DEPT", "IS_MANAGER") - .limit(1) - checkSortRemoved(df4) - checkLimitRemoved(df4) - checkPushedInfo(df4, + + val paging4 = df4.offset(1).limit(1) + checkSortRemoved(paging4) + checkLimitRemoved(paging4) + checkPushedInfo(paging4, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT, IS_MANAGER]", + "PushedFilters: []", + "PushedOffset: OFFSET 1", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 2") + checkAnswer(paging4, Seq(Row(1, true, 10000.00))) + + val topN4 = df4.limit(1) + checkSortRemoved(topN4) + checkLimitRemoved(topN4) + checkPushedInfo(topN4, "PushedAggregates: [SUM(SALARY)]", "PushedGroupByExpressions: [DEPT, IS_MANAGER]", "PushedFilters: []", "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1") - checkAnswer(df4, Seq(Row(1, false, 9000.00))) + checkAnswer(topN4, Seq(Row(1, false, 9000.00))) val df5 = spark.read .table("h2.test.employee") .select($"SALARY", $"IS_MANAGER", $"DEPT".cast("string").as("my_dept")) .groupBy("my_dept", "IS_MANAGER").sum("SALARY") .orderBy("my_dept", "IS_MANAGER") - .limit(1) - checkSortRemoved(df5) - checkLimitRemoved(df5) - checkPushedInfo(df5, + + val paging5 = df5.offset(1).limit(1) + checkSortRemoved(paging5) + checkLimitRemoved(paging5) + checkPushedInfo(paging5, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [CAST(DEPT AS string), IS_MANAGER]", + "PushedFilters: []", + "PushedOffset: OFFSET 1", + "PushedTopN: " + + "ORDER BY [CAST(DEPT AS string) ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 2") + checkAnswer(paging5, Seq(Row("1", true, 10000.00))) + + val topN5 = df5.limit(1) + checkSortRemoved(topN5) + checkLimitRemoved(topN5) + checkPushedInfo(topN5, "PushedAggregates: [SUM(SALARY)]", "PushedGroupByExpressions: [CAST(DEPT AS string), IS_MANAGER]", "PushedFilters: []", "PushedTopN: " + "ORDER BY [CAST(DEPT AS string) ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1") - checkAnswer(df5, Seq(Row("1", false, 9000.00))) + checkAnswer(topN5, Seq(Row("1", false, 9000.00))) val df6 = spark.read .table("h2.test.employee") .select($"DEPT", $"SALARY") .groupBy("dept").agg(sum("SALARY")) .orderBy(sum("SALARY")) - .limit(1) - checkSortRemoved(df6) - checkLimitRemoved(df6) - checkPushedInfo(df6, - "PushedAggregates: [SUM(SALARY)]", - "PushedGroupByExpressions: [DEPT]", - "PushedFilters: []", - "PushedTopN: ORDER BY [SUM(SALARY) ASC NULLS FIRST] LIMIT 1") - checkAnswer(df6, Seq(Row(6, 12000.00))) - val df7 = spark.read - .table("h2.test.employee") - .select($"DEPT", $"SALARY") - 
.groupBy("dept").agg(sum("SALARY").as("total")) - .orderBy("total") - .limit(1) - checkSortRemoved(df7) - checkLimitRemoved(df7) - checkPushedInfo(df7, + val paging6 = df6.offset(1).limit(1) + checkSortRemoved(paging6) + checkLimitRemoved(paging6) + checkPushedInfo(paging6, "PushedAggregates: [SUM(SALARY)]", "PushedGroupByExpressions: [DEPT]", "PushedFilters: []", - "PushedTopN: ORDER BY [SUM(SALARY) ASC NULLS FIRST] LIMIT 1") - checkAnswer(df7, Seq(Row(6, 12000.00))) - } - - test("scan with aggregate push-down and paging push-down") { - val df1 = spark.read - .table("h2.test.employee") - .groupBy("DEPT").sum("SALARY") - .orderBy("DEPT") - .offset(1) - .limit(1) - checkSortRemoved(df1) - checkLimitRemoved(df1) - checkPushedInfo(df1, - "PushedAggregates: [SUM(SALARY)]", - "PushedGroupByExpressions: [DEPT]", - "PushedFilters: []", - "PushedOffset: OFFSET 1", - "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 2") - checkAnswer(df1, Seq(Row(2, 22000.00))) - - val df2 = spark.read - .table("h2.test.employee") - .select($"DEPT".cast("string").as("my_dept"), $"SALARY") - .groupBy("my_dept").sum("SALARY") - .orderBy("my_dept") - .offset(1) - .limit(1) - checkSortRemoved(df2) - checkLimitRemoved(df2) - checkPushedInfo(df2, - "PushedAggregates: [SUM(SALARY)]", - "PushedGroupByExpressions: [CAST(DEPT AS string)]", - "PushedFilters: []", - "PushedOffset: OFFSET 1", - "PushedTopN: ORDER BY [CAST(DEPT AS string) ASC NULLS FIRST] LIMIT 2") - checkAnswer(df2, Seq(Row("2", 22000.00))) - - val df3 = spark.read - .table("h2.test.employee") - .select($"DEPT".cast("string").as("my_dept"), $"IS_MANAGER", $"SALARY") - .groupBy("my_dept", "is_manager").sum("SALARY") - .orderBy("my_dept", "is_manager") - .offset(1) - .limit(1) - checkSortRemoved(df3) - checkLimitRemoved(df3) - checkPushedInfo(df3, - "PushedAggregates: [SUM(SALARY)]", - "PushedGroupByExpressions: [CAST(DEPT AS string), IS_MANAGER]", - "PushedFilters: []", "PushedOffset: OFFSET 1", - "PushedTopN: " + - "ORDER BY [CAST(DEPT AS string) ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 2") - checkAnswer(df3, Seq(Row("1", true, 10000.00))) + "PushedTopN: ORDER BY [SUM(SALARY) ASC NULLS FIRST] LIMIT 2") + checkAnswer(paging6, Seq(Row(1, 19000.00))) - val df4 = spark.read - .table("h2.test.employee") - .select($"DEPT", $"SALARY") - .groupBy("dept").agg(sum("SALARY")) - .orderBy(sum("SALARY")) - .offset(1) - .limit(1) - checkSortRemoved(df4) - checkLimitRemoved(df4) - checkPushedInfo(df4, + val topN6 = df6.limit(1) + checkSortRemoved(topN6) + checkLimitRemoved(topN6) + checkPushedInfo(topN6, "PushedAggregates: [SUM(SALARY)]", "PushedGroupByExpressions: [DEPT]", "PushedFilters: []", - "PushedOffset: OFFSET 1", - "PushedTopN: ORDER BY [SUM(SALARY) ASC NULLS FIRST] LIMIT 2") - checkAnswer(df4, Seq(Row(1, 19000.00))) + "PushedTopN: ORDER BY [SUM(SALARY) ASC NULLS FIRST] LIMIT 1") + checkAnswer(topN6, Seq(Row(6, 12000.00))) - val df5 = spark.read + val df7 = spark.read .table("h2.test.employee") .select($"DEPT", $"SALARY") .groupBy("dept").agg(sum("SALARY").as("total")) .orderBy("total") - .offset(1) - .limit(1) - checkSortRemoved(df5) - checkLimitRemoved(df5) - checkPushedInfo(df5, + + val paging7 = df7.offset(1).limit(1) + checkSortRemoved(paging7) + checkLimitRemoved(paging7) + checkPushedInfo(paging7, "PushedAggregates: [SUM(SALARY)]", "PushedGroupByExpressions: [DEPT]", "PushedFilters: []", "PushedOffset: OFFSET 1", "PushedTopN: ORDER BY [SUM(SALARY) ASC NULLS FIRST] LIMIT 2") - checkAnswer(df5, Seq(Row(1, 19000.00))) + checkAnswer(paging7, Seq(Row(1, 
19000.00))) - val df6 = spark.read - .table("h2.test.employee") - .select($"DEPT", $"IS_MANAGER", $"SALARY") - .groupBy("dept", "is_manager").sum("SALARY") - .orderBy(when($"is_manager", $"dept").otherwise(0), $"dept") - .offset(1) - .limit(1) - checkSortRemoved(df6) - checkLimitRemoved(df6) - checkPushedInfo(df6, + val topN7 = df7.limit(1) + checkSortRemoved(topN7) + checkLimitRemoved(topN7) + checkPushedInfo(topN7, "PushedAggregates: [SUM(SALARY)]", - "PushedGroupByExpressions: [DEPT, IS_MANAGER]", + "PushedGroupByExpressions: [DEPT]", "PushedFilters: []", - "PushedOffset: OFFSET 1", - "PushedTopN: ORDER BY [CASE WHEN IS_MANAGER = true THEN DEPT ELSE 0 END ASC NULLS FIRST, " + - "DEPT ASC NULLS FIRST] LIMIT 2") - checkAnswer(df6, Seq(Row(2, false, 12000.00))) + "PushedTopN: ORDER BY [SUM(SALARY) ASC NULLS FIRST] LIMIT 1") + checkAnswer(topN7, Seq(Row(6, 12000.00))) } test("scan with filter push-down") {
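One more standalone sketch, illustrating the `Cast.toString` added in PATCH 10 above: with it, a pushed-down CAST renders in the explain strings these tests assert (for example `CAST(DEPT AS string)` in PushedGroupByExpressions and PushedTopN). It assumes this tree's connector expressions API on the classpath; the column name and wrapper object are hypothetical.

import org.apache.spark.sql.connector.expressions.{Cast, Expressions}
import org.apache.spark.sql.types.StringType

object ConnectorCastRenderingSketch {
  def main(args: Array[String]): Unit = {
    // Expressions.column("DEPT") builds a pushed-down column reference whose describe() is "DEPT".
    val cast = new Cast(Expressions.column("DEPT"), StringType)
    // With the toString added in PATCH 10 this prints: CAST(DEPT AS string)
    println(cast.toString)
  }
}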