diff --git a/extensions/caffeine-cache/pom.xml b/extensions/caffeine-cache/pom.xml
new file mode 100644
index 000000000000..f9ef41662b97
--- /dev/null
+++ b/extensions/caffeine-cache/pom.xml
@@ -0,0 +1,66 @@
+
+
+
+
+ 4.0.0
+
+ io.druid.extensions
+ druid-caffeine-cache
+ druid-caffeine-cache
+ druid-caffeine-cache
+
+
+ io.druid
+ druid
+ 0.9.0-SNAPSHOT
+ ../../pom.xml
+
+
+
+
+ io.druid
+ druid-api
+ provided
+
+
+ io.druid
+ druid-server
+ ${project.parent.version}
+ provided
+
+
+ com.github.ben-manes.caffeine
+ caffeine
+ 2.1.0
+
+
+ net.jpountz.lz4
+ lz4
+ provided
+
+
+
+
+ junit
+ junit
+ test
+
+
+
diff --git a/extensions/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCache.java b/extensions/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCache.java
new file mode 100644
index 000000000000..07ead4436f6b
--- /dev/null
+++ b/extensions/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCache.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.client.cache;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.primitives.Ints;
+import com.metamx.common.logger.Logger;
+import com.metamx.emitter.service.ServiceEmitter;
+import com.metamx.emitter.service.ServiceMetricEvent;
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import org.apache.commons.codec.digest.DigestUtils;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class CaffeineCache implements io.druid.client.cache.Cache
+{
+ private static final Logger log = new Logger(CaffeineCache.class);
+ private final Cache cache;
+ private final AtomicReference priorStats = new AtomicReference<>(
+ null);
+
+
+ public static CaffeineCache create(final CaffeineCacheConfig config)
+ {
+ Caffeine builder = Caffeine.newBuilder().recordStats();
+ if (config.getExpiration() >= 0) {
+ builder = builder
+ .expireAfterAccess(config.getExpiration(), TimeUnit.MILLISECONDS);
+ }
+ if (config.getMaxSize() >= 0) {
+ builder = builder
+ .maximumSize(config.getMaxSize());
+ }
+ return new CaffeineCache(builder.build());
+ }
+
+ public CaffeineCache(final Cache cache)
+ {
+ this.cache = cache;
+ }
+
+ @Override
+ public byte[] get(NamedKey key)
+ {
+ final String itemKey = computeKeyHash(key);
+ return deserialize(cache.getIfPresent(itemKey));
+ }
+
+ @Override
+ public void put(NamedKey key, byte[] value)
+ {
+ final String itemKey = computeKeyHash(key);
+ cache.put(itemKey, serialize(value));
+ }
+
+ @Override
+ public Map getBulk(Iterable keys)
+ {
+ final Map keyLookup = Maps.uniqueIndex(
+ keys,
+ new Function()
+ {
+ @Override
+ public String apply(
+ @Nullable NamedKey input
+ )
+ {
+ return computeKeyHash(input);
+ }
+ }
+ );
+
+ // Sometimes broker passes empty keys list to getBulk()
+ if (keyLookup.size() == 0) {
+ return ImmutableMap.of();
+ }
+
+ final Map results = Maps.newHashMap();
+ final Map cachedVals = cache.getAllPresent(Iterables.transform(
+ keys,
+ new Function()
+ {
+ @Override
+ public String apply(
+ NamedKey input
+ )
+ {
+ return computeKeyHash(input);
+ }
+ }
+ ));
+
+ // Hash join
+ for (String key : cachedVals.keySet()) {
+ final byte[] val = deserialize(cachedVals.get(key));
+ if (val != null) {
+ results.put(keyLookup.get(key), val);
+ }
+ }
+ return results;
+ }
+
+ // This is completely racy with put. Any values missed should be evicted later anyways. So no worries.
+ @Override
+ public void close(String namespace)
+ {
+ final String keyPrefix = computeNamespaceHash(namespace) + ":";
+ for (String key : cache.asMap().keySet()) {
+ if (key.startsWith(keyPrefix)) {
+ cache.invalidate(key);
+ }
+ }
+ }
+
+ @Override
+ public CacheStats getStats()
+ {
+ final com.github.benmanes.caffeine.cache.stats.CacheStats stats = cache.stats();
+ return new CacheStats(
+ stats.hitCount(),
+ stats.missCount(),
+ stats.loadSuccessCount() - stats.evictionCount(),
+ cache.estimatedSize(),
+ stats.evictionCount(),
+ 0,
+ stats.loadFailureCount()
+ );
+ }
+
+ @Override
+ public boolean isLocal()
+ {
+ return true;
+ }
+
+ @Override
+ public void doMonitor(ServiceEmitter emitter)
+ {
+ final com.github.benmanes.caffeine.cache.stats.CacheStats oldStats = priorStats.get();
+ final com.github.benmanes.caffeine.cache.stats.CacheStats newStats = cache.stats();
+ final com.github.benmanes.caffeine.cache.stats.CacheStats deltaStats;
+ if (oldStats == null) {
+ deltaStats = newStats;
+ } else {
+ deltaStats = newStats.minus(oldStats);
+ }
+ final ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder();
+ emitter.emit(builder.build("query/cache/caffeine/delta/requests", deltaStats.requestCount()));
+ emitter.emit(builder.build("query/cache/caffeine/total/requests", newStats.requestCount()));
+ emitter.emit(builder.build("query/cache/caffeine/delta/loadTime", deltaStats.totalLoadTime()));
+ emitter.emit(builder.build("query/cache/caffeine/total/loadTime", newStats.totalLoadTime()));
+ if (!priorStats.compareAndSet(oldStats, newStats)) {
+ // ISE for stack trace
+ log.warn(
+ new IllegalStateException("Multiple monitors"),
+ "Multiple monitors on the same cache causing race conditions and unreliable stats reporting"
+ );
+ }
+ }
+
+ private final LZ4Factory factory = LZ4Factory.fastestInstance();
+
+ private byte[] deserialize(byte[] bytes)
+ {
+ if (bytes == null) {
+ return null;
+ }
+ final int decompressedLen = ByteBuffer.wrap(bytes).getInt();
+ final byte[] out = new byte[decompressedLen];
+ final int bytesRead = factory.fastDecompressor().decompress(bytes, Ints.BYTES, out, 0, out.length);
+ if (bytesRead != bytes.length - Ints.BYTES) {
+ if (log.isDebugEnabled()) {
+ log.debug("Bytes read [%s] does not equal expected bytes read [%s]", bytesRead, bytes.length - Ints.BYTES);
+ }
+ }
+ return out;
+ }
+
+ private byte[] serialize(byte[] value)
+ {
+ final LZ4Compressor compressor = factory.fastCompressor();
+ final int len = compressor.maxCompressedLength(value.length);
+ final byte[] out = new byte[len];
+ final int compressedSize = compressor.compress(value, 0, value.length, out, 0);
+ return ByteBuffer.allocate(compressedSize + Ints.BYTES)
+ .putInt(value.length)
+ .put(out, 0, compressedSize)
+ .array();
+ }
+
+ public static String computeKeyHash(final NamedKey key)
+ {
+ return String.format("%s:%s", computeNamespaceHash(key.namespace), DigestUtils.sha1Hex(key.key));
+ }
+
+ // So people can't do weird things with namespace strings
+ public static String computeNamespaceHash(final String namespace)
+ {
+ return DigestUtils.sha1Hex(namespace);
+ }
+}
diff --git a/extensions/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCacheConfig.java b/extensions/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCacheConfig.java
new file mode 100644
index 000000000000..8b20274333d2
--- /dev/null
+++ b/extensions/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCacheConfig.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.client.cache;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class CaffeineCacheConfig
+{
+ @JsonProperty
+ private long expiration = -1;
+
+ @JsonProperty
+ private long maxSize = -1;
+
+ public long getExpiration()
+ {
+ return expiration;
+ }
+
+ public long getMaxSize()
+ {
+ return maxSize;
+ }
+}
diff --git a/extensions/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCacheProvider.java b/extensions/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCacheProvider.java
new file mode 100644
index 000000000000..23e18987fe7e
--- /dev/null
+++ b/extensions/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCacheProvider.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.client.cache;
+
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("caffeine")
+public class CaffeineCacheProvider extends CaffeineCacheConfig implements CacheProvider
+{
+ @Override
+ public Cache get()
+ {
+ return CaffeineCache.create(this);
+ }
+}
diff --git a/extensions/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineDruidModule.java b/extensions/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineDruidModule.java
new file mode 100644
index 000000000000..ca1bcaa9733f
--- /dev/null
+++ b/extensions/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineDruidModule.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.client.cache;
+
+import com.fasterxml.jackson.core.Version;
+import com.fasterxml.jackson.databind.Module;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Binder;
+import io.druid.initialization.DruidModule;
+
+import java.util.List;
+
+public class CaffeineDruidModule implements DruidModule
+{
+
+ @Override
+ public void configure(Binder binder)
+ {
+
+ }
+
+ @Override
+ public List extends Module> getJacksonModules()
+ {
+ return ImmutableList.of(new Module()
+ {
+ @Override
+ public String getModuleName()
+ {
+ return "DruidCaffeineCache-" + System.identityHashCode(this);
+ }
+
+ @Override
+ public Version version()
+ {
+ return Version.unknownVersion();
+ }
+
+ @Override
+ public void setupModule(SetupContext context)
+ {
+ context.registerSubtypes(CaffeineCacheProvider.class);
+ }
+ });
+ }
+}
diff --git a/extensions/caffeine-cache/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/extensions/caffeine-cache/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
new file mode 100644
index 000000000000..016f13528ad4
--- /dev/null
+++ b/extensions/caffeine-cache/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
@@ -0,0 +1 @@
+io.druid.client.cache.CaffeineDruidModule
diff --git a/extensions/caffeine-cache/src/test/java/CaffeineCacheTest.java b/extensions/caffeine-cache/src/test/java/CaffeineCacheTest.java
new file mode 100644
index 000000000000..76ffba8b74bd
--- /dev/null
+++ b/extensions/caffeine-cache/src/test/java/CaffeineCacheTest.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Ints;
+import com.google.inject.Binder;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.name.Names;
+import com.metamx.common.lifecycle.Lifecycle;
+import io.druid.client.cache.Cache;
+import io.druid.client.cache.CacheProvider;
+import io.druid.client.cache.CacheStats;
+import io.druid.client.cache.CaffeineCache;
+import io.druid.client.cache.CaffeineCacheConfig;
+import io.druid.client.cache.CaffeineCacheProvider;
+import io.druid.guice.GuiceInjectors;
+import io.druid.guice.JsonConfigProvider;
+import io.druid.guice.ManageLifecycle;
+import io.druid.initialization.Initialization;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.UUID;
+
+public class CaffeineCacheTest
+{
+ private static final byte[] HI = "hiiiiiiiiiiiiiiiiiii".getBytes();
+ private static final byte[] HO = "hooooooooooooooooooo".getBytes();
+
+ private CaffeineCache cache;
+ private final CaffeineCacheConfig cacheConfig = new CaffeineCacheConfig()
+ {
+ public int getPoolSize()
+ {
+ return 1;
+ }
+
+ public String getPrefix()
+ {
+ return "druid";
+ }
+
+ public long getExpiration()
+ {
+ return -1;
+ }
+ };
+
+ @Before
+ public void setUp() throws Exception
+ {
+ cache = CaffeineCache.create(cacheConfig);
+ }
+
+ @Test
+ public void testBasicInjection() throws Exception
+ {
+ final CaffeineCacheConfig config = new CaffeineCacheConfig();
+ Injector injector = Initialization.makeInjectorWithModules(
+ GuiceInjectors.makeStartupInjector(), ImmutableList.of(
+ new Module()
+ {
+ @Override
+ public void configure(Binder binder)
+ {
+ binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/test/redis");
+ binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0);
+
+ binder.bind(CaffeineCacheConfig.class).toInstance(config);
+ binder.bind(Cache.class).toProvider(CaffeineCacheProviderWithConfig.class).in(ManageLifecycle.class);
+ }
+ }
+ )
+ );
+ final Lifecycle lifecycle = injector.getInstance(Lifecycle.class);
+ lifecycle.start();
+ try {
+ Cache cache = injector.getInstance(Cache.class);
+ Assert.assertEquals(CaffeineCache.class, cache.getClass());
+ }
+ finally {
+ lifecycle.stop();
+ }
+ }
+
+ @Test
+ public void testSimpleInjection()
+ {
+ final String uuid = UUID.randomUUID().toString();
+ System.setProperty(uuid + ".type", "caffeine");
+ final Injector injector = Initialization.makeInjectorWithModules(
+ GuiceInjectors.makeStartupInjector(), ImmutableList.of(
+ new Module()
+ {
+ @Override
+ public void configure(Binder binder)
+ {
+ binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/test/redis");
+ binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0);
+
+ binder.bind(Cache.class).toProvider(CacheProvider.class);
+ JsonConfigProvider.bind(binder, uuid, CacheProvider.class);
+ }
+ }
+ )
+ );
+ final CacheProvider cacheProvider = injector.getInstance(CacheProvider.class);
+ Assert.assertNotNull(cacheProvider);
+ Assert.assertEquals(CaffeineCacheProvider.class, cacheProvider.getClass());
+ }
+
+ @Test
+ public void testBaseOps() throws Exception
+ {
+ final Cache.NamedKey aKey = new Cache.NamedKey("a", HI);
+ Assert.assertNull(cache.get(aKey));
+ put(cache, aKey, 1);
+ Assert.assertEquals(1, get(cache, aKey));
+ cache.close("a");
+ Assert.assertNull(cache.get(aKey));
+
+ final Cache.NamedKey hiKey = new Cache.NamedKey("the", HI);
+ final Cache.NamedKey hoKey = new Cache.NamedKey("the", HO);
+ put(cache, hiKey, 10);
+ put(cache, hoKey, 20);
+ Assert.assertEquals(10, get(cache, hiKey));
+ Assert.assertEquals(20, get(cache, hoKey));
+ cache.close("the");
+ Assert.assertNull(cache.get(hiKey));
+ Assert.assertNull(cache.get(hoKey));
+
+ final CacheStats stats = cache.getStats();
+ Assert.assertEquals(3, stats.getNumHits());
+ Assert.assertEquals(4, stats.getNumMisses());
+ }
+
+ @Test
+ public void testGetBulk() throws Exception
+ {
+ Assert.assertNull(cache.get(new Cache.NamedKey("the", HI)));
+
+ Cache.NamedKey key1 = new Cache.NamedKey("the", HI);
+ put(cache, key1, 2);
+
+ Cache.NamedKey key2 = new Cache.NamedKey("the", HO);
+ put(cache, key2, 10);
+
+ Map result = cache.getBulk(
+ Lists.newArrayList(
+ key1,
+ key2
+ )
+ );
+
+ Assert.assertEquals(2, Ints.fromByteArray(result.get(key1)));
+ Assert.assertEquals(10, Ints.fromByteArray(result.get(key2)));
+
+ Cache.NamedKey missingKey = new Cache.NamedKey("missing", HI);
+ result = cache.getBulk(Lists.newArrayList(missingKey));
+ Assert.assertEquals(result.size(), 0);
+
+ result = cache.getBulk(Lists.newArrayList());
+ Assert.assertEquals(result.size(), 0);
+ }
+
+ public int get(Cache cache, Cache.NamedKey key)
+ {
+ return Ints.fromByteArray(cache.get(key));
+ }
+
+ public void put(Cache cache, Cache.NamedKey key, Integer value)
+ {
+ cache.put(key, Ints.toByteArray(value));
+ }
+}
+
+class CaffeineCacheProviderWithConfig extends CaffeineCacheProvider
+{
+ private final CaffeineCacheConfig config;
+
+ @Inject
+ public CaffeineCacheProviderWithConfig(CaffeineCacheConfig config)
+ {
+ this.config = config;
+ }
+
+ @Override
+ public Cache get()
+ {
+ return CaffeineCache.create(config);
+ }
+}
diff --git a/pom.xml b/pom.xml
index c5c84cb2fe5d..15178e040bf8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -100,6 +100,7 @@
extensions/cloudfiles-extensions
extensions/datasketches
extensions/avro-extensions
+ extensions/caffeine-cache
distribution