Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions core/src/main/scala/kafka/log/AbstractIndex.scala
Original file line number Diff line number Diff line change
Expand Up @@ -305,8 +305,10 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
}

protected def safeForceUnmap(): Unit = {
try forceUnmap()
catch {
try {
if (mmap != null)
forceUnmap()
} catch {
case t: Throwable => error(s"Error unmapping index $file", t)
}
}
Expand Down
3 changes: 1 addition & 2 deletions core/src/main/scala/kafka/log/Log.scala
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,6 @@ class Log(@volatile var dir: File,

/**
* Close this log.
* The memory mapped buffer for index files of this log will be left open until the log is deleted.
*/
def close() {
debug("Closing log")
Expand All @@ -712,6 +711,7 @@ class Log(@volatile var dir: File,
// (the clean shutdown file is written after the logs are all closed).
producerStateManager.takeSnapshot()
logSegments.foreach(_.close())
isMemoryMappedBufferClosed = true
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that we also set isMemoryMappedBufferClosed to true in close(), the first sentence in its comment, The memory mapped buffer for index files of this log will be closed with either delete() or closeHandlers(), is no longer accurate.

Can we update its comment to e.g. After memory mapped buffer is closed, all disk IO operation other than delete() on this log should throw KafkaStorageException.

}
}
}
Expand Down Expand Up @@ -1671,7 +1671,6 @@ class Log(@volatile var dir: File,
private[log] def delete() {
maybeHandleIOException(s"Error while deleting log for $topicPartition in dir ${dir.getParent}") {
lock synchronized {
checkIfMemoryMappedBufferClosed()
removeLogMetrics()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering if it would be more intuitive not to set isMemoryMappedBufferClosed = true in delete().

The reason is that, if we don't call checkIfMemoryMappedBufferClosed() in delete(), it suggests that delete() can be done anytime after close() is called, which is actually the case in logManager.replaceCurrentWithFutureLog() which calls addLogToBeDeleted(sourceLog) after sourceLog.close(). Then the intuition is that, if delete() can be done without considering the state (i.e. isMemoryMappedBufferClosed) of log, it probably should not change the state of log as well.

Another intuition is that, it seems weird if delete() sets isMemoryMappedBufferClosed to true and yet delete() can be called again without triggering KafkaStorageException.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a good question. It's a little unclear what's the best path as we don't generally require close to be called before delete for the various log/index classes. As a result, we have one case where we don't call Log.close at all (the LogManager.asyncDelete path) and rely on the Log.delete method to release resources. If we don't set isMemoryMappedBufferClosed in the delete method, then it would stay false for this path.

As it happens, the fact that we don't close the file handles before we rename the directory in LogManager.asyncDelete is one reason why things break on Windows.

Copy link
Copy Markdown
Member

@lindong28 lindong28 Oct 21, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding the design for the Log.java API, it seems intuitive to only allow delete() after close() or closeHandlers() is invoked. delete() will only delete files on disk without reading or writing anything and thus it does not require any handler to be open for this log.

And it seems reasonable to enforce the rule that we always close a log before deleting it in e.g. LogManager.asyncDelete(). And delete() can throw IllegalStateException if isMemoryMappedBufferClosed is false.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like this needs a bit more discussion. I propose we revert the change in 2.1 to close the index during Log.close so that the behaviour remains the same as in 2.0. In trunk, we can make the changes after we agree to them. How does that sound?

Copy link
Copy Markdown
Member

@lindong28 lindong28 Oct 21, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. Sounds good. Thanks for the patch! The previous PR has been reverted in 2.1 branch.

logSegments.foreach(_.deleteIfExists())
segments.clear()
Expand Down