From 660d156287e2918b9d9135e23214d7ef0202c4ff Mon Sep 17 00:00:00 2001 From: Zoltan Haindrich Date: Mon, 20 Nov 2023 13:44:18 +0000 Subject: [PATCH 1/5] Fix resultcache multiple postaggregation restore Fixes #15393 --- .../druid/query/groupby/GroupByQueryQueryToolChest.java | 5 +---- .../query/groupby/GroupByQueryQueryToolChestTest.java | 9 +++++---- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java index 9c746dd41429..63b67c860d1f 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -62,7 +62,6 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.MetricManipulationFn; import org.apache.druid.query.aggregation.MetricManipulatorFns; -import org.apache.druid.query.aggregation.PostAggregator; import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.query.context.ResponseContext; import org.apache.druid.query.dimension.DefaultDimensionSpec; @@ -651,9 +650,7 @@ public ResultRow apply(Object input) ); if (isResultLevelCache) { - Iterator postItr = query.getPostAggregatorSpecs().iterator(); - int postPos = 0; - while (postItr.hasNext() && results.hasNext()) { + for (int postPos = 0; postPos < query.getPostAggregatorSpecs().size(); postPos++) { resultRow.set(postAggregatorStart + postPos, results.next()); } } diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java index 9b0a8e193af8..3a5a8d032afa 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java @@ -986,7 +986,8 @@ private void doTestCacheStrategy(final ColumnType valueType, final Object dimVal ) ) .setPostAggregatorSpecs( - ImmutableList.of(new ConstantPostAggregator("post", 10)) + new ConstantPostAggregator("post", 10), + new ConstantPostAggregator("post2", 20) ) .setGranularity(QueryRunnerTestHelper.DAY_GRAN) .build(); @@ -1017,14 +1018,14 @@ private void doTestCacheStrategy(final ColumnType valueType, final Object dimVal Assert.assertEquals(result1, fromCacheResult); // test timestamps that result in integer size millis - final ResultRow result2 = ResultRow.of(123L, dimValue, 1, dimValue, 10); + final ResultRow result2 = ResultRow.of(123L, dimValue, 1, dimValue, 10, 20); // Please see the comments on aggregator serde and type handling in CacheStrategy.fetchAggregatorsFromCache() final ResultRow typeAdjustedResult2; if (valueType.is(ValueType.FLOAT)) { - typeAdjustedResult2 = ResultRow.of(123L, dimValue, 1, 2.1d, 10); + typeAdjustedResult2 = ResultRow.of(123L, dimValue, 1, 2.1d, 10, 20); } else if (valueType.is(ValueType.LONG)) { - typeAdjustedResult2 = ResultRow.of(123L, dimValue, 1, 2, 10); + typeAdjustedResult2 = ResultRow.of(123L, dimValue, 1, 2, 10, 20); } else { typeAdjustedResult2 = result2; } From f744dd05ba390b920366632ae6e0601effbf0bfd Mon Sep 17 00:00:00 2001 From: Zoltan Haindrich Date: Mon, 20 Nov 2023 15:31:02 +0000 Subject: [PATCH 2/5] updates --- .../groupby/GroupByQueryQueryToolChest.java | 30 ++++++++----------- 1 file changed, 13 insertions(+), 17 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java index 63b67c860d1f..aada0b910785 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -577,8 +577,6 @@ public Object apply(ResultRow resultRow) if (resultRowHasTimestamp) { retVal.add(resultRow.getLong(inPos++)); - } else { - retVal.add(query.getUniversalTimestamp().getMillis()); } for (int i = 0; i < dims.size(); i++) { @@ -612,32 +610,37 @@ public Function pullFromCache(boolean isResultLevelCache) @Override public ResultRow apply(Object input) { - Iterator results = ((List) input).iterator(); + List inputList = (List) input; + Iterator results = inputList.iterator(); - DateTime timestamp = granularity.toDateTime(((Number) results.next()).longValue()); final int size = isResultLevelCache ? query.getResultRowSizeWithPostAggregators() : query.getResultRowSizeWithoutPostAggregators(); + if (size != inputList.size()) { + throw new ISE( + "Stored field count[%d] / output field field count[%d] mismatch", + inputList.size(), + size + ); + } + final ResultRow resultRow = ResultRow.create(size); if (resultRowHasTimestamp) { + DateTime timestamp = granularity.toDateTime(((Number) results.next()).longValue()); resultRow.set(0, timestamp.getMillis()); } - final Iterator dimsIter = dims.iterator(); - int dimPos = 0; - while (dimsIter.hasNext() && results.hasNext()) { - final DimensionSpec dimensionSpec = dimsIter.next(); + for (int dimPos = 0; dimPos < dims.size(); dimPos++) { + final DimensionSpec dimensionSpec = dims.get(dimPos); // Must convert generic Jackson-deserialized type into the proper type. resultRow.set( dimensionStart + dimPos, DimensionHandlerUtils.convertObjectToType(results.next(), dimensionSpec.getOutputType()) ); - - dimPos++; } CacheStrategy.fetchAggregatorsFromCache( @@ -654,13 +657,6 @@ public ResultRow apply(Object input) resultRow.set(postAggregatorStart + postPos, results.next()); } } - if (dimsIter.hasNext() || results.hasNext()) { - throw new ISE( - "Found left over objects while reading from cache!! dimsIter[%s] results[%s]", - dimsIter.hasNext(), - results.hasNext() - ); - } return resultRow; } From 137e8b515b539920c95e38086b2460698e286e7b Mon Sep 17 00:00:00 2001 From: Zoltan Haindrich Date: Mon, 20 Nov 2023 20:47:01 +0000 Subject: [PATCH 3/5] Revert "updates" This reverts commit f744dd05ba390b920366632ae6e0601effbf0bfd. --- .../groupby/GroupByQueryQueryToolChest.java | 30 +++++++++++-------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java index aada0b910785..63b67c860d1f 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -577,6 +577,8 @@ public Object apply(ResultRow resultRow) if (resultRowHasTimestamp) { retVal.add(resultRow.getLong(inPos++)); + } else { + retVal.add(query.getUniversalTimestamp().getMillis()); } for (int i = 0; i < dims.size(); i++) { @@ -610,37 +612,32 @@ public Function pullFromCache(boolean isResultLevelCache) @Override public ResultRow apply(Object input) { - List inputList = (List) input; - Iterator results = inputList.iterator(); + Iterator results = ((List) input).iterator(); + DateTime timestamp = granularity.toDateTime(((Number) results.next()).longValue()); final int size = isResultLevelCache ? query.getResultRowSizeWithPostAggregators() : query.getResultRowSizeWithoutPostAggregators(); - if (size != inputList.size()) { - throw new ISE( - "Stored field count[%d] / output field field count[%d] mismatch", - inputList.size(), - size - ); - } - final ResultRow resultRow = ResultRow.create(size); if (resultRowHasTimestamp) { - DateTime timestamp = granularity.toDateTime(((Number) results.next()).longValue()); resultRow.set(0, timestamp.getMillis()); } - for (int dimPos = 0; dimPos < dims.size(); dimPos++) { - final DimensionSpec dimensionSpec = dims.get(dimPos); + final Iterator dimsIter = dims.iterator(); + int dimPos = 0; + while (dimsIter.hasNext() && results.hasNext()) { + final DimensionSpec dimensionSpec = dimsIter.next(); // Must convert generic Jackson-deserialized type into the proper type. resultRow.set( dimensionStart + dimPos, DimensionHandlerUtils.convertObjectToType(results.next(), dimensionSpec.getOutputType()) ); + + dimPos++; } CacheStrategy.fetchAggregatorsFromCache( @@ -657,6 +654,13 @@ public ResultRow apply(Object input) resultRow.set(postAggregatorStart + postPos, results.next()); } } + if (dimsIter.hasNext() || results.hasNext()) { + throw new ISE( + "Found left over objects while reading from cache!! dimsIter[%s] results[%s]", + dimsIter.hasNext(), + results.hasNext() + ); + } return resultRow; } From d63064b4cb63fec756f7ccb3bba88bc0ccc48f24 Mon Sep 17 00:00:00 2001 From: Zoltan Haindrich Date: Mon, 20 Nov 2023 20:47:45 +0000 Subject: [PATCH 4/5] add exception --- .../apache/druid/query/groupby/GroupByQueryQueryToolChest.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java index 63b67c860d1f..86bda7678c1f 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -651,6 +651,9 @@ public ResultRow apply(Object input) if (isResultLevelCache) { for (int postPos = 0; postPos < query.getPostAggregatorSpecs().size(); postPos++) { + if (!results.hasNext()) { + throw new ISE("Ran out of objects while reading postaggs from cache!"); + } resultRow.set(postAggregatorStart + postPos, results.next()); } } From b566997b3e9e17bd089ced7c060bc69e4a9d4fce Mon Sep 17 00:00:00 2001 From: Zoltan Haindrich Date: Tue, 21 Nov 2023 08:32:54 +0000 Subject: [PATCH 5/5] use druidexception --- .../apache/druid/query/groupby/GroupByQueryQueryToolChest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java index 86bda7678c1f..96b8c7ec69f4 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -35,6 +35,7 @@ import com.google.common.collect.Lists; import com.google.inject.Inject; import org.apache.druid.data.input.Row; +import org.apache.druid.error.DruidException; import org.apache.druid.frame.Frame; import org.apache.druid.frame.FrameType; import org.apache.druid.frame.allocation.MemoryAllocatorFactory; @@ -652,7 +653,7 @@ public ResultRow apply(Object input) if (isResultLevelCache) { for (int postPos = 0; postPos < query.getPostAggregatorSpecs().size(); postPos++) { if (!results.hasNext()) { - throw new ISE("Ran out of objects while reading postaggs from cache!"); + throw DruidException.defensive("Ran out of objects while reading postaggs from cache!"); } resultRow.set(postAggregatorStart + postPos, results.next()); }