From e73fc63dad56ec3b1e2e3b9a295ff64b2fa335d1 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Fri, 22 Mar 2019 11:04:10 -0700 Subject: [PATCH 1/8] Add SegmentDescriptor interval in the hash while calculating Etag --- .../druid/client/CachingClusteredClient.java | 1 + .../client/CachingClusteredClientTest.java | 51 ++++++++++++++++++- 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java index 253aa0f68062..5695000edee9 100644 --- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java @@ -385,6 +385,7 @@ private String computeCurrentEtag(final Set segments, @Nullable break; } hasher.putString(p.getServer().getSegment().getId().toString(), StandardCharsets.UTF_8); + hasher.putString(p.rhs.getInterval().toString(), StandardCharsets.UTF_8); } if (hasOnlyHistoricalSegments) { diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java index f81d64f4799b..ca3a309e687e 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java @@ -3129,7 +3129,56 @@ public void testIfNoneMatch() Map responseContext = new HashMap<>(); getDefaultQueryRunner().run(QueryPlus.wrap(query), responseContext); - Assert.assertEquals("Z/eS4rQz5v477iq7Aashr6JPZa0=", responseContext.get("ETag")); + Assert.assertEquals("MDs2yIUvYLVzaG6zmwTH1plqaYE=", responseContext.get("ETag")); + } + + @Test + public void testEtagforDifferentQueryInterval() + { + final Interval interval = Intervals.of("2016-01-01/2016-01-02"); + final Interval queryInterval = Intervals.of("2016-01-01T14:00:00/2016-01-02T14:00:00"); + final Interval queryInterval2 = Intervals.of("2016-01-01T18:00:00/2016-01-02T18:00:00"); + final DataSegment dataSegment = new DataSegment( + "dataSource", + interval, + "ver", + ImmutableMap.of( + "type", "hdfs", + "path", "/tmp" + ), + ImmutableList.of("product"), + ImmutableList.of("visited_sum"), + NoneShardSpec.instance(), + 9, + 12334 + ); + final ServerSelector selector = new ServerSelector( + dataSegment, + new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()) + ); + selector.addServerAndUpdateSegment(new QueryableDruidServer(servers[0], null), dataSegment); + timeline.add(interval, "ver", new SingleElementPartitionChunk<>(selector)); + + final TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder() + .dataSource(DATA_SOURCE) + .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(queryInterval))) + .context(ImmutableMap.of("If-None-Match", "aVJV29CJY93rszVW/QBy0arWZo0=")) + .build(); + + final TimeBoundaryQuery query2 = Druids.newTimeBoundaryQueryBuilder() + .dataSource(DATA_SOURCE) + .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(queryInterval2))) + .context(ImmutableMap.of("If-None-Match", "aVJV29CJY93rszVW/QBy0arWZo0=")) + .build(); + + + final Map responseContext = new HashMap<>(); + + getDefaultQueryRunner().run(QueryPlus.wrap(query), responseContext); + final Object etag1 = responseContext.get("ETag"); + getDefaultQueryRunner().run(QueryPlus.wrap(query2), responseContext); + final Object etag2 = responseContext.get("ETag"); + Assert.assertNotEquals(etag1, etag2); } 
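The test above exercises the one-line change in CachingClusteredClient.computeCurrentEtag(): the hasher now consumes both the served segment's id and the per-segment SegmentDescriptor interval, so the same segments queried over different intervals no longer produce the same ETag. A minimal sketch of that hashing idea follows; the collection type, variable names, and the final encoding step are assumptions for illustration, not the exact production signature.

    // Sketch only: mirrors the two hasher.putString(...) calls in this patch.
    Hasher hasher = Hashing.sha1().newHasher();                    // com.google.common.hash
    for (Pair<ServerSelector, SegmentDescriptor> p : segments) {   // element type assumed
      // segment id (already hashed before this patch)
      hasher.putString(p.getServer().getSegment().getId().toString(), StandardCharsets.UTF_8);
      // per-segment query interval (added by this patch)
      hasher.putString(p.rhs.getInterval().toString(), StandardCharsets.UTF_8);
    }
    // base64 of the digest, roughly matching the ETag strings asserted in the test (encoding assumed)
    String etag = BaseEncoding.base64().encode(hasher.hash().asBytes());

With this, two TimeBoundary queries over the same segment but different query intervals, as in testEtagforDifferentQueryInterval above, yield distinct ETags instead of colliding.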
@SuppressWarnings("unchecked") From 7b3af143c160a4d9389bc62186c769d6f23b6d55 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Tue, 26 Mar 2019 13:45:51 -0700 Subject: [PATCH 2/8] Add computeResultLevelCacheKey to CacheStrategy Make HavingSpec cacheable and implement getCacheKey for subclasses Add unit tests for computeResultLevelCacheKey --- .../org/apache/druid/query/CacheStrategy.java | 10 + .../groupby/GroupByQueryQueryToolChest.java | 25 ++ .../groupby/having/AlwaysHavingSpec.java | 6 + .../query/groupby/having/AndHavingSpec.java | 20 + .../groupby/having/DimFilterHavingSpec.java | 45 ++ .../having/DimensionSelectorHavingSpec.java | 18 + .../groupby/having/EqualToHavingSpec.java | 36 ++ .../groupby/having/GreaterThanHavingSpec.java | 36 ++ .../query/groupby/having/HavingSpec.java | 3 +- .../query/groupby/having/HavingSpecUtil.java | 35 ++ .../groupby/having/LessThanHavingSpec.java | 36 ++ .../query/groupby/having/NeverHavingSpec.java | 6 + .../query/groupby/having/NotHavingSpec.java | 12 + .../query/groupby/having/OrHavingSpec.java | 20 + .../SegmentMetadataQueryQueryToolChest.java | 6 + .../search/SearchQueryQueryToolChest.java | 6 + .../select/SelectQueryQueryToolChest.java | 6 + .../TimeBoundaryQueryQueryToolChest.java | 6 + .../TimeseriesQueryQueryToolChest.java | 18 + .../query/topn/TopNQueryQueryToolChest.java | 19 + .../GroupByQueryQueryToolChestTest.java | 395 ++++++++++++++++++ .../query/groupby/GroupByQueryRunnerTest.java | 12 + .../query/groupby/having/HavingSpecTest.java | 13 + .../TimeseriesQueryQueryToolChestTest.java | 162 +++++++ .../topn/TopNQueryQueryToolChestTest.java | 87 ++++ .../query/ResultLevelCachingQueryRunner.java | 2 +- 26 files changed, 1038 insertions(+), 2 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/query/groupby/having/HavingSpecUtil.java create mode 100644 processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java diff --git a/processing/src/main/java/org/apache/druid/query/CacheStrategy.java b/processing/src/main/java/org/apache/druid/query/CacheStrategy.java index 7a300c0c63dd..8b106a6f2b7a 100644 --- a/processing/src/main/java/org/apache/druid/query/CacheStrategy.java +++ b/processing/src/main/java/org/apache/druid/query/CacheStrategy.java @@ -51,6 +51,16 @@ public interface CacheStrategy> */ byte[] computeCacheKey(QueryType query); + /** + * Computes the result level cache key for the given query. 
+ * Some implementations may include query parameters that might not be used in {@code computeCacheKey} for same query + * + * @param query the query to be cached + * + * @return the result level cache key + */ + byte[] computeResultLevelCacheKey(QueryType query); + /** * Returns the class type of what is used in the cache * diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java index a49b48ae216a..2a2505d5ab72 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -481,6 +481,31 @@ public byte[] computeCacheKey(GroupByQuery query) .build(); } + @Override + public byte[] computeResultLevelCacheKey(GroupByQuery query) + { + final CacheKeyBuilder builder = new CacheKeyBuilder(GROUPBY_QUERY) + .appendByte(CACHE_STRATEGY_VERSION) + .appendCacheable(query.getGranularity()) + .appendCacheable(query.getDimFilter()) + .appendCacheables(query.getAggregatorSpecs()) + .appendCacheables(query.getDimensions()) + .appendCacheable(query.getVirtualColumns()) + .appendCacheable(query.getHavingSpec()) + .appendCacheable(query.getLimitSpec()); + + if (!query.getPostAggregatorSpecs().isEmpty()) { + builder.appendCacheables(query.getPostAggregatorSpecs()); + } + if (query.getSubtotalsSpec() != null && !query.getSubtotalsSpec().isEmpty()) { + for (List subTotalSpec : query.getSubtotalsSpec()) { + builder.appendStrings(subTotalSpec); + } + } + + return builder.build(); + } + @Override public TypeReference getCacheObjectClazz() { diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/AlwaysHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/AlwaysHavingSpec.java index 1ac4f3140a30..ed70d182832c 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/AlwaysHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/AlwaysHavingSpec.java @@ -31,4 +31,10 @@ public boolean eval(Row row) { return true; } + + @Override + public byte[] getCacheKey() + { + return new byte[]{HavingSpecUtil.CACHE_TYPE_ID_ALWAYS}; + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/AndHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/AndHavingSpec.java index 55ca3ee31a17..69e5f2829a57 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/AndHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/AndHavingSpec.java @@ -26,6 +26,7 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.segment.column.ValueType; +import java.nio.ByteBuffer; import java.util.List; import java.util.Map; @@ -110,4 +111,23 @@ public String toString() sb.append('}'); return sb.toString(); } + + @Override + public byte[] getCacheKey() + { + final byte[][] havingBytes = new byte[havingSpecs.size()][]; + int havingBytesSize = 0; + int index = 0; + for (HavingSpec spec : havingSpecs) { + havingBytes[index] = spec.getCacheKey(); + havingBytesSize += havingBytes[index].length; + ++index; + } + ByteBuffer buffer = ByteBuffer.allocate(1 + havingBytesSize) + .put(HavingSpecUtil.CACHE_TYPE_ID_AND); + for (byte[] havingByte : havingBytes) { + buffer.put(havingByte); + } + return buffer.array(); + } } diff --git 
a/processing/src/main/java/org/apache/druid/query/groupby/having/DimFilterHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/DimFilterHavingSpec.java index 00155bab39ba..4af23d504735 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/DimFilterHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/DimFilterHavingSpec.java @@ -22,8 +22,10 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.Row; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.filter.DimFilter; import org.apache.druid.segment.column.ValueType; @@ -33,6 +35,8 @@ import org.apache.druid.segment.transform.Transformer; import org.joda.time.DateTime; +import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -170,6 +174,47 @@ public RowFunction getRowFunction() return new TransformSpec(filter, transforms).toTransformer(rowSignature); } + @Override + public byte[] getCacheKey() + { + try { + final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + outputStream.write(HavingSpecUtil.CACHE_TYPE_ID_DIM_FILTER); + outputStream.write(isFinalize() ? 1 : 0); + outputStream.write(dimFilter.getCacheKey()); + + for (Map.Entry entry : aggregators.entrySet()) { + final String key = entry.getKey(); + final AggregatorFactory val = entry.getValue(); + if (!Strings.isNullOrEmpty(key)) { + outputStream.write(StringUtils.toUtf8(key)); + } + outputStream.write(HavingSpecUtil.STRING_SEPARATOR); + byte[] aggregatorBytes = val.getCacheKey(); + outputStream.write(aggregatorBytes); + } + + for (Map.Entry entry : rowSignature.entrySet()) { + final String key = entry.getKey(); + final String val = entry.getValue().toString(); + if (!Strings.isNullOrEmpty(key)) { + outputStream.write(StringUtils.toUtf8(key)); + } + outputStream.write(HavingSpecUtil.STRING_SEPARATOR); + if (!Strings.isNullOrEmpty(val)) { + outputStream.write(StringUtils.toUtf8(val)); + } + outputStream.write(HavingSpecUtil.STRING_SEPARATOR); + } + + return outputStream.toByteArray(); + } + catch (IOException ex) { + // If ByteArrayOutputStream.write has problems, that is a very bad thing + throw new RuntimeException(ex); + } + } + private static class RowAsInputRow implements InputRow { private final Row row; diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/DimensionSelectorHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/DimensionSelectorHavingSpec.java index 90041167693a..8c9d16bdc45b 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/DimensionSelectorHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/DimensionSelectorHavingSpec.java @@ -24,9 +24,11 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; import org.apache.druid.data.input.Row; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.extraction.ExtractionFn; import org.apache.druid.query.extraction.IdentityExtractionFn; +import java.nio.ByteBuffer; import java.util.List; import java.util.Objects; @@ -117,4 +119,20 @@ public String toString() ", 
extractionFn=" + extractionFn + '}'; } + + @Override + public byte[] getCacheKey() + { + byte[] dimensionBytes = StringUtils.toUtf8(dimension); + byte[] valueBytes = value == null ? new byte[]{} : StringUtils.toUtf8(value); + byte[] extractionFnBytes = extractionFn.getCacheKey(); + return ByteBuffer.allocate(3 + dimensionBytes.length + valueBytes.length + extractionFnBytes.length) + .put(HavingSpecUtil.CACHE_TYPE_ID_DIM_SELECTOR) + .put(dimensionBytes) + .put(HavingSpecUtil.STRING_SEPARATOR) + .put(valueBytes) + .put(HavingSpecUtil.STRING_SEPARATOR) + .put(extractionFnBytes) + .array(); + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/EqualToHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/EqualToHavingSpec.java index ac8d64e99a10..d5b30d26078e 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/EqualToHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/EqualToHavingSpec.java @@ -21,9 +21,13 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Strings; import org.apache.druid.data.input.Row; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.aggregation.AggregatorFactory; +import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.util.Map; /** @@ -123,4 +127,36 @@ public String toString() sb.append('}'); return sb.toString(); } + + @Override + public byte[] getCacheKey() + { + try { + final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + outputStream.write(HavingSpecUtil.CACHE_TYPE_ID_EQUAL); + if (!Strings.isNullOrEmpty(aggregationName)) { + outputStream.write(StringUtils.toUtf8(aggregationName)); + } + outputStream.write(HavingSpecUtil.STRING_SEPARATOR); + outputStream.write(StringUtils.toUtf8(String.valueOf(value))); + if (aggregators == null) { + return outputStream.toByteArray(); + } + for (Map.Entry entry : aggregators.entrySet()) { + final String key = entry.getKey(); + final AggregatorFactory val = entry.getValue(); + if (!Strings.isNullOrEmpty(key)) { + outputStream.write(StringUtils.toUtf8(key)); + } + outputStream.write(HavingSpecUtil.STRING_SEPARATOR); + byte[] aggregatorBytes = val.getCacheKey(); + outputStream.write(aggregatorBytes); + } + return outputStream.toByteArray(); + } + catch (IOException ex) { + // If ByteArrayOutputStream.write has problems, that is a very bad thing + throw new RuntimeException(ex); + } + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/GreaterThanHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/GreaterThanHavingSpec.java index 842dbcde69a5..53974aee53d9 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/GreaterThanHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/GreaterThanHavingSpec.java @@ -21,9 +21,13 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Strings; import org.apache.druid.data.input.Row; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.aggregation.AggregatorFactory; +import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.util.Map; /** @@ -119,4 +123,36 @@ public String toString() sb.append('}'); return sb.toString(); } + + @Override + public byte[] getCacheKey() + { + try { + final 
ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + outputStream.write(HavingSpecUtil.CACHE_TYPE_ID_GREATER_THAN); + if (!Strings.isNullOrEmpty(aggregationName)) { + outputStream.write(StringUtils.toUtf8(aggregationName)); + } + outputStream.write(HavingSpecUtil.STRING_SEPARATOR); + outputStream.write(StringUtils.toUtf8(String.valueOf(value))); + if (aggregators == null) { + return outputStream.toByteArray(); + } + for (Map.Entry entry : aggregators.entrySet()) { + final String key = entry.getKey(); + final AggregatorFactory val = entry.getValue(); + if (!Strings.isNullOrEmpty(key)) { + outputStream.write(StringUtils.toUtf8(key)); + } + outputStream.write(HavingSpecUtil.STRING_SEPARATOR); + byte[] aggregatorBytes = val.getCacheKey(); + outputStream.write(aggregatorBytes); + } + return outputStream.toByteArray(); + } + catch (IOException ex) { + // If ByteArrayOutputStream.write has problems, that is a very bad thing + throw new RuntimeException(ex); + } + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/HavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/HavingSpec.java index 19ad071365f3..e75641f54199 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/HavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/HavingSpec.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; import org.apache.druid.data.input.Row; +import org.apache.druid.java.util.common.Cacheable; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.segment.column.ValueType; @@ -44,7 +45,7 @@ @JsonSubTypes.Type(name = "always", value = AlwaysHavingSpec.class), @JsonSubTypes.Type(name = "filter", value = DimFilterHavingSpec.class) }) -public interface HavingSpec +public interface HavingSpec extends Cacheable { // Atoms for easy combination, but for now they are mostly useful // for testing. diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/HavingSpecUtil.java b/processing/src/main/java/org/apache/druid/query/groupby/having/HavingSpecUtil.java new file mode 100644 index 000000000000..96f47e8798fa --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/HavingSpecUtil.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.groupby.having; + +public class HavingSpecUtil +{ + static final byte CACHE_TYPE_ID_ALWAYS = 0x0; + static final byte CACHE_TYPE_ID_AND = 0x1; + static final byte CACHE_TYPE_ID_DIM_SELECTOR = 0x2; + static final byte CACHE_TYPE_ID_DIM_FILTER = 0x3; + static final byte CACHE_TYPE_ID_EQUAL = 0x4; + static final byte CACHE_TYPE_ID_GREATER_THAN = 0x5; + static final byte CACHE_TYPE_ID_LESS_THAN = 0x6; + static final byte CACHE_TYPE_ID_NEVER = 0x7; + static final byte CACHE_TYPE_ID_NOT = 0x8; + static final byte CACHE_TYPE_ID_OR = 0x9; + static final byte STRING_SEPARATOR = (byte) 0xFF; +} diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/LessThanHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/LessThanHavingSpec.java index 99917cb245d9..147317fc4dd6 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/LessThanHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/LessThanHavingSpec.java @@ -20,9 +20,13 @@ package org.apache.druid.query.groupby.having; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Strings; import org.apache.druid.data.input.Row; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.aggregation.AggregatorFactory; +import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.util.Map; /** @@ -117,4 +121,36 @@ public String toString() sb.append('}'); return sb.toString(); } + + @Override + public byte[] getCacheKey() + { + try { + final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + outputStream.write(HavingSpecUtil.CACHE_TYPE_ID_LESS_THAN); + if (!Strings.isNullOrEmpty(aggregationName)) { + outputStream.write(StringUtils.toUtf8(aggregationName)); + } + outputStream.write(HavingSpecUtil.STRING_SEPARATOR); + outputStream.write(StringUtils.toUtf8(String.valueOf(value))); + if (aggregators == null) { + return outputStream.toByteArray(); + } + for (Map.Entry entry : aggregators.entrySet()) { + final String key = entry.getKey(); + final AggregatorFactory val = entry.getValue(); + if (!Strings.isNullOrEmpty(key)) { + outputStream.write(StringUtils.toUtf8(key)); + } + outputStream.write(HavingSpecUtil.STRING_SEPARATOR); + byte[] aggregatorBytes = val.getCacheKey(); + outputStream.write(aggregatorBytes); + } + return outputStream.toByteArray(); + } + catch (IOException ex) { + // If ByteArrayOutputStream.write has problems, that is a very bad thing + throw new RuntimeException(ex); + } + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/NeverHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/NeverHavingSpec.java index 55ab48b505fe..42bb97ddb461 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/NeverHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/NeverHavingSpec.java @@ -31,4 +31,10 @@ public boolean eval(Row row) { return false; } + + @Override + public byte[] getCacheKey() + { + return new byte[]{HavingSpecUtil.CACHE_TYPE_ID_NEVER}; + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/NotHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/NotHavingSpec.java index 315d05932ea8..af87c25a6916 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/NotHavingSpec.java +++ 
b/processing/src/main/java/org/apache/druid/query/groupby/having/NotHavingSpec.java @@ -25,6 +25,7 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.segment.column.ValueType; +import java.nio.ByteBuffer; import java.util.Map; /** @@ -98,4 +99,15 @@ public int hashCode() { return havingSpec != null ? havingSpec.hashCode() : 0; } + + @Override + public byte[] getCacheKey() + { + byte[] havingSpecBytes = havingSpec.getCacheKey(); + ByteBuffer buffer = ByteBuffer.allocate(1 + havingSpecBytes.length) + .put(HavingSpecUtil.CACHE_TYPE_ID_NOT) + .put(havingSpecBytes); + + return buffer.array(); + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/OrHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/OrHavingSpec.java index 7425b9024764..9a1405087f08 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/OrHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/OrHavingSpec.java @@ -26,6 +26,7 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.segment.column.ValueType; +import java.nio.ByteBuffer; import java.util.List; import java.util.Map; @@ -110,4 +111,23 @@ public String toString() sb.append('}'); return sb.toString(); } + + @Override + public byte[] getCacheKey() + { + final byte[][] havingBytes = new byte[havingSpecs.size()][]; + int havingBytesSize = 0; + int index = 0; + for (HavingSpec spec : havingSpecs) { + havingBytes[index] = spec.getCacheKey(); + havingBytesSize += havingBytes[index].length; + ++index; + } + ByteBuffer buffer = ByteBuffer.allocate(1 + havingBytesSize) + .put(HavingSpecUtil.CACHE_TYPE_ID_OR); + for (byte[] havingByte : havingBytes) { + buffer.put(havingByte); + } + return buffer.array(); + } } diff --git a/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java index 4dc50f56887f..52c723e2f9a3 100644 --- a/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java @@ -194,6 +194,12 @@ public byte[] computeCacheKey(SegmentMetadataQuery query) .array(); } + @Override + public byte[] computeResultLevelCacheKey(SegmentMetadataQuery query) + { + return computeCacheKey(query); + } + @Override public TypeReference getCacheObjectClazz() { diff --git a/processing/src/main/java/org/apache/druid/query/search/SearchQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/search/SearchQueryQueryToolChest.java index 61f77e356962..8f35b2541bd8 100644 --- a/processing/src/main/java/org/apache/druid/query/search/SearchQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/search/SearchQueryQueryToolChest.java @@ -201,6 +201,12 @@ public byte[] computeCacheKey(SearchQuery query) return queryCacheKey.array(); } + @Override + public byte[] computeResultLevelCacheKey(SearchQuery query) + { + return computeCacheKey(query); + } + @Override public TypeReference getCacheObjectClazz() { diff --git a/processing/src/main/java/org/apache/druid/query/select/SelectQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/select/SelectQueryQueryToolChest.java index d551266afa6a..45ebb95a52dd 100644 --- 
a/processing/src/main/java/org/apache/druid/query/select/SelectQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/select/SelectQueryQueryToolChest.java @@ -236,6 +236,12 @@ public byte[] computeCacheKey(SelectQuery query) return queryCacheKey.array(); } + @Override + public byte[] computeResultLevelCacheKey(SelectQuery query) + { + return computeCacheKey(query); + } + @Override public TypeReference getCacheObjectClazz() { diff --git a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java index 351903799f85..9fc0e88e7e5b 100644 --- a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java @@ -154,6 +154,12 @@ public byte[] computeCacheKey(TimeBoundaryQuery query) .array(); } + @Override + public byte[] computeResultLevelCacheKey(TimeBoundaryQuery query) + { + return computeCacheKey(query); + } + @Override public TypeReference getCacheObjectClazz() { diff --git a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java index 479fbf0c2d1b..631f65773a95 100644 --- a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java @@ -274,6 +274,24 @@ public byte[] computeCacheKey(TimeseriesQuery query) .build(); } + @Override + public byte[] computeResultLevelCacheKey(TimeseriesQuery query) + { + final CacheKeyBuilder builder = new CacheKeyBuilder(TIMESERIES_QUERY) + .appendBoolean(query.isDescending()) + .appendBoolean(query.isSkipEmptyBuckets()) + .appendCacheable(query.getGranularity()) + .appendCacheable(query.getDimensionsFilter()) + .appendCacheables(query.getAggregatorSpecs()) + .appendCacheable(query.getVirtualColumns()) + .appendCacheables(query.getPostAggregatorSpecs()) + .appendInt(query.getLimit()); + if (query.isGrandTotal()) { + builder.appendBoolean(query.getContextValue(TimeseriesQuery.CTX_GRAND_TOTAL)); + } + return builder.build(); + } + @Override public TypeReference getCacheObjectClazz() { diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java index 4774c5c9b1a2..b20b08dafe1c 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java @@ -326,6 +326,25 @@ public byte[] computeCacheKey(TopNQuery query) return builder.build(); } + @Override + public byte[] computeResultLevelCacheKey(TopNQuery query) + { + final CacheKeyBuilder builder = new CacheKeyBuilder(TOPN_QUERY) + .appendCacheable(query.getDimensionSpec()) + .appendCacheable(query.getTopNMetricSpec()) + .appendInt(query.getThreshold()) + .appendCacheable(query.getGranularity()) + .appendCacheable(query.getDimensionsFilter()) + .appendCacheables(query.getAggregatorSpecs()) + .appendCacheable(query.getVirtualColumns()); + + final List postAggregators = query.getPostAggregatorSpecs(); + if (!postAggregators.isEmpty()) { + builder.appendCacheables(query.getPostAggregatorSpecs()); + } + return builder.build(); + } 
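For TopN, the segment-level cache key leaves post-aggregators out (they are applied when per-segment results are merged, as the later test comment notes), while the result-level key above appends them, so changing only the post-aggregation produces a different result-level key. A hedged usage sketch of the intended contract; the toolChest and query variables here are assumptions for illustration:

    // Sketch only: how the two keys are meant to differ for a TopN query.
    CacheStrategy<Result<TopNResultValue>, Object, TopNQuery> strategy =
        toolChest.getCacheStrategy(query);                                // 'toolChest', 'query' assumed
    byte[] segmentLevelKey = strategy.computeCacheKey(query);             // omits post-aggregators
    byte[] resultLevelKey  = strategy.computeResultLevelCacheKey(query);  // includes post-aggregators
    // Two TopN queries differing only in post-aggregators share segmentLevelKey but get distinct
    // resultLevelKey values, which is what ResultLevelCachingQueryRunner (patched below) keys on.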
+ @Override public TypeReference getCacheObjectClazz() { diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java new file mode 100644 index 000000000000..026f643958b1 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java @@ -0,0 +1,395 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.groupby; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import org.apache.druid.data.input.Row; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.query.CacheStrategy; +import org.apache.druid.query.QueryRunnerTestHelper; +import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; +import org.apache.druid.query.aggregation.FloatSumAggregatorFactory; +import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.query.aggregation.post.ExpressionPostAggregator; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.expression.TestExprMacroTable; +import org.apache.druid.query.filter.AndDimFilter; +import org.apache.druid.query.filter.BoundDimFilter; +import org.apache.druid.query.filter.OrDimFilter; +import org.apache.druid.query.filter.SelectorDimFilter; +import org.apache.druid.query.groupby.having.DimFilterHavingSpec; +import org.apache.druid.query.groupby.having.GreaterThanHavingSpec; +import org.apache.druid.query.groupby.orderby.DefaultLimitSpec; +import org.apache.druid.query.groupby.orderby.OrderByColumnSpec; +import org.apache.druid.query.ordering.StringComparators; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; + +public class GroupByQueryQueryToolChestTest +{ + + @Test + public void testResultLevelCacheKeyWithPostAggregate() + { + final GroupByQuery query1 = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(new DefaultDimensionSpec("quality", "alias")) + .setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, new LongSumAggregatorFactory("idx", "index")) + .setPostAggregatorSpecs( + ImmutableList.of( + new ExpressionPostAggregator("post", "alias + 'x'", null, TestExprMacroTable.INSTANCE) + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .build(); + + final GroupByQuery query2 = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(new DefaultDimensionSpec("quality", "alias")) + 
.setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, new LongSumAggregatorFactory("idx", "index")) + .setPostAggregatorSpecs( + ImmutableList.of( + new ExpressionPostAggregator("post", "alias - 'x'", null, TestExprMacroTable.INSTANCE) + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .build(); + + final CacheStrategy strategy1 = new GroupByQueryQueryToolChest( + null, + QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator() + ).getCacheStrategy(query1); + + final CacheStrategy strategy2 = new GroupByQueryQueryToolChest( + null, + QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator() + ).getCacheStrategy(query2); + + Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); + Assert.assertFalse(Arrays.equals( + strategy1.computeResultLevelCacheKey(query1), + strategy2.computeResultLevelCacheKey(query2) + )); + } + + @Test + public void testResultLevelCacheKeyWithLimitSpec() + { + final GroupByQuery query1 = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(new DefaultDimensionSpec("quality", "alias")) + .setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, new LongSumAggregatorFactory("idx", "index")) + .setPostAggregatorSpecs( + ImmutableList.of( + new ExpressionPostAggregator("post", "alias + 'x'", null, TestExprMacroTable.INSTANCE) + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .setLimitSpec( + new DefaultLimitSpec( + ImmutableList.of( + new OrderByColumnSpec("post", OrderByColumnSpec.Direction.DESCENDING) + ), + Integer.MAX_VALUE + ) + ) + .build(); + + final GroupByQuery query2 = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(new DefaultDimensionSpec("quality", "alias")) + .setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, new LongSumAggregatorFactory("idx", "index")) + .setPostAggregatorSpecs( + ImmutableList.of( + new ExpressionPostAggregator("post", "alias - 'x'", null, TestExprMacroTable.INSTANCE) + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .setLimitSpec( + new DefaultLimitSpec( + ImmutableList.of( + new OrderByColumnSpec("post", OrderByColumnSpec.Direction.DESCENDING) + ), + Integer.MAX_VALUE + ) + ) + .build(); + + final CacheStrategy strategy1 = new GroupByQueryQueryToolChest( + null, + QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator() + ).getCacheStrategy(query1); + + + final CacheStrategy strategy2 = new GroupByQueryQueryToolChest( + null, + QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator() + ).getCacheStrategy(query2); + + Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); + Assert.assertFalse(Arrays.equals( + strategy1.computeResultLevelCacheKey(query1), + strategy2.computeResultLevelCacheKey(query2) + )); + } + + @Test + public void testResultLevelCacheKeyWithHavingSpec() + { + final GroupByQuery query1 = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(new DefaultDimensionSpec("quality", "alias")) + .setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, new LongSumAggregatorFactory("idx", "index")) + .setPostAggregatorSpecs( + ImmutableList.of( + new ExpressionPostAggregator("post", "alias + 'x'", null, TestExprMacroTable.INSTANCE) + ) + ) + 
.setGranularity(QueryRunnerTestHelper.dayGran) + .setLimitSpec( + new DefaultLimitSpec( + ImmutableList.of( + new OrderByColumnSpec("post", OrderByColumnSpec.Direction.DESCENDING) + ), + Integer.MAX_VALUE + ) + ) + .setHavingSpec(new GreaterThanHavingSpec(QueryRunnerTestHelper.uniqueMetric, 8)) + .build(); + + final GroupByQuery query2 = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(new DefaultDimensionSpec("quality", "alias")) + .setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, new LongSumAggregatorFactory("idx", "index")) + .setPostAggregatorSpecs( + ImmutableList.of( + new ExpressionPostAggregator("post", "alias + 'x'", null, TestExprMacroTable.INSTANCE) + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .setLimitSpec( + new DefaultLimitSpec( + ImmutableList.of( + new OrderByColumnSpec("post", OrderByColumnSpec.Direction.DESCENDING) + ), + Integer.MAX_VALUE + ) + ) + .setHavingSpec(new GreaterThanHavingSpec(QueryRunnerTestHelper.uniqueMetric, 10)) + .build(); + + final CacheStrategy strategy1 = new GroupByQueryQueryToolChest( + null, + QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator() + ).getCacheStrategy(query1); + + + final CacheStrategy strategy2 = new GroupByQueryQueryToolChest( + null, + QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator() + ).getCacheStrategy(query2); + + Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); + Assert.assertFalse(Arrays.equals( + strategy1.computeResultLevelCacheKey(query1), + strategy2.computeResultLevelCacheKey(query2) + )); + } + + @Test + public void testResultLevelCacheKeyWithHavingDimFilterHavingSpec() + { + final DimFilterHavingSpec havingSpec1 = new DimFilterHavingSpec( + new AndDimFilter( + ImmutableList.of( + new OrDimFilter( + ImmutableList.of( + new BoundDimFilter("rows", "2", null, true, false, null, null, StringComparators.NUMERIC), + new SelectorDimFilter("idx", "217", null) + ) + ), + new SelectorDimFilter("__time", String.valueOf(DateTimes.of("2011-04-01").getMillis()), null) + ) + ), + null + ); + + final DimFilterHavingSpec havingSpec2 = new DimFilterHavingSpec( + new AndDimFilter( + ImmutableList.of( + new OrDimFilter( + ImmutableList.of( + new BoundDimFilter("rows", "2", null, true, false, null, null, StringComparators.NUMERIC), + new SelectorDimFilter("idx", "317", null) + ) + ), + new SelectorDimFilter("__time", String.valueOf(DateTimes.of("2011-04-01").getMillis()), null) + ) + ), + null + ); + final GroupByQuery query1 = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(new DefaultDimensionSpec("quality", "alias")) + .setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, new LongSumAggregatorFactory("idx", "index")) + .setPostAggregatorSpecs( + ImmutableList.of( + new ExpressionPostAggregator("post", "alias + 'x'", null, TestExprMacroTable.INSTANCE) + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .setLimitSpec( + new DefaultLimitSpec( + ImmutableList.of( + new OrderByColumnSpec("post", OrderByColumnSpec.Direction.DESCENDING) + ), + Integer.MAX_VALUE + ) + ) + .setHavingSpec(havingSpec1) + .build(); + + final GroupByQuery query2 = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(new 
DefaultDimensionSpec("quality", "alias")) + .setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, new LongSumAggregatorFactory("idx", "index")) + .setPostAggregatorSpecs( + ImmutableList.of( + new ExpressionPostAggregator("post", "alias + 'x'", null, TestExprMacroTable.INSTANCE) + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .setLimitSpec( + new DefaultLimitSpec( + ImmutableList.of( + new OrderByColumnSpec("post", OrderByColumnSpec.Direction.DESCENDING) + ), + Integer.MAX_VALUE + ) + ) + .setHavingSpec(havingSpec2) + .build(); + + final CacheStrategy strategy1 = new GroupByQueryQueryToolChest( + null, + QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator() + ).getCacheStrategy(query1); + + + final CacheStrategy strategy2 = new GroupByQueryQueryToolChest( + null, + QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator() + ).getCacheStrategy(query2); + + Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); + Assert.assertFalse(Arrays.equals( + strategy1.computeResultLevelCacheKey(query1), + strategy2.computeResultLevelCacheKey(query2) + )); + } + + @Test + public void testResultLevelCacheKeyWithSubTotalsSpec() + { + final GroupByQuery query1 = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(Lists.newArrayList( + new DefaultDimensionSpec("quality", "alias"), + new DefaultDimensionSpec("market", "market") + )) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory("idx", "index"), + new FloatSumAggregatorFactory("idxFloat", "indexFloat"), + new DoubleSumAggregatorFactory("idxDouble", "index") + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .setSubtotalsSpec(ImmutableList.of( + ImmutableList.of("alias"), + ImmutableList.of("market"), + ImmutableList.of() + )) + .build(); + + final GroupByQuery query2 = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(Lists.newArrayList( + new DefaultDimensionSpec("quality", "alias"), + new DefaultDimensionSpec("market", "market") + )) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory("idx", "index"), + new FloatSumAggregatorFactory("idxFloat", "indexFloat"), + new DoubleSumAggregatorFactory("idxDouble", "index") + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .setSubtotalsSpec(ImmutableList.of( + ImmutableList.of("alias"), + ImmutableList.of() + )) + .build(); + + final CacheStrategy strategy1 = new GroupByQueryQueryToolChest( + null, + QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator() + ).getCacheStrategy(query1); + + + final CacheStrategy strategy2 = new GroupByQueryQueryToolChest( + null, + QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator() + ).getCacheStrategy(query2); + + Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); + Assert.assertFalse(Arrays.equals( + strategy1.computeResultLevelCacheKey(query1), + strategy2.computeResultLevelCacheKey(query2) + )); + } + +} diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java index 96e369966dbb..cc25be424a08 100644 --- 
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java @@ -5368,6 +5368,12 @@ public void testSubqueryWithPostAggregatorsAndHaving() .setHavingSpec( new BaseHavingSpec() { + @Override + public byte[] getCacheKey() + { + return new byte[0]; + } + @Override public boolean eval(Row row) { @@ -5629,6 +5635,12 @@ public void testSubqueryWithMultiColumnAggregators() .setHavingSpec( new BaseHavingSpec() { + @Override + public byte[] getCacheKey() + { + return new byte[0]; + } + @Override public boolean eval(Row row) { diff --git a/processing/src/test/java/org/apache/druid/query/groupby/having/HavingSpecTest.java b/processing/src/test/java/org/apache/druid/query/groupby/having/HavingSpecTest.java index ec44b7f191df..66412161fef6 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/having/HavingSpecTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/having/HavingSpecTest.java @@ -25,8 +25,10 @@ import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.data.input.Row; import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.StringUtils; import org.junit.Test; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -233,6 +235,17 @@ public boolean eval(Row row) counter.incrementAndGet(); return value; } + + @Override + public byte[] getCacheKey() + { + byte valueByte = (byte) (value ? 1 : 0); + byte[] counterBytes = StringUtils.toUtf8(String.valueOf(counter)); + return ByteBuffer.allocate(1 + counterBytes.length) + .put(valueByte) + .put(counterBytes) + .array(); + } } @Test diff --git a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java index 0742e1e845ad..6d07c59d269e 100644 --- a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java +++ b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java @@ -32,7 +32,9 @@ import org.apache.druid.query.TableDataSource; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator; import org.apache.druid.query.aggregation.post.ConstantPostAggregator; +import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.VirtualColumns; @@ -158,4 +160,164 @@ public void testCacheKey() ) ); } + + @Test + public void testResultLevelCacheKey() + { + final TimeseriesQuery query1 = Druids.newTimeseriesQueryBuilder() + .dataSource("dummy") + .intervals("2015-01-01/2015-01-02") + .descending(descending) + .granularity(Granularities.ALL) + .aggregators( + ImmutableList.of( + new LongSumAggregatorFactory("metric0", "metric0"), + new CountAggregatorFactory("metric1") + ) + ) + .postAggregators( + ImmutableList.of( + new ArithmeticPostAggregator( + "post", + "+", + ImmutableList.of( + new FieldAccessPostAggregator( + null, + "metric1" + ), + new FieldAccessPostAggregator( + null, + "metric0" + ) + ) + ) + ) + ) + .build(); + + final TimeseriesQuery query2 = Druids.newTimeseriesQueryBuilder() + 
.dataSource("dummy") + .intervals("2015-01-01/2015-01-02") + .descending(descending) + .granularity(Granularities.ALL) + .aggregators( + ImmutableList.of( + new LongSumAggregatorFactory("metric0", "metric0"), + new CountAggregatorFactory("metric1") + ) + ) + .postAggregators( + ImmutableList.of( + new ArithmeticPostAggregator( + "post", + "/", + ImmutableList.of( + new FieldAccessPostAggregator( + null, + "metric1" + ), + new FieldAccessPostAggregator( + null, + "metric0" + ) + ) + ) + ) + ) + .build(); + + Assert.assertTrue( + Arrays.equals( + TOOL_CHEST.getCacheStrategy(query1).computeCacheKey(query1), + TOOL_CHEST.getCacheStrategy(query2).computeCacheKey(query2) + ) + ); + Assert.assertFalse( + Arrays.equals( + TOOL_CHEST.getCacheStrategy(query1).computeResultLevelCacheKey(query1), + TOOL_CHEST.getCacheStrategy(query2).computeResultLevelCacheKey(query2) + ) + ); + } + + @Test + public void testResultLevelCacheKeyWithGrandTotal() + { + final TimeseriesQuery query1 = Druids.newTimeseriesQueryBuilder() + .dataSource("dummy") + .intervals("2015-01-01/2015-01-02") + .descending(descending) + .granularity(Granularities.ALL) + .aggregators( + ImmutableList.of( + new LongSumAggregatorFactory("metric0", "metric0"), + new CountAggregatorFactory("metric1") + ) + ) + .postAggregators( + ImmutableList.of( + new ArithmeticPostAggregator( + "post", + "+", + ImmutableList.of( + new FieldAccessPostAggregator( + null, + "metric1" + ), + new FieldAccessPostAggregator( + null, + "metric0" + ) + ) + ) + ) + ) + .context(ImmutableMap.of(TimeseriesQuery.CTX_GRAND_TOTAL, true)) + .build(); + + final TimeseriesQuery query2 = Druids.newTimeseriesQueryBuilder() + .dataSource("dummy") + .intervals("2015-01-01/2015-01-02") + .descending(descending) + .granularity(Granularities.ALL) + .aggregators( + ImmutableList.of( + new LongSumAggregatorFactory("metric0", "metric0"), + new CountAggregatorFactory("metric1") + ) + ) + .postAggregators( + ImmutableList.of( + new ArithmeticPostAggregator( + "post", + "/", + ImmutableList.of( + new FieldAccessPostAggregator( + null, + "metric1" + ), + new FieldAccessPostAggregator( + null, + "metric0" + ) + ) + ) + ) + ) + .context(ImmutableMap.of(TimeseriesQuery.CTX_GRAND_TOTAL, true)) + .build(); + + Assert.assertTrue( + Arrays.equals( + TOOL_CHEST.getCacheStrategy(query1).computeCacheKey(query1), + TOOL_CHEST.getCacheStrategy(query2).computeCacheKey(query2) + ) + ); + Assert.assertFalse( + Arrays.equals( + TOOL_CHEST.getCacheStrategy(query1).computeResultLevelCacheKey(query1), + TOOL_CHEST.getCacheStrategy(query2).computeResultLevelCacheKey(query2) + ) + ); + } } diff --git a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java index e263c1eef2b8..cede671e6a47 100644 --- a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java +++ b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java @@ -36,6 +36,7 @@ import org.apache.druid.query.TableDataSource; import org.apache.druid.query.TestQueryRunners; import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator; import org.apache.druid.query.aggregation.post.ConstantPostAggregator; import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator; @@ -128,6 +129,92 @@ public void 
testComputeCacheKeyWithDifferentPostAgg() ).getCacheStrategy(query2); Assert.assertFalse(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); + Assert.assertFalse(Arrays.equals(strategy1.computeResultLevelCacheKey(query1), + strategy2.computeResultLevelCacheKey(query2))); + } + + @Test + public void testComputeResultLevelCacheKeyWithDifferentPostAgg() throws IOException + { + final TopNQuery query1 = new TopNQuery( + new TableDataSource("dummy"), + VirtualColumns.EMPTY, + new DefaultDimensionSpec("test", "test"), + new LegacyTopNMetricSpec("metric1"), + 3, + new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2015-01-01T18:00:00/2015-01-02T18:00:00"))), + null, + Granularities.ALL, + ImmutableList.of( + new LongSumAggregatorFactory("metric1", "metric1"), + new LongSumAggregatorFactory("metric2", "metric2") + ), + ImmutableList.of( + new ArithmeticPostAggregator( + "post1", + "/", + ImmutableList.of( + new FieldAccessPostAggregator( + "metric1", + "metric1" + ), + new FieldAccessPostAggregator( + "metric2", + "metric2" + ) + ) + ) + ), + null + ); + + final TopNQuery query2 = new TopNQuery( + new TableDataSource("dummy"), + VirtualColumns.EMPTY, + new DefaultDimensionSpec("test", "test"), + new LegacyTopNMetricSpec("metric1"), + 3, + new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2015-01-01T18:00:00/2015-01-02T18:00:00"))), + null, + Granularities.ALL, + ImmutableList.of( + new LongSumAggregatorFactory("metric1", "metric1"), + new LongSumAggregatorFactory("metric2", "metric2") + ), + ImmutableList.of( + new ArithmeticPostAggregator( + "post2", + "+", + ImmutableList.of( + new FieldAccessPostAggregator( + "metric1", + "metric1" + ), + new FieldAccessPostAggregator( + "metric2", + "metric2" + ) + ) + ) + ), + null + ); + + final CacheStrategy, Object, TopNQuery> strategy1 = new TopNQueryQueryToolChest( + null, + null + ).getCacheStrategy(query1); + + final CacheStrategy, Object, TopNQuery> strategy2 = new TopNQueryQueryToolChest( + null, + null + ).getCacheStrategy(query2); + + //segment level cache key excludes postaggregates in topn + Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); + Assert.assertFalse(Arrays.equals(strategy1.computeCacheKey(query1), strategy1.computeResultLevelCacheKey(query1))); + Assert.assertFalse(Arrays.equals(strategy1.computeResultLevelCacheKey(query1), + strategy2.computeResultLevelCacheKey(query2))); } @Test diff --git a/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java b/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java index 3efd9469dced..6a303b8a6df7 100644 --- a/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java +++ b/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java @@ -80,7 +80,7 @@ public Sequence run(QueryPlus queryPlus, Map responseContext) { if (useResultCache || populateResultCache) { - final String cacheKeyStr = StringUtils.fromUtf8(strategy.computeCacheKey(query)); + final String cacheKeyStr = StringUtils.fromUtf8(strategy.computeResultLevelCacheKey(query)); final byte[] cachedResultSet = fetchResultsFromResultLevelCache(cacheKeyStr); String existingResultSetId = extractEtagFromResults(cachedResultSet); From 565aa7ba71fcf7aee804a2bd00b46f8686488f09 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Tue, 26 Mar 2019 15:24:07 -0700 Subject: [PATCH 3/8] Add more tests --- .../groupby/GroupByQueryQueryToolChest.java | 1 - 
.../groupby/having/DimFilterHavingSpec.java | 1 - .../query/groupby/having/NotHavingSpec.java | 1 - .../GroupByQueryQueryToolChestTest.java | 97 ++++++++++++++++++- 4 files changed, 94 insertions(+), 6 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java index 2a2505d5ab72..062f5b55d9e7 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -502,7 +502,6 @@ public byte[] computeResultLevelCacheKey(GroupByQuery query) builder.appendStrings(subTotalSpec); } } - return builder.build(); } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/DimFilterHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/DimFilterHavingSpec.java index 4af23d504735..0b1db6259ad5 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/DimFilterHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/DimFilterHavingSpec.java @@ -206,7 +206,6 @@ public byte[] getCacheKey() } outputStream.write(HavingSpecUtil.STRING_SEPARATOR); } - return outputStream.toByteArray(); } catch (IOException ex) { diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/NotHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/NotHavingSpec.java index af87c25a6916..fe77efcf4289 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/NotHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/NotHavingSpec.java @@ -107,7 +107,6 @@ public byte[] getCacheKey() ByteBuffer buffer = ByteBuffer.allocate(1 + havingSpecBytes.length) .put(HavingSpecUtil.CACHE_TYPE_ID_NOT) .put(havingSpecBytes); - return buffer.array(); } } diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java index 026f643958b1..2bad8f82f7f4 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java @@ -35,8 +35,14 @@ import org.apache.druid.query.filter.BoundDimFilter; import org.apache.druid.query.filter.OrDimFilter; import org.apache.druid.query.filter.SelectorDimFilter; +import org.apache.druid.query.groupby.having.AndHavingSpec; import org.apache.druid.query.groupby.having.DimFilterHavingSpec; +import org.apache.druid.query.groupby.having.EqualToHavingSpec; import org.apache.druid.query.groupby.having.GreaterThanHavingSpec; +import org.apache.druid.query.groupby.having.HavingSpec; +import org.apache.druid.query.groupby.having.LessThanHavingSpec; +import org.apache.druid.query.groupby.having.NotHavingSpec; +import org.apache.druid.query.groupby.having.OrHavingSpec; import org.apache.druid.query.groupby.orderby.DefaultLimitSpec; import org.apache.druid.query.groupby.orderby.OrderByColumnSpec; import org.apache.druid.query.ordering.StringComparators; @@ -44,6 +50,7 @@ import org.junit.Test; import java.util.Arrays; +import java.util.List; public class GroupByQueryQueryToolChestTest { @@ -148,7 +155,6 @@ public void testResultLevelCacheKeyWithLimitSpec() 
QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator() ).getCacheStrategy(query1); - final CacheStrategy strategy2 = new GroupByQueryQueryToolChest( null, QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator() @@ -215,6 +221,93 @@ public void testResultLevelCacheKeyWithHavingSpec() QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator() ).getCacheStrategy(query1); + final CacheStrategy strategy2 = new GroupByQueryQueryToolChest( + null, + QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator() + ).getCacheStrategy(query2); + + Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); + Assert.assertFalse(Arrays.equals( + strategy1.computeResultLevelCacheKey(query1), + strategy2.computeResultLevelCacheKey(query2) + )); + } + + @Test + public void testResultLevelCacheKeyWithAndHavingSpec() + { + final List havings = Arrays.asList( + new GreaterThanHavingSpec("agg", Double.valueOf(1.3)), + new OrHavingSpec( + Arrays.asList( + new LessThanHavingSpec("lessAgg", Long.valueOf(1L)), + new NotHavingSpec(new EqualToHavingSpec("equalAgg", Double.valueOf(2))) + ) + ) + ); + final HavingSpec andHavingSpec = new AndHavingSpec(havings); + + final List havings2 = Arrays.asList( + new GreaterThanHavingSpec("agg", Double.valueOf(13.0)), + new OrHavingSpec( + Arrays.asList( + new LessThanHavingSpec("lessAgg", Long.valueOf(1L)), + new NotHavingSpec(new EqualToHavingSpec("equalAgg", Double.valueOf(22))) + ) + ) + ); + final HavingSpec andHavingSpec2 = new AndHavingSpec(havings2); + + final GroupByQuery query1 = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(new DefaultDimensionSpec("quality", "alias")) + .setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, new LongSumAggregatorFactory("idx", "index")) + .setPostAggregatorSpecs( + ImmutableList.of( + new ExpressionPostAggregator("post", "alias + 'x'", null, TestExprMacroTable.INSTANCE) + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .setLimitSpec( + new DefaultLimitSpec( + ImmutableList.of( + new OrderByColumnSpec("post", OrderByColumnSpec.Direction.DESCENDING) + ), + Integer.MAX_VALUE + ) + ) + .setHavingSpec(andHavingSpec) + .build(); + + final GroupByQuery query2 = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(new DefaultDimensionSpec("quality", "alias")) + .setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, new LongSumAggregatorFactory("idx", "index")) + .setPostAggregatorSpecs( + ImmutableList.of( + new ExpressionPostAggregator("post", "alias + 'x'", null, TestExprMacroTable.INSTANCE) + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .setLimitSpec( + new DefaultLimitSpec( + ImmutableList.of( + new OrderByColumnSpec("post", OrderByColumnSpec.Direction.DESCENDING) + ), + Integer.MAX_VALUE + ) + ) + .setHavingSpec(andHavingSpec2) + .build(); + + final CacheStrategy strategy1 = new GroupByQueryQueryToolChest( + null, + QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator() + ).getCacheStrategy(query1); final CacheStrategy strategy2 = new GroupByQueryQueryToolChest( null, @@ -311,7 +404,6 @@ public void testResultLevelCacheKeyWithHavingDimFilterHavingSpec() QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator() ).getCacheStrategy(query1); - final CacheStrategy strategy2 = new 
GroupByQueryQueryToolChest( null, QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator() @@ -379,7 +471,6 @@ public void testResultLevelCacheKeyWithSubTotalsSpec() QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator() ).getCacheStrategy(query1); - final CacheStrategy strategy2 = new GroupByQueryQueryToolChest( null, QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator() From 0744b1db57eb0d6a965be98f246035e1952d6387 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Thu, 11 Apr 2019 13:39:09 -0700 Subject: [PATCH 4/8] Use CacheKeyBuilder for HavingSpec's getCacheKey --- .../groupby/having/AlwaysHavingSpec.java | 3 +- .../query/groupby/having/AndHavingSpec.java | 18 ++----- .../groupby/having/DimFilterHavingSpec.java | 54 ++++++------------- .../having/DimensionSelectorHavingSpec.java | 19 +++---- .../groupby/having/EqualToHavingSpec.java | 37 +++---------- .../groupby/having/GreaterThanHavingSpec.java | 37 +++---------- .../query/groupby/having/HavingSpecUtil.java | 1 + .../groupby/having/LessThanHavingSpec.java | 37 +++---------- .../query/groupby/having/NeverHavingSpec.java | 3 +- .../query/groupby/having/NotHavingSpec.java | 10 ++-- .../query/groupby/having/OrHavingSpec.java | 19 ++----- .../query/groupby/having/HavingSpecTest.java | 12 ++--- 12 files changed, 63 insertions(+), 187 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/AlwaysHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/AlwaysHavingSpec.java index ed70d182832c..8450589f814a 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/AlwaysHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/AlwaysHavingSpec.java @@ -20,6 +20,7 @@ package org.apache.druid.query.groupby.having; import org.apache.druid.data.input.Row; +import org.apache.druid.query.cache.CacheKeyBuilder; /** * A "having" spec that always evaluates to true @@ -35,6 +36,6 @@ public boolean eval(Row row) @Override public byte[] getCacheKey() { - return new byte[]{HavingSpecUtil.CACHE_TYPE_ID_ALWAYS}; + return new CacheKeyBuilder(HavingSpecUtil.CACHE_TYPE_ID_ALWAYS).build(); } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/AndHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/AndHavingSpec.java index 69e5f2829a57..f035db3dbde2 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/AndHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/AndHavingSpec.java @@ -24,9 +24,9 @@ import com.google.common.collect.ImmutableList; import org.apache.druid.data.input.Row; import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.segment.column.ValueType; -import java.nio.ByteBuffer; import java.util.List; import java.util.Map; @@ -115,19 +115,7 @@ public String toString() @Override public byte[] getCacheKey() { - final byte[][] havingBytes = new byte[havingSpecs.size()][]; - int havingBytesSize = 0; - int index = 0; - for (HavingSpec spec : havingSpecs) { - havingBytes[index] = spec.getCacheKey(); - havingBytesSize += havingBytes[index].length; - ++index; - } - ByteBuffer buffer = ByteBuffer.allocate(1 + havingBytesSize) - .put(HavingSpecUtil.CACHE_TYPE_ID_AND); - for (byte[] havingByte : havingBytes) { - buffer.put(havingByte); - } - return buffer.array(); + return new CacheKeyBuilder(HavingSpecUtil.CACHE_TYPE_ID_AND) 
+ .appendCacheables(havingSpecs).build(); } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/DimFilterHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/DimFilterHavingSpec.java index 0b1db6259ad5..f4ff7a3ca241 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/DimFilterHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/DimFilterHavingSpec.java @@ -22,11 +22,10 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; -import com.google.common.base.Strings; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.Row; -import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.query.filter.DimFilter; import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.transform.RowFunction; @@ -35,14 +34,13 @@ import org.apache.druid.segment.transform.Transformer; import org.joda.time.DateTime; -import java.io.ByteArrayOutputStream; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; public class DimFilterHavingSpec extends BaseHavingSpec { @@ -177,41 +175,19 @@ public RowFunction getRowFunction() @Override public byte[] getCacheKey() { - try { - final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - outputStream.write(HavingSpecUtil.CACHE_TYPE_ID_DIM_FILTER); - outputStream.write(isFinalize() ? 1 : 0); - outputStream.write(dimFilter.getCacheKey()); - - for (Map.Entry entry : aggregators.entrySet()) { - final String key = entry.getKey(); - final AggregatorFactory val = entry.getValue(); - if (!Strings.isNullOrEmpty(key)) { - outputStream.write(StringUtils.toUtf8(key)); - } - outputStream.write(HavingSpecUtil.STRING_SEPARATOR); - byte[] aggregatorBytes = val.getCacheKey(); - outputStream.write(aggregatorBytes); - } - - for (Map.Entry entry : rowSignature.entrySet()) { - final String key = entry.getKey(); - final String val = entry.getValue().toString(); - if (!Strings.isNullOrEmpty(key)) { - outputStream.write(StringUtils.toUtf8(key)); - } - outputStream.write(HavingSpecUtil.STRING_SEPARATOR); - if (!Strings.isNullOrEmpty(val)) { - outputStream.write(StringUtils.toUtf8(val)); - } - outputStream.write(HavingSpecUtil.STRING_SEPARATOR); - } - return outputStream.toByteArray(); - } - catch (IOException ex) { - // If ByteArrayOutputStream.write has problems, that is a very bad thing - throw new RuntimeException(ex); - } + final List rowSignatureValues = rowSignature.values() + .stream() + .map(val -> val.toString()) + .collect(Collectors.toList()); + return new CacheKeyBuilder(HavingSpecUtil.CACHE_TYPE_ID_DIM_FILTER) + .appendCacheable(dimFilter) + .appendByte((byte) (isFinalize() ? 
1 : 0)) + .appendStrings(rowSignature.keySet()) + .appendStrings(rowSignatureValues) + .appendStrings(aggregators.keySet()) + .appendCacheables(aggregators.values()) + .appendInt(evalCount) + .build(); } private static class RowAsInputRow implements InputRow diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/DimensionSelectorHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/DimensionSelectorHavingSpec.java index 8c9d16bdc45b..4dfc6e63c607 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/DimensionSelectorHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/DimensionSelectorHavingSpec.java @@ -24,11 +24,10 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; import org.apache.druid.data.input.Row; -import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.query.extraction.ExtractionFn; import org.apache.druid.query.extraction.IdentityExtractionFn; -import java.nio.ByteBuffer; import java.util.List; import java.util.Objects; @@ -123,16 +122,10 @@ public String toString() @Override public byte[] getCacheKey() { - byte[] dimensionBytes = StringUtils.toUtf8(dimension); - byte[] valueBytes = value == null ? new byte[]{} : StringUtils.toUtf8(value); - byte[] extractionFnBytes = extractionFn.getCacheKey(); - return ByteBuffer.allocate(3 + dimensionBytes.length + valueBytes.length + extractionFnBytes.length) - .put(HavingSpecUtil.CACHE_TYPE_ID_DIM_SELECTOR) - .put(dimensionBytes) - .put(HavingSpecUtil.STRING_SEPARATOR) - .put(valueBytes) - .put(HavingSpecUtil.STRING_SEPARATOR) - .put(extractionFnBytes) - .array(); + return new CacheKeyBuilder(HavingSpecUtil.CACHE_TYPE_ID_DIM_SELECTOR) + .appendString(dimension) + .appendString(value) + .appendByteArray(extractionFn == null ? 
new byte[0] : extractionFn.getCacheKey()) + .build(); } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/EqualToHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/EqualToHavingSpec.java index d5b30d26078e..9e6edd1a9127 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/EqualToHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/EqualToHavingSpec.java @@ -21,13 +21,11 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Strings; import org.apache.druid.data.input.Row; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.cache.CacheKeyBuilder; -import java.io.ByteArrayOutputStream; -import java.io.IOException; import java.util.Map; /** @@ -131,32 +129,11 @@ public String toString() @Override public byte[] getCacheKey() { - try { - final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - outputStream.write(HavingSpecUtil.CACHE_TYPE_ID_EQUAL); - if (!Strings.isNullOrEmpty(aggregationName)) { - outputStream.write(StringUtils.toUtf8(aggregationName)); - } - outputStream.write(HavingSpecUtil.STRING_SEPARATOR); - outputStream.write(StringUtils.toUtf8(String.valueOf(value))); - if (aggregators == null) { - return outputStream.toByteArray(); - } - for (Map.Entry entry : aggregators.entrySet()) { - final String key = entry.getKey(); - final AggregatorFactory val = entry.getValue(); - if (!Strings.isNullOrEmpty(key)) { - outputStream.write(StringUtils.toUtf8(key)); - } - outputStream.write(HavingSpecUtil.STRING_SEPARATOR); - byte[] aggregatorBytes = val.getCacheKey(); - outputStream.write(aggregatorBytes); - } - return outputStream.toByteArray(); - } - catch (IOException ex) { - // If ByteArrayOutputStream.write has problems, that is a very bad thing - throw new RuntimeException(ex); - } + return new CacheKeyBuilder(HavingSpecUtil.CACHE_TYPE_ID_EQUAL) + .appendString(aggregationName) + .appendByteArray(StringUtils.toUtf8(String.valueOf(value))) + .appendStrings(aggregators.keySet()) + .appendCacheables(aggregators.values()) + .build(); } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/GreaterThanHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/GreaterThanHavingSpec.java index 53974aee53d9..959d5615114e 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/GreaterThanHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/GreaterThanHavingSpec.java @@ -21,13 +21,11 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Strings; import org.apache.druid.data.input.Row; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.cache.CacheKeyBuilder; -import java.io.ByteArrayOutputStream; -import java.io.IOException; import java.util.Map; /** @@ -127,32 +125,11 @@ public String toString() @Override public byte[] getCacheKey() { - try { - final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - outputStream.write(HavingSpecUtil.CACHE_TYPE_ID_GREATER_THAN); - if (!Strings.isNullOrEmpty(aggregationName)) { - outputStream.write(StringUtils.toUtf8(aggregationName)); - } - 
outputStream.write(HavingSpecUtil.STRING_SEPARATOR); - outputStream.write(StringUtils.toUtf8(String.valueOf(value))); - if (aggregators == null) { - return outputStream.toByteArray(); - } - for (Map.Entry entry : aggregators.entrySet()) { - final String key = entry.getKey(); - final AggregatorFactory val = entry.getValue(); - if (!Strings.isNullOrEmpty(key)) { - outputStream.write(StringUtils.toUtf8(key)); - } - outputStream.write(HavingSpecUtil.STRING_SEPARATOR); - byte[] aggregatorBytes = val.getCacheKey(); - outputStream.write(aggregatorBytes); - } - return outputStream.toByteArray(); - } - catch (IOException ex) { - // If ByteArrayOutputStream.write has problems, that is a very bad thing - throw new RuntimeException(ex); - } + return new CacheKeyBuilder(HavingSpecUtil.CACHE_TYPE_ID_GREATER_THAN) + .appendString(aggregationName) + .appendByteArray(StringUtils.toUtf8(String.valueOf(value))) + .appendStrings(aggregators.keySet()) + .appendCacheables(aggregators.values()) + .build(); } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/HavingSpecUtil.java b/processing/src/main/java/org/apache/druid/query/groupby/having/HavingSpecUtil.java index 96f47e8798fa..59ae42ae73ba 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/HavingSpecUtil.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/HavingSpecUtil.java @@ -31,5 +31,6 @@ public class HavingSpecUtil static final byte CACHE_TYPE_ID_NEVER = 0x7; static final byte CACHE_TYPE_ID_NOT = 0x8; static final byte CACHE_TYPE_ID_OR = 0x9; + static final byte CACHE_TYPE_ID_COUNTING = 0xA; static final byte STRING_SEPARATOR = (byte) 0xFF; } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/LessThanHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/LessThanHavingSpec.java index 147317fc4dd6..271c282ae57a 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/LessThanHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/LessThanHavingSpec.java @@ -20,13 +20,11 @@ package org.apache.druid.query.groupby.having; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Strings; import org.apache.druid.data.input.Row; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.cache.CacheKeyBuilder; -import java.io.ByteArrayOutputStream; -import java.io.IOException; import java.util.Map; /** @@ -125,32 +123,11 @@ public String toString() @Override public byte[] getCacheKey() { - try { - final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - outputStream.write(HavingSpecUtil.CACHE_TYPE_ID_LESS_THAN); - if (!Strings.isNullOrEmpty(aggregationName)) { - outputStream.write(StringUtils.toUtf8(aggregationName)); - } - outputStream.write(HavingSpecUtil.STRING_SEPARATOR); - outputStream.write(StringUtils.toUtf8(String.valueOf(value))); - if (aggregators == null) { - return outputStream.toByteArray(); - } - for (Map.Entry entry : aggregators.entrySet()) { - final String key = entry.getKey(); - final AggregatorFactory val = entry.getValue(); - if (!Strings.isNullOrEmpty(key)) { - outputStream.write(StringUtils.toUtf8(key)); - } - outputStream.write(HavingSpecUtil.STRING_SEPARATOR); - byte[] aggregatorBytes = val.getCacheKey(); - outputStream.write(aggregatorBytes); - } - return outputStream.toByteArray(); - } - catch (IOException ex) { - // If 
ByteArrayOutputStream.write has problems, that is a very bad thing - throw new RuntimeException(ex); - } + return new CacheKeyBuilder(HavingSpecUtil.CACHE_TYPE_ID_LESS_THAN) + .appendString(aggregationName) + .appendByteArray(StringUtils.toUtf8(String.valueOf(value))) + .appendStrings(aggregators.keySet()) + .appendCacheables(aggregators.values()) + .build(); } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/NeverHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/NeverHavingSpec.java index 42bb97ddb461..fa2c15d4bc48 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/NeverHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/NeverHavingSpec.java @@ -20,6 +20,7 @@ package org.apache.druid.query.groupby.having; import org.apache.druid.data.input.Row; +import org.apache.druid.query.cache.CacheKeyBuilder; /** * A "having" spec that always evaluates to false @@ -35,6 +36,6 @@ public boolean eval(Row row) @Override public byte[] getCacheKey() { - return new byte[]{HavingSpecUtil.CACHE_TYPE_ID_NEVER}; + return new CacheKeyBuilder(HavingSpecUtil.CACHE_TYPE_ID_NEVER).build(); } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/NotHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/NotHavingSpec.java index fe77efcf4289..81d7a63eaf87 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/NotHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/NotHavingSpec.java @@ -23,9 +23,9 @@ import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.data.input.Row; import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.segment.column.ValueType; -import java.nio.ByteBuffer; import java.util.Map; /** @@ -103,10 +103,8 @@ public int hashCode() @Override public byte[] getCacheKey() { - byte[] havingSpecBytes = havingSpec.getCacheKey(); - ByteBuffer buffer = ByteBuffer.allocate(1 + havingSpecBytes.length) - .put(HavingSpecUtil.CACHE_TYPE_ID_NOT) - .put(havingSpecBytes); - return buffer.array(); + return new CacheKeyBuilder(HavingSpecUtil.CACHE_TYPE_ID_NOT) + .appendCacheable(havingSpec) + .build(); } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/OrHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/OrHavingSpec.java index 9a1405087f08..e6483490d7e0 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/OrHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/OrHavingSpec.java @@ -24,9 +24,9 @@ import com.google.common.collect.ImmutableList; import org.apache.druid.data.input.Row; import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.segment.column.ValueType; -import java.nio.ByteBuffer; import java.util.List; import java.util.Map; @@ -115,19 +115,8 @@ public String toString() @Override public byte[] getCacheKey() { - final byte[][] havingBytes = new byte[havingSpecs.size()][]; - int havingBytesSize = 0; - int index = 0; - for (HavingSpec spec : havingSpecs) { - havingBytes[index] = spec.getCacheKey(); - havingBytesSize += havingBytes[index].length; - ++index; - } - ByteBuffer buffer = ByteBuffer.allocate(1 + havingBytesSize) - .put(HavingSpecUtil.CACHE_TYPE_ID_OR); - for (byte[] havingByte : 
havingBytes) { - buffer.put(havingByte); - } - return buffer.array(); + return new CacheKeyBuilder(HavingSpecUtil.CACHE_TYPE_ID_OR) + .appendCacheables(havingSpecs) + .build(); } } diff --git a/processing/src/test/java/org/apache/druid/query/groupby/having/HavingSpecTest.java b/processing/src/test/java/org/apache/druid/query/groupby/having/HavingSpecTest.java index 66412161fef6..a0e1d7457117 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/having/HavingSpecTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/having/HavingSpecTest.java @@ -26,9 +26,9 @@ import org.apache.druid.data.input.Row; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.query.cache.CacheKeyBuilder; import org.junit.Test; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -239,12 +239,10 @@ public boolean eval(Row row) @Override public byte[] getCacheKey() { - byte valueByte = (byte) (value ? 1 : 0); - byte[] counterBytes = StringUtils.toUtf8(String.valueOf(counter)); - return ByteBuffer.allocate(1 + counterBytes.length) - .put(valueByte) - .put(counterBytes) - .array(); + return new CacheKeyBuilder(HavingSpecUtil.CACHE_TYPE_ID_COUNTING) + .appendByte((byte) (value ? 1 : 0)) + .appendByteArray(StringUtils.toUtf8(String.valueOf(counter))) + .build(); } } From 7f8932d45c0c0e9d7431039aaf3fdce4d3364ed4 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Fri, 12 Apr 2019 11:17:14 -0700 Subject: [PATCH 5/8] Initialize aggregators map to avoid NPE --- .../apache/druid/query/groupby/having/EqualToHavingSpec.java | 3 ++- .../druid/query/groupby/having/GreaterThanHavingSpec.java | 3 ++- .../org/apache/druid/query/groupby/having/HavingSpecUtil.java | 1 - .../apache/druid/query/groupby/having/LessThanHavingSpec.java | 3 ++- 4 files changed, 6 insertions(+), 4 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/EqualToHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/EqualToHavingSpec.java index 9e6edd1a9127..ce055f0a5e2c 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/EqualToHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/EqualToHavingSpec.java @@ -26,6 +26,7 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.cache.CacheKeyBuilder; +import java.util.HashMap; import java.util.Map; /** @@ -37,7 +38,7 @@ public class EqualToHavingSpec extends BaseHavingSpec private final String aggregationName; private final Number value; - private volatile Map aggregators; + private volatile Map aggregators = new HashMap<>(); @JsonCreator public EqualToHavingSpec( diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/GreaterThanHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/GreaterThanHavingSpec.java index 959d5615114e..c865d7704207 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/GreaterThanHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/GreaterThanHavingSpec.java @@ -26,6 +26,7 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.cache.CacheKeyBuilder; +import java.util.HashMap; import java.util.Map; /** @@ -37,7 +38,7 @@ public class GreaterThanHavingSpec extends BaseHavingSpec private final String aggregationName; private final Number 
value; - private volatile Map aggregators; + private volatile Map aggregators = new HashMap<>(); @JsonCreator public GreaterThanHavingSpec( diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/HavingSpecUtil.java b/processing/src/main/java/org/apache/druid/query/groupby/having/HavingSpecUtil.java index 59ae42ae73ba..e1227e7f0437 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/HavingSpecUtil.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/HavingSpecUtil.java @@ -32,5 +32,4 @@ public class HavingSpecUtil static final byte CACHE_TYPE_ID_NOT = 0x8; static final byte CACHE_TYPE_ID_OR = 0x9; static final byte CACHE_TYPE_ID_COUNTING = 0xA; - static final byte STRING_SEPARATOR = (byte) 0xFF; } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/LessThanHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/LessThanHavingSpec.java index 271c282ae57a..2de44701d6a4 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/LessThanHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/LessThanHavingSpec.java @@ -25,6 +25,7 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.cache.CacheKeyBuilder; +import java.util.HashMap; import java.util.Map; /** @@ -36,7 +37,7 @@ public class LessThanHavingSpec extends BaseHavingSpec private final String aggregationName; private final Number value; - private volatile Map aggregators; + private volatile Map aggregators = new HashMap<>(); public LessThanHavingSpec( @JsonProperty("aggregation") String aggName, From 3562b5617d7e47dc47dd2241c992c48f40a4ddd5 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Fri, 12 Apr 2019 11:58:31 -0700 Subject: [PATCH 6/8] adjust cachekey builder for HavingSpec to ignore aggregators --- .../query/groupby/having/DimFilterHavingSpec.java | 10 ---------- .../druid/query/groupby/having/EqualToHavingSpec.java | 4 +--- .../query/groupby/having/GreaterThanHavingSpec.java | 5 +---- .../druid/query/groupby/having/LessThanHavingSpec.java | 4 +--- 4 files changed, 3 insertions(+), 20 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/DimFilterHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/DimFilterHavingSpec.java index f4ff7a3ca241..cf916bf02556 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/DimFilterHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/DimFilterHavingSpec.java @@ -40,7 +40,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.stream.Collectors; public class DimFilterHavingSpec extends BaseHavingSpec { @@ -175,18 +174,9 @@ public RowFunction getRowFunction() @Override public byte[] getCacheKey() { - final List rowSignatureValues = rowSignature.values() - .stream() - .map(val -> val.toString()) - .collect(Collectors.toList()); return new CacheKeyBuilder(HavingSpecUtil.CACHE_TYPE_ID_DIM_FILTER) .appendCacheable(dimFilter) .appendByte((byte) (isFinalize() ? 
1 : 0)) - .appendStrings(rowSignature.keySet()) - .appendStrings(rowSignatureValues) - .appendStrings(aggregators.keySet()) - .appendCacheables(aggregators.values()) - .appendInt(evalCount) .build(); } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/EqualToHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/EqualToHavingSpec.java index ce055f0a5e2c..86bce0648139 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/EqualToHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/EqualToHavingSpec.java @@ -38,7 +38,7 @@ public class EqualToHavingSpec extends BaseHavingSpec private final String aggregationName; private final Number value; - private volatile Map aggregators = new HashMap<>(); + private volatile Map aggregators; @JsonCreator public EqualToHavingSpec( @@ -133,8 +133,6 @@ public byte[] getCacheKey() return new CacheKeyBuilder(HavingSpecUtil.CACHE_TYPE_ID_EQUAL) .appendString(aggregationName) .appendByteArray(StringUtils.toUtf8(String.valueOf(value))) - .appendStrings(aggregators.keySet()) - .appendCacheables(aggregators.values()) .build(); } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/GreaterThanHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/GreaterThanHavingSpec.java index c865d7704207..aa276a9466ca 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/GreaterThanHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/GreaterThanHavingSpec.java @@ -26,7 +26,6 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.cache.CacheKeyBuilder; -import java.util.HashMap; import java.util.Map; /** @@ -38,7 +37,7 @@ public class GreaterThanHavingSpec extends BaseHavingSpec private final String aggregationName; private final Number value; - private volatile Map aggregators = new HashMap<>(); + private volatile Map aggregators; @JsonCreator public GreaterThanHavingSpec( @@ -129,8 +128,6 @@ public byte[] getCacheKey() return new CacheKeyBuilder(HavingSpecUtil.CACHE_TYPE_ID_GREATER_THAN) .appendString(aggregationName) .appendByteArray(StringUtils.toUtf8(String.valueOf(value))) - .appendStrings(aggregators.keySet()) - .appendCacheables(aggregators.values()) .build(); } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/LessThanHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/LessThanHavingSpec.java index 2de44701d6a4..dfe9dadd71e6 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/LessThanHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/LessThanHavingSpec.java @@ -37,7 +37,7 @@ public class LessThanHavingSpec extends BaseHavingSpec private final String aggregationName; private final Number value; - private volatile Map aggregators = new HashMap<>(); + private volatile Map aggregators; public LessThanHavingSpec( @JsonProperty("aggregation") String aggName, @@ -127,8 +127,6 @@ public byte[] getCacheKey() return new CacheKeyBuilder(HavingSpecUtil.CACHE_TYPE_ID_LESS_THAN) .appendString(aggregationName) .appendByteArray(StringUtils.toUtf8(String.valueOf(value))) - .appendStrings(aggregators.keySet()) - .appendCacheables(aggregators.values()) .build(); } } From 9bef29c55fb6264fa25ca07def5b126e4c210da7 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Fri, 12 Apr 2019 12:02:04 -0700 Subject: [PATCH 7/8] unused import --- 
.../org/apache/druid/query/groupby/having/EqualToHavingSpec.java | 1 - .../apache/druid/query/groupby/having/LessThanHavingSpec.java | 1 - 2 files changed, 2 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/EqualToHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/EqualToHavingSpec.java index 86bce0648139..00c471b200b9 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/EqualToHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/EqualToHavingSpec.java @@ -26,7 +26,6 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.cache.CacheKeyBuilder; -import java.util.HashMap; import java.util.Map; /** diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/LessThanHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/LessThanHavingSpec.java index dfe9dadd71e6..3c937cfba9fd 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/LessThanHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/LessThanHavingSpec.java @@ -25,7 +25,6 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.cache.CacheKeyBuilder; -import java.util.HashMap; import java.util.Map; /** From 89d8efda1312ab1ef6fbc8d60229d60d2bc8c053 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Thu, 18 Apr 2019 10:44:11 -0700 Subject: [PATCH 8/8] PR comments --- .../druid/query/groupby/GroupByQueryQueryToolChest.java | 6 ++---- .../metadata/SegmentMetadataQueryQueryToolChest.java | 8 +++++++- .../query/timeseries/TimeseriesQueryQueryToolChest.java | 6 ++---- .../apache/druid/query/topn/TopNQueryQueryToolChest.java | 8 ++------ .../org/apache/druid/client/CachingClusteredClient.java | 3 +++ 5 files changed, 16 insertions(+), 15 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java index 062f5b55d9e7..c94d427b1bd2 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -492,11 +492,9 @@ public byte[] computeResultLevelCacheKey(GroupByQuery query) .appendCacheables(query.getDimensions()) .appendCacheable(query.getVirtualColumns()) .appendCacheable(query.getHavingSpec()) - .appendCacheable(query.getLimitSpec()); + .appendCacheable(query.getLimitSpec()) + .appendCacheables(query.getPostAggregatorSpecs()); - if (!query.getPostAggregatorSpecs().isEmpty()) { - builder.appendCacheables(query.getPostAggregatorSpecs()); - } if (query.getSubtotalsSpec() != null && !query.getSubtotalsSpec().isEmpty()) { for (List subTotalSpec : query.getSubtotalsSpec()) { builder.appendStrings(subTotalSpec); diff --git a/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java index 52c723e2f9a3..2813000ab760 100644 --- a/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java @@ -50,6 +50,7 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import 
org.apache.druid.query.aggregation.AggregatorFactoryNotMergeableException; import org.apache.druid.query.aggregation.MetricManipulationFn; +import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.query.metadata.metadata.ColumnAnalysis; import org.apache.druid.query.metadata.metadata.SegmentAnalysis; import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery; @@ -73,6 +74,7 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest MERGE_TRANSFORM_FN = new Function() { @Override @@ -197,7 +199,11 @@ public byte[] computeCacheKey(SegmentMetadataQuery query) @Override public byte[] computeResultLevelCacheKey(SegmentMetadataQuery query) { - return computeCacheKey(query); + // need to include query "merge" and "lenientAggregatorMerge" for result level cache key + return new CacheKeyBuilder(SEGMENT_METADATA_QUERY).appendByteArray(computeCacheKey(query)) + .appendBoolean(query.isMerge()) + .appendBoolean(query.isLenientAggregatorMerge()) + .build(); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java index 631f65773a95..f8f5aa0c4c7e 100644 --- a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java @@ -285,10 +285,8 @@ public byte[] computeResultLevelCacheKey(TimeseriesQuery query) .appendCacheables(query.getAggregatorSpecs()) .appendCacheable(query.getVirtualColumns()) .appendCacheables(query.getPostAggregatorSpecs()) - .appendInt(query.getLimit()); - if (query.isGrandTotal()) { - builder.appendBoolean(query.getContextValue(TimeseriesQuery.CTX_GRAND_TOTAL)); - } + .appendInt(query.getLimit()) + .appendBoolean(query.isGrandTotal()); return builder.build(); } diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java index b20b08dafe1c..2c3bd2b2f758 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java @@ -336,12 +336,8 @@ public byte[] computeResultLevelCacheKey(TopNQuery query) .appendCacheable(query.getGranularity()) .appendCacheable(query.getDimensionsFilter()) .appendCacheables(query.getAggregatorSpecs()) - .appendCacheable(query.getVirtualColumns()); - - final List postAggregators = query.getPostAggregatorSpecs(); - if (!postAggregators.isEmpty()) { - builder.appendCacheables(query.getPostAggregatorSpecs()); - } + .appendCacheable(query.getVirtualColumns()) + .appendCacheables(query.getPostAggregatorSpecs()); return builder.build(); } diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java index cc72efcd15f1..1facccdf0fea 100644 --- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java @@ -385,6 +385,9 @@ private String computeCurrentEtag(final Set segments, @Nullable break; } hasher.putString(p.getServer().getSegment().getId().toString(), StandardCharsets.UTF_8); + // it is important to add the "query interval" as part ETag calculation + // to have result level cache work correctly for queries with 
different + // intervals covering the same set of segments hasher.putString(p.rhs.getInterval().toString(), StandardCharsets.UTF_8); }
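
Note on the final hunk above: what follows is a minimal, self-contained sketch (plain JDK, not Druid code) of why hashing the queried interval alongside each segment id matters. The class and method names (`EtagSketch`, `computeEtag`) are invented for illustration; the real logic lives in `CachingClusteredClient#computeCurrentEtag` and uses a Guava `Hasher`, but the effect is the same: two queries that touch the same segment over different intervals yield different ETags, so the result-level cache keeps their results apart.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

public class EtagSketch
{
  // Hash each served segment's id together with the interval actually queried from it,
  // then Base64-encode the digest (analogous in spirit to computeCurrentEtag).
  static String computeEtag(String[] segmentIds, String[] queryIntervals) throws NoSuchAlgorithmException
  {
    MessageDigest hasher = MessageDigest.getInstance("SHA-1");
    for (int i = 0; i < segmentIds.length; i++) {
      hasher.update(segmentIds[i].getBytes(StandardCharsets.UTF_8));
      hasher.update(queryIntervals[i].getBytes(StandardCharsets.UTF_8));
    }
    return Base64.getEncoder().encodeToString(hasher.digest());
  }

  public static void main(String[] args) throws NoSuchAlgorithmException
  {
    // Same segment, two different query intervals: the ETags differ, so a result
    // cached under one interval is never served for the other.
    String segmentId = "dataSource_2016-01-01/2016-01-02_ver";
    String etag1 = computeEtag(new String[]{segmentId}, new String[]{"2016-01-01T14:00/2016-01-02T14:00"});
    String etag2 = computeEtag(new String[]{segmentId}, new String[]{"2016-01-01T18:00/2016-01-02T18:00"});
    System.out.println(etag1.equals(etag2));  // prints: false
  }
}
```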