From 8507aa089ce1d6dd23de295af2662b779c9ed4d9 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Tue, 7 May 2019 17:33:47 -0700 Subject: [PATCH 1/7] Fix exception when using complex aggs with result level caching --- .../groupby/GroupByQueryQueryToolChest.java | 16 +- .../TimeseriesQueryQueryToolChest.java | 14 +- .../query/topn/TopNQueryQueryToolChest.java | 13 +- .../GroupByQueryQueryToolChestTest.java | 153 +++++++++++++++++- .../TimeseriesQueryQueryToolChestTest.java | 18 ++- .../topn/TopNQueryQueryToolChestTest.java | 80 ++++++++- 6 files changed, 283 insertions(+), 11 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java index c94d427b1bd2..ebbb5fb19995 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -566,12 +566,24 @@ public Row apply(Object input) DimensionHandlerUtils.convertObjectToType(results.next(), dimensionSpec.getOutputType()) ); } - Iterator aggsIter = aggs.iterator(); + + // When using the result level cache, the agg values seen here are + // finalized values generated by AggregatorFactory.finalizeComputation(). + // These finalized values are deserialized from the cache as generic Objects, which will + // later be reserialized and returned to the user without further modification. + // Because the agg values are deserialized as generic Objects, the values are subject to the same + // type consistency issues handled by DimensionHandlerUtils.convertObjectToType() above for + // dimension values (e.g., a Float would become Double). while (aggsIter.hasNext() && results.hasNext()) { final AggregatorFactory factory = aggsIter.next(); - event.put(factory.getName(), factory.deserialize(results.next())); + if (isResultLevelCache) { + event.put(factory.getName(), results.next()); + } else { + event.put(factory.getName(), factory.deserialize(results.next())); + } } + if (isResultLevelCache) { Iterator postItr = query.getPostAggregatorSpecs().iterator(); while (postItr.hasNext() && results.hasNext()) { diff --git a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java index f8f5aa0c4c7e..fbe6a31611c0 100644 --- a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java @@ -334,10 +334,22 @@ public Result apply(@Nullable Object input) DateTime timestamp = granularity.toDateTime(((Number) resultIter.next()).longValue()); + // When using the result level cache, the agg values seen here are + // finalized values generated by AggregatorFactory.finalizeComputation(). + // These finalized values are deserialized from the cache as generic Objects, which will + // later be reserialized and returned to the user without further modification. + // Because the agg values are deserialized as generic Objects, the values are subject to the same + // type consistency issues handled by DimensionHandlerUtils.convertObjectToType() above for + // dimension values (e.g., a Float would become Double). while (aggsIter.hasNext() && resultIter.hasNext()) { final AggregatorFactory factory = aggsIter.next(); - retVal.put(factory.getName(), factory.deserialize(resultIter.next())); + if (isResultLevelCache) { + retVal.put(factory.getName(), resultIter.next()); + } else { + retVal.put(factory.getName(), factory.deserialize(resultIter.next())); + } } + if (isResultLevelCache) { Iterator postItr = query.getPostAggregatorSpecs().iterator(); while (postItr.hasNext() && resultIter.hasNext()) { diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java index 2c3bd2b2f758..793a960c8e96 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java @@ -409,9 +409,20 @@ public Result apply(Object input) DimensionHandlerUtils.convertObjectToType(resultIter.next(), query.getDimensionSpec().getOutputType()) ); + // When using the result level cache, the agg values seen here are + // finalized values generated by AggregatorFactory.finalizeComputation(). + // These finalized values are deserialized from the cache as generic Objects, which will + // later be reserialized and returned to the user without further modification. + // Because the agg values are deserialized as generic Objects, the values are subject to the same + // type consistency issues handled by DimensionHandlerUtils.convertObjectToType() above for + // dimension values (e.g., a Float would become Double). while (aggIter.hasNext() && resultIter.hasNext()) { final AggregatorFactory factory = aggIter.next(); - vals.put(factory.getName(), factory.deserialize(resultIter.next())); + if (isResultLevelCache) { + vals.put(factory.getName(), resultIter.next()); + } else { + vals.put(factory.getName(), factory.deserialize(resultIter.next())); + } } for (PostAggregator postAgg : postAggs) { diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java index 2bad8f82f7f4..85323eb0c1fe 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java @@ -19,15 +19,26 @@ package org.apache.druid.query.groupby; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import org.apache.druid.collections.SerializablePair; +import org.apache.druid.data.input.MapBasedRow; import org.apache.druid.data.input.Row; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.query.CacheStrategy; import org.apache.druid.query.QueryRunnerTestHelper; +import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; import org.apache.druid.query.aggregation.FloatSumAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.query.aggregation.SerializablePairLongString; +import org.apache.druid.query.aggregation.last.DoubleLastAggregatorFactory; +import org.apache.druid.query.aggregation.last.FloatLastAggregatorFactory; +import org.apache.druid.query.aggregation.last.LongLastAggregatorFactory; +import org.apache.druid.query.aggregation.last.StringLastAggregatorFactory; +import org.apache.druid.query.aggregation.post.ConstantPostAggregator; import org.apache.druid.query.aggregation.post.ExpressionPostAggregator; import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.apache.druid.query.expression.TestExprMacroTable; @@ -45,11 +56,15 @@ import org.apache.druid.query.groupby.having.OrHavingSpec; import org.apache.druid.query.groupby.orderby.DefaultLimitSpec; import org.apache.druid.query.groupby.orderby.OrderByColumnSpec; -import org.apache.druid.query.ordering.StringComparators; +import org.apache.druid.query.ordering.StringComparators;; +import org.apache.druid.segment.TestHelper; +import org.apache.druid.segment.column.ValueType; import org.junit.Assert; import org.junit.Test; +import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.List; public class GroupByQueryQueryToolChestTest @@ -483,4 +498,140 @@ public void testResultLevelCacheKeyWithSubTotalsSpec() )); } + @Test + public void testCacheStrategy() throws Exception + { + doTestCacheStrategy(ValueType.STRING, "val1"); + doTestCacheStrategy(ValueType.FLOAT, 2.1f); + doTestCacheStrategy(ValueType.DOUBLE, 2.1d); + doTestCacheStrategy(ValueType.LONG, 2L); + } + + private AggregatorFactory getComplexAggregatorFactoryForValueType(final ValueType valueType) { + switch (valueType) { + case LONG: + return new LongLastAggregatorFactory("complexMetric", "test"); + case DOUBLE: + return new DoubleLastAggregatorFactory("complexMetric", "test"); + case FLOAT: + return new FloatLastAggregatorFactory("complexMetric", "test"); + case STRING: + return new StringLastAggregatorFactory("complexMetric", "test", null); + default: + throw new IllegalArgumentException("bad valueType: " + valueType); + } + } + + private SerializablePair getIntermediateComplexValue(final ValueType valueType, final Object dimValue) { + switch (valueType) { + case LONG: + case DOUBLE: + case FLOAT: + return new SerializablePair<>(123L, dimValue); + case STRING: + return new SerializablePairLongString(123L, (String) dimValue); + default: + throw new IllegalArgumentException("bad valueType: " + valueType); + } + } + + private void doTestCacheStrategy(final ValueType valueType, final Object dimValue) throws IOException + { + final GroupByQuery query1 = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(Collections.singletonList( + new DefaultDimensionSpec("test", "test", valueType) + )) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + getComplexAggregatorFactoryForValueType(valueType) + ) + ) + .setPostAggregatorSpecs( + ImmutableList.of(new ConstantPostAggregator("post", 10)) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .build(); + + CacheStrategy strategy = + new GroupByQueryQueryToolChest(null, null).getCacheStrategy( + query1 + ); + + final Row result1 = new MapBasedRow( + // test timestamps that result in integer size millis + DateTimes.utc(123L), + ImmutableMap.of( + "test", dimValue, + "rows", 1, + "complexMetric", getIntermediateComplexValue(valueType, dimValue) + ) + ); + + Object preparedValue = strategy.prepareForSegmentLevelCache().apply( + result1 + ); + + ObjectMapper objectMapper = TestHelper.makeJsonMapper(); + Object fromCacheValue = objectMapper.readValue( + objectMapper.writeValueAsBytes(preparedValue), + strategy.getCacheObjectClazz() + ); + + Row fromCacheResult = strategy.pullFromSegmentLevelCache().apply(fromCacheValue); + + Assert.assertEquals(result1, fromCacheResult); + + final Row result2 = new MapBasedRow( + // test timestamps that result in integer size millis + DateTimes.utc(123L), + ImmutableMap.of( + "test", dimValue, + "rows", 1, + "complexMetric", dimValue, + "post", 10 + ) + ); + + final Row typeAdjustedResult2; + if (valueType == ValueType.FLOAT) { + typeAdjustedResult2 = new MapBasedRow( + DateTimes.utc(123L), + ImmutableMap.of( + "test", dimValue, + "rows", 1, + "complexMetric", 2.1d, + "post", 10 + ) + ); + } else if (valueType == ValueType.LONG) { + typeAdjustedResult2 = new MapBasedRow( + DateTimes.utc(123L), + ImmutableMap.of( + "test", dimValue, + "rows", 1, + "complexMetric", 2, + "post", 10 + ) + ); + } else { + typeAdjustedResult2 = result2; + } + + + Object preparedResultCacheValue = strategy.prepareForCache(true).apply( + result2 + ); + + Object fromResultCacheValue = objectMapper.readValue( + objectMapper.writeValueAsBytes(preparedResultCacheValue), + strategy.getCacheObjectClazz() + ); + + Row fromResultCacheResult = strategy.pullFromCache(true).apply(fromResultCacheValue); + Assert.assertEquals(typeAdjustedResult2, fromResultCacheResult); + } } diff --git a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java index 6d07c59d269e..304ac9b5ff1f 100644 --- a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java +++ b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java @@ -32,6 +32,8 @@ import org.apache.druid.query.TableDataSource; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.query.aggregation.SerializablePairLongString; +import org.apache.druid.query.aggregation.last.StringLastAggregatorFactory; import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator; import org.apache.druid.query.aggregation.post.ConstantPostAggregator; import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator; @@ -77,7 +79,8 @@ public void testCacheStrategy() throws Exception Granularities.ALL, ImmutableList.of( new CountAggregatorFactory("metric1"), - new LongSumAggregatorFactory("metric0", "metric0") + new LongSumAggregatorFactory("metric0", "metric0"), + new StringLastAggregatorFactory("complexMetric", "test", null) ), ImmutableList.of(new ConstantPostAggregator("post", 10)), 0, @@ -89,7 +92,11 @@ public void testCacheStrategy() throws Exception // test timestamps that result in integer size millis DateTimes.utc(123L), new TimeseriesResultValue( - ImmutableMap.of("metric1", 2, "metric0", 3) + ImmutableMap.of( + "metric1", 2, + "metric0", 3, + "complexMetric", new SerializablePairLongString(123L, "val1") + ) ) ); @@ -109,7 +116,12 @@ public void testCacheStrategy() throws Exception // test timestamps that result in integer size millis DateTimes.utc(123L), new TimeseriesResultValue( - ImmutableMap.of("metric1", 2, "metric0", 3, "post", 10) + ImmutableMap.of( + "metric1", 2, + "metric0", 3, + "complexMetric", "val1", + "post", 10 + ) ) ); diff --git a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java index 81079df3c14c..4c4a97b0f436 100644 --- a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java +++ b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.apache.druid.collections.CloseableStupidPool; +import org.apache.druid.collections.SerializablePair; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; @@ -35,8 +36,14 @@ import org.apache.druid.query.Result; import org.apache.druid.query.TableDataSource; import org.apache.druid.query.TestQueryRunners; +import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.query.aggregation.SerializablePairLongString; +import org.apache.druid.query.aggregation.last.DoubleLastAggregatorFactory; +import org.apache.druid.query.aggregation.last.FloatLastAggregatorFactory; +import org.apache.druid.query.aggregation.last.LongLastAggregatorFactory; +import org.apache.druid.query.aggregation.last.StringLastAggregatorFactory; import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator; import org.apache.druid.query.aggregation.post.ConstantPostAggregator; import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator; @@ -269,6 +276,34 @@ public void testMinTopNThreshold() } } + private AggregatorFactory getComplexAggregatorFactoryForValueType(final ValueType valueType) { + switch (valueType) { + case LONG: + return new LongLastAggregatorFactory("complexMetric", "test"); + case DOUBLE: + return new DoubleLastAggregatorFactory("complexMetric", "test"); + case FLOAT: + return new FloatLastAggregatorFactory("complexMetric", "test"); + case STRING: + return new StringLastAggregatorFactory("complexMetric", "test", null); + default: + throw new IllegalArgumentException("bad valueType: " + valueType); + } + } + + private SerializablePair getIntermediateComplexValue(final ValueType valueType, final Object dimValue) { + switch (valueType) { + case LONG: + case DOUBLE: + case FLOAT: + return new SerializablePair<>(123L, dimValue); + case STRING: + return new SerializablePairLongString(123L, (String) dimValue); + default: + throw new IllegalArgumentException("bad valueType: " + valueType); + } + } + private void doTestCacheStrategy(final ValueType valueType, final Object dimValue) throws IOException { CacheStrategy, Object, TopNQuery> strategy = @@ -282,7 +317,10 @@ private void doTestCacheStrategy(final ValueType valueType, final Object dimValu new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2015-01-01/2015-01-02"))), null, Granularities.ALL, - ImmutableList.of(new CountAggregatorFactory("metric1")), + ImmutableList.of( + new CountAggregatorFactory("metric1"), + getComplexAggregatorFactoryForValueType(valueType) + ), ImmutableList.of(new ConstantPostAggregator("post", 10)), null ) @@ -295,7 +333,8 @@ private void doTestCacheStrategy(final ValueType valueType, final Object dimValu Collections.singletonList( ImmutableMap.of( "test", dimValue, - "metric1", 2 + "metric1", 2, + "complexMetric", getIntermediateComplexValue(valueType, dimValue) ) ) ) @@ -323,12 +362,47 @@ private void doTestCacheStrategy(final ValueType valueType, final Object dimValu ImmutableMap.of( "test", dimValue, "metric1", 2, + "complexMetric", dimValue, "post", 10 ) ) ) ); + final Result typeAdjustedResult2; + if (valueType == ValueType.FLOAT) { + typeAdjustedResult2 = new Result<>( + DateTimes.utc(123L), + new TopNResultValue( + Collections.singletonList( + ImmutableMap.of( + "test", dimValue, + "metric1", 2, + "complexMetric", 2.1d, + "post", 10 + ) + ) + ) + ); + } else if (valueType == ValueType.LONG) { + typeAdjustedResult2 = new Result<>( + DateTimes.utc(123L), + new TopNResultValue( + Collections.singletonList( + ImmutableMap.of( + "test", dimValue, + "metric1", 2, + "complexMetric", 2, + "post", 10 + ) + ) + ) + ); + } else { + typeAdjustedResult2 = result2; + } + + Object preparedResultCacheValue = strategy.prepareForCache(true).apply( result2 ); @@ -339,7 +413,7 @@ private void doTestCacheStrategy(final ValueType valueType, final Object dimValu ); Result fromResultCacheResult = strategy.pullFromCache(true).apply(fromResultCacheValue); - Assert.assertEquals(result2, fromResultCacheResult); + Assert.assertEquals(typeAdjustedResult2, fromResultCacheResult); } static class MockQueryRunner implements QueryRunner> From e5f88ec7f1198e6939df77a910f0ed2159f4e00d Mon Sep 17 00:00:00 2001 From: jon-wei Date: Tue, 7 May 2019 19:48:24 -0700 Subject: [PATCH 2/7] Add test comments --- .../druid/query/groupby/GroupByQueryQueryToolChestTest.java | 1 + .../org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java | 1 + 2 files changed, 2 insertions(+) diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java index 85323eb0c1fe..007b6dc25983 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java @@ -596,6 +596,7 @@ private void doTestCacheStrategy(final ValueType valueType, final Object dimValu ) ); + // Please see the comments on aggregator serde and type handling in GroupByQueryQueryToolChest.pullFromCache() final Row typeAdjustedResult2; if (valueType == ValueType.FLOAT) { typeAdjustedResult2 = new MapBasedRow( diff --git a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java index 4c4a97b0f436..2928228fecfd 100644 --- a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java +++ b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java @@ -369,6 +369,7 @@ private void doTestCacheStrategy(final ValueType valueType, final Object dimValu ) ); + // Please see the comments on aggregator serde and type handling in TopNQueryQueryToolChest.pullFromCache() final Result typeAdjustedResult2; if (valueType == ValueType.FLOAT) { typeAdjustedResult2 = new Result<>( From eab6817bc10dd327cf00c1548a3dbf1da399c2cf Mon Sep 17 00:00:00 2001 From: jon-wei Date: Wed, 8 May 2019 04:08:06 -0700 Subject: [PATCH 3/7] checkstyle --- .../query/groupby/GroupByQueryQueryToolChestTest.java | 8 +++++--- .../druid/query/topn/TopNQueryQueryToolChestTest.java | 6 ++++-- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java index 007b6dc25983..aadbfd15bbe8 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java @@ -56,7 +56,7 @@ import org.apache.druid.query.groupby.having.OrHavingSpec; import org.apache.druid.query.groupby.orderby.DefaultLimitSpec; import org.apache.druid.query.groupby.orderby.OrderByColumnSpec; -import org.apache.druid.query.ordering.StringComparators;; +import org.apache.druid.query.ordering.StringComparators; import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.column.ValueType; import org.junit.Assert; @@ -507,7 +507,8 @@ public void testCacheStrategy() throws Exception doTestCacheStrategy(ValueType.LONG, 2L); } - private AggregatorFactory getComplexAggregatorFactoryForValueType(final ValueType valueType) { + private AggregatorFactory getComplexAggregatorFactoryForValueType(final ValueType valueType) + { switch (valueType) { case LONG: return new LongLastAggregatorFactory("complexMetric", "test"); @@ -522,7 +523,8 @@ private AggregatorFactory getComplexAggregatorFactoryForValueType(final ValueTyp } } - private SerializablePair getIntermediateComplexValue(final ValueType valueType, final Object dimValue) { + private SerializablePair getIntermediateComplexValue(final ValueType valueType, final Object dimValue) + { switch (valueType) { case LONG: case DOUBLE: diff --git a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java index 2928228fecfd..32a919926c8f 100644 --- a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java +++ b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java @@ -276,7 +276,8 @@ public void testMinTopNThreshold() } } - private AggregatorFactory getComplexAggregatorFactoryForValueType(final ValueType valueType) { + private AggregatorFactory getComplexAggregatorFactoryForValueType(final ValueType valueType) + { switch (valueType) { case LONG: return new LongLastAggregatorFactory("complexMetric", "test"); @@ -291,7 +292,8 @@ private AggregatorFactory getComplexAggregatorFactoryForValueType(final ValueTyp } } - private SerializablePair getIntermediateComplexValue(final ValueType valueType, final Object dimValue) { + private SerializablePair getIntermediateComplexValue(final ValueType valueType, final Object dimValue) + { switch (valueType) { case LONG: case DOUBLE: From 64c8f50dc9a99e26fd142fa777c8d2a62e98c146 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Wed, 8 May 2019 13:24:06 -0700 Subject: [PATCH 4/7] Add helper function for getting aggs from cache --- .../apache/druid/query/QueryToolChest.java | 29 ++++++++++++++++++- .../groupby/GroupByQueryQueryToolChest.java | 26 +++++++---------- .../TimeseriesQueryQueryToolChest.java | 26 +++++++---------- .../query/topn/TopNQueryQueryToolChest.java | 26 +++++++---------- 4 files changed, 58 insertions(+), 49 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/QueryToolChest.java b/processing/src/main/java/org/apache/druid/query/QueryToolChest.java index 81f59e43f2e5..5298e600ba17 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/QueryToolChest.java @@ -24,11 +24,14 @@ import com.fasterxml.jackson.databind.type.TypeFactory; import com.google.common.base.Function; import org.apache.druid.guice.annotations.ExtensionPoint; +import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.MetricManipulationFn; import org.apache.druid.timeline.LogicalSegment; import javax.annotation.Nullable; +import java.util.Iterator; import java.util.List; +import java.util.function.BiFunction; /** * The broker-side (also used by server in some cases) API for a specific Query type. @@ -88,7 +91,7 @@ public final JavaType getBySegmentResultType() * to allow for query-specific dimensions and metrics. That is, the ToolChest is expected to set some * meaningful dimensions for metrics given this query type. Examples might be the topN threshold for * a TopN query or the number of dimensions included for a groupBy query. - * + * *

QueryToolChests for query types in core (druid-processing) and public extensions (belonging to the Druid source * tree) should use delegate this method to {@link GenericQueryMetricsFactory#makeMetrics(Query)} on an injected * instance of {@link GenericQueryMetricsFactory}, as long as they don't need to emit custom dimensions and/or @@ -219,4 +222,28 @@ public List filterSegments(QueryType query, List aggIter, + Iterator resultIter, + boolean isResultLevelCache, + BiFunction addToResultFunction + ) + { + // When using the result level cache, the agg values seen here are + // finalized values generated by AggregatorFactory.finalizeComputation(). + // These finalized values are deserialized from the cache as generic Objects, which will + // later be reserialized and returned to the user without further modification. + // Because the agg values are deserialized as generic Objects, the values are subject to the same + // type consistency issues handled by DimensionHandlerUtils.convertObjectToType() above for + // dimension values (e.g., a Float would become Double). + while (aggIter.hasNext() && resultIter.hasNext()) { + final AggregatorFactory factory = aggIter.next(); + if (isResultLevelCache) { + addToResultFunction.apply(factory.getName(), resultIter.next()); + } else { + addToResultFunction.apply(factory.getName(), factory.deserialize(resultIter.next())); + } + } + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java index ebbb5fb19995..16350ab037b9 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -555,7 +555,7 @@ public Row apply(Object input) DateTime timestamp = granularity.toDateTime(((Number) results.next()).longValue()); - Map event = Maps.newLinkedHashMap(); + final Map event = Maps.newLinkedHashMap(); Iterator dimsIter = dims.iterator(); while (dimsIter.hasNext() && results.hasNext()) { final DimensionSpec dimensionSpec = dimsIter.next(); @@ -568,21 +568,15 @@ public Row apply(Object input) } Iterator aggsIter = aggs.iterator(); - // When using the result level cache, the agg values seen here are - // finalized values generated by AggregatorFactory.finalizeComputation(). - // These finalized values are deserialized from the cache as generic Objects, which will - // later be reserialized and returned to the user without further modification. - // Because the agg values are deserialized as generic Objects, the values are subject to the same - // type consistency issues handled by DimensionHandlerUtils.convertObjectToType() above for - // dimension values (e.g., a Float would become Double). - while (aggsIter.hasNext() && results.hasNext()) { - final AggregatorFactory factory = aggsIter.next(); - if (isResultLevelCache) { - event.put(factory.getName(), results.next()); - } else { - event.put(factory.getName(), factory.deserialize(results.next())); - } - } + QueryToolChest.fetchAggregatorsFromCache( + aggsIter, + results, + isResultLevelCache, + (aggName, aggValueObject) -> { + event.put(aggName, aggValueObject); + return null; + } + ); if (isResultLevelCache) { Iterator postItr = query.getPostAggregatorSpecs().iterator(); diff --git a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java index fbe6a31611c0..036191545375 100644 --- a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java @@ -327,28 +327,22 @@ public Function> pullFromCache(boolean isR public Result apply(@Nullable Object input) { List results = (List) input; - Map retVal = Maps.newLinkedHashMap(); + final Map retVal = Maps.newLinkedHashMap(); Iterator aggsIter = aggs.iterator(); Iterator resultIter = results.iterator(); DateTime timestamp = granularity.toDateTime(((Number) resultIter.next()).longValue()); - // When using the result level cache, the agg values seen here are - // finalized values generated by AggregatorFactory.finalizeComputation(). - // These finalized values are deserialized from the cache as generic Objects, which will - // later be reserialized and returned to the user without further modification. - // Because the agg values are deserialized as generic Objects, the values are subject to the same - // type consistency issues handled by DimensionHandlerUtils.convertObjectToType() above for - // dimension values (e.g., a Float would become Double). - while (aggsIter.hasNext() && resultIter.hasNext()) { - final AggregatorFactory factory = aggsIter.next(); - if (isResultLevelCache) { - retVal.put(factory.getName(), resultIter.next()); - } else { - retVal.put(factory.getName(), factory.deserialize(resultIter.next())); - } - } + QueryToolChest.fetchAggregatorsFromCache( + aggsIter, + resultIter, + isResultLevelCache, + (aggName, aggValueObject) -> { + retVal.put(aggName, aggValueObject); + return null; + } + ); if (isResultLevelCache) { Iterator postItr = query.getPostAggregatorSpecs().iterator(); diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java index 793a960c8e96..e086665b76ea 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java @@ -398,7 +398,7 @@ public Result apply(Object input) while (inputIter.hasNext()) { List result = (List) inputIter.next(); - Map vals = Maps.newLinkedHashMap(); + final Map vals = Maps.newLinkedHashMap(); Iterator aggIter = aggs.iterator(); Iterator resultIter = result.iterator(); @@ -409,21 +409,15 @@ public Result apply(Object input) DimensionHandlerUtils.convertObjectToType(resultIter.next(), query.getDimensionSpec().getOutputType()) ); - // When using the result level cache, the agg values seen here are - // finalized values generated by AggregatorFactory.finalizeComputation(). - // These finalized values are deserialized from the cache as generic Objects, which will - // later be reserialized and returned to the user without further modification. - // Because the agg values are deserialized as generic Objects, the values are subject to the same - // type consistency issues handled by DimensionHandlerUtils.convertObjectToType() above for - // dimension values (e.g., a Float would become Double). - while (aggIter.hasNext() && resultIter.hasNext()) { - final AggregatorFactory factory = aggIter.next(); - if (isResultLevelCache) { - vals.put(factory.getName(), resultIter.next()); - } else { - vals.put(factory.getName(), factory.deserialize(resultIter.next())); - } - } + QueryToolChest.fetchAggregatorsFromCache( + aggIter, + resultIter, + isResultLevelCache, + (aggName, aggValueObject) -> { + vals.put(aggName, aggValueObject); + return null; + } + ); for (PostAggregator postAgg : postAggs) { vals.put(postAgg.getName(), postAgg.compute(vals)); From 6fbc7e82553f63b8bfd4e6c82a88c635f678c93f Mon Sep 17 00:00:00 2001 From: jon-wei Date: Wed, 8 May 2019 13:31:31 -0700 Subject: [PATCH 5/7] Move method to CacheStrategy --- .../org/apache/druid/query/CacheStrategy.java | 30 +++++++++++++++++++ .../apache/druid/query/QueryToolChest.java | 24 --------------- .../groupby/GroupByQueryQueryToolChest.java | 2 +- .../TimeseriesQueryQueryToolChest.java | 2 +- .../query/topn/TopNQueryQueryToolChest.java | 2 +- 5 files changed, 33 insertions(+), 27 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/CacheStrategy.java b/processing/src/main/java/org/apache/druid/query/CacheStrategy.java index 8b106a6f2b7a..f93a3955aa6d 100644 --- a/processing/src/main/java/org/apache/druid/query/CacheStrategy.java +++ b/processing/src/main/java/org/apache/druid/query/CacheStrategy.java @@ -22,8 +22,11 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.base.Function; import org.apache.druid.guice.annotations.ExtensionPoint; +import org.apache.druid.query.aggregation.AggregatorFactory; +import java.util.Iterator; import java.util.concurrent.ExecutorService; +import java.util.function.BiFunction; /** */ @@ -98,4 +101,31 @@ default Function pullFromSegmentLevelCache() { return pullFromCache(false); } + + /** + * Helper function used by TopN, GroupBy, Timeseries queries in {@link #pullFromCache(boolean)}. + * When using the result level cache, the agg values seen here are + * finalized values generated by AggregatorFactory.finalizeComputation(). + * These finalized values are deserialized from the cache as generic Objects, which will + * later be reserialized and returned to the user without further modification. + * Because the agg values are deserialized as generic Objects, the values are subject to the same + * type consistency issues handled by DimensionHandlerUtils.convertObjectToType() in the pullFromCache implementations + * for dimension values (e.g., a Float would become Double). + */ + static void fetchAggregatorsFromCache( + Iterator aggIter, + Iterator resultIter, + boolean isResultLevelCache, + BiFunction addToResultFunction + ) + { + while (aggIter.hasNext() && resultIter.hasNext()) { + final AggregatorFactory factory = aggIter.next(); + if (isResultLevelCache) { + addToResultFunction.apply(factory.getName(), resultIter.next()); + } else { + addToResultFunction.apply(factory.getName(), factory.deserialize(resultIter.next())); + } + } + } } diff --git a/processing/src/main/java/org/apache/druid/query/QueryToolChest.java b/processing/src/main/java/org/apache/druid/query/QueryToolChest.java index 5298e600ba17..645b25948c80 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/QueryToolChest.java @@ -222,28 +222,4 @@ public List filterSegments(QueryType query, List aggIter, - Iterator resultIter, - boolean isResultLevelCache, - BiFunction addToResultFunction - ) - { - // When using the result level cache, the agg values seen here are - // finalized values generated by AggregatorFactory.finalizeComputation(). - // These finalized values are deserialized from the cache as generic Objects, which will - // later be reserialized and returned to the user without further modification. - // Because the agg values are deserialized as generic Objects, the values are subject to the same - // type consistency issues handled by DimensionHandlerUtils.convertObjectToType() above for - // dimension values (e.g., a Float would become Double). - while (aggIter.hasNext() && resultIter.hasNext()) { - final AggregatorFactory factory = aggIter.next(); - if (isResultLevelCache) { - addToResultFunction.apply(factory.getName(), resultIter.next()); - } else { - addToResultFunction.apply(factory.getName(), factory.deserialize(resultIter.next())); - } - } - } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java index 16350ab037b9..e7d1e27efb43 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -568,7 +568,7 @@ public Row apply(Object input) } Iterator aggsIter = aggs.iterator(); - QueryToolChest.fetchAggregatorsFromCache( + CacheStrategy.fetchAggregatorsFromCache( aggsIter, results, isResultLevelCache, diff --git a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java index 036191545375..d625c318ce2a 100644 --- a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java @@ -334,7 +334,7 @@ public Result apply(@Nullable Object input) DateTime timestamp = granularity.toDateTime(((Number) resultIter.next()).longValue()); - QueryToolChest.fetchAggregatorsFromCache( + CacheStrategy.fetchAggregatorsFromCache( aggsIter, resultIter, isResultLevelCache, diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java index e086665b76ea..d87a178dc077 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java @@ -409,7 +409,7 @@ public Result apply(Object input) DimensionHandlerUtils.convertObjectToType(resultIter.next(), query.getDimensionSpec().getOutputType()) ); - QueryToolChest.fetchAggregatorsFromCache( + CacheStrategy.fetchAggregatorsFromCache( aggIter, resultIter, isResultLevelCache, From f4bbdd3296e27c50ece80ffd270a43964455cc14 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Wed, 8 May 2019 13:34:03 -0700 Subject: [PATCH 6/7] Revert QueryToolChest changes --- .../src/main/java/org/apache/druid/query/QueryToolChest.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/QueryToolChest.java b/processing/src/main/java/org/apache/druid/query/QueryToolChest.java index 645b25948c80..81f59e43f2e5 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/QueryToolChest.java @@ -24,14 +24,11 @@ import com.fasterxml.jackson.databind.type.TypeFactory; import com.google.common.base.Function; import org.apache.druid.guice.annotations.ExtensionPoint; -import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.MetricManipulationFn; import org.apache.druid.timeline.LogicalSegment; import javax.annotation.Nullable; -import java.util.Iterator; import java.util.List; -import java.util.function.BiFunction; /** * The broker-side (also used by server in some cases) API for a specific Query type. @@ -91,7 +88,7 @@ public final JavaType getBySegmentResultType() * to allow for query-specific dimensions and metrics. That is, the ToolChest is expected to set some * meaningful dimensions for metrics given this query type. Examples might be the topN threshold for * a TopN query or the number of dimensions included for a groupBy query. - * + * *

QueryToolChests for query types in core (druid-processing) and public extensions (belonging to the Druid source * tree) should use delegate this method to {@link GenericQueryMetricsFactory#makeMetrics(Query)} on an injected * instance of {@link GenericQueryMetricsFactory}, as long as they don't need to emit custom dimensions and/or From 8032732a76ad28287571144b6a0f2310f5008326 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Wed, 8 May 2019 13:35:34 -0700 Subject: [PATCH 7/7] Update test comments --- .../druid/query/groupby/GroupByQueryQueryToolChestTest.java | 2 +- .../apache/druid/query/topn/TopNQueryQueryToolChestTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java index aadbfd15bbe8..94842e08925b 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java @@ -598,7 +598,7 @@ private void doTestCacheStrategy(final ValueType valueType, final Object dimValu ) ); - // Please see the comments on aggregator serde and type handling in GroupByQueryQueryToolChest.pullFromCache() + // Please see the comments on aggregator serde and type handling in CacheStrategy.fetchAggregatorsFromCache() final Row typeAdjustedResult2; if (valueType == ValueType.FLOAT) { typeAdjustedResult2 = new MapBasedRow( diff --git a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java index 32a919926c8f..191cc557f095 100644 --- a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java +++ b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java @@ -371,7 +371,7 @@ private void doTestCacheStrategy(final ValueType valueType, final Object dimValu ) ); - // Please see the comments on aggregator serde and type handling in TopNQueryQueryToolChest.pullFromCache() + // Please see the comments on aggregator serde and type handling in CacheStrategy.fetchAggregatorsFromCache() final Result typeAdjustedResult2; if (valueType == ValueType.FLOAT) { typeAdjustedResult2 = new Result<>(