diff --git a/docs/content/configuration/broker.md b/docs/content/configuration/broker.md index 6731817cb031..2319ce001f54 100644 --- a/docs/content/configuration/broker.md +++ b/docs/content/configuration/broker.md @@ -107,6 +107,9 @@ You can optionally only configure caching to be enabled on the broker by setting |--------|---------------|-----------|-------| |`druid.broker.cache.useCache`|true, false|Enable the cache on the broker.|false| |`druid.broker.cache.populateCache`|true, false|Populate the cache on the broker.|false| +|`druid.broker.cache.useResultLevelCache`|true, false|Enable result level caching on the broker.|false| +|`druid.broker.cache.populateResultLevelCache`|true, false|Populate the result level cache on the broker.|false| +|`druid.broker.cache.resultLevelCacheLimit`|positive integer|Maximum size in bytes of a query response that can be stored in the result level cache.|`Integer.MAX_VALUE`| |`druid.broker.cache.unCacheable`|All druid query types|All query types to not cache.|`["groupBy", "select"]`| |`druid.broker.cache.cacheBulkMergeLimit`|positive integer or 0|Queries with more segments than this number will not attempt to fetch from cache at the broker level, leaving potential caching fetches (and cache result merging) to the historicals|`Integer.MAX_VALUE`| diff --git a/docs/content/querying/caching.md b/docs/content/querying/caching.md index 1d6f60af441e..2dd2c88d6b62 100644 --- a/docs/content/querying/caching.md +++ b/docs/content/querying/caching.md @@ -3,9 +3,10 @@ layout: doc_page --- # Query Caching -Druid supports query result caching through an LRU cache. Results are stored on a per segment basis, along with the -parameters of a given query. This allows Druid to return final results based partially on segment results in the cache and partially -on segment results from scanning historical/real-time segments. +Druid supports query result caching through an LRU cache. Results are stored either on a per segment basis or as a whole result set, along with the +parameters of a given query. Segment level caching allows Druid to return final results based partially on segment results in the cache +and partially on segment results from scanning historical/real-time segments. Result level caching enables Druid to cache the entire +result set, so that query results can be served from the cache in full for identical queries. Segment results can be stored in a local heap cache or in an external distributed key/value store. Segment query caches can be enabled at either the Historical and Broker level (it is not recommended to enable caching on both). @@ -15,6 +16,7 @@ can be enabled at either the Historical and Broker level (it is not recommended Enabling caching on the broker can yield faster results than if query caches were enabled on Historicals for small clusters. This is the recommended setup for smaller production clusters (< 20 servers). Take note that when caching is enabled on the Broker, results from Historicals are returned on a per segment basis, and Historicals will not be able to do any local result merging. +Result level caching is available only on the Broker. ## Query caching on Historicals
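(Editor's note: to make the segment level vs. result level distinction above concrete, here is a minimal, self-contained sketch. The class, key formats, and values are illustrative only and are not part of this patch or of Druid's API.)

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CacheGranularityDemo
{
  // A tiny access-ordered LRU map standing in for Druid's cache implementations.
  static <K, V> Map<K, V> lru(final int maxEntries)
  {
    return new LinkedHashMap<K, V>(16, 0.75f, true)
    {
      @Override
      protected boolean removeEldestEntry(Map.Entry<K, V> eldest)
      {
        return size() > maxEntries;
      }
    };
  }

  public static void main(String[] args)
  {
    // Segment level: one entry per (query, segment) pair. A query may hit the
    // cache for some segments, scan the rest, and merge the two sets.
    Map<String, String> segmentCache = lru(1000);
    segmentCache.put("queryA|segment-1", "partial result 1");
    segmentCache.put("queryA|segment-2", "partial result 2");

    // Result level: one entry per query. A hit returns the complete merged
    // result set and no further merging is needed.
    Map<String, String> resultCache = lru(1000);
    resultCache.put("queryA", "final merged result");

    System.out.println(resultCache.get("queryA"));
  }
}
```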
diff --git a/docs/content/querying/query-context.md b/docs/content/querying/query-context.md index b0effe81cf9d..d4e2be28f123 100644 --- a/docs/content/querying/query-context.md +++ b/docs/content/querying/query-context.md @@ -15,6 +15,8 @@ The query context is used for various query configuration parameters. The follow |queryId | auto-generated | Unique identifier given to this query. If a query ID is set or known, this can be used to cancel the query | |useCache | `true` | Flag indicating whether to leverage the query cache for this query. When set to false, it disables reading from the query cache for this query. When set to true, Druid uses druid.broker.cache.useCache or druid.historical.cache.useCache to determine whether or not to read from the query cache | |populateCache | `true` | Flag indicating whether to save the results of the query to the query cache. Primarily used for debugging. When set to false, it disables saving the results of this query to the query cache. When set to true, Druid uses druid.broker.cache.populateCache or druid.historical.cache.populateCache to determine whether or not to save the results of this query to the query cache | +|useResultLevelCache | `true` | Flag indicating whether to leverage the result level cache for this query. When set to false, it disables reading from the result level cache for this query. When set to true, Druid uses druid.broker.cache.useResultLevelCache to determine whether or not to read from the result level cache | +|populateResultLevelCache | `true` | Flag indicating whether to save the results of the query to the result level cache. When set to false, it disables saving the results of this query to the result level cache. When set to true, Druid uses druid.broker.cache.populateResultLevelCache to determine whether or not to save the results of this query to the result level cache | |bySegment | `false` | Return "by segment" results. Primarily used for debugging, setting it to `true` returns results associated with the data segment they came from | |finalize | `true` | Flag indicating whether to "finalize" aggregation results. Primarily used for debugging. For instance, the `hyperUnique` aggregator will return the full HyperLogLog sketch instead of the estimated cardinality when this flag is set to `false` | |chunkPeriod | `P0D` (off) | At the broker node level, long interval queries (of any type) may be broken into shorter interval queries to parallelize merging more than normal. Broken up queries will use a larger share of cluster resources, but may be able to complete faster as a result. Use ISO 8601 periods. For example, if this property is set to `P1M` (one month), then a query covering a year would be broken into 12 smaller queries. The broker uses its query processing executor service to initiate processing for query chunks, so make sure "druid.processing.numThreads" is configured appropriately on the broker. [groupBy queries](groupbyquery.html) do not support chunkPeriod by default, although they do if using the legacy "v1" engine. |
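(Editor's note: a short, hedged sketch of setting these flags programmatically. The wrapper class below is hypothetical; `Query.withOverriddenContext` and the `QueryContexts` helpers are the APIs touched by this patch.)

```java
import com.google.common.collect.ImmutableMap;
import io.druid.query.Query;
import io.druid.query.QueryContexts;

class ResultLevelCacheContextExample
{
  // Overrides the context keys documented in the table above.
  static <T> Query<T> enableResultLevelCache(Query<T> query)
  {
    return query.withOverriddenContext(
        ImmutableMap.of("useResultLevelCache", true, "populateResultLevelCache", true)
    );
  }

  // Reads the flag back out; falls back to DEFAULT_USE_RESULTLEVEL_CACHE
  // when the key is absent from the context.
  static <T> boolean willUseResultLevelCache(Query<T> query)
  {
    return QueryContexts.isUseResultLevelCache(query);
  }
}
```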
diff --git a/extensions-contrib/redis-cache/src/main/java/io/druid/client/cache/RedisCache.java b/extensions-contrib/redis-cache/src/main/java/io/druid/client/cache/RedisCache.java index 9b4744fd961c..0ced2b7f296b 100644 --- a/extensions-contrib/redis-cache/src/main/java/io/druid/client/cache/RedisCache.java +++ b/extensions-contrib/redis-cache/src/main/java/io/druid/client/cache/RedisCache.java @@ -59,12 +59,17 @@ private RedisCache(JedisPool pool, RedisCacheConfig config) public static RedisCache create(final RedisCacheConfig config) { + JedisPool pool; JedisPoolConfig poolConfig = new JedisPoolConfig(); poolConfig.setMaxTotal(config.getMaxTotalConnections()); poolConfig.setMaxIdle(config.getMaxIdleConnections()); poolConfig.setMinIdle(config.getMinIdleConnections()); - JedisPool pool = new JedisPool(poolConfig, config.getHost(), config.getPort(), config.getTimeout()); + if (config.getPasswd() != null) { + pool = new JedisPool(poolConfig, config.getHost(), config.getPort(), config.getTimeout(), config.getPasswd()); + } else { + pool = new JedisPool(poolConfig, config.getHost(), config.getPort(), config.getTimeout()); + } return new RedisCache(pool, config); } diff --git a/extensions-contrib/redis-cache/src/main/java/io/druid/client/cache/RedisCacheConfig.java b/extensions-contrib/redis-cache/src/main/java/io/druid/client/cache/RedisCacheConfig.java index 3b61814ea9dc..b30a8077a346 100644 --- a/extensions-contrib/redis-cache/src/main/java/io/druid/client/cache/RedisCacheConfig.java +++ b/extensions-contrib/redis-cache/src/main/java/io/druid/client/cache/RedisCacheConfig.java @@ -29,6 +29,9 @@ public class RedisCacheConfig @JsonProperty private int port; + @JsonProperty + private String passwd; + // milliseconds, default to one day @JsonProperty private long expiration = 24 * 3600 * 1000; @@ -59,6 +62,11 @@ public int getPort() return port; } + public String getPasswd() + { + return passwd; + } + public long getExpiration() { return expiration; diff --git a/extensions-contrib/redis-cache/src/test/java/io/druid/client/cache/RedisCacheTest.java b/extensions-contrib/redis-cache/src/test/java/io/druid/client/cache/RedisCacheTest.java index 2817bb013aa6..5616616f2f84 100644 --- a/extensions-contrib/redis-cache/src/test/java/io/druid/client/cache/RedisCacheTest.java +++ b/extensions-contrib/redis-cache/src/test/java/io/druid/client/cache/RedisCacheTest.java @@ -182,6 +182,11 @@ public void testGetBulk() throws Exception Assert.assertEquals(null, result.get(key3)); } + @Test + public void testConfigPasswd() + { + Assert.assertNull(cacheConfig.getPasswd()); + } + public void put(Cache cache, String namespace, byte[] key, Integer value) { cache.put(new Cache.NamedKey(namespace, key), Ints.toByteArray(value)); diff --git a/processing/src/main/java/io/druid/query/CacheStrategy.java b/processing/src/main/java/io/druid/query/CacheStrategy.java index 95681dd1b067..fbe6992b7026 100644 --- a/processing/src/main/java/io/druid/query/CacheStrategy.java +++ b/processing/src/main/java/io/druid/query/CacheStrategy.java @@ -22,11 +22,14 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.base.Function; import io.druid.guice.annotations.ExtensionPoint; +import io.druid.query.aggregation.AggregatorFactory; +import java.util.Iterator; import java.util.concurrent.ExecutorService; +import java.util.function.BiFunction; /** -*/ + */ @ExtensionPoint public interface CacheStrategy> { @@ -37,6 +40,7 @@ public interface CacheStrategy> * @param query the query to be cached * @param willMergeRunners 
indicates that {@link QueryRunnerFactory#mergeRunners(ExecutorService, Iterable)} will be * called on the cached by-segment results + * * @return true if the query is cacheable, otherwise false. */ boolean isCacheable(QueryType query, boolean willMergeRunners); @@ -45,6 +49,7 @@ public interface CacheStrategy> * Computes the cache key for the given query * * @param query the query to compute a cache key for + * * @return the cache key */ byte[] computeCacheKey(QueryType query); @@ -58,17 +63,59 @@ public interface CacheStrategy> /** * Returns a function that converts from the QueryType's result type to something cacheable. - * + *

* The resulting function must be thread-safe. * + * @param isResultLevelCache indicates whether the function is invoked for result-level caching or segment-level caching + * * @return a thread-safe function that converts the QueryType's result type into something cacheable */ - Function prepareForCache(); + Function prepareForCache(boolean isResultLevelCache); /** * A function that does the inverse of the operation that the function prepareForCache returns * + * @param isResultLevelCache indicates whether the function is invoked for result-level caching or segment-level caching + * * @return A function that does the inverse of the operation that the function prepareForCache returns */ - Function pullFromCache(); + Function pullFromCache(boolean isResultLevelCache); + + + default Function prepareForSegmentLevelCache() + { + return prepareForCache(false); + } + + default Function pullFromSegmentLevelCache() + { + return pullFromCache(false); + } + + /** + * Helper function used by TopN, GroupBy, Timeseries queries in {@link #pullFromCache(boolean)}. + * When using the result level cache, the agg values seen here are + * finalized values generated by AggregatorFactory.finalizeComputation(). + * These finalized values are deserialized from the cache as generic Objects, which will + * later be reserialized and returned to the user without further modification. + * Because the agg values are deserialized as generic Objects, the values are subject to the same + * type consistency issues handled by DimensionHandlerUtils.convertObjectToType() in the pullFromCache implementations + * for dimension values (e.g., a Float would become Double). + */ + static void fetchAggregatorsFromCache( + Iterator aggIter, + Iterator resultIter, + boolean isResultLevelCache, + BiFunction addToResultFunction + ) + { + while (aggIter.hasNext() && resultIter.hasNext()) { + final AggregatorFactory factory = aggIter.next(); + if (isResultLevelCache) { + addToResultFunction.apply(factory.getName(), resultIter.next()); + } else { + addToResultFunction.apply(factory.getName(), factory.deserialize(resultIter.next())); + } + } + } } diff --git a/processing/src/main/java/io/druid/query/QueryContexts.java b/processing/src/main/java/io/druid/query/QueryContexts.java index d0a16fd87844..d88a536eb9ec 100644 --- a/processing/src/main/java/io/druid/query/QueryContexts.java +++ b/processing/src/main/java/io/druid/query/QueryContexts.java @@ -37,6 +37,8 @@ public class QueryContexts public static final boolean DEFAULT_BY_SEGMENT = false; public static final boolean DEFAULT_POPULATE_CACHE = true; public static final boolean DEFAULT_USE_CACHE = true; + public static final boolean DEFAULT_POPULATE_RESULTLEVEL_CACHE = true; + public static final boolean DEFAULT_USE_RESULTLEVEL_CACHE = true; public static final int DEFAULT_PRIORITY = 0; public static final int DEFAULT_UNCOVERED_INTERVALS_LIMIT = 0; public static final long DEFAULT_TIMEOUT_MILLIS = 300_000; // 5 minutes @@ -72,6 +74,26 @@ public static boolean isUseCache(Query query, boolean defaultValue) return parseBoolean(query, "useCache", defaultValue); } + public static boolean isPopulateResultLevelCache(Query query) + { + return isPopulateResultLevelCache(query, DEFAULT_POPULATE_RESULTLEVEL_CACHE); + } + + public static boolean isPopulateResultLevelCache(Query query, boolean defaultValue) + { + return parseBoolean(query, "populateResultLevelCache", defaultValue); + } + + public static boolean isUseResultLevelCache(Query query) + { + return isUseResultLevelCache(query, 
DEFAULT_USE_RESULTLEVEL_CACHE); + } + + public static boolean isUseResultLevelCache(Query query, boolean defaultValue) + { + return parseBoolean(query, "useResultLevelCache", defaultValue); + } + public static boolean isFinalize(Query query, boolean defaultValue) { return parseBoolean(query, "finalize", defaultValue); diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index 28d5acb42de0..9208799f9030 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -50,6 +50,7 @@ import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.MetricManipulationFn; import io.druid.query.aggregation.MetricManipulatorFns; +import io.druid.query.aggregation.PostAggregator; import io.druid.query.cache.CacheKeyBuilder; import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.query.dimension.DimensionSpec; @@ -398,6 +399,7 @@ public byte[] computeCacheKey(GroupByQuery query) .appendCacheables(query.getAggregatorSpecs()) .appendCacheables(query.getDimensions()) .appendCacheable(query.getVirtualColumns()) + .appendString(query.getIntervals().toString()) .build(); } @@ -408,7 +410,7 @@ public TypeReference getCacheObjectClazz() } @Override - public Function prepareForCache() + public Function prepareForCache(boolean isResultLevelCache) { return new Function() { @@ -426,6 +428,11 @@ public Object apply(Row input) for (AggregatorFactory agg : aggs) { retVal.add(event.get(agg.getName())); } + if (isResultLevelCache) { + for (PostAggregator postAgg : query.getPostAggregatorSpecs()) { + retVal.add(event.get(postAgg.getName())); + } + } return retVal; } @@ -435,7 +442,7 @@ public Object apply(Row input) } @Override - public Function pullFromCache() + public Function pullFromCache(boolean isResultLevelCache) { return new Function() { @@ -448,19 +455,30 @@ public Row apply(Object input) DateTime timestamp = granularity.toDateTime(((Number) results.next()).longValue()); - Map event = Maps.newLinkedHashMap(); + final Map event = Maps.newLinkedHashMap(); Iterator dimsIter = dims.iterator(); while (dimsIter.hasNext() && results.hasNext()) { final DimensionSpec factory = dimsIter.next(); event.put(factory.getOutputName(), results.next()); } Iterator aggsIter = aggs.iterator(); - while (aggsIter.hasNext() && results.hasNext()) { - final AggregatorFactory factory = aggsIter.next(); - event.put(factory.getName(), factory.deserialize(results.next())); - } + CacheStrategy.fetchAggregatorsFromCache( + aggsIter, + results, + isResultLevelCache, + (aggName, aggValueObject) -> { + event.put(aggName, aggValueObject); + return null; + } + ); + + if (isResultLevelCache) { + Iterator postItr = query.getPostAggregatorSpecs().iterator(); + while (postItr.hasNext() && results.hasNext()) { + event.put(postItr.next().getName(), results.next()); + } + } if (dimsIter.hasNext() || aggsIter.hasNext() || results.hasNext()) { throw new ISE( "Found left over objects while reading from cache!! 
dimsIter[%s] aggsIter[%s] results[%s]", diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java index 4a921480f30f..5025d9dbc975 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java @@ -198,7 +198,7 @@ public TypeReference getCacheObjectClazz() } @Override - public Function prepareForCache() + public Function prepareForCache(boolean isResultLevelCache) { return new Function() { @@ -211,7 +211,7 @@ public SegmentAnalysis apply(@Nullable SegmentAnalysis input) } @Override - public Function pullFromCache() + public Function pullFromCache(boolean isResultLevelCache) { return new Function() { diff --git a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java index 74148d66aea4..9efad496b351 100644 --- a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java @@ -206,7 +206,7 @@ public TypeReference getCacheObjectClazz() } @Override - public Function, Object> prepareForCache() + public Function, Object> prepareForCache(boolean isResultLevelCache) { return new Function, Object>() { @@ -221,7 +221,7 @@ public Object apply(Result input) } @Override - public Function> pullFromCache() + public Function> pullFromCache(boolean isResultLevelCache) { return new Function>() { diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java index 0b567ca6fc79..023e532940da 100644 --- a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java @@ -243,7 +243,7 @@ public TypeReference getCacheObjectClazz() } @Override - public Function, Object> prepareForCache() + public Function, Object> prepareForCache(boolean isResultLevelCache) { return new Function, Object>() { @@ -272,7 +272,7 @@ public Object apply(final Result input) } @Override - public Function> pullFromCache() + public Function> pullFromCache(boolean isResultLevelCache) { return new Function>() { diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java index a36461d22346..ae286458e49b 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java @@ -173,7 +173,7 @@ public TypeReference getCacheObjectClazz() } @Override - public Function, Object> prepareForCache() + public Function, Object> prepareForCache(boolean isResultLevelCache) { return new Function, Object>() { @@ -186,7 +186,7 @@ public Object apply(Result input) } @Override - public Function> pullFromCache() + public Function> pullFromCache(boolean isResultLevelCache) { return new Function>() { diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java index 473775b71bc0..b28d688f71d0 100644 --- 
a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java @@ -164,6 +164,7 @@ public byte[] computeCacheKey(TimeseriesQuery query) .appendCacheable(query.getDimensionsFilter()) .appendCacheables(query.getAggregatorSpecs()) .appendCacheable(query.getVirtualColumns()) + .appendString(query.getIntervals().toString()) .build(); } @@ -174,7 +175,7 @@ public TypeReference getCacheObjectClazz() } @Override - public Function, Object> prepareForCache() + public Function, Object> prepareForCache(boolean isResultLevelCache) { return new Function, Object>() { @@ -188,14 +189,18 @@ public Object apply(final Result input) for (AggregatorFactory agg : aggs) { retVal.add(results.getMetric(agg.getName())); } - + if (isResultLevelCache) { + for (PostAggregator postAgg : query.getPostAggregatorSpecs()) { + retVal.add(results.getMetric(postAgg.getName())); + } + } return retVal; } }; } @Override - public Function> pullFromCache() + public Function> pullFromCache(boolean isResultLevelCache) { return new Function>() { @@ -205,16 +210,28 @@ public Function> pullFromCache() public Result apply(@Nullable Object input) { List results = (List) input; - Map retVal = Maps.newLinkedHashMap(); + final Map retVal = Maps.newLinkedHashMap(); Iterator aggsIter = aggs.iterator(); Iterator resultIter = results.iterator(); DateTime timestamp = granularity.toDateTime(((Number) resultIter.next()).longValue()); - while (aggsIter.hasNext() && resultIter.hasNext()) { - final AggregatorFactory factory = aggsIter.next(); - retVal.put(factory.getName(), factory.deserialize(resultIter.next())); + CacheStrategy.fetchAggregatorsFromCache( + aggsIter, + resultIter, + isResultLevelCache, + (aggName, aggValueObject) -> { + retVal.put(aggName, aggValueObject); + return null; + } + ); + + if (isResultLevelCache) { + Iterator postItr = query.getPostAggregatorSpecs().iterator(); + while (postItr.hasNext() && resultIter.hasNext()) { + retVal.put(postItr.next().getName(), resultIter.next()); + } } return new Result( diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java index 81761afc6a37..a21462498f81 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java @@ -293,7 +293,6 @@ public TypeReference> getResultTypeReference() } - @Override public CacheStrategy, Object, TopNQuery> getCacheStrategy(final TopNQuery query) { @@ -322,7 +321,8 @@ public byte[] computeCacheKey(TopNQuery query) .appendCacheable(query.getGranularity()) .appendCacheable(query.getDimensionsFilter()) .appendCacheables(query.getAggregatorSpecs()) - .appendCacheable(query.getVirtualColumns()); + .appendCacheable(query.getVirtualColumns()) + .appendString(query.getIntervals().toString()); final List postAggregators = prunePostAggregators(query); if (!postAggregators.isEmpty()) { @@ -341,7 +341,7 @@ public TypeReference getCacheObjectClazz() } @Override - public Function, Object> prepareForCache() + public Function, Object> prepareForCache(boolean isResultLevelCache) { return new Function, Object>() { @@ -361,6 +361,11 @@ public Object apply(final Result input) for (String aggName : aggFactoryNames) { vals.add(result.getMetric(aggName)); } + if (isResultLevelCache) { + for (PostAggregator postAgg : query.getPostAggregatorSpecs()) { + 
vals.add(result.getMetric(postAgg.getName())); + } + } retVal.add(vals); } return retVal; @@ -369,7 +374,7 @@ public Object apply(final Result input) } @Override - public Function> pullFromCache() + public Function> pullFromCache(boolean isResultLevelCache) { return new Function>() { @@ -386,22 +391,29 @@ public Result apply(Object input) while (inputIter.hasNext()) { List result = (List) inputIter.next(); - Map vals = Maps.newLinkedHashMap(); + final Map vals = Maps.newLinkedHashMap(); Iterator aggIter = aggs.iterator(); Iterator resultIter = result.iterator(); vals.put(query.getDimensionSpec().getOutputName(), resultIter.next()); - while (aggIter.hasNext() && resultIter.hasNext()) { - final AggregatorFactory factory = aggIter.next(); - vals.put(factory.getName(), factory.deserialize(resultIter.next())); - } + CacheStrategy.fetchAggregatorsFromCache( + aggIter, + resultIter, + isResultLevelCache, + (aggName, aggValueObject) -> { + vals.put(aggName, aggValueObject); + return null; + } + ); - for (PostAggregator postAgg : postAggs) { - vals.put(postAgg.getName(), postAgg.compute(vals)); - } + if (isResultLevelCache) { + Iterator postItr = query.getPostAggregatorSpecs().iterator(); + while (postItr.hasNext() && resultIter.hasNext()) { + vals.put(postItr.next().getName(), resultIter.next()); + } + } else { + for (PostAggregator postAgg : postAggs) { + vals.put(postAgg.getName(), postAgg.compute(vals)); + } + } retVal.add(vals); } diff --git a/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChestTest.java b/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChestTest.java index 359e39f6af2b..a497a6313801 100644 --- a/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChestTest.java +++ b/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChestTest.java @@ -94,7 +94,7 @@ public void testCacheStrategy() throws Exception null ); - Object preparedValue = strategy.prepareForCache().apply(result); + Object preparedValue = strategy.prepareForSegmentLevelCache().apply(result); ObjectMapper objectMapper = new DefaultObjectMapper(); SegmentAnalysis fromCacheValue = objectMapper.readValue( @@ -102,7 +102,7 @@ public void testCacheStrategy() throws Exception strategy.getCacheObjectClazz() ); - SegmentAnalysis fromCacheResult = strategy.pullFromCache().apply(fromCacheValue); + SegmentAnalysis fromCacheResult = strategy.pullFromSegmentLevelCache().apply(fromCacheValue); Assert.assertEquals(result, fromCacheResult); } diff --git a/processing/src/test/java/io/druid/query/search/SearchQueryQueryToolChestTest.java b/processing/src/test/java/io/druid/query/search/SearchQueryQueryToolChestTest.java index 621a435f5e95..dd73518ae5fe 100644 --- a/processing/src/test/java/io/druid/query/search/SearchQueryQueryToolChestTest.java +++ b/processing/src/test/java/io/druid/query/search/SearchQueryQueryToolChestTest.java @@ -59,7 +59,7 @@ public void testCacheStrategy() throws Exception new SearchResultValue(ImmutableList.of(new SearchHit("dim1", "a"))) ); - Object preparedValue = strategy.prepareForCache().apply( + Object preparedValue = strategy.prepareForSegmentLevelCache().apply( result ); @@ -69,7 +69,7 @@ public void testCacheStrategy() throws Exception strategy.getCacheObjectClazz() ); - Result fromCacheResult = strategy.pullFromCache().apply(fromCacheValue); + Result fromCacheResult = strategy.pullFromSegmentLevelCache().apply(fromCacheValue); Assert.assertEquals(result, fromCacheResult); } diff --git a/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChestTest.java 
b/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChestTest.java index 3acf7e759ab9..9eab26f76594 100644 --- a/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChestTest.java +++ b/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChestTest.java @@ -215,7 +215,7 @@ public void testCacheStrategy() throws Exception ) ); - Object preparedValue = strategy.prepareForCache().apply( + Object preparedValue = strategy.prepareForSegmentLevelCache().apply( result ); @@ -225,7 +225,7 @@ public void testCacheStrategy() throws Exception strategy.getCacheObjectClazz() ); - Result fromCacheResult = strategy.pullFromCache().apply(fromCacheValue); + Result fromCacheResult = strategy.pullFromSegmentLevelCache().apply(fromCacheValue); Assert.assertEquals(result, fromCacheResult); } diff --git a/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java b/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java index 84d8d952421a..722c99734fb5 100644 --- a/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java +++ b/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java @@ -32,6 +32,8 @@ import io.druid.query.TableDataSource; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.query.aggregation.PostAggregator; +import io.druid.query.aggregation.post.ConstantPostAggregator; import io.druid.query.spec.MultipleIntervalSegmentSpec; import io.druid.segment.TestHelper; import io.druid.segment.VirtualColumns; @@ -40,104 +42,158 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import java.io.IOException; import java.util.Arrays; @RunWith(Parameterized.class) public class TimeseriesQueryQueryToolChestTest { - private static final TimeseriesQueryQueryToolChest TOOL_CHEST = new TimeseriesQueryQueryToolChest(null); - - @Parameterized.Parameters(name = "descending={0}") - public static Iterable constructorFeeder() throws IOException - { - return QueryRunnerTestHelper.transformToConstructionFeeder(Arrays.asList(false, true)); - } - - private final boolean descending; - - public TimeseriesQueryQueryToolChestTest(boolean descending) - { - this.descending = descending; - } - - @Test - public void testCacheStrategy() throws Exception - { - CacheStrategy, Object, TimeseriesQuery> strategy = - TOOL_CHEST.getCacheStrategy( - new TimeseriesQuery( - new TableDataSource("dummy"), - new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2015-01-01/2015-01-02"))), - descending, - VirtualColumns.EMPTY, - null, - Granularities.ALL, - ImmutableList.of( - new CountAggregatorFactory("metric1"), - new LongSumAggregatorFactory("metric0", "metric0") - ), - null, - null - ) + private static final TimeseriesQueryQueryToolChest TOOL_CHEST = new TimeseriesQueryQueryToolChest(null); + + @Parameterized.Parameters(name = "descending={0}") + public static Iterable constructorFeeder() + { + return QueryRunnerTestHelper.transformToConstructionFeeder(Arrays.asList(false, true)); + } + + private final boolean descending; + + public TimeseriesQueryQueryToolChestTest(boolean descending) + { + this.descending = descending; + } + + @Test + public void testCacheStrategy() throws Exception + { + CacheStrategy, Object, TimeseriesQuery> strategy = + TOOL_CHEST.getCacheStrategy( + new TimeseriesQuery( + new 
TableDataSource("dummy"), + new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2015-01-01/2015-01-02"))), + descending, + VirtualColumns.EMPTY, + null, + Granularities.ALL, + ImmutableList.of( + new CountAggregatorFactory("metric1"), + new LongSumAggregatorFactory("metric0", "metric0") + ), + ImmutableList.of(new ConstantPostAggregator("post", 10)), + null + ) + ); + + final Result result1 = new Result<>( + // test timestamps that result in integer size millis + DateTimes.utc(123L), + new TimeseriesResultValue( + ImmutableMap.of("metric1", 2, "metric0", 3) + ) ); - final Result result = new Result<>( - // test timestamps that result in integer size millis - DateTimes.utc(123L), - new TimeseriesResultValue( - ImmutableMap.of("metric1", 2, "metric0", 3) - ) - ); - - Object preparedValue = strategy.prepareForCache().apply(result); - - ObjectMapper objectMapper = TestHelper.makeJsonMapper(); - Object fromCacheValue = objectMapper.readValue( - objectMapper.writeValueAsBytes(preparedValue), - strategy.getCacheObjectClazz() - ); - - Result fromCacheResult = strategy.pullFromCache().apply(fromCacheValue); - - Assert.assertEquals(result, fromCacheResult); - } - - @Test - public void testCacheKey() throws Exception - { - final TimeseriesQuery query1 = Druids.newTimeseriesQueryBuilder() - .dataSource("dummy") - .intervals("2015-01-01/2015-01-02") - .descending(descending) - .granularity(Granularities.ALL) - .aggregators( - ImmutableList.of( - new CountAggregatorFactory("metric1"), - new LongSumAggregatorFactory("metric0", "metric0") - ) - ) - .build(); - - final TimeseriesQuery query2 = Druids.newTimeseriesQueryBuilder() - .dataSource("dummy") - .intervals("2015-01-01/2015-01-02") - .descending(descending) - .granularity(Granularities.ALL) - .aggregators( - ImmutableList.of( - new LongSumAggregatorFactory("metric0", "metric0"), - new CountAggregatorFactory("metric1") - ) - ) - .build(); - - // Test for https://github.com/druid-io/druid/issues/4093. 
- Assert.assertFalse( - Arrays.equals( - TOOL_CHEST.getCacheStrategy(query1).computeCacheKey(query1), - TOOL_CHEST.getCacheStrategy(query2).computeCacheKey(query2) - ) - ); - } -} + Object preparedValue = strategy.prepareForSegmentLevelCache().apply(result1); + + ObjectMapper objectMapper = TestHelper.makeJsonMapper(); + Object fromCacheValue = objectMapper.readValue( + objectMapper.writeValueAsBytes(preparedValue), + strategy.getCacheObjectClazz() + ); + + Result fromCacheResult = strategy.pullFromSegmentLevelCache().apply(fromCacheValue); + + Assert.assertEquals(result1, fromCacheResult); + + final Result result2 = new Result<>( + // test timestamps that result in integer size millis + DateTimes.utc(123L), + new TimeseriesResultValue( + ImmutableMap.of("metric1", 2, "metric0", 3, "post", 10) + ) + ); + + Object preparedResultLevelCacheValue = strategy.prepareForCache(true).apply(result2); + Object fromResultLevelCacheValue = objectMapper.readValue( + objectMapper.writeValueAsBytes(preparedResultLevelCacheValue), + strategy.getCacheObjectClazz() + ); + + Result fromResultLevelCacheRes = strategy.pullFromCache(true).apply(fromResultLevelCacheValue); + Assert.assertEquals(result2, fromResultLevelCacheRes); + } + + @Test + public void testCacheKey() + { + final TimeseriesQuery query1 = Druids.newTimeseriesQueryBuilder() + .dataSource("dummy") + .intervals("2015-01-01/2015-01-02") + .descending(descending) + .granularity(Granularities.ALL) + .aggregators( + ImmutableList.of( + new CountAggregatorFactory("metric1"), + new LongSumAggregatorFactory("metric0", "metric0") + ) + ) + .build(); + + final TimeseriesQuery query2 = Druids.newTimeseriesQueryBuilder() + .dataSource("dummy") + .intervals("2015-01-01/2015-01-02") + .descending(descending) + .granularity(Granularities.ALL) + .aggregators( + ImmutableList.of( + new LongSumAggregatorFactory("metric0", "metric0"), + new CountAggregatorFactory("metric1") + ) + ) + .build(); + + // Test for https://github.com/druid-io/druid/issues/4093. + Assert.assertFalse( + Arrays.equals( + TOOL_CHEST.getCacheStrategy(query1).computeCacheKey(query1), + TOOL_CHEST.getCacheStrategy(query2).computeCacheKey(query2) + ) + ); + } + + @Test + public void testComputeCacheKeyWithDifferentIntervals() + { + final TimeseriesQuery query1 = Druids.newTimeseriesQueryBuilder() + .dataSource("dummy") + .intervals("2015-01-01/2015-01-02") + .descending(descending) + .granularity(Granularities.ALL) + .aggregators( + ImmutableList.of( + new CountAggregatorFactory("metric1"), + new LongSumAggregatorFactory("metric0", "metric0") + ) + ) + .build(); + + final TimeseriesQuery query2 = Druids.newTimeseriesQueryBuilder() + .dataSource("dummy") + .intervals("2015-01-03/2015-01-04") + .descending(descending) + .granularity(Granularities.ALL) + .aggregators( + ImmutableList.of( + new CountAggregatorFactory("metric1"), + new LongSumAggregatorFactory("metric0", "metric0") + ) + ) + .build(); + + // Queries that differ only in their intervals must produce different cache keys. 
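+ // Editor's note: segment level cache entries are implicitly scoped by the segment
+ // being queried, but a result level entry covers the whole query, so the key must
+ // encode the intervals as well; hence the appendString(query.getIntervals().toString())
+ // calls this patch adds to computeCacheKey.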
+ Assert.assertFalse( + Arrays.equals( + TOOL_CHEST.getCacheStrategy(query1).computeCacheKey(query1), + TOOL_CHEST.getCacheStrategy(query2).computeCacheKey(query2) + ) + ); + } +} \ No newline at end of file diff --git a/processing/src/test/java/io/druid/query/topn/TopNQueryQueryToolChestTest.java b/processing/src/test/java/io/druid/query/topn/TopNQueryQueryToolChestTest.java index 8938390986a5..fadec310463a 100644 --- a/processing/src/test/java/io/druid/query/topn/TopNQueryQueryToolChestTest.java +++ b/processing/src/test/java/io/druid/query/topn/TopNQueryQueryToolChestTest.java @@ -62,156 +62,230 @@ public class TopNQueryQueryToolChestTest public void testCacheStrategy() throws Exception { CacheStrategy, Object, TopNQuery> strategy = - new TopNQueryQueryToolChest(null, null).getCacheStrategy( - new TopNQuery( - new TableDataSource("dummy"), - VirtualColumns.EMPTY, - new DefaultDimensionSpec("test", "test"), - new NumericTopNMetricSpec("metric1"), - 3, - new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2015-01-01/2015-01-02"))), - null, - Granularities.ALL, - ImmutableList.of(new CountAggregatorFactory("metric1")), - ImmutableList.of(new ConstantPostAggregator("post", 10)), - null - ) - ); - - final Result result = new Result<>( - // test timestamps that result in integer size millis - DateTimes.utc(123L), - new TopNResultValue( - Arrays.asList( - ImmutableMap.of( - "test", "val1", - "metric1", 2 - ) + new TopNQueryQueryToolChest(null, null).getCacheStrategy( + new TopNQuery( + new TableDataSource("dummy"), + VirtualColumns.EMPTY, + new DefaultDimensionSpec("test", "test"), + new NumericTopNMetricSpec("metric1"), + 3, + new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2015-01-01/2015-01-02"))), + null, + Granularities.ALL, + ImmutableList.of(new CountAggregatorFactory("metric1")), + ImmutableList.of(new ConstantPostAggregator("post", 10)), + null + ) + ); + + final Result result1 = new Result<>( + // test timestamps that result in integer size millis + DateTimes.utc(123L), + new TopNResultValue( + Arrays.asList( + ImmutableMap.of( + "test", "val1", + "metric1", 2 + ) + ) ) - ) ); - Object preparedValue = strategy.prepareForCache().apply( - result + Object preparedValue = strategy.prepareForSegmentLevelCache().apply( + result1 ); ObjectMapper objectMapper = TestHelper.makeJsonMapper(); Object fromCacheValue = objectMapper.readValue( - objectMapper.writeValueAsBytes(preparedValue), - strategy.getCacheObjectClazz() + objectMapper.writeValueAsBytes(preparedValue), + strategy.getCacheObjectClazz() ); - Result fromCacheResult = strategy.pullFromCache().apply(fromCacheValue); + Result fromCacheResult = strategy.pullFromSegmentLevelCache().apply(fromCacheValue); + + Assert.assertEquals(result1, fromCacheResult); + + final Result result2 = new Result<>( + // test timestamps that result in integer size millis + DateTimes.utc(123L), + new TopNResultValue( + Arrays.asList( + ImmutableMap.of( + "test", "val1", + "metric1", 2, + "post", 10 + ) + ) + ) + ); + + Object preparedResultCacheValue = strategy.prepareForCache(true).apply( + result2 + ); + + Object fromResultCacheValue = objectMapper.readValue( + objectMapper.writeValueAsBytes(preparedResultCacheValue), + strategy.getCacheObjectClazz() + ); + + Result fromResultCacheResult = strategy.pullFromCache(true).apply(fromResultCacheValue); + Assert.assertEquals(result2, fromResultCacheResult); - Assert.assertEquals(result, fromCacheResult); } @Test - public void testComputeCacheKeyWithDifferentPostAgg() throws Exception + 
public void testComputeCacheKeyWithDifferentPostAgg() { final TopNQuery query1 = new TopNQuery( - new TableDataSource("dummy"), - VirtualColumns.EMPTY, - new DefaultDimensionSpec("test", "test"), - new NumericTopNMetricSpec("post"), - 3, - new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2015-01-01/2015-01-02"))), - null, - Granularities.ALL, - ImmutableList.of(new CountAggregatorFactory("metric1")), - ImmutableList.of(new ConstantPostAggregator("post", 10)), - null + new TableDataSource("dummy"), + VirtualColumns.EMPTY, + new DefaultDimensionSpec("test", "test"), + new NumericTopNMetricSpec("post"), + 3, + new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2015-01-01/2015-01-02"))), + null, + Granularities.ALL, + ImmutableList.of(new CountAggregatorFactory("metric1")), + ImmutableList.of(new ConstantPostAggregator("post", 10)), + null ); final TopNQuery query2 = new TopNQuery( - new TableDataSource("dummy"), - VirtualColumns.EMPTY, - new DefaultDimensionSpec("test", "test"), - new NumericTopNMetricSpec("post"), - 3, - new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2015-01-01/2015-01-02"))), - null, - Granularities.ALL, - ImmutableList.of(new CountAggregatorFactory("metric1")), - ImmutableList.of( - new ArithmeticPostAggregator( - "post", - "+", - ImmutableList.of( - new FieldAccessPostAggregator( - null, - "metric1" - ), - new FieldAccessPostAggregator( - null, - "metric1" + new TableDataSource("dummy"), + VirtualColumns.EMPTY, + new DefaultDimensionSpec("test", "test"), + new NumericTopNMetricSpec("post"), + 3, + new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2015-01-01/2015-01-02"))), + null, + Granularities.ALL, + ImmutableList.of(new CountAggregatorFactory("metric1")), + ImmutableList.of( + new ArithmeticPostAggregator( + "post", + "+", + ImmutableList.of( + new FieldAccessPostAggregator( + null, + "metric1" + ), + new FieldAccessPostAggregator( + null, + "metric1" + ) + ) ) - ) - ) - ), - null + ), + null + ); + + final CacheStrategy, Object, TopNQuery> strategy1 = new TopNQueryQueryToolChest( + null, + null + ).getCacheStrategy(query1); + + final CacheStrategy, Object, TopNQuery> strategy2 = new TopNQueryQueryToolChest( + null, + null + ).getCacheStrategy(query2); + + Assert.assertFalse(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); + } + + + + @Test + public void testCacheKeyWithDifferentIntervals() + { + final TopNQuery query1 = new TopNQuery( + new TableDataSource("dummy"), + VirtualColumns.EMPTY, + new DefaultDimensionSpec("test", "test"), + new NumericTopNMetricSpec("post"), + 3, + new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2015-01-01/2015-01-02"))), + null, + Granularities.ALL, + ImmutableList.of(new CountAggregatorFactory("metric1")), + ImmutableList.of(new ConstantPostAggregator("post", 10)), + null + ); + + final TopNQuery query2 = new TopNQuery( + new TableDataSource("dummy"), + VirtualColumns.EMPTY, + new DefaultDimensionSpec("test", "test"), + new NumericTopNMetricSpec("post"), + 3, + new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2015-01-03/2015-01-04"))), + null, + Granularities.ALL, + ImmutableList.of(new CountAggregatorFactory("metric1")), + ImmutableList.of(new ConstantPostAggregator("post", 10) + ), + null ); final CacheStrategy, Object, TopNQuery> strategy1 = new TopNQueryQueryToolChest( - null, - null + null, + null ).getCacheStrategy(query1); final CacheStrategy, Object, TopNQuery> strategy2 = new TopNQueryQueryToolChest( - null, - 
null + null, + null ).getCacheStrategy(query2); Assert.assertFalse(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); } @Test - public void testMinTopNThreshold() throws Exception + public void testMinTopNThreshold() { TopNQueryConfig config = new TopNQueryConfig(); final TopNQueryQueryToolChest chest = new TopNQueryQueryToolChest( - config, - QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() + config, + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() ); QueryRunnerFactory factory = new TopNQueryRunnerFactory( - TestQueryRunners.getPool(), - chest, - QueryRunnerTestHelper.NOOP_QUERYWATCHER + TestQueryRunners.getPool(), + chest, + QueryRunnerTestHelper.NOOP_QUERYWATCHER ); QueryRunner> runner = QueryRunnerTestHelper.makeQueryRunner( - factory, - new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(), segmentId), - null + factory, + new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(), segmentId), + null ); Map context = Maps.newHashMap(); context.put("minTopNThreshold", 500); TopNQueryBuilder builder = new TopNQueryBuilder() - .dataSource(QueryRunnerTestHelper.dataSource) - .granularity(QueryRunnerTestHelper.allGran) - .dimension(QueryRunnerTestHelper.placementishDimension) - .metric(QueryRunnerTestHelper.indexMetric) - .intervals(QueryRunnerTestHelper.fullOnInterval) - .aggregators(QueryRunnerTestHelper.commonDoubleAggregators); + .dataSource(QueryRunnerTestHelper.dataSource) + .granularity(QueryRunnerTestHelper.allGran) + .dimension(QueryRunnerTestHelper.placementishDimension) + .metric(QueryRunnerTestHelper.indexMetric) + .intervals(QueryRunnerTestHelper.fullOnInterval) + .aggregators(QueryRunnerTestHelper.commonDoubleAggregators); TopNQuery query1 = builder.threshold(10).context(null).build(); MockQueryRunner mockRunner = new MockQueryRunner(runner); new TopNQueryQueryToolChest.ThresholdAdjustingQueryRunner(mockRunner, config).run( - QueryPlus.wrap(query1), - ImmutableMap.of() + QueryPlus.wrap(query1), + ImmutableMap.of() ); Assert.assertEquals(1000, mockRunner.query.getThreshold()); TopNQuery query2 = builder.threshold(10).context(context).build(); new TopNQueryQueryToolChest.ThresholdAdjustingQueryRunner(mockRunner, config) - .run(QueryPlus.wrap(query2), ImmutableMap.of()); + .run(QueryPlus.wrap(query2), ImmutableMap.of()); Assert.assertEquals(500, mockRunner.query.getThreshold()); TopNQuery query3 = builder.threshold(2000).context(context).build(); new TopNQueryQueryToolChest.ThresholdAdjustingQueryRunner(mockRunner, config) - .run(QueryPlus.wrap(query3), ImmutableMap.of()); + .run(QueryPlus.wrap(query3), ImmutableMap.of()); Assert.assertEquals(2000, mockRunner.query.getThreshold()); } @@ -227,12 +301,12 @@ static class MockQueryRunner implements QueryRunner> @Override public Sequence> run( - QueryPlus> queryPlus, - Map responseContext + QueryPlus> queryPlus, + Map responseContext ) { this.query = (TopNQuery) queryPlus.getQuery(); return runner.run(queryPlus, responseContext); } } -} +} \ No newline at end of file diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java index 0fd9762dc7e9..cf5517b17a74 100644 --- a/server/src/main/java/io/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java @@ -497,7 +497,7 @@ private void addSequencesFromCache( return; } - final Function pullFromCacheFunction = strategy.pullFromCache(); + final Function 
pullFromCacheFunction = strategy.pullFromSegmentLevelCache(); final TypeReference cacheObjectClazz = strategy.getCacheObjectClazz(); for (Pair cachedResultPair : cachedResults) { final byte[] cachedResult = cachedResultPair.rhs; @@ -597,7 +597,7 @@ private Sequence getAndCacheServerResults( .withQuerySegmentSpec(segmentsOfServerSpec), responseContext ); - final Function cacheFn = strategy.prepareForCache(); + final Function cacheFn = strategy.prepareForSegmentLevelCache(); return resultsBySegments .map(result -> { final BySegmentResultValueClass resultsOfSegment = result.getValue(); diff --git a/server/src/main/java/io/druid/client/CachingQueryRunner.java b/server/src/main/java/io/druid/client/CachingQueryRunner.java index 17df72a59967..ad7fe4cd801d 100644 --- a/server/src/main/java/io/druid/client/CachingQueryRunner.java +++ b/server/src/main/java/io/druid/client/CachingQueryRunner.java @@ -103,7 +103,7 @@ public Sequence run(QueryPlus queryPlus, Map responseConte } if (useCache) { - final Function cacheFn = strategy.pullFromCache(); + final Function cacheFn = strategy.pullFromSegmentLevelCache(); final byte[] cachedResult = cache.get(key); if (cachedResult != null) { final TypeReference cacheObjectClazz = strategy.getCacheObjectClazz(); @@ -143,7 +143,7 @@ public void cleanup(Iterator iterFromMake) final Collection> cacheFutures = Collections.synchronizedList(Lists.>newLinkedList()); if (populateCache) { - final Function cacheFn = strategy.prepareForCache(); + final Function cacheFn = strategy.prepareForSegmentLevelCache(); return Sequences.withEffect( Sequences.map( diff --git a/server/src/main/java/io/druid/client/ResultLevelCacheUtil.java b/server/src/main/java/io/druid/client/ResultLevelCacheUtil.java new file mode 100644 index 000000000000..f55d2e6bba68 --- /dev/null +++ b/server/src/main/java/io/druid/client/ResultLevelCacheUtil.java @@ -0,0 +1,94 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.client; + +import io.druid.client.cache.Cache; +import io.druid.client.cache.CacheConfig; +import io.druid.java.util.common.logger.Logger; +import io.druid.java.util.common.StringUtils; +import io.druid.query.CacheStrategy; +import io.druid.query.Query; +import io.druid.query.QueryContexts; + +public class ResultLevelCacheUtil +{ + private static final Logger log = new Logger(ResultLevelCacheUtil.class); + + public static Cache.NamedKey computeResultLevelCacheKey( + String resultLevelCacheIdentifier + ) + { + return new Cache.NamedKey( + resultLevelCacheIdentifier, StringUtils.toUtf8(resultLevelCacheIdentifier) + ); + } + + public static void populate( + Cache cache, + Cache.NamedKey key, + byte[] resultBytes + ) + { + log.debug("Populating results into cache"); + cache.put(key, resultBytes); + } + + public static boolean useResultLevelCacheOnBrokers( + Query query, + CacheStrategy> strategy, + CacheConfig cacheConfig + ) + { + return useResultLevelCache(query, strategy, cacheConfig) && strategy.isCacheable(query, false); + } + + public static boolean populateResultLevelCacheOnBrokers( + Query query, + CacheStrategy> strategy, + CacheConfig cacheConfig + ) + { + return populateResultLevelCache(query, strategy, cacheConfig) && strategy.isCacheable(query, false); + } + + private static boolean useResultLevelCache( + Query query, + CacheStrategy> strategy, + CacheConfig cacheConfig + ) + { + return QueryContexts.isUseResultLevelCache(query) + && strategy != null + && cacheConfig.isUseResultLevelCache() + && cacheConfig.isQueryCacheable(query); + } + + private static boolean populateResultLevelCache( + Query query, + CacheStrategy> strategy, + CacheConfig cacheConfig + ) + { + return QueryContexts.isPopulateResultLevelCache(query) + && strategy != null + && cacheConfig.isPopulateResultLevelCache() + && cacheConfig.isQueryCacheable(query); + } +} diff --git a/server/src/main/java/io/druid/client/cache/CacheConfig.java b/server/src/main/java/io/druid/client/cache/CacheConfig.java index acfe6a6153df..eaa22c6cde9d 100644 --- a/server/src/main/java/io/druid/client/cache/CacheConfig.java +++ b/server/src/main/java/io/druid/client/cache/CacheConfig.java @@ -30,13 +30,20 @@ public class CacheConfig { public static final String USE_CACHE = "useCache"; public static final String POPULATE_CACHE = "populateCache"; - + // The defaults defined here for cache related parameters are different from the QueryContext defaults due to legacy reasons. + // They should be made the same at some point in the future. 
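+ // For reference, a Broker enables the result level cache via runtime.properties
+ // (property names documented in docs/content/configuration/broker.md above), e.g.:
+ //   druid.broker.cache.useResultLevelCache=true
+ //   druid.broker.cache.populateResultLevelCache=true
+ //   druid.broker.cache.resultLevelCacheLimit=1048576   (bytes; illustrative value)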
@JsonProperty private boolean useCache = false; @JsonProperty private boolean populateCache = false; + @JsonProperty + private boolean useResultLevelCache = false; + + @JsonProperty + private boolean populateResultLevelCache = false; + @JsonProperty @Min(0) private int numBackgroundThreads = 0; @@ -45,6 +52,9 @@ public class CacheConfig @Min(0) private int cacheBulkMergeLimit = Integer.MAX_VALUE; + @JsonProperty + private int resultLevelCacheLimit = Integer.MAX_VALUE; + @JsonProperty private List unCacheable = Arrays.asList(Query.GROUP_BY, Query.SELECT); @@ -58,6 +68,16 @@ public boolean isUseCache() return useCache; } + public boolean isPopulateResultLevelCache() + { + return populateResultLevelCache; + } + + public boolean isUseResultLevelCache() + { + return useResultLevelCache; + } + public int getNumBackgroundThreads() { return numBackgroundThreads; @@ -68,6 +88,11 @@ public int getCacheBulkMergeLimit() return cacheBulkMergeLimit; } + public int getResultLevelCacheLimit() + { + return resultLevelCacheLimit; + } + public boolean isQueryCacheable(Query query) { return isQueryCacheable(query.getType()); diff --git a/server/src/main/java/io/druid/query/ResultLevelCachingQueryRunner.java b/server/src/main/java/io/druid/query/ResultLevelCachingQueryRunner.java new file mode 100644 index 000000000000..39a5a6de781d --- /dev/null +++ b/server/src/main/java/io/druid/query/ResultLevelCachingQueryRunner.java @@ -0,0 +1,302 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.query; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import io.druid.client.ResultLevelCacheUtil; +import io.druid.client.cache.Cache; +import io.druid.client.cache.CacheConfig; +import io.druid.java.util.common.RE; +import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.guava.Sequence; +import io.druid.java.util.common.guava.SequenceWrapper; +import io.druid.java.util.common.guava.Sequences; +import io.druid.java.util.common.logger.Logger; +import io.druid.server.QueryResource; + + +import javax.annotation.Nullable; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Map; + +public class ResultLevelCachingQueryRunner implements QueryRunner +{ + private static final Logger log = new Logger(ResultLevelCachingQueryRunner.class); + private final QueryRunner baseRunner; + private ObjectMapper objectMapper; + private final Cache cache; + private final CacheConfig cacheConfig; + private final boolean useResultCache; + private final boolean populateResultCache; + private Query query; + private final CacheStrategy> strategy; + + + public ResultLevelCachingQueryRunner( + QueryRunner baseRunner, + QueryToolChest queryToolChest, + Query query, + ObjectMapper objectMapper, + Cache cache, + CacheConfig cacheConfig + ) + { + this.baseRunner = baseRunner; + this.objectMapper = objectMapper; + this.cache = cache; + this.cacheConfig = cacheConfig; + this.query = query; + this.strategy = queryToolChest.getCacheStrategy(query); + this.populateResultCache = ResultLevelCacheUtil.populateResultLevelCacheOnBrokers(query, strategy, cacheConfig); + this.useResultCache = ResultLevelCacheUtil.useResultLevelCacheOnBrokers(query, strategy, cacheConfig); + } + + @Override + public Sequence run(QueryPlus queryPlus, Map responseContext) + { + if (useResultCache || populateResultCache) { + + final String cacheKeyStr = StringUtils.fromUtf8(strategy.computeCacheKey(query)); + final byte[] cachedResultSet = fetchResultsFromResultLevelCache(cacheKeyStr); + String existingResultSetId = extractEtagFromResults(cachedResultSet); + + existingResultSetId = existingResultSetId == null ? 
"" : existingResultSetId; + query = query.withOverriddenContext( + ImmutableMap.of(QueryResource.HEADER_IF_NONE_MATCH, existingResultSetId)); + + Sequence resultFromClient = baseRunner.run( + QueryPlus.wrap(query), + responseContext + ); + String newResultSetId = (String) responseContext.get(QueryResource.HEADER_ETAG); + + if (useResultCache && newResultSetId != null && newResultSetId.equals(existingResultSetId)) { + log.debug("Return cached result set as there is no change in identifiers for query %s ", query.getId()); + return deserializeResults(cachedResultSet, strategy, existingResultSetId); + } else { + @Nullable + ResultLevelCachePopulator resultLevelCachePopulator = createResultLevelCachePopulator( + cacheKeyStr, + newResultSetId + ); + if (resultLevelCachePopulator == null) { + return resultFromClient; + } + final Function cacheFn = strategy.prepareForCache(true); + + return Sequences.wrap(Sequences.map( + resultFromClient, + new Function() + { + @Override + public T apply(T input) + { + if (resultLevelCachePopulator.isShouldPopulate()) { + resultLevelCachePopulator.cacheResultEntry(resultLevelCachePopulator, input, cacheFn); + } + return input; + } + } + ), new SequenceWrapper() + { + @Override + public void after(boolean isDone, Throwable thrown) + { + Preconditions.checkNotNull( + resultLevelCachePopulator, + "ResultLevelCachePopulator cannot be null during cache population" + ); + if (thrown != null) { + log.error( + thrown, + "Error while preparing for result level caching for query %s with error %s ", + query.getId(), + thrown.getMessage() + ); + } else if (resultLevelCachePopulator.isShouldPopulate()) { + // The resultset identifier and its length is cached along with the resultset + resultLevelCachePopulator.populateResults(); + log.debug("Cache population complete for query %s", query.getId()); + } + resultLevelCachePopulator.cacheObjectStream = null; + } + }); + } + } else { + return baseRunner.run( + queryPlus, + responseContext + ); + } + } + + private byte[] fetchResultsFromResultLevelCache( + final String queryCacheKey + ) + { + if (useResultCache && queryCacheKey != null) { + return cache.get(ResultLevelCacheUtil.computeResultLevelCacheKey(queryCacheKey)); + } + return null; + } + + private String extractEtagFromResults( + final byte[] cachedResult + ) + { + if (cachedResult == null) { + return null; + } + log.debug("Fetching result level cache identifier for query: %s", query.getId()); + int etagLength = ByteBuffer.wrap(cachedResult, 0, Integer.BYTES).getInt(); + return StringUtils.fromUtf8(Arrays.copyOfRange(cachedResult, Integer.BYTES, etagLength + Integer.BYTES)); + } + + private Sequence deserializeResults( + final byte[] cachedResult, CacheStrategy strategy, String resultSetId + ) + { + if (cachedResult == null) { + log.error("Cached result set is null"); + } + final Function pullFromCacheFunction = strategy.pullFromCache(true); + final TypeReference cacheObjectClazz = strategy.getCacheObjectClazz(); + //Skip the resultsetID and its length bytes + Sequence cachedSequence = Sequences.simple(() -> { + try { + int resultOffset = Integer.BYTES + resultSetId.length(); + return objectMapper.readValues( + objectMapper.getFactory().createParser( + cachedResult, + resultOffset, + cachedResult.length - resultOffset + ), + cacheObjectClazz + ); + } + catch (IOException e) { + throw new RE(e, "Failed to retrieve results from cache for query ID [%s]", query.getId()); + } + }); + + return Sequences.map(cachedSequence, pullFromCacheFunction); + } + + private 
+  private ResultLevelCachePopulator createResultLevelCachePopulator(
+      String cacheKeyStr,
+      String resultSetId
+  )
+  {
+    if (resultSetId != null && populateResultCache) {
+      ResultLevelCachePopulator resultLevelCachePopulator = new ResultLevelCachePopulator(
+          cache,
+          objectMapper,
+          ResultLevelCacheUtil.computeResultLevelCacheKey(cacheKeyStr),
+          cacheConfig,
+          true
+      );
+      try {
+        // Save the resultSetId and its length
+        resultLevelCachePopulator.cacheObjectStream.write(ByteBuffer.allocate(Integer.BYTES)
+                                                                    .putInt(resultSetId.length())
+                                                                    .array());
+        resultLevelCachePopulator.cacheObjectStream.write(StringUtils.toUtf8(resultSetId));
+      }
+      catch (IOException ioe) {
+        log.error(ioe, "Failed to write cached values for query %s", query.getId());
+        return null;
+      }
+      return resultLevelCachePopulator;
+    } else {
+      return null;
+    }
+  }
+
+  public class ResultLevelCachePopulator
+  {
+    private final Cache cache;
+    private final ObjectMapper mapper;
+    private final Cache.NamedKey key;
+    private final CacheConfig cacheConfig;
+    private ByteArrayOutputStream cacheObjectStream = new ByteArrayOutputStream();
+    private boolean shouldPopulate;
+
+    public boolean isShouldPopulate()
+    {
+      return shouldPopulate;
+    }
+
+    private ResultLevelCachePopulator(
+        Cache cache,
+        ObjectMapper mapper,
+        Cache.NamedKey key,
+        CacheConfig cacheConfig,
+        boolean shouldPopulate
+    )
+    {
+      this.cache = cache;
+      this.mapper = mapper;
+      this.key = key;
+      this.cacheConfig = cacheConfig;
+      this.shouldPopulate = shouldPopulate;
+    }
+
+    private void cacheResultEntry(
+        ResultLevelCachePopulator resultLevelCachePopulator,
+        T resultEntry,
+        Function<T, Object> cacheFn
+    )
+    {
+      int cacheLimit = cacheConfig.getResultLevelCacheLimit();
+      try (JsonGenerator gen = mapper.getFactory().createGenerator(resultLevelCachePopulator.cacheObjectStream)) {
+        gen.writeObject(cacheFn.apply(resultEntry));
+        if (cacheLimit > 0 && resultLevelCachePopulator.cacheObjectStream.size() > cacheLimit) {
+          shouldPopulate = false;
+          resultLevelCachePopulator.cacheObjectStream = null;
+          return;
+        }
+      }
+      catch (IOException ex) {
+        log.error(ex, "Failed to retrieve entry to be cached. Result Level caching will not be performed!");
+        shouldPopulate = false;
+        resultLevelCachePopulator.cacheObjectStream = null;
+      }
+    }
+
+    public void populateResults()
+    {
+      ResultLevelCacheUtil.populate(
+          cache,
+          key,
+          cacheObjectStream.toByteArray()
+      );
+    }
+  }
+}
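As the populator above shows, the result set identifier (ETag) and its length are written ahead of the serialized rows, so a cache entry has the layout `[4-byte ETag length][ETag bytes][JSON result rows]`. A minimal decoding sketch of that layout, mirroring `extractEtagFromResults` and the offset arithmetic in `deserializeResults` (the class and method names here are hypothetical; note the offset math assumes an ASCII ETag, for which `String.length()` equals the UTF-8 byte count):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Sketch of the entry layout written by ResultLevelCachePopulator:
// [4-byte ETag length][ETag bytes (UTF-8)][JSON-serialized result rows]
class ResultCacheEntryLayout
{
  static String etagOf(byte[] entry)
  {
    // The first four bytes hold the ETag length, as written by createResultLevelCachePopulator
    int etagLength = ByteBuffer.wrap(entry, 0, Integer.BYTES).getInt();
    return new String(entry, Integer.BYTES, etagLength, StandardCharsets.UTF_8);
  }

  static byte[] resultBytesOf(byte[] entry)
  {
    int etagLength = ByteBuffer.wrap(entry, 0, Integer.BYTES).getInt();
    // Everything after the ETag is the JSON stream handed to pullFromCache(true)
    return Arrays.copyOfRange(entry, Integer.BYTES + etagLength, entry.length);
  }
}
```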
diff --git a/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java
index 768e284ca1c5..e9ee6959ea32 100644
--- a/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java
+++ b/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java
@@ -24,6 +24,8 @@
 import com.google.inject.Inject;
 import io.druid.java.util.emitter.service.ServiceEmitter;
 import io.druid.client.CachingClusteredClient;
+import io.druid.client.cache.Cache;
+import io.druid.client.cache.CacheConfig;
 import io.druid.query.FluentQueryRunnerBuilder;
 import io.druid.query.PostProcessingOperator;
 import io.druid.query.Query;
@@ -31,6 +33,7 @@
 import io.druid.query.QuerySegmentWalker;
 import io.druid.query.QueryToolChest;
 import io.druid.query.QueryToolChestWarehouse;
+import io.druid.query.ResultLevelCachingQueryRunner;
 import io.druid.query.RetryQueryRunner;
 import io.druid.query.RetryQueryRunnerConfig;
 import io.druid.query.SegmentDescriptor;
@@ -47,6 +50,9 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
   private final RetryQueryRunnerConfig retryConfig;
   private final ObjectMapper objectMapper;
   private final ServerConfig serverConfig;
+  private final Cache cache;
+  private final CacheConfig cacheConfig;
+
 
   @Inject
   public ClientQuerySegmentWalker(
@@ -55,7 +61,9 @@ public ClientQuerySegmentWalker(
       QueryToolChestWarehouse warehouse,
       RetryQueryRunnerConfig retryConfig,
       ObjectMapper objectMapper,
-      ServerConfig serverConfig
+      ServerConfig serverConfig,
+      Cache cache,
+      CacheConfig cacheConfig
   )
   {
     this.emitter = emitter;
@@ -64,6 +72,8 @@ public ClientQuerySegmentWalker(
     this.retryConfig = retryConfig;
     this.objectMapper = objectMapper;
     this.serverConfig = serverConfig;
+    this.cache = cache;
+    this.cacheConfig = cacheConfig;
   }
 
   @Override
@@ -81,6 +91,22 @@ public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs)
   private <T> QueryRunner<T> makeRunner(Query<T> query, QueryRunner<T> baseClientRunner)
   {
     QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
+
+    // This does not adhere to the fluent workflow. See https://github.com/druid-io/druid/issues/5517
+    return new ResultLevelCachingQueryRunner<>(makeRunner(query, baseClientRunner, toolChest),
+                                               toolChest,
+                                               query,
+                                               objectMapper,
+                                               cache,
+                                               cacheConfig);
+  }
+
+  private <T> QueryRunner<T> makeRunner(
+      Query<T> query,
+      QueryRunner<T> baseClientRunner,
+      QueryToolChest<T, Query<T>> toolChest
+  )
+  {
     PostProcessingOperator<T> postProcessing = objectMapper.convertValue(
         query.getContextValue("postProcessing"),
         new TypeReference<PostProcessingOperator<T>>()
@@ -105,6 +131,4 @@ private <T> QueryRunner<T> makeRunner(Query<T> query, QueryRunner<T> baseClientR
         .emitCPUTimeMetric(emitter)
         .postProcess(postProcessing);
   }
-
-
 }
diff --git a/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java b/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java
index 3fdc02ddf30d..392ac0a6f4cc 100644
--- a/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java
+++ b/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java
@@ -316,7 +316,7 @@ public boolean isUseCache()
     byte[] cacheValue = cache.get(cacheKey);
     Assert.assertNotNull(cacheValue);
 
-    Function fn = cacheStrategy.pullFromCache();
+    Function fn = cacheStrategy.pullFromSegmentLevelCache();
     List cacheResults = Lists.newArrayList(
         Iterators.transform(
             objectMapper.readValues(
@@ -351,7 +351,7 @@ private void testUseCache(
         cache,
         objectMapper,
         cacheKey,
-        Iterables.transform(expectedResults, cacheStrategy.prepareForCache())
+        Iterables.transform(expectedResults, cacheStrategy.prepareForSegmentLevelCache())
     );
 
     CachingQueryRunner runner = new CachingQueryRunner(