From 74156663514001d4ab7d6aca6a356284cc2bc019 Mon Sep 17 00:00:00 2001
From: Eric Liang
Date: Mon, 17 Oct 2016 15:00:03 -0700
Subject: [PATCH 1/2] Mon Oct 17 15:00:03 PDT 2016

---
 .../apache/spark/sql/catalog/Catalog.scala    |  3 ++-
 .../spark/sql/execution/CacheManager.scala    |  5 +++--
 .../datasources/TableFileCatalog.scala        | 14 +++++++++++--
 .../sql/hive/HiveMetadataCacheSuite.scala     | 21 +++++++++++++++++--
 4 files changed, 36 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
index 889b8a02784d6..aecdda1c36498 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
@@ -343,7 +343,8 @@ abstract class Catalog {
 
   /**
    * Invalidate and refresh all the cached data (and the associated metadata) for any dataframe that
-   * contains the given data source path.
+   * contains the given data source path. Path matching is by prefix, i.e. "/" would invalidate
+   * everything that is cached.
    *
    * @since 2.0.0
    */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 92fd366e101fd..fb72c679e3628 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -185,9 +185,10 @@ class CacheManager extends Logging {
     plan match {
       case lr: LogicalRelation => lr.relation match {
         case hr: HadoopFsRelation =>
+          val prefixToInvalidate = qualifiedPath.toString
           val invalidate = hr.location.rootPaths
-            .map(_.makeQualified(fs.getUri, fs.getWorkingDirectory))
-            .contains(qualifiedPath)
+            .map(_.makeQualified(fs.getUri, fs.getWorkingDirectory).toString)
+            .exists(_.startsWith(prefixToInvalidate))
           if (invalidate) hr.location.refresh()
           invalidate
         case _ => false
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
index a5c41b244589b..022f8d1d916cf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
@@ -49,13 +49,18 @@ class TableFileCatalog(
 
   private val baseLocation = catalogTable.storage.locationUri
 
+  // Populated on-demand by calls to cachedAllPartitions
+  private var allPartitions: ListingFileCatalog = null
+
   override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq
 
   override def listFiles(filters: Seq[Expression]): Seq[Partition] = {
     filterPartitions(filters).listFiles(Nil)
   }
 
-  override def refresh(): Unit = {}
+  override def refresh(): Unit = synchronized {
+    allPartitions = null
+  }
 
   /**
    * Returns a [[ListingFileCatalog]] for this table restricted to the subset of partitions
@@ -90,7 +95,12 @@ class TableFileCatalog(
   }
 
   // Not used in the hot path of queries when metastore partition pruning is enabled
-  lazy val cachedAllPartitions: ListingFileCatalog = filterPartitions0(Nil)
+  def cachedAllPartitions: ListingFileCatalog = synchronized {
+    if (allPartitions == null) {
+      allPartitions = filterPartitions0(Nil)
+    }
+    allPartitions
+  }
 
   override def inputFiles: Array[String] = cachedAllPartitions.inputFiles
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
index 7af81a3a90504..2ca1cd4c07fdb 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
@@ -80,9 +80,13 @@ class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSi
       val df = spark.sql("select * from test")
       assert(sql("select * from test").count() == 5)
 
+      def deleteRandomFile(): Unit = {
+        val p = new Path(spark.table("test").inputFiles.head)
+        assert(p.getFileSystem(hiveContext.sessionState.newHadoopConf()).delete(p, true))
+      }
+
       // Delete a file, then assert that we tried to read it. This means the table was cached.
-      val p = new Path(spark.table("test").inputFiles.head)
-      assert(p.getFileSystem(hiveContext.sessionState.newHadoopConf()).delete(p, true))
+      deleteRandomFile()
       val e = intercept[SparkException] {
         sql("select * from test").count()
       }
@@ -91,6 +95,19 @@ class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSi
       // Test refreshing the cache.
       spark.catalog.refreshTable("test")
       assert(sql("select * from test").count() == 4)
+      assert(spark.table("test").inputFiles.length == 4)
+
+      // Test refresh by path separately since it goes through different code paths than
+      // refreshTable does.
+      deleteRandomFile()
+      spark.catalog.cacheTable("test")
+      spark.catalog.refreshByPath("/some-invalid-path") // no-op
+      val e2 = intercept[SparkException] {
+        sql("select * from test").count()
+      }
+      assert(e2.getMessage.contains("FileNotFoundException"))
+      spark.catalog.refreshByPath(dir.getAbsolutePath)
+      assert(sql("select * from test").count() == 3)
     }
   }
 }

From 5088ae0b93a8480b0788abd16d531bb5d08c4c55 Mon Sep 17 00:00:00 2001
From: Eric Liang
Date: Tue, 18 Oct 2016 11:27:59 -0700
Subject: [PATCH 2/2] comments

---
 .../execution/datasources/TableFileCatalog.scala | 16 ++++++++--------
 .../spark/sql/hive/HiveMetastoreCatalog.scala    |  2 +-
 2 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
index 022f8d1d916cf..127466a59509c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
@@ -50,7 +50,7 @@ class TableFileCatalog(
   private val baseLocation = catalogTable.storage.locationUri
 
   // Populated on-demand by calls to cachedAllPartitions
-  private var allPartitions: ListingFileCatalog = null
+  private var cachedAllPartitions: ListingFileCatalog = null
 
   override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq
 
@@ -59,7 +59,7 @@ class TableFileCatalog(
   }
 
   override def refresh(): Unit = synchronized {
-    allPartitions = null
+    cachedAllPartitions = null
   }
 
   /**
@@ -70,7 +70,7 @@ class TableFileCatalog(
    */
   def filterPartitions(filters: Seq[Expression]): ListingFileCatalog = {
     if (filters.isEmpty) {
-      cachedAllPartitions
+      allPartitions
     } else {
       filterPartitions0(filters)
     }
@@ -95,14 +95,14 @@ class TableFileCatalog(
   }
 
   // Not used in the hot path of queries when metastore partition pruning is enabled
-  def cachedAllPartitions: ListingFileCatalog = synchronized {
-    if (allPartitions == null) {
-      allPartitions = filterPartitions0(Nil)
+  def allPartitions: ListingFileCatalog = synchronized {
+    if (cachedAllPartitions == null) {
+      cachedAllPartitions = filterPartitions0(Nil)
     }
-    allPartitions
+    cachedAllPartitions
   }
 
-  override def inputFiles: Array[String] = cachedAllPartitions.inputFiles
+  override def inputFiles: Array[String] = allPartitions.inputFiles
 }
 
 /**
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 4a2aaa7d4f6ca..a001504201267 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -235,7 +235,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
           if (lazyPruningEnabled) {
             catalog
           } else {
-            catalog.cachedAllPartitions
+            catalog.allPartitions
           }
         }
         val partitionSchemaColumnNames = partitionSchema.map(_.name.toLowerCase).toSet
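
Usage note (not part of the patches above): the prefix matching that PATCH 1 adds to Catalog.refreshByPath can be exercised roughly as in the sketch below. This is an illustrative sketch, not code from the PR; the directory /tmp/example_data and the external rewrite are hypothetical, and only public Spark 2.x API calls (spark.read.parquet, Dataset.cache, spark.catalog.refreshByPath) are used.

  // Sketch only: assumes a SparkSession named `spark` and a hypothetical
  // directory /tmp/example_data that already contains parquet files.
  val df = spark.read.parquet("/tmp/example_data")
  df.cache()
  df.count()  // file listing and data are now cached

  // Suppose an external process now deletes or rewrites files under that path.
  spark.catalog.refreshByPath("/some/other/path")  // no-op: prefix does not match
  spark.catalog.refreshByPath("/tmp")              // prefix match: the cached entry
                                                   // for /tmp/example_data is refreshed
  df.count()  // the next action re-lists files instead of failing on stale metadata

Because matching is by string prefix of the qualified path, refreshByPath("/") would refresh every cached relation, which is exactly the caveat the added doc comment in Catalog.scala calls out.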