diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala index 9de3d31d7c8e9..b9b3b0adeb739 100644 --- a/core/src/main/scala/kafka/log/OffsetIndex.scala +++ b/core/src/main/scala/kafka/log/OffsetIndex.scala @@ -248,6 +248,11 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int = } } + def tryUnmap(m: MappedByteBuffer) { + if(m.isInstanceOf[sun.nio.ch.DirectBuffer]) + (m.asInstanceOf[sun.nio.ch.DirectBuffer]).cleaner().clean() + } + /** * Reset the size of the memory map and the underneath file. This is used in two kinds of cases: (1) in * trimToValidSize() which is called at closing the segment or new segment being rolled; (2) at @@ -259,9 +264,10 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int = flush() val raf = new RandomAccessFile(file, "rws") val roundedNewSize = roundToExactMultiple(newSize, 8) + val position = this.mmap.position try { + tryUnmap(this.mmap) raf.setLength(roundedNewSize) - val position = this.mmap.position this.mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize) this.mmap.position(position) } finally {