diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java index 110d8bef6d7c..221794b91e8f 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java @@ -118,7 +118,7 @@ public Sequence apply(final Cursor cursor) @Override public RowIterator make() { - return new RowIterator(query, cursor, bufferHolder.get(), config.get()); + return new RowIterator(query, cursor, bufferHolder.get(), config.get(), false); } @Override @@ -143,15 +143,13 @@ public void close() throws IOException ); } - private static class RowUpdater + private static abstract class RowUpdater { private final ByteBuffer metricValues; private final BufferAggregator[] aggregators; private final PositionMaintainer positionMaintainer; - private final Map positions = Maps.newTreeMap(); - // GroupBy queries tend to do a lot of reads from this. We co-store a hash map to make those reads go faster. - private final Map positionsHash = Maps.newHashMap(); + protected final Map positions = Maps.newTreeMap(); public RowUpdater( ByteBuffer metricValues, @@ -164,15 +162,13 @@ public RowUpdater( this.positionMaintainer = positionMaintainer; } - public int getNumRows() - { + protected int getNumRows() { return positions.size(); } - public Map getPositions() - { - return positions; - } + protected abstract Map getPositions(); + + protected abstract void putPosition(ByteBuffer key, int position); private List updateValues( ByteBuffer key, @@ -205,7 +201,7 @@ private List updateValues( return retVal; } else { key.clear(); - Integer position = positionsHash.get(key); + Integer position = positions.get(key); int[] increments = positionMaintainer.getIncrements(); int thePosition; @@ -219,8 +215,8 @@ private List updateValues( return Lists.newArrayList(keyCopy); } - positions.put(keyCopy, position); - positionsHash.put(keyCopy, position); + putPosition(keyCopy, position); + thePosition = position; for (int i = 0; i < aggregators.length; ++i) { aggregators[i].init(metricValues, thePosition); @@ -238,6 +234,55 @@ private List updateValues( } } + private static class UnsortedRowUpdater extends RowUpdater + { + public UnsortedRowUpdater( + ByteBuffer metricValues, + BufferAggregator[] aggregators, + PositionMaintainer positionMaintainer + ) + { + super(metricValues, aggregators, positionMaintainer); + } + + protected final Map getPositions() + { + return positions; + } + + protected final void putPosition(ByteBuffer key, int position) + { + positions.put(key, position); + } + } + + private static class SortedUpdater extends RowUpdater + { + private final Map sortedPositions = Maps.newTreeMap(); + + public SortedUpdater( + ByteBuffer metricValues, + BufferAggregator[] aggregators, + PositionMaintainer positionMaintainer + ) + { + super(metricValues, aggregators, positionMaintainer); + } + + @Override + protected final Map getPositions() + { + return sortedPositions; + } + + @Override + protected final void putPosition(ByteBuffer key, int position) + { + positions.put(key, position); + sortedPositions.put(key, position); + } + } + private static class PositionMaintainer { private final int[] increments; @@ -292,6 +337,8 @@ private static class RowIterator implements CloseableIterator private final Cursor cursor; private final ByteBuffer metricsBuffer; private final int maxIntermediateRows; + private final GroupByQueryConfig config; + private final boolean sort; private final List dimensionSpecs; private final List dimensions; @@ -304,18 +351,25 @@ private static class RowIterator implements CloseableIterator private List unprocessedKeys; private Iterator delegate; - public RowIterator(GroupByQuery query, final Cursor cursor, ByteBuffer metricsBuffer, GroupByQueryConfig config) + public RowIterator( + GroupByQuery query, + final Cursor cursor, + ByteBuffer metricsBuffer, + GroupByQueryConfig config, + boolean sort + ) { this.query = query; this.cursor = cursor; this.metricsBuffer = metricsBuffer; - this.maxIntermediateRows = Math.min( query.getContextValue( CTX_KEY_MAX_INTERMEDIATE_ROWS, config.getMaxIntermediateRows() ), config.getMaxIntermediateRows() ); + this.config = config; + this.sort = sort; unprocessedKeys = null; delegate = Iterators.emptyIterator(); @@ -362,7 +416,9 @@ public Row next() } final PositionMaintainer positionMaintainer = new PositionMaintainer(0, sizesRequired, metricsBuffer.remaining()); - final RowUpdater rowUpdater = new RowUpdater(metricsBuffer, aggregators, positionMaintainer); + final RowUpdater rowUpdater = sort + ? new SortedUpdater(metricsBuffer, aggregators, positionMaintainer) + : new UnsortedRowUpdater(metricsBuffer, aggregators, positionMaintainer); if (unprocessedKeys != null) { for (ByteBuffer key : unprocessedKeys) { final List unprocUnproc = rowUpdater.updateValues(key, ImmutableList.of());