diff --git a/processing/src/main/java/org/apache/druid/query/CacheStrategy.java b/processing/src/main/java/org/apache/druid/query/CacheStrategy.java index 7a300c0c63dd..8b106a6f2b7a 100644 --- a/processing/src/main/java/org/apache/druid/query/CacheStrategy.java +++ b/processing/src/main/java/org/apache/druid/query/CacheStrategy.java @@ -51,6 +51,16 @@ public interface CacheStrategy> */ byte[] computeCacheKey(QueryType query); + /** + * Computes the result level cache key for the given query. + * Some implementations may include query parameters that might not be used in {@code computeCacheKey} for same query + * + * @param query the query to be cached + * + * @return the result level cache key + */ + byte[] computeResultLevelCacheKey(QueryType query); + /** * Returns the class type of what is used in the cache * diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java index a49b48ae216a..c94d427b1bd2 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -481,6 +481,28 @@ public byte[] computeCacheKey(GroupByQuery query) .build(); } + @Override + public byte[] computeResultLevelCacheKey(GroupByQuery query) + { + final CacheKeyBuilder builder = new CacheKeyBuilder(GROUPBY_QUERY) + .appendByte(CACHE_STRATEGY_VERSION) + .appendCacheable(query.getGranularity()) + .appendCacheable(query.getDimFilter()) + .appendCacheables(query.getAggregatorSpecs()) + .appendCacheables(query.getDimensions()) + .appendCacheable(query.getVirtualColumns()) + .appendCacheable(query.getHavingSpec()) + .appendCacheable(query.getLimitSpec()) + .appendCacheables(query.getPostAggregatorSpecs()); + + if (query.getSubtotalsSpec() != null && !query.getSubtotalsSpec().isEmpty()) { + for (List subTotalSpec : query.getSubtotalsSpec()) { + builder.appendStrings(subTotalSpec); + } + } + return builder.build(); + } + @Override public TypeReference getCacheObjectClazz() { diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/AlwaysHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/AlwaysHavingSpec.java index 1ac4f3140a30..8450589f814a 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/AlwaysHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/AlwaysHavingSpec.java @@ -20,6 +20,7 @@ package org.apache.druid.query.groupby.having; import org.apache.druid.data.input.Row; +import org.apache.druid.query.cache.CacheKeyBuilder; /** * A "having" spec that always evaluates to true @@ -31,4 +32,10 @@ public boolean eval(Row row) { return true; } + + @Override + public byte[] getCacheKey() + { + return new CacheKeyBuilder(HavingSpecUtil.CACHE_TYPE_ID_ALWAYS).build(); + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/AndHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/AndHavingSpec.java index 55ca3ee31a17..f035db3dbde2 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/AndHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/AndHavingSpec.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableList; import org.apache.druid.data.input.Row; import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.segment.column.ValueType; import java.util.List; @@ -110,4 +111,11 @@ public String toString() sb.append('}'); return sb.toString(); } + + @Override + public byte[] getCacheKey() + { + return new CacheKeyBuilder(HavingSpecUtil.CACHE_TYPE_ID_AND) + .appendCacheables(havingSpecs).build(); + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/DimFilterHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/DimFilterHavingSpec.java index 00155bab39ba..cf916bf02556 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/DimFilterHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/DimFilterHavingSpec.java @@ -25,6 +25,7 @@ import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.Row; import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.query.filter.DimFilter; import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.transform.RowFunction; @@ -170,6 +171,15 @@ public RowFunction getRowFunction() return new TransformSpec(filter, transforms).toTransformer(rowSignature); } + @Override + public byte[] getCacheKey() + { + return new CacheKeyBuilder(HavingSpecUtil.CACHE_TYPE_ID_DIM_FILTER) + .appendCacheable(dimFilter) + .appendByte((byte) (isFinalize() ? 1 : 0)) + .build(); + } + private static class RowAsInputRow implements InputRow { private final Row row; diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/DimensionSelectorHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/DimensionSelectorHavingSpec.java index 90041167693a..4dfc6e63c607 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/DimensionSelectorHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/DimensionSelectorHavingSpec.java @@ -24,6 +24,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; import org.apache.druid.data.input.Row; +import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.query.extraction.ExtractionFn; import org.apache.druid.query.extraction.IdentityExtractionFn; @@ -117,4 +118,14 @@ public String toString() ", extractionFn=" + extractionFn + '}'; } + + @Override + public byte[] getCacheKey() + { + return new CacheKeyBuilder(HavingSpecUtil.CACHE_TYPE_ID_DIM_SELECTOR) + .appendString(dimension) + .appendString(value) + .appendByteArray(extractionFn == null ? new byte[0] : extractionFn.getCacheKey()) + .build(); + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/EqualToHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/EqualToHavingSpec.java index ac8d64e99a10..00c471b200b9 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/EqualToHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/EqualToHavingSpec.java @@ -22,7 +22,9 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.data.input.Row; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.cache.CacheKeyBuilder; import java.util.Map; @@ -123,4 +125,13 @@ public String toString() sb.append('}'); return sb.toString(); } + + @Override + public byte[] getCacheKey() + { + return new CacheKeyBuilder(HavingSpecUtil.CACHE_TYPE_ID_EQUAL) + .appendString(aggregationName) + .appendByteArray(StringUtils.toUtf8(String.valueOf(value))) + .build(); + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/GreaterThanHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/GreaterThanHavingSpec.java index 842dbcde69a5..aa276a9466ca 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/GreaterThanHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/GreaterThanHavingSpec.java @@ -22,7 +22,9 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.data.input.Row; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.cache.CacheKeyBuilder; import java.util.Map; @@ -119,4 +121,13 @@ public String toString() sb.append('}'); return sb.toString(); } + + @Override + public byte[] getCacheKey() + { + return new CacheKeyBuilder(HavingSpecUtil.CACHE_TYPE_ID_GREATER_THAN) + .appendString(aggregationName) + .appendByteArray(StringUtils.toUtf8(String.valueOf(value))) + .build(); + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/HavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/HavingSpec.java index 19ad071365f3..e75641f54199 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/HavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/HavingSpec.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; import org.apache.druid.data.input.Row; +import org.apache.druid.java.util.common.Cacheable; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.segment.column.ValueType; @@ -44,7 +45,7 @@ @JsonSubTypes.Type(name = "always", value = AlwaysHavingSpec.class), @JsonSubTypes.Type(name = "filter", value = DimFilterHavingSpec.class) }) -public interface HavingSpec +public interface HavingSpec extends Cacheable { // Atoms for easy combination, but for now they are mostly useful // for testing. diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/HavingSpecUtil.java b/processing/src/main/java/org/apache/druid/query/groupby/having/HavingSpecUtil.java new file mode 100644 index 000000000000..e1227e7f0437 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/HavingSpecUtil.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.groupby.having; + +public class HavingSpecUtil +{ + static final byte CACHE_TYPE_ID_ALWAYS = 0x0; + static final byte CACHE_TYPE_ID_AND = 0x1; + static final byte CACHE_TYPE_ID_DIM_SELECTOR = 0x2; + static final byte CACHE_TYPE_ID_DIM_FILTER = 0x3; + static final byte CACHE_TYPE_ID_EQUAL = 0x4; + static final byte CACHE_TYPE_ID_GREATER_THAN = 0x5; + static final byte CACHE_TYPE_ID_LESS_THAN = 0x6; + static final byte CACHE_TYPE_ID_NEVER = 0x7; + static final byte CACHE_TYPE_ID_NOT = 0x8; + static final byte CACHE_TYPE_ID_OR = 0x9; + static final byte CACHE_TYPE_ID_COUNTING = 0xA; +} diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/LessThanHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/LessThanHavingSpec.java index 99917cb245d9..3c937cfba9fd 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/LessThanHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/LessThanHavingSpec.java @@ -21,7 +21,9 @@ import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.data.input.Row; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.cache.CacheKeyBuilder; import java.util.Map; @@ -117,4 +119,13 @@ public String toString() sb.append('}'); return sb.toString(); } + + @Override + public byte[] getCacheKey() + { + return new CacheKeyBuilder(HavingSpecUtil.CACHE_TYPE_ID_LESS_THAN) + .appendString(aggregationName) + .appendByteArray(StringUtils.toUtf8(String.valueOf(value))) + .build(); + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/NeverHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/NeverHavingSpec.java index 55ab48b505fe..fa2c15d4bc48 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/NeverHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/NeverHavingSpec.java @@ -20,6 +20,7 @@ package org.apache.druid.query.groupby.having; import org.apache.druid.data.input.Row; +import org.apache.druid.query.cache.CacheKeyBuilder; /** * A "having" spec that always evaluates to false @@ -31,4 +32,10 @@ public boolean eval(Row row) { return false; } + + @Override + public byte[] getCacheKey() + { + return new CacheKeyBuilder(HavingSpecUtil.CACHE_TYPE_ID_NEVER).build(); + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/NotHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/NotHavingSpec.java index 315d05932ea8..81d7a63eaf87 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/NotHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/NotHavingSpec.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.data.input.Row; import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.segment.column.ValueType; import java.util.Map; @@ -98,4 +99,12 @@ public int hashCode() { return havingSpec != null ? havingSpec.hashCode() : 0; } + + @Override + public byte[] getCacheKey() + { + return new CacheKeyBuilder(HavingSpecUtil.CACHE_TYPE_ID_NOT) + .appendCacheable(havingSpec) + .build(); + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/OrHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/OrHavingSpec.java index 7425b9024764..e6483490d7e0 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/OrHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/OrHavingSpec.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableList; import org.apache.druid.data.input.Row; import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.segment.column.ValueType; import java.util.List; @@ -110,4 +111,12 @@ public String toString() sb.append('}'); return sb.toString(); } + + @Override + public byte[] getCacheKey() + { + return new CacheKeyBuilder(HavingSpecUtil.CACHE_TYPE_ID_OR) + .appendCacheables(havingSpecs) + .build(); + } } diff --git a/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java index 4dc50f56887f..2813000ab760 100644 --- a/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java @@ -50,6 +50,7 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorFactoryNotMergeableException; import org.apache.druid.query.aggregation.MetricManipulationFn; +import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.query.metadata.metadata.ColumnAnalysis; import org.apache.druid.query.metadata.metadata.SegmentAnalysis; import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery; @@ -73,6 +74,7 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest MERGE_TRANSFORM_FN = new Function() { @Override @@ -194,6 +196,16 @@ public byte[] computeCacheKey(SegmentMetadataQuery query) .array(); } + @Override + public byte[] computeResultLevelCacheKey(SegmentMetadataQuery query) + { + // need to include query "merge" and "lenientAggregatorMerge" for result level cache key + return new CacheKeyBuilder(SEGMENT_METADATA_QUERY).appendByteArray(computeCacheKey(query)) + .appendBoolean(query.isMerge()) + .appendBoolean(query.isLenientAggregatorMerge()) + .build(); + } + @Override public TypeReference getCacheObjectClazz() { diff --git a/processing/src/main/java/org/apache/druid/query/search/SearchQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/search/SearchQueryQueryToolChest.java index 61f77e356962..8f35b2541bd8 100644 --- a/processing/src/main/java/org/apache/druid/query/search/SearchQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/search/SearchQueryQueryToolChest.java @@ -201,6 +201,12 @@ public byte[] computeCacheKey(SearchQuery query) return queryCacheKey.array(); } + @Override + public byte[] computeResultLevelCacheKey(SearchQuery query) + { + return computeCacheKey(query); + } + @Override public TypeReference getCacheObjectClazz() { diff --git a/processing/src/main/java/org/apache/druid/query/select/SelectQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/select/SelectQueryQueryToolChest.java index d551266afa6a..45ebb95a52dd 100644 --- a/processing/src/main/java/org/apache/druid/query/select/SelectQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/select/SelectQueryQueryToolChest.java @@ -236,6 +236,12 @@ public byte[] computeCacheKey(SelectQuery query) return queryCacheKey.array(); } + @Override + public byte[] computeResultLevelCacheKey(SelectQuery query) + { + return computeCacheKey(query); + } + @Override public TypeReference getCacheObjectClazz() { diff --git a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java index 351903799f85..9fc0e88e7e5b 100644 --- a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java @@ -154,6 +154,12 @@ public byte[] computeCacheKey(TimeBoundaryQuery query) .array(); } + @Override + public byte[] computeResultLevelCacheKey(TimeBoundaryQuery query) + { + return computeCacheKey(query); + } + @Override public TypeReference getCacheObjectClazz() { diff --git a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java index 479fbf0c2d1b..f8f5aa0c4c7e 100644 --- a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java @@ -274,6 +274,22 @@ public byte[] computeCacheKey(TimeseriesQuery query) .build(); } + @Override + public byte[] computeResultLevelCacheKey(TimeseriesQuery query) + { + final CacheKeyBuilder builder = new CacheKeyBuilder(TIMESERIES_QUERY) + .appendBoolean(query.isDescending()) + .appendBoolean(query.isSkipEmptyBuckets()) + .appendCacheable(query.getGranularity()) + .appendCacheable(query.getDimensionsFilter()) + .appendCacheables(query.getAggregatorSpecs()) + .appendCacheable(query.getVirtualColumns()) + .appendCacheables(query.getPostAggregatorSpecs()) + .appendInt(query.getLimit()) + .appendBoolean(query.isGrandTotal()); + return builder.build(); + } + @Override public TypeReference getCacheObjectClazz() { diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java index 4774c5c9b1a2..2c3bd2b2f758 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java @@ -326,6 +326,21 @@ public byte[] computeCacheKey(TopNQuery query) return builder.build(); } + @Override + public byte[] computeResultLevelCacheKey(TopNQuery query) + { + final CacheKeyBuilder builder = new CacheKeyBuilder(TOPN_QUERY) + .appendCacheable(query.getDimensionSpec()) + .appendCacheable(query.getTopNMetricSpec()) + .appendInt(query.getThreshold()) + .appendCacheable(query.getGranularity()) + .appendCacheable(query.getDimensionsFilter()) + .appendCacheables(query.getAggregatorSpecs()) + .appendCacheable(query.getVirtualColumns()) + .appendCacheables(query.getPostAggregatorSpecs()); + return builder.build(); + } + @Override public TypeReference getCacheObjectClazz() { diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java new file mode 100644 index 000000000000..2bad8f82f7f4 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java @@ -0,0 +1,486 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.groupby; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import org.apache.druid.data.input.Row; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.query.CacheStrategy; +import org.apache.druid.query.QueryRunnerTestHelper; +import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; +import org.apache.druid.query.aggregation.FloatSumAggregatorFactory; +import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.query.aggregation.post.ExpressionPostAggregator; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.expression.TestExprMacroTable; +import org.apache.druid.query.filter.AndDimFilter; +import org.apache.druid.query.filter.BoundDimFilter; +import org.apache.druid.query.filter.OrDimFilter; +import org.apache.druid.query.filter.SelectorDimFilter; +import org.apache.druid.query.groupby.having.AndHavingSpec; +import org.apache.druid.query.groupby.having.DimFilterHavingSpec; +import org.apache.druid.query.groupby.having.EqualToHavingSpec; +import org.apache.druid.query.groupby.having.GreaterThanHavingSpec; +import org.apache.druid.query.groupby.having.HavingSpec; +import org.apache.druid.query.groupby.having.LessThanHavingSpec; +import org.apache.druid.query.groupby.having.NotHavingSpec; +import org.apache.druid.query.groupby.having.OrHavingSpec; +import org.apache.druid.query.groupby.orderby.DefaultLimitSpec; +import org.apache.druid.query.groupby.orderby.OrderByColumnSpec; +import org.apache.druid.query.ordering.StringComparators; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; + +public class GroupByQueryQueryToolChestTest +{ + + @Test + public void testResultLevelCacheKeyWithPostAggregate() + { + final GroupByQuery query1 = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(new DefaultDimensionSpec("quality", "alias")) + .setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, new LongSumAggregatorFactory("idx", "index")) + .setPostAggregatorSpecs( + ImmutableList.of( + new ExpressionPostAggregator("post", "alias + 'x'", null, TestExprMacroTable.INSTANCE) + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .build(); + + final GroupByQuery query2 = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(new DefaultDimensionSpec("quality", "alias")) + .setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, new LongSumAggregatorFactory("idx", "index")) + .setPostAggregatorSpecs( + ImmutableList.of( + new ExpressionPostAggregator("post", "alias - 'x'", null, TestExprMacroTable.INSTANCE) + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .build(); + + final CacheStrategy strategy1 = new GroupByQueryQueryToolChest( + null, + QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator() + ).getCacheStrategy(query1); + + final CacheStrategy strategy2 = new GroupByQueryQueryToolChest( + null, + QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator() + ).getCacheStrategy(query2); + + Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); + Assert.assertFalse(Arrays.equals( + strategy1.computeResultLevelCacheKey(query1), + strategy2.computeResultLevelCacheKey(query2) + )); + } + + @Test + public void testResultLevelCacheKeyWithLimitSpec() + { + final GroupByQuery query1 = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(new DefaultDimensionSpec("quality", "alias")) + .setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, new LongSumAggregatorFactory("idx", "index")) + .setPostAggregatorSpecs( + ImmutableList.of( + new ExpressionPostAggregator("post", "alias + 'x'", null, TestExprMacroTable.INSTANCE) + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .setLimitSpec( + new DefaultLimitSpec( + ImmutableList.of( + new OrderByColumnSpec("post", OrderByColumnSpec.Direction.DESCENDING) + ), + Integer.MAX_VALUE + ) + ) + .build(); + + final GroupByQuery query2 = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(new DefaultDimensionSpec("quality", "alias")) + .setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, new LongSumAggregatorFactory("idx", "index")) + .setPostAggregatorSpecs( + ImmutableList.of( + new ExpressionPostAggregator("post", "alias - 'x'", null, TestExprMacroTable.INSTANCE) + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .setLimitSpec( + new DefaultLimitSpec( + ImmutableList.of( + new OrderByColumnSpec("post", OrderByColumnSpec.Direction.DESCENDING) + ), + Integer.MAX_VALUE + ) + ) + .build(); + + final CacheStrategy strategy1 = new GroupByQueryQueryToolChest( + null, + QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator() + ).getCacheStrategy(query1); + + final CacheStrategy strategy2 = new GroupByQueryQueryToolChest( + null, + QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator() + ).getCacheStrategy(query2); + + Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); + Assert.assertFalse(Arrays.equals( + strategy1.computeResultLevelCacheKey(query1), + strategy2.computeResultLevelCacheKey(query2) + )); + } + + @Test + public void testResultLevelCacheKeyWithHavingSpec() + { + final GroupByQuery query1 = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(new DefaultDimensionSpec("quality", "alias")) + .setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, new LongSumAggregatorFactory("idx", "index")) + .setPostAggregatorSpecs( + ImmutableList.of( + new ExpressionPostAggregator("post", "alias + 'x'", null, TestExprMacroTable.INSTANCE) + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .setLimitSpec( + new DefaultLimitSpec( + ImmutableList.of( + new OrderByColumnSpec("post", OrderByColumnSpec.Direction.DESCENDING) + ), + Integer.MAX_VALUE + ) + ) + .setHavingSpec(new GreaterThanHavingSpec(QueryRunnerTestHelper.uniqueMetric, 8)) + .build(); + + final GroupByQuery query2 = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(new DefaultDimensionSpec("quality", "alias")) + .setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, new LongSumAggregatorFactory("idx", "index")) + .setPostAggregatorSpecs( + ImmutableList.of( + new ExpressionPostAggregator("post", "alias + 'x'", null, TestExprMacroTable.INSTANCE) + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .setLimitSpec( + new DefaultLimitSpec( + ImmutableList.of( + new OrderByColumnSpec("post", OrderByColumnSpec.Direction.DESCENDING) + ), + Integer.MAX_VALUE + ) + ) + .setHavingSpec(new GreaterThanHavingSpec(QueryRunnerTestHelper.uniqueMetric, 10)) + .build(); + + final CacheStrategy strategy1 = new GroupByQueryQueryToolChest( + null, + QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator() + ).getCacheStrategy(query1); + + final CacheStrategy strategy2 = new GroupByQueryQueryToolChest( + null, + QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator() + ).getCacheStrategy(query2); + + Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); + Assert.assertFalse(Arrays.equals( + strategy1.computeResultLevelCacheKey(query1), + strategy2.computeResultLevelCacheKey(query2) + )); + } + + @Test + public void testResultLevelCacheKeyWithAndHavingSpec() + { + final List havings = Arrays.asList( + new GreaterThanHavingSpec("agg", Double.valueOf(1.3)), + new OrHavingSpec( + Arrays.asList( + new LessThanHavingSpec("lessAgg", Long.valueOf(1L)), + new NotHavingSpec(new EqualToHavingSpec("equalAgg", Double.valueOf(2))) + ) + ) + ); + final HavingSpec andHavingSpec = new AndHavingSpec(havings); + + final List havings2 = Arrays.asList( + new GreaterThanHavingSpec("agg", Double.valueOf(13.0)), + new OrHavingSpec( + Arrays.asList( + new LessThanHavingSpec("lessAgg", Long.valueOf(1L)), + new NotHavingSpec(new EqualToHavingSpec("equalAgg", Double.valueOf(22))) + ) + ) + ); + final HavingSpec andHavingSpec2 = new AndHavingSpec(havings2); + + final GroupByQuery query1 = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(new DefaultDimensionSpec("quality", "alias")) + .setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, new LongSumAggregatorFactory("idx", "index")) + .setPostAggregatorSpecs( + ImmutableList.of( + new ExpressionPostAggregator("post", "alias + 'x'", null, TestExprMacroTable.INSTANCE) + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .setLimitSpec( + new DefaultLimitSpec( + ImmutableList.of( + new OrderByColumnSpec("post", OrderByColumnSpec.Direction.DESCENDING) + ), + Integer.MAX_VALUE + ) + ) + .setHavingSpec(andHavingSpec) + .build(); + + final GroupByQuery query2 = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(new DefaultDimensionSpec("quality", "alias")) + .setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, new LongSumAggregatorFactory("idx", "index")) + .setPostAggregatorSpecs( + ImmutableList.of( + new ExpressionPostAggregator("post", "alias + 'x'", null, TestExprMacroTable.INSTANCE) + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .setLimitSpec( + new DefaultLimitSpec( + ImmutableList.of( + new OrderByColumnSpec("post", OrderByColumnSpec.Direction.DESCENDING) + ), + Integer.MAX_VALUE + ) + ) + .setHavingSpec(andHavingSpec2) + .build(); + + final CacheStrategy strategy1 = new GroupByQueryQueryToolChest( + null, + QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator() + ).getCacheStrategy(query1); + + final CacheStrategy strategy2 = new GroupByQueryQueryToolChest( + null, + QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator() + ).getCacheStrategy(query2); + + Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); + Assert.assertFalse(Arrays.equals( + strategy1.computeResultLevelCacheKey(query1), + strategy2.computeResultLevelCacheKey(query2) + )); + } + + @Test + public void testResultLevelCacheKeyWithHavingDimFilterHavingSpec() + { + final DimFilterHavingSpec havingSpec1 = new DimFilterHavingSpec( + new AndDimFilter( + ImmutableList.of( + new OrDimFilter( + ImmutableList.of( + new BoundDimFilter("rows", "2", null, true, false, null, null, StringComparators.NUMERIC), + new SelectorDimFilter("idx", "217", null) + ) + ), + new SelectorDimFilter("__time", String.valueOf(DateTimes.of("2011-04-01").getMillis()), null) + ) + ), + null + ); + + final DimFilterHavingSpec havingSpec2 = new DimFilterHavingSpec( + new AndDimFilter( + ImmutableList.of( + new OrDimFilter( + ImmutableList.of( + new BoundDimFilter("rows", "2", null, true, false, null, null, StringComparators.NUMERIC), + new SelectorDimFilter("idx", "317", null) + ) + ), + new SelectorDimFilter("__time", String.valueOf(DateTimes.of("2011-04-01").getMillis()), null) + ) + ), + null + ); + final GroupByQuery query1 = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(new DefaultDimensionSpec("quality", "alias")) + .setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, new LongSumAggregatorFactory("idx", "index")) + .setPostAggregatorSpecs( + ImmutableList.of( + new ExpressionPostAggregator("post", "alias + 'x'", null, TestExprMacroTable.INSTANCE) + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .setLimitSpec( + new DefaultLimitSpec( + ImmutableList.of( + new OrderByColumnSpec("post", OrderByColumnSpec.Direction.DESCENDING) + ), + Integer.MAX_VALUE + ) + ) + .setHavingSpec(havingSpec1) + .build(); + + final GroupByQuery query2 = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(new DefaultDimensionSpec("quality", "alias")) + .setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, new LongSumAggregatorFactory("idx", "index")) + .setPostAggregatorSpecs( + ImmutableList.of( + new ExpressionPostAggregator("post", "alias + 'x'", null, TestExprMacroTable.INSTANCE) + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .setLimitSpec( + new DefaultLimitSpec( + ImmutableList.of( + new OrderByColumnSpec("post", OrderByColumnSpec.Direction.DESCENDING) + ), + Integer.MAX_VALUE + ) + ) + .setHavingSpec(havingSpec2) + .build(); + + final CacheStrategy strategy1 = new GroupByQueryQueryToolChest( + null, + QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator() + ).getCacheStrategy(query1); + + final CacheStrategy strategy2 = new GroupByQueryQueryToolChest( + null, + QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator() + ).getCacheStrategy(query2); + + Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); + Assert.assertFalse(Arrays.equals( + strategy1.computeResultLevelCacheKey(query1), + strategy2.computeResultLevelCacheKey(query2) + )); + } + + @Test + public void testResultLevelCacheKeyWithSubTotalsSpec() + { + final GroupByQuery query1 = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(Lists.newArrayList( + new DefaultDimensionSpec("quality", "alias"), + new DefaultDimensionSpec("market", "market") + )) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory("idx", "index"), + new FloatSumAggregatorFactory("idxFloat", "indexFloat"), + new DoubleSumAggregatorFactory("idxDouble", "index") + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .setSubtotalsSpec(ImmutableList.of( + ImmutableList.of("alias"), + ImmutableList.of("market"), + ImmutableList.of() + )) + .build(); + + final GroupByQuery query2 = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(Lists.newArrayList( + new DefaultDimensionSpec("quality", "alias"), + new DefaultDimensionSpec("market", "market") + )) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory("idx", "index"), + new FloatSumAggregatorFactory("idxFloat", "indexFloat"), + new DoubleSumAggregatorFactory("idxDouble", "index") + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .setSubtotalsSpec(ImmutableList.of( + ImmutableList.of("alias"), + ImmutableList.of() + )) + .build(); + + final CacheStrategy strategy1 = new GroupByQueryQueryToolChest( + null, + QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator() + ).getCacheStrategy(query1); + + final CacheStrategy strategy2 = new GroupByQueryQueryToolChest( + null, + QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator() + ).getCacheStrategy(query2); + + Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); + Assert.assertFalse(Arrays.equals( + strategy1.computeResultLevelCacheKey(query1), + strategy2.computeResultLevelCacheKey(query2) + )); + } + +} diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java index 96e369966dbb..cc25be424a08 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java @@ -5368,6 +5368,12 @@ public void testSubqueryWithPostAggregatorsAndHaving() .setHavingSpec( new BaseHavingSpec() { + @Override + public byte[] getCacheKey() + { + return new byte[0]; + } + @Override public boolean eval(Row row) { @@ -5629,6 +5635,12 @@ public void testSubqueryWithMultiColumnAggregators() .setHavingSpec( new BaseHavingSpec() { + @Override + public byte[] getCacheKey() + { + return new byte[0]; + } + @Override public boolean eval(Row row) { diff --git a/processing/src/test/java/org/apache/druid/query/groupby/having/HavingSpecTest.java b/processing/src/test/java/org/apache/druid/query/groupby/having/HavingSpecTest.java index ec44b7f191df..a0e1d7457117 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/having/HavingSpecTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/having/HavingSpecTest.java @@ -25,6 +25,8 @@ import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.data.input.Row; import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.query.cache.CacheKeyBuilder; import org.junit.Test; import java.util.ArrayList; @@ -233,6 +235,15 @@ public boolean eval(Row row) counter.incrementAndGet(); return value; } + + @Override + public byte[] getCacheKey() + { + return new CacheKeyBuilder(HavingSpecUtil.CACHE_TYPE_ID_COUNTING) + .appendByte((byte) (value ? 1 : 0)) + .appendByteArray(StringUtils.toUtf8(String.valueOf(counter))) + .build(); + } } @Test diff --git a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java index 0742e1e845ad..6d07c59d269e 100644 --- a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java +++ b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java @@ -32,7 +32,9 @@ import org.apache.druid.query.TableDataSource; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator; import org.apache.druid.query.aggregation.post.ConstantPostAggregator; +import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.VirtualColumns; @@ -158,4 +160,164 @@ public void testCacheKey() ) ); } + + @Test + public void testResultLevelCacheKey() + { + final TimeseriesQuery query1 = Druids.newTimeseriesQueryBuilder() + .dataSource("dummy") + .intervals("2015-01-01/2015-01-02") + .descending(descending) + .granularity(Granularities.ALL) + .aggregators( + ImmutableList.of( + new LongSumAggregatorFactory("metric0", "metric0"), + new CountAggregatorFactory("metric1") + ) + ) + .postAggregators( + ImmutableList.of( + new ArithmeticPostAggregator( + "post", + "+", + ImmutableList.of( + new FieldAccessPostAggregator( + null, + "metric1" + ), + new FieldAccessPostAggregator( + null, + "metric0" + ) + ) + ) + ) + ) + .build(); + + final TimeseriesQuery query2 = Druids.newTimeseriesQueryBuilder() + .dataSource("dummy") + .intervals("2015-01-01/2015-01-02") + .descending(descending) + .granularity(Granularities.ALL) + .aggregators( + ImmutableList.of( + new LongSumAggregatorFactory("metric0", "metric0"), + new CountAggregatorFactory("metric1") + ) + ) + .postAggregators( + ImmutableList.of( + new ArithmeticPostAggregator( + "post", + "/", + ImmutableList.of( + new FieldAccessPostAggregator( + null, + "metric1" + ), + new FieldAccessPostAggregator( + null, + "metric0" + ) + ) + ) + ) + ) + .build(); + + Assert.assertTrue( + Arrays.equals( + TOOL_CHEST.getCacheStrategy(query1).computeCacheKey(query1), + TOOL_CHEST.getCacheStrategy(query2).computeCacheKey(query2) + ) + ); + Assert.assertFalse( + Arrays.equals( + TOOL_CHEST.getCacheStrategy(query1).computeResultLevelCacheKey(query1), + TOOL_CHEST.getCacheStrategy(query2).computeResultLevelCacheKey(query2) + ) + ); + } + + @Test + public void testResultLevelCacheKeyWithGrandTotal() + { + final TimeseriesQuery query1 = Druids.newTimeseriesQueryBuilder() + .dataSource("dummy") + .intervals("2015-01-01/2015-01-02") + .descending(descending) + .granularity(Granularities.ALL) + .aggregators( + ImmutableList.of( + new LongSumAggregatorFactory("metric0", "metric0"), + new CountAggregatorFactory("metric1") + ) + ) + .postAggregators( + ImmutableList.of( + new ArithmeticPostAggregator( + "post", + "+", + ImmutableList.of( + new FieldAccessPostAggregator( + null, + "metric1" + ), + new FieldAccessPostAggregator( + null, + "metric0" + ) + ) + ) + ) + ) + .context(ImmutableMap.of(TimeseriesQuery.CTX_GRAND_TOTAL, true)) + .build(); + + final TimeseriesQuery query2 = Druids.newTimeseriesQueryBuilder() + .dataSource("dummy") + .intervals("2015-01-01/2015-01-02") + .descending(descending) + .granularity(Granularities.ALL) + .aggregators( + ImmutableList.of( + new LongSumAggregatorFactory("metric0", "metric0"), + new CountAggregatorFactory("metric1") + ) + ) + .postAggregators( + ImmutableList.of( + new ArithmeticPostAggregator( + "post", + "/", + ImmutableList.of( + new FieldAccessPostAggregator( + null, + "metric1" + ), + new FieldAccessPostAggregator( + null, + "metric0" + ) + ) + ) + ) + ) + .context(ImmutableMap.of(TimeseriesQuery.CTX_GRAND_TOTAL, true)) + .build(); + + Assert.assertTrue( + Arrays.equals( + TOOL_CHEST.getCacheStrategy(query1).computeCacheKey(query1), + TOOL_CHEST.getCacheStrategy(query2).computeCacheKey(query2) + ) + ); + Assert.assertFalse( + Arrays.equals( + TOOL_CHEST.getCacheStrategy(query1).computeResultLevelCacheKey(query1), + TOOL_CHEST.getCacheStrategy(query2).computeResultLevelCacheKey(query2) + ) + ); + } } diff --git a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java index e263c1eef2b8..cede671e6a47 100644 --- a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java +++ b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java @@ -36,6 +36,7 @@ import org.apache.druid.query.TableDataSource; import org.apache.druid.query.TestQueryRunners; import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator; import org.apache.druid.query.aggregation.post.ConstantPostAggregator; import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator; @@ -128,6 +129,92 @@ public void testComputeCacheKeyWithDifferentPostAgg() ).getCacheStrategy(query2); Assert.assertFalse(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); + Assert.assertFalse(Arrays.equals(strategy1.computeResultLevelCacheKey(query1), + strategy2.computeResultLevelCacheKey(query2))); + } + + @Test + public void testComputeResultLevelCacheKeyWithDifferentPostAgg() throws IOException + { + final TopNQuery query1 = new TopNQuery( + new TableDataSource("dummy"), + VirtualColumns.EMPTY, + new DefaultDimensionSpec("test", "test"), + new LegacyTopNMetricSpec("metric1"), + 3, + new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2015-01-01T18:00:00/2015-01-02T18:00:00"))), + null, + Granularities.ALL, + ImmutableList.of( + new LongSumAggregatorFactory("metric1", "metric1"), + new LongSumAggregatorFactory("metric2", "metric2") + ), + ImmutableList.of( + new ArithmeticPostAggregator( + "post1", + "/", + ImmutableList.of( + new FieldAccessPostAggregator( + "metric1", + "metric1" + ), + new FieldAccessPostAggregator( + "metric2", + "metric2" + ) + ) + ) + ), + null + ); + + final TopNQuery query2 = new TopNQuery( + new TableDataSource("dummy"), + VirtualColumns.EMPTY, + new DefaultDimensionSpec("test", "test"), + new LegacyTopNMetricSpec("metric1"), + 3, + new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2015-01-01T18:00:00/2015-01-02T18:00:00"))), + null, + Granularities.ALL, + ImmutableList.of( + new LongSumAggregatorFactory("metric1", "metric1"), + new LongSumAggregatorFactory("metric2", "metric2") + ), + ImmutableList.of( + new ArithmeticPostAggregator( + "post2", + "+", + ImmutableList.of( + new FieldAccessPostAggregator( + "metric1", + "metric1" + ), + new FieldAccessPostAggregator( + "metric2", + "metric2" + ) + ) + ) + ), + null + ); + + final CacheStrategy, Object, TopNQuery> strategy1 = new TopNQueryQueryToolChest( + null, + null + ).getCacheStrategy(query1); + + final CacheStrategy, Object, TopNQuery> strategy2 = new TopNQueryQueryToolChest( + null, + null + ).getCacheStrategy(query2); + + //segment level cache key excludes postaggregates in topn + Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); + Assert.assertFalse(Arrays.equals(strategy1.computeCacheKey(query1), strategy1.computeResultLevelCacheKey(query1))); + Assert.assertFalse(Arrays.equals(strategy1.computeResultLevelCacheKey(query1), + strategy2.computeResultLevelCacheKey(query2))); } @Test diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java index eaa08d656c5a..1facccdf0fea 100644 --- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java @@ -385,6 +385,10 @@ private String computeCurrentEtag(final Set segments, @Nullable break; } hasher.putString(p.getServer().getSegment().getId().toString(), StandardCharsets.UTF_8); + // it is important to add the "query interval" as part ETag calculation + // to have result level cache work correctly for queries with different + // intervals covering the same set of segments + hasher.putString(p.rhs.getInterval().toString(), StandardCharsets.UTF_8); } if (hasOnlyHistoricalSegments) { diff --git a/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java b/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java index 3efd9469dced..6a303b8a6df7 100644 --- a/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java +++ b/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java @@ -80,7 +80,7 @@ public Sequence run(QueryPlus queryPlus, Map responseContext) { if (useResultCache || populateResultCache) { - final String cacheKeyStr = StringUtils.fromUtf8(strategy.computeCacheKey(query)); + final String cacheKeyStr = StringUtils.fromUtf8(strategy.computeResultLevelCacheKey(query)); final byte[] cachedResultSet = fetchResultsFromResultLevelCache(cacheKeyStr); String existingResultSetId = extractEtagFromResults(cachedResultSet); diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java index f81d64f4799b..ca3a309e687e 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java @@ -3129,7 +3129,56 @@ public void testIfNoneMatch() Map responseContext = new HashMap<>(); getDefaultQueryRunner().run(QueryPlus.wrap(query), responseContext); - Assert.assertEquals("Z/eS4rQz5v477iq7Aashr6JPZa0=", responseContext.get("ETag")); + Assert.assertEquals("MDs2yIUvYLVzaG6zmwTH1plqaYE=", responseContext.get("ETag")); + } + + @Test + public void testEtagforDifferentQueryInterval() + { + final Interval interval = Intervals.of("2016-01-01/2016-01-02"); + final Interval queryInterval = Intervals.of("2016-01-01T14:00:00/2016-01-02T14:00:00"); + final Interval queryInterval2 = Intervals.of("2016-01-01T18:00:00/2016-01-02T18:00:00"); + final DataSegment dataSegment = new DataSegment( + "dataSource", + interval, + "ver", + ImmutableMap.of( + "type", "hdfs", + "path", "/tmp" + ), + ImmutableList.of("product"), + ImmutableList.of("visited_sum"), + NoneShardSpec.instance(), + 9, + 12334 + ); + final ServerSelector selector = new ServerSelector( + dataSegment, + new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()) + ); + selector.addServerAndUpdateSegment(new QueryableDruidServer(servers[0], null), dataSegment); + timeline.add(interval, "ver", new SingleElementPartitionChunk<>(selector)); + + final TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder() + .dataSource(DATA_SOURCE) + .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(queryInterval))) + .context(ImmutableMap.of("If-None-Match", "aVJV29CJY93rszVW/QBy0arWZo0=")) + .build(); + + final TimeBoundaryQuery query2 = Druids.newTimeBoundaryQueryBuilder() + .dataSource(DATA_SOURCE) + .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(queryInterval2))) + .context(ImmutableMap.of("If-None-Match", "aVJV29CJY93rszVW/QBy0arWZo0=")) + .build(); + + + final Map responseContext = new HashMap<>(); + + getDefaultQueryRunner().run(QueryPlus.wrap(query), responseContext); + final Object etag1 = responseContext.get("ETag"); + getDefaultQueryRunner().run(QueryPlus.wrap(query2), responseContext); + final Object etag2 = responseContext.get("ETag"); + Assert.assertNotEquals(etag1, etag2); } @SuppressWarnings("unchecked")