From 0a620989e1e857ba9c84389493dc5f45a29450f6 Mon Sep 17 00:00:00 2001
From: linweizhong
Date: Wed, 15 Jul 2015 17:17:01 +0800
Subject: [PATCH 01/22] Optimize cartesian order

---
 .../spark/sql/execution/joins/CartesianProduct.scala | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
index 261b4724159fb..b3096751a23dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
@@ -34,7 +34,15 @@ case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends BinaryNod
     val leftResults = left.execute().map(_.copy())
     val rightResults = right.execute().map(_.copy())
 
-    leftResults.cartesian(rightResults).mapPartitions { iter =>
+    val cartesianRdd = if(leftResults.partitions.size > rightResults.partitions.size) {
+      rightResults.cartesian(leftResults).mapPartitions{ iter =>
+        iter.map(tuple => (tuple._2, tuple._1))
+      }
+    } else {
+      leftResults.cartesian(rightResults)
+    }
+
+    cartesianRdd.mapPartitions { iter =>
       val joinedRow = new JoinedRow
       iter.map(r => joinedRow(r._1, r._2))
     }
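A note on what this first commit is doing: Spark's CartesianRDD computes each output partition with a nested loop — roughly, for every row of the first operand's partition it re-iterates the second operand's partition — so which RDD comes first in cartesian() decides which input is scanned repeatedly. The commit therefore picks the operand order from the partition counts, then swaps each pair back so downstream operators still see (leftRow, rightRow). A minimal self-contained sketch of that swap-and-restore trick on plain RDDs (the local master, data, and partition counts here are illustrative, not taken from the patch):

    import org.apache.spark.{SparkConf, SparkContext}

    object CartesianOrderSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setMaster("local[2]").setAppName("cartesian-order-sketch"))
        val left = sc.parallelize(Seq("a", "b", "c"), numSlices = 6)  // more partitions
        val right = sc.parallelize(Seq(1, 2), numSlices = 2)          // fewer partitions
        // Mirror of the patch: put the other side first, then swap each tuple back
        // so the result still reads as (leftRow, rightRow).
        val product = right.cartesian(left).map { case (r, l) => (l, r) }
        product.collect().foreach(println)
        sc.stop()
      }
    }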
From 61d1a7e4422906154eec5d922c44436a1300ef32 Mon Sep 17 00:00:00 2001
From: linweizhong
Date: Wed, 15 Jul 2015 17:36:23 +0800
Subject: [PATCH 02/22] Fix code style

---
 .../apache/spark/sql/execution/joins/CartesianProduct.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
index b3096751a23dc..e0242bdad5011 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
@@ -34,8 +34,8 @@ case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends BinaryNod
     val leftResults = left.execute().map(_.copy())
     val rightResults = right.execute().map(_.copy())
 
-    val cartesianRdd = if(leftResults.partitions.size > rightResults.partitions.size) {
-      rightResults.cartesian(leftResults).mapPartitions{ iter =>
+    val cartesianRdd = if (leftResults.partitions.size > rightResults.partitions.size) {
+      rightResults.cartesian(leftResults).mapPartitions { iter =>
         iter.map(tuple => (tuple._2, tuple._1))
       }
     } else {

From 23deb4b736b70013b4a3ca66ab328fca245f6b33 Mon Sep 17 00:00:00 2001
From: linweizhong
Date: Thu, 16 Jul 2015 15:03:01 +0800
Subject: [PATCH 03/22] Update

---
 .../spark/sql/execution/SparkStrategies.scala | 16 ++++++++--
 .../execution/joins/CartesianProduct.scala    | 30 ++++++++++---------
 2 files changed, 30 insertions(+), 16 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 73b463471ec5a..d6b26b7a89d6a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -213,10 +213,22 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   object CartesianProduct extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case logical.Join(left, right, _, None) =>
-        execution.joins.CartesianProduct(planLater(left), planLater(right)) :: Nil
+        val buildSide =
+          if (left.statistics.sizeInBytes <= right.statistics.sizeInBytes) {
+            joins.BuildRight
+          } else {
+            joins.BuildLeft
+          }
+        execution.joins.CartesianProduct(planLater(left), planLater(right), buildSide) :: Nil
       case logical.Join(left, right, Inner, Some(condition)) =>
+        val buildSide =
+          if (left.statistics.sizeInBytes <= right.statistics.sizeInBytes) {
+            joins.BuildRight
+          } else {
+            joins.BuildLeft
+          }
         execution.Filter(condition,
-          execution.joins.CartesianProduct(planLater(left), planLater(right))) :: Nil
+          execution.joins.CartesianProduct(planLater(left), planLater(right), buildSide)) :: Nil
       case _ => Nil
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
index e0242bdad5011..5ca8368ec491b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
@@ -27,24 +27,26 @@ import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
  * :: DeveloperApi ::
  */
 @DeveloperApi
-case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends BinaryNode {
-  override def output: Seq[Attribute] = left.output ++ right.output
+case class CartesianProduct(
+  left: SparkPlan,
+  right: SparkPlan,
+  buildSide: BuildSide) extends BinaryNode {
 
-  protected override def doExecute(): RDD[InternalRow] = {
-    val leftResults = left.execute().map(_.copy())
-    val rightResults = right.execute().map(_.copy())
+  private val (streamed, broadcast) = buildSide match {
+    case BuildRight => (left, right)
+    case BuildLeft => (right, left)
+  }
 
-    val cartesianRdd = if (leftResults.partitions.size > rightResults.partitions.size) {
-      rightResults.cartesian(leftResults).mapPartitions { iter =>
-        iter.map(tuple => (tuple._2, tuple._1))
-      }
-    } else {
-      leftResults.cartesian(rightResults)
-    }
+  override def output: Seq[Attribute] = left.output ++ right.output
 
-    cartesianRdd.mapPartitions { iter =>
+  protected override def doExecute(): RDD[InternalRow] = {
+    val broadcastedRelation = sparkContext.broadcast(broadcast.execute().map(_.copy()))
+    broadcastedRelation.value.cartesian(streamed.execute().map(_.copy())).mapPartitions{ iter =>
       val joinedRow = new JoinedRow
-      iter.map(r => joinedRow(r._1, r._2))
+      buildSide match {
+        case BuildRight => iter.map(r => joinedRow(r._1, r._2))
+        case BuildLeft => iter.map(r => joinedRow(r._2, r._1))
+      }
     }
   }
 }
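One thing worth flagging in PATCH 03's doExecute(): sparkContext.broadcast(...) is applied to an RDD, and broadcasting an RDD captures only the driver-side handle, not the rows — no data actually moves to the executors, and an RDD handle referenced from inside another RDD's tasks cannot be computed there. This is exactly what the later "Fix NullPointerException" commit (PATCH 11) changes, by materializing the rows first. A hedged sketch of the distinction on plain RDDs (names and data are illustrative):

    import org.apache.spark.{SparkConf, SparkContext}

    object BroadcastDataSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setMaster("local[2]").setAppName("broadcast-data-sketch"))
        val rdd = sc.parallelize(Seq(1, 2, 3))
        // sc.broadcast(rdd) would capture only the driver-side handle (newer Spark
        // versions reject broadcasting an RDD outright); materialize the rows first:
        val dataItself = sc.broadcast(rdd.collect())
        println(dataItself.value.mkString(","))
        sc.stop()
      }
    }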
From eb9d155492867191816a69aaacfcd8c24e15595d Mon Sep 17 00:00:00 2001
From: linweizhong
Date: Thu, 16 Jul 2015 15:25:33 +0800
Subject: [PATCH 04/22] Fix code style

---
 .../org/apache/spark/sql/execution/SparkStrategies.scala    | 4 ++--
 .../apache/spark/sql/execution/joins/CartesianProduct.scala | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index d6b26b7a89d6a..38a84ad6a156b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -216,7 +216,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         val buildSide =
           if (left.statistics.sizeInBytes <= right.statistics.sizeInBytes) {
             joins.BuildRight
-          } else { 
+          } else {
             joins.BuildLeft
           }
         execution.joins.CartesianProduct(planLater(left), planLater(right), buildSide) :: Nil
@@ -224,7 +224,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         val buildSide =
           if (left.statistics.sizeInBytes <= right.statistics.sizeInBytes) {
             joins.BuildRight
-          } else { 
+          } else {
             joins.BuildLeft
           }
         execution.Filter(condition,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
index 5ca8368ec491b..76b0bb716855c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
@@ -41,7 +41,7 @@ case class CartesianProduct(
 
   protected override def doExecute(): RDD[InternalRow] = {
     val broadcastedRelation = sparkContext.broadcast(broadcast.execute().map(_.copy()))
-    broadcastedRelation.value.cartesian(streamed.execute().map(_.copy())).mapPartitions{ iter =>
+    broadcastedRelation.value.cartesian(streamed.execute().map(_.copy())).mapPartitions { iter =>
       val joinedRow = new JoinedRow
       buildSide match {
         case BuildRight => iter.map(r => joinedRow(r._1, r._2))

From 8198648cccae9db832a8d9fdaa862a895d484880 Mon Sep 17 00:00:00 2001
From: linweizhong
Date: Thu, 16 Jul 2015 20:41:32 +0800
Subject: [PATCH 05/22] Fix failed unit tests

---
 .../execution/joins/CartesianProduct.scala | 28 +++++++++++++++++--
 1 file changed, 25 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
index 76b0bb716855c..9cd577cbbbb72 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
@@ -17,11 +17,15 @@
 
 package org.apache.spark.sql.execution.joins
 
+import scala.concurrent._
+import scala.concurrent.duration._
+
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, JoinedRow}
 import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+import org.apache.spark.util.ThreadUtils
 
 /**
  * :: DeveloperApi ::
@@ -31,17 +35,30 @@ case class CartesianProduct(
   left: SparkPlan,
   right: SparkPlan,
   buildSide: BuildSide) extends BinaryNode {
+  override def output: Seq[Attribute] = left.output ++ right.output
 
   private val (streamed, broadcast) = buildSide match {
     case BuildRight => (left, right)
     case BuildLeft => (right, left)
   }
 
-  override def output: Seq[Attribute] = left.output ++ right.output
+  private val timeout: Duration = {
+    val timeoutValue = sqlContext.conf.broadcastTimeout
+    if (timeoutValue < 0) {
+      Duration.Inf
+    } else {
+      timeoutValue.seconds
+    }
+  }
+
+  @transient
+  private val broadcastFuture = future {
+    sparkContext.broadcast(broadcast.execute().map(_.copy()))
+  }(CartesianProduct.broadcastCartesianProductExecutionContext)
 
   protected override def doExecute(): RDD[InternalRow] = {
-    val broadcastedRelation = sparkContext.broadcast(broadcast.execute().map(_.copy()))
-    broadcastedRelation.value.cartesian(streamed.execute().map(_.copy())).mapPartitions { iter =>
+    val broadcastedRdd = Await.result(broadcastFuture, timeout)
+    streamed.execute().map(_.copy()).cartesian(broadcastedRdd.value).mapPartitions { iter =>
       val joinedRow = new JoinedRow
       buildSide match {
         case BuildRight => iter.map(r => joinedRow(r._1, r._2))
@@ -50,3 +67,8 @@ case class CartesianProduct(
     }
   }
 }
+
+object CartesianProduct {
+  private val broadcastCartesianProductExecutionContext = ExecutionContext.fromExecutorService(
+    ThreadUtils.newDaemonCachedThreadPool("broadcast-hash-join", 128))
+}
\ No newline at end of file
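The structure PATCH 05 introduces mirrors BroadcastHashJoin in the same package: the build side starts broadcasting on a dedicated daemon thread pool as soon as the operator is constructed, and doExecute() blocks on the future for at most spark.sql.broadcastTimeout seconds (a negative value meaning wait forever, per the Duration.Inf branch). A self-contained sketch of that pattern with plain Scala futures — the pool, timeout, and payload here are illustrative stand-ins:

    import java.util.concurrent.Executors
    import scala.concurrent.{Await, ExecutionContext, Future}
    import scala.concurrent.duration._

    object AsyncBroadcastSketch {
      // Stand-in for ThreadUtils.newDaemonCachedThreadPool("...", 128).
      private val pool = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())

      def main(args: Array[String]): Unit = {
        // Kick off the expensive "prepare the build side" work eagerly...
        val broadcastFuture: Future[Seq[Int]] = Future(Seq(1, 2, 3))(pool)
        // ...and only block when the result is actually needed, with a bound.
        val rows = Await.result(broadcastFuture, 300.seconds)
        println(rows)
        pool.shutdown()
      }
    }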
From 1006d4691f68bd0d8a921401b3e3bdd258acc291 Mon Sep 17 00:00:00 2001
From: linweizhong
Date: Fri, 17 Jul 2015 08:53:10 +0800
Subject: [PATCH 06/22] Fix code style

---
 .../org/apache/spark/sql/execution/joins/CartesianProduct.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
index 9cd577cbbbb72..34d16275dc891 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
@@ -71,4 +71,4 @@ case class CartesianProduct(
 object CartesianProduct {
   private val broadcastCartesianProductExecutionContext = ExecutionContext.fromExecutorService(
     ThreadUtils.newDaemonCachedThreadPool("broadcast-hash-join", 128))
-}
\ No newline at end of file
+}

From f0ce44725ac7fec63eed6323b1d1c917a4fa4fc9 Mon Sep 17 00:00:00 2001
From: linweizhong
Date: Fri, 17 Jul 2015 10:08:53 +0800
Subject: [PATCH 07/22] Update

---
 .../spark/sql/execution/SparkStrategies.scala | 51 ++++++++++---
 .../joins/BroadcastCartesianProduct.scala     | 74 +++++++++++++++++++
 .../execution/joins/CartesianProduct.scala    | 28 +------
 3 files changed, 116 insertions(+), 37 deletions(-)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastCartesianProduct.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 38a84ad6a156b..99f908bab7e65 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -213,22 +213,51 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   object CartesianProduct extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case logical.Join(left, right, _, None) =>
-        val buildSide =
-          if (left.statistics.sizeInBytes <= right.statistics.sizeInBytes) {
-            joins.BuildRight
+        // For BroadcastCartesianProduct we will broadcast the small size plan,
+        // for CartesianProduct we will use the small size plan as cartesian left rdd.
+        if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) {
+          if (sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
+            right.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold) {
+            execution.joins.BroadcastCartesianProduct(planLater(left), planLater(right),
+              joins.BuildRight) :: Nil
           } else {
-            joins.BuildLeft
+            execution.joins.CartesianProduct(planLater(left), planLater(right),
+              joins.BuildLeft) :: Nil
+          }
+        } else {
+          if (sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
+            left.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold) {
+            execution.joins.BroadcastCartesianProduct(planLater(left), planLater(right),
+              joins.BuildLeft) :: Nil
+          } else {
+            execution.joins.CartesianProduct(planLater(left), planLater(right),
+              joins.BuildRight) :: Nil
           }
-        execution.joins.CartesianProduct(planLater(left), planLater(right), buildSide) :: Nil
+        }
       case logical.Join(left, right, Inner, Some(condition)) =>
-        val buildSide =
-          if (left.statistics.sizeInBytes <= right.statistics.sizeInBytes) {
-            joins.BuildRight
+        if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) {
+          if (sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
+            right.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold) {
+            execution.Filter(condition,
+              execution.joins.BroadcastCartesianProduct(planLater(left), planLater(right),
+                joins.BuildRight)) :: Nil
           } else {
-            joins.BuildLeft
+            execution.Filter(condition,
+              execution.joins.CartesianProduct(planLater(left), planLater(right),
+                joins.BuildLeft)) :: Nil
           }
-        execution.Filter(condition,
-          execution.joins.CartesianProduct(planLater(left), planLater(right), buildSide)) :: Nil
+        } else {
+          if (sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
+            left.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold) {
+            execution.Filter(condition,
+              execution.joins.BroadcastCartesianProduct(planLater(left), planLater(right),
+                joins.BuildLeft)) :: Nil
+          } else {
+            execution.Filter(condition,
+              execution.joins.CartesianProduct(planLater(left), planLater(right),
+                joins.BuildRight)) :: Nil
+          }
+        }
       case _ => Nil
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastCartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastCartesianProduct.scala
new file mode 100644
index 0000000000000..d390d866c45a5
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastCartesianProduct.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.joins
+
+import scala.concurrent._
+import scala.concurrent.duration._
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, JoinedRow}
+import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * :: DeveloperApi ::
+ */
+@DeveloperApi
+case class BroadcastCartesianProduct(
+  left: SparkPlan,
+  right: SparkPlan,
+  buildSide: BuildSide) extends BinaryNode {
+  override def output: Seq[Attribute] = left.output ++ right.output
+
+  private val (streamed, broadcast) = buildSide match {
+    case BuildRight => (left, right)
+    case BuildLeft => (right, left)
+  }
+
+  private val timeout: Duration = {
+    val timeoutValue = sqlContext.conf.broadcastTimeout
+    if (timeoutValue < 0) {
+      Duration.Inf
+    } else {
+      timeoutValue.seconds
+    }
+  }
+
+  @transient
+  private val broadcastFuture = future {
+    sparkContext.broadcast(broadcast.execute().map(_.copy()))
+  }(BroadcastCartesianProduct.broadcastCartesianProductExecutionContext)
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    val broadcastedRdd = Await.result(broadcastFuture, timeout)
+    streamed.execute().map(_.copy()).cartesian(broadcastedRdd.value).mapPartitions { iter =>
+      val joinedRow = new JoinedRow
+      buildSide match {
+        case BuildRight => iter.map(r => joinedRow(r._1, r._2))
+        case BuildLeft => iter.map(r => joinedRow(r._2, r._1))
+      }
+    }
+  }
+}
+
+object BroadcastCartesianProduct {
+  private val broadcastCartesianProductExecutionContext = ExecutionContext.fromExecutorService(
+    ThreadUtils.newDaemonCachedThreadPool("broadcast-hash-join", 128))
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
index 34d16275dc891..329da1098dad6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
@@ -17,15 +17,11 @@
 
 package org.apache.spark.sql.execution.joins
 
-import scala.concurrent._
-import scala.concurrent.duration._
-
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, JoinedRow}
 import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
-import org.apache.spark.util.ThreadUtils
 
 /**
  * :: DeveloperApi ::
@@ -37,28 +33,13 @@ case class CartesianProduct(
   buildSide: BuildSide) extends BinaryNode {
   override def output: Seq[Attribute] = left.output ++ right.output
 
-  private val (streamed, broadcast) = buildSide match {
+  private val (small, big) = buildSide match {
     case BuildRight => (left, right)
     case BuildLeft => (right, left)
   }
 
-  private val timeout: Duration = {
-    val timeoutValue = sqlContext.conf.broadcastTimeout
-    if (timeoutValue < 0) {
-      Duration.Inf
-    } else {
-      timeoutValue.seconds
-    }
-  }
-
-  @transient
-  private val broadcastFuture = future {
-    sparkContext.broadcast(broadcast.execute().map(_.copy()))
-  }(CartesianProduct.broadcastCartesianProductExecutionContext)
-
   protected override def doExecute(): RDD[InternalRow] = {
-    val broadcastedRdd = Await.result(broadcastFuture, timeout)
-    streamed.execute().map(_.copy()).cartesian(broadcastedRdd.value).mapPartitions { iter =>
+    small.execute().map(_.copy()).cartesian(big.execute().map(_.copy())).mapPartitions { iter =>
       val joinedRow = new JoinedRow
       buildSide match {
         case BuildRight => iter.map(r => joinedRow(r._1, r._2))
         case BuildLeft => iter.map(r => joinedRow(r._2, r._1))
       }
     }
   }
 }
-
-object CartesianProduct {
-  private val broadcastCartesianProductExecutionContext = ExecutionContext.fromExecutorService(
-    ThreadUtils.newDaemonCachedThreadPool("broadcast-hash-join", 128))
-}
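With PATCH 07 the planner branches on spark.sql.autoBroadcastJoinThreshold: a side whose estimated size fits under the threshold gets the broadcast variant, and everything else falls back to the shuffle-based CartesianProduct with the smaller plan driving the product. For reference, a short usage sketch — the property key is the real one this code reads; the values and the in-scope sqlContext (e.g. the spark-shell's) are illustrative:

    // Disable broadcasting entirely, forcing the shuffle-based CartesianProduct:
    sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")

    // Or allow plans up to roughly 10 MB (the usual default) to be broadcast:
    sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (10 * 1024 * 1024).toString)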
From 2bc09914472d08f97b2a82ec99f840417df4bb49 Mon Sep 17 00:00:00 2001
From: linweizhong
Date: Fri, 17 Jul 2015 10:10:18 +0800
Subject: [PATCH 08/22] Update code style

---
 .../sql/execution/joins/BroadcastCartesianProduct.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastCartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastCartesianProduct.scala
index d390d866c45a5..17293f815eb6b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastCartesianProduct.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastCartesianProduct.scala
@@ -32,9 +32,9 @@ import org.apache.spark.util.ThreadUtils
  */
 @DeveloperApi
 case class BroadcastCartesianProduct(
-  left: SparkPlan,
-  right: SparkPlan,
-  buildSide: BuildSide) extends BinaryNode {
+      left: SparkPlan,
+      right: SparkPlan,
+      buildSide: BuildSide) extends BinaryNode {
   override def output: Seq[Attribute] = left.output ++ right.output
 
   private val (streamed, broadcast) = buildSide match {

From 547242e9942f67799f71fe594b16c2d5ca0f2866 Mon Sep 17 00:00:00 2001
From: linweizhong
Date: Fri, 17 Jul 2015 10:11:20 +0800
Subject: [PATCH 09/22] code style

---
 .../sql/execution/joins/BroadcastCartesianProduct.scala     | 6 +++---
 .../apache/spark/sql/execution/joins/CartesianProduct.scala | 6 +++---
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastCartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastCartesianProduct.scala
index 17293f815eb6b..762c910de6d98 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastCartesianProduct.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastCartesianProduct.scala
@@ -32,9 +32,9 @@ import org.apache.spark.util.ThreadUtils
  */
 @DeveloperApi
 case class BroadcastCartesianProduct(
-      left: SparkPlan,
-      right: SparkPlan,
-      buildSide: BuildSide) extends BinaryNode {
+    left: SparkPlan,
+    right: SparkPlan,
+    buildSide: BuildSide) extends BinaryNode {
   override def output: Seq[Attribute] = left.output ++ right.output
 
   private val (streamed, broadcast) = buildSide match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
index 329da1098dad6..57ae4dd27a73e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
@@ -28,9 +28,9 @@ import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
  */
 @DeveloperApi
 case class CartesianProduct(
-  left: SparkPlan,
-  right: SparkPlan,
-  buildSide: BuildSide) extends BinaryNode {
+    left: SparkPlan,
+    right: SparkPlan,
+    buildSide: BuildSide) extends BinaryNode {
   override def output: Seq[Attribute] = left.output ++ right.output
 
   private val (small, big) = buildSide match {

From bca7a07f573aadf969a8f9c5d7650f5954aeb63f Mon Sep 17 00:00:00 2001
From: linweizhong
Date: Fri, 17 Jul 2015 10:42:59 +0800
Subject: [PATCH 10/22] Fix failed unit tests

---
 .../sql/execution/joins/BroadcastCartesianProduct.scala     | 6 ++++--
 .../apache/spark/sql/execution/joins/CartesianProduct.scala | 5 ++++-
 2 files changed, 8 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastCartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastCartesianProduct.scala
index 762c910de6d98..b1da9313aa659 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastCartesianProduct.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastCartesianProduct.scala
@@ -57,8 +57,10 @@ case class BroadcastCartesianProduct(
   }(BroadcastCartesianProduct.broadcastCartesianProductExecutionContext)
 
   protected override def doExecute(): RDD[InternalRow] = {
-    val broadcastedRdd = Await.result(broadcastFuture, timeout)
-    streamed.execute().map(_.copy()).cartesian(broadcastedRdd.value).mapPartitions { iter =>
+    val leftResults = streamed.execute().map(_.copy())
+    val rightResults = Await.result(broadcastFuture, timeout).value
+
+    leftResults.cartesian(rightResults).mapPartitions { iter =>
       val joinedRow = new JoinedRow
       buildSide match {
         case BuildRight => iter.map(r => joinedRow(r._1, r._2))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
index 57ae4dd27a73e..9befdc22d09d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
@@ -39,7 +39,10 @@ case class CartesianProduct(
   }
 
   protected override def doExecute(): RDD[InternalRow] = {
-    small.execute().map(_.copy()).cartesian(big.execute().map(_.copy())).mapPartitions { iter =>
+    val leftResults = small.execute().map(_.copy())
+    val rightResults = big.execute().map(_.copy())
+
+    leftResults.cartesian(rightResults).mapPartitions { iter =>
       val joinedRow = new JoinedRow
       buildSide match {
         case BuildRight => iter.map(r => joinedRow(r._1, r._2))
From a1689000c21497eadddc594a922395236e62347e Mon Sep 17 00:00:00 2001
From: linweizhong
Date: Mon, 20 Jul 2015 14:11:51 +0800
Subject: [PATCH 11/22] Fix NullPointerException

---
 .../joins/BroadcastCartesianProduct.scala | 20 +++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastCartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastCartesianProduct.scala
index b1da9313aa659..0d432573d13c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastCartesianProduct.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastCartesianProduct.scala
@@ -53,19 +53,23 @@ case class BroadcastCartesianProduct(
 
   @transient
   private val broadcastFuture = future {
-    sparkContext.broadcast(broadcast.execute().map(_.copy()))
+    val input = broadcast.execute().map(_.copy()).collect()
+    sparkContext.broadcast(input)
   }(BroadcastCartesianProduct.broadcastCartesianProductExecutionContext)
 
   protected override def doExecute(): RDD[InternalRow] = {
     val leftResults = streamed.execute().map(_.copy())
-    val rightResults = Await.result(broadcastFuture, timeout).value
+    val rightResults = Await.result(broadcastFuture, timeout)
 
-    leftResults.cartesian(rightResults).mapPartitions { iter =>
-      val joinedRow = new JoinedRow
-      buildSide match {
-        case BuildRight => iter.map(r => joinedRow(r._1, r._2))
-        case BuildLeft => iter.map(r => joinedRow(r._2, r._1))
-      }
+    leftResults.mapPartitions { streamedIter =>
+      for (x <- streamedIter; y <- rightResults.value)
+        yield {
+          val joinedRow = new JoinedRow
+          buildSide match {
+            case BuildRight => joinedRow(x, y)
+            case BuildLeft => joinedRow(y, x)
+          }
+        }
     }
   }
 }
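Why PATCH 11 fixes the NullPointerException: an RDD referenced from inside another RDD's tasks is only a driver-side handle and cannot be computed on executors, so the build side is now collect()ed on the driver and its rows — not the RDD — are broadcast; each streamed partition then runs a plain nested loop over the broadcast array. A toy sketch of the corrected pattern on ordinary RDDs (local master and data are illustrative):

    import org.apache.spark.{SparkConf, SparkContext}

    object BroadcastRowsSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setMaster("local[2]").setAppName("broadcast-rows-sketch"))
        val streamed = sc.parallelize(Seq("a", "b", "c"))
        val buildRows = sc.parallelize(Seq(1, 2)).collect() // materialize the small side
        val broadcastRows = sc.broadcast(buildRows)         // ship the data, not a handle
        // Nested loop per streamed partition, as in the patched doExecute():
        val joined = streamed.mapPartitions { iter =>
          for (x <- iter; y <- broadcastRows.value.iterator) yield (x, y)
        }
        joined.collect().foreach(println)
        sc.stop()
      }
    }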
From 99bcde76690beb35aab1e9f762bb7829ed671c35 Mon Sep 17 00:00:00 2001
From: linweizhong
Date: Wed, 22 Jul 2015 12:25:44 +0800
Subject: [PATCH 12/22] Update thread pool name

---
 .../spark/sql/execution/joins/BroadcastCartesianProduct.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastCartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastCartesianProduct.scala
index 0d432573d13c6..28334ad50e4c9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastCartesianProduct.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastCartesianProduct.scala
@@ -76,5 +76,5 @@ case class BroadcastCartesianProduct(
 
 object BroadcastCartesianProduct {
   private val broadcastCartesianProductExecutionContext = ExecutionContext.fromExecutorService(
-    ThreadUtils.newDaemonCachedThreadPool("broadcast-hash-join", 128))
+    ThreadUtils.newDaemonCachedThreadPool("broadcast-cartesian-product", 128))
 }

From 43105364910cee72c90f57284e73b21b7823d8d3 Mon Sep 17 00:00:00 2001
From: linweizhong
Date: Sat, 1 Aug 2015 11:23:46 +0800
Subject: [PATCH 13/22] Update

---
 .../spark/sql/execution/SparkStrategies.scala | 75 ++++++++-----------
 1 file changed, 32 insertions(+), 43 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 99f908bab7e65..c32486f70fd94 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -211,53 +211,42 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   }
 
   object CartesianProduct extends Strategy {
-    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case logical.Join(left, right, _, None) =>
-        // For BroadcastCartesianProduct we will broadcast the small size plan,
-        // for CartesianProduct we will use the small size plan as cartesian left rdd.
-        if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) {
-          if (sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
-            right.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold) {
-            execution.joins.BroadcastCartesianProduct(planLater(left), planLater(right),
-              joins.BuildRight) :: Nil
-          } else {
-            execution.joins.CartesianProduct(planLater(left), planLater(right),
-              joins.BuildLeft) :: Nil
-          }
-        } else {
-          if (sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
-            left.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold) {
-            execution.joins.BroadcastCartesianProduct(planLater(left), planLater(right),
-              joins.BuildLeft) :: Nil
-          } else {
-            execution.joins.CartesianProduct(planLater(left), planLater(right),
-              joins.BuildRight) :: Nil
-          }
-        }
-      case logical.Join(left, right, Inner, Some(condition)) =>
-        if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) {
-          if (sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
-            right.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold) {
-            execution.Filter(condition,
-              execution.joins.BroadcastCartesianProduct(planLater(left), planLater(right),
-                joins.BuildRight)) :: Nil
-          } else {
-            execution.Filter(condition,
-              execution.joins.CartesianProduct(planLater(left), planLater(right),
-                joins.BuildLeft)) :: Nil
-          }
-        } else {
-          if (sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
-            left.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold) {
-            execution.Filter(condition,
-              execution.joins.BroadcastCartesianProduct(planLater(left), planLater(right),
-                joins.BuildLeft)) :: Nil
-          } else {
-            execution.Filter(condition,
-              execution.joins.CartesianProduct(planLater(left), planLater(right),
-                joins.BuildRight)) :: Nil
-          }
-        }
+    def canBroadCast(plan: LogicalPlan): Boolean = {
+      if (sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
+        plan.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold) {
+        true
+      } else {
+        false
+      }
+    }
+
+    def createCartesianProduct(left: LogicalPlan, right: LogicalPlan): SparkPlan = {
+      // For BroadcastCartesianProduct we will broadcast the small size plan,
+      // for CartesianProduct we will use the small size plan as cartesian left rdd.
+      if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) {
+        if (canBroadCast(right)) {
+          execution.joins.BroadcastCartesianProduct(planLater(left), planLater(right),
+            joins.BuildRight)
+        } else {
+          execution.joins.CartesianProduct(planLater(left), planLater(right),
+            joins.BuildLeft)
+        }
+      } else {
+        if (canBroadCast(left)) {
+          execution.joins.BroadcastCartesianProduct(planLater(left), planLater(right),
+            joins.BuildLeft)
+        } else {
+          execution.joins.CartesianProduct(planLater(left), planLater(right),
+            joins.BuildRight)
+        }
+      }
+    }
+
+    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      case logical.Join(left, right, _, None) =>
+        createCartesianProduct(left, right) :: Nil
+      case logical.Join(left, right, Inner, Some(condition)) =>
+        execution.Filter(condition, createCartesianProduct(left, right)) :: Nil
       case _ => Nil
     }
   }

From b2a0ae89700d8afef7b795b676fdea41146cdd3e Mon Sep 17 00:00:00 2001
From: linweizhong
Date: Sat, 1 Aug 2015 11:41:35 +0800
Subject: [PATCH 14/22] Update

---
 .../spark/sql/execution/SparkStrategies.scala | 27 ++++++-------------
 1 file changed, 8 insertions(+), 19 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index c32486f70fd94..a3dc944c942af 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -211,32 +211,21 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   }
 
   object CartesianProduct extends Strategy {
-    def canBroadCast(plan: LogicalPlan): Boolean = {
-      if (sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
-        plan.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold) {
-        true
-      } else {
-        false
-      }
-    }
-
     def createCartesianProduct(left: LogicalPlan, right: LogicalPlan): SparkPlan = {
       // For BroadcastCartesianProduct we will broadcast the small size plan,
       // for CartesianProduct we will use the small size plan as cartesian left rdd.
       if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) {
-        if (canBroadCast(right)) {
-          execution.joins.BroadcastCartesianProduct(planLater(left), planLater(right),
-            joins.BuildRight)
-        } else {
-          execution.joins.CartesianProduct(planLater(left), planLater(right),
+        right match {
+          case CanBroadcast(right) => execution.joins.BroadcastCartesianProduct(planLater(left),
+            planLater(right), joins.BuildRight)
+          case _ => execution.joins.CartesianProduct(planLater(left), planLater(right),
             joins.BuildLeft)
         }
       } else {
-        if (canBroadCast(left)) {
-          execution.joins.BroadcastCartesianProduct(planLater(left), planLater(right),
-            joins.BuildLeft)
-        } else {
-          execution.joins.CartesianProduct(planLater(left), planLater(right),
+        left match {
+          case CanBroadcast(left) => execution.joins.BroadcastCartesianProduct(planLater(left),
+            planLater(right), joins.BuildLeft)
+          case _ => execution.joins.CartesianProduct(planLater(left), planLater(right),
             joins.BuildRight)
         }
       }
From 5ca1d26f208870607698694fda78e2a728a7e2dc Mon Sep 17 00:00:00 2001
From: linweizhong
Date: Sat, 1 Aug 2015 12:36:59 +0800
Subject: [PATCH 15/22] Use BroadcastNestedLoopJoin to replace
 BroadcastCartesianProduct

---
 .../spark/sql/execution/SparkStrategies.scala | 13 +--
 .../joins/BroadcastCartesianProduct.scala     | 80 -------------------
 2 files changed, 7 insertions(+), 86 deletions(-)
 delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastCartesianProduct.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index a3dc944c942af..820f2afe8d8bb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -212,19 +212,20 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   object CartesianProduct extends Strategy {
     def createCartesianProduct(left: LogicalPlan, right: LogicalPlan): SparkPlan = {
-      // For BroadcastCartesianProduct we will broadcast the small size plan,
-      // for CartesianProduct we will use the small size plan as cartesian left rdd.
+      // If plan can broadcast we use BroadcastNestedLoopJoin, as we know for inner join with true
+      // condition is same as Cartesian.
+      // For CartesianProduct we will use the small size plan as cartesian left rdd.
       if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) {
         right match {
-          case CanBroadcast(right) => execution.joins.BroadcastCartesianProduct(planLater(left),
-            planLater(right), joins.BuildRight)
+          case CanBroadcast(right) => joins.BroadcastNestedLoopJoin(planLater(left),
+            planLater(right), joins.BuildRight, null, null)
           case _ => execution.joins.CartesianProduct(planLater(left), planLater(right),
             joins.BuildLeft)
         }
       } else {
         left match {
-          case CanBroadcast(left) => execution.joins.BroadcastCartesianProduct(planLater(left),
-            planLater(right), joins.BuildLeft)
+          case CanBroadcast(left) => joins.BroadcastNestedLoopJoin(planLater(left),
+            planLater(right), joins.BuildLeft, null, null)
           case _ => execution.joins.CartesianProduct(planLater(left), planLater(right),
             joins.BuildRight)
         }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastCartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastCartesianProduct.scala
deleted file mode 100644
index 28334ad50e4c9..0000000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastCartesianProduct.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.joins
-
-import scala.concurrent._
-import scala.concurrent.duration._
-
-import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, JoinedRow}
-import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
-import org.apache.spark.util.ThreadUtils
-
-/**
- * :: DeveloperApi ::
- */
-@DeveloperApi
-case class BroadcastCartesianProduct(
-    left: SparkPlan,
-    right: SparkPlan,
-    buildSide: BuildSide) extends BinaryNode {
-  override def output: Seq[Attribute] = left.output ++ right.output
-
-  private val (streamed, broadcast) = buildSide match {
-    case BuildRight => (left, right)
-    case BuildLeft => (right, left)
-  }
-
-  private val timeout: Duration = {
-    val timeoutValue = sqlContext.conf.broadcastTimeout
-    if (timeoutValue < 0) {
-      Duration.Inf
-    } else {
-      timeoutValue.seconds
-    }
-  }
-
-  @transient
-  private val broadcastFuture = future {
-    val input = broadcast.execute().map(_.copy()).collect()
-    sparkContext.broadcast(input)
-  }(BroadcastCartesianProduct.broadcastCartesianProductExecutionContext)
-
-  protected override def doExecute(): RDD[InternalRow] = {
-    val leftResults = streamed.execute().map(_.copy())
-    val rightResults = Await.result(broadcastFuture, timeout)
-
-    leftResults.mapPartitions { streamedIter =>
-      for (x <- streamedIter; y <- rightResults.value)
-        yield {
-          val joinedRow = new JoinedRow
-          buildSide match {
-            case BuildRight => joinedRow(x, y)
-            case BuildLeft => joinedRow(y, x)
-          }
-        }
-    }
-  }
-}
-
-object BroadcastCartesianProduct {
-  private val broadcastCartesianProductExecutionContext = ExecutionContext.fromExecutorService(
-    ThreadUtils.newDaemonCachedThreadPool("broadcast-cartesian-product", 128))
-}

From 04678d1e7e02486bb8b43fc964f0ff146fbea98b Mon Sep 17 00:00:00 2001
From: linweizhong
Date: Sat, 1 Aug 2015 15:34:57 +0800
Subject: [PATCH 16/22] Fix failed unit tests

---
 .../org/apache/spark/sql/execution/SparkStrategies.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 820f2afe8d8bb..20e536c212221 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -218,14 +218,14 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) {
         right match {
           case CanBroadcast(right) => joins.BroadcastNestedLoopJoin(planLater(left),
-            planLater(right), joins.BuildRight, null, null)
+            planLater(right), joins.BuildRight, Inner, None)
           case _ => execution.joins.CartesianProduct(planLater(left), planLater(right),
             joins.BuildLeft)
         }
       } else {
         left match {
           case CanBroadcast(left) => joins.BroadcastNestedLoopJoin(planLater(left),
-            planLater(right), joins.BuildLeft, null, null)
+            planLater(right), joins.BuildLeft, Inner, None)
           case _ => execution.joins.CartesianProduct(planLater(left), planLater(right),
             joins.BuildRight)
         }
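The substitution in PATCHES 15-16 leans on a simple identity: an inner join with no condition (equivalently, an always-true one) produces exactly the cross product, so the existing BroadcastNestedLoopJoin operator with joinType = Inner and condition = None can stand in for the deleted BroadcastCartesianProduct — PATCH 16 repairs the null, null placeholders accordingly, and PATCH 17 below teaches BroadcastNestedLoopJoin's output method about Inner. A small hedged sketch of the identity at the DataFrame level (df1 and df2 are illustrative inputs):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.lit

    // Both plans return the same rows for any inputs, which is why
    // BroadcastNestedLoopJoin(Inner, None) can implement the broadcast cross product.
    def sameRows(df1: DataFrame, df2: DataFrame): (DataFrame, DataFrame) = {
      val crossed = df1.join(df2)              // no join condition: a plain cross product
      val innerTrue = df1.join(df2, lit(true)) // inner join on an always-true condition
      (crossed, innerTrue)
    }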
From 8a8658cee375975d96685ec8ce90cab6eba777b5 Mon Sep 17 00:00:00 2001
From: linweizhong
Date: Tue, 8 Sep 2015 21:04:20 +0800
Subject: [PATCH 17/22] Add Inner join type for cartesian broadcast, as
 SPARK-10484 points out

---
 .../spark/sql/execution/joins/BroadcastNestedLoopJoin.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
index 28c88b1b03d02..f414b1c358c78 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
@@ -22,7 +22,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.catalyst.plans.{FullOuter, JoinType, LeftOuter, RightOuter}
+import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.util.collection.CompactBuffer
@@ -71,6 +71,7 @@ case class BroadcastNestedLoopJoin(
       left.output.map(_.withNullability(true)) ++ right.output
     case FullOuter =>
       left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true))
+    case Inner => left.output ++ right.output
     case x =>
       throw new IllegalArgumentException(
         s"BroadcastNestedLoopJoin should not take $x as the JoinType")

From 60f2102b6ac5f7395a1dccf31563937f5e1d9f4d Mon Sep 17 00:00:00 2001
From: linweizhong
Date: Wed, 9 Sep 2015 14:32:49 +0800
Subject: [PATCH 18/22] Update

---
 .../spark/sql/execution/SparkStrategies.scala | 35 +++++++++----------
 .../execution/joins/CartesianProduct.scala    | 34 ++++++++++--------
 2 files changed, 37 insertions(+), 32 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 4c714409c4d86..7f0dbec13ee34 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{BroadcastHint, LogicalPlan}
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation}
 import org.apache.spark.sql.execution.datasources.{CreateTableUsing, CreateTempTableUsing, DescribeCommand => LogicalDescribeCommand, _}
+import org.apache.spark.sql.execution.joins.BuildSide
 import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{SQLContext, Strategy, execution}
@@ -267,32 +268,30 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   }
 
   object CartesianProduct extends Strategy {
-    def createCartesianProduct(left: LogicalPlan, right: LogicalPlan): SparkPlan = {
-      // If plan can broadcast we use BroadcastNestedLoopJoin, as we know for inner join with true
-      // condition is same as Cartesian.
-      // For CartesianProduct we will use the small size plan as cartesian left rdd.
+    def getSmallSide(left: LogicalPlan, right: LogicalPlan): BuildSide = {
       if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) {
-        right match {
-          case CanBroadcast(right) => joins.BroadcastNestedLoopJoin(planLater(left),
-            planLater(right), joins.BuildRight, Inner, None)
-          case _ => execution.joins.CartesianProduct(planLater(left), planLater(right),
-            joins.BuildLeft)
-        }
+        joins.BuildRight
       } else {
-        left match {
-          case CanBroadcast(left) => joins.BroadcastNestedLoopJoin(planLater(left),
-            planLater(right), joins.BuildLeft, Inner, None)
-          case _ => execution.joins.CartesianProduct(planLater(left), planLater(right),
-            joins.BuildRight)
-        }
+        joins.BuildLeft
       }
     }
 
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      // If plan can broadcast we use BroadcastNestedLoopJoin, as we know for inner join with true
+      // condition is same as Cartesian.
+      case logical.Join(CanBroadcast(left), right, joinType, condition) =>
+        execution.joins.BroadcastNestedLoopJoin(
+          planLater(left), planLater(right), joins.BuildLeft, joinType, condition) :: Nil
+      case logical.Join(left, CanBroadcast(right), joinType, condition) =>
+        execution.joins.BroadcastNestedLoopJoin(
+          planLater(left), planLater(right), joins.BuildRight, joinType, condition) :: Nil
       case logical.Join(left, right, _, None) =>
-        createCartesianProduct(left, right) :: Nil
+        execution.joins.CartesianProduct(planLater(left), planLater(right),
+          getSmallSide(left, right)) :: Nil
       case logical.Join(left, right, Inner, Some(condition)) =>
-        execution.Filter(condition, createCartesianProduct(left, right)) :: Nil
+        execution.Filter(condition,
+          execution.joins.CartesianProduct(planLater(left), planLater(right),
+            getSmallSide(left, right))) :: Nil
       case _ => Nil
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
index f537ae481ae33..eeb61f57bd2b7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
@@ -34,11 +34,6 @@ case class CartesianProduct(
     buildSide: BuildSide) extends BinaryNode {
   override def output: Seq[Attribute] = left.output ++ right.output
 
-  private val (small, big) = buildSide match {
-    case BuildRight => (right, left)
-    case BuildLeft => (left, right)
-  }
-
   override private[sql] lazy val metrics = Map(
     "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
     "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
@@ -49,23 +44,34 @@ case class CartesianProduct(
     val numRightRows = longMetric("numRightRows")
     val numOutputRows = longMetric("numOutputRows")
 
-    val leftResults = small.execute().map { row =>
+    val leftResults = left.execute().map { row =>
       numLeftRows += 1
       row.copy()
     }
-    val rightResults = big.execute().map { row =>
+    val rightResults = right.execute().map { row =>
       numRightRows += 1
       row.copy()
     }
 
-    leftResults.cartesian(rightResults).mapPartitions { iter =>
+    val (smallResults, bigResults) = buildSide match {
+      case BuildRight => (rightResults, leftResults)
+      case BuildLeft => (leftResults, rightResults)
+    }
+
+    // Use the small size rdd as cartesian left rdd.
+    smallResults.cartesian(bigResults).mapPartitions { iter =>
       val joinedRow = new JoinedRow
-      iter.map { r =>
-        numOutputRows += 1
-        buildSide match {
-          case BuildLeft => joinedRow(r._1, r._2)
-          case BuildRight => joinedRow(r._2, r._1)
-        }
+      buildSide match {
+        case BuildLeft =>
+          iter.map { r =>
+            numOutputRows += 1
+            joinedRow(r._1, r._2)
+          }
+        case BuildRight =>
+          iter.map { r =>
+            numOutputRows += 1
+            joinedRow(r._1, r._2)
+          }
       }
     }
   }

From e01c8f031855b383bba83febbb58b0e6025da845 Mon Sep 17 00:00:00 2001
From: linweizhong
Date: Wed, 9 Sep 2015 14:35:16 +0800
Subject: [PATCH 19/22] Fix error

---
 .../org/apache/spark/sql/execution/joins/CartesianProduct.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
index eeb61f57bd2b7..8f60fbbd0ae5a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
@@ -70,7 +70,7 @@ case class CartesianProduct(
         case BuildRight =>
           iter.map { r =>
             numOutputRows += 1
-            joinedRow(r._1, r._2)
+            joinedRow(r._2, r._1)
           }
       }
     }
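Two details in this pair of commits are easy to miss. First, PATCH 18 accidentally made both buildSide arms emit joinedRow(r._1, r._2); PATCH 19 restores the swap for BuildRight, without which every output row of a reordered product would have its left and right halves transposed. Second, since the product may run with operands exchanged, the pair must be un-swapped exactly when buildSide is BuildRight. A tiny pure-function sketch of that invariant (types and names are illustrative, not the operator's):

    object SwapInvariantSketch {
      sealed trait BuildSide
      case object BuildLeft extends BuildSide
      case object BuildRight extends BuildSide

      // Restore (leftRow, rightRow) order after a possibly swapped product.
      def restore(pair: (Any, Any), side: BuildSide): (Any, Any) = side match {
        case BuildLeft => pair                // product ran as left x right: keep
        case BuildRight => (pair._2, pair._1) // product ran as right x left: swap back
      }

      def main(args: Array[String]): Unit = {
        assert(restore(("l", "r"), BuildLeft) == (("l", "r")))
        assert(restore(("r", "l"), BuildRight) == (("l", "r")))
        println("swap invariant holds")
      }
    }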
From a66f4759862b697c6c3ab1dbc0e7444478ba89e2 Mon Sep 17 00:00:00 2001
From: linweizhong
Date: Tue, 29 Sep 2015 19:40:31 +0800
Subject: [PATCH 20/22] Add some unit tests that PR #8652 added, and fix unit
 test errors

---
 .../spark/sql/execution/SparkStrategies.scala |  2 +-
 .../org/apache/spark/sql/JoinSuite.scala      | 80 +++++++++++++++++++
 .../execution/metric/SQLMetricsSuite.scala    | 22 ++++-
 3 files changed, 101 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 04a4a9643dc50..a9e8a42c59fde 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -276,7 +276,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   object CartesianProduct extends Strategy {
     def getSmallSide(left: LogicalPlan, right: LogicalPlan): BuildSide = {
-      if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) {
+      if (right.statistics.sizeInBytes < left.statistics.sizeInBytes) {
         joins.BuildRight
       } else {
         joins.BuildLeft
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 7a027e13089e3..a0860dad1f669 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -27,6 +27,10 @@ class JoinSuite extends QueryTest with SharedSQLContext {
 
   setupTestData()
 
+  def statisticSizeInByte(df: DataFrame): BigInt = {
+    df.queryExecution.optimizedPlan.statistics.sizeInBytes
+  }
+
   test("equi-join is hash-join") {
     val x = testData2.as("x")
     val y = testData2.as("y")
@@ -465,6 +469,82 @@ class JoinSuite extends QueryTest with SharedSQLContext {
     sql("UNCACHE TABLE testData")
   }
 
+  test("cross join with broadcast") {
+    sql("CACHE TABLE testData")
+
+    val sizeInByteOfTestData = statisticSizeInByte(sqlContext.table("testData"))
+
+    // we set the threshold is greater than statistic of the cached table testData
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> (sizeInByteOfTestData + 1).toString()) {
+
+      assert(statisticSizeInByte(sqlContext.table("testData2")) >
+        sqlContext.conf.autoBroadcastJoinThreshold)
+
+      assert(statisticSizeInByte(sqlContext.table("testData")) <
+        sqlContext.conf.autoBroadcastJoinThreshold)
+
+      Seq(
+        ("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a",
+          classOf[LeftSemiJoinHash]),
+        ("SELECT * FROM testData LEFT SEMI JOIN testData2",
+          classOf[LeftSemiJoinBNL]),
+        ("SELECT * FROM testData JOIN testData2",
+          classOf[BroadcastNestedLoopJoin]),
+        ("SELECT * FROM testData JOIN testData2 WHERE key = 2",
+          classOf[BroadcastNestedLoopJoin]),
+        ("SELECT * FROM testData LEFT JOIN testData2",
+          classOf[BroadcastNestedLoopJoin]),
+        ("SELECT * FROM testData RIGHT JOIN testData2",
+          classOf[BroadcastNestedLoopJoin]),
+        ("SELECT * FROM testData FULL OUTER JOIN testData2",
+          classOf[BroadcastNestedLoopJoin]),
+        ("SELECT * FROM testData LEFT JOIN testData2 WHERE key = 2",
+          classOf[BroadcastNestedLoopJoin]),
+        ("SELECT * FROM testData RIGHT JOIN testData2 WHERE key = 2",
+          classOf[BroadcastNestedLoopJoin]),
+        ("SELECT * FROM testData FULL OUTER JOIN testData2 WHERE key = 2",
+          classOf[BroadcastNestedLoopJoin]),
+        ("SELECT * FROM testData JOIN testData2 WHERE key > a",
+          classOf[BroadcastNestedLoopJoin]),
+        ("SELECT * FROM testData FULL OUTER JOIN testData2 WHERE key > a",
+          classOf[BroadcastNestedLoopJoin]),
+        ("SELECT * FROM testData left JOIN testData2 ON (key * a != key + a)",
+          classOf[BroadcastNestedLoopJoin]),
+        ("SELECT * FROM testData right JOIN testData2 ON (key * a != key + a)",
+          classOf[BroadcastNestedLoopJoin]),
+        ("SELECT * FROM testData full JOIN testData2 ON (key * a != key + a)",
+          classOf[BroadcastNestedLoopJoin])
+      ).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
+
+      checkAnswer(
+        sql(
+          """
+            SELECT x.value, y.a, y.b FROM testData x JOIN testData2 y WHERE x.key = 2
+          """.stripMargin),
+        Row("2", 1, 1) ::
+        Row("2", 1, 2) ::
+        Row("2", 2, 1) ::
+        Row("2", 2, 2) ::
+        Row("2", 3, 1) ::
+        Row("2", 3, 2) :: Nil)
+
+      checkAnswer(
+        sql(
+          """
+            SELECT x.value, y.a, y.b FROM testData x JOIN testData2 y WHERE x.key < y.a
+          """.stripMargin),
+        Row("1", 2, 1) ::
+        Row("1", 2, 2) ::
+        Row("1", 3, 1) ::
+        Row("1", 3, 2) ::
+        Row("2", 3, 1) ::
+        Row("2", 3, 2) :: Nil)
+    }
+
+    sql("UNCACHE TABLE testData")
+  }
+
   test("left semi join") {
     val df = sql("SELECT * FROM testData2 LEFT SEMI JOIN testData ON key = a")
     checkAnswer(df,
"number of output rows" -> 12L))) + ) + } + } + + test("CartesianProduct metrics with SPARK-9066, keep rdd order") { + val testDataForJoin = testData2.filter('a < 2) // TestData2(1, 1) :: TestData2(1, 2) + testDataForJoin.registerTempTable("testDataForJoin") + withTempTable("testDataForJoin") { + // Assume the execution plan is + // ... -> CartesianProduct(nodeId = 1) -> TungstenProject(nodeId = 0) + + val df = sqlContext.sql( + "SELECT * FROM testDataForJoin JOIN testData2") + testSparkPlanMetrics(df, 1, Map( + 1L -> ("CartesianProduct", Map( + "number of left rows" -> 4L, // left needs to be scanned twice + "number of right rows" -> 12L, // right is read 12 times "number of output rows" -> 12L))) ) } From 981224234aa602b1477ecca2bafbd33dc10d55fd Mon Sep 17 00:00:00 2001 From: linweizhong Date: Tue, 29 Sep 2015 19:48:19 +0800 Subject: [PATCH 21/22] Fix unit test error --- .../spark/sql/execution/metric/SQLMetricsSuite.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index 29cb531e34cc5..664efffa3c8a8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -455,7 +455,7 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { } } - test("CartesianProduct metrics with SPARK-9066, adjust rdd order") { + test("CartesianProduct metrics") { val testDataForJoin = testData2.filter('a < 2) // TestData2(1, 1) :: TestData2(1, 2) testDataForJoin.registerTempTable("testDataForJoin") withTempTable("testDataForJoin") { @@ -466,19 +466,20 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { testSparkPlanMetrics(df, 1, Map( 1L -> ("CartesianProduct", Map( "number of left rows" -> 12L, // left needs to be scanned twice - "number of right rows" -> 4L, // right is read 4 times, actually it is left + "number of right rows" -> 12L, // right is read 6 times "number of output rows" -> 12L))) ) } } - test("CartesianProduct metrics with SPARK-9066, keep rdd order") { + test("CartesianProduct metrics with SPARK-9066, adjust rdd order") { val testDataForJoin = testData2.filter('a < 2) // TestData2(1, 1) :: TestData2(1, 2) testDataForJoin.registerTempTable("testDataForJoin") withTempTable("testDataForJoin") { // Assume the execution plan is // ... -> CartesianProduct(nodeId = 1) -> TungstenProject(nodeId = 0) - + // As the left.statistics.sizeInBytes equal to right.statistics.sizeInBytes, so we exchange + // the left and right logical plan. 
       val df = sqlContext.sql(
         "SELECT * FROM testDataForJoin JOIN testData2")
       testSparkPlanMetrics(df, 1, Map(
         1L -> ("CartesianProduct", Map(
           "number of left rows" -> 4L, // left needs to be scanned twice
           "number of right rows" -> 12L, // right is read 12 times
           "number of output rows" -> 12L)))

From ce6ad25bede45c13e4325293b260a3301c1aceca Mon Sep 17 00:00:00 2001
From: linweizhong
Date: Tue, 29 Sep 2015 19:54:37 +0800
Subject: [PATCH 22/22] Delete unused unit test

---
 .../execution/metric/SQLMetricsSuite.scala    | 19 -------------------
 1 file changed, 19 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 664efffa3c8a8..6afffae161ef6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -472,25 +472,6 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
     }
   }
 
-  test("CartesianProduct metrics with SPARK-9066, adjust rdd order") {
-    val testDataForJoin = testData2.filter('a < 2) // TestData2(1, 1) :: TestData2(1, 2)
-    testDataForJoin.registerTempTable("testDataForJoin")
-    withTempTable("testDataForJoin") {
-      // Assume the execution plan is
-      // ... -> CartesianProduct(nodeId = 1) -> TungstenProject(nodeId = 0)
-      // As the left.statistics.sizeInBytes equal to right.statistics.sizeInBytes, so we exchange
-      // the left and right logical plan.
-      val df = sqlContext.sql(
-        "SELECT * FROM testDataForJoin JOIN testData2")
-      testSparkPlanMetrics(df, 1, Map(
-        1L -> ("CartesianProduct", Map(
-          "number of left rows" -> 4L, // left needs to be scanned twice
-          "number of right rows" -> 12L, // right is read 12 times
-          "number of output rows" -> 12L)))
-      )
-    }
-  }
-
   test("save metrics") {
     withTempPath { file =>
       val previousExecutionIds = sqlContext.listener.executionIdToData.keySet
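Where the series lands: the CartesianProduct strategy keeps only two responsibilities — routing broadcastable sides to BroadcastNestedLoopJoin, and otherwise letting the physical CartesianProduct put the smaller plan first (with getSmallSide's strict < keeping ties on BuildLeft, which is what makes the metrics test above deterministic). A condensed, hedged sketch of that final decision logic, with simplified types standing in for the planner's (LogicalPlan statistics and planLater are mocked; not the actual Spark API):

    object FinalStrategySketch {
      sealed trait BuildSide
      case object BuildLeft extends BuildSide
      case object BuildRight extends BuildSide

      final case class Plan(sizeInBytes: BigInt)

      // Mirrors getSmallSide after PATCH 20: strict '<' so equal sizes keep BuildLeft.
      def getSmallSide(left: Plan, right: Plan): BuildSide =
        if (right.sizeInBytes < left.sizeInBytes) BuildRight else BuildLeft

      // Mirrors the broadcast gate: threshold > 0 and the plan fits under it.
      def canBroadcast(plan: Plan, threshold: Long): Boolean =
        threshold > 0 && plan.sizeInBytes <= BigInt(threshold)

      def main(args: Array[String]): Unit = {
        val small = Plan(BigInt(1024))
        val big = Plan(BigInt(1024 * 1024))
        assert(getSmallSide(big, small) == BuildRight)
        assert(getSmallSide(small, small) == BuildLeft) // a tie stays on BuildLeft
        assert(canBroadcast(small, threshold = 10L * 1024 * 1024))
        println("strategy sketch OK")
      }
    }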