From 355496ec02d706d3c68c6a57c51b2ca83532532f Mon Sep 17 00:00:00 2001 From: Yizhong Zhang Date: Thu, 7 Nov 2019 11:49:51 +0800 Subject: [PATCH 01/11] Fix drop partition MetaException:File does not exist. --- .../sql/hive/client/HiveClientImpl.scala | 23 ++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 12c9a972c1aff..1a2b8f67fa4ee 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -28,7 +28,7 @@ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import org.apache.hadoop.fs.Path -import org.apache.hadoop.hive.common.StatsSetupConst +import org.apache.hadoop.hive.common.{FileUtils, StatsSetupConst} import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hadoop.hive.metastore.{IMetaStoreClient, TableType => HiveTableType} @@ -616,6 +616,18 @@ private[hive] class HiveClientImpl( shim.createPartitions(client, db, table, parts, ignoreIfExists) } + @throws[Exception] + private def isEmptyPath(dirPath: Path): Boolean = { + val inpFs = dirPath.getFileSystem(conf) + if (inpFs.exists(dirPath)) { + val fStats = inpFs.listStatus(dirPath, FileUtils.HIDDEN_FILES_PATH_FILTER) + if (fStats.nonEmpty) { + return false + } + } + true + } + override def dropPartitions( db: String, table: String, @@ -633,6 +645,15 @@ private[hive] class HiveClientImpl( // whose specs are supersets of this partial spec. E.g. If a table has partitions // (b='1', c='1') and (b='1', c='2'), a partial spec of (b='1') will match both. val parts = client.getPartitions(hiveTable, s.asJava).asScala + parts.foreach { partition => + val partPath = partition.getDataLocation + if (isEmptyPath(partPath)) { + val fs = partPath.getFileSystem(conf) + fs.mkdirs(partPath) + fs.deleteOnExit(partPath) + } + partition + } if (parts.isEmpty && !ignoreIfNotExists) { throw new AnalysisException( s"No partition is dropped. One partition spec '$s' does not exist in table '$table' " + From 74d5984b04bb648bc657a75f8757689ffb107eb7 Mon Sep 17 00:00:00 2001 From: Yizhong Zhang Date: Thu, 14 Nov 2019 10:54:45 +0800 Subject: [PATCH 02/11] Add notes. --- .../scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 1a2b8f67fa4ee..86f31f7cfe150 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -645,6 +645,8 @@ private[hive] class HiveClientImpl( // whose specs are supersets of this partial spec. E.g. If a table has partitions // (b='1', c='1') and (b='1', c='2'), a partial spec of (b='1') will match both. val parts = client.getPartitions(hiveTable, s.asJava).asScala + // Check whether the partition we are going to drop is empty. + // We make a dummy one for the empty partition. See [SPARK-29786] for more details. parts.foreach { partition => val partPath = partition.getDataLocation if (isEmptyPath(partPath)) { From 84b1da895fb56df0a189813e11eb1d8157b77cea Mon Sep 17 00:00:00 2001 From: heguozi Date: Wed, 18 Mar 2020 19:36:14 +0800 Subject: [PATCH 03/11] 1584531350 --- .../sql/hive/client/HiveClientImpl.scala | 10 ++++---- .../spark/sql/hive/client/VersionsSuite.scala | 23 ++++++++++++++++++- 2 files changed, 27 insertions(+), 6 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 86f31f7cfe150..9bd84a144dbd3 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -645,6 +645,11 @@ private[hive] class HiveClientImpl( // whose specs are supersets of this partial spec. E.g. If a table has partitions // (b='1', c='1') and (b='1', c='2'), a partial spec of (b='1') will match both. val parts = client.getPartitions(hiveTable, s.asJava).asScala + if (parts.isEmpty && !ignoreIfNotExists) { + throw new AnalysisException( + s"No partition is dropped. One partition spec '$s' does not exist in table '$table' " + + s"database '$db'") + } // Check whether the partition we are going to drop is empty. // We make a dummy one for the empty partition. See [SPARK-29786] for more details. parts.foreach { partition => @@ -656,11 +661,6 @@ private[hive] class HiveClientImpl( } partition } - if (parts.isEmpty && !ignoreIfNotExists) { - throw new AnalysisException( - s"No partition is dropped. One partition spec '$s' does not exist in table '$table' " + - s"database '$db'") - } parts.map(_.getValues) }.distinct var droppedParts = ArrayBuffer.empty[java.util.List[String]] diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index e3797041883ac..13be9ed9c816c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -114,11 +114,12 @@ class VersionsSuite extends SparkFunSuite with Logging { private var versionSpark: TestHiveVersion = null + private val hadoopConf = new Configuration() + versions.foreach { version => test(s"$version: create client") { client = null System.gc() // Hack to avoid SEGV on some JVM versions. - val hadoopConf = new Configuration() hadoopConf.set("test", "success") // Hive changed the default of datanucleus.schema.autoCreateAll from true to false and // hive.metastore.schema.verification from false to true since 2.0 @@ -531,6 +532,26 @@ class VersionsSuite extends SparkFunSuite with Logging { assert(client.getPartitionOption("default", "src_part", spec).isEmpty) } + test(s"$version: dropPartitions when file not exists") { + val partitions = (1 to testPartitionCount).map { key2 => + CatalogTablePartition(Map("key1" -> "1", "key2" -> key2.toString), storageFormat) + } + client.createPartitions("default", "src_part", partitions, ignoreIfExists = true) + val spec = Map("key1" -> "1", "key2" -> "2") + val hiveTable = client.getTable("default", "src_part") + val parts = client.getPartitions(hiveTable, Some(spec)) + parts.foreach { partition => + val partPath = new Path(partition.location) + val fs = partPath.getFileSystem(hadoopConf) + if (fs.exists(partPath)) { + fs.delete(partPath, true) + } + } + client.dropPartitions("default", "src_part", Seq(spec), ignoreIfNotExists = true, + purge = true, retainData = false) + assert(client.getPartitionOption("default", "src_part", spec).isEmpty) + } + /////////////////////////////////////////////////////////////////////////// // Function related API /////////////////////////////////////////////////////////////////////////// From 248b6cf6c526f58e928239f1b7f3c7b66c5cb4de Mon Sep 17 00:00:00 2001 From: heguozi Date: Fri, 20 Mar 2020 10:13:45 +0800 Subject: [PATCH 04/11] UT failed #119927, getDataLocation->getPartitionPath --- .../org/apache/spark/sql/hive/client/HiveClientImpl.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 9bd84a144dbd3..7b313a25800fc 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -648,12 +648,12 @@ private[hive] class HiveClientImpl( if (parts.isEmpty && !ignoreIfNotExists) { throw new AnalysisException( s"No partition is dropped. One partition spec '$s' does not exist in table '$table' " + - s"database '$db'") + s"database '$db'") } // Check whether the partition we are going to drop is empty. // We make a dummy one for the empty partition. See [SPARK-29786] for more details. parts.foreach { partition => - val partPath = partition.getDataLocation + val partPath = partition.getPartitionPath if (isEmptyPath(partPath)) { val fs = partPath.getFileSystem(conf) fs.mkdirs(partPath) From c20c390acd00f2f3adc219ba373fc5f619c2299a Mon Sep 17 00:00:00 2001 From: heguozi Date: Fri, 20 Mar 2020 14:51:22 +0800 Subject: [PATCH 05/11] UT failed #120076, add HIDDEN_FILES_PATH_FILTER --- .../apache/spark/sql/hive/client/HiveClientImpl.scala | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 7b313a25800fc..0d414893b5657 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -27,7 +27,7 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{Path, PathFilter} import org.apache.hadoop.hive.common.{FileUtils, StatsSetupConst} import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.conf.HiveConf.ConfVars @@ -616,11 +616,18 @@ private[hive] class HiveClientImpl( shim.createPartitions(client, db, table, parts, ignoreIfExists) } + val HIDDEN_FILES_PATH_FILTER: PathFilter = new PathFilter() { + override def accept(p: Path): Boolean = { + val name = p.getName + !name.startsWith("_") && !name.startsWith(".") + } + } + @throws[Exception] private def isEmptyPath(dirPath: Path): Boolean = { val inpFs = dirPath.getFileSystem(conf) if (inpFs.exists(dirPath)) { - val fStats = inpFs.listStatus(dirPath, FileUtils.HIDDEN_FILES_PATH_FILTER) + val fStats = inpFs.listStatus(dirPath, HIDDEN_FILES_PATH_FILTER) if (fStats.nonEmpty) { return false } From 9d75854782b2cc4f2e61c57263148a63229f5c41 Mon Sep 17 00:00:00 2001 From: heguozi Date: Fri, 20 Mar 2020 16:53:05 +0800 Subject: [PATCH 06/11] optimize --- .../org/apache/spark/sql/hive/client/HiveClientImpl.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 0d414893b5657..0a8f2f8c40271 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -616,11 +616,9 @@ private[hive] class HiveClientImpl( shim.createPartitions(client, db, table, parts, ignoreIfExists) } - val HIDDEN_FILES_PATH_FILTER: PathFilter = new PathFilter() { - override def accept(p: Path): Boolean = { + val HIDDEN_FILES_PATH_FILTER: PathFilter = (p: Path) => { val name = p.getName !name.startsWith("_") && !name.startsWith(".") - } } @throws[Exception] From 5a5ff7432d28c64b25c28ea74ba2d30d80ea6c2c Mon Sep 17 00:00:00 2001 From: heguozi Date: Fri, 20 Mar 2020 19:15:44 +0800 Subject: [PATCH 07/11] UT failed #120087. Hive-0.13 doesn't have `Partition.getPartitionPath()`, using Partition.getPath() instead. --- .../scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 0a8f2f8c40271..e1714a987a502 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -658,7 +658,7 @@ private[hive] class HiveClientImpl( // Check whether the partition we are going to drop is empty. // We make a dummy one for the empty partition. See [SPARK-29786] for more details. parts.foreach { partition => - val partPath = partition.getPartitionPath + val partPath = partition.getPath.head if (isEmptyPath(partPath)) { val fs = partPath.getFileSystem(conf) fs.mkdirs(partPath) From f4b3793afebee82a89c6ffa9f13c0187192496a7 Mon Sep 17 00:00:00 2001 From: heguozi Date: Sat, 21 Mar 2020 01:11:25 +0800 Subject: [PATCH 08/11] UT failed #120093, purge and ignoreIfNotExists should be false. --- .../org/apache/spark/sql/hive/client/VersionsSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index 13be9ed9c816c..b7936cbb9c4db 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -547,8 +547,8 @@ class VersionsSuite extends SparkFunSuite with Logging { fs.delete(partPath, true) } } - client.dropPartitions("default", "src_part", Seq(spec), ignoreIfNotExists = true, - purge = true, retainData = false) + client.dropPartitions("default", "src_part", Seq(spec), ignoreIfNotExists = false, + purge = false, retainData = false) assert(client.getPartitionOption("default", "src_part", spec).isEmpty) } From 4d1c35e23cc206c84665827da40129d64b862ada Mon Sep 17 00:00:00 2001 From: heguozi Date: Fri, 27 Mar 2020 15:07:35 +0800 Subject: [PATCH 09/11] isEmptyPath->isExistPath --- .../spark/sql/hive/client/HiveClientImpl.scala | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index e1714a987a502..7d08a956d73f7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -622,15 +622,9 @@ private[hive] class HiveClientImpl( } @throws[Exception] - private def isEmptyPath(dirPath: Path): Boolean = { + private def isExistPath(dirPath: Path): Boolean = { val inpFs = dirPath.getFileSystem(conf) - if (inpFs.exists(dirPath)) { - val fStats = inpFs.listStatus(dirPath, HIDDEN_FILES_PATH_FILTER) - if (fStats.nonEmpty) { - return false - } - } - true + inpFs.exists(dirPath) } override def dropPartitions( @@ -659,7 +653,7 @@ private[hive] class HiveClientImpl( // We make a dummy one for the empty partition. See [SPARK-29786] for more details. parts.foreach { partition => val partPath = partition.getPath.head - if (isEmptyPath(partPath)) { + if (isExistPath(partPath)) { val fs = partPath.getFileSystem(conf) fs.mkdirs(partPath) fs.deleteOnExit(partPath) From 7cf7c6b991ba6a997476384ec703656e28505fbf Mon Sep 17 00:00:00 2001 From: heguozi Date: Fri, 27 Mar 2020 15:30:00 +0800 Subject: [PATCH 10/11] remove redundant code --- .../org/apache/spark/sql/hive/client/HiveClientImpl.scala | 5 ----- 1 file changed, 5 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 7d08a956d73f7..d2ca53a855ead 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -616,11 +616,6 @@ private[hive] class HiveClientImpl( shim.createPartitions(client, db, table, parts, ignoreIfExists) } - val HIDDEN_FILES_PATH_FILTER: PathFilter = (p: Path) => { - val name = p.getName - !name.startsWith("_") && !name.startsWith(".") - } - @throws[Exception] private def isExistPath(dirPath: Path): Boolean = { val inpFs = dirPath.getFileSystem(conf) From f8b96de283f692781568903f0c27b1532808ecc4 Mon Sep 17 00:00:00 2001 From: heguozi Date: Fri, 29 May 2020 16:29:04 +0800 Subject: [PATCH 11/11] restore '!' --- .../scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index d2ca53a855ead..2a991acd43f15 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -648,7 +648,7 @@ private[hive] class HiveClientImpl( // We make a dummy one for the empty partition. See [SPARK-29786] for more details. parts.foreach { partition => val partPath = partition.getPath.head - if (isExistPath(partPath)) { + if (!isExistPath(partPath)) { val fs = partPath.getFileSystem(conf) fs.mkdirs(partPath) fs.deleteOnExit(partPath)