From 4a6f903897d28a3038918997e692410259a90ae3 Mon Sep 17 00:00:00 2001 From: gengjiaan Date: Fri, 19 Jun 2020 10:36:52 +0800 Subject: [PATCH 01/15] Reuse completeNextStageWithFetchFailure --- .../apache/spark/scheduler/DAGSchedulerSuite.scala | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 9d412f2dba3ce..762b14e170fcc 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -1796,9 +1796,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // lets say there is a fetch failure in this task set, which makes us go back and // run stage 0, attempt 1 - complete(taskSets(1), Seq( - (FetchFailed(makeBlockManagerId("hostA"), - shuffleDep1.shuffleId, 0L, 0, 0, "ignored"), null))) + completeNextStageWithFetchFailure(1, 0, shuffleDep1) scheduler.resubmitFailedStages() // stage 0, attempt 1 should have the properties of job2 @@ -1872,9 +1870,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // have the second stage complete normally completeShuffleMapStageSuccessfully(1, 0, 1, Seq("hostA", "hostC")) // fail the third stage because hostA went down - complete(taskSets(2), Seq( - (FetchFailed(makeBlockManagerId("hostA"), - shuffleDepTwo.shuffleId, 0L, 0, 0, "ignored"), null))) + completeNextStageWithFetchFailure(2, 0, shuffleDepTwo) // TODO assert this: // blockManagerMaster.removeExecutor("hostA-exec") // have DAGScheduler try again @@ -1900,9 +1896,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // complete stage 1 completeShuffleMapStageSuccessfully(1, 0, 1) // pretend stage 2 failed because hostA went down - complete(taskSets(2), Seq( - (FetchFailed(makeBlockManagerId("hostA"), - shuffleDepTwo.shuffleId, 0L, 0, 0, "ignored"), null))) + completeNextStageWithFetchFailure(2, 0, shuffleDepTwo) // TODO assert this: // blockManagerMaster.removeExecutor("hostA-exec") // DAGScheduler should notice the cached copy of the second shuffle and try to get it rerun. 
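Note on the refactor above: completeNextStageWithFetchFailure is an existing DAGSchedulerSuite helper whose definition is not part of this diff. A minimal sketch of its behavior, reconstructed only from the inline completions it replaces (the parameter names and the single-task failure are assumptions; the real helper may fail every task in the attempt and validate the attempt id), looks roughly like:

    // Hypothetical sketch, not the actual DAGSchedulerSuite definition.
    private def completeNextStageWithFetchFailure(
        taskSetIndex: Int,
        attemptId: Int,
        shuffleDep: ShuffleDependency[_, _, _]): Unit = {
      // Complete the given task set with a FetchFailed pointing at hostA, mirroring the
      // inline completions that the three call sites above previously spelled out by hand.
      complete(taskSets(taskSetIndex), Seq(
        (FetchFailed(makeBlockManagerId("hostA"), shuffleDep.shuffleId, 0L, 0, 0, "ignored"),
          null)))
    }
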
From 181186c6cce9c3b4e3061dc84b667ee898dd3f40 Mon Sep 17 00:00:00 2001 From: gengjiaan Date: Thu, 29 Oct 2020 11:42:05 +0800 Subject: [PATCH 02/15] Improve the performance for first_value --- .../expressions/aggregate/First.scala | 10 +- .../sql/execution/window/WindowExecBase.scala | 7 + .../resources/sql-tests/inputs/window.sql | 32 +- .../sql-tests/results/window.sql.out | 392 ++++++++++-------- 4 files changed, 258 insertions(+), 183 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/First.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/First.scala index 65fd43c924d08..1ccf60b9f0e26 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/First.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/First.scala @@ -51,7 +51,7 @@ import org.apache.spark.sql.types._ group = "agg_funcs", since = "2.0.0") case class First(child: Expression, ignoreNulls: Boolean) - extends DeclarativeAggregate with ExpectsInputTypes { + extends DeclarativeAggregate with OffsetWindowSpec with ExpectsInputTypes { def this(child: Expression) = this(child, false) @@ -59,6 +59,14 @@ case class First(child: Expression, ignoreNulls: Boolean) this(child, FirstLast.validateIgnoreNullExpr(ignoreNullsExpr, "first")) } + override val input = child + + override val offset = Literal.create(1, IntegerType) + + override lazy val default = Literal.create(null, input.dataType) + + override val isRelative = false + override def children: Seq[Expression] = child :: Nil override def nullable: Boolean = true diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala index f0b99c1522aa1..3d7a6104f8bc1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala @@ -135,6 +135,13 @@ trait WindowExecBase extends UnaryExecNode { case e @ WindowExpression(function, spec) => val frame = spec.frameSpecification.asInstanceOf[SpecifiedWindowFrame] function match { + case AggregateExpression(f: OffsetWindowSpec, _, _, _, _) if !f.ignoreNulls && + frame.frameType == RowFrame && frame.lower == UnboundedPreceding => + frame.upper match { + case UnboundedFollowing => collect("UNBOUNDED_OFFSET", f.fakeFrame, e, f) + case CurrentRow => collect("UNBOUNDED_PRECEDING_OFFSET", f.fakeFrame, e, f) + case _ => collect("AGGREGATE", frame, e, f) + } case AggregateExpression(f, _, _, _, _) => collect("AGGREGATE", frame, e, f) case f: FrameLessOffsetWindowFunction => collect("FRAME_LESS_OFFSET", frame, e, f) case f: OffsetWindowSpec if !f.ignoreNulls && diff --git a/sql/core/src/test/resources/sql-tests/inputs/window.sql b/sql/core/src/test/resources/sql-tests/inputs/window.sql index c1be5fb27e6fa..83ab4e2e7bbf5 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/window.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/window.sql @@ -146,10 +146,11 @@ SELECT val, cate, count(val) FILTER (WHERE val > 1) OVER(PARTITION BY cate) FROM testData ORDER BY cate, val; --- nth_value() over () +-- nth_value()/first_value() over () SELECT employee_name, salary, + first_value(employee_name) OVER (ORDER BY salary DESC) highest_salary, nth_value(employee_name, 2) OVER (ORDER BY salary DESC) second_highest_salary FROM basic_pays @@ -158,6 +159,9 @@ ORDER 
BY salary DESC; SELECT employee_name, salary, + first_value(employee_name) OVER ( + ORDER BY salary DESC + RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) highest_salary, nth_value(employee_name, 2) OVER ( ORDER BY salary DESC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) second_highest_salary @@ -168,6 +172,9 @@ ORDER BY salary DESC; SELECT employee_name, salary, + first_value(employee_name) OVER ( + ORDER BY salary DESC + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) highest_salary, nth_value(employee_name, 2) OVER ( ORDER BY salary DESC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) second_highest_salary @@ -178,6 +185,9 @@ ORDER BY salary DESC; SELECT employee_name, salary, + first_value(employee_name) OVER ( + ORDER BY salary + RANGE BETWEEN 2000 PRECEDING AND 1000 FOLLOWING) highest_salary, nth_value(employee_name, 2) OVER ( ORDER BY salary RANGE BETWEEN 2000 PRECEDING AND 1000 FOLLOWING) second_highest_salary @@ -188,6 +198,9 @@ ORDER BY salary; SELECT employee_name, salary, + first_value(employee_name) OVER ( + ORDER BY salary DESC + ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING) highest_salary, nth_value(employee_name, 2) OVER ( ORDER BY salary DESC ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING) second_highest_salary @@ -198,6 +211,9 @@ ORDER BY salary DESC; SELECT employee_name, salary, + first_value(employee_name) OVER ( + ORDER BY salary DESC + RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) highest_salary, nth_value(employee_name, 2) OVER ( ORDER BY salary DESC RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) second_highest_salary @@ -208,6 +224,9 @@ ORDER BY salary DESC; SELECT employee_name, salary, + first_value(employee_name) OVER ( + ORDER BY salary DESC + RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) highest_salary, nth_value(employee_name, 2) OVER ( ORDER BY salary DESC RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) second_highest_salary @@ -218,6 +237,9 @@ ORDER BY salary DESC; SELECT employee_name, salary, + first_value(employee_name) OVER ( + ORDER BY salary DESC + ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) highest_salary, nth_value(employee_name, 2) OVER ( ORDER BY salary DESC ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) second_highest_salary @@ -228,6 +250,9 @@ ORDER BY salary DESC; SELECT employee_name, salary, + first_value(employee_name) OVER ( + ORDER BY salary DESC + ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) highest_salary, nth_value(employee_name, 2) OVER ( ORDER BY salary DESC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) second_highest_salary @@ -239,6 +264,11 @@ SELECT employee_name, department, salary, + FIRST_VALUE(employee_name) OVER ( + PARTITION BY department + ORDER BY salary DESC + RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING + ) highest_salary, NTH_VALUE(employee_name, 2) OVER ( PARTITION BY department ORDER BY salary DESC diff --git a/sql/core/src/test/resources/sql-tests/results/window.sql.out b/sql/core/src/test/resources/sql-tests/results/window.sql.out index f6506a77e239c..b3d0a47c74243 100644 --- a/sql/core/src/test/resources/sql-tests/results/window.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/window.sql.out @@ -421,36 +421,40 @@ window aggregate function with filter predicate is not supported yet.; SELECT employee_name, salary, + first_value(employee_name) OVER (ORDER BY salary DESC) highest_salary, nth_value(employee_name, 2) OVER (ORDER BY salary DESC) second_highest_salary FROM basic_pays ORDER BY salary DESC -- !query schema -struct 
+struct -- !query output -Larry Bott 11798 NULL -Gerard Bondur 11472 Gerard Bondur -Pamela Castillo 11303 Gerard Bondur -Barry Jones 10586 Gerard Bondur -George Vanauf 10563 Gerard Bondur -Loui Bondur 10449 Gerard Bondur -Mary Patterson 9998 Gerard Bondur -Steve Patterson 9441 Gerard Bondur -Julie Firrelli 9181 Gerard Bondur -Jeff Firrelli 8992 Gerard Bondur -William Patterson 8870 Gerard Bondur -Diane Murphy 8435 Gerard Bondur -Leslie Jennings 8113 Gerard Bondur -Gerard Hernandez 6949 Gerard Bondur -Foon Yue Tseng 6660 Gerard Bondur -Anthony Bow 6627 Gerard Bondur -Leslie Thompson 5186 Gerard Bondur +Larry Bott 11798 Larry Bott NULL +Gerard Bondur 11472 Larry Bott Gerard Bondur +Pamela Castillo 11303 Larry Bott Gerard Bondur +Barry Jones 10586 Larry Bott Gerard Bondur +George Vanauf 10563 Larry Bott Gerard Bondur +Loui Bondur 10449 Larry Bott Gerard Bondur +Mary Patterson 9998 Larry Bott Gerard Bondur +Steve Patterson 9441 Larry Bott Gerard Bondur +Julie Firrelli 9181 Larry Bott Gerard Bondur +Jeff Firrelli 8992 Larry Bott Gerard Bondur +William Patterson 8870 Larry Bott Gerard Bondur +Diane Murphy 8435 Larry Bott Gerard Bondur +Leslie Jennings 8113 Larry Bott Gerard Bondur +Gerard Hernandez 6949 Larry Bott Gerard Bondur +Foon Yue Tseng 6660 Larry Bott Gerard Bondur +Anthony Bow 6627 Larry Bott Gerard Bondur +Leslie Thompson 5186 Larry Bott Gerard Bondur -- !query SELECT employee_name, salary, + first_value(employee_name) OVER ( + ORDER BY salary DESC + RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) highest_salary, nth_value(employee_name, 2) OVER ( ORDER BY salary DESC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) second_highest_salary @@ -458,31 +462,34 @@ FROM basic_pays ORDER BY salary DESC -- !query schema -struct +struct -- !query output -Larry Bott 11798 NULL -Gerard Bondur 11472 Gerard Bondur -Pamela Castillo 11303 Gerard Bondur -Barry Jones 10586 Gerard Bondur -George Vanauf 10563 Gerard Bondur -Loui Bondur 10449 Gerard Bondur -Mary Patterson 9998 Gerard Bondur -Steve Patterson 9441 Gerard Bondur -Julie Firrelli 9181 Gerard Bondur -Jeff Firrelli 8992 Gerard Bondur -William Patterson 8870 Gerard Bondur -Diane Murphy 8435 Gerard Bondur -Leslie Jennings 8113 Gerard Bondur -Gerard Hernandez 6949 Gerard Bondur -Foon Yue Tseng 6660 Gerard Bondur -Anthony Bow 6627 Gerard Bondur -Leslie Thompson 5186 Gerard Bondur +Larry Bott 11798 Larry Bott NULL +Gerard Bondur 11472 Larry Bott Gerard Bondur +Pamela Castillo 11303 Larry Bott Gerard Bondur +Barry Jones 10586 Larry Bott Gerard Bondur +George Vanauf 10563 Larry Bott Gerard Bondur +Loui Bondur 10449 Larry Bott Gerard Bondur +Mary Patterson 9998 Larry Bott Gerard Bondur +Steve Patterson 9441 Larry Bott Gerard Bondur +Julie Firrelli 9181 Larry Bott Gerard Bondur +Jeff Firrelli 8992 Larry Bott Gerard Bondur +William Patterson 8870 Larry Bott Gerard Bondur +Diane Murphy 8435 Larry Bott Gerard Bondur +Leslie Jennings 8113 Larry Bott Gerard Bondur +Gerard Hernandez 6949 Larry Bott Gerard Bondur +Foon Yue Tseng 6660 Larry Bott Gerard Bondur +Anthony Bow 6627 Larry Bott Gerard Bondur +Leslie Thompson 5186 Larry Bott Gerard Bondur -- !query SELECT employee_name, salary, + first_value(employee_name) OVER ( + ORDER BY salary DESC + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) highest_salary, nth_value(employee_name, 2) OVER ( ORDER BY salary DESC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) second_highest_salary @@ -490,31 +497,34 @@ FROM basic_pays ORDER BY salary DESC -- !query schema -struct +struct -- !query output -Larry Bott 
11798 NULL -Gerard Bondur 11472 Gerard Bondur -Pamela Castillo 11303 Gerard Bondur -Barry Jones 10586 Gerard Bondur -George Vanauf 10563 Gerard Bondur -Loui Bondur 10449 Gerard Bondur -Mary Patterson 9998 Gerard Bondur -Steve Patterson 9441 Gerard Bondur -Julie Firrelli 9181 Gerard Bondur -Jeff Firrelli 8992 Gerard Bondur -William Patterson 8870 Gerard Bondur -Diane Murphy 8435 Gerard Bondur -Leslie Jennings 8113 Gerard Bondur -Gerard Hernandez 6949 Gerard Bondur -Foon Yue Tseng 6660 Gerard Bondur -Anthony Bow 6627 Gerard Bondur -Leslie Thompson 5186 Gerard Bondur +Larry Bott 11798 Larry Bott NULL +Gerard Bondur 11472 Larry Bott Gerard Bondur +Pamela Castillo 11303 Larry Bott Gerard Bondur +Barry Jones 10586 Larry Bott Gerard Bondur +George Vanauf 10563 Larry Bott Gerard Bondur +Loui Bondur 10449 Larry Bott Gerard Bondur +Mary Patterson 9998 Larry Bott Gerard Bondur +Steve Patterson 9441 Larry Bott Gerard Bondur +Julie Firrelli 9181 Larry Bott Gerard Bondur +Jeff Firrelli 8992 Larry Bott Gerard Bondur +William Patterson 8870 Larry Bott Gerard Bondur +Diane Murphy 8435 Larry Bott Gerard Bondur +Leslie Jennings 8113 Larry Bott Gerard Bondur +Gerard Hernandez 6949 Larry Bott Gerard Bondur +Foon Yue Tseng 6660 Larry Bott Gerard Bondur +Anthony Bow 6627 Larry Bott Gerard Bondur +Leslie Thompson 5186 Larry Bott Gerard Bondur -- !query SELECT employee_name, salary, + first_value(employee_name) OVER ( + ORDER BY salary + RANGE BETWEEN 2000 PRECEDING AND 1000 FOLLOWING) highest_salary, nth_value(employee_name, 2) OVER ( ORDER BY salary RANGE BETWEEN 2000 PRECEDING AND 1000 FOLLOWING) second_highest_salary @@ -522,31 +532,34 @@ FROM basic_pays ORDER BY salary -- !query schema -struct +struct -- !query output -Leslie Thompson 5186 NULL -Anthony Bow 6627 Anthony Bow -Foon Yue Tseng 6660 Anthony Bow -Gerard Hernandez 6949 Anthony Bow -Leslie Jennings 8113 Foon Yue Tseng -Diane Murphy 8435 Foon Yue Tseng -William Patterson 8870 Leslie Jennings -Jeff Firrelli 8992 Diane Murphy -Julie Firrelli 9181 Diane Murphy -Steve Patterson 9441 Diane Murphy -Mary Patterson 9998 Diane Murphy -Loui Bondur 10449 Jeff Firrelli -George Vanauf 10563 Jeff Firrelli -Barry Jones 10586 Jeff Firrelli -Pamela Castillo 11303 Mary Patterson -Gerard Bondur 11472 Loui Bondur -Larry Bott 11798 Loui Bondur +Leslie Thompson 5186 Leslie Thompson NULL +Anthony Bow 6627 Leslie Thompson Anthony Bow +Foon Yue Tseng 6660 Leslie Thompson Anthony Bow +Gerard Hernandez 6949 Leslie Thompson Anthony Bow +Leslie Jennings 8113 Anthony Bow Foon Yue Tseng +Diane Murphy 8435 Anthony Bow Foon Yue Tseng +William Patterson 8870 Gerard Hernandez Leslie Jennings +Jeff Firrelli 8992 Leslie Jennings Diane Murphy +Julie Firrelli 9181 Leslie Jennings Diane Murphy +Steve Patterson 9441 Leslie Jennings Diane Murphy +Mary Patterson 9998 Leslie Jennings Diane Murphy +Loui Bondur 10449 William Patterson Jeff Firrelli +George Vanauf 10563 William Patterson Jeff Firrelli +Barry Jones 10586 William Patterson Jeff Firrelli +Pamela Castillo 11303 Steve Patterson Mary Patterson +Gerard Bondur 11472 Mary Patterson Loui Bondur +Larry Bott 11798 Mary Patterson Loui Bondur -- !query SELECT employee_name, salary, + first_value(employee_name) OVER ( + ORDER BY salary DESC + ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING) highest_salary, nth_value(employee_name, 2) OVER ( ORDER BY salary DESC ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING) second_highest_salary @@ -554,31 +567,34 @@ FROM basic_pays ORDER BY salary DESC -- !query schema -struct +struct -- !query output -Larry Bott 11798 
Gerard Bondur -Gerard Bondur 11472 Gerard Bondur -Pamela Castillo 11303 Gerard Bondur -Barry Jones 10586 Pamela Castillo -George Vanauf 10563 Barry Jones -Loui Bondur 10449 George Vanauf -Mary Patterson 9998 Loui Bondur -Steve Patterson 9441 Mary Patterson -Julie Firrelli 9181 Steve Patterson -Jeff Firrelli 8992 Julie Firrelli -William Patterson 8870 Jeff Firrelli -Diane Murphy 8435 William Patterson -Leslie Jennings 8113 Diane Murphy -Gerard Hernandez 6949 Leslie Jennings -Foon Yue Tseng 6660 Gerard Hernandez -Anthony Bow 6627 Foon Yue Tseng -Leslie Thompson 5186 Anthony Bow +Larry Bott 11798 Larry Bott Gerard Bondur +Gerard Bondur 11472 Larry Bott Gerard Bondur +Pamela Castillo 11303 Larry Bott Gerard Bondur +Barry Jones 10586 Gerard Bondur Pamela Castillo +George Vanauf 10563 Pamela Castillo Barry Jones +Loui Bondur 10449 Barry Jones George Vanauf +Mary Patterson 9998 George Vanauf Loui Bondur +Steve Patterson 9441 Loui Bondur Mary Patterson +Julie Firrelli 9181 Mary Patterson Steve Patterson +Jeff Firrelli 8992 Steve Patterson Julie Firrelli +William Patterson 8870 Julie Firrelli Jeff Firrelli +Diane Murphy 8435 Jeff Firrelli William Patterson +Leslie Jennings 8113 William Patterson Diane Murphy +Gerard Hernandez 6949 Diane Murphy Leslie Jennings +Foon Yue Tseng 6660 Leslie Jennings Gerard Hernandez +Anthony Bow 6627 Gerard Hernandez Foon Yue Tseng +Leslie Thompson 5186 Foon Yue Tseng Anthony Bow -- !query SELECT employee_name, salary, + first_value(employee_name) OVER ( + ORDER BY salary DESC + RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) highest_salary, nth_value(employee_name, 2) OVER ( ORDER BY salary DESC RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) second_highest_salary @@ -586,31 +602,34 @@ FROM basic_pays ORDER BY salary DESC -- !query schema -struct +struct -- !query output -Larry Bott 11798 Gerard Bondur -Gerard Bondur 11472 Pamela Castillo -Pamela Castillo 11303 Barry Jones -Barry Jones 10586 George Vanauf -George Vanauf 10563 Loui Bondur -Loui Bondur 10449 Mary Patterson -Mary Patterson 9998 Steve Patterson -Steve Patterson 9441 Julie Firrelli -Julie Firrelli 9181 Jeff Firrelli -Jeff Firrelli 8992 William Patterson -William Patterson 8870 Diane Murphy -Diane Murphy 8435 Leslie Jennings -Leslie Jennings 8113 Gerard Hernandez -Gerard Hernandez 6949 Foon Yue Tseng -Foon Yue Tseng 6660 Anthony Bow -Anthony Bow 6627 Leslie Thompson -Leslie Thompson 5186 NULL +Larry Bott 11798 Larry Bott Gerard Bondur +Gerard Bondur 11472 Gerard Bondur Pamela Castillo +Pamela Castillo 11303 Pamela Castillo Barry Jones +Barry Jones 10586 Barry Jones George Vanauf +George Vanauf 10563 George Vanauf Loui Bondur +Loui Bondur 10449 Loui Bondur Mary Patterson +Mary Patterson 9998 Mary Patterson Steve Patterson +Steve Patterson 9441 Steve Patterson Julie Firrelli +Julie Firrelli 9181 Julie Firrelli Jeff Firrelli +Jeff Firrelli 8992 Jeff Firrelli William Patterson +William Patterson 8870 William Patterson Diane Murphy +Diane Murphy 8435 Diane Murphy Leslie Jennings +Leslie Jennings 8113 Leslie Jennings Gerard Hernandez +Gerard Hernandez 6949 Gerard Hernandez Foon Yue Tseng +Foon Yue Tseng 6660 Foon Yue Tseng Anthony Bow +Anthony Bow 6627 Anthony Bow Leslie Thompson +Leslie Thompson 5186 Leslie Thompson NULL -- !query SELECT employee_name, salary, + first_value(employee_name) OVER ( + ORDER BY salary DESC + RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) highest_salary, nth_value(employee_name, 2) OVER ( ORDER BY salary DESC RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) 
second_highest_salary @@ -618,31 +637,34 @@ FROM basic_pays ORDER BY salary DESC -- !query schema -struct +struct -- !query output -Larry Bott 11798 Gerard Bondur -Gerard Bondur 11472 Gerard Bondur -Pamela Castillo 11303 Gerard Bondur -Barry Jones 10586 Gerard Bondur -George Vanauf 10563 Gerard Bondur -Loui Bondur 10449 Gerard Bondur -Mary Patterson 9998 Gerard Bondur -Steve Patterson 9441 Gerard Bondur -Julie Firrelli 9181 Gerard Bondur -Jeff Firrelli 8992 Gerard Bondur -William Patterson 8870 Gerard Bondur -Diane Murphy 8435 Gerard Bondur -Leslie Jennings 8113 Gerard Bondur -Gerard Hernandez 6949 Gerard Bondur -Foon Yue Tseng 6660 Gerard Bondur -Anthony Bow 6627 Gerard Bondur -Leslie Thompson 5186 Gerard Bondur +Larry Bott 11798 Larry Bott Gerard Bondur +Gerard Bondur 11472 Larry Bott Gerard Bondur +Pamela Castillo 11303 Larry Bott Gerard Bondur +Barry Jones 10586 Larry Bott Gerard Bondur +George Vanauf 10563 Larry Bott Gerard Bondur +Loui Bondur 10449 Larry Bott Gerard Bondur +Mary Patterson 9998 Larry Bott Gerard Bondur +Steve Patterson 9441 Larry Bott Gerard Bondur +Julie Firrelli 9181 Larry Bott Gerard Bondur +Jeff Firrelli 8992 Larry Bott Gerard Bondur +William Patterson 8870 Larry Bott Gerard Bondur +Diane Murphy 8435 Larry Bott Gerard Bondur +Leslie Jennings 8113 Larry Bott Gerard Bondur +Gerard Hernandez 6949 Larry Bott Gerard Bondur +Foon Yue Tseng 6660 Larry Bott Gerard Bondur +Anthony Bow 6627 Larry Bott Gerard Bondur +Leslie Thompson 5186 Larry Bott Gerard Bondur -- !query SELECT employee_name, salary, + first_value(employee_name) OVER ( + ORDER BY salary DESC + ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) highest_salary, nth_value(employee_name, 2) OVER ( ORDER BY salary DESC ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) second_highest_salary @@ -650,31 +672,34 @@ FROM basic_pays ORDER BY salary DESC -- !query schema -struct +struct -- !query output -Larry Bott 11798 Gerard Bondur -Gerard Bondur 11472 Gerard Bondur -Pamela Castillo 11303 Gerard Bondur -Barry Jones 10586 Gerard Bondur -George Vanauf 10563 Gerard Bondur -Loui Bondur 10449 Gerard Bondur -Mary Patterson 9998 Gerard Bondur -Steve Patterson 9441 Gerard Bondur -Julie Firrelli 9181 Gerard Bondur -Jeff Firrelli 8992 Gerard Bondur -William Patterson 8870 Gerard Bondur -Diane Murphy 8435 Gerard Bondur -Leslie Jennings 8113 Gerard Bondur -Gerard Hernandez 6949 Gerard Bondur -Foon Yue Tseng 6660 Gerard Bondur -Anthony Bow 6627 Gerard Bondur -Leslie Thompson 5186 Gerard Bondur +Larry Bott 11798 Larry Bott Gerard Bondur +Gerard Bondur 11472 Larry Bott Gerard Bondur +Pamela Castillo 11303 Larry Bott Gerard Bondur +Barry Jones 10586 Larry Bott Gerard Bondur +George Vanauf 10563 Larry Bott Gerard Bondur +Loui Bondur 10449 Larry Bott Gerard Bondur +Mary Patterson 9998 Larry Bott Gerard Bondur +Steve Patterson 9441 Larry Bott Gerard Bondur +Julie Firrelli 9181 Larry Bott Gerard Bondur +Jeff Firrelli 8992 Larry Bott Gerard Bondur +William Patterson 8870 Larry Bott Gerard Bondur +Diane Murphy 8435 Larry Bott Gerard Bondur +Leslie Jennings 8113 Larry Bott Gerard Bondur +Gerard Hernandez 6949 Larry Bott Gerard Bondur +Foon Yue Tseng 6660 Larry Bott Gerard Bondur +Anthony Bow 6627 Larry Bott Gerard Bondur +Leslie Thompson 5186 Larry Bott Gerard Bondur -- !query SELECT employee_name, salary, + first_value(employee_name) OVER ( + ORDER BY salary DESC + ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) highest_salary, nth_value(employee_name, 2) OVER ( ORDER BY salary DESC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 
FOLLOWING) second_highest_salary @@ -682,25 +707,25 @@ FROM basic_pays ORDER BY salary DESC -- !query schema -struct +struct -- !query output -Larry Bott 11798 Gerard Bondur -Gerard Bondur 11472 Gerard Bondur -Pamela Castillo 11303 Gerard Bondur -Barry Jones 10586 Gerard Bondur -George Vanauf 10563 Gerard Bondur -Loui Bondur 10449 Gerard Bondur -Mary Patterson 9998 Gerard Bondur -Steve Patterson 9441 Gerard Bondur -Julie Firrelli 9181 Gerard Bondur -Jeff Firrelli 8992 Gerard Bondur -William Patterson 8870 Gerard Bondur -Diane Murphy 8435 Gerard Bondur -Leslie Jennings 8113 Gerard Bondur -Gerard Hernandez 6949 Gerard Bondur -Foon Yue Tseng 6660 Gerard Bondur -Anthony Bow 6627 Gerard Bondur -Leslie Thompson 5186 Gerard Bondur +Larry Bott 11798 Larry Bott Gerard Bondur +Gerard Bondur 11472 Larry Bott Gerard Bondur +Pamela Castillo 11303 Larry Bott Gerard Bondur +Barry Jones 10586 Larry Bott Gerard Bondur +George Vanauf 10563 Larry Bott Gerard Bondur +Loui Bondur 10449 Larry Bott Gerard Bondur +Mary Patterson 9998 Larry Bott Gerard Bondur +Steve Patterson 9441 Larry Bott Gerard Bondur +Julie Firrelli 9181 Larry Bott Gerard Bondur +Jeff Firrelli 8992 Larry Bott Gerard Bondur +William Patterson 8870 Larry Bott Gerard Bondur +Diane Murphy 8435 Larry Bott Gerard Bondur +Leslie Jennings 8113 Larry Bott Gerard Bondur +Gerard Hernandez 6949 Larry Bott Gerard Bondur +Foon Yue Tseng 6660 Larry Bott Gerard Bondur +Anthony Bow 6627 Larry Bott Gerard Bondur +Leslie Thompson 5186 Larry Bott Gerard Bondur -- !query @@ -708,6 +733,11 @@ SELECT employee_name, department, salary, + FIRST_VALUE(employee_name) OVER ( + PARTITION BY department + ORDER BY salary DESC + RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING + ) highest_salary, NTH_VALUE(employee_name, 2) OVER ( PARTITION BY department ORDER BY salary DESC @@ -717,22 +747,22 @@ FROM basic_pays ORDER BY department -- !query schema -struct --- !query output -Gerard Bondur Accounting 11472 Mary Patterson -Mary Patterson Accounting 9998 Mary Patterson -Jeff Firrelli Accounting 8992 Mary Patterson -William Patterson Accounting 8870 Mary Patterson -Diane Murphy Accounting 8435 Mary Patterson -Anthony Bow Accounting 6627 Mary Patterson -Leslie Jennings IT 8113 Leslie Thompson -Leslie Thompson IT 5186 Leslie Thompson -Larry Bott SCM 11798 Pamela Castillo -Pamela Castillo SCM 11303 Pamela Castillo -Barry Jones SCM 10586 Pamela Castillo -Loui Bondur SCM 10449 Pamela Castillo -Gerard Hernandez SCM 6949 Pamela Castillo -George Vanauf Sales 10563 Steve Patterson -Steve Patterson Sales 9441 Steve Patterson -Julie Firrelli Sales 9181 Steve Patterson -Foon Yue Tseng Sales 6660 Steve Patterson \ No newline at end of file +struct +-- !query output +Gerard Bondur Accounting 11472 Gerard Bondur Mary Patterson +Mary Patterson Accounting 9998 Gerard Bondur Mary Patterson +Jeff Firrelli Accounting 8992 Gerard Bondur Mary Patterson +William Patterson Accounting 8870 Gerard Bondur Mary Patterson +Diane Murphy Accounting 8435 Gerard Bondur Mary Patterson +Anthony Bow Accounting 6627 Gerard Bondur Mary Patterson +Leslie Jennings IT 8113 Leslie Jennings Leslie Thompson +Leslie Thompson IT 5186 Leslie Jennings Leslie Thompson +Larry Bott SCM 11798 Larry Bott Pamela Castillo +Pamela Castillo SCM 11303 Larry Bott Pamela Castillo +Barry Jones SCM 10586 Larry Bott Pamela Castillo +Loui Bondur SCM 10449 Larry Bott Pamela Castillo +Gerard Hernandez SCM 6949 Larry Bott Pamela Castillo +George Vanauf Sales 10563 George Vanauf Steve Patterson +Steve Patterson Sales 9441 George Vanauf 
Steve Patterson +Julie Firrelli Sales 9181 George Vanauf Steve Patterson +Foon Yue Tseng Sales 6660 George Vanauf Steve Patterson \ No newline at end of file From 879d6c7687e57004cc7f5925e53afb11e2064f9f Mon Sep 17 00:00:00 2001 From: gengjiaan Date: Fri, 6 Nov 2020 16:48:48 +0800 Subject: [PATCH 03/15] Reactor code --- .../spark/sql/catalyst/analysis/Analyzer.scala | 13 +++++++++++++ .../sql/catalyst/expressions/aggregate/First.scala | 10 +--------- .../spark/sql/execution/window/WindowExecBase.scala | 7 ------- 3 files changed, 14 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index f0143fdb23473..ac7f73f28fd7c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -242,6 +242,7 @@ class Analyzer( ResolveSubquery :: ResolveSubqueryColumnAliases :: ResolveWindowOrder :: + WindowFirstSubstitution :: ResolveWindowFrame :: ResolveNaturalAndUsingJoin :: ResolveOutputRelation :: @@ -2979,6 +2980,18 @@ class Analyzer( } } + /** + * Substitute the aggregate expression which uses [[First]] as the aggregate function + * in the window with the window function [[NthValue]]. + */ + object WindowFirstSubstitution extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions { + case we @ WindowExpression(AggregateExpression(first: First, _, _, _, _), _) => + we.copy(windowFunction = NthValue(first.child, Literal(1), first.ignoreNulls)) + case other => other + } + } + /** * Check and add proper window frames for all window functions. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/First.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/First.scala index 1ccf60b9f0e26..65fd43c924d08 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/First.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/First.scala @@ -51,7 +51,7 @@ import org.apache.spark.sql.types._ group = "agg_funcs", since = "2.0.0") case class First(child: Expression, ignoreNulls: Boolean) - extends DeclarativeAggregate with OffsetWindowSpec with ExpectsInputTypes { + extends DeclarativeAggregate with ExpectsInputTypes { def this(child: Expression) = this(child, false) @@ -59,14 +59,6 @@ case class First(child: Expression, ignoreNulls: Boolean) this(child, FirstLast.validateIgnoreNullExpr(ignoreNullsExpr, "first")) } - override val input = child - - override val offset = Literal.create(1, IntegerType) - - override lazy val default = Literal.create(null, input.dataType) - - override val isRelative = false - override def children: Seq[Expression] = child :: Nil override def nullable: Boolean = true diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala index 8ff937fca0ff9..a6a3f3d7384bf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala @@ -135,13 +135,6 @@ trait WindowExecBase extends UnaryExecNode { case e @ WindowExpression(function, spec) => val frame = 
spec.frameSpecification.asInstanceOf[SpecifiedWindowFrame] function match { - case AggregateExpression(f: OffsetWindowSpec, _, _, _, _) if !f.ignoreNulls && - frame.frameType == RowFrame && frame.lower == UnboundedPreceding => - frame.upper match { - case UnboundedFollowing => collect("UNBOUNDED_OFFSET", f.fakeFrame, e, f) - case CurrentRow => collect("UNBOUNDED_PRECEDING_OFFSET", f.fakeFrame, e, f) - case _ => collect("AGGREGATE", frame, e, f) - } case AggregateExpression(f, _, _, _, _) => collect("AGGREGATE", frame, e, f) case f: FrameLessOffsetWindowFunction => collect("FRAME_LESS_OFFSET", f.fakeFrame, e, f) From 57c7ef1fe17fe7448505f8b1976f153b275125af Mon Sep 17 00:00:00 2001 From: gengjiaan Date: Mon, 9 Nov 2020 10:33:44 +0800 Subject: [PATCH 04/15] Fix compatible --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index ac7f73f28fd7c..e26c6a538ef6b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -2986,7 +2986,8 @@ class Analyzer( */ object WindowFirstSubstitution extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions { - case we @ WindowExpression(AggregateExpression(first: First, _, _, _, _), _) => + case we @ WindowExpression(AggregateExpression(first: First, _, _, _, _), spec) + if !spec.orderSpec.isEmpty => we.copy(windowFunction = NthValue(first.child, Literal(1), first.ignoreNulls)) case other => other } From 7b99d2720b38e5f9a67a9c0810cde23b6e1ae797 Mon Sep 17 00:00:00 2001 From: gengjiaan Date: Mon, 9 Nov 2020 10:45:26 +0800 Subject: [PATCH 05/15] Add test case. 
--- .../sql/catalyst/analysis/AnalysisSuite.scala | 20 ++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index 37dcee1e59ee8..a92ba07988a1a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Count, Sum} +import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Count, First, Sum} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser.parsePlan import org.apache.spark.sql.catalyst.plans.{Cross, Inner} import org.apache.spark.sql.catalyst.plans.logical._ @@ -727,6 +727,24 @@ class AnalysisSuite extends AnalysisTest with Matchers { "window expressions are not allowed in observed metrics, but found") } + test("check WindowFirstSubstitution") { + val a = testRelation.output.head + val inputPlan = testRelation.select( + WindowExpression( + First(a, false).toAggregateExpression(), + WindowSpecDefinition(Nil, a.asc :: Nil, + SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))).as("window")) + val inputPlan2 = testRelation.select( + WindowExpression( + NthValue(a, Literal(1), false), + WindowSpecDefinition(Nil, a.asc :: Nil, + SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))).as("window")) + val analyzer = getAnalyzer + val actualPlan = analyzer.execute(inputPlan) + val expectedPlan = analyzer.execute(inputPlan2) + comparePlans(actualPlan, expectedPlan) + } + test("check CollectMetrics duplicates") { val a = testRelation.output.head val sum = Sum(a).toAggregateExpression().as("sum") From 0c953ff5e8b249debcf0fe2050840afdbb176ad8 Mon Sep 17 00:00:00 2001 From: gengjiaan Date: Mon, 9 Nov 2020 14:19:15 +0800 Subject: [PATCH 06/15] Update golden files. 
--- .../sql-tests/results/postgreSQL/window_part1.sql.out | 2 +- .../sql-tests/results/postgreSQL/window_part2.sql.out | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/sql/core/src/test/resources/sql-tests/results/postgreSQL/window_part1.sql.out b/sql/core/src/test/resources/sql-tests/results/postgreSQL/window_part1.sql.out index 76567b689445a..5446e42343c50 100755 --- a/sql/core/src/test/resources/sql-tests/results/postgreSQL/window_part1.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/postgreSQL/window_part1.sql.out @@ -270,7 +270,7 @@ struct +struct -- !query output 0 0 0 0 0 0 diff --git a/sql/core/src/test/resources/sql-tests/results/postgreSQL/window_part2.sql.out b/sql/core/src/test/resources/sql-tests/results/postgreSQL/window_part2.sql.out index ccddf9db172a6..c9207fe0f29bf 100644 --- a/sql/core/src/test/resources/sql-tests/results/postgreSQL/window_part2.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/postgreSQL/window_part2.sql.out @@ -101,7 +101,7 @@ from window w as (order by ss.id asc nulls first range between 2 preceding and 2 following) -- !query schema -struct +struct -- !query output 1 1 1 3 2 2 1 4 @@ -123,7 +123,7 @@ from window w as (order by ss.id asc nulls last range between 2 preceding and 2 following) -- !query schema -struct +struct -- !query output 1 1 1 3 2 2 1 4 @@ -145,7 +145,7 @@ from window w as (order by ss.id desc nulls first range between 2 preceding and 2 following) -- !query schema -struct +struct -- !query output 1 1 3 1 2 2 4 1 @@ -167,7 +167,7 @@ from window w as (order by ss.id desc nulls last range between 2 preceding and 2 following) -- !query schema -struct +struct -- !query output 1 1 3 1 2 2 4 1 From 2f3fbda5857d861021aa272d3dec085def4f638b Mon Sep 17 00:00:00 2001 From: gengjiaan Date: Tue, 10 Nov 2020 10:40:10 +0800 Subject: [PATCH 07/15] Update golden files --- .../results/postgreSQL/window_part2.sql.out | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/sql/core/src/test/resources/sql-tests/results/postgreSQL/window_part2.sql.out b/sql/core/src/test/resources/sql-tests/results/postgreSQL/window_part2.sql.out index c9207fe0f29bf..5cb4b0d7c783f 100644 --- a/sql/core/src/test/resources/sql-tests/results/postgreSQL/window_part2.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/postgreSQL/window_part2.sql.out @@ -272,7 +272,7 @@ from numerics window w as (order by f_float4 range between 1 preceding and 1 following) -- !query schema -struct +struct -- !query output 1 -3.0 1 1 2 -1.0 2 3 @@ -289,7 +289,7 @@ from numerics window w as (order by f_float4 range between 1 preceding and 1.1 following) -- !query schema -struct +struct -- !query output 1 -3.0 1 1 2 -1.0 2 3 @@ -306,7 +306,7 @@ from numerics window w as (order by f_float4 range between 'inf' preceding and 'inf' following) -- !query schema -struct +struct -- !query output 1 -3.0 1 7 2 -1.0 1 7 @@ -323,7 +323,7 @@ from numerics window w as (order by f_float4 range between 1.1 preceding and 'NaN' following) -- !query schema -struct +struct -- !query output 1 -3.0 1 7 2 -1.0 2 7 @@ -340,7 +340,7 @@ from numerics window w as (order by f_float8 range between 1 preceding and 1 following) -- !query schema -struct +struct -- !query output 1 -3.0 1 1 2 -1.0 2 3 @@ -357,7 +357,7 @@ from numerics window w as (order by f_float8 range between 1 preceding and 1.1 following) -- !query schema -struct +struct -- !query output 1 -3.0 1 1 2 -1.0 2 3 @@ -374,7 +374,7 @@ from numerics window w as (order by f_float8 
range between 'inf' preceding and 'inf' following) -- !query schema -struct +struct -- !query output 1 -3.0 1 7 2 -1.0 1 7 @@ -391,7 +391,7 @@ from numerics window w as (order by f_float8 range between 1.1 preceding and 'NaN' following) -- !query schema -struct +struct -- !query output 1 -3.0 1 7 2 -1.0 2 7 @@ -425,7 +425,7 @@ from numerics window w as (order by f_numeric range between 1 preceding and 1.1 following) -- !query schema -struct +struct -- !query output 1 -3 1 1 2 -1 2 3 @@ -442,7 +442,7 @@ from numerics window w as (order by f_numeric range between 1 preceding and 1.1 following) -- !query schema -struct +struct -- !query output 1 -3 1 1 2 -1 2 3 From 7c4dcdcbbd053107d7d442d31e9abb59254d4cc3 Mon Sep 17 00:00:00 2001 From: gengjiaan Date: Wed, 11 Nov 2020 17:42:40 +0800 Subject: [PATCH 08/15] Optimize code --- .../sql/catalyst/analysis/Analyzer.scala | 14 ---------- .../sql/catalyst/optimizer/Optimizer.scala | 13 +++++++++ .../sql/catalyst/analysis/AnalysisSuite.scala | 18 ------------ .../OptimizeWindowFunctionsSuite.scala | 5 ++++ .../results/postgreSQL/window_part1.sql.out | 2 +- .../results/postgreSQL/window_part2.sql.out | 28 +++++++++---------- 6 files changed, 33 insertions(+), 47 deletions(-) create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeWindowFunctionsSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index e26c6a538ef6b..f0143fdb23473 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -242,7 +242,6 @@ class Analyzer( ResolveSubquery :: ResolveSubqueryColumnAliases :: ResolveWindowOrder :: - WindowFirstSubstitution :: ResolveWindowFrame :: ResolveNaturalAndUsingJoin :: ResolveOutputRelation :: @@ -2980,19 +2979,6 @@ class Analyzer( } } - /** - * Substitute the aggregate expression which uses [[First]] as the aggregate function - * in the window with the window function [[NthValue]]. - */ - object WindowFirstSubstitution extends Rule[LogicalPlan] { - def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions { - case we @ WindowExpression(AggregateExpression(first: First, _, _, _, _), spec) - if !spec.orderSpec.isEmpty => - we.copy(windowFunction = NthValue(first.child, Literal(1), first.ignoreNulls)) - case other => other - } - } - /** * Check and add proper window frames for all window functions. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 9519a56c2817a..34cf0477160c8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -82,6 +82,7 @@ abstract class Optimizer(catalogManager: CatalogManager) // Operator combine CollapseRepartition, CollapseProject, + OptimizeWindowFunctions, CollapseWindow, CombineFilters, CombineLimits, @@ -806,6 +807,18 @@ object CollapseRepartition extends Rule[LogicalPlan] { } } +/** + * Substitute the aggregate expression which uses [[First]] as the aggregate function + * in the window with the window function [[NthValue]]. 
+ */ +object OptimizeWindowFunctions extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions { + case we @ WindowExpression(AggregateExpression(first: First, _, _, _, _), spec) + if !spec.orderSpec.isEmpty => + we.copy(windowFunction = NthValue(first.child, Literal(1), first.ignoreNulls)) + } +} + /** * Collapse Adjacent Window Expression. * - If the partition specs and order specs are the same and the window expression are diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index a92ba07988a1a..43076e1ecc4dd 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -727,24 +727,6 @@ class AnalysisSuite extends AnalysisTest with Matchers { "window expressions are not allowed in observed metrics, but found") } - test("check WindowFirstSubstitution") { - val a = testRelation.output.head - val inputPlan = testRelation.select( - WindowExpression( - First(a, false).toAggregateExpression(), - WindowSpecDefinition(Nil, a.asc :: Nil, - SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))).as("window")) - val inputPlan2 = testRelation.select( - WindowExpression( - NthValue(a, Literal(1), false), - WindowSpecDefinition(Nil, a.asc :: Nil, - SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))).as("window")) - val analyzer = getAnalyzer - val actualPlan = analyzer.execute(inputPlan) - val expectedPlan = analyzer.execute(inputPlan2) - comparePlans(actualPlan, expectedPlan) - } - test("check CollectMetrics duplicates") { val a = testRelation.output.head val sum = Sum(a).toAggregateExpression().as("sum") diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeWindowFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeWindowFunctionsSuite.scala new file mode 100644 index 0000000000000..0153a3c9897d6 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeWindowFunctionsSuite.scala @@ -0,0 +1,5 @@ +package org.apache.spark.sql.catalyst.optimizer + +class OptimizeWindowFunctionsSuite { + +} diff --git a/sql/core/src/test/resources/sql-tests/results/postgreSQL/window_part1.sql.out b/sql/core/src/test/resources/sql-tests/results/postgreSQL/window_part1.sql.out index 5446e42343c50..76567b689445a 100755 --- a/sql/core/src/test/resources/sql-tests/results/postgreSQL/window_part1.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/postgreSQL/window_part1.sql.out @@ -270,7 +270,7 @@ struct +struct -- !query output 0 0 0 0 0 0 diff --git a/sql/core/src/test/resources/sql-tests/results/postgreSQL/window_part2.sql.out b/sql/core/src/test/resources/sql-tests/results/postgreSQL/window_part2.sql.out index 5cb4b0d7c783f..ccddf9db172a6 100644 --- a/sql/core/src/test/resources/sql-tests/results/postgreSQL/window_part2.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/postgreSQL/window_part2.sql.out @@ -101,7 +101,7 @@ from window w as (order by ss.id asc nulls first range between 2 preceding and 2 following) -- !query schema -struct +struct -- !query output 1 1 1 3 2 2 1 4 @@ -123,7 +123,7 @@ from window w as (order by ss.id asc nulls last range between 2 preceding and 2 following) -- !query schema -struct +struct -- !query output 1 1 1 3 2 2 1 4 @@ -145,7 
+145,7 @@ from window w as (order by ss.id desc nulls first range between 2 preceding and 2 following) -- !query schema -struct +struct -- !query output 1 1 3 1 2 2 4 1 @@ -167,7 +167,7 @@ from window w as (order by ss.id desc nulls last range between 2 preceding and 2 following) -- !query schema -struct +struct -- !query output 1 1 3 1 2 2 4 1 @@ -272,7 +272,7 @@ from numerics window w as (order by f_float4 range between 1 preceding and 1 following) -- !query schema -struct +struct -- !query output 1 -3.0 1 1 2 -1.0 2 3 @@ -289,7 +289,7 @@ from numerics window w as (order by f_float4 range between 1 preceding and 1.1 following) -- !query schema -struct +struct -- !query output 1 -3.0 1 1 2 -1.0 2 3 @@ -306,7 +306,7 @@ from numerics window w as (order by f_float4 range between 'inf' preceding and 'inf' following) -- !query schema -struct +struct -- !query output 1 -3.0 1 7 2 -1.0 1 7 @@ -323,7 +323,7 @@ from numerics window w as (order by f_float4 range between 1.1 preceding and 'NaN' following) -- !query schema -struct +struct -- !query output 1 -3.0 1 7 2 -1.0 2 7 @@ -340,7 +340,7 @@ from numerics window w as (order by f_float8 range between 1 preceding and 1 following) -- !query schema -struct +struct -- !query output 1 -3.0 1 1 2 -1.0 2 3 @@ -357,7 +357,7 @@ from numerics window w as (order by f_float8 range between 1 preceding and 1.1 following) -- !query schema -struct +struct -- !query output 1 -3.0 1 1 2 -1.0 2 3 @@ -374,7 +374,7 @@ from numerics window w as (order by f_float8 range between 'inf' preceding and 'inf' following) -- !query schema -struct +struct -- !query output 1 -3.0 1 7 2 -1.0 1 7 @@ -391,7 +391,7 @@ from numerics window w as (order by f_float8 range between 1.1 preceding and 'NaN' following) -- !query schema -struct +struct -- !query output 1 -3.0 1 7 2 -1.0 2 7 @@ -425,7 +425,7 @@ from numerics window w as (order by f_numeric range between 1 preceding and 1.1 following) -- !query schema -struct +struct -- !query output 1 -3 1 1 2 -1 2 3 @@ -442,7 +442,7 @@ from numerics window w as (order by f_numeric range between 1 preceding and 1.1 following) -- !query schema -struct +struct -- !query output 1 -3 1 1 2 -1 2 3 From 31703204a1970c4e55ab65de8b0b548e0983bba5 Mon Sep 17 00:00:00 2001 From: Jiaan Geng Date: Wed, 11 Nov 2020 09:45:09 +0000 Subject: [PATCH 09/15] Delete OptimizeWindowFunctionsSuite.scala Optimize code --- .../catalyst/optimizer/OptimizeWindowFunctionsSuite.scala | 5 ----- 1 file changed, 5 deletions(-) delete mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeWindowFunctionsSuite.scala diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeWindowFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeWindowFunctionsSuite.scala deleted file mode 100644 index 0153a3c9897d6..0000000000000 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeWindowFunctionsSuite.scala +++ /dev/null @@ -1,5 +0,0 @@ -package org.apache.spark.sql.catalyst.optimizer - -class OptimizeWindowFunctionsSuite { - -} From 1b4533dcd8f27f45827b66cc868d01a2a0cf84b9 Mon Sep 17 00:00:00 2001 From: gengjiaan Date: Wed, 11 Nov 2020 17:46:15 +0800 Subject: [PATCH 10/15] Optimize code --- .../org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index 43076e1ecc4dd..37dcee1e59ee8 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Count, First, Sum} +import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Count, Sum} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser.parsePlan import org.apache.spark.sql.catalyst.plans.{Cross, Inner} import org.apache.spark.sql.catalyst.plans.logical._ From f851a4ceae8eaca7a1460f48f0266b3fd42519af Mon Sep 17 00:00:00 2001 From: gengjiaan Date: Wed, 11 Nov 2020 19:26:41 +0800 Subject: [PATCH 11/15] Add test case. --- .../OptimizeWindowFunctionsSuite.scala | 51 +++++++++++++++++++ .../apache/spark/sql/SQLQueryTestSuite.scala | 3 +- .../sql/test/DataFrameReaderWriterSuite.scala | 24 +++++++++ 3 files changed, 77 insertions(+), 1 deletion(-) create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeWindowFunctionsSuite.scala diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeWindowFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeWindowFunctionsSuite.scala new file mode 100644 index 0000000000000..1449d41b22f34 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeWindowFunctionsSuite.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate.First +import org.apache.spark.sql.catalyst.plans.PlanTest +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.RuleExecutor + +class OptimizeWindowFunctionsSuite extends PlanTest { + object Optimize extends RuleExecutor[LogicalPlan] { + val batches = Batch("OptimizeWindowFunctions", FixedPoint(10), + OptimizeWindowFunctions) :: Nil + } + + test("check OptimizeWindowFunctions") { + val testRelation = LocalRelation('a.double, 'b.double, 'c.string) + val a = testRelation.output.head + val inputPlan = testRelation.select( + WindowExpression( + First(a, false).toAggregateExpression(), + WindowSpecDefinition(Nil, a.asc :: Nil, + SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow)))) + val correctAnswer = testRelation.select( + WindowExpression( + NthValue(a, Literal(1), false), + WindowSpecDefinition(Nil, a.asc :: Nil, + SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow)))) + + val optimized = Optimize.execute(inputPlan) + assert(optimized == correctAnswer) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala index 36e55c0994f18..8583685862d8e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala @@ -563,7 +563,8 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper // Filter out test files with invalid extensions such as temp files created // by vi (.swp), Mac (.DS_Store) etc. val filteredFiles = files.filter(_.getName.endsWith(validFileExtensions)) - filteredFiles ++ dirs.flatMap(listFilesRecursively) + (filteredFiles ++ dirs.flatMap(listFilesRecursively)) + .filter(_.getName.equals("window.sql")) } /** Load built-in test tables into the SparkSession. 
*/ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala index 4e61dba4955af..84553fe5ae4c8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala @@ -1033,6 +1033,30 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with } } + test("abc2") { + spark.sql("create table SPARK_33045(id string) using parquet") + val values = Range(1, 90000) + spark.sql(s"select concat_ws(${values.mkString(", ")})").show + } + + test("abc1") { + spark.sql("create table SPARK_33045(id string) using parquet") + val values = Range(1, 9000) + spark.sql(s"select * from SPARK_33045 where id in (${values.mkString(", ")}, id)").show + } + + test("abc") { + spark.sql("create table SPARK_33045(id string) using parquet") + val values = Range(1, 9000) + spark.sql(s"select * from SPARK_33045 where id like all (${values.mkString(", ")})").show + } + + test("concat") { + spark.sql("create table SPARK_33045(id int) using parquet") + val values = Range(1, 900) + spark.sql(s"select concat(${values.mkString(", ")}, id) from SPARK_33045").show + } + test("Insert overwrite table command should output correct schema: basic") { withTable("tbl", "tbl2") { withView("view1") { From fd7e02eccb5c4ac6112e34a43634c8f656a447d3 Mon Sep 17 00:00:00 2001 From: gengjiaan Date: Wed, 11 Nov 2020 19:30:34 +0800 Subject: [PATCH 12/15] Revert some code. --- .../apache/spark/sql/SQLQueryTestSuite.scala | 3 +-- .../sql/test/DataFrameReaderWriterSuite.scala | 24 ------------------- 2 files changed, 1 insertion(+), 26 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala index 8583685862d8e..36e55c0994f18 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala @@ -563,8 +563,7 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper // Filter out test files with invalid extensions such as temp files created // by vi (.swp), Mac (.DS_Store) etc. val filteredFiles = files.filter(_.getName.endsWith(validFileExtensions)) - (filteredFiles ++ dirs.flatMap(listFilesRecursively)) - .filter(_.getName.equals("window.sql")) + filteredFiles ++ dirs.flatMap(listFilesRecursively) } /** Load built-in test tables into the SparkSession. 
*/ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala index 84553fe5ae4c8..4e61dba4955af 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala @@ -1033,30 +1033,6 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with } } - test("abc2") { - spark.sql("create table SPARK_33045(id string) using parquet") - val values = Range(1, 90000) - spark.sql(s"select concat_ws(${values.mkString(", ")})").show - } - - test("abc1") { - spark.sql("create table SPARK_33045(id string) using parquet") - val values = Range(1, 9000) - spark.sql(s"select * from SPARK_33045 where id in (${values.mkString(", ")}, id)").show - } - - test("abc") { - spark.sql("create table SPARK_33045(id string) using parquet") - val values = Range(1, 9000) - spark.sql(s"select * from SPARK_33045 where id like all (${values.mkString(", ")})").show - } - - test("concat") { - spark.sql("create table SPARK_33045(id int) using parquet") - val values = Range(1, 900) - spark.sql(s"select concat(${values.mkString(", ")}, id) from SPARK_33045").show - } - test("Insert overwrite table command should output correct schema: basic") { withTable("tbl", "tbl2") { withView("view1") { From 72ceacc2677c66d884d5e4f4ac124c3f61975edd Mon Sep 17 00:00:00 2001 From: gengjiaan Date: Thu, 12 Nov 2020 10:55:47 +0800 Subject: [PATCH 13/15] Optimize code --- .../sql/catalyst/optimizer/Optimizer.scala | 3 +- .../OptimizeWindowFunctionsSuite.scala | 24 ++++- .../resources/sql-tests/inputs/window.sql | 94 +++++++------------ .../sql-tests/results/window.sql.out | 94 +++++++------------ 4 files changed, 88 insertions(+), 127 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 34cf0477160c8..57ddd392e180a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -808,8 +808,7 @@ object CollapseRepartition extends Rule[LogicalPlan] { } /** - * Substitute the aggregate expression which uses [[First]] as the aggregate function - * in the window with the window function [[NthValue]]. + * Replaces first(col) to nth_value(col, 1) for better performance. 
*/ object OptimizeWindowFunctions extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeWindowFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeWindowFunctionsSuite.scala index 1449d41b22f34..c89208dce45d6 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeWindowFunctionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeWindowFunctionsSuite.scala @@ -31,21 +31,35 @@ class OptimizeWindowFunctionsSuite extends PlanTest { OptimizeWindowFunctions) :: Nil } - test("check OptimizeWindowFunctions") { - val testRelation = LocalRelation('a.double, 'b.double, 'c.string) - val a = testRelation.output.head + val testRelation = LocalRelation('a.double, 'b.double, 'c.string) + val a = testRelation.output(0) + val b = testRelation.output(1) + val c = testRelation.output(2) + + test("replace first(col) by nth_value(col, 1) if the window frame is ordered") { val inputPlan = testRelation.select( WindowExpression( First(a, false).toAggregateExpression(), - WindowSpecDefinition(Nil, a.asc :: Nil, + WindowSpecDefinition(b :: Nil, c.asc :: Nil, SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow)))) val correctAnswer = testRelation.select( WindowExpression( NthValue(a, Literal(1), false), - WindowSpecDefinition(Nil, a.asc :: Nil, + WindowSpecDefinition(b :: Nil, c.asc :: Nil, SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow)))) val optimized = Optimize.execute(inputPlan) assert(optimized == correctAnswer) } + + test("can't replace first(col) by nth_value(col, 1) if the window frame isn't ordered") { + val inputPlan = testRelation.select( + WindowExpression( + First(a, false).toAggregateExpression(), + WindowSpecDefinition(b :: Nil, Nil, + SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow)))) + + val optimized = Optimize.execute(inputPlan) + assert(optimized == inputPlan) + } } diff --git a/sql/core/src/test/resources/sql-tests/inputs/window.sql b/sql/core/src/test/resources/sql-tests/inputs/window.sql index 83ab4e2e7bbf5..f5223af9125f6 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/window.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/window.sql @@ -150,130 +150,104 @@ FROM testData ORDER BY cate, val; SELECT employee_name, salary, - first_value(employee_name) OVER (ORDER BY salary DESC) highest_salary, - nth_value(employee_name, 2) OVER (ORDER BY salary DESC) second_highest_salary + first_value(employee_name) OVER w highest_salary, + nth_value(employee_name, 2) OVER w second_highest_salary FROM basic_pays +WINDOW w AS (ORDER BY salary DESC) ORDER BY salary DESC; SELECT employee_name, salary, - first_value(employee_name) OVER ( - ORDER BY salary DESC - RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) highest_salary, - nth_value(employee_name, 2) OVER ( - ORDER BY salary DESC - RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) second_highest_salary + first_value(employee_name) OVER w highest_salary, + nth_value(employee_name, 2) OVER w second_highest_salary FROM basic_pays +WINDOW w AS (ORDER BY salary DESC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) ORDER BY salary DESC; SELECT employee_name, salary, - first_value(employee_name) OVER ( - ORDER BY salary DESC - ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) highest_salary, - nth_value(employee_name, 2) OVER ( - ORDER BY salary DESC - 
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) second_highest_salary + first_value(employee_name) OVER w highest_salary, + nth_value(employee_name, 2) OVER w second_highest_salary FROM basic_pays +WINDOW w AS (ORDER BY salary DESC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) ORDER BY salary DESC; SELECT employee_name, salary, - first_value(employee_name) OVER ( - ORDER BY salary - RANGE BETWEEN 2000 PRECEDING AND 1000 FOLLOWING) highest_salary, - nth_value(employee_name, 2) OVER ( - ORDER BY salary - RANGE BETWEEN 2000 PRECEDING AND 1000 FOLLOWING) second_highest_salary + first_value(employee_name) OVER w highest_salary, + nth_value(employee_name, 2) OVER w second_highest_salary FROM basic_pays +WINDOW w AS (ORDER BY salary RANGE BETWEEN 2000 PRECEDING AND 1000 FOLLOWING) ORDER BY salary; SELECT employee_name, salary, - first_value(employee_name) OVER ( - ORDER BY salary DESC - ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING) highest_salary, - nth_value(employee_name, 2) OVER ( - ORDER BY salary DESC - ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING) second_highest_salary + first_value(employee_name) OVER w highest_salary, + nth_value(employee_name, 2) OVER w second_highest_salary FROM basic_pays +WINDOW w AS (ORDER BY salary DESC ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING) ORDER BY salary DESC; SELECT employee_name, salary, - first_value(employee_name) OVER ( - ORDER BY salary DESC - RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) highest_salary, - nth_value(employee_name, 2) OVER ( - ORDER BY salary DESC - RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) second_highest_salary + first_value(employee_name) OVER w highest_salary, + nth_value(employee_name, 2) OVER w second_highest_salary FROM basic_pays +WINDOW w AS (ORDER BY salary DESC RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) ORDER BY salary DESC; SELECT employee_name, salary, - first_value(employee_name) OVER ( - ORDER BY salary DESC - RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) highest_salary, - nth_value(employee_name, 2) OVER ( - ORDER BY salary DESC - RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) second_highest_salary + first_value(employee_name) OVER w highest_salary, + nth_value(employee_name, 2) OVER w second_highest_salary FROM basic_pays +WINDOW w AS (ORDER BY salary DESC RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) ORDER BY salary DESC; SELECT employee_name, salary, - first_value(employee_name) OVER ( - ORDER BY salary DESC - ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) highest_salary, - nth_value(employee_name, 2) OVER ( - ORDER BY salary DESC - ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) second_highest_salary + first_value(employee_name) OVER w highest_salary, + nth_value(employee_name, 2) OVER w second_highest_salary FROM basic_pays +WINDOW w AS (ORDER BY salary DESC ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) ORDER BY salary DESC; SELECT employee_name, salary, - first_value(employee_name) OVER ( - ORDER BY salary DESC - ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) highest_salary, - nth_value(employee_name, 2) OVER ( - ORDER BY salary DESC - ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) second_highest_salary + first_value(employee_name) OVER w highest_salary, + nth_value(employee_name, 2) OVER w second_highest_salary FROM basic_pays +WINDOW w AS (ORDER BY salary DESC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) ORDER BY salary DESC; SELECT employee_name, department, salary, - FIRST_VALUE(employee_name) OVER ( - PARTITION 
BY department - ORDER BY salary DESC - RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING - ) highest_salary, - NTH_VALUE(employee_name, 2) OVER ( - PARTITION BY department - ORDER BY salary DESC - RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING - ) second_highest_salary + FIRST_VALUE(employee_name) OVER w highest_salary, + NTH_VALUE(employee_name, 2) OVER w second_highest_salary FROM basic_pays +WINDOW w AS ( + PARTITION BY department + ORDER BY salary DESC + RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING +) ORDER BY department; \ No newline at end of file diff --git a/sql/core/src/test/resources/sql-tests/results/window.sql.out b/sql/core/src/test/resources/sql-tests/results/window.sql.out index b3d0a47c74243..1304dcf21d0b3 100644 --- a/sql/core/src/test/resources/sql-tests/results/window.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/window.sql.out @@ -421,10 +421,11 @@ window aggregate function with filter predicate is not supported yet.; SELECT employee_name, salary, - first_value(employee_name) OVER (ORDER BY salary DESC) highest_salary, - nth_value(employee_name, 2) OVER (ORDER BY salary DESC) second_highest_salary + first_value(employee_name) OVER w highest_salary, + nth_value(employee_name, 2) OVER w second_highest_salary FROM basic_pays +WINDOW w AS (ORDER BY salary DESC) ORDER BY salary DESC -- !query schema struct @@ -452,14 +453,11 @@ Leslie Thompson 5186 Larry Bott Gerard Bondur SELECT employee_name, salary, - first_value(employee_name) OVER ( - ORDER BY salary DESC - RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) highest_salary, - nth_value(employee_name, 2) OVER ( - ORDER BY salary DESC - RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) second_highest_salary + first_value(employee_name) OVER w highest_salary, + nth_value(employee_name, 2) OVER w second_highest_salary FROM basic_pays +WINDOW w AS (ORDER BY salary DESC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) ORDER BY salary DESC -- !query schema struct @@ -487,14 +485,11 @@ Leslie Thompson 5186 Larry Bott Gerard Bondur SELECT employee_name, salary, - first_value(employee_name) OVER ( - ORDER BY salary DESC - ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) highest_salary, - nth_value(employee_name, 2) OVER ( - ORDER BY salary DESC - ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) second_highest_salary + first_value(employee_name) OVER w highest_salary, + nth_value(employee_name, 2) OVER w second_highest_salary FROM basic_pays +WINDOW w AS (ORDER BY salary DESC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) ORDER BY salary DESC -- !query schema struct @@ -522,14 +517,11 @@ Leslie Thompson 5186 Larry Bott Gerard Bondur SELECT employee_name, salary, - first_value(employee_name) OVER ( - ORDER BY salary - RANGE BETWEEN 2000 PRECEDING AND 1000 FOLLOWING) highest_salary, - nth_value(employee_name, 2) OVER ( - ORDER BY salary - RANGE BETWEEN 2000 PRECEDING AND 1000 FOLLOWING) second_highest_salary + first_value(employee_name) OVER w highest_salary, + nth_value(employee_name, 2) OVER w second_highest_salary FROM basic_pays +WINDOW w AS (ORDER BY salary RANGE BETWEEN 2000 PRECEDING AND 1000 FOLLOWING) ORDER BY salary -- !query schema struct @@ -557,14 +549,11 @@ Larry Bott 11798 Mary Patterson Loui Bondur SELECT employee_name, salary, - first_value(employee_name) OVER ( - ORDER BY salary DESC - ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING) highest_salary, - nth_value(employee_name, 2) OVER ( - ORDER BY salary DESC - ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING) 
second_highest_salary + first_value(employee_name) OVER w highest_salary, + nth_value(employee_name, 2) OVER w second_highest_salary FROM basic_pays +WINDOW w AS (ORDER BY salary DESC ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING) ORDER BY salary DESC -- !query schema struct @@ -592,14 +581,11 @@ Leslie Thompson 5186 Foon Yue Tseng Anthony Bow SELECT employee_name, salary, - first_value(employee_name) OVER ( - ORDER BY salary DESC - RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) highest_salary, - nth_value(employee_name, 2) OVER ( - ORDER BY salary DESC - RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) second_highest_salary + first_value(employee_name) OVER w highest_salary, + nth_value(employee_name, 2) OVER w second_highest_salary FROM basic_pays +WINDOW w AS (ORDER BY salary DESC RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) ORDER BY salary DESC -- !query schema struct @@ -627,14 +613,11 @@ Leslie Thompson 5186 Leslie Thompson NULL SELECT employee_name, salary, - first_value(employee_name) OVER ( - ORDER BY salary DESC - RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) highest_salary, - nth_value(employee_name, 2) OVER ( - ORDER BY salary DESC - RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) second_highest_salary + first_value(employee_name) OVER w highest_salary, + nth_value(employee_name, 2) OVER w second_highest_salary FROM basic_pays +WINDOW w AS (ORDER BY salary DESC RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) ORDER BY salary DESC -- !query schema struct @@ -662,14 +645,11 @@ Leslie Thompson 5186 Larry Bott Gerard Bondur SELECT employee_name, salary, - first_value(employee_name) OVER ( - ORDER BY salary DESC - ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) highest_salary, - nth_value(employee_name, 2) OVER ( - ORDER BY salary DESC - ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) second_highest_salary + first_value(employee_name) OVER w highest_salary, + nth_value(employee_name, 2) OVER w second_highest_salary FROM basic_pays +WINDOW w AS (ORDER BY salary DESC ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) ORDER BY salary DESC -- !query schema struct @@ -697,14 +677,11 @@ Leslie Thompson 5186 Larry Bott Gerard Bondur SELECT employee_name, salary, - first_value(employee_name) OVER ( - ORDER BY salary DESC - ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) highest_salary, - nth_value(employee_name, 2) OVER ( - ORDER BY salary DESC - ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) second_highest_salary + first_value(employee_name) OVER w highest_salary, + nth_value(employee_name, 2) OVER w second_highest_salary FROM basic_pays +WINDOW w AS (ORDER BY salary DESC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) ORDER BY salary DESC -- !query schema struct @@ -733,18 +710,15 @@ SELECT employee_name, department, salary, - FIRST_VALUE(employee_name) OVER ( - PARTITION BY department - ORDER BY salary DESC - RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING - ) highest_salary, - NTH_VALUE(employee_name, 2) OVER ( - PARTITION BY department - ORDER BY salary DESC - RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING - ) second_highest_salary + FIRST_VALUE(employee_name) OVER w highest_salary, + NTH_VALUE(employee_name, 2) OVER w second_highest_salary FROM basic_pays +WINDOW w AS ( + PARTITION BY department + ORDER BY salary DESC + RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING +) ORDER BY department -- !query schema struct From 68d3388001615841685e9942c4220d7904f33665 Mon Sep 17 00:00:00 
2001 From: gengjiaan Date: Thu, 12 Nov 2020 13:01:48 +0800 Subject: [PATCH 14/15] Optimize code --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 3 ++- .../optimizer/OptimizeWindowFunctionsSuite.scala | 13 ++++++++++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 57ddd392e180a..87eb0be77fcee 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -813,7 +813,8 @@ object CollapseRepartition extends Rule[LogicalPlan] { object OptimizeWindowFunctions extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions { case we @ WindowExpression(AggregateExpression(first: First, _, _, _, _), spec) - if !spec.orderSpec.isEmpty => + if spec.orderSpec.nonEmpty && + spec.frameSpecification.asInstanceOf[SpecifiedWindowFrame].frameType == RowFrame => we.copy(windowFunction = NthValue(first.child, Literal(1), first.ignoreNulls)) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeWindowFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeWindowFunctionsSuite.scala index c89208dce45d6..dfe1d47bcba06 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeWindowFunctionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeWindowFunctionsSuite.scala @@ -36,7 +36,7 @@ class OptimizeWindowFunctionsSuite extends PlanTest { val b = testRelation.output(1) val c = testRelation.output(2) - test("replace first(col) by nth_value(col, 1) if the window frame is ordered") { + test("replace first(col) by nth_value(col, 1)") { val inputPlan = testRelation.select( WindowExpression( First(a, false).toAggregateExpression(), @@ -52,6 +52,17 @@ class OptimizeWindowFunctionsSuite extends PlanTest { assert(optimized == correctAnswer) } + test("can't replace first(col) by nth_value(col, 1) if the window frame type is row") { + val inputPlan = testRelation.select( + WindowExpression( + First(a, false).toAggregateExpression(), + WindowSpecDefinition(b :: Nil, c.asc :: Nil, + SpecifiedWindowFrame(RangeFrame, UnboundedPreceding, CurrentRow)))) + + val optimized = Optimize.execute(inputPlan) + assert(optimized == inputPlan) + } + test("can't replace first(col) by nth_value(col, 1) if the window frame isn't ordered") { val inputPlan = testRelation.select( WindowExpression( From 3a7f4e740eb5a9cecf880bc5cc294b2459e98cf1 Mon Sep 17 00:00:00 2001 From: gengjiaan Date: Thu, 12 Nov 2020 15:42:26 +0800 Subject: [PATCH 15/15] Optimize code --- .../sql/catalyst/optimizer/OptimizeWindowFunctionsSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeWindowFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeWindowFunctionsSuite.scala index dfe1d47bcba06..389aaeafe655f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeWindowFunctionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeWindowFunctionsSuite.scala @@ -52,7 +52,7 @@ class OptimizeWindowFunctionsSuite extends PlanTest { assert(optimized == 
correctAnswer) } - test("can't replace first(col) by nth_value(col, 1) if the window frame type is row") { + test("can't replace first(col) by nth_value(col, 1) if the window frame type is range") { val inputPlan = testRelation.select( WindowExpression( First(a, false).toAggregateExpression(),