From 52aaf5cd2b7d0e19ceb88bfe237aa006221d98a4 Mon Sep 17 00:00:00 2001 From: edoardo Date: Mon, 9 May 2016 16:48:07 +0100 Subject: [PATCH] KAFKA-3682 ArrayIndexOutOfBoundsExc from SkimpyOffsetMap.get() when full The number of attempts in looking for a value is now limited to the number of map slots - after the internal positionOf() method goes into linear search mode. Added unit test Co-developed with @mimaison --- core/src/main/scala/kafka/log/OffsetMap.scala | 5 +++++ .../test/scala/unit/kafka/log/OffsetMapTest.scala | 14 +++++++++++++- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/log/OffsetMap.scala b/core/src/main/scala/kafka/log/OffsetMap.scala index 3893b2c7b1c28..f4530307df8a9 100755 --- a/core/src/main/scala/kafka/log/OffsetMap.scala +++ b/core/src/main/scala/kafka/log/OffsetMap.scala @@ -118,7 +118,12 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extend // search for the hash of this key by repeated probing until we find the hash we are looking for or we find an empty slot var attempt = 0 var pos = 0 + //we need to guard against attempt integer overflow if the map is full + //limit attempt to number of slots once positionOf(..) enters linear search mode + val maxAttempts = slots + hashSize - 4 do { + if(attempt >= maxAttempts) + return -1L pos = positionOf(hash1, attempt) bytes.position(pos) if(isEmpty(pos)) diff --git a/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala b/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala index f50daa43ef1d7..a5bec1766da70 100644 --- a/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala +++ b/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala @@ -44,7 +44,19 @@ class OffsetMapTest extends JUnitSuite { assertEquals(map.get(key(i)), -1L) } - def key(key: Int) = ByteBuffer.wrap(key.toString.getBytes) + @Test + def testGetWhenFull() { + val map = new SkimpyOffsetMap(4096) + var i = 37L //any value would do + while (map.size < map.slots) { + map.put(key(i), i) + i = i + 1L + } + assertEquals(map.get(key(i)), -1L) + assertEquals(map.get(key(i-1L)), i-1L) + } + + def key(key: Long) = ByteBuffer.wrap(key.toString.getBytes) def validateMap(items: Int, loadFactor: Double = 0.5): SkimpyOffsetMap = { val map = new SkimpyOffsetMap((items/loadFactor * 24).toInt)