From f5999b9577aaaab0d2ca27c699dfc7a1662933dd Mon Sep 17 00:00:00 2001
From: jayadevanmurali
Date: Wed, 18 Jan 2017 23:55:15 +0530
Subject: [PATCH 1/7] Added test case to handle SPARK-19059

---
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 563d0687d0d02..f78619cd71369 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2499,4 +2499,18 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  test(
+    "SPARK-19059: Unable to retrieve data from parquet table whose name startswith underscore") {
+    sql("CREATE TABLE `_tbl`(i INT) USING parquet")
+    sql("INSERT INTO `_tbl` VALUES (1), (2), (3)")
+    checkAnswer( sql("SELECT * FROM `_tbl`"), Row(1) :: Row(2) :: Row(3) :: Nil)
+    sql("ALTER TABLE `_tbl` RENAME TO `tbl`")
+    checkAnswer( sql("SELECT * FROM `tbl`"), Row(1) :: Row(2) :: Row(3) :: Nil)
+    sql("DROP TABLE `tbl`")
+    sql("CREATE TABLE `tbl`(i INT) USING parquet")
+    sql("INSERT INTO `tbl` VALUES (1), (2), (3)")
+    checkAnswer( sql("SELECT * FROM `tbl`"), Row(1) :: Row(2) :: Row(3) :: Nil)
+    sql("DROP TABLE `tbl`")
+  }
 }

From 74e4a1a2740262fcf84e8c0704ddf2df57e614a0 Mon Sep 17 00:00:00 2001
From: jayadevanmurali
Date: Thu, 19 Jan 2017 00:08:36 +0530
Subject: [PATCH 2/7] Updated listLeafFiles() to handle SPARK-19059

---
 .../PartitioningAwareFileIndex.scala               | 91 +++++++++----------
 1 file changed, 45 insertions(+), 46 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index 82c1599a39df6..ffa72d223e4a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -385,55 +385,54 @@ object PartitioningAwareFileIndex extends Logging {
     logTrace(s"Listing $path")
     val fs = path.getFileSystem(hadoopConf)
     val name = path.getName.toLowerCase
-    if (shouldFilterOut(name)) {
-      Seq.empty[FileStatus]
-    } else {
-      // [SPARK-17599] Prevent InMemoryFileIndex from failing if path doesn't exist
-      // Note that statuses only include FileStatus for the files and dirs directly under path,
-      // and does not include anything else recursively.
-      val statuses = try fs.listStatus(path) catch {
-        case _: FileNotFoundException =>
-          logWarning(s"The directory $path was not found. Was it deleted very recently?")
-          Array.empty[FileStatus]
+
+    // [SPARK-17599] Prevent InMemoryFileIndex from failing if path doesn't exist
+    // Note that statuses only include FileStatus for the files and dirs directly under path,
+    // and does not include anything else recursively.
+    val statuses = try fs.listStatus(path) catch {
+      case _: FileNotFoundException =>
+        logWarning(s"The directory $path was not found. Was it deleted very recently?")
+        Array.empty[FileStatus]
+    }
+
+    val filteredStatuses = statuses.filterNot(status => shouldFilterOut(status.getPath.getName))
+
+    val allLeafStatuses = {
+      val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory)
+      val nestedFiles: Seq[FileStatus] = sessionOpt match {
+        case Some(session) =>
+          bulkListLeafFiles(dirs.map(_.getPath), hadoopConf, filter, session).flatMap(_._2)
+        case _ =>
+          dirs.flatMap(dir => listLeafFiles(dir.getPath, hadoopConf, filter, sessionOpt))
       }
+      val allFiles = topLevelFiles ++ nestedFiles
+      if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles
+    }
 
-      val allLeafStatuses = {
-        val (dirs, topLevelFiles) = statuses.partition(_.isDirectory)
-        val nestedFiles: Seq[FileStatus] = sessionOpt match {
-          case Some(session) =>
-            bulkListLeafFiles(dirs.map(_.getPath), hadoopConf, filter, session).flatMap(_._2)
-          case _ =>
-            dirs.flatMap(dir => listLeafFiles(dir.getPath, hadoopConf, filter, sessionOpt))
-        }
-        val allFiles = topLevelFiles ++ nestedFiles
-        if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles
-      }
-
-      allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
-        case f: LocatedFileStatus =>
-          f
+    allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
+      case f: LocatedFileStatus =>
+        f
 
-        // NOTE:
-        //
-        // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
-        //   operations, calling `getFileBlockLocations` does no harm here since these file system
-        //   implementations don't actually issue RPC for this method.
-        //
-        // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
-        //   be a big deal since we always use to `listLeafFilesInParallel` when the number of
-        //   paths exceeds threshold.
-        case f =>
-          // The other constructor of LocatedFileStatus will call FileStatus.getPermission(),
-          // which is very slow on some file system (RawLocalFileSystem, which is launch a
-          // subprocess and parse the stdout).
-          val locations = fs.getFileBlockLocations(f, 0, f.getLen)
-          val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
-            f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
-          if (f.isSymlink) {
-            lfs.setSymlink(f.getSymlink)
-          }
-          lfs
-      }
+      // NOTE:
+      //
+      // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
+      //   operations, calling `getFileBlockLocations` does no harm here since these file system
+      //   implementations don't actually issue RPC for this method.
+      //
+      // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
+      //   be a big deal since we always use to `listLeafFilesInParallel` when the number of
+      //   paths exceeds threshold.
+      case f =>
+        // The other constructor of LocatedFileStatus will call FileStatus.getPermission(),
+        // which is very slow on some file system (RawLocalFileSystem, which is launch a
+        // subprocess and parse the stdout).
+        val locations = fs.getFileBlockLocations(f, 0, f.getLen)
+        val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
+          f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
+        if (f.isSymlink) {
+          lfs.setSymlink(f.getSymlink)
+        }
+        lfs
     }
   }
 
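The substance of PATCH 2: listLeafFiles previously applied Spark's hidden-file name filter to the directory being listed, so a table whose root directory is `_tbl` was skipped wholesale and every query on it returned no rows; the patch applies the filter only to the entries found inside that directory. A self-contained sketch of the predicate involved, paraphrased from the Spark 2.1-era sources (not a verbatim copy):

    object Spark19059FilterSketch extends App {
      // Paraphrase of PartitioningAwareFileIndex.shouldFilterOut (Spark 2.1 era):
      // names starting with "_" or "." are treated as hidden/metadata entries,
      // except partition directories ("col=value") and Parquet summary files.
      def shouldFilterOut(pathName: String): Boolean = {
        ((pathName.startsWith("_") && !pathName.contains("=")) || pathName.startsWith(".")) &&
          !pathName.startsWith("_common_metadata") && !pathName.startsWith("_metadata")
      }

      assert(shouldFilterOut("_tbl"))           // table root: wrongly skipped before this patch
      assert(shouldFilterOut("_temporary"))     // in-flight job output: correctly skipped
      assert(shouldFilterOut(".hidden"))        // dotfile: correctly skipped
      assert(!shouldFilterOut("ds=2017-01-18")) // partition directory: kept
      assert(!shouldFilterOut("part-00000"))    // ordinary data file: kept
    }

Note that the patched code still filters `allLeafStatuses` a second time, so entries such as `_SUCCESS` inside the table directory remain excluded from query results.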
From dec96d848948d6c5f263b1e5535345d24c1058d3 Mon Sep 17 00:00:00 2001
From: jayadevanmurali
Date: Thu, 19 Jan 2017 00:22:05 +0530
Subject: [PATCH 3/7] Update PartitioningAwareFileIndex.scala

---
 .../execution/datasources/PartitioningAwareFileIndex.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index ffa72d223e4a1..ecb725ceba686 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -385,7 +385,7 @@ object PartitioningAwareFileIndex extends Logging {
     logTrace(s"Listing $path")
     val fs = path.getFileSystem(hadoopConf)
     val name = path.getName.toLowerCase
-    
+
     // [SPARK-17599] Prevent InMemoryFileIndex from failing if path doesn't exist
     // Note that statuses only include FileStatus for the files and dirs directly under path,
     // and does not include anything else recursively.
@@ -394,7 +394,7 @@ object PartitioningAwareFileIndex extends Logging {
         logWarning(s"The directory $path was not found. Was it deleted very recently?")
         Array.empty[FileStatus]
     }
-    
+
     val filteredStatuses = statuses.filterNot(status => shouldFilterOut(status.getPath.getName))
 
     val allLeafStatuses = {

From 4334b2ba2a882a94d59459a3d73b5fdb6fda6b80 Mon Sep 17 00:00:00 2001
From: jayadevanmurali
Date: Thu, 19 Jan 2017 00:28:22 +0530
Subject: [PATCH 4/7] Update PartitioningAwareFileIndex.scala

---
 .../execution/datasources/PartitioningAwareFileIndex.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index ecb725ceba686..fe9c6578b1e01 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -385,7 +385,7 @@ object PartitioningAwareFileIndex extends Logging {
     logTrace(s"Listing $path")
     val fs = path.getFileSystem(hadoopConf)
     val name = path.getName.toLowerCase
-    
+
     // [SPARK-17599] Prevent InMemoryFileIndex from failing if path doesn't exist
     // Note that statuses only include FileStatus for the files and dirs directly under path,
     // and does not include anything else recursively.
@@ -394,7 +394,7 @@ object PartitioningAwareFileIndex extends Logging {
         logWarning(s"The directory $path was not found. Was it deleted very recently?")
         Array.empty[FileStatus]
     }
-    
+
     val filteredStatuses = statuses.filterNot(status => shouldFilterOut(status.getPath.getName))
 
     val allLeafStatuses = {

From 499711d96f5f776baf482e0cbc12cd55f8c9b2c2 Mon Sep 17 00:00:00 2001
From: jayadevanmurali
Date: Thu, 19 Jan 2017 09:02:16 +0530
Subject: [PATCH 5/7] corrected style error

---
 .../src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 041a962b4a2c0..692603f91054e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2497,6 +2497,8 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     } finally {
       newSession.sessionState.conf.setConf(SQLConf.RUN_SQL_ON_FILES, originalValue)
     }
+  }
+
   test("should be able to resolve a persistent view") {
     withTable("t1", "t2") {
       withView("v1") {
@@ -2511,6 +2513,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
   test(
     "SPARK-19059: Unable to retrieve data from parquet table whose name startswith underscore") {
     sql("CREATE TABLE `_tbl`(i INT) USING parquet")

From 71be60f38bbc18e05b90f4f4837dcda6cde2460d Mon Sep 17 00:00:00 2001
From: jayadevanmurali
Date: Thu, 19 Jan 2017 09:13:49 +0530
Subject: [PATCH 6/7] corrected style error

---
 .../src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 692603f91054e..8dab50286e8ba 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2499,7 +2499,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       }
     }
   }
-  
+
   test("should be able to resolve a persistent view") {
     withTable("t1", "t2") {
       withView("v1") {
@@ -2513,7 +2513,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       }
     }
   }
-  
+
   test(
     "SPARK-19059: Unable to retrieve data from parquet table whose name startswith underscore") {
     sql("CREATE TABLE `_tbl`(i INT) USING parquet")
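For reference, the regression that the new test pins down can be reproduced in a plain spark-shell session against Spark 2.1.0 (a sketch; the empty result before the fix is as described in the SPARK-19059 report):

    // `spark` is the SparkSession the shell provides.
    spark.sql("CREATE TABLE `_tbl`(i INT) USING parquet")
    spark.sql("INSERT INTO `_tbl` VALUES (1), (2), (3)")

    // Without the fix this prints an empty result, because the directory `_tbl`
    // itself was discarded during leaf-file listing. With the fix: rows 1, 2, 3.
    spark.sql("SELECT * FROM `_tbl`").show()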
From ea6bd7d00c4d6ef5aea158a3fc8c3bfc5a0c02e4 Mon Sep 17 00:00:00 2001
From: jayadevanmurali
Date: Thu, 19 Jan 2017 11:48:20 +0530
Subject: [PATCH 7/7] incorporated code review comments

---
 .../org/apache/spark/sql/SQLQuerySuite.scala       | 18 ++++++------------
 1 file changed, 6 insertions(+), 12 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 8dab50286e8ba..8f1beaa3a1685 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2514,17 +2514,11 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     }
   }
 
-  test(
-    "SPARK-19059: Unable to retrieve data from parquet table whose name startswith underscore") {
-    sql("CREATE TABLE `_tbl`(i INT) USING parquet")
-    sql("INSERT INTO `_tbl` VALUES (1), (2), (3)")
-    checkAnswer( sql("SELECT * FROM `_tbl`"), Row(1) :: Row(2) :: Row(3) :: Nil)
-    sql("ALTER TABLE `_tbl` RENAME TO `tbl`")
-    checkAnswer( sql("SELECT * FROM `tbl`"), Row(1) :: Row(2) :: Row(3) :: Nil)
-    sql("DROP TABLE `tbl`")
-    sql("CREATE TABLE `tbl`(i INT) USING parquet")
-    sql("INSERT INTO `tbl` VALUES (1), (2), (3)")
-    checkAnswer( sql("SELECT * FROM `tbl`"), Row(1) :: Row(2) :: Row(3) :: Nil)
-    sql("DROP TABLE `tbl`")
+  test("SPARK-19059: read file based table whose name starts with underscore") {
+    withTable("_tbl") {
+      sql("CREATE TABLE `_tbl`(i INT) USING parquet")
+      sql("INSERT INTO `_tbl` VALUES (1), (2), (3)")
+      checkAnswer( sql("SELECT * FROM `_tbl`"), Row(1) :: Row(2) :: Row(3) :: Nil)
+    }
   }
 }
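The review comments folded into PATCH 7 do two things: the scenario is trimmed to the underscore-named table that actually exercises the bug (the rename-and-recreate steps added no coverage), and the manual DROP TABLE calls are replaced with `withTable`, which guarantees cleanup even when checkAnswer fails. The helper's contract is roughly this (a sketch, not the verbatim SQLTestUtils implementation; assumes a `spark: SparkSession` in scope, as inside Spark's test suites):

    // Run the body, then always drop the named tables, even if the body throws.
    def withTable(tableNames: String*)(f: => Unit): Unit = {
      try f finally {
        tableNames.foreach { name =>
          spark.sql(s"DROP TABLE IF EXISTS $name")
        }
      }
    }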