diff --git a/distribution/pom.xml b/distribution/pom.xml
index e35f2ced7f17..4dca3b03f514 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -230,6 +230,8 @@
-c
io.druid.extensions.contrib:druid-rabbitmq
-c
+ io.druid.extensions.contrib:druid-redis-cache
+ -c
io.druid.extensions.contrib:scan-query
-c
io.druid.extensions.contrib:sqlserver-metadata-storage
diff --git a/extensions-contrib/redis-cache/pom.xml b/extensions-contrib/redis-cache/pom.xml
new file mode 100644
index 000000000000..8f77f1138851
--- /dev/null
+++ b/extensions-contrib/redis-cache/pom.xml
@@ -0,0 +1,70 @@
+
+
+
+
+
+ 4.0.0
+
+ io.druid.extensions.contrib
+ druid-redis-cache
+ druid-redis-cache
+
+
+ io.druid
+ druid
+ 0.11.0-SNAPSHOT
+ ../../pom.xml
+
+
+
+
+ io.druid
+ druid-api
+ ${project.parent.version}
+ provided
+
+
+ io.druid
+ druid-server
+ ${project.parent.version}
+ provided
+
+
+ redis.clients
+ jedis
+ 2.9.0
+
+
+
+
+ junit
+ junit
+ test
+
+
+ com.fiftyonred
+ mock-jedis
+ 0.4.0
+ test
+
+
+
+
diff --git a/extensions-contrib/redis-cache/src/main/java/io/druid/client/cache/RedisCache.java b/extensions-contrib/redis-cache/src/main/java/io/druid/client/cache/RedisCache.java
new file mode 100644
index 000000000000..e23507d5c1a7
--- /dev/null
+++ b/extensions-contrib/redis-cache/src/main/java/io/druid/client/cache/RedisCache.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.client.cache;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.metamx.emitter.service.ServiceEmitter;
+import com.metamx.emitter.service.ServiceMetricEvent;
+import io.druid.java.util.common.logger.Logger;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisPoolConfig;
+import redis.clients.jedis.exceptions.JedisException;
+
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class RedisCache implements Cache
+{
+ private static final Logger log = new Logger(RedisCache.class);
+
+ private JedisPool pool;
+ private RedisCacheConfig config;
+
+ private final AtomicLong hitCount = new AtomicLong(0);
+ private final AtomicLong missCount = new AtomicLong(0);
+ private final AtomicLong timeoutCount = new AtomicLong(0);
+ private final AtomicLong errorCount = new AtomicLong(0);
+
+ private final AtomicLong priorRequestCount = new AtomicLong(0);
+ // both get、put and getBulk will increase request count by 1
+ private final AtomicLong totalRequestCount = new AtomicLong(0);
+
+ private RedisCache(JedisPool pool, RedisCacheConfig config)
+ {
+ this.pool = pool;
+ this.config = config;
+ }
+
+ public static RedisCache create(final RedisCacheConfig config)
+ {
+ JedisPoolConfig poolConfig = new JedisPoolConfig();
+ poolConfig.setMaxTotal(config.getMaxTotalConnections());
+ poolConfig.setMaxIdle(config.getMaxIdleConnections());
+ poolConfig.setMinIdle(config.getMinIdleConnections());
+
+ JedisPool pool = new JedisPool(poolConfig, config.getHost(), config.getPort(), config.getTimeout());
+ return new RedisCache(pool, config);
+ }
+
+ @Override
+ public byte[] get(NamedKey key)
+ {
+ totalRequestCount.incrementAndGet();
+
+ try (Jedis jedis = pool.getResource()) {
+ byte[] bytes = jedis.get(key.toByteArray());
+ if (bytes == null) {
+ missCount.incrementAndGet();
+ return null;
+ } else {
+ hitCount.incrementAndGet();
+ return bytes;
+ }
+ }
+ catch (JedisException e) {
+ if (e.getMessage().contains("Read timed out")) {
+ timeoutCount.incrementAndGet();
+ } else {
+ errorCount.incrementAndGet();
+ }
+ log.warn(e, "Exception pulling item from cache");
+ return null;
+ }
+ }
+
+ @Override
+ public void put(NamedKey key, byte[] value)
+ {
+ totalRequestCount.incrementAndGet();
+
+ try (Jedis jedis = pool.getResource()) {
+ jedis.psetex(key.toByteArray(), config.getExpiration(), value);
+ }
+ catch (JedisException e) {
+ errorCount.incrementAndGet();
+ log.warn(e, "Exception pushing item to cache");
+ }
+ }
+
+ @Override
+ public Map getBulk(Iterable keys)
+ {
+ totalRequestCount.incrementAndGet();
+
+ Map results = new HashMap<>();
+
+ try (Jedis jedis = pool.getResource()) {
+ List namedKeys = Lists.newArrayList(keys);
+ List byteKeys = Lists.transform(namedKeys, NamedKey::toByteArray);
+
+ List byteValues = jedis.mget(byteKeys.toArray(new byte[0][]));
+
+ for (int i = 0; i < byteValues.size(); ++i) {
+ if (byteValues.get(i) != null) {
+ results.put(namedKeys.get(i), byteValues.get(i));
+ }
+ }
+
+ hitCount.addAndGet(results.size());
+ missCount.addAndGet(namedKeys.size() - results.size());
+ }
+ catch (JedisException e) {
+ if (e.getMessage().contains("Read timed out")) {
+ timeoutCount.incrementAndGet();
+ } else {
+ errorCount.incrementAndGet();
+ }
+ log.warn(e, "Exception pulling items from cache");
+ }
+
+ return results;
+ }
+
+ @Override
+ public void close(String namespace)
+ {
+ // no resources to cleanup
+ }
+
+ @Override
+ public CacheStats getStats()
+ {
+ return new CacheStats(
+ hitCount.get(),
+ missCount.get(),
+ 0,
+ 0,
+ 0,
+ timeoutCount.get(),
+ errorCount.get()
+ );
+ }
+
+ @Override
+ public boolean isLocal()
+ {
+ return false;
+ }
+
+ @Override
+ public void doMonitor(ServiceEmitter emitter)
+ {
+ final long priorCount = priorRequestCount.get();
+ final long totalCount = totalRequestCount.get();
+ final ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder();
+ emitter.emit(builder.build("query/cache/redis/total/requests", totalCount));
+ emitter.emit(builder.build("query/cache/redis/delta/requests", totalCount - priorCount));
+ if (!priorRequestCount.compareAndSet(priorCount, totalCount)) {
+ log.error("Prior value changed while I was reporting! updating anyways");
+ priorRequestCount.set(totalCount);
+ }
+ }
+
+ @VisibleForTesting
+ static RedisCache create(final JedisPool pool, final RedisCacheConfig config)
+ {
+ return new RedisCache(pool, config);
+ }
+}
diff --git a/extensions-contrib/redis-cache/src/main/java/io/druid/client/cache/RedisCacheConfig.java b/extensions-contrib/redis-cache/src/main/java/io/druid/client/cache/RedisCacheConfig.java
new file mode 100644
index 000000000000..14c2d472e2b7
--- /dev/null
+++ b/extensions-contrib/redis-cache/src/main/java/io/druid/client/cache/RedisCacheConfig.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.client.cache;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class RedisCacheConfig
+{
+ @JsonProperty
+ private String host;
+
+ @JsonProperty
+ private int port;
+
+ // milliseconds, default to one day
+ @JsonProperty
+ private long expiration = 24 * 3600 * 1000;
+
+ // milliseconds, the type is 'int' because current Jedis only accept 'int' for timeout
+ @JsonProperty
+ private int timeout = 2000;
+
+ // max connections of redis connection pool
+ @JsonProperty
+ private int maxTotalConnections = 8;
+
+ // max idle connections of redis connection pool
+ @JsonProperty
+ private int maxIdleConnections = 8;
+
+ // min idle connections of redis connection pool
+ @JsonProperty
+ private int minIdleConnections = 0;
+
+ public String getHost()
+ {
+ return host;
+ }
+
+ public int getPort()
+ {
+ return port;
+ }
+
+ public long getExpiration()
+ {
+ return expiration;
+ }
+
+ public int getTimeout()
+ {
+ return timeout;
+ }
+
+ public int getMaxTotalConnections()
+ {
+ return maxTotalConnections;
+ }
+
+ public int getMaxIdleConnections()
+ {
+ return maxIdleConnections;
+ }
+
+ public int getMinIdleConnections()
+ {
+ return minIdleConnections;
+ }
+}
diff --git a/extensions-contrib/redis-cache/src/main/java/io/druid/client/cache/RedisCacheProvider.java b/extensions-contrib/redis-cache/src/main/java/io/druid/client/cache/RedisCacheProvider.java
new file mode 100644
index 000000000000..4b743309680b
--- /dev/null
+++ b/extensions-contrib/redis-cache/src/main/java/io/druid/client/cache/RedisCacheProvider.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.client.cache;
+
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("redis")
+public class RedisCacheProvider extends RedisCacheConfig implements CacheProvider
+{
+ @Override
+ public Cache get()
+ {
+ return RedisCache.create(this);
+ }
+}
diff --git a/extensions-contrib/redis-cache/src/main/java/io/druid/client/cache/RedisDruidModule.java b/extensions-contrib/redis-cache/src/main/java/io/druid/client/cache/RedisDruidModule.java
new file mode 100644
index 000000000000..c4814b2448dd
--- /dev/null
+++ b/extensions-contrib/redis-cache/src/main/java/io/druid/client/cache/RedisDruidModule.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.client.cache;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Binder;
+import io.druid.initialization.DruidModule;
+
+import java.util.List;
+
+public class RedisDruidModule implements DruidModule
+{
+ @Override
+ public void configure(Binder binder)
+ {
+
+ }
+
+ @Override
+ public List extends Module> getJacksonModules()
+ {
+ return ImmutableList.of(
+ new SimpleModule("DruidRedisCache")
+ .registerSubtypes(RedisCacheProvider.class)
+ );
+ }
+}
diff --git a/extensions-contrib/redis-cache/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/extensions-contrib/redis-cache/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
new file mode 100644
index 000000000000..54730c71c161
--- /dev/null
+++ b/extensions-contrib/redis-cache/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
@@ -0,0 +1 @@
+io.druid.client.cache.RedisDruidModule
diff --git a/extensions-contrib/redis-cache/src/test/java/io/druid/client/cache/RedisCacheTest.java b/extensions-contrib/redis-cache/src/test/java/io/druid/client/cache/RedisCacheTest.java
new file mode 100644
index 000000000000..a0219bcdc7bd
--- /dev/null
+++ b/extensions-contrib/redis-cache/src/test/java/io/druid/client/cache/RedisCacheTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.client.cache;
+
+import com.fiftyonred.mock_jedis.MockJedis;
+import com.fiftyonred.mock_jedis.MockJedisPool;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Ints;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.name.Names;
+import io.druid.guice.GuiceInjectors;
+import io.druid.guice.JsonConfigProvider;
+import io.druid.guice.ManageLifecycle;
+import io.druid.initialization.Initialization;
+import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.common.lifecycle.Lifecycle;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.JedisPoolConfig;
+
+import java.util.Map;
+import java.util.UUID;
+
+public class RedisCacheTest
+{
+ private static final byte[] HI = StringUtils.toUtf8("hiiiiiiiiiiiiiiiiiii");
+ private static final byte[] HO = StringUtils.toUtf8("hooooooooooooooooooo");
+
+ private RedisCache cache;
+ private final RedisCacheConfig cacheConfig = new RedisCacheConfig()
+ {
+ @Override
+ public int getTimeout()
+ {
+ return 10;
+ }
+
+ @Override
+ public long getExpiration()
+ {
+ return 3600000;
+ }
+ };
+
+ @Before
+ public void setUp() throws Exception
+ {
+ JedisPoolConfig poolConfig = new JedisPoolConfig();
+ poolConfig.setMaxTotal(cacheConfig.getMaxTotalConnections());
+ poolConfig.setMaxIdle(cacheConfig.getMaxIdleConnections());
+ poolConfig.setMinIdle(cacheConfig.getMinIdleConnections());
+
+ MockJedisPool pool = new MockJedisPool(poolConfig, "localhost");
+ // orginal MockJedis do not support 'milliseconds' in long type,
+ // for test we override to support it
+ pool.setClient(new MockJedis("localhost") {
+ @Override
+ public String psetex(byte[] key, long milliseconds, byte[] value)
+ {
+ return this.psetex(key, (int) milliseconds, value);
+ }
+ });
+
+ cache = RedisCache.create(pool, cacheConfig);
+ }
+
+ @Test
+ public void testBasicInjection() throws Exception
+ {
+ final RedisCacheConfig config = new RedisCacheConfig();
+ Injector injector = Initialization.makeInjectorWithModules(
+ GuiceInjectors.makeStartupInjector(), ImmutableList.of(
+ binder -> {
+ binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/test/redis");
+ binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0);
+ binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1);
+
+ binder.bind(RedisCacheConfig.class).toInstance(config);
+ binder.bind(Cache.class).toProvider(RedisCacheProviderWithConfig.class).in(ManageLifecycle.class);
+ }
+ )
+ );
+ Lifecycle lifecycle = injector.getInstance(Lifecycle.class);
+ lifecycle.start();
+ try {
+ Cache cache = injector.getInstance(Cache.class);
+ Assert.assertEquals(RedisCache.class, cache.getClass());
+ }
+ finally {
+ lifecycle.stop();
+ }
+ }
+
+ @Test
+ public void testSimpleInjection()
+ {
+ final String uuid = UUID.randomUUID().toString();
+ System.setProperty(uuid + ".type", "redis");
+ final Injector injector = Initialization.makeInjectorWithModules(
+ GuiceInjectors.makeStartupInjector(), ImmutableList.of(
+ binder -> {
+ binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/test/redis");
+ binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0);
+ binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1);
+
+ binder.bind(Cache.class).toProvider(CacheProvider.class);
+ JsonConfigProvider.bind(binder, uuid, CacheProvider.class);
+ }
+ )
+ );
+ final CacheProvider cacheProvider = injector.getInstance(CacheProvider.class);
+ Assert.assertNotNull(cacheProvider);
+ Assert.assertEquals(RedisCacheProvider.class, cacheProvider.getClass());
+ }
+
+ @Test
+ public void testSanity() throws Exception
+ {
+ Assert.assertNull(cache.get(new Cache.NamedKey("a", HI)));
+ put(cache, "a", HI, 0);
+ Assert.assertEquals(0, get(cache, "a", HI));
+ Assert.assertNull(cache.get(new Cache.NamedKey("the", HI)));
+
+ put(cache, "the", HI, 1);
+ Assert.assertEquals(0, get(cache, "a", HI));
+ Assert.assertEquals(1, get(cache, "the", HI));
+
+ put(cache, "the", HO, 10);
+ Assert.assertEquals(0, get(cache, "a", HI));
+ Assert.assertNull(cache.get(new Cache.NamedKey("a", HO)));
+ Assert.assertEquals(1, get(cache, "the", HI));
+ Assert.assertEquals(10, get(cache, "the", HO));
+
+ cache.close("the");
+ Assert.assertEquals(0, get(cache, "a", HI));
+ Assert.assertNull(cache.get(new Cache.NamedKey("a", HO)));
+ }
+
+ @Test
+ public void testGetBulk() throws Exception
+ {
+ Assert.assertNull(cache.get(new Cache.NamedKey("the", HI)));
+
+ put(cache, "the", HI, 1);
+ put(cache, "the", HO, 10);
+
+ Cache.NamedKey key1 = new Cache.NamedKey("the", HI);
+ Cache.NamedKey key2 = new Cache.NamedKey("the", HO);
+ Cache.NamedKey key3 = new Cache.NamedKey("a", HI);
+
+ Map result = cache.getBulk(
+ Lists.newArrayList(
+ key1,
+ key2,
+ key3
+ )
+ );
+
+ Assert.assertEquals(1, Ints.fromByteArray(result.get(key1)));
+ Assert.assertEquals(10, Ints.fromByteArray(result.get(key2)));
+ Assert.assertEquals(null, result.get(key3));
+ }
+
+ public void put(Cache cache, String namespace, byte[] key, Integer value)
+ {
+ cache.put(new Cache.NamedKey(namespace, key), Ints.toByteArray(value));
+ }
+
+ public int get(Cache cache, String namespace, byte[] key)
+ {
+ return Ints.fromByteArray(cache.get(new Cache.NamedKey(namespace, key)));
+ }
+}
+
+class RedisCacheProviderWithConfig extends RedisCacheProvider
+{
+ private final RedisCacheConfig config;
+
+ @Inject
+ public RedisCacheProviderWithConfig(RedisCacheConfig config)
+ {
+ this.config = config;
+ }
+
+ @Override
+ public Cache get()
+ {
+ return RedisCache.create(config);
+ }
+}
+
diff --git a/pom.xml b/pom.xml
index 4eab80896f82..5f19e96d9ffb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -135,6 +135,7 @@
extensions-contrib/scan-query
extensions-contrib/sqlserver-metadata-storage
extensions-contrib/kafka-emitter
+ extensions-contrib/redis-cache
distribution