From 1df968fd1c06ed27cc98c3d3c8378e4a5315949e Mon Sep 17 00:00:00 2001 From: ragnarok56 Date: Wed, 23 Aug 2023 08:33:00 -0400 Subject: [PATCH 1/6] allow for 0 maxCachedFiles --- .../streaming/FileStreamOptions.scala | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala index 07c1ccc432cdb..288df7c46734d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala @@ -125,6 +125,29 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging matchedMode } + /** + * maximum number of files to cache to be processed in subsequent batches + */ + val maxCachedFiles: Int = parameters.get("maxCachedFiles").map { str => + Try(str.toInt).filter(_ >= 0).getOrElse { + throw new IllegalArgumentException( + s"Invalid value '$str' for option 'maxCachedFiles', must be an integer greater than or equal to 0") + } + }.getOrElse(10000) + + /** + * ratio of cached files to max files to allow for listing from input source when + * there are fewer cached files than could be available to be read + */ + val discardCachedFilesRatio: Float = parameters.get("discardCachedFilesRatio").map { str => + Try(str.toFloat).filter(x => 0 <= x && x <= 1).getOrElse { + throw new IllegalArgumentException( + s"Invalid value '$str' for option 'discardCachedFilesRatio', must be a positive float " + + "between 0 and 1" + ) + } + }.getOrElse(0.2f) + private def withBooleanParameter(name: String, default: Boolean) = { parameters.get(name).map { str => try { From f11045b97b847eb8a57f59c1032381386e2669af Mon Sep 17 00:00:00 2001 From: ragnarok56 Date: Wed, 23 Aug 2023 22:59:45 -0400 Subject: [PATCH 2/6] scalastyle --- .../spark/sql/execution/streaming/FileStreamOptions.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala index 288df7c46734d..fd44832c4559d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala @@ -131,7 +131,8 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging val maxCachedFiles: Int = parameters.get("maxCachedFiles").map { str => Try(str.toInt).filter(_ >= 0).getOrElse { throw new IllegalArgumentException( - s"Invalid value '$str' for option 'maxCachedFiles', must be an integer greater than or equal to 0") + s"Invalid value '$str' for option 'maxCachedFiles', must be an integer greater than or " + + "equal to 0") } }.getOrElse(10000) From 96eb729f0932bbabdf0bd1c46963a31a06b4211f Mon Sep 17 00:00:00 2001 From: ragnarok56 Date: Tue, 22 Aug 2023 21:55:02 -0400 Subject: [PATCH 3/6] Add settings for cached files in streaming --- .../structured-streaming-programming-guide.md | 4 + .../streaming/FileStreamSource.scala | 14 +-- .../sql/streaming/FileStreamSourceSuite.scala | 93 ++++++++++++++++++- 3 files changed, 104 insertions(+), 7 deletions(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index f3a8a0a40694b..8a66001708678 100644 --- 
a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -574,6 +574,10 @@ Here are the details of all the sources in Spark.
maxFileAge: Maximum age of a file that can be found in this directory, before it is ignored. For the first batch all files will be considered valid. If latestFirst is set to `true` and maxFilesPerTrigger or maxBytesPerTrigger is set, then this parameter will be ignored, because old files that are valid, and should be processed, may be ignored. The max age is specified with respect to the timestamp of the latest file, and not the timestamp of the current system.(default: 1 week)
+ maxCachedFiles: maximum number of files to cache for processing in subsequent batches (default: 10000). If files are available in the cache, they will be read first, before a new listing of the input source is performed. +
+ discardCachedFilesRatio: ratio of cached files to max files below which the remaining cached files are discarded and a new listing of the input source is performed instead (default: 0.2). For example, if there are only 10 cached files remaining for a batch but the maxFilesPerTrigger is set to 100, the 10 cached files would be discarded and a new listing would be performed instead. +
cleanSource: option to clean up completed files after processing.
Available options are "archive", "delete", "off". If the option is not provided, the default value is "off".
When "archive" is provided, additional option sourceArchiveDir must be provided as well. The value of "sourceArchiveDir" must not match with source pattern in depth (the number of directories from the root directory), where the depth is minimum of depth on both paths. This will ensure archived files are never included as new source files.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 2f5bb2f010e9c..490a41eae7b5e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -114,6 +114,11 @@ class FileStreamSource( "the same and causes data lost.") } + + private val maxCachedFiles = sourceOptions.maxCachedFiles + + private val discardCachedFilesRatio = sourceOptions.discardCachedFilesRatio + /** A mapping from a file that we have processed to some timestamp it was last modified. */ // Visible for testing and debugging in production. val seenFiles = new SeenFilesMap(maxFileAgeMs, fileNameOnly) @@ -184,7 +189,7 @@ class FileStreamSource( case files: ReadMaxFiles if !sourceOptions.latestFirst => // we can cache and reuse remaining fetched list of files in further batches val (bFiles, usFiles) = newFiles.splitAt(files.maxFiles()) - if (usFiles.size < files.maxFiles() * DISCARD_UNSEEN_INPUT_RATIO) { + if (usFiles.size < files.maxFiles() * discardCachedFilesRatio) { // Discard unselected files if the number of files are smaller than threshold. // This is to avoid the case when the next batch would have too few files to read // whereas there're new files available. @@ -221,8 +226,8 @@ class FileStreamSource( } if (unselectedFiles != null && unselectedFiles.nonEmpty) { - logTrace(s"Taking first $MAX_CACHED_UNSEEN_FILES unread files.") - unreadFiles = unselectedFiles.take(MAX_CACHED_UNSEEN_FILES) + logTrace(s"Taking first $maxCachedFiles unread files.") + unreadFiles = unselectedFiles.take(maxCachedFiles) logTrace(s"${unreadFiles.size} unread files are available for further batches.") } else { unreadFiles = null @@ -426,9 +431,6 @@ object FileStreamSource { /** Timestamp for file modification time, in ms since January 1, 1970 UTC. */ type Timestamp = Long - val DISCARD_UNSEEN_INPUT_RATIO = 0.2 - val MAX_CACHED_UNSEEN_FILES = 10000 - case class FileEntry( path: String, // uri-encoded path string timestamp: Timestamp, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index fd3d59af7e6b8..1c856a7929cd2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -2305,7 +2305,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { } // batch 5 will trigger list operation though the batch 4 should have 1 unseen file: - // 1 is smaller than the threshold (refer FileStreamSource.DISCARD_UNSEEN_FILES_RATIO), + // 1 is smaller than the threshold (refer FileStreamOptions.discardCachedFilesRatio), // hence unseen files for batch 4 will be discarded. 
val offsetBatch = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(10)) .asInstanceOf[FileStreamSourceOffset] @@ -2357,6 +2357,97 @@ class FileStreamSourceSuite extends FileStreamSourceTest { } } + test("Options for caching unread files") { + withCountListingLocalFileSystemAsLocalFileSystem { + withThreeTempDirs { case (src, meta, tmp) => + val options = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "10", + "maxCachedFiles" -> "12", "discardCachedFilesRatio" -> "0.1") + val scheme = CountListingLocalFileSystem.scheme + val source = new FileStreamSource(spark, s"$scheme:///${src.getCanonicalPath}/*/*", "text", + StructType(Nil), Seq.empty, meta.getCanonicalPath, options) + val _metadataLog = PrivateMethod[FileStreamSourceLog](Symbol("metadataLog")) + val metadataLog = source invokePrivate _metadataLog() + + def verifyBatch( + offset: FileStreamSourceOffset, + expectedBatchId: Long, + inputFiles: Seq[File], + expectedFileOffset: Int, + expectedFilesInBatch: Int, + expectedListingCount: Int): Unit = { + val batchId = offset.logOffset + assert(batchId === expectedBatchId) + + val files = metadataLog.get(batchId).getOrElse(Array.empty[FileEntry]) + assert(files.forall(_.batchId == batchId)) + + val actualInputFiles = files.map { p => p.sparkPath.toUri.getPath } + val expectedInputFiles = inputFiles.slice( + expectedFileOffset, + expectedFileOffset + expectedFilesInBatch + ) + .map(_.getCanonicalPath) + assert(actualInputFiles === expectedInputFiles) + + assert(expectedListingCount === CountListingLocalFileSystem.pathToNumListStatusCalled + .get(src.getCanonicalPath).map(_.get()).getOrElse(0)) + } + + CountListingLocalFileSystem.resetCount() + + // provide 44 files in src, with sequential "last modified" to guarantee ordering + val inputFiles = (0 to 43).map { idx => + val f = createFile(idx.toString, new File(src, idx.toString), tmp) + f.setLastModified(idx * 10000) + f + } + + // first 3 batches only perform 1 listing + // batch 0 processes 10 (12 cached) + // batch 1 processes 10 from cache (2 cached) + // batch 2 processes 2 from cache (0 cached) since + // discardCachedFilesRatio is less than threshold + val offsetBatch0 = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(10)) + .asInstanceOf[FileStreamSourceOffset] + verifyBatch(offsetBatch0, expectedBatchId = 0, inputFiles, + expectedFileOffset = 0, expectedFilesInBatch = 10, expectedListingCount = 1) + val offsetBatch1 = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(10)) + .asInstanceOf[FileStreamSourceOffset] + verifyBatch(offsetBatch1, expectedBatchId = 1, inputFiles, + expectedFileOffset = 10, expectedFilesInBatch = 10, expectedListingCount = 1) + val offsetBatch2 = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(10)) + .asInstanceOf[FileStreamSourceOffset] + verifyBatch(offsetBatch2, expectedBatchId = 2, inputFiles, + expectedFileOffset = 20, expectedFilesInBatch = 2, expectedListingCount = 1) + + // next 3 batches perform another listing + // batch 3 processes 10 (12 cached) + // batch 4 processes 10 from cache (2 cached) + // batch 5 processes 2 from cache (0 cached) since + // discardCachedFilesRatio is less than threshold + val offsetBatch3 = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(10)) + .asInstanceOf[FileStreamSourceOffset] + verifyBatch(offsetBatch3, expectedBatchId = 3, inputFiles, + expectedFileOffset = 22, expectedFilesInBatch = 10, expectedListingCount = 2) + val offsetBatch4 = 
source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(10)) + .asInstanceOf[FileStreamSourceOffset] + verifyBatch(offsetBatch4, expectedBatchId = 4, inputFiles, + expectedFileOffset = 32, expectedFilesInBatch = 10, expectedListingCount = 2) + val offsetBatch5 = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(10)) + .asInstanceOf[FileStreamSourceOffset] + verifyBatch(offsetBatch5, expectedBatchId = 5, inputFiles, + expectedFileOffset = 42, expectedFilesInBatch = 2, expectedListingCount = 2) + + // validate no remaining files and another listing is performed + val offsetBatch = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(10)) + .asInstanceOf[FileStreamSourceOffset] + assert(5 === offsetBatch.logOffset) + assert(3 === CountListingLocalFileSystem.pathToNumListStatusCalled + .get(src.getCanonicalPath).map(_.get()).getOrElse(0)) + } + } + } + test("SPARK-31962: file stream source shouldn't allow modifiedBefore/modifiedAfter") { def formatTime(time: LocalDateTime): String = { time.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")) From 6198e44dac031417411ea38e73c466f909957b33 Mon Sep 17 00:00:00 2001 From: ragnarok56 Date: Wed, 6 Mar 2024 20:50:47 -0500 Subject: [PATCH 4/6] update for maxBytesPerTrigger changes --- docs/structured-streaming-programming-guide.md | 2 +- .../spark/sql/execution/streaming/FileStreamOptions.scala | 8 ++++---- .../spark/sql/execution/streaming/FileStreamSource.scala | 6 +++--- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 8a66001708678..89155da51a399 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -576,7 +576,7 @@ Here are the details of all the sources in Spark.
maxCachedFiles: maximum number of files to cache for processing in subsequent batches (default: 10000). If files are available in the cache, they will be read first, before a new listing of the input source is performed.
- discardCachedFilesRatio: ratio of cached files to max files below which the remaining cached files are discarded and a new listing of the input source is performed instead (default: 0.2). For example, if there are only 10 cached files remaining for a batch but the maxFilesPerTrigger is set to 100, the 10 cached files would be discarded and a new listing would be performed instead. + discardCachedInputRatio: ratio of cached files/bytes to max files/bytes below which the remaining cached input is discarded and a new listing of the input source is performed instead (default: 0.2). For example, if there are only 10 cached files remaining for a batch but the maxFilesPerTrigger is set to 100, the 10 cached files would be discarded and a new listing would be performed instead. Similarly, if there are 10 MB of cached files remaining for a batch but the maxBytesPerTrigger is set to 100 MB, the cached files would be discarded.
cleanSource: option to clean up completed files after processing.
Available options are "archive", "delete", "off". If the option is not provided, the default value is "off".
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala index fd44832c4559d..b259f9dbcdcb2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala @@ -137,13 +137,13 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging }.getOrElse(10000) /** - * ratio of cached files to max files to allow for listing from input source when - * there are fewer cached files than could be available to be read + * ratio of cached input to max files/bytes to allow for listing from input source when + * there are fewer cached files/bytes than could be available to be read */ - val discardCachedFilesRatio: Float = parameters.get("discardCachedFilesRatio").map { str => + val discardCachedInputRatio: Float = parameters.get("discardCachedInputRatio").map { str => Try(str.toFloat).filter(x => 0 <= x && x <= 1).getOrElse { throw new IllegalArgumentException( - s"Invalid value '$str' for option 'discardCachedFilesRatio', must be a positive float " + + s"Invalid value '$str' for option 'discardCachedInputRatio', must be a positive float " + "between 0 and 1" ) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 490a41eae7b5e..373a122e00018 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -117,7 +117,7 @@ class FileStreamSource( private val maxCachedFiles = sourceOptions.maxCachedFiles - private val discardCachedFilesRatio = sourceOptions.discardCachedFilesRatio + private val discardCachedInputRatio = sourceOptions.discardCachedInputRatio /** A mapping from a file that we have processed to some timestamp it was last modified. */ // Visible for testing and debugging in production. @@ -189,7 +189,7 @@ class FileStreamSource( case files: ReadMaxFiles if !sourceOptions.latestFirst => // we can cache and reuse remaining fetched list of files in further batches val (bFiles, usFiles) = newFiles.splitAt(files.maxFiles()) - if (usFiles.size < files.maxFiles() * discardCachedFilesRatio) { + if (usFiles.size < files.maxFiles() * discardCachedInputRatio) { // Discard unselected files if the number of files are smaller than threshold. // This is to avoid the case when the next batch would have too few files to read // whereas there're new files available. @@ -207,7 +207,7 @@ class FileStreamSource( // we can cache and reuse remaining fetched list of files in further batches val (FilesSplit(bFiles, _), FilesSplit(usFiles, rSize)) = takeFilesUntilMax(newFiles, files.maxBytes()) - if (rSize.toDouble < (files.maxBytes() * DISCARD_UNSEEN_INPUT_RATIO)) { + if (rSize.toDouble < (files.maxBytes() * discardCachedInputRatio)) { // Discard unselected files if the total size of files is smaller than threshold. // This is to avoid the case when the next batch would have too small of a size of // files to read whereas there're new files available. 
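The splitting, caching, and discard policy changed above can be modeled outside of Spark. The following sketch uses assumed names (CachedFileAccounting, simulate) and mirrors the accounting that the "Options for caching unread files" test added earlier in this series verifies; it is an illustration of the policy, not the source implementation:

// Stand-alone model: split each listing into micro-batches of at most maxFilesPerTrigger files,
// cache up to maxCachedFiles leftovers for later batches, and discard the cache (forcing a fresh
// listing) when it falls below maxFilesPerTrigger * discardCachedInputRatio.
object CachedFileAccounting {

  /** Returns (filesInBatch, listingsPerformedSoFar) for each micro-batch. */
  def simulate(
      totalFiles: Int,
      maxFilesPerTrigger: Int,
      maxCachedFiles: Int,
      discardCachedInputRatio: Double): Seq[(Int, Int)] = {
    var unlisted = totalFiles // files in the source directory not yet fetched or processed
    var cached = 0            // files fetched by the last listing but deferred to later batches
    var listings = 0
    val batches = scala.collection.mutable.ArrayBuffer.empty[(Int, Int)]

    while (unlisted > 0 || cached > 0) {
      // Prefer cached files; only list the source again when the cache is empty.
      val available =
        if (cached > 0) { val c = cached; cached = 0; c }
        else { listings += 1; val u = unlisted; unlisted = 0; u }

      val batch = math.min(maxFilesPerTrigger, available)
      val leftover = available - batch

      if (leftover < maxFilesPerTrigger * discardCachedInputRatio) {
        unlisted += leftover // too few to bother caching: they will reappear in the next listing
      } else {
        cached = math.min(leftover, maxCachedFiles)
        unlisted += leftover - cached // overflow beyond the cache cap is re-listed later
      }
      batches += ((batch, listings))
    }
    batches.toSeq
  }

  def main(args: Array[String]): Unit = {
    // Prints (10,1), (10,1), (2,1), (10,2), (10,2), (2,2), matching batches 0-5 of the test
    // (44 files, maxFilesPerTrigger = 10, maxCachedFiles = 12, ratio = 0.1). A further poll with
    // nothing left would trigger one more, empty listing, as the test's final assertion checks.
    simulate(totalFiles = 44, maxFilesPerTrigger = 10, maxCachedFiles = 12,
      discardCachedInputRatio = 0.1).foreach(println)
  }
}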
From 3fbd3688ec0ab08fcfd794e774139482783a6a75 Mon Sep 17 00:00:00 2001 From: ragnarok56 Date: Wed, 6 Mar 2024 20:53:40 -0500 Subject: [PATCH 5/6] fix typo in docs --- docs/structured-streaming-programming-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 89155da51a399..fabe7f17b78b3 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -576,7 +576,7 @@ Here are the details of all the sources in Spark.
maxCachedFiles: maximum number of files to cache for processing in subsequent batches (default: 10000). If files are available in the cache, they will be read first, before a new listing of the input source is performed.
- discardCachedInputRatio: ratio of cached files/bytes to max files/bytes below which the remaining cached input is discarded and a new listing of the input source is performed instead (default: 0.2). For example, if there are only 10 cached files remaining for a batch but the maxFilesPerTrigger is set to 100, the 10 cached files would be discarded and a new listing would be performed instead. Similarly, if there are 10 MB of cached files remaining for a batch but the maxBytesPerTrigger is set to 100 MB, the cached files would be discarded. + discardCachedInputRatio: ratio of cached files/bytes to max files/bytes below which the remaining cached input is discarded and a new listing of the input source is performed instead (default: 0.2). For example, if there are only 10 cached files remaining for a batch but the maxFilesPerTrigger is set to 100, the 10 cached files would be discarded and a new listing would be performed instead. Similarly, if there are 10 MB of cached files remaining for a batch but the maxBytesPerTrigger is set to 100 MB, the cached files would be discarded.
cleanSource: option to clean up completed files after processing.
Available options are "archive", "delete", "off". If the option is not provided, the default value is "off".
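As a quick sanity check of the numbers used in the documentation row above, the discard condition can be evaluated directly. This is standalone Scala with illustrative values; the variable names are assumptions, not Spark configuration keys:

// Cached input is discarded when it falls below maxInput * discardCachedInputRatio.
val discardCachedInputRatio = 0.2

// File-count case: 10 cached files vs. maxFilesPerTrigger = 100.
val cachedFiles = 10
val maxFilesPerTrigger = 100
val discardFiles = cachedFiles < maxFilesPerTrigger * discardCachedInputRatio // 10 < 20 => true

// Byte-size case: 10 MB cached vs. maxBytesPerTrigger = 100 MB.
val cachedBytes = 10L * 1024 * 1024
val maxBytesPerTrigger = 100L * 1024 * 1024
val discardBytes = cachedBytes < maxBytesPerTrigger * discardCachedInputRatio // 10 MB < 20 MB => true

In both cases the cached input is below one fifth of the per-trigger limit, so it is dropped and a fresh listing is performed for the next batch.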
From 473cd39b079196056df8d9fe5cc84d02c84cd40f Mon Sep 17 00:00:00 2001 From: ragnarok56 Date: Wed, 6 Mar 2024 20:55:50 -0500 Subject: [PATCH 6/6] fix tests --- .../spark/sql/streaming/FileStreamSourceSuite.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 1c856a7929cd2..ff3cc5c247dfb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -2305,7 +2305,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { } // batch 5 will trigger list operation though the batch 4 should have 1 unseen file: - // 1 is smaller than the threshold (refer FileStreamOptions.discardCachedFilesRatio), + // 1 is smaller than the threshold (refer FileStreamOptions.discardCachedInputRatio), // hence unseen files for batch 4 will be discarded. val offsetBatch = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(10)) .asInstanceOf[FileStreamSourceOffset] @@ -2361,7 +2361,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { withCountListingLocalFileSystemAsLocalFileSystem { withThreeTempDirs { case (src, meta, tmp) => val options = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "10", - "maxCachedFiles" -> "12", "discardCachedFilesRatio" -> "0.1") + "maxCachedFiles" -> "12", "discardCachedInputRatio" -> "0.1") val scheme = CountListingLocalFileSystem.scheme val source = new FileStreamSource(spark, s"$scheme:///${src.getCanonicalPath}/*/*", "text", StructType(Nil), Seq.empty, meta.getCanonicalPath, options) @@ -2406,7 +2406,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { // batch 0 processes 10 (12 cached) // batch 1 processes 10 from cache (2 cached) // batch 2 processes 2 from cache (0 cached) since - // discardCachedFilesRatio is less than threshold + // discardCachedInputRatio is less than threshold val offsetBatch0 = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(10)) .asInstanceOf[FileStreamSourceOffset] verifyBatch(offsetBatch0, expectedBatchId = 0, inputFiles, @@ -2424,7 +2424,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { // batch 3 processes 10 (12 cached) // batch 4 processes 10 from cache (2 cached) // batch 5 processes 2 from cache (0 cached) since - // discardCachedFilesRatio is less than threshold + // discardCachedInputRatio is less than threshold val offsetBatch3 = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(10)) .asInstanceOf[FileStreamSourceOffset] verifyBatch(offsetBatch3, expectedBatchId = 3, inputFiles,