diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala index bf5cc252f1574..547aa12ef0960 100644 --- a/core/src/main/scala/kafka/log/AbstractIndex.scala +++ b/core/src/main/scala/kafka/log/AbstractIndex.scala @@ -305,8 +305,10 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon } protected def safeForceUnmap(): Unit = { - try forceUnmap() - catch { + try { + if (mmap != null) + forceUnmap() + } catch { case t: Throwable => error(s"Error unmapping index $file", t) } } diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index bc328d77efc86..2e25754c00d37 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -700,7 +700,6 @@ class Log(@volatile var dir: File, /** * Close this log. - * The memory mapped buffer for index files of this log will be left open until the log is deleted. */ def close() { debug("Closing log") @@ -712,6 +711,7 @@ class Log(@volatile var dir: File, // (the clean shutdown file is written after the logs are all closed). producerStateManager.takeSnapshot() logSegments.foreach(_.close()) + isMemoryMappedBufferClosed = true } } } @@ -1671,7 +1671,6 @@ class Log(@volatile var dir: File, private[log] def delete() { maybeHandleIOException(s"Error while deleting log for $topicPartition in dir ${dir.getParent}") { lock synchronized { - checkIfMemoryMappedBufferClosed() removeLogMetrics() logSegments.foreach(_.deleteIfExists()) segments.clear()