From 7cd82903e51d361d453da0d35d2d7d235136f8dd Mon Sep 17 00:00:00 2001 From: "navis.ryu" Date: Wed, 8 Jun 2016 23:14:53 +0900 Subject: [PATCH] Replace ByteBuffer with int[] for group-by key --- .../query/groupby/GroupByQueryEngine.java | 91 ++++++++++++------- 1 file changed, 57 insertions(+), 34 deletions(-) diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java index 110d8bef6d7c..7bdf9b9f5809 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java @@ -52,11 +52,10 @@ import org.joda.time.DateTime; import org.joda.time.Interval; -import javax.annotation.Nullable; import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; +import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -143,15 +142,38 @@ public void close() throws IOException ); } + // for hash map + private static class IntArrayWrapper + { + private final int[] array; + + private IntArrayWrapper(int[] array) + { + this.array = array; + } + + @Override + public final int hashCode() + { + return Arrays.hashCode(array); + } + + @Override + public final boolean equals(Object o) + { + return Arrays.equals(array, ((IntArrayWrapper) o).array); + } + } + private static class RowUpdater { private final ByteBuffer metricValues; private final BufferAggregator[] aggregators; private final PositionMaintainer positionMaintainer; - private final Map positions = Maps.newTreeMap(); + private final Map positions = Maps.newTreeMap(Ints.lexicographicalComparator()); // GroupBy queries tend to do a lot of reads from this. We co-store a hash map to make those reads go faster. - private final Map positionsHash = Maps.newHashMap(); + private final Map positionsHash = Maps.newHashMap(); public RowUpdater( ByteBuffer metricValues, @@ -169,31 +191,35 @@ public int getNumRows() return positions.size(); } - public Map getPositions() + public Map getPositions() { return positions; } - private List updateValues( - ByteBuffer key, + private List updateValues( + int[] key, List dims ) { - if (dims.size() > 0) { - List retVal = null; - List unaggregatedBuffers = null; + int size = dims.size(); + int index = key.length - size; + if (size > 0) { + List retVal = null; + List unaggregatedBuffers = null; final DimensionSelector dimSelector = dims.get(0); final IndexedInts row = dimSelector.getRow(); if (row == null || row.size() == 0) { - ByteBuffer newKey = key.duplicate(); - newKey.putInt(dimSelector.getValueCardinality()); - unaggregatedBuffers = updateValues(newKey, dims.subList(1, dims.size())); + key[index] = dimSelector.getValueCardinality(); + unaggregatedBuffers = updateValues(key, dims.subList(1, size)); + } else if (row.size() == 1) { + key[index] = row.get(0); + unaggregatedBuffers = updateValues(key, dims.subList(1, size)); } else { for (Integer dimValue : row) { - ByteBuffer newKey = key.duplicate(); - newKey.putInt(dimValue); - unaggregatedBuffers = updateValues(newKey, dims.subList(1, dims.size())); + int[] newKey = Arrays.copyOf(key, key.length); + newKey[index] = dimValue; + unaggregatedBuffers = updateValues(newKey, dims.subList(1, size)); } } if (unaggregatedBuffers != null) { @@ -204,23 +230,20 @@ private List updateValues( } return retVal; } else { - key.clear(); - Integer position = positionsHash.get(key); + IntArrayWrapper wrapper = new IntArrayWrapper(key); + + Integer position = positionsHash.get(wrapper); int[] increments = positionMaintainer.getIncrements(); int thePosition; if (position == null) { - ByteBuffer keyCopy = ByteBuffer.allocate(key.limit()); - keyCopy.put(key.asReadOnlyBuffer()); - keyCopy.clear(); - position = positionMaintainer.getNext(); if (position == null) { - return Lists.newArrayList(keyCopy); + return Lists.newArrayList(key); } - positions.put(keyCopy, position); - positionsHash.put(keyCopy, position); + positions.put(key, position); + positionsHash.put(wrapper, position); thePosition = position; for (int i = 0; i < aggregators.length; ++i) { aggregators[i].init(metricValues, thePosition); @@ -295,13 +318,13 @@ private static class RowIterator implements CloseableIterator private final List dimensionSpecs; private final List dimensions; - private final ArrayList dimNames; + private final List dimNames; private final List aggregatorSpecs; private final BufferAggregator[] aggregators; private final String[] metricNames; private final int[] sizesRequired; - private List unprocessedKeys; + private List unprocessedKeys; private Iterator delegate; public RowIterator(GroupByQuery query, final Cursor cursor, ByteBuffer metricsBuffer, GroupByQueryConfig config) @@ -364,8 +387,8 @@ public Row next() final PositionMaintainer positionMaintainer = new PositionMaintainer(0, sizesRequired, metricsBuffer.remaining()); final RowUpdater rowUpdater = new RowUpdater(metricsBuffer, aggregators, positionMaintainer); if (unprocessedKeys != null) { - for (ByteBuffer key : unprocessedKeys) { - final List unprocUnproc = rowUpdater.updateValues(key, ImmutableList.of()); + for (int[] key : unprocessedKeys) { + final List unprocUnproc = rowUpdater.updateValues(key, ImmutableList.of()); if (unprocUnproc != null) { throw new ISE("Not enough memory to process the request."); } @@ -373,7 +396,7 @@ public Row next() cursor.advance(); } while (!cursor.isDone() && rowUpdater.getNumRows() < maxIntermediateRows) { - ByteBuffer key = ByteBuffer.allocate(dimensions.size() * Ints.BYTES); + int[] key = new int[dimensions.size()]; unprocessedKeys = rowUpdater.updateValues(key, dimensions); if (unprocessedKeys != null) { @@ -393,20 +416,20 @@ public Row next() delegate = FunctionalIterator .create(rowUpdater.getPositions().entrySet().iterator()) .transform( - new Function, Row>() + new Function, Row>() { private final DateTime timestamp = cursor.getTime(); private final int[] increments = positionMaintainer.getIncrements(); @Override - public Row apply(@Nullable Map.Entry input) + public Row apply(Map.Entry input) { Map theEvent = Maps.newLinkedHashMap(); - ByteBuffer keyBuffer = input.getKey().duplicate(); + int[] keyArray = input.getKey(); for (int i = 0; i < dimensions.size(); ++i) { final DimensionSelector dimSelector = dimensions.get(i); - final int dimVal = keyBuffer.getInt(); + final int dimVal = keyArray[i]; if (dimSelector.getValueCardinality() != dimVal) { theEvent.put(dimNames.get(i), dimSelector.lookupName(dimVal)); }