From 470243cce24bee9a6c49a2f1538932ff3dba8f64 Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Thu, 19 Jun 2014 12:40:53 +0800 Subject: [PATCH 1/5] Spark SQL add LeftSemiBloomFilterBroadcastJoin Hi all, I want to submit a join operator called LeftSemiBloomFilterBroadcastJoin (LeftSemiJoinBFB). Sometimes the semi-join's broadcast table cannot fit in memory, so we can build a Bloom filter from it to reduce the space, broadcast the filter, and then do a map-side join. Some of the code is based on the HashJoin and BroadcastNestedLoopJoin implementations. The Bloom filter code uses Shark's BloomFilter class implementation. --- .../apache/spark/sql/execution/joins.scala | 68 +++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala index 8d7a5ba59f96a..7416de6cea78d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala @@ -245,6 +245,74 @@ case class LeftSemiJoinBNL( } } + + + +/* +LeftSemiBloomFilterBroadcastJoin +Sometimes the semijoin's broadcast table can't fit memory.So we can make it as Bloomfilter to reduce the space +and then broadcast it do the mapside join +THe bloomfilter use Shark's BloomFilter class implementation. 
+*/ + +/** + * :: DeveloperApi :: + */ +@DeveloperApi +case class LeftSemiJoinBFB( + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + buildSide: BuildSide, + left: SparkPlan, + right: SparkPlan, + @transient sc: SparkContext) extends BinaryNode { + override def outputPartitioning: Partitioning = left.outputPartitioning + override def requiredChildDistribution = + ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil + val (buildPlan, streamedPlan) = buildSide match { + case BuildLeft => (left, right) + case BuildRight => (right, left) + } + val (buildKeys, streamedKeys) = buildSide match { + case BuildLeft => (leftKeys, rightKeys) + case BuildRight => (rightKeys, leftKeys) + } + def output = left.output++right.output + @transient lazy val buildSideKeyGenerator = new Projection(buildKeys, buildPlan.output) + @transient lazy val streamSideKeyGenerator = new Projection(streamedKeys, streamedPlan.output) + def execute() = { + + val buildSideKeys=buildPlan.execute().map(row=>buildSideKeyGenerator(row)).collect() + /** + * @param fpp is the expected false positive probability. + * @param expectedSize is the number of elements to be contained. 
+ */ + val fpp: Double=0.03 //This could be a config param + val expectedSize: Int=buildSideKeys.size + val bf = new BloomFilter(fpp,expectedSize) + val iter=buildSideKeys.iterator + while(iter.hasNext) + { + bf.add(buildSideKeyGenerator(iter.next()).toString()) + } + val buildKeysBroadcast=sc.broadcast(bf) + streamedPlan.execute().filter{ currentRow=> + val rowKey = streamSideKeyGenerator(currentRow) + if(buildKeysBroadcast.value.contains(rowKey.toString())) true + else false + }.map(row=>{ + buildRow(row)}) + } +} + + + + + + + + + /** * :: DeveloperApi :: */ From 0078afc259302438f4d31b8925b90a0f737b130a Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Fri, 20 Jun 2014 13:05:03 +0800 Subject: [PATCH 2/5] Reformat the code style Reformat the code with 4-space indentation --- .../apache/spark/sql/execution/joins.scala | 56 ++++++++++--------- 1 file changed, 30 insertions(+), 26 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala index 7416de6cea78d..425249b8f9717 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala @@ -248,11 +248,12 @@ case class LeftSemiJoinBNL( + /* LeftSemiBloomFilterBroadcastJoin Sometimes the semijoin's broadcast table can't fit memory.So we can make it as Bloomfilter to reduce the space -and then broadcast it do the mapside join -THe bloomfilter use Shark's BloomFilter class implementation. +and then broadcast it do the mapside join +The bloomfilter use Shark's BloomFilter class implementation. */ /** @@ -260,15 +261,17 @@ THe bloomfilter use Shark's BloomFilter class implementation. 
*/ @DeveloperApi case class LeftSemiJoinBFB( - leftKeys: Seq[Expression], - rightKeys: Seq[Expression], - buildSide: BuildSide, - left: SparkPlan, - right: SparkPlan, - @transient sc: SparkContext) extends BinaryNode { + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + buildSide: BuildSide, + left: SparkPlan, + right: SparkPlan, + @transient sc: SparkContext) extends BinaryNode { override def outputPartitioning: Partitioning = left.outputPartitioning + override def requiredChildDistribution = ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil + val (buildPlan, streamedPlan) = buildSide match { case BuildLeft => (left, right) case BuildRight => (right, left) @@ -277,34 +280,35 @@ case class LeftSemiJoinBFB( case BuildLeft => (leftKeys, rightKeys) case BuildRight => (rightKeys, leftKeys) } - def output = left.output++right.output + + def output = left.output ++ right.output + @transient lazy val buildSideKeyGenerator = new Projection(buildKeys, buildPlan.output) - @transient lazy val streamSideKeyGenerator = new Projection(streamedKeys, streamedPlan.output) + @transient lazy val streamSideKeyGenerator = new Projection(streamedKeys, streamedPlan.output) + def execute() = { - val buildSideKeys=buildPlan.execute().map(row=>buildSideKeyGenerator(row)).collect() + val buildSideKeys = buildPlan.execute().map(row => buildSideKeyGenerator(row)).collect() /** * @param fpp is the expected false positive probability. * @param expectedSize is the number of elements to be contained. 
+ */ - val fpp: Double=0.03 //This could be a config param - val expectedSize: Int=buildSideKeys.size - val bf = new BloomFilter(fpp,expectedSize) - val iter=buildSideKeys.iterator - while(iter.hasNext) - { - bf.add(buildSideKeyGenerator(iter.next()).toString()) + val fpp: Double = 0.03 //This could be a config param + val expectedSize: Int = buildSideKeys.size + val bf = new BloomFilter(fpp, expectedSize) + val iter = buildSideKeys.iterator + while (iter.hasNext) { + bf.add(buildSideKeyGenerator(iter.next()).toString()) } - val buildKeysBroadcast=sc.broadcast(bf) - streamedPlan.execute().filter{ currentRow=> + val buildKeysBroadcast = sc.broadcast(bf) + streamedPlan.execute().filter { currentRow => val rowKey = streamSideKeyGenerator(currentRow) - if(buildKeysBroadcast.value.contains(rowKey.toString())) true - else false - }.map(row=>{ - buildRow(row)}) + if (buildKeysBroadcast.value.contains(rowKey.toString())) true + else false + }.map(row => { + buildRow(row) + }) } -} - From e13f150b1f666757945ee64d1e235f03bd012d3f Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Fri, 20 Jun 2014 17:02:00 +0800 Subject: [PATCH 3/5] Update code format Reformat the indentation and comments --- .../org/apache/spark/sql/execution/joins.scala | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala index 425249b8f9717..c510ba63a44d4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala @@ -249,15 +249,12 @@ case class LeftSemiJoinBNL( -/* -LeftSemiBloomFilterBroadcastJoin -Sometimes the semijoin's broadcast table can't fit memory.So we can make it as Bloomfilter to reduce the space -and then broadcast it do the mapside join -The bloomfilter use Shark's BloomFilter class implementation. 
-*/ - /** * :: DeveloperApi :: + * LeftSemiBloomFilterBroadcastJoin + * Sometimes the semijoin's broadcast table can't fit memory.So we can make it as Bloomfilter to reduce the space + * and then broadcast it do the mapside join + * The bloomfilter use Shark's BloomFilter class implementation. */ @DeveloperApi case class LeftSemiJoinBFB( @@ -315,8 +312,6 @@ case class LeftSemiJoinBFB( - - /** * :: DeveloperApi :: */ From eba72800ee3a62987d7832abb3a3500313442061 Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Mon, 23 Jun 2014 08:52:47 +0800 Subject: [PATCH 4/5] Update patterns.scala --- .../sql/catalyst/planning/patterns.scala | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala index 820ecfb78b52e..96e29ad8fcc77 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala @@ -173,3 +173,22 @@ object Unions { case other => other :: Nil } } + + +object LeftSemiJoinBFBFilteredJoin extends Logging with PredicateHelper { + /** (joinType, rightKeys, leftKeys, condition, leftChild, rightChild) */ + type ReturnType = + (JoinType, Seq[Expression], Seq[Expression], Option[Expression], LogicalPlan, LogicalPlan) + + def unapply(plan: LogicalPlan): Option[ReturnType] = plan match { + // All predicates can be evaluated for inner join (i.e., those that are in the ON + // clause and WHERE clause.) 
+ case FilteredOperation(predicates, join@Join(left, right, Inner, condition)) => + logger.debug(s"Considering Semi inner join on: ${predicates ++ condition}") + splitPredicates(predicates ++ condition, join) + case join@Join(left, right, joinType, condition) => + logger.debug(s"Considering Semi join on: $condition") + splitPredicates(condition.toSeq, join) + case _ => None + } +} From 9b36125b9f84966099163ad32ace23c8d3e036ab Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Mon, 23 Jun 2014 08:54:34 +0800 Subject: [PATCH 5/5] Update SparkStrategies.scala --- .../apache/spark/sql/execution/SparkStrategies.scala | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 70c1171148ebb..4899e71171098 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -259,3 +259,14 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } } } + + +object LeftSemiJoinBFB extends Strategy with PredicateHelper { + def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + case LeftSemiJoinBFBFilteredJoin(Inner, leftKeys, rightKeys, condition, left, right) => + val hashJoin = + execution.LeftSemiJoinBFBJoin(leftKeys, rightKeys, BuildRight, planLater(left), planLater(right), sparkContext) + condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil + case _ => Nil + } +}