From 0cfbce9b373f8feea6ad79ae2cf55e505cd6fa7f Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Thu, 18 Jul 2019 15:36:48 -0700 Subject: [PATCH 01/16] fix merging of groupBy subtotal spec results --- .../druid/java/util/common/collect/Utils.java | 11 + .../druid/query/groupby/GroupByQuery.java | 17 +- .../groupby/strategy/GroupByStrategyV2.java | 137 ++++-- .../query/groupby/GroupByQueryRunnerTest.java | 432 +++++++----------- 4 files changed, 287 insertions(+), 310 deletions(-) diff --git a/core/src/main/java/org/apache/druid/java/util/common/collect/Utils.java b/core/src/main/java/org/apache/druid/java/util/common/collect/Utils.java index bd5444aa78bb..de7a2399f136 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/collect/Utils.java +++ b/core/src/main/java/org/apache/druid/java/util/common/collect/Utils.java @@ -70,4 +70,15 @@ public static List nullableListOf(@Nullable T... elements) } return list; } + + public static boolean isPrefix(List small, List big) + { + for (int i = 0; i < small.size(); i++) { + if (!small.get(i).equals(big.get(i))) { + return false; + } + } + + return true; + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java index 29ec5ec332cc..5e3f19d488e4 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java @@ -220,28 +220,20 @@ private List> verifySubtotalsSpec( List dimensions ) { - // if subtotalsSpec exists then validate that all are subsets of dimensions spec and are in same order. - // For example if we had {D1, D2, D3} in dimensions spec then - // {D2}, {D1, D2}, {D1, D3}, {D2, D3} etc are valid in subtotalsSpec while - // {D2, D1} is not as it is not in same order. - // {D4} is not as its not a subset. - // This restriction as enforced because implementation does sort merge on the results of top-level query - // results and expects that ordering of events does not change when dimension columns are removed from - // results of top level query. + // if subtotalsSpec exists then validate that all are subsets of dimensions spec. if (subtotalsSpec != null) { for (List subtotalSpec : subtotalsSpec) { - int i = 0; for (String s : subtotalSpec) { boolean found = false; - for (; i < dimensions.size(); i++) { - if (s.equals(dimensions.get(i).getOutputName())) { + for (DimensionSpec ds : dimensions) { + if (s.equals(ds.getOutputName())) { found = true; break; } } if (!found) { throw new IAE( - "Subtotal spec %s is either not a subset or items are in different order than in dimensions.", + "Subtotal spec %s is either not a subset of top level dimensions.", subtotalSpec ); } @@ -902,7 +894,6 @@ public Builder addOrderByColumn(OrderByColumnSpec columnSpec) public Builder setLimitSpec(LimitSpec limitSpec) { - Preconditions.checkNotNull(limitSpec); ensureFluentLimitsNotSet(); this.limitSpec = limitSpec; this.postProcessingFn = null; diff --git a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java index cd6988392e6c..925b13632cae 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -24,6 +24,7 @@ import com.google.common.base.Supplier; import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Ordering; @@ -38,6 +39,7 @@ import org.apache.druid.guice.annotations.Merging; import org.apache.druid.guice.annotations.Smile; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.collect.Utils; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.guava.CloseQuietly; @@ -56,6 +58,7 @@ import org.apache.druid.query.QueryWatcher; import org.apache.druid.query.ResourceLimitExceededException; import org.apache.druid.query.ResultMergeQueryRunner; +import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.PostAggregator; import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.apache.druid.query.dimension.DimensionSpec; @@ -142,7 +145,7 @@ public GroupByQueryResource prepareResource(GroupByQuery query, boolean willMerg { if (!willMergeRunners) { final int requiredMergeBufferNum = countRequiredMergeBufferNum(query, 1) + - (query.getSubtotalsSpec() != null ? 1 : 0); + numMergeBuffersForSubtotalsSpec(query); if (requiredMergeBufferNum > mergeBufferPool.maxSize()) { throw new ResourceLimitExceededException( @@ -390,23 +393,29 @@ public Sequence processSubtotalsSpec( final List closeOnExit = new ArrayList<>(); try { - GroupByQuery queryWithoutSubtotalsSpec = query.withSubtotalsSpec(null).withDimFilter(null); + GroupByQuery queryWithoutSubtotalsSpec = query + .withDimensionSpecs(query.getDimensions().stream().map( + dimSpec -> new DefaultDimensionSpec( + dimSpec.getOutputName(), + dimSpec.getOutputName(), + dimSpec.getOutputType() + )).collect(Collectors.toList()) + ) + .withAggregatorSpecs( + query.getAggregatorSpecs() + .stream() + .map(AggregatorFactory::getCombiningFactory) + .collect(Collectors.toList()) + ) + .withLimitSpec(null) + .withSubtotalsSpec(null) + .withDimFilter(null); + List> subtotals = query.getSubtotalsSpec(); Supplier grouperSupplier = Suppliers.memoize( () -> GroupByRowProcessor.createGrouper( - queryWithoutSubtotalsSpec.withAggregatorSpecs( - Lists.transform(queryWithoutSubtotalsSpec.getAggregatorSpecs(), (agg) -> agg.getCombiningFactory()) - ).withDimensionSpecs( - Lists.transform( - queryWithoutSubtotalsSpec.getDimensions(), - (dimSpec) -> new DefaultDimensionSpec( - dimSpec.getOutputName(), - dimSpec.getOutputName(), - dimSpec.getOutputType() - ) - ) - ), + queryWithoutSubtotalsSpec, queryResult, GroupByQueryHelper.rowSignatureFor(queryWithoutSubtotalsSpec), configSupplier.get(), @@ -419,36 +428,86 @@ public Sequence processSubtotalsSpec( false ) ); + + List> subtotalsResults = new ArrayList<>(subtotals.size()); Map queryDimensionSpecs = new HashMap(queryWithoutSubtotalsSpec.getDimensions().size()); + List queryDimNames = new ArrayList(queryWithoutSubtotalsSpec.getDimensions().size()); + for (DimensionSpec dimSpec : queryWithoutSubtotalsSpec.getDimensions()) { queryDimensionSpecs.put(dimSpec.getOutputName(), dimSpec); + queryDimNames.add(dimSpec.getOutputName()); } for (List subtotalSpec : subtotals) { GroupByQuery subtotalQuery = queryWithoutSubtotalsSpec.withDimensionSpecs( subtotalSpec.stream() - .map(s -> new DefaultDimensionSpec(s, s, queryDimensionSpecs.get(s).getOutputType())) + .map(queryDimensionSpecs::get) .collect(Collectors.toList()) ); - subtotalsResults.add(applyPostProcessing( - mergeResults(new QueryRunner() - { - @Override - public Sequence run(QueryPlus queryPlus, Map responseContext) + if (Utils.isPrefix(subtotalSpec, queryDimNames)) { + subtotalsResults.add(applyPostProcessing( + mergeResults(new QueryRunner() { - return GroupByRowProcessor.getRowsFromGrouper( - queryWithoutSubtotalsSpec, - subtotalSpec, - grouperSupplier - ); - } - }, subtotalQuery, null), - subtotalQuery - ) - ); + @Override + public Sequence run(QueryPlus queryPlus, Map responseContext) + { + return GroupByRowProcessor.getRowsFromGrouper( + queryWithoutSubtotalsSpec, + subtotalSpec, + grouperSupplier + ); + } + }, subtotalQuery, null), + subtotalQuery + ) + ); + } else { + subtotalsResults.add(applyPostProcessing( + mergeResults(new QueryRunner() + { + @Override + public Sequence run(QueryPlus queryPlus, Map responseContext) + { + List closeables = new ArrayList<>(); + + Sequence result = GroupByRowProcessor.getRowsFromGrouper( + subtotalQuery, + subtotalSpec, + Suppliers.memoize(() -> GroupByRowProcessor.createGrouper( + subtotalQuery.withAggregatorSpecs( + Lists.transform( + queryWithoutSubtotalsSpec.getAggregatorSpecs(), + (agg) -> agg.getCombiningFactory() + ) + ), + GroupByRowProcessor.getRowsFromGrouper( + queryWithoutSubtotalsSpec, + subtotalSpec, + grouperSupplier + ), + GroupByQueryHelper.rowSignatureFor(subtotalQuery), + configSupplier.get(), + resource, + spillMapper, + processingConfig.getTmpDir(), + processingConfig.intermediateComputeSizeBytes(), + closeables, + false, + false + ) + ) + ); + + return Sequences.withBaggage(result, () -> Lists.reverse(closeables).forEach(closeable -> CloseQuietly.close(closeable))); + } + }, subtotalQuery, null), + subtotalQuery + ) + ); + } } return Sequences.withBaggage( @@ -462,6 +521,24 @@ public Sequence run(QueryPlus queryPlus, Map responseC } } + private int numMergeBuffersForSubtotalsSpec(GroupByQuery query) + { + List> subtotalSpecs = query.getSubtotalsSpec(); + if (subtotalSpecs == null) { + return 0; + } + + List queryDimOutputNames = query.getDimensions().stream().map(DimensionSpec::getOutputName).collect( + Collectors.toList()); + for (List subtotalSpec : subtotalSpecs) { + if (!Utils.isPrefix(subtotalSpec, queryDimOutputNames)) { + return 2; + } + } + + return 1; + } + @Override public QueryRunner mergeRunners(ListeningExecutorService exec, Iterable> queryRunners) { diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java index 3eb69c0dda41..560049cc9f87 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java @@ -6278,7 +6278,7 @@ public void testSubqueryWithFirstLast() } @Test - public void testGroupByWithSubtotalsSpec() + public void testGroupByWithSubtotalsSpecOfDimensionsPrefixes() { // Cannot vectorize due to usage of expressions. cannotVectorize(); @@ -6292,22 +6292,128 @@ public void testGroupByWithSubtotalsSpec() .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) .setVirtualColumns(new ExpressionVirtualColumn("alias", "quality", ValueType.STRING, TestExprMacroTable.INSTANCE)) .setDimensions(Lists.newArrayList( - new DefaultDimensionSpec("quality", "quality"), - new DefaultDimensionSpec("market", "market"), - new DefaultDimensionSpec("alias", "alias") + new DefaultDimensionSpec("market", "market2"), + new DefaultDimensionSpec("alias", "alias2") )) .setAggregatorSpecs( Arrays.asList( QueryRunnerTestHelper.rowsCount, - new LongSumAggregatorFactory("idx", "index"), - new FloatSumAggregatorFactory("idxFloat", "indexFloat"), - new DoubleSumAggregatorFactory("idxDouble", "index") + new LongSumAggregatorFactory("idx", "index") ) ) .setGranularity(QueryRunnerTestHelper.dayGran) .setSubtotalsSpec(ImmutableList.of( - ImmutableList.of("alias"), - ImmutableList.of("market"), + ImmutableList.of("market2"), + ImmutableList.of() + )) + .build(); + + List expectedResults = Arrays.asList( + GroupByQueryRunnerTestHelper.createExpectedRow( + "2011-04-01T00:00:00.000Z", + "market2", + "spot", + "rows", + 9L, + "idx", + 1102L + ), + GroupByQueryRunnerTestHelper.createExpectedRow( + "2011-04-01T00:00:00.000Z", + "market2", + "total_market", + "rows", + 2L, + "idx", + 2836L + ), + GroupByQueryRunnerTestHelper.createExpectedRow( + "2011-04-01T00:00:00.000Z", + "market2", + "upfront", + "rows", + 2L, + "idx", + 2681L + ), + + GroupByQueryRunnerTestHelper.createExpectedRow( + "2011-04-02T00:00:00.000Z", + "market2", + "spot", + "rows", + 9L, + "idx", + 1120L + ), + GroupByQueryRunnerTestHelper.createExpectedRow( + "2011-04-02T00:00:00.000Z", + "market2", + "total_market", + "rows", + 2L, + "idx", + 2514L + ), + GroupByQueryRunnerTestHelper.createExpectedRow( + "2011-04-02T00:00:00.000Z", + "market2", + "upfront", + "rows", + 2L, + "idx", + 2193L + ), + + GroupByQueryRunnerTestHelper.createExpectedRow( + "2011-04-01T00:00:00.000Z", + "rows", + 13L, + "idx", + 6619L + ), + GroupByQueryRunnerTestHelper.createExpectedRow( + "2011-04-02T00:00:00.000Z", + "rows", + 13L, + "idx", + 5827L + ) + ); + + Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + TestHelper.assertExpectedObjects(expectedResults, results, "subtotal"); + } + + @Test + public void testGroupByWithSubtotalsSpecGeneral() + { + // Cannot vectorize due to usage of expressions. + cannotVectorize(); + + if (!config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) { + return; + } + + GroupByQuery query = makeQueryBuilder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setVirtualColumns(new ExpressionVirtualColumn("alias", "quality", ValueType.STRING, TestExprMacroTable.INSTANCE)) + .setDimensions(Lists.newArrayList( + new DefaultDimensionSpec("quality", "quality2"), + new DefaultDimensionSpec("market", "market2"), + new DefaultDimensionSpec("alias", "alias2") + )) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory("idx", "index") + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .setSubtotalsSpec(ImmutableList.of( + ImmutableList.of("alias2"), + ImmutableList.of("market2"), ImmutableList.of() )) .build(); @@ -6315,430 +6421,226 @@ public void testGroupByWithSubtotalsSpec() List expectedResults = Arrays.asList( GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01", - "alias", + "alias2", "automotive", "rows", 1L, "idx", - 135L, - "idxFloat", - 135.88510131835938f, - "idxDouble", - 135.88510131835938d + 135L ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01", - "alias", + "alias2", "business", "rows", 1L, "idx", - 118L, - "idxFloat", - 118.57034, - "idxDouble", - 118.57034 + 118L ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01", - "alias", + "alias2", "entertainment", "rows", 1L, "idx", - 158L, - "idxFloat", - 158.747224, - "idxDouble", - 158.747224 + 158L ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01", - "alias", + "alias2", "health", "rows", 1L, "idx", - 120L, - "idxFloat", - 120.134704, - "idxDouble", - 120.134704 + 120L ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01", - "alias", + "alias2", "mezzanine", "rows", 3L, "idx", - 2870L, - "idxFloat", - 2871.8866900000003f, - "idxDouble", - 2871.8866900000003d + 2870L ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01", - "alias", + "alias2", "news", "rows", 1L, "idx", - 121L, - "idxFloat", - 121.58358f, - "idxDouble", - 121.58358d + 121L ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01", - "alias", + "alias2", "premium", "rows", 3L, "idx", - 2900L, - "idxFloat", - 2900.798647f, - "idxDouble", - 2900.798647d + 2900L ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01", - "alias", + "alias2", "technology", "rows", 1L, "idx", - 78L, - "idxFloat", - 78.622547f, - "idxDouble", - 78.622547d + 78L ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01", - "alias", + "alias2", "travel", "rows", 1L, "idx", - 119L, - "idxFloat", - 119.922742f, - "idxDouble", - 119.922742d + 119L ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02", - "alias", + "alias2", "automotive", "rows", 1L, "idx", - 147L, - "idxFloat", - 147.42593f, - "idxDouble", - 147.42593d + 147L ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02", - "alias", + "alias2", "business", "rows", 1L, "idx", - 112L, - "idxFloat", - 112.987027f, - "idxDouble", - 112.987027d + 112L ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02", - "alias", + "alias2", "entertainment", "rows", 1L, "idx", - 166L, - "idxFloat", - 166.016049f, - "idxDouble", - 166.016049d + 166L ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02", - "alias", + "alias2", "health", "rows", 1L, "idx", - 113L, - "idxFloat", - 113.446008f, - "idxDouble", - 113.446008d + 113L ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02", - "alias", + "alias2", "mezzanine", "rows", 3L, "idx", - 2447L, - "idxFloat", - 2448.830613f, - "idxDouble", - 2448.830613d + 2447L ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02", - "alias", + "alias2", "news", "rows", 1L, "idx", - 114L, - "idxFloat", - 114.290141f, - "idxDouble", - 114.290141d + 114L ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02", - "alias", + "alias2", "premium", "rows", 3L, "idx", - 2505L, - "idxFloat", - 2506.415148f, - "idxDouble", - 2506.415148d + 2505L ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02", - "alias", + "alias2", "technology", "rows", 1L, "idx", - 97L, - "idxFloat", - 97.387433f, - "idxDouble", - 97.387433d + 97L ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02", - "alias", + "alias2", "travel", "rows", 1L, "idx", - 126L, - "idxFloat", - 126.411364f, - "idxDouble", - 126.411364d + 126L ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01T00:00:00.000Z", - "market", + "market2", "spot", - "idxDouble", - 643.043177, - "idxFloat", - 643.043212890625, "rows", - 5L, + 9L, "idx", - 640L + 1102L ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01T00:00:00.000Z", - "market", + "market2", "total_market", - "idxDouble", - 1314.839715, - "idxFloat", - 1314.8397, - "rows", - 1L, - "idx", - 1314L - ), - GroupByQueryRunnerTestHelper.createExpectedRow( - "2011-04-01T00:00:00.000Z", - "market", - "upfront", - "idxDouble", - 1447.34116, - "idxFloat", - 1447.3412, - "rows", - 1L, - "idx", - 1447L - ), - GroupByQueryRunnerTestHelper.createExpectedRow( - "2011-04-01T00:00:00.000Z", - "market", - "spot", - "idxDouble", - 266.090949, - "idxFloat", - 266.0909423828125, "rows", 2L, "idx", - 265L - ), - GroupByQueryRunnerTestHelper.createExpectedRow( - "2011-04-01T00:00:00.000Z", - "market", - "total_market", - "idxDouble", - 1522.043733, - "idxFloat", - 1522.0437, - "rows", - 1L, - "idx", - 1522L + 2836L ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01T00:00:00.000Z", - "market", + "market2", "upfront", - "idxDouble", - 1234.247546, - "idxFloat", - 1234.2476, - "rows", - 1L, - "idx", - 1234L - ), - GroupByQueryRunnerTestHelper.createExpectedRow( - "2011-04-01T00:00:00.000Z", - "market", - "spot", - "idxDouble", - 198.545289, - "idxFloat", - 198.5452880859375, "rows", 2L, "idx", - 197L + 2681L ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02T00:00:00.000Z", - "market", + "market2", "spot", - "idxDouble", - 650.806953, - "idxFloat", - 650.8069458007812, "rows", - 5L, + 9L, "idx", - 648L + 1120L ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02T00:00:00.000Z", - "market", + "market2", "total_market", - "idxDouble", - 1193.556278, - "idxFloat", - 1193.5563, - "rows", - 1L, - "idx", - 1193L - ), - GroupByQueryRunnerTestHelper.createExpectedRow( - "2011-04-02T00:00:00.000Z", - "market", - "upfront", - "idxDouble", - 1144.342401, - "idxFloat", - 1144.3424, - "rows", - 1L, - "idx", - 1144L - ), - GroupByQueryRunnerTestHelper.createExpectedRow( - "2011-04-02T00:00:00.000Z", - "market", - "spot", - "idxDouble", - 249.591647, - "idxFloat", - 249.59164428710938, "rows", 2L, "idx", - 249L + 2514L ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02T00:00:00.000Z", - "market", - "total_market", - "idxDouble", - 1321.375057, - "idxFloat", - 1321.375, - "rows", - 1L, - "idx", - 1321L - ), - GroupByQueryRunnerTestHelper.createExpectedRow( - "2011-04-02T00:00:00.000Z", - "market", + "market2", "upfront", - "idxDouble", - 1049.738585, - "idxFloat", - 1049.7385, - "rows", - 1L, - "idx", - 1049L - ), - GroupByQueryRunnerTestHelper.createExpectedRow( - "2011-04-02T00:00:00.000Z", - "market", - "spot", - "idxDouble", - 223.798797, - "idxFloat", - 223.79879760742188, "rows", 2L, "idx", - 223L + 2193L ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01T00:00:00.000Z", - "idxDouble", - 6626.151575318359, - "idxFloat", - 6626.152f, "rows", 13L, "idx", @@ -6746,10 +6648,6 @@ public void testGroupByWithSubtotalsSpec() ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02T00:00:00.000Z", - "idxDouble", - 5833.209713, - "idxFloat", - 5833.209f, "rows", 13L, "idx", From 9f5436df0012644a96e5b29ed55c32ee90d0cc4b Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Thu, 18 Jul 2019 15:51:38 -0700 Subject: [PATCH 02/16] add post agg to subtotals spec ut --- .../query/groupby/GroupByQueryRunnerTest.java | 57 +++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java index 560049cc9f87..2a3c433dff02 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java @@ -6410,6 +6410,11 @@ public void testGroupByWithSubtotalsSpecGeneral() new LongSumAggregatorFactory("idx", "index") ) ) + .setPostAggregatorSpecs( + Collections.singletonList( + new FieldAccessPostAggregator("idxPostAgg","idx") + ) + ) .setGranularity(QueryRunnerTestHelper.dayGran) .setSubtotalsSpec(ImmutableList.of( ImmutableList.of("alias2"), @@ -6426,6 +6431,8 @@ public void testGroupByWithSubtotalsSpecGeneral() "rows", 1L, "idx", + 135L, + "idxPostAgg", 135L ), GroupByQueryRunnerTestHelper.createExpectedRow( @@ -6435,6 +6442,8 @@ public void testGroupByWithSubtotalsSpecGeneral() "rows", 1L, "idx", + 118L, + "idxPostAgg", 118L ), GroupByQueryRunnerTestHelper.createExpectedRow( @@ -6444,6 +6453,8 @@ public void testGroupByWithSubtotalsSpecGeneral() "rows", 1L, "idx", + 158L, + "idxPostAgg", 158L ), GroupByQueryRunnerTestHelper.createExpectedRow( @@ -6453,6 +6464,8 @@ public void testGroupByWithSubtotalsSpecGeneral() "rows", 1L, "idx", + 120L, + "idxPostAgg", 120L ), GroupByQueryRunnerTestHelper.createExpectedRow( @@ -6462,6 +6475,8 @@ public void testGroupByWithSubtotalsSpecGeneral() "rows", 3L, "idx", + 2870L, + "idxPostAgg", 2870L ), GroupByQueryRunnerTestHelper.createExpectedRow( @@ -6471,6 +6486,8 @@ public void testGroupByWithSubtotalsSpecGeneral() "rows", 1L, "idx", + 121L, + "idxPostAgg", 121L ), GroupByQueryRunnerTestHelper.createExpectedRow( @@ -6480,6 +6497,8 @@ public void testGroupByWithSubtotalsSpecGeneral() "rows", 3L, "idx", + 2900L, + "idxPostAgg", 2900L ), GroupByQueryRunnerTestHelper.createExpectedRow( @@ -6489,6 +6508,8 @@ public void testGroupByWithSubtotalsSpecGeneral() "rows", 1L, "idx", + 78L, + "idxPostAgg", 78L ), GroupByQueryRunnerTestHelper.createExpectedRow( @@ -6498,6 +6519,8 @@ public void testGroupByWithSubtotalsSpecGeneral() "rows", 1L, "idx", + 119L, + "idxPostAgg", 119L ), @@ -6508,6 +6531,8 @@ public void testGroupByWithSubtotalsSpecGeneral() "rows", 1L, "idx", + 147L, + "idxPostAgg", 147L ), GroupByQueryRunnerTestHelper.createExpectedRow( @@ -6517,6 +6542,8 @@ public void testGroupByWithSubtotalsSpecGeneral() "rows", 1L, "idx", + 112L, + "idxPostAgg", 112L ), GroupByQueryRunnerTestHelper.createExpectedRow( @@ -6526,6 +6553,8 @@ public void testGroupByWithSubtotalsSpecGeneral() "rows", 1L, "idx", + 166L, + "idxPostAgg", 166L ), GroupByQueryRunnerTestHelper.createExpectedRow( @@ -6535,6 +6564,8 @@ public void testGroupByWithSubtotalsSpecGeneral() "rows", 1L, "idx", + 113L, + "idxPostAgg", 113L ), GroupByQueryRunnerTestHelper.createExpectedRow( @@ -6544,6 +6575,8 @@ public void testGroupByWithSubtotalsSpecGeneral() "rows", 3L, "idx", + 2447L, + "idxPostAgg", 2447L ), GroupByQueryRunnerTestHelper.createExpectedRow( @@ -6553,6 +6586,8 @@ public void testGroupByWithSubtotalsSpecGeneral() "rows", 1L, "idx", + 114L, + "idxPostAgg", 114L ), GroupByQueryRunnerTestHelper.createExpectedRow( @@ -6562,6 +6597,8 @@ public void testGroupByWithSubtotalsSpecGeneral() "rows", 3L, "idx", + 2505L, + "idxPostAgg", 2505L ), GroupByQueryRunnerTestHelper.createExpectedRow( @@ -6571,6 +6608,8 @@ public void testGroupByWithSubtotalsSpecGeneral() "rows", 1L, "idx", + 97L, + "idxPostAgg", 97L ), GroupByQueryRunnerTestHelper.createExpectedRow( @@ -6580,6 +6619,8 @@ public void testGroupByWithSubtotalsSpecGeneral() "rows", 1L, "idx", + 126L, + "idxPostAgg", 126L ), @@ -6590,6 +6631,8 @@ public void testGroupByWithSubtotalsSpecGeneral() "rows", 9L, "idx", + 1102L, + "idxPostAgg", 1102L ), GroupByQueryRunnerTestHelper.createExpectedRow( @@ -6599,6 +6642,8 @@ public void testGroupByWithSubtotalsSpecGeneral() "rows", 2L, "idx", + 2836L, + "idxPostAgg", 2836L ), GroupByQueryRunnerTestHelper.createExpectedRow( @@ -6608,6 +6653,8 @@ public void testGroupByWithSubtotalsSpecGeneral() "rows", 2L, "idx", + 2681L, + "idxPostAgg", 2681L ), @@ -6618,6 +6665,8 @@ public void testGroupByWithSubtotalsSpecGeneral() "rows", 9L, "idx", + 1120L, + "idxPostAgg", 1120L ), GroupByQueryRunnerTestHelper.createExpectedRow( @@ -6627,6 +6676,8 @@ public void testGroupByWithSubtotalsSpecGeneral() "rows", 2L, "idx", + 2514L, + "idxPostAgg", 2514L ), GroupByQueryRunnerTestHelper.createExpectedRow( @@ -6636,6 +6687,8 @@ public void testGroupByWithSubtotalsSpecGeneral() "rows", 2L, "idx", + 2193L, + "idxPostAgg", 2193L ), @@ -6644,6 +6697,8 @@ public void testGroupByWithSubtotalsSpecGeneral() "rows", 13L, "idx", + 6619L, + "idxPostAgg", 6619L ), GroupByQueryRunnerTestHelper.createExpectedRow( @@ -6651,6 +6706,8 @@ public void testGroupByWithSubtotalsSpecGeneral() "rows", 13L, "idx", + 5827L, + "idxPostAgg", 5827L ) ); From 30d9510d103e872a931ddd921e3cec36459c73da Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Thu, 18 Jul 2019 15:58:56 -0700 Subject: [PATCH 03/16] add comment --- .../query/groupby/strategy/GroupByStrategyV2.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java index 925b13632cae..fdeda902851a 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -388,6 +388,16 @@ public Sequence processSubtotalsSpec( Sequence queryResult ) { + // How it works? + // First we accumulate the result of top level query aka queryResult arg inside a Grouper#1 object. + // Next for each subtotalSpec + // If subtotalSpec is a prefix of top level dims then we iterate on rows in Grouper#1 object which are still + // sorted by subtotalSpec, stream merge them and return. + // + // If subtotalSpec is not a prefix of top level dims then we create a Grouper#2 object filled with rows from + // Grouper#1 object with only dims from subtotalSpec. Then we iterate on rows in Grouper#2 object which are + // of course sorted by subtotalSpec, stream merge them and return. + // This contains all closeable objects which are closed when the returned iterator iterates all the elements, // or an exceptions is thrown. The objects are closed in their reverse order. final List closeOnExit = new ArrayList<>(); From 4b80b4ac5d71bb16e0f2a02701bc25150d991172 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Thu, 18 Jul 2019 16:12:56 -0700 Subject: [PATCH 04/16] remove unnecessary agg transformation --- .../druid/query/groupby/strategy/GroupByStrategyV2.java | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java index fdeda902851a..c3e48150d53d 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -487,12 +487,7 @@ public Sequence run(QueryPlus queryPlus, Map responseC subtotalQuery, subtotalSpec, Suppliers.memoize(() -> GroupByRowProcessor.createGrouper( - subtotalQuery.withAggregatorSpecs( - Lists.transform( - queryWithoutSubtotalsSpec.getAggregatorSpecs(), - (agg) -> agg.getCombiningFactory() - ) - ), + subtotalQuery, GroupByRowProcessor.getRowsFromGrouper( queryWithoutSubtotalsSpec, subtotalSpec, From 526454bf9a6306ea0713c371105f9b1feef4b721 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Thu, 18 Jul 2019 16:30:50 -0700 Subject: [PATCH 05/16] fix build --- .../apache/druid/query/groupby/strategy/GroupByStrategyV2.java | 1 - .../org/apache/druid/query/groupby/GroupByQueryRunnerTest.java | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java index c3e48150d53d..8a127c4adbf1 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -24,7 +24,6 @@ import com.google.common.base.Supplier; import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Ordering; diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java index 2a3c433dff02..1b599668fe78 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java @@ -6412,7 +6412,7 @@ public void testGroupByWithSubtotalsSpecGeneral() ) .setPostAggregatorSpecs( Collections.singletonList( - new FieldAccessPostAggregator("idxPostAgg","idx") + new FieldAccessPostAggregator("idxPostAgg", "idx") ) ) .setGranularity(QueryRunnerTestHelper.dayGran) From fe84d24e26bbcb0ff805b751bab2b2bce4cf6936 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Fri, 19 Jul 2019 14:43:51 -0700 Subject: [PATCH 06/16] fix test --- .../druid/query/groupby/GroupByQuery.java | 1 + .../groupby/strategy/GroupByStrategyV2.java | 1 - .../query/groupby/GroupByQueryRunnerTest.java | 270 ++---------------- 3 files changed, 25 insertions(+), 247 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java index 5e3f19d488e4..a08f0cf35b25 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java @@ -894,6 +894,7 @@ public Builder addOrderByColumn(OrderByColumnSpec columnSpec) public Builder setLimitSpec(LimitSpec limitSpec) { + Preconditions.checkNotNull(limitSpec); ensureFluentLimitsNotSet(); this.limitSpec = limitSpec; this.postProcessingFn = null; diff --git a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java index 8a127c4adbf1..8121b18cb098 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -416,7 +416,6 @@ public Sequence processSubtotalsSpec( .map(AggregatorFactory::getCombiningFactory) .collect(Collectors.toList()) ) - .withLimitSpec(null) .withSubtotalsSpec(null) .withDimFilter(null); diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java index 1b599668fe78..af816cbf48e9 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java @@ -6818,20 +6818,18 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) .setDimensions(Lists.newArrayList( new DefaultDimensionSpec("qualityLong", "ql", ValueType.LONG), - new DefaultDimensionSpec("market", "market") + new DefaultDimensionSpec("market", "market2") )) .setAggregatorSpecs( Arrays.asList( QueryRunnerTestHelper.rowsCount, - new LongSumAggregatorFactory("idx", "index"), - new FloatSumAggregatorFactory("idxFloat", "indexFloat"), - new DoubleSumAggregatorFactory("idxDouble", "index") + new LongSumAggregatorFactory("idx", "index") ) ) .setGranularity(QueryRunnerTestHelper.dayGran) .setSubtotalsSpec(ImmutableList.of( ImmutableList.of("ql"), - ImmutableList.of("market"), + ImmutableList.of("market2"), ImmutableList.of() )) .build(); @@ -6839,10 +6837,6 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() List expectedResults = Arrays.asList( GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01T00:00:00.000Z", - "idxDouble", - 135.885094, - "idxFloat", - 135.8851, "ql", 1000L, "rows", @@ -6852,10 +6846,6 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01T00:00:00.000Z", - "idxDouble", - 118.57034, - "idxFloat", - 118.57034, "ql", 1100L, "rows", @@ -6865,10 +6855,6 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01T00:00:00.000Z", - "idxDouble", - 158.747224, - "idxFloat", - 158.74722, "ql", 1200L, "rows", @@ -6878,10 +6864,6 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01T00:00:00.000Z", - "idxDouble", - 120.134704, - "idxFloat", - 120.134705, "ql", 1300L, "rows", @@ -6891,10 +6873,6 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01T00:00:00.000Z", - "idxDouble", - 2871.8866900000003, - "idxFloat", - 2871.88671875, "ql", 1400L, "rows", @@ -6904,10 +6882,6 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01T00:00:00.000Z", - "idxDouble", - 121.583581, - "idxFloat", - 121.58358, "ql", 1500L, "rows", @@ -6917,10 +6891,6 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01T00:00:00.000Z", - "idxDouble", - 2900.798647, - "idxFloat", - 2900.798583984375, "ql", 1600L, "rows", @@ -6930,10 +6900,6 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01T00:00:00.000Z", - "idxDouble", - 78.622547, - "idxFloat", - 78.62254, "ql", 1700L, "rows", @@ -6943,10 +6909,6 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01T00:00:00.000Z", - "idxDouble", - 119.922742, - "idxFloat", - 119.922745, "ql", 1800L, "rows", @@ -6956,10 +6918,6 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02T00:00:00.000Z", - "idxDouble", - 147.425935, - "idxFloat", - 147.42593, "ql", 1000L, "rows", @@ -6969,10 +6927,6 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02T00:00:00.000Z", - "idxDouble", - 112.987027, - "idxFloat", - 112.98703, "ql", 1100L, "rows", @@ -6982,10 +6936,6 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02T00:00:00.000Z", - "idxDouble", - 166.016049, - "idxFloat", - 166.01605, "ql", 1200L, "rows", @@ -6995,10 +6945,6 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02T00:00:00.000Z", - "idxDouble", - 113.446008, - "idxFloat", - 113.44601, "ql", 1300L, "rows", @@ -7008,10 +6954,6 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02T00:00:00.000Z", - "idxDouble", - 2448.830613, - "idxFloat", - 2448.83056640625, "ql", 1400L, "rows", @@ -7021,10 +6963,6 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02T00:00:00.000Z", - "idxDouble", - 114.290141, - "idxFloat", - 114.29014, "ql", 1500L, "rows", @@ -7034,10 +6972,6 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02T00:00:00.000Z", - "idxDouble", - 2506.415148, - "idxFloat", - 2506.4150390625, "ql", 1600L, "rows", @@ -7047,10 +6981,6 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02T00:00:00.000Z", - "idxDouble", - 97.387433, - "idxFloat", - 97.387436, "ql", 1700L, "rows", @@ -7060,10 +6990,6 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02T00:00:00.000Z", - "idxDouble", - 126.411364, - "idxFloat", - 126.41136, "ql", 1800L, "rows", @@ -7074,193 +7000,63 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01T00:00:00.000Z", - "market", + "market2", "spot", - "idxDouble", - 643.043177, - "idxFloat", - 643.043212890625, "rows", - 5L, + 9L, "idx", - 640L + 1102L ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01T00:00:00.000Z", - "market", + "market2", "total_market", - "idxDouble", - 1314.839715, - "idxFloat", - 1314.8397, - "rows", - 1L, - "idx", - 1314L - ), - GroupByQueryRunnerTestHelper.createExpectedRow( - "2011-04-01T00:00:00.000Z", - "market", - "upfront", - "idxDouble", - 1447.34116, - "idxFloat", - 1447.3412, - "rows", - 1L, - "idx", - 1447L - ), - GroupByQueryRunnerTestHelper.createExpectedRow( - "2011-04-01T00:00:00.000Z", - "market", - "spot", - "idxDouble", - 266.090949, - "idxFloat", - 266.0909423828125, "rows", 2L, "idx", - 265L - ), - GroupByQueryRunnerTestHelper.createExpectedRow( - "2011-04-01T00:00:00.000Z", - "market", - "total_market", - "idxDouble", - 1522.043733, - "idxFloat", - 1522.0437, - "rows", - 1L, - "idx", - 1522L + 2836L ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01T00:00:00.000Z", - "market", + "market2", "upfront", - "idxDouble", - 1234.247546, - "idxFloat", - 1234.2476, - "rows", - 1L, - "idx", - 1234L - ), - GroupByQueryRunnerTestHelper.createExpectedRow( - "2011-04-01T00:00:00.000Z", - "market", - "spot", - "idxDouble", - 198.545289, - "idxFloat", - 198.5452880859375, "rows", 2L, "idx", - 197L + 2681L ), + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02T00:00:00.000Z", - "market", + "market2", "spot", - "idxDouble", - 650.806953, - "idxFloat", - 650.8069458007812, "rows", - 5L, + 9L, "idx", - 648L + 1120L ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02T00:00:00.000Z", - "market", + "market2", "total_market", - "idxDouble", - 1193.556278, - "idxFloat", - 1193.5563, - "rows", - 1L, - "idx", - 1193L - ), - GroupByQueryRunnerTestHelper.createExpectedRow( - "2011-04-02T00:00:00.000Z", - "market", - "upfront", - "idxDouble", - 1144.342401, - "idxFloat", - 1144.3424, - "rows", - 1L, - "idx", - 1144L - ), - GroupByQueryRunnerTestHelper.createExpectedRow( - "2011-04-02T00:00:00.000Z", - "market", - "spot", - "idxDouble", - 249.591647, - "idxFloat", - 249.59164428710938, "rows", 2L, "idx", - 249L - ), - GroupByQueryRunnerTestHelper.createExpectedRow( - "2011-04-02T00:00:00.000Z", - "market", - "total_market", - "idxDouble", - 1321.375057, - "idxFloat", - 1321.375, - "rows", - 1L, - "idx", - 1321L + 2514L ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02T00:00:00.000Z", - "market", + "market2", "upfront", - "idxDouble", - 1049.738585, - "idxFloat", - 1049.7385, - "rows", - 1L, - "idx", - 1049L - ), - GroupByQueryRunnerTestHelper.createExpectedRow( - "2011-04-02T00:00:00.000Z", - "market", - "spot", - "idxDouble", - 223.798797, - "idxFloat", - 223.79879760742188, "rows", 2L, "idx", - 223L + 2193L ), + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01T00:00:00.000Z", - "idxDouble", - 6626.151569, - "idxFloat", - 6626.1513671875, "rows", 13L, "idx", @@ -7268,10 +7064,6 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02T00:00:00.000Z", - "idxDouble", - 5833.209717999999, - "idxFloat", - 5833.20849609375, "rows", 13L, "idx", @@ -7300,9 +7092,7 @@ public void testGroupByWithSubtotalsSpecWithOrderLimit() .setAggregatorSpecs( Arrays.asList( QueryRunnerTestHelper.rowsCount, - new LongSumAggregatorFactory("idx", "index"), - new FloatSumAggregatorFactory("idxFloat", "indexFloat"), - new DoubleSumAggregatorFactory("idxDouble", "index") + new LongSumAggregatorFactory("idx", "index") ) ) .setGranularity(QueryRunnerTestHelper.dayGran) @@ -7311,7 +7101,7 @@ public void testGroupByWithSubtotalsSpecWithOrderLimit() ImmutableList.of("market"), ImmutableList.of() )) - .addOrderByColumn("idxDouble") + .addOrderByColumn("idx") .setLimit(1) .build(); @@ -7323,31 +7113,19 @@ public void testGroupByWithSubtotalsSpecWithOrderLimit() "rows", 1L, "idx", - 78L, - "idxFloat", - 78.622547f, - "idxDouble", - 78.622547d + 78L ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01T00:00:00.000Z", "market", "spot", - "idxDouble", - 198.545289, - "idxFloat", - 198.5452880859375, "rows", - 2L, + 9L, "idx", - 197L + 1102L ), GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01T00:00:00.000Z", - "idxDouble", - 6626.151575318359, - "idxFloat", - 6626.152f, "rows", 13L, "idx", From 52e6e8ba71522731334052f896a5c9e729d48979 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Mon, 22 Jul 2019 14:37:35 -0700 Subject: [PATCH 07/16] ignore unknown columns in ordering spec --- .../query/groupby/orderby/DefaultLimitSpec.java | 12 +++++------- .../druid/query/groupby/GroupByQueryRunnerTest.java | 2 ++ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/orderby/DefaultLimitSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/orderby/DefaultLimitSpec.java index 7a4c5f3a66af..382b9d3693e2 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/orderby/DefaultLimitSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/orderby/DefaultLimitSpec.java @@ -248,15 +248,13 @@ public int compare(Row left, Row right) nextOrdering = dimensionOrdering(columnName, columnSpec.getDimensionComparator()); } - if (nextOrdering == null) { - throw new ISE("Unknown column in order clause[%s]", columnSpec); - } + if (nextOrdering != null) { + if (columnSpec.getDirection() == OrderByColumnSpec.Direction.DESCENDING) { + nextOrdering = nextOrdering.reverse(); + } - if (columnSpec.getDirection() == OrderByColumnSpec.Direction.DESCENDING) { - nextOrdering = nextOrdering.reverse(); + ordering = ordering == null ? nextOrdering : ordering.compound(nextOrdering); } - - ordering = ordering == null ? nextOrdering : ordering.compound(nextOrdering); } if (ordering != null) { diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java index af816cbf48e9..7de048b9f8d9 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java @@ -7102,6 +7102,8 @@ public void testGroupByWithSubtotalsSpecWithOrderLimit() ImmutableList.of() )) .addOrderByColumn("idx") + .addOrderByColumn("alias") + .addOrderByColumn("market") .setLimit(1) .build(); From 014751367d3c62457a4729e1e8bb92ceed0a60ac Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Mon, 22 Jul 2019 15:10:00 -0700 Subject: [PATCH 08/16] change variable names based on comment for easy read --- .../groupby/strategy/GroupByStrategyV2.java | 48 ++++++++++--------- 1 file changed, 26 insertions(+), 22 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java index 8121b18cb098..782cd277102e 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -388,9 +388,9 @@ public Sequence processSubtotalsSpec( ) { // How it works? - // First we accumulate the result of top level query aka queryResult arg inside a Grouper#1 object. + // First we accumulate the result of top level query aka queryResult arg inside a GrouperOne object. // Next for each subtotalSpec - // If subtotalSpec is a prefix of top level dims then we iterate on rows in Grouper#1 object which are still + // If subtotalSpec is a prefix of top level dims then we iterate on rows in GrouperTwo object which are still // sorted by subtotalSpec, stream merge them and return. // // If subtotalSpec is not a prefix of top level dims then we create a Grouper#2 object filled with rows from @@ -421,7 +421,7 @@ public Sequence processSubtotalsSpec( List> subtotals = query.getSubtotalsSpec(); - Supplier grouperSupplier = Suppliers.memoize( + Supplier grouperOneSupplier = Suppliers.memoize( () -> GroupByRowProcessor.createGrouper( queryWithoutSubtotalsSpec, queryResult, @@ -465,7 +465,7 @@ public Sequence run(QueryPlus queryPlus, Map responseC return GroupByRowProcessor.getRowsFromGrouper( queryWithoutSubtotalsSpec, subtotalSpec, - grouperSupplier + grouperOneSupplier ); } }, subtotalQuery, null), @@ -481,29 +481,32 @@ public Sequence run(QueryPlus queryPlus, Map responseC { List closeables = new ArrayList<>(); + Supplier grouperTwoSupplier = Suppliers.memoize(() -> GroupByRowProcessor.createGrouper( + subtotalQuery, + GroupByRowProcessor.getRowsFromGrouper( + queryWithoutSubtotalsSpec, + subtotalSpec, + grouperOneSupplier + ), + GroupByQueryHelper.rowSignatureFor(subtotalQuery), + configSupplier.get(), + resource, + spillMapper, + processingConfig.getTmpDir(), + processingConfig.intermediateComputeSizeBytes(), + closeables, + false, + false + )); + Sequence result = GroupByRowProcessor.getRowsFromGrouper( subtotalQuery, subtotalSpec, - Suppliers.memoize(() -> GroupByRowProcessor.createGrouper( - subtotalQuery, - GroupByRowProcessor.getRowsFromGrouper( - queryWithoutSubtotalsSpec, - subtotalSpec, - grouperSupplier - ), - GroupByQueryHelper.rowSignatureFor(subtotalQuery), - configSupplier.get(), - resource, - spillMapper, - processingConfig.getTmpDir(), - processingConfig.intermediateComputeSizeBytes(), - closeables, - false, - false - ) - ) + grouperTwoSupplier ); + // Close all the resources associated with grouperTwo as soon as all results for this + // subtotal spec are read. return Sequences.withBaggage(result, () -> Lists.reverse(closeables).forEach(closeable -> CloseQuietly.close(closeable))); } }, subtotalQuery, null), @@ -513,6 +516,7 @@ public Sequence run(QueryPlus queryPlus, Map responseC } } + // Close all resources associated with grouperOne when all results have been read. return Sequences.withBaggage( Sequences.concat(subtotalsResults), () -> Lists.reverse(closeOnExit).forEach(closeable -> CloseQuietly.close(closeable)) From 1d8bcec73b4612a12d748dcf31e81ab2fd8d0d0f Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Mon, 22 Jul 2019 15:11:14 -0700 Subject: [PATCH 09/16] formatting --- .../groupby/strategy/GroupByStrategyV2.java | 120 ++++++++++-------- 1 file changed, 67 insertions(+), 53 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java index 782cd277102e..ba5e5a585979 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -456,62 +456,76 @@ public Sequence processSubtotalsSpec( ); if (Utils.isPrefix(subtotalSpec, queryDimNames)) { - subtotalsResults.add(applyPostProcessing( - mergeResults(new QueryRunner() - { - @Override - public Sequence run(QueryPlus queryPlus, Map responseContext) - { - return GroupByRowProcessor.getRowsFromGrouper( - queryWithoutSubtotalsSpec, - subtotalSpec, - grouperOneSupplier - ); - } - }, subtotalQuery, null), - subtotalQuery - ) + subtotalsResults.add( + applyPostProcessing( + mergeResults( + new QueryRunner() + { + @Override + public Sequence run(QueryPlus queryPlus, Map responseContext) + { + return GroupByRowProcessor.getRowsFromGrouper( + queryWithoutSubtotalsSpec, + subtotalSpec, + grouperOneSupplier + ); + } + }, + subtotalQuery, + null + ), + subtotalQuery + ) ); } else { - subtotalsResults.add(applyPostProcessing( - mergeResults(new QueryRunner() - { - @Override - public Sequence run(QueryPlus queryPlus, Map responseContext) - { - List closeables = new ArrayList<>(); - - Supplier grouperTwoSupplier = Suppliers.memoize(() -> GroupByRowProcessor.createGrouper( - subtotalQuery, - GroupByRowProcessor.getRowsFromGrouper( - queryWithoutSubtotalsSpec, - subtotalSpec, - grouperOneSupplier - ), - GroupByQueryHelper.rowSignatureFor(subtotalQuery), - configSupplier.get(), - resource, - spillMapper, - processingConfig.getTmpDir(), - processingConfig.intermediateComputeSizeBytes(), - closeables, - false, - false - )); - - Sequence result = GroupByRowProcessor.getRowsFromGrouper( + subtotalsResults.add( + applyPostProcessing( + mergeResults( + new QueryRunner() + { + @Override + public Sequence run(QueryPlus queryPlus, Map responseContext) + { + List closeables = new ArrayList<>(); + + Supplier grouperTwoSupplier = Suppliers.memoize(() -> GroupByRowProcessor.createGrouper( + subtotalQuery, + GroupByRowProcessor.getRowsFromGrouper( + queryWithoutSubtotalsSpec, + subtotalSpec, + grouperOneSupplier + ), + GroupByQueryHelper.rowSignatureFor(subtotalQuery), + configSupplier.get(), + resource, + spillMapper, + processingConfig.getTmpDir(), + processingConfig.intermediateComputeSizeBytes(), + closeables, + false, + false + )); + + Sequence result = GroupByRowProcessor.getRowsFromGrouper( + subtotalQuery, + subtotalSpec, + grouperTwoSupplier + ); + + // Close all the resources associated with grouperTwo as soon as all results for this + // subtotal spec are read. + return Sequences.withBaggage( + result, + () -> Lists.reverse(closeables) + .forEach(closeable -> CloseQuietly.close(closeable)) + ); + } + }, subtotalQuery, - subtotalSpec, - grouperTwoSupplier - ); - - // Close all the resources associated with grouperTwo as soon as all results for this - // subtotal spec are read. - return Sequences.withBaggage(result, () -> Lists.reverse(closeables).forEach(closeable -> CloseQuietly.close(closeable))); - } - }, subtotalQuery, null), - subtotalQuery - ) + null + ), + subtotalQuery + ) ); } } From 2e81323c186af9d4ae18a8f647a60020107534cf Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Tue, 23 Jul 2019 14:13:19 -0700 Subject: [PATCH 10/16] don't ignore unknown columns in DefaultLimitSpec to not change existing behavior --- .../query/groupby/orderby/DefaultLimitSpec.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/orderby/DefaultLimitSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/orderby/DefaultLimitSpec.java index 382b9d3693e2..7a4c5f3a66af 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/orderby/DefaultLimitSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/orderby/DefaultLimitSpec.java @@ -248,13 +248,15 @@ public int compare(Row left, Row right) nextOrdering = dimensionOrdering(columnName, columnSpec.getDimensionComparator()); } - if (nextOrdering != null) { - if (columnSpec.getDirection() == OrderByColumnSpec.Direction.DESCENDING) { - nextOrdering = nextOrdering.reverse(); - } + if (nextOrdering == null) { + throw new ISE("Unknown column in order clause[%s]", columnSpec); + } - ordering = ordering == null ? nextOrdering : ordering.compound(nextOrdering); + if (columnSpec.getDirection() == OrderByColumnSpec.Direction.DESCENDING) { + nextOrdering = nextOrdering.reverse(); } + + ordering = ordering == null ? nextOrdering : ordering.compound(nextOrdering); } if (ordering != null) { From 540ae0234b9bb631ee4f55b48ff0608a0696df08 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Tue, 23 Jul 2019 14:45:18 -0700 Subject: [PATCH 11/16] handle limit spec columns correctly --- .../groupby/orderby/DefaultLimitSpec.java | 10 ++++ .../query/groupby/orderby/LimitSpec.java | 10 ++++ .../query/groupby/orderby/NoopLimitSpec.java | 7 +++ .../groupby/strategy/GroupByStrategyV2.java | 52 +++++++++++++++++-- .../query/groupby/GroupByQueryRunnerTest.java | 4 +- 5 files changed, 76 insertions(+), 7 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/orderby/DefaultLimitSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/orderby/DefaultLimitSpec.java index 7a4c5f3a66af..828fee02270e 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/orderby/DefaultLimitSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/orderby/DefaultLimitSpec.java @@ -50,6 +50,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; /** */ @@ -204,6 +205,15 @@ private ValueType getOrderByType(final OrderByColumnSpec columnSpec, final List< throw new ISE("Unknown column in order clause[%s]", columnSpec); } + @Override + public LimitSpec filterColumns(Set names) + { + return new DefaultLimitSpec( + columns.stream().filter(c -> names.contains(c.getDimension())).collect(Collectors.toList()), + limit + ); + } + private Ordering makeComparator( List dimensions, List aggs, diff --git a/processing/src/main/java/org/apache/druid/query/groupby/orderby/LimitSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/orderby/LimitSpec.java index d7243359e3a8..4b3716966cb3 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/orderby/LimitSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/orderby/LimitSpec.java @@ -32,6 +32,7 @@ import javax.annotation.Nullable; import java.util.List; +import java.util.Set; /** */ @@ -66,4 +67,13 @@ Function, Sequence> build( ); LimitSpec merge(LimitSpec other); + + /** + * Discard sorting columns not contained in given set. This is used when generating new queries, e.g. to process + * subtotal spec in GroupBy query. + * + * @param names columns names to keep + * @return new LimitSpec that works with fitlered set of columns + */ + LimitSpec filterColumns(Set names); } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/orderby/NoopLimitSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/orderby/NoopLimitSpec.java index fcfc2919ee83..4bebfca92ffb 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/orderby/NoopLimitSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/orderby/NoopLimitSpec.java @@ -30,6 +30,7 @@ import org.apache.druid.query.dimension.DimensionSpec; import java.util.List; +import java.util.Set; /** */ @@ -67,6 +68,12 @@ public LimitSpec merge(LimitSpec other) return other; } + @Override + public LimitSpec filterColumns(Set names) + { + return this; + } + @Override public String toString() { diff --git a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java index ba5e5a585979..58adeafa8094 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -70,6 +70,8 @@ import org.apache.druid.query.groupby.epinephelinae.GroupByQueryEngineV2; import org.apache.druid.query.groupby.epinephelinae.GroupByRowProcessor; import org.apache.druid.query.groupby.epinephelinae.Grouper; +import org.apache.druid.query.groupby.orderby.LimitSpec; +import org.apache.druid.query.groupby.orderby.NoopLimitSpec; import org.apache.druid.query.groupby.resource.GroupByQueryResource; import org.apache.druid.segment.StorageAdapter; import org.joda.time.DateTime; @@ -79,8 +81,10 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; public class GroupByStrategyV2 implements GroupByStrategy @@ -448,12 +452,32 @@ public Sequence processSubtotalsSpec( queryDimNames.add(dimSpec.getOutputName()); } + // Only needed to make LimitSpec.filterColumns(..) call later in case base query has a non default LimitSpec. + Set aggsAndPostAggs = null; + if (queryWithoutSubtotalsSpec.getLimitSpec() != null && !(queryWithoutSubtotalsSpec.getLimitSpec() instanceof NoopLimitSpec)) { + aggsAndPostAggs = getAggregatorAndPostAggregatorNames(queryWithoutSubtotalsSpec); + } + for (List subtotalSpec : subtotals) { - GroupByQuery subtotalQuery = queryWithoutSubtotalsSpec.withDimensionSpecs( - subtotalSpec.stream() - .map(queryDimensionSpecs::get) - .collect(Collectors.toList()) - ); + + // Create appropriate LimitSpec for subtotal query + LimitSpec subtotalQueryLimitSpec = NoopLimitSpec.instance(); + if (queryWithoutSubtotalsSpec.getLimitSpec() != null && !(queryWithoutSubtotalsSpec.getLimitSpec() instanceof NoopLimitSpec)) { + Set columns = new HashSet(aggsAndPostAggs); + columns.addAll(subtotalSpec); + + subtotalQueryLimitSpec = queryWithoutSubtotalsSpec.getLimitSpec().filterColumns(columns); + } + + GroupByQuery subtotalQuery = queryWithoutSubtotalsSpec + .withLimitSpec(subtotalQueryLimitSpec) + .withDimensionSpecs( + subtotalSpec.stream() + .map(queryDimensionSpecs::get) + .collect(Collectors.toList()) + ); + + if (Utils.isPrefix(subtotalSpec, queryDimNames)) { subtotalsResults.add( @@ -542,6 +566,24 @@ public Sequence run(QueryPlus queryPlus, Map responseC } } + private Set getAggregatorAndPostAggregatorNames(GroupByQuery query) + { + Set aggsAndPostAggs = new HashSet(); + if (query.getAggregatorSpecs() != null) { + for (AggregatorFactory af : query.getAggregatorSpecs()) { + aggsAndPostAggs.add(af.getName()); + } + } + + if (query.getPostAggregatorSpecs() != null) { + for (PostAggregator pa : query.getPostAggregatorSpecs()) { + aggsAndPostAggs.add(pa.getName()); + } + } + + return aggsAndPostAggs; + } + private int numMergeBuffersForSubtotalsSpec(GroupByQuery query) { List> subtotalSpecs = query.getSubtotalsSpec(); diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java index 7de048b9f8d9..e63e23acf5c8 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java @@ -7102,8 +7102,8 @@ public void testGroupByWithSubtotalsSpecWithOrderLimit() ImmutableList.of() )) .addOrderByColumn("idx") - .addOrderByColumn("alias") - .addOrderByColumn("market") +// .addOrderByColumn("alias") +// .addOrderByColumn("market") .setLimit(1) .build(); From 52b05ec3442655c37b2c2ffbd21161bff9d43d5e Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Tue, 23 Jul 2019 15:25:24 -0700 Subject: [PATCH 12/16] uncomment inadvertantly commented lines --- .../apache/druid/query/groupby/GroupByQueryRunnerTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java index e63e23acf5c8..7de048b9f8d9 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java @@ -7102,8 +7102,8 @@ public void testGroupByWithSubtotalsSpecWithOrderLimit() ImmutableList.of() )) .addOrderByColumn("idx") -// .addOrderByColumn("alias") -// .addOrderByColumn("market") + .addOrderByColumn("alias") + .addOrderByColumn("market") .setLimit(1) .build(); From d7d0210ee7363cf212006c47d3822794b5f186b9 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Fri, 2 Aug 2019 10:08:20 -0700 Subject: [PATCH 13/16] GroupByStrategyV2 changes --- .../groupby/strategy/GroupByStrategyV2.java | 193 ++++++++++++++---- 1 file changed, 152 insertions(+), 41 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java index 4c417b0b5a8b..8e215e01beb3 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -33,6 +33,7 @@ import org.apache.druid.guice.annotations.Merging; import org.apache.druid.guice.annotations.Smile; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.collect.Utils; import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; @@ -61,6 +62,8 @@ import org.apache.druid.query.groupby.epinephelinae.GroupByMergingQueryRunnerV2; import org.apache.druid.query.groupby.epinephelinae.GroupByQueryEngineV2; import org.apache.druid.query.groupby.epinephelinae.GroupByRowProcessor; +import org.apache.druid.query.groupby.orderby.LimitSpec; +import org.apache.druid.query.groupby.orderby.NoopLimitSpec; import org.apache.druid.query.groupby.resource.GroupByQueryResource; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.segment.StorageAdapter; @@ -68,9 +71,13 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.function.BinaryOperator; +import java.util.stream.Collectors; public class GroupByStrategyV2 implements GroupByStrategy { @@ -109,7 +116,7 @@ public GroupByStrategyV2( public GroupByQueryResource prepareResource(GroupByQuery query) { final int requiredMergeBufferNum = countRequiredMergeBufferNum(query, 1) + - (query.getSubtotalsSpec() != null ? 1 : 0); + numMergeBuffersNeededForSubtotalsSpec(query); if (requiredMergeBufferNum > mergeBufferPool.maxSize()) { throw new ResourceLimitExceededException( @@ -346,35 +353,41 @@ public Sequence processSubtotalsSpec( Sequence queryResult ) { - // Note: the approach used here is not always correct; see https://github.com/apache/incubator-druid/issues/8091. + // How it works? + // First we accumulate the result of top level base query aka queryResult arg inside a resultSupplierOne object. + // Next for each subtotalSpec + // If subtotalSpec is a prefix of top level dims then we iterate on rows in resultSupplierOne object which are still + // sorted by subtotalSpec, stream merge them and return. + // + // If subtotalSpec is not a prefix of top level dims then we create a resultSupplierTwo object filled with rows from + // resultSupplierOne object with only dims from subtotalSpec. Then we iterate on rows in resultSupplierTwo object which are + // of course sorted by subtotalSpec, stream merge them and return. // Keep a reference to resultSupplier outside the "try" so we can close it if something goes wrong // while creating the sequence. - GroupByRowProcessor.ResultSupplier resultSupplier = null; + GroupByRowProcessor.ResultSupplier resultSupplierOne = null; try { - GroupByQuery queryWithoutSubtotalsSpec = query.withSubtotalsSpec(null).withDimFilter(null); - List> subtotals = query.getSubtotalsSpec(); - - resultSupplier = GroupByRowProcessor.process( - queryWithoutSubtotalsSpec - .withAggregatorSpecs( - Lists.transform( - queryWithoutSubtotalsSpec.getAggregatorSpecs(), - AggregatorFactory::getCombiningFactory - ) - ) - .withDimensionSpecs( - Lists.transform( - queryWithoutSubtotalsSpec.getDimensions(), - dimSpec -> - new DefaultDimensionSpec( - dimSpec.getOutputName(), - dimSpec.getOutputName(), - dimSpec.getOutputType() - ) - ) - ), + GroupByQuery queryWithoutSubtotalsSpec = query + .withDimensionSpecs(query.getDimensions().stream().map( + dimSpec -> new DefaultDimensionSpec( + dimSpec.getOutputName(), + dimSpec.getOutputName(), + dimSpec.getOutputType() + )).collect(Collectors.toList()) + ) + .withAggregatorSpecs( + query.getAggregatorSpecs() + .stream() + .map(AggregatorFactory::getCombiningFactory) + .collect(Collectors.toList()) + ) + .withSubtotalsSpec(null) + .withDimFilter(null); + + + resultSupplierOne = GroupByRowProcessor.process( + queryWithoutSubtotalsSpec, queryWithoutSubtotalsSpec, queryResult, configSupplier.get(), @@ -383,8 +396,20 @@ public Sequence processSubtotalsSpec( processingConfig.getTmpDir(), processingConfig.intermediateComputeSizeBytes() ); + + List queryDimNames = queryWithoutSubtotalsSpec.getDimensions().stream().map(DimensionSpec::getOutputName) + .collect(Collectors.toList()); + + // Only needed to make LimitSpec.filterColumns(..) call later in case base query has a non default LimitSpec. + Set aggsAndPostAggs = null; + if (queryWithoutSubtotalsSpec.getLimitSpec() != null && !(queryWithoutSubtotalsSpec.getLimitSpec() instanceof NoopLimitSpec)) { + aggsAndPostAggs = getAggregatorAndPostAggregatorNames(queryWithoutSubtotalsSpec); + } + + List> subtotals = query.getSubtotalsSpec(); List> subtotalsResults = new ArrayList<>(subtotals.size()); + // Iterate through each subtotalSpec, build results for it and add to subtotalsResults for (List subtotalSpec : subtotals) { final ImmutableSet dimsInSubtotalSpec = ImmutableSet.copyOf(subtotalSpec); final List dimensions = query.getDimensions(); @@ -411,32 +436,118 @@ public Sequence processSubtotalsSpec( } } - GroupByQuery subtotalQuery = queryWithoutSubtotalsSpec.withDimensionSpecs(newDimensions); - - final GroupByRowProcessor.ResultSupplier finalResultSupplier = resultSupplier; - subtotalsResults.add( - applyPostProcessing( - mergeResults( - (queryPlus, responseContext) -> finalResultSupplier.results(subtotalSpec), - subtotalQuery, - null - ), - subtotalQuery - ) - ); + // Create appropriate LimitSpec for subtotal query + LimitSpec subtotalQueryLimitSpec = NoopLimitSpec.instance(); + if (queryWithoutSubtotalsSpec.getLimitSpec() != null && !(queryWithoutSubtotalsSpec.getLimitSpec() instanceof NoopLimitSpec)) { + Set columns = new HashSet(aggsAndPostAggs); + columns.addAll(subtotalSpec); + + subtotalQueryLimitSpec = queryWithoutSubtotalsSpec.getLimitSpec().filterColumns(columns); + } + + GroupByQuery subtotalQuery = queryWithoutSubtotalsSpec + .withDimensionSpecs(newDimensions) + .withLimitSpec(subtotalQueryLimitSpec); + + if (Utils.isPrefix(subtotalSpec, queryDimNames)) { + // Since subtotalSpec is a prefix of base query dimensions, so results from base query are also sorted + // by subtotalSpec as needed by stream merging. + subtotalsResults.add( + getSubtotalQueryResultFromSortedResultsSupplier(resultSupplierOne, subtotalSpec, subtotalQuery) + ); + } else { + // Since subtotalSpec is not a prefix of base query dimensions, so results from base query are not sorted + // by subtotalSpec. So we first add the result of base query into another resultSupplier which are sorted + // by subtotalSpec and then stream merge them. + + GroupByRowProcessor.ResultSupplier resultSupplierTwo = null; + try { + resultSupplierTwo = GroupByRowProcessor.process( + queryWithoutSubtotalsSpec, + subtotalQuery, + resultSupplierOne.results(subtotalSpec), + configSupplier.get(), + resource, + spillMapper, + processingConfig.getTmpDir(), + processingConfig.intermediateComputeSizeBytes() + ); + + subtotalsResults.add( + Sequences.withBaggage( + getSubtotalQueryResultFromSortedResultsSupplier(resultSupplierTwo, subtotalSpec, subtotalQuery), + resultSupplierTwo //this will close resources allocated by resultSupplierTwo after sequence read + ) + ); + } catch(Exception ex) { + CloseQuietly.close(resultSupplierTwo); + throw ex; + } + } } return Sequences.withBaggage( Sequences.concat(subtotalsResults), - resultSupplier + resultSupplierOne //this will close resources allocated by resultSupplierOne after sequence read ); } - catch (Exception ex) { - CloseQuietly.close(resultSupplier); + catch(Exception ex) { + CloseQuietly.close(resultSupplierOne); throw ex; } } + private Sequence getSubtotalQueryResultFromSortedResultsSupplier( + final GroupByRowProcessor.ResultSupplier baseResultsSupplier, + List dimsToInclude, GroupByQuery subtotalQuery + ) + { + return applyPostProcessing( + mergeResults( + (queryPlus, responseContext) -> baseResultsSupplier.results(dimsToInclude), + subtotalQuery, + null + ), + subtotalQuery + ); + } + + private Set getAggregatorAndPostAggregatorNames(GroupByQuery query) + { + Set aggsAndPostAggs = new HashSet(); + if (query.getAggregatorSpecs() != null) { + for (AggregatorFactory af : query.getAggregatorSpecs()) { + aggsAndPostAggs.add(af.getName()); + } + } + + if (query.getPostAggregatorSpecs() != null) { + for (PostAggregator pa : query.getPostAggregatorSpecs()) { + aggsAndPostAggs.add(pa.getName()); + } + } + + return aggsAndPostAggs; + } + + private int numMergeBuffersNeededForSubtotalsSpec(GroupByQuery query) + { + List> subtotalSpecs = query.getSubtotalsSpec(); + if (subtotalSpecs == null || subtotalSpecs.size() == 0) { + return 0; + } + + List queryDimOutputNames = query.getDimensions().stream().map(DimensionSpec::getOutputName).collect( + Collectors.toList()); + for (List subtotalSpec : subtotalSpecs) { + if (!Utils.isPrefix(subtotalSpec, queryDimOutputNames)) { + return 2; + } + } + + return 1; + } + @Override public QueryRunner mergeRunners( final ListeningExecutorService exec, From 1bfdc1bbe8ad2372b4d5607120e955c79037da79 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Fri, 2 Aug 2019 10:44:49 -0700 Subject: [PATCH 14/16] test changes wip --- .../query/groupby/GroupByQueryRunnerTest.java | 953 ++++++------------ 1 file changed, 301 insertions(+), 652 deletions(-) diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java index d1574885a60d..c9ca649ea72b 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java @@ -6385,7 +6385,7 @@ public void testSubqueryWithFirstLast() } @Test - public void testGroupByWithSubtotalsSpec() + public void testGroupByWithSubtotalsSpecOfDimensionsPrefixes() { // Cannot vectorize due to usage of expressions. cannotVectorize(); @@ -6397,513 +6397,430 @@ public void testGroupByWithSubtotalsSpec() GroupByQuery query = makeQueryBuilder() .setDataSource(QueryRunnerTestHelper.dataSource) .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) - .setVirtualColumns(new ExpressionVirtualColumn( - "alias", - "quality", - ValueType.STRING, - TestExprMacroTable.INSTANCE - )) + .setVirtualColumns(new ExpressionVirtualColumn("alias", "quality", ValueType.STRING, TestExprMacroTable.INSTANCE)) .setDimensions(Lists.newArrayList( - new DefaultDimensionSpec("quality", "quality"), - new DefaultDimensionSpec("market", "market"), - new DefaultDimensionSpec("alias", "alias") + new DefaultDimensionSpec("market", "market2"), + new DefaultDimensionSpec("alias", "alias2") )) .setAggregatorSpecs( Arrays.asList( QueryRunnerTestHelper.rowsCount, - new LongSumAggregatorFactory("idx", "index"), - new FloatSumAggregatorFactory("idxFloat", "indexFloat"), - new DoubleSumAggregatorFactory("idxDouble", "index") + new LongSumAggregatorFactory("idx", "index") ) ) .setGranularity(QueryRunnerTestHelper.dayGran) .setSubtotalsSpec(ImmutableList.of( - ImmutableList.of("alias"), - ImmutableList.of("market"), + ImmutableList.of("market2"), ImmutableList.of() )) .build(); - List expectedResults = Arrays.asList( + List expectedResults = Arrays.asList( makeRow( query, + "2011-04-01T00:00:00.000Z", + "market2", + "spot", + "rows", + 9L, + "idx", + 1102L + ), + GroupByQueryRunnerTestHelper.createExpectedRow( + "2011-04-01T00:00:00.000Z", + "market2", + "total_market", + "rows", + 2L, + "idx", + 2836L + ), + GroupByQueryRunnerTestHelper.createExpectedRow( + "2011-04-01T00:00:00.000Z", + "market2", + "upfront", + "rows", + 2L, + "idx", + 2681L + ), + + GroupByQueryRunnerTestHelper.createExpectedRow( + "2011-04-02T00:00:00.000Z", + "market2", + "spot", + "rows", + 9L, + "idx", + 1120L + ), + GroupByQueryRunnerTestHelper.createExpectedRow( + "2011-04-02T00:00:00.000Z", + "market2", + "total_market", + "rows", + 2L, + "idx", + 2514L + ), + GroupByQueryRunnerTestHelper.createExpectedRow( + "2011-04-02T00:00:00.000Z", + "market2", + "upfront", + "rows", + 2L, + "idx", + 2193L + ), + + GroupByQueryRunnerTestHelper.createExpectedRow( + "2011-04-01T00:00:00.000Z", + "rows", + 13L, + "idx", + 6619L + ), + GroupByQueryRunnerTestHelper.createExpectedRow( + "2011-04-02T00:00:00.000Z", + "rows", + 13L, + "idx", + 5827L + ) + ); + + Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + TestHelper.assertExpectedObjects(expectedResults, results, "subtotal"); + } + + @Test + public void testGroupByWithSubtotalsSpecGeneral() + { + // Cannot vectorize due to usage of expressions. + cannotVectorize(); + + if (!config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) { + return; + } + + GroupByQuery query = makeQueryBuilder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setVirtualColumns(new ExpressionVirtualColumn("alias", "quality", ValueType.STRING, TestExprMacroTable.INSTANCE)) + .setDimensions(Lists.newArrayList( + new DefaultDimensionSpec("quality", "quality2"), + new DefaultDimensionSpec("market", "market2"), + new DefaultDimensionSpec("alias", "alias2") + )) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory("idx", "index") + ) + ) + .setPostAggregatorSpecs( + Collections.singletonList( + new FieldAccessPostAggregator("idxPostAgg", "idx") + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .setSubtotalsSpec(ImmutableList.of( + ImmutableList.of("alias2"), + ImmutableList.of("market2"), + ImmutableList.of() + )) + .build(); + + List expectedResults = Arrays.asList( + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01", - "alias", + "alias2", "automotive", "rows", 1L, "idx", 135L, - "idxFloat", - 135.88510131835938f, - "idxDouble", - 135.88510131835938d + "idxPostAgg", + 135L ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01", - "alias", + "alias2", "business", "rows", 1L, "idx", 118L, - "idxFloat", - 118.57034, - "idxDouble", - 118.57034 + "idxPostAgg", + 118L ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01", - "alias", + "alias2", "entertainment", "rows", 1L, "idx", 158L, - "idxFloat", - 158.747224, - "idxDouble", - 158.747224 + "idxPostAgg", + 158L ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01", - "alias", + "alias2", "health", "rows", 1L, "idx", 120L, - "idxFloat", - 120.134704, - "idxDouble", - 120.134704 + "idxPostAgg", + 120L ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01", - "alias", + "alias2", "mezzanine", "rows", 3L, "idx", 2870L, - "idxFloat", - 2871.8866900000003f, - "idxDouble", - 2871.8866900000003d + "idxPostAgg", + 2870L ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01", - "alias", + "alias2", "news", "rows", 1L, "idx", 121L, - "idxFloat", - 121.58358f, - "idxDouble", - 121.58358d + "idxPostAgg", + 121L ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01", - "alias", + "alias2", "premium", "rows", 3L, "idx", 2900L, - "idxFloat", - 2900.798647f, - "idxDouble", - 2900.798647d + "idxPostAgg", + 2900L ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01", - "alias", + "alias2", "technology", "rows", 1L, "idx", 78L, - "idxFloat", - 78.622547f, - "idxDouble", - 78.622547d + "idxPostAgg", + 78L ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01", - "alias", + "alias2", "travel", "rows", 1L, "idx", 119L, - "idxFloat", - 119.922742f, - "idxDouble", - 119.922742d + "idxPostAgg", + 119L ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02", - "alias", + "alias2", "automotive", "rows", 1L, "idx", 147L, - "idxFloat", - 147.42593f, - "idxDouble", - 147.42593d + "idxPostAgg", + 147L ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02", - "alias", + "alias2", "business", "rows", 1L, "idx", 112L, - "idxFloat", - 112.987027f, - "idxDouble", - 112.987027d + "idxPostAgg", + 112L ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02", - "alias", + "alias2", "entertainment", "rows", 1L, "idx", 166L, - "idxFloat", - 166.016049f, - "idxDouble", - 166.016049d + "idxPostAgg", + 166L ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02", - "alias", + "alias2", "health", "rows", 1L, "idx", 113L, - "idxFloat", - 113.446008f, - "idxDouble", - 113.446008d + "idxPostAgg", + 113L ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02", - "alias", + "alias2", "mezzanine", "rows", 3L, "idx", 2447L, - "idxFloat", - 2448.830613f, - "idxDouble", - 2448.830613d + "idxPostAgg", + 2447L ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02", - "alias", + "alias2", "news", "rows", 1L, "idx", 114L, - "idxFloat", - 114.290141f, - "idxDouble", - 114.290141d + "idxPostAgg", + 114L ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02", - "alias", + "alias2", "premium", "rows", 3L, "idx", 2505L, - "idxFloat", - 2506.415148f, - "idxDouble", - 2506.415148d + "idxPostAgg", + 2505L ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02", - "alias", + "alias2", "technology", "rows", 1L, "idx", 97L, - "idxFloat", - 97.387433f, - "idxDouble", - 97.387433d + "idxPostAgg", + 97L ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02", - "alias", + "alias2", "travel", "rows", 1L, "idx", 126L, - "idxFloat", - 126.411364f, - "idxDouble", - 126.411364d + "idxPostAgg", + 126L ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01T00:00:00.000Z", - "market", + "market2", "spot", - "idxDouble", - 643.043177, - "idxFloat", - 643.043212890625, "rows", - 5L, + 9L, "idx", - 640L + 1102L, + "idxPostAgg", + 1102L ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01T00:00:00.000Z", - "market", + "market2", "total_market", - "idxDouble", - 1314.839715, - "idxFloat", - 1314.8397, - "rows", - 1L, - "idx", - 1314L - ), - makeRow( - query, - "2011-04-01T00:00:00.000Z", - "market", - "upfront", - "idxDouble", - 1447.34116, - "idxFloat", - 1447.3412, - "rows", - 1L, - "idx", - 1447L - ), - makeRow( - query, - "2011-04-01T00:00:00.000Z", - "market", - "spot", - "idxDouble", - 266.090949, - "idxFloat", - 266.0909423828125, "rows", 2L, "idx", - 265L + 2836L, + "idxPostAgg", + 2836L ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01T00:00:00.000Z", - "market", - "total_market", - "idxDouble", - 1522.043733, - "idxFloat", - 1522.0437, - "rows", - 1L, - "idx", - 1522L - ), - makeRow( - query, - "2011-04-01T00:00:00.000Z", - "market", + "market2", "upfront", - "idxDouble", - 1234.247546, - "idxFloat", - 1234.2476, - "rows", - 1L, - "idx", - 1234L - ), - makeRow( - query, - "2011-04-01T00:00:00.000Z", - "market", - "spot", - "idxDouble", - 198.545289, - "idxFloat", - 198.5452880859375, "rows", 2L, "idx", - 197L + 2681L, + "idxPostAgg", + 2681L ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02T00:00:00.000Z", - "market", + "market2", "spot", - "idxDouble", - 650.806953, - "idxFloat", - 650.8069458007812, "rows", - 5L, + 9L, "idx", - 648L + 1120L, + "idxPostAgg", + 1120L ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02T00:00:00.000Z", - "market", + "market2", "total_market", - "idxDouble", - 1193.556278, - "idxFloat", - 1193.5563, - "rows", - 1L, - "idx", - 1193L - ), - makeRow( - query, - "2011-04-02T00:00:00.000Z", - "market", - "upfront", - "idxDouble", - 1144.342401, - "idxFloat", - 1144.3424, - "rows", - 1L, - "idx", - 1144L - ), - makeRow( - query, - "2011-04-02T00:00:00.000Z", - "market", - "spot", - "idxDouble", - 249.591647, - "idxFloat", - 249.59164428710938, "rows", 2L, "idx", - 249L + 2514L, + "idxPostAgg", + 2514L ), - makeRow( - query, - "2011-04-02T00:00:00.000Z", - "market", - "total_market", - "idxDouble", - 1321.375057, - "idxFloat", - 1321.375, - "rows", - 1L, - "idx", - 1321L - ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02T00:00:00.000Z", - "market", + "market2", "upfront", - "idxDouble", - 1049.738585, - "idxFloat", - 1049.7385, - "rows", - 1L, - "idx", - 1049L - ), - makeRow( - query, - "2011-04-02T00:00:00.000Z", - "market", - "spot", - "idxDouble", - 223.798797, - "idxFloat", - 223.79879760742188, "rows", 2L, "idx", - 223L + 2193L, + "idxPostAgg", + 2193L ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01T00:00:00.000Z", - "idxDouble", - 6626.151575318359, - "idxFloat", - 6626.152f, "rows", 13L, "idx", + 6619L, + "idxPostAgg", 6619L ), - makeRow( - query, - "2011-04-02T00:00:00.000Z", - "idxDouble", - 5833.209713, - "idxFloat", - 5833.209f, + GroupByQueryRunnerTestHelper.createExpectedRow( + "2011-04-02T00:00:00.000Z", "rows", 13L, "idx", + 5827L, + "idxPostAgg", 5827L ) ); - Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); TestHelper.assertExpectedObjects(expectedResults, results, "subtotal"); } @@ -6911,22 +6828,15 @@ public void testGroupByWithSubtotalsSpec() @Test public void testGroupByWithSubtotalsSpecWithRenamedDimensionAndFilter() { - // Cannot vectorize due to expression virtual columns. - cannotVectorize(); - if (!config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) { return; } - GroupByQuery query = makeQueryBuilder() + GroupByQuery query = GroupByQuery + .builder() .setDataSource(QueryRunnerTestHelper.dataSource) .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) - .setVirtualColumns(new ExpressionVirtualColumn( - "alias", - "quality", - ValueType.STRING, - TestExprMacroTable.INSTANCE - )) + .setVirtualColumns(new ExpressionVirtualColumn("alias", "quality", ValueType.STRING, TestExprMacroTable.INSTANCE)) .setDimensions(Lists.newArrayList( new DefaultDimensionSpec("quality", "quality"), new DefaultDimensionSpec("market", "market"), @@ -6948,9 +6858,8 @@ public void testGroupByWithSubtotalsSpecWithRenamedDimensionAndFilter() )) .build(); - List expectedResults = Arrays.asList( - makeRow( - query, + List expectedResults = Arrays.asList( + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01", "alias_renamed", "automotive", @@ -6963,8 +6872,7 @@ public void testGroupByWithSubtotalsSpecWithRenamedDimensionAndFilter() "idxDouble", 135.88510131835938d ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02", "alias_renamed", "automotive", @@ -6978,8 +6886,7 @@ public void testGroupByWithSubtotalsSpecWithRenamedDimensionAndFilter() 147.42593d ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01T00:00:00.000Z", "rows", 1L, @@ -6990,8 +6897,7 @@ public void testGroupByWithSubtotalsSpecWithRenamedDimensionAndFilter() "idxDouble", 135.88510131835938d ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02T00:00:00.000Z", "rows", 1L, @@ -7004,7 +6910,7 @@ public void testGroupByWithSubtotalsSpecWithRenamedDimensionAndFilter() ) ); - Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); TestHelper.assertExpectedObjects(expectedResults, results, "subtotal"); } @@ -7020,32 +6926,25 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) .setDimensions(Lists.newArrayList( new DefaultDimensionSpec("qualityLong", "ql", ValueType.LONG), - new DefaultDimensionSpec("market", "market") + new DefaultDimensionSpec("market", "market2") )) .setAggregatorSpecs( Arrays.asList( QueryRunnerTestHelper.rowsCount, - new LongSumAggregatorFactory("idx", "index"), - new FloatSumAggregatorFactory("idxFloat", "indexFloat"), - new DoubleSumAggregatorFactory("idxDouble", "index") + new LongSumAggregatorFactory("idx", "index") ) ) .setGranularity(QueryRunnerTestHelper.dayGran) .setSubtotalsSpec(ImmutableList.of( ImmutableList.of("ql"), - ImmutableList.of("market"), + ImmutableList.of("market2"), ImmutableList.of() )) .build(); - List expectedResults = Arrays.asList( - makeRow( - query, + List expectedResults = Arrays.asList( + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01T00:00:00.000Z", - "idxDouble", - 135.885094, - "idxFloat", - 135.8851, "ql", 1000L, "rows", @@ -7053,13 +6952,8 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() "idx", 135L ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01T00:00:00.000Z", - "idxDouble", - 118.57034, - "idxFloat", - 118.57034, "ql", 1100L, "rows", @@ -7067,13 +6961,8 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() "idx", 118L ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01T00:00:00.000Z", - "idxDouble", - 158.747224, - "idxFloat", - 158.74722, "ql", 1200L, "rows", @@ -7081,13 +6970,8 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() "idx", 158L ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01T00:00:00.000Z", - "idxDouble", - 120.134704, - "idxFloat", - 120.134705, "ql", 1300L, "rows", @@ -7095,13 +6979,8 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() "idx", 120L ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01T00:00:00.000Z", - "idxDouble", - 2871.8866900000003, - "idxFloat", - 2871.88671875, "ql", 1400L, "rows", @@ -7109,13 +6988,8 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() "idx", 2870L ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01T00:00:00.000Z", - "idxDouble", - 121.583581, - "idxFloat", - 121.58358, "ql", 1500L, "rows", @@ -7123,13 +6997,8 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() "idx", 121L ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01T00:00:00.000Z", - "idxDouble", - 2900.798647, - "idxFloat", - 2900.798583984375, "ql", 1600L, "rows", @@ -7137,13 +7006,8 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() "idx", 2900L ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01T00:00:00.000Z", - "idxDouble", - 78.622547, - "idxFloat", - 78.62254, "ql", 1700L, "rows", @@ -7151,13 +7015,8 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() "idx", 78L ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01T00:00:00.000Z", - "idxDouble", - 119.922742, - "idxFloat", - 119.922745, "ql", 1800L, "rows", @@ -7165,13 +7024,8 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() "idx", 119L ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02T00:00:00.000Z", - "idxDouble", - 147.425935, - "idxFloat", - 147.42593, "ql", 1000L, "rows", @@ -7179,13 +7033,8 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() "idx", 147L ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02T00:00:00.000Z", - "idxDouble", - 112.987027, - "idxFloat", - 112.98703, "ql", 1100L, "rows", @@ -7193,13 +7042,8 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() "idx", 112L ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02T00:00:00.000Z", - "idxDouble", - 166.016049, - "idxFloat", - 166.01605, "ql", 1200L, "rows", @@ -7207,13 +7051,8 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() "idx", 166L ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02T00:00:00.000Z", - "idxDouble", - 113.446008, - "idxFloat", - 113.44601, "ql", 1300L, "rows", @@ -7221,13 +7060,8 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() "idx", 113L ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02T00:00:00.000Z", - "idxDouble", - 2448.830613, - "idxFloat", - 2448.83056640625, "ql", 1400L, "rows", @@ -7235,13 +7069,8 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() "idx", 2447L ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02T00:00:00.000Z", - "idxDouble", - 114.290141, - "idxFloat", - 114.29014, "ql", 1500L, "rows", @@ -7249,13 +7078,8 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() "idx", 114L ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02T00:00:00.000Z", - "idxDouble", - 2506.415148, - "idxFloat", - 2506.4150390625, "ql", 1600L, "rows", @@ -7263,13 +7087,8 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() "idx", 2505L ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02T00:00:00.000Z", - "idxDouble", - 97.387433, - "idxFloat", - 97.387436, "ql", 1700L, "rows", @@ -7277,13 +7096,8 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() "idx", 97L ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02T00:00:00.000Z", - "idxDouble", - 126.411364, - "idxFloat", - 126.41136, "ql", 1800L, "rows", @@ -7292,222 +7106,72 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() 126L ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01T00:00:00.000Z", - "market", + "market2", "spot", - "idxDouble", - 643.043177, - "idxFloat", - 643.043212890625, "rows", - 5L, + 9L, "idx", - 640L + 1102L ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01T00:00:00.000Z", - "market", + "market2", "total_market", - "idxDouble", - 1314.839715, - "idxFloat", - 1314.8397, - "rows", - 1L, - "idx", - 1314L - ), - makeRow( - query, - "2011-04-01T00:00:00.000Z", - "market", - "upfront", - "idxDouble", - 1447.34116, - "idxFloat", - 1447.3412, - "rows", - 1L, - "idx", - 1447L - ), - makeRow( - query, - "2011-04-01T00:00:00.000Z", - "market", - "spot", - "idxDouble", - 266.090949, - "idxFloat", - 266.0909423828125, "rows", 2L, "idx", - 265L - ), - makeRow( - query, - "2011-04-01T00:00:00.000Z", - "market", - "total_market", - "idxDouble", - 1522.043733, - "idxFloat", - 1522.0437, - "rows", - 1L, - "idx", - 1522L + 2836L ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01T00:00:00.000Z", - "market", + "market2", "upfront", - "idxDouble", - 1234.247546, - "idxFloat", - 1234.2476, - "rows", - 1L, - "idx", - 1234L - ), - makeRow( - query, - "2011-04-01T00:00:00.000Z", - "market", - "spot", - "idxDouble", - 198.545289, - "idxFloat", - 198.5452880859375, "rows", 2L, "idx", - 197L + 2681L ), - makeRow( - query, + + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02T00:00:00.000Z", - "market", + "market2", "spot", - "idxDouble", - 650.806953, - "idxFloat", - 650.8069458007812, "rows", - 5L, + 9L, "idx", - 648L + 1120L ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02T00:00:00.000Z", - "market", + "market2", "total_market", - "idxDouble", - 1193.556278, - "idxFloat", - 1193.5563, - "rows", - 1L, - "idx", - 1193L - ), - makeRow( - query, - "2011-04-02T00:00:00.000Z", - "market", - "upfront", - "idxDouble", - 1144.342401, - "idxFloat", - 1144.3424, - "rows", - 1L, - "idx", - 1144L - ), - makeRow( - query, - "2011-04-02T00:00:00.000Z", - "market", - "spot", - "idxDouble", - 249.591647, - "idxFloat", - 249.59164428710938, "rows", 2L, "idx", - 249L - ), - makeRow( - query, - "2011-04-02T00:00:00.000Z", - "market", - "total_market", - "idxDouble", - 1321.375057, - "idxFloat", - 1321.375, - "rows", - 1L, - "idx", - 1321L + 2514L ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02T00:00:00.000Z", - "market", + "market2", "upfront", - "idxDouble", - 1049.738585, - "idxFloat", - 1049.7385, - "rows", - 1L, - "idx", - 1049L - ), - makeRow( - query, - "2011-04-02T00:00:00.000Z", - "market", - "spot", - "idxDouble", - 223.798797, - "idxFloat", - 223.79879760742188, "rows", 2L, "idx", - 223L + 2193L ), - makeRow( - query, + + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01T00:00:00.000Z", - "idxDouble", - 6626.151569, - "idxFloat", - 6626.1513671875, "rows", 13L, "idx", 6619L ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-02T00:00:00.000Z", - "idxDouble", - 5833.209717999999, - "idxFloat", - 5833.20849609375, "rows", 13L, "idx", @@ -7515,7 +7179,7 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() ) ); - Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); TestHelper.assertExpectedObjects(expectedResults, results, "subtotal-long-dim"); } @@ -7536,9 +7200,7 @@ public void testGroupByWithSubtotalsSpecWithOrderLimit() .setAggregatorSpecs( Arrays.asList( QueryRunnerTestHelper.rowsCount, - new LongSumAggregatorFactory("idx", "index"), - new FloatSumAggregatorFactory("idxFloat", "indexFloat"), - new DoubleSumAggregatorFactory("idxDouble", "index") + new LongSumAggregatorFactory("idx", "index") ) ) .setGranularity(QueryRunnerTestHelper.dayGran) @@ -7547,46 +7209,33 @@ public void testGroupByWithSubtotalsSpecWithOrderLimit() ImmutableList.of("market"), ImmutableList.of() )) - .addOrderByColumn("idxDouble") + .addOrderByColumn("idx") + .addOrderByColumn("alias") + .addOrderByColumn("market") .setLimit(1) .build(); - List expectedResults = Arrays.asList( - makeRow( - query, + List expectedResults = Arrays.asList( + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01", "alias", "technology", "rows", 1L, "idx", - 78L, - "idxFloat", - 78.622547f, - "idxDouble", - 78.622547d + 78L ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01T00:00:00.000Z", "market", "spot", - "idxDouble", - 198.545289, - "idxFloat", - 198.5452880859375, "rows", - 2L, + 9L, "idx", - 197L + 1102L ), - makeRow( - query, + GroupByQueryRunnerTestHelper.createExpectedRow( "2011-04-01T00:00:00.000Z", - "idxDouble", - 6626.151575318359, - "idxFloat", - 6626.152f, "rows", 13L, "idx", @@ -7594,7 +7243,7 @@ public void testGroupByWithSubtotalsSpecWithOrderLimit() ) ); - Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); TestHelper.assertExpectedObjects(expectedResults, results, "subtotal-order-limit"); } From e4b1fb7d94d301d8364d2abe04ccef65ca179b64 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Fri, 2 Aug 2019 12:43:19 -0700 Subject: [PATCH 15/16] more fixes to handle merge buffer closing and limit spec --- .../groupby/strategy/GroupByStrategyV2.java | 90 ++++---- .../query/groupby/GroupByQueryRunnerTest.java | 218 ++++++++++++------ 2 files changed, 193 insertions(+), 115 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java index 8e215e01beb3..13f36703d919 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -21,9 +21,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.inject.Inject; import org.apache.druid.collections.BlockingPool; @@ -35,6 +35,7 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.collect.Utils; import org.apache.druid.java.util.common.guava.CloseQuietly; +import org.apache.druid.java.util.common.guava.LazySequence; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.query.DataSource; @@ -71,7 +72,6 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Comparator; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -446,43 +446,37 @@ public Sequence processSubtotalsSpec( } GroupByQuery subtotalQuery = queryWithoutSubtotalsSpec - .withDimensionSpecs(newDimensions) - .withLimitSpec(subtotalQueryLimitSpec); + .withLimitSpec(subtotalQueryLimitSpec) + .withDimensionSpecs(newDimensions); + final GroupByRowProcessor.ResultSupplier resultSupplierOneFinal = resultSupplierOne; if (Utils.isPrefix(subtotalSpec, queryDimNames)) { // Since subtotalSpec is a prefix of base query dimensions, so results from base query are also sorted // by subtotalSpec as needed by stream merging. subtotalsResults.add( - getSubtotalQueryResultFromSortedResultsSupplier(resultSupplierOne, subtotalSpec, subtotalQuery) + processSubtotalsResultAndOptionallyClose(() -> resultSupplierOneFinal, subtotalSpec, subtotalQuery, false) ); } else { // Since subtotalSpec is not a prefix of base query dimensions, so results from base query are not sorted // by subtotalSpec. So we first add the result of base query into another resultSupplier which are sorted // by subtotalSpec and then stream merge them. - GroupByRowProcessor.ResultSupplier resultSupplierTwo = null; - try { - resultSupplierTwo = GroupByRowProcessor.process( - queryWithoutSubtotalsSpec, - subtotalQuery, - resultSupplierOne.results(subtotalSpec), - configSupplier.get(), - resource, - spillMapper, - processingConfig.getTmpDir(), - processingConfig.intermediateComputeSizeBytes() - ); + // Also note, we can't create the ResultSupplier eagerly here or as we don't want to eagerly allocate + // merge buffers for processing subtotal. + Supplier resultSupplierTwo = () -> GroupByRowProcessor.process( + queryWithoutSubtotalsSpec, + subtotalQuery, + resultSupplierOneFinal.results(subtotalSpec), + configSupplier.get(), + resource, + spillMapper, + processingConfig.getTmpDir(), + processingConfig.intermediateComputeSizeBytes() + ); - subtotalsResults.add( - Sequences.withBaggage( - getSubtotalQueryResultFromSortedResultsSupplier(resultSupplierTwo, subtotalSpec, subtotalQuery), - resultSupplierTwo //this will close resources allocated by resultSupplierTwo after sequence read - ) - ); - } catch(Exception ex) { - CloseQuietly.close(resultSupplierTwo); - throw ex; - } + subtotalsResults.add( + processSubtotalsResultAndOptionallyClose(resultSupplierTwo, subtotalSpec, subtotalQuery, true) + ); } } @@ -491,25 +485,43 @@ public Sequence processSubtotalsSpec( resultSupplierOne //this will close resources allocated by resultSupplierOne after sequence read ); } - catch(Exception ex) { + catch (Exception ex) { CloseQuietly.close(resultSupplierOne); throw ex; } } - private Sequence getSubtotalQueryResultFromSortedResultsSupplier( - final GroupByRowProcessor.ResultSupplier baseResultsSupplier, - List dimsToInclude, GroupByQuery subtotalQuery + private Sequence processSubtotalsResultAndOptionallyClose( + Supplier baseResultsSupplier, + List dimsToInclude, + GroupByQuery subtotalQuery, + boolean closeOnSequenceRead ) { - return applyPostProcessing( - mergeResults( - (queryPlus, responseContext) -> baseResultsSupplier.results(dimsToInclude), - subtotalQuery, - null - ), - subtotalQuery - ); + // This closes the ResultSupplier in case of any exception here or arranges for it to be closed + // on sequence read if closeOnSequenceRead is true. + try { + Supplier memoizedSupplier = Suppliers.memoize(baseResultsSupplier); + return applyPostProcessing( + mergeResults( + (queryPlus, responseContext) -> + new LazySequence<>( + () -> Sequences.withBaggage( + memoizedSupplier.get().results(dimsToInclude), + closeOnSequenceRead ? () -> CloseQuietly.close(memoizedSupplier.get()) : () -> {} + ) + ), + subtotalQuery, + null + ), + subtotalQuery + ); + + } + catch (Exception ex) { + //CloseQuietly.close(baseResultsSupplier.get()); + throw ex; + } } private Set getAggregatorAndPostAggregatorNames(GroupByQuery query) diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java index c9ca649ea72b..6187804a6ede 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java @@ -6415,7 +6415,7 @@ public void testGroupByWithSubtotalsSpecOfDimensionsPrefixes() )) .build(); - List expectedResults = Arrays.asList( + List expectedResults = Arrays.asList( makeRow( query, "2011-04-01T00:00:00.000Z", @@ -6426,7 +6426,8 @@ public void testGroupByWithSubtotalsSpecOfDimensionsPrefixes() "idx", 1102L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-01T00:00:00.000Z", "market2", "total_market", @@ -6435,7 +6436,8 @@ public void testGroupByWithSubtotalsSpecOfDimensionsPrefixes() "idx", 2836L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-01T00:00:00.000Z", "market2", "upfront", @@ -6445,7 +6447,8 @@ public void testGroupByWithSubtotalsSpecOfDimensionsPrefixes() 2681L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-02T00:00:00.000Z", "market2", "spot", @@ -6454,7 +6457,8 @@ public void testGroupByWithSubtotalsSpecOfDimensionsPrefixes() "idx", 1120L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-02T00:00:00.000Z", "market2", "total_market", @@ -6463,7 +6467,8 @@ public void testGroupByWithSubtotalsSpecOfDimensionsPrefixes() "idx", 2514L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-02T00:00:00.000Z", "market2", "upfront", @@ -6473,14 +6478,16 @@ public void testGroupByWithSubtotalsSpecOfDimensionsPrefixes() 2193L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-01T00:00:00.000Z", "rows", 13L, "idx", 6619L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-02T00:00:00.000Z", "rows", 13L, @@ -6489,7 +6496,7 @@ public void testGroupByWithSubtotalsSpecOfDimensionsPrefixes() ) ); - Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); TestHelper.assertExpectedObjects(expectedResults, results, "subtotal"); } @@ -6531,8 +6538,9 @@ public void testGroupByWithSubtotalsSpecGeneral() )) .build(); - List expectedResults = Arrays.asList( - GroupByQueryRunnerTestHelper.createExpectedRow( + List expectedResults = Arrays.asList( + makeRow( + query, "2011-04-01", "alias2", "automotive", @@ -6543,7 +6551,8 @@ public void testGroupByWithSubtotalsSpecGeneral() "idxPostAgg", 135L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-01", "alias2", "business", @@ -6554,7 +6563,8 @@ public void testGroupByWithSubtotalsSpecGeneral() "idxPostAgg", 118L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-01", "alias2", "entertainment", @@ -6565,7 +6575,8 @@ public void testGroupByWithSubtotalsSpecGeneral() "idxPostAgg", 158L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-01", "alias2", "health", @@ -6576,7 +6587,8 @@ public void testGroupByWithSubtotalsSpecGeneral() "idxPostAgg", 120L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-01", "alias2", "mezzanine", @@ -6587,7 +6599,8 @@ public void testGroupByWithSubtotalsSpecGeneral() "idxPostAgg", 2870L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-01", "alias2", "news", @@ -6598,7 +6611,8 @@ public void testGroupByWithSubtotalsSpecGeneral() "idxPostAgg", 121L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-01", "alias2", "premium", @@ -6609,7 +6623,8 @@ public void testGroupByWithSubtotalsSpecGeneral() "idxPostAgg", 2900L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-01", "alias2", "technology", @@ -6620,7 +6635,8 @@ public void testGroupByWithSubtotalsSpecGeneral() "idxPostAgg", 78L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-01", "alias2", "travel", @@ -6632,7 +6648,8 @@ public void testGroupByWithSubtotalsSpecGeneral() 119L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-02", "alias2", "automotive", @@ -6643,7 +6660,8 @@ public void testGroupByWithSubtotalsSpecGeneral() "idxPostAgg", 147L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-02", "alias2", "business", @@ -6654,7 +6672,8 @@ public void testGroupByWithSubtotalsSpecGeneral() "idxPostAgg", 112L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-02", "alias2", "entertainment", @@ -6665,7 +6684,8 @@ public void testGroupByWithSubtotalsSpecGeneral() "idxPostAgg", 166L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-02", "alias2", "health", @@ -6676,7 +6696,8 @@ public void testGroupByWithSubtotalsSpecGeneral() "idxPostAgg", 113L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-02", "alias2", "mezzanine", @@ -6687,7 +6708,8 @@ public void testGroupByWithSubtotalsSpecGeneral() "idxPostAgg", 2447L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-02", "alias2", "news", @@ -6698,7 +6720,8 @@ public void testGroupByWithSubtotalsSpecGeneral() "idxPostAgg", 114L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-02", "alias2", "premium", @@ -6709,7 +6732,8 @@ public void testGroupByWithSubtotalsSpecGeneral() "idxPostAgg", 2505L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-02", "alias2", "technology", @@ -6720,7 +6744,8 @@ public void testGroupByWithSubtotalsSpecGeneral() "idxPostAgg", 97L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-02", "alias2", "travel", @@ -6732,7 +6757,8 @@ public void testGroupByWithSubtotalsSpecGeneral() 126L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-01T00:00:00.000Z", "market2", "spot", @@ -6743,7 +6769,8 @@ public void testGroupByWithSubtotalsSpecGeneral() "idxPostAgg", 1102L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-01T00:00:00.000Z", "market2", "total_market", @@ -6754,7 +6781,8 @@ public void testGroupByWithSubtotalsSpecGeneral() "idxPostAgg", 2836L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-01T00:00:00.000Z", "market2", "upfront", @@ -6766,7 +6794,8 @@ public void testGroupByWithSubtotalsSpecGeneral() 2681L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-02T00:00:00.000Z", "market2", "spot", @@ -6777,7 +6806,8 @@ public void testGroupByWithSubtotalsSpecGeneral() "idxPostAgg", 1120L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-02T00:00:00.000Z", "market2", "total_market", @@ -6788,7 +6818,8 @@ public void testGroupByWithSubtotalsSpecGeneral() "idxPostAgg", 2514L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-02T00:00:00.000Z", "market2", "upfront", @@ -6800,7 +6831,8 @@ public void testGroupByWithSubtotalsSpecGeneral() 2193L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-01T00:00:00.000Z", "rows", 13L, @@ -6809,7 +6841,8 @@ public void testGroupByWithSubtotalsSpecGeneral() "idxPostAgg", 6619L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-02T00:00:00.000Z", "rows", 13L, @@ -6820,7 +6853,7 @@ public void testGroupByWithSubtotalsSpecGeneral() ) ); - Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); TestHelper.assertExpectedObjects(expectedResults, results, "subtotal"); } @@ -6858,8 +6891,9 @@ public void testGroupByWithSubtotalsSpecWithRenamedDimensionAndFilter() )) .build(); - List expectedResults = Arrays.asList( - GroupByQueryRunnerTestHelper.createExpectedRow( + List expectedResults = Arrays.asList( + makeRow( + query, "2011-04-01", "alias_renamed", "automotive", @@ -6872,7 +6906,8 @@ public void testGroupByWithSubtotalsSpecWithRenamedDimensionAndFilter() "idxDouble", 135.88510131835938d ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-02", "alias_renamed", "automotive", @@ -6886,7 +6921,8 @@ public void testGroupByWithSubtotalsSpecWithRenamedDimensionAndFilter() 147.42593d ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-01T00:00:00.000Z", "rows", 1L, @@ -6897,7 +6933,8 @@ public void testGroupByWithSubtotalsSpecWithRenamedDimensionAndFilter() "idxDouble", 135.88510131835938d ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-02T00:00:00.000Z", "rows", 1L, @@ -6910,7 +6947,7 @@ public void testGroupByWithSubtotalsSpecWithRenamedDimensionAndFilter() ) ); - Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); TestHelper.assertExpectedObjects(expectedResults, results, "subtotal"); } @@ -6942,8 +6979,9 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() )) .build(); - List expectedResults = Arrays.asList( - GroupByQueryRunnerTestHelper.createExpectedRow( + List expectedResults = Arrays.asList( + makeRow( + query, "2011-04-01T00:00:00.000Z", "ql", 1000L, @@ -6952,7 +6990,8 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() "idx", 135L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-01T00:00:00.000Z", "ql", 1100L, @@ -6961,7 +7000,8 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() "idx", 118L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-01T00:00:00.000Z", "ql", 1200L, @@ -6970,7 +7010,8 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() "idx", 158L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-01T00:00:00.000Z", "ql", 1300L, @@ -6979,7 +7020,8 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() "idx", 120L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-01T00:00:00.000Z", "ql", 1400L, @@ -6988,7 +7030,8 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() "idx", 2870L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-01T00:00:00.000Z", "ql", 1500L, @@ -6997,7 +7040,8 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() "idx", 121L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-01T00:00:00.000Z", "ql", 1600L, @@ -7006,7 +7050,8 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() "idx", 2900L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-01T00:00:00.000Z", "ql", 1700L, @@ -7015,7 +7060,8 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() "idx", 78L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-01T00:00:00.000Z", "ql", 1800L, @@ -7024,7 +7070,8 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() "idx", 119L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-02T00:00:00.000Z", "ql", 1000L, @@ -7033,7 +7080,8 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() "idx", 147L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-02T00:00:00.000Z", "ql", 1100L, @@ -7042,7 +7090,8 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() "idx", 112L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-02T00:00:00.000Z", "ql", 1200L, @@ -7051,7 +7100,8 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() "idx", 166L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-02T00:00:00.000Z", "ql", 1300L, @@ -7060,7 +7110,8 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() "idx", 113L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-02T00:00:00.000Z", "ql", 1400L, @@ -7069,7 +7120,8 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() "idx", 2447L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-02T00:00:00.000Z", "ql", 1500L, @@ -7078,7 +7130,8 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() "idx", 114L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-02T00:00:00.000Z", "ql", 1600L, @@ -7087,7 +7140,8 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() "idx", 2505L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-02T00:00:00.000Z", "ql", 1700L, @@ -7096,7 +7150,8 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() "idx", 97L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-02T00:00:00.000Z", "ql", 1800L, @@ -7106,7 +7161,8 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() 126L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-01T00:00:00.000Z", "market2", "spot", @@ -7115,7 +7171,8 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() "idx", 1102L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-01T00:00:00.000Z", "market2", "total_market", @@ -7124,7 +7181,8 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() "idx", 2836L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-01T00:00:00.000Z", "market2", "upfront", @@ -7134,7 +7192,8 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() 2681L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-02T00:00:00.000Z", "market2", "spot", @@ -7143,7 +7202,8 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() "idx", 1120L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-02T00:00:00.000Z", "market2", "total_market", @@ -7152,7 +7212,8 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() "idx", 2514L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-02T00:00:00.000Z", "market2", "upfront", @@ -7163,14 +7224,16 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-01T00:00:00.000Z", "rows", 13L, "idx", 6619L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-02T00:00:00.000Z", "rows", 13L, @@ -7179,7 +7242,7 @@ public void testGroupByWithSubtotalsSpecWithLongDimensionColumn() ) ); - Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); TestHelper.assertExpectedObjects(expectedResults, results, "subtotal-long-dim"); } @@ -7215,8 +7278,9 @@ public void testGroupByWithSubtotalsSpecWithOrderLimit() .setLimit(1) .build(); - List expectedResults = Arrays.asList( - GroupByQueryRunnerTestHelper.createExpectedRow( + List expectedResults = Arrays.asList( + makeRow( + query, "2011-04-01", "alias", "technology", @@ -7225,7 +7289,8 @@ public void testGroupByWithSubtotalsSpecWithOrderLimit() "idx", 78L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-01T00:00:00.000Z", "market", "spot", @@ -7234,7 +7299,8 @@ public void testGroupByWithSubtotalsSpecWithOrderLimit() "idx", 1102L ), - GroupByQueryRunnerTestHelper.createExpectedRow( + makeRow( + query, "2011-04-01T00:00:00.000Z", "rows", 13L, @@ -7243,7 +7309,7 @@ public void testGroupByWithSubtotalsSpecWithOrderLimit() ) ); - Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); TestHelper.assertExpectedObjects(expectedResults, results, "subtotal-order-limit"); } From c7ea70a85fc54b81236b31a14ae08e20f96b648a Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Mon, 5 Aug 2019 09:18:00 -0700 Subject: [PATCH 16/16] uncomment line commented accidentally --- .../apache/druid/query/groupby/strategy/GroupByStrategyV2.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java index 13f36703d919..34efb62e5528 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -519,7 +519,7 @@ private Sequence processSubtotalsResultAndOptionallyClose( } catch (Exception ex) { - //CloseQuietly.close(baseResultsSupplier.get()); + CloseQuietly.close(baseResultsSupplier.get()); throw ex; } }