From 2e0d7d6d6ccf896e363db68b76030adf9ea9e691 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 15 Aug 2016 23:26:29 +0800 Subject: [PATCH 1/4] Don't do foldablePropagate on object operators. --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 7 +++++++ .../scala/org/apache/spark/sql/DatasetSuite.scala | 15 +++++++++++++++ 2 files changed, 22 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index e34a478818e98..eaef33fec8fd1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -727,6 +727,13 @@ object FoldablePropagation extends Rule[LogicalPlan] { case j @ Join(_, _, LeftOuter | RightOuter | FullOuter, _) => stop = true j + + // Operators that operate on objects should only have expressions from encoders, which + // should never have foldable expressions. + case o: ObjectConsumer => o + case o: ObjectProducer => o + case a: AppendColumns => a + case p: LogicalPlan if !stop => p.transformExpressions { case a: AttributeReference if foldableMap.contains(a) => foldableMap(a) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 88fb1472b668b..e04389916e4cb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -878,6 +878,21 @@ class DatasetSuite extends QueryTest with SharedSQLContext { val ds = spark.createDataset(data)(enc) checkDataset(ds, (("a", "b"), "c"), (null, "d")) } + + test("SPARK-16995: flat mapping on Dataset containing a column created with lit/expr") { + val df = Seq("1").toDF("a") + + import df.sparkSession.implicits._ + + assertResult(Seq()) { + df.withColumn("b", lit(0)).as[ClassData] + .groupByKey(_.a).flatMapGroups { case (x, iter) => List[Int]() }.collect().toSeq + } + assertResult(Seq()) { + df.withColumn("b", expr("0")).as[ClassData] + .groupByKey(_.a).flatMapGroups { case (x, iter) => List[Int]() }.collect().toSeq + } + } } case class Generic[T](id: T, value: Double) From 4094b4353f488d33023c3e9682e91f5250410a30 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 16 Aug 2016 16:46:00 +0800 Subject: [PATCH 2/4] Set stop to true. --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index eaef33fec8fd1..e101273bf4e28 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -730,9 +730,15 @@ object FoldablePropagation extends Rule[LogicalPlan] { // Operators that operate on objects should only have expressions from encoders, which // should never have foldable expressions. 
- case o: ObjectConsumer => o - case o: ObjectProducer => o - case a: AppendColumns => a + case o: ObjectConsumer => + stop = true + o + case o: ObjectProducer => + stop = true + o + case a: AppendColumns => + stop = true + a case p: LogicalPlan if !stop => p.transformExpressions { case a: AttributeReference if foldableMap.contains(a) => From 0008c3e11dfb85523f9f4606d5dec714339d5f43 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 17 Aug 2016 10:24:16 +0800 Subject: [PATCH 3/4] Add specified cases. --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index e101273bf4e28..31bb941417159 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -730,15 +730,15 @@ object FoldablePropagation extends Rule[LogicalPlan] { // Operators that operate on objects should only have expressions from encoders, which // should never have foldable expressions. - case o: ObjectConsumer => + case m: MapGroups => stop = true - o - case o: ObjectProducer => + m + case f: FlatMapGroupsInR => stop = true - o - case a: AppendColumns => + f + case c: CoGroup => stop = true - a + c case p: LogicalPlan if !stop => p.transformExpressions { case a: AttributeReference if foldableMap.contains(a) => From b688abfc679a98de0ca875a571081993160230b1 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 18 Aug 2016 10:45:15 +0800 Subject: [PATCH 4/4] Address comments. --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 4 ++-- .../test/scala/org/apache/spark/sql/DatasetSuite.scala | 10 ++++------ 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 31bb941417159..a2bf4d72289d7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -728,8 +728,8 @@ object FoldablePropagation extends Rule[LogicalPlan] { stop = true j - // Operators that operate on objects should only have expressions from encoders, which - // should never have foldable expressions. + // These 3 operators take attributes as constructor parameters, and these attributes + // can't be replaced by alias. 
case m: MapGroups => stop = true m diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index e04389916e4cb..8ce6ea66b6bbf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -884,14 +884,12 @@ class DatasetSuite extends QueryTest with SharedSQLContext { import df.sparkSession.implicits._ - assertResult(Seq()) { + checkDataset( df.withColumn("b", lit(0)).as[ClassData] - .groupByKey(_.a).flatMapGroups { case (x, iter) => List[Int]() }.collect().toSeq - } - assertResult(Seq()) { + .groupByKey(_.a).flatMapGroups { case (x, iter) => List[Int]() }) + checkDataset( df.withColumn("b", expr("0")).as[ClassData] - .groupByKey(_.a).flatMapGroups { case (x, iter) => List[Int]() }.collect().toSeq - } + .groupByKey(_.a).flatMapGroups { case (x, iter) => List[Int]() }) } }
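
Reviewer note (not part of the patches): a minimal standalone reproduction of the user-facing failure this series addresses, showing why FoldablePropagation must stop at MapGroups, FlatMapGroupsInR and CoGroup. The object name, local-mode SparkSession setup and assertion are illustrative assumptions; ClassData is redeclared here only to mirror the case class already used by DatasetSuite.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit

// Mirrors the ClassData case class used by DatasetSuite: a String key plus an Int column.
case class ClassData(a: String, b: Int)

object Spark16995Repro {
  def main(args: Array[String]): Unit = {
    // Hypothetical local setup, only for running the repro outside the test suite.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("SPARK-16995 repro")
      .getOrCreate()
    import spark.implicits._

    val df = Seq("1").toDF("a")

    // Column `b` comes from the foldable literal 0. Without this series, FoldablePropagation
    // would substitute that literal into the object operator behind groupByKey/flatMapGroups;
    // its attributes are plain constructor parameters and cannot be replaced by aliases, so
    // the query failed after optimization. With the fix it runs and simply returns no rows.
    val result = df.withColumn("b", lit(0)).as[ClassData]
      .groupByKey(_.a)
      .flatMapGroups { case (_, _) => List[Int]() }
      .collect()

    assert(result.isEmpty)
    spark.stop()
  }
}

The same failure was reproducible with df.withColumn("b", expr("0")), which is why the regression test in DatasetSuite exercises both lit(0) and expr("0").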