From 4aba5bdc62fc8de687145e6e41611b272525faf2 Mon Sep 17 00:00:00 2001 From: qiumingming Date: Mon, 31 Jul 2017 01:19:33 +0800 Subject: [PATCH 1/3] Redis cache extension --- distribution/pom.xml | 2 + extensions-contrib/redis-cache/pom.xml | 70 ++++++ .../io/druid/client/cache/RedisCache.java | 190 +++++++++++++++++ .../druid/client/cache/RedisCacheConfig.java | 86 ++++++++ .../client/cache/RedisCacheProvider.java | 32 +++ .../druid/client/cache/RedisDruidModule.java | 46 ++++ .../io.druid.initialization.DruidModule | 1 + .../io/druid/client/cache/RedisCacheTest.java | 201 ++++++++++++++++++ pom.xml | 1 + 9 files changed, 629 insertions(+) create mode 100644 extensions-contrib/redis-cache/pom.xml create mode 100644 extensions-contrib/redis-cache/src/main/java/io/druid/client/cache/RedisCache.java create mode 100644 extensions-contrib/redis-cache/src/main/java/io/druid/client/cache/RedisCacheConfig.java create mode 100644 extensions-contrib/redis-cache/src/main/java/io/druid/client/cache/RedisCacheProvider.java create mode 100644 extensions-contrib/redis-cache/src/main/java/io/druid/client/cache/RedisDruidModule.java create mode 100644 extensions-contrib/redis-cache/src/main/resources/META-INF/services/io.druid.initialization.DruidModule create mode 100644 extensions-contrib/redis-cache/src/test/java/io/druid/client/cache/RedisCacheTest.java diff --git a/distribution/pom.xml b/distribution/pom.xml index e35f2ced7f17..4dca3b03f514 100644 --- a/distribution/pom.xml +++ b/distribution/pom.xml @@ -230,6 +230,8 @@ -c io.druid.extensions.contrib:druid-rabbitmq -c + io.druid.extensions.contrib:druid-redis-cache + -c io.druid.extensions.contrib:scan-query -c io.druid.extensions.contrib:sqlserver-metadata-storage diff --git a/extensions-contrib/redis-cache/pom.xml b/extensions-contrib/redis-cache/pom.xml new file mode 100644 index 000000000000..8f77f1138851 --- /dev/null +++ b/extensions-contrib/redis-cache/pom.xml @@ -0,0 +1,70 @@ + + + + + + 4.0.0 + + io.druid.extensions.contrib + druid-redis-cache + druid-redis-cache + + + io.druid + druid + 0.11.0-SNAPSHOT + ../../pom.xml + + + + + io.druid + druid-api + ${project.parent.version} + provided + + + io.druid + druid-server + ${project.parent.version} + provided + + + redis.clients + jedis + 2.9.0 + + + + + junit + junit + test + + + com.fiftyonred + mock-jedis + 0.4.0 + test + + + + diff --git a/extensions-contrib/redis-cache/src/main/java/io/druid/client/cache/RedisCache.java b/extensions-contrib/redis-cache/src/main/java/io/druid/client/cache/RedisCache.java new file mode 100644 index 000000000000..a0a252ae8750 --- /dev/null +++ b/extensions-contrib/redis-cache/src/main/java/io/druid/client/cache/RedisCache.java @@ -0,0 +1,190 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client.cache; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.metamx.emitter.service.ServiceEmitter; +import com.metamx.emitter.service.ServiceMetricEvent; +import io.druid.java.util.common.logger.Logger; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisPoolConfig; +import redis.clients.jedis.exceptions.JedisException; + + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +public class RedisCache implements Cache +{ + private static final Logger log = new Logger(MemcachedCache.class); + + private JedisPool pool; + private RedisCacheConfig config; + + private final AtomicLong hitCount = new AtomicLong(0); + private final AtomicLong missCount = new AtomicLong(0); + private final AtomicLong timeoutCount = new AtomicLong(0); + private final AtomicLong errorCount = new AtomicLong(0); + + private final AtomicReference priorRequestCount = new AtomicReference<>(new AtomicLong(0L)); + // both get、put and getBulk will increase request count by 1 + private final AtomicLong totalRequestCount = new AtomicLong(0); + + private RedisCache(JedisPool pool, RedisCacheConfig config) + { + this.pool = pool; + this.config = config; + } + + public static RedisCache create(final RedisCacheConfig config) + { + JedisPoolConfig poolConfig = new JedisPoolConfig(); + poolConfig.setMaxTotal(config.getMaxTotalConnections()); + poolConfig.setMaxIdle(config.getMaxIdleConnections()); + poolConfig.setMinIdle(config.getMinIdleConnections()); + + JedisPool pool = new JedisPool(poolConfig, config.getHost(), config.getPort(), config.getTimeout()); + return new RedisCache(pool, config); + } + + @Override + public byte[] get(NamedKey key) + { + totalRequestCount.incrementAndGet(); + + try (Jedis jedis = pool.getResource()) { + byte[] bytes = jedis.get(key.toByteArray()); + if (bytes == null) { + missCount.incrementAndGet(); + return null; + } else { + hitCount.incrementAndGet(); + return bytes; + } + } + catch (JedisException e) { + if (e.getMessage().contains("Read timed out")) { + timeoutCount.incrementAndGet(); + } else { + errorCount.incrementAndGet(); + } + log.warn(e, "Exception pulling item from cache"); + return null; + } + } + + @Override + public void put(NamedKey key, byte[] value) + { + totalRequestCount.incrementAndGet(); + + try (Jedis jedis = pool.getResource()) { + jedis.setex(key.toByteArray(), config.getExpiration(), value); + } + catch (JedisException e) { + errorCount.incrementAndGet(); + log.warn(e, "Exception pushing item to cache"); + } + } + + @Override + public Map getBulk(Iterable keys) + { + totalRequestCount.incrementAndGet(); + + Map results = new HashMap<>(); + + try (Jedis jedis = pool.getResource()) { + List namedKeys = Lists.newArrayList(keys); + List byteKeys = Lists.transform(namedKeys, NamedKey::toByteArray); + + List byteValues = jedis.mget(byteKeys.toArray(new byte[0][])); + + for (int i = 0; i < byteValues.size(); ++i) { + if (byteValues.get(i) != null) { + results.put(namedKeys.get(i), byteValues.get(i)); + } + } + + hitCount.addAndGet(results.size()); + missCount.addAndGet(namedKeys.size() - results.size()); + } + catch (JedisException e) { + if (e.getMessage().contains("Read timed out")) { + timeoutCount.incrementAndGet(); + } else { + errorCount.incrementAndGet(); + } + log.warn(e, "Exception pulling items from cache"); + } + + return results; + } + + @Override + public void close(String namespace) + { + // no resources to cleanup + } + + @Override + public CacheStats getStats() + { + return new CacheStats( + hitCount.get(), + missCount.get(), + 0, + 0, + 0, + timeoutCount.get(), + errorCount.get() + ); + } + + @Override + public boolean isLocal() + { + return false; + } + + @Override + public void doMonitor(ServiceEmitter emitter) + { + final AtomicLong priorCount = priorRequestCount.get(); + final ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder(); + emitter.emit(builder.build("query/cache/redis/total/requests", totalRequestCount.get())); + emitter.emit(builder.build("query/cache/redis/delta/requests", totalRequestCount.get() - priorCount.get())); + if (!priorRequestCount.compareAndSet(priorCount, totalRequestCount)) { + log.error("Prior value changed while I was reporting! updating anyways"); + priorRequestCount.set(totalRequestCount); + } + } + + @VisibleForTesting + static RedisCache create(final JedisPool pool, final RedisCacheConfig config) + { + return new RedisCache(pool, config); + } +} diff --git a/extensions-contrib/redis-cache/src/main/java/io/druid/client/cache/RedisCacheConfig.java b/extensions-contrib/redis-cache/src/main/java/io/druid/client/cache/RedisCacheConfig.java new file mode 100644 index 000000000000..7b6df1df5a46 --- /dev/null +++ b/extensions-contrib/redis-cache/src/main/java/io/druid/client/cache/RedisCacheConfig.java @@ -0,0 +1,86 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client.cache; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class RedisCacheConfig +{ + @JsonProperty + private String host; + + @JsonProperty + private int port; + + // seconds + @JsonProperty + private int expiration = 24 * 3600; + + // milliseconds + @JsonProperty + private int timeout = 2000; + + // max connections of redis connection pool + @JsonProperty + private int maxTotalConnections = 8; + + // max idle connections of redis connection pool + @JsonProperty + private int maxIdleConnections = 8; + + // min idle connections of redis connection pool + @JsonProperty + private int minIdleConnections = 0; + + public String getHost() + { + return host; + } + + public int getPort() + { + return port; + } + + public int getExpiration() + { + return expiration; + } + + public int getTimeout() + { + return timeout; + } + + public int getMaxTotalConnections() + { + return maxTotalConnections; + } + + public int getMaxIdleConnections() + { + return maxIdleConnections; + } + + public int getMinIdleConnections() + { + return minIdleConnections; + } +} diff --git a/extensions-contrib/redis-cache/src/main/java/io/druid/client/cache/RedisCacheProvider.java b/extensions-contrib/redis-cache/src/main/java/io/druid/client/cache/RedisCacheProvider.java new file mode 100644 index 000000000000..4b743309680b --- /dev/null +++ b/extensions-contrib/redis-cache/src/main/java/io/druid/client/cache/RedisCacheProvider.java @@ -0,0 +1,32 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client.cache; + +import com.fasterxml.jackson.annotation.JsonTypeName; + +@JsonTypeName("redis") +public class RedisCacheProvider extends RedisCacheConfig implements CacheProvider +{ + @Override + public Cache get() + { + return RedisCache.create(this); + } +} diff --git a/extensions-contrib/redis-cache/src/main/java/io/druid/client/cache/RedisDruidModule.java b/extensions-contrib/redis-cache/src/main/java/io/druid/client/cache/RedisDruidModule.java new file mode 100644 index 000000000000..c4814b2448dd --- /dev/null +++ b/extensions-contrib/redis-cache/src/main/java/io/druid/client/cache/RedisDruidModule.java @@ -0,0 +1,46 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client.cache; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import io.druid.initialization.DruidModule; + +import java.util.List; + +public class RedisDruidModule implements DruidModule +{ + @Override + public void configure(Binder binder) + { + + } + + @Override + public List getJacksonModules() + { + return ImmutableList.of( + new SimpleModule("DruidRedisCache") + .registerSubtypes(RedisCacheProvider.class) + ); + } +} diff --git a/extensions-contrib/redis-cache/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/extensions-contrib/redis-cache/src/main/resources/META-INF/services/io.druid.initialization.DruidModule new file mode 100644 index 000000000000..54730c71c161 --- /dev/null +++ b/extensions-contrib/redis-cache/src/main/resources/META-INF/services/io.druid.initialization.DruidModule @@ -0,0 +1 @@ +io.druid.client.cache.RedisDruidModule diff --git a/extensions-contrib/redis-cache/src/test/java/io/druid/client/cache/RedisCacheTest.java b/extensions-contrib/redis-cache/src/test/java/io/druid/client/cache/RedisCacheTest.java new file mode 100644 index 000000000000..96be4a7ccdd3 --- /dev/null +++ b/extensions-contrib/redis-cache/src/test/java/io/druid/client/cache/RedisCacheTest.java @@ -0,0 +1,201 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client.cache; + +import com.fiftyonred.mock_jedis.MockJedisPool; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.primitives.Ints; +import com.google.inject.Inject; +import com.google.inject.Injector; +import com.google.inject.name.Names; +import io.druid.guice.GuiceInjectors; +import io.druid.guice.JsonConfigProvider; +import io.druid.guice.ManageLifecycle; +import io.druid.initialization.Initialization; +import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.lifecycle.Lifecycle; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisPoolConfig; + +import java.util.Map; +import java.util.UUID; + +public class RedisCacheTest +{ + private static final byte[] HI = StringUtils.toUtf8("hiiiiiiiiiiiiiiiiiii"); + private static final byte[] HO = StringUtils.toUtf8("hooooooooooooooooooo"); + + private RedisCache cache; + private final RedisCacheConfig cacheConfig = new RedisCacheConfig() + { + @Override + public int getTimeout() + { + return 10; + } + + @Override + public int getExpiration() + { + return 3600; + } + }; + + @Before + public void setUp() throws Exception + { + JedisPoolConfig poolConfig = new JedisPoolConfig(); + poolConfig.setMaxTotal(cacheConfig.getMaxTotalConnections()); + poolConfig.setMaxIdle(cacheConfig.getMaxIdleConnections()); + poolConfig.setMinIdle(cacheConfig.getMinIdleConnections()); + + JedisPool pool = new MockJedisPool(poolConfig, "localhost"); + cache = RedisCache.create(pool, cacheConfig); + } + + @Test + public void testBasicInjection() throws Exception + { + final RedisCacheConfig config = new RedisCacheConfig(); + Injector injector = Initialization.makeInjectorWithModules( + GuiceInjectors.makeStartupInjector(), ImmutableList.of( + binder -> { + binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/test/redis"); + binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0); + binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1); + + binder.bind(RedisCacheConfig.class).toInstance(config); + binder.bind(Cache.class).toProvider(RedisCacheProviderWithConfig.class).in(ManageLifecycle.class); + } + ) + ); + Lifecycle lifecycle = injector.getInstance(Lifecycle.class); + lifecycle.start(); + try { + Cache cache = injector.getInstance(Cache.class); + Assert.assertEquals(RedisCache.class, cache.getClass()); + } + finally { + lifecycle.stop(); + } + } + + @Test + public void testSimpleInjection() + { + final String uuid = UUID.randomUUID().toString(); + System.setProperty(uuid + ".type", "redis"); + final Injector injector = Initialization.makeInjectorWithModules( + GuiceInjectors.makeStartupInjector(), ImmutableList.of( + binder -> { + binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/test/redis"); + binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0); + binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1); + + binder.bind(Cache.class).toProvider(CacheProvider.class); + JsonConfigProvider.bind(binder, uuid, CacheProvider.class); + } + ) + ); + final CacheProvider cacheProvider = injector.getInstance(CacheProvider.class); + Assert.assertNotNull(cacheProvider); + Assert.assertEquals(RedisCacheProvider.class, cacheProvider.getClass()); + } + + @Test + public void testSanity() throws Exception + { + Assert.assertNull(cache.get(new Cache.NamedKey("a", HI))); + put(cache, "a", HI, 0); + Assert.assertEquals(0, get(cache, "a", HI)); + Assert.assertNull(cache.get(new Cache.NamedKey("the", HI))); + + put(cache, "the", HI, 1); + Assert.assertEquals(0, get(cache, "a", HI)); + Assert.assertEquals(1, get(cache, "the", HI)); + + put(cache, "the", HO, 10); + Assert.assertEquals(0, get(cache, "a", HI)); + Assert.assertNull(cache.get(new Cache.NamedKey("a", HO))); + Assert.assertEquals(1, get(cache, "the", HI)); + Assert.assertEquals(10, get(cache, "the", HO)); + + cache.close("the"); + Assert.assertEquals(0, get(cache, "a", HI)); + Assert.assertNull(cache.get(new Cache.NamedKey("a", HO))); + } + + @Test + public void testGetBulk() throws Exception + { + Assert.assertNull(cache.get(new Cache.NamedKey("the", HI))); + + put(cache, "the", HI, 1); + put(cache, "the", HO, 10); + + Cache.NamedKey key1 = new Cache.NamedKey("the", HI); + Cache.NamedKey key2 = new Cache.NamedKey("the", HO); + Cache.NamedKey key3 = new Cache.NamedKey("a", HI); + + Map result = cache.getBulk( + Lists.newArrayList( + key1, + key2, + key3 + ) + ); + + Assert.assertEquals(1, Ints.fromByteArray(result.get(key1))); + Assert.assertEquals(10, Ints.fromByteArray(result.get(key2))); + Assert.assertEquals(null, result.get(key3)); + } + + public void put(Cache cache, String namespace, byte[] key, Integer value) + { + cache.put(new Cache.NamedKey(namespace, key), Ints.toByteArray(value)); + } + + public int get(Cache cache, String namespace, byte[] key) + { + return Ints.fromByteArray(cache.get(new Cache.NamedKey(namespace, key))); + } +} + +class RedisCacheProviderWithConfig extends RedisCacheProvider +{ + private final RedisCacheConfig config; + + @Inject + public RedisCacheProviderWithConfig(RedisCacheConfig config) + { + this.config = config; + } + + @Override + public Cache get() + { + return RedisCache.create(config); + } +} + diff --git a/pom.xml b/pom.xml index 4eab80896f82..5f19e96d9ffb 100644 --- a/pom.xml +++ b/pom.xml @@ -135,6 +135,7 @@ extensions-contrib/scan-query extensions-contrib/sqlserver-metadata-storage extensions-contrib/kafka-emitter + extensions-contrib/redis-cache distribution From 13455091551d4e0e355ca1e310fc90238edc6769 Mon Sep 17 00:00:00 2001 From: qiumingming Date: Thu, 3 Aug 2017 03:28:16 +0800 Subject: [PATCH 2/3] Fix some trival and optimize code --- .../java/io/druid/client/cache/RedisCache.java | 18 +++++++++--------- .../druid/client/cache/RedisCacheConfig.java | 8 ++++---- .../io/druid/client/cache/RedisCacheTest.java | 17 +++++++++++++---- 3 files changed, 26 insertions(+), 17 deletions(-) diff --git a/extensions-contrib/redis-cache/src/main/java/io/druid/client/cache/RedisCache.java b/extensions-contrib/redis-cache/src/main/java/io/druid/client/cache/RedisCache.java index a0a252ae8750..e23507d5c1a7 100644 --- a/extensions-contrib/redis-cache/src/main/java/io/druid/client/cache/RedisCache.java +++ b/extensions-contrib/redis-cache/src/main/java/io/druid/client/cache/RedisCache.java @@ -34,11 +34,10 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; public class RedisCache implements Cache { - private static final Logger log = new Logger(MemcachedCache.class); + private static final Logger log = new Logger(RedisCache.class); private JedisPool pool; private RedisCacheConfig config; @@ -48,7 +47,7 @@ public class RedisCache implements Cache private final AtomicLong timeoutCount = new AtomicLong(0); private final AtomicLong errorCount = new AtomicLong(0); - private final AtomicReference priorRequestCount = new AtomicReference<>(new AtomicLong(0L)); + private final AtomicLong priorRequestCount = new AtomicLong(0); // both get、put and getBulk will increase request count by 1 private final AtomicLong totalRequestCount = new AtomicLong(0); @@ -101,7 +100,7 @@ public void put(NamedKey key, byte[] value) totalRequestCount.incrementAndGet(); try (Jedis jedis = pool.getResource()) { - jedis.setex(key.toByteArray(), config.getExpiration(), value); + jedis.psetex(key.toByteArray(), config.getExpiration(), value); } catch (JedisException e) { errorCount.incrementAndGet(); @@ -172,13 +171,14 @@ public boolean isLocal() @Override public void doMonitor(ServiceEmitter emitter) { - final AtomicLong priorCount = priorRequestCount.get(); + final long priorCount = priorRequestCount.get(); + final long totalCount = totalRequestCount.get(); final ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder(); - emitter.emit(builder.build("query/cache/redis/total/requests", totalRequestCount.get())); - emitter.emit(builder.build("query/cache/redis/delta/requests", totalRequestCount.get() - priorCount.get())); - if (!priorRequestCount.compareAndSet(priorCount, totalRequestCount)) { + emitter.emit(builder.build("query/cache/redis/total/requests", totalCount)); + emitter.emit(builder.build("query/cache/redis/delta/requests", totalCount - priorCount)); + if (!priorRequestCount.compareAndSet(priorCount, totalCount)) { log.error("Prior value changed while I was reporting! updating anyways"); - priorRequestCount.set(totalRequestCount); + priorRequestCount.set(totalCount); } } diff --git a/extensions-contrib/redis-cache/src/main/java/io/druid/client/cache/RedisCacheConfig.java b/extensions-contrib/redis-cache/src/main/java/io/druid/client/cache/RedisCacheConfig.java index 7b6df1df5a46..14c2d472e2b7 100644 --- a/extensions-contrib/redis-cache/src/main/java/io/druid/client/cache/RedisCacheConfig.java +++ b/extensions-contrib/redis-cache/src/main/java/io/druid/client/cache/RedisCacheConfig.java @@ -29,11 +29,11 @@ public class RedisCacheConfig @JsonProperty private int port; - // seconds + // milliseconds, default to one day @JsonProperty - private int expiration = 24 * 3600; + private long expiration = 24 * 3600 * 1000; - // milliseconds + // milliseconds, the type is 'int' because current Jedis only accept 'int' for timeout @JsonProperty private int timeout = 2000; @@ -59,7 +59,7 @@ public int getPort() return port; } - public int getExpiration() + public long getExpiration() { return expiration; } diff --git a/extensions-contrib/redis-cache/src/test/java/io/druid/client/cache/RedisCacheTest.java b/extensions-contrib/redis-cache/src/test/java/io/druid/client/cache/RedisCacheTest.java index 96be4a7ccdd3..2bc2a87af317 100644 --- a/extensions-contrib/redis-cache/src/test/java/io/druid/client/cache/RedisCacheTest.java +++ b/extensions-contrib/redis-cache/src/test/java/io/druid/client/cache/RedisCacheTest.java @@ -19,6 +19,7 @@ package io.druid.client.cache; +import com.fiftyonred.mock_jedis.MockJedis; import com.fiftyonred.mock_jedis.MockJedisPool; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; @@ -35,7 +36,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; import java.util.Map; @@ -56,9 +56,9 @@ public int getTimeout() } @Override - public int getExpiration() + public long getExpiration() { - return 3600; + return 3600000; } }; @@ -70,7 +70,16 @@ public void setUp() throws Exception poolConfig.setMaxIdle(cacheConfig.getMaxIdleConnections()); poolConfig.setMinIdle(cacheConfig.getMinIdleConnections()); - JedisPool pool = new MockJedisPool(poolConfig, "localhost"); + MockJedisPool pool = new MockJedisPool(poolConfig, "localhost"); + // orginal MockJedis do not support 'milliseconds' in long type, + // for test we add one method to support it + pool.setClient(new MockJedis("localhost") { + public String psetex(byte[] key, long milliseconds, byte[] value) + { + return this.psetex(key, (int) milliseconds, value); + } + }); + cache = RedisCache.create(pool, cacheConfig); } From 4c3b1ed053b7f82d0000be7e0ab5edca8cf449ca Mon Sep 17 00:00:00 2001 From: qiumingming Date: Thu, 3 Aug 2017 04:13:08 +0800 Subject: [PATCH 3/3] Add Override annotation in RedisCacheTest --- .../src/test/java/io/druid/client/cache/RedisCacheTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/extensions-contrib/redis-cache/src/test/java/io/druid/client/cache/RedisCacheTest.java b/extensions-contrib/redis-cache/src/test/java/io/druid/client/cache/RedisCacheTest.java index 2bc2a87af317..a0219bcdc7bd 100644 --- a/extensions-contrib/redis-cache/src/test/java/io/druid/client/cache/RedisCacheTest.java +++ b/extensions-contrib/redis-cache/src/test/java/io/druid/client/cache/RedisCacheTest.java @@ -72,8 +72,9 @@ public void setUp() throws Exception MockJedisPool pool = new MockJedisPool(poolConfig, "localhost"); // orginal MockJedis do not support 'milliseconds' in long type, - // for test we add one method to support it + // for test we override to support it pool.setClient(new MockJedis("localhost") { + @Override public String psetex(byte[] key, long milliseconds, byte[] value) { return this.psetex(key, (int) milliseconds, value);