diff --git a/docs/content/configuration/broker.md b/docs/content/configuration/broker.md index edddca824db3..83cb75b6aeb5 100644 --- a/docs/content/configuration/broker.md +++ b/docs/content/configuration/broker.md @@ -108,6 +108,9 @@ You can optionally only configure caching to be enabled on the broker by setting |--------|---------------|-----------|-------| |`druid.broker.cache.useCache`|true, false|Enable the cache on the broker.|false| |`druid.broker.cache.populateCache`|true, false|Populate the cache on the broker.|false| +|`druid.broker.cache.useResultLevelCache`|true, false|Enable result level caching on the broker.|false| +|`druid.broker.cache.populateResultLevelCache`|true, false|Populate the result level cache on the broker.|false| +|`druid.broker.cache.resultLevelCacheLimit`|positive integer|Maximum size of query response that can be cached.|`Integer.MAX_VALUE`| |`druid.broker.cache.unCacheable`|All druid query types|All query types to not cache.|`["groupBy", "select"]`| |`druid.broker.cache.cacheBulkMergeLimit`|positive integer or 0|Queries with more segments than this number will not attempt to fetch from cache at the broker level, leaving potential caching fetches (and cache result merging) to the historicals|`Integer.MAX_VALUE`| diff --git a/docs/content/querying/caching.md b/docs/content/querying/caching.md index 1d6f60af441e..2dd2c88d6b62 100644 --- a/docs/content/querying/caching.md +++ b/docs/content/querying/caching.md @@ -3,9 +3,10 @@ layout: doc_page --- # Query Caching -Druid supports query result caching through an LRU cache. Results are stored on a per segment basis, along with the -parameters of a given query. This allows Druid to return final results based partially on segment results in the cache and partially -on segment results from scanning historical/real-time segments. +Druid supports query result caching through an LRU cache. 
Results are stored either as a whole or on a per segment basis, along with the +parameters of a given query. Segment level caching allows Druid to return final results based partially on segment results in the cache +and partially on segment results from scanning historical/real-time segments. Result level caching enables Druid to cache the entire +result set, so that query results can be completely retrieved from the cache for identical queries. Segment results can be stored in a local heap cache or in an external distributed key/value store. Segment query caches can be enabled at either the Historical and Broker level (it is not recommended to enable caching on both). @@ -15,6 +16,7 @@ can be enabled at either the Historical and Broker level (it is not recommended Enabling caching on the broker can yield faster results than if query caches were enabled on Historicals for small clusters. This is the recommended setup for smaller production clusters (< 20 servers). Take note that when caching is enabled on the Broker, results from Historicals are returned on a per segment basis, and Historicals will not be able to do any local result merging. +Result level caching is enabled only on the Broker side. ## Query caching on Historicals diff --git a/docs/content/querying/query-context.md b/docs/content/querying/query-context.md index b0effe81cf9d..d4e2be28f123 100644 --- a/docs/content/querying/query-context.md +++ b/docs/content/querying/query-context.md @@ -15,6 +15,8 @@ The query context is used for various query configuration parameters. The follow |queryId | auto-generated | Unique identifier given to this query. If a query ID is set or known, this can be used to cancel the query | |useCache | `true` | Flag indicating whether to leverage the query cache for this query. When set to false, it disables reading from the query cache for this query. 
When set to true, Druid uses druid.broker.cache.useCache or druid.historical.cache.useCache to determine whether or not to read from the query cache | |populateCache | `true` | Flag indicating whether to save the results of the query to the query cache. Primarily used for debugging. When set to false, it disables saving the results of this query to the query cache. When set to true, Druid uses druid.broker.cache.populateCache or druid.historical.cache.populateCache to determine whether or not to save the results of this query to the query cache | +|useResultLevelCache | `false` | Flag indicating whether to leverage the result level cache for this query. When set to false, it disables reading from the result level cache for this query. When set to true, Druid uses druid.broker.cache.useResultLevelCache to determine whether or not to read from the result level cache | +|populateResultLevelCache | `false` | Flag indicating whether to save the results of the query to the result level cache. Primarily used for debugging. When set to false, it disables saving the results of this query to the result level cache. When set to true, Druid uses druid.broker.cache.populateResultLevelCache to determine whether or not to save the results of this query to the result level cache | |bySegment | `false` | Return "by segment" results. Primarily used for debugging, setting it to `true` returns results associated with the data segment they came from | |finalize | `true` | Flag indicating whether to "finalize" aggregation results. Primarily used for debugging. For instance, the `hyperUnique` aggregator will return the full HyperLogLog sketch instead of the estimated cardinality when this flag is set to `false` | |chunkPeriod | `P0D` (off) | At the broker node level, long interval queries (of any type) may be broken into shorter interval queries to parallelize merging more than normal. Broken up queries will use a larger share of cluster resources, but may be able to complete faster as a result. Use ISO 8601 periods. 
For example, if this property is set to `P1M` (one month), then a query covering a year would be broken into 12 smaller queries. The broker uses its query processing executor service to initiate processing for query chunks, so make sure "druid.processing.numThreads" is configured appropriately on the broker. [groupBy queries](groupbyquery.html) do not support chunkPeriod by default, although they do if using the legacy "v1" engine. | diff --git a/processing/src/main/java/io/druid/query/CacheStrategy.java b/processing/src/main/java/io/druid/query/CacheStrategy.java index 95681dd1b067..2c3ec35ed193 100644 --- a/processing/src/main/java/io/druid/query/CacheStrategy.java +++ b/processing/src/main/java/io/druid/query/CacheStrategy.java @@ -26,7 +26,7 @@ import java.util.concurrent.ExecutorService; /** -*/ + */ @ExtensionPoint public interface CacheStrategy> { @@ -37,6 +37,7 @@ public interface CacheStrategy> * @param query the query to be cached * @param willMergeRunners indicates that {@link QueryRunnerFactory#mergeRunners(ExecutorService, Iterable)} will be * called on the cached by-segment results + * * @return true if the query is cacheable, otherwise false. */ boolean isCacheable(QueryType query, boolean willMergeRunners); @@ -45,6 +46,7 @@ public interface CacheStrategy> * Computes the cache key for the given query * * @param query the query to compute a cache key for + * * @return the cache key */ byte[] computeCacheKey(QueryType query); @@ -58,17 +60,32 @@ public interface CacheStrategy> /** * Returns a function that converts from the QueryType's result type to something cacheable. - * + *

* The resulting function must be thread-safe. * + * @param isResultLevelCache indicates whether the function is invoked for result-level caching or segment-level caching + * * @return a thread-safe function that converts the QueryType's result type into something cacheable */ - Function prepareForCache(); + Function prepareForCache(boolean isResultLevelCache); /** * A function that does the inverse of the operation that the function prepareForCache returns * + * @param isResultLevelCache indicates whether the function is invoked for result-level caching or segment-level caching + * * @return A function that does the inverse of the operation that the function prepareForCache returns */ - Function pullFromCache(); + Function pullFromCache(boolean isResultLevelCache); + + + default Function prepareForSegmentLevelCache() + { + return prepareForCache(false); + } + + default Function pullFromSegmentLevelCache() + { + return pullFromCache(false); + } } diff --git a/processing/src/main/java/io/druid/query/QueryContexts.java b/processing/src/main/java/io/druid/query/QueryContexts.java index d0a16fd87844..d88a536eb9ec 100644 --- a/processing/src/main/java/io/druid/query/QueryContexts.java +++ b/processing/src/main/java/io/druid/query/QueryContexts.java @@ -37,6 +37,8 @@ public class QueryContexts public static final boolean DEFAULT_BY_SEGMENT = false; public static final boolean DEFAULT_POPULATE_CACHE = true; public static final boolean DEFAULT_USE_CACHE = true; + public static final boolean DEFAULT_POPULATE_RESULTLEVEL_CACHE = true; + public static final boolean DEFAULT_USE_RESULTLEVEL_CACHE = true; public static final int DEFAULT_PRIORITY = 0; public static final int DEFAULT_UNCOVERED_INTERVALS_LIMIT = 0; public static final long DEFAULT_TIMEOUT_MILLIS = 300_000; // 5 minutes @@ -72,6 +74,26 @@ public static boolean isUseCache(Query query, boolean defaultValue) return parseBoolean(query, "useCache", defaultValue); } + public static boolean isPopulateResultLevelCache(Query 
query) + { + return isPopulateResultLevelCache(query, DEFAULT_POPULATE_RESULTLEVEL_CACHE); + } + + public static boolean isPopulateResultLevelCache(Query query, boolean defaultValue) + { + return parseBoolean(query, "populateResultLevelCache", defaultValue); + } + + public static boolean isUseResultLevelCache(Query query) + { + return isUseResultLevelCache(query, DEFAULT_USE_RESULTLEVEL_CACHE); + } + + public static boolean isUseResultLevelCache(Query query, boolean defaultValue) + { + return parseBoolean(query, "useResultLevelCache", defaultValue); + } + public static boolean isFinalize(Query query, boolean defaultValue) { return parseBoolean(query, "finalize", defaultValue); diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index 28d5acb42de0..1252caa40ddd 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -50,6 +50,7 @@ import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.MetricManipulationFn; import io.druid.query.aggregation.MetricManipulatorFns; +import io.druid.query.aggregation.PostAggregator; import io.druid.query.cache.CacheKeyBuilder; import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.query.dimension.DimensionSpec; @@ -408,7 +409,7 @@ public TypeReference getCacheObjectClazz() } @Override - public Function prepareForCache() + public Function prepareForCache(boolean isResultLevelCache) { return new Function() { @@ -426,6 +427,11 @@ public Object apply(Row input) for (AggregatorFactory agg : aggs) { retVal.add(event.get(agg.getName())); } + if (isResultLevelCache) { + for (PostAggregator postAgg : query.getPostAggregatorSpecs()) { + retVal.add(event.get(postAgg.getName())); + } + } return retVal; } @@ -435,7 +441,7 @@ public Object apply(Row input) } 
@Override - public Function pullFromCache() + public Function pullFromCache(boolean isResultLevelCache) { return new Function() { @@ -460,7 +466,12 @@ public Row apply(Object input) final AggregatorFactory factory = aggsIter.next(); event.put(factory.getName(), factory.deserialize(results.next())); } - + if (isResultLevelCache) { + Iterator postItr = query.getPostAggregatorSpecs().iterator(); + while (postItr.hasNext() && results.hasNext()) { + event.put(postItr.next().getName(), results.next()); + } + } if (dimsIter.hasNext() || aggsIter.hasNext() || results.hasNext()) { throw new ISE( "Found left over objects while reading from cache!! dimsIter[%s] aggsIter[%s] results[%s]", diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java index 4a921480f30f..5025d9dbc975 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java @@ -198,7 +198,7 @@ public TypeReference getCacheObjectClazz() } @Override - public Function prepareForCache() + public Function prepareForCache(boolean isResultLevelCache) { return new Function() { @@ -211,7 +211,7 @@ public SegmentAnalysis apply(@Nullable SegmentAnalysis input) } @Override - public Function pullFromCache() + public Function pullFromCache(boolean isResultLevelCache) { return new Function() { diff --git a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java index 74148d66aea4..9efad496b351 100644 --- a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java @@ -206,7 +206,7 @@ public TypeReference getCacheObjectClazz() } @Override - public Function, 
Object> prepareForCache() + public Function, Object> prepareForCache(boolean isResultLevelCache) { return new Function, Object>() { @@ -221,7 +221,7 @@ public Object apply(Result input) } @Override - public Function> pullFromCache() + public Function> pullFromCache(boolean isResultLevelCache) { return new Function>() { diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java index 786391515604..c0c3d8278502 100644 --- a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java @@ -243,7 +243,7 @@ public TypeReference getCacheObjectClazz() } @Override - public Function, Object> prepareForCache() + public Function, Object> prepareForCache(boolean isResultLevelCache) { return new Function, Object>() { @@ -272,7 +272,7 @@ public Object apply(final Result input) } @Override - public Function> pullFromCache() + public Function> pullFromCache(boolean isResultLevelCache) { return new Function>() { diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java index 99586b595dd3..a1046a00c05b 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java @@ -171,7 +171,7 @@ public TypeReference getCacheObjectClazz() } @Override - public Function, Object> prepareForCache() + public Function, Object> prepareForCache(boolean isResultLevelCache) { return new Function, Object>() { @@ -184,7 +184,7 @@ public Object apply(Result input) } @Override - public Function> pullFromCache() + public Function> pullFromCache(boolean isResultLevelCache) { return new Function>() { diff --git 
a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java index 473775b71bc0..6e9f0307f70d 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java @@ -174,7 +174,7 @@ public TypeReference getCacheObjectClazz() } @Override - public Function, Object> prepareForCache() + public Function, Object> prepareForCache(boolean isResultLevelCache) { return new Function, Object>() { @@ -188,14 +188,18 @@ public Object apply(final Result input) for (AggregatorFactory agg : aggs) { retVal.add(results.getMetric(agg.getName())); } - + if (isResultLevelCache) { + for (PostAggregator postAgg : query.getPostAggregatorSpecs()) { + retVal.add(results.getMetric(postAgg.getName())); + } + } return retVal; } }; } @Override - public Function> pullFromCache() + public Function> pullFromCache(boolean isResultLevelCache) { return new Function>() { @@ -216,6 +220,12 @@ public Result apply(@Nullable Object input) final AggregatorFactory factory = aggsIter.next(); retVal.put(factory.getName(), factory.deserialize(resultIter.next())); } + if (isResultLevelCache) { + Iterator postItr = query.getPostAggregatorSpecs().iterator(); + while (postItr.hasNext() && resultIter.hasNext()) { + retVal.put(postItr.next().getName(), resultIter.next()); + } + } return new Result( timestamp, diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java index 81761afc6a37..c34e43eb1a46 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java @@ -293,7 +293,6 @@ public TypeReference> getResultTypeReference() } - @Override public CacheStrategy, Object, 
TopNQuery> getCacheStrategy(final TopNQuery query) { @@ -341,7 +340,7 @@ public TypeReference getCacheObjectClazz() } @Override - public Function, Object> prepareForCache() + public Function, Object> prepareForCache(boolean isResultLevelCache) { return new Function, Object>() { @@ -361,6 +360,11 @@ public Object apply(final Result input) for (String aggName : aggFactoryNames) { vals.add(result.getMetric(aggName)); } + if (isResultLevelCache) { + for (PostAggregator postAgg : query.getPostAggregatorSpecs()) { + vals.add(result.getMetric(postAgg.getName())); + } + } retVal.add(vals); } return retVal; @@ -369,7 +373,7 @@ public Object apply(final Result input) } @Override - public Function> pullFromCache() + public Function> pullFromCache(boolean isResultLevelCache) { return new Function>() { @@ -401,7 +405,12 @@ public Result apply(Object input) for (PostAggregator postAgg : postAggs) { vals.put(postAgg.getName(), postAgg.compute(vals)); } - + if (isResultLevelCache) { + Iterator postItr = query.getPostAggregatorSpecs().iterator(); + while (postItr.hasNext() && resultIter.hasNext()) { + vals.put(postItr.next().getName(), resultIter.next()); + } + } retVal.add(vals); } diff --git a/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChestTest.java b/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChestTest.java index 359e39f6af2b..a497a6313801 100644 --- a/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChestTest.java +++ b/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChestTest.java @@ -94,7 +94,7 @@ public void testCacheStrategy() throws Exception null ); - Object preparedValue = strategy.prepareForCache().apply(result); + Object preparedValue = strategy.prepareForSegmentLevelCache().apply(result); ObjectMapper objectMapper = new DefaultObjectMapper(); SegmentAnalysis fromCacheValue = objectMapper.readValue( @@ -102,7 +102,7 @@ public void 
testCacheStrategy() throws Exception strategy.getCacheObjectClazz() ); - SegmentAnalysis fromCacheResult = strategy.pullFromCache().apply(fromCacheValue); + SegmentAnalysis fromCacheResult = strategy.pullFromSegmentLevelCache().apply(fromCacheValue); Assert.assertEquals(result, fromCacheResult); } diff --git a/processing/src/test/java/io/druid/query/search/SearchQueryQueryToolChestTest.java b/processing/src/test/java/io/druid/query/search/SearchQueryQueryToolChestTest.java index 621a435f5e95..dd73518ae5fe 100644 --- a/processing/src/test/java/io/druid/query/search/SearchQueryQueryToolChestTest.java +++ b/processing/src/test/java/io/druid/query/search/SearchQueryQueryToolChestTest.java @@ -59,7 +59,7 @@ public void testCacheStrategy() throws Exception new SearchResultValue(ImmutableList.of(new SearchHit("dim1", "a"))) ); - Object preparedValue = strategy.prepareForCache().apply( + Object preparedValue = strategy.prepareForSegmentLevelCache().apply( result ); @@ -69,7 +69,7 @@ public void testCacheStrategy() throws Exception strategy.getCacheObjectClazz() ); - Result fromCacheResult = strategy.pullFromCache().apply(fromCacheValue); + Result fromCacheResult = strategy.pullFromSegmentLevelCache().apply(fromCacheValue); Assert.assertEquals(result, fromCacheResult); } diff --git a/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChestTest.java b/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChestTest.java index e9092200682f..d947613dbf62 100644 --- a/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChestTest.java +++ b/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChestTest.java @@ -215,7 +215,7 @@ public void testCacheStrategy() throws Exception ) ); - Object preparedValue = strategy.prepareForCache().apply( + Object preparedValue = strategy.prepareForSegmentLevelCache().apply( result ); @@ -225,7 +225,7 @@ public void testCacheStrategy() throws 
Exception strategy.getCacheObjectClazz() ); - Result fromCacheResult = strategy.pullFromCache().apply(fromCacheValue); + Result fromCacheResult = strategy.pullFromSegmentLevelCache().apply(fromCacheValue); Assert.assertEquals(result, fromCacheResult); } diff --git a/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java b/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java index 8e67921dd83e..0c4c0e7b6e86 100644 --- a/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java +++ b/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java @@ -32,6 +32,8 @@ import io.druid.query.TableDataSource; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.query.aggregation.PostAggregator; +import io.druid.query.aggregation.post.ConstantPostAggregator; import io.druid.query.spec.MultipleIntervalSegmentSpec; import io.druid.segment.TestHelper; import io.druid.segment.VirtualColumns; @@ -76,12 +78,12 @@ public void testCacheStrategy() throws Exception new CountAggregatorFactory("metric1"), new LongSumAggregatorFactory("metric0", "metric0") ), - null, + ImmutableList.of(new ConstantPostAggregator("post", 10)), null ) ); - final Result result = new Result<>( + final Result result1 = new Result<>( // test timestamps that result in integer size millis DateTimes.utc(123L), new TimeseriesResultValue( @@ -89,7 +91,7 @@ public void testCacheStrategy() throws Exception ) ); - Object preparedValue = strategy.prepareForCache().apply(result); + Object preparedValue = strategy.prepareForSegmentLevelCache().apply(result1); ObjectMapper objectMapper = TestHelper.makeJsonMapper(); Object fromCacheValue = objectMapper.readValue( @@ -97,9 +99,26 @@ public void testCacheStrategy() throws Exception strategy.getCacheObjectClazz() ); - Result fromCacheResult = 
strategy.pullFromCache().apply(fromCacheValue); + Result fromCacheResult = strategy.pullFromSegmentLevelCache().apply(fromCacheValue); + + Assert.assertEquals(result1, fromCacheResult); + + final Result result2 = new Result<>( + // test timestamps that result in integer size millis + DateTimes.utc(123L), + new TimeseriesResultValue( + ImmutableMap.of("metric1", 2, "metric0", 3, "post", 10) + ) + ); + + Object preparedResultLevelCacheValue = strategy.prepareForCache(true).apply(result2); + Object fromResultLevelCacheValue = objectMapper.readValue( + objectMapper.writeValueAsBytes(preparedResultLevelCacheValue), + strategy.getCacheObjectClazz() + ); - Assert.assertEquals(result, fromCacheResult); + Result fromResultLevelCacheRes = strategy.pullFromCache(true).apply(fromResultLevelCacheValue); + Assert.assertEquals(result2, fromResultLevelCacheRes); } @Test diff --git a/processing/src/test/java/io/druid/query/topn/TopNQueryQueryToolChestTest.java b/processing/src/test/java/io/druid/query/topn/TopNQueryQueryToolChestTest.java index 5cd277123a6c..1a4e78351f7e 100644 --- a/processing/src/test/java/io/druid/query/topn/TopNQueryQueryToolChestTest.java +++ b/processing/src/test/java/io/druid/query/topn/TopNQueryQueryToolChestTest.java @@ -78,7 +78,7 @@ public void testCacheStrategy() throws Exception ) ); - final Result result = new Result<>( + final Result result1 = new Result<>( // test timestamps that result in integer size millis DateTimes.utc(123L), new TopNResultValue( @@ -91,8 +91,8 @@ public void testCacheStrategy() throws Exception ) ); - Object preparedValue = strategy.prepareForCache().apply( - result + Object preparedValue = strategy.prepareForSegmentLevelCache().apply( + result1 ); ObjectMapper objectMapper = TestHelper.makeJsonMapper(); @@ -101,9 +101,36 @@ public void testCacheStrategy() throws Exception strategy.getCacheObjectClazz() ); - Result fromCacheResult = strategy.pullFromCache().apply(fromCacheValue); + Result fromCacheResult = 
strategy.pullFromSegmentLevelCache().apply(fromCacheValue); + + Assert.assertEquals(result1, fromCacheResult); + + final Result result2 = new Result<>( + // test timestamps that result in integer size millis + DateTimes.utc(123L), + new TopNResultValue( + Arrays.asList( + ImmutableMap.of( + "test", "val1", + "metric1", 2, + "post", 10 + ) + ) + ) + ); + + Object preparedResultCacheValue = strategy.prepareForCache(true).apply( + result2 + ); + + Object fromResultCacheValue = objectMapper.readValue( + objectMapper.writeValueAsBytes(preparedResultCacheValue), + strategy.getCacheObjectClazz() + ); + + Result fromResultCacheResult = strategy.pullFromCache(true).apply(fromResultCacheValue); + Assert.assertEquals(result2, fromResultCacheResult); - Assert.assertEquals(result, fromCacheResult); } @Test diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java index 1e2e441949d8..df48cc0f21ef 100644 --- a/server/src/main/java/io/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java @@ -500,7 +500,7 @@ private void addSequencesFromCache( return; } - final Function pullFromCacheFunction = strategy.pullFromCache(); + final Function pullFromCacheFunction = strategy.pullFromSegmentLevelCache(); final TypeReference cacheObjectClazz = strategy.getCacheObjectClazz(); for (Pair cachedResultPair : cachedResults) { final byte[] cachedResult = cachedResultPair.rhs; @@ -600,7 +600,7 @@ private Sequence getAndCacheServerResults( .withQuerySegmentSpec(segmentsOfServerSpec), responseContext ); - final Function cacheFn = strategy.prepareForCache(); + final Function cacheFn = strategy.prepareForSegmentLevelCache(); return resultsBySegments .map(result -> { final BySegmentResultValueClass resultsOfSegment = result.getValue(); diff --git a/server/src/main/java/io/druid/client/CachingQueryRunner.java 
b/server/src/main/java/io/druid/client/CachingQueryRunner.java index 54ecc5c35a90..79bcbfb0d178 100644 --- a/server/src/main/java/io/druid/client/CachingQueryRunner.java +++ b/server/src/main/java/io/druid/client/CachingQueryRunner.java @@ -102,7 +102,7 @@ public Sequence run(QueryPlus queryPlus, Map responseConte } if (useCache) { - final Function cacheFn = strategy.pullFromCache(); + final Function cacheFn = strategy.pullFromSegmentLevelCache(); final byte[] cachedResult = cache.get(key); if (cachedResult != null) { final TypeReference cacheObjectClazz = strategy.getCacheObjectClazz(); @@ -142,7 +142,7 @@ public void cleanup(Iterator iterFromMake) final Collection> cacheFutures = Collections.synchronizedList(Lists.>newLinkedList()); if (populateCache) { - final Function cacheFn = strategy.prepareForCache(); + final Function cacheFn = strategy.prepareForSegmentLevelCache(); return Sequences.withEffect( Sequences.map( diff --git a/server/src/main/java/io/druid/client/ResultLevelCacheUtil.java b/server/src/main/java/io/druid/client/ResultLevelCacheUtil.java new file mode 100644 index 000000000000..f55d2e6bba68 --- /dev/null +++ b/server/src/main/java/io/druid/client/ResultLevelCacheUtil.java @@ -0,0 +1,94 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client; + +import io.druid.client.cache.Cache; +import io.druid.client.cache.CacheConfig; +import io.druid.java.util.common.logger.Logger; +import io.druid.java.util.common.StringUtils; +import io.druid.query.CacheStrategy; +import io.druid.query.Query; +import io.druid.query.QueryContexts; + +public class ResultLevelCacheUtil +{ + private static final Logger log = new Logger(ResultLevelCacheUtil.class); + + public static Cache.NamedKey computeResultLevelCacheKey( + String resultLevelCacheIdentifier + ) + { + return new Cache.NamedKey( + resultLevelCacheIdentifier, StringUtils.toUtf8(resultLevelCacheIdentifier) + ); + } + + public static void populate( + Cache cache, + Cache.NamedKey key, + byte[] resultBytes + ) + { + log.debug("Populating results into cache"); + cache.put(key, resultBytes); + } + + public static boolean useResultLevelCacheOnBrokers( + Query query, + CacheStrategy> strategy, + CacheConfig cacheConfig + ) + { + return useResultLevelCache(query, strategy, cacheConfig) && strategy.isCacheable(query, false); + } + + public static boolean populateResultLevelCacheOnBrokers( + Query query, + CacheStrategy> strategy, + CacheConfig cacheConfig + ) + { + return populateResultLevelCache(query, strategy, cacheConfig) && strategy.isCacheable(query, false); + } + + private static boolean useResultLevelCache( + Query query, + CacheStrategy> strategy, + CacheConfig cacheConfig + ) + { + return QueryContexts.isUseResultLevelCache(query) + && strategy != null + && cacheConfig.isUseResultLevelCache() + && cacheConfig.isQueryCacheable(query); + } + + private static boolean populateResultLevelCache( + Query query, + CacheStrategy> strategy, + CacheConfig cacheConfig + ) + { + return QueryContexts.isPopulateResultLevelCache(query) + && strategy != null + && cacheConfig.isPopulateResultLevelCache() + && cacheConfig.isQueryCacheable(query); 
+ } +} diff --git a/server/src/main/java/io/druid/client/cache/CacheConfig.java b/server/src/main/java/io/druid/client/cache/CacheConfig.java index f1cc030c09f9..d73f9f387da0 100644 --- a/server/src/main/java/io/druid/client/cache/CacheConfig.java +++ b/server/src/main/java/io/druid/client/cache/CacheConfig.java @@ -29,13 +29,20 @@ public class CacheConfig { public static final String POPULATE_CACHE = "populateCache"; - + // The defaults defined here for cache related parameters are different from the QueryContext defaults due to legacy reasons. + // They should be made the same at some point in the future. @JsonProperty private boolean useCache = false; @JsonProperty private boolean populateCache = false; + @JsonProperty + private boolean useResultLevelCache = false; + + @JsonProperty + private boolean populateResultLevelCache = false; + @JsonProperty @Min(0) private int numBackgroundThreads = 0; @@ -44,6 +51,9 @@ public class CacheConfig @Min(0) private int cacheBulkMergeLimit = Integer.MAX_VALUE; + @JsonProperty + private int resultLevelCacheLimit = Integer.MAX_VALUE; + @JsonProperty private List unCacheable = Arrays.asList(Query.GROUP_BY, Query.SELECT); @@ -57,6 +67,16 @@ public boolean isUseCache() return useCache; } + public boolean isPopulateResultLevelCache() + { + return populateResultLevelCache; + } + + public boolean isUseResultLevelCache() + { + return useResultLevelCache; + } + public int getNumBackgroundThreads() { return numBackgroundThreads; @@ -67,6 +87,11 @@ public int getCacheBulkMergeLimit() return cacheBulkMergeLimit; } + public int getResultLevelCacheLimit() + { + return resultLevelCacheLimit; + } + public boolean isQueryCacheable(Query query) { return isQueryCacheable(query.getType()); diff --git a/server/src/main/java/io/druid/query/ResultLevelCachingQueryRunner.java b/server/src/main/java/io/druid/query/ResultLevelCachingQueryRunner.java new file mode 100644 index 000000000000..39a5a6de781d --- /dev/null +++ 
package io.druid.query;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import io.druid.client.ResultLevelCacheUtil;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.java.util.common.RE;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.SequenceWrapper;
import io.druid.java.util.common.guava.Sequences;
import io.druid.java.util.common.logger.Logger;
import io.druid.server.QueryResource;

import javax.annotation.Nullable;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Map;

/**
 * Broker-side {@link QueryRunner} decorator that caches whole query result sets.
 *
 * <p>Protocol: the cached entry stores the result-set identifier (etag) alongside the results.
 * On each run, any cached etag is sent downstream via the {@code If-None-Match} context key; the
 * downstream responds with the current etag in the {@code ETag} response-context key. If the two
 * match, the cached bytes are deserialized and returned without consuming the downstream results;
 * otherwise the fresh results are streamed through and (optionally) written back to the cache.
 *
 * <p>Cache entry layout: {@code [4-byte etag length][etag bytes][JSON-serialized result objects]}.
 */
public class ResultLevelCachingQueryRunner<T> implements QueryRunner<T>
{
  private static final Logger log = new Logger(ResultLevelCachingQueryRunner.class);
  private final QueryRunner<T> baseRunner;
  private final ObjectMapper objectMapper;
  private final Cache cache;
  private final CacheConfig cacheConfig;
  private final boolean useResultCache;
  private final boolean populateResultCache;
  // Mutable: run() re-wraps the query with an If-None-Match context entry before delegating.
  private Query<T> query;
  private final CacheStrategy<T, Object, Query<T>> strategy;

  public ResultLevelCachingQueryRunner(
      QueryRunner<T> baseRunner,
      QueryToolChest<T, Query<T>> queryToolChest,
      Query<T> query,
      ObjectMapper objectMapper,
      Cache cache,
      CacheConfig cacheConfig
  )
  {
    this.baseRunner = baseRunner;
    this.objectMapper = objectMapper;
    this.cache = cache;
    this.cacheConfig = cacheConfig;
    this.query = query;
    // strategy may be null for query types without caching support; the Util gates handle that.
    this.strategy = queryToolChest.getCacheStrategy(query);
    this.populateResultCache = ResultLevelCacheUtil.populateResultLevelCacheOnBrokers(query, strategy, cacheConfig);
    this.useResultCache = ResultLevelCacheUtil.useResultLevelCacheOnBrokers(query, strategy, cacheConfig);
  }

  @Override
  public Sequence<T> run(QueryPlus<T> queryPlus, Map<String, Object> responseContext)
  {
    if (useResultCache || populateResultCache) {

      final String cacheKeyStr = StringUtils.fromUtf8(strategy.computeCacheKey(query));
      final byte[] cachedResultSet = fetchResultsFromResultLevelCache(cacheKeyStr);
      String existingResultSetId = extractEtagFromResults(cachedResultSet);

      // An empty identifier means "nothing cached"; it is still forwarded so downstream always
      // sees an If-None-Match entry.
      existingResultSetId = existingResultSetId == null ? "" : existingResultSetId;
      query = query.withOverriddenContext(
          ImmutableMap.of(QueryResource.HEADER_IF_NONE_MATCH, existingResultSetId));

      Sequence<T> resultFromClient = baseRunner.run(
          QueryPlus.wrap(query),
          responseContext
      );
      String newResultSetId = (String) responseContext.get(QueryResource.HEADER_ETAG);

      if (useResultCache && newResultSetId != null && newResultSetId.equals(existingResultSetId)) {
        log.debug("Return cached result set as there is no change in identifiers for query %s ", query.getId());
        return deserializeResults(cachedResultSet, strategy, existingResultSetId);
      } else {
        @Nullable
        ResultLevelCachePopulator resultLevelCachePopulator = createResultLevelCachePopulator(
            cacheKeyStr,
            newResultSetId
        );
        if (resultLevelCachePopulator == null) {
          // Not populating (config/context disabled, missing etag, or header write failed):
          // pass the downstream sequence through untouched.
          return resultFromClient;
        }
        final Function<T, Object> cacheFn = strategy.prepareForCache(true);

        // Tee each result into the populator's buffer as it streams past the caller.
        return Sequences.wrap(Sequences.map(
            resultFromClient,
            new Function<T, T>()
            {
              @Override
              public T apply(T input)
              {
                if (resultLevelCachePopulator.isShouldPopulate()) {
                  resultLevelCachePopulator.cacheResultEntry(resultLevelCachePopulator, input, cacheFn);
                }
                return input;
              }
            }
        ), new SequenceWrapper()
        {
          @Override
          public void after(boolean isDone, Throwable thrown)
          {
            Preconditions.checkNotNull(
                resultLevelCachePopulator,
                "ResultLevelCachePopulator cannot be null during cache population"
            );
            if (thrown != null) {
              log.error(
                  thrown,
                  "Error while preparing for result level caching for query %s with error %s ",
                  query.getId(),
                  thrown.getMessage()
              );
            } else if (resultLevelCachePopulator.isShouldPopulate()) {
              // The resultset identifier and its length is cached along with the resultset
              resultLevelCachePopulator.populateResults();
              log.debug("Cache population complete for query %s", query.getId());
            }
            // Release the buffer regardless of outcome so the populator cannot retain results.
            resultLevelCachePopulator.cacheObjectStream = null;
          }
        });
      }
    } else {
      // Result-level caching disabled for this query; behave as a plain pass-through runner.
      return baseRunner.run(
          queryPlus,
          responseContext
      );
    }
  }

  /**
   * Returns the raw cached entry bytes, or null when reading is disabled or there is no key.
   */
  @Nullable
  private byte[] fetchResultsFromResultLevelCache(
      final String queryCacheKey
  )
  {
    if (useResultCache && queryCacheKey != null) {
      return cache.get(ResultLevelCacheUtil.computeResultLevelCacheKey(queryCacheKey));
    }
    return null;
  }

  /**
   * Reads the etag stored at the head of a cached entry ({@code [length][etag bytes]...}),
   * or null when there is no cached entry.
   */
  @Nullable
  private String extractEtagFromResults(
      final byte[] cachedResult
  )
  {
    if (cachedResult == null) {
      return null;
    }
    log.debug("Fetching result level cache identifier for query: %s", query.getId());
    int etagLength = ByteBuffer.wrap(cachedResult, 0, Integer.BYTES).getInt();
    return StringUtils.fromUtf8(Arrays.copyOfRange(cachedResult, Integer.BYTES, etagLength + Integer.BYTES));
  }

  /**
   * Deserializes a cached entry back into a result {@link Sequence}, skipping the etag header.
   * Deserialization is deferred until the sequence is iterated.
   */
  private Sequence<T> deserializeResults(
      final byte[] cachedResult, CacheStrategy<T, Object, Query<T>> strategy, String resultSetId
  )
  {
    if (cachedResult == null) {
      // NOTE(review): unreachable in practice — this method is only called after an etag match
      // against a non-null cached entry; a null here would NPE below when iterated.
      log.error("Cached result set is null");
    }
    final Function<Object, T> pullFromCacheFunction = strategy.pullFromCache(true);
    final TypeReference<Object> cacheObjectClazz = strategy.getCacheObjectClazz();
    //Skip the resultsetID and its length bytes
    Sequence<Object> cachedSequence = Sequences.simple(() -> {
      try {
        int resultOffset = Integer.BYTES + resultSetId.length();
        return objectMapper.readValues(
            objectMapper.getFactory().createParser(
                cachedResult,
                resultOffset,
                cachedResult.length - resultOffset
            ),
            cacheObjectClazz
        );
      }
      catch (IOException e) {
        throw new RE(e, "Failed to retrieve results from cache for query ID [%s]", query.getId());
      }
    });

    return Sequences.map(cachedSequence, pullFromCacheFunction);
  }

  /**
   * Creates a populator primed with the etag header, or null when population is disabled,
   * no etag was returned downstream, or writing the header failed.
   */
  @Nullable
  private ResultLevelCachePopulator createResultLevelCachePopulator(
      String cacheKeyStr,
      String resultSetId
  )
  {
    if (resultSetId != null && populateResultCache) {
      ResultLevelCachePopulator resultLevelCachePopulator = new ResultLevelCachePopulator(
          cache,
          objectMapper,
          ResultLevelCacheUtil.computeResultLevelCacheKey(cacheKeyStr),
          cacheConfig,
          true
      );
      try {
        // Save the resultSetId and its length
        resultLevelCachePopulator.cacheObjectStream.write(ByteBuffer.allocate(Integer.BYTES)
                                                                    .putInt(resultSetId.length())
                                                                    .array());
        resultLevelCachePopulator.cacheObjectStream.write(StringUtils.toUtf8(resultSetId));
      }
      catch (IOException ioe) {
        log.error(ioe, "Failed to write cached values for query %s", query.getId());
        return null;
      }
      return resultLevelCachePopulator;
    } else {
      return null;
    }
  }

  /**
   * Accumulates serialized results in memory and flushes them to the cache once the sequence
   * completes successfully. Population is abandoned (shouldPopulate = false, buffer released)
   * if the serialized size exceeds {@link CacheConfig#getResultLevelCacheLimit()} or
   * serialization fails.
   */
  public class ResultLevelCachePopulator
  {
    private final Cache cache;
    private final ObjectMapper mapper;
    private final Cache.NamedKey key;
    private final CacheConfig cacheConfig;
    // Nulled out once population is abandoned or finished, freeing the buffered bytes.
    private ByteArrayOutputStream cacheObjectStream = new ByteArrayOutputStream();

    public boolean isShouldPopulate()
    {
      return shouldPopulate;
    }

    private boolean shouldPopulate;

    private ResultLevelCachePopulator(
        Cache cache,
        ObjectMapper mapper,
        Cache.NamedKey key,
        CacheConfig cacheConfig,
        boolean shouldPopulate
    )
    {
      this.cache = cache;
      this.mapper = mapper;
      this.key = key;
      this.cacheConfig = cacheConfig;
      this.shouldPopulate = shouldPopulate;
    }

    /** Serializes one result entry into the buffer, enforcing the configured size limit. */
    private void cacheResultEntry(
        ResultLevelCachePopulator resultLevelCachePopulator,
        T resultEntry,
        Function<T, Object> cacheFn
    )
    {

      int cacheLimit = cacheConfig.getResultLevelCacheLimit();
      try (JsonGenerator gen = mapper.getFactory().createGenerator(resultLevelCachePopulator.cacheObjectStream)) {
        gen.writeObject(cacheFn.apply(resultEntry));
        if (cacheLimit > 0 && resultLevelCachePopulator.cacheObjectStream.size() > cacheLimit) {
          shouldPopulate = false;
          resultLevelCachePopulator.cacheObjectStream = null;
          return;
        }
      }
      catch (IOException ex) {
        log.error(ex, "Failed to retrieve entry to be cached. Result Level caching will not be performed!");
        shouldPopulate = false;
        resultLevelCachePopulator.cacheObjectStream = null;
      }
    }

    /** Writes the buffered [etag header + results] entry into the cache. */
    public void populateResults()
    {
      ResultLevelCacheUtil.populate(
          cache,
          key,
          cacheObjectStream.toByteArray()
      );
    }
  }
}
= objectMapper; this.serverConfig = serverConfig; + this.cache = cache; + this.cacheConfig = cacheConfig; } @Override @@ -81,6 +91,22 @@ public QueryRunner getQueryRunnerForSegments(Query query, Iterable QueryRunner makeRunner(Query query, QueryRunner baseClientRunner) { QueryToolChest> toolChest = warehouse.getToolChest(query); + + // This does not adhere to the fluent workflow. See https://github.com/druid-io/druid/issues/5517 + return new ResultLevelCachingQueryRunner<>(makeRunner(query, baseClientRunner, toolChest), + toolChest, + query, + objectMapper, + cache, + cacheConfig); + } + + private QueryRunner makeRunner( + Query query, + QueryRunner baseClientRunner, + QueryToolChest> toolChest + ) + { PostProcessingOperator postProcessing = objectMapper.convertValue( query.getContextValue("postProcessing"), new TypeReference>() @@ -105,6 +131,4 @@ private QueryRunner makeRunner(Query query, QueryRunner baseClientR .emitCPUTimeMetric(emitter) .postProcess(postProcessing); } - - } diff --git a/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java b/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java index 4ee3013eb7d1..65ee95acdfae 100644 --- a/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java +++ b/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java @@ -314,7 +314,7 @@ public boolean isUseCache() byte[] cacheValue = cache.get(cacheKey); Assert.assertNotNull(cacheValue); - Function fn = cacheStrategy.pullFromCache(); + Function fn = cacheStrategy.pullFromSegmentLevelCache(); List cacheResults = Lists.newArrayList( Iterators.transform( objectMapper.readValues( @@ -349,7 +349,7 @@ private void testUseCache( cache, objectMapper, cacheKey, - Iterables.transform(expectedResults, cacheStrategy.prepareForCache()) + Iterables.transform(expectedResults, cacheStrategy.prepareForSegmentLevelCache()) ); CachingQueryRunner runner = new CachingQueryRunner(