Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -173,3 +173,22 @@ object Unions {
case other => other :: Nil
}
}


object LeftSemiJoinBFBFilteredJoin extends Logging with PredicateHelper {
/** (joinType, rightKeys, leftKeys, condition, leftChild, rightChild) */
type ReturnType =
(JoinType, Seq[Expression], Seq[Expression], Option[Expression], LogicalPlan, LogicalPlan)

def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
// All predicates can be evaluated for inner join (i.e., those that are in the ON
// clause and WHERE clause.)
case FilteredOperation(predicates, join@Join(left, right, Inner, condition)) =>
logger.debug(s"Considering Semi inner join on: ${predicates ++ condition}")
splitPredicates(predicates ++ condition, join)
case join@Join(left, right, joinType, condition) =>
logger.debug(s"Considering Semi join on: $condition")
splitPredicates(condition.toSeq, join)
case _ => None
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -259,3 +259,14 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
}
}
}


object LeftSemiJoinBFB extends Strategy with PredicateHelper {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case LeftSemiJoinBFBFilteredJoin(Inner, leftKeys, rightKeys, condition, left, right) =>
val hashJoin =
execution.LeftSemiJoinBFBJoin(leftKeys, rightKeys, BuildRight, planLater(left), planLater(right), sparkContext)
condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil
case _ => Nil
}
}
67 changes: 67 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,73 @@ case class LeftSemiJoinBNL(
}
}





/**
* :: DeveloperApi ::
* LeftSemiBloomFilterBroadcastJoin
* Sometimes the semijoin's broadcast table can't fit memory.So we can make it as Bloomfilter to reduce the space
* and then broadcast it do the mapside join
* The bloomfilter use Shark's BloomFilter class implementation.
*/
@DeveloperApi
case class LeftSemiJoinBFB(
leftKeys: Seq[Expression],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indent 4 spaces. Also I'd go with the full more descriptive name instead of BFB since we are only going to have to type it out in like 2 places.

rightKeys: Seq[Expression],
buildSide: BuildSide,
left: SparkPlan,
right: SparkPlan,
@transient sc: SparkContext) extends BinaryNode {
override def outputPartitioning: Partitioning = left.outputPartitioning

override def requiredChildDistribution =
ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil

val (buildPlan, streamedPlan) = buildSide match {
case BuildLeft => (left, right)
case BuildRight => (right, left)
}
val (buildKeys, streamedKeys) = buildSide match {
case BuildLeft => (leftKeys, rightKeys)
case BuildRight => (rightKeys, leftKeys)
}

def output = left.output ++ right.output

@transient lazy val buildSideKeyGenerator = new Projection(buildKeys, buildPlan.output)
@transient lazy val streamSideKeyGenerator = new Projection(streamedKeys, streamedPlan.output)

def execute() = {

val buildSideKeys = buildPlan.execute().map(row => buildSideKeyGenerator(row)).collect()
/**
* @param fpp is the expected false positive probability.
* @param expectedSize is the number of elements to be contained.
*/
val fpp: Double = 0.03 //This could be a config param
val expectedSize: Int = buildSideKeys.size
val bf = new BloomFilter(fpp, expectedSize)
val iter = buildSideKeys.iterator
while (iter.hasNext) {
bf.add(buildSideKeyGenerator(iter.next()).toString())
}
val buildKeysBroadcast = sc.broadcast(bf)
streamedPlan.execute().filter { currentRow =>
val rowKey = streamSideKeyGenerator(currentRow)
if (buildKeysBroadcast.value.contains(rowKey.toString())) true
else false
}.map(row => {
buildRow(row)
})
}






/**
* :: DeveloperApi ::
*/
Expand Down