From 5f4455ae3400302c4f3cb019419dbdada4edf5c9 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Mon, 13 Jun 2016 12:00:40 +0800
Subject: [PATCH 1/4] Remove the additional Project to be consistent with SQL.

---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  4 ++--
 .../org/apache/spark/sql/DataFrameWriter.scala     | 13 ++-----------
 2 files changed, 4 insertions(+), 13 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index a5755616329a..c38410246d6c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -480,10 +480,10 @@ class Analyzer(
                 inputPartCols.find(_.name == name).getOrElse(
                   throw new AnalysisException(s"Cannot find partition column $name"))
               }
+              // Assume partition columns are correctly placed at the end of the child's output
               i.copy(
                 table = EliminateSubqueryAliases(table),
-                partition = tablePartitionNames.map(_ -> None).toMap,
-                child = Project(columns ++ partColumns, child))
+                partition = tablePartitionNames.map(_ -> None).toMap)
             }
           case _ =>
             i.copy(table = EliminateSubqueryAliases(table))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index afae0786b73d..8251afa95e91 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -506,21 +506,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     val partitions = normalizedParCols.map(_.map(col => col -> (Option.empty[String])).toMap)
     val overwrite = mode == SaveMode.Overwrite
 
-    // A partitioned relation's schema can be different from the input logicalPlan, since
-    // partition columns are all moved after data columns. We Project to adjust the ordering.
-    // TODO: this belongs to the analyzer.
-    val input = normalizedParCols.map { parCols =>
-      val (inputPartCols, inputDataCols) = df.logicalPlan.output.partition { attr =>
-        parCols.contains(attr.name)
-      }
-      Project(inputDataCols ++ inputPartCols, df.logicalPlan)
-    }.getOrElse(df.logicalPlan)
-
+    // Assume partition columns are correctly placed at the end of the df.logicalPlan's output.
     df.sparkSession.sessionState.executePlan(
       InsertIntoTable(
         UnresolvedRelation(tableIdent),
         partitions.getOrElse(Map.empty[String, Option[String]]),
-        input,
+        df.logicalPlan,
         overwrite,
         ifNotExists = false)).toRdd
   }

From c0500c2339f98da8c3a789093b0732e81048a64b Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Mon, 13 Jun 2016 13:04:44 +0000
Subject: [PATCH 2/4] Remove test.

---
 .../sql/hive/InsertIntoHiveTableSuite.scala        | 21 -------------------
 1 file changed, 21 deletions(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index fae59001b98e..dd5ba155c687 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -257,27 +257,6 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
     }
   }
 
-  test("Detect table partitioning with correct partition order") {
-    withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
-      sql("CREATE TABLE source (id bigint, part2 string, part1 string, data string)")
-      val data = (1 to 10).map(i => (i, if ((i % 2) == 0) "even" else "odd", "p", s"data-$i"))
-        .toDF("id", "part2", "part1", "data")
-
-      data.write.insertInto("source")
-      checkAnswer(sql("SELECT * FROM source"), data.collect().toSeq)
-
-      // the original data with part1 and part2 at the end
-      val expected = data.select("id", "data", "part1", "part2")
-
-      sql(
-        """CREATE TABLE partitioned (id bigint, data string)
-          |PARTITIONED BY (part1 string, part2 string)""".stripMargin)
-      spark.table("source").write.insertInto("partitioned")
-
-      checkAnswer(sql("SELECT * FROM partitioned"), expected.collect().toSeq)
-    }
-  }
-
   test("InsertIntoTable#resolved should include dynamic partitions") {
     withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
       sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)")

From f8c9ccf2a250f4bd07fa23dcdf4ced6faa409451 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Tue, 14 Jun 2016 08:41:07 +0000
Subject: [PATCH 3/4] Address comment.

---
 .../spark/sql/catalyst/analysis/Analyzer.scala     | 12 ++++--------
 1 file changed, 4 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index c38410246d6c..24a0cfa10c4e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -470,15 +470,11 @@ class Analyzer(
               // Assume partition columns are correctly placed at the end of the child's output
               i.copy(table = EliminateSubqueryAliases(table))
             } else {
-              // Set up the table's partition scheme with all dynamic partitions by moving partition
-              // columns to the end of the column list, in partition order.
-              val (inputPartCols, columns) = child.output.partition { attr =>
-                tablePartitionNames.contains(attr.name)
-              }
               // All partition columns are dynamic because this InsertIntoTable had no partitioning
-              val partColumns = tablePartitionNames.map { name =>
-                inputPartCols.find(_.name == name).getOrElse(
-                  throw new AnalysisException(s"Cannot find partition column $name"))
+              tablePartitionNames.filterNot { name =>
+                child.output.exists(_.name == name)
+              }.map { name =>
+                throw new AnalysisException(s"Cannot find partition column $name")
               }
               // Assume partition columns are correctly placed at the end of the child's output
               i.copy(

From 3030144b3bbe4becaa438e6155e90b53aaeb94cf Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Wed, 15 Jun 2016 02:56:05 +0000
Subject: [PATCH 4/4] Address comment and add test case.

---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  6 ------
 .../spark/sql/hive/InsertIntoHiveTableSuite.scala  | 12 ++++++++++++
 2 files changed, 12 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 24a0cfa10c4e..928100e6774d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -470,12 +470,6 @@ class Analyzer(
               // Assume partition columns are correctly placed at the end of the child's output
               i.copy(table = EliminateSubqueryAliases(table))
             } else {
-              // All partition columns are dynamic because this InsertIntoTable had no partitioning
-              tablePartitionNames.filterNot { name =>
-                child.output.exists(_.name == name)
-              }.map { name =>
-                throw new AnalysisException(s"Cannot find partition column $name")
-              }
               // Assume partition columns are correctly placed at the end of the child's output
               i.copy(
                 table = EliminateSubqueryAliases(table),
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index dd5ba155c687..7d669288293c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -229,6 +229,18 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
     }
   }
 
+  test("Resolve dynamic partition columns") {
+    withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
+      sql("CREATE TABLE partitioned (id bigint) PARTITIONED BY (data string, part string)")
+      val data = (1 to 10).map(i => (i, s"data-$i", if ((i % 2) == 0) "even" else "odd"))
+        .toDF("id", "data2", "part")
+
+      // Dynamic partition columns are resolved by ordering, not name.
+      data.write.insertInto("partitioned")
+      checkAnswer(sql("SELECT * FROM partitioned"), data.collect().toSeq)
+    }
+  }
+
   test("Test partition mode = strict") {
     withSQLConf(("hive.exec.dynamic.partition.mode", "strict")) {
       sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)")