Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions processing/src/main/java/io/druid/query/CacheStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,23 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;

import java.util.concurrent.ExecutorService;

/**
*/
public interface CacheStrategy<T, CacheType, QueryType extends Query<T>>
{
/**
* Returns the given query is cacheable or not.
* The {@code willMergeRunners} parameter can be used for distinguishing the caller is a broker or a data node.
*
* @param query the query to be cached
* @param willMergeRunners indicates that {@link QueryRunnerFactory#mergeRunners(ExecutorService, Iterable)} will be
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For clarification add, "will be called on the cached by-segment results"

* called on the cached by-segment results
* @return true if the query is cacheable, otherwise false.
*/
boolean isCacheable(QueryType query, boolean willMergeRunners);

/**
* Computes the cache key for the given query
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,11 @@ public CacheStrategy<Row, Object, GroupByQuery> getCacheStrategy(final GroupByQu
private final List<AggregatorFactory> aggs = query.getAggregatorSpecs();
private final List<DimensionSpec> dims = query.getDimensions();

@Override
public boolean isCacheable(GroupByQuery query, boolean willMergeRunners)
{
return strategySelector.strategize(query).isCacheable(willMergeRunners);
}

@Override
public byte[] computeCacheKey(GroupByQuery query)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,26 @@
import io.druid.data.input.Row;
import io.druid.java.util.common.guava.Sequence;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.groupby.GroupByQuery;
import io.druid.segment.StorageAdapter;

import java.util.Map;
import java.util.concurrent.ExecutorService;

public interface GroupByStrategy
{

/**
* Indicates this strategy is cacheable or not.
* The {@code willMergeRunners} parameter can be used for distinguishing the caller is a broker or a data node.
*
* @param willMergeRunners indicates that {@link QueryRunnerFactory#mergeRunners(ExecutorService, Iterable)} will be
* called on the cached by-segment results
* @return true if this strategy is cacheable, otherwise false.
*/
boolean isCacheable(boolean willMergeRunners);

Sequence<Row> mergeResults(
QueryRunner<Row> baseRunner,
GroupByQuery query,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ public GroupByStrategyV1(
this.bufferPool = bufferPool;
}

@Override
public boolean isCacheable(boolean willMergeRunners)
{
return true;
}

@Override
public Sequence<Row> mergeResults(
final QueryRunner<Row> baseRunner,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@ public static DateTime getUniversalTimestamp(final GroupByQuery query)
}
}

@Override
public boolean isCacheable(boolean willMergeRunners)
{
return willMergeRunners;
}

@Override
public Sequence<Row> mergeResults(
final QueryRunner<Row> baseRunner,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,12 @@ public CacheStrategy<SegmentAnalysis, SegmentAnalysis, SegmentMetadataQuery> get
{
return new CacheStrategy<SegmentAnalysis, SegmentAnalysis, SegmentMetadataQuery>()
{
@Override
public boolean isCacheable(SegmentMetadataQuery query, boolean willMergeRunners)
{
return true;
}

@Override
public byte[] computeCacheKey(SegmentMetadataQuery query)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,12 @@ public String apply(DimensionSpec input) {
:
Collections.<String>emptyList();

@Override
public boolean isCacheable(SearchQuery query, boolean willMergeRunners)
{
return true;
}

@Override
public byte[] computeCacheKey(SearchQuery query)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,12 @@ public String apply(DimensionSpec input) {
:
Collections.<String>emptyList();

@Override
public boolean isCacheable(SelectQuery query, boolean willMergeRunners)
{
return true;
}

@Override
public byte[] computeCacheKey(SelectQuery query)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ public CacheStrategy<Result<TimeBoundaryResultValue>, Object, TimeBoundaryQuery>
{
return new CacheStrategy<Result<TimeBoundaryResultValue>, Object, TimeBoundaryQuery>()
{
@Override
public boolean isCacheable(TimeBoundaryQuery query, boolean willMergeRunners)
{
return true;
}

@Override
public byte[] computeCacheKey(TimeBoundaryQuery query)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,12 @@ public CacheStrategy<Result<TimeseriesResultValue>, Object, TimeseriesQuery> get
{
private final List<AggregatorFactory> aggs = query.getAggregatorSpecs();

@Override
public boolean isCacheable(TimeseriesQuery query, boolean willMergeRunners)
{
return true;
}

@Override
public byte[] computeCacheKey(TimeseriesQuery query)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,12 @@ public CacheStrategy<Result<TopNResultValue>, Object, TopNQuery> getCacheStrateg
.getMetricName(query.getDimensionSpec())
);

@Override
public boolean isCacheable(TopNQuery query, boolean willMergeRunners)
{
return true;
}

@Override
public byte[] computeCacheKey(TopNQuery query)
{
Expand Down
64 changes: 64 additions & 0 deletions server/src/main/java/io/druid/client/CacheUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.query.BaseQuery;
import io.druid.query.CacheStrategy;
import io.druid.query.Query;
import io.druid.query.SegmentDescriptor;
import org.joda.time.Interval;

Expand Down Expand Up @@ -77,4 +81,64 @@ public static void populate(Cache cache, ObjectMapper mapper, Cache.NamedKey key
}
}

public static <T> boolean useCacheOnBrokers(
Query<T> query,
CacheStrategy<T, Object, Query<T>> strategy,
CacheConfig cacheConfig
)
{
return useCache(query, strategy, cacheConfig) && strategy.isCacheable(query, false);
}

public static <T> boolean populateCacheOnBrokers(
Query<T> query,
CacheStrategy<T, Object, Query<T>> strategy,
CacheConfig cacheConfig
)
{
return populateCache(query, strategy, cacheConfig) && strategy.isCacheable(query, false);
}

public static <T> boolean useCacheOnDataNodes(
Query<T> query,
CacheStrategy<T, Object, Query<T>> strategy,
CacheConfig cacheConfig
)
{
return useCache(query, strategy, cacheConfig) && strategy.isCacheable(query, true);
}

public static <T> boolean populateCacheOnDataNodes(
Query<T> query,
CacheStrategy<T, Object, Query<T>> strategy,
CacheConfig cacheConfig
)
{
return populateCache(query, strategy, cacheConfig) && strategy.isCacheable(query, true);
}

private static <T> boolean useCache(
Query<T> query,
CacheStrategy<T, Object, Query<T>> strategy,
CacheConfig cacheConfig
)
{
return BaseQuery.getContextUseCache(query, true)
&& strategy != null
&& cacheConfig.isUseCache()
&& cacheConfig.isQueryCacheable(query);
}

private static <T> boolean populateCache(
Query<T> query,
CacheStrategy<T, Object, Query<T>> strategy,
CacheConfig cacheConfig
)
{
return BaseQuery.getContextPopulateCache(query, true)
&& strategy != null
&& cacheConfig.isPopulateCache()
&& cacheConfig.isQueryCacheable(query);
}

}
18 changes: 9 additions & 9 deletions server/src/main/java/io/druid/client/CachingClusteredClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ public CachingClusteredClient(
this.cacheConfig = cacheConfig;
this.backgroundExecutorService = MoreExecutors.listeningDecorator(backgroundExecutorService);

if (cacheConfig.isQueryCacheable(Query.GROUP_BY)) {
log.warn(
"Even though groupBy caching is enabled, v2 groupBys will not be cached. "
+ "Consider disabling cache on your broker and enabling it on your data nodes to enable v2 groupBy caching."
);
}

serverView.registerSegmentCallback(
Execs.singleThreaded("CCClient-ServerView-CB-%d"),
new ServerView.BaseSegmentCallback()
Expand All @@ -137,17 +144,10 @@ public Sequence<T> run(final Query<T> query, final Map<String, Object> responseC
final List<Pair<Interval, byte[]>> cachedResults = Lists.newArrayList();
final Map<String, CachePopulator> cachePopulatorMap = Maps.newHashMap();

final boolean useCache = BaseQuery.getContextUseCache(query, true)
&& strategy != null
&& cacheConfig.isUseCache()
&& cacheConfig.isQueryCacheable(query);
final boolean populateCache = BaseQuery.getContextPopulateCache(query, true)
&& strategy != null
&& cacheConfig.isPopulateCache()
&& cacheConfig.isQueryCacheable(query);
final boolean useCache = CacheUtil.useCacheOnBrokers(query, strategy, cacheConfig);
final boolean populateCache = CacheUtil.populateCacheOnBrokers(query, strategy, cacheConfig);
final boolean isBySegment = BaseQuery.getContextBySegment(query, false);


final ImmutableMap.Builder<String, Object> contextBuilder = new ImmutableMap.Builder<>();

final int priority = BaseQuery.getContextPriority(query, 0);
Expand Down
13 changes: 2 additions & 11 deletions server/src/main/java/io/druid/client/CachingQueryRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.BaseQuery;
import io.druid.query.CacheStrategy;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
Expand Down Expand Up @@ -88,16 +87,8 @@ public CachingQueryRunner(
public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
{
final CacheStrategy strategy = toolChest.getCacheStrategy(query);

final boolean populateCache = BaseQuery.getContextPopulateCache(query, true)
&& strategy != null
&& cacheConfig.isPopulateCache()
&& cacheConfig.isQueryCacheable(query);

final boolean useCache = BaseQuery.getContextUseCache(query, true)
&& strategy != null
&& cacheConfig.isUseCache()
&& cacheConfig.isQueryCacheable(query);
final boolean populateCache = CacheUtil.populateCacheOnDataNodes(query, strategy, cacheConfig);
final boolean useCache = CacheUtil.useCacheOnDataNodes(query, strategy, cacheConfig);

final Cache.NamedKey key;
if (strategy != null && (useCache || populateCache)) {
Expand Down
6 changes: 5 additions & 1 deletion server/src/main/java/io/druid/client/cache/CacheConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,11 @@ public int getCacheBulkMergeLimit()

public boolean isQueryCacheable(Query query)
{
return isQueryCacheable(query.getType());
}

public boolean isQueryCacheable(String queryType) {
// O(n) impl, but I don't think we'll ever have a million query types here
return !unCacheable.contains(query.getType());
return !unCacheable.contains(queryType);
}
}