diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index a9e64c640042a..06161d8cd5df7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -63,6 +63,9 @@ class FileStreamSource(
     new FileStreamSourceLog(FileStreamSourceLog.VERSION, sparkSession, metadataPath)
   private var metadataLogCurrentOffset = metadataLog.getLatest().map(_._1).getOrElse(-1L)
 
+  private val minBatchesToRetain = sparkSession.sessionState.conf.minBatchesToRetain
+  require(minBatchesToRetain > 0, "minBatchesToRetain has to be positive")
+
   /** Maximum number of new files to be considered in each batch */
   private val maxFilesPerBatch = sourceOptions.maxFilesPerTrigger
 
@@ -256,8 +259,9 @@ class FileStreamSource(
    * equal to `end` and will only request offsets greater than `end` in the future.
    */
   override def commit(end: Offset): Unit = {
-    // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
-    // and the value of the maxFileAge parameter.
+    if (currentLogOffset > minBatchesToRetain) {
+      metadataLog.purge(currentLogOffset - minBatchesToRetain)
+    }
   }
 
   override def stop() {}
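
Note on the purge semantics introduced in `commit`: once more than `minBatchesToRetain` batches have been logged, everything older than `currentLogOffset - minBatchesToRetain` is dropped from the source metadata log. The sketch below is not part of the patch; it is a standalone illustration of that threshold arithmetic, with a plain in-memory map standing in for `FileStreamSourceLog` and a hypothetical value of the `minBatchesToRetain` conf.

```scala
// Minimal sketch (not Spark code): illustrates the purge threshold used in commit().
// The Map below is only a stand-in for FileStreamSourceLog; the arithmetic mirrors
// the diff, assuming purge(threshold) removes entries with batch id < threshold.
object PurgeThresholdSketch {
  def main(args: Array[String]): Unit = {
    val minBatchesToRetain = 3                    // hypothetical conf value
    var log = (0L to 10L).map(i => i -> s"batch-$i").toMap

    val currentLogOffset = log.keys.max           // latest logged batch id
    if (currentLogOffset > minBatchesToRetain) {
      val thresholdBatchId = currentLogOffset - minBatchesToRetain
      // keep only entries at or above the threshold
      log = log.filter { case (batchId, _) => batchId >= thresholdBatchId }
    }

    println(log.keys.toSeq.sorted)                // Vector(7, 8, 9, 10)
  }
}
```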