3 changes: 3 additions & 0 deletions docs/content/configuration/broker.md
@@ -107,6 +107,9 @@ You can optionally only configure caching to be enabled on the broker by setting
|--------|---------------|-----------|-------|
|`druid.broker.cache.useCache`|true, false|Enable the cache on the broker.|false|
|`druid.broker.cache.populateCache`|true, false|Populate the cache on the broker.|false|
|`druid.broker.cache.useResultLevelCache`|true, false|Enable result level caching on the broker.|false|
|`druid.broker.cache.populateResultLevelCache`|true, false|Populate the result level cache on the broker.|false|
|`druid.broker.cache.resultLevelCacheLimit`|positive integer|Maximum size of query response that can be cached.|`Integer.MAX_VALUE`|
|`druid.broker.cache.unCacheable`|All druid query types|All query types to not cache.|`["groupBy", "select"]`|
|`druid.broker.cache.cacheBulkMergeLimit`|positive integer or 0|Queries with more segments than this number will not attempt to fetch from cache at the broker level, leaving potential caching fetches (and cache result merging) to the historicals|`Integer.MAX_VALUE`|

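For example, a Broker `runtime.properties` enabling both segment-level and result-level caching could look like the sketch below; the limit value is an illustrative assumption, not a recommendation.

# Sketch only: enable segment-level and result-level caching on the Broker.
druid.broker.cache.useCache=true
druid.broker.cache.populateCache=true
druid.broker.cache.useResultLevelCache=true
druid.broker.cache.populateResultLevelCache=true
# Cap cacheable result sets at ~1 MB (assumed value for illustration).
druid.broker.cache.resultLevelCacheLimit=1048576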
8 changes: 5 additions & 3 deletions docs/content/querying/caching.md
@@ -3,9 +3,10 @@ layout: doc_page
---
# Query Caching

Druid supports query result caching through an LRU cache. Results are stored on a per segment basis, along with the
parameters of a given query. This allows Druid to return final results based partially on segment results in the cache and partially
on segment results from scanning historical/real-time segments.
Druid supports query result caching through an LRU cache. Results are stored either as a whole or on a per-segment basis, along with the
parameters of a given query. Segment-level caching allows Druid to return final results based partially on segment results in the cache
and partially on segment results from scanning historical/real-time segments. Result-level caching enables Druid to cache the entire
result set, so that identical queries can be answered completely from the cache.

Segment results can be stored in a local heap cache or in an external distributed key/value store. Segment query caches
can be enabled at either the Historical or Broker level (it is not recommended to enable caching on both).
@@ -15,6 +16,7 @@ can be enabled at either the Historical and Broker level
For small clusters (< 20 servers), enabling caching on the Broker can yield faster results than enabling query caches on Historicals. This is
the recommended setup for smaller production clusters. Note that when caching is enabled on the Broker,
results from Historicals are returned on a per-segment basis, and Historicals will not be able to do any local result merging.
Result-level caching is available only on the Broker.
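As a sketch, a query can opt in to the result level cache through its context; the timeseries query below is a hypothetical example, and only the two context keys come from this change.

{
  "queryType": "timeseries",
  "dataSource": "sample_datasource",
  "granularity": "day",
  "intervals": ["2018-01-01/2018-02-01"],
  "aggregations": [{"type": "count", "name": "count"}],
  "context": {
    "useResultLevelCache": true,
    "populateResultLevelCache": true
  }
}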

## Query caching on Historicals

2 changes: 2 additions & 0 deletions docs/content/querying/query-context.md
@@ -15,6 +15,8 @@ The query context is used for various query configuration parameters.
|queryId | auto-generated | Unique identifier given to this query. If a query ID is set or known, this can be used to cancel the query |
|useCache | `true` | Flag indicating whether to leverage the query cache for this query. When set to false, it disables reading from the query cache for this query. When set to true, Druid uses druid.broker.cache.useCache or druid.historical.cache.useCache to determine whether or not to read from the query cache |
|populateCache | `true` | Flag indicating whether to save the results of the query to the query cache. Primarily used for debugging. When set to false, it disables saving the results of this query to the query cache. When set to true, Druid uses druid.broker.cache.populateCache or druid.historical.cache.populateCache to determine whether or not to save the results of this query to the query cache |
|useResultLevelCache | `true` | Flag indicating whether to leverage the result level cache for this query. When set to false, it disables reading from the result level cache for this query. When set to true, Druid uses druid.broker.cache.useResultLevelCache to determine whether or not to read from the result level cache |
|populateResultLevelCache | `true` | Flag indicating whether to save the results of the query to the result level cache. Primarily used for debugging. When set to false, it disables saving the results of this query to the result level cache. When set to true, Druid uses druid.broker.cache.populateResultLevelCache to determine whether or not to save the results of this query to the result level cache |
|bySegment | `false` | Return "by segment" results. Primarily used for debugging, setting it to `true` returns results associated with the data segment they came from |
|finalize | `true` | Flag indicating whether to "finalize" aggregation results. Primarily used for debugging. For instance, the `hyperUnique` aggregator will return the full HyperLogLog sketch instead of the estimated cardinality when this flag is set to `false` |
|chunkPeriod | `P0D` (off) | At the broker node level, long interval queries (of any type) may be broken into shorter interval queries to parallelize merging more than normal. Broken up queries will use a larger share of cluster resources, but may be able to complete faster as a result. Use ISO 8601 periods. For example, if this property is set to `P1M` (one month), then a query covering a year would be broken into 12 smaller queries. The broker uses its query processing executor service to initiate processing for query chunks, so make sure "druid.processing.numThreads" is configured appropriately on the broker. [groupBy queries](groupbyquery.html) do not support chunkPeriod by default, although they do if using the legacy "v1" engine. |
RedisCache.java
@@ -59,12 +59,17 @@ private RedisCache(JedisPool pool, RedisCacheConfig config)

public static RedisCache create(final RedisCacheConfig config)
{
JedisPool pool;
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(config.getMaxTotalConnections());
poolConfig.setMaxIdle(config.getMaxIdleConnections());
poolConfig.setMinIdle(config.getMinIdleConnections());

JedisPool pool = new JedisPool(poolConfig, config.getHost(), config.getPort(), config.getTimeout());
if (config.getPasswd() != null) {
  // A password is configured: use the Jedis constructor that authenticates.
  pool = new JedisPool(poolConfig, config.getHost(), config.getPort(), config.getTimeout(), config.getPasswd());
} else {
  pool = new JedisPool(poolConfig, config.getHost(), config.getPort(), config.getTimeout());
}
return new RedisCache(pool, config);
}

RedisCacheConfig.java
@@ -29,6 +29,9 @@ public class RedisCacheConfig
@JsonProperty
private int port;

@JsonProperty
private String passwd;

// milliseconds, default to one day
@JsonProperty
private long expiration = 24 * 3600 * 1000;
@@ -59,6 +62,11 @@ public int getPort()
return port;
}

public String getPasswd()
{
return passwd;
}

public long getExpiration()
{
return expiration;
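Assuming the extension binds RedisCacheConfig under the usual `druid.cache.*` prefix (an assumption inferred from the `@JsonProperty` fields above, not verified here), the new property could be configured like:

druid.cache.type=redis
druid.cache.host=127.0.0.1
druid.cache.port=6379
# New in this change; name follows the passwd field above.
druid.cache.passwd=yourRedisPassword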
RedisCacheTest.java
@@ -182,6 +182,11 @@ public void testGetBulk() throws Exception
Assert.assertEquals(null, result.get(key3));
}

@Test
public void testConfigPasswd()
{
Assert.assertNull(cacheConfig.getPasswd());
}

public void put(Cache cache, String namespace, byte[] key, Integer value)
{
cache.put(new Cache.NamedKey(namespace, key), Ints.toByteArray(value));
55 changes: 51 additions & 4 deletions processing/src/main/java/io/druid/query/CacheStrategy.java
@@ -22,11 +22,14 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;
import io.druid.guice.annotations.ExtensionPoint;
import io.druid.query.aggregation.AggregatorFactory;

import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.function.BiFunction;

/**
*/
@ExtensionPoint
public interface CacheStrategy<T, CacheType, QueryType extends Query<T>>
{
@@ -37,6 +40,7 @@ public interface CacheStrategy<T, CacheType, QueryType extends Query<T>>
* @param query the query to be cached
* @param willMergeRunners indicates that {@link QueryRunnerFactory#mergeRunners(ExecutorService, Iterable)} will be
* called on the cached by-segment results
*
* @return true if the query is cacheable, otherwise false.
*/
boolean isCacheable(QueryType query, boolean willMergeRunners);
@@ -45,6 +49,7 @@ public interface CacheStrategy<T, CacheType, QueryType extends Query<T>>
* Computes the cache key for the given query
*
* @param query the query to compute a cache key for
*
* @return the cache key
*/
byte[] computeCacheKey(QueryType query);
@@ -58,17 +63,59 @@ public interface CacheStrategy<T, CacheType, QueryType extends Query<T>>

/**
* Returns a function that converts from the QueryType's result type to something cacheable.
*
* <p>
* The resulting function must be thread-safe.
*
* @param isResultLevelCache indicates whether the function is invoked for result-level caching or segment-level caching
*
* @return a thread-safe function that converts the QueryType's result type into something cacheable
*/
Function<T, CacheType> prepareForCache();
Function<T, CacheType> prepareForCache(boolean isResultLevelCache);

/**
* A function that does the inverse of the operation that the function prepareForCache returns
*
* @param isResultLevelCache indicates whether the function is invoked for result-level caching or segment-level caching
*
* @return A function that does the inverse of the operation that the function prepareForCache returns
*/
Function<CacheType, T> pullFromCache();
Function<CacheType, T> pullFromCache(boolean isResultLevelCache);


default Function<T, CacheType> prepareForSegmentLevelCache()
{
return prepareForCache(false);
}

default Function<CacheType, T> pullFromSegmentLevelCache()
{
return pullFromCache(false);
}

/**
* Helper function used by TopN, GroupBy, Timeseries queries in {@link #pullFromCache(boolean)}.
* When using the result level cache, the agg values seen here are
* finalized values generated by AggregatorFactory.finalizeComputation().
* These finalized values are deserialized from the cache as generic Objects, which will
* later be reserialized and returned to the user without further modification.
* Because the agg values are deserialized as generic Objects, the values are subject to the same
* type consistency issues handled by DimensionHandlerUtils.convertObjectToType() in the pullFromCache implementations
* for dimension values (e.g., a Float would become Double).
*/
static void fetchAggregatorsFromCache(
Iterator<AggregatorFactory> aggIter,
Iterator<Object> resultIter,
boolean isResultLevelCache,
BiFunction<String, Object, Void> addToResultFunction
)
{
while (aggIter.hasNext() && resultIter.hasNext()) {
final AggregatorFactory factory = aggIter.next();
if (isResultLevelCache) {
addToResultFunction.apply(factory.getName(), resultIter.next());
} else {
addToResultFunction.apply(factory.getName(), factory.deserialize(resultIter.next()));
}
}
}
}
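The type-consistency caveat in the fetchAggregatorsFromCache javadoc can be made concrete with a small standalone sketch; Jackson is used here purely for illustration, and the actual cache serialization path may differ.

import com.fasterxml.jackson.databind.ObjectMapper;

public class CacheValueRoundTrip
{
  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();
    // A Float aggregator value serialized into the cache...
    byte[] cached = mapper.writeValueAsBytes(1.0f);
    // ...comes back as a generic Object, which Jackson maps to Double.
    Object pulled = mapper.readValue(cached, Object.class);
    System.out.println(pulled.getClass()); // class java.lang.Double
  }
}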
22 changes: 22 additions & 0 deletions processing/src/main/java/io/druid/query/QueryContexts.java
@@ -37,6 +37,8 @@ public class QueryContexts
public static final boolean DEFAULT_BY_SEGMENT = false;
public static final boolean DEFAULT_POPULATE_CACHE = true;
public static final boolean DEFAULT_USE_CACHE = true;
public static final boolean DEFAULT_POPULATE_RESULTLEVEL_CACHE = true;
public static final boolean DEFAULT_USE_RESULTLEVEL_CACHE = true;
public static final int DEFAULT_PRIORITY = 0;
public static final int DEFAULT_UNCOVERED_INTERVALS_LIMIT = 0;
public static final long DEFAULT_TIMEOUT_MILLIS = 300_000; // 5 minutes
@@ -72,6 +74,26 @@ public static <T> boolean isUseCache(Query<T> query, boolean defaultValue)
return parseBoolean(query, "useCache", defaultValue);
}

public static <T> boolean isPopulateResultLevelCache(Query<T> query)
{
return isPopulateResultLevelCache(query, DEFAULT_POPULATE_RESULTLEVEL_CACHE);
}

public static <T> boolean isPopulateResultLevelCache(Query<T> query, boolean defaultValue)
{
return parseBoolean(query, "populateResultLevelCache", defaultValue);
}

public static <T> boolean isUseResultLevelCache(Query<T> query)
{
return isUseResultLevelCache(query, DEFAULT_USE_RESULTLEVEL_CACHE);
}

public static <T> boolean isUseResultLevelCache(Query<T> query, boolean defaultValue)
{
return parseBoolean(query, "useResultLevelCache", defaultValue);
}

public static <T> boolean isFinalize(Query<T> query, boolean defaultValue)
{
return parseBoolean(query, "finalize", defaultValue);
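A minimal sketch of how a broker-side cache layer might combine these per-query flags with its cache config; the cacheConfig accessor names are assumptions for illustration and do not appear in this diff.

// Sketch only: cacheConfig.isUseResultLevelCache()/isPopulateResultLevelCache()
// are assumed accessor names, not part of this change.
final boolean useResultCache = cacheConfig.isUseResultLevelCache()
    && QueryContexts.isUseResultLevelCache(query);
final boolean populateResultCache = cacheConfig.isPopulateResultLevelCache()
    && QueryContexts.isPopulateResultLevelCache(query);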
GroupByQueryQueryToolChest.java
@@ -50,6 +50,7 @@
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.MetricManipulationFn;
import io.druid.query.aggregation.MetricManipulatorFns;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.cache.CacheKeyBuilder;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
@@ -398,6 +399,7 @@ public byte[] computeCacheKey(GroupByQuery query)
.appendCacheables(query.getAggregatorSpecs())
.appendCacheables(query.getDimensions())
.appendCacheable(query.getVirtualColumns())
.appendString(query.getIntervals().toString())
.build();
}

@@ -408,7 +410,7 @@ public TypeReference<Object> getCacheObjectClazz()
}

@Override
public Function<Row, Object> prepareForCache()
public Function<Row, Object> prepareForCache(boolean isResultLevelCache)
{
return new Function<Row, Object>()
{
@@ -426,6 +428,11 @@ public Object apply(Row input)
for (AggregatorFactory agg : aggs) {
retVal.add(event.get(agg.getName()));
}
if (isResultLevelCache) {
for (PostAggregator postAgg : query.getPostAggregatorSpecs()) {
retVal.add(event.get(postAgg.getName()));
}
}
return retVal;
}

@@ -435,7 +442,7 @@ public Object apply(Row input)
}

@Override
public Function<Object, Row> pullFromCache()
public Function<Object, Row> pullFromCache(boolean isResultLevelCache)
{
return new Function<Object, Row>()
{
@@ -448,19 +455,30 @@ public Row apply(Object input)

DateTime timestamp = granularity.toDateTime(((Number) results.next()).longValue());

Map<String, Object> event = Maps.newLinkedHashMap();
final Map<String, Object> event = Maps.newLinkedHashMap();
Iterator<DimensionSpec> dimsIter = dims.iterator();
while (dimsIter.hasNext() && results.hasNext()) {
final DimensionSpec factory = dimsIter.next();
event.put(factory.getOutputName(), results.next());
}

Iterator<AggregatorFactory> aggsIter = aggs.iterator();
while (aggsIter.hasNext() && results.hasNext()) {
final AggregatorFactory factory = aggsIter.next();
event.put(factory.getName(), factory.deserialize(results.next()));
}

CacheStrategy.fetchAggregatorsFromCache(
aggsIter,
results,
isResultLevelCache,
(aggName, aggValueObject) -> {
event.put(aggName, aggValueObject);
return null;
}
);

if (isResultLevelCache) {
Iterator<PostAggregator> postItr = query.getPostAggregatorSpecs().iterator();
while (postItr.hasNext() && results.hasNext()) {
event.put(postItr.next().getName(), results.next());
}
}
if (dimsIter.hasNext() || aggsIter.hasNext() || results.hasNext()) {
throw new ISE(
"Found left over objects while reading from cache!! dimsIter[%s] aggsIter[%s] results[%s]",
SegmentMetadataQueryQueryToolChest.java
@@ -198,7 +198,7 @@ public TypeReference<SegmentAnalysis> getCacheObjectClazz()
}

@Override
public Function<SegmentAnalysis, SegmentAnalysis> prepareForCache()
public Function<SegmentAnalysis, SegmentAnalysis> prepareForCache(boolean isResultLevelCache)
{
return new Function<SegmentAnalysis, SegmentAnalysis>()
{
@@ -211,7 +211,7 @@ public SegmentAnalysis apply(@Nullable SegmentAnalysis input)
}

@Override
public Function<SegmentAnalysis, SegmentAnalysis> pullFromCache()
public Function<SegmentAnalysis, SegmentAnalysis> pullFromCache(boolean isResultLevelCache)
{
return new Function<SegmentAnalysis, SegmentAnalysis>()
{
SearchQueryQueryToolChest.java
@@ -206,7 +206,7 @@ public TypeReference<Object> getCacheObjectClazz()
}

@Override
public Function<Result<SearchResultValue>, Object> prepareForCache()
public Function<Result<SearchResultValue>, Object> prepareForCache(boolean isResultLevelCache)
{
return new Function<Result<SearchResultValue>, Object>()
{
@@ -221,7 +221,7 @@ public Object apply(Result<SearchResultValue> input)
}

@Override
public Function<Object, Result<SearchResultValue>> pullFromCache()
public Function<Object, Result<SearchResultValue>> pullFromCache(boolean isResultLevelCache)
{
return new Function<Object, Result<SearchResultValue>>()
{
SelectQueryQueryToolChest.java
@@ -243,7 +243,7 @@ public TypeReference<Object> getCacheObjectClazz()
}

@Override
public Function<Result<SelectResultValue>, Object> prepareForCache()
public Function<Result<SelectResultValue>, Object> prepareForCache(boolean isResultLevelCache)
{
return new Function<Result<SelectResultValue>, Object>()
{
@@ -272,7 +272,7 @@ public Object apply(final Result<SelectResultValue> input)
}

@Override
public Function<Object, Result<SelectResultValue>> pullFromCache()
public Function<Object, Result<SelectResultValue>> pullFromCache(boolean isResultLevelCache)
{
return new Function<Object, Result<SelectResultValue>>()
{
TimeBoundaryQueryQueryToolChest.java
@@ -173,7 +173,7 @@ public TypeReference<Object> getCacheObjectClazz()
}

@Override
public Function<Result<TimeBoundaryResultValue>, Object> prepareForCache()
public Function<Result<TimeBoundaryResultValue>, Object> prepareForCache(boolean isResultLevelCache)
{
return new Function<Result<TimeBoundaryResultValue>, Object>()
{
@@ -186,7 +186,7 @@ public Object apply(Result<TimeBoundaryResultValue> input)
}

@Override
public Function<Object, Result<TimeBoundaryResultValue>> pullFromCache()
public Function<Object, Result<TimeBoundaryResultValue>> pullFromCache(boolean isResultLevelCache)
{
return new Function<Object, Result<TimeBoundaryResultValue>>()
{