From c9e091da50ec7d4d3ce7d5c8a53cbd90a357594d Mon Sep 17 00:00:00 2001 From: jon-wei Date: Thu, 29 Jun 2017 15:05:35 -0700 Subject: [PATCH 1/4] Fix GroupBy type cast error when ChainedExecutionQueryRunner merges multiple runners --- .../epinephelinae/GroupByQueryEngineV2.java | 22 +++++++ .../query/groupby/GroupByQueryRunnerTest.java | 57 +++++++++++++++++++ 2 files changed, 79 insertions(+) diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java index 49f1b72917cc..c710c4aee2ce 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java @@ -36,6 +36,7 @@ import io.druid.query.ColumnSelectorPlus; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.dimension.ColumnSelectorStrategyFactory; +import io.druid.query.dimension.DimensionSpec; import io.druid.query.groupby.GroupByQuery; import io.druid.query.groupby.GroupByQueryConfig; import io.druid.query.groupby.epinephelinae.column.DictionaryBuildingStringGroupByColumnSelectorStrategy; @@ -360,6 +361,27 @@ public Row apply(final Grouper.Entry entry) ); } + // Convert dimension types to specified output types + for (DimensionSpec dimSpec : query.getDimensions()) { + Object baseVal = theMap.get(dimSpec.getOutputName()); + switch (dimSpec.getOutputType()) { + case STRING: + baseVal = baseVal == null ? "" : baseVal.toString(); + break; + case LONG: + baseVal = DimensionHandlerUtils.convertObjectToLong(baseVal); + baseVal = baseVal == null ? 0L : baseVal; + break; + case FLOAT: + baseVal = DimensionHandlerUtils.convertObjectToFloat(baseVal); + baseVal = baseVal == null ? 0.f : baseVal; + break; + default: + throw new IAE("Unsupported type: " + dimSpec.getOutputType()); + } + theMap.put(dimSpec.getOutputName(), baseVal); + } + // Add aggregations. for (int i = 0; i < entry.getValues().length; i++) { theMap.put(query.getAggregatorSpecs().get(i).getName(), entry.getValues()[i]); diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java index 75239edea177..c6607d8dea54 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java @@ -30,6 +30,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import io.druid.collections.BlockingPool; import io.druid.collections.DefaultBlockingPool; @@ -49,6 +50,7 @@ import io.druid.js.JavaScriptConfig; import io.druid.query.BySegmentResultValue; import io.druid.query.BySegmentResultValueClass; +import io.druid.query.ChainedExecutionQueryRunner; import io.druid.query.DruidProcessingConfig; import io.druid.query.Druids; import io.druid.query.FinalizeResultsQueryRunner; @@ -59,6 +61,7 @@ import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; import io.druid.query.QueryToolChest; +import io.druid.query.QueryWatcher; import io.druid.query.ResourceLimitExceededException; import io.druid.query.Result; import io.druid.query.aggregation.AggregatorFactory; @@ -9130,4 +9133,58 @@ public void testSubqueryWithMultipleIntervalsInOuterQueryWithLimitPushDown() Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); TestHelper.assertExpectedObjects(expectedResults, results, ""); } + + @Test + public void testTypeConversionWithMergingChainedExecutionRunner() + { + if (config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V1)) { + expectedException.expect(UnsupportedOperationException.class); + expectedException.expectMessage("GroupBy v1 only supports dimensions with an outputType of STRING."); + } + + GroupByQuery query = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(Lists.newArrayList( + new DefaultDimensionSpec("quality", "alias"), + new ExtractionDimensionSpec("quality", "qualityLen", ValueType.LONG, StrlenExtractionFn.instance()) + )) + .setDimFilter(new SelectorDimFilter( + "quality", + "technology", + null + )) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory("idx", "index") + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .build(); + + List expectedResults = Arrays.asList( + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "technology", "qualityLen", 10L, "rows", 2L, "idx", 156L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "technology", "qualityLen", 10L, "rows", 2L, "idx", 194L) + ); + + ChainedExecutionQueryRunner ceqr = new ChainedExecutionQueryRunner( + MoreExecutors.sameThreadExecutor(), + new QueryWatcher() + { + @Override + public void registerQuery(Query query, ListenableFuture future) + { + return; + } + }, + ImmutableList.>of(runner, runner) + ); + + QueryRunner mergingRunner = factory.mergeRunners(MoreExecutors.sameThreadExecutor(), ImmutableList.>of(ceqr)); + + Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, mergingRunner, query); + TestHelper.assertExpectedObjects(expectedResults, results, ""); + } } From d0c9a9f778d21a3e31473300c93319121e8bbc3b Mon Sep 17 00:00:00 2001 From: jon-wei Date: Thu, 29 Jun 2017 16:47:04 -0700 Subject: [PATCH 2/4] Move conversion step to separate method --- .../epinephelinae/GroupByQueryEngineV2.java | 43 +++++++++++-------- 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java index c710c4aee2ce..971099ef7633 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java @@ -362,25 +362,7 @@ public Row apply(final Grouper.Entry entry) } // Convert dimension types to specified output types - for (DimensionSpec dimSpec : query.getDimensions()) { - Object baseVal = theMap.get(dimSpec.getOutputName()); - switch (dimSpec.getOutputType()) { - case STRING: - baseVal = baseVal == null ? "" : baseVal.toString(); - break; - case LONG: - baseVal = DimensionHandlerUtils.convertObjectToLong(baseVal); - baseVal = baseVal == null ? 0L : baseVal; - break; - case FLOAT: - baseVal = DimensionHandlerUtils.convertObjectToFloat(baseVal); - baseVal = baseVal == null ? 0.f : baseVal; - break; - default: - throw new IAE("Unsupported type: " + dimSpec.getOutputType()); - } - theMap.put(dimSpec.getOutputName(), baseVal); - } + convertRowTypesToOutputTypes(query.getDimensions(), theMap); // Add aggregations. for (int i = 0; i < entry.getValues().length; i++) { @@ -424,6 +406,29 @@ public void close() } } + private static void convertRowTypesToOutputTypes(List dimensionSpecs, Map rowMap) + { + for (DimensionSpec dimSpec : dimensionSpecs) { + Object baseVal = rowMap.get(dimSpec.getOutputName()); + switch (dimSpec.getOutputType()) { + case STRING: + baseVal = baseVal == null ? "" : baseVal.toString(); + break; + case LONG: + baseVal = DimensionHandlerUtils.convertObjectToLong(baseVal); + baseVal = baseVal == null ? 0L : baseVal; + break; + case FLOAT: + baseVal = DimensionHandlerUtils.convertObjectToFloat(baseVal); + baseVal = baseVal == null ? 0.f : baseVal; + break; + default: + throw new IAE("Unsupported type: " + dimSpec.getOutputType()); + } + rowMap.put(dimSpec.getOutputName(), baseVal); + } + } + private static class GroupByEngineKeySerde implements Grouper.KeySerde { private final int keySize; From 3ed105716f4db65785ff2d88e331744490f78c2e Mon Sep 17 00:00:00 2001 From: jon-wei Date: Thu, 29 Jun 2017 17:50:25 -0700 Subject: [PATCH 3/4] Remove unnecessary comment --- .../druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java | 1 - 1 file changed, 1 deletion(-) diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java index 971099ef7633..932d2af2b461 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java @@ -361,7 +361,6 @@ public Row apply(final Grouper.Entry entry) ); } - // Convert dimension types to specified output types convertRowTypesToOutputTypes(query.getDimensions(), theMap); // Add aggregations. From 9dfd18db697d22bf6907d186136fc641127248b3 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Fri, 30 Jun 2017 13:09:20 -0700 Subject: [PATCH 4/4] Use compute to update map --- .../epinephelinae/GroupByQueryEngineV2.java | 39 +++++++++++-------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java index 932d2af2b461..e6995b333a49 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java @@ -408,23 +408,28 @@ public void close() private static void convertRowTypesToOutputTypes(List dimensionSpecs, Map rowMap) { for (DimensionSpec dimSpec : dimensionSpecs) { - Object baseVal = rowMap.get(dimSpec.getOutputName()); - switch (dimSpec.getOutputType()) { - case STRING: - baseVal = baseVal == null ? "" : baseVal.toString(); - break; - case LONG: - baseVal = DimensionHandlerUtils.convertObjectToLong(baseVal); - baseVal = baseVal == null ? 0L : baseVal; - break; - case FLOAT: - baseVal = DimensionHandlerUtils.convertObjectToFloat(baseVal); - baseVal = baseVal == null ? 0.f : baseVal; - break; - default: - throw new IAE("Unsupported type: " + dimSpec.getOutputType()); - } - rowMap.put(dimSpec.getOutputName(), baseVal); + final ValueType outputType = dimSpec.getOutputType(); + rowMap.compute( + dimSpec.getOutputName(), + (dimName, baseVal) -> { + switch (outputType) { + case STRING: + baseVal = baseVal == null ? "" : baseVal.toString(); + break; + case LONG: + baseVal = DimensionHandlerUtils.convertObjectToLong(baseVal); + baseVal = baseVal == null ? 0L : baseVal; + break; + case FLOAT: + baseVal = DimensionHandlerUtils.convertObjectToFloat(baseVal); + baseVal = baseVal == null ? 0.f : baseVal; + break; + default: + throw new IAE("Unsupported type: " + outputType); + } + return baseVal; + } + ); } }