From 6ab9b6fb559ab11ac95078a1672672edf85efbac Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Fri, 9 Jun 2017 18:45:23 -0700
Subject: [PATCH 1/6] [SPARK-21044][SPARK-21041][SQL] Add RemoveInvalidRange optimizer

**BEFORE**
```
scala> spark.range(0,0,1).explain
== Physical Plan ==
*Range (0, 0, step=1, splits=8)
```

**AFTER**
```
scala> spark.range(0,0,1).explain
== Physical Plan ==
LocalTableScan <empty>, [id#0L]
```
---
 .../sql/catalyst/optimizer/Optimizer.scala    | 11 +++++
 .../optimizer/RemoveInvalidRangeSuite.scala   | 49 +++++++++++++++++++
 .../spark/sql/DataFrameRangeSuite.scala       | 11 +++++
 3 files changed, 71 insertions(+)
 create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveInvalidRangeSuite.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index d16689a34298a..ba6114ad9aac3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -110,6 +110,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: SQLConf)
       EliminateSerialization,
       RemoveRedundantAliases,
       RemoveRedundantProject,
+      RemoveInvalidRange,
       SimplifyCreateStructOps,
       SimplifyCreateArrayOps,
       SimplifyCreateMapOps,
@@ -270,6 +271,16 @@ object RemoveRedundantProject extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Replace invalid `range` with empty range.
+ */
+object RemoveInvalidRange extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case p @ Range(start, end, step, a, b) if (start == end) || (start < end ^ 0 < step) =>
+      LocalRelation(p.output, data = Seq.empty)
+  }
+}
+
 /**
  * Pushes down [[LocalLimit]] beneath UNION ALL and beneath the streamed inputs of outer joins.
  */
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveInvalidRangeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveInvalidRangeSuite.scala
new file mode 100644
index 0000000000000..230c6082be92b
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveInvalidRangeSuite.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Range}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+
+class RemoveInvalidRangeSuite extends PlanTest {
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches =
+      Batch("RemoveInvalidRange", Once,
+        RemoveInvalidRange) :: Nil
+  }
+
+  test("preserve valid ranges") {
+    Seq(Range(0, 10, 1, 1), Range(10, 0, -1, 1)).foreach { query =>
+      val optimized = Optimize.execute(query.analyze)
+      val correctAnswer = query
+
+      comparePlans(optimized, correctAnswer)
+    }
+  }
+
+  test("remove ranges with invalid combination of start/end/step") {
+    Seq(Range(0, 0, 1, 1), Range(0, 0, -1, 1), Range(1, 10, -1, 1), Range(10, 1, 1, 1)).foreach {
+      query =>
+        val optimized = Optimize.execute(query.analyze)
+        val correctAnswer = LocalRelation(query.output, data = Seq.empty)
+        comparePlans(optimized, correctAnswer)
+    }
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
index 7b495656b93d7..93669adb24186 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
@@ -191,6 +191,17 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall
       checkAnswer(sql("SELECT * FROM range(3)"), Row(0) :: Row(1) :: Row(2) :: Nil)
     }
   }
+
+  test("SPARK-21041 SparkSession.range()'s behavior is inconsistent with SparkContext.range()") {
+    val start = java.lang.Long.MAX_VALUE - 3
+    val end = java.lang.Long.MIN_VALUE + 2
+    Seq("false", "true").foreach { value =>
+      withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> value) {
+        assert(spark.sparkContext.range(start, end, 1).collect.length == 0)
+        assert(spark.range(start, end, 1).collect.length == 0)
+      }
+    }
+  }
 }
 
 object DataFrameRangeSuite {
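The guard `(start == end) || (start < end ^ 0 < step)` marks a range as empty when its bounds coincide or when the step points away from `end`. A minimal standalone sketch of the predicate (plain Scala, no Catalyst dependencies; `isEmptyRange` and `bruteForceCount` are illustrative names, not patch code), cross-checked against a brute-force row count over the same five (start, end, step) triples exercised by the new `RemoveInvalidRangeSuite`:

```scala
// Sanity check for the emptiness predicate used by RemoveInvalidRange:
// isEmptyRange mirrors the rule's guard; bruteForceCount is the ground truth.
object RangeGuardCheck {
  def isEmptyRange(start: Long, end: Long, step: Long): Boolean =
    (start == end) || (start < end ^ 0 < step)

  def bruteForceCount(start: Long, end: Long, step: Long): Int =
    Iterator.iterate(start)(_ + step)
      .takeWhile(i => if (step > 0) i < end else i > end)
      .size

  def main(args: Array[String]): Unit = {
    val cases = Seq((0L, 10L, 1L), (10L, 0L, -1L), (0L, 0L, 1L), (1L, 10L, -1L), (10L, 1L, 1L))
    cases.foreach { case (s, e, st) =>
      assert(isEmptyRange(s, e, st) == (bruteForceCount(s, e, st) == 0))
    }
    println("guard agrees with brute force on all sampled cases")
  }
}
```

Note that Scala's `^` binds looser than `<`, so the guard parses as `(start < end) ^ (0 < step)`: a boolean XOR that is true exactly when the step walks away from `end`.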
From fe9d98bf990a75337cb849bc79c065c821b210bd Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Sat, 10 Jun 2017 12:34:14 -0700
Subject: [PATCH 2/6] address comments

---
 .../sql/catalyst/optimizer/Optimizer.scala    | 11 -----
 .../plans/logical/basicLogicalOperators.scala | 14 ++++--
 .../optimizer/RemoveInvalidRangeSuite.scala   | 49 -------------------
 3 files changed, 11 insertions(+), 63 deletions(-)
 delete mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveInvalidRangeSuite.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index ba6114ad9aac3..d16689a34298a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -110,7 +110,6 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: SQLConf)
       EliminateSerialization,
       RemoveRedundantAliases,
       RemoveRedundantProject,
-      RemoveInvalidRange,
       SimplifyCreateStructOps,
       SimplifyCreateArrayOps,
       SimplifyCreateMapOps,
@@ -271,16 +270,6 @@ object RemoveRedundantProject extends Rule[LogicalPlan] {
   }
 }
 
-/**
- * Replace invalid `range` with empty range.
- */
-object RemoveInvalidRange extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case p @ Range(start, end, step, a, b) if (start == end) || (start < end ^ 0 < step) =>
-      LocalRelation(p.output, data = Seq.empty)
-  }
-}
-
 /**
  * Pushes down [[LocalLimit]] beneath UNION ALL and beneath the streamed inputs of outer joins.
  */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 6878b6b179c3a..5d8d86391c889 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -482,11 +482,17 @@ case class Sort(
 
 /** Factory for constructing new `Range` nodes. */
 object Range {
-  def apply(start: Long, end: Long, step: Long, numSlices: Option[Int]): Range = {
+  def apply(start: Long, end: Long, step: Long, numSlices: Option[Int])
+    : LeafNode with MultiInstanceRelation = {
     val output = StructType(StructField("id", LongType, nullable = false) :: Nil).toAttributes
-    new Range(start, end, step, numSlices, output)
+    if (start == end || (start < end ^ 0 < step)) {
+      LocalRelation(output)
+    } else {
+      new Range(start, end, step, numSlices, output)
+    }
   }
-  def apply(start: Long, end: Long, step: Long, numSlices: Int): Range = {
+  def apply(start: Long, end: Long, step: Long, numSlices: Int)
+    : LeafNode with MultiInstanceRelation = {
     Range(start, end, step, Some(numSlices))
   }
 }
@@ -500,6 +506,8 @@ case class Range(
   extends LeafNode with MultiInstanceRelation {
 
   require(step != 0, s"step ($step) cannot be 0")
+  require(start != end, s"start ($step) cannot be equal to end ($end)")
+  require(start < end ^ step < 0, s"the sign of step ($step) is invalid for range ($start, $end)")
 
   val numElements: BigInt = {
     val safeStart = BigInt(start)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveInvalidRangeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveInvalidRangeSuite.scala
deleted file mode 100644
index 230c6082be92b..0000000000000
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveInvalidRangeSuite.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.optimizer
-
-import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Range}
-import org.apache.spark.sql.catalyst.rules.RuleExecutor
-
-class RemoveInvalidRangeSuite extends PlanTest {
-  object Optimize extends RuleExecutor[LogicalPlan] {
-    val batches =
-      Batch("RemoveInvalidRange", Once,
-        RemoveInvalidRange) :: Nil
-  }
-
-  test("preserve valid ranges") {
-    Seq(Range(0, 10, 1, 1), Range(10, 0, -1, 1)).foreach { query =>
-      val optimized = Optimize.execute(query.analyze)
-      val correctAnswer = query
-
-      comparePlans(optimized, correctAnswer)
-    }
-  }
-
-  test("remove ranges with invalid combination of start/end/step") {
-    Seq(Range(0, 0, 1, 1), Range(0, 0, -1, 1), Range(1, 10, -1, 1), Range(10, 1, 1, 1)).foreach {
-      query =>
-        val optimized = Optimize.execute(query.analyze)
-        val correctAnswer = LocalRelation(query.output, data = Seq.empty)
-        comparePlans(optimized, correctAnswer)
-    }
-  }
-}
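Patch 2 trades the optimizer rule for a smart constructor: callers keep writing `Range(...)`, but the companion `apply` is now free to return a `LocalRelation` when the range is provably empty. A compact model of the pattern — `Node`, `EmptyNode`, and `RangeNode` are hypothetical stand-ins for `LogicalPlan`, `LocalRelation`, and `Range`, not Spark types:

```scala
// Smart-constructor sketch: the companion apply chooses the node type, so a
// degenerate range never materializes as a RangeNode in the first place.
sealed trait Node
case object EmptyNode extends Node
final class RangeNode(val start: Long, val end: Long, val step: Long) extends Node {
  require(step != 0, s"step ($step) cannot be 0")
}

object RangeNode {
  // Callers still write RangeNode(start, end, step); the factory quietly
  // substitutes an empty relation when no row can be produced.
  def apply(start: Long, end: Long, step: Long): Node =
    if (start == end || (start < end ^ 0 < step)) EmptyNode
    else new RangeNode(start, end, step)
}
```

The trade-off, visible in the next two patches, is that the factory's widened return type leaks into every caller that expected a `Range`, and the new `require` checks reject degenerate nodes that earlier code could construct freely.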
From c4ad4c5923272579fa2ebcbfc235835049577c3a Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Sat, 10 Jun 2017 14:30:35 -0700
Subject: [PATCH 3/6] Use LogicalPlan instead.

---
 .../sql/catalyst/plans/logical/basicLogicalOperators.scala | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 5d8d86391c889..4131edd7b02f0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -482,8 +482,7 @@ case class Sort(
 
 /** Factory for constructing new `Range` nodes. */
 object Range {
-  def apply(start: Long, end: Long, step: Long, numSlices: Option[Int])
-    : LeafNode with MultiInstanceRelation = {
+  def apply(start: Long, end: Long, step: Long, numSlices: Option[Int]): LogicalPlan = {
     val output = StructType(StructField("id", LongType, nullable = false) :: Nil).toAttributes
     if (start == end || (start < end ^ 0 < step)) {
       LocalRelation(output)
@@ -491,8 +490,7 @@ object Range {
       new Range(start, end, step, numSlices, output)
     }
   }
-  def apply(start: Long, end: Long, step: Long, numSlices: Int)
-    : LeafNode with MultiInstanceRelation = {
+  def apply(start: Long, end: Long, step: Long, numSlices: Int): LogicalPlan = {
     Range(start, end, step, Some(numSlices))
   }
 }

From 84b87b79cd169f9676988ccc214968d5bec31f57 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Sat, 10 Jun 2017 14:45:15 -0700
Subject: [PATCH 4/6] fix.

---
 .../sql/catalyst/plans/logical/basicLogicalOperators.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 4131edd7b02f0..afb1eeaa71daa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -504,7 +504,7 @@ case class Range(
   extends LeafNode with MultiInstanceRelation {
 
   require(step != 0, s"step ($step) cannot be 0")
-  require(start != end, s"start ($step) cannot be equal to end ($end)")
+  require(start != end, s"start ($start) cannot be equal to end ($end)")
   require(start < end ^ step < 0, s"the sign of step ($step) is invalid for range ($start, $end)")
 
   val numElements: BigInt = {
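Two spellings of the XOR guard now coexist: the factory treats `start < end ^ 0 < step` as "empty", while the `require` treats `start < end ^ step < 0` as "valid". Given `step != 0` and `start != end`, the two are exact complements, which a throwaway property check (not part of the patch) confirms over small samples:

```scala
// Property check: for step != 0 and start != end, the "empty" predicate used
// by the factory is the negation of the "valid" predicate used by require().
object GuardComplementCheck {
  def main(args: Array[String]): Unit = {
    for {
      start <- -3L to 3L
      end <- -3L to 3L if start != end
      step <- Seq(-2L, -1L, 1L, 2L)
    } {
      val empty = start < end ^ 0 < step
      val valid = start < end ^ step < 0
      assert(empty == !valid, s"mismatch at ($start, $end, $step)")
    }
    println("predicates are complements on all sampled cases")
  }
}
```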
From 89dd7ada850c1fb02fba32bc955f2de3a7ae3679 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Sun, 11 Jun 2017 19:39:03 -0700
Subject: [PATCH 5/6] address comments.

---
 .../plans/logical/basicLogicalOperators.scala        | 12 +++---------
 .../spark/sql/execution/basicPhysicalOperators.scala | 10 +++++++---
 2 files changed, 10 insertions(+), 12 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index afb1eeaa71daa..6878b6b179c3a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -482,15 +482,11 @@ case class Sort(
 
 /** Factory for constructing new `Range` nodes. */
 object Range {
-  def apply(start: Long, end: Long, step: Long, numSlices: Option[Int]): LogicalPlan = {
+  def apply(start: Long, end: Long, step: Long, numSlices: Option[Int]): Range = {
     val output = StructType(StructField("id", LongType, nullable = false) :: Nil).toAttributes
-    if (start == end || (start < end ^ 0 < step)) {
-      LocalRelation(output)
-    } else {
-      new Range(start, end, step, numSlices, output)
-    }
+    new Range(start, end, step, numSlices, output)
   }
-  def apply(start: Long, end: Long, step: Long, numSlices: Int): LogicalPlan = {
+  def apply(start: Long, end: Long, step: Long, numSlices: Int): Range = {
     Range(start, end, step, Some(numSlices))
   }
 }
@@ -504,8 +500,6 @@ case class Range(
   extends LeafNode with MultiInstanceRelation {
 
   require(step != 0, s"step ($step) cannot be 0")
-  require(start != end, s"start ($start) cannot be equal to end ($end)")
-  require(start < end ^ step < 0, s"the sign of step ($step) is invalid for range ($start, $end)")
 
   val numElements: BigInt = {
     val safeStart = BigInt(start)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index f69a688555bbf..04c130314388a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -21,7 +21,7 @@ import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration.Duration
 
 import org.apache.spark.{InterruptibleIterator, SparkException, TaskContext}
-import org.apache.spark.rdd.{PartitionwiseSampledRDD, RDD}
+import org.apache.spark.rdd.{EmptyRDD, PartitionwiseSampledRDD, RDD}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, ExpressionCanonicalizer}
@@ -347,8 +347,12 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
   }
 
   override def inputRDDs(): Seq[RDD[InternalRow]] = {
-    sqlContext.sparkContext.parallelize(0 until numSlices, numSlices)
-      .map(i => InternalRow(i)) :: Nil
+    val rdd = if (start == end || (start < end ^ 0 < step)) {
+      new EmptyRDD[InternalRow](sqlContext.sparkContext)
+    } else {
+      sqlContext.sparkContext.parallelize(0 until numSlices, numSlices).map(i => InternalRow(i))
+    }
+    rdd :: Nil
  }
 
   protected override def doProduce(ctx: CodegenContext): String = {
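Patch 5 settles on pushing the emptiness check down into the physical operator: the `Range` logical node survives unchanged, and `RangeExec` feeds whole-stage codegen an RDD with zero partitions, so no tasks launch at all. A rough standalone illustration of the same idea using only public APIs — `emptyOrRange` is a hypothetical helper, and `sc.emptyRDD` stands in for the internal `EmptyRDD` class the patch instantiates directly:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object EmptyRangeDemo {
  // Hypothetical helper mirroring the RangeExec change: return an RDD with
  // zero partitions when (start, end, step) can produce no rows.
  def emptyOrRange(spark: SparkSession, start: Long, end: Long, step: Long): RDD[Long] = {
    val sc = spark.sparkContext
    if (start == end || (start < end ^ 0 < step)) sc.emptyRDD[Long]
    else sc.range(start, end, step)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("empty-range").getOrCreate()
    assert(emptyOrRange(spark, 0, 0, 1).partitions.isEmpty)  // no partitions, so no tasks
    assert(emptyOrRange(spark, 0, 10, 1).count() == 10)      // normal ranges unchanged
    spark.stop()
  }
}
```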
From 46f60f0fd9981b52de5b4c719ce51d0de9a97805 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Sun, 11 Jun 2017 22:50:49 -0700
Subject: [PATCH 6/6] Add `start == end` test case and remove redundant
 `SparkContext.range` test case.

---
 .../test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
index 93669adb24186..45afbd29d1907 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
@@ -197,8 +197,8 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall
     val end = java.lang.Long.MIN_VALUE + 2
     Seq("false", "true").foreach { value =>
       withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> value) {
-        assert(spark.sparkContext.range(start, end, 1).collect.length == 0)
         assert(spark.range(start, end, 1).collect.length == 0)
+        assert(spark.range(start, start, 1).collect.length == 0)
      }
    }
  }
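End-to-end, the series makes `spark.range` agree with `SparkContext.range` on degenerate inputs, including the wrap-around bounds from SPARK-21041. A quick spark-shell sanity check along the lines of the final test (assumes an active session named `spark`; `"spark.sql.codegen.wholeStage"` spells out `SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key`):

```scala
// Mirrors the final DataFrameRangeSuite test: degenerate ranges yield no rows
// under both codegen settings, and ordinary ranges are unaffected.
val start = java.lang.Long.MAX_VALUE - 3
val end = java.lang.Long.MIN_VALUE + 2

Seq("false", "true").foreach { codegen =>
  spark.conf.set("spark.sql.codegen.wholeStage", codegen)
  assert(spark.range(start, end, 1).count() == 0)   // wrap-around bounds: no rows
  assert(spark.range(start, start, 1).count() == 0) // start == end: no rows
  assert(spark.range(0, 3, 1).count() == 3)         // normal ranges still work
}
```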