diff --git a/benchmarks/src/main/java/io/druid/benchmark/IncrementalIndexAddRowsBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/IncrementalIndexAddRowsBenchmark.java index 5580a46668b9..c9d10d25571f 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/IncrementalIndexAddRowsBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/IncrementalIndexAddRowsBenchmark.java @@ -126,6 +126,7 @@ private IncrementalIndex makeIncIndex() aggs, false, false, + true, maxRows ); } diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java index 4a94f8f5c04c..637dce3f077d 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java @@ -43,6 +43,7 @@ public class GroupByQueryHelper { private static final String CTX_KEY_MAX_RESULTS = "maxResults"; + public final static String CTX_KEY_SORT_RESULTS = "sortResults"; public static Pair> createIndexAccumulatorPair( final GroupByQuery query, @@ -81,7 +82,9 @@ public String apply(DimensionSpec input) ); final IncrementalIndex index; - if (query.getContextBoolean("useOffheap", false)) { + final boolean sortResults = query.getContextValue(CTX_KEY_SORT_RESULTS, true); + + if (query.getContextValue("useOffheap", false)) { index = new OffheapIncrementalIndex( // use granularity truncated min timestamp // since incoming truncated timestamps may precede timeStart @@ -90,6 +93,7 @@ public String apply(DimensionSpec input) aggs.toArray(new AggregatorFactory[aggs.size()]), false, true, + sortResults, Math.min(query.getContextValue(CTX_KEY_MAX_RESULTS, config.getMaxResults()), config.getMaxResults()), bufferPool ); @@ -102,6 +106,7 @@ public String apply(DimensionSpec input) aggs.toArray(new AggregatorFactory[aggs.size()]), false, true, + sortResults, Math.min(query.getContextValue(CTX_KEY_MAX_RESULTS, config.getMaxResults()), config.getMaxResults()) ); } diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index 2b8824ec2258..a3c77245a832 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -161,7 +161,18 @@ private Sequence mergeGroupByResults( throw new UnsupportedOperationException("Subqueries must be of type 'group by'"); } - final Sequence subqueryResult = mergeGroupByResults(subquery, runner, context); + final Sequence subqueryResult = mergeGroupByResults( + subquery.withOverriddenContext( + ImmutableMap.of( + //setting sort to false avoids unnecessary sorting while merging results. we only need to sort + //in the end when returning results to user. + GroupByQueryHelper.CTX_KEY_SORT_RESULTS, + false + ) + ), + runner, + context + ); final Set aggs = Sets.newHashSet(); // Nested group-bys work by first running the inner query and then materializing the results in an incremental @@ -200,7 +211,14 @@ public boolean apply(AggregatorFactory agg) .setLimitSpec(query.getLimitSpec().merge(subquery.getLimitSpec())) .build(); - final IncrementalIndex innerQueryResultIndex = makeIncrementalIndex(innerQuery, subqueryResult); + final IncrementalIndex innerQueryResultIndex = makeIncrementalIndex( + innerQuery.withOverriddenContext( + ImmutableMap.of( + GroupByQueryHelper.CTX_KEY_SORT_RESULTS, true + ) + ), + subqueryResult + ); //Outer query might have multiple intervals, but they are expected to be non-overlapping and sorted which //is ensured by QuerySegmentSpec. @@ -253,7 +271,10 @@ public Sequence apply(Interval interval) query.getContext() ).withOverriddenContext( ImmutableMap.of( - "finalize", false + "finalize", false, + //setting sort to false avoids unnecessary sorting while merging results. we only need to sort + //in the end when returning results to user. + GroupByQueryHelper.CTX_KEY_SORT_RESULTS, false ) ) , context diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index 97888fd0ff14..b617f11082b8 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -71,6 +71,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; @@ -340,6 +341,7 @@ public int lookupId(String name) private final AggregatorType[] aggs; private final boolean deserializeComplexMetrics; private final boolean reportParseExceptions; + private final boolean sortFacts; private final Metadata metadata; private final Map metricDescs; @@ -374,7 +376,8 @@ public InputRow get() public IncrementalIndex( final IncrementalIndexSchema incrementalIndexSchema, final boolean deserializeComplexMetrics, - final boolean reportParseExceptions + final boolean reportParseExceptions, + final boolean sortFacts ) { this.minTimestamp = incrementalIndexSchema.getMinTimestamp(); @@ -383,6 +386,7 @@ public IncrementalIndex( this.rowTransformers = new CopyOnWriteArrayList<>(); this.deserializeComplexMetrics = deserializeComplexMetrics; this.reportParseExceptions = reportParseExceptions; + this.sortFacts = sortFacts; this.metadata = new Metadata().setAggregators(getCombiningAggregators(metrics)); @@ -441,7 +445,7 @@ private DimDim newDimDim(String dimension, ValueType type) { // use newDimDim() to create a DimDim, makeDimDim() provides the subclass-specific implementation protected abstract DimDim makeDimDim(String dimension, Object lock); - public abstract ConcurrentNavigableMap getFacts(); + public abstract ConcurrentMap getFacts(); public abstract boolean canAppendRow(); @@ -673,12 +677,20 @@ public int size() private long getMinTimeMillis() { - return getFacts().firstKey().getTimestamp(); + if (sortFacts) { + return ((ConcurrentNavigableMap) getFacts()).firstKey().getTimestamp(); + } else { + throw new UnsupportedOperationException("can't get minTime from unsorted facts data."); + } } private long getMaxTimeMillis() { - return getFacts().lastKey().getTimestamp(); + if (sortFacts) { + return ((ConcurrentNavigableMap) getFacts()).lastKey().getTimestamp(); + } else { + throw new UnsupportedOperationException("can't get maxTime from unsorted facts data."); + } } private int[] getDimVals(final DimDim dimLookup, final List dimValues) @@ -831,7 +843,11 @@ public ColumnCapabilities getCapabilities(String column) public ConcurrentNavigableMap getSubMap(TimeAndDims start, TimeAndDims end) { - return getFacts().subMap(start, end); + if (sortFacts) { + return ((ConcurrentNavigableMap) getFacts()).subMap(start, end); + } else { + throw new UnsupportedOperationException("can't get subMap from unsorted facts data."); + } } public Metadata getMetadata() @@ -862,7 +878,14 @@ public Iterable iterableWithPostAggregations(final List pos public Iterator iterator() { final List dimensions = getDimensions(); - final ConcurrentNavigableMap facts = descending ? getFacts().descendingMap() : getFacts(); + + Map facts = null; + if (descending && sortFacts) { + facts = ((ConcurrentNavigableMap) getFacts()).descendingMap(); + } else { + facts = getFacts(); + } + return Iterators.transform( facts.entrySet().iterator(), new Function, Row>() diff --git a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java index 47f4c5ca4f1c..7aabc4ac06b4 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java @@ -38,7 +38,8 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicInteger; @@ -51,7 +52,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex private final List> aggBuffers = new ArrayList<>(); private final List indexAndOffsets = new ArrayList<>(); - private final ConcurrentNavigableMap facts; + private final ConcurrentMap facts; private final AtomicInteger indexIncrement = new AtomicInteger(0); @@ -71,14 +72,20 @@ public OffheapIncrementalIndex( IncrementalIndexSchema incrementalIndexSchema, boolean deserializeComplexMetrics, boolean reportParseExceptions, + boolean sortFacts, int maxRowCount, StupidPool bufferPool ) { - super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions); + super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions, sortFacts); this.maxRowCount = maxRowCount; this.bufferPool = bufferPool; - this.facts = new ConcurrentSkipListMap<>(dimsComparator()); + + if (sortFacts) { + this.facts = new ConcurrentSkipListMap<>(dimsComparator()); + } else { + this.facts = new ConcurrentHashMap<>(); + } //check that stupid pool gives buffers that can hold at least one row's aggregators ResourceHolder bb = bufferPool.take(); @@ -100,6 +107,7 @@ public OffheapIncrementalIndex( final AggregatorFactory[] metrics, boolean deserializeComplexMetrics, boolean reportParseExceptions, + boolean sortFacts, int maxRowCount, StupidPool bufferPool ) @@ -111,6 +119,7 @@ public OffheapIncrementalIndex( .build(), deserializeComplexMetrics, reportParseExceptions, + sortFacts, maxRowCount, bufferPool ); @@ -131,13 +140,14 @@ public OffheapIncrementalIndex( .build(), true, true, + true, maxRowCount, bufferPool ); } @Override - public ConcurrentNavigableMap getFacts() + public ConcurrentMap getFacts() { return facts; } diff --git a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java index 5f4f58e0e182..a19cb30a5323 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java @@ -33,13 +33,11 @@ import io.druid.segment.FloatColumnSelector; import io.druid.segment.LongColumnSelector; import io.druid.segment.ObjectColumnSelector; -import io.druid.segment.column.ValueType; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicInteger; @@ -48,7 +46,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex { private final ConcurrentHashMap aggregators = new ConcurrentHashMap<>(); - private final ConcurrentNavigableMap facts; + private final ConcurrentMap facts; private final AtomicInteger indexIncrement = new AtomicInteger(0); protected final int maxRowCount; private volatile Map selectors; @@ -59,12 +57,18 @@ public OnheapIncrementalIndex( IncrementalIndexSchema incrementalIndexSchema, boolean deserializeComplexMetrics, boolean reportParseExceptions, + boolean sortFacts, int maxRowCount ) { - super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions); + super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions, sortFacts); this.maxRowCount = maxRowCount; - this.facts = new ConcurrentSkipListMap<>(dimsComparator()); + + if (sortFacts) { + this.facts = new ConcurrentSkipListMap<>(dimsComparator()); + } else { + this.facts = new ConcurrentHashMap<>(); + } } public OnheapIncrementalIndex( @@ -73,6 +77,7 @@ public OnheapIncrementalIndex( final AggregatorFactory[] metrics, boolean deserializeComplexMetrics, boolean reportParseExceptions, + boolean sortFacts, int maxRowCount ) { @@ -83,6 +88,7 @@ public OnheapIncrementalIndex( .build(), deserializeComplexMetrics, reportParseExceptions, + sortFacts, maxRowCount ); } @@ -101,6 +107,7 @@ public OnheapIncrementalIndex( .build(), true, true, + true, maxRowCount ); } @@ -111,11 +118,11 @@ public OnheapIncrementalIndex( int maxRowCount ) { - this(incrementalIndexSchema, true, reportParseExceptions, maxRowCount); + this(incrementalIndexSchema, true, reportParseExceptions, true, maxRowCount); } @Override - public ConcurrentNavigableMap getFacts() + public ConcurrentMap getFacts() { return facts; } diff --git a/processing/src/test/java/io/druid/query/MultiValuedDimensionTest.java b/processing/src/test/java/io/druid/query/MultiValuedDimensionTest.java index c56b4521bfc1..9c68fade306a 100644 --- a/processing/src/test/java/io/druid/query/MultiValuedDimensionTest.java +++ b/processing/src/test/java/io/druid/query/MultiValuedDimensionTest.java @@ -99,6 +99,7 @@ public static void setupClass() throws Exception }, true, true, + true, 5000 ); diff --git a/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java index 163a4a3b886a..9b5a20df9016 100644 --- a/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java +++ b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java @@ -71,7 +71,6 @@ import io.druid.segment.Segment; import io.druid.segment.TestHelper; import io.druid.segment.incremental.IncrementalIndex; -import io.druid.segment.incremental.IndexSizeExceededException; import io.druid.segment.incremental.OnheapIncrementalIndex; import org.apache.commons.io.IOUtils; import org.apache.commons.io.LineIterator; @@ -311,7 +310,7 @@ public void createIndex( List toMerge = new ArrayList<>(); try { - index = new OnheapIncrementalIndex(minTimestamp, gran, metrics, deserializeComplexMetrics, true, maxRowCount); + index = new OnheapIncrementalIndex(minTimestamp, gran, metrics, deserializeComplexMetrics, true, true, maxRowCount); while (rows.hasNext()) { Object row = rows.next(); if (!index.canAppendRow()) { @@ -319,7 +318,7 @@ public void createIndex( toMerge.add(tmp); indexMerger.persist(index, tmp, new IndexSpec()); index.close(); - index = new OnheapIncrementalIndex(minTimestamp, gran, metrics, deserializeComplexMetrics, true, maxRowCount); + index = new OnheapIncrementalIndex(minTimestamp, gran, metrics, deserializeComplexMetrics, true, true, maxRowCount); } if (row instanceof String && parser instanceof StringInputRowParser) { //Note: this is required because StringInputRowParser is InputRowParser as opposed to diff --git a/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java b/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java index fb219618077b..7d2676560b3b 100644 --- a/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java +++ b/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java @@ -120,7 +120,7 @@ private void createTestIndex(File segmentDir) throws Exception IncrementalIndex index = null; try { - index = new OnheapIncrementalIndex(0, QueryGranularity.NONE, aggregators, true, true, 5000); + index = new OnheapIncrementalIndex(0, QueryGranularity.NONE, aggregators, true, true, true, 5000); for (String line : rows) { index.add(parser.parse(line)); }