diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java index 49f1b72917cc..e6995b333a49 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java @@ -36,6 +36,7 @@ import io.druid.query.ColumnSelectorPlus; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.dimension.ColumnSelectorStrategyFactory; +import io.druid.query.dimension.DimensionSpec; import io.druid.query.groupby.GroupByQuery; import io.druid.query.groupby.GroupByQueryConfig; import io.druid.query.groupby.epinephelinae.column.DictionaryBuildingStringGroupByColumnSelectorStrategy; @@ -360,6 +361,8 @@ public Row apply(final Grouper.Entry entry) ); } + convertRowTypesToOutputTypes(query.getDimensions(), theMap); + // Add aggregations. for (int i = 0; i < entry.getValues().length; i++) { theMap.put(query.getAggregatorSpecs().get(i).getName(), entry.getValues()[i]); @@ -402,6 +405,34 @@ public void close() } } + private static void convertRowTypesToOutputTypes(List dimensionSpecs, Map rowMap) + { + for (DimensionSpec dimSpec : dimensionSpecs) { + final ValueType outputType = dimSpec.getOutputType(); + rowMap.compute( + dimSpec.getOutputName(), + (dimName, baseVal) -> { + switch (outputType) { + case STRING: + baseVal = baseVal == null ? "" : baseVal.toString(); + break; + case LONG: + baseVal = DimensionHandlerUtils.convertObjectToLong(baseVal); + baseVal = baseVal == null ? 0L : baseVal; + break; + case FLOAT: + baseVal = DimensionHandlerUtils.convertObjectToFloat(baseVal); + baseVal = baseVal == null ? 0.f : baseVal; + break; + default: + throw new IAE("Unsupported type: " + outputType); + } + return baseVal; + } + ); + } + } + private static class GroupByEngineKeySerde implements Grouper.KeySerde { private final int keySize; diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java index 85feca8e6800..fadd47198cf2 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java @@ -30,6 +30,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import io.druid.collections.BlockingPool; import io.druid.collections.DefaultBlockingPool; @@ -48,6 +49,7 @@ import io.druid.js.JavaScriptConfig; import io.druid.query.BySegmentResultValue; import io.druid.query.BySegmentResultValueClass; +import io.druid.query.ChainedExecutionQueryRunner; import io.druid.query.DruidProcessingConfig; import io.druid.query.Druids; import io.druid.query.FinalizeResultsQueryRunner; @@ -58,6 +60,7 @@ import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; import io.druid.query.QueryToolChest; +import io.druid.query.QueryWatcher; import io.druid.query.ResourceLimitExceededException; import io.druid.query.Result; import io.druid.query.aggregation.AggregatorFactory; @@ -9129,4 +9132,58 @@ public void testSubqueryWithMultipleIntervalsInOuterQueryWithLimitPushDown() Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); TestHelper.assertExpectedObjects(expectedResults, results, ""); } + + @Test + public void testTypeConversionWithMergingChainedExecutionRunner() + { + if (config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V1)) { + expectedException.expect(UnsupportedOperationException.class); + expectedException.expectMessage("GroupBy v1 only supports dimensions with an outputType of STRING."); + } + + GroupByQuery query = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(Lists.newArrayList( + new DefaultDimensionSpec("quality", "alias"), + new ExtractionDimensionSpec("quality", "qualityLen", ValueType.LONG, StrlenExtractionFn.instance()) + )) + .setDimFilter(new SelectorDimFilter( + "quality", + "technology", + null + )) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory("idx", "index") + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .build(); + + List expectedResults = Arrays.asList( + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "technology", "qualityLen", 10L, "rows", 2L, "idx", 156L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "technology", "qualityLen", 10L, "rows", 2L, "idx", 194L) + ); + + ChainedExecutionQueryRunner ceqr = new ChainedExecutionQueryRunner( + MoreExecutors.sameThreadExecutor(), + new QueryWatcher() + { + @Override + public void registerQuery(Query query, ListenableFuture future) + { + return; + } + }, + ImmutableList.>of(runner, runner) + ); + + QueryRunner mergingRunner = factory.mergeRunners(MoreExecutors.sameThreadExecutor(), ImmutableList.>of(ceqr)); + + Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, mergingRunner, query); + TestHelper.assertExpectedObjects(expectedResults, results, ""); + } }