Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions distribution/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@
<argument>-c</argument>
<argument>io.druid.extensions.contrib:druid-rabbitmq</argument>
<argument>-c</argument>
<argument>io.druid.extensions.contrib:druid-redis-cache</argument>
<argument>-c</argument>
<argument>io.druid.extensions.contrib:scan-query</argument>
<argument>-c</argument>
<argument>io.druid.extensions.contrib:sqlserver-metadata-storage</argument>
Expand Down
70 changes: 70 additions & 0 deletions extensions-contrib/redis-cache/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
<?xml version="1.0" encoding="UTF-8"?>

<!--
~ Licensed to Metamarkets Group Inc. (Metamarkets) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. Metamarkets licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>io.druid.extensions.contrib</groupId>
<artifactId>druid-redis-cache</artifactId>
<name>druid-redis-cache</name>

<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.11.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<dependencies>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-api</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>

<!-- Tests -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fiftyonred</groupId>
<artifactId>mock-jedis</artifactId>
<version>0.4.0</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.client.cache;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.java.util.common.logger.Logger;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.exceptions.JedisException;


import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

public class RedisCache implements Cache
{
private static final Logger log = new Logger(RedisCache.class);

private JedisPool pool;
private RedisCacheConfig config;

private final AtomicLong hitCount = new AtomicLong(0);
private final AtomicLong missCount = new AtomicLong(0);
private final AtomicLong timeoutCount = new AtomicLong(0);
private final AtomicLong errorCount = new AtomicLong(0);

private final AtomicLong priorRequestCount = new AtomicLong(0);
// both get、put and getBulk will increase request count by 1
private final AtomicLong totalRequestCount = new AtomicLong(0);

private RedisCache(JedisPool pool, RedisCacheConfig config)
{
this.pool = pool;
this.config = config;
}

public static RedisCache create(final RedisCacheConfig config)
{
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(config.getMaxTotalConnections());
poolConfig.setMaxIdle(config.getMaxIdleConnections());
poolConfig.setMinIdle(config.getMinIdleConnections());

JedisPool pool = new JedisPool(poolConfig, config.getHost(), config.getPort(), config.getTimeout());
return new RedisCache(pool, config);
}

@Override
public byte[] get(NamedKey key)
{
totalRequestCount.incrementAndGet();

try (Jedis jedis = pool.getResource()) {
byte[] bytes = jedis.get(key.toByteArray());
if (bytes == null) {
missCount.incrementAndGet();
return null;
} else {
hitCount.incrementAndGet();
return bytes;
}
}
catch (JedisException e) {
if (e.getMessage().contains("Read timed out")) {
timeoutCount.incrementAndGet();
} else {
errorCount.incrementAndGet();
}
log.warn(e, "Exception pulling item from cache");
return null;
}
}

@Override
public void put(NamedKey key, byte[] value)
{
totalRequestCount.incrementAndGet();

try (Jedis jedis = pool.getResource()) {
jedis.psetex(key.toByteArray(), config.getExpiration(), value);
}
catch (JedisException e) {
errorCount.incrementAndGet();
log.warn(e, "Exception pushing item to cache");
}
}

@Override
public Map<NamedKey, byte[]> getBulk(Iterable<NamedKey> keys)
{
totalRequestCount.incrementAndGet();

Map<NamedKey, byte[]> results = new HashMap<>();

try (Jedis jedis = pool.getResource()) {
List<NamedKey> namedKeys = Lists.newArrayList(keys);
List<byte[]> byteKeys = Lists.transform(namedKeys, NamedKey::toByteArray);

List<byte[]> byteValues = jedis.mget(byteKeys.toArray(new byte[0][]));

for (int i = 0; i < byteValues.size(); ++i) {
if (byteValues.get(i) != null) {
results.put(namedKeys.get(i), byteValues.get(i));
}
}

hitCount.addAndGet(results.size());
missCount.addAndGet(namedKeys.size() - results.size());
}
catch (JedisException e) {
if (e.getMessage().contains("Read timed out")) {
timeoutCount.incrementAndGet();
} else {
errorCount.incrementAndGet();
}
log.warn(e, "Exception pulling items from cache");
}

return results;
}

@Override
public void close(String namespace)
{
// no resources to cleanup
}

@Override
public CacheStats getStats()
{
return new CacheStats(
hitCount.get(),
missCount.get(),
0,
0,
0,
timeoutCount.get(),
errorCount.get()
);
}

@Override
public boolean isLocal()
{
return false;
}

@Override
public void doMonitor(ServiceEmitter emitter)
{
final long priorCount = priorRequestCount.get();
final long totalCount = totalRequestCount.get();
final ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder();
emitter.emit(builder.build("query/cache/redis/total/requests", totalCount));
emitter.emit(builder.build("query/cache/redis/delta/requests", totalCount - priorCount));
if (!priorRequestCount.compareAndSet(priorCount, totalCount)) {
log.error("Prior value changed while I was reporting! updating anyways");
priorRequestCount.set(totalCount);
}
}

@VisibleForTesting
static RedisCache create(final JedisPool pool, final RedisCacheConfig config)
{
return new RedisCache(pool, config);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.client.cache;

import com.fasterxml.jackson.annotation.JsonProperty;

public class RedisCacheConfig
{
@JsonProperty
private String host;

@JsonProperty
private int port;

// milliseconds, default to one day
@JsonProperty
private long expiration = 24 * 3600 * 1000;

// milliseconds, the type is 'int' because current Jedis only accept 'int' for timeout
@JsonProperty
private int timeout = 2000;

// max connections of redis connection pool
@JsonProperty
private int maxTotalConnections = 8;

// max idle connections of redis connection pool
@JsonProperty
private int maxIdleConnections = 8;

// min idle connections of redis connection pool
@JsonProperty
private int minIdleConnections = 0;

public String getHost()
{
return host;
}

public int getPort()
{
return port;
}

public long getExpiration()
{
return expiration;
}

public int getTimeout()
{
return timeout;
}

public int getMaxTotalConnections()
{
return maxTotalConnections;
}

public int getMaxIdleConnections()
{
return maxIdleConnections;
}

public int getMinIdleConnections()
{
return minIdleConnections;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.client.cache;

import com.fasterxml.jackson.annotation.JsonTypeName;

@JsonTypeName("redis")
public class RedisCacheProvider extends RedisCacheConfig implements CacheProvider
{
@Override
public Cache get()
{
return RedisCache.create(this);
}
}
Loading