From 2d5ca25c2a63466022ea3eba4ca09702671fa03a Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Mon, 8 Feb 2016 13:18:11 -0800 Subject: [PATCH] Add Caffeine cache layer extension * Fixes #2411 --- extensions/caffeine-cache/pom.xml | 66 +++++ .../io/druid/client/cache/CaffeineCache.java | 228 ++++++++++++++++++ .../client/cache/CaffeineCacheConfig.java | 41 ++++ .../client/cache/CaffeineCacheProvider.java | 32 +++ .../client/cache/CaffeineDruidModule.java | 63 +++++ .../io.druid.initialization.DruidModule | 1 + .../src/test/java/CaffeineCacheTest.java | 213 ++++++++++++++++ pom.xml | 1 + 8 files changed, 645 insertions(+) create mode 100644 extensions/caffeine-cache/pom.xml create mode 100644 extensions/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCache.java create mode 100644 extensions/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCacheConfig.java create mode 100644 extensions/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCacheProvider.java create mode 100644 extensions/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineDruidModule.java create mode 100644 extensions/caffeine-cache/src/main/resources/META-INF/services/io.druid.initialization.DruidModule create mode 100644 extensions/caffeine-cache/src/test/java/CaffeineCacheTest.java diff --git a/extensions/caffeine-cache/pom.xml b/extensions/caffeine-cache/pom.xml new file mode 100644 index 000000000000..f9ef41662b97 --- /dev/null +++ b/extensions/caffeine-cache/pom.xml @@ -0,0 +1,66 @@ + + + + + 4.0.0 + + io.druid.extensions + druid-caffeine-cache + druid-caffeine-cache + druid-caffeine-cache + + + io.druid + druid + 0.9.0-SNAPSHOT + ../../pom.xml + + + + + io.druid + druid-api + provided + + + io.druid + druid-server + ${project.parent.version} + provided + + + com.github.ben-manes.caffeine + caffeine + 2.1.0 + + + net.jpountz.lz4 + lz4 + provided + + + + + junit + junit + test + + + diff --git a/extensions/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCache.java b/extensions/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCache.java new file mode 100644 index 000000000000..07ead4436f6b --- /dev/null +++ b/extensions/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCache.java @@ -0,0 +1,228 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client.cache; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.google.common.base.Function; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import com.google.common.primitives.Ints; +import com.metamx.common.logger.Logger; +import com.metamx.emitter.service.ServiceEmitter; +import com.metamx.emitter.service.ServiceMetricEvent; +import net.jpountz.lz4.LZ4Compressor; +import net.jpountz.lz4.LZ4Factory; +import org.apache.commons.codec.digest.DigestUtils; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +public class CaffeineCache implements io.druid.client.cache.Cache +{ + private static final Logger log = new Logger(CaffeineCache.class); + private final Cache cache; + private final AtomicReference priorStats = new AtomicReference<>( + null); + + + public static CaffeineCache create(final CaffeineCacheConfig config) + { + Caffeine builder = Caffeine.newBuilder().recordStats(); + if (config.getExpiration() >= 0) { + builder = builder + .expireAfterAccess(config.getExpiration(), TimeUnit.MILLISECONDS); + } + if (config.getMaxSize() >= 0) { + builder = builder + .maximumSize(config.getMaxSize()); + } + return new CaffeineCache(builder.build()); + } + + public CaffeineCache(final Cache cache) + { + this.cache = cache; + } + + @Override + public byte[] get(NamedKey key) + { + final String itemKey = computeKeyHash(key); + return deserialize(cache.getIfPresent(itemKey)); + } + + @Override + public void put(NamedKey key, byte[] value) + { + final String itemKey = computeKeyHash(key); + cache.put(itemKey, serialize(value)); + } + + @Override + public Map getBulk(Iterable keys) + { + final Map keyLookup = Maps.uniqueIndex( + keys, + new Function() + { + @Override + public String apply( + @Nullable NamedKey input + ) + { + return computeKeyHash(input); + } + } + ); + + // Sometimes broker passes empty keys list to getBulk() + if (keyLookup.size() == 0) { + return ImmutableMap.of(); + } + + final Map results = Maps.newHashMap(); + final Map cachedVals = cache.getAllPresent(Iterables.transform( + keys, + new Function() + { + @Override + public String apply( + NamedKey input + ) + { + return computeKeyHash(input); + } + } + )); + + // Hash join + for (String key : cachedVals.keySet()) { + final byte[] val = deserialize(cachedVals.get(key)); + if (val != null) { + results.put(keyLookup.get(key), val); + } + } + return results; + } + + // This is completely racy with put. Any values missed should be evicted later anyways. So no worries. + @Override + public void close(String namespace) + { + final String keyPrefix = computeNamespaceHash(namespace) + ":"; + for (String key : cache.asMap().keySet()) { + if (key.startsWith(keyPrefix)) { + cache.invalidate(key); + } + } + } + + @Override + public CacheStats getStats() + { + final com.github.benmanes.caffeine.cache.stats.CacheStats stats = cache.stats(); + return new CacheStats( + stats.hitCount(), + stats.missCount(), + stats.loadSuccessCount() - stats.evictionCount(), + cache.estimatedSize(), + stats.evictionCount(), + 0, + stats.loadFailureCount() + ); + } + + @Override + public boolean isLocal() + { + return true; + } + + @Override + public void doMonitor(ServiceEmitter emitter) + { + final com.github.benmanes.caffeine.cache.stats.CacheStats oldStats = priorStats.get(); + final com.github.benmanes.caffeine.cache.stats.CacheStats newStats = cache.stats(); + final com.github.benmanes.caffeine.cache.stats.CacheStats deltaStats; + if (oldStats == null) { + deltaStats = newStats; + } else { + deltaStats = newStats.minus(oldStats); + } + final ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder(); + emitter.emit(builder.build("query/cache/caffeine/delta/requests", deltaStats.requestCount())); + emitter.emit(builder.build("query/cache/caffeine/total/requests", newStats.requestCount())); + emitter.emit(builder.build("query/cache/caffeine/delta/loadTime", deltaStats.totalLoadTime())); + emitter.emit(builder.build("query/cache/caffeine/total/loadTime", newStats.totalLoadTime())); + if (!priorStats.compareAndSet(oldStats, newStats)) { + // ISE for stack trace + log.warn( + new IllegalStateException("Multiple monitors"), + "Multiple monitors on the same cache causing race conditions and unreliable stats reporting" + ); + } + } + + private final LZ4Factory factory = LZ4Factory.fastestInstance(); + + private byte[] deserialize(byte[] bytes) + { + if (bytes == null) { + return null; + } + final int decompressedLen = ByteBuffer.wrap(bytes).getInt(); + final byte[] out = new byte[decompressedLen]; + final int bytesRead = factory.fastDecompressor().decompress(bytes, Ints.BYTES, out, 0, out.length); + if (bytesRead != bytes.length - Ints.BYTES) { + if (log.isDebugEnabled()) { + log.debug("Bytes read [%s] does not equal expected bytes read [%s]", bytesRead, bytes.length - Ints.BYTES); + } + } + return out; + } + + private byte[] serialize(byte[] value) + { + final LZ4Compressor compressor = factory.fastCompressor(); + final int len = compressor.maxCompressedLength(value.length); + final byte[] out = new byte[len]; + final int compressedSize = compressor.compress(value, 0, value.length, out, 0); + return ByteBuffer.allocate(compressedSize + Ints.BYTES) + .putInt(value.length) + .put(out, 0, compressedSize) + .array(); + } + + public static String computeKeyHash(final NamedKey key) + { + return String.format("%s:%s", computeNamespaceHash(key.namespace), DigestUtils.sha1Hex(key.key)); + } + + // So people can't do weird things with namespace strings + public static String computeNamespaceHash(final String namespace) + { + return DigestUtils.sha1Hex(namespace); + } +} diff --git a/extensions/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCacheConfig.java b/extensions/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCacheConfig.java new file mode 100644 index 000000000000..8b20274333d2 --- /dev/null +++ b/extensions/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCacheConfig.java @@ -0,0 +1,41 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client.cache; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class CaffeineCacheConfig +{ + @JsonProperty + private long expiration = -1; + + @JsonProperty + private long maxSize = -1; + + public long getExpiration() + { + return expiration; + } + + public long getMaxSize() + { + return maxSize; + } +} diff --git a/extensions/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCacheProvider.java b/extensions/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCacheProvider.java new file mode 100644 index 000000000000..23e18987fe7e --- /dev/null +++ b/extensions/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCacheProvider.java @@ -0,0 +1,32 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client.cache; + +import com.fasterxml.jackson.annotation.JsonTypeName; + +@JsonTypeName("caffeine") +public class CaffeineCacheProvider extends CaffeineCacheConfig implements CacheProvider +{ + @Override + public Cache get() + { + return CaffeineCache.create(this); + } +} diff --git a/extensions/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineDruidModule.java b/extensions/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineDruidModule.java new file mode 100644 index 000000000000..ca1bcaa9733f --- /dev/null +++ b/extensions/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineDruidModule.java @@ -0,0 +1,63 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client.cache; + +import com.fasterxml.jackson.core.Version; +import com.fasterxml.jackson.databind.Module; +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import io.druid.initialization.DruidModule; + +import java.util.List; + +public class CaffeineDruidModule implements DruidModule +{ + + @Override + public void configure(Binder binder) + { + + } + + @Override + public List getJacksonModules() + { + return ImmutableList.of(new Module() + { + @Override + public String getModuleName() + { + return "DruidCaffeineCache-" + System.identityHashCode(this); + } + + @Override + public Version version() + { + return Version.unknownVersion(); + } + + @Override + public void setupModule(SetupContext context) + { + context.registerSubtypes(CaffeineCacheProvider.class); + } + }); + } +} diff --git a/extensions/caffeine-cache/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/extensions/caffeine-cache/src/main/resources/META-INF/services/io.druid.initialization.DruidModule new file mode 100644 index 000000000000..016f13528ad4 --- /dev/null +++ b/extensions/caffeine-cache/src/main/resources/META-INF/services/io.druid.initialization.DruidModule @@ -0,0 +1 @@ +io.druid.client.cache.CaffeineDruidModule diff --git a/extensions/caffeine-cache/src/test/java/CaffeineCacheTest.java b/extensions/caffeine-cache/src/test/java/CaffeineCacheTest.java new file mode 100644 index 000000000000..76ffba8b74bd --- /dev/null +++ b/extensions/caffeine-cache/src/test/java/CaffeineCacheTest.java @@ -0,0 +1,213 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.primitives.Ints; +import com.google.inject.Binder; +import com.google.inject.Inject; +import com.google.inject.Injector; +import com.google.inject.Module; +import com.google.inject.name.Names; +import com.metamx.common.lifecycle.Lifecycle; +import io.druid.client.cache.Cache; +import io.druid.client.cache.CacheProvider; +import io.druid.client.cache.CacheStats; +import io.druid.client.cache.CaffeineCache; +import io.druid.client.cache.CaffeineCacheConfig; +import io.druid.client.cache.CaffeineCacheProvider; +import io.druid.guice.GuiceInjectors; +import io.druid.guice.JsonConfigProvider; +import io.druid.guice.ManageLifecycle; +import io.druid.initialization.Initialization; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Map; +import java.util.UUID; + +public class CaffeineCacheTest +{ + private static final byte[] HI = "hiiiiiiiiiiiiiiiiiii".getBytes(); + private static final byte[] HO = "hooooooooooooooooooo".getBytes(); + + private CaffeineCache cache; + private final CaffeineCacheConfig cacheConfig = new CaffeineCacheConfig() + { + public int getPoolSize() + { + return 1; + } + + public String getPrefix() + { + return "druid"; + } + + public long getExpiration() + { + return -1; + } + }; + + @Before + public void setUp() throws Exception + { + cache = CaffeineCache.create(cacheConfig); + } + + @Test + public void testBasicInjection() throws Exception + { + final CaffeineCacheConfig config = new CaffeineCacheConfig(); + Injector injector = Initialization.makeInjectorWithModules( + GuiceInjectors.makeStartupInjector(), ImmutableList.of( + new Module() + { + @Override + public void configure(Binder binder) + { + binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/test/redis"); + binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0); + + binder.bind(CaffeineCacheConfig.class).toInstance(config); + binder.bind(Cache.class).toProvider(CaffeineCacheProviderWithConfig.class).in(ManageLifecycle.class); + } + } + ) + ); + final Lifecycle lifecycle = injector.getInstance(Lifecycle.class); + lifecycle.start(); + try { + Cache cache = injector.getInstance(Cache.class); + Assert.assertEquals(CaffeineCache.class, cache.getClass()); + } + finally { + lifecycle.stop(); + } + } + + @Test + public void testSimpleInjection() + { + final String uuid = UUID.randomUUID().toString(); + System.setProperty(uuid + ".type", "caffeine"); + final Injector injector = Initialization.makeInjectorWithModules( + GuiceInjectors.makeStartupInjector(), ImmutableList.of( + new Module() + { + @Override + public void configure(Binder binder) + { + binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/test/redis"); + binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0); + + binder.bind(Cache.class).toProvider(CacheProvider.class); + JsonConfigProvider.bind(binder, uuid, CacheProvider.class); + } + } + ) + ); + final CacheProvider cacheProvider = injector.getInstance(CacheProvider.class); + Assert.assertNotNull(cacheProvider); + Assert.assertEquals(CaffeineCacheProvider.class, cacheProvider.getClass()); + } + + @Test + public void testBaseOps() throws Exception + { + final Cache.NamedKey aKey = new Cache.NamedKey("a", HI); + Assert.assertNull(cache.get(aKey)); + put(cache, aKey, 1); + Assert.assertEquals(1, get(cache, aKey)); + cache.close("a"); + Assert.assertNull(cache.get(aKey)); + + final Cache.NamedKey hiKey = new Cache.NamedKey("the", HI); + final Cache.NamedKey hoKey = new Cache.NamedKey("the", HO); + put(cache, hiKey, 10); + put(cache, hoKey, 20); + Assert.assertEquals(10, get(cache, hiKey)); + Assert.assertEquals(20, get(cache, hoKey)); + cache.close("the"); + Assert.assertNull(cache.get(hiKey)); + Assert.assertNull(cache.get(hoKey)); + + final CacheStats stats = cache.getStats(); + Assert.assertEquals(3, stats.getNumHits()); + Assert.assertEquals(4, stats.getNumMisses()); + } + + @Test + public void testGetBulk() throws Exception + { + Assert.assertNull(cache.get(new Cache.NamedKey("the", HI))); + + Cache.NamedKey key1 = new Cache.NamedKey("the", HI); + put(cache, key1, 2); + + Cache.NamedKey key2 = new Cache.NamedKey("the", HO); + put(cache, key2, 10); + + Map result = cache.getBulk( + Lists.newArrayList( + key1, + key2 + ) + ); + + Assert.assertEquals(2, Ints.fromByteArray(result.get(key1))); + Assert.assertEquals(10, Ints.fromByteArray(result.get(key2))); + + Cache.NamedKey missingKey = new Cache.NamedKey("missing", HI); + result = cache.getBulk(Lists.newArrayList(missingKey)); + Assert.assertEquals(result.size(), 0); + + result = cache.getBulk(Lists.newArrayList()); + Assert.assertEquals(result.size(), 0); + } + + public int get(Cache cache, Cache.NamedKey key) + { + return Ints.fromByteArray(cache.get(key)); + } + + public void put(Cache cache, Cache.NamedKey key, Integer value) + { + cache.put(key, Ints.toByteArray(value)); + } +} + +class CaffeineCacheProviderWithConfig extends CaffeineCacheProvider +{ + private final CaffeineCacheConfig config; + + @Inject + public CaffeineCacheProviderWithConfig(CaffeineCacheConfig config) + { + this.config = config; + } + + @Override + public Cache get() + { + return CaffeineCache.create(config); + } +} diff --git a/pom.xml b/pom.xml index c5c84cb2fe5d..15178e040bf8 100644 --- a/pom.xml +++ b/pom.xml @@ -100,6 +100,7 @@ extensions/cloudfiles-extensions extensions/datasketches extensions/avro-extensions + extensions/caffeine-cache distribution