From e5bff618f09b0b33968e4c12b360e3d30f2878f9 Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Fri, 20 Jun 2014 15:20:12 +0800 Subject: [PATCH 01/16] Spark SQL basicOperator add Intersect operator Hi all, I want to submit a basic operator, Intersect. For example, in SQL: select * from table1 intersect select * from table2. So, I want to use this operator to support this function in Spark SQL. This operator will return the intersection of the SparkPlan children's table RDDs. --- .../apache/spark/sql/execution/basicOperators.scala | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 8969794c69933..8df1729703a0f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -204,3 +204,16 @@ case class ExistingRdd(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode { override def execute() = rdd } + +/** + * :: DeveloperApi :: + */ +@DeveloperApi +case class Intersect(children: Seq[SparkPlan])(@transient sc: SparkContext) extends SparkPlan { + // TODO: + override def output = children.head.output + + override def execute() = { + children.map(child => child.execute()).foldLeft(children(0).execute())((a, b) => a.intersection(b)) + + } From 469f099c510b20d0871c2a22927e65b48c968964 Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Mon, 23 Jun 2014 16:04:33 +0800 Subject: [PATCH 02/16] Update basicOperators.scala --- .../org/apache/spark/sql/execution/basicOperators.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 8df1729703a0f..d999a14ffc118 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -209,11 +209,11 @@ case class ExistingRdd(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode { * :: DeveloperApi :: */ @DeveloperApi -case class Intersect(children: Seq[SparkPlan])(@transient sc: SparkContext) extends SparkPlan { +case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode { // TODO: override def output = children.head.output override def execute() = { - children.map(child => child.execute()).foldLeft(children(0).execute())((a, b) => a.intersection(b)) - + left.execute().intersection(right.execute()) } +} From 61e88e7db2c118023fe501a7381c51c3da7f3940 Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Mon, 23 Jun 2014 16:08:03 +0800 Subject: [PATCH 03/16] Update SqlParser.scala --- .../main/scala/org/apache/spark/sql/catalyst/SqlParser.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index 2ad2d04af5704..f18419216b238 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -119,6 +119,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers { protected val UNCACHE = Keyword("UNCACHE") protected val UNION = Keyword("UNION") protected val WHERE = Keyword("WHERE") + protected val INTERSECT = Keyword("INTERSECT") // Use reflection to find the reserved words defined in this class. 
protected val reservedWords = @@ -139,7 +140,8 @@ class SqlParser extends StandardTokenParsers with PackratParsers { protected lazy val query: Parser[LogicalPlan] = ( select * ( UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } | - UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) } + UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) } | + INTERSECT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Intersect(q1, q2)} ) | insert | cache ) From d4ac5e559485e6f948100a7e6875831b7a7b46a4 Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Mon, 23 Jun 2014 16:10:11 +0800 Subject: [PATCH 04/16] Update HiveQl.scala --- sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index df761b073a75a..f1d0203655c00 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -614,6 +614,8 @@ private[hive] object HiveQl { queries.reduceLeft(Union) case Token("TOK_UNION", left :: right :: Nil) => Union(nodeToPlan(left), nodeToPlan(right)) + + case Token("TOK_INTERSECT", left :: right :: Nil) => Intersect(nodeToPlan(left), nodeToPlan(right)) case a: ASTNode => throw new NotImplementedError(s"No parse rules for:\n ${dumpTree(a).toString} ") From ac73e60ef80ca78b2bc63d0ecc45f4b2a963d13c Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Mon, 23 Jun 2014 16:11:45 +0800 Subject: [PATCH 05/16] Update basicOperators.scala --- .../spark/sql/catalyst/plans/logical/basicOperators.scala | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index 
3e0639867b278..aa55a5b331ce8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -189,3 +189,10 @@ case class Distinct(child: LogicalPlan) extends UnaryNode { case object NoRelation extends LeafNode { override def output = Nil } + +case class Intersect(left: LogicalPlan, right: LogicalPlan) extends BinaryNode { + // TODO: These aren't really the same attributes as nullability etc might change. + def output = left.output + + def references = Set.empty +} From 790765d915e7325a7dfdb46780c37a5e7b0bdf31 Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Mon, 23 Jun 2014 16:14:05 +0800 Subject: [PATCH 06/16] Update SparkStrategies.scala --- .../scala/org/apache/spark/sql/execution/SparkStrategies.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 4694f25d6d630..faaed0edba55f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -236,6 +236,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { execution.Limit(limit, planLater(child))(sparkContext) :: Nil case Unions(unionChildren) => execution.Union(unionChildren.map(planLater))(sparkContext) :: Nil + case logical.Intersect(left,right) => //yanjiegaonew + execution.Intersect(planLater(left),planLater(right)) :: Nil //yanjiegaonew case logical.Generate(generator, join, outer, _, child) => execution.Generate(generator, join = join, outer = outer, planLater(child)) :: Nil case logical.NoRelation => From 4dd453e2bf0d85b6cbfdfe703be403b83858818c Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Mon, 23 Jun 2014 16:17:20 +0800 Subject: [PATCH 07/16] Update 
SQLQuerySuite.scala --- .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index e9360b0fc7910..5a4df2651e6aa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -369,6 +369,17 @@ class SQLQuerySuite extends QueryTest { (3, null))) } + test("INTERSECT") { + + checkAnswer( + sql("SELECT * FROM lowerCaseData INTERSECT SELECT * FROM lowerCaseData "), + (1, "a") :: + (2, "b") :: + (3, "c") :: + (4, "d") :: Nil) + checkAnswer( + sql("SELECT * FROM lowerCaseData INTERSECT SELECT * FROM upperCaseData "), Nil) + } test("SET commands semantics using sql()") { clear() val testKey = "test.key.0" @@ -406,3 +417,4 @@ class SQLQuerySuite extends QueryTest { } } + From e2b64be1a643d43748c90cfb341177f1157db15d Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Tue, 24 Jun 2014 11:14:45 +0800 Subject: [PATCH 08/16] Update basicOperators.scala --- .../scala/org/apache/spark/sql/execution/basicOperators.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index d999a14ffc118..f2be049e9d4ee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -214,6 +214,6 @@ case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode { override def output = children.head.output override def execute() = { - left.execute().intersection(right.execute()) + left.execute().map(_.copy()).intersection(right.execute().map(_.copy())) } } From f1288b46bb031fd34ed8d0217bcb4144d720d880 Mon Sep 17 00:00:00 2001 From: Yanjie 
Gao Date: Fri, 27 Jun 2014 16:54:03 +0800 Subject: [PATCH 09/16] delete annotation --- .../apache/spark/sql/catalyst/plans/logical/basicOperators.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index aa55a5b331ce8..951db03232aeb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -191,7 +191,6 @@ case object NoRelation extends LeafNode { } case class Intersect(left: LogicalPlan, right: LogicalPlan) extends BinaryNode { - // TODO: These aren't really the same attributes as nullability etc might change. def output = left.output def references = Set.empty From 0b4983723d39488b8a2ce7f3e13f5bdb1d25ac83 Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Fri, 27 Jun 2014 16:56:02 +0800 Subject: [PATCH 10/16] delete the annotation --- .../org/apache/spark/sql/execution/SparkStrategies.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index faaed0edba55f..987c96013da0b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -236,8 +236,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { execution.Limit(limit, planLater(child))(sparkContext) :: Nil case Unions(unionChildren) => execution.Union(unionChildren.map(planLater))(sparkContext) :: Nil - case logical.Intersect(left,right) => //yanjiegaonew - execution.Intersect(planLater(left),planLater(right)) :: Nil //yanjiegaonew + case 
logical.Intersect(left,right) => + execution.Intersect(planLater(left),planLater(right)) :: Nil case logical.Generate(generator, join, outer, _, child) => execution.Generate(generator, join = join, outer = outer, planLater(child)) :: Nil case logical.NoRelation => From bdc4a05f46f8dfdee7442be0230901cb7d1ef864 Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Fri, 27 Jun 2014 16:56:28 +0800 Subject: [PATCH 11/16] Update basicOperators.scala --- .../scala/org/apache/spark/sql/execution/basicOperators.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index f2be049e9d4ee..8b2d33ce95c31 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -210,7 +210,6 @@ case class ExistingRdd(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode { */ @DeveloperApi case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode { - // TODO: override def output = children.head.output override def execute() = { From f7961f6b9f839d58f5c5b1caf9702cd1e688fee7 Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Fri, 27 Jun 2014 18:29:33 +0800 Subject: [PATCH 12/16] update the line less than --- .../main/scala/org/apache/spark/sql/catalyst/SqlParser.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index f18419216b238..cfab0a4344c1a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -140,8 +140,8 @@ class SqlParser extends StandardTokenParsers with PackratParsers { protected lazy val query: Parser[LogicalPlan] = ( select 
* ( UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } | - UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) } | - INTERSECT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Intersect(q1, q2)} + INTERSECT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Intersect(q1, q2)} | + UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) } ) | insert | cache ) From 0c7cca5ea9c3e68758675c493570be87b38d346a Mon Sep 17 00:00:00 2001 From: YanjieGao <396154235@qq.com> Date: Fri, 4 Jul 2014 11:58:28 +0800 Subject: [PATCH 13/16] modify format problem --- .../spark/sql/catalyst/plans/logical/basicOperators.scala | 5 ++--- .../org/apache/spark/sql/execution/SparkStrategies.scala | 5 +++-- .../src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index 951db03232aeb..57ce63c5fb27c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -191,7 +191,6 @@ case object NoRelation extends LeafNode { } case class Intersect(left: LogicalPlan, right: LogicalPlan) extends BinaryNode { - def output = left.output - - def references = Set.empty + override def output = left.output + override def references = Set.empty } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 301941767608f..808af587873b0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -272,8 +272,9 
@@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case logical.Limit(IntegerLiteral(limit), child) => execution.Limit(limit, planLater(child))(sqlContext) :: Nil case Unions(unionChildren) => - execution.Union(unionChildren.map(planLater))(sparkContext) :: Nil - case logical.Intersect(left,right) => execution.Intersect(planLater(left),planLater(right)) :: Nil + execution.Union(unionChildren.map(planLater))(sqlContext) :: Nil + case logical.Intersect(left,right) => + execution.Intersect(planLater(left),planLater(right)) :: Nil case logical.Generate(generator, join, outer, _, child) => execution.Generate(generator, join = join, outer = outer, planLater(child)) :: Nil case logical.NoRelation => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 35aa79854e402..163e2e55139b6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -372,16 +372,17 @@ class SQLQuerySuite extends QueryTest { } test("INTERSECT") { - checkAnswer( sql("SELECT * FROM lowerCaseData INTERSECT SELECT * FROM lowerCaseData "), (1, "a") :: (2, "b") :: (3, "c") :: (4, "d") :: Nil) + checkAnswer( sql("SELECT * FROM lowerCaseData INTERSECT SELECT * FROM upperCaseData "), Nil) } + test("SET commands semantics using sql()") { TestSQLContext.settings.synchronized { clear() @@ -420,4 +421,3 @@ class SQLQuerySuite extends QueryTest { } } } - From 1cfbfe6593ef939182d99481384cb1adb5990ad2 Mon Sep 17 00:00:00 2001 From: YanjieGao <396154235@qq.com> Date: Fri, 4 Jul 2014 18:43:54 +0800 Subject: [PATCH 14/16] refomat some files --- .../apache/spark/sql/catalyst/SqlParser.scala | 6 ++---- .../spark/sql/execution/SparkStrategies.scala | 6 +++--- .../spark/sql/execution/basicOperators.scala | 20 +++++++++---------- .../org/apache/spark/sql/SQLQuerySuite.scala | 18 ++++++++--------- 4 files 
changed, 24 insertions(+), 26 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index e07ef591fe6ea..aab507b500c75 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -1,4 +1,4 @@ -/* + * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -121,6 +121,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers { protected val INTERSECT = Keyword("INTERSECT") protected val EXCEPT = Keyword("EXCEPT") + // Use reflection to find the reserved words defined in this class. protected val reservedWords = this.getClass @@ -140,11 +141,8 @@ class SqlParser extends StandardTokenParsers with PackratParsers { protected lazy val query: Parser[LogicalPlan] = ( select * ( UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } | -<<<<<<< HEAD INTERSECT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Intersect(q1, q2)} | -======= EXCEPT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)} | ->>>>>>> upstream/master UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) } ) | insert | cache diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index b5167e5611354..5d45c2f08a6df 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -272,11 +272,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case logical.Limit(IntegerLiteral(limit), child) => 
execution.Limit(limit, planLater(child))(sqlContext) :: Nil case Unions(unionChildren) => - execution.Union(unionChildren.map(planLater))(sqlContext) :: Nil - case logical.Intersect(left,right) => - execution.Intersect(planLater(left),planLater(right)) :: Nil + execution.Union(unionChildren.map(planLater))(sqlContext) :: Nil case logical.Except(left,right) => execution.Except(planLater(left),planLater(right)) :: Nil + case logical.Intersect(left,right) => + execution.Intersect(planLater(left),planLater(right)) :: Nil case logical.Generate(generator, join, outer, _, child) => execution.Generate(generator, join = join, outer = outer, planLater(child)) :: Nil case logical.NoRelation => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 31d0225405060..144d3fb6f13a8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -208,28 +208,28 @@ case class ExistingRdd(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode { /** * :: DeveloperApi :: - *Returns the rows in left that also appear in right using the built in spark - *intersection function. + * Returns a table with the elements from left that are not in right using + * the built-in spark subtract function. */ @DeveloperApi -case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode { - override def output = children.head.output +case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode { + override def output = left.output override def execute() = { - left.execute().map(_.copy()).intersection(right.execute().map(_.copy())) + left.execute().map(_.copy()).subtract(right.execute().map(_.copy())) } } /** * :: DeveloperApi :: - * Returns a table with the elements from left that are not in right using - * the built-in spark subtract function. 
+ *Returns the rows in left that also appear in right using the built in spark + *intersection function. */ @DeveloperApi -case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode { - override def output = left.output +case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode { + override def output = children.head.output override def execute() = { - left.execute().map(_.copy()).subtract(right.execute().map(_.copy())) + left.execute().map(_.copy()).intersection(right.execute().map(_.copy())) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 269adbee894ab..f7e7691414619 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -371,28 +371,28 @@ class SQLQuerySuite extends QueryTest { (3, null))) } - test("INTERSECT") { + test("EXCEPT") { checkAnswer( - sql("SELECT * FROM lowerCaseData INTERSECT SELECT * FROM lowerCaseData "), + sql("SELECT * FROM lowerCaseData EXCEPT SELECT * FROM upperCaseData "), (1, "a") :: (2, "b") :: (3, "c") :: (4, "d") :: Nil) checkAnswer( - sql("SELECT * FROM lowerCaseData INTERSECT SELECT * FROM upperCaseData "), Nil) - } + sql("SELECT * FROM lowerCaseData EXCEPT SELECT * FROM lowerCaseData "), Nil) + checkAnswer( + sql("SELECT * FROM upperCaseData EXCEPT SELECT * FROM upperCaseData "), Nil) + } - test("EXCEPT") { + test("INTERSECT") { checkAnswer( - sql("SELECT * FROM lowerCaseData EXCEPT SELECT * FROM upperCaseData "), + sql("SELECT * FROM lowerCaseData INTERSECT SELECT * FROM lowerCaseData "), (1, "a") :: (2, "b") :: (3, "c") :: (4, "d") :: Nil) checkAnswer( - sql("SELECT * FROM lowerCaseData EXCEPT SELECT * FROM lowerCaseData "), Nil) - checkAnswer( - sql("SELECT * FROM upperCaseData EXCEPT SELECT * FROM upperCaseData "), Nil) + sql("SELECT * FROM lowerCaseData INTERSECT SELECT * FROM upperCaseData "), Nil) } 
test("SET commands semantics using sql()") { From bdc2ac053e1edda888b46fd9f497c2827dbf0773 Mon Sep 17 00:00:00 2001 From: YanjieGao <396154235@qq.com> Date: Sun, 6 Jul 2014 08:39:59 +0800 Subject: [PATCH 15/16] reformat the code as Michael's suggestion --- .../main/scala/org/apache/spark/sql/catalyst/SqlParser.scala | 2 +- .../org/apache/spark/sql/execution/SparkStrategies.scala | 4 ++-- .../scala/org/apache/spark/sql/execution/basicOperators.scala | 4 ++-- .../src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 4 ++-- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index aab507b500c75..3e011a8cf32ce 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -141,7 +141,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers { protected lazy val query: Parser[LogicalPlan] = ( select * ( UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } | - INTERSECT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Intersect(q1, q2)} | + INTERSECT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Intersect(q1, q2) } | EXCEPT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)} | UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) } ) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 5d45c2f08a6df..7057e79e8c033 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -275,8 +275,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { 
execution.Union(unionChildren.map(planLater))(sqlContext) :: Nil case logical.Except(left,right) => execution.Except(planLater(left),planLater(right)) :: Nil - case logical.Intersect(left,right) => - execution.Intersect(planLater(left),planLater(right)) :: Nil + case logical.Intersect(left, right) => + execution.Intersect(planLater(left), planLater(right)) :: Nil case logical.Generate(generator, join, outer, _, child) => execution.Generate(generator, join = join, outer = outer, planLater(child)) :: Nil case logical.NoRelation => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 144d3fb6f13a8..e8816f0b3cd9c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -222,8 +222,8 @@ case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode { /** * :: DeveloperApi :: - *Returns the rows in left that also appear in right using the built in spark - *intersection function. + * Returns the rows in left that also appear in right using the built in spark + * intersection function. 
*/ @DeveloperApi case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index f7e7691414619..207bdf31b1fa4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -386,13 +386,13 @@ class SQLQuerySuite extends QueryTest { test("INTERSECT") { checkAnswer( - sql("SELECT * FROM lowerCaseData INTERSECT SELECT * FROM lowerCaseData "), + sql("SELECT * FROM lowerCaseData INTERSECT SELECT * FROM lowerCaseData"), (1, "a") :: (2, "b") :: (3, "c") :: (4, "d") :: Nil) checkAnswer( - sql("SELECT * FROM lowerCaseData INTERSECT SELECT * FROM upperCaseData "), Nil) + sql("SELECT * FROM lowerCaseData INTERSECT SELECT * FROM upperCaseData"), Nil) } test("SET commands semantics using sql()") { From 4629afe5f1ab3762efc77b0b2356cc6bbfe13047 Mon Sep 17 00:00:00 2001 From: YanjieGao <396154235@qq.com> Date: Sun, 6 Jul 2014 08:46:14 +0800 Subject: [PATCH 16/16] reformat the code --- .../main/scala/org/apache/spark/sql/catalyst/SqlParser.scala | 2 +- .../org/apache/spark/sql/execution/SparkStrategies.scala | 4 ++-- .../src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 1 + .../src/main/scala/org/apache/spark/sql/hive/HiveQl.scala | 2 +- 4 files changed, 5 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index 3e011a8cf32ce..e5653c5b14ac1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -1,4 +1,4 @@ - * +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. 
See the NOTICE file distributed with * this work for additional information regarding copyright ownership. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 7057e79e8c033..7080074a69c07 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -272,9 +272,9 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case logical.Limit(IntegerLiteral(limit), child) => execution.Limit(limit, planLater(child))(sqlContext) :: Nil case Unions(unionChildren) => - execution.Union(unionChildren.map(planLater))(sqlContext) :: Nil + execution.Union(unionChildren.map(planLater))(sqlContext) :: Nil case logical.Except(left,right) => - execution.Except(planLater(left),planLater(right)) :: Nil + execution.Except(planLater(left),planLater(right)) :: Nil case logical.Intersect(left, right) => execution.Intersect(planLater(left), planLater(right)) :: Nil case logical.Generate(generator, join, outer, _, child) => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 207bdf31b1fa4..fa1f32f8a49a9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -372,6 +372,7 @@ class SQLQuerySuite extends QueryTest { } test("EXCEPT") { + checkAnswer( sql("SELECT * FROM lowerCaseData EXCEPT SELECT * FROM upperCaseData "), (1, "a") :: diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 2b1f3744def8f..b70104dd5be5a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -622,7 +622,7 @@ private[hive] object HiveQl { // If there are multiple INSERTS just UNION them together into on query. queries.reduceLeft(Union) - case Token("TOK_UNION", left :: right :: Nil) => Union(nodeToPlan(left), nodeToPlan(right)) + case Token("TOK_UNION", left :: right :: Nil) => Union(nodeToPlan(left), nodeToPlan(right)) case a: ASTNode => throw new NotImplementedError(s"No parse rules for:\n ${dumpTree(a).toString} ")