From 155729f03d3af8a4d5f7a7a0f90afbe2c9ea7619 Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Thu, 26 May 2016 13:12:42 -0700 Subject: [PATCH 01/13] Initial commit of caffeine cache --- extensions-core/caffeine-cache/README.md | 41 ++ extensions-core/caffeine-cache/pom.xml | 168 ++++++ .../client/cache/CacheExecutorFactory.java | 57 ++ .../io/druid/client/cache/CaffeineCache.java | 196 +++++++ .../client/cache/CaffeineCacheConfig.java | 60 +++ .../client/cache/CaffeineCacheProvider.java | 32 ++ .../client/cache/CaffeineDruidModule.java | 47 ++ .../io.druid.initialization.DruidModule | 1 + .../druid/client/cache/CaffeineCacheTest.java | 489 ++++++++++++++++++ pom.xml | 9 + 10 files changed, 1100 insertions(+) create mode 100644 extensions-core/caffeine-cache/README.md create mode 100644 extensions-core/caffeine-cache/pom.xml create mode 100644 extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CacheExecutorFactory.java create mode 100644 extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCache.java create mode 100644 extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCacheConfig.java create mode 100644 extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCacheProvider.java create mode 100644 extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineDruidModule.java create mode 100644 extensions-core/caffeine-cache/src/main/resources/META-INF/services/io.druid.initialization.DruidModule create mode 100644 extensions-core/caffeine-cache/src/test/java/io/druid/client/cache/CaffeineCacheTest.java diff --git a/extensions-core/caffeine-cache/README.md b/extensions-core/caffeine-cache/README.md new file mode 100644 index 000000000000..5fb09b3ce9b2 --- /dev/null +++ b/extensions-core/caffeine-cache/README.md @@ -0,0 +1,41 @@ +Druid Caffeine Cache +-------------------- + +A local cache implementation for Druid based on [Caffeine](https://github.com/ben-manes/caffeine). Requires a JRE with a fix for https://bugs.openjdk.java.net/browse/JDK-8078490 + +# Versioning + +The versioning works like this: `druid_version.patch_set`. Such that `druid_version` is the version of druid the extension was compiled against, and `patch_set` is the "version" of the extension. + +# How to use +The maven artifact coordinate for this extension is `io.druid.extensions:druid-caffeine-cache` + +For Druid 0.9.0 and later, the extension can be included by pulling the jars using the `pull-deps` tool, and including the extension directory in the extension load list. + +### Jars + +For the sake of sanity if you are manually making an extension directory, release requires the following jars: + +* caffeine-2.3.0.jar +* druid-caffeine-cache-0.9.1.jar (or whatever the correct version for druid is that you are using) + +# Configuration +Below are the configuration options known to this module: + +|`runtime.properties`|Description|Default| +|--------------------|-----------|-------| +|`druid.cache.sizeInBytes`|The maximum size of the cache in bytes on heap.|None (unlimited)| +|`druid.cache.expireAfter`|The time (in ms) after an access for which a cache entry may be expired|None (no time limit)| +|`druid.cache.cacheExecutorFactory`|The executor factory to use for Caffeine maintenance. One of `COMMON_FJP`, `SINGLE_THREAD`, or `SAME_THREAD`|ForkJoinPool common pool (`COMMON_FJP`)| +|`druid.cache.evictOnClose`|If a close of a namespace (ex: removing a segment from a node) should cause an eager eviction of associated cache values|`false`| + +To enable the caffeine cache, include this module on the loadList and set `druid.cache.type` to `caffeine` in your properties. + +# Metrics +In addition to the normal cache metrics, the caffeine cache implementation also reports the following in both `total` and `delta` + +|Metric|Description|Normal value| +|------|-----------|------------| +|`query/cache/caffeine/*/requests`|Count of hits or misses|hit + miss| +|`query/cache/caffeine/*/loadTime`|Length of time caffeine spends loading new values (unused feature)|0| +|`query/cache/caffeine/*/evictionBytes`|Size in bytes that have been evicted from the cache|Varies, should tune cache `sizeInBytes` so that `sizeInBytes`/`evictionBytes` is approximately the rate of cache churn you desire| diff --git a/extensions-core/caffeine-cache/pom.xml b/extensions-core/caffeine-cache/pom.xml new file mode 100644 index 000000000000..6430d1de237d --- /dev/null +++ b/extensions-core/caffeine-cache/pom.xml @@ -0,0 +1,168 @@ + + + + + + 4.0.0 + + io.druid.extensions + druid-caffeine-cache + druid-caffeine-cache + druid-caffeine-cache + + + io.druid + druid + 0.9.1-SNAPSHOT + ../../pom.xml + + + + + Apache License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0 + + + + + + Charles Allen + charles.allen@metamarkets.com + Metamarkets Group Inc. + https://www.metamarkets.com + + + + + scm:git:ssh://git@github.com/metamx/druid-cache-caffeine.git + scm:git:ssh://git@github.com/metamx/druid-cache-caffeine.git + https://github.com/metamx/druid-cache-caffeine.git + HEAD + + + + + io.druid + druid-api + provided + ${project.parent.version} + + + io.druid + druid-server + ${project.parent.version} + provided + + + com.github.ben-manes.caffeine + caffeine + 2.3.0 + + + net.jpountz.lz4 + lz4 + provided + + + + + junit + junit + test + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 1.8 + 1.8 + + + + + + + + release + + + sonatype-nexus-staging + https://oss.sonatype.org/service/local/staging/deploy/maven2/ + + + + + + + org.apache.maven.plugins + maven-source-plugin + + + attach-sources + + jar-no-fork + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + + + attach-javadocs + + jar + + + + + + org.apache.maven.plugins + maven-gpg-plugin + 1.6 + + + sign-artifacts + verify + + sign + + + + + + org.sonatype.plugins + nexus-staging-maven-plugin + true + + sonatype-nexus-staging + https://oss.sonatype.org/ + false + + + + + + + diff --git a/extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CacheExecutorFactory.java b/extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CacheExecutorFactory.java new file mode 100644 index 000000000000..f9efa306e04f --- /dev/null +++ b/extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CacheExecutorFactory.java @@ -0,0 +1,57 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client.cache; + +import com.fasterxml.jackson.annotation.JsonCreator; +import io.druid.concurrent.Execs; +import java.util.concurrent.Executor; + +public enum CacheExecutorFactory +{ + COMMON_FJP { + @Override + public Executor createExecutor() + { + return null; + } + }, + SINGLE_THREAD { + @Override + public Executor createExecutor() + { + return Execs.singleThreaded("CaffeineWorker-%s"); + } + }, + SAME_THREAD { + @Override + public Executor createExecutor() + { + return Runnable::run; + } + }; + + public abstract Executor createExecutor(); + + @JsonCreator + public static CacheExecutorFactory from(String str) + { + return Enum.valueOf(CacheExecutorFactory.class, str.toUpperCase()); + } +} diff --git a/extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCache.java b/extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCache.java new file mode 100644 index 000000000000..c98d94dceb23 --- /dev/null +++ b/extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCache.java @@ -0,0 +1,196 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client.cache; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.stats.CacheStats; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import com.google.common.primitives.Chars; +import com.google.common.primitives.Ints; +import com.metamx.common.logger.Logger; +import com.metamx.emitter.service.ServiceEmitter; +import com.metamx.emitter.service.ServiceMetricEvent; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.OptionalLong; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nullable; +import net.jpountz.lz4.LZ4Compressor; +import net.jpountz.lz4.LZ4Factory; +import net.jpountz.lz4.LZ4FastDecompressor; + +public class CaffeineCache implements io.druid.client.cache.Cache +{ + private static final Logger log = new Logger(CaffeineCache.class); + private static final int FIXED_COST = 8; // Minimum cost in "weight" per entry; + private final Cache cache; + private final AtomicReference priorStats = new AtomicReference<>(null); + private final CaffeineCacheConfig config; + + + public static CaffeineCache create(final CaffeineCacheConfig config) + { + return create(config, config.createExecutor()); + } + + // Used in testing + public static CaffeineCache create(final CaffeineCacheConfig config, @Nullable final Executor executor) + { + Caffeine builder = Caffeine.newBuilder().recordStats(); + if (config.getExpireAfter() >= 0) { + builder + .expireAfterAccess(config.getExpireAfter(), TimeUnit.MILLISECONDS); + } + if (config.getSizeInBytes() >= 0) { + builder + .maximumWeight(config.getSizeInBytes()) + .weigher((NamedKey key, byte[] value) -> value.length + + key.key.length + + key.namespace.length() * Chars.BYTES + + FIXED_COST); + } + if (executor != null) { + builder.executor(executor); + } + return new CaffeineCache(builder.build(), config); + } + + public CaffeineCache(final Cache cache, CaffeineCacheConfig config) + { + this.cache = cache; + this.config = config; + } + + @Override + public byte[] get(NamedKey key) + { + return deserialize(cache.getIfPresent(key)); + } + + @Override + public void put(NamedKey key, byte[] value) + { + cache.put(key, serialize(value)); + } + + @Override + public Map getBulk(Iterable keys) + { + // The assumption here is that every value is accessed at least once. Materializing here ensures deserialize is only + // called *once* per value. + return ImmutableMap.copyOf(Maps.transformValues(cache.getAllPresent(keys), this::deserialize)); + } + + // This is completely racy with put. Any values missed should be evicted later anyways. So no worries. + @Override + public void close(String namespace) + { + if (config.isEvictOnClose()) { + cache.asMap().keySet().stream().filter(key -> key.namespace.equals(namespace)).forEach(cache::invalidate); + } + } + + @Override + public io.druid.client.cache.CacheStats getStats() + { + final com.github.benmanes.caffeine.cache.stats.CacheStats stats = cache.stats(); + final long size = cache + .policy().eviction() + .map(eviction -> eviction.isWeighted() ? eviction.weightedSize() : OptionalLong.empty()) + .orElse(OptionalLong.empty()).orElse(-1); + return new io.druid.client.cache.CacheStats( + stats.hitCount(), + stats.missCount(), + cache.estimatedSize(), + size, + stats.evictionCount(), + 0, + stats.loadFailureCount() + ); + } + + @Override + public boolean isLocal() + { + return true; + } + + @Override + public void doMonitor(ServiceEmitter emitter) + { + final CacheStats oldStats = priorStats.get(); + final CacheStats newStats = cache.stats(); + final CacheStats deltaStats; + if (oldStats == null) { + deltaStats = newStats; + } else { + deltaStats = newStats.minus(oldStats); + } + final ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder(); + emitter.emit(builder.build("query/cache/caffeine/delta/requests", deltaStats.requestCount())); + emitter.emit(builder.build("query/cache/caffeine/total/requests", newStats.requestCount())); + emitter.emit(builder.build("query/cache/caffeine/delta/loadTime", deltaStats.totalLoadTime())); + emitter.emit(builder.build("query/cache/caffeine/total/loadTime", newStats.totalLoadTime())); + emitter.emit(builder.build("query/cache/caffeine/delta/evictionBytes", deltaStats.evictionWeight())); + emitter.emit(builder.build("query/cache/caffeine/total/evictionBytes", newStats.evictionWeight())); + if (!priorStats.compareAndSet(oldStats, newStats)) { + // ISE for stack trace + log.warn( + new IllegalStateException("Multiple monitors"), + "Multiple monitors on the same cache causing race conditions and unreliable stats reporting" + ); + } + } + + Cache getCache() + { + return cache; + } + + private final LZ4Factory factory = LZ4Factory.fastestInstance(); + private final LZ4FastDecompressor decompressor = factory.fastDecompressor(); + private final LZ4Compressor compressor = factory.fastCompressor(); + + private byte[] deserialize(byte[] bytes) + { + if (bytes == null) { + return null; + } + final int decompressedLen = ByteBuffer.wrap(bytes).getInt(); + final byte[] out = new byte[decompressedLen]; + decompressor.decompress(bytes, Ints.BYTES, out, 0, out.length); + return out; + } + + private byte[] serialize(byte[] value) + { + final int len = compressor.maxCompressedLength(value.length); + final byte[] out = new byte[len]; + final int compressedSize = compressor.compress(value, 0, value.length, out, 0); + return ByteBuffer.allocate(compressedSize + Ints.BYTES) + .putInt(value.length) + .put(out, 0, compressedSize) + .array(); + } +} diff --git a/extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCacheConfig.java b/extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCacheConfig.java new file mode 100644 index 000000000000..50c5ed2875f9 --- /dev/null +++ b/extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCacheConfig.java @@ -0,0 +1,60 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client.cache; + +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.concurrent.Executor; + +public class CaffeineCacheConfig +{ + @JsonProperty + private long expireAfter = -1; + + @JsonProperty + private long sizeInBytes = -1; + + @JsonProperty + // Do not use COMMON_FJP unless you're running 8u60 or higher + // see https://github.com/ben-manes/caffeine/issues/77 + private CacheExecutorFactory cacheExecutorFactory = CacheExecutorFactory.COMMON_FJP; + + @JsonProperty + private boolean evictOnClose = false; + + public long getExpireAfter() + { + return expireAfter; + } + + public long getSizeInBytes() + { + return sizeInBytes; + } + + public Executor createExecutor() + { + return cacheExecutorFactory.createExecutor(); + } + + public boolean isEvictOnClose() + { + return evictOnClose; + } +} diff --git a/extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCacheProvider.java b/extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCacheProvider.java new file mode 100644 index 000000000000..23e18987fe7e --- /dev/null +++ b/extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCacheProvider.java @@ -0,0 +1,32 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client.cache; + +import com.fasterxml.jackson.annotation.JsonTypeName; + +@JsonTypeName("caffeine") +public class CaffeineCacheProvider extends CaffeineCacheConfig implements CacheProvider +{ + @Override + public Cache get() + { + return CaffeineCache.create(this); + } +} diff --git a/extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineDruidModule.java b/extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineDruidModule.java new file mode 100644 index 000000000000..fbadf4926876 --- /dev/null +++ b/extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineDruidModule.java @@ -0,0 +1,47 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client.cache; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import io.druid.initialization.DruidModule; + +import java.util.List; + +public class CaffeineDruidModule implements DruidModule +{ + + @Override + public void configure(Binder binder) + { + + } + + @Override + public List getJacksonModules() + { + return ImmutableList.of( + new SimpleModule("DruidCaffeineCache-" + System.identityHashCode(this)) + .registerSubtypes(CaffeineCacheProvider.class) + ); + } +} diff --git a/extensions-core/caffeine-cache/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/extensions-core/caffeine-cache/src/main/resources/META-INF/services/io.druid.initialization.DruidModule new file mode 100644 index 000000000000..016f13528ad4 --- /dev/null +++ b/extensions-core/caffeine-cache/src/main/resources/META-INF/services/io.druid.initialization.DruidModule @@ -0,0 +1 @@ +io.druid.client.cache.CaffeineDruidModule diff --git a/extensions-core/caffeine-cache/src/test/java/io/druid/client/cache/CaffeineCacheTest.java b/extensions-core/caffeine-cache/src/test/java/io/druid/client/cache/CaffeineCacheTest.java new file mode 100644 index 000000000000..43c67a87b605 --- /dev/null +++ b/extensions-core/caffeine-cache/src/test/java/io/druid/client/cache/CaffeineCacheTest.java @@ -0,0 +1,489 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client.cache; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.primitives.Ints; +import com.google.inject.Binder; +import com.google.inject.Inject; +import com.google.inject.Injector; +import com.google.inject.Module; +import com.google.inject.name.Names; +import com.metamx.common.lifecycle.Lifecycle; +import io.druid.guice.GuiceInjectors; +import io.druid.guice.JsonConfigProvider; +import io.druid.guice.JsonConfigurator; +import io.druid.guice.ManageLifecycle; +import io.druid.initialization.Initialization; +import java.util.Map; +import java.util.Properties; +import java.util.Random; +import java.util.UUID; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class CaffeineCacheTest +{ + private static final byte[] HI = "hiiiiiiiiiiiiiiiiiii".getBytes(); + private static final byte[] HO = "hooooooooooooooooooo".getBytes(); + + private CaffeineCache cache; + private final CaffeineCacheConfig cacheConfig = new CaffeineCacheConfig(); + + @Before + public void setUp() throws Exception + { + cache = CaffeineCache.create(cacheConfig); + } + + @Test + public void testBasicInjection() throws Exception + { + final CaffeineCacheConfig config = new CaffeineCacheConfig(); + Injector injector = Initialization.makeInjectorWithModules( + GuiceInjectors.makeStartupInjector(), ImmutableList.of( + new Module() + { + @Override + public void configure(Binder binder) + { + binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/test/redis"); + binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0); + + binder.bind(CaffeineCacheConfig.class).toInstance(config); + binder.bind(Cache.class).toProvider(CaffeineCacheProviderWithConfig.class).in(ManageLifecycle.class); + } + } + ) + ); + final Lifecycle lifecycle = injector.getInstance(Lifecycle.class); + lifecycle.start(); + try { + Cache cache = injector.getInstance(Cache.class); + Assert.assertEquals(CaffeineCache.class, cache.getClass()); + } + finally { + lifecycle.stop(); + } + } + + @Test + public void testSimpleInjection() + { + final String uuid = UUID.randomUUID().toString(); + System.setProperty(uuid + ".type", "caffeine"); + final Injector injector = Initialization.makeInjectorWithModules( + GuiceInjectors.makeStartupInjector(), ImmutableList.of( + new Module() + { + @Override + public void configure(Binder binder) + { + binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/test/redis"); + binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0); + + binder.bind(Cache.class).toProvider(CacheProvider.class); + JsonConfigProvider.bind(binder, uuid, CacheProvider.class); + } + } + ) + ); + final CacheProvider cacheProvider = injector.getInstance(CacheProvider.class); + Assert.assertNotNull(cacheProvider); + Assert.assertEquals(CaffeineCacheProvider.class, cacheProvider.getClass()); + } + + @Test + public void testBaseOps() throws Exception + { + final Cache.NamedKey aKey = new Cache.NamedKey("a", HI); + Assert.assertNull(cache.get(aKey)); + put(cache, aKey, 1); + Assert.assertEquals(1, get(cache, aKey)); + + /* Lazily deleted by LRU + cache.close("a"); + Assert.assertNull(cache.get(aKey)); + */ + + final Cache.NamedKey hiKey = new Cache.NamedKey("the", HI); + final Cache.NamedKey hoKey = new Cache.NamedKey("the", HO); + put(cache, hiKey, 10); + put(cache, hoKey, 20); + Assert.assertEquals(10, get(cache, hiKey)); + Assert.assertEquals(20, get(cache, hoKey)); + cache.close("the"); + /* Lazily deleted by LRU + Assert.assertNull(cache.get(hiKey)); + Assert.assertNull(cache.get(hoKey)); + */ + + Assert.assertNull(cache.get(new Cache.NamedKey("miss", HI))); + + final CacheStats stats = cache.getStats(); + Assert.assertEquals(3, stats.getNumHits()); + Assert.assertEquals(2, stats.getNumMisses()); + } + + @Test + public void testGetBulk() throws Exception + { + Assert.assertNull(cache.get(new Cache.NamedKey("the", HI))); + + Cache.NamedKey key1 = new Cache.NamedKey("the", HI); + put(cache, key1, 2); + + Cache.NamedKey key2 = new Cache.NamedKey("the", HO); + put(cache, key2, 10); + + Map result = cache.getBulk( + Lists.newArrayList( + key1, + key2 + ) + ); + + Assert.assertEquals(2, Ints.fromByteArray(result.get(key1))); + Assert.assertEquals(10, Ints.fromByteArray(result.get(key2))); + + Cache.NamedKey missingKey = new Cache.NamedKey("missing", HI); + result = cache.getBulk(Lists.newArrayList(missingKey)); + Assert.assertEquals(result.size(), 0); + + result = cache.getBulk(Lists.newArrayList()); + Assert.assertEquals(result.size(), 0); + } + + @Test + public void testSizeEviction() throws InterruptedException + { + final CaffeineCacheConfig config = new CaffeineCacheConfig() + { + @Override + public long getSizeInBytes() + { + return 40; + } + }; + final Random random = new Random(843671346794319L); + final byte[] val1 = new byte[14], val2 = new byte[14]; + final byte[] s1 = new byte[]{0x01}, s2 = new byte[]{0x02}; + random.nextBytes(val1); + random.nextBytes(val2); + final Cache.NamedKey key1 = new Cache.NamedKey("the", s1); + final Cache.NamedKey key2 = new Cache.NamedKey("the", s2); + final CaffeineCache cache = CaffeineCache.create(config, Runnable::run); + + Assert.assertNull(cache.get(key1)); + Assert.assertNull(cache.get(key2)); + + cache.put(key1, val1); + Assert.assertArrayEquals(val1, cache.get(key1)); + Assert.assertNull(cache.get(key2)); + + Assert.assertEquals(0, cache.getCache().stats().evictionWeight()); + + Assert.assertArrayEquals(val1, cache.get(key1)); + Assert.assertNull(cache.get(key2)); + + cache.put(key2, val2); + Assert.assertNull(cache.get(key1)); + Assert.assertArrayEquals(val2, cache.get(key2)); + Assert.assertEquals(34, cache.getCache().stats().evictionWeight()); + } + + @Test + public void testSizeCalculation() + { + final CaffeineCacheConfig config = new CaffeineCacheConfig() + { + @Override + public long getSizeInBytes() + { + return 40; + } + }; + final Random random = new Random(843671346794319L); + final byte[] val1 = new byte[14], val2 = new byte[14]; + final byte[] s1 = new byte[]{0x01}, s2 = new byte[]{0x02}; + random.nextBytes(val1); + random.nextBytes(val2); + final Cache.NamedKey key1 = new Cache.NamedKey("the", s1); + final Cache.NamedKey key2 = new Cache.NamedKey("the", s2); + final Cache cache = CaffeineCache.create(config, Runnable::run); + + CacheStats stats = cache.getStats(); + Assert.assertEquals(0L, stats.getNumEntries()); + Assert.assertEquals(0L, stats.getSizeInBytes()); + + cache.put(key1, val1); + + stats = cache.getStats(); + Assert.assertEquals(1L, stats.getNumEntries()); + Assert.assertEquals(34L, stats.getSizeInBytes()); + + cache.put(key2, val2); + + stats = cache.getStats(); + Assert.assertEquals(1L, stats.getNumEntries()); + Assert.assertEquals(34L, stats.getSizeInBytes()); + } + + @Test + public void testSizeCalculationAfterDelete() + { + final String namespace = "the"; + final CaffeineCacheConfig config = new CaffeineCacheConfig() + { + @Override + public long getSizeInBytes() + { + return 999999; + } + + @Override + public boolean isEvictOnClose() + { + return true; + } + + }; + final Random random = new Random(843671346794319L); + final byte[] val1 = new byte[14], val2 = new byte[14]; + final byte[] s1 = new byte[]{0x01}, s2 = new byte[]{0x02}; + random.nextBytes(val1); + random.nextBytes(val2); + final Cache.NamedKey key1 = new Cache.NamedKey(namespace, s1); + final Cache.NamedKey key2 = new Cache.NamedKey(namespace, s2); + final Cache cache = CaffeineCache.create(config, Runnable::run); + + CacheStats stats = cache.getStats(); + Assert.assertEquals(0L, stats.getNumEntries()); + Assert.assertEquals(0L, stats.getSizeInBytes()); + + cache.put(key1, val1); + + stats = cache.getStats(); + Assert.assertEquals(1L, stats.getNumEntries()); + Assert.assertEquals(34L, stats.getSizeInBytes()); + + cache.put(key2, val2); + + stats = cache.getStats(); + Assert.assertEquals(2L, stats.getNumEntries()); + Assert.assertEquals(68L, stats.getSizeInBytes()); + + cache.close(namespace); + stats = cache.getStats(); + Assert.assertEquals(0, stats.getNumEntries()); + Assert.assertEquals(0, stats.getSizeInBytes()); + } + + + @Test + public void testSizeCalculationMore() + { + final CaffeineCacheConfig config = new CaffeineCacheConfig() + { + @Override + public long getSizeInBytes() + { + return 400; + } + }; + final Random random = new Random(843671346794319L); + final byte[] val1 = new byte[14], val2 = new byte[14]; + final byte[] s1 = new byte[]{0x01}, s2 = new byte[]{0x02}; + random.nextBytes(val1); + random.nextBytes(val2); + final Cache.NamedKey key1 = new Cache.NamedKey("the", s1); + final Cache.NamedKey key2 = new Cache.NamedKey("the", s2); + final Cache cache = CaffeineCache.create(config, Runnable::run); + + CacheStats stats = cache.getStats(); + Assert.assertEquals(0L, stats.getNumEntries()); + Assert.assertEquals(0L, stats.getSizeInBytes()); + + cache.put(key1, val1); + + stats = cache.getStats(); + Assert.assertEquals(1L, stats.getNumEntries()); + Assert.assertEquals(34L, stats.getSizeInBytes()); + + cache.put(key2, val2); + + stats = cache.getStats(); + Assert.assertEquals(2L, stats.getNumEntries()); + Assert.assertEquals(68L, stats.getSizeInBytes()); + } + + @Test + public void testSizeCalculationNoWeight() + { + final CaffeineCacheConfig config = new CaffeineCacheConfig() + { + @Override + public long getSizeInBytes() + { + return -1; + } + }; + final Random random = new Random(843671346794319L); + final byte[] val1 = new byte[14], val2 = new byte[14]; + final byte[] s1 = new byte[]{0x01}, s2 = new byte[]{0x02}; + random.nextBytes(val1); + random.nextBytes(val2); + final Cache.NamedKey key1 = new Cache.NamedKey("the", s1); + final Cache.NamedKey key2 = new Cache.NamedKey("the", s2); + final Cache cache = CaffeineCache.create(config, Runnable::run); + + CacheStats stats = cache.getStats(); + Assert.assertEquals(0L, stats.getNumEntries()); + Assert.assertEquals(-1L, stats.getSizeInBytes()); + + cache.put(key1, val1); + + stats = cache.getStats(); + Assert.assertEquals(1L, stats.getNumEntries()); + Assert.assertEquals(-1L, stats.getSizeInBytes()); + + cache.put(key2, val2); + + stats = cache.getStats(); + Assert.assertEquals(2L, stats.getNumEntries()); + Assert.assertEquals(-1L, stats.getSizeInBytes()); + } + + @Test + public void testFromProperties() + { + final String keyPrefix = "cache.config.prefix"; + final Properties properties = new Properties(); + properties.put(keyPrefix + ".expireAfter", "10"); + properties.put(keyPrefix + ".sizeInBytes", "100"); + properties.put(keyPrefix + ".cacheExecutorFactory", "single_thread"); + final Injector injector = Initialization.makeInjectorWithModules( + GuiceInjectors.makeStartupInjector(), + ImmutableList.of( + binder -> { + binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/test"); + binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0); + JsonConfigProvider.bind(binder, keyPrefix, CaffeineCacheConfig.class); + } + ) + ); + final JsonConfigurator configurator = injector.getInstance(JsonConfigurator.class); + final JsonConfigProvider caffeineCacheConfigJsonConfigProvider = JsonConfigProvider.of( + keyPrefix, + CaffeineCacheConfig.class + ); + caffeineCacheConfigJsonConfigProvider.inject(properties, configurator); + final CaffeineCacheConfig config = caffeineCacheConfigJsonConfigProvider.get().get(); + Assert.assertEquals(10, config.getExpireAfter()); + Assert.assertEquals(100, config.getSizeInBytes()); + Assert.assertNotNull(config.createExecutor()); + } + + @Test + public void testMixedCaseFromProperties() + { + final String keyPrefix = "cache.config.prefix"; + final Properties properties = new Properties(); + properties.put(keyPrefix + ".expireAfter", "10"); + properties.put(keyPrefix + ".sizeInBytes", "100"); + properties.put(keyPrefix + ".cacheExecutorFactory", "CoMmON_FjP"); + final Injector injector = Initialization.makeInjectorWithModules( + GuiceInjectors.makeStartupInjector(), + ImmutableList.of( + binder -> { + binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/test"); + binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0); + JsonConfigProvider.bind(binder, keyPrefix, CaffeineCacheConfig.class); + } + ) + ); + final JsonConfigurator configurator = injector.getInstance(JsonConfigurator.class); + final JsonConfigProvider caffeineCacheConfigJsonConfigProvider = JsonConfigProvider.of( + keyPrefix, + CaffeineCacheConfig.class + ); + caffeineCacheConfigJsonConfigProvider.inject(properties, configurator); + final CaffeineCacheConfig config = caffeineCacheConfigJsonConfigProvider.get().get(); + Assert.assertEquals(10, config.getExpireAfter()); + Assert.assertEquals(100, config.getSizeInBytes()); + Assert.assertNull(config.createExecutor()); + } + + @Test + public void testDefaultFromProperties() + { + final String keyPrefix = "cache.config.prefix"; + final Properties properties = new Properties(); + final Injector injector = Initialization.makeInjectorWithModules( + GuiceInjectors.makeStartupInjector(), + ImmutableList.of( + binder -> { + binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/test"); + binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0); + JsonConfigProvider.bind(binder, keyPrefix, CaffeineCacheConfig.class); + } + ) + ); + final JsonConfigurator configurator = injector.getInstance(JsonConfigurator.class); + final JsonConfigProvider caffeineCacheConfigJsonConfigProvider = JsonConfigProvider.of( + keyPrefix, + CaffeineCacheConfig.class + ); + caffeineCacheConfigJsonConfigProvider.inject(properties, configurator); + final CaffeineCacheConfig config = caffeineCacheConfigJsonConfigProvider.get().get(); + Assert.assertEquals(-1, config.getExpireAfter()); + Assert.assertEquals(-1, config.getSizeInBytes()); + Assert.assertNull(config.createExecutor()); + } + + public int get(Cache cache, Cache.NamedKey key) + { + return Ints.fromByteArray(cache.get(key)); + } + + public void put(Cache cache, Cache.NamedKey key, Integer value) + { + cache.put(key, Ints.toByteArray(value)); + } +} + +class CaffeineCacheProviderWithConfig extends CaffeineCacheProvider +{ + private final CaffeineCacheConfig config; + + @Inject + public CaffeineCacheProviderWithConfig(CaffeineCacheConfig config) + { + this.config = config; + } + + @Override + public Cache get() + { + return CaffeineCache.create(config); + } +} diff --git a/pom.xml b/pom.xml index 8049228605eb..95f7067c580f 100644 --- a/pom.xml +++ b/pom.xml @@ -823,5 +823,14 @@ + + java8 + + 1.8 + + + extensions-core/caffeine-cache + + From eca2df254c65fb2b42ebb1737f4d126f8bc70ddd Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Thu, 26 May 2016 14:46:19 -0700 Subject: [PATCH 02/13] Address code comments --- .../io/druid/client/cache/CaffeineCache.java | 20 ++++++++-------- .../client/cache/CaffeineDruidModule.java | 2 +- .../druid/client/cache/CaffeineCacheTest.java | 23 +++++++++++-------- 3 files changed, 26 insertions(+), 19 deletions(-) diff --git a/extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCache.java b/extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCache.java index c98d94dceb23..317a358491b6 100644 --- a/extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCache.java +++ b/extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCache.java @@ -22,6 +22,7 @@ import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.stats.CacheStats; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.common.primitives.Chars; @@ -44,6 +45,10 @@ public class CaffeineCache implements io.druid.client.cache.Cache { private static final Logger log = new Logger(CaffeineCache.class); private static final int FIXED_COST = 8; // Minimum cost in "weight" per entry; + private static final LZ4Factory LZ4_FACTORY = LZ4Factory.fastestInstance(); + private static final LZ4FastDecompressor LZ4_DECOMPRESSOR = LZ4_FACTORY.fastDecompressor(); + private static final LZ4Compressor LZ4_COMPRESSOR = LZ4_FACTORY.fastCompressor(); + private final Cache cache; private final AtomicReference priorStats = new AtomicReference<>(null); private final CaffeineCacheConfig config; @@ -76,7 +81,7 @@ public static CaffeineCache create(final CaffeineCacheConfig config, @Nullable f return new CaffeineCache(builder.build(), config); } - public CaffeineCache(final Cache cache, CaffeineCacheConfig config) + private CaffeineCache(final Cache cache, CaffeineCacheConfig config) { this.cache = cache; this.config = config; @@ -107,7 +112,7 @@ public Map getBulk(Iterable keys) public void close(String namespace) { if (config.isEvictOnClose()) { - cache.asMap().keySet().stream().filter(key -> key.namespace.equals(namespace)).forEach(cache::invalidate); + cache.asMap().keySet().removeIf(key -> key.namespace.equals(namespace)); } } @@ -163,15 +168,12 @@ public void doMonitor(ServiceEmitter emitter) } } + @VisibleForTesting Cache getCache() { return cache; } - private final LZ4Factory factory = LZ4Factory.fastestInstance(); - private final LZ4FastDecompressor decompressor = factory.fastDecompressor(); - private final LZ4Compressor compressor = factory.fastCompressor(); - private byte[] deserialize(byte[] bytes) { if (bytes == null) { @@ -179,15 +181,15 @@ private byte[] deserialize(byte[] bytes) } final int decompressedLen = ByteBuffer.wrap(bytes).getInt(); final byte[] out = new byte[decompressedLen]; - decompressor.decompress(bytes, Ints.BYTES, out, 0, out.length); + LZ4_DECOMPRESSOR.decompress(bytes, Ints.BYTES, out, 0, out.length); return out; } private byte[] serialize(byte[] value) { - final int len = compressor.maxCompressedLength(value.length); + final int len = LZ4_COMPRESSOR.maxCompressedLength(value.length); final byte[] out = new byte[len]; - final int compressedSize = compressor.compress(value, 0, value.length, out, 0); + final int compressedSize = LZ4_COMPRESSOR.compress(value, 0, value.length, out, 0); return ByteBuffer.allocate(compressedSize + Ints.BYTES) .putInt(value.length) .put(out, 0, compressedSize) diff --git a/extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineDruidModule.java b/extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineDruidModule.java index fbadf4926876..20adb814bc44 100644 --- a/extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineDruidModule.java +++ b/extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineDruidModule.java @@ -40,7 +40,7 @@ public void configure(Binder binder) public List getJacksonModules() { return ImmutableList.of( - new SimpleModule("DruidCaffeineCache-" + System.identityHashCode(this)) + new SimpleModule("DruidCaffeineCache") .registerSubtypes(CaffeineCacheProvider.class) ); } diff --git a/extensions-core/caffeine-cache/src/test/java/io/druid/client/cache/CaffeineCacheTest.java b/extensions-core/caffeine-cache/src/test/java/io/druid/client/cache/CaffeineCacheTest.java index 43c67a87b605..ca42959d11c7 100644 --- a/extensions-core/caffeine-cache/src/test/java/io/druid/client/cache/CaffeineCacheTest.java +++ b/extensions-core/caffeine-cache/src/test/java/io/druid/client/cache/CaffeineCacheTest.java @@ -33,13 +33,14 @@ import io.druid.guice.JsonConfigurator; import io.druid.guice.ManageLifecycle; import io.druid.initialization.Initialization; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + import java.util.Map; import java.util.Properties; import java.util.Random; import java.util.UUID; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; public class CaffeineCacheTest { @@ -47,7 +48,14 @@ public class CaffeineCacheTest private static final byte[] HO = "hooooooooooooooooooo".getBytes(); private CaffeineCache cache; - private final CaffeineCacheConfig cacheConfig = new CaffeineCacheConfig(); + private final CaffeineCacheConfig cacheConfig = new CaffeineCacheConfig() + { + @Override + public boolean isEvictOnClose() + { + return true; + } + }; @Before public void setUp() throws Exception @@ -120,10 +128,8 @@ public void testBaseOps() throws Exception put(cache, aKey, 1); Assert.assertEquals(1, get(cache, aKey)); - /* Lazily deleted by LRU cache.close("a"); Assert.assertNull(cache.get(aKey)); - */ final Cache.NamedKey hiKey = new Cache.NamedKey("the", HI); final Cache.NamedKey hoKey = new Cache.NamedKey("the", HO); @@ -132,16 +138,15 @@ public void testBaseOps() throws Exception Assert.assertEquals(10, get(cache, hiKey)); Assert.assertEquals(20, get(cache, hoKey)); cache.close("the"); - /* Lazily deleted by LRU + Assert.assertNull(cache.get(hiKey)); Assert.assertNull(cache.get(hoKey)); - */ Assert.assertNull(cache.get(new Cache.NamedKey("miss", HI))); final CacheStats stats = cache.getStats(); Assert.assertEquals(3, stats.getNumHits()); - Assert.assertEquals(2, stats.getNumMisses()); + Assert.assertEquals(5, stats.getNumMisses()); } @Test From e580a27525afa41950a6803e82ef5f6869b91e53 Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Thu, 26 May 2016 14:48:39 -0700 Subject: [PATCH 03/13] Move and fixup README.md a bit --- .../content/development/extensions-core/caffeine-cache.md | 4 ++++ 1 file changed, 4 insertions(+) rename extensions-core/caffeine-cache/README.md => docs/content/development/extensions-core/caffeine-cache.md (98%) diff --git a/extensions-core/caffeine-cache/README.md b/docs/content/development/extensions-core/caffeine-cache.md similarity index 98% rename from extensions-core/caffeine-cache/README.md rename to docs/content/development/extensions-core/caffeine-cache.md index 5fb09b3ce9b2..8a8f793ed082 100644 --- a/extensions-core/caffeine-cache/README.md +++ b/docs/content/development/extensions-core/caffeine-cache.md @@ -1,3 +1,7 @@ +--- +layout: doc_page +--- + Druid Caffeine Cache -------------------- From e9d1025a7f28b5f4043d3c10b483dac954070d79 Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Thu, 26 May 2016 14:53:26 -0700 Subject: [PATCH 04/13] Improve caffeine readme information --- .../extensions-core/caffeine-cache.md | 28 ++++++++----------- 1 file changed, 11 insertions(+), 17 deletions(-) diff --git a/docs/content/development/extensions-core/caffeine-cache.md b/docs/content/development/extensions-core/caffeine-cache.md index 8a8f793ed082..2cef8b9712f6 100644 --- a/docs/content/development/extensions-core/caffeine-cache.md +++ b/docs/content/development/extensions-core/caffeine-cache.md @@ -5,23 +5,7 @@ layout: doc_page Druid Caffeine Cache -------------------- -A local cache implementation for Druid based on [Caffeine](https://github.com/ben-manes/caffeine). Requires a JRE with a fix for https://bugs.openjdk.java.net/browse/JDK-8078490 - -# Versioning - -The versioning works like this: `druid_version.patch_set`. Such that `druid_version` is the version of druid the extension was compiled against, and `patch_set` is the "version" of the extension. - -# How to use -The maven artifact coordinate for this extension is `io.druid.extensions:druid-caffeine-cache` - -For Druid 0.9.0 and later, the extension can be included by pulling the jars using the `pull-deps` tool, and including the extension directory in the extension load list. - -### Jars - -For the sake of sanity if you are manually making an extension directory, release requires the following jars: - -* caffeine-2.3.0.jar -* druid-caffeine-cache-0.9.1.jar (or whatever the correct version for druid is that you are using) +A highly performant local cache implementation for Druid based on [Caffeine](https://github.com/ben-manes/caffeine). Requires a JRE with a fix for https://bugs.openjdk.java.net/browse/JDK-8078490 # Configuration Below are the configuration options known to this module: @@ -33,6 +17,16 @@ Below are the configuration options known to this module: |`druid.cache.cacheExecutorFactory`|The executor factory to use for Caffeine maintenance. One of `COMMON_FJP`, `SINGLE_THREAD`, or `SAME_THREAD`|ForkJoinPool common pool (`COMMON_FJP`)| |`druid.cache.evictOnClose`|If a close of a namespace (ex: removing a segment from a node) should cause an eager eviction of associated cache values|`false`| +## `druid.cache.cacheExecutorFactory` + +Here are the possible values for `druid.cache.cacheExecutorFactory`, which controls how maintenance tasks are run + +* `COMMON_FJP` (default) use the common ForkJoinPool. Do NOT use this option unless you are running 8u60 or higher +* `SINGLE_THREAD` Use a single-threaded executor +* `SAME_THREAD` Cache maintenance is done eagerly + +# Enabling + To enable the caffeine cache, include this module on the loadList and set `druid.cache.type` to `caffeine` in your properties. # Metrics From 3cd5ebaf5a0cc35183c168588f68686041ca7ecf Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Thu, 26 May 2016 15:01:06 -0700 Subject: [PATCH 05/13] Cleanup caffeine pom --- extensions-core/caffeine-cache/pom.xml | 90 +------------------------- 1 file changed, 1 insertion(+), 89 deletions(-) diff --git a/extensions-core/caffeine-cache/pom.xml b/extensions-core/caffeine-cache/pom.xml index 6430d1de237d..43cbbb2f5b3d 100644 --- a/extensions-core/caffeine-cache/pom.xml +++ b/extensions-core/caffeine-cache/pom.xml @@ -25,7 +25,7 @@ io.druid.extensions druid-caffeine-cache druid-caffeine-cache - druid-caffeine-cache + Local cache implementation for Druid using Caffeine https://github.com/ben-manes/caffeine as the underlying implementation io.druid @@ -34,29 +34,6 @@ ../../pom.xml - - - Apache License, Version 2.0 - http://www.apache.org/licenses/LICENSE-2.0 - - - - - - Charles Allen - charles.allen@metamarkets.com - Metamarkets Group Inc. - https://www.metamarkets.com - - - - - scm:git:ssh://git@github.com/metamx/druid-cache-caffeine.git - scm:git:ssh://git@github.com/metamx/druid-cache-caffeine.git - https://github.com/metamx/druid-cache-caffeine.git - HEAD - - io.druid @@ -100,69 +77,4 @@ - - - - release - - - sonatype-nexus-staging - https://oss.sonatype.org/service/local/staging/deploy/maven2/ - - - - - - - org.apache.maven.plugins - maven-source-plugin - - - attach-sources - - jar-no-fork - - - - - - org.apache.maven.plugins - maven-javadoc-plugin - - - attach-javadocs - - jar - - - - - - org.apache.maven.plugins - maven-gpg-plugin - 1.6 - - - sign-artifacts - verify - - sign - - - - - - org.sonatype.plugins - nexus-staging-maven-plugin - true - - sonatype-nexus-staging - https://oss.sonatype.org/ - false - - - - - - From 2d860c42d7f210be77a9041d74dc0febe6558502 Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Tue, 31 May 2016 10:44:31 -0700 Subject: [PATCH 06/13] Address review comments --- .../client/cache/CacheExecutorFactory.java | 3 +- .../io/druid/client/cache/CaffeineCache.java | 24 ++++------- .../druid/client/cache/CaffeineCacheTest.java | 42 +++++++------------ 3 files changed, 27 insertions(+), 42 deletions(-) diff --git a/extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CacheExecutorFactory.java b/extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CacheExecutorFactory.java index f9efa306e04f..a2c26159882e 100644 --- a/extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CacheExecutorFactory.java +++ b/extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CacheExecutorFactory.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import io.druid.concurrent.Execs; import java.util.concurrent.Executor; +import java.util.concurrent.ForkJoinPool; public enum CacheExecutorFactory { @@ -29,7 +30,7 @@ public enum CacheExecutorFactory @Override public Executor createExecutor() { - return null; + return ForkJoinPool.commonPool(); } }, SINGLE_THREAD { diff --git a/extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCache.java b/extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCache.java index 317a358491b6..5903183a287f 100644 --- a/extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCache.java +++ b/extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCache.java @@ -30,16 +30,16 @@ import com.metamx.common.logger.Logger; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; +import net.jpountz.lz4.LZ4Compressor; +import net.jpountz.lz4.LZ4Factory; +import net.jpountz.lz4.LZ4FastDecompressor; + import java.nio.ByteBuffer; import java.util.Map; import java.util.OptionalLong; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import javax.annotation.Nullable; -import net.jpountz.lz4.LZ4Compressor; -import net.jpountz.lz4.LZ4Factory; -import net.jpountz.lz4.LZ4FastDecompressor; public class CaffeineCache implements io.druid.client.cache.Cache { @@ -50,7 +50,7 @@ public class CaffeineCache implements io.druid.client.cache.Cache private static final LZ4Compressor LZ4_COMPRESSOR = LZ4_FACTORY.fastCompressor(); private final Cache cache; - private final AtomicReference priorStats = new AtomicReference<>(null); + private final AtomicReference priorStats = new AtomicReference<>(CacheStats.empty()); private final CaffeineCacheConfig config; @@ -60,7 +60,7 @@ public static CaffeineCache create(final CaffeineCacheConfig config) } // Used in testing - public static CaffeineCache create(final CaffeineCacheConfig config, @Nullable final Executor executor) + public static CaffeineCache create(final CaffeineCacheConfig config, final Executor executor) { Caffeine builder = Caffeine.newBuilder().recordStats(); if (config.getExpireAfter() >= 0) { @@ -75,9 +75,7 @@ public static CaffeineCache create(final CaffeineCacheConfig config, @Nullable f + key.namespace.length() * Chars.BYTES + FIXED_COST); } - if (executor != null) { - builder.executor(executor); - } + builder.executor(executor); return new CaffeineCache(builder.build(), config); } @@ -146,12 +144,8 @@ public void doMonitor(ServiceEmitter emitter) { final CacheStats oldStats = priorStats.get(); final CacheStats newStats = cache.stats(); - final CacheStats deltaStats; - if (oldStats == null) { - deltaStats = newStats; - } else { - deltaStats = newStats.minus(oldStats); - } + final CacheStats deltaStats = newStats.minus(oldStats); + final ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder(); emitter.emit(builder.build("query/cache/caffeine/delta/requests", deltaStats.requestCount())); emitter.emit(builder.build("query/cache/caffeine/total/requests", newStats.requestCount())); diff --git a/extensions-core/caffeine-cache/src/test/java/io/druid/client/cache/CaffeineCacheTest.java b/extensions-core/caffeine-cache/src/test/java/io/druid/client/cache/CaffeineCacheTest.java index ca42959d11c7..20e6d14268ba 100644 --- a/extensions-core/caffeine-cache/src/test/java/io/druid/client/cache/CaffeineCacheTest.java +++ b/extensions-core/caffeine-cache/src/test/java/io/druid/client/cache/CaffeineCacheTest.java @@ -22,7 +22,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.primitives.Ints; -import com.google.inject.Binder; import com.google.inject.Inject; import com.google.inject.Injector; import com.google.inject.Module; @@ -41,6 +40,7 @@ import java.util.Properties; import java.util.Random; import java.util.UUID; +import java.util.concurrent.ForkJoinPool; public class CaffeineCacheTest { @@ -68,18 +68,13 @@ public void testBasicInjection() throws Exception { final CaffeineCacheConfig config = new CaffeineCacheConfig(); Injector injector = Initialization.makeInjectorWithModules( - GuiceInjectors.makeStartupInjector(), ImmutableList.of( - new Module() - { - @Override - public void configure(Binder binder) - { - binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/test/redis"); - binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0); - - binder.bind(CaffeineCacheConfig.class).toInstance(config); - binder.bind(Cache.class).toProvider(CaffeineCacheProviderWithConfig.class).in(ManageLifecycle.class); - } + GuiceInjectors.makeStartupInjector(), ImmutableList.of( + binder -> { + binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/test/redis"); + binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0); + + binder.bind(CaffeineCacheConfig.class).toInstance(config); + binder.bind(Cache.class).toProvider(CaffeineCacheProviderWithConfig.class).in(ManageLifecycle.class); } ) ); @@ -101,17 +96,12 @@ public void testSimpleInjection() System.setProperty(uuid + ".type", "caffeine"); final Injector injector = Initialization.makeInjectorWithModules( GuiceInjectors.makeStartupInjector(), ImmutableList.of( - new Module() - { - @Override - public void configure(Binder binder) - { - binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/test/redis"); - binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0); - - binder.bind(Cache.class).toProvider(CacheProvider.class); - JsonConfigProvider.bind(binder, uuid, CacheProvider.class); - } + binder -> { + binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/test/redis"); + binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0); + + binder.bind(Cache.class).toProvider(CacheProvider.class); + JsonConfigProvider.bind(binder, uuid, CacheProvider.class); } ) ); @@ -435,7 +425,7 @@ public void testMixedCaseFromProperties() final CaffeineCacheConfig config = caffeineCacheConfigJsonConfigProvider.get().get(); Assert.assertEquals(10, config.getExpireAfter()); Assert.assertEquals(100, config.getSizeInBytes()); - Assert.assertNull(config.createExecutor()); + Assert.assertEquals(ForkJoinPool.commonPool(), config.createExecutor()); } @Test @@ -462,7 +452,7 @@ public void testDefaultFromProperties() final CaffeineCacheConfig config = caffeineCacheConfigJsonConfigProvider.get().get(); Assert.assertEquals(-1, config.getExpireAfter()); Assert.assertEquals(-1, config.getSizeInBytes()); - Assert.assertNull(config.createExecutor()); + Assert.assertEquals(ForkJoinPool.commonPool(), config.createExecutor()); } public int get(Cache cache, Cache.NamedKey key) From c6f96063b6617bad2450befb96b1893503b8dcce Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Wed, 15 Jun 2016 14:29:27 -0700 Subject: [PATCH 07/13] Bump caffeine to 2.3.1 --- extensions-core/caffeine-cache/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions-core/caffeine-cache/pom.xml b/extensions-core/caffeine-cache/pom.xml index 43cbbb2f5b3d..3fac1eb33da5 100644 --- a/extensions-core/caffeine-cache/pom.xml +++ b/extensions-core/caffeine-cache/pom.xml @@ -50,7 +50,7 @@ com.github.ben-manes.caffeine caffeine - 2.3.0 + 2.3.1 net.jpountz.lz4 From b584da9e54fdb9d67392a1053993cfe9ce1a5c7a Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Tue, 21 Jun 2016 13:37:14 -0700 Subject: [PATCH 08/13] Bump druid version to 0.9.2-SNAPSHOT --- extensions-core/caffeine-cache/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions-core/caffeine-cache/pom.xml b/extensions-core/caffeine-cache/pom.xml index 3fac1eb33da5..5d3da168ca5c 100644 --- a/extensions-core/caffeine-cache/pom.xml +++ b/extensions-core/caffeine-cache/pom.xml @@ -30,7 +30,7 @@ io.druid druid - 0.9.1-SNAPSHOT + 0.9.2-SNAPSHOT ../../pom.xml From c5869ae02e5d8b5777216415f49d8f155d2934c6 Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Tue, 21 Jun 2016 21:59:48 -0700 Subject: [PATCH 09/13] Make test not fail randomly. See https://github.com/ben-manes/caffeine/pull/93#issuecomment-227617998 for an explanation --- .../druid/client/cache/CaffeineCacheTest.java | 20 +++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/extensions-core/caffeine-cache/src/test/java/io/druid/client/cache/CaffeineCacheTest.java b/extensions-core/caffeine-cache/src/test/java/io/druid/client/cache/CaffeineCacheTest.java index 20e6d14268ba..373bea20ef38 100644 --- a/extensions-core/caffeine-cache/src/test/java/io/druid/client/cache/CaffeineCacheTest.java +++ b/extensions-core/caffeine-cache/src/test/java/io/druid/client/cache/CaffeineCacheTest.java @@ -36,6 +36,8 @@ import org.junit.Before; import org.junit.Test; +import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.util.Map; import java.util.Properties; import java.util.Random; @@ -44,6 +46,7 @@ public class CaffeineCacheTest { + private static final int RANDOM_SEED = 3478178; private static final byte[] HI = "hiiiiiiiiiiiiiiiiiii".getBytes(); private static final byte[] HO = "hooooooooooooooooooo".getBytes(); @@ -169,7 +172,7 @@ public void testGetBulk() throws Exception } @Test - public void testSizeEviction() throws InterruptedException + public void testSizeEviction() throws Exception { final CaffeineCacheConfig config = new CaffeineCacheConfig() { @@ -187,6 +190,7 @@ public long getSizeInBytes() final Cache.NamedKey key1 = new Cache.NamedKey("the", s1); final Cache.NamedKey key2 = new Cache.NamedKey("the", s2); final CaffeineCache cache = CaffeineCache.create(config, Runnable::run); + forceRandomSeed(cache); Assert.assertNull(cache.get(key1)); Assert.assertNull(cache.get(key2)); @@ -349,7 +353,7 @@ public long getSizeInBytes() random.nextBytes(val2); final Cache.NamedKey key1 = new Cache.NamedKey("the", s1); final Cache.NamedKey key2 = new Cache.NamedKey("the", s2); - final Cache cache = CaffeineCache.create(config, Runnable::run); + final CaffeineCache cache = CaffeineCache.create(config, Runnable::run); CacheStats stats = cache.getStats(); Assert.assertEquals(0L, stats.getNumEntries()); @@ -464,6 +468,18 @@ public void put(Cache cache, Cache.NamedKey key, Integer value) { cache.put(key, Ints.toByteArray(value)); } + + // See + public static void forceRandomSeed(CaffeineCache cache) throws Exception + { + final Map map = cache.getCache().asMap(); + final Method getFrequencySketch = map.getClass().getDeclaredMethod("frequencySketch"); + getFrequencySketch.setAccessible(true); + final Object frequencySketch = getFrequencySketch.invoke(map); + final Field seedField = frequencySketch.getClass().getDeclaredField("randomSeed"); + seedField.setAccessible(true); + seedField.setInt(frequencySketch, RANDOM_SEED); + } } class CaffeineCacheProviderWithConfig extends CaffeineCacheProvider From 64c3ea2578fb778dc4f3643afbf68e93e4500ae3 Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Fri, 1 Jul 2016 16:24:45 -0700 Subject: [PATCH 10/13] Fix distribution and documentation --- distribution/pom.xml | 49 +++++++++++++++++++ .../extensions-core/caffeine-cache.md | 2 +- pom.xml | 15 ++++-- 3 files changed, 62 insertions(+), 4 deletions(-) diff --git a/distribution/pom.xml b/distribution/pom.xml index a1db657935a3..20038bf90333 100644 --- a/distribution/pom.xml +++ b/distribution/pom.xml @@ -152,4 +152,53 @@ + + + java8 + + 1.8 + + + + + org.codehaus.mojo + exec-maven-plugin + + + pull-deps-jdk8 + package + + exec + + + java + + -classpath + + -Ddruid.extensions.loadList=[] + -Ddruid.extensions.directory=${project.build.directory}/extensions + + + -Ddruid.extensions.hadoopDependenciesDir=${project.build.directory}/hadoop-dependencies + + io.druid.cli.Main + tools + pull-deps + --defaultVersion + ${project.parent.version} + -l + ${settings.localRepository} + + --no-default-hadoop + -c + io.druid.extensions:druid-caffeine-cache + + + + + + + + + diff --git a/docs/content/development/extensions-core/caffeine-cache.md b/docs/content/development/extensions-core/caffeine-cache.md index 2cef8b9712f6..b9d739904b2d 100644 --- a/docs/content/development/extensions-core/caffeine-cache.md +++ b/docs/content/development/extensions-core/caffeine-cache.md @@ -5,7 +5,7 @@ layout: doc_page Druid Caffeine Cache -------------------- -A highly performant local cache implementation for Druid based on [Caffeine](https://github.com/ben-manes/caffeine). Requires a JRE with a fix for https://bugs.openjdk.java.net/browse/JDK-8078490 +A highly performant local cache implementation for Druid based on [Caffeine](https://github.com/ben-manes/caffeine). Requires a JRE8u60 or higher # Configuration Below are the configuration options known to this module: diff --git a/pom.xml b/pom.xml index 10e3f3c84210..d7d2f5a37c44 100644 --- a/pom.xml +++ b/pom.xml @@ -104,9 +104,6 @@ extensions-contrib/distinctcount extensions-contrib/parquet-extensions extensions-contrib/statsd-emitter - - - distribution @@ -840,6 +837,18 @@ extensions-core/caffeine-cache + + distribution + + + + java7 + + 1.7 + + + + distribution From 6d15b72f08ee7b221d00f5fbb7853438e759090d Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Fri, 1 Jul 2016 16:27:14 -0700 Subject: [PATCH 11/13] Add caffeine to extensions.md --- docs/content/development/extensions.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/content/development/extensions.md b/docs/content/development/extensions.md index 14dbeb389cc3..f461487e301d 100644 --- a/docs/content/development/extensions.md +++ b/docs/content/development/extensions.md @@ -28,7 +28,8 @@ Core extensions are maintained by Druid committers. |druid-kafka-eight|Kafka ingest firehose (high level consumer).|[link](../development/extensions-core/kafka-eight-firehose.html)| |druid-kafka-extraction-namespace|Kafka-based namespaced lookup. Requires namespace lookup extension.|[link](../development/extensions-core/kafka-extraction-namespace.html)| |druid-lookups-cached-global|A module for [lookups](../querying/lookups.html) providing a jvm-global eager caching for lookups. It provides JDBC and URI implementations for fetching lookup data.|[link](../development/extensions-core/lookups-cached-global.html)| -|druid-s3-extensions|Interfacing with data in AWS S3, and using S3 as deep storage.|[link](../development/extensions-core/s3.html)| +|druid-s3-extensions|Interfacing with data in AWS S3, and using S3 as deep storage.|[link](../development/extensions-core/caffeine-cache.html)| +|druid-caffeine-cache|A local cache implementation backed by Caffeine.|[link](../development/extensions-core/s3.html)| |mysql-metadata-storage|MySQL metadata store.|[link](../development/extensions-core/mysql.html)| |postgresql-metadata-storage|PostgreSQL metadata store.|[link](../development/extensions-core/postgresql.html)| From c00469ae3f4d92831838de88fdfd9d70e6553379 Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Fri, 1 Jul 2016 16:27:47 -0700 Subject: [PATCH 12/13] Fix links in extensions.md --- docs/content/development/extensions.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/content/development/extensions.md b/docs/content/development/extensions.md index f461487e301d..b456442bc775 100644 --- a/docs/content/development/extensions.md +++ b/docs/content/development/extensions.md @@ -28,8 +28,8 @@ Core extensions are maintained by Druid committers. |druid-kafka-eight|Kafka ingest firehose (high level consumer).|[link](../development/extensions-core/kafka-eight-firehose.html)| |druid-kafka-extraction-namespace|Kafka-based namespaced lookup. Requires namespace lookup extension.|[link](../development/extensions-core/kafka-extraction-namespace.html)| |druid-lookups-cached-global|A module for [lookups](../querying/lookups.html) providing a jvm-global eager caching for lookups. It provides JDBC and URI implementations for fetching lookup data.|[link](../development/extensions-core/lookups-cached-global.html)| -|druid-s3-extensions|Interfacing with data in AWS S3, and using S3 as deep storage.|[link](../development/extensions-core/caffeine-cache.html)| -|druid-caffeine-cache|A local cache implementation backed by Caffeine.|[link](../development/extensions-core/s3.html)| +|druid-s3-extensions|Interfacing with data in AWS S3, and using S3 as deep storage.|[link](../development/extensions-core/s3.html)| +|druid-caffeine-cache|A local cache implementation backed by Caffeine.|[link](../development/extensions-core/caffeine-cache.html)| |mysql-metadata-storage|MySQL metadata store.|[link](../development/extensions-core/mysql.html)| |postgresql-metadata-storage|PostgreSQL metadata store.|[link](../development/extensions-core/postgresql.html)| From e894bfa7abeeb19185571d1c5dbca6b099ac0600 Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Fri, 1 Jul 2016 16:28:29 -0700 Subject: [PATCH 13/13] Lexicographic --- docs/content/development/extensions.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/content/development/extensions.md b/docs/content/development/extensions.md index b456442bc775..e95c397149dd 100644 --- a/docs/content/development/extensions.md +++ b/docs/content/development/extensions.md @@ -22,6 +22,7 @@ Core extensions are maintained by Druid committers. |Name|Description|Docs| |----|-----------|----| |druid-avro-extensions|Support for data in Apache Avro data format.|[link](../development/extensions-core/avro.html)| +|druid-caffeine-cache|A local cache implementation backed by Caffeine.|[link](../development/extensions-core/caffeine-cache.html)| |druid-datasketches|Support for approximate counts and set operations with [DataSketches](http://datasketches.github.io/).|[link](../development/extensions-core/datasketches-aggregators.html)| |druid-hdfs-storage|HDFS deep storage.|[link](../development/extensions-core/hdfs.html)| |druid-histogram|Approximate histograms and quantiles aggregator.|[link](../development/extensions-core/approximate-histograms.html)| @@ -29,7 +30,6 @@ Core extensions are maintained by Druid committers. |druid-kafka-extraction-namespace|Kafka-based namespaced lookup. Requires namespace lookup extension.|[link](../development/extensions-core/kafka-extraction-namespace.html)| |druid-lookups-cached-global|A module for [lookups](../querying/lookups.html) providing a jvm-global eager caching for lookups. It provides JDBC and URI implementations for fetching lookup data.|[link](../development/extensions-core/lookups-cached-global.html)| |druid-s3-extensions|Interfacing with data in AWS S3, and using S3 as deep storage.|[link](../development/extensions-core/s3.html)| -|druid-caffeine-cache|A local cache implementation backed by Caffeine.|[link](../development/extensions-core/caffeine-cache.html)| |mysql-metadata-storage|MySQL metadata store.|[link](../development/extensions-core/mysql.html)| |postgresql-metadata-storage|PostgreSQL metadata store.|[link](../development/extensions-core/postgresql.html)|