Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1985,7 +1985,7 @@ You can optionally only configure caching to be enabled on the Broker by setting

See [cache configuration](#cache-configuration) for how to configure cache settings.

> Note: Even if cache is enabled, for [groupBy v2](../querying/groupbyquery.md#strategies) queries, both of non-result level cache and result level cache do not work on Brokers.
> Note: Even if cache is enabled, for [groupBy v2](../querying/groupbyquery.md#strategies) queries, the segment-level cache does not work on Brokers.
> See [Differences between v1 and v2](../querying/groupbyquery.md#differences-between-v1-and-v2) and [Query caching](../querying/caching.md) for more information.

#### Segment Discovery
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,38 @@
@ExtensionPoint
public interface CacheStrategy<T, CacheType, QueryType extends Query<T>>
{
/**
* This method is deprecated and retained for backward compatibility.
* Returns whether the given query is cacheable or not.
* The {@code willMergeRunners} parameter can be used to distinguish whether the caller is a broker or a data node.
*
* @param ignoredQuery the query to be cached
* @param ignoredWillMergeRunners indicates that {@link QueryRunnerFactory#mergeRunners(QueryProcessingPool, Iterable)} will be
* called on the cached by-segment results
*
* @return true if the query is cacheable, otherwise false.
*/
@Deprecated
default boolean isCacheable(QueryType ignoredQuery, boolean ignoredWillMergeRunners)

Check notice

Code scanning / CodeQL

Useless parameter

The parameter 'ignoredQuery' is never used.

Check notice

Code scanning / CodeQL

Useless parameter

The parameter 'ignoredWillMergeRunners' is never used.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the old method and doesn't need a default implementation as such but a default impl is good since implementations don't need to override the deprecated method. The new method will definitely require a default implementation. You can do this

  /**
   * This method is deprecated and retained for backward compatibility.
   * Returns whether the given query is cacheable or not.
   * The {@code willMergeRunners} parameter can be used for distinguishing the caller is a broker or a data node.
   *
   * @param ignoredQuery            the query to be cached
   * @param ignoredWillMergeRunners indicates that {@link QueryRunnerFactory#mergeRunners(QueryProcessingPool, Iterable)} will be
   *                         called on the cached by-segment results
   *
   * @return true if the query is cacheable, otherwise false.
   */
  @Deprecated
  default boolean isCacheable(QueryType ignoredQuery, boolean ignoredWillMergeRunners)
  {
    return false;
  }

  /**
   * Returns whether the given query is cacheable or not.
   * The {@code willMergeRunners} parameter can be used for distinguishing the caller is a broker or a data node.
   *
   * @param query            the query to be cached
   * @param willMergeRunners indicates that {@link QueryRunnerFactory#mergeRunners(QueryProcessingPool, Iterable)} will be
   *                         called on the cached by-segment results
   * @param bySegment        segment level or result level cache
   *
   * @return true if the query is cacheable, otherwise false.
   */
  default boolean isCacheable(QueryType query, boolean willMergeRunners, boolean bySegment) {
    return isCacheable(query, willMergeRunners);
  }

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please let me know what you think. The aim to the above suggestion is for existing extensions to keep working as it is.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good. I will take it.

{
return false;
}

/**
* Returns whether the given query is cacheable or not.
* The {@code willMergeRunners} parameter can be used to distinguish whether the caller is a broker or a data node.
*
* @param query the query to be cached
* @param willMergeRunners indicates that {@link QueryRunnerFactory#mergeRunners(QueryProcessingPool, Iterable)} will be
* called on the cached by-segment results
* @param bySegment true for the segment-level cache, false for the result-level cache
*
* @return true if the query is cacheable, otherwise false.
*/
boolean isCacheable(QueryType query, boolean willMergeRunners);
default boolean isCacheable(QueryType query, boolean willMergeRunners, boolean bySegment)
{
return isCacheable(query, willMergeRunners);

Check notice

Code scanning / CodeQL

Deprecated method or constructor invocation

Invoking [CacheStrategy.isCacheable](1) should be avoided because it has been deprecated.
}

/**
* Computes the per-segment cache key for the given query. Because this is a per-segment cache key, it should only
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,9 +515,9 @@ public CacheStrategy<ResultRow, Object, GroupByQuery> getCacheStrategy(final Gro
private final List<DimensionSpec> dims = query.getDimensions();

@Override
public boolean isCacheable(GroupByQuery query, boolean willMergeRunners)
public boolean isCacheable(GroupByQuery query, boolean willMergeRunners, boolean bySegment)
{
return strategySelector.strategize(query).isCacheable(willMergeRunners);
return strategySelector.strategize(query).isCacheable(willMergeRunners, bySegment);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,11 @@ public interface GroupByStrategy
* @param willMergeRunners indicates that {@link QueryRunnerFactory#mergeRunners(QueryProcessingPool, Iterable)} will be
* called on the cached by-segment results. Can be used to distinguish if we are running on
* a broker or data node.
* @param bySegment segment level or result level cache
*
* @return true if this strategy is cacheable, otherwise false.
*/
boolean isCacheable(boolean willMergeRunners);
boolean isCacheable(boolean willMergeRunners, boolean bySegment);

/**
* Indicates if this query should undergo "mergeResults" or not. Checked by
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public GroupByQueryResource prepareResource(GroupByQuery query)
}

@Override
public boolean isCacheable(boolean willMergeRunners)
public boolean isCacheable(boolean willMergeRunners, boolean bySegment)
{
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,11 @@ private static int countRequiredMergeBufferNumWithoutSubtotal(Query query, int f
}

@Override
public boolean isCacheable(boolean willMergeRunners)
public boolean isCacheable(boolean willMergeRunners, boolean bySegment)
{
return willMergeRunners;
// Disable the segment-level cache on the broker;
// see https://github.com/apache/druid/issues/3820
return willMergeRunners || !bySegment;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public CacheStrategy<SegmentAnalysis, SegmentAnalysis, SegmentMetadataQuery> get
return new CacheStrategy<SegmentAnalysis, SegmentAnalysis, SegmentMetadataQuery>()
{
@Override
public boolean isCacheable(SegmentMetadataQuery query, boolean willMergeRunners)
public boolean isCacheable(SegmentMetadataQuery query, boolean willMergeRunners, boolean bySegment)
{
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public CacheStrategy<Result<SearchResultValue>, Object, SearchQuery> getCacheStr
: Collections.emptyList();

@Override
public boolean isCacheable(SearchQuery query, boolean willMergeRunners)
public boolean isCacheable(SearchQuery query, boolean willMergeRunners, boolean bySegment)
{
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public CacheStrategy<Result<TimeBoundaryResultValue>, Object, TimeBoundaryQuery>
return new CacheStrategy<Result<TimeBoundaryResultValue>, Object, TimeBoundaryQuery>()
{
@Override
public boolean isCacheable(TimeBoundaryQuery query, boolean willMergeRunners)
public boolean isCacheable(TimeBoundaryQuery query, boolean willMergeRunners, boolean bySegment)
{
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ public CacheStrategy<Result<TimeseriesResultValue>, Object, TimeseriesQuery> get
private final List<AggregatorFactory> aggs = query.getAggregatorSpecs();

@Override
public boolean isCacheable(TimeseriesQuery query, boolean willMergeRunners)
public boolean isCacheable(TimeseriesQuery query, boolean willMergeRunners, boolean bySegment)
{
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ public CacheStrategy<Result<TopNResultValue>, Object, TopNQuery> getCacheStrateg
);

@Override
public boolean isCacheable(TopNQuery query, boolean willMergeRunners)
public boolean isCacheable(TopNQuery query, boolean willMergeRunners, boolean bySegment)
{
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,25 @@
package org.apache.druid.query.groupby;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.apache.druid.collections.BlockingPool;
import org.apache.druid.collections.DefaultBlockingPool;
import org.apache.druid.collections.NonBlockingPool;
import org.apache.druid.collections.SerializablePair;
import org.apache.druid.collections.StupidPool;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.Row;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.CacheStrategy;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryRunnerTestHelper;
Expand Down Expand Up @@ -62,6 +70,9 @@
import org.apache.druid.query.groupby.having.OrHavingSpec;
import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
import org.apache.druid.query.groupby.strategy.GroupByStrategyV1;
import org.apache.druid.query.groupby.strategy.GroupByStrategyV2;
import org.apache.druid.query.ordering.StringComparators;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.ColumnType;
Expand All @@ -74,6 +85,7 @@
import org.junit.Test;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -1136,4 +1148,68 @@ private static ResultRow makeRow(final GroupByQuery query, final String timestam
{
return GroupByQueryRunnerTestHelper.createExpectedRow(query, timestamp, vals);
}

@Test
public void testIsQueryCacheableOnGroupByStrategyV2()
{
final GroupByQuery query = new GroupByQuery.Builder()
.setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
.setGranularity(Granularities.DAY)
.setDimensions(new DefaultDimensionSpec("col", "dim"))
.setInterval(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC)
.setAggregatorSpecs(QueryRunnerTestHelper.COMMON_DOUBLE_AGGREGATORS)
.build();
final DruidProcessingConfig processingConfig = new DruidProcessingConfig()
{
@Override
public String getFormatString()
{
return "processing-%s";
}
};
final GroupByQueryConfig queryConfig = new GroupByQueryConfig();
final Supplier<GroupByQueryConfig> queryConfigSupplier = Suppliers.ofInstance(queryConfig);
final Supplier<ByteBuffer> bufferSupplier =
() -> ByteBuffer.allocateDirect(processingConfig.intermediateComputeSizeBytes());

final NonBlockingPool<ByteBuffer> bufferPool = new StupidPool<>(
"GroupByQueryEngine-bufferPool",
bufferSupplier
);
final BlockingPool<ByteBuffer> mergeBufferPool = new DefaultBlockingPool<>(
bufferSupplier,
processingConfig.getNumMergeBuffers()
);
final GroupByStrategySelector strategySelector = new GroupByStrategySelector(
queryConfigSupplier,
new GroupByStrategyV1(
queryConfigSupplier,
new GroupByQueryEngine(queryConfigSupplier, bufferPool),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
),
new GroupByStrategyV2(
processingConfig,
queryConfigSupplier,
bufferPool,
mergeBufferPool,
TestHelper.makeJsonMapper(),
new ObjectMapper(new SmileFactory()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
);
final GroupByQueryQueryToolChest queryToolChest = new GroupByQueryQueryToolChest(strategySelector);
CacheStrategy<ResultRow, Object, GroupByQuery> cacheStrategy = queryToolChest.getCacheStrategy(query);
Assert.assertTrue(
"result level cache on broker server for GroupByStrategyV2 should be enabled",
cacheStrategy.isCacheable(query, false, false)
);
Assert.assertFalse(
"segment level cache on broker server for GroupByStrategyV2 should be disabled",
cacheStrategy.isCacheable(query, false, true)
);
Assert.assertTrue(
"segment level cache on data server for GroupByStrategyV2 should be enabled",
cacheStrategy.isCacheable(query, true, true)
);
}
}
14 changes: 8 additions & 6 deletions server/src/main/java/org/apache/druid/client/CacheUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public static <T> boolean isUseSegmentCache(
{
return cacheConfig.isUseCache()
&& query.context().isUseCache()
&& isQueryCacheable(query, cacheStrategy, cacheConfig, serverType);
&& isQueryCacheable(query, cacheStrategy, cacheConfig, serverType, true);
}

/**
Expand All @@ -128,7 +128,7 @@ public static <T> boolean isPopulateSegmentCache(
ServerType serverType
)
{
return isQueryCacheable(query, cacheStrategy, cacheConfig, serverType)
return isQueryCacheable(query, cacheStrategy, cacheConfig, serverType, true)
&& query.context().isPopulateCache()
&& cacheConfig.isPopulateCache();
}
Expand All @@ -148,7 +148,7 @@ public static <T> boolean isUseResultCache(
ServerType serverType
)
{
return isQueryCacheable(query, cacheStrategy, cacheConfig, serverType)
return isQueryCacheable(query, cacheStrategy, cacheConfig, serverType, false)
&& query.context().isUseResultLevelCache()
&& cacheConfig.isUseResultLevelCache();
}
Expand All @@ -168,7 +168,7 @@ public static <T> boolean isPopulateResultCache(
ServerType serverType
)
{
return isQueryCacheable(query, cacheStrategy, cacheConfig, serverType)
return isQueryCacheable(query, cacheStrategy, cacheConfig, serverType, false)
&& query.context().isPopulateResultLevelCache()
&& cacheConfig.isPopulateResultLevelCache();
}
Expand All @@ -181,16 +181,18 @@ public static <T> boolean isPopulateResultCache(
* @param cacheStrategy result of {@link QueryToolChest#getCacheStrategy} on this query
* @param cacheConfig current active cache config
* @param serverType BROKER or DATA
* @param bySegment true for the segment-level cache, false for the result-level cache
*/
static <T> boolean isQueryCacheable(
final Query<T> query,
@Nullable final CacheStrategy<T, Object, Query<T>> cacheStrategy,
final CacheConfig cacheConfig,
final ServerType serverType
final ServerType serverType,
final boolean bySegment
)
{
return cacheStrategy != null
&& cacheStrategy.isCacheable(query, serverType.willMergeRunners())
&& cacheStrategy.isCacheable(query, serverType.willMergeRunners(), bySegment)
&& cacheConfig.isQueryCacheable(query)
&& query.getDataSource().isCacheable(serverType == ServerType.BROKER);
}
Expand Down
Loading