diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 820ecfb78b52e..96e29ad8fcc77 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -173,3 +173,22 @@ object Unions {
     case other => other :: Nil
   }
 }
+
+
+object LeftSemiJoinBFBFilteredJoin extends Logging with PredicateHelper {
+  /** (joinType, leftKeys, rightKeys, condition, leftChild, rightChild) */
+  type ReturnType =
+    (JoinType, Seq[Expression], Seq[Expression], Option[Expression], LogicalPlan, LogicalPlan)
+
+  def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
+    // All predicates can be evaluated for inner joins (i.e., those in both the ON
+    // clause and the WHERE clause).
+    case FilteredOperation(predicates, join @ Join(left, right, Inner, condition)) =>
+      logger.debug(s"Considering semi inner join on: ${predicates ++ condition}")
+      splitPredicates(predicates ++ condition, join)
+    case join @ Join(left, right, joinType, condition) =>
+      logger.debug(s"Considering semi join on: $condition")
+      splitPredicates(condition.toSeq, join)
+    case _ => None
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 70c1171148ebb..4899e71171098 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -259,3 +259,14 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
     }
   }
 }
+
+
+object LeftSemiJoinBFB extends Strategy with PredicateHelper {
+  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+    case LeftSemiJoinBFBFilteredJoin(Inner, leftKeys, rightKeys, condition, left, right) =>
+      val bloomFilterJoin = execution.LeftSemiJoinBFB(
+        leftKeys, rightKeys, BuildRight, planLater(left), planLater(right), sparkContext)
+      condition.map(Filter(_, bloomFilterJoin)).getOrElse(bloomFilterJoin) :: Nil
+    case _ => Nil
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala
index 8d7a5ba59f96a..c510ba63a44d4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala
@@ -245,6 +245,67 @@ case class LeftSemiJoinBNL(
   }
 }
 
+
+
+/**
+ * :: DeveloperApi ::
+ * LeftSemiBloomFilterBroadcastJoin:
+ * Sometimes the semi join's broadcast table cannot fit in memory, so we summarize its join
+ * keys in a Bloom filter to reduce the space required, then broadcast the filter to do the
+ * map-side join. The Bloom filter uses Shark's BloomFilter class implementation.
+ */
+@DeveloperApi
+case class LeftSemiJoinBFB(
+    leftKeys: Seq[Expression],
+    rightKeys: Seq[Expression],
+    buildSide: BuildSide,
+    left: SparkPlan,
+    right: SparkPlan,
+    @transient sc: SparkContext) extends BinaryNode {
+
+  override def outputPartitioning: Partitioning = left.outputPartitioning
+
+  override def requiredChildDistribution =
+    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
+
+  val (buildPlan, streamedPlan) = buildSide match {
+    case BuildLeft => (left, right)
+    case BuildRight => (right, left)
+  }
+
+  val (buildKeys, streamedKeys) = buildSide match {
+    case BuildLeft => (leftKeys, rightKeys)
+    case BuildRight => (rightKeys, leftKeys)
+  }
+
+  // A left semi join only emits rows from the left side.
+  def output = left.output
+
+  @transient lazy val buildSideKeyGenerator = new Projection(buildKeys, buildPlan.output)
+  @transient lazy val streamSideKeyGenerator = new Projection(streamedKeys, streamedPlan.output)
+
+  def execute() = {
+    // Collect the build side's join keys on the driver.
+    val buildSideKeys = buildPlan.execute().map(row => buildSideKeyGenerator(row)).collect()
+
+    // fpp is the expected false positive probability; expectedSize is the number of
+    // elements to be contained.
+    val fpp: Double = 0.03 // This could be a config param.
+    val expectedSize: Int = buildSideKeys.size
+    val bf = new BloomFilter(fpp, expectedSize)
+    val iter = buildSideKeys.iterator
+    while (iter.hasNext) {
+      bf.add(iter.next().toString())
+    }
+
+    // Broadcast the compact Bloom filter and use it to filter the streamed side. A Bloom
+    // filter can return false positives, so the output may retain a few non-matching rows.
+    val buildKeysBroadcast = sc.broadcast(bf)
+    streamedPlan.execute().filter { currentRow =>
+      buildKeysBroadcast.value.contains(streamSideKeyGenerator(currentRow).toString())
+    }
+  }
+}
 /**
  * :: DeveloperApi ::
  */
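
For reference, here is a minimal, self-contained sketch of the build-as-Bloom-filter semi join idea used in this patch. It is plain Scala with no Spark dependency: SimpleBloomFilter is a hypothetical stand-in for Shark's BloomFilter, and ordinary collections stand in for the RDDs, so only the shape of the algorithm carries over.

import java.util.BitSet
import scala.util.hashing.MurmurHash3

// Hypothetical stand-in for Shark's BloomFilter: k hash probes over an m-bit array,
// with m and k derived from the expected insert count and target false-positive rate.
class SimpleBloomFilter(fpp: Double, expectedSize: Int) extends Serializable {
  private val numBits =
    math.max(64, math.ceil(-expectedSize * math.log(fpp) / (math.log(2) * math.log(2))).toInt)
  private val numHashes =
    math.max(1, math.round(numBits.toDouble / expectedSize * math.log(2)).toInt)
  private val bits = new BitSet(numBits)

  // Double hashing: derive the k probe positions from two base hashes of the key.
  private def positions(key: String): Seq[Int] = {
    val h1 = MurmurHash3.stringHash(key)
    val h2 = MurmurHash3.stringHash(key, h1)
    (0 until numHashes).map(i => (((h1 + i * h2) % numBits) + numBits) % numBits)
  }

  def add(key: String): Unit = positions(key).foreach(bits.set)
  def contains(key: String): Boolean = positions(key).forall(bits.get)
}

object BloomSemiJoinSketch {
  def main(args: Array[String]): Unit = {
    // Build side (the smaller relation): summarize its join keys in the filter.
    val buildKeys = Seq("a", "b", "c")
    val bf = new SimpleBloomFilter(fpp = 0.03, expectedSize = buildKeys.size)
    buildKeys.foreach(bf.add)

    // Streamed side (the left relation): in the real operator the filter is broadcast
    // to the executors; here it is simply a local value used to filter the rows.
    val streamRows = Seq("a" -> 1, "x" -> 2, "c" -> 3)
    val kept = streamRows.filter { case (key, _) => bf.contains(key) }

    // Bloom filters have no false negatives, so every matching row is kept; false
    // positives mean `kept` can be a slight superset of the exact semi join result.
    println(kept) // List((a,1), (c,3)), barring a false positive on "x"
  }
}

The trade-off is the same as in the operator above: the broadcast payload shrinks from the full key set to a fixed-size bit array, at the cost of occasionally passing a row with no real match on the build side.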