From a14ff6974446b8e692b03c3e3f1cab52693cc6c4 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Wed, 14 Feb 2018 21:13:24 -0800 Subject: [PATCH 1/3] [SPARK-23434][SQL] Spark should not warn `metadata directory` for a HDFS file path --- .../spark/sql/execution/streaming/FileStreamSink.scala | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala index 2715fa93d0e98..7a5681291dac2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala @@ -42,9 +42,13 @@ object FileStreamSink extends Logging { try { val hdfsPath = new Path(singlePath) val fs = hdfsPath.getFileSystem(hadoopConf) - val metadataPath = new Path(hdfsPath, metadataDir) - val res = fs.exists(metadataPath) - res + if (fs.isDirectory(hdfsPath)) { + val metadataPath = new Path(hdfsPath, metadataDir) + val res = fs.exists(metadataPath) + res + } else { + false + } } catch { case NonFatal(e) => logWarning(s"Error while looking for metadata directory.") From a7432b7f471bd13d099fd16482c69c3d90800efa Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Tue, 20 Feb 2018 00:48:02 -0800 Subject: [PATCH 2/3] Address comment --- .../apache/spark/sql/execution/streaming/FileStreamSink.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala index 7a5681291dac2..d85872e4038a8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala @@ -44,8 +44,7 @@ object FileStreamSink extends Logging { val fs = hdfsPath.getFileSystem(hadoopConf) if (fs.isDirectory(hdfsPath)) { val metadataPath = new Path(hdfsPath, metadataDir) - val res = fs.exists(metadataPath) - res + fs.exists(metadataPath) } else { false } From 27188e7c2bfdd6a74a4f1a41d475e8d1e9c0d65a Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Tue, 20 Feb 2018 00:49:09 -0800 Subject: [PATCH 3/3] update more --- .../apache/spark/sql/execution/streaming/FileStreamSink.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala index d85872e4038a8..87a17cebdc10c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala @@ -43,8 +43,7 @@ object FileStreamSink extends Logging { val hdfsPath = new Path(singlePath) val fs = hdfsPath.getFileSystem(hadoopConf) if (fs.isDirectory(hdfsPath)) { - val metadataPath = new Path(hdfsPath, metadataDir) - fs.exists(metadataPath) + fs.exists(new Path(hdfsPath, metadataDir)) } else { false }