diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index fa7bbc0b86e8..8482fdd9e738 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -274,7 +274,10 @@ public Sequence apply(Interval interval) "finalize", false, //setting sort to false avoids unnecessary sorting while merging results. we only need to sort //in the end when returning results to user. - GroupByQueryHelper.CTX_KEY_SORT_RESULTS, false + GroupByQueryHelper.CTX_KEY_SORT_RESULTS, false, + //no merging needed at historicals because GroupByQueryRunnerFactory.mergeRunners(..) would return + //merged results + GROUP_BY_MERGE_KEY, false ) ) , context diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFactoryTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFactoryTest.java new file mode 100644 index 000000000000..bc1b26f8b144 --- /dev/null +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFactoryTest.java @@ -0,0 +1,188 @@ +/* + * + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * / + * + */ + +package io.druid.query.groupby; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.ListenableFuture; +import com.metamx.common.guava.Sequence; +import com.metamx.common.guava.Sequences; +import io.druid.collections.StupidPool; +import io.druid.data.input.Row; +import io.druid.data.input.impl.CSVParseSpec; +import io.druid.data.input.impl.DimensionsSpec; +import io.druid.data.input.impl.StringInputRowParser; +import io.druid.data.input.impl.TimestampSpec; +import io.druid.granularity.QueryGranularity; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.Query; +import io.druid.query.QueryRunner; +import io.druid.query.QueryRunnerFactory; +import io.druid.query.QueryRunnerTestHelper; +import io.druid.query.QueryWatcher; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.CountAggregatorFactory; +import io.druid.query.dimension.DefaultDimensionSpec; +import io.druid.query.dimension.DimensionSpec; +import io.druid.query.spec.LegacySegmentSpec; +import io.druid.segment.CloserRule; +import io.druid.segment.IncrementalIndexSegment; +import io.druid.segment.Segment; +import io.druid.segment.TestHelper; +import io.druid.segment.incremental.IncrementalIndex; +import io.druid.segment.incremental.OnheapIncrementalIndex; +import org.junit.Rule; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.Executors; + +/** + */ +public class GroupByQueryRunnerFactoryTest +{ + @Rule + public CloserRule closerRule = new CloserRule(true); + + @Test + public void testMergeRunnersEnsureGroupMerging() throws Exception + { + QueryRunnerFactory factory = createFactory(); + QueryRunner mergedRunner = factory.mergeRunners( + Executors.newSingleThreadExecutor(), + ImmutableList.of( + factory.createRunner(createSegment()), + factory.createRunner(createSegment()) + ) + ); + + GroupByQuery query = GroupByQuery + .builder() + .setDataSource("xx") + .setQuerySegmentSpec(new LegacySegmentSpec("1970/3000")) + .setGranularity(QueryGranularity.ALL) + .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("tags", "tags"))) + .setAggregatorSpecs( + Arrays.asList( + new AggregatorFactory[] + { + new CountAggregatorFactory("count") + } + ) + ) + .build(); + + Sequence result = mergedRunner.run(query, Maps.newHashMap()); + + List expectedResults = Arrays.asList( + GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "tags", "t1", "count", 2L), + GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "tags", "t2", "count", 4L) + ); + + TestHelper.assertExpectedObjects(expectedResults, Sequences.toList(result, new ArrayList()), ""); + } + + private Segment createSegment() throws Exception + { + IncrementalIndex incrementalIndex = new OnheapIncrementalIndex( + 0, + QueryGranularity.NONE, + new AggregatorFactory[]{ + new CountAggregatorFactory("count") + }, + true, + true, + true, + 5000 + ); + + StringInputRowParser parser = new StringInputRowParser( + new CSVParseSpec( + new TimestampSpec("timestamp", "iso", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("product", "tags")), null, null), + "\t", + ImmutableList.of("timestamp", "product", "tags") + ), + "UTF-8" + ); + + String[] rows = new String[]{ + "2011-01-12T00:00:00.000Z,product_1,t1", + "2011-01-13T00:00:00.000Z,product_2,t2", + "2011-01-14T00:00:00.000Z,product_3,t2", + }; + + for (String row : rows) { + incrementalIndex.add(parser.parse(row)); + } + + closerRule.closeLater(incrementalIndex); + + return new IncrementalIndexSegment(incrementalIndex, "test"); + } + + private GroupByQueryRunnerFactory createFactory() + { + ObjectMapper mapper = new DefaultObjectMapper(); + + Supplier configSupplier = Suppliers.ofInstance(new GroupByQueryConfig()); + StupidPool pool = new StupidPool<>( + new Supplier() + { + @Override + public ByteBuffer get() + { + return ByteBuffer.allocate(1024 * 1024); + } + }); + + QueryWatcher noopQueryWatcher = new QueryWatcher() + { + @Override + public void registerQuery(Query query, ListenableFuture future) + { + + } + }; + + GroupByQueryEngine engine = new GroupByQueryEngine(configSupplier, pool); + GroupByQueryQueryToolChest toolchest = new GroupByQueryQueryToolChest( + configSupplier, mapper, engine, pool, + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() + ); + return new GroupByQueryRunnerFactory( + engine, + noopQueryWatcher, + configSupplier, + toolchest, + pool + ); + } +}