From c40f8a1a6ba12dd51e270e71594450c8e0df8dce Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Thu, 22 Aug 2019 23:10:16 -0700
Subject: [PATCH] [SPARK-28025][SS] Fix FileContextBasedCheckpointFileManager leaking crc files

### What changes were proposed in this pull request?

This PR fixes the leak of crc files from CheckpointFileManager when FileContextBasedCheckpointFileManager is being used.

Spark hits the Hadoop bug [HADOOP-16255](https://issues.apache.org/jira/browse/HADOOP-16255), which seems to be a long-standing issue. The root cause is that there are two `renameInternal` methods:

```
public void renameInternal(Path src, Path dst)
public void renameInternal(final Path src, final Path dst, boolean overwrite)
```

Both should be overridden to handle all cases, but ChecksumFs only overrides the two-parameter method. When the three-parameter variant is called, FilterFs.renameInternal(...) is invoked instead, which performs the rename with RawLocalFs as the underlying filesystem and leaves the crc file behind. The bug is specific to FileContext, so FileSystemBasedCheckpointFileManager is not affected.

[SPARK-17475](https://issues.apache.org/jira/browse/SPARK-17475) added a workaround for this bug, but [SPARK-23966](https://issues.apache.org/jira/browse/SPARK-23966) appears to have brought the regression back.

This PR deletes the crc file on a "best-effort" basis when renaming, since failing to delete a crc file is not critical enough to fail the task.

### Why are the changes needed?

This PR prevents crc files from being left behind even after batches are purged. Too many files in the same directory often hurts performance, and each crc file takes up more space on disk than its logical size, so the leaked files can occupy a nontrivial amount of space once the batch number grows to 100000+.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Some unit tests are modified to check for leakage of crc files.
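
For illustration, here is a minimal standalone sketch (not part of this patch) of how the leak can show up when renaming through `FileContext` on a local filesystem. The object name, scratch directory, and file names below are made up for the example, and the behavior assumes a Hadoop version still affected by HADOOP-16255:

```scala
import java.util.EnumSet

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{CreateFlag, FileContext, Options, Path}

object CrcLeakSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical scratch directory, used only for this illustration.
    val localDir = java.nio.file.Files.createTempDirectory("crc-leak-sketch").toFile
    val dir = new Path(localDir.toURI)

    // FileContext over a file: URI resolves to LocalFs, i.e. ChecksumFs wrapping RawLocalFs.
    val fc = FileContext.getFileContext(dir.toUri, new Configuration())

    val src = new Path(dir, "2.tmp")
    val dst = new Path(dir, "2")

    // Writing through ChecksumFs also creates a hidden ".2.tmp.crc" checksum sibling.
    val out = fc.create(src, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE))
    out.writeBytes("batch metadata")
    out.close()

    // FileContext.rename dispatches to the three-parameter renameInternal, which
    // ChecksumFs does not override (HADOOP-16255), so the rename is performed by
    // RawLocalFs and the checksum file is neither renamed nor removed.
    fc.rename(src, dst, Options.Rename.OVERWRITE)

    // The leak shows up as a crc file whose origin file no longer exists.
    val names = localDir.listFiles().map(_.getName).toSet
    val orphanedCrcFiles = names
      .filter(n => n.startsWith(".") && n.endsWith(".crc"))
      .filterNot(n => names.contains(n.substring(1, n.length - 4)))
    println(s"orphaned checksum files: $orphanedCrcFiles")
  }
}
```
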
Closes #25488 from HeartSaVioR/SPARK-28025.

Authored-by: Jungtaek Lim (HeartSaVioR)
Signed-off-by: Shixiong Zhu
---
 .../streaming/CheckpointFileManager.scala | 14 +++++++++
 .../CheckpointFileManagerSuite.scala      | 16 ++++++++++
 .../streaming/HDFSMetadataLogSuite.scala  | 30 +++++++++++++++----
 3 files changed, 55 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
index b3e4240c315bc..285af65f78ca1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
@@ -329,6 +329,8 @@ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuratio
   override def renameTempFile(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = {
     import Options.Rename._
     fc.rename(srcPath, dstPath, if (overwriteIfPossible) OVERWRITE else NONE)
+    // TODO: this is a workaround of HADOOP-16255 - remove this when HADOOP-16255 is resolved
+    mayRemoveCrcFile(srcPath)
   }
 
@@ -345,5 +347,17 @@ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuratio
     case _: LocalFs | _: RawLocalFs => true // LocalFs = RawLocalFs + ChecksumFs
     case _ => false
   }
+
+  private def mayRemoveCrcFile(path: Path): Unit = {
+    try {
+      val checksumFile = new Path(path.getParent, s".${path.getName}.crc")
+      if (exists(checksumFile)) {
+        // checksum file exists, deleting it
+        delete(checksumFile)
+      }
+    } catch {
+      case NonFatal(_) => // ignore, we are removing crc file as "best-effort"
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala
index fe59cb25d5005..ab6fbeda77681 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala
@@ -78,6 +78,22 @@ abstract class CheckpointFileManagerTests extends SparkFunSuite {
     assert(fm.exists(path))
     fm.createAtomic(path, overwriteIfPossible = true).close()  // should not throw exception
+    // crc file should not be leaked when origin file doesn't exist.
+    // The implementation of Hadoop filesystem may filter out checksum file, so
+    // listing files from local filesystem.
+    val fileNames = new File(path.getParent.toString).listFiles().toSeq
+      .filter(p => p.isFile).map(p => p.getName)
+    val crcFiles = fileNames.filter(n => n.startsWith(".") && n.endsWith(".crc"))
+    val originFileNamesForExistingCrcFiles = crcFiles.map { name =>
+      // remove first "." and last ".crc"
+      name.substring(1, name.length - 4)
+    }
+
+    // Check all origin files exist for all crc files.
+    assert(originFileNamesForExistingCrcFiles.toSet.subsetOf(fileNames.toSet),
+      s"Some of origin files for crc files don't exist - crc files: $crcFiles / " +
+        s"expected origin files: $originFileNamesForExistingCrcFiles / actual files: $fileNames")
+
     // Open and delete
     fm.open(path).close()
     fm.delete(path)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index 0e36e7f5da122..e832422a93b51 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -26,6 +26,7 @@ import org.scalatest.concurrent.Waiters._
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.util.UninterruptibleThread
 
@@ -59,6 +60,21 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
   }
 
   test("HDFSMetadataLog: purge") {
+    testPurge()
+  }
+
+  Seq(
+    classOf[FileSystemBasedCheckpointFileManager],
+    classOf[FileContextBasedCheckpointFileManager]
+  ).map(_.getCanonicalName).foreach { cls =>
+    test(s"HDFSMetadataLog: purge - explicit file manager - $cls") {
+      withSQLConf(SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key -> cls) {
+        testPurge()
+      }
+    }
+  }
+
+  private def testPurge(): Unit = {
     withTempDir { temp =>
       val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath)
       assert(metadataLog.add(0, "batch0"))
@@ -75,12 +91,16 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
       assert(metadataLog.get(2).isDefined)
       assert(metadataLog.getLatest().get._1 == 2)
 
-      // There should be exactly one file, called "2", in the metadata directory.
+      // There should be at most two files, called "2", and optionally crc file,
+      // in the metadata directory.
       // This check also tests for regressions of SPARK-17475
-      val allFiles = new File(metadataLog.metadataPath.toString).listFiles()
-        .filter(!_.getName.startsWith(".")).toSeq
-      assert(allFiles.size == 1)
-      assert(allFiles(0).getName() == "2")
+      val allFiles = new File(metadataLog.metadataPath.toString).listFiles().toSeq
+      assert(allFiles.size <= 2)
+      assert(allFiles.exists(_.getName == "2"))
+      if (allFiles.size == 2) {
+        // there's possibly crc file being left as well
+        assert(allFiles.exists(_.getName == ".2.crc"))
+      }
     }
   }