diff --git a/docs/configuration/index.md b/docs/configuration/index.md index ea15380260ee..adaba483f8b8 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1985,7 +1985,7 @@ You can optionally only configure caching to be enabled on the Broker by setting See [cache configuration](#cache-configuration) for how to configure cache settings. -> Note: Even if cache is enabled, for [groupBy v2](../querying/groupbyquery.md#strategies) queries, both of non-result level cache and result level cache do not work on Brokers. +> Note: Even if cache is enabled, for [groupBy v2](../querying/groupbyquery.md#strategies) queries, segment level cache do not work on Brokers. > See [Differences between v1 and v2](../querying/groupbyquery.md#differences-between-v1-and-v2) and [Query caching](../querying/caching.md) for more information. #### Segment Discovery diff --git a/processing/src/main/java/org/apache/druid/query/CacheStrategy.java b/processing/src/main/java/org/apache/druid/query/CacheStrategy.java index bd21034186ca..d883e7555e97 100644 --- a/processing/src/main/java/org/apache/druid/query/CacheStrategy.java +++ b/processing/src/main/java/org/apache/druid/query/CacheStrategy.java @@ -36,6 +36,23 @@ @ExtensionPoint public interface CacheStrategy> { + /** + * This method is deprecated and retained for backward incompatibility. + * Returns whether the given query is cacheable or not. + * The {@code willMergeRunners} parameter can be used for distinguishing the caller is a broker or a data node. + * + * @param ignoredQuery the query to be cached + * @param ignoredWillMergeRunners indicates that {@link QueryRunnerFactory#mergeRunners(QueryProcessingPool, Iterable)} will be + * called on the cached by-segment results + * + * @return true if the query is cacheable, otherwise false. + */ + @Deprecated + default boolean isCacheable(QueryType ignoredQuery, boolean ignoredWillMergeRunners) + { + return false; + } + /** * Returns whether the given query is cacheable or not. * The {@code willMergeRunners} parameter can be used for distinguishing the caller is a broker or a data node. @@ -43,10 +60,14 @@ public interface CacheStrategy> * @param query the query to be cached * @param willMergeRunners indicates that {@link QueryRunnerFactory#mergeRunners(QueryProcessingPool, Iterable)} will be * called on the cached by-segment results + * @param bySegment segment level or result level cache * * @return true if the query is cacheable, otherwise false. */ - boolean isCacheable(QueryType query, boolean willMergeRunners); + default boolean isCacheable(QueryType query, boolean willMergeRunners, boolean bySegment) + { + return isCacheable(query, willMergeRunners); + } /** * Computes the per-segment cache key for the given query. Because this is a per-segment cache key, it should only diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java index 173e6babd0d4..e49cce957401 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -515,9 +515,9 @@ public CacheStrategy getCacheStrategy(final Gro private final List dims = query.getDimensions(); @Override - public boolean isCacheable(GroupByQuery query, boolean willMergeRunners) + public boolean isCacheable(GroupByQuery query, boolean willMergeRunners, boolean bySegment) { - return strategySelector.strategize(query).isCacheable(willMergeRunners); + return strategySelector.strategize(query).isCacheable(willMergeRunners, bySegment); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategy.java b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategy.java index 87e6a3dcac10..230e824cab1e 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategy.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategy.java @@ -60,10 +60,11 @@ public interface GroupByStrategy * @param willMergeRunners indicates that {@link QueryRunnerFactory#mergeRunners(QueryProcessingPool, Iterable)} will be * called on the cached by-segment results. Can be used to distinguish if we are running on * a broker or data node. + * @param bySegment segment level or result level cache * * @return true if this strategy is cacheable, otherwise false. */ - boolean isCacheable(boolean willMergeRunners); + boolean isCacheable(boolean willMergeRunners, boolean bySegment); /** * Indicates if this query should undergo "mergeResults" or not. Checked by diff --git a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV1.java b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV1.java index 8c119d59dd43..c82d29d16922 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV1.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV1.java @@ -83,7 +83,7 @@ public GroupByQueryResource prepareResource(GroupByQuery query) } @Override - public boolean isCacheable(boolean willMergeRunners) + public boolean isCacheable(boolean willMergeRunners, boolean bySegment) { return true; } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java index cda33f459edf..df599952845d 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -180,9 +180,11 @@ private static int countRequiredMergeBufferNumWithoutSubtotal(Query query, int f } @Override - public boolean isCacheable(boolean willMergeRunners) + public boolean isCacheable(boolean willMergeRunners, boolean bySegment) { - return willMergeRunners; + //disable segment-level cache on borker, + //see PR https://github.com/apache/druid/issues/3820 + return willMergeRunners || !bySegment; } @Override diff --git a/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java index 45cc18ff5a39..9041db0c6a25 100644 --- a/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java @@ -185,7 +185,7 @@ public CacheStrategy get return new CacheStrategy() { @Override - public boolean isCacheable(SegmentMetadataQuery query, boolean willMergeRunners) + public boolean isCacheable(SegmentMetadataQuery query, boolean willMergeRunners, boolean bySegment) { return true; } diff --git a/processing/src/main/java/org/apache/druid/query/search/SearchQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/search/SearchQueryQueryToolChest.java index da59cf9c7f59..b390cd83a58d 100644 --- a/processing/src/main/java/org/apache/druid/query/search/SearchQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/search/SearchQueryQueryToolChest.java @@ -136,7 +136,7 @@ public CacheStrategy, Object, SearchQuery> getCacheStr : Collections.emptyList(); @Override - public boolean isCacheable(SearchQuery query, boolean willMergeRunners) + public boolean isCacheable(SearchQuery query, boolean willMergeRunners, boolean bySegment) { return true; } diff --git a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java index 1bb109ef04f7..9087dd26a885 100644 --- a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java @@ -167,7 +167,7 @@ public CacheStrategy, Object, TimeBoundaryQuery> return new CacheStrategy, Object, TimeBoundaryQuery>() { @Override - public boolean isCacheable(TimeBoundaryQuery query, boolean willMergeRunners) + public boolean isCacheable(TimeBoundaryQuery query, boolean willMergeRunners, boolean bySegment) { return true; } diff --git a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java index 5a4417aa719f..909946262a01 100644 --- a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java @@ -269,7 +269,7 @@ public CacheStrategy, Object, TimeseriesQuery> get private final List aggs = query.getAggregatorSpecs(); @Override - public boolean isCacheable(TimeseriesQuery query, boolean willMergeRunners) + public boolean isCacheable(TimeseriesQuery query, boolean willMergeRunners, boolean bySegment) { return true; } diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java index a7785a01be1a..e998f2be63d8 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java @@ -282,7 +282,7 @@ public CacheStrategy, Object, TopNQuery> getCacheStrateg ); @Override - public boolean isCacheable(TopNQuery query, boolean willMergeRunners) + public boolean isCacheable(TopNQuery query, boolean willMergeRunners, boolean bySegment) { return true; } diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java index 2a700c08bd48..2e877f61cfa3 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java @@ -20,10 +20,17 @@ package org.apache.druid.query.groupby; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.smile.SmileFactory; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import org.apache.druid.collections.BlockingPool; +import org.apache.druid.collections.DefaultBlockingPool; +import org.apache.druid.collections.NonBlockingPool; import org.apache.druid.collections.SerializablePair; +import org.apache.druid.collections.StupidPool; import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.Row; import org.apache.druid.java.util.common.DateTimes; @@ -31,6 +38,7 @@ import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.query.CacheStrategy; +import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.Druids; import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.QueryRunnerTestHelper; @@ -62,6 +70,9 @@ import org.apache.druid.query.groupby.having.OrHavingSpec; import org.apache.druid.query.groupby.orderby.DefaultLimitSpec; import org.apache.druid.query.groupby.orderby.OrderByColumnSpec; +import org.apache.druid.query.groupby.strategy.GroupByStrategySelector; +import org.apache.druid.query.groupby.strategy.GroupByStrategyV1; +import org.apache.druid.query.groupby.strategy.GroupByStrategyV2; import org.apache.druid.query.ordering.StringComparators; import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.column.ColumnType; @@ -74,6 +85,7 @@ import org.junit.Test; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -1136,4 +1148,68 @@ private static ResultRow makeRow(final GroupByQuery query, final String timestam { return GroupByQueryRunnerTestHelper.createExpectedRow(query, timestamp, vals); } + + @Test + public void testIsQueryCacheableOnGroupByStrategyV2() + { + final GroupByQuery query = new GroupByQuery.Builder() + .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) + .setGranularity(Granularities.DAY) + .setDimensions(new DefaultDimensionSpec("col", "dim")) + .setInterval(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC) + .setAggregatorSpecs(QueryRunnerTestHelper.COMMON_DOUBLE_AGGREGATORS) + .build(); + final DruidProcessingConfig processingConfig = new DruidProcessingConfig() + { + @Override + public String getFormatString() + { + return "processing-%s"; + } + }; + final GroupByQueryConfig queryConfig = new GroupByQueryConfig(); + final Supplier queryConfigSupplier = Suppliers.ofInstance(queryConfig); + final Supplier bufferSupplier = + () -> ByteBuffer.allocateDirect(processingConfig.intermediateComputeSizeBytes()); + + final NonBlockingPool bufferPool = new StupidPool<>( + "GroupByQueryEngine-bufferPool", + bufferSupplier + ); + final BlockingPool mergeBufferPool = new DefaultBlockingPool<>( + bufferSupplier, + processingConfig.getNumMergeBuffers() + ); + final GroupByStrategySelector strategySelector = new GroupByStrategySelector( + queryConfigSupplier, + new GroupByStrategyV1( + queryConfigSupplier, + new GroupByQueryEngine(queryConfigSupplier, bufferPool), + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ), + new GroupByStrategyV2( + processingConfig, + queryConfigSupplier, + bufferPool, + mergeBufferPool, + TestHelper.makeJsonMapper(), + new ObjectMapper(new SmileFactory()), + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ) + ); + final GroupByQueryQueryToolChest queryToolChest = new GroupByQueryQueryToolChest(strategySelector); + CacheStrategy cacheStrategy = queryToolChest.getCacheStrategy(query); + Assert.assertTrue( + "result level cache on broker server for GroupByStrategyV2 should be enabled", + cacheStrategy.isCacheable(query, false, false) + ); + Assert.assertFalse( + "segment level cache on broker server for GroupByStrategyV2 should be disabled", + cacheStrategy.isCacheable(query, false, true) + ); + Assert.assertTrue( + "segment level cache on data server for GroupByStrategyV2 should be enabled", + cacheStrategy.isCacheable(query, true, true) + ); + } } diff --git a/server/src/main/java/org/apache/druid/client/CacheUtil.java b/server/src/main/java/org/apache/druid/client/CacheUtil.java index 42cfe171c7b9..a8d65e4effac 100644 --- a/server/src/main/java/org/apache/druid/client/CacheUtil.java +++ b/server/src/main/java/org/apache/druid/client/CacheUtil.java @@ -110,7 +110,7 @@ public static boolean isUseSegmentCache( { return cacheConfig.isUseCache() && query.context().isUseCache() - && isQueryCacheable(query, cacheStrategy, cacheConfig, serverType); + && isQueryCacheable(query, cacheStrategy, cacheConfig, serverType, true); } /** @@ -128,7 +128,7 @@ public static boolean isPopulateSegmentCache( ServerType serverType ) { - return isQueryCacheable(query, cacheStrategy, cacheConfig, serverType) + return isQueryCacheable(query, cacheStrategy, cacheConfig, serverType, true) && query.context().isPopulateCache() && cacheConfig.isPopulateCache(); } @@ -148,7 +148,7 @@ public static boolean isUseResultCache( ServerType serverType ) { - return isQueryCacheable(query, cacheStrategy, cacheConfig, serverType) + return isQueryCacheable(query, cacheStrategy, cacheConfig, serverType, false) && query.context().isUseResultLevelCache() && cacheConfig.isUseResultLevelCache(); } @@ -168,7 +168,7 @@ public static boolean isPopulateResultCache( ServerType serverType ) { - return isQueryCacheable(query, cacheStrategy, cacheConfig, serverType) + return isQueryCacheable(query, cacheStrategy, cacheConfig, serverType, false) && query.context().isPopulateResultLevelCache() && cacheConfig.isPopulateResultLevelCache(); } @@ -181,16 +181,18 @@ public static boolean isPopulateResultCache( * @param cacheStrategy result of {@link QueryToolChest#getCacheStrategy} on this query * @param cacheConfig current active cache config * @param serverType BROKER or DATA + * @param bySegment segement level or result-level cache */ static boolean isQueryCacheable( final Query query, @Nullable final CacheStrategy> cacheStrategy, final CacheConfig cacheConfig, - final ServerType serverType + final ServerType serverType, + final boolean bySegment ) { return cacheStrategy != null - && cacheStrategy.isCacheable(query, serverType.willMergeRunners()) + && cacheStrategy.isCacheable(query, serverType.willMergeRunners(), bySegment) && cacheConfig.isQueryCacheable(query) && query.getDataSource().isCacheable(serverType == ServerType.BROKER); } diff --git a/server/src/test/java/org/apache/druid/client/CacheUtilTest.java b/server/src/test/java/org/apache/druid/client/CacheUtilTest.java index dc155398ab0b..9d886765ddce 100644 --- a/server/src/test/java/org/apache/druid/client/CacheUtilTest.java +++ b/server/src/test/java/org/apache/druid/client/CacheUtilTest.java @@ -54,7 +54,8 @@ public void test_isQueryCacheable_cacheableOnBroker() timeseriesQuery, new DummyCacheStrategy<>(true, true), makeCacheConfig(ImmutableMap.of()), - CacheUtil.ServerType.BROKER + CacheUtil.ServerType.BROKER, + false ) ); } @@ -67,7 +68,8 @@ public void test_isQueryCacheable_cacheableOnDataServer() timeseriesQuery, new DummyCacheStrategy<>(true, true), makeCacheConfig(ImmutableMap.of()), - CacheUtil.ServerType.DATA + CacheUtil.ServerType.DATA, + true ) ); } @@ -80,7 +82,8 @@ public void test_isQueryCacheable_unCacheableOnBroker() timeseriesQuery, new DummyCacheStrategy<>(false, true), makeCacheConfig(ImmutableMap.of()), - CacheUtil.ServerType.BROKER + CacheUtil.ServerType.BROKER, + false ) ); } @@ -93,7 +96,8 @@ public void test_isQueryCacheable_unCacheableOnDataServer() timeseriesQuery, new DummyCacheStrategy<>(true, false), makeCacheConfig(ImmutableMap.of()), - CacheUtil.ServerType.DATA + CacheUtil.ServerType.DATA, + true ) ); } @@ -106,7 +110,8 @@ public void test_isQueryCacheable_unCacheableType() timeseriesQuery, new DummyCacheStrategy<>(true, false), makeCacheConfig(ImmutableMap.of("unCacheable", ImmutableList.of("timeseries"))), - CacheUtil.ServerType.BROKER + CacheUtil.ServerType.BROKER, + false ) ); } @@ -119,7 +124,8 @@ public void test_isQueryCacheable_unCacheableDataSourceOnBroker() timeseriesQuery.withDataSource(new GlobalTableDataSource("global")), new DummyCacheStrategy<>(true, true), makeCacheConfig(ImmutableMap.of()), - CacheUtil.ServerType.BROKER + CacheUtil.ServerType.BROKER, + false ) ); } @@ -132,7 +138,8 @@ public void test_isQueryCacheable_unCacheableDataSourceOnDataServer() timeseriesQuery.withDataSource(new LookupDataSource("lookyloo")), new DummyCacheStrategy<>(true, true), makeCacheConfig(ImmutableMap.of()), - CacheUtil.ServerType.DATA + CacheUtil.ServerType.DATA, + true ) ); } @@ -145,7 +152,8 @@ public void test_isQueryCacheable_nullCacheStrategy() timeseriesQuery, null, makeCacheConfig(ImmutableMap.of()), - CacheUtil.ServerType.BROKER + CacheUtil.ServerType.BROKER, + false ) ); } @@ -168,7 +176,7 @@ public DummyCacheStrategy(boolean cacheableOnBrokers, boolean cacheableOnDataSer } @Override - public boolean isCacheable(QueryType query, boolean willMergeRunners) + public boolean isCacheable(QueryType query, boolean willMergeRunners, boolean bySegment) { return willMergeRunners ? cacheableOnDataServers : cacheableOnBrokers; }