From 804116d181d466e0ab61216a89b3ff831e93f19e Mon Sep 17 00:00:00 2001 From: lizziewei Date: Thu, 25 Jul 2013 00:01:34 -0700 Subject: [PATCH 1/2] Unmap before resizing --- core/src/main/scala/kafka/log/OffsetIndex.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala index 9de3d31d7c8e9..71bd08fef7ad7 100644 --- a/core/src/main/scala/kafka/log/OffsetIndex.scala +++ b/core/src/main/scala/kafka/log/OffsetIndex.scala @@ -259,9 +259,10 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int = flush() val raf = new RandomAccessFile(file, "rws") val roundedNewSize = roundToExactMultiple(newSize, 8) + val position = this.mmap.position try { + (this.mmap.asInstanceOf[sun.nio.ch.DirectBuffer]).cleaner().clean() raf.setLength(roundedNewSize) - val position = this.mmap.position this.mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize) this.mmap.position(position) } finally { From da110ce645a20c1ebc194d00280f7976984a02b1 Mon Sep 17 00:00:00 2001 From: lizziew Date: Mon, 12 Aug 2013 21:32:23 -0700 Subject: [PATCH 2/2] Finetune patch --- core/src/main/scala/kafka/log/OffsetIndex.scala | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala index 71bd08fef7ad7..b9b3b0adeb739 100644 --- a/core/src/main/scala/kafka/log/OffsetIndex.scala +++ b/core/src/main/scala/kafka/log/OffsetIndex.scala @@ -248,6 +248,11 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int = } } + def tryUnmap(m: MappedByteBuffer) { + if(m.isInstanceOf[sun.nio.ch.DirectBuffer]) + (m.asInstanceOf[sun.nio.ch.DirectBuffer]).cleaner().clean() + } + /** * Reset the size of the memory map and the underneath file. This is used in two kinds of cases: (1) in * trimToValidSize() which is called at closing the segment or new segment being rolled; (2) at @@ -261,7 +266,7 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int = val roundedNewSize = roundToExactMultiple(newSize, 8) val position = this.mmap.position try { - (this.mmap.asInstanceOf[sun.nio.ch.DirectBuffer]).cleaner().clean() + tryUnmap(this.mmap) raf.setLength(roundedNewSize) this.mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize) this.mmap.position(position)