From a33518164bcf9545e0223ae6280341e340ded41d Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Sun, 7 Oct 2018 17:54:51 -0700 Subject: [PATCH 1/2] MINOR: AbstractIndex.close should unmap --- core/src/main/scala/kafka/log/AbstractIndex.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala index ec9d55f89accb..eb71b03006e96 100644 --- a/core/src/main/scala/kafka/log/AbstractIndex.scala +++ b/core/src/main/scala/kafka/log/AbstractIndex.scala @@ -251,6 +251,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon /** Close the index */ def close() { trimToValidSize() + closeHandler() } def closeHandler(): Unit = { From a37aabd2114346a36e74c7cc4a47d623cdf2fd6b Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Sun, 7 Oct 2018 21:16:06 -0700 Subject: [PATCH 2/2] Call `closeHandler` from `deleteIfExists` --- core/src/main/scala/kafka/log/AbstractIndex.scala | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala index eb71b03006e96..bf5cc252f1574 100644 --- a/core/src/main/scala/kafka/log/AbstractIndex.scala +++ b/core/src/main/scala/kafka/log/AbstractIndex.scala @@ -223,13 +223,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon * not exist */ def deleteIfExists(): Boolean = { - inLock(lock) { - // On JVM, a memory mapping is typically unmapped by garbage collector. - // However, in some cases it can pause application threads(STW) for a long moment reading metadata from a physical disk. - // To prevent this, we forcefully cleanup memory mapping within proper execution which never affects API responsiveness. - // See https://issues.apache.org/jira/browse/KAFKA-4614 for the details. - safeForceUnmap() - } + closeHandler() Files.deleteIfExists(file.toPath) } @@ -255,6 +249,10 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon } def closeHandler(): Unit = { + // On JVM, a memory mapping is typically unmapped by garbage collector. + // However, in some cases it can pause application threads(STW) for a long moment reading metadata from a physical disk. + // To prevent this, we forcefully cleanup memory mapping within proper execution which never affects API responsiveness. + // See https://issues.apache.org/jira/browse/KAFKA-4614 for the details. inLock(lock) { safeForceUnmap() }