Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ class FileStreamSource(
new FileStreamSourceLog(FileStreamSourceLog.VERSION, sparkSession, metadataPath)
private var metadataLogCurrentOffset = metadataLog.getLatest().map(_._1).getOrElse(-1L)

private val minBatchesToRetain = sparkSession.sessionState.conf.minBatchesToRetain
require(minBatchesToRetain > 0, "minBatchesToRetain has to be positive")

/** Maximum number of new files to be considered in each batch */
private val maxFilesPerBatch = sourceOptions.maxFilesPerTrigger

Expand Down Expand Up @@ -256,8 +259,9 @@ class FileStreamSource(
* equal to `end` and will only request offsets greater than `end` in the future.
*/
override def commit(end: Offset): Unit = {
// No-op for now; FileStreamSource currently garbage-collects files based on timestamp
// and the value of the maxFileAge parameter.
if (currentLogOffset > minBatchesToRetain) {
metadataLog.purge(currentLogOffset - minBatchesToRetain)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would be the behavior of this when spark.sql.streaming.fileSource.log.deletion=false? Looks like HDFSMetadataLog.purge will always delete the files.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HDFSMetadataLog is not aware of such configuration.

Btw, I've put my observation on CompactibleFileStreamLog.purge() in comment on SPARK-20971.

https://issues.apache.org/jira/browse/SPARK-20971?focusedCommentId=16772632&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16772632

Let me quote it here:

Btw, calling purge breaks CompactibleFileStreamLog since CompactibleFileStreamLog expects non-compacted batches to be exist, but purge just removes all of metadata files matching criteria. The safest way seems to be just disallowing purge for CompactibleFileStreamLog, otherwise we have to concern about the intention of calling purge, like I was curious of rationalization of this issue like above.

So I've got a feeling that this may bring unexpected behavior and should be avoided.

}
}

override def stop() {}
Expand Down