From a11e9e1bcfa848777dee2eee4da9dc47fdb27349 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Thu, 17 Aug 2017 20:35:35 +0100 Subject: [PATCH 1/5] SPARK-21762 handle FNFE events in BasicWriteStatsTracker; add a suite of tests for various file states. Change-Id: I3269cb901a38b33e399ebef10b2dbcd51ccf9b75 --- .../datasources/BasicWriteStatsTracker.scala | 14 +- .../BasicWriteTaskStatsTrackerSuite.scala | 192 ++++++++++++++++++ 2 files changed, 204 insertions(+), 2 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala index b8f7d130d569f..2f318e79db2d3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala @@ -17,10 +17,13 @@ package org.apache.spark.sql.execution.datasources +import java.io.FileNotFoundException + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.SparkContext +import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} @@ -44,7 +47,7 @@ case class BasicWriteTaskStats( * @param hadoopConf */ class BasicWriteTaskStatsTracker(hadoopConf: Configuration) - extends WriteTaskStatsTracker { + extends WriteTaskStatsTracker with Logging { private[this] var numPartitions: Int = 0 private[this] var numFiles: Int = 0 @@ -57,7 +60,14 @@ class BasicWriteTaskStatsTracker(hadoopConf: Configuration) private def getFileSize(filePath: String): Long = { val path = new Path(filePath) val fs = path.getFileSystem(hadoopConf) - fs.getFileStatus(path).getLen() + try { + fs.getFileStatus(path).getLen() + } catch { + case e: FileNotFoundException => + // may arise against eventually consistent object stores + logInfo(s"File $path is not yet visible", e) + 0 + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala new file mode 100644 index 0000000000000..bd32857f10f3a --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources + +import java.nio.charset.Charset + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path + +import org.apache.spark.SparkFunSuite +import org.apache.spark.util.Utils + +/** + * Test how BasicWriteTaskStatsTracker handles files. + */ +class BasicWriteTaskStatsTrackerSuite extends SparkFunSuite { + + private val tempDir = Utils.createTempDir() + private val tempDirPath = new Path(tempDir.toURI) + private val conf = new Configuration() + private val localfs = tempDirPath.getFileSystem(conf) + private val data1 = "0123456789".getBytes(Charset.forName("US-ASCII")) + private val data2 = "012".getBytes(Charset.forName("US-ASCII")) + private val len1 = data1.length + private val len2 = data2.length + + /** + * In teardown delete the temp dir. + */ + protected override def afterAll(): Unit = { + Utils.deleteRecursively(tempDir) + } + + /** + * Assert that the stats match that expected. + * @param tracker tracker to check + * @param files number of files expected + * @param bytes total number of bytes expected + */ + private def assertStats( + tracker: BasicWriteTaskStatsTracker, + files: Int, + bytes: Int): Unit = { + val stats = tracker.getFinalStats().asInstanceOf[BasicWriteTaskStats] + assert(files === stats.numFiles, "Wrong number of files") + assert(bytes === stats.numBytes, "Wrong byte count of file size") + } + + test("No files in run") { + val tracker = new BasicWriteTaskStatsTracker(conf) + assertStats(tracker, 0, 0) + } + + test("Missing File") { + val missing = new Path(tempDirPath, "missing") + val tracker = new BasicWriteTaskStatsTracker(conf) + tracker.newFile(missing.toString) + assertStats(tracker, 1, 0) + } + + test("0 byte file") { + val file = new Path(tempDirPath, "file0") + val tracker = new BasicWriteTaskStatsTracker(conf) + tracker.newFile(file.toString) + touch(file) + assertStats(tracker, 1, 0) + } + + test("File with data") { + val file = new Path(tempDirPath, "file-with-data") + val tracker = new BasicWriteTaskStatsTracker(conf) + tracker.newFile(file.toString) + write1(file) + assertStats(tracker, 1, len1) + } + + test("Open file") { + val file = new Path(tempDirPath, "file-open") + val tracker = new BasicWriteTaskStatsTracker(conf) + tracker.newFile(file.toString) + val stream = localfs.create(file, true) + try { + assertStats(tracker, 1, 0) + stream.write(data1) + stream.flush() + // file should exist, but size undefined + val stats = tracker.getFinalStats().asInstanceOf[BasicWriteTaskStats] + assert(1 === stats.numFiles, "Wrong number of files") + } finally + stream.close() + } + + test("Two files") { + val file1 = new Path(tempDirPath, "f-2-1") + val file2 = new Path(tempDirPath, "f-2-2") + val tracker = new BasicWriteTaskStatsTracker(conf) + tracker.newFile(file1.toString) + write1(file1) + tracker.newFile(file2.toString) + write2(file2) + assertStats(tracker, 2, len1 + len2) + } + + test("Three files, last one empty") { + val file1 = new Path(tempDirPath, "f-3-1") + val file2 = new Path(tempDirPath, "f-3-2") + val file3 = new Path(tempDirPath, "f-3-2") + val tracker = new BasicWriteTaskStatsTracker(conf) + tracker.newFile(file1.toString) + write1(file1) + tracker.newFile(file2.toString) + write2(file2) + tracker.newFile(file3.toString) + touch(file3) + assertStats(tracker, 3, len1 + len2) + } + + test("Three files, one not found") { + val file1 = new Path(tempDirPath, "f-4-1") + val file2 = new Path(tempDirPath, "f-4-2") + val file3 = new Path(tempDirPath, "f-3-2") + val tracker = 
new BasicWriteTaskStatsTracker(conf) + // file 1 + tracker.newFile(file1.toString) + write1(file1) + + // file 2 is noted, but not visible + tracker.newFile(file2.toString) + touch(file3) + + // file 3 is created + tracker.newFile(file3.toString) + write2(file3) + assertStats(tracker, 3, len1 + len2) + } + + /** + * Write a 0-byte file. + * @param file file path + */ + private def touch(file: Path): Unit = { + localfs.create(file, true).close() + } + + /** + * Write a byte array. + * @param file path to file + * @param data data + * @return bytes written + */ + private def write(file: Path, data: Array[Byte]): Integer = { + val stream = localfs.create(file, true) + try { + stream.write(data) + } finally + stream.close() + data.length + } + + /** + * Write a data1 array. + * @param file file + */ + private def write1(file: Path): Unit = { + write(file, data1) + } + + /** + * Write a data2 array. + * + * @param file file + */ + private def write2(file: Path): Unit = { + write(file, data2) + } + +} From f86b234be38450bcf42a036bf9d1993dfc5d50f7 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Thu, 17 Aug 2017 21:01:50 +0100 Subject: [PATCH 2/5] SPARK-21762 add tests for "" and null filenames Change-Id: I38ac11c808849e2fd91f4931f4cb5cdfad43e2af --- .../BasicWriteTaskStatsTrackerSuite.scala | 34 +++++++++++++++---- 1 file changed, 27 insertions(+), 7 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala index bd32857f10f3a..536c00e0c8dee 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala @@ -56,11 +56,15 @@ class BasicWriteTaskStatsTrackerSuite extends SparkFunSuite { tracker: BasicWriteTaskStatsTracker, files: Int, bytes: Int): Unit = { - val stats = tracker.getFinalStats().asInstanceOf[BasicWriteTaskStats] + val stats = finalStatus(tracker) assert(files === stats.numFiles, "Wrong number of files") assert(bytes === stats.numBytes, "Wrong byte count of file size") } + private def finalStatus(tracker: BasicWriteTaskStatsTracker): BasicWriteTaskStats = { + tracker.getFinalStats().asInstanceOf[BasicWriteTaskStats] + } + test("No files in run") { val tracker = new BasicWriteTaskStatsTracker(conf) assertStats(tracker, 0, 0) @@ -73,6 +77,22 @@ class BasicWriteTaskStatsTrackerSuite extends SparkFunSuite { assertStats(tracker, 1, 0) } + test("Empty filename is forwarded") { + val tracker = new BasicWriteTaskStatsTracker(conf) + tracker.newFile("") + intercept[IllegalArgumentException] { + finalStatus(tracker) + } + } + + test("Null filename is only picked up in final status") { + val tracker = new BasicWriteTaskStatsTracker(conf) + tracker.newFile(null) + intercept[IllegalArgumentException] { + finalStatus(tracker) + } + } + test("0 byte file") { val file = new Path(tempDirPath, "file0") val tracker = new BasicWriteTaskStatsTracker(conf) @@ -98,11 +118,10 @@ class BasicWriteTaskStatsTrackerSuite extends SparkFunSuite { assertStats(tracker, 1, 0) stream.write(data1) stream.flush() - // file should exist, but size undefined - val stats = tracker.getFinalStats().asInstanceOf[BasicWriteTaskStats] - assert(1 === stats.numFiles, "Wrong number of files") - } finally + assert(1 === finalStatus(tracker).numFiles, "Wrong number of files") + } finally { 
stream.close() + } } test("Two files") { @@ -167,8 +186,9 @@ class BasicWriteTaskStatsTrackerSuite extends SparkFunSuite { val stream = localfs.create(file, true) try { stream.write(data) - } finally - stream.close() + } finally { + stream.close() + } data.length } From d7a63bc8c162bbea771cfc8b338e60f8070b3fc7 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Fri, 18 Aug 2017 11:11:29 +0100 Subject: [PATCH 3/5] SPARK-21762, remove needless touch(file3) Change-Id: I6d101ece0cccbd8403dff10004575a24109e6f1b --- .../BasicWriteTaskStatsTrackerSuite.scala | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala index 536c00e0c8dee..ece0fb25e8251 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala @@ -27,6 +27,12 @@ import org.apache.spark.util.Utils /** * Test how BasicWriteTaskStatsTracker handles files. + * + * Two different datasets are written (alongside 0), one of + * length 10, one of 3. They were chosen to be distinct enough + * that it is straightforward to determine which file lengths were added + * from the sum of all files added. Lengths like "10" and "5" would + * be less informative. */ class BasicWriteTaskStatsTrackerSuite extends SparkFunSuite { @@ -158,13 +164,14 @@ class BasicWriteTaskStatsTrackerSuite extends SparkFunSuite { tracker.newFile(file1.toString) write1(file1) - // file 2 is noted, but not visible + // file 2 is noted, but not created tracker.newFile(file2.toString) - touch(file3) - // file 3 is created + // file 3 is noted & then created tracker.newFile(file3.toString) write2(file3) + + // the expeected size is file1 + file3 assertStats(tracker, 3, len1 + len2) } From 03cb7817cc8b2b335f86db2bec843894c699fffc Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Fri, 13 Oct 2017 11:35:53 +0100 Subject: [PATCH 4/5] SPARK-21762 * Use Option to track whether or not current file is set; guarantees once-only invocation, amongst other things * separate counting of submitted files from number of files actually seen * Log at debug if an FNFE when caught * Log at info only at the end of a sequance of writes Change-Id: Id242c11338be1f41a3f9a5b8b30c796ac5b002a2 --- .../datasources/BasicWriteStatsTracker.scala | 41 +++++++++++++------ .../BasicWriteTaskStatsTrackerSuite.scala | 7 ++-- 2 files changed, 32 insertions(+), 16 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala index 2f318e79db2d3..11af0aaa7b206 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala @@ -51,22 +51,27 @@ class BasicWriteTaskStatsTracker(hadoopConf: Configuration) private[this] var numPartitions: Int = 0 private[this] var numFiles: Int = 0 + private[this] var submittedFiles: Int = 0 private[this] var numBytes: Long = 0L private[this] var numRows: Long = 0L - private[this] var curFile: String = null + private[this] var curFile: Option[String] = None - - private def 
getFileSize(filePath: String): Long = { + /** + * Get the size of the file expected to have been written by a worker. + * @param filePath path to the file + * @return the file size or None if the file was not found. + */ + private def getFileSize(filePath: String): Option[Long] = { val path = new Path(filePath) val fs = path.getFileSystem(hadoopConf) try { - fs.getFileStatus(path).getLen() + Some(fs.getFileStatus(path).getLen()) } catch { case e: FileNotFoundException => // may arise against eventually consistent object stores - logInfo(s"File $path is not yet visible", e) - 0 + logDebug(s"File $path is not yet visible", e) + None } } @@ -80,12 +85,19 @@ class BasicWriteTaskStatsTracker(hadoopConf: Configuration) } override def newFile(filePath: String): Unit = { - if (numFiles > 0) { - // we assume here that we've finished writing to disk the previous file by now - numBytes += getFileSize(curFile) + statCurrentFile() + curFile = Some(filePath) + submittedFiles += 1 + } + + private def statCurrentFile(): Unit = { + curFile.foreach { path => + getFileSize(path).foreach { len => + numBytes += len + numFiles += 1 + } + curFile = None } - curFile = filePath - numFiles += 1 } override def newRow(row: InternalRow): Unit = { @@ -93,8 +105,11 @@ class BasicWriteTaskStatsTracker(hadoopConf: Configuration) } override def getFinalStats(): WriteTaskStats = { - if (numFiles > 0) { - numBytes += getFileSize(curFile) + statCurrentFile() + if (submittedFiles != numFiles) { + logInfo(s"Expected $submittedFiles files, but only saw $numFiles. " + + "This could be due to the output format not writing empty files, " + + "or files being not immediately visible in the filesystem.") } BasicWriteTaskStats(numPartitions, numFiles, numBytes, numRows) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala index ece0fb25e8251..bf3c8ede9a980 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala @@ -80,7 +80,7 @@ class BasicWriteTaskStatsTrackerSuite extends SparkFunSuite { val missing = new Path(tempDirPath, "missing") val tracker = new BasicWriteTaskStatsTracker(conf) tracker.newFile(missing.toString) - assertStats(tracker, 1, 0) + assertStats(tracker, 0, 0) } test("Empty filename is forwarded") { @@ -171,8 +171,9 @@ class BasicWriteTaskStatsTrackerSuite extends SparkFunSuite { tracker.newFile(file3.toString) write2(file3) - // the expeected size is file1 + file3 - assertStats(tracker, 3, len1 + len2) + // the expected size is file1 + file3; only two files are reported + // as found + assertStats(tracker, 2, len1 + len2) } /** From c0e81a1c87011efdc010f1c9ba28dde003458667 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Fri, 13 Oct 2017 12:38:06 +0100 Subject: [PATCH 5/5] SPARK-21762: pull in Donjoon's test from PR #19477 This is going to create merge conflict with this branch until I rebase it, which I'm about to Change-Id: Ie2309066ad7892cb20155d9de8248c1682bba526 --- .../apache/spark/sql/hive/execution/SQLQuerySuite.scala | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 94fa43dec7313..60935c3e85c43 
100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -2110,4 +2110,12 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } } } + + Seq("orc", "parquet", "csv", "json", "text").foreach { format => + test(s"Writing empty datasets should not fail - $format") { + withTempDir { dir => + Seq("str").toDS.limit(0).write.format(format).save(dir.getCanonicalPath + "/tmp") + } + } + } }
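
For anyone reviewing the end state of this series, here is a minimal sketch (not part of any patch; the object name and local paths are placeholders, and it assumes the sql-core classes from the diffs above are on the classpath) of how BasicWriteTaskStatsTracker behaves once patch 4 is applied: newFile() registers a path and lazily stats the previously registered one, a registered file that never becomes visible is logged and skipped rather than failing the task, and getFinalStats() reconciles the number of submitted files against the number actually seen.

package org.apache.spark.sql.execution.datasources

import java.nio.charset.StandardCharsets

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Illustrative sketch only; placed in the tracker's own package so the
// classes shown in the diffs above can be used directly.
object BasicWriteTaskStatsTrackerSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val tracker = new BasicWriteTaskStatsTracker(conf)

    // Register a file, then write 10 bytes to it; the tracker stats it lazily,
    // when the next file is registered (or in getFinalStats()).
    val visible = new Path("file:///tmp/stats-sketch/part-00000")  // placeholder path
    tracker.newFile(visible.toString)
    val out = visible.getFileSystem(conf).create(visible, true)
    out.write("0123456789".getBytes(StandardCharsets.US_ASCII))
    out.close()

    // Register a file that never materialises (e.g. against an eventually
    // consistent object store). After this series the FileNotFoundException
    // is caught and logged at debug, and the file is simply not counted.
    tracker.newFile("file:///tmp/stats-sketch/never-materialised")  // placeholder path

    // Stats the last registered file, then logs at info that only 1 of the
    // 2 submitted files was seen.
    val stats = tracker.getFinalStats().asInstanceOf[BasicWriteTaskStats]
    println(s"files=${stats.numFiles} bytes=${stats.numBytes}")  // files=1 bytes=10
  }
}

Separating submittedFiles from numFiles is presumably also why the empty-dataset test from PR #19477 is pulled in above: a format that writes no physical file for an empty dataset now just yields a lower numFiles than submittedFiles plus an informational log line, instead of failing on the missing file.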