diff --git a/distribution/pom.xml b/distribution/pom.xml
index 04e205158d1b..f9466bc6b656 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -95,6 +95,8 @@
                         io.druid.extensions:druid-s3-extensions
                         -c
                         io.druid.extensions:druid-cloudfiles-extensions
+                        -c
+                        io.druid.extensions:druid-redis-cache
diff --git a/docs/content/configuration/index.md b/docs/content/configuration/index.md
index 7a259870993e..ef9819f81dab 100644
--- a/docs/content/configuration/index.md
+++ b/docs/content/configuration/index.md
@@ -232,7 +232,7 @@ You can enable caching of results at the broker, historical, or realtime level u
 
 |Property|Description|Default|
 |--------|-----------|-------|
-|`druid.cache.type`|`local`, `memcached`|The type of cache to use for queries.|`local`|
+|`druid.cache.type`|The type of cache to use for queries. One of `local`, `memcached`, `redis`.|`local`|
 |`druid.(broker|historical|realtime).cache.unCacheable`|All druid query types|All query types to not cache.|["groupBy", "select"]|
 |`druid.(broker|historical|realtime).cache.useCache`|Whether to use cache for getting query results.|false|
 |`druid.(broker|historical|realtime).cache.populateCache`|Whether to populate cache.|false|
@@ -255,6 +255,16 @@ You can enable caching of results at the broker, historical, or realtime level u
 |`druid.cache.maxObjectSize`|Maximum object size in bytes for a Memcached object.|52428800 (50 MB)|
 |`druid.cache.memcachedPrefix`|Key prefix for all keys in Memcached.|druid|
 
+#### Redis
+
+|Property|Description|Default|
+|--------|-----------|-------|
+|`druid.cache.hosts`|Redis hosts, as a list of `host:port` addresses.|["127.0.0.1:6379"]|
+|`druid.cache.timeout`|Connection / operation timeout, in milliseconds.|30000|
+|`druid.cache.prefix`|Key prefix for all keys stored in Redis.|druid|
+|`druid.cache.expiration`|Redis key expiration time, in milliseconds.|-1 (no expiration)|
+|`druid.cache.poolSize`|Size of the Redis connection pool.|1|
+
 ### Indexing Service Discovery
 
 This config is used to find the [Indexing Service](../design/indexing-service.html) using Curator service discovery.
Only required if you are actually running an indexing service. diff --git a/extensions/redis-cache/pom.xml b/extensions/redis-cache/pom.xml new file mode 100644 index 000000000000..471af4b68967 --- /dev/null +++ b/extensions/redis-cache/pom.xml @@ -0,0 +1,73 @@ + + + + + 4.0.0 + + io.druid.extensions + druid-redis-cache + druid-redis-cache + druid-redis-cache + + + io.druid + druid + 0.9.0-SNAPSHOT + ../../pom.xml + + + + + io.druid + druid-api + provided + + + io.druid + druid-server + ${project.parent.version} + provided + + + redis.clients + jedis + 2.7.2 + jar + compile + + + net.jpountz.lz4 + lz4 + provided + + + com.fasterxml.jackson.core + jackson-annotations + provided + + + + + junit + junit + test + + + diff --git a/extensions/redis-cache/src/main/java/io/druid/client/cache/RedisCache.java b/extensions/redis-cache/src/main/java/io/druid/client/cache/RedisCache.java new file mode 100644 index 000000000000..585e2591970c --- /dev/null +++ b/extensions/redis-cache/src/main/java/io/druid/client/cache/RedisCache.java @@ -0,0 +1,301 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.client.cache; + +import com.google.common.base.Function; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import com.metamx.common.logger.Logger; +import com.metamx.emitter.service.ServiceEmitter; +import io.druid.collections.LoadBalancingPool; +import io.druid.collections.ResourceHolder; +import io.druid.collections.StupidResourceHolder; +import net.jpountz.lz4.LZ4Exception; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.ShardedJedis; +import redis.clients.jedis.ShardedJedisPipeline; +import redis.clients.jedis.exceptions.JedisConnectionException; +import redis.clients.jedis.exceptions.JedisException; +import redis.clients.util.Hashing; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.net.SocketTimeoutException; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; + +public class RedisCache implements Cache +{ + private static final Logger log = new Logger(RedisCache.class); + + private final Supplier> supplier; + private final String prefix; + private final long expiration; + private final RedisTranscoder transcoder; + private final AtomicLong hitCount = new AtomicLong(0); + private final AtomicLong missCount = new AtomicLong(0); + private final AtomicLong timeoutCount = new AtomicLong(0); + private final AtomicLong errorCount = new AtomicLong(0); + + public static Cache create(final RedisCacheConfig config) + { + final Supplier> clientSupplier; + + if (config.getPoolSize() > 1) { + clientSupplier = new LoadBalancingPool<>( + config.getPoolSize(), + new Supplier() + { + @Override + public ShardedJedis get() + { + return new ShardedJedis(config.getShardsInfo(), Hashing.MURMUR_HASH); 
+ } + } + ); + } else { + clientSupplier = Suppliers.>ofInstance( + StupidResourceHolder.create(new ShardedJedis(config.getShardsInfo(), Hashing.MURMUR_HASH)) + ); + } + + return new RedisCache(clientSupplier, config); + } + + public RedisCache(Supplier> supplier, RedisCacheConfig config) + { + this.supplier = supplier; + prefix = config.getPrefix(); + expiration = config.getExpiration(); + transcoder = new RedisTranscoder(); + } + + @Override + public byte[] get(NamedKey key) + { + byte[] result = null; + + try (ResourceHolder clientHolder = supplier.get()) { + final ShardedJedis jedis = clientHolder.get(); + final byte[] itemKey = CacheImplUtils.computeKeyHash(prefix, key).getBytes(); + final byte[] serialized = jedis.get(itemKey); + + if (serialized != null) { + result = deserialize(key, serialized); + hitCount.incrementAndGet(); + } + else { + missCount.incrementAndGet(); + } + } + catch (IOException | JedisException | LZ4Exception ex) { + this.countException(ex); + log.warn(ex, "Error getting cache item"); + } + + return result; + } + + @Override + public void put(NamedKey key, byte[] value) + { + final byte[] serialized = serialize(key, value); + + try (ResourceHolder clientHolder = supplier.get()) { + final ShardedJedis jedis = clientHolder.get(); + + final byte[] itemKey = CacheImplUtils.computeKeyHash(prefix, key).getBytes(); + Jedis shard = jedis.getShard(itemKey); + + if (expiration > 0) { + shard.psetex(itemKey, expiration, serialized); + } + else { + shard.set(itemKey, serialized); + } + + final byte[] setKey = CacheImplUtils.computeNamespaceHash(prefix, key.namespace).getBytes(); + shard = jedis.getShard(setKey); + shard.sadd(setKey, key.key); + } + catch (IOException | JedisException ex) { + this.countException(ex); + log.warn(ex, "Error putting cache item"); + } + } + + @Override + public Map getBulk(Iterable keys) + { + final Map keyLookup = Maps.uniqueIndex( + keys, + new Function() + { + @Override + public byte[] apply( + @Nullable NamedKey input 
+ ) + { + return CacheImplUtils.computeKeyHash(prefix, input).getBytes(); + } + } + ); + + // Sometimes broker passes empty keys list to getBulk() + if (keyLookup.size() == 0) { + return ImmutableMap.of(); + } + + final Map results = Maps.newHashMap(); + final byte[][] keysArray = Iterables.toArray(keyLookup.keySet(), byte[].class); + final List valuesList; + + try (ResourceHolder clientHolder = supplier.get()) { + final ShardedJedis jedis = clientHolder.get(); + final ShardedJedisPipeline pipeline = jedis.pipelined(); + + for (byte[] itemKey : keysArray) { + pipeline.get(itemKey); + } + + pipeline.sync(); + valuesList = pipeline.getResults(); + } + catch (IOException | JedisException ex) { + this.countException(ex); + log.warn(ex, "Unable to get cache items"); + return results; + } + + final int length = keysArray.length; + int i = 0; + + while (i < length) { + final Object value = valuesList.get(i); + if (value != null) { + final NamedKey key = keyLookup.get(keysArray[i]); + try { + final byte[] deserialized = deserialize(key, (byte []) value); + hitCount.incrementAndGet(); + results.put(key, deserialized); + } + catch (LZ4Exception ex) { + errorCount.incrementAndGet(); + log.warn(ex, "Error decompressing cache item"); + } + } + else { + missCount.incrementAndGet(); + } + + i++; + } + + return results; + } + + @Override + public void close(String namespace) + { + try (ResourceHolder clientHolder = supplier.get()) { + final ShardedJedis jedis = clientHolder.get(); + + final byte[] setKey = CacheImplUtils.computeNamespaceHash(prefix, namespace).getBytes(); + final Set keys = jedis.smembers(setKey); + if (keys.size() == 0) { + return; + } + + keys.add(setKey); + for (byte[] key : keys) { + final String itemKey = CacheImplUtils.computeKeyHash(prefix, namespace, key); + jedis.del(itemKey); + } + } + catch (JedisConnectionException | IOException ex) { + Throwables.propagate(ex); + } + catch (JedisException ex) { + log.warn(ex, "Error cleaning cache items"); + } + } + + 
@Override + public CacheStats getStats() + { + return new CacheStats( + hitCount.get(), + missCount.get(), + 0, + 0, + 0, + timeoutCount.get(), + errorCount.get() + ); + } + + @Override + public boolean isLocal() + { + return false; + } + + @Override + public void doMonitor(ServiceEmitter emitter) + { + //TODO + } + + private void countException(Exception ex) + { + if (isTimeout(ex)) { + timeoutCount.incrementAndGet(); + } + else { + errorCount.incrementAndGet(); + } + } + + private static boolean isTimeout(Exception ex) + { + //FIXME NoSuchElementException is thrown not only in case of timeout + Throwable thr = ex.getCause(); + return thr instanceof NoSuchElementException || thr instanceof SocketTimeoutException; + } + + private byte[] deserialize(NamedKey key, byte[] bytes) + { + final byte[] data = transcoder.decompress(bytes); + return CacheImplUtils.deserializeValue(key, data); + } + + private byte[] serialize(NamedKey key, byte[] value) + { + final byte[] data = CacheImplUtils.serializeValue(key, value); + return transcoder.compress(data); + } +} diff --git a/extensions/redis-cache/src/main/java/io/druid/client/cache/RedisCacheConfig.java b/extensions/redis-cache/src/main/java/io/druid/client/cache/RedisCacheConfig.java new file mode 100644 index 000000000000..85d7b04b70ae --- /dev/null +++ b/extensions/redis-cache/src/main/java/io/druid/client/cache/RedisCacheConfig.java @@ -0,0 +1,71 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client.cache; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import redis.clients.jedis.JedisShardInfo; + +import java.util.LinkedList; +import java.util.List; + +public class RedisCacheConfig +{ + + @JsonProperty + private List hosts = ImmutableList.of("127.0.0.1:6379"); + + @JsonProperty + private int timeout = 30000; + + @JsonProperty + private String prefix = "druid"; + + @JsonProperty + private long expiration = -1; + + @JsonProperty + private int poolSize = 1; + + public int getPoolSize() + { + return poolSize; + } + + public String getPrefix() + { + return prefix; + } + + public long getExpiration() + { + return expiration; + } + + public List getShardsInfo() { + List shardsInfo = new LinkedList<>(); + for (String host : hosts) { + JedisShardInfo info = new JedisShardInfo(host); + info.setConnectionTimeout(timeout); + shardsInfo.add(info); + } + return shardsInfo; + } +} diff --git a/extensions/redis-cache/src/main/java/io/druid/client/cache/RedisCacheProvider.java b/extensions/redis-cache/src/main/java/io/druid/client/cache/RedisCacheProvider.java new file mode 100644 index 000000000000..bdd7d0184500 --- /dev/null +++ b/extensions/redis-cache/src/main/java/io/druid/client/cache/RedisCacheProvider.java @@ -0,0 +1,32 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client.cache; + +import com.fasterxml.jackson.annotation.JsonTypeName; + +@JsonTypeName("redis") +public class RedisCacheProvider extends RedisCacheConfig implements CacheProvider +{ + @Override + public Cache get() + { + return RedisCache.create(this); + } +} diff --git a/extensions/redis-cache/src/main/java/io/druid/client/cache/RedisDruidModule.java b/extensions/redis-cache/src/main/java/io/druid/client/cache/RedisDruidModule.java new file mode 100644 index 000000000000..e83aa452955e --- /dev/null +++ b/extensions/redis-cache/src/main/java/io/druid/client/cache/RedisDruidModule.java @@ -0,0 +1,56 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client.cache; + +import com.fasterxml.jackson.core.Version; +import com.fasterxml.jackson.databind.Module; +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import io.druid.initialization.DruidModule; + +import java.util.List; + +public class RedisDruidModule implements DruidModule { + + @Override + public void configure(Binder binder) { + + } + + @Override + public List getJacksonModules() { + return ImmutableList.of(new Module() { + @Override + public String getModuleName() { + return "DruidRedisCache-" + System.identityHashCode(this); + } + + @Override + public Version version() { + return Version.unknownVersion(); + } + + @Override + public void setupModule(SetupContext context) { + context.registerSubtypes(RedisCacheProvider.class); + } + }); + } +} diff --git a/extensions/redis-cache/src/main/java/io/druid/client/cache/RedisTranscoder.java b/extensions/redis-cache/src/main/java/io/druid/client/cache/RedisTranscoder.java new file mode 100644 index 000000000000..8e66dadc9676 --- /dev/null +++ b/extensions/redis-cache/src/main/java/io/druid/client/cache/RedisTranscoder.java @@ -0,0 +1,70 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client.cache; + +import com.google.common.primitives.Ints; +import com.metamx.common.logger.Logger; +import net.jpountz.lz4.LZ4Compressor; +import net.jpountz.lz4.LZ4Factory; +import net.jpountz.lz4.LZ4FastDecompressor; + +import java.nio.ByteBuffer; + +public class RedisTranscoder +{ + private static final Logger log = new Logger(RedisTranscoder.class); + private final LZ4Factory factory; + + public RedisTranscoder() + { + factory = LZ4Factory.fastestInstance(); + } + + public byte[] compress(byte[] in) + { + if (in == null) { + throw new NullPointerException("Can't compress null"); + } + + final LZ4Compressor compressor = factory.fastCompressor(); + final byte[] out = new byte[compressor.maxCompressedLength(in.length)]; + final int compressedLength = compressor.compress(in, 0, in.length, out, 0); + + return ByteBuffer.allocate(Ints.BYTES + compressedLength) + .putInt(in.length) + .put(out, 0, compressedLength) + .array(); + } + + public byte[] decompress(byte[] in) + { + final byte[] out; + if (in != null) { + final LZ4FastDecompressor decompressor = factory.fastDecompressor(); + final int size = ByteBuffer.wrap(in).getInt(); + + out = new byte[size]; + decompressor.decompress(in, Ints.BYTES, out, 0, out.length); + } else { + out = null; + } + return out; + } +} diff --git a/extensions/redis-cache/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/extensions/redis-cache/src/main/resources/META-INF/services/io.druid.initialization.DruidModule new file mode 100644 index 000000000000..54730c71c161 --- /dev/null +++ b/extensions/redis-cache/src/main/resources/META-INF/services/io.druid.initialization.DruidModule @@ -0,0 +1 @@ +io.druid.client.cache.RedisDruidModule diff --git a/extensions/redis-cache/src/test/java/RedisCacheTest.java b/extensions/redis-cache/src/test/java/RedisCacheTest.java new file mode 100644 
index 000000000000..5ccf86f883c5 --- /dev/null +++ b/extensions/redis-cache/src/test/java/RedisCacheTest.java @@ -0,0 +1,392 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import com.google.common.base.Suppliers; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.common.primitives.Ints; +import com.google.inject.Binder; +import com.google.inject.Inject; +import com.google.inject.Injector; +import com.google.inject.Module; +import com.google.inject.name.Names; +import com.metamx.common.lifecycle.Lifecycle; +import io.druid.client.cache.Cache; +import io.druid.client.cache.CacheProvider; +import io.druid.client.cache.CacheStats; +import io.druid.client.cache.RedisCache; +import io.druid.client.cache.RedisCacheConfig; +import io.druid.client.cache.RedisCacheProvider; +import io.druid.collections.ResourceHolder; +import io.druid.collections.StupidResourceHolder; +import io.druid.guice.GuiceInjectors; +import io.druid.guice.JsonConfigProvider; +import io.druid.guice.ManageLifecycle; +import io.druid.initialization.Initialization; +import org.apache.commons.codec.binary.Hex; +import org.junit.Assert; 
+import org.junit.Before; +import org.junit.Test; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisShardInfo; +import redis.clients.jedis.Response; +import redis.clients.jedis.ShardedJedis; +import redis.clients.jedis.ShardedJedisPipeline; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +public class RedisCacheTest +{ + private static final byte[] HI = "hiiiiiiiiiiiiiiiiiii".getBytes(); + private static final byte[] HO = "hooooooooooooooooooo".getBytes(); + + private RedisCache cache; + private final RedisCacheConfig cacheConfig = new RedisCacheConfig() + { + public int getPoolSize() + { + return 1; + } + + public String getPrefix() + { + return "druid"; + } + + public long getExpiration() + { + return -1; + } + + public List getShardsInfo() + { + return new ArrayList<>(); + } + }; + + @Before + public void setUp() throws Exception + { + cache = new RedisCache( + Suppliers.>ofInstance( + StupidResourceHolder.create(new MockRedisClient()) + ), + cacheConfig + ); + } + + @Test + public void testBasicInjection() throws Exception + { + final RedisCacheConfig config = new RedisCacheConfig() + { + @Override + public List getShardsInfo() + { + return new ArrayList<>(); + } + }; + Injector injector = Initialization.makeInjectorWithModules( + GuiceInjectors.makeStartupInjector(), ImmutableList.of( + new Module() + { + @Override + public void configure(Binder binder) + { + binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/test/redis"); + binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0); + + binder.bind(RedisCacheConfig.class).toInstance(config); + binder.bind(Cache.class).toProvider(RedisCacheProviderWithConfig.class).in(ManageLifecycle.class); + } + } + ) + ); + final Lifecycle lifecycle = injector.getInstance(Lifecycle.class); + lifecycle.start(); + try { + Cache cache = 
injector.getInstance(Cache.class); + Assert.assertEquals(RedisCache.class, cache.getClass()); + } + finally { + lifecycle.stop(); + } + } + + @Test + public void testSimpleInjection() + { + final String uuid = UUID.randomUUID().toString(); + System.setProperty(uuid + ".type", "redis"); + System.setProperty(uuid + ".hosts", "[\"localhost\"]"); + final Injector injector = Initialization.makeInjectorWithModules( + GuiceInjectors.makeStartupInjector(), ImmutableList.of( + new Module() + { + @Override + public void configure(Binder binder) + { + binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/test/redis"); + binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0); + + binder.bind(Cache.class).toProvider(CacheProvider.class); + JsonConfigProvider.bind(binder, uuid, CacheProvider.class); + } + } + ) + ); + final CacheProvider redisCacheProvider = injector.getInstance(CacheProvider.class); + Assert.assertNotNull(redisCacheProvider); + Assert.assertEquals(RedisCacheProvider.class, redisCacheProvider.getClass()); + } + + @Test + public void testBaseOps() throws Exception + { + final Cache.NamedKey aKey = new Cache.NamedKey("a", HI); + Assert.assertNull(cache.get(aKey)); + put(cache, aKey, 1); + Assert.assertEquals(1, get(cache, aKey)); + cache.close("a"); + Assert.assertNull(cache.get(aKey)); + + final Cache.NamedKey hiKey = new Cache.NamedKey("the", HI); + final Cache.NamedKey hoKey = new Cache.NamedKey("the", HO); + put(cache, hiKey, 10); + put(cache, hoKey, 20); + Assert.assertEquals(10, get(cache, hiKey)); + Assert.assertEquals(20, get(cache, hoKey)); + cache.close("the"); + Assert.assertNull(cache.get(hiKey)); + Assert.assertNull(cache.get(hoKey)); + + final CacheStats stats = cache.getStats(); + Assert.assertEquals(3, stats.getNumHits()); + Assert.assertEquals(4, stats.getNumMisses()); + } + + @Test + public void testGetBulk() throws Exception + { + Assert.assertNull(cache.get(new Cache.NamedKey("the", HI))); + + Cache.NamedKey 
key1 = new Cache.NamedKey("the", HI); + put(cache, key1, 2); + + Cache.NamedKey key2 = new Cache.NamedKey("the", HO); + put(cache, key2, 10); + + Map result = cache.getBulk( + Lists.newArrayList( + key1, + key2 + ) + ); + + Assert.assertEquals(2, Ints.fromByteArray(result.get(key1))); + Assert.assertEquals(10, Ints.fromByteArray(result.get(key2))); + + Cache.NamedKey missingKey = new Cache.NamedKey("missing", HI); + result = cache.getBulk(Lists.newArrayList(missingKey)); + Assert.assertEquals(result.size(), 0); + + result = cache.getBulk(Lists.newArrayList()); + Assert.assertEquals(result.size(), 0); + } + + public int get(Cache cache, Cache.NamedKey key) + { + return Ints.fromByteArray(cache.get(key)); + } + + public void put(Cache cache, Cache.NamedKey key, Integer value) { + cache.put(key, Ints.toByteArray(value)); + } +} + +class RedisCacheProviderWithConfig extends RedisCacheProvider +{ + private final RedisCacheConfig config; + + @Inject + public RedisCacheProviderWithConfig(RedisCacheConfig config) + { + this.config = config; + } + + @Override + public Cache get() + { + return RedisCache.create(config); + } +} + + +@SuppressWarnings("unchecked") +class MockRedisClient extends ShardedJedis +{ + private final Map keyMap = new HashMap(); + + public MockRedisClient() + { + super(new ArrayList()); + } + + @Override + public Long del(byte[] key) + { + final String hex = Hex.encodeHexString(key); + keyMap.remove(hex); + return (long) 1; + } + + @Override + public Long del(String key) + { + final String hex = Hex.encodeHexString(key.getBytes()); + keyMap.remove(hex); + return (long) 1; + } + + @Override + public byte[] get(byte[] key) + { + final String hex = Hex.encodeHexString(key); + return (byte[]) keyMap.get(hex); + } + + @Override + public Jedis getShard(byte[] key) + { + return new Jedis() + { + @Override + public byte[] get(byte[] key) + { + return MockRedisClient.this.get(key); + } + + @Override + public String psetex(byte[] key, long milliseconds, byte[] 
value) + { + return MockRedisClient.this.set(key, value); + } + + @Override + public Long sadd(byte[] key, byte[]... members) + { + return MockRedisClient.this.sadd(key, members); + } + + @Override + public String set(byte[] key, byte[] value) + { + return MockRedisClient.this.set(key, value); + } + }; + } + + @Override + public ShardedJedisPipeline pipelined() { + return new MockPipeline(this); + } + + @Override + public Long sadd(byte[] key, byte[]... members) + { + final String hex = Hex.encodeHexString(key); + final Object value = keyMap.get(hex); + + if (value == null) { + final HashSet set = Sets.newHashSet(members); + keyMap.put(hex, set); + return (long) set.size(); + } + + final HashSet set = (HashSet) value; + long added = 0; + for (byte[] m : members) { + if (set.add(m)) { + ++added; + } + } + + return added; + } + + @Override + public String set(byte[] key, byte[] value) + { + final String hex = Hex.encodeHexString(key); + keyMap.put(hex, value); + return null; + } + + @Override + public Set smembers(byte[] key) + { + final String hex = Hex.encodeHexString(key); + Object value = keyMap.get(hex); + if (value == null) { + return new HashSet<>(); + } + + return (HashSet) value; + } +} + +class MockPipeline extends ShardedJedisPipeline { + private final List results = new ArrayList<>(); + private final ShardedJedis jedis; + + public MockPipeline(ShardedJedis jedis) { + this.jedis = jedis; + } + + @Override + public Response get(byte[] key) + { + results.add(jedis.get(key)); + return new MockResponse(); + } + + @Override + public List getResults() { + return results; + } + + @Override + public void sync() + { + // do nothing here + } +} + +class MockResponse extends Response { + public MockResponse() { + super(null); + } +} diff --git a/pom.xml b/pom.xml index c5c84cb2fe5d..e3616bc11284 100644 --- a/pom.xml +++ b/pom.xml @@ -100,6 +100,7 @@ extensions/cloudfiles-extensions extensions/datasketches extensions/avro-extensions + extensions/redis-cache 
distribution diff --git a/server/src/main/java/io/druid/client/cache/CacheImplUtils.java b/server/src/main/java/io/druid/client/cache/CacheImplUtils.java new file mode 100644 index 000000000000..8765627bad38 --- /dev/null +++ b/server/src/main/java/io/druid/client/cache/CacheImplUtils.java @@ -0,0 +1,71 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.client.cache; + +import com.google.common.base.Preconditions; +import com.google.common.primitives.Ints; +import org.apache.commons.codec.digest.DigestUtils; + +import java.nio.ByteBuffer; +import java.util.Arrays; + +public class CacheImplUtils +{ + static String computeKeyHash(String prefix, String namespace, byte[] key) { + return String.format("%s:%s:%s", prefix, DigestUtils.sha1Hex(namespace), DigestUtils.sha1Hex(key)); + } + + public static String computeKeyHash(String prefix, Cache.NamedKey key) + { + // hash keys to keep things under 250 characters for memcached + return computeKeyHash(prefix, key.namespace, key.key); + } + + public static String computeNamespaceHash(String prefix, String namespace) { + return prefix + ':' + DigestUtils.sha1Hex(namespace); + } + + public static byte[] deserializeValue(Cache.NamedKey key, byte[] bytes) + { + ByteBuffer buf = ByteBuffer.wrap(bytes); + + final int keyLength = buf.getInt(); + byte[] keyBytes = new byte[keyLength]; + buf.get(keyBytes); + byte[] value = new byte[buf.remaining()]; + buf.get(value); + + Preconditions.checkState( + Arrays.equals(keyBytes, key.toByteArray()), + "Keys do not match, possible hash collision?" 
+ ); + return value; + } + + public static byte[] serializeValue(Cache.NamedKey key, byte[] value) + { + byte[] keyBytes = key.toByteArray(); + return ByteBuffer.allocate(Ints.BYTES + keyBytes.length + value.length) + .putInt(keyBytes.length) + .put(keyBytes) + .put(value) + .array(); + } +} diff --git a/server/src/main/java/io/druid/client/cache/MemcachedCache.java b/server/src/main/java/io/druid/client/cache/MemcachedCache.java index 26a5d60d6e8a..0c2b337a0828 100644 --- a/server/src/main/java/io/druid/client/cache/MemcachedCache.java +++ b/server/src/main/java/io/druid/client/cache/MemcachedCache.java @@ -31,7 +31,6 @@ import com.google.common.collect.Maps; import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; -import com.google.common.primitives.Ints; import com.metamx.common.logger.Logger; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; @@ -51,13 +50,10 @@ import net.spy.memcached.metrics.MetricType; import net.spy.memcached.ops.LinkedOperationQueueFactory; import net.spy.memcached.ops.OperationQueueFactory; -import org.apache.commons.codec.digest.DigestUtils; import javax.annotation.Nullable; import java.io.IOException; import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -441,7 +437,7 @@ public byte[] get(NamedKey key) try (ResourceHolder clientHolder = client.get()) { Future future; try { - future = clientHolder.get().asyncGet(computeKeyHash(memcachedPrefix, key)); + future = clientHolder.get().asyncGet(CacheImplUtils.computeKeyHash(memcachedPrefix, key)); } catch (IllegalStateException e) { // operation did not get queued in time (queue is full) @@ -456,7 +452,7 @@ public byte[] get(NamedKey key) } else { missCount.incrementAndGet(); } - return bytes == null ? null : deserializeValue(key, bytes); + return bytes == null ? 
null : CacheImplUtils.deserializeValue(key, bytes); } catch (TimeoutException e) { timeoutCount.incrementAndGet(); @@ -483,9 +479,9 @@ public void put(NamedKey key, byte[] value) { try (final ResourceHolder clientHolder = client.get()) { clientHolder.get().set( - computeKeyHash(memcachedPrefix, key), + CacheImplUtils.computeKeyHash(memcachedPrefix, key), expiration, - serializeValue(key, value) + CacheImplUtils.serializeValue(key, value) ); } catch (IllegalStateException e) { @@ -498,33 +494,6 @@ public void put(NamedKey key, byte[] value) } } - private static byte[] serializeValue(NamedKey key, byte[] value) - { - byte[] keyBytes = key.toByteArray(); - return ByteBuffer.allocate(Ints.BYTES + keyBytes.length + value.length) - .putInt(keyBytes.length) - .put(keyBytes) - .put(value) - .array(); - } - - private static byte[] deserializeValue(NamedKey key, byte[] bytes) - { - ByteBuffer buf = ByteBuffer.wrap(bytes); - - final int keyLength = buf.getInt(); - byte[] keyBytes = new byte[keyLength]; - buf.get(keyBytes); - byte[] value = new byte[buf.remaining()]; - buf.get(value); - - Preconditions.checkState( - Arrays.equals(keyBytes, key.toByteArray()), - "Keys do not match, possible hash collision?" 
- ); - return value; - } - @Override public Map getBulk(Iterable keys) { @@ -538,7 +507,7 @@ public String apply( @Nullable NamedKey input ) { - return computeKeyHash(memcachedPrefix, input); + return CacheImplUtils.computeKeyHash(memcachedPrefix, input); } } ); @@ -572,7 +541,7 @@ public String apply( if (value != null) { results.put( key, - deserializeValue(key, value) + CacheImplUtils.deserializeValue(key, value) ); } } @@ -607,12 +576,6 @@ public void close(String namespace) - 2 // length of separators ; - private static String computeKeyHash(String memcachedPrefix, NamedKey key) - { - // hash keys to keep things under 250 characters for memcached - return memcachedPrefix + ":" + DigestUtils.sha1Hex(key.namespace) + ":" + DigestUtils.sha1Hex(key.key); - } - public boolean isLocal() { return false;