From 378fbc4473ee54f82323e052c9e98fad8f578c1e Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Tue, 3 Dec 2019 16:16:35 -0800
Subject: [PATCH 1/3] Allow insert overwrite same table if using dynamic
 partition overwrite.

---
 .../datasources/DataSourceStrategy.scala      | 12 ++++--
 .../InsertIntoHadoopFsRelationCommand.scala   | 29 ++++++++------
 .../spark/sql/sources/InsertSuite.scala       | 38 +++++++++++++++++++
 3 files changed, 64 insertions(+), 15 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 46444f0a05605..e6e9a96274a62 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -188,15 +188,13 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
       }
 
       val outputPath = t.location.rootPaths.head
-      if (overwrite) DDLUtils.verifyNotReadPath(actualQuery, outputPath)
-
       val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
 
       val partitionSchema = actualQuery.resolve(
         t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver)
       val staticPartitions = parts.filter(_._2.nonEmpty).map { case (k, v) => k -> v.get }
 
-      InsertIntoHadoopFsRelationCommand(
+      val insertCommand = InsertIntoHadoopFsRelationCommand(
         outputPath,
         staticPartitions,
         i.ifPartitionNotExists,
@@ -209,6 +207,14 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
         table,
         Some(t.location),
         actualQuery.output.map(_.name))
+
+      // For dynamic partition overwrite, we do not delete partition directories ahead of time.
+      // We write to staging directories and move them to the final partition directories after
+      // the writing job is done. So it is ok to have outputPath try to overwrite the input path.
+      if (overwrite && !insertCommand.dynamicPartitionOverwriteEnabled) {
+        DDLUtils.verifyNotReadPath(actualQuery, outputPath)
+      }
+      insertCommand
   }
 }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index fbe874b3e8bc5..a1b765c4ee585 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
 import org.apache.spark.sql.util.SchemaUtils
 
@@ -60,6 +61,21 @@ case class InsertIntoHadoopFsRelationCommand(
   extends DataWritingCommand {
   import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
 
+  private lazy val parameters = CaseInsensitiveMap(options)
+
+  private[sql] lazy val dynamicPartitionOverwriteEnabled: Boolean = {
+    val partitionOverwriteMode = parameters.get("partitionOverwriteMode")
+      // scalastyle:off caselocale
+      .map(mode => PartitionOverwriteMode.withName(mode.toUpperCase))
+      // scalastyle:on caselocale
+      .getOrElse(SQLConf.get.partitionOverwriteMode)
+    val enableDynamicOverwrite = partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC
+    // This config only makes sense when we are overwriting a partitioned dataset with dynamic
+    // partition columns.
+    enableDynamicOverwrite && mode == SaveMode.Overwrite &&
+      staticPartitions.size < partitionColumns.length
+  }
+
   override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
     // Most formats don't do well with duplicate columns, so lets not allow that
     SchemaUtils.checkColumnNameDuplication(
@@ -90,18 +106,7 @@ case class InsertIntoHadoopFsRelationCommand(
         fs, catalogTable.get, qualifiedOutputPath, matchingPartitions)
     }
 
-    val parameters = CaseInsensitiveMap(options)
-
-    val partitionOverwriteMode = parameters.get("partitionOverwriteMode")
-      // scalastyle:off caselocale
-      .map(mode => PartitionOverwriteMode.withName(mode.toUpperCase))
-      // scalastyle:on caselocale
-      .getOrElse(sparkSession.sessionState.conf.partitionOverwriteMode)
-    val enableDynamicOverwrite = partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC
-    // This config only makes sense when we are overwriting a partitioned dataset with dynamic
-    // partition columns.
-    val dynamicPartitionOverwrite = enableDynamicOverwrite && mode == SaveMode.Overwrite &&
-      staticPartitions.size < partitionColumns.length
+    val dynamicPartitionOverwrite = dynamicPartitionOverwriteEnabled
 
     val committer = FileCommitProtocol.instantiate(
       sparkSession.sessionState.conf.fileCommitProtocolClass,
       jobId = java.util.UUID.randomUUID().toString,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index fbde38322fca8..469c5afe81ee0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -270,6 +270,44 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
       "INSERT OVERWRITE to a table while querying it should not be allowed.")
   }
 
+  test("it is allowed to write to a table while querying it for dynamic partition overwrite.") {
+    Seq(PartitionOverwriteMode.DYNAMIC.toString,
+      PartitionOverwriteMode.STATIC.toString).foreach { usedMode =>
+      withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> usedMode) {
+        withTable("insertTable") {
+          sql(
+            """
+              |CREATE TABLE insertTable(i int, part1 int, part2 int) USING PARQUET
+              |PARTITIONED BY (part1, part2)
+            """.stripMargin)
+
+          sql("INSERT INTO TABLE insertTable PARTITION(part1=1, part2=1) SELECT 1")
+          checkAnswer(spark.table("insertTable"), Row(1, 1, 1))
+
+          if (usedMode == PartitionOverwriteMode.DYNAMIC.toString) {
+            sql(
+              """
+                |INSERT OVERWRITE TABLE insertTable PARTITION(part1=1, part2)
+                |SELECT i + 1, part2 FROM insertTable
+              """.stripMargin)
+            checkAnswer(spark.table("insertTable"), Row(2, 1, 1))
+          } else {
+            val message = intercept[AnalysisException] {
+              sql(
+                """
+                  |INSERT OVERWRITE TABLE insertTable PARTITION(part1=1, part2)
+                  |SELECT i + 1, part2 FROM insertTable
+                """.stripMargin)
+            }.getMessage
+            assert(
+              message.contains("Cannot overwrite a path that is also being read from."),
+              "INSERT OVERWRITE to a table while querying it should not be allowed.")
+          }
+        }
+      }
+    }
+  }
+
   test("Caching") {
     // write something to the jsonTable
     sql(

From b1fbb415979b407b27c866c597e3114c81852bb4 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Thu, 5 Dec 2019 17:22:41 -0800
Subject: [PATCH 2/3] Address comments.
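
Per review, extend the test to also seed partition (part1=1, part2=2) and to
run a second round of dynamic INSERT OVERWRITE, asserting that each round
replaces only the partitions the query actually produces. The second round
(SQL taken verbatim from the test below) is:

    INSERT OVERWRITE TABLE insertTable PARTITION(part1=1, part2)
    SELECT i + 1, part2 + 1 FROM insertTable

It rewrites the partitions it maps onto and creates new ones, while rows in
partitions the query does not touch are kept as-is.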
---
 .../spark/sql/sources/InsertSuite.scala | 22 ++++++++++++++----
 1 file changed, 16 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 469c5afe81ee0..dde6831dedbc6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -272,32 +272,42 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
 
   test("it is allowed to write to a table while querying it for dynamic partition overwrite.") {
     Seq(PartitionOverwriteMode.DYNAMIC.toString,
-      PartitionOverwriteMode.STATIC.toString).foreach { usedMode =>
-      withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> usedMode) {
+      PartitionOverwriteMode.STATIC.toString).foreach { mode =>
+      withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> mode) {
         withTable("insertTable") {
           sql(
             """
               |CREATE TABLE insertTable(i int, part1 int, part2 int) USING PARQUET
               |PARTITIONED BY (part1, part2)
-            """.stripMargin)
+          """.stripMargin)
 
           sql("INSERT INTO TABLE insertTable PARTITION(part1=1, part2=1) SELECT 1")
           checkAnswer(spark.table("insertTable"), Row(1, 1, 1))
+          sql("INSERT OVERWRITE TABLE insertTable PARTITION(part1=1, part2=2) SELECT 2")
+          checkAnswer(spark.table("insertTable"), Row(1, 1, 1) :: Row(2, 1, 2) :: Nil)
 
-          if (usedMode == PartitionOverwriteMode.DYNAMIC.toString) {
+          if (mode == PartitionOverwriteMode.DYNAMIC.toString) {
             sql(
               """
                 |INSERT OVERWRITE TABLE insertTable PARTITION(part1=1, part2)
                 |SELECT i + 1, part2 FROM insertTable
               """.stripMargin)
-            checkAnswer(spark.table("insertTable"), Row(2, 1, 1))
+            checkAnswer(spark.table("insertTable"), Row(2, 1, 1) :: Row(3, 1, 2) :: Nil)
+
+            sql(
+              """
+                |INSERT OVERWRITE TABLE insertTable PARTITION(part1=1, part2)
+                |SELECT i + 1, part2 + 1 FROM insertTable
+              """.stripMargin)
+            checkAnswer(spark.table("insertTable"),
+              Row(2, 1, 1) :: Row(3, 1, 2) :: Row(4, 1, 3) :: Nil)
           } else {
             val message = intercept[AnalysisException] {
               sql(
                 """
                   |INSERT OVERWRITE TABLE insertTable PARTITION(part1=1, part2)
                   |SELECT i + 1, part2 FROM insertTable
-                """.stripMargin)
+              """.stripMargin)
             }.getMessage
             assert(
               message.contains("Cannot overwrite a path that is also being read from."),
               "INSERT OVERWRITE to a table while querying it should not be allowed.")

From 42606bca78a15c30dd439ccdfee52f85c9c97cf5 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Fri, 6 Dec 2019 08:25:54 -0800
Subject: [PATCH 3/3] Address comments.

---
 .../spark/sql/execution/datasources/DataSourceStrategy.scala | 2 +-
 .../datasources/InsertIntoHadoopFsRelationCommand.scala | 4 +---
 .../test/scala/org/apache/spark/sql/sources/InsertSuite.scala | 3 ++-
 3 files changed, 4 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index e6e9a96274a62..8ddeb5edf9431 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -211,7 +211,7 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
       // For dynamic partition overwrite, we do not delete partition directories ahead of time.
       // We write to staging directories and move them to the final partition directories after
      // the writing job is done. So it is ok to have outputPath try to overwrite the input path.
-      if (overwrite && !insertCommand.dynamicPartitionOverwriteEnabled) {
+      if (overwrite && !insertCommand.dynamicPartitionOverwrite) {
         DDLUtils.verifyNotReadPath(actualQuery, outputPath)
       }
       insertCommand
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index a1b765c4ee585..f11972115e09f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -63,7 +63,7 @@ case class InsertIntoHadoopFsRelationCommand(
 
   private lazy val parameters = CaseInsensitiveMap(options)
 
-  private[sql] lazy val dynamicPartitionOverwriteEnabled: Boolean = {
+  private[sql] lazy val dynamicPartitionOverwrite: Boolean = {
     val partitionOverwriteMode = parameters.get("partitionOverwriteMode")
       // scalastyle:off caselocale
       .map(mode => PartitionOverwriteMode.withName(mode.toUpperCase))
@@ -106,8 +106,6 @@ case class InsertIntoHadoopFsRelationCommand(
         fs, catalogTable.get, qualifiedOutputPath, matchingPartitions)
     }
 
-    val dynamicPartitionOverwrite = dynamicPartitionOverwriteEnabled
-
     val committer = FileCommitProtocol.instantiate(
       sparkSession.sessionState.conf.fileCommitProtocolClass,
       jobId = java.util.UUID.randomUUID().toString,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index dde6831dedbc6..871cb1ff151ba 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -270,7 +270,8 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
       "INSERT OVERWRITE to a table while querying it should not be allowed.")
   }
 
-  test("it is allowed to write to a table while querying it for dynamic partition overwrite.") {
+  test("SPARK-30112: it is allowed to write to a table while querying it for " +
+    "dynamic partition overwrite.") {
     Seq(PartitionOverwriteMode.DYNAMIC.toString,
       PartitionOverwriteMode.STATIC.toString).foreach { mode =>
       withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> mode) {
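
Not part of the patch series above: a minimal Scala sketch of the behavior the
series enables, for readers who want to try it end to end. It assumes a local
SparkSession with the default catalog; the table name `t` and the row values
are illustrative, not taken from the patches.

    import org.apache.spark.sql.SparkSession

    // Illustrative only -- assumes a Spark build with this series applied.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("dynamic-partition-overwrite-demo")
      .getOrCreate()

    // In dynamic mode, INSERT OVERWRITE writes to staging directories and then
    // moves them into place, replacing only the partitions the query produces.
    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

    spark.sql("CREATE TABLE t(i INT, part INT) USING parquet PARTITIONED BY (part)")
    spark.sql("INSERT INTO t PARTITION(part=1) SELECT 1")
    spark.sql("INSERT INTO t PARTITION(part=2) SELECT 2")

    // Reads t and overwrites t. Before this series the analyzer rejected this
    // with "Cannot overwrite a path that is also being read from."; with the
    // series applied it succeeds, and only partition part=1 is rewritten.
    spark.sql("INSERT OVERWRITE TABLE t PARTITION(part) SELECT i + 1, part FROM t WHERE part = 1")

    spark.table("t").show()  // expected rows: (2, 1) and (2, 2)

In static mode the same statement is still rejected, because static overwrite
deletes the matching partitions before the write runs; that is why the
DDLUtils.verifyNotReadPath check is kept for the non-dynamic path.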