From b38a21ef6146784e4b93ef4ce8c899f1eee14572 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Mon, 16 Nov 2015 18:30:26 -0800
Subject: [PATCH 1/6] SPARK-11633

---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  3 ++-
 .../spark/sql/hive/execution/SQLQuerySuite.scala   | 16 ++++++++++++++++
 2 files changed, 18 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 2f4670b55bdba..5a5b71e52dd79 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -425,7 +425,8 @@ class Analyzer(
            */
           j
         case Some((oldRelation, newRelation)) =>
-          val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
+          val attributeRewrites =
+            AttributeMap(oldRelation.output.zip(newRelation.output).filter(x => x._1 != x._2))
           val newRight = right transformUp {
             case r if r == oldRelation => newRelation
           } transformUp {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 3427152b2da02..5e00546a74c00 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -51,6 +51,8 @@ case class Order(
     state: String,
     month: Int)
 
+case class Individual(F1: Integer, F2: Integer)
+
 case class WindowData(
     month: Int,
     area: String,
@@ -1479,4 +1481,18 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
         |FROM (SELECT '{"f1": "value1", "f2": 12}' json, 'hello' as str) test
       """.stripMargin), Row("value1", "12", 3.14, "hello"))
   }
+
+  test("SPARK-11633: HiveContext throws TreeNode Exception : Failed to Copy Node") {
+    val rdd1 = sparkContext.parallelize(Seq(Individual(1, 3), Individual(2, 1)))
+    val df = hiveContext.createDataFrame(rdd1)
+    df.registerTempTable("foo")
+    val df2 = sql("select f1, F2 as F2 from foo")
+    df2.registerTempTable("foo2")
+    df2.registerTempTable("foo3")
+
+    checkAnswer(sql(
+      """
+        |SELECT a.F1 FROM foo2 a INNER JOIN foo3 b ON a.F2 = b.F2
+      """.stripMargin), Row(2) :: Row(1) :: Nil)
+  }
 }
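Note on the fix in PATCH 1: Catalyst's self-join deduplication only needs rewrite entries for attributes that actually changed, and pairs whose old and new sides are identical (here produced by the no-op alias "F2 as F2") are what made the node copy fail on the query in the new test. A minimal, Spark-independent sketch of the filtering — the Attribute case class below is a stand-in for Catalyst's AttributeReference, not a real Spark type:

    // Stand-in for a Catalyst attribute: equality includes the expression ID,
    // so a no-op alias yields an old/new pair whose two sides compare equal.
    case class Attribute(name: String, exprId: Long)

    object RewriteSketch {
      def main(args: Array[String]): Unit = {
        val oldOutput = Seq(Attribute("f1", 1), Attribute("F2", 2))
        val newOutput = Seq(Attribute("f1", 10), Attribute("F2", 2)) // F2 unchanged

        // PATCH 1 keeps only the pairs whose sides differ, so the subsequent
        // transformUp never attempts to rewrite an attribute to itself.
        val rewrites = oldOutput.zip(newOutput).filter(x => x._1 != x._2)
        println(rewrites) // List((Attribute(f1,1),Attribute(f1,10)))
      }
    }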
From 0546772f151f83d6d3cf4d000cbe341f52545007 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Fri, 20 Nov 2015 10:56:45 -0800
Subject: [PATCH 2/6] converge

---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  3 +--
 .../spark/sql/hive/execution/SQLQuerySuite.scala   | 15 ---------------
 2 files changed, 1 insertion(+), 17 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 7c9512fbd00aa..47962ebe6ef82 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -417,8 +417,7 @@ class Analyzer(
            */
           j
         case Some((oldRelation, newRelation)) =>
-          val attributeRewrites =
-            AttributeMap(oldRelation.output.zip(newRelation.output).filter(x => x._1 != x._2))
+          val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
           val newRight = right transformUp {
             case r if r == oldRelation => newRelation
           } transformUp {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 5e00546a74c00..61d9dcd37572b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -51,8 +51,6 @@ case class Order(
     state: String,
     month: Int)
 
-case class Individual(F1: Integer, F2: Integer)
-
 case class WindowData(
     month: Int,
     area: String,
@@ -1481,18 +1479,5 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
         |FROM (SELECT '{"f1": "value1", "f2": 12}' json, 'hello' as str) test
       """.stripMargin), Row("value1", "12", 3.14, "hello"))
   }
-
-  test("SPARK-11633: HiveContext throws TreeNode Exception : Failed to Copy Node") {
-    val rdd1 = sparkContext.parallelize(Seq(Individual(1, 3), Individual(2, 1)))
-    val df = hiveContext.createDataFrame(rdd1)
-    df.registerTempTable("foo")
-    val df2 = sql("select f1, F2 as F2 from foo")
-    df2.registerTempTable("foo2")
-    df2.registerTempTable("foo3")
-
-    checkAnswer(sql(
-      """
-        |SELECT a.F1 FROM foo2 a INNER JOIN foo3 b ON a.F2 = b.F2
-      """.stripMargin), Row(2) :: Row(1) :: Nil)
   }
 }

From b37a64f13956b6ddd0e38ddfd9fe1caee611f1a8 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Fri, 20 Nov 2015 10:58:37 -0800
Subject: [PATCH 3/6] converge

---
 .../org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 61d9dcd37572b..3427152b2da02 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1479,5 +1479,4 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
         |FROM (SELECT '{"f1": "value1", "f2": 12}' json, 'hello' as str) test
       """.stripMargin), Row("value1", "12", 3.14, "hello"))
   }
-  }
 }
From 58b60383486eaa174ca7c579c3f0f75e1f3d58d0 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Tue, 9 Feb 2016 22:44:15 -0800
Subject: [PATCH 4/6] support tablesample

---
 .../apache/spark/sql/hive/SQLBuilder.scala         | 21 ++++++++++++++++++-
 .../sql/hive/LogicalPlanToSQLSuite.scala           | 10 +++++++++
 2 files changed, 30 insertions(+), 1 deletion(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
index fc5725d6915ea..08372c970f842 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.optimizer.CollapseProject
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.util.random.RandomSampler
 
 /**
  * A builder class used to convert a resolved logical plan into a SQL query string. Note that this
@@ -119,6 +120,18 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
         limitSQL = limit.sql
       } yield s"$childSQL LIMIT $limitSQL"
 
+    // TABLESAMPLE is part of tableSource clause in the parser,
+    // and thus we must handle it with subquery.
+    case Sample(lb, ub, withReplacement, _, child @ Subquery(alias, grandChild))
+        if !withReplacement && lb <= (ub + RandomSampler.roundingEpsilon) =>
+      val fraction = math.min(100, math.max(0, (ub - lb) * 100))
+      val aliasName = if (grandChild.isInstanceOf[Subquery]) alias else ""
+      val plan = if (grandChild.isInstanceOf[Subquery]) grandChild else child
+
+      toSQL(plan).map { planSQL =>
+        s"$planSQL TABLESAMPLE($fraction PERCENT) $aliasName"
+      }
+
     case Filter(condition, child) =>
       for {
         childSQL <- toSQL(child)
@@ -213,7 +226,13 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
         wrapChildWithSubquery(plan)
 
       case plan @ Project(_,
-        _: Subquery | _: Filter | _: Join | _: MetastoreRelation | OneRowRelation | _: Limit
+        _: Subquery
+          | _: Filter
+          | _: Join
+          | _: MetastoreRelation
+          | OneRowRelation
+          | _: Limit
+          | _: Sample
       ) =>
         plan
 
       case plan: Project =>
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
index 1f731db26f387..f6fd401221983 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
@@ -145,6 +145,16 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
     checkHiveQl("SELECT COUNT(DISTINCT id) FROM t0")
   }
 
+  test("TABLESAMPLE") {
+    checkHiveQl("SELECT * FROM t0 TABLESAMPLE(100 PERCENT) s")
+    checkHiveQl("SELECT * FROM t0 TABLESAMPLE(100 PERCENT)")
+    // When the sampling fraction is not 100%, the returned results are random.
+    // Thus, we add an always-false filter here to check whether the generated plan can be
+    // successfully executed.
+    checkHiveQl("SELECT s.id FROM t0 TABLESAMPLE(0.1 PERCENT) s WHERE 1=0")
+    checkHiveQl("SELECT * FROM t0 TABLESAMPLE(0.1 PERCENT) WHERE 1=0")
+  }
+
   // TODO Enable this
   // Query plans transformed by DistinctAggregationRewriter are not recognized yet
   ignore("multi-distinct columns") {
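Note on the Sample case in PATCH 4: the guard admits only sampling without replacement, and the emitted percentage is the sampled range clamped to [0, 100]. A standalone sketch of that arithmetic — the epsilon constant is inlined here as an assumption for self-containment; the patch takes it from Spark's RandomSampler.roundingEpsilon:

    object FractionSketch {
      // Assumed stand-in for RandomSampler.roundingEpsilon; the exact
      // constant used by the patch is Spark's, not this one.
      val roundingEpsilon = 1e-6

      // Returns the TABLESAMPLE percentage, or None when the Sample node
      // cannot be expressed as a TABLESAMPLE clause.
      def tableSampleFraction(lb: Double, ub: Double, withReplacement: Boolean): Option[Double] =
        if (!withReplacement && lb <= (ub + roundingEpsilon)) {
          Some(math.min(100, math.max(0, (ub - lb) * 100)))
        } else None

      def main(args: Array[String]): Unit = {
        println(tableSampleFraction(0.0, 1.0, withReplacement = false))   // Some(100.0)
        println(tableSampleFraction(0.0, 0.001, withReplacement = false)) // Some(0.1)
        println(tableSampleFraction(0.0, 1.0, withReplacement = true))    // None
      }
    }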
From 12be2c33682ff3a74f1d26e735c6ef3770df3d34 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Sat, 13 Feb 2016 00:09:09 -0800
Subject: [PATCH 5/6] address comments.

---
 .../apache/spark/sql/hive/SQLBuilder.scala         | 21 ++++-
 .../sql/hive/LogicalPlanToSQLSuite.scala           | 93 +++++++++++++------
 2 files changed, 80 insertions(+), 34 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
index 698c3e5cda907..15e301be0253e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
@@ -83,12 +83,23 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
 
     // TABLESAMPLE is part of tableSource clause in the parser,
     // and thus we must handle it with subquery.
-    case Sample(lb, ub, withReplacement, _, child @ Subquery(alias, grandChild))
-      if !withReplacement && lb <= (ub + RandomSampler.roundingEpsilon) =>
+    case p @ Sample(lb, ub, withReplacement, _, _)
+        if !withReplacement && lb <= (ub + RandomSampler.roundingEpsilon) =>
       val fraction = math.min(100, math.max(0, (ub - lb) * 100))
-      val aliasName = if (grandChild.isInstanceOf[Subquery]) alias else ""
-      val plan = if (grandChild.isInstanceOf[Subquery]) grandChild else child
-      s"${toSQL(plan)} TABLESAMPLE($fraction PERCENT) $aliasName"
+      p.child match {
+        case m: MetastoreRelation =>
+          val aliasName = m.alias.getOrElse("")
+          build(
+            s"`${m.databaseName}`.`${m.tableName}`",
+            "TABLESAMPLE(" + fraction + " PERCENT)",
+            aliasName)
+        case s: Subquery =>
+          val aliasName = if (s.child.isInstanceOf[Subquery]) s.alias else ""
+          val plan = if (s.child.isInstanceOf[Subquery]) s.child else s
+          build(toSQL(plan), "TABLESAMPLE(" + fraction + " PERCENT)", aliasName)
+        case _ =>
+          build(toSQL(p.child), "TABLESAMPLE(" + fraction + " PERCENT)")
+      }
 
     case p: Filter =>
       val whereOrHaving = p.child match {
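Note on PATCH 5's rewrite of the Sample case: the single interpolated string is replaced with SQLBuilder's build(...) helper so that an empty alias simply drops out of the generated SQL, and a MetastoreRelation child is rendered directly from its database and table name instead of going through a subquery. A plausible minimal equivalent of the helper, inferred from the call sites above rather than copied from Spark's implementation:

    object BuildSketch {
      // Joins non-empty SQL fragments with single spaces, so an absent
      // alias ("" at the call sites above) vanishes from the output.
      def build(segments: String*): String =
        segments.map(_.trim).filter(_.nonEmpty).mkString(" ")

      def main(args: Array[String]): Unit = {
        println(build("`default`.`t0`", "TABLESAMPLE(100.0 PERCENT)", "s"))
        // `default`.`t0` TABLESAMPLE(100.0 PERCENT) s
        println(build("`default`.`t0`", "TABLESAMPLE(100.0 PERCENT)", ""))
        // `default`.`t0` TABLESAMPLE(100.0 PERCENT)
      }
    }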
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
index 1d3e0cbd8fac3..59dcf87ea4aa9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
@@ -26,24 +26,32 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
   import testImplicits._
 
   protected override def beforeAll(): Unit = {
+    sql("DROP TABLE IF EXISTS parquet_t0")
+    sql("DROP TABLE IF EXISTS parquet_t1")
+    sql("DROP TABLE IF EXISTS parquet_t2")
     sql("DROP TABLE IF EXISTS t0")
-    sql("DROP TABLE IF EXISTS t1")
-    sql("DROP TABLE IF EXISTS t2")
-    sqlContext.range(10).write.saveAsTable("t0")
+
+    sqlContext.range(10).write.saveAsTable("parquet_t0")
+    sql("CREATE TABLE t0 AS SELECT * FROM parquet_t0")
 
     sqlContext
       .range(10)
       .select('id as 'key, concat(lit("val_"), 'id) as 'value)
       .write
-      .saveAsTable("t1")
+      .saveAsTable("parquet_t1")
 
-    sqlContext.range(10).select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd).write.saveAsTable("t2")
+    sqlContext
+      .range(10)
+      .select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd)
+      .write
+      .saveAsTable("parquet_t2")
   }
 
   override protected def afterAll(): Unit = {
+    sql("DROP TABLE IF EXISTS parquet_t0")
+    sql("DROP TABLE IF EXISTS parquet_t1")
+    sql("DROP TABLE IF EXISTS parquet_t2")
     sql("DROP TABLE IF EXISTS t0")
-    sql("DROP TABLE IF EXISTS t1")
-    sql("DROP TABLE IF EXISTS t2")
   }
 
   private def checkHiveQl(hiveQl: String): Unit = {
@@ -82,15 +90,15 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
   }
 
   test("in") {
-    checkHiveQl("SELECT id FROM t0 WHERE id IN (1, 2, 3)")
+    checkHiveQl("SELECT id FROM parquet_t0 WHERE id IN (1, 2, 3)")
   }
 
   test("aggregate function in having clause") {
-    checkHiveQl("SELECT COUNT(value) FROM t1 GROUP BY key HAVING MAX(key) > 0")
+    checkHiveQl("SELECT COUNT(value) FROM parquet_t1 GROUP BY key HAVING MAX(key) > 0")
   }
 
   test("aggregate function in order by clause") {
-    checkHiveQl("SELECT COUNT(value) FROM t1 GROUP BY key ORDER BY MAX(key)")
+    checkHiveQl("SELECT COUNT(value) FROM parquet_t1 GROUP BY key ORDER BY MAX(key)")
   }
 
   // When there are multiple aggregate functions in ORDER BY clause, all of them are extracted into
   // Aggregate operator and aliased to the same name "aggOrder". This is OK for normal query
   // execution since these aliases have different expression ID. But this introduces name collision
   // when converting resolved plans back to SQL query strings as expression IDs are stripped.
   test("aggregate function in order by clause with multiple order keys") {
-    checkHiveQl("SELECT COUNT(value) FROM t1 GROUP BY key ORDER BY key, MAX(key)")
+    checkHiveQl("SELECT COUNT(value) FROM parquet_t1 GROUP BY key ORDER BY key, MAX(key)")
   }
 
   test("type widening in union") {
-    checkHiveQl("SELECT id FROM t0 UNION ALL SELECT CAST(id AS INT) AS id FROM t0")
+    checkHiveQl("SELECT id FROM parquet_t0 UNION ALL SELECT CAST(id AS INT) AS id FROM parquet_t0")
   }
 
   test("self join") {
-    checkHiveQl("SELECT x.key FROM t1 x JOIN t1 y ON x.key = y.key")
+    checkHiveQl("SELECT x.key FROM parquet_t1 x JOIN parquet_t1 y ON x.key = y.key")
   }
 
   test("self join with group by") {
-    checkHiveQl("SELECT x.key, COUNT(*) FROM t1 x JOIN t1 y ON x.key = y.key group by x.key")
+    checkHiveQl(
+      "SELECT x.key, COUNT(*) FROM parquet_t1 x JOIN parquet_t1 y ON x.key = y.key group by x.key")
   }
 
   test("three-child union") {
-    checkHiveQl("SELECT id FROM t0 UNION ALL SELECT id FROM t0 UNION ALL SELECT id FROM t0")
+    checkHiveQl(
+      """
+        |SELECT id FROM parquet_t0
+        |UNION ALL SELECT id FROM parquet_t0
+        |UNION ALL SELECT id FROM parquet_t0
+      """.stripMargin)
   }
 
   test("case") {
-    checkHiveQl("SELECT CASE WHEN id % 2 > 0 THEN 0 WHEN id % 2 = 0 THEN 1 END FROM t0")
+    checkHiveQl("SELECT CASE WHEN id % 2 > 0 THEN 0 WHEN id % 2 = 0 THEN 1 END FROM parquet_t0")
   }
 
   test("case with else") {
-    checkHiveQl("SELECT CASE WHEN id % 2 > 0 THEN 0 ELSE 1 END FROM t0")
+    checkHiveQl("SELECT CASE WHEN id % 2 > 0 THEN 0 ELSE 1 END FROM parquet_t0")
   }
 
   test("case with key") {
-    checkHiveQl("SELECT CASE id WHEN 0 THEN 'foo' WHEN 1 THEN 'bar' END FROM t0")
+    checkHiveQl("SELECT CASE id WHEN 0 THEN 'foo' WHEN 1 THEN 'bar' END FROM parquet_t0")
   }
 
   test("case with key and else") {
-    checkHiveQl("SELECT CASE id WHEN 0 THEN 'foo' WHEN 1 THEN 'bar' ELSE 'baz' END FROM t0")
+    checkHiveQl("SELECT CASE id WHEN 0 THEN 'foo' WHEN 1 THEN 'bar' ELSE 'baz' END FROM parquet_t0")
   }
 
   test("select distinct without aggregate functions") {
-    checkHiveQl("SELECT DISTINCT id FROM t0")
+    checkHiveQl("SELECT DISTINCT id FROM parquet_t0")
  }
 
   test("cluster by") {
-    checkHiveQl("SELECT id FROM t0 CLUSTER BY id")
+    checkHiveQl("SELECT id FROM parquet_t0 CLUSTER BY id")
   }
 
   test("distribute by") {
-    checkHiveQl("SELECT id FROM t0 DISTRIBUTE BY id")
+    checkHiveQl("SELECT id FROM parquet_t0 DISTRIBUTE BY id")
   }
 
   test("distribute by with sort by") {
-    checkHiveQl("SELECT id FROM t0 DISTRIBUTE BY id SORT BY id")
+    checkHiveQl("SELECT id FROM parquet_t0 DISTRIBUTE BY id SORT BY id")
   }
 
   test("distinct aggregation") {
-    checkHiveQl("SELECT COUNT(DISTINCT id) FROM t0")
+    checkHiveQl("SELECT COUNT(DISTINCT id) FROM parquet_t0")
   }
 
   test("TABLESAMPLE") {
-    checkHiveQl("SELECT * FROM t0 TABLESAMPLE(100 PERCENT) s")
+    // Project [id#2L]
+    // +- Sample 0.0, 1.0, false, ...
+    //    +- Subquery s
+    //       +- Subquery parquet_t0
+    //          +- Relation[id#2L] ParquetRelation
+    checkHiveQl("SELECT s.id FROM parquet_t0 TABLESAMPLE(100 PERCENT) s")
+
+    // Project [id#2L]
+    // +- Sample 0.0, 1.0, false, ...
+    //    +- Subquery parquet_t0
+    //       +- Relation[id#2L] ParquetRelation
+    checkHiveQl("SELECT * FROM parquet_t0 TABLESAMPLE(100 PERCENT)")
+
+    // Project [id#21L]
+    // +- Sample 0.0, 1.0, false, ...
+    //    +- MetastoreRelation default, t0, Some(s)
+    checkHiveQl("SELECT s.id FROM t0 TABLESAMPLE(100 PERCENT) s")
+
+    // Project [id#24L]
+    // +- Sample 0.0, 1.0, false, ...
+    //    +- MetastoreRelation default, t0, None
     checkHiveQl("SELECT * FROM t0 TABLESAMPLE(100 PERCENT)")
+
     // When the sampling fraction is not 100%, the returned results are random.
     // Thus, we add an always-false filter here to check whether the generated plan can be
     // successfully executed.
-    checkHiveQl("SELECT s.id FROM t0 TABLESAMPLE(0.1 PERCENT) s WHERE 1=0")
-    checkHiveQl("SELECT * FROM t0 TABLESAMPLE(0.1 PERCENT) WHERE 1=0")
+    checkHiveQl("SELECT s.id FROM parquet_t0 TABLESAMPLE(0.1 PERCENT) s WHERE 1=0")
+    checkHiveQl("SELECT * FROM parquet_t0 TABLESAMPLE(0.1 PERCENT) WHERE 1=0")
   }
 
   // TODO Enable this
   // Query plans transformed by DistinctAggregationRewriter are not recognized yet
   ignore("multi-distinct columns") {
-    checkHiveQl("SELECT a, COUNT(DISTINCT b), COUNT(DISTINCT c), SUM(d) FROM t2 GROUP BY a")
+    checkHiveQl("SELECT a, COUNT(DISTINCT b), COUNT(DISTINCT c), SUM(d) FROM parquet_t2 GROUP BY a")
   }
 
   test("persisted data source relations") {
     Seq("orc", "json", "parquet").foreach { format =>
-      val tableName = s"${format}_t0"
+      val tableName = s"${format}_parquet_t0"
       withTable(tableName) {
         sqlContext.range(10).write.format(format).saveAsTable(tableName)
         checkHiveQl(s"SELECT id FROM $tableName")
From fc9a156b3df04df89ca83fb2d3a68faef52e1179 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Sat, 13 Feb 2016 10:56:33 -0800
Subject: [PATCH 6/6] added a flag for determining if this Sample node is from
 the parser.

---
 .../scala/org/apache/spark/sql/catalyst/CatalystQl.scala      | 6 ++++--
 .../apache/spark/sql/catalyst/optimizer/Optimizer.scala       | 2 +-
 .../spark/sql/catalyst/plans/logical/basicOperators.scala     | 5 ++++-
 .../spark/sql/catalyst/optimizer/FilterPushdownSuite.scala    | 4 ++--
 sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala  | 4 ++--
 sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala    | 2 +-
 .../main/scala/org/apache/spark/sql/hive/SQLBuilder.scala     | 7 ++-----
 7 files changed, 16 insertions(+), 14 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala
index 8099751900a42..47bf33fcd38d4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala
@@ -499,12 +499,14 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
             s"Sampling fraction ($fraction) must be on interval [0, 100]")
         Sample(0.0, fraction.toDouble / 100, withReplacement = false,
           (math.random * 1000).toInt,
-          relation)
+          relation)(
+          isTableSample = true)
       case Token("TOK_TABLEBUCKETSAMPLE",
           Token(numerator, Nil) ::
           Token(denominator, Nil) :: Nil) =>
         val fraction = numerator.toDouble / denominator.toDouble
-        Sample(0.0, fraction, withReplacement = false, (math.random * 1000).toInt, relation)
+        Sample(0.0, fraction, withReplacement = false, (math.random * 1000).toInt, relation)(
+          isTableSample = true)
       case a =>
         noParseRule("Sampling", a)
     }.getOrElse(relation)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 902e18081bddf..4819b229f96db 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -109,7 +109,7 @@ object SamplePushDown extends Rule[LogicalPlan] {
     // Push down projection into sample
     case Project(projectList, s @ Sample(lb, up, replace, seed, child)) =>
       Sample(lb, up, replace, seed,
-        Project(projectList, child))
+        Project(projectList, child))()
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index e8e0a78904a32..aacaed42f787d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -561,15 +561,18 @@ case class Subquery(alias: String, child: LogicalPlan) extends UnaryNode {
  * @param withReplacement Whether to sample with replacement.
  * @param seed the random seed
  * @param child the LogicalPlan
+ * @param isTableSample Whether this node was created from TABLESAMPLE in the parser.
  */
 case class Sample(
     lowerBound: Double,
     upperBound: Double,
     withReplacement: Boolean,
     seed: Long,
-    child: LogicalPlan) extends UnaryNode {
+    child: LogicalPlan)(
+    val isTableSample: java.lang.Boolean = false) extends UnaryNode {
 
   override def output: Seq[Attribute] = child.output
+
+  override protected def otherCopyArgs: Seq[AnyRef] = isTableSample :: Nil
 }
 
 /**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index b49ca928b6292..a20600095040c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -640,14 +640,14 @@ class FilterPushdownSuite extends PlanTest {
   test("push project and filter down into sample") {
     val x = testRelation.subquery('x)
     val originalQuery =
-      Sample(0.0, 0.6, false, 11L, x).select('a)
+      Sample(0.0, 0.6, false, 11L, x)().select('a)
 
     val originalQueryAnalyzed =
       EliminateSubQueries(analysis.SimpleAnalyzer.execute(originalQuery))
 
     val optimized = Optimize.execute(originalQueryAnalyzed)
 
     val correctAnswer =
-      Sample(0.0, 0.6, false, 11L, x.select('a))
+      Sample(0.0, 0.6, false, 11L, x.select('a))()
 
     comparePlans(optimized, correctAnswer.analyze)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index c5b2b7d11893c..d6b8db41dc660 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1039,7 +1039,7 @@ class DataFrame private[sql](
    * @since 1.3.0
    */
   def sample(withReplacement: Boolean, fraction: Double, seed: Long): DataFrame = withPlan {
-    Sample(0.0, fraction, withReplacement, seed, logicalPlan)
+    Sample(0.0, fraction, withReplacement, seed, logicalPlan)()
   }
 
   /**
@@ -1071,7 +1071,7 @@ class DataFrame private[sql](
     val sum = weights.sum
     val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
     normalizedCumWeights.sliding(2).map { x =>
-      new DataFrame(sqlContext, Sample(x(0), x(1), withReplacement = false, seed, sorted))
+      new DataFrame(sqlContext, Sample(x(0), x(1), withReplacement = false, seed, sorted)())
     }.toArray
   }
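Note on the empty second argument lists added at every Sample call site in PATCH 6: moving isTableSample into a second parameter list keeps it out of the case class's generated equals, hashCode, and unapply, so optimizer rules and tests that match on Sample(lb, ub, replace, seed, child) are unaffected, while the otherCopyArgs override above is what carries the flag through Catalyst's makeCopy. A self-contained illustration of the parameter-list semantics (a toy class, not Catalyst's Sample):

    // Toy model: the flag lives in a second parameter list, like Sample's
    // isTableSample, so it does not participate in case-class equality.
    case class Node(fraction: Double, child: String)(val isTableSample: Boolean = false)

    object CurriedSketch {
      def main(args: Array[String]): Unit = {
        val fromParser = Node(1.0, "t0")(isTableSample = true)
        val fromApi = Node(1.0, "t0")()

        // Equal despite different flags: comparisons and pattern matches
        // keep working exactly as they did before the flag existed.
        println(fromParser == fromApi)                             // true
        println((fromParser.isTableSample, fromApi.isTableSample)) // (true,false)
      }
    }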
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 378763268acc6..4519e12be78bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -564,7 +564,7 @@ class Dataset[T] private[sql](
    * @since 1.6.0
    */
   def sample(withReplacement: Boolean, fraction: Double, seed: Long) : Dataset[T] =
-    withPlan(Sample(0.0, fraction, withReplacement, seed, _))
+    withPlan(Sample(0.0, fraction, withReplacement, seed, _)())
 
   /**
    * Returns a new [[Dataset]] by sampling a fraction of records, using a random seed.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
index 15e301be0253e..e5365b4e2b720 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
@@ -81,11 +81,8 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
     case p: Limit =>
       s"${toSQL(p.child)} LIMIT ${p.limitExpr.sql}"
 
-    // TABLESAMPLE is part of tableSource clause in the parser,
-    // and thus we must handle it with subquery.
-    case p @ Sample(lb, ub, withReplacement, _, _)
-      if !withReplacement && lb <= (ub + RandomSampler.roundingEpsilon) =>
-      val fraction = math.min(100, math.max(0, (ub - lb) * 100))
+    case p: Sample if p.isTableSample =>
+      val fraction = math.min(100, math.max(0, (p.upperBound - p.lowerBound) * 100))
       p.child match {
         case m: MetastoreRelation =>
           val aliasName = m.alias.getOrElse("")