Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
protected val UNCACHE = Keyword("UNCACHE")
protected val UNION = Keyword("UNION")
protected val WHERE = Keyword("WHERE")
protected val INTERSECT = Keyword("INTERSECT")
protected val EXCEPT = Keyword("EXCEPT")


Expand All @@ -140,6 +141,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
protected lazy val query: Parser[LogicalPlan] = (
select * (
UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } |
INTERSECT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Intersect(q1, q2) } |
EXCEPT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)} |
UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,3 +210,8 @@ case class Distinct(child: LogicalPlan) extends UnaryNode {
case object NoRelation extends LeafNode {
override def output = Nil
}

case class Intersect(left: LogicalPlan, right: LogicalPlan) extends BinaryNode {
override def output = left.output
override def references = Set.empty
}
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
execution.Union(unionChildren.map(planLater))(sqlContext) :: Nil
case logical.Except(left,right) =>
execution.Except(planLater(left),planLater(right)) :: Nil
case logical.Intersect(left, right) =>
execution.Intersect(planLater(left), planLater(right)) :: Nil
case logical.Generate(generator, join, outer, _, child) =>
execution.Generate(generator, join = join, outer = outer, planLater(child)) :: Nil
case logical.NoRelation =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,3 +220,16 @@ case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode {
}
}

/**
* :: DeveloperApi ::
* Returns the rows in left that also appear in right using the built in spark
* intersection function.
*/
@DeveloperApi
case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode {
override def output = children.head.output

override def execute() = {
left.execute().map(_.copy()).intersection(right.execute().map(_.copy()))
}
}
11 changes: 11 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,17 @@ class SQLQuerySuite extends QueryTest {
sql("SELECT * FROM upperCaseData EXCEPT SELECT * FROM upperCaseData "), Nil)
}

test("INTERSECT") {
checkAnswer(
sql("SELECT * FROM lowerCaseData INTERSECT SELECT * FROM lowerCaseData"),
(1, "a") ::
(2, "b") ::
(3, "c") ::
(4, "d") :: Nil)
checkAnswer(
sql("SELECT * FROM lowerCaseData INTERSECT SELECT * FROM upperCaseData"), Nil)
}

test("SET commands semantics using sql()") {
TestSQLContext.settings.synchronized {
clear()
Expand Down