From 949c54d1a155bb41dfb9ea89c53e31609caef554 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Fri, 17 Feb 2017 09:22:57 +0900 Subject: [PATCH 1/3] Disable caching on brokers for groupBy v2 --- .../java/io/druid/query/CacheStrategy.java | 9 +++ .../groupby/GroupByQueryQueryToolChest.java | 5 ++ .../strategy/GroupByStrategySelector.java | 15 ++++- .../SegmentMetadataQueryQueryToolChest.java | 6 ++ .../search/SearchQueryQueryToolChest.java | 6 ++ .../select/SelectQueryQueryToolChest.java | 6 ++ .../TimeBoundaryQueryQueryToolChest.java | 6 ++ .../TimeseriesQueryQueryToolChest.java | 6 ++ .../query/topn/TopNQueryQueryToolChest.java | 6 ++ .../main/java/io/druid/client/CacheUtil.java | 64 +++++++++++++++++++ .../druid/client/CachingClusteredClient.java | 15 ++--- .../io/druid/client/CachingQueryRunner.java | 13 +--- .../io/druid/client/cache/CacheConfig.java | 6 +- 13 files changed, 142 insertions(+), 21 deletions(-) diff --git a/processing/src/main/java/io/druid/query/CacheStrategy.java b/processing/src/main/java/io/druid/query/CacheStrategy.java index 6b00ef0e5776..1c2917ad305d 100644 --- a/processing/src/main/java/io/druid/query/CacheStrategy.java +++ b/processing/src/main/java/io/druid/query/CacheStrategy.java @@ -26,6 +26,15 @@ */ public interface CacheStrategy> { + /** + * Returns the given query is cacheable or not. + * + * @param query the query to be cached + * @param willSortCachedData indicates the query requires sorting after pulling cached data + * @return true if the query is cacheable, otherwise false. + */ + boolean isCacheable(QueryType query, boolean willSortCachedData); + /** * Computes the cache key for the given query * diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index 57979634becb..1a9b60345cf7 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -351,6 +351,11 @@ public CacheStrategy getCacheStrategy(final GroupByQu private final List aggs = query.getAggregatorSpecs(); private final List dims = query.getDimensions(); + @Override + public boolean isCacheable(GroupByQuery query, boolean willSortCachedData) + { + return willSortCachedData || !strategySelector.useStrategyV2(query); + } @Override public byte[] computeCacheKey(GroupByQuery query) diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategySelector.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategySelector.java index e222f14d0796..56b22592632d 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategySelector.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategySelector.java @@ -25,6 +25,8 @@ import io.druid.query.groupby.GroupByQuery; import io.druid.query.groupby.GroupByQueryConfig; +import javax.validation.constraints.NotNull; + public class GroupByStrategySelector { public static final String STRATEGY_V2 = "v2"; @@ -48,7 +50,7 @@ public GroupByStrategySelector( public GroupByStrategy strategize(GroupByQuery query) { - final String strategyString = config.withOverrides(query).getDefaultStrategy(); + final String strategyString = getStrategy(query); switch (strategyString) { case STRATEGY_V2: @@ -61,4 +63,15 @@ public GroupByStrategy strategize(GroupByQuery query) throw new ISE("No such strategy[%s]", strategyString); } } + + @NotNull + private String getStrategy(GroupByQuery query) + { + return config.withOverrides(query).getDefaultStrategy(); + } + + public boolean useStrategyV2(GroupByQuery query) + { + return getStrategy(query).equals(STRATEGY_V2); + } } diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java index 0cb17be84436..23bebdb5456b 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java @@ -171,6 +171,12 @@ public CacheStrategy get { return new CacheStrategy() { + @Override + public boolean isCacheable(SegmentMetadataQuery query, boolean willSortCachedData) + { + return true; + } + @Override public byte[] computeCacheKey(SegmentMetadataQuery query) { diff --git a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java index 4a0103226301..90061e70d737 100644 --- a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java @@ -153,6 +153,12 @@ public String apply(DimensionSpec input) { : Collections.emptyList(); + @Override + public boolean isCacheable(SearchQuery query, boolean willSortCachedData) + { + return true; + } + @Override public byte[] computeCacheKey(SearchQuery query) { diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java index f65918846966..2b071f28e9fc 100644 --- a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java @@ -159,6 +159,12 @@ public String apply(DimensionSpec input) { : Collections.emptyList(); + @Override + public boolean isCacheable(SelectQuery query, boolean willSortCachedData) + { + return true; + } + @Override public byte[] computeCacheKey(SelectQuery query) { diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java index 03da955bb2d0..6a8503768341 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java @@ -133,6 +133,12 @@ public CacheStrategy, Object, TimeBoundaryQuery> { return new CacheStrategy, Object, TimeBoundaryQuery>() { + @Override + public boolean isCacheable(TimeBoundaryQuery query, boolean willSortCachedData) + { + return true; + } + @Override public byte[] computeCacheKey(TimeBoundaryQuery query) { diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java index a399af557e76..30421e1f6c24 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java @@ -127,6 +127,12 @@ public CacheStrategy, Object, TimeseriesQuery> get { private final List aggs = query.getAggregatorSpecs(); + @Override + public boolean isCacheable(TimeseriesQuery query, boolean willSortCachedData) + { + return true; + } + @Override public byte[] computeCacheKey(TimeseriesQuery query) { diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java index 6f876de37e25..7e2cc7abca92 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java @@ -302,6 +302,12 @@ public CacheStrategy, Object, TopNQuery> getCacheStrateg .getMetricName(query.getDimensionSpec()) ); + @Override + public boolean isCacheable(TopNQuery query, boolean willSortCachedData) + { + return true; + } + @Override public byte[] computeCacheKey(TopNQuery query) { diff --git a/server/src/main/java/io/druid/client/CacheUtil.java b/server/src/main/java/io/druid/client/CacheUtil.java index 936f029a44a6..6210f2c5ee37 100644 --- a/server/src/main/java/io/druid/client/CacheUtil.java +++ b/server/src/main/java/io/druid/client/CacheUtil.java @@ -23,6 +23,10 @@ import com.google.common.base.Throwables; import com.google.common.collect.Lists; import io.druid.client.cache.Cache; +import io.druid.client.cache.CacheConfig; +import io.druid.query.BaseQuery; +import io.druid.query.CacheStrategy; +import io.druid.query.Query; import io.druid.query.SegmentDescriptor; import org.joda.time.Interval; @@ -77,4 +81,64 @@ public static void populate(Cache cache, ObjectMapper mapper, Cache.NamedKey key } } + public static boolean useCacheOnBrokers( + Query query, + CacheStrategy> strategy, + CacheConfig cacheConfig + ) + { + return useCache(query, strategy, cacheConfig) && strategy.isCacheable(query, false); + } + + public static boolean populateCacheOnBrokers( + Query query, + CacheStrategy> strategy, + CacheConfig cacheConfig + ) + { + return populateCache(query, strategy, cacheConfig) && strategy.isCacheable(query, false); + } + + public static boolean useCacheOnDataNodes( + Query query, + CacheStrategy> strategy, + CacheConfig cacheConfig + ) + { + return useCache(query, strategy, cacheConfig) && strategy.isCacheable(query, true); + } + + public static boolean populateCacheOnDataNodes( + Query query, + CacheStrategy> strategy, + CacheConfig cacheConfig + ) + { + return populateCache(query, strategy, cacheConfig) && strategy.isCacheable(query, true); + } + + private static boolean useCache( + Query query, + CacheStrategy> strategy, + CacheConfig cacheConfig + ) + { + return BaseQuery.getContextUseCache(query, true) + && strategy != null + && cacheConfig.isUseCache() + && cacheConfig.isQueryCacheable(query); + } + + private static boolean populateCache( + Query query, + CacheStrategy> strategy, + CacheConfig cacheConfig + ) + { + return BaseQuery.getContextPopulateCache(query, true) + && strategy != null + && cacheConfig.isPopulateCache() + && cacheConfig.isQueryCacheable(query); + } + } diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java index 960c4feb475f..11caa303cd31 100644 --- a/server/src/main/java/io/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java @@ -112,6 +112,10 @@ public CachingClusteredClient( this.cacheConfig = cacheConfig; this.backgroundExecutorService = MoreExecutors.listeningDecorator(backgroundExecutorService); + if (cacheConfig.isQueryCacheable(Query.GROUP_BY)) { + log.warn("Caching for groupBys is enabled, but they won't be cached if they are executed with the strategy v2!"); + } + serverView.registerSegmentCallback( Execs.singleThreaded("CCClient-ServerView-CB-%d"), new ServerView.BaseSegmentCallback() @@ -137,16 +141,11 @@ public Sequence run(final Query query, final Map responseC final List> cachedResults = Lists.newArrayList(); final Map cachePopulatorMap = Maps.newHashMap(); - final boolean useCache = BaseQuery.getContextUseCache(query, true) - && strategy != null - && cacheConfig.isUseCache() - && cacheConfig.isQueryCacheable(query); - final boolean populateCache = BaseQuery.getContextPopulateCache(query, true) - && strategy != null - && cacheConfig.isPopulateCache() - && cacheConfig.isQueryCacheable(query); + final boolean useCache = CacheUtil.useCacheOnBrokers(query, strategy, cacheConfig); + final boolean populateCache = CacheUtil.populateCacheOnBrokers(query, strategy, cacheConfig); final boolean isBySegment = BaseQuery.getContextBySegment(query, false); + log.info("useCache: " + useCache); final ImmutableMap.Builder contextBuilder = new ImmutableMap.Builder<>(); diff --git a/server/src/main/java/io/druid/client/CachingQueryRunner.java b/server/src/main/java/io/druid/client/CachingQueryRunner.java index 33f875b770f6..4d0119ae7246 100644 --- a/server/src/main/java/io/druid/client/CachingQueryRunner.java +++ b/server/src/main/java/io/druid/client/CachingQueryRunner.java @@ -36,7 +36,6 @@ import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.java.util.common.logger.Logger; -import io.druid.query.BaseQuery; import io.druid.query.CacheStrategy; import io.druid.query.Query; import io.druid.query.QueryRunner; @@ -88,16 +87,8 @@ public CachingQueryRunner( public Sequence run(Query query, Map responseContext) { final CacheStrategy strategy = toolChest.getCacheStrategy(query); - - final boolean populateCache = BaseQuery.getContextPopulateCache(query, true) - && strategy != null - && cacheConfig.isPopulateCache() - && cacheConfig.isQueryCacheable(query); - - final boolean useCache = BaseQuery.getContextUseCache(query, true) - && strategy != null - && cacheConfig.isUseCache() - && cacheConfig.isQueryCacheable(query); + final boolean populateCache = CacheUtil.populateCacheOnDataNodes(query, strategy, cacheConfig); + final boolean useCache = CacheUtil.useCacheOnDataNodes(query, strategy, cacheConfig); final Cache.NamedKey key; if (strategy != null && (useCache || populateCache)) { diff --git a/server/src/main/java/io/druid/client/cache/CacheConfig.java b/server/src/main/java/io/druid/client/cache/CacheConfig.java index d7eb97149550..07189e31ffcd 100644 --- a/server/src/main/java/io/druid/client/cache/CacheConfig.java +++ b/server/src/main/java/io/druid/client/cache/CacheConfig.java @@ -69,7 +69,11 @@ public int getCacheBulkMergeLimit() public boolean isQueryCacheable(Query query) { + return isQueryCacheable(query.getType()); + } + + public boolean isQueryCacheable(String queryType) { // O(n) impl, but I don't think we'll ever have a million query types here - return !unCacheable.contains(query.getType()); + return !unCacheable.contains(queryType); } } From dcc12317a81f2a55af779b73672ad614dfd23639 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Sat, 18 Feb 2017 00:37:14 +0900 Subject: [PATCH 2/3] Rename parameter --- .../src/main/java/io/druid/query/CacheStrategy.java | 10 +++++++--- .../query/groupby/GroupByQueryQueryToolChest.java | 4 ++-- .../metadata/SegmentMetadataQueryQueryToolChest.java | 2 +- .../druid/query/search/SearchQueryQueryToolChest.java | 2 +- .../druid/query/select/SelectQueryQueryToolChest.java | 2 +- .../timeboundary/TimeBoundaryQueryQueryToolChest.java | 2 +- .../timeseries/TimeseriesQueryQueryToolChest.java | 2 +- .../io/druid/query/topn/TopNQueryQueryToolChest.java | 2 +- .../java/io/druid/client/CachingClusteredClient.java | 2 -- 9 files changed, 15 insertions(+), 13 deletions(-) diff --git a/processing/src/main/java/io/druid/query/CacheStrategy.java b/processing/src/main/java/io/druid/query/CacheStrategy.java index 1c2917ad305d..c2d2344bd951 100644 --- a/processing/src/main/java/io/druid/query/CacheStrategy.java +++ b/processing/src/main/java/io/druid/query/CacheStrategy.java @@ -22,18 +22,22 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.base.Function; +import java.util.concurrent.ExecutorService; + /** */ public interface CacheStrategy> { /** * Returns the given query is cacheable or not. + * The {@code willMergeRunners} parameter can be used for distinguishing the caller is a broker or a data node. * - * @param query the query to be cached - * @param willSortCachedData indicates the query requires sorting after pulling cached data + * @param query the query to be cached + * @param willMergeRunners indicates that {@link QueryRunnerFactory#mergeRunners(ExecutorService, Iterable)} will be + * called * @return true if the query is cacheable, otherwise false. */ - boolean isCacheable(QueryType query, boolean willSortCachedData); + boolean isCacheable(QueryType query, boolean willMergeRunners); /** * Computes the cache key for the given query diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index 1a9b60345cf7..3685e1e70cc3 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -352,9 +352,9 @@ public CacheStrategy getCacheStrategy(final GroupByQu private final List dims = query.getDimensions(); @Override - public boolean isCacheable(GroupByQuery query, boolean willSortCachedData) + public boolean isCacheable(GroupByQuery query, boolean willMergeRunners) { - return willSortCachedData || !strategySelector.useStrategyV2(query); + return willMergeRunners || !strategySelector.useStrategyV2(query); } @Override diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java index 23bebdb5456b..a9fa0be3c41c 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java @@ -172,7 +172,7 @@ public CacheStrategy get return new CacheStrategy() { @Override - public boolean isCacheable(SegmentMetadataQuery query, boolean willSortCachedData) + public boolean isCacheable(SegmentMetadataQuery query, boolean willMergeRunners) { return true; } diff --git a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java index 90061e70d737..7943b84d6dfb 100644 --- a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java @@ -154,7 +154,7 @@ public String apply(DimensionSpec input) { Collections.emptyList(); @Override - public boolean isCacheable(SearchQuery query, boolean willSortCachedData) + public boolean isCacheable(SearchQuery query, boolean willMergeRunners) { return true; } diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java index 2b071f28e9fc..0e4515a55346 100644 --- a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java @@ -160,7 +160,7 @@ public String apply(DimensionSpec input) { Collections.emptyList(); @Override - public boolean isCacheable(SelectQuery query, boolean willSortCachedData) + public boolean isCacheable(SelectQuery query, boolean willMergeRunners) { return true; } diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java index 6a8503768341..2a98bb205f82 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java @@ -134,7 +134,7 @@ public CacheStrategy, Object, TimeBoundaryQuery> return new CacheStrategy, Object, TimeBoundaryQuery>() { @Override - public boolean isCacheable(TimeBoundaryQuery query, boolean willSortCachedData) + public boolean isCacheable(TimeBoundaryQuery query, boolean willMergeRunners) { return true; } diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java index 30421e1f6c24..edb2e151bb5d 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java @@ -128,7 +128,7 @@ public CacheStrategy, Object, TimeseriesQuery> get private final List aggs = query.getAggregatorSpecs(); @Override - public boolean isCacheable(TimeseriesQuery query, boolean willSortCachedData) + public boolean isCacheable(TimeseriesQuery query, boolean willMergeRunners) { return true; } diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java index 7e2cc7abca92..950bc88dd70a 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java @@ -303,7 +303,7 @@ public CacheStrategy, Object, TopNQuery> getCacheStrateg ); @Override - public boolean isCacheable(TopNQuery query, boolean willSortCachedData) + public boolean isCacheable(TopNQuery query, boolean willMergeRunners) { return true; } diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java index 11caa303cd31..bcf5664121f2 100644 --- a/server/src/main/java/io/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java @@ -145,8 +145,6 @@ public Sequence run(final Query query, final Map responseC final boolean populateCache = CacheUtil.populateCacheOnBrokers(query, strategy, cacheConfig); final boolean isBySegment = BaseQuery.getContextBySegment(query, false); - log.info("useCache: " + useCache); - final ImmutableMap.Builder contextBuilder = new ImmutableMap.Builder<>(); final int priority = BaseQuery.getContextPriority(query, 0); From 3bc3a97a011295218279cb330b8399e091c6d3bd Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Sat, 18 Feb 2017 10:30:06 +0900 Subject: [PATCH 3/3] address comments --- .../main/java/io/druid/query/CacheStrategy.java | 2 +- .../query/groupby/GroupByQueryQueryToolChest.java | 2 +- .../query/groupby/strategy/GroupByStrategy.java | 13 +++++++++++++ .../groupby/strategy/GroupByStrategySelector.java | 15 +-------------- .../query/groupby/strategy/GroupByStrategyV1.java | 6 ++++++ .../query/groupby/strategy/GroupByStrategyV2.java | 6 ++++++ .../io/druid/client/CachingClusteredClient.java | 5 ++++- 7 files changed, 32 insertions(+), 17 deletions(-) diff --git a/processing/src/main/java/io/druid/query/CacheStrategy.java b/processing/src/main/java/io/druid/query/CacheStrategy.java index c2d2344bd951..8e14947b4065 100644 --- a/processing/src/main/java/io/druid/query/CacheStrategy.java +++ b/processing/src/main/java/io/druid/query/CacheStrategy.java @@ -34,7 +34,7 @@ public interface CacheStrategy> * * @param query the query to be cached * @param willMergeRunners indicates that {@link QueryRunnerFactory#mergeRunners(ExecutorService, Iterable)} will be - * called + * called on the cached by-segment results * @return true if the query is cacheable, otherwise false. */ boolean isCacheable(QueryType query, boolean willMergeRunners); diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index 3685e1e70cc3..2058b8295263 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -354,7 +354,7 @@ public CacheStrategy getCacheStrategy(final GroupByQu @Override public boolean isCacheable(GroupByQuery query, boolean willMergeRunners) { - return willMergeRunners || !strategySelector.useStrategyV2(query); + return strategySelector.strategize(query).isCacheable(willMergeRunners); } @Override diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategy.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategy.java index 4fb63a92e531..4a84f665ffbf 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategy.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategy.java @@ -23,13 +23,26 @@ import io.druid.data.input.Row; import io.druid.java.util.common.guava.Sequence; import io.druid.query.QueryRunner; +import io.druid.query.QueryRunnerFactory; import io.druid.query.groupby.GroupByQuery; import io.druid.segment.StorageAdapter; import java.util.Map; +import java.util.concurrent.ExecutorService; public interface GroupByStrategy { + + /** + * Indicates this strategy is cacheable or not. + * The {@code willMergeRunners} parameter can be used for distinguishing the caller is a broker or a data node. + * + * @param willMergeRunners indicates that {@link QueryRunnerFactory#mergeRunners(ExecutorService, Iterable)} will be + * called on the cached by-segment results + * @return true if this strategy is cacheable, otherwise false. + */ + boolean isCacheable(boolean willMergeRunners); + Sequence mergeResults( QueryRunner baseRunner, GroupByQuery query, diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategySelector.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategySelector.java index 56b22592632d..e222f14d0796 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategySelector.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategySelector.java @@ -25,8 +25,6 @@ import io.druid.query.groupby.GroupByQuery; import io.druid.query.groupby.GroupByQueryConfig; -import javax.validation.constraints.NotNull; - public class GroupByStrategySelector { public static final String STRATEGY_V2 = "v2"; @@ -50,7 +48,7 @@ public GroupByStrategySelector( public GroupByStrategy strategize(GroupByQuery query) { - final String strategyString = getStrategy(query); + final String strategyString = config.withOverrides(query).getDefaultStrategy(); switch (strategyString) { case STRATEGY_V2: @@ -63,15 +61,4 @@ public GroupByStrategy strategize(GroupByQuery query) throw new ISE("No such strategy[%s]", strategyString); } } - - @NotNull - private String getStrategy(GroupByQuery query) - { - return config.withOverrides(query).getDefaultStrategy(); - } - - public boolean useStrategyV2(GroupByQuery query) - { - return getStrategy(query).equals(STRATEGY_V2); - } } diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java index 794fe68d7e6b..cc85d3a5a095 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java @@ -77,6 +77,12 @@ public GroupByStrategyV1( this.bufferPool = bufferPool; } + @Override + public boolean isCacheable(boolean willMergeRunners) + { + return true; + } + @Override public Sequence mergeResults( final QueryRunner baseRunner, diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java index 95df89b550ab..08ff15b29fcd 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -112,6 +112,12 @@ public static DateTime getUniversalTimestamp(final GroupByQuery query) } } + @Override + public boolean isCacheable(boolean willMergeRunners) + { + return willMergeRunners; + } + @Override public Sequence mergeResults( final QueryRunner baseRunner, diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java index bcf5664121f2..1083a48e4f08 100644 --- a/server/src/main/java/io/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java @@ -113,7 +113,10 @@ public CachingClusteredClient( this.backgroundExecutorService = MoreExecutors.listeningDecorator(backgroundExecutorService); if (cacheConfig.isQueryCacheable(Query.GROUP_BY)) { - log.warn("Caching for groupBys is enabled, but they won't be cached if they are executed with the strategy v2!"); + log.warn( + "Even though groupBy caching is enabled, v2 groupBys will not be cached. " + + "Consider disabling cache on your broker and enabling it on your data nodes to enable v2 groupBy caching." + ); } serverView.registerSegmentCallback(