From 1d37863cca7e00c1f2726722735afc426d3a1313 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 21 Apr 2016 15:54:42 -0700 Subject: [PATCH] GroupBy: When possible, sort results on read rather than continuously during insert. Leans on the logic from #2571 with respect to deciding when to sort and when not to sort. --- .../query/GroupByParallelQueryRunner.java | 6 ++- .../query/groupby/GroupByQueryHelper.java | 7 ++- .../groupby/GroupByQueryQueryToolChest.java | 16 ++++-- .../groupby/GroupByQueryRunnerFactory.java | 6 ++- .../segment/incremental/IncrementalIndex.java | 50 +++++++++++++++---- 5 files changed, 69 insertions(+), 16 deletions(-) diff --git a/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java b/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java index 125b61774f34..d843d774144b 100644 --- a/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java +++ b/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java @@ -175,7 +175,11 @@ public Void call() throws Exception return new ResourceClosingSequence( Sequences.simple( Iterables.transform( - indexAccumulatorPair.lhs.iterableWithPostAggregations(null, query.isDescending()), + indexAccumulatorPair.lhs.iterableWithPostAggregations( + null, + false, + GroupByQueryHelper.isSortResults(query) + ), new Function() { @Override diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java index 637dce3f077d..e2e3e15eb4ab 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java @@ -45,6 +45,11 @@ public class GroupByQueryHelper private static final String CTX_KEY_MAX_RESULTS = "maxResults"; public final static String CTX_KEY_SORT_RESULTS = "sortResults"; + public static boolean isSortResults(GroupByQuery query) + { + return query.getContextValue(CTX_KEY_SORT_RESULTS, true); + } + public static Pair> createIndexAccumulatorPair( final GroupByQuery query, final GroupByQueryConfig config, @@ -82,7 +87,7 @@ public String apply(DimensionSpec input) ); final IncrementalIndex index; - final boolean sortResults = query.getContextValue(CTX_KEY_SORT_RESULTS, true); + final boolean sortResults = isSortResults(query); if (query.getContextValue("useOffheap", false)) { index = new OffheapIncrementalIndex( diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index a3c77245a832..01119c018fdb 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -255,7 +255,13 @@ public Sequence apply(Interval interval) } else { final IncrementalIndex index = makeIncrementalIndex( - query, runner.run( + query.withOverriddenContext( + ImmutableMap.of( + // Don't force sorting here, postAggregate will do it. + GroupByQueryHelper.CTX_KEY_SORT_RESULTS, false + ) + ), + runner.run( new GroupByQuery( query.getDataSource(), query.getQuerySegmentSpec(), @@ -287,7 +293,11 @@ public Sequence apply(Interval interval) private Sequence postAggregate(final GroupByQuery query, IncrementalIndex index) { return Sequences.map( - Sequences.simple(index.iterableWithPostAggregations(query.getPostAggregatorSpecs(), query.isDescending())), + Sequences.simple(index.iterableWithPostAggregations( + query.getPostAggregatorSpecs(), + false, + GroupByQueryHelper.isSortResults(query) + )), new Function() { @Override @@ -428,7 +438,7 @@ public Sequence run(Query query, Map responseContext) return runner.run(query, responseContext); } GroupByQuery groupByQuery = (GroupByQuery) query; - if (groupByQuery.getDimFilter() != null){ + if (groupByQuery.getDimFilter() != null) { groupByQuery = groupByQuery.withDimFilter(groupByQuery.getDimFilter().optimize()); } final GroupByQuery delegateGroupByQuery = groupByQuery; diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java index dc93702e5871..b326b17ab15a 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java @@ -174,7 +174,11 @@ public Void call() throws Exception return Sequences.simple(bySegmentAccumulatorPair.lhs); } - return Sequences.simple(indexAccumulatorPair.lhs.iterableWithPostAggregations(null, query.isDescending())); + return Sequences.simple(indexAccumulatorPair.lhs.iterableWithPostAggregations( + null, + false, + GroupByQueryHelper.isSortResults(queryParam) + )); } }; } diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index 673a79d66868..32e81ee0e700 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -434,7 +434,8 @@ public IncrementalIndex( } } - private DimDim newDimDim(String dimension, ValueType type) { + private DimDim newDimDim(String dimension, ValueType type) + { DimDim newDimDim; switch (type) { case LONG: @@ -561,7 +562,8 @@ public Map getDimensionDescs() * * @return the number of rows in the data set after adding the InputRow */ - public int add(InputRow row) throws IndexSizeExceededException { + public int add(InputRow row) throws IndexSizeExceededException + { TimeAndDims key = toTimeAndDims(row); final int rv = addToFacts( metrics, @@ -820,7 +822,12 @@ public void loadDimensionIterable(Iterable oldDimensionOrder) @GuardedBy("dimensionDescs") private DimensionDesc addNewDimension(String dim, ColumnCapabilitiesImpl capabilities) { - DimensionDesc desc = new DimensionDesc(dimensionDescs.size(), dim, newDimDim(dim, capabilities.getType()), capabilities); + DimensionDesc desc = new DimensionDesc( + dimensionDescs.size(), + dim, + newDimDim(dim, capabilities.getType()), + capabilities + ); if (dimValues.size() != desc.getIndex()) { throw new ISE("dimensionDescs and dimValues for [%s] is out of sync!!", dim); } @@ -877,10 +884,14 @@ private static AggregatorFactory[] getCombiningAggregators(AggregatorFactory[] a @Override public Iterator iterator() { - return iterableWithPostAggregations(null, false).iterator(); + return iterableWithPostAggregations(null, false, false).iterator(); } - public Iterable iterableWithPostAggregations(final List postAggs, final boolean descending) + public Iterable iterableWithPostAggregations( + final List postAggs, + final boolean descending, + final boolean sorted + ) { return new Iterable() { @@ -889,15 +900,34 @@ public Iterator iterator() { final List dimensions = getDimensions(); - Map facts = null; - if (descending && sortFacts) { - facts = ((ConcurrentNavigableMap) getFacts()).descendingMap(); + final Iterable> facts; + if (sorted && sortFacts) { + facts = descending ? ((ConcurrentNavigableMap) getFacts()).descendingMap().entrySet() + : getFacts().entrySet(); + } else if (sorted) { + // Materialize and sort + final Comparator comparator = descending + ? Ordering.from(dimsComparator()).reverse() + : dimsComparator(); + List> factsList = Lists.newArrayList(getFacts().entrySet()); + Collections.sort( + factsList, + new Comparator>() + { + @Override + public int compare(Map.Entry a, Map.Entry b) + { + return comparator.compare(a.getKey(), b.getKey()); + } + } + ); + facts = factsList; } else { - facts = getFacts(); + facts = getFacts().entrySet(); } return Iterators.transform( - facts.entrySet().iterator(), + facts.iterator(), new Function, Row>() { @Override