From 30b82df1b3dc35049724c4e2ccc0613bbaa3b8fb Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Mon, 5 Nov 2018 13:32:51 +0900 Subject: [PATCH 01/11] SPARK-20568 Rename files which are completed in previous batch This commit rebases 12 commits to make rebase easier. * Add new option to programming guide doc * Address review comments - introduce two modes: "archive", "delete" * Add missing explanation * Minor correction * Address review comments from @gaborgsomogyi * Address review comments from @zsxwing * Fix a bug regarding finding base directory which don't have glob pattern in ancestor * Apply a new approach: find overlap against final archive path instead of base archive path * leverage GlobFilter in Hadoop * add a new UT verifying with some corner cases * Refactor a bit * Address review comments from @gaborgsomogyi * Address review comment from @zsxwing --- .../structured-streaming-programming-guide.md | 7 + .../streaming/FileStreamOptions.scala | 46 +++ .../streaming/FileStreamSource.scala | 156 ++++++++- .../sql/streaming/FileStreamSourceSuite.scala | 313 +++++++++++++++++- 4 files changed, 509 insertions(+), 13 deletions(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 2a405f36fd5fd..2bd9e1558b08e 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -546,6 +546,13 @@ Here are the details of all the sources in Spark. "s3://a/dataset.txt"
"s3n://a/b/dataset.txt"
"s3a://a/b/c/dataset.txt"
+ cleanSource: option to clean up completed files after processing.
+ Available options are "archive", "delete", "no_op". If the option is not provided, the default value is "no_op".
+ When "archive" is provided, additional option sourceArchiveDir must be provided as well. The value of "sourceArchiveDir" must be outside of source path, to ensure archived files are never included to new source files again.
+ Spark will move source files respecting their own path. For example, if the path of a source file is "/a/b/dataset.txt" and the path of the archive directory is "/archived/here", the file will be moved to "/archived/here/a/b/dataset.txt".
+ NOTE: Both archiving (via moving) or deleting completed files would introduce overhead (slow down) in each micro-batch, so you need to understand the cost for each operation in your file system before enabling this option. On the other hand, enbling this option would reduce the cost to list source files which is considered as a heavy operation.
+ NOTE 2: The source path should not be used from multiple sources or queries when enabling this option, because source files will be moved or deleted which behavior may impact the other sources and queries.
+ NOTE 3: Both delete and move actions are best effort. Failing to delete or move files will not fail the streaming query.
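For illustration, a minimal sketch of how these options might be passed to a file stream reader, assuming an existing SparkSession named spark (paths and values are illustrative):

    // Illustrative only: archive completed files once their micro-batch has been committed.
    // "cleanSource" accepts "archive", "delete" or "no_op" (the default).
    val input = spark.readStream
      .format("text")
      .option("maxFilesPerTrigger", "1")
      .option("cleanSource", "archive")
      .option("sourceArchiveDir", "/archived/here") // required when cleanSource is "archive"
      .load("/a/b/*")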

For file-format-specific options, see the related methods in DataStreamReader (Scala/Java/Python/ try { @@ -86,3 +112,23 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging }.getOrElse(default) } } + +object CleanSourceMode extends Enumeration { + val ARCHIVE, DELETE, NO_OP = Value + + def fromString(value: String): CleanSourceMode.Value = { + val matchedModeOpt = CleanSourceMode.values.find(_.toString == value.toUpperCase(Locale.ROOT)) + matchedModeOpt match { + case None => + throw new IllegalArgumentException(s"Invalid mode for clean source option $value." + + s" Must be one of ${CleanSourceMode.values.mkString(",")}") + case Some(matchedMode) => + matchedMode + } + } + + def fromString(value: Option[String]): CleanSourceMode.Value = value match { + case Some(mode) => fromString(mode) + case None => NO_OP + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 92eef6af2238c..c91e30d422e44 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -20,7 +20,9 @@ package org.apache.spark.sql.execution.streaming import java.net.URI import java.util.concurrent.TimeUnit._ -import org.apache.hadoop.fs.{FileStatus, Path} +import scala.util.control.NonFatal + +import org.apache.hadoop.fs.{FileStatus, FileSystem, GlobFilter, Path} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging @@ -53,6 +55,9 @@ class FileStreamSource( fs.makeQualified(new Path(path)) // can contain glob patterns } + private val sourceCleaner = new FileStreamSourceCleaner(fs, qualifiedBasePath, + sourceOptions.sourceArchiveDir) + private val optionsWithPartitionBasePath = sourceOptions.optionMapWithoutPath ++ { if (!SparkHadoopUtil.get.isGlobPath(new Path(path)) && options.contains("path")) { Map("basePath" -> path) @@ -237,6 +242,7 @@ class FileStreamSource( val files = allFiles.sortBy(_.getModificationTime)(fileSortOrder).map { status => (status.getPath.toUri.toString, status.getModificationTime) } + val endTime = System.nanoTime val listingTimeMs = NANOSECONDS.toMillis(endTime - startTime) if (listingTimeMs > 2000) { @@ -258,8 +264,26 @@ class FileStreamSource( * equal to `end` and will only request offsets greater than `end` in the future. */ override def commit(end: Offset): Unit = { - // No-op for now; FileStreamSource currently garbage-collects files based on timestamp - // and the value of the maxFileAge parameter. + val logOffset = FileStreamSourceOffset(end).logOffset + + if (sourceOptions.cleanSource != CleanSourceMode.NO_OP) { + val files = metadataLog.get(Some(logOffset), Some(logOffset)).flatMap(_._2) + val validFileEntities = files.filter(_.batchId == logOffset) + logDebug(s"completed file entries: ${validFileEntities.mkString(",")}") + sourceOptions.cleanSource match { + case CleanSourceMode.ARCHIVE => + validFileEntities.foreach(sourceCleaner.archive) + + case CleanSourceMode.DELETE => + validFileEntities.foreach(sourceCleaner.remove) + + case _ => + } + } else { + // No-op for now; FileStreamSource currently garbage-collects files based on timestamp + // and the value of the maxFileAge parameter. 
+ } + } override def stop(): Unit = {} @@ -267,7 +291,6 @@ class FileStreamSource( object FileStreamSource { - /** Timestamp for file modification time, in ms since January 1, 1970 UTC. */ type Timestamp = Long @@ -330,4 +353,129 @@ object FileStreamSource { def size: Int = map.size() } + + private[sql] class FileStreamSourceCleaner( + fileSystem: FileSystem, + sourcePath: Path, + baseArchivePathString: Option[String]) extends Logging { + + private val sourceGlobFilters: Seq[GlobFilter] = buildSourceGlobFilters(sourcePath) + + private val baseArchivePath: Option[Path] = baseArchivePathString.map(new Path(_)) + + def archive(entry: FileEntry): Unit = { + require(baseArchivePath.isDefined) + + val curPath = new Path(new URI(entry.path)) + val curPathUri = curPath.toUri + + val newPath = buildArchiveFilePath(curPathUri) + + if (isArchiveFileMatchedAgainstSourcePattern(newPath)) { + logWarning(s"Fail to move $curPath to $newPath - destination matches " + + s"to source path/pattern. Skip moving file.") + } else { + doArchive(curPath, newPath) + } + } + + def remove(entry: FileEntry): Unit = { + val curPath = new Path(new URI(entry.path)) + try { + logDebug(s"Removing completed file $curPath") + + if (!fileSystem.delete(curPath, false)) { + logWarning(s"Fail to remove $curPath / skip removing file.") + } + } catch { + case NonFatal(e) => + // Log to error but swallow exception to avoid process being stopped + logWarning(s"Fail to remove $curPath / skip removing file.", e) + } + } + + private def buildSourceGlobFilters(sourcePath: Path): Seq[GlobFilter] = { + val filters = new scala.collection.mutable.MutableList[GlobFilter]() + + var currentPath = sourcePath + while (!currentPath.isRoot) { + filters += new GlobFilter(currentPath.getName) + currentPath = currentPath.getParent + } + + filters.toList + } + + private def buildArchiveFilePath(pathUri: URI): Path = { + require(baseArchivePathString.isDefined) + val baseArchivePathStr = baseArchivePathString.get + val normalizedBaseArchiveDirPath = if (baseArchivePathStr.endsWith("/")) { + baseArchivePathStr.substring(0, baseArchivePathStr.length - 1) + } else { + baseArchivePathStr + } + + new Path(normalizedBaseArchiveDirPath + pathUri.getPath) + } + + private def isArchiveFileMatchedAgainstSourcePattern(archiveFile: Path): Boolean = { + if (baseArchivePath.get.depth() > 2) { + // there's no chance for archive file to be matched against source pattern + return false + } + + var matched = true + + // new path will never match against source path when the depth is not a range of + // the depth of source path ~ (the depth of source path + 1) + // because the source files are picked when they match against source pattern or + // their parent directories match against source pattern + val depthSourcePattern = sourceGlobFilters.length + val depthArchiveFile = archiveFile.depth() + + // we already checked against the depth of archive path, but rechecking wouldn't hurt + if (depthArchiveFile < depthSourcePattern || depthArchiveFile > depthSourcePattern + 1) { + // never matched + matched = false + } else { + var pathToCompare = if (depthArchiveFile == depthSourcePattern + 1) { + archiveFile.getParent + } else { + archiveFile + } + + // Now pathToCompare should have same depth as sourceGlobFilters.length + var index = 0 + do { + // GlobFilter only matches against its name, not full path so it's safe to compare + if (!sourceGlobFilters(index).accept(pathToCompare)) { + matched = false + } else { + pathToCompare = pathToCompare.getParent + index += 1 + } + } 
while (matched && !pathToCompare.isRoot) + } + + matched + } + + private def doArchive(sourcePath: Path, archivePath: Path): Unit = { + try { + logDebug(s"Creating directory if it doesn't exist ${archivePath.getParent}") + if (!fileSystem.exists(archivePath.getParent)) { + fileSystem.mkdirs(archivePath.getParent) + } + + logDebug(s"Archiving completed file $sourcePath to $archivePath") + if (!fileSystem.rename(sourcePath, archivePath)) { + logWarning(s"Fail to move $sourcePath to $archivePath / skip moving file.") + } + } catch { + case NonFatal(e) => + logWarning(s"Fail to move $sourcePath to $archivePath / skip moving file.", e) + } + } + + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 75a0ae7cfe06a..a16e39b94123e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -20,23 +20,26 @@ package org.apache.spark.sql.streaming import java.io.File import java.net.URI +import scala.collection.mutable import scala.util.Random -import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem} +import org.apache.hadoop.fs._ +import org.apache.hadoop.fs.permission.FsPermission +import org.apache.hadoop.util.Progressable import org.scalatest.PrivateMethodTester -import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar._ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.execution.streaming._ -import org.apache.spark.sql.execution.streaming.FileStreamSource.{FileEntry, SeenFilesMap} +import org.apache.spark.sql.execution.streaming.FileStreamSource.{FileEntry, FileStreamSourceCleaner, SeenFilesMap} import org.apache.spark.sql.execution.streaming.sources.MemorySink import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.ExistsThrowsExceptionFileSystem._ import org.apache.spark.sql.streaming.util.StreamManualClock import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ +import org.apache.spark.sql.types.{StructType, _} import org.apache.spark.util.Utils abstract class FileStreamSourceTest @@ -177,7 +180,6 @@ abstract class FileStreamSourceTest } } - protected def withTempDirs(body: (File, File) => Unit): Unit = { val src = Utils.createTempDir(namePrefix = "streaming.src") val tmp = Utils.createTempDir(namePrefix = "streaming.tmp") @@ -189,6 +191,19 @@ abstract class FileStreamSourceTest } } + protected def withThreeTempDirs(body: (File, File, File) => Unit): Unit = { + val src = Utils.createTempDir(namePrefix = "streaming.src") + val tmp = Utils.createTempDir(namePrefix = "streaming.tmp") + val archive = Utils.createTempDir(namePrefix = "streaming.archive") + try { + body(src, tmp, archive) + } finally { + Utils.deleteRecursively(src) + Utils.deleteRecursively(tmp) + Utils.deleteRecursively(archive) + } + } + val valueSchema = new StructType().add("value", StringType) } @@ -1386,9 +1401,13 @@ class FileStreamSourceSuite extends FileStreamSourceTest { latestFirst: Boolean, firstBatch: String, secondBatch: String, - maxFileAge: Option[String] = None): Unit = { + maxFileAge: Option[String] = None, + cleanSource: CleanSourceMode.Value = CleanSourceMode.NO_OP, + archiveDir: Option[String] = None): Unit = { val srcOptions = Map("latestFirst" -> latestFirst.toString, "maxFilesPerTrigger" -> "1") ++ - 
maxFileAge.map("maxFileAge" -> _) + maxFileAge.map("maxFileAge" -> _) ++ + Seq("cleanSource" -> cleanSource.toString) ++ + archiveDir.map("sourceArchiveDir" -> _) val fileStream = createFileStream( "text", src.getCanonicalPath, @@ -1542,12 +1561,12 @@ class FileStreamSourceSuite extends FileStreamSourceTest { val unioned = source1.union(source2) def addMultiTextFileData( - source1Content: String, - source2Content: String): StreamAction = { + source1Content: String, + source2Content: String): StreamAction = { val actions = Seq( AddTextFileData(source1Content, sourceDir1, tmp), AddTextFileData(source2Content, sourceDir2, tmp) - ).filter(_.content != null) // don't write to a source dir if no content specified + ).filter(_.content != null) // don't write to a source dir if no content specified StreamProgressLockedActions(actions, desc = actions.mkString("[ ", " | ", " ]")) } @@ -1596,6 +1615,282 @@ class FileStreamSourceSuite extends FileStreamSourceTest { } } } + + test("remove completed files when remove option is enabled") { + def assertFileIsRemoved(files: Array[String], fileName: String): Unit = { + assert(!files.exists(_.startsWith(fileName))) + } + + def assertFileIsNotRemoved(files: Array[String], fileName: String): Unit = { + assert(files.exists(_.startsWith(fileName))) + } + + withTempDirs { case (src, tmp) => + withSQLConf( + SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2", + // Force deleting the old logs + SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1" + ) { + val option = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "1", + "cleanSource" -> "delete") + + val fileStream = createFileStream("text", src.getCanonicalPath, options = option) + val filtered = fileStream.filter($"value" contains "keep") + + testStream(filtered)( + AddTextFileData("keep1", src, tmp, tmpFilePrefix = "keep1"), + CheckAnswer("keep1"), + AssertOnQuery("input file removed") { _: StreamExecution => + // it doesn't rename any file yet + assertFileIsNotRemoved(src.list(), "keep1") + true + }, + AddTextFileData("keep2", src, tmp, tmpFilePrefix = "ke ep2 %"), + CheckAnswer("keep1", "keep2"), + AssertOnQuery("input file removed") { _: StreamExecution => + val files = src.list() + + // it renames input file for first batch, but not for second batch yet + assertFileIsRemoved(files, "keep1") + assertFileIsNotRemoved(files, "ke ep2 %") + + true + }, + AddTextFileData("keep3", src, tmp, tmpFilePrefix = "keep3"), + CheckAnswer("keep1", "keep2", "keep3"), + AssertOnQuery("input file renamed") { _: StreamExecution => + val files = src.list() + + // it renames input file for second batch, but not third batch yet + assertFileIsRemoved(files, "ke ep2 %") + assertFileIsNotRemoved(files, "keep3") + + true + } + ) + } + } + } + + test("move completed files to archive directory when archive option is enabled") { + + withThreeTempDirs { case (src, tmp, archiveDir) => + withSQLConf( + SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2", + // Force deleting the old logs + SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1" + ) { + val option = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "1", + "cleanSource" -> "archive", "sourceArchiveDir" -> archiveDir.getAbsolutePath) + + val fileStream = createFileStream("text", s"${src.getCanonicalPath}/*/*", + options = option) + val filtered = fileStream.filter($"value" contains "keep") + + // src/k %1 + // file: src/k %1/keep1 + val dirForKeep1 = new File(src, "k %1") + // src/k %1/k 2 + // file: src/k %1/k 2/keep2 + val dirForKeep2 = new File(dirForKeep1, "k 2") + // 
src/k3 + // file: src/k3/keep3 + val dirForKeep3 = new File(src, "k3") + + val expectedMovedDir1 = new File(archiveDir.getAbsolutePath + dirForKeep1.toURI.getPath) + val expectedMovedDir2 = new File(archiveDir.getAbsolutePath + dirForKeep2.toURI.getPath) + val expectedMovedDir3 = new File(archiveDir.getAbsolutePath + dirForKeep3.toURI.getPath) + + testStream(filtered)( + AddTextFileData("keep1", dirForKeep1, tmp, tmpFilePrefix = "keep1"), + CheckAnswer("keep1"), + AssertOnQuery("input file archived") { _: StreamExecution => + // it doesn't rename any file yet + assertFileIsNotMoved(dirForKeep1, expectedMovedDir1, "keep1") + true + }, + AddTextFileData("keep2", dirForKeep2, tmp, tmpFilePrefix = "keep2 %"), + CheckAnswer("keep1", "keep2"), + AssertOnQuery("input file archived") { _: StreamExecution => + // it renames input file for first batch, but not for second batch yet + assertFileIsMoved(dirForKeep1, expectedMovedDir1, "keep1") + assertFileIsNotMoved(dirForKeep2, expectedMovedDir2, "keep2 %") + true + }, + AddTextFileData("keep3", dirForKeep3, tmp, tmpFilePrefix = "keep3"), + CheckAnswer("keep1", "keep2", "keep3"), + AssertOnQuery("input file archived") { _: StreamExecution => + // it renames input file for second batch, but not third batch yet + assertFileIsMoved(dirForKeep2, expectedMovedDir2, "keep2 %") + assertFileIsNotMoved(dirForKeep3, expectedMovedDir3, "keep3") + + true + }, + AddTextFileData("keep4", dirForKeep3, tmp, tmpFilePrefix = "keep4"), + CheckAnswer("keep1", "keep2", "keep3", "keep4"), + AssertOnQuery("input file archived") { _: StreamExecution => + // it renames input file for third batch, but not fourth batch yet + assertFileIsMoved(dirForKeep3, expectedMovedDir3, "keep3") + assertFileIsNotMoved(dirForKeep3, expectedMovedDir3, "keep4") + + true + } + ) + } + } + } + + class FakeFileSystem extends FileSystem { + val requestsExists = new mutable.MutableList[Path]() + val requestsMkdirs = new mutable.MutableList[Path]() + val requestsRename = new mutable.MutableList[(Path, Path)]() + + override def exists(f: Path): Boolean = { + requestsExists += f + true + } + + override def mkdirs(f: Path, permission: FsPermission): Boolean = { + requestsMkdirs += f + true + } + + override def rename(src: Path, dst: Path): Boolean = { + requestsRename += ((src, dst)) + true + } + + def clearRecords(): Unit = { + requestsExists.clear() + requestsMkdirs.clear() + requestsRename.clear() + } + + override def getUri: URI = throw new NotImplementedError + + override def open(f: Path, bufferSize: Int): FSDataInputStream = throw new NotImplementedError + + override def create( + f: Path, + permission: FsPermission, + overwrite: Boolean, + bufferSize: Int, + replication: Short, + blockSize: Long, + progress: Progressable): FSDataOutputStream = throw new NotImplementedError + + override def append(f: Path, bufferSize: Int, progress: Progressable): FSDataOutputStream = + throw new NotImplementedError + + override def delete(f: Path, recursive: Boolean): Boolean = throw new NotImplementedError + + override def listStatus(f: Path): Array[FileStatus] = throw new NotImplementedError + + override def setWorkingDirectory(new_dir: Path): Unit = throw new NotImplementedError + + override def getWorkingDirectory: Path = throw new NotImplementedError + + override def getFileStatus(f: Path): FileStatus = throw new NotImplementedError + } + + test("FileStreamSourceCleaner - archive - destinations match against source pattern") { + + def assertNoMove(fs: FakeFileSystem): Unit = { + 
assert(fs.requestsExists.isEmpty) + assert(fs.requestsMkdirs.isEmpty) + assert(fs.requestsRename.isEmpty) + } + + def assertMoveFile(fs: FakeFileSystem, sourcePath: Path, expectedArchivePath: Path): Unit = { + assert(fs.requestsExists.nonEmpty) + assert(fs.requestsExists.head === expectedArchivePath.getParent) + assert(fs.requestsMkdirs.isEmpty) + assert(fs.requestsRename.nonEmpty) + assert(fs.requestsRename.head === ((sourcePath, expectedArchivePath))) + } + + val fakeFileSystem = new FakeFileSystem() + + val sourcePatternPath = new Path("/hello*/h{e,f}ll?") + val baseArchiveDirPath = "/hello" + + val sourceCleaner = new FileStreamSourceCleaner(fakeFileSystem, sourcePatternPath, + Some(baseArchiveDirPath)) + + // file 1: /hello/helln + // final destination = /hello/hello/helln + // parent directory of final destination matches source pattern + val sourcePath1 = new Path("/hello/helln") + sourceCleaner.archive(FileEntry(sourcePath1.toUri.getPath, 0, 0)) + + assertNoMove(fakeFileSystem) + + fakeFileSystem.clearRecords() + + // file 2: /hello/hfllo/spark + // final destination = /hello/hello/hfllo/spark + // no match + val sourcePath2 = new Path("/hello/hfllo/spark") + val expectedDestPath2 = new Path("/hello/hello/hfllo/spark") + sourceCleaner.archive(FileEntry(sourcePath2.toUri.getPath, 0, 0)) + + assertMoveFile(fakeFileSystem, sourcePath2, expectedDestPath2) + + fakeFileSystem.clearRecords() + + // file 3: /hello1/hflln + // final destination = /hello/hello1/hflln + // no match + val sourcePath3 = new Path("/hello1/hflln") + val expectedDestPath3 = new Path("/hello/hello1/hflln") + sourceCleaner.archive(FileEntry(sourcePath3.toUri.getPath, 0, 0)) + + assertMoveFile(fakeFileSystem, sourcePath3, expectedDestPath3) + + fakeFileSystem.clearRecords() + + // corner case: this should end up with all archive destinations to be + // matched against source pattern + val baseArchiveDirPath2 = "/" + + val sourceCleaner2 = new FileStreamSourceCleaner(fakeFileSystem, sourcePatternPath, + Some(baseArchiveDirPath2)) + + // file 4 (& final destination): /hello/helln + // final destination matches source pattern + val sourcePath4 = new Path("/hello/helln") + sourceCleaner2.archive(FileEntry(sourcePath4.toUri.getPath, 0, 0)) + + assertNoMove(fakeFileSystem) + + fakeFileSystem.clearRecords() + + // file 5 (& final destination): /hello/hfllo/spark + // final destination matches source pattern + val sourcePath5 = new Path("/hello/hfllo/spark") + sourceCleaner2.archive(FileEntry(sourcePath5.toUri.getPath, 0, 0)) + + assertNoMove(fakeFileSystem) + + fakeFileSystem.clearRecords() + } + + private def assertFileIsNotMoved(sourceDir: File, expectedDir: File, filePrefix: String): Unit = { + assert(sourceDir.exists()) + assert(sourceDir.list().exists(_.startsWith(filePrefix))) + if (!expectedDir.exists()) { + // OK + } else { + assert(!expectedDir.list().exists(_.startsWith(filePrefix))) + } + } + + private def assertFileIsMoved(sourceDir: File, expectedDir: File, filePrefix: String): Unit = { + assert(sourceDir.exists()) + assert(!sourceDir.list().exists(_.startsWith(filePrefix))) + assert(expectedDir.exists()) + assert(expectedDir.list().exists(_.startsWith(filePrefix))) + } } class FileStreamSourceStressTestSuite extends FileStreamSourceTest { From 1697c86cc082d26b74ab1b15b636a5a2094a20d3 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Wed, 13 Feb 2019 19:25:38 +0900 Subject: [PATCH 02/11] Address review comment from mikedias --- docs/structured-streaming-programming-guide.md | 2 +- 
.../spark/sql/execution/streaming/FileStreamSource.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 2bd9e1558b08e..4d3b849e6a02e 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -550,7 +550,7 @@ Here are the details of all the sources in Spark. Available options are "archive", "delete", "no_op". If the option is not provided, the default value is "no_op".
When "archive" is provided, additional option sourceArchiveDir must be provided as well. The value of "sourceArchiveDir" must be outside of source path, to ensure archived files are never included to new source files again.
Spark will move source files respecting their own path. For example, if the path of a source file is "/a/b/dataset.txt" and the path of the archive directory is "/archived/here", the file will be moved to "/archived/here/a/b/dataset.txt".
- NOTE: Both archiving (via moving) or deleting completed files would introduce overhead (slow down) in each micro-batch, so you need to understand the cost for each operation in your file system before enabling this option. On the other hand, enbling this option would reduce the cost to list source files which is considered as a heavy operation.
+ NOTE: Both archiving (via moving) or deleting completed files would introduce overhead (slow down) in each micro-batch, so you need to understand the cost for each operation in your file system before enabling this option. On the other hand, enabling this option would reduce the cost to list source files which is considered as a heavy operation.
NOTE 2: The source path should not be used from multiple sources or queries when enabling this option, because source files will be moved or deleted which behavior may impact the other sources and queries.
NOTE 3: Both delete and move actions are best effort. Failing to delete or move files will not fail the streaming query.

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index c91e30d422e44..b1a628bf3aae4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -275,7 +275,7 @@ class FileStreamSource( validFileEntities.foreach(sourceCleaner.archive) case CleanSourceMode.DELETE => - validFileEntities.foreach(sourceCleaner.remove) + validFileEntities.foreach(sourceCleaner.delete) case _ => } @@ -379,7 +379,7 @@ object FileStreamSource { } } - def remove(entry: FileEntry): Unit = { + def delete(entry: FileEntry): Unit = { val curPath = new Path(new URI(entry.path)) try { logDebug(s"Removing completed file $curPath") From 2f5a73fd7465d9c80fd18584c4241c90f56bc479 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Wed, 21 Aug 2019 07:15:54 +0900 Subject: [PATCH 03/11] Fix nit for style --- .../apache/spark/sql/streaming/FileStreamSourceSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index a16e39b94123e..832b9587fa21a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -1561,8 +1561,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest { val unioned = source1.union(source2) def addMultiTextFileData( - source1Content: String, - source2Content: String): StreamAction = { + source1Content: String, + source2Content: String): StreamAction = { val actions = Seq( AddTextFileData(source1Content, sourceDir1, tmp), AddTextFileData(source2Content, sourceDir2, tmp) From 6e2a824d0294f1bd3c48caeb6404ecce17605ac3 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Tue, 15 Oct 2019 11:06:16 +0900 Subject: [PATCH 04/11] Reflect review comments --- .../structured-streaming-programming-guide.md | 4 +-- .../streaming/FileStreamOptions.scala | 27 +++++++------------ .../streaming/FileStreamSource.scala | 11 ++------ .../sql/streaming/FileStreamSourceSuite.scala | 2 +- 4 files changed, 14 insertions(+), 30 deletions(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 4d3b849e6a02e..f083f5eaad138 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -547,10 +547,10 @@ Here are the details of all the sources in Spark. "s3n://a/b/dataset.txt"
"s3a://a/b/c/dataset.txt"
cleanSource: option to clean up completed files after processing.
- Available options are "archive", "delete", "no_op". If the option is not provided, the default value is "no_op".
+ Available options are "archive", "delete", "off". If the option is not provided, the default value is "off".
When "archive" is provided, additional option sourceArchiveDir must be provided as well. The value of "sourceArchiveDir" must be outside of source path, to ensure archived files are never included to new source files again.
Spark will move source files respecting their own path. For example, if the path of a source file is "/a/b/dataset.txt" and the path of the archive directory is "/archived/here", the file will be moved to "/archived/here/a/b/dataset.txt".
- NOTE: Both archiving (via moving) or deleting completed files would introduce overhead (slow down) in each micro-batch, so you need to understand the cost for each operation in your file system before enabling this option. On the other hand, enabling this option would reduce the cost to list source files which is considered as a heavy operation.
+ NOTE: Both archiving (via moving) or deleting completed files would introduce overhead (slow down) in each micro-batch, so you need to understand the cost for each operation in your file system before enabling this option. On the other hand, enabling this option will reduce the cost to list source files which is considered as a heavy operation.
NOTE 2: The source path should not be used from multiple sources or queries when enabling this option, because source files will be moved or deleted which behavior may impact the other sources and queries.
NOTE 3: Both delete and move actions are best effort. Failing to delete or move files will not fail the streaming query.
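The FileStreamOptions change later in this patch also collapses the two fromString overloads into a single Option-based helper; a rough sketch of its behaviour (internal API, illustrative calls):

    // Sketch only: how the consolidated parser resolves the cleanSource value.
    assert(CleanSourceMode.fromString(Some("archive")) == CleanSourceMode.ARCHIVE)
    assert(CleanSourceMode.fromString(Some("Delete")) == CleanSourceMode.DELETE) // matching is case-insensitive
    assert(CleanSourceMode.fromString(None) == CleanSourceMode.OFF)              // default when the option is unset
    // CleanSourceMode.fromString(Some("bogus")) would throw IllegalArgumentException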

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala index 95f30c81077a0..712ed1585bc8a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala @@ -89,7 +89,7 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging val sourceArchiveDir: Option[String] = parameters.get("sourceArchiveDir") /** - * Defines how to clean up completed files. Available options are "archive", "delete", "no_op". + * Defines how to clean up completed files. Available options are "archive", "delete", "off". */ val cleanSource: CleanSourceMode.Value = { val matchedMode = CleanSourceMode.fromString(parameters.get("cleanSource")) @@ -114,21 +114,12 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging } object CleanSourceMode extends Enumeration { - val ARCHIVE, DELETE, NO_OP = Value - - def fromString(value: String): CleanSourceMode.Value = { - val matchedModeOpt = CleanSourceMode.values.find(_.toString == value.toUpperCase(Locale.ROOT)) - matchedModeOpt match { - case None => - throw new IllegalArgumentException(s"Invalid mode for clean source option $value." + - s" Must be one of ${CleanSourceMode.values.mkString(",")}") - case Some(matchedMode) => - matchedMode - } - } - - def fromString(value: Option[String]): CleanSourceMode.Value = value match { - case Some(mode) => fromString(mode) - case None => NO_OP - } + val ARCHIVE, DELETE, OFF = Value + + def fromString(value: Option[String]): CleanSourceMode.Value = value.map { v => + CleanSourceMode.values.find(_.toString == v.toUpperCase(Locale.ROOT)) + .getOrElse(throw new IllegalArgumentException( + s"Invalid mode for clean source option $value." 
+ + s" Must be one of ${CleanSourceMode.values.mkString(",")}")) + }.getOrElse(OFF) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index b1a628bf3aae4..71ac8edbda37b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -266,7 +266,7 @@ class FileStreamSource( override def commit(end: Offset): Unit = { val logOffset = FileStreamSourceOffset(end).logOffset - if (sourceOptions.cleanSource != CleanSourceMode.NO_OP) { + if (sourceOptions.cleanSource != CleanSourceMode.OFF) { val files = metadataLog.get(Some(logOffset), Some(logOffset)).flatMap(_._2) val validFileEntities = files.filter(_.batchId == logOffset) logDebug(s"completed file entries: ${validFileEntities.mkString(",")}") @@ -408,14 +408,7 @@ object FileStreamSource { private def buildArchiveFilePath(pathUri: URI): Path = { require(baseArchivePathString.isDefined) - val baseArchivePathStr = baseArchivePathString.get - val normalizedBaseArchiveDirPath = if (baseArchivePathStr.endsWith("/")) { - baseArchivePathStr.substring(0, baseArchivePathStr.length - 1) - } else { - baseArchivePathStr - } - - new Path(normalizedBaseArchiveDirPath + pathUri.getPath) + new Path(baseArchivePathString.get.stripSuffix("/") + pathUri.getPath) } private def isArchiveFileMatchedAgainstSourcePattern(archiveFile: Path): Boolean = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 832b9587fa21a..8d8d5ba358287 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -1402,7 +1402,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { firstBatch: String, secondBatch: String, maxFileAge: Option[String] = None, - cleanSource: CleanSourceMode.Value = CleanSourceMode.NO_OP, + cleanSource: CleanSourceMode.Value = CleanSourceMode.OFF, archiveDir: Option[String] = None): Unit = { val srcOptions = Map("latestFirst" -> latestFirst.toString, "maxFilesPerTrigger" -> "1") ++ maxFileAge.map("maxFileAge" -> _) ++ From 33a5331c4517eda23c512a4654981ca3aed9a9a5 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Wed, 16 Oct 2019 08:46:12 +0900 Subject: [PATCH 05/11] Reflect review comments --- .../streaming/FileStreamSource.scala | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 71ac8edbda37b..b78dadc681c2a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -407,10 +407,25 @@ object FileStreamSource { } private def buildArchiveFilePath(pathUri: URI): Path = { - require(baseArchivePathString.isDefined) - new Path(baseArchivePathString.get.stripSuffix("/") + pathUri.getPath) + require(baseArchivePath.isDefined) + new Path(baseArchivePath.get, pathUri.getPath.stripPrefix("/")) } + /** + * This method checks whether the destination of archive file will 
be under the source path + * (which contains glob) to prevent the possibility of overwriting/re-reading as input. + * + * FileStreamSource reads the files which one of below conditions is met: + * 1) file itself is matched with source path + * 2) parent directory is matched with source path + * + * Checking with glob pattern is costly, so this method leverages above information to prune + * the cases where the file cannot be matched with source path. For example, when file is + * moved to archive directory, destination path will retain input file's path as suffix, + * so destination path can't be matched with source path if archive directory's depth is + * longer than 2, as neither file nor parent directory of destination path can be matched + * with source path. + */ private def isArchiveFileMatchedAgainstSourcePattern(archiveFile: Path): Boolean = { if (baseArchivePath.get.depth() > 2) { // there's no chance for archive file to be matched against source pattern From b1a6bec350ae5c6be60bfbd9abeb64901b9ff736 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Thu, 24 Oct 2019 12:12:22 +0900 Subject: [PATCH 06/11] Reflect review comments --- .../structured-streaming-programming-guide.md | 6 +- .../streaming/FileStreamSource.scala | 123 ++++++++++-------- .../sql/streaming/FileStreamSourceSuite.scala | 66 ++++++---- 3 files changed, 112 insertions(+), 83 deletions(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index f083f5eaad138..87af6e20eb27e 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -548,10 +548,10 @@ Here are the details of all the sources in Spark. "s3a://a/b/c/dataset.txt"
cleanSource: option to clean up completed files after processing.
Available options are "archive", "delete", "off". If the option is not provided, the default value is "off".
- When "archive" is provided, additional option sourceArchiveDir must be provided as well. The value of "sourceArchiveDir" must be outside of source path, to ensure archived files are never included to new source files again.
+ When "archive" is provided, additional option sourceArchiveDir must be provided as well. The value of "sourceArchiveDir" must be outside of source path, to ensure archived files are never included as new source files.
Spark will move source files respecting their own path. For example, if the path of a source file is "/a/b/dataset.txt" and the path of the archive directory is "/archived/here", the file will be moved to "/archived/here/a/b/dataset.txt".
- NOTE: Both archiving (via moving) or deleting completed files would introduce overhead (slow down) in each micro-batch, so you need to understand the cost for each operation in your file system before enabling this option. On the other hand, enabling this option will reduce the cost to list source files which is considered as a heavy operation.
- NOTE 2: The source path should not be used from multiple sources or queries when enabling this option, because source files will be moved or deleted which behavior may impact the other sources and queries.
+ NOTE: Both archiving (via moving) or deleting completed files will introduce overhead (slow down) in each micro-batch, so you need to understand the cost for each operation in your file system before enabling this option. On the other hand, enabling this option will reduce the cost to list source files which can be an expensive operation.
+ NOTE 2: The source path should not be used from multiple sources or queries when enabling this option.
NOTE 3: Both delete and move actions are best effort. Failing to delete or move files will not fail the streaming query.
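The cleaner changes later in this patch also check that the archive directory lives on the same filesystem as the source (otherwise archiving is skipped with a warning; a later patch in this series promotes the check to a hard requirement). Illustratively, with hypothetical URIs:

    // Illustrative only: source path and "sourceArchiveDir" must share a filesystem.
    //   .load("s3a://bucket/in/*")  + .option("sourceArchiveDir", "s3a://bucket/archived/here")    // accepted
    //   .load("s3a://bucket/in/*")  + .option("sourceArchiveDir", "hdfs://namenode/archived/here") // archiving skipped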

For file-format-specific options, see the related methods in DataStreamReader diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index b78dadc681c2a..2785ae653b83b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -55,8 +55,17 @@ class FileStreamSource( fs.makeQualified(new Path(path)) // can contain glob patterns } - private val sourceCleaner = new FileStreamSourceCleaner(fs, qualifiedBasePath, - sourceOptions.sourceArchiveDir) + private val sourceCleaner: FileStreamSourceCleaner = { + val (archiveFs, qualifiedArchivePath) = sourceOptions.sourceArchiveDir match { + case Some(dir) => + val path = new Path(dir) + val fs = path.getFileSystem(hadoopConf) + (Some(fs), Some(fs.makeQualified(path))) + + case None => (None, None) + } + new FileStreamSourceCleaner(fs, qualifiedBasePath, archiveFs, qualifiedArchivePath) + } private val optionsWithPartitionBasePath = sourceOptions.optionMapWithoutPath ++ { if (!SparkHadoopUtil.get.isGlobPath(new Path(path)) && options.contains("path")) { @@ -357,25 +366,54 @@ object FileStreamSource { private[sql] class FileStreamSourceCleaner( fileSystem: FileSystem, sourcePath: Path, - baseArchivePathString: Option[String]) extends Logging { + baseArchiveFileSystem: Option[FileSystem], + baseArchivePath: Option[Path]) extends Logging { + require(baseArchiveFileSystem.isDefined == baseArchivePath.isDefined) private val sourceGlobFilters: Seq[GlobFilter] = buildSourceGlobFilters(sourcePath) - private val baseArchivePath: Option[Path] = baseArchivePathString.map(new Path(_)) + private val sameFsSourceAndArchive: Boolean = { + baseArchiveFileSystem.exists { fs => + if (fileSystem.getUri != fs.getUri) { + logWarning("Base archive path is located to the different filesystem with source, " + + s"which is not supported. source path: ${sourcePath} / base archive path: " + + s"${baseArchivePath.get}") + false + } else { + true + } + } + } + + /** + * This is a flag to skip matching archived path with source path. + * + * FileStreamSource reads the files which one of below conditions is met: + * 1) file itself is matched with source path + * 2) parent directory is matched with source path + * + * Checking with glob pattern is costly, so this flag leverages above information to prune + * the cases where the file cannot be matched with source path. For example, when file is + * moved to archive directory, destination path will retain input file's path as suffix, + * so destination path can't be matched with source path if archive directory's depth is + * longer than 2, as neither file nor parent directory of destination path can be matched + * with source path. + */ + private val skipCheckingGlob: Boolean = baseArchivePath.exists(_.depth() > 2) def archive(entry: FileEntry): Unit = { require(baseArchivePath.isDefined) - val curPath = new Path(new URI(entry.path)) - val curPathUri = curPath.toUri - - val newPath = buildArchiveFilePath(curPathUri) + if (sameFsSourceAndArchive) { + val curPath = new Path(new URI(entry.path)) + val newPath = new Path(baseArchivePath.get, curPath.toUri.getPath.stripPrefix("/")) - if (isArchiveFileMatchedAgainstSourcePattern(newPath)) { - logWarning(s"Fail to move $curPath to $newPath - destination matches " + - s"to source path/pattern. 
Skip moving file.") - } else { - doArchive(curPath, newPath) + if (!skipCheckingGlob && pathMatchesSourcePattern(newPath)) { + logWarning(s"Skip moving $curPath to $newPath - destination matches " + + s"to source path/pattern.") + } else { + doArchive(curPath, newPath) + } } } @@ -385,7 +423,7 @@ object FileStreamSource { logDebug(s"Removing completed file $curPath") if (!fileSystem.delete(curPath, false)) { - logWarning(s"Fail to remove $curPath / skip removing file.") + logWarning(s"Failed to remove $curPath / skip removing file.") } } catch { case NonFatal(e) => @@ -406,32 +444,11 @@ object FileStreamSource { filters.toList } - private def buildArchiveFilePath(pathUri: URI): Path = { - require(baseArchivePath.isDefined) - new Path(baseArchivePath.get, pathUri.getPath.stripPrefix("/")) - } - /** * This method checks whether the destination of archive file will be under the source path * (which contains glob) to prevent the possibility of overwriting/re-reading as input. - * - * FileStreamSource reads the files which one of below conditions is met: - * 1) file itself is matched with source path - * 2) parent directory is matched with source path - * - * Checking with glob pattern is costly, so this method leverages above information to prune - * the cases where the file cannot be matched with source path. For example, when file is - * moved to archive directory, destination path will retain input file's path as suffix, - * so destination path can't be matched with source path if archive directory's depth is - * longer than 2, as neither file nor parent directory of destination path can be matched - * with source path. */ - private def isArchiveFileMatchedAgainstSourcePattern(archiveFile: Path): Boolean = { - if (baseArchivePath.get.depth() > 2) { - // there's no chance for archive file to be matched against source pattern - return false - } - + private def pathMatchesSourcePattern(archiveFile: Path): Boolean = { var matched = true // new path will never match against source path when the depth is not a range of @@ -441,29 +458,23 @@ object FileStreamSource { val depthSourcePattern = sourceGlobFilters.length val depthArchiveFile = archiveFile.depth() - // we already checked against the depth of archive path, but rechecking wouldn't hurt - if (depthArchiveFile < depthSourcePattern || depthArchiveFile > depthSourcePattern + 1) { - // never matched - matched = false + var pathToCompare = if (depthArchiveFile == depthSourcePattern + 1) { + archiveFile.getParent } else { - var pathToCompare = if (depthArchiveFile == depthSourcePattern + 1) { - archiveFile.getParent + archiveFile + } + + // Now pathToCompare should have same depth as sourceGlobFilters.length + var index = 0 + do { + // GlobFilter only matches against its name, not full path so it's safe to compare + if (!sourceGlobFilters(index).accept(pathToCompare)) { + matched = false } else { - archiveFile + pathToCompare = pathToCompare.getParent + index += 1 } - - // Now pathToCompare should have same depth as sourceGlobFilters.length - var index = 0 - do { - // GlobFilter only matches against its name, not full path so it's safe to compare - if (!sourceGlobFilters(index).accept(pathToCompare)) { - matched = false - } else { - pathToCompare = pathToCompare.getParent - index += 1 - } - } while (matched && !pathToCompare.isRoot) - } + } while (matched && !pathToCompare.isRoot) matched } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 8d8d5ba358287..cc9cb220b3b54 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -1673,7 +1673,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest { } test("move completed files to archive directory when archive option is enabled") { - withThreeTempDirs { case (src, tmp, archiveDir) => withSQLConf( SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2", @@ -1740,7 +1739,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { } } - class FakeFileSystem extends FileSystem { + class FakeFileSystem(scheme: String) extends FileSystem { val requestsExists = new mutable.MutableList[Path]() val requestsMkdirs = new mutable.MutableList[Path]() val requestsRename = new mutable.MutableList[(Path, Path)]() @@ -1766,7 +1765,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { requestsRename.clear() } - override def getUri: URI = throw new NotImplementedError + override def getUri: URI = URI.create(s"${scheme}:///") override def open(f: Path, bufferSize: Int): FSDataInputStream = throw new NotImplementedError @@ -1788,34 +1787,19 @@ class FileStreamSourceSuite extends FileStreamSourceTest { override def setWorkingDirectory(new_dir: Path): Unit = throw new NotImplementedError - override def getWorkingDirectory: Path = throw new NotImplementedError + override def getWorkingDirectory: Path = new Path("/somewhere") override def getFileStatus(f: Path): FileStatus = throw new NotImplementedError } test("FileStreamSourceCleaner - archive - destinations match against source pattern") { - - def assertNoMove(fs: FakeFileSystem): Unit = { - assert(fs.requestsExists.isEmpty) - assert(fs.requestsMkdirs.isEmpty) - assert(fs.requestsRename.isEmpty) - } - - def assertMoveFile(fs: FakeFileSystem, sourcePath: Path, expectedArchivePath: Path): Unit = { - assert(fs.requestsExists.nonEmpty) - assert(fs.requestsExists.head === expectedArchivePath.getParent) - assert(fs.requestsMkdirs.isEmpty) - assert(fs.requestsRename.nonEmpty) - assert(fs.requestsRename.head === ((sourcePath, expectedArchivePath))) - } - - val fakeFileSystem = new FakeFileSystem() + val fakeFileSystem = new FakeFileSystem("fake") val sourcePatternPath = new Path("/hello*/h{e,f}ll?") - val baseArchiveDirPath = "/hello" + val baseArchiveDirPath = new Path("/hello") val sourceCleaner = new FileStreamSourceCleaner(fakeFileSystem, sourcePatternPath, - Some(baseArchiveDirPath)) + Some(fakeFileSystem), Some(baseArchiveDirPath)) // file 1: /hello/helln // final destination = /hello/hello/helln @@ -1851,10 +1835,10 @@ class FileStreamSourceSuite extends FileStreamSourceTest { // corner case: this should end up with all archive destinations to be // matched against source pattern - val baseArchiveDirPath2 = "/" + val baseArchiveDirPath2 = new Path("/") val sourceCleaner2 = new FileStreamSourceCleaner(fakeFileSystem, sourcePatternPath, - Some(baseArchiveDirPath2)) + Some(fakeFileSystem), Some(baseArchiveDirPath2)) // file 4 (& final destination): /hello/helln // final destination matches source pattern @@ -1875,6 +1859,40 @@ class FileStreamSourceSuite extends FileStreamSourceTest { fakeFileSystem.clearRecords() } + test("FileStreamSourceCleaner - archive - different filesystems between source and archive") { + val fakeFileSystem = new FakeFileSystem("fake") + val fakeFileSystem2 = new FakeFileSystem("fake2") + + val 
sourcePatternPath = new Path("/hello*/h{e,f}ll?") + val baseArchiveDirPath = new Path("/hello") + + val sourceCleaner = new FileStreamSourceCleaner(fakeFileSystem, sourcePatternPath, + Some(fakeFileSystem2), Some(baseArchiveDirPath)) + + val sourcePath = new Path("/hello/hfllo/spark") + sourceCleaner.archive(FileEntry(sourcePath.toUri.getPath, 0, 0)) + + assertNoMove(fakeFileSystem) + assertNoMove(fakeFileSystem2) + } + + private def assertNoMove(fs: FakeFileSystem): Unit = { + assert(fs.requestsExists.isEmpty) + assert(fs.requestsMkdirs.isEmpty) + assert(fs.requestsRename.isEmpty) + } + + private def assertMoveFile( + fs: FakeFileSystem, + sourcePath: Path, + expectedArchivePath: Path): Unit = { + assert(fs.requestsExists.nonEmpty) + assert(fs.requestsExists.head === expectedArchivePath.getParent) + assert(fs.requestsMkdirs.isEmpty) + assert(fs.requestsRename.nonEmpty) + assert(fs.requestsRename.head === ((sourcePath, expectedArchivePath))) + } + private def assertFileIsNotMoved(sourceDir: File, expectedDir: File, filePrefix: String): Unit = { assert(sourceDir.exists()) assert(sourceDir.list().exists(_.startsWith(filePrefix))) From b67778a08c600f80c2377dff1c876742f29b763d Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Sat, 26 Oct 2019 13:08:24 +0900 Subject: [PATCH 07/11] Simplify the condition to eliminate the needs to check the glob pattern, fail the query in prior if archive cannot be possible --- .../structured-streaming-programming-guide.md | 2 +- .../streaming/FileStreamSource.scala | 140 +++++------------- .../sql/streaming/FileStreamSourceSuite.scala | 77 ++-------- 3 files changed, 49 insertions(+), 170 deletions(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 87af6e20eb27e..aed9a477f39c1 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -548,7 +548,7 @@ Here are the details of all the sources in Spark. "s3a://a/b/c/dataset.txt"
cleanSource: option to clean up completed files after processing.
Available options are "archive", "delete", "off". If the option is not provided, the default value is "off".
- When "archive" is provided, additional option sourceArchiveDir must be provided as well. The value of "sourceArchiveDir" must be outside of source path, to ensure archived files are never included as new source files.
+ When "archive" is provided, additional option sourceArchiveDir must be provided as well. The value of "sourceArchiveDir" must have 2 subdirectories (so the depth of the directory is greater than 2), e.g. /archived/here. This will ensure archived files are never included as new source files.
Spark will move source files respecting their own path. For example, if the path of a source file is "/a/b/dataset.txt" and the path of the archive directory is "/archived/here", the file will be moved to "/archived/here/a/b/dataset.txt".
NOTE: Both archiving (via moving) or deleting completed files will introduce overhead (slow down) in each micro-batch, so you need to understand the cost for each operation in your file system before enabling this option. On the other hand, enabling this option will reduce the cost to list source files which can be an expensive operation.
NOTE 2: The source path should not be used from multiple sources or queries when enabling this option.
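For intuition, a small sketch (Hadoop Path API; paths are illustrative) of how the archive destination is derived for a completed file, mirroring the rename performed by the cleaner:

    import org.apache.hadoop.fs.Path

    // Illustrative only: the destination keeps the file's full path under the archive directory.
    val archiveDir  = new Path("/archived/here")     // value of "sourceArchiveDir"
    val completed   = new Path("/a/b/dataset.txt")   // a file from a committed batch
    val destination = new Path(archiveDir, completed.toUri.getPath.stripPrefix("/"))
    // destination is "/archived/here/a/b/dataset.txt"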
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 2785ae653b83b..9c9ca24613b74 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -368,52 +368,55 @@ object FileStreamSource { sourcePath: Path, baseArchiveFileSystem: Option[FileSystem], baseArchivePath: Option[Path]) extends Logging { - require(baseArchiveFileSystem.isDefined == baseArchivePath.isDefined) + assertParameters() - private val sourceGlobFilters: Seq[GlobFilter] = buildSourceGlobFilters(sourcePath) + private def assertParameters(): Unit = { + require(baseArchiveFileSystem.isDefined == baseArchivePath.isDefined) - private val sameFsSourceAndArchive: Boolean = { - baseArchiveFileSystem.exists { fs => - if (fileSystem.getUri != fs.getUri) { - logWarning("Base archive path is located to the different filesystem with source, " + - s"which is not supported. source path: ${sourcePath} / base archive path: " + - s"${baseArchivePath.get}") - false - } else { - true - } + baseArchiveFileSystem.foreach { fs => + require(fileSystem.getUri == fs.getUri, "Base archive path is located to the different" + + s" filesystem with source, which is not supported. source path: $sourcePath" + + s" / base archive path: ${baseArchivePath.get}") } - } - /** - * This is a flag to skip matching archived path with source path. - * - * FileStreamSource reads the files which one of below conditions is met: - * 1) file itself is matched with source path - * 2) parent directory is matched with source path - * - * Checking with glob pattern is costly, so this flag leverages above information to prune - * the cases where the file cannot be matched with source path. For example, when file is - * moved to archive directory, destination path will retain input file's path as suffix, - * so destination path can't be matched with source path if archive directory's depth is - * longer than 2, as neither file nor parent directory of destination path can be matched - * with source path. - */ - private val skipCheckingGlob: Boolean = baseArchivePath.exists(_.depth() > 2) + baseArchivePath.foreach { path => + + /** + * FileStreamSource reads the files which one of below conditions is met: + * 1) file itself is matched with source path + * 2) parent directory is matched with source path + * + * Checking with glob pattern is costly, so set this requirement to eliminate the cases + * where the archive path can be matched with source path. For example, when file is moved + * to archive directory, destination path will retain input file's path as suffix, so + * destination path can't be matched with source path if archive directory's depth is longer + * than 2, as neither file nor parent directory of destination path can be matched with + * source path. + */ + require(path.depth() > 2, "Base archive path must have a depth of at least 2 " + + "subdirectories. e.g. 
'/data/archive'") + } + } def archive(entry: FileEntry): Unit = { require(baseArchivePath.isDefined) - if (sameFsSourceAndArchive) { - val curPath = new Path(new URI(entry.path)) - val newPath = new Path(baseArchivePath.get, curPath.toUri.getPath.stripPrefix("/")) + val curPath = new Path(new URI(entry.path)) + val newPath = new Path(baseArchivePath.get, curPath.toUri.getPath.stripPrefix("/")) - if (!skipCheckingGlob && pathMatchesSourcePattern(newPath)) { - logWarning(s"Skip moving $curPath to $newPath - destination matches " + - s"to source path/pattern.") - } else { - doArchive(curPath, newPath) + try { + logDebug(s"Creating directory if it doesn't exist ${newPath.getParent}") + if (!fileSystem.exists(newPath.getParent)) { + fileSystem.mkdirs(newPath.getParent) } + + logDebug(s"Archiving completed file $curPath to $newPath") + if (!fileSystem.rename(curPath, newPath)) { + logWarning(s"Fail to move $curPath to $newPath / skip moving file.") + } + } catch { + case NonFatal(e) => + logWarning(s"Fail to move $curPath to $newPath / skip moving file.", e) } } @@ -431,70 +434,5 @@ object FileStreamSource { logWarning(s"Fail to remove $curPath / skip removing file.", e) } } - - private def buildSourceGlobFilters(sourcePath: Path): Seq[GlobFilter] = { - val filters = new scala.collection.mutable.MutableList[GlobFilter]() - - var currentPath = sourcePath - while (!currentPath.isRoot) { - filters += new GlobFilter(currentPath.getName) - currentPath = currentPath.getParent - } - - filters.toList - } - - /** - * This method checks whether the destination of archive file will be under the source path - * (which contains glob) to prevent the possibility of overwriting/re-reading as input. - */ - private def pathMatchesSourcePattern(archiveFile: Path): Boolean = { - var matched = true - - // new path will never match against source path when the depth is not a range of - // the depth of source path ~ (the depth of source path + 1) - // because the source files are picked when they match against source pattern or - // their parent directories match against source pattern - val depthSourcePattern = sourceGlobFilters.length - val depthArchiveFile = archiveFile.depth() - - var pathToCompare = if (depthArchiveFile == depthSourcePattern + 1) { - archiveFile.getParent - } else { - archiveFile - } - - // Now pathToCompare should have same depth as sourceGlobFilters.length - var index = 0 - do { - // GlobFilter only matches against its name, not full path so it's safe to compare - if (!sourceGlobFilters(index).accept(pathToCompare)) { - matched = false - } else { - pathToCompare = pathToCompare.getParent - index += 1 - } - } while (matched && !pathToCompare.isRoot) - - matched - } - - private def doArchive(sourcePath: Path, archivePath: Path): Unit = { - try { - logDebug(s"Creating directory if it doesn't exist ${archivePath.getParent}") - if (!fileSystem.exists(archivePath.getParent)) { - fileSystem.mkdirs(archivePath.getParent) - } - - logDebug(s"Archiving completed file $sourcePath to $archivePath") - if (!fileSystem.rename(sourcePath, archivePath)) { - logWarning(s"Fail to move $sourcePath to $archivePath / skip moving file.") - } - } catch { - case NonFatal(e) => - logWarning(s"Fail to move $sourcePath to $archivePath / skip moving file.", e) - } - } - } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index cc9cb220b3b54..e291706bf4876 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -1792,71 +1792,16 @@ class FileStreamSourceSuite extends FileStreamSourceTest { override def getFileStatus(f: Path): FileStatus = throw new NotImplementedError } - test("FileStreamSourceCleaner - archive - destinations match against source pattern") { + test("FileStreamSourceCleaner - archive - base archive path depth <= 2") { val fakeFileSystem = new FakeFileSystem("fake") val sourcePatternPath = new Path("/hello*/h{e,f}ll?") val baseArchiveDirPath = new Path("/hello") - val sourceCleaner = new FileStreamSourceCleaner(fakeFileSystem, sourcePatternPath, - Some(fakeFileSystem), Some(baseArchiveDirPath)) - - // file 1: /hello/helln - // final destination = /hello/hello/helln - // parent directory of final destination matches source pattern - val sourcePath1 = new Path("/hello/helln") - sourceCleaner.archive(FileEntry(sourcePath1.toUri.getPath, 0, 0)) - - assertNoMove(fakeFileSystem) - - fakeFileSystem.clearRecords() - - // file 2: /hello/hfllo/spark - // final destination = /hello/hello/hfllo/spark - // no match - val sourcePath2 = new Path("/hello/hfllo/spark") - val expectedDestPath2 = new Path("/hello/hello/hfllo/spark") - sourceCleaner.archive(FileEntry(sourcePath2.toUri.getPath, 0, 0)) - - assertMoveFile(fakeFileSystem, sourcePath2, expectedDestPath2) - - fakeFileSystem.clearRecords() - - // file 3: /hello1/hflln - // final destination = /hello/hello1/hflln - // no match - val sourcePath3 = new Path("/hello1/hflln") - val expectedDestPath3 = new Path("/hello/hello1/hflln") - sourceCleaner.archive(FileEntry(sourcePath3.toUri.getPath, 0, 0)) - - assertMoveFile(fakeFileSystem, sourcePath3, expectedDestPath3) - - fakeFileSystem.clearRecords() - - // corner case: this should end up with all archive destinations to be - // matched against source pattern - val baseArchiveDirPath2 = new Path("/") - - val sourceCleaner2 = new FileStreamSourceCleaner(fakeFileSystem, sourcePatternPath, - Some(fakeFileSystem), Some(baseArchiveDirPath2)) - - // file 4 (& final destination): /hello/helln - // final destination matches source pattern - val sourcePath4 = new Path("/hello/helln") - sourceCleaner2.archive(FileEntry(sourcePath4.toUri.getPath, 0, 0)) - - assertNoMove(fakeFileSystem) - - fakeFileSystem.clearRecords() - - // file 5 (& final destination): /hello/hfllo/spark - // final destination matches source pattern - val sourcePath5 = new Path("/hello/hfllo/spark") - sourceCleaner2.archive(FileEntry(sourcePath5.toUri.getPath, 0, 0)) - - assertNoMove(fakeFileSystem) - - fakeFileSystem.clearRecords() + intercept[IllegalArgumentException] { + new FileStreamSourceCleaner(fakeFileSystem, sourcePatternPath, + Some(fakeFileSystem), Some(baseArchiveDirPath)) + } } test("FileStreamSourceCleaner - archive - different filesystems between source and archive") { @@ -1866,14 +1811,10 @@ class FileStreamSourceSuite extends FileStreamSourceTest { val sourcePatternPath = new Path("/hello*/h{e,f}ll?") val baseArchiveDirPath = new Path("/hello") - val sourceCleaner = new FileStreamSourceCleaner(fakeFileSystem, sourcePatternPath, - Some(fakeFileSystem2), Some(baseArchiveDirPath)) - - val sourcePath = new Path("/hello/hfllo/spark") - sourceCleaner.archive(FileEntry(sourcePath.toUri.getPath, 0, 0)) - - assertNoMove(fakeFileSystem) - assertNoMove(fakeFileSystem2) + intercept[IllegalArgumentException] { + new FileStreamSourceCleaner(fakeFileSystem, 
sourcePatternPath, + Some(fakeFileSystem2), Some(baseArchiveDirPath)) + } } private def assertNoMove(fs: FakeFileSystem): Unit = { From dd9d4ad14272801e1ce618851d1093884cbfc217 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Tue, 29 Oct 2019 04:12:27 +0900 Subject: [PATCH 08/11] Reflect review comments --- docs/structured-streaming-programming-guide.md | 4 ++-- .../spark/sql/execution/streaming/FileStreamSource.scala | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index aed9a477f39c1..d399b37ab2106 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -548,8 +548,8 @@ Here are the details of all the sources in Spark. "s3a://a/b/c/dataset.txt"
cleanSource: option to clean up completed files after processing.
Available options are "archive", "delete", "off". If the option is not provided, the default value is "off".
- When "archive" is provided, additional option sourceArchiveDir must be provided as well. The value of "sourceArchiveDir" must have 2 subdirectories (so depth of directory is greater than 2). e.g. /archived/here This will ensure archived files are never included as new source files.
- Spark will move source files respecting its own path. For example, if the path of source file is "/a/b/dataset.txt" and the path of archive directory is "/archived/here", file will be moved to "/archived/here/a/b/dataset.txt"
+ When "archive" is provided, additional option sourceArchiveDir must be provided as well. The value of "sourceArchiveDir" must have 2 subdirectories (so depth of directory is greater than 2). e.g. /archived/here. This will ensure archived files are never included as new source files.
+ Spark will move source files while preserving their own path. For example, if the path of a source file is /a/b/dataset.txt and the path of the archive directory is /archived/here, the file will be moved to /archived/here/a/b/dataset.txt.<br/>
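To make the path rule above concrete, the following is a small sketch of how the archive destination can be derived from the source file path and the base archive directory, mirroring the logic in this patch; the concrete paths are illustrative only.

```scala
import java.net.URI
import org.apache.hadoop.fs.Path

// Sketch of deriving the archive destination (illustrative paths, not values from the patch).
object ArchivePathSketch {
  def main(args: Array[String]): Unit = {
    val baseArchivePath = new Path("/archived/here")
    val curPath = new Path(new URI("/a/b/dataset.txt"))
    // The destination keeps the source file's own path as a suffix under the archive directory.
    val newPath = new Path(baseArchivePath.toString.stripSuffix("/") + curPath.toUri.getPath)
    println(newPath) // prints /archived/here/a/b/dataset.txt
  }
}
```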
NOTE: Both archiving (via moving) and deleting completed files will introduce overhead (slowdown) in each micro-batch, so you need to understand the cost of each operation in your file system before enabling this option. On the other hand, enabling this option will reduce the cost of listing source files, which can be an expensive operation.<br/>
NOTE 2: The source path should not be used by multiple sources or queries when enabling this option.<br/>
NOTE 3: Both delete and move actions are best effort. Failing to delete or move files will not fail the streaming query. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 9c9ca24613b74..ee54f03b52308 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -374,8 +374,8 @@ object FileStreamSource { require(baseArchiveFileSystem.isDefined == baseArchivePath.isDefined) baseArchiveFileSystem.foreach { fs => - require(fileSystem.getUri == fs.getUri, "Base archive path is located to the different" + - s" filesystem with source, which is not supported. source path: $sourcePath" + + require(fileSystem.getUri == fs.getUri, "Base archive path is located on a different " + + s"file system than the source files. source path: $sourcePath" + s" / base archive path: ${baseArchivePath.get}") } From 178d2f4017b689cb8ad7112dd61ef3ee796026fb Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Wed, 30 Oct 2019 06:02:32 +0900 Subject: [PATCH 09/11] Reflect review comment: avoid Path constructor which has known issue --- .../apache/spark/sql/execution/streaming/FileStreamSource.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index ee54f03b52308..b3ea8a8616107 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -402,7 +402,7 @@ object FileStreamSource { require(baseArchivePath.isDefined) val curPath = new Path(new URI(entry.path)) - val newPath = new Path(baseArchivePath.get, curPath.toUri.getPath.stripPrefix("/")) + val newPath = new Path(baseArchivePath.get.toString.stripSuffix("/") + curPath.toUri.getPath) try { logDebug(s"Creating directory if it doesn't exist ${newPath.getParent}") From 21c71c456cf5b86675196942b022e060acd598b4 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Fri, 1 Nov 2019 09:21:46 +0900 Subject: [PATCH 10/11] Reflect review comments --- .../streaming/FileStreamSource.scala | 117 +++++++++--------- .../sql/streaming/FileStreamSourceSuite.scala | 55 ++------ 2 files changed, 67 insertions(+), 105 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index b3ea8a8616107..e0ca744d9d18f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -22,7 +22,8 @@ import java.util.concurrent.TimeUnit._ import scala.util.control.NonFatal -import org.apache.hadoop.fs.{FileStatus, FileSystem, GlobFilter, Path} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging @@ -55,17 +56,8 @@ class FileStreamSource( fs.makeQualified(new Path(path)) // can contain glob patterns } - private val sourceCleaner: FileStreamSourceCleaner = { - 
val (archiveFs, qualifiedArchivePath) = sourceOptions.sourceArchiveDir match { - case Some(dir) => - val path = new Path(dir) - val fs = path.getFileSystem(hadoopConf) - (Some(fs), Some(fs.makeQualified(path))) - - case None => (None, None) - } - new FileStreamSourceCleaner(fs, qualifiedBasePath, archiveFs, qualifiedArchivePath) - } + private val sourceCleaner: Option[FileStreamSourceCleaner] = FileStreamSourceCleaner( + fs, qualifiedBasePath, sourceOptions, hadoopConf) private val optionsWithPartitionBasePath = sourceOptions.optionMapWithoutPath ++ { if (!SparkHadoopUtil.get.isGlobPath(new Path(path)) && options.contains("path")) { @@ -275,24 +267,12 @@ class FileStreamSource( override def commit(end: Offset): Unit = { val logOffset = FileStreamSourceOffset(end).logOffset - if (sourceOptions.cleanSource != CleanSourceMode.OFF) { + sourceCleaner.foreach { cleaner => val files = metadataLog.get(Some(logOffset), Some(logOffset)).flatMap(_._2) val validFileEntities = files.filter(_.batchId == logOffset) logDebug(s"completed file entries: ${validFileEntities.mkString(",")}") - sourceOptions.cleanSource match { - case CleanSourceMode.ARCHIVE => - validFileEntities.foreach(sourceCleaner.archive) - - case CleanSourceMode.DELETE => - validFileEntities.foreach(sourceCleaner.delete) - - case _ => - } - } else { - // No-op for now; FileStreamSource currently garbage-collects files based on timestamp - // and the value of the maxFileAge parameter. + validFileEntities.foreach(cleaner.clean) } - } override def stop(): Unit = {} @@ -363,46 +343,61 @@ object FileStreamSource { def size: Int = map.size() } - private[sql] class FileStreamSourceCleaner( + private[sql] trait FileStreamSourceCleaner { + def clean(entry: FileEntry): Unit + } + + private[sql] object FileStreamSourceCleaner { + def apply( + fileSystem: FileSystem, + sourcePath: Path, + option: FileStreamOptions, + hadoopConf: Configuration): Option[FileStreamSourceCleaner] = option.cleanSource match { + case CleanSourceMode.ARCHIVE => + require(option.sourceArchiveDir.isDefined) + val path = new Path(option.sourceArchiveDir.get) + val archiveFs = path.getFileSystem(hadoopConf) + val qualifiedArchivePath = archiveFs.makeQualified(path) + Some(new SourceFileArchiver(fileSystem, sourcePath, archiveFs, qualifiedArchivePath)) + + case CleanSourceMode.DELETE => + Some(new SourceFileRemover(fileSystem)) + + case _ => None + } + } + + private[sql] class SourceFileArchiver( fileSystem: FileSystem, sourcePath: Path, - baseArchiveFileSystem: Option[FileSystem], - baseArchivePath: Option[Path]) extends Logging { + baseArchiveFileSystem: FileSystem, + baseArchivePath: Path) extends FileStreamSourceCleaner with Logging { assertParameters() private def assertParameters(): Unit = { - require(baseArchiveFileSystem.isDefined == baseArchivePath.isDefined) - - baseArchiveFileSystem.foreach { fs => - require(fileSystem.getUri == fs.getUri, "Base archive path is located on a different " + - s"file system than the source files. source path: $sourcePath" + - s" / base archive path: ${baseArchivePath.get}") - } - - baseArchivePath.foreach { path => - - /** - * FileStreamSource reads the files which one of below conditions is met: - * 1) file itself is matched with source path - * 2) parent directory is matched with source path - * - * Checking with glob pattern is costly, so set this requirement to eliminate the cases - * where the archive path can be matched with source path. 
For example, when file is moved - * to archive directory, destination path will retain input file's path as suffix, so - * destination path can't be matched with source path if archive directory's depth is longer - * than 2, as neither file nor parent directory of destination path can be matched with - * source path. - */ - require(path.depth() > 2, "Base archive path must have a depth of at least 2 " + - "subdirectories. e.g. '/data/archive'") - } + require(fileSystem.getUri == baseArchiveFileSystem.getUri, "Base archive path is located " + + s"on a different file system than the source files. source path: $sourcePath" + + s" / base archive path: $baseArchivePath") + + /** + * FileStreamSource reads the files which one of below conditions is met: + * 1) file itself is matched with source path + * 2) parent directory is matched with source path + * + * Checking with glob pattern is costly, so set this requirement to eliminate the cases + * where the archive path can be matched with source path. For example, when file is moved + * to archive directory, destination path will retain input file's path as suffix, so + * destination path can't be matched with source path if archive directory's depth is longer + * than 2, as neither file nor parent directory of destination path can be matched with + * source path. + */ + require(baseArchivePath.depth() > 2, "Base archive path must have at least 2 " + + "subdirectories from root directory. e.g. '/data/archive'") } - def archive(entry: FileEntry): Unit = { - require(baseArchivePath.isDefined) - + override def clean(entry: FileEntry): Unit = { val curPath = new Path(new URI(entry.path)) - val newPath = new Path(baseArchivePath.get.toString.stripSuffix("/") + curPath.toUri.getPath) + val newPath = new Path(baseArchivePath.toString.stripSuffix("/") + curPath.toUri.getPath) try { logDebug(s"Creating directory if it doesn't exist ${newPath.getParent}") @@ -419,8 +414,12 @@ object FileStreamSource { logWarning(s"Fail to move $curPath to $newPath / skip moving file.", e) } } + } + + private[sql] class SourceFileRemover(fileSystem: FileSystem) + extends FileStreamSourceCleaner with Logging { - def delete(entry: FileEntry): Unit = { + override def clean(entry: FileEntry): Unit = { val curPath = new Path(new URI(entry.path)) try { logDebug(s"Removing completed file $curPath") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index e291706bf4876..7291fa6dfe02c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -32,7 +32,7 @@ import org.scalatest.time.SpanSugar._ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.execution.streaming._ -import org.apache.spark.sql.execution.streaming.FileStreamSource.{FileEntry, FileStreamSourceCleaner, SeenFilesMap} +import org.apache.spark.sql.execution.streaming.FileStreamSource.{FileEntry, SeenFilesMap, SourceFileArchiver} import org.apache.spark.sql.execution.streaming.sources.MemorySink import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.ExistsThrowsExceptionFileSystem._ @@ -1740,30 +1740,11 @@ class FileStreamSourceSuite extends FileStreamSourceTest { } class FakeFileSystem(scheme: String) extends FileSystem { - val requestsExists = new mutable.MutableList[Path]() - val 
requestsMkdirs = new mutable.MutableList[Path]() - val requestsRename = new mutable.MutableList[(Path, Path)]() + override def exists(f: Path): Boolean = true - override def exists(f: Path): Boolean = { - requestsExists += f - true - } - - override def mkdirs(f: Path, permission: FsPermission): Boolean = { - requestsMkdirs += f - true - } - - override def rename(src: Path, dst: Path): Boolean = { - requestsRename += ((src, dst)) - true - } + override def mkdirs(f: Path, permission: FsPermission): Boolean = true - def clearRecords(): Unit = { - requestsExists.clear() - requestsMkdirs.clear() - requestsRename.clear() - } + override def rename(src: Path, dst: Path): Boolean = true override def getUri: URI = URI.create(s"${scheme}:///") @@ -1792,19 +1773,18 @@ class FileStreamSourceSuite extends FileStreamSourceTest { override def getFileStatus(f: Path): FileStatus = throw new NotImplementedError } - test("FileStreamSourceCleaner - archive - base archive path depth <= 2") { + test("SourceFileArchiver - base archive path depth <= 2") { val fakeFileSystem = new FakeFileSystem("fake") val sourcePatternPath = new Path("/hello*/h{e,f}ll?") val baseArchiveDirPath = new Path("/hello") intercept[IllegalArgumentException] { - new FileStreamSourceCleaner(fakeFileSystem, sourcePatternPath, - Some(fakeFileSystem), Some(baseArchiveDirPath)) + new SourceFileArchiver(fakeFileSystem, sourcePatternPath, fakeFileSystem, baseArchiveDirPath) } } - test("FileStreamSourceCleaner - archive - different filesystems between source and archive") { + test("SourceFileArchiver - different filesystems between source and archive") { val fakeFileSystem = new FakeFileSystem("fake") val fakeFileSystem2 = new FakeFileSystem("fake2") @@ -1812,28 +1792,11 @@ class FileStreamSourceSuite extends FileStreamSourceTest { val baseArchiveDirPath = new Path("/hello") intercept[IllegalArgumentException] { - new FileStreamSourceCleaner(fakeFileSystem, sourcePatternPath, - Some(fakeFileSystem2), Some(baseArchiveDirPath)) + new SourceFileArchiver(fakeFileSystem, sourcePatternPath, fakeFileSystem2, + baseArchiveDirPath) } } - private def assertNoMove(fs: FakeFileSystem): Unit = { - assert(fs.requestsExists.isEmpty) - assert(fs.requestsMkdirs.isEmpty) - assert(fs.requestsRename.isEmpty) - } - - private def assertMoveFile( - fs: FakeFileSystem, - sourcePath: Path, - expectedArchivePath: Path): Unit = { - assert(fs.requestsExists.nonEmpty) - assert(fs.requestsExists.head === expectedArchivePath.getParent) - assert(fs.requestsMkdirs.isEmpty) - assert(fs.requestsRename.nonEmpty) - assert(fs.requestsRename.head === ((sourcePath, expectedArchivePath))) - } - private def assertFileIsNotMoved(sourceDir: File, expectedDir: File, filePrefix: String): Unit = { assert(sourceDir.exists()) assert(sourceDir.list().exists(_.startsWith(filePrefix))) From 01f57506c9032d7a31bc383432ed09bf1801d402 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Fri, 1 Nov 2019 12:55:54 +0900 Subject: [PATCH 11/11] Remove unintentional newline --- .../apache/spark/sql/execution/streaming/FileStreamSource.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index e0ca744d9d18f..35d486c7c7437 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ 
-243,7 +243,6 @@ class FileStreamSource( val files = allFiles.sortBy(_.getModificationTime)(fileSortOrder).map { status => (status.getPath.toUri.toString, status.getModificationTime) } - val endTime = System.nanoTime val listingTimeMs = NANOSECONDS.toMillis(endTime - startTime) if (listingTimeMs > 2000) {