From 30669380e01772529932843ba3f004484e9e712a Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Thu, 25 Jan 2018 14:04:56 -0600 Subject: [PATCH 1/6] add subtotalsSpec attribute to groupBy query --- .../io/druid/query/groupby/GroupByQuery.java | 61 +++++++ .../groupby/GroupByQueryQueryToolChest.java | 26 ++- .../epinephelinae/GroupByRowProcessor.java | 118 ++++++-------- .../epinephelinae/RowBasedGrouperHelper.java | 38 ++++- .../groupby/strategy/GroupByStrategy.java | 11 ++ .../groupby/strategy/GroupByStrategyV1.java | 18 ++- .../groupby/strategy/GroupByStrategyV2.java | 153 +++++++++++++++--- .../query/groupby/GroupByQueryRunnerTest.java | 113 +++++++++++++ .../io/druid/sql/calcite/rel/DruidQuery.java | 1 + 9 files changed, 441 insertions(+), 98 deletions(-) diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java index 9ca527ea17a1..80ee519d2a6b 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Function; import com.google.common.base.Functions; @@ -99,6 +100,7 @@ public static Builder builder() private final List dimensions; private final List aggregatorSpecs; private final List postAggregatorSpecs; + private final List> subtotalsSpec; private final boolean applyLimitPushDown; private final Function, Sequence> postProcessingFn; @@ -115,6 +117,7 @@ public GroupByQuery( @JsonProperty("postAggregations") List postAggregatorSpecs, @JsonProperty("having") HavingSpec havingSpec, @JsonProperty("limitSpec") LimitSpec limitSpec, + @JsonProperty("subtotalsSpec") List> subtotalsSpec, @JsonProperty("context") Map context ) { @@ 
-129,6 +132,7 @@ public GroupByQuery( postAggregatorSpecs, havingSpec, limitSpec, + subtotalsSpec, null, context ); @@ -166,6 +170,7 @@ private GroupByQuery( final List postAggregatorSpecs, final HavingSpec havingSpec, final LimitSpec limitSpec, + final @Nullable List> subtotalsSpec, final @Nullable Function, Sequence> postProcessingFn, final Map context ) @@ -188,6 +193,32 @@ private GroupByQuery( this.havingSpec = havingSpec; this.limitSpec = LimitSpec.nullToNoopLimitSpec(limitSpec); + // if subtotalsSpec exists then validate that all are subsets of dimensions spec and are in same order. + // For example if we had {D1, D2, D3} in dimensions spec then + // {D2}, {D1, D2}, {D1, D3}, {D2, D3} etc are valid in subtotalsSpec while + // {D2, D1} is not as it is not in same order. + // {D4} is not as it's not a subset. + // This restriction is enforced because implementation does sort merge on the results of top-level query + // results and expects that ordering of events does not change when dimension columns are removed from + // results of top level query. + if (subtotalsSpec != null) { + for (List subtotalSpec : subtotalsSpec) { + int i = 0; + for (String s : subtotalSpec) { + boolean found = false; + for (; i < dimensions.size(); i++) { + if (s.equals(dimensions.get(i).getOutputName())) { + found = true; + break; + } + } + if (!found) { + throw new IAE("Subtotal spec %s is either not a subset or items are in different order than in dimensiosn spec.", subtotalSpec); + } + } + } + } + this.subtotalsSpec = subtotalsSpec; // Verify no duplicate names between dimensions, aggregators, and postAggregators. // They will all end up in the same namespace in the returned Rows and we can't have them clobbering each other. 
@@ -242,6 +273,13 @@ public LimitSpec getLimitSpec() return limitSpec; } + @JsonInclude(JsonInclude.Include.NON_NULL) + @JsonProperty("subtotalsSpec") + public List> getSubtotalsSpec() + { + return subtotalsSpec; + } + @Override public boolean hasFilters() { @@ -322,6 +360,10 @@ private boolean validateAndGetForceLimitPushDown() public boolean determineApplyLimitPushDown() { + if (subtotalsSpec != null) { + return false; + } + final boolean forceLimitPushDown = validateAndGetForceLimitPushDown(); if (limitSpec instanceof DefaultLimitSpec) { @@ -647,6 +689,16 @@ public GroupByQuery withLimitSpec(LimitSpec limitSpec) return new Builder(this).setLimitSpec(limitSpec).build(); } + public GroupByQuery withAggregatorSpecs(final List aggregatorSpecs) + { + return new Builder(this).setAggregatorSpecs(aggregatorSpecs).build(); + } + + public GroupByQuery withSubtotalsSpec(final List> subtotalsSpec) + { + return new Builder(this).setSubtotalsSpec(subtotalsSpec).build(); + } + public GroupByQuery withPostAggregatorSpecs(final List postAggregatorSpecs) { return new Builder(this).setPostAggregatorSpecs(postAggregatorSpecs).build(); @@ -706,6 +758,7 @@ public static class Builder private Map context; + private List> subtotalsSpec = null; private LimitSpec limitSpec = null; private Function, Sequence> postProcessingFn; private List orderByColumnSpecs = Lists.newArrayList(); @@ -727,6 +780,7 @@ public Builder(GroupByQuery query) postAggregatorSpecs = query.getPostAggregatorSpecs(); havingSpec = query.getHavingSpec(); limitSpec = query.getLimitSpec(); + subtotalsSpec = query.subtotalsSpec; postProcessingFn = query.postProcessingFn; context = query.getContext(); } @@ -807,6 +861,12 @@ public Builder setLimit(int limit) return this; } + public Builder setSubtotalsSpec(List> subtotalsSpec) + { + this.subtotalsSpec = subtotalsSpec; + return this; + } + public Builder addOrderByColumn(String dimension) { return addOrderByColumn(dimension, null); @@ -967,6 +1027,7 @@ public GroupByQuery 
build() postAggregatorSpecs, havingSpec, theLimitSpec, + subtotalsSpec, postProcessingFn, context ); diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index 28d5acb42de0..1d102f3022cd 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -216,9 +216,31 @@ private Sequence mergeGroupByResults( finalizingResults = subqueryResult; } - return groupByStrategy.processSubqueryResult(subquery, query, resource, finalizingResults); + if (query.getSubtotalsSpec() != null) { + return groupByStrategy.processSubtotalsSpec( + query, + resource, + groupByStrategy.processSubqueryResult(subquery, query, resource, finalizingResults) + ); + } else { + return groupByStrategy.applyPostProcessing(groupByStrategy.processSubqueryResult( + subquery, + query, + resource, + finalizingResults + ), query); + } + } else { - return groupByStrategy.mergeResults(runner, query, context); + if (query.getSubtotalsSpec() != null) { + return groupByStrategy.processSubtotalsSpec( + query, + resource, + groupByStrategy.mergeResults(runner, query, context) + ); + } else { + return groupByStrategy.applyPostProcessing(groupByStrategy.mergeResults(runner, query, context), query); + } } } diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java index 8be060028e9d..6bdd335d85fd 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Predicate; import com.google.common.base.Supplier; -import 
com.google.common.collect.Lists; import io.druid.collections.ResourceHolder; import io.druid.common.guava.SettableSupplier; import io.druid.data.input.Row; @@ -30,7 +29,6 @@ import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.guava.Accumulator; import io.druid.java.util.common.guava.BaseSequence; -import io.druid.java.util.common.guava.CloseQuietly; import io.druid.java.util.common.guava.FilteredSequence; import io.druid.java.util.common.guava.Sequence; import io.druid.query.Query; @@ -51,7 +49,6 @@ import java.io.Closeable; import java.io.File; -import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; import java.util.Map; @@ -59,7 +56,7 @@ public class GroupByRowProcessor { - public static Sequence process( + public static Grouper createGrouper( final Query queryParam, final Sequence rows, final Map rowSignature, @@ -67,7 +64,8 @@ public static Sequence process( final GroupByQueryResource resource, final ObjectMapper spillMapper, final String processingTmpDir, - final int mergeBufferSize + final int mergeBufferSize, + final List closeOnExit ) { final GroupByQuery query = (GroupByQuery) queryParam; @@ -123,75 +121,59 @@ public boolean apply(Row input) } ); + final LimitedTemporaryStorage temporaryStorage = new LimitedTemporaryStorage( + temporaryStorageDirectory, + querySpecificConfig.getMaxOnDiskStorage() + ); + + closeOnExit.add(temporaryStorage); + + Pair, Accumulator> pair = RowBasedGrouperHelper.createGrouperAccumulatorPair( + query, + true, + rowSignature, + querySpecificConfig, + new Supplier() + { + @Override + public ByteBuffer get() + { + final ResourceHolder mergeBufferHolder = resource.getMergeBuffer(); + closeOnExit.add(mergeBufferHolder); + return mergeBufferHolder.get(); + } + }, + temporaryStorage, + spillMapper, + aggregatorFactories, + mergeBufferSize + ); + final Grouper grouper = pair.lhs; + final Accumulator accumulator = pair.rhs; + closeOnExit.add(grouper); + + final AggregateResult retVal = 
filteredSequence.accumulate(AggregateResult.ok(), accumulator); + if (!retVal.isOk()) { + throw new ResourceLimitExceededException(retVal.getReason()); + } + + return grouper; + } + + public static Sequence getRowsFromGrouper(GroupByQuery query, List subtotalSpec, Supplier grouper) + { return new BaseSequence<>( new BaseSequence.IteratorMaker>() { @Override public CloseableGrouperIterator make() { - // This contains all closeable objects which are closed when the returned iterator iterates all the elements, - // or an exceptions is thrown. The objects are closed in their reverse order. - final List closeOnExit = Lists.newArrayList(); - - try { - final LimitedTemporaryStorage temporaryStorage = new LimitedTemporaryStorage( - temporaryStorageDirectory, - querySpecificConfig.getMaxOnDiskStorage() - ); - - closeOnExit.add(temporaryStorage); - - Pair, Accumulator> pair = RowBasedGrouperHelper.createGrouperAccumulatorPair( - query, - true, - rowSignature, - querySpecificConfig, - new Supplier() - { - @Override - public ByteBuffer get() - { - final ResourceHolder mergeBufferHolder = resource.getMergeBuffer(); - closeOnExit.add(mergeBufferHolder); - return mergeBufferHolder.get(); - } - }, - temporaryStorage, - spillMapper, - aggregatorFactories, - mergeBufferSize - ); - final Grouper grouper = pair.lhs; - final Accumulator accumulator = pair.rhs; - closeOnExit.add(grouper); - - final AggregateResult retVal = filteredSequence.accumulate(AggregateResult.ok(), accumulator); - if (!retVal.isOk()) { - throw new ResourceLimitExceededException(retVal.getReason()); - } - - return RowBasedGrouperHelper.makeGrouperIterator( - grouper, - query, - new Closeable() - { - @Override - public void close() throws IOException - { - for (Closeable closeable : Lists.reverse(closeOnExit)) { - CloseQuietly.close(closeable); - } - } - } - ); - } - catch (Throwable e) { - // Exception caught while setting up the iterator; release resources. 
- for (Closeable closeable : Lists.reverse(closeOnExit)) { - CloseQuietly.close(closeable); - } - throw e; - } + return RowBasedGrouperHelper.makeGrouperIterator( + grouper.get(), + query, + subtotalSpec, + () -> {} + ); } @Override diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java index 290cf13753bf..269a1c07eef2 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java @@ -78,6 +78,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -411,6 +412,16 @@ public static CloseableGrouperIterator makeGrouperIterator( final GroupByQuery query, final Closeable closeable ) + { + return makeGrouperIterator(grouper, query, null, closeable); + } + + public static CloseableGrouperIterator makeGrouperIterator( + final Grouper grouper, + final GroupByQuery query, + final List dimsToInclude, + final Closeable closeable + ) { final boolean includeTimestamp = GroupByStrategyV2.getUniversalTimestamp(query) == null; @@ -437,12 +448,27 @@ public Row apply(Grouper.Entry entry) } // Add dimensions. - for (int i = dimStart; i < entry.getKey().getKey().length; i++) { - Object dimVal = entry.getKey().getKey()[i]; - theMap.put( - query.getDimensions().get(i - dimStart).getOutputName(), - dimVal instanceof String ? Strings.emptyToNull((String) dimVal) : dimVal - ); + if (dimsToInclude == null) { + for (int i = dimStart; i < entry.getKey().getKey().length; i++) { + Object dimVal = entry.getKey().getKey()[i]; + theMap.put( + query.getDimensions().get(i - dimStart).getOutputName(), + dimVal instanceof String ? 
Strings.emptyToNull((String) dimVal) : dimVal + ); + } + } else { + Map dimensions = new HashMap<>(); + for (int i = dimStart; i < entry.getKey().getKey().length; i++) { + Object dimVal = entry.getKey().getKey()[i]; + dimensions.put( + query.getDimensions().get(i - dimStart).getOutputName(), + dimVal instanceof String ? Strings.emptyToNull((String) dimVal) : dimVal + ); + } + + for (String dimToInclude : dimsToInclude) { + theMap.put(dimToInclude, dimensions.get(dimToInclude)); + } } // Add aggregations. diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategy.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategy.java index 2ba34178885c..d8586de6ad3b 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategy.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategy.java @@ -73,6 +73,11 @@ Sequence mergeResults( Map responseContext ); + Sequence applyPostProcessing( + Sequence results, + GroupByQuery query + ); + Sequence processSubqueryResult( GroupByQuery subquery, GroupByQuery query, @@ -80,6 +85,12 @@ Sequence processSubqueryResult( Sequence subqueryResult ); + Sequence processSubtotalsSpec( + GroupByQuery query, + GroupByQueryResource resource, + Sequence queryResult + ); + QueryRunner mergeRunners( ListeningExecutorService exec, Iterable> queryRunners diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java index 180659999516..208d38105558 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java @@ -147,7 +147,15 @@ public Sequence mergeResults( true ); - return Sequences.withBaggage(query.postProcess(GroupByQueryHelper.postAggregate(query, index)), index); + return 
Sequences.withBaggage(GroupByQueryHelper.postAggregate(query, index), index); + } + + @Override + public Sequence applyPostProcessing( + Sequence results, GroupByQuery query + ) + { + return query.postProcess(results); } @Override @@ -255,6 +263,14 @@ public Sequence apply(Interval interval) ); } + @Override + public Sequence processSubtotalsSpec( + GroupByQuery query, GroupByQueryResource resource, Sequence queryResult + ) + { + throw new UnsupportedOperationException("subtotalsSpec is not supported for v1 groupBy strategy."); + } + @Override public QueryRunner mergeRunners( final ListeningExecutorService exec, diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java index 5ad75004b760..5940451bdbf1 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -22,7 +22,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Ordering; import com.google.common.util.concurrent.ListeningExecutorService; @@ -38,6 +40,7 @@ import io.druid.java.util.common.DateTimes; import io.druid.java.util.common.granularity.Granularities; import io.druid.java.util.common.granularity.Granularity; +import io.druid.java.util.common.guava.CloseQuietly; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.java.util.common.guava.nary.BinaryFn; @@ -54,6 +57,7 @@ import io.druid.query.ResourceLimitExceededException; import io.druid.query.ResultMergeQueryRunner; import io.druid.query.aggregation.PostAggregator; +import 
io.druid.query.dimension.DefaultDimensionSpec; import io.druid.query.groupby.GroupByQuery; import io.druid.query.groupby.GroupByQueryConfig; import io.druid.query.groupby.GroupByQueryHelper; @@ -62,14 +66,18 @@ import io.druid.query.groupby.epinephelinae.GroupByMergingQueryRunnerV2; import io.druid.query.groupby.epinephelinae.GroupByQueryEngineV2; import io.druid.query.groupby.epinephelinae.GroupByRowProcessor; +import io.druid.query.groupby.epinephelinae.Grouper; import io.druid.query.groupby.resource.GroupByQueryResource; import io.druid.segment.StorageAdapter; import org.joda.time.DateTime; import org.joda.time.Interval; +import java.io.Closeable; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; public class GroupByStrategyV2 implements GroupByStrategy { @@ -131,7 +139,8 @@ public static DateTime getUniversalTimestamp(final GroupByQuery query) public GroupByQueryResource prepareResource(GroupByQuery query, boolean willMergeRunners) { if (!willMergeRunners) { - final int requiredMergeBufferNum = countRequiredMergeBufferNum(query, 1); + final int requiredMergeBufferNum = countRequiredMergeBufferNum(query, 1) + + (query.getSubtotalsSpec() != null ? 1 : 0); if (requiredMergeBufferNum > mergeBufferPool.maxSize()) { throw new ResourceLimitExceededException( @@ -241,6 +250,7 @@ protected BinaryFn createMergeFn(Query queryParam) // Don't do "having" clause until the end of this method. 
null, query.getLimitSpec(), + query.getSubtotalsSpec(), query.getContext() ).withOverriddenContext( ImmutableMap.of( @@ -253,7 +263,7 @@ protected BinaryFn createMergeFn(Query queryParam) ) ); - Sequence rowSequence = Sequences.map( + return Sequences.map( mergingQueryRunner.run( QueryPlus.wrap(newQuery), responseContext @@ -289,12 +299,18 @@ public Row apply(final Row row) } } ); + } + @Override + public Sequence applyPostProcessing( + Sequence results, GroupByQuery query + ) + { // Don't apply limit here for inner results, that will be pushed down to the BufferHashGrouper if (query.getContextBoolean(CTX_KEY_OUTERMOST, true)) { - return query.postProcess(rowSequence); + return query.postProcess(results); } else { - return rowSequence; + return results; } } @@ -306,24 +322,119 @@ public Sequence processSubqueryResult( Sequence subqueryResult ) { - final Sequence results = GroupByRowProcessor.process( - query, - subqueryResult, - GroupByQueryHelper.rowSignatureFor(subquery), - configSupplier.get(), - resource, - spillMapper, - processingConfig.getTmpDir(), - processingConfig.intermediateComputeSizeBytes() - ); - return mergeResults(new QueryRunner() - { - @Override - public Sequence run(QueryPlus queryPlus, Map responseContext) - { - return results; + // This contains all closeable objects which are closed when the returned iterator iterates all the elements, + // or an exceptions is thrown. The objects are closed in their reverse order. 
+ final List closeOnExit = Lists.newArrayList(); + + try { + Supplier grouperSupplier = Suppliers.memoize( + () -> GroupByRowProcessor.createGrouper( + query, + subqueryResult, + GroupByQueryHelper.rowSignatureFor(subquery), + configSupplier.get(), + resource, + spillMapper, + processingConfig.getTmpDir(), + processingConfig.intermediateComputeSizeBytes(), + closeOnExit + ) + ); + + return Sequences.withBaggage( + mergeResults(new QueryRunner() + { + @Override + public Sequence run(QueryPlus queryPlus, Map responseContext) + { + return GroupByRowProcessor.getRowsFromGrouper( + query, + null, + grouperSupplier + ); + } + }, query, null), + () -> Lists.reverse(closeOnExit).forEach(closeable -> CloseQuietly.close(closeable)) + ); + } + catch (Exception ex) { + Lists.reverse(closeOnExit).forEach(closeable -> CloseQuietly.close(closeable)); + throw ex; + } + } + + @Override + public Sequence processSubtotalsSpec( + GroupByQuery query, + GroupByQueryResource resource, + Sequence queryResult + ) + { + // This contains all closeable objects which are closed when the returned iterator iterates all the elements, + // or an exceptions is thrown. The objects are closed in their reverse order. 
+ final List closeOnExit = Lists.newArrayList(); + + try { + GroupByQuery queryWithoutSubtotalsSpec = query.withSubtotalsSpec(null); + List> subtotals = query.getSubtotalsSpec(); + + Supplier grouperSupplier = Suppliers.memoize( + () -> GroupByRowProcessor.createGrouper( + queryWithoutSubtotalsSpec.withAggregatorSpecs( + Lists.transform(queryWithoutSubtotalsSpec.getAggregatorSpecs(), (agg) -> agg.getCombiningFactory()) + ).withDimensionSpecs( + Lists.transform( + queryWithoutSubtotalsSpec.getDimensions(), + (dimSpec) -> new DefaultDimensionSpec( + dimSpec.getOutputName(), + dimSpec.getOutputName() + ) + ) + ), + queryResult, + GroupByQueryHelper.rowSignatureFor(queryWithoutSubtotalsSpec), + configSupplier.get(), + resource, + spillMapper, + processingConfig.getTmpDir(), + processingConfig.intermediateComputeSizeBytes(), + closeOnExit + ) + ); + List> subtotalsResults = new ArrayList<>(subtotals.size()); + + for (List subtotalSpec : subtotals) { + GroupByQuery subtotalQuery = queryWithoutSubtotalsSpec.withDimensionSpecs( + subtotalSpec.stream().map(s -> new DefaultDimensionSpec(s, s)).collect(Collectors.toList()) + ); + + subtotalsResults.add(applyPostProcessing( + mergeResults(new QueryRunner() + { + @Override + public Sequence run(QueryPlus queryPlus, Map responseContext) + { + return GroupByRowProcessor.getRowsFromGrouper( + queryWithoutSubtotalsSpec, + subtotalSpec, + grouperSupplier + ); + } + }, subtotalQuery, null), + subtotalQuery + ) + ); } - }, query, null); + + return Sequences.withBaggage( + Sequences.concat(subtotalsResults), + () -> Lists.reverse(closeOnExit).forEach(closeable -> CloseQuietly.close(closeable)) + ); + } + catch (Exception ex) { + Lists.reverse(closeOnExit).forEach(closeable -> CloseQuietly.close(closeable)); + throw ex; + } } @Override diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java index 0eda36786656..b6a2a1bd2ff9 
100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java @@ -6117,6 +6117,119 @@ public void testSubqueryWithFirstLast() TestHelper.assertExpectedObjects(expectedResults, results, ""); } + @Test + public void testGroupByWithSubtotalsSpec() + { + if (!config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) { + return; + } + + GroupByQuery query = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("quality", "alias"), new DefaultDimensionSpec("market", "market"))) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory("idx", "index"), + new FloatSumAggregatorFactory("idxFloat", "indexFloat"), + new DoubleSumAggregatorFactory("idxDouble", "index") + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .setSubtotalsSpec(ImmutableList.of( + ImmutableList.of("alias"), + ImmutableList.of("market"), + ImmutableList.of() + )) + .build(); + + List expectedResults = Arrays.asList( + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "automotive", "rows", 1L, "idx", 135L, "idxFloat", 135.88510131835938f, "idxDouble", 135.88510131835938d), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "business", "rows", 1L, "idx", 118L, "idxFloat", 118.57034, "idxDouble", 118.57034), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 1L, "idx", 158L, "idxFloat", 158.747224, "idxDouble", 158.747224), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "health", "rows", 1L, "idx", 120L, "idxFloat", 120.134704, "idxDouble", 120.134704), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 3L, "idx", 2870L, "idxFloat", 
2871.8866900000003f, "idxDouble", 2871.8866900000003d), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "news", "rows", 1L, "idx", 121L, "idxFloat", 121.58358f, "idxDouble", 121.58358d), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "premium", "rows", 3L, "idx", 2900L, "idxFloat", 2900.798647f, "idxDouble", 2900.798647d), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "technology", "rows", 1L, "idx", 78L, "idxFloat", 78.622547f, "idxDouble", 78.622547d), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "travel", "rows", 1L, "idx", 119L, "idxFloat", 119.922742f, "idxDouble", 119.922742d), + + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "automotive", "rows", 1L, "idx", 147L, "idxFloat", 147.42593f, "idxDouble", 147.42593d), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "business", "rows", 1L, "idx", 112L, "idxFloat", 112.987027f, "idxDouble", 112.987027d), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "entertainment", "rows", 1L, "idx", 166L, "idxFloat", 166.016049f, "idxDouble", 166.016049d), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "health", "rows", 1L, "idx", 113L, "idxFloat", 113.446008f, "idxDouble", 113.446008d), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "mezzanine", "rows", 3L, "idx", 2447L, "idxFloat", 2448.830613f, "idxDouble", 2448.830613d), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "news", "rows", 1L, "idx", 114L, "idxFloat", 114.290141f, "idxDouble", 114.290141d), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "premium", "rows", 3L, "idx", 2505L, "idxFloat", 2506.415148f, "idxDouble", 2506.415148d), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "technology", "rows", 1L, "idx", 97L, "idxFloat", 97.387433f, "idxDouble", 97.387433d), + 
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "travel", "rows", 1L, "idx", 126L, "idxFloat", 126.411364f, "idxDouble", 126.411364d), + + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "market", "spot", "idxDouble", 643.043177, "idxFloat", 643.043212890625, "rows", 5L, "idx", 640L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "market", "total_market", "idxDouble", 1314.839715, "idxFloat", 1314.8397, "rows", 1L, "idx", 1314L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "market", "upfront", "idxDouble", 1447.34116, "idxFloat", 1447.3412, "rows", 1L, "idx", 1447L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "market", "spot", "idxDouble", 266.090949, "idxFloat", 266.0909423828125, "rows", 2L, "idx", 265L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "market", "total_market", "idxDouble", 1522.043733, "idxFloat", 1522.0437, "rows", 1L, "idx", 1522L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "market", "upfront", "idxDouble", 1234.247546, "idxFloat", 1234.2476, "rows", 1L, "idx", 1234L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "market", "spot", "idxDouble", 198.545289, "idxFloat", 198.5452880859375, "rows", 2L, "idx", 197L), + + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "market", "spot", "idxDouble", 650.806953, "idxFloat", 650.8069458007812, "rows", 5L, "idx", 648L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "market", "total_market", "idxDouble", 1193.556278, "idxFloat", 1193.5563, "rows", 1L, "idx", 1193L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "market", "upfront", "idxDouble", 1144.342401, "idxFloat", 1144.3424, "rows", 1L, "idx", 1144L), + 
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "market", "spot", "idxDouble", 249.591647, "idxFloat", 249.59164428710938, "rows", 2L, "idx", 249L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "market", "total_market", "idxDouble", 1321.375057, "idxFloat", 1321.375, "rows", 1L, "idx", 1321L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "market", "upfront", "idxDouble", 1049.738585, "idxFloat", 1049.7385, "rows", 1L, "idx", 1049L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "market", "spot", "idxDouble", 223.798797, "idxFloat", 223.79879760742188, "rows", 2L, "idx", 223L), + + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "idxDouble", 6626.151575318359, "idxFloat", 6626.152f, "rows", 13L, "idx", 6619L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "idxDouble", 5833.209713, "idxFloat", 5833.209f, "rows", 13L, "idx", 5827L) + ); + + Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + TestHelper.assertExpectedObjects(expectedResults, results, ""); + } + + @Test + public void testGroupByWithSubtotalsSpecWithOrderLimit() + { + if (!config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) { + return; + } + + GroupByQuery query = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("quality", "alias"), new DefaultDimensionSpec("market", "market"))) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory("idx", "index"), + new FloatSumAggregatorFactory("idxFloat", "indexFloat"), + new DoubleSumAggregatorFactory("idxDouble", "index") + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .setSubtotalsSpec(ImmutableList.of( + 
ImmutableList.of("alias"), + ImmutableList.of("market"), + ImmutableList.of() + )) + .addOrderByColumn("idxDouble") + .setLimit(1) + .build(); + + List expectedResults = Arrays.asList( + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "technology", "rows", 1L, "idx", 78L, "idxFloat", 78.622547f, "idxDouble", 78.622547d), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "market", "spot", "idxDouble", 198.545289, "idxFloat", 198.5452880859375, "rows", 2L, "idx", 197L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "idxDouble", 6626.151575318359, "idxFloat", 6626.152f, "rows", 13L, "idx", 6619L) + ); + + Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + TestHelper.assertExpectedObjects(expectedResults, results, ""); + } + @Test public void testGroupByWithTimeColumn() { diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/DruidQuery.java b/sql/src/main/java/io/druid/sql/calcite/rel/DruidQuery.java index bca4481992ff..5c84eaba86b1 100644 --- a/sql/src/main/java/io/druid/sql/calcite/rel/DruidQuery.java +++ b/sql/src/main/java/io/druid/sql/calcite/rel/DruidQuery.java @@ -808,6 +808,7 @@ public GroupByQuery toGroupByQuery() grouping.getPostAggregators(), grouping.getHavingFilter() != null ? 
new DimFilterHavingSpec(grouping.getHavingFilter(), true) : null, limitSpec, + null, ImmutableSortedMap.copyOf(plannerContext.getQueryContext()) ); } From c7e783ff5e2d4e05f6a4fd3bf66c1c79c59d0e1a Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Wed, 7 Feb 2018 11:34:39 -0600 Subject: [PATCH 2/6] don't send subtotalsSpec attribute to downstream nodes from broker and other updates --- .../io/druid/query/groupby/GroupByQuery.java | 31 ++++++++++++------- .../groupby/GroupByQueryQueryToolChest.java | 2 +- .../strategy/GroupByStrategySelector.java | 6 ++++ 3 files changed, 27 insertions(+), 12 deletions(-) diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java index 80ee519d2a6b..10a677f56464 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java @@ -193,6 +193,21 @@ private GroupByQuery( this.havingSpec = havingSpec; this.limitSpec = LimitSpec.nullToNoopLimitSpec(limitSpec); + this.subtotalsSpec = verifySubtotalsSpec(subtotalsSpec, dimensions); + + // Verify no duplicate names between dimensions, aggregators, and postAggregators. + // They will all end up in the same namespace in the returned Rows and we can't have them clobbering each other. + // We're not counting __time, even though that name is problematic. See: https://github.com/druid-io/druid/pull/3684 + verifyOutputNames(this.dimensions, this.aggregatorSpecs, this.postAggregatorSpecs); + + this.postProcessingFn = postProcessingFn != null ? postProcessingFn : makePostProcessingFn(); + + // Check if limit push down configuration is valid and check if limit push down will be applied + this.applyLimitPushDown = determineApplyLimitPushDown(); + } + + private List> verifySubtotalsSpec(List> subtotalsSpec, List dimensions) + { // if subtotalsSpec exists then validate that all are subsets of dimensions spec and are in same order. 
// For example if we had {D1, D2, D3} in dimensions spec then // {D2}, {D1, D2}, {D1, D3}, {D2, D3} etc are valid in subtotalsSpec while @@ -213,22 +228,16 @@ private GroupByQuery( } } if (!found) { - throw new IAE("Subtotal spec %s is either not a subset or items are in different order than in dimensiosn spec.", subtotalSpec); + throw new IAE( + "Subtotal spec %s is either not a subset or items are in different order than in dimensiosn spec.", + subtotalSpec + ); } } } } - this.subtotalsSpec = subtotalsSpec; - - // Verify no duplicate names between dimensions, aggregators, and postAggregators. - // They will all end up in the same namespace in the returned Rows and we can't have them clobbering each other. - // We're not counting __time, even though that name is problematic. See: https://github.com/druid-io/druid/pull/3684 - verifyOutputNames(this.dimensions, this.aggregatorSpecs, this.postAggregatorSpecs); - this.postProcessingFn = postProcessingFn != null ? postProcessingFn : makePostProcessingFn(); - - // Check if limit push down configuration is valid and check if limit push down will be applied - this.applyLimitPushDown = determineApplyLimitPushDown(); + return subtotalsSpec; } @JsonProperty diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index 1d102f3022cd..e5368b19b8ad 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -236,7 +236,7 @@ private Sequence mergeGroupByResults( return groupByStrategy.processSubtotalsSpec( query, resource, - groupByStrategy.mergeResults(runner, query, context) + groupByStrategy.mergeResults(runner, query.withSubtotalsSpec(null), context) ); } else { return groupByStrategy.applyPostProcessing(groupByStrategy.mergeResults(runner, query, context), query); diff --git 
a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategySelector.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategySelector.java index e222f14d0796..b5574d583096 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategySelector.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategySelector.java @@ -21,6 +21,7 @@ import com.google.common.base.Supplier; import com.google.inject.Inject; +import io.druid.java.util.common.IAE; import io.druid.java.util.common.ISE; import io.druid.query.groupby.GroupByQuery; import io.druid.query.groupby.GroupByQueryConfig; @@ -55,6 +56,11 @@ public GroupByStrategy strategize(GroupByQuery query) return strategyV2; case STRATEGY_V1: + // Fail early if subtotals were asked from GroupBy V1 + if (query.getSubtotalsSpec() != null) { + throw new IAE("GroupBy Strategy [%s] does not support subtotalsSpec.", STRATEGY_V1); + } + return strategyV1; default: From e6746f0ddec72801956110db5a8b6dd8ab12756c Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Mon, 28 May 2018 22:33:15 -0700 Subject: [PATCH 3/6] address review comment --- .../io/druid/query/groupby/GroupByQuery.java | 2 +- .../groupby/strategy/GroupByStrategyV2.java | 14 +++- .../query/groupby/GroupByQueryRunnerTest.java | 75 +++++++++++++++++++ 3 files changed, 88 insertions(+), 3 deletions(-) diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java index d9835f68ea68..c0e953c0a7da 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java @@ -234,7 +234,7 @@ private List> verifySubtotalsSpec(List> subtotalsSpec, } if (!found) { throw new IAE( - "Subtotal spec %s is either not a subset or items are in different order than in dimensiosn spec.", + "Subtotal spec %s is either not a subset or items are in 
different order than in dimensions.", subtotalSpec ); } diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java index fc1b34979691..8ce0e4908128 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -58,6 +58,7 @@ import io.druid.query.ResultMergeQueryRunner; import io.druid.query.aggregation.PostAggregator; import io.druid.query.dimension.DefaultDimensionSpec; +import io.druid.query.dimension.DimensionSpec; import io.druid.query.groupby.GroupByQuery; import io.druid.query.groupby.GroupByQueryConfig; import io.druid.query.groupby.GroupByQueryHelper; @@ -75,6 +76,7 @@ import java.io.Closeable; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -387,7 +389,8 @@ public Sequence processSubtotalsSpec( queryWithoutSubtotalsSpec.getDimensions(), (dimSpec) -> new DefaultDimensionSpec( dimSpec.getOutputName(), - dimSpec.getOutputName() + dimSpec.getOutputName(), + dimSpec.getOutputType() ) ) ), @@ -403,9 +406,16 @@ public Sequence processSubtotalsSpec( ); List> subtotalsResults = new ArrayList<>(subtotals.size()); + Map queryDimensionSpecs = new HashMap(queryWithoutSubtotalsSpec.getDimensions().size()); + for (DimensionSpec dimSpec : queryWithoutSubtotalsSpec.getDimensions()) { + queryDimensionSpecs.put(dimSpec.getOutputName(), dimSpec); + } + for (List subtotalSpec : subtotals) { GroupByQuery subtotalQuery = queryWithoutSubtotalsSpec.withDimensionSpecs( - subtotalSpec.stream().map(s -> new DefaultDimensionSpec(s, s)).collect(Collectors.toList()) + subtotalSpec.stream() + .map(s -> new DefaultDimensionSpec(s, s, queryDimensionSpecs.get(s).getOutputType())) + .collect(Collectors.toList()) ); 
subtotalsResults.add(applyPostProcessing( diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java index da0be77e3ea2..d50dd3e34f30 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java @@ -6252,6 +6252,81 @@ public void testGroupByWithSubtotalsSpec() TestHelper.assertExpectedObjects(expectedResults, results, ""); } + @Test + public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() + { + if (!config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) { + return; + } + + GroupByQuery query = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("qualityLong", "ql", ValueType.LONG), new DefaultDimensionSpec("market", "market"))) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory("idx", "index"), + new FloatSumAggregatorFactory("idxFloat", "indexFloat"), + new DoubleSumAggregatorFactory("idxDouble", "index") + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .setSubtotalsSpec(ImmutableList.of( + ImmutableList.of("ql"), + ImmutableList.of("market"), + ImmutableList.of() + )) + .build(); + + List expectedResults = Arrays.asList( + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "idxDouble", 135.885094, "idxFloat", 135.8851, "ql", 1000L, "rows", 1L, "idx", 135L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "idxDouble", 118.57034, "idxFloat", 118.57034, "ql", 1100L, "rows", 1L, "idx", 118L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "idxDouble", 158.747224, "idxFloat", 158.74722, "ql", 1200L, "rows", 1L, "idx", 158L), + 
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "idxDouble", 120.134704, "idxFloat", 120.134705, "ql", 1300L, "rows", 1L, "idx", 120L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "idxDouble", 2871.8866900000003, "idxFloat", 2871.88671875, "ql", 1400L, "rows", 3L, "idx", 2870L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "idxDouble", 121.583581, "idxFloat", 121.58358, "ql", 1500L, "rows", 1L, "idx", 121L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "idxDouble", 2900.798647, "idxFloat", 2900.798583984375, "ql", 1600L, "rows", 3L, "idx", 2900L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "idxDouble", 78.622547, "idxFloat", 78.62254, "ql", 1700L, "rows", 1L, "idx", 78L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "idxDouble", 119.922742, "idxFloat", 119.922745, "ql", 1800L, "rows", 1L, "idx", 119L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "idxDouble", 147.425935, "idxFloat", 147.42593, "ql", 1000L, "rows", 1L, "idx", 147L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "idxDouble", 112.987027, "idxFloat", 112.98703, "ql", 1100L, "rows", 1L, "idx", 112L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "idxDouble", 166.016049, "idxFloat", 166.01605, "ql", 1200L, "rows", 1L, "idx", 166L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "idxDouble", 113.446008, "idxFloat", 113.44601, "ql", 1300L, "rows", 1L, "idx", 113L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "idxDouble", 2448.830613, "idxFloat", 2448.83056640625, "ql", 1400L, "rows", 3L, "idx", 2447L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "idxDouble", 114.290141, "idxFloat", 114.29014, "ql", 1500L, "rows", 1L, "idx", 114L), + 
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "idxDouble", 2506.415148, "idxFloat", 2506.4150390625, "ql", 1600L, "rows", 3L, "idx", 2505L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "idxDouble", 97.387433, "idxFloat", 97.387436, "ql", 1700L, "rows", 1L, "idx", 97L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "idxDouble", 126.411364, "idxFloat", 126.41136, "ql", 1800L, "rows", 1L, "idx", 126L), + + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "market", "spot", "idxDouble", 643.043177, "idxFloat", 643.043212890625, "rows", 5L, "idx", 640L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "market", "total_market", "idxDouble", 1314.839715, "idxFloat", 1314.8397, "rows", 1L, "idx", 1314L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "market", "upfront", "idxDouble", 1447.34116, "idxFloat", 1447.3412, "rows", 1L, "idx", 1447L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "market", "spot", "idxDouble", 266.090949, "idxFloat", 266.0909423828125, "rows", 2L, "idx", 265L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "market", "total_market", "idxDouble", 1522.043733, "idxFloat", 1522.0437, "rows", 1L, "idx", 1522L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "market", "upfront", "idxDouble", 1234.247546, "idxFloat", 1234.2476, "rows", 1L, "idx", 1234L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "market", "spot", "idxDouble", 198.545289, "idxFloat", 198.5452880859375, "rows", 2L, "idx", 197L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "market", "spot", "idxDouble", 650.806953, "idxFloat", 650.8069458007812, "rows", 5L, "idx", 648L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "market", 
"total_market", "idxDouble", 1193.556278, "idxFloat", 1193.5563, "rows", 1L, "idx", 1193L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "market", "upfront", "idxDouble", 1144.342401, "idxFloat", 1144.3424, "rows", 1L, "idx", 1144L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "market", "spot", "idxDouble", 249.591647, "idxFloat", 249.59164428710938, "rows", 2L, "idx", 249L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "market", "total_market", "idxDouble", 1321.375057, "idxFloat", 1321.375, "rows", 1L, "idx", 1321L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "market", "upfront", "idxDouble", 1049.738585, "idxFloat", 1049.7385, "rows", 1L, "idx", 1049L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "market", "spot", "idxDouble", 223.798797, "idxFloat", 223.79879760742188, "rows", 2L, "idx", 223L), + + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "idxDouble", 6626.151569, "idxFloat", 6626.1513671875, "rows", 13L, "idx", 6619L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "idxDouble", 5833.209717999999, "idxFloat", 5833.20849609375, "rows", 13L, "idx", 5827L) + ); + + Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + + for (Row row : results) { + System.out.println(row); + } + TestHelper.assertExpectedObjects(expectedResults, results, ""); + } + @Test public void testGroupByWithSubtotalsSpecWithOrderLimit() { From ce696f2cf30ed39097a4756c603d9e836a05f84e Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Tue, 21 Aug 2018 12:23:55 -0700 Subject: [PATCH 4/6] fix checkstyle issues after merge to master --- .../java/io/druid/query/groupby/GroupByQueryRunnerTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java 
b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java index 7108a52a7506..3043bd74b9c2 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java @@ -5597,7 +5597,7 @@ public void testGroupByWithSubtotalsSpec() .builder() .setDataSource(QueryRunnerTestHelper.dataSource) .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) - .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("quality", "alias"), new DefaultDimensionSpec("market", "market"))) + .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("quality", "alias"), new DefaultDimensionSpec("market", "market"))) .setAggregatorSpecs( Arrays.asList( QueryRunnerTestHelper.rowsCount, @@ -5670,7 +5670,7 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() .builder() .setDataSource(QueryRunnerTestHelper.dataSource) .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) - .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("qualityLong", "ql", ValueType.LONG), new DefaultDimensionSpec("market", "market"))) + .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("qualityLong", "ql", ValueType.LONG), new DefaultDimensionSpec("market", "market"))) .setAggregatorSpecs( Arrays.asList( QueryRunnerTestHelper.rowsCount, @@ -5745,7 +5745,7 @@ public void testGroupByWithSubtotalsSpecWithOrderLimit() .builder() .setDataSource(QueryRunnerTestHelper.dataSource) .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) - .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("quality", "alias"), new DefaultDimensionSpec("market", "market"))) + .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("quality", "alias"), new DefaultDimensionSpec("market", "market"))) .setAggregatorSpecs( Arrays.asList( QueryRunnerTestHelper.rowsCount, From 9e8de8b4a39170e543020141e23a6252c0110054 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Thu, 23 Aug 2018 
09:51:08 -0700 Subject: [PATCH 5/6] add docs for subtotalsSpec feature --- docs/content/querying/groupbyquery.md | 91 ++++++++++++++++++++++++++- 1 file changed, 90 insertions(+), 1 deletion(-) diff --git a/docs/content/querying/groupbyquery.md b/docs/content/querying/groupbyquery.md index 4048551f84fb..15315c5a30a2 100644 --- a/docs/content/querying/groupbyquery.md +++ b/docs/content/querying/groupbyquery.md @@ -56,7 +56,7 @@ An example groupBy query object is shown below: } ``` -There are 11 main parts to a groupBy query: +Following are main parts to a groupBy query: |property|description|required?| |--------|-----------|---------| @@ -70,6 +70,7 @@ There are 11 main parts to a groupBy query: |aggregations|See [Aggregations](../querying/aggregations.html)|no| |postAggregations|See [Post Aggregations](../querying/post-aggregations.html)|no| |intervals|A JSON Object representing ISO-8601 Intervals. This defines the time ranges to run the query over.|yes| +|subtotalsSpec| A JSON array of arrays to return additional result sets for groupings of subsets of top level `dimensions`. It is described later in more detail.|no| |context|An additional JSON Object which can be used to specify certain flags.|no| To pull it all together, the above query would return *n\*m* data points, up to a maximum of 5000 points, where n is the cardinality of the `country` dimension, m is the cardinality of the `device` dimension, each day between 2012-01-01 and 2012-01-03, from the `sample_datasource` table. Each data point contains the (long) sum of `total_usage` if the value of the data point is greater than 100, the (double) sum of `data_transfer` and the (double) result of `total_usage` divided by `data_transfer` for the filter set for a particular grouping of `country` and `device`. The output looks like this: @@ -113,6 +114,94 @@ improve performance. See [Multi-value dimensions](multi-value-dimensions.html) for more details. 
+### More on subtotalsSpec
+you can have a groupBy query that looks something like below...
+
+```json
+{
+"type": "groupBy",
+ ...
+ ...
+"dimenstions": [
+  {
+    "type" : "default",
+    "dimension" : "d1col",
+    "outputName": "D1"
+  },
+  {
+    "type" : "extraction",
+    "dimension" : "d2col",
+    "outputName" : "D2",
+    "extractionFn" : extraction_func
+  },
+  {
+    "type":"lookup",
+    "dimension":"d3col",
+    "outputName":"D3",
+    "name":"my_lookup"
+  }
+],
+...
+...
+"subtotalsSpec":[ ["D1", "D2", "D3"], ["D1", "D3"], ["D3"]],
+..
+
+}
+```
+
+Response returned would be equivalent to concatenating results of 3 groupBy queries with "dimensions" field being ["D1", "D2", "D3"], ["D1", "D3"] and ["D3"] with appropriate `DimensionSpec` json blob as used in above query.
+Response for above query would look something like below...
+
+```json
+[
+  {
+    "version" : "v1",
+    "timestamp" : "t1",
+    "event" : { "D1": "..", "D2": "..", "D3": ".." }
+  },
+
+  {
+    "version" : "v1",
+    "timestamp" : "t2",
+    "event" : { "D1": "..", "D2": "..", "D3": ".." }
+  },
+
+  ...
+  ...
+
+  {
+    "version" : "v1",
+    "timestamp" : "t1",
+    "event" : { "D1": "..", "D3": ".." }
+  },
+
+  {
+    "version" : "v1",
+    "timestamp" : "t2",
+    "event" : { "D1": "..", "D3": ".." }
+  },
+
+  ...
+  ...
+
+  {
+    "version" : "v1",
+    "timestamp" : "t1",
+    "event" : { "D3": ".." }
+  },
+
+  {
+    "version" : "v1",
+    "timestamp" : "t2",
+    "event" : { "D3": ".." }
+  },
+...
+]
+```
+
+Note that "subtotalsSpec" must contain subsets of "outputName" from various `DimensionSpec` json blobs in `dimensions` attribute and also ordering of dimensions inside subtotal spec must be same as that inside top level "dimensions" attribute e.g. ["D2", "D1"] subtotal spec is not valid as it is not in same order.
+ ### Implementation details #### Strategies From 7135446054247a58b0f5e18de1e4c73b25f98d02 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Tue, 28 Aug 2018 10:54:24 -0700 Subject: [PATCH 6/6] address doc review comments --- docs/content/querying/groupbyquery.md | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/docs/content/querying/groupbyquery.md b/docs/content/querying/groupbyquery.md index 15315c5a30a2..337415512c80 100644 --- a/docs/content/querying/groupbyquery.md +++ b/docs/content/querying/groupbyquery.md @@ -70,7 +70,7 @@ Following are main parts to a groupBy query: |aggregations|See [Aggregations](../querying/aggregations.html)|no| |postAggregations|See [Post Aggregations](../querying/post-aggregations.html)|no| |intervals|A JSON Object representing ISO-8601 Intervals. This defines the time ranges to run the query over.|yes| -|subtotalsSpec| A JSON array of arrays to return additional result sets for groupings of subsets of top level `dimensions`. It is described later in more detail.|no| +|subtotalsSpec| A JSON array of arrays to return additional result sets for groupings of subsets of top level `dimensions`. It is [described later](groupbyquery.html#more-on-subtotalsspec) in more detail.|no| |context|An additional JSON Object which can be used to specify certain flags.|no| To pull it all together, the above query would return *n\*m* data points, up to a maximum of 5000 points, where n is the cardinality of the `country` dimension, m is the cardinality of the `device` dimension, each day between 2012-01-01 and 2012-01-03, from the `sample_datasource` table. Each data point contains the (long) sum of `total_usage` if the value of the data point is greater than 100, the (double) sum of `data_transfer` and the (double) result of `total_usage` divided by `data_transfer` for the filter set for a particular grouping of `country` and `device`. The output looks like this: @@ -115,14 +115,14 @@ improve performance. 
See [Multi-value dimensions](multi-value-dimensions.html) for more details. ### More on subtotalsSpec -you can have a groupBy query that looks something like below... +The subtotals feature allows computation of multiple sub-groupings in a single query. To use this feature, add a "subtotalsSpec" to your query, which should be a list of subgroup dimension sets. It should contain the "outputName" from dimensions in your "dimensions" attribute, in the same order as they appear in the "dimensions" attribute (although, of course, you may skip some). For example, consider a groupBy query like this one: ```json { "type": "groupBy", ... ... -"dimenstions": [ +"dimensions": [ { "type" : "default", "dimension" : "d1col", @@ -200,8 +200,6 @@ Response for above query would look something like below... ] ``` -Note that "subtotalsSpec" must contain subsets of "outputName" from various `DimensionSpec` json blobs in `dimensions` attribute and also ordering of dimensions inside subtotal spec must be same as that inside top level "dimensions" attribute e.g. ["D2", "D1"] subtotal spec is not valid as it is not in same order. - ### Implementation details #### Strategies @@ -271,6 +269,10 @@ With groupBy v2, cluster operators should make sure that the off-heap hash table will not exceed available memory for the maximum possible concurrent query load (given by druid.processing.numMergeBuffers). See [How much direct memory does Druid use?](../operations/performance-faq.html) for more details. +Brokers do not need merge buffers for basic groupBy queries. Queries with subqueries (using a "query" [dataSource](datasource.html#query-data-source)) require one merge buffer if there is a single subquery, or two merge buffers if there is more than one layer of nested subqueries. Queries with [subtotals](groupbyquery.html#more-on-subtotalsspec) need one merge buffer. 
These can stack on top of each other: a groupBy query with multiple layers of nested subqueries, and that also uses subtotals, will need three merge buffers. + +Historicals and ingestion tasks need one merge buffer for each groupBy query, unless [parallel combination](groupbyquery.html#parallel-combine) is enabled, in which case they need two merge buffers per query. + When using groupBy v1, all aggregation is done on-heap, and resource limits are done through the parameter druid.query.groupBy.maxResults. This is a cap on the maximum number of results in a result set. Queries that exceed this limit will fail with a "Resource limit exceeded" error indicating they exceeded their row limit. Cluster