From ff95bb01a8dae1f00f3b256112463372f44500b4 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Tue, 17 Apr 2018 15:35:18 -0700 Subject: [PATCH] topN: Fix caching of Float dimension values. (#5653) Jackson would deserialize them as Doubles, leading to ClassCastExceptions in the topN processing pipeline when it attempted to treat them as Floats. --- .../query/topn/TopNQueryQueryToolChest.java | 7 +- .../topn/TopNQueryQueryToolChestTest.java | 124 ++++++++++++------ 2 files changed, 88 insertions(+), 43 deletions(-) diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java index 81761afc6a37..774543d32394 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java @@ -384,6 +384,11 @@ public Result apply(Object input) Iterator inputIter = results.iterator(); DateTime timestamp = granularity.toDateTime(((Number) inputIter.next()).longValue()); + // Need a value transformer to convert generic Jackson-deserialized type into the proper type. + final Function dimValueTransformer = TopNMapFn.getValueTransformer( + query.getDimensionSpec().getOutputType() + ); + while (inputIter.hasNext()) { List result = (List) inputIter.next(); Map vals = Maps.newLinkedHashMap(); @@ -391,7 +396,7 @@ public Result apply(Object input) Iterator aggIter = aggs.iterator(); Iterator resultIter = result.iterator(); - vals.put(query.getDimensionSpec().getOutputName(), resultIter.next()); + vals.put(query.getDimensionSpec().getOutputName(), dimValueTransformer.apply(resultIter.next())); while (aggIter.hasNext() && resultIter.hasNext()) { final AggregatorFactory factory = aggIter.next(); diff --git a/processing/src/test/java/io/druid/query/topn/TopNQueryQueryToolChestTest.java b/processing/src/test/java/io/druid/query/topn/TopNQueryQueryToolChestTest.java index 8938390986a5..8b85430345ac 100644 --- a/processing/src/test/java/io/druid/query/topn/TopNQueryQueryToolChestTest.java +++ b/processing/src/test/java/io/druid/query/topn/TopNQueryQueryToolChestTest.java @@ -47,9 +47,11 @@ import io.druid.segment.TestHelper; import io.druid.segment.TestIndex; import io.druid.segment.VirtualColumns; +import io.druid.segment.column.ValueType; import org.junit.Assert; import org.junit.Test; +import java.io.IOException; import java.util.Arrays; import java.util.Map; @@ -61,49 +63,15 @@ public class TopNQueryQueryToolChestTest @Test public void testCacheStrategy() throws Exception { - CacheStrategy, Object, TopNQuery> strategy = - new TopNQueryQueryToolChest(null, null).getCacheStrategy( - new TopNQuery( - new TableDataSource("dummy"), - VirtualColumns.EMPTY, - new DefaultDimensionSpec("test", "test"), - new NumericTopNMetricSpec("metric1"), - 3, - new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2015-01-01/2015-01-02"))), - null, - Granularities.ALL, - ImmutableList.of(new CountAggregatorFactory("metric1")), - ImmutableList.of(new ConstantPostAggregator("post", 10)), - null - ) - ); - - final Result result = new Result<>( - // test timestamps that result in integer size millis - DateTimes.utc(123L), - new TopNResultValue( - Arrays.asList( - ImmutableMap.of( - "test", "val1", - "metric1", 2 - ) - ) - ) - ); - - Object preparedValue = strategy.prepareForCache().apply( - result - ); - - ObjectMapper objectMapper = TestHelper.makeJsonMapper(); - Object fromCacheValue = objectMapper.readValue( - objectMapper.writeValueAsBytes(preparedValue), - strategy.getCacheObjectClazz() - ); - - Result fromCacheResult = strategy.pullFromCache().apply(fromCacheValue); + doTestCacheStrategy(ValueType.STRING, "val1"); + doTestCacheStrategy(ValueType.FLOAT, 2.1f); + doTestCacheStrategy(ValueType.DOUBLE, 2.1d); + doTestCacheStrategy(ValueType.LONG, 2L); + } - Assert.assertEquals(result, fromCacheResult); + @Test + public void testCacheStrategyWithFloatDimension() throws Exception + { } @Test @@ -215,6 +183,78 @@ public void testMinTopNThreshold() throws Exception Assert.assertEquals(2000, mockRunner.query.getThreshold()); } + private void doTestCacheStrategy(final ValueType valueType, final Object dimValue) throws IOException + { + CacheStrategy, Object, TopNQuery> strategy = + new TopNQueryQueryToolChest(null, null).getCacheStrategy( + new TopNQuery( + new TableDataSource("dummy"), + VirtualColumns.EMPTY, + new DefaultDimensionSpec("test", "test", valueType), + new NumericTopNMetricSpec("metric1"), + 3, + new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2015-01-01/2015-01-02"))), + null, + Granularities.ALL, + ImmutableList.of(new CountAggregatorFactory("metric1")), + ImmutableList.of(new ConstantPostAggregator("post", 10)), + null + ) + ); + + final Result result1 = new Result<>( + // test timestamps that result in integer size millis + DateTimes.utc(123L), + new TopNResultValue( + Arrays.asList( + ImmutableMap.of( + "test", dimValue, + "metric1", 2 + ) + ) + ) + ); + + Object preparedValue = strategy.prepareForCache().apply( + result1 + ); + + ObjectMapper objectMapper = TestHelper.makeJsonMapper(); + Object fromCacheValue = objectMapper.readValue( + objectMapper.writeValueAsBytes(preparedValue), + strategy.getCacheObjectClazz() + ); + + Result fromCacheResult = strategy.pullFromCache().apply(fromCacheValue); + + Assert.assertEquals(result1, fromCacheResult); + + final Result result2 = new Result<>( + // test timestamps that result in integer size millis + DateTimes.utc(123L), + new TopNResultValue( + Arrays.asList( + ImmutableMap.of( + "test", dimValue, + "metric1", 2 + ) + ) + ) + ); + + Object preparedResultCacheValue = strategy.prepareForCache().apply( + result2 + ); + + Object fromResultCacheValue = objectMapper.readValue( + objectMapper.writeValueAsBytes(preparedResultCacheValue), + strategy.getCacheObjectClazz() + ); + + Result fromResultCacheResult = strategy.pullFromCache().apply(fromResultCacheValue); + Assert.assertEquals(result2, fromResultCacheResult); + } + static class MockQueryRunner implements QueryRunner> { private final QueryRunner> runner;