From deb63bd9f3586b7f9c2e203f8486d6a7eb49bc72 Mon Sep 17 00:00:00 2001
From: windpiger
Date: Mon, 6 Mar 2017 14:57:25 +0800
Subject: [PATCH 1/3] [SPARK-19832][SQL]DynamicPartitionWriteTask get
 partitionPath should escape the partition name

---
 .../datasources/FileFormatWriter.scala        |  2 +-
 .../sql/execution/command/DDLSuite.scala      | 28 +++++++++++++--
 .../sql/hive/execution/HiveDDLSuite.scala     | 35 ++++++++++++++++++-
 3 files changed, 60 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index c17796811cdfd..71e7ba6c62cc4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -337,7 +337,7 @@ object FileFormatWriter extends Logging {
         Seq(Cast(c, StringType, Option(DateTimeUtils.defaultTimeZone().getID))),
         Seq(StringType))
       val str = If(IsNull(c), Literal(ExternalCatalogUtils.DEFAULT_PARTITION_NAME), escaped)
-      val partitionName = Literal(c.name + "=") :: str :: Nil
+      val partitionName = Literal(ExternalCatalogUtils.escapePathName(c.name) + "=") :: str :: Nil
       if (i == 0) partitionName else Literal(Path.SEPARATOR) :: partitionName
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 8b8cd0fdf4db2..72ce729268b90 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -26,9 +26,7 @@ import org.scalatest.BeforeAndAfterEach
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, FunctionRegistry, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException}
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogStorageFormat}
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
-import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
@@ -1996,4 +1994,28 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       }
     }
   }
+
+  Seq("a b", "a:b", "a%b", "a,b").foreach { specialCharInLoc =>
+    test(s"partition name of datasource table contains $specialCharInLoc") {
+      withTable("t") {
+        withTempDir { dir =>
+          spark.sql(
+            s"""
+               |CREATE TABLE t(a string, `$specialCharInLoc` string)
+               |USING parquet
+               |PARTITIONED BY(`$specialCharInLoc`)
+               |LOCATION '$dir'
+             """.stripMargin)
+
+          assert(dir.listFiles().isEmpty)
+          spark.sql(s"INSERT INTO TABLE t PARTITION(`$specialCharInLoc`=2) SELECT 1")
+          val partEscaped = s"${ExternalCatalogUtils.escapePathName(specialCharInLoc)}=2"
+          spark.sql("show partitions t").show(false)
+          val partFile = new File(dir, partEscaped)
+          assert(partFile.listFiles().length >= 1)
+          checkAnswer(spark.table("t"), Row("1", "2") :: Nil)
+        }
+      }
+    }
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 81ae5b7bdb672..ab231a358aedd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -25,7 +25,7 @@ import org.scalatest.BeforeAndAfterEach
 import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException}
-import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType, ExternalCatalogUtils}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.hive.HiveExternalCatalog
@@ -1686,4 +1686,37 @@ class HiveDDLSuite
       }
     }
   }
+
+  Seq("parquet", "hive").foreach { datasource =>
+    Seq("a b", "a:b", "a%b", "a,b").foreach { specialCharInLoc =>
+      test(s"partition name of $datasource table contains $specialCharInLoc") {
+        withTable("t") {
+          withTempDir { dir =>
+            spark.sql(
+              s"""
+                 |CREATE TABLE t(a string, `$specialCharInLoc` string)
+                 |USING $datasource
+                 |PARTITIONED BY(`$specialCharInLoc`)
+                 |LOCATION '$dir'
+               """.stripMargin)
+
+            assert(dir.listFiles().isEmpty)
+            spark.sql(s"INSERT INTO TABLE t PARTITION(`$specialCharInLoc`=2) SELECT 1")
+            val partEscaped = s"${ExternalCatalogUtils.escapePathName(specialCharInLoc)}=2"
+            val partFile = new File(dir, partEscaped)
+            assert(partFile.listFiles().length >= 1)
+            checkAnswer(spark.table("t"), Row("1", "2") :: Nil)
+
+            withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+              spark.sql(s"INSERT INTO TABLE t PARTITION(`$specialCharInLoc`) SELECT 3, 4")
+              val partEscaped1 = s"${ExternalCatalogUtils.escapePathName(specialCharInLoc)}=4"
+              val partFile1 = new File(dir, partEscaped1)
+              assert(partFile1.listFiles().length >= 1)
+              checkAnswer(spark.table("t"), Row("1", "2") :: Row("3", "4") :: Nil)
+            }
+          }
+        }
+      }
+    }
+  }
 }
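The functional change in this first patch is the single line in FileFormatWriter: when the dynamic-partition write task builds a partition path, the partition value was already escaped (via the ScalaUDF wrapping ExternalCatalogUtils.escapePathName), but the column name was concatenated raw, so a partition column named, say, `a:b` produced an on-disk directory that did not match the escaped path the catalog stores. A minimal sketch of the path construction in plain Scala, outside Spark's expression machinery (partitionPathFragment is an illustrative helper, not part of the patch; only escapePathName and DEFAULT_PARTITION_NAME come from Spark):

    import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils

    // Builds the "name=value" directory fragment for one partition column,
    // mirroring what the fixed expression in FileFormatWriter evaluates to.
    def partitionPathFragment(colName: String, value: String): String = {
      val v =
        if (value == null) ExternalCatalogUtils.DEFAULT_PARTITION_NAME // null partition value
        else ExternalCatalogUtils.escapePathName(value)
      // The fix: escape the column name as well, not just the value.
      s"${ExternalCatalogUtils.escapePathName(colName)}=$v"
    }

    // e.g. partitionPathFragment("a:b", "2") should yield "a%3Ab=2"

For multiple partition columns the fragments are joined with Path.SEPARATOR, which is what the `if (i == 0)` branch in the patched code does.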
From 9800a7d23857ccf7f8c35f99225cece54f3d385a Mon Sep 17 00:00:00 2001
From: windpiger
Date: Mon, 6 Mar 2017 15:01:02 +0800
Subject: [PATCH 2/3] remove a leftover debug line

---
 .../scala/org/apache/spark/sql/execution/command/DDLSuite.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 72ce729268b90..e0da7f5957235 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -2010,7 +2010,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
           assert(dir.listFiles().isEmpty)
           spark.sql(s"INSERT INTO TABLE t PARTITION(`$specialCharInLoc`=2) SELECT 1")
           val partEscaped = s"${ExternalCatalogUtils.escapePathName(specialCharInLoc)}=2"
-          spark.sql("show partitions t").show(false)
           val partFile = new File(dir, partEscaped)
           assert(partFile.listFiles().length >= 1)
           checkAnswer(spark.table("t"), Row("1", "2") :: Nil)
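Since the tests assert on the escaped directory name, it is worth noting what ExternalCatalogUtils.escapePathName does: it percent-encodes characters that are unsafe in a path segment, following Hive's escaping rules. A quick sanity sketch (my understanding of the escape set, which is platform-dependent; a space, for example, is escaped only on Windows, and the tests still pass on Linux because the expected name is computed through the same function):

    import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils

    // ':' is in the escape list, so it is replaced by '%' plus its hex code;
    // '%' itself is escaped too, while ' ' and ',' pass through on Linux.
    assert(ExternalCatalogUtils.escapePathName("a:b") == "a%3Ab")
    assert(ExternalCatalogUtils.escapePathName("a%b") == "a%25b")
    // unescapePathName is the inverse transformation:
    assert(ExternalCatalogUtils.unescapePathName("a%3Ab") == "a:b")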
From f8f2be8975a5d6a0f05659ed3046ea1f4a50ed6c Mon Sep 17 00:00:00 2001
From: windpiger
Date: Mon, 6 Mar 2017 16:58:48 +0800
Subject: [PATCH 3/3] fix some test names

---
 .../spark/sql/execution/command/DDLSuite.scala   | 12 ++++++------
 .../spark/sql/hive/execution/HiveDDLSuite.scala  | 16 ++++++++--------
 2 files changed, 14 insertions(+), 14 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index e0da7f5957235..4b1d1fe3838b8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -1995,21 +1995,21 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     }
   }
 
-  Seq("a b", "a:b", "a%b", "a,b").foreach { specialCharInLoc =>
-    test(s"partition name of datasource table contains $specialCharInLoc") {
+  Seq("a b", "a:b", "a%b", "a,b").foreach { specialChars =>
+    test(s"data source table:partition column name containing $specialChars") {
       withTable("t") {
         withTempDir { dir =>
           spark.sql(
             s"""
-               |CREATE TABLE t(a string, `$specialCharInLoc` string)
+               |CREATE TABLE t(a string, `$specialChars` string)
                |USING parquet
-               |PARTITIONED BY(`$specialCharInLoc`)
+               |PARTITIONED BY(`$specialChars`)
                |LOCATION '$dir'
              """.stripMargin)
 
           assert(dir.listFiles().isEmpty)
-          spark.sql(s"INSERT INTO TABLE t PARTITION(`$specialCharInLoc`=2) SELECT 1")
-          val partEscaped = s"${ExternalCatalogUtils.escapePathName(specialCharInLoc)}=2"
+          spark.sql(s"INSERT INTO TABLE t PARTITION(`$specialChars`=2) SELECT 1")
+          val partEscaped = s"${ExternalCatalogUtils.escapePathName(specialChars)}=2"
           val partFile = new File(dir, partEscaped)
           assert(partFile.listFiles().length >= 1)
           checkAnswer(spark.table("t"), Row("1", "2") :: Nil)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index ab231a358aedd..05d878d19d44f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1688,28 +1688,28 @@ class HiveDDLSuite
   }
 
   Seq("parquet", "hive").foreach { datasource =>
-    Seq("a b", "a:b", "a%b", "a,b").foreach { specialCharInLoc =>
-      test(s"partition name of $datasource table contains $specialCharInLoc") {
+    Seq("a b", "a:b", "a%b", "a,b").foreach { specialChars =>
+      test(s"partition column name of $datasource table containing $specialChars") {
         withTable("t") {
           withTempDir { dir =>
             spark.sql(
               s"""
-                 |CREATE TABLE t(a string, `$specialCharInLoc` string)
+                 |CREATE TABLE t(a string, `$specialChars` string)
                  |USING $datasource
-                 |PARTITIONED BY(`$specialCharInLoc`)
+                 |PARTITIONED BY(`$specialChars`)
                  |LOCATION '$dir'
               """.stripMargin)
 
            assert(dir.listFiles().isEmpty)
-           spark.sql(s"INSERT INTO TABLE t PARTITION(`$specialCharInLoc`=2) SELECT 1")
-           val partEscaped = s"${ExternalCatalogUtils.escapePathName(specialCharInLoc)}=2"
+           spark.sql(s"INSERT INTO TABLE t PARTITION(`$specialChars`=2) SELECT 1")
+           val partEscaped = s"${ExternalCatalogUtils.escapePathName(specialChars)}=2"
            val partFile = new File(dir, partEscaped)
            assert(partFile.listFiles().length >= 1)
            checkAnswer(spark.table("t"), Row("1", "2") :: Nil)
 
            withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
-             spark.sql(s"INSERT INTO TABLE t PARTITION(`$specialCharInLoc`) SELECT 3, 4")
-             val partEscaped1 = s"${ExternalCatalogUtils.escapePathName(specialCharInLoc)}=4"
+             spark.sql(s"INSERT INTO TABLE t PARTITION(`$specialChars`) SELECT 3, 4")
+             val partEscaped1 = s"${ExternalCatalogUtils.escapePathName(specialChars)}=4"
              val partFile1 = new File(dir, partEscaped1)
              assert(partFile1.listFiles().length >= 1)
              checkAnswer(spark.table("t"), Row("1", "2") :: Row("3", "4") :: Nil)
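End-to-end, the series makes both static and dynamic inserts into a table partitioned by a specially named column land in a directory the catalog can resolve. A sketch of the scenario the tests cover, using one of the test's column names and assuming the escape table above (':' -> '%3A'):

    spark.sql("CREATE TABLE t(a string, `a:b` string) USING parquet PARTITIONED BY (`a:b`)")
    spark.sql("INSERT INTO TABLE t PARTITION(`a:b`=2) SELECT 1")
    // Before the fix, the dynamic-partition write task built the directory from
    // the raw column name ("a:b=2"), which does not match the escaped partition
    // location the catalog records; after the fix it writes "a%3Ab=2", and
    // spark.table("t") returns Row("1", "2").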