From bfa6dfbeede444ffd1cb704741cb1b137f927710 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Tue, 20 Oct 2015 22:54:44 -0700 Subject: [PATCH 1/2] cleanup cache interface --- .../main/java/io/druid/client/cache/Cache.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/io/druid/client/cache/Cache.java b/server/src/main/java/io/druid/client/cache/Cache.java index 4462b75aa619..70181b1ab587 100644 --- a/server/src/main/java/io/druid/client/cache/Cache.java +++ b/server/src/main/java/io/druid/client/cache/Cache.java @@ -30,8 +30,8 @@ */ public interface Cache { - public byte[] get(NamedKey key); - public void put(NamedKey key, byte[] value); + byte[] get(NamedKey key); + void put(NamedKey key, byte[] value); /** * Resulting map should not contain any null values (i.e. cache misses should not be included) @@ -39,21 +39,21 @@ public interface Cache * @param keys * @return */ - public Map getBulk(Iterable keys); + Map getBulk(Iterable keys); - public void close(String namespace); + void close(String namespace); - public CacheStats getStats(); + CacheStats getStats(); - public boolean isLocal(); + boolean isLocal(); /** * Custom metrics not covered by CacheStats may be emitted by this method. * @param emitter The service emitter to emit on. */ - public void doMonitor(ServiceEmitter emitter); + void doMonitor(ServiceEmitter emitter); - public class NamedKey + class NamedKey { final public String namespace; final public byte[] key; From b456e6225b850b0e1511eec2793c3fccaf3dd4c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Tue, 20 Oct 2015 22:55:41 -0700 Subject: [PATCH 2/2] replace ByteCountingLRUMap with Guava Cache --- .../client/cache/LocalCacheProvider.java | 62 +++++++- .../java/io/druid/client/cache/MapCache.java | 139 ++++++++---------- .../druid/client/cache/HybridCacheTest.java | 4 +- .../io/druid/client/cache/MapCacheTest.java | 16 +- 4 files changed, 127 insertions(+), 94 deletions(-) diff --git a/server/src/main/java/io/druid/client/cache/LocalCacheProvider.java b/server/src/main/java/io/druid/client/cache/LocalCacheProvider.java index f602284af297..4eebbc2ff2a9 100644 --- a/server/src/main/java/io/druid/client/cache/LocalCacheProvider.java +++ b/server/src/main/java/io/druid/client/cache/LocalCacheProvider.java @@ -17,14 +17,21 @@ package io.druid.client.cache; +import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableMap; +import com.metamx.emitter.service.ServiceEmitter; +import io.druid.query.DruidProcessingConfig; import javax.validation.constraints.Min; +import java.util.Map; /** */ public class LocalCacheProvider implements CacheProvider { + public static final NoopCache NOOP_CACHE = new NoopCache(); + @JsonProperty @Min(0) private long sizeInBytes = 0; @@ -33,13 +40,60 @@ public class LocalCacheProvider implements CacheProvider @Min(0) private int initialSize = 500000; - @JsonProperty - @Min(0) - private int logEvictionCount = 0; + @JacksonInject + private DruidProcessingConfig config = null; @Override public Cache get() { - return new MapCache(new ByteCountingLRUMap(initialSize, logEvictionCount, sizeInBytes)); + if(sizeInBytes > 0) { + return MapCache.create(sizeInBytes, initialSize, config.getNumThreads()); + } else { + return NOOP_CACHE; + } + } + + private static class NoopCache implements Cache + { + @Override + public byte[] get(NamedKey key) + { + return null; + } + + @Override + public void put(NamedKey key, byte[] value) + { + } + + @Override + public Map getBulk(Iterable keys) + { + return ImmutableMap.of(); + } + + @Override + public void close(String namespace) + { + + } + + @Override + public CacheStats getStats() + { + return new CacheStats(0,0,0,0,0,0,0); + } + + @Override + public boolean isLocal() + { + return true; + } + + @Override + public void doMonitor(ServiceEmitter emitter) + { + + } } } diff --git a/server/src/main/java/io/druid/client/cache/MapCache.java b/server/src/main/java/io/druid/client/cache/MapCache.java index c0b83f539bdc..d5c9ec824788 100644 --- a/server/src/main/java/io/druid/client/cache/MapCache.java +++ b/server/src/main/java/io/druid/client/cache/MapCache.java @@ -17,59 +17,77 @@ package io.druid.client.cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.Weigher; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.primitives.Ints; import com.metamx.emitter.service.ServiceEmitter; import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; /** */ public class MapCache implements Cache { - public static Cache create(long sizeInBytes) + public static MapCache create(long sizeInBytes) { + return create(sizeInBytes, 0, 4); + } + public static MapCache create(long sizeInBytes, int initialCapacity, int concurrencyLevel) { - return new MapCache(new ByteCountingLRUMap(sizeInBytes)); + return new MapCache(sizeInBytes, initialCapacity, concurrencyLevel); } - private final Map baseMap; - private final ByteCountingLRUMap byteCountingLRUMap; - - private final Map namespaceId; - private final AtomicInteger ids; - - private final Object clearLock = new Object(); + private final com.google.common.cache.Cache baseCache; + private final com.google.common.cache.LoadingCache namespaceId; + private final AtomicInteger ids = new AtomicInteger(); - private final AtomicLong hitCount = new AtomicLong(0); - private final AtomicLong missCount = new AtomicLong(0); - - MapCache( - ByteCountingLRUMap byteCountingLRUMap - ) + private MapCache(long sizeInBytes, int initialCapacity, int concurrencyLevel) { - this.byteCountingLRUMap = byteCountingLRUMap; - this.baseMap = Collections.synchronizedMap(byteCountingLRUMap); - - namespaceId = Maps.newHashMap(); - ids = new AtomicInteger(); + this.baseCache = CacheBuilder + .newBuilder() + .initialCapacity(initialCapacity) + .maximumWeight(sizeInBytes) + .recordStats() + .concurrencyLevel(concurrencyLevel) + .weigher( + new Weigher() + { + @Override + public int weigh(ByteBuffer key, byte[] value) + { + return key.remaining() + value.length; + } + } + ) + .build(); + + this.namespaceId = CacheBuilder + .newBuilder() + .concurrencyLevel(concurrencyLevel) + .build(new CacheLoader() + { + @Override + public Integer load(String namespace) throws Exception + { + return ids.getAndIncrement(); + } + }); } @Override public CacheStats getStats() { + final com.google.common.cache.CacheStats stats = baseCache.stats(); return new CacheStats( - hitCount.get(), - missCount.get(), - byteCountingLRUMap.size(), - byteCountingLRUMap.getNumBytes(), - byteCountingLRUMap.getEvictionCount(), + stats.hitCount(), + stats.missCount(), + baseCache.size(), + -1, + stats.evictionCount(), 0, 0 ); @@ -78,24 +96,13 @@ public CacheStats getStats() @Override public byte[] get(NamedKey key) { - final byte[] retVal; - synchronized (clearLock) { - retVal = baseMap.get(computeKey(getNamespaceId(key.namespace), key.key)); - } - if (retVal == null) { - missCount.incrementAndGet(); - } else { - hitCount.incrementAndGet(); - } - return retVal; + return baseCache.getIfPresent(computeKey(namespaceId.getUnchecked(key.namespace), key.key)); } @Override public void put(NamedKey key, byte[] value) { - synchronized (clearLock) { - baseMap.put(computeKey(getNamespaceId(key.namespace), key.key), value); - } + baseCache.put(computeKey(namespaceId.getUnchecked(key.namespace), key.key), value); } @Override @@ -114,51 +121,25 @@ public Map getBulk(Iterable keys) @Override public void close(String namespace) { - byte[] idBytes; - synchronized (namespaceId) { - idBytes = getNamespaceId(namespace); - if (idBytes == null) { - return; - } - - namespaceId.remove(namespace); - } - synchronized (clearLock) { - Iterator iter = baseMap.keySet().iterator(); + final Integer id = namespaceId.asMap().remove(namespace); + if (id != null) { List toRemove = Lists.newLinkedList(); - while (iter.hasNext()) { - ByteBuffer next = iter.next(); - - if (next.get(0) == idBytes[0] - && next.get(1) == idBytes[1] - && next.get(2) == idBytes[2] - && next.get(3) == idBytes[3]) { + final int unboxed = id; + for (ByteBuffer next : baseCache.asMap().keySet()) { + if (next.getInt(0) == unboxed) { toRemove.add(next); } } - for (ByteBuffer key : toRemove) { - baseMap.remove(key); - } - } - } - - private byte[] getNamespaceId(final String identifier) - { - synchronized (namespaceId) { - byte[] idBytes = namespaceId.get(identifier); - if (idBytes != null) { - return idBytes; - } - - idBytes = Ints.toByteArray(ids.getAndIncrement()); - namespaceId.put(identifier, idBytes); - return idBytes; + baseCache.invalidateAll(toRemove); } } - private ByteBuffer computeKey(byte[] idBytes, byte[] key) + private ByteBuffer computeKey(int id, byte[] key) { - final ByteBuffer retVal = ByteBuffer.allocate(key.length + 4).put(idBytes).put(key); + final ByteBuffer retVal = ByteBuffer + .allocate(key.length + 4) + .putInt(id) + .put(key); retVal.rewind(); return retVal; } diff --git a/server/src/test/java/io/druid/client/cache/HybridCacheTest.java b/server/src/test/java/io/druid/client/cache/HybridCacheTest.java index 4de27307dcac..e5af3eacdcfc 100644 --- a/server/src/test/java/io/druid/client/cache/HybridCacheTest.java +++ b/server/src/test/java/io/druid/client/cache/HybridCacheTest.java @@ -80,8 +80,8 @@ public void configure(Binder binder) @Test public void testSanity() throws Exception { - final MapCache l1 = new MapCache(new ByteCountingLRUMap(1024 * 1024)); - final MapCache l2 = new MapCache(new ByteCountingLRUMap(1024 * 1024)); + final MapCache l1 = MapCache.create(1024 * 1024); + final MapCache l2 = MapCache.create(1024 * 1024); HybridCache cache = new HybridCache(l1, l2); final Cache.NamedKey key1 = new Cache.NamedKey("a", HI); diff --git a/server/src/test/java/io/druid/client/cache/MapCacheTest.java b/server/src/test/java/io/druid/client/cache/MapCacheTest.java index e6920088505a..2cfce9047ba8 100644 --- a/server/src/test/java/io/druid/client/cache/MapCacheTest.java +++ b/server/src/test/java/io/druid/client/cache/MapCacheTest.java @@ -28,45 +28,43 @@ public class MapCacheTest { private static final byte[] HI = "hi".getBytes(); private static final byte[] HO = "ho".getBytes(); - private ByteCountingLRUMap baseMap; private MapCache cache; @Before public void setUp() throws Exception { - baseMap = new ByteCountingLRUMap(1024 * 1024); - cache = new MapCache(baseMap); + cache = MapCache.create(1024 * 1024); } @Test public void testSanity() throws Exception { Assert.assertNull(cache.get(new Cache.NamedKey("a", HI))); - Assert.assertEquals(0, baseMap.size()); + Assert.assertEquals(0, cache.getStats().getNumEntries()); put(cache, "a", HI, 1); - Assert.assertEquals(1, baseMap.size()); + Assert.assertEquals(1, cache.getStats().getNumEntries()); Assert.assertEquals(1, get(cache, "a", HI)); Assert.assertNull(cache.get(new Cache.NamedKey("the", HI))); put(cache, "the", HI, 2); - Assert.assertEquals(2, baseMap.size()); + Assert.assertEquals(2, cache.getStats().getNumEntries()); Assert.assertEquals(1, get(cache, "a", HI)); Assert.assertEquals(2, get(cache, "the", HI)); put(cache, "the", HO, 10); - Assert.assertEquals(3, baseMap.size()); + Assert.assertEquals(3, cache.getStats().getNumEntries()); Assert.assertEquals(1, get(cache, "a", HI)); Assert.assertNull(cache.get(new Cache.NamedKey("a", HO))); Assert.assertEquals(2, get(cache, "the", HI)); Assert.assertEquals(10, get(cache, "the", HO)); cache.close("the"); - Assert.assertEquals(1, baseMap.size()); + Assert.assertEquals(1, cache.getStats().getNumEntries()); Assert.assertEquals(1, get(cache, "a", HI)); Assert.assertNull(cache.get(new Cache.NamedKey("a", HO))); cache.close("a"); - Assert.assertEquals(0, baseMap.size()); + Assert.assertEquals(0, cache.getStats().getNumEntries()); } public void put(Cache cache, String namespace, byte[] key, Integer value)