diff --git a/server/src/main/java/io/druid/client/cache/ByteCountingLRUMap.java b/server/src/main/java/io/druid/client/cache/ByteCountingLRUMap.java index 1f4b1ce6211f..fd632f7445a9 100644 --- a/server/src/main/java/io/druid/client/cache/ByteCountingLRUMap.java +++ b/server/src/main/java/io/druid/client/cache/ByteCountingLRUMap.java @@ -19,20 +19,23 @@ package io.druid.client.cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import com.google.common.cache.Weigher; import com.metamx.common.logger.Logger; import java.nio.ByteBuffer; import java.util.Collections; -import java.util.LinkedHashMap; -import java.util.Map; import java.util.Set; /** -*/ -class ByteCountingLRUMap extends LinkedHashMap + */ +class ByteCountingLRUMap { private static final Logger log = new Logger(ByteCountingLRUMap.class); + private final com.google.common.cache.Cache cache; private final boolean logEvictions; private final int logEvictionCount; private final long sizeInBytes; @@ -44,22 +47,56 @@ public ByteCountingLRUMap( final long sizeInBytes ) { - this(16, 0, sizeInBytes); + this(16, 1, 0, sizeInBytes); } public ByteCountingLRUMap( final int initialSize, + final int concurrencyLevel, final int logEvictionCount, final long sizeInBytes ) { - super(initialSize, 0.75f, true); this.logEvictionCount = logEvictionCount; this.sizeInBytes = sizeInBytes; logEvictions = logEvictionCount != 0; numBytes = 0; evictionCount = 0; + + RemovalListener listener = new RemovalListener() + { + @Override + public void onRemoval(RemovalNotification entry) + { + ++evictionCount; + if (logEvictions && evictionCount % logEvictionCount == 0) { + log.info( + "Evicting %,dth element. Size[%,d], numBytes[%,d], averageSize[%,d]", + evictionCount, + size(), + numBytes, + numBytes / size() + ); + } + numBytes -= entry.getKey().remaining() + entry.getValue().length; + } + }; + cache = CacheBuilder + .newBuilder() + .initialCapacity(initialSize) + .concurrencyLevel(concurrencyLevel) + .maximumWeight(sizeInBytes) + .removalListener(listener) + .weigher(new Weigher() + { + public int weigh(ByteBuffer key, byte[] value) + { + int weigh = key.remaining() + value.length; + numBytes += weigh; + return weigh; + } + }).build(); } public long getNumBytes() @@ -72,41 +109,20 @@ public long getEvictionCount() return evictionCount; } - @Override - public byte[] put(ByteBuffer key, byte[] value) + public byte[] get(ByteBuffer key) { - numBytes += key.remaining() + value.length; - return super.put(key, value); + return cache.getIfPresent(key); } - @Override - protected boolean removeEldestEntry(Map.Entry eldest) + public void put(ByteBuffer key, byte[] value) { - if (numBytes > sizeInBytes) { - ++evictionCount; - if (logEvictions && evictionCount % logEvictionCount == 0) { - log.info( - "Evicting %,dth element. Size[%,d], numBytes[%,d], averageSize[%,d]", - evictionCount, - size(), - numBytes, - numBytes / size() - ); - } - - numBytes -= eldest.getKey().remaining() + eldest.getValue().length; - return true; - } - return false; + cache.put(key, value); } - @Override public byte[] remove(Object key) { - byte[] value = super.remove(key); - if(value != null) { - numBytes -= ((ByteBuffer)key).remaining() + value.length; - } + byte[] value = cache.getIfPresent(key); + cache.invalidate(key); return value; } @@ -114,16 +130,18 @@ public byte[] remove(Object key) * Don't allow key removal using the underlying keySet iterator * All removal operations must use ByteCountingLRUMap.remove() */ - @Override public Set keySet() { - return Collections.unmodifiableSet(super.keySet()); + return Collections.unmodifiableSet(cache.asMap().keySet()); } - @Override public void clear() { - numBytes = 0; - super.clear(); + cache.invalidateAll(); + } + + public long size() + { + return cache.size(); } } diff --git a/server/src/main/java/io/druid/client/cache/LocalCacheProvider.java b/server/src/main/java/io/druid/client/cache/LocalCacheProvider.java index e8e724fd7780..749d2b20f50e 100644 --- a/server/src/main/java/io/druid/client/cache/LocalCacheProvider.java +++ b/server/src/main/java/io/druid/client/cache/LocalCacheProvider.java @@ -35,6 +35,10 @@ public class LocalCacheProvider implements CacheProvider @Min(0) private int initialSize = 500000; + @JsonProperty + @Min(0) + private int concurrencyLevel = 4; + @JsonProperty @Min(0) private int logEvictionCount = 0; @@ -42,6 +46,6 @@ public class LocalCacheProvider implements CacheProvider @Override public Cache get() { - return new MapCache(new ByteCountingLRUMap(initialSize, logEvictionCount, sizeInBytes)); + return new MapCache(new ByteCountingLRUMap(initialSize, concurrencyLevel, logEvictionCount, sizeInBytes)); } } diff --git a/server/src/main/java/io/druid/client/cache/MapCache.java b/server/src/main/java/io/druid/client/cache/MapCache.java index 7dfac4c233de..0368af20081e 100644 --- a/server/src/main/java/io/druid/client/cache/MapCache.java +++ b/server/src/main/java/io/druid/client/cache/MapCache.java @@ -25,7 +25,6 @@ import com.metamx.emitter.service.ServiceEmitter; import java.nio.ByteBuffer; -import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -41,7 +40,6 @@ public static Cache create(long sizeInBytes) return new MapCache(new ByteCountingLRUMap(sizeInBytes)); } - private final Map baseMap; private final ByteCountingLRUMap byteCountingLRUMap; private final Map namespaceId; @@ -57,7 +55,6 @@ public static Cache create(long sizeInBytes) ) { this.byteCountingLRUMap = byteCountingLRUMap; - this.baseMap = Collections.synchronizedMap(byteCountingLRUMap); namespaceId = Maps.newHashMap(); ids = new AtomicInteger(); @@ -82,7 +79,7 @@ public byte[] get(NamedKey key) { final byte[] retVal; synchronized (clearLock) { - retVal = baseMap.get(computeKey(getNamespaceId(key.namespace), key.key)); + retVal = byteCountingLRUMap.get(computeKey(getNamespaceId(key.namespace), key.key)); } if (retVal == null) { missCount.incrementAndGet(); @@ -96,7 +93,7 @@ public byte[] get(NamedKey key) public void put(NamedKey key, byte[] value) { synchronized (clearLock) { - baseMap.put(computeKey(getNamespaceId(key.namespace), key.key), value); + byteCountingLRUMap.put(computeKey(getNamespaceId(key.namespace), key.key), value); } } @@ -126,7 +123,7 @@ public void close(String namespace) namespaceId.remove(namespace); } synchronized (clearLock) { - Iterator iter = baseMap.keySet().iterator(); + Iterator iter = byteCountingLRUMap.keySet().iterator(); List toRemove = Lists.newLinkedList(); while (iter.hasNext()) { ByteBuffer next = iter.next(); @@ -139,7 +136,7 @@ public void close(String namespace) } } for (ByteBuffer key : toRemove) { - baseMap.remove(key); + byteCountingLRUMap.remove(key); } } } diff --git a/server/src/test/java/io/druid/client/cache/ByteCountingLRUMapTest.java b/server/src/test/java/io/druid/client/cache/ByteCountingLRUMapTest.java index 9fce4868f761..00400a8e7bc5 100644 --- a/server/src/test/java/io/druid/client/cache/ByteCountingLRUMapTest.java +++ b/server/src/test/java/io/druid/client/cache/ByteCountingLRUMapTest.java @@ -65,8 +65,13 @@ public void testSanity() throws Exception Assert.assertEquals(ByteBuffer.wrap(eightyEightVal), ByteBuffer.wrap(map.get(tenKey))); map.put(twoByte, oneByte.array()); - assertMapValues(2, 101, 2); - Assert.assertEquals(ByteBuffer.wrap(eightyEightVal), ByteBuffer.wrap(map.get(tenKey))); + assertMapValues(1, 3, 3); + Assert.assertNull(map.get(tenKey)); + Assert.assertEquals(oneByte, ByteBuffer.wrap(map.get(twoByte))); + + map.put(tenKey, twoByte.array()); + assertMapValues(2, 15, 3); + Assert.assertEquals(twoByte, ByteBuffer.wrap(map.get(tenKey))); Assert.assertEquals(oneByte, ByteBuffer.wrap(map.get(twoByte))); Iterator it = map.keySet().iterator(); @@ -80,10 +85,10 @@ public void testSanity() throws Exception for(ByteBuffer buf : toRemove) { map.remove(buf); } - assertMapValues(1, 3, 2); + assertMapValues(1, 3, 4); map.remove(twoByte); - assertMapValues(0, 0, 2); + assertMapValues(0, 0, 5); } private void assertMapValues(final int size, final int numBytes, final int evictionCount)