From a8420ca0c4cbc5988607d0cd235ffeb2cb51d052 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Sun, 11 May 2014 11:23:02 -0700 Subject: [PATCH 01/15] Copy records in executeCollect to avoid issues with mutable rows. --- .../main/scala/org/apache/spark/sql/execution/SparkPlan.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 235a9b1692460..4613df103943d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -49,7 +49,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging { /** * Runs this query returning the result as an array. */ - def executeCollect(): Array[Row] = execute().collect() + def executeCollect(): Array[Row] = execute().map(_.copy()).collect() protected def buildRow(values: Seq[Any]): Row = new GenericRow(values.toArray) From cf6b3818fbe7d1908bcbdc7f18c5773c01d05541 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Sun, 11 May 2014 11:30:56 -0700 Subject: [PATCH 02/15] Split out generic logic for hash joins and create two concrete physical operators: BroadcastHashJoin and ShuffledHashJoin. --- .../apache/spark/sql/execution/joins.scala | 196 +++++++++++------- 1 file changed, 126 insertions(+), 70 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala index 31cc26962ad93..bfd8ae5672d97 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala @@ -18,13 +18,16 @@ package org.apache.spark.sql.execution import scala.collection.mutable.{ArrayBuffer, BitSet} +import scala.concurrent._ +import scala.concurrent.duration._ +import scala.concurrent.ExecutionContext.Implicits.global import org.apache.spark.SparkContext import org.apache.spark.annotation.DeveloperApi import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans._ -import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Partitioning} +import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Partitioning, UnspecifiedDistribution} @DeveloperApi sealed abstract class BuildSide @@ -35,21 +38,13 @@ case object BuildLeft extends BuildSide @DeveloperApi case object BuildRight extends BuildSide -/** - * :: DeveloperApi :: - */ -@DeveloperApi -case class HashJoin( - leftKeys: Seq[Expression], - rightKeys: Seq[Expression], - buildSide: BuildSide, - left: SparkPlan, - right: SparkPlan) extends BinaryNode { - override def outputPartitioning: Partitioning = left.outputPartitioning - - override def requiredChildDistribution = - ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil +trait HashJoin { + val leftKeys: Seq[Expression] + val rightKeys: Seq[Expression] + val buildSide: BuildSide + val left: SparkPlan + val right: SparkPlan val (buildPlan, streamedPlan) = buildSide match { case BuildLeft => (left, right) @@ -67,79 +62,140 @@ case class HashJoin( @transient lazy val streamSideKeyGenerator = () => new MutableProjection(streamedKeys, streamedPlan.output) - def execute() = { - buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) => - // TODO: Use Spark's HashMap implementation. 
- val hashTable = new java.util.HashMap[Row, ArrayBuffer[Row]]() - var currentRow: Row = null - - // Create a mapping of buildKeys -> rows - while (buildIter.hasNext) { - currentRow = buildIter.next() - val rowKey = buildSideKeyGenerator(currentRow) - if(!rowKey.anyNull) { - val existingMatchList = hashTable.get(rowKey) - val matchList = if (existingMatchList == null) { - val newMatchList = new ArrayBuffer[Row]() - hashTable.put(rowKey, newMatchList) - newMatchList - } else { - existingMatchList - } - matchList += currentRow.copy() + def joinIterators(buildIter: Iterator[Row], streamIter: Iterator[Row]): Iterator[Row] = { + // TODO: Use Spark's HashMap implementation. + + val hashTable = new java.util.HashMap[Row, ArrayBuffer[Row]]() + var currentRow: Row = null + + // Create a mapping of buildKeys -> rows + while (buildIter.hasNext) { + currentRow = buildIter.next() + val rowKey = buildSideKeyGenerator(currentRow) + if(!rowKey.anyNull) { + val existingMatchList = hashTable.get(rowKey) + val matchList = if (existingMatchList == null) { + val newMatchList = new ArrayBuffer[Row]() + hashTable.put(rowKey, newMatchList) + newMatchList + } else { + existingMatchList } + matchList += currentRow.copy() } + } - new Iterator[Row] { - private[this] var currentStreamedRow: Row = _ - private[this] var currentHashMatches: ArrayBuffer[Row] = _ - private[this] var currentMatchPosition: Int = -1 + new Iterator[Row] { + private[this] var currentStreamedRow: Row = _ + private[this] var currentHashMatches: ArrayBuffer[Row] = _ + private[this] var currentMatchPosition: Int = -1 - // Mutable per row objects. - private[this] val joinRow = new JoinedRow + // Mutable per row objects. + private[this] val joinRow = new JoinedRow - private[this] val joinKeys = streamSideKeyGenerator() + private[this] val joinKeys = streamSideKeyGenerator() - override final def hasNext: Boolean = - (currentMatchPosition != -1 && currentMatchPosition < currentHashMatches.size) || + override final def hasNext: Boolean = + (currentMatchPosition != -1 && currentMatchPosition < currentHashMatches.size) || (streamIter.hasNext && fetchNext()) - override final def next() = { - val ret = joinRow(currentStreamedRow, currentHashMatches(currentMatchPosition)) - currentMatchPosition += 1 - ret - } + override final def next() = { + val ret = joinRow(currentStreamedRow, currentHashMatches(currentMatchPosition)) + currentMatchPosition += 1 + ret + } - /** - * Searches the streamed iterator for the next row that has at least one match in hashtable. - * - * @return true if the search is successful, and false the streamed iterator runs out of - * tuples. - */ - private final def fetchNext(): Boolean = { - currentHashMatches = null - currentMatchPosition = -1 - - while (currentHashMatches == null && streamIter.hasNext) { - currentStreamedRow = streamIter.next() - if (!joinKeys(currentStreamedRow).anyNull) { - currentHashMatches = hashTable.get(joinKeys.currentValue) - } + /** + * Searches the streamed iterator for the next row that has at least one match in hashtable. + * + * @return true if the search is successful, and false the streamed iterator runs out of + * tuples. 
+ */ + private final def fetchNext(): Boolean = { + currentHashMatches = null + currentMatchPosition = -1 + + while (currentHashMatches == null && streamIter.hasNext) { + currentStreamedRow = streamIter.next() + if (!joinKeys(currentStreamedRow).anyNull) { + currentHashMatches = hashTable.get(joinKeys.currentValue) } + } - if (currentHashMatches == null) { - false - } else { - currentMatchPosition = 0 - true - } + if (currentHashMatches == null) { + false + } else { + currentMatchPosition = 0 + true } } } } } +/** + * :: DeveloperApi :: + * Performs and inner hash join of two child relations by first shuffling the data using the join + * keys. + */ +@DeveloperApi +case class ShuffledHashJoin( + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + buildSide: BuildSide, + left: SparkPlan, + right: SparkPlan) extends BinaryNode with HashJoin { + + override def outputPartitioning: Partitioning = left.outputPartitioning + + override def requiredChildDistribution = + ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil + + + def execute() = { + buildPlan.execute().zipPartitions(streamedPlan.execute()) { + (buildIter, streamIter) => joinIterators(buildIter, streamIter) + } + } +} + + +/** + * :: DeveloperApi :: + * Performs an inner hash join of two child relations. When the operator is constructed, a Spark + * job is asynchronously started to calculate the values for the broadcasted relation. This data + * is then placed in a Spark broadcast variable. The streamed relation is not shuffled. + */ +@DeveloperApi +case class BroadcastHashJoin( + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + buildSide: BuildSide, + left: SparkPlan, + right: SparkPlan)(@transient sc: SparkContext) extends BinaryNode with HashJoin { + + override def otherCopyArgs = sc :: Nil + + override def outputPartitioning: Partitioning = left.outputPartitioning + + override def requiredChildDistribution = + UnspecifiedDistribution :: UnspecifiedDistribution :: Nil + + @transient + lazy val broadcastFuture = future { + sc.broadcast(buildPlan.executeCollect()) + } + + def execute() = { + val broadcastRelation = Await.result(broadcastFuture, 5.minute) + + streamedPlan.execute().mapPartitions { streamedIter => + joinIterators(broadcastRelation.value.iterator, streamedIter) + } + } +} + /** * :: DeveloperApi :: */ From 76ca4341036b95f71763f631049fdae033990ab5 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Sun, 11 May 2014 11:31:20 -0700 Subject: [PATCH 03/15] A simple strategy that broadcasts tables only when they are found in a configuration hint. 
--- .../spark/sql/execution/SparkStrategies.scala | 29 ++++++++++++++++--- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index f763106da4e0e..3799c0af87f08 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -21,20 +21,41 @@ import org.apache.spark.sql.{SQLContext, execution} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning._ import org.apache.spark.sql.catalyst.plans._ -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.{BaseRelation, LogicalPlan} import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.parquet._ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { self: SQLContext#SparkPlanner => + /** + * Uses the HashFilteredJoin pattern to find joins where at least some of the predicates can be + * evaluated by matching hash keys. + */ object HashJoin extends Strategy with PredicateHelper { + var broadcastTables: Seq[String] = + sparkContext.conf.get("spark.sql.hints.broadcastTables", "").split(",") + def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - // Find inner joins where at least some predicates can be evaluated by matching hash keys - // using the HashFilteredJoin pattern. + + case HashFilteredJoin(Inner, leftKeys, rightKeys, condition, + left, PhysicalOperation(_, _, b: BaseRelation)) if broadcastTables.contains(b.tableName)=> + val hashJoin = + execution.BroadcastHashJoin( + leftKeys, rightKeys, BuildRight, planLater(left), planLater(b))(sparkContext) + condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil + + case HashFilteredJoin(Inner, leftKeys, rightKeys, condition, + PhysicalOperation(_, _, b: BaseRelation), right) if broadcastTables.contains(b.tableName)=> + val hashJoin = + execution.BroadcastHashJoin( + leftKeys, rightKeys, BuildLeft, planLater(b), planLater(right))(sparkContext) + condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil + case HashFilteredJoin(Inner, leftKeys, rightKeys, condition, left, right) => val hashJoin = - execution.HashJoin(leftKeys, rightKeys, BuildRight, planLater(left), planLater(right)) + execution.ShuffledHashJoin( + leftKeys, rightKeys, BuildRight, planLater(left), planLater(right)) condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil case _ => Nil } From a92ed0cfbc9fbfada933f702e3879e70cb95283c Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Mon, 12 May 2014 15:09:39 -0700 Subject: [PATCH 04/15] Formatting. 
--- .../spark/sql/execution/SparkStrategies.scala | 28 ++++++++++++++----- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 3799c0af87f08..e666065df789d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -34,22 +34,36 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { */ object HashJoin extends Strategy with PredicateHelper { var broadcastTables: Seq[String] = - sparkContext.conf.get("spark.sql.hints.broadcastTables", "").split(",") + sparkContext.conf.get("spark.sql.hints.broadcastTables", "").split(",").toBuffer def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case HashFilteredJoin(Inner, leftKeys, rightKeys, condition, - left, PhysicalOperation(_, _, b: BaseRelation)) if broadcastTables.contains(b.tableName)=> + case HashFilteredJoin( + Inner, + leftKeys, + rightKeys, + condition, + left, + right @ PhysicalOperation(_, _, b: BaseRelation)) + if broadcastTables.contains(b.tableName)=> + val hashJoin = execution.BroadcastHashJoin( - leftKeys, rightKeys, BuildRight, planLater(left), planLater(b))(sparkContext) + leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))(sparkContext) condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil - case HashFilteredJoin(Inner, leftKeys, rightKeys, condition, - PhysicalOperation(_, _, b: BaseRelation), right) if broadcastTables.contains(b.tableName)=> + case HashFilteredJoin( + Inner, + leftKeys, + rightKeys, + condition, + left @ PhysicalOperation(_, _, b: BaseRelation), + right) + if broadcastTables.contains(b.tableName) => + val hashJoin = execution.BroadcastHashJoin( - leftKeys, rightKeys, BuildLeft, planLater(b), planLater(right))(sparkContext) + leftKeys, rightKeys, BuildLeft, planLater(left), planLater(right))(sparkContext) condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil case HashFilteredJoin(Inner, leftKeys, rightKeys, condition, left, right) => From 3e5d77cf80c5ec6349ccc2d3a12990acf4a692be Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Wed, 18 Jun 2014 15:53:46 -0700 Subject: [PATCH 05/15] WIP: giant and messy WIP. --- .../sql/catalyst/expressions/Projection.scala | 4 +- .../spark/sql/execution/SparkStrategies.scala | 38 +++++++++++++++++++ .../apache/spark/sql/execution/joins.scala | 11 +++--- .../spark/sql/hive/HiveStrategies.scala | 26 +++++++++++++ 4 files changed, 72 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala index c9b7cea6a3e5f..8a309dad9baa6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala @@ -45,8 +45,8 @@ class Projection(expressions: Seq[Expression]) extends (Row => Row) { * that schema. * * In contrast to a normal projection, a MutableProjection reuses the same underlying row object - * each time an input row is added. This significatly reduces the cost of calcuating the - * projection, but means that it is not safe + * each time an input row is added. 
This significantly reduces the cost of calculating the + * projection, but means that it is not safe ...? */ case class MutableProjection(expressions: Seq[Expression]) extends (Row => Row) { def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index e666065df789d..1926176191250 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution +import org.apache.hadoop.fs.FileSystem import org.apache.spark.sql.{SQLContext, execution} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning._ @@ -38,6 +39,26 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { +// case HashFilteredJoin( +// Inner, +// leftKeys, +// rightKeys, +// condition, +// left, +// right @ PhysicalOperation(_, _, b: MetastoreRelation)) +// if tableRawSizeBelowThreshold(left) => +// // TODO: these will be used +//// import org.apache.hadoop.fs.ContentSummary +//// import org.apache.hadoop.fs.FileSystem +//// import org.apache.hadoop.fs.Path +// +// FileSystem.get() +// +// val hashJoin = +// execution.BroadcastHashJoin( +// leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))(sparkContext) +// condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil + case HashFilteredJoin( Inner, leftKeys, @@ -129,8 +150,25 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } } +// // FIXME(zongheng): WIP +// object AutoBroadcastHashJoin extends Strategy { +// def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { +// case logical.Join(left, right, joinType, condition) => +// +// execution.BroadcastHashJoin() +// +// execution.BroadcastNestedLoopJoin( +// planLater(left), planLater(right), joinType, condition)(sparkContext) :: Nil +// case _ => Nil +// } +// } + object BroadcastNestedLoopJoin extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + + // FIXME: WIP -- auto broadcast hash join + case logical.Join + case logical.Join(left, right, joinType, condition) => execution.BroadcastNestedLoopJoin( planLater(left), planLater(right), joinType, condition)(sparkContext) :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala index bfd8ae5672d97..51da3007c68a9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala @@ -109,7 +109,7 @@ trait HashJoin { /** * Searches the streamed iterator for the next row that has at least one match in hashtable. * - * @return true if the search is successful, and false the streamed iterator runs out of + * @return true if the search is successful, and false if the streamed iterator runs out of * tuples. */ private final def fetchNext(): Boolean = { @@ -136,7 +136,7 @@ trait HashJoin { /** * :: DeveloperApi :: - * Performs and inner hash join of two child relations by first shuffling the data using the join + * Performs an inner hash join of two child relations by first shuffling the data using the join * keys. 
*/ @DeveloperApi @@ -163,9 +163,10 @@ case class ShuffledHashJoin( /** * :: DeveloperApi :: - * Performs an inner hash join of two child relations. When the operator is constructed, a Spark - * job is asynchronously started to calculate the values for the broadcasted relation. This data - * is then placed in a Spark broadcast variable. The streamed relation is not shuffled. + * Performs an inner hash join of two child relations. When the output RDD of this operator is + * being constructed, a Spark job is asynchronously started to calculate the values for the + * broadcasted relation. This data is then placed in a Spark broadcast variable. The streamed + * relation is not shuffled. */ @DeveloperApi case class BroadcastHashJoin( diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index b2157074a41bf..7f6d6dc1a80cc 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -17,6 +17,9 @@ package org.apache.spark.sql.hive +import org.apache.hadoop.fs.FileSystem + +import org.apache.spark.sql import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning._ @@ -32,6 +35,29 @@ private[hive] trait HiveStrategies { val hiveContext: HiveContext + // FIXME(zongheng): WIP + object HashJoin extends Strategy { + def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + case HashFilteredJoin( + Inner, + leftKeys, + rightKeys, + condition, + left, + right @ PhysicalOperation(_, _, b: MetastoreRelation)) => + + val path = b.hiveQlTable.getPath + val fs = path.getFileSystem(hiveContext.hiveconf) + val size = fs.getContentSummary(path).getLength // TODO: in bytes? + + + val hashJoin = + sql.execution.BroadcastHashJoin( + leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))(sparkContext) + condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil + } + } + object Scripts extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case logical.ScriptTransformation(input, script, output, child) => From 7c7158bf21ee9500a871e1d8fd770ec77c5177bb Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Fri, 20 Jun 2014 15:31:53 -0700 Subject: [PATCH 06/15] Prototype of auto conversion to broadcast hash join. 
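A sketch of the intended behaviour (the spark.sql.auto.convert.join.size key and its 10000-byte default come from this patch; the table names and setup below are illustrative): a relation that can estimate its physical size and falls under the threshold is broadcast automatically, with no per-table hint required.

    // Assumes an existing SparkContext `sc` and registered tables bigTable / smallParquetTable.
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    // Upper bound (in bytes) for auto-converting the build side of a join into a
    // broadcast relation; setting it to 0 effectively disables the conversion.
    sqlContext.set("spark.sql.auto.convert.join.size", "1048576")

    // If smallParquetTable's files total less than the threshold, the planner should pick
    // BroadcastHashJoin and avoid shuffling the big (streamed) side.
    val joined = sqlContext.sql(
      "SELECT * FROM bigTable b JOIN smallParquetTable s ON b.key = s.key")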
--- .../catalyst/plans/logical/LogicalPlan.scala | 9 ++ .../scala/org/apache/spark/sql/SQLConf.scala | 13 ++ .../org/apache/spark/sql/SQLContext.scala | 2 + .../spark/sql/execution/SparkPlan.scala | 1 - .../spark/sql/execution/SparkStrategies.scala | 116 ++++++++---------- .../spark/sql/parquet/ParquetRelation.scala | 16 ++- .../apache/spark/sql/hive/HiveContext.scala | 2 +- .../spark/sql/hive/HiveMetastoreCatalog.scala | 16 ++- .../spark/sql/hive/HiveStrategies.scala | 26 ---- 9 files changed, 103 insertions(+), 98 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 0933a31c362d8..d70ef6e826cc2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -109,3 +109,12 @@ abstract class UnaryNode extends LogicalPlan with trees.UnaryNode[LogicalPlan] { abstract class BinaryNode extends LogicalPlan with trees.BinaryNode[LogicalPlan] { self: Product => } + +/** + * A trait that can be mixed in by logical operators representing relations that could + * estimate their physical sizes. + * @tparam Ctx input (context) to the size estimator + */ +trait SizeEstimatableRelation[Ctx] { self: LogicalPlan => + def estimatedSize(context: Ctx): Long +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index b378252ba2f55..740023ab44277 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -29,9 +29,22 @@ import scala.collection.JavaConverters._ */ trait SQLConf { + /************************** Spark SQL Params/Hints ********************/ + /** Number of partitions to use for shuffle operators. */ private[spark] def numShufflePartitions: Int = get("spark.sql.shuffle.partitions", "200").toInt + /** + * Upper bound on the sizes (in bytes) of the tables qualified for the auto conversion to + * a broadcast value during the physical executions of join operations. Setting this to 0 + * effectively disables auto conversion. + * Hive setting: hive.auto.convert.join.noconditionaltask.size. 
+ */ + private[spark] def autoConvertJoinSize: Int = + get("spark.sql.auto.convert.join.size", "10000").toInt + + /************************ SQLConf functionality methods *************/ + @transient private val settings = java.util.Collections.synchronizedMap( new java.util.HashMap[String, String]()) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 1617ec717b2e0..65db4f9290f29 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -223,6 +223,8 @@ class SQLContext(@transient val sparkContext: SparkContext) protected[sql] class SparkPlanner extends SparkStrategies { val sparkContext = self.sparkContext + val sqlContext = self + def numPartitions = self.numShufflePartitions val strategies: Seq[Strategy] = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 07967fe75e882..b758a4d13411e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -25,7 +25,6 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions.GenericRow import org.apache.spark.sql.catalyst.plans.{QueryPlan, logical} import org.apache.spark.sql.catalyst.plans.physical._ -import org.apache.spark.sql.columnar.InMemoryColumnarTableScan /** * :: DeveloperApi :: diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 32d8b08452f3c..97cf0f043d9f9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -17,12 +17,11 @@ package org.apache.spark.sql.execution -import org.apache.hadoop.fs.FileSystem import org.apache.spark.sql.{SQLContext, execution} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning._ import org.apache.spark.sql.catalyst.plans._ -import org.apache.spark.sql.catalyst.plans.logical.{BaseRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.plans.logical.{SizeEstimatableRelation, BaseRelation, LogicalPlan} import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.parquet._ import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableScan} @@ -30,6 +29,8 @@ import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableSca private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { self: SQLContext#SparkPlanner => + val sqlContext: SQLContext + object LeftSemiJoin extends Strategy with PredicateHelper { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { // Find left semi joins where at least some predicates can be evaluated by matching hash @@ -51,64 +52,70 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { * evaluated by matching hash keys. 
*/ object HashJoin extends Strategy with PredicateHelper { - var broadcastTables: Seq[String] = - sparkContext.conf.get("spark.sql.hints.broadcastTables", "").split(",").toBuffer - def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + private[this] def broadcastHashJoin( + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + left: LogicalPlan, + right: LogicalPlan, + condition: Option[Expression], + side: BuildSide) = { + val broadcastHashJoin = execution.BroadcastHashJoin( + leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))(sparkContext) + condition.map(Filter(_, broadcastHashJoin)).getOrElse(broadcastHashJoin) :: Nil + } -// case HashFilteredJoin( -// Inner, -// leftKeys, -// rightKeys, -// condition, -// left, -// right @ PhysicalOperation(_, _, b: MetastoreRelation)) -// if tableRawSizeBelowThreshold(left) => -// // TODO: these will be used -//// import org.apache.hadoop.fs.ContentSummary -//// import org.apache.hadoop.fs.FileSystem -//// import org.apache.hadoop.fs.Path -// -// FileSystem.get() -// -// val hashJoin = -// execution.BroadcastHashJoin( -// leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))(sparkContext) -// condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil + def broadcastTables: Seq[String] = + sparkContext.conf.get("spark.sql.hints.broadcastTables", "").split(",").toBuffer + // TODO: how to unit test these conversions? + def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case HashFilteredJoin( - Inner, - leftKeys, - rightKeys, - condition, - left, - right @ PhysicalOperation(_, _, b: BaseRelation)) - if broadcastTables.contains(b.tableName)=> + Inner, + leftKeys, + rightKeys, + condition, + left, + right @ PhysicalOperation(_, _, b: BaseRelation)) + if broadcastTables.contains(b.tableName) => + broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight) - val hashJoin = - execution.BroadcastHashJoin( - leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))(sparkContext) - condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil + case HashFilteredJoin( + Inner, + leftKeys, + rightKeys, + condition, + left @ PhysicalOperation(_, _, b: BaseRelation), + right) + if broadcastTables.contains(b.tableName) => + broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight) case HashFilteredJoin( - Inner, - leftKeys, - rightKeys, - condition, - left @ PhysicalOperation(_, _, b: BaseRelation), - right) - if broadcastTables.contains(b.tableName) => + Inner, + leftKeys, + rightKeys, + condition, + left, + right @ PhysicalOperation(_, _, b: SizeEstimatableRelation[SQLContext])) + if b.estimatedSize(sqlContext) <= sqlContext.autoConvertJoinSize => + broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight) - val hashJoin = - execution.BroadcastHashJoin( - leftKeys, rightKeys, BuildLeft, planLater(left), planLater(right))(sparkContext) - condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil + case HashFilteredJoin( + Inner, + leftKeys, + rightKeys, + condition, + left @ PhysicalOperation(_, _, b: SizeEstimatableRelation[SQLContext]), + right) + if b.estimatedSize(sqlContext) <= sqlContext.autoConvertJoinSize => + broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft) case HashFilteredJoin(Inner, leftKeys, rightKeys, condition, left, right) => val hashJoin = execution.ShuffledHashJoin( leftKeys, rightKeys, BuildRight, planLater(left), planLater(right)) condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil + case _ => Nil 
} } @@ -167,25 +174,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } } -// // FIXME(zongheng): WIP -// object AutoBroadcastHashJoin extends Strategy { -// def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { -// case logical.Join(left, right, joinType, condition) => -// -// execution.BroadcastHashJoin() -// -// execution.BroadcastNestedLoopJoin( -// planLater(left), planLater(right), joinType, condition)(sparkContext) :: Nil -// case _ => Nil -// } -// } - object BroadcastNestedLoopJoin extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - - // FIXME: WIP -- auto broadcast hash join - case logical.Join - case logical.Join(left, right, joinType, condition) => execution.BroadcastNestedLoopJoin( planLater(left), planLater(right), joinType, condition)(sparkContext) :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala index 32813a66de3c3..f6ae1ddd1e647 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala @@ -32,9 +32,10 @@ import parquet.schema.{Type => ParquetType, PrimitiveType => ParquetPrimitiveTyp import parquet.schema.PrimitiveType.{PrimitiveTypeName => ParquetPrimitiveTypeName} import parquet.schema.Type.Repetition +import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, UnresolvedException} import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Row} -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LeafNode} +import org.apache.spark.sql.catalyst.plans.logical.{SizeEstimatableRelation, LogicalPlan, LeafNode} import org.apache.spark.sql.catalyst.types._ // Implicits @@ -52,10 +53,19 @@ import scala.collection.JavaConversions._ * * @param path The path to the Parquet file. */ -private[sql] case class ParquetRelation(val path: String) - extends LeafNode with MultiInstanceRelation { +private[sql] case class ParquetRelation(path: String) + extends LeafNode + with MultiInstanceRelation + with SizeEstimatableRelation[SQLContext] { self: Product => + def estimatedSize(context: SQLContext): Long = { + // TODO: right config? + val hdfsPath = new Path(path) + val fs = hdfsPath.getFileSystem(context.sparkContext.hadoopConfiguration) + fs.getContentSummary(hdfsPath).getLength // TODO: in bytes or system-dependent? 
+ } + /** Schema derived from ParquetFile */ def parquetSchema: MessageType = ParquetTypesConverter diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index cc95b7af0abf6..bf084584d41dd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -257,7 +257,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { struct.zip(fields).map { case (v, t) => s""""${t.name}":${toHiveStructString(v, t.dataType)}""" }.mkString("{", ",", "}") - case (seq: Seq[_], ArrayType(typ))=> + case (seq: Seq[_], ArrayType(typ)) => seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]") case (map: Map[_,_], MapType(kType, vType)) => map.map { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 68284344afd55..af513dff189a6 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -34,9 +34,8 @@ import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.catalyst.types._ -import org.apache.spark.sql.execution.SparkLogicalPlan -import org.apache.spark.sql.hive.execution.{HiveTableScan, InsertIntoHiveTable} -import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableScan} +import org.apache.spark.sql.columnar.InMemoryRelation +import org.apache.spark.sql.hive.execution.HiveTableScan /* Implicit conversions */ import scala.collection.JavaConversions._ @@ -241,16 +240,25 @@ object HiveMetastoreTypes extends RegexParsers { } } + + private[hive] case class MetastoreRelation (databaseName: String, tableName: String, alias: Option[String]) (val table: TTable, val partitions: Seq[TPartition]) - extends BaseRelation { + extends BaseRelation + with SizeEstimatableRelation[HiveContext] { // TODO: Can we use org.apache.hadoop.hive.ql.metadata.Table as the type of table and // use org.apache.hadoop.hive.ql.metadata.Partition as the type of elements of partitions. // Right now, using org.apache.hadoop.hive.ql.metadata.Table and // org.apache.hadoop.hive.ql.metadata.Partition will cause a NotSerializableException // which indicates the SerDe we used is not Serializable. + def estimatedSize(context: HiveContext): Long = { + val path = hiveQlTable.getPath + val fs = path.getFileSystem(context.hiveconf) // TODO: or sc.hadoopConfiguration? + fs.getContentSummary(path).getLength // TODO: in bytes or system-dependent? 
+ } + def hiveQlTable = new Table(table) def hiveQlPartitions = partitions.map { p => diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index ada606c3a94a3..0ac0ee9071f36 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -17,9 +17,6 @@ package org.apache.spark.sql.hive -import org.apache.hadoop.fs.FileSystem - -import org.apache.spark.sql import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning._ @@ -35,29 +32,6 @@ private[hive] trait HiveStrategies { val hiveContext: HiveContext - // FIXME(zongheng): WIP - object HashJoin extends Strategy { - def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case HashFilteredJoin( - Inner, - leftKeys, - rightKeys, - condition, - left, - right @ PhysicalOperation(_, _, b: MetastoreRelation)) => - - val path = b.hiveQlTable.getPath - val fs = path.getFileSystem(hiveContext.hiveconf) - val size = fs.getContentSummary(path).getLength // TODO: in bytes? - - - val hashJoin = - sql.execution.BroadcastHashJoin( - leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))(sparkContext) - condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil - } - } - object Scripts extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case logical.ScriptTransformation(input, script, output, child) => From 0e64b084000f1075da994c0f1cffd254c846bbd7 Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Fri, 20 Jun 2014 15:55:10 -0700 Subject: [PATCH 07/15] Scalastyle fix. --- sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index 740023ab44277..aed34f006fc45 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -29,7 +29,7 @@ import scala.collection.JavaConverters._ */ trait SQLConf { - /************************** Spark SQL Params/Hints ********************/ + /** ************************ Spark SQL Params/Hints ******************* */ /** Number of partitions to use for shuffle operators. */ private[spark] def numShufflePartitions: Int = get("spark.sql.shuffle.partitions", "200").toInt @@ -43,7 +43,7 @@ trait SQLConf { private[spark] def autoConvertJoinSize: Int = get("spark.sql.auto.convert.join.size", "10000").toInt - /************************ SQLConf functionality methods *************/ + /** ********************** SQLConf functionality methods ************ */ @transient private val settings = java.util.Collections.synchronizedMap( From a4267be8c13f3bebcac65830a4e869e61a7bbf52 Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Tue, 24 Jun 2014 11:41:15 -0700 Subject: [PATCH 08/15] Add test for broadcast hash join and related necessary refactorings: - Make SparkLogicalPlan a BaseRelation. Moreover, in SQLContext#registerRDDAsTable, propagate the new table name to any SparkLogicalPlan with an ExistingRdd child. Essentially we are treating such a plan as a relation. - Move all current join related tests into JoinSuite, to prepare for a better test framework for join algorithms. 
--- .../sql/catalyst/expressions/Projection.scala | 6 +- .../catalyst/plans/logical/BaseRelation.scala | 1 - .../scala/org/apache/spark/sql/SQLConf.scala | 4 + .../org/apache/spark/sql/SQLContext.scala | 6 +- .../spark/sql/execution/SparkPlan.scala | 12 +- .../spark/sql/execution/SparkStrategies.scala | 39 +--- .../spark/sql/execution/basicOperators.scala | 2 +- .../apache/spark/sql/execution/joins.scala | 2 - .../org/apache/spark/sql/DslQuerySuite.scala | 99 ----------- .../org/apache/spark/sql/JoinSuite.scala | 166 ++++++++++++++++++ .../org/apache/spark/sql/QueryTest.scala | 4 +- .../spark/sql/execution/PlannerSuite.scala | 17 -- .../spark/sql/hive/HiveMetastoreCatalog.scala | 2 - 13 files changed, 197 insertions(+), 163 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala index 8a309dad9baa6..ac92f65c3598e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala @@ -46,7 +46,9 @@ class Projection(expressions: Seq[Expression]) extends (Row => Row) { * * In contrast to a normal projection, a MutableProjection reuses the same underlying row object * each time an input row is added. This significantly reduces the cost of calculating the - * projection, but means that it is not safe ...? + * projection, but means that it is not safe to hold on to a reference to a [[Row]] after `next()` + * has been called on the [[Iterator]] that produced it. Instead, the user must call `Row.copy()` + * and hold on to the returned [[Row]] before calling `next()`. */ case class MutableProjection(expressions: Seq[Expression]) extends (Row => Row) { def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) = @@ -67,7 +69,7 @@ case class MutableProjection(expressions: Seq[Expression]) extends (Row => Row) } /** - * A mutable wrapper that makes two rows appear appear as a single concatenated row. Designed to + * A mutable wrapper that makes two rows appear as a single concatenated row. Designed to * be instantiated once per thread and reused. */ class JoinedRow extends Row { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/BaseRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/BaseRelation.scala index 7c616788a3830..582334aa42590 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/BaseRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/BaseRelation.scala @@ -21,5 +21,4 @@ abstract class BaseRelation extends LeafNode { self: Product => def tableName: String - def isPartitioned: Boolean = false } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index aed34f006fc45..2fe7f94663996 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -30,6 +30,7 @@ import scala.collection.JavaConverters._ trait SQLConf { /** ************************ Spark SQL Params/Hints ******************* */ + // TODO: refactor so that these hints accessors don't pollute the name space of SQLContext? /** Number of partitions to use for shuffle operators. 
*/ private[spark] def numShufflePartitions: Int = get("spark.sql.shuffle.partitions", "200").toInt @@ -43,6 +44,9 @@ trait SQLConf { private[spark] def autoConvertJoinSize: Int = get("spark.sql.auto.convert.join.size", "10000").toInt + /** A comma-separated list of table names marked to be broadcasted during joins. */ + private[spark] def joinBroadcastTables: String = get("spark.sql.join.broadcastTables", "") + /** ********************** SQLConf functionality methods ************ */ @transient diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index b566413bffcf7..ac0a9dd7ad69c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -170,7 +170,11 @@ class SQLContext(@transient val sparkContext: SparkContext) * @group userf */ def registerRDDAsTable(rdd: SchemaRDD, tableName: String): Unit = { - catalog.registerTable(None, tableName, rdd.logicalPlan) + val name = tableName + val newPlan = rdd.logicalPlan transform { + case s @ SparkLogicalPlan(ExistingRdd(_, _), _) => s.copy(tableName = name) + } + catalog.registerTable(None, tableName, newPlan) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index b758a4d13411e..12cf9c7bdcf3f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.plans.logical.BaseRelation import org.apache.spark.sql.{Logging, Row} import org.apache.spark.sql.catalyst.trees import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation @@ -65,19 +66,20 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging { * linking. 
*/ @DeveloperApi -case class SparkLogicalPlan(alreadyPlanned: SparkPlan) - extends logical.LogicalPlan with MultiInstanceRelation { +case class SparkLogicalPlan(alreadyPlanned: SparkPlan, tableName: String = "SparkLogicalPlan") + extends BaseRelation with MultiInstanceRelation { def output = alreadyPlanned.output - def references = Set.empty - def children = Nil + override def references = Set.empty + override def children = Nil override final def newInstance: this.type = { SparkLogicalPlan( alreadyPlanned match { case ExistingRdd(output, rdd) => ExistingRdd(output.map(_.newInstance), rdd) case _ => sys.error("Multiple instance of the same relation detected.") - }).asInstanceOf[this.type] + }, tableName) + .asInstanceOf[this.type] } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 461ca0cd6e6f9..950463932f296 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -21,10 +21,10 @@ import org.apache.spark.sql.{SQLContext, execution} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning._ import org.apache.spark.sql.catalyst.plans._ -import org.apache.spark.sql.catalyst.plans.logical.{SizeEstimatableRelation, BaseRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.plans.logical.{BaseRelation, LogicalPlan} import org.apache.spark.sql.catalyst.plans.physical._ -import org.apache.spark.sql.parquet._ import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableScan} +import org.apache.spark.sql.parquet._ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { self: SQLContext#SparkPlanner => @@ -52,7 +52,6 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { * evaluated by matching hash keys. */ object HashJoin extends Strategy with PredicateHelper { - private[this] def broadcastHashJoin( leftKeys: Seq[Expression], rightKeys: Seq[Expression], @@ -65,10 +64,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { condition.map(Filter(_, broadcastHashJoin)).getOrElse(broadcastHashJoin) :: Nil } - def broadcastTables: Seq[String] = - sparkContext.conf.get("spark.sql.hints.broadcastTables", "").split(",").toBuffer + def broadcastTables: Seq[String] = sqlContext.joinBroadcastTables.split(",").toBuffer - // TODO: how to unit test these conversions? 
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case HashFilteredJoin( Inner, @@ -78,7 +75,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { left, right @ PhysicalOperation(_, _, b: BaseRelation)) if broadcastTables.contains(b.tableName) => - broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight) + broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight) case HashFilteredJoin( Inner, @@ -88,27 +85,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { left @ PhysicalOperation(_, _, b: BaseRelation), right) if broadcastTables.contains(b.tableName) => - broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight) - - case HashFilteredJoin( - Inner, - leftKeys, - rightKeys, - condition, - left, - right @ PhysicalOperation(_, _, b: SizeEstimatableRelation[SQLContext])) - if b.estimatedSize(sqlContext) <= sqlContext.autoConvertJoinSize => - broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight) - - case HashFilteredJoin( - Inner, - leftKeys, - rightKeys, - condition, - left @ PhysicalOperation(_, _, b: SizeEstimatableRelation[SQLContext]), - right) - if b.estimatedSize(sqlContext) <= sqlContext.autoConvertJoinSize => - broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft) + broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft) case HashFilteredJoin(Inner, leftKeys, rightKeys, condition, left, right) => val hashJoin = @@ -125,10 +102,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case logical.Aggregate(groupingExpressions, aggregateExpressions, child) => // Collect all aggregate expressions. val allAggregates = - aggregateExpressions.flatMap(_ collect { case a: AggregateExpression => a}) + aggregateExpressions.flatMap(_ collect { case a: AggregateExpression => a }) // Collect all aggregate expressions that can be computed partially. val partialAggregates = - aggregateExpressions.flatMap(_ collect { case p: PartialAggregate => p}) + aggregateExpressions.flatMap(_ collect { case p: PartialAggregate => p }) // Only do partial aggregation if supported by all aggregate expressions. 
if (allAggregates.size == partialAggregates.size) { @@ -305,7 +282,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { execution.ExistingRdd(Nil, singleRowRdd) :: Nil case logical.Repartition(expressions, child) => execution.Exchange(HashPartitioning(expressions, numPartitions), planLater(child)) :: Nil - case SparkLogicalPlan(existingPlan) => existingPlan :: Nil + case SparkLogicalPlan(existingPlan, _) => existingPlan :: Nil case _ => Nil } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 8969794c69933..bf092c7373efb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -201,6 +201,6 @@ object ExistingRdd { */ @DeveloperApi case class ExistingRdd(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode { + def tableName: String = s"ExistingRdd(${rdd.name})" override def execute() = rdd } - diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala index 66e862e3053e9..2ff92f3859621 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala @@ -38,7 +38,6 @@ case object BuildLeft extends BuildSide @DeveloperApi case object BuildRight extends BuildSide - trait HashJoin { val leftKeys: Seq[Expression] val rightKeys: Seq[Expression] @@ -334,7 +333,6 @@ case class BroadcastNestedLoopJoin( .map(c => BindReferences.bindReference(c, left.output ++ right.output)) .getOrElse(Literal(true))) - def execute() = { val broadcastedRelation = sc.broadcast(broadcast.execute().map(_.copy()).collect().toIndexedSeq) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala index fb599e1e01e73..e4a64a7a482b8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.test._ /* Implicits */ @@ -149,102 +148,4 @@ class DslQuerySuite extends QueryTest { test("zero count") { assert(emptyTableData.count() === 0) } - - test("inner join where, one match per row") { - checkAnswer( - upperCaseData.join(lowerCaseData, Inner).where('n === 'N), - Seq( - (1, "A", 1, "a"), - (2, "B", 2, "b"), - (3, "C", 3, "c"), - (4, "D", 4, "d") - )) - } - - test("inner join ON, one match per row") { - checkAnswer( - upperCaseData.join(lowerCaseData, Inner, Some('n === 'N)), - Seq( - (1, "A", 1, "a"), - (2, "B", 2, "b"), - (3, "C", 3, "c"), - (4, "D", 4, "d") - )) - } - - test("inner join, where, multiple matches") { - val x = testData2.where('a === 1).as('x) - val y = testData2.where('a === 1).as('y) - checkAnswer( - x.join(y).where("x.a".attr === "y.a".attr), - (1,1,1,1) :: - (1,1,1,2) :: - (1,2,1,1) :: - (1,2,1,2) :: Nil - ) - } - - test("inner join, no matches") { - val x = testData2.where('a === 1).as('x) - val y = testData2.where('a === 2).as('y) - checkAnswer( - x.join(y).where("x.a".attr === "y.a".attr), - Nil) - } - - test("big inner join, 4 matches per row") { - val bigData = 
testData.unionAll(testData).unionAll(testData).unionAll(testData) - val bigDataX = bigData.as('x) - val bigDataY = bigData.as('y) - - checkAnswer( - bigDataX.join(bigDataY).where("x.key".attr === "y.key".attr), - testData.flatMap( - row => Seq.fill(16)((row ++ row).toSeq)).collect().toSeq) - } - - test("cartisian product join") { - checkAnswer( - testData3.join(testData3), - (1, null, 1, null) :: - (1, null, 2, 2) :: - (2, 2, 1, null) :: - (2, 2, 2, 2) :: Nil) - } - - test("left outer join") { - checkAnswer( - upperCaseData.join(lowerCaseData, LeftOuter, Some('n === 'N)), - (1, "A", 1, "a") :: - (2, "B", 2, "b") :: - (3, "C", 3, "c") :: - (4, "D", 4, "d") :: - (5, "E", null, null) :: - (6, "F", null, null) :: Nil) - } - - test("right outer join") { - checkAnswer( - lowerCaseData.join(upperCaseData, RightOuter, Some('n === 'N)), - (1, "a", 1, "A") :: - (2, "b", 2, "B") :: - (3, "c", 3, "C") :: - (4, "d", 4, "D") :: - (null, null, 5, "E") :: - (null, null, 6, "F") :: Nil) - } - - test("full outer join") { - val left = upperCaseData.where('N <= 4).as('left) - val right = upperCaseData.where('N >= 3).as('right) - - checkAnswer( - left.join(right, FullOuter, Some("left.N".attr === "right.N".attr)), - (1, "A", null, null) :: - (2, "B", null, null) :: - (3, "C", 3, "C") :: - (4, "D", 4, "D") :: - (null, null, 5, "E") :: - (null, null, 6, "F") :: Nil) - } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala new file mode 100644 index 0000000000000..b150bd660876b --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.sql.TestData._ +import org.apache.spark.sql.catalyst.plans.{LeftOuter, RightOuter, FullOuter, Inner} +import org.apache.spark.sql.execution._ +import org.apache.spark.sql.test.TestSQLContext +import org.apache.spark.sql.test.TestSQLContext._ + +class JoinSuite extends QueryTest { + + // Ensures tables are loaded. + TestData + + test("equi-join is hash-join") { + val x = testData2.as('x) + val y = testData2.as('y) + val join = x.join(y, Inner, Some("x.a".attr === "y.a".attr)).queryExecution.analyzed + val planned = planner.HashJoin(join) + assert(planned.size === 1) + } + + test("plans broadcast hash join, given hints") { + TestSQLContext.set("spark.sql.join.broadcastTables", "testData2") + val rdd = sql("""SELECT * FROM testData t JOIN testData2 t2 ON t.key = t2.a""") + // Using `sparkPlan` because for relevant patterns in HashJoin to be + // matched, other strategies need to be applied. 
+ val physical = rdd.queryExecution.sparkPlan + val bhj = physical.collect { case j: BroadcastHashJoin if j.buildSide == BuildRight => j } + + assert(bhj.size === 1) + checkAnswer( + rdd, + Seq( + (1, "1", 1, 1), + (1, "1", 1, 2), + (2, "2", 2, 1), + (2, "2", 2, 2), + (3, "3", 3, 1), + (3, "3", 3, 2) + )) + } + + test("multiple-key equi-join is hash-join") { + val x = testData2.as('x) + val y = testData2.as('y) + val join = x.join(y, Inner, + Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr)).queryExecution.analyzed + val planned = planner.HashJoin(join) + assert(planned.size === 1) + } + + test("inner join where, one match per row") { + checkAnswer( + upperCaseData.join(lowerCaseData, Inner).where('n === 'N), + Seq( + (1, "A", 1, "a"), + (2, "B", 2, "b"), + (3, "C", 3, "c"), + (4, "D", 4, "d") + )) + } + + test("inner join ON, one match per row") { + checkAnswer( + upperCaseData.join(lowerCaseData, Inner, Some('n === 'N)), + Seq( + (1, "A", 1, "a"), + (2, "B", 2, "b"), + (3, "C", 3, "c"), + (4, "D", 4, "d") + )) + } + + test("inner join, where, multiple matches") { + val x = testData2.where('a === 1).as('x) + val y = testData2.where('a === 1).as('y) + checkAnswer( + x.join(y).where("x.a".attr === "y.a".attr), + (1,1,1,1) :: + (1,1,1,2) :: + (1,2,1,1) :: + (1,2,1,2) :: Nil + ) + } + + test("inner join, no matches") { + val x = testData2.where('a === 1).as('x) + val y = testData2.where('a === 2).as('y) + checkAnswer( + x.join(y).where("x.a".attr === "y.a".attr), + Nil) + } + + test("big inner join, 4 matches per row") { + val bigData = testData.unionAll(testData).unionAll(testData).unionAll(testData) + val bigDataX = bigData.as('x) + val bigDataY = bigData.as('y) + + checkAnswer( + bigDataX.join(bigDataY).where("x.key".attr === "y.key".attr), + testData.flatMap( + row => Seq.fill(16)((row ++ row).toSeq)).collect().toSeq) + } + + test("cartisian product join") { + checkAnswer( + testData3.join(testData3), + (1, null, 1, null) :: + (1, null, 2, 2) :: + (2, 2, 1, null) :: + (2, 2, 2, 2) :: Nil) + } + + test("left outer join") { + checkAnswer( + upperCaseData.join(lowerCaseData, LeftOuter, Some('n === 'N)), + (1, "A", 1, "a") :: + (2, "B", 2, "b") :: + (3, "C", 3, "c") :: + (4, "D", 4, "d") :: + (5, "E", null, null) :: + (6, "F", null, null) :: Nil) + } + + test("right outer join") { + checkAnswer( + lowerCaseData.join(upperCaseData, RightOuter, Some('n === 'N)), + (1, "a", 1, "A") :: + (2, "b", 2, "B") :: + (3, "c", 3, "C") :: + (4, "d", 4, "D") :: + (null, null, 5, "E") :: + (null, null, 6, "F") :: Nil) + } + + test("full outer join") { + val left = upperCaseData.where('N <= 4).as('left) + val right = upperCaseData.where('N >= 3).as('right) + + checkAnswer( + left.join(right, FullOuter, Some("left.N".attr === "right.N".attr)), + (1, "A", null, null) :: + (2, "B", null, null) :: + (3, "C", 3, "C") :: + (4, "D", 4, "D") :: + (null, null, 5, "E") :: + (null, null, 6, "F") :: Nil) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index ef84ead2e6e8b..8e1e1971d968b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -35,7 +35,7 @@ class QueryTest extends PlanTest { case singleItem => Seq(Seq(singleItem)) } - val isSorted = rdd.logicalPlan.collect { case s: logical.Sort => s}.nonEmpty + val isSorted = rdd.logicalPlan.collect { case s: logical.Sort => s }.nonEmpty def prepareAnswer(answer: Seq[Any]) = if 
(!isSorted) answer.sortBy(_.toString) else answer val sparkAnswer = try rdd.collect().toSeq catch { case e: Exception => @@ -48,7 +48,7 @@ class QueryTest extends PlanTest { """.stripMargin) } - if(prepareAnswer(convertedAnswer) != prepareAnswer(sparkAnswer)) { + if (prepareAnswer(convertedAnswer) != prepareAnswer(sparkAnswer)) { fail(s""" |Results do not match for query: |${rdd.logicalPlan} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index df6b118360d01..215618e852eb2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -57,21 +57,4 @@ class PlannerSuite extends FunSuite { val planned = PartialAggregation(query) assert(planned.isEmpty) } - - test("equi-join is hash-join") { - val x = testData2.as('x) - val y = testData2.as('y) - val join = x.join(y, Inner, Some("x.a".attr === "y.a".attr)).queryExecution.analyzed - val planned = planner.HashJoin(join) - assert(planned.size === 1) - } - - test("multiple-key equi-join is hash-join") { - val x = testData2.as('x) - val y = testData2.as('y) - val join = x.join(y, Inner, - Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr)).queryExecution.analyzed - val planned = planner.HashJoin(join) - assert(planned.size === 1) - } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 9c483752f9dc0..02d6d6c0aa272 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -267,8 +267,6 @@ private[hive] case class MetastoreRelation new Partition(hiveQlTable, p) } - override def isPartitioned = hiveQlTable.isPartitioned - val tableDesc = new TableDesc( Class.forName(hiveQlTable.getSerializationLib).asInstanceOf[Class[Deserializer]], hiveQlTable.getInputFormatClass, From 6fd84435e59134248925fcf48e4d7bb8143da5bf Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Tue, 24 Jun 2014 13:14:21 -0700 Subject: [PATCH 09/15] Cut down size estimation related stuff. --- .../spark/sql/catalyst/plans/logical/LogicalPlan.scala | 9 --------- .../org/apache/spark/sql/parquet/ParquetRelation.scala | 8 +++----- .../org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 3 +-- 3 files changed, 4 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 98018799c72d6..edc37e3877c0e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -109,12 +109,3 @@ abstract class UnaryNode extends LogicalPlan with trees.UnaryNode[LogicalPlan] { abstract class BinaryNode extends LogicalPlan with trees.BinaryNode[LogicalPlan] { self: Product => } - -/** - * A trait that can be mixed in by logical operators representing relations that could - * estimate their physical sizes. 
- * @tparam Ctx input (context) to the size estimator - */ -trait SizeEstimatableRelation[Ctx] { self: LogicalPlan => - def estimatedSize(context: Ctx): Long -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala index cc1371ec5d60c..25f973f59cc8b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala @@ -29,9 +29,8 @@ import parquet.schema.MessageType import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, UnresolvedException} -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Row} -import org.apache.spark.sql.catalyst.plans.logical.{SizeEstimatableRelation, LogicalPlan, LeafNode} -import org.apache.spark.sql.catalyst.types._ +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LeafNode} // Implicits import scala.collection.JavaConversions._ @@ -52,8 +51,7 @@ private[sql] case class ParquetRelation( path: String, @transient conf: Option[Configuration] = None) extends LeafNode - with MultiInstanceRelation - with SizeEstimatableRelation[SQLContext] { + with MultiInstanceRelation { self: Product => diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 02d6d6c0aa272..d66727be62f69 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -247,8 +247,7 @@ object HiveMetastoreTypes extends RegexParsers { private[hive] case class MetastoreRelation (databaseName: String, tableName: String, alias: Option[String]) (val table: TTable, val partitions: Seq[TPartition]) - extends BaseRelation - with SizeEstimatableRelation[HiveContext] { + extends BaseRelation { // TODO: Can we use org.apache.hadoop.hive.ql.metadata.Table as the type of table and // use org.apache.hadoop.hive.ql.metadata.Partition as the type of elements of partitions. // Right now, using org.apache.hadoop.hive.ql.metadata.Table and From a8a093e02a98800cc66a1a51f5e1f3f8fb4d0518 Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Tue, 24 Jun 2014 13:20:38 -0700 Subject: [PATCH 10/15] Minor cleanups. 
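The previous patch removed the SizeEstimatableRelation trait and its mix-ins; the cleanups here drop the remaining estimatedSize() bodies. For reference, the removed hook amounted to the sketch below. The trait is copied from the deleted lines; SizeOnDisk is a condensed, hypothetical stand-in for the two deleted implementations, both of which asked the Hadoop FileSystem for the total length of the files under the relation's path.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    // A logical relation mixed this in to report an estimated physical size;
    // Ctx was the query context (SQLContext for Parquet, HiveContext for Hive).
    trait SizeEstimatableRelation[Ctx] { self: LogicalPlan =>
      def estimatedSize(context: Ctx): Long
    }

    // Condensed form of the deleted implementations: the estimate was simply the
    // total byte length of the files under the path, per FileSystem.getContentSummary.
    object SizeOnDisk {
      def apply(path: String, conf: Configuration): Long = {
        val hdfsPath = new Path(path)
        val fs = hdfsPath.getFileSystem(conf)
        fs.getContentSummary(hdfsPath).getLength
      }
    }
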
--- .../org/apache/spark/sql/execution/SparkPlan.scala | 4 ++-- .../apache/spark/sql/execution/basicOperators.scala | 1 - .../scala/org/apache/spark/sql/execution/joins.scala | 5 ++--- .../apache/spark/sql/parquet/ParquetRelation.scala | 11 +---------- .../apache/spark/sql/hive/HiveMetastoreCatalog.scala | 8 -------- 5 files changed, 5 insertions(+), 24 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 12cf9c7bdcf3f..483359d1b1273 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -19,12 +19,12 @@ package org.apache.spark.sql.execution import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.plans.logical.BaseRelation import org.apache.spark.sql.{Logging, Row} import org.apache.spark.sql.catalyst.trees import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions.GenericRow -import org.apache.spark.sql.catalyst.plans.{QueryPlan, logical} +import org.apache.spark.sql.catalyst.plans.logical.BaseRelation +import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical._ /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index bf092c7373efb..2ceb42108d4a8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -201,6 +201,5 @@ object ExistingRdd { */ @DeveloperApi case class ExistingRdd(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode { - def tableName: String = s"ExistingRdd(${rdd.name})" override def execute() = rdd } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala index 2ff92f3859621..eaaa6c3f7a0a2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala @@ -18,16 +18,15 @@ package org.apache.spark.sql.execution import scala.collection.mutable.{ArrayBuffer, BitSet} +import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent._ import scala.concurrent.duration._ -import scala.concurrent.ExecutionContext.Implicits.global import org.apache.spark.SparkContext - import org.apache.spark.annotation.DeveloperApi import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans._ -import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Partitioning, UnspecifiedDistribution} +import org.apache.spark.sql.catalyst.plans.physical._ @DeveloperApi sealed abstract class BuildSide diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala index 25f973f59cc8b..716facabc3d33 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala @@ -49,19 +49,10 @@ import scala.collection.JavaConversions._ */ private[sql] case class ParquetRelation( path: String, - @transient conf: Option[Configuration] = None) - 
extends LeafNode - with MultiInstanceRelation { + @transient conf: Option[Configuration] = None) extends LeafNode with MultiInstanceRelation { self: Product => - def estimatedSize(context: SQLContext): Long = { - // TODO: right config? - val hdfsPath = new Path(path) - val fs = hdfsPath.getFileSystem(context.sparkContext.hadoopConfiguration) - fs.getContentSummary(hdfsPath).getLength // TODO: in bytes or system-dependent? - } - /** Schema derived from ParquetFile */ def parquetSchema: MessageType = ParquetTypesConverter diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index d66727be62f69..1145fdb2576aa 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -242,8 +242,6 @@ object HiveMetastoreTypes extends RegexParsers { } } - - private[hive] case class MetastoreRelation (databaseName: String, tableName: String, alias: Option[String]) (val table: TTable, val partitions: Seq[TPartition]) @@ -254,12 +252,6 @@ private[hive] case class MetastoreRelation // org.apache.hadoop.hive.ql.metadata.Partition will cause a NotSerializableException // which indicates the SerDe we used is not Serializable. - def estimatedSize(context: HiveContext): Long = { - val path = hiveQlTable.getPath - val fs = path.getFileSystem(context.hiveconf) // TODO: or sc.hadoopConfiguration? - fs.getContentSummary(path).getLength // TODO: in bytes or system-dependent? - } - def hiveQlTable = new Table(table) def hiveQlPartitions = partitions.map { p => From ad6c7cccc9eb738313403385278b7c96289703c7 Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Tue, 24 Jun 2014 13:50:49 -0700 Subject: [PATCH 11/15] Minor cleanups. 
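One of the cleanups below drops the abstract `val sqlContext: SQLContext` from SparkStrategies. The class already carries the self-type `self: SQLContext#SparkPlanner =>`, so any concrete planner mixing these strategies in is known to be a SparkPlanner, and a `sqlContext` member supplied by the planner itself (not shown in this excerpt) stays accessible without the redundant abstract declaration. A minimal, standalone illustration of the self-type pattern, with made-up names rather than the actual Spark classes:

    // An outer "context" whose inner Planner class supplies the context reference.
    class Context {
      val setting = "broadcast"

      class Planner extends Strategies {
        val context: Context = Context.this
      }
    }

    // The self-type promises that any concrete subclass is a Context#Planner,
    // so `context` is in scope here without redeclaring an abstract val.
    abstract class Strategies { self: Context#Planner =>
      def describe: String = "join hint: " + context.setting
    }
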
--- .../main/scala/org/apache/spark/sql/execution/SparkPlan.scala | 2 +- .../scala/org/apache/spark/sql/execution/SparkStrategies.scala | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 483359d1b1273..27dc091b85812 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -23,8 +23,8 @@ import org.apache.spark.sql.{Logging, Row} import org.apache.spark.sql.catalyst.trees import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions.GenericRow -import org.apache.spark.sql.catalyst.plans.logical.BaseRelation import org.apache.spark.sql.catalyst.plans.QueryPlan +import org.apache.spark.sql.catalyst.plans.logical.BaseRelation import org.apache.spark.sql.catalyst.plans.physical._ /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 2fbd9494cdcb9..4b40a11708fc9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -29,8 +29,6 @@ import org.apache.spark.sql.parquet._ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { self: SQLContext#SparkPlanner => - val sqlContext: SQLContext - object LeftSemiJoin extends Strategy with PredicateHelper { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { // Find left semi joins where at least some predicates can be evaluated by matching hash From 208d5f643c02ec3ecf1bbfe58999acff8ccb4dd1 Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Tue, 24 Jun 2014 14:37:29 -0700 Subject: [PATCH 12/15] Make LeftSemiJoinHash mix in HashJoin. --- .../org/apache/spark/sql/execution/joins.scala | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala index ea35859f4d645..ef068245d0b74 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala @@ -44,12 +44,12 @@ trait HashJoin { val left: SparkPlan val right: SparkPlan - val (buildPlan, streamedPlan) = buildSide match { + lazy val (buildPlan, streamedPlan) = buildSide match { case BuildLeft => (left, right) case BuildRight => (right, left) } - val (buildKeys, streamedKeys) = buildSide match { + lazy val (buildKeys, streamedKeys) = buildSide match { case BuildLeft => (leftKeys, rightKeys) case BuildRight => (rightKeys, leftKeys) } @@ -60,7 +60,6 @@ trait HashJoin { @transient lazy val streamSideKeyGenerator = () => new MutableProjection(streamedKeys, streamedPlan.output) - def joinIterators(buildIter: Iterator[Row], streamIter: Iterator[Row]): Iterator[Row] = { // TODO: Use Spark's HashMap implementation. 
@@ -167,19 +166,13 @@ case class LeftSemiJoinHash( leftKeys: Seq[Expression], rightKeys: Seq[Expression], left: SparkPlan, - right: SparkPlan) extends BinaryNode { - - val (buildPlan, streamedPlan) = (right, left) - val (buildKeys, streamedKeys) = (rightKeys, leftKeys) + right: SparkPlan) extends BinaryNode with HashJoin { - def output = left.output + val buildSide = BuildRight - @transient lazy val buildSideKeyGenerator = new Projection(buildKeys, buildPlan.output) - @transient lazy val streamSideKeyGenerator = - () => new MutableProjection(streamedKeys, streamedPlan.output) + override def output = left.output def execute() = { - buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) => val hashSet = new java.util.HashSet[Row]() var currentRow: Row = null From 440d277bcb82311d16a39a8132308dfdd2b50bbe Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Tue, 24 Jun 2014 15:17:01 -0700 Subject: [PATCH 13/15] Fixes to imports; add back requiredChildDistribution (lost when merging) --- .../src/main/scala/org/apache/spark/sql/execution/joins.scala | 3 +++ .../scala/org/apache/spark/sql/parquet/ParquetRelation.scala | 4 ---- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala index ef068245d0b74..f776572515c9b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala @@ -170,6 +170,9 @@ case class LeftSemiJoinHash( val buildSide = BuildRight + override def requiredChildDistribution = + ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil + override def output = left.output def execute() = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala index 716facabc3d33..9c4771d1a9846 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala @@ -27,14 +27,10 @@ import parquet.hadoop.ParquetOutputFormat import parquet.hadoop.metadata.CompressionCodecName import parquet.schema.MessageType -import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, UnresolvedException} import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LeafNode} -// Implicits -import scala.collection.JavaConversions._ - /** * Relation that consists of data stored in a Parquet columnar format. * From af080d7c7d446b31b19b0ff1c2947ffdce13ac86 Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Wed, 25 Jun 2014 11:17:48 -0700 Subject: [PATCH 14/15] Fix in joinIterators()'s next(). 
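The fix below concerns column order in the joined output. JoinedRow concatenates its two arguments left-to-right, and next() always passed the streamed row first; that is only correct when the build side is the right child. With buildSide == BuildLeft the hashed rows come from the left child, so they must appear first. A toy, self-contained sketch of the corrected ordering, using plain Seq values instead of the Catalyst Row/JoinedRow types:

    object JoinOrderSketch {
      sealed trait BuildSide
      case object BuildLeft extends BuildSide
      case object BuildRight extends BuildSide

      // Output columns must always be (left child columns) ++ (right child columns),
      // so the build-side row goes on whichever side it was built from.
      def joinedValues(streamedRow: Seq[Any], buildRow: Seq[Any], side: BuildSide): Seq[Any] =
        side match {
          case BuildRight => streamedRow ++ buildRow // stream = left child, build = right child
          case BuildLeft  => buildRow ++ streamedRow // build = left child, stream = right child
        }
    }
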
--- .../main/scala/org/apache/spark/sql/execution/joins.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala index f776572515c9b..32c5f26fe8aa0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala @@ -98,7 +98,10 @@ trait HashJoin { (streamIter.hasNext && fetchNext()) override final def next() = { - val ret = joinRow(currentStreamedRow, currentHashMatches(currentMatchPosition)) + val ret = buildSide match { + case BuildRight => joinRow(currentStreamedRow, currentHashMatches(currentMatchPosition)) + case BuildLeft => joinRow(currentHashMatches(currentMatchPosition), currentStreamedRow) + } currentMatchPosition += 1 ret } From d0f4991412ecb030d9ccb01044b0be2259a8d2e5 Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Wed, 25 Jun 2014 15:13:53 -0700 Subject: [PATCH 15/15] Fix bug in broadcast hash join & add test to cover it. --- .../spark/sql/execution/SparkStrategies.scala | 6 +-- .../org/apache/spark/sql/JoinSuite.scala | 43 +++++++++++-------- 2 files changed, 28 insertions(+), 21 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 4b40a11708fc9..3cd29967d1cd5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -58,7 +58,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { condition: Option[Expression], side: BuildSide) = { val broadcastHashJoin = execution.BroadcastHashJoin( - leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))(sqlContext) + leftKeys, rightKeys, side, planLater(left), planLater(right))(sqlContext) condition.map(Filter(_, broadcastHashJoin)).getOrElse(broadcastHashJoin) :: Nil } @@ -73,7 +73,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { left, right @ PhysicalOperation(_, _, b: BaseRelation)) if broadcastTables.contains(b.tableName) => - broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight) + broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight) case HashFilteredJoin( Inner, @@ -83,7 +83,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { left @ PhysicalOperation(_, _, b: BaseRelation), right) if broadcastTables.contains(b.tableName) => - broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft) + broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft) case HashFilteredJoin(Inner, leftKeys, rightKeys, condition, left, right) => val hashJoin = diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index b150bd660876b..3d7d5eedbe8ed 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -37,24 +37,31 @@ class JoinSuite extends QueryTest { } test("plans broadcast hash join, given hints") { - TestSQLContext.set("spark.sql.join.broadcastTables", "testData2") - val rdd = sql("""SELECT * FROM testData t JOIN testData2 t2 ON t.key = t2.a""") - // Using `sparkPlan` because for relevant patterns in HashJoin to be - // 
matched, other strategies need to be applied. - val physical = rdd.queryExecution.sparkPlan - val bhj = physical.collect { case j: BroadcastHashJoin if j.buildSide == BuildRight => j } - - assert(bhj.size === 1) - checkAnswer( - rdd, - Seq( - (1, "1", 1, 1), - (1, "1", 1, 2), - (2, "2", 2, 1), - (2, "2", 2, 2), - (3, "3", 3, 1), - (3, "3", 3, 2) - )) + + def mkTest(buildSide: BuildSide, leftTable: String, rightTable: String) = { + TestSQLContext.set("spark.sql.join.broadcastTables", + s"${if (buildSide == BuildRight) rightTable else leftTable}") + val rdd = sql(s"""SELECT * FROM $leftTable JOIN $rightTable ON key = a""") + // Using `sparkPlan` because for relevant patterns in HashJoin to be + // matched, other strategies need to be applied. + val physical = rdd.queryExecution.sparkPlan + val bhj = physical.collect { case j: BroadcastHashJoin if j.buildSide == buildSide => j } + + assert(bhj.size === 1, "planner does not pick up hint to generate broadcast hash join") + checkAnswer( + rdd, + Seq( + (1, "1", 1, 1), + (1, "1", 1, 2), + (2, "2", 2, 1), + (2, "2", 2, 2), + (3, "3", 3, 1), + (3, "3", 3, 2) + )) + } + + mkTest(BuildRight, "testData", "testData2") + mkTest(BuildLeft, "testData", "testData2") } test("multiple-key equi-join is hash-join") {