Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public Sequence<Row> apply(final Cursor cursor)
@Override
public RowIterator make()
{
return new RowIterator(query, cursor, bufferHolder.get(), config.get());
return new RowIterator(query, cursor, bufferHolder.get(), config.get(), false);
}

@Override
Expand All @@ -143,15 +143,13 @@ public void close() throws IOException
);
}

private static class RowUpdater
private static abstract class RowUpdater
{
private final ByteBuffer metricValues;
private final BufferAggregator[] aggregators;
private final PositionMaintainer positionMaintainer;

private final Map<ByteBuffer, Integer> positions = Maps.newTreeMap();
// GroupBy queries tend to do a lot of reads from this. We co-store a hash map to make those reads go faster.
private final Map<ByteBuffer, Integer> positionsHash = Maps.newHashMap();
protected final Map<ByteBuffer, Integer> positions = Maps.newTreeMap();

public RowUpdater(
ByteBuffer metricValues,
Expand All @@ -164,15 +162,13 @@ public RowUpdater(
this.positionMaintainer = positionMaintainer;
}

public int getNumRows()
{
protected int getNumRows() {
return positions.size();
}

public Map<ByteBuffer, Integer> getPositions()
{
return positions;
}
protected abstract Map<ByteBuffer, Integer> getPositions();

protected abstract void putPosition(ByteBuffer key, int position);

private List<ByteBuffer> updateValues(
ByteBuffer key,
Expand Down Expand Up @@ -205,7 +201,7 @@ private List<ByteBuffer> updateValues(
return retVal;
} else {
key.clear();
Integer position = positionsHash.get(key);
Integer position = positions.get(key);
int[] increments = positionMaintainer.getIncrements();
int thePosition;

Expand All @@ -219,8 +215,8 @@ private List<ByteBuffer> updateValues(
return Lists.newArrayList(keyCopy);
}

positions.put(keyCopy, position);
positionsHash.put(keyCopy, position);
putPosition(keyCopy, position);

thePosition = position;
for (int i = 0; i < aggregators.length; ++i) {
aggregators[i].init(metricValues, thePosition);
Expand All @@ -238,6 +234,55 @@ private List<ByteBuffer> updateValues(
}
}

private static class UnsortedRowUpdater extends RowUpdater
{
public UnsortedRowUpdater(
ByteBuffer metricValues,
BufferAggregator[] aggregators,
PositionMaintainer positionMaintainer
)
{
super(metricValues, aggregators, positionMaintainer);
}

protected final Map<ByteBuffer, Integer> getPositions()
{
return positions;
}

protected final void putPosition(ByteBuffer key, int position)
{
positions.put(key, position);
}
}

private static class SortedUpdater extends RowUpdater
{
private final Map<ByteBuffer, Integer> sortedPositions = Maps.newTreeMap();

public SortedUpdater(
ByteBuffer metricValues,
BufferAggregator[] aggregators,
PositionMaintainer positionMaintainer
)
{
super(metricValues, aggregators, positionMaintainer);
}

@Override
protected final Map<ByteBuffer, Integer> getPositions()
{
return sortedPositions;
}

@Override
protected final void putPosition(ByteBuffer key, int position)
{
positions.put(key, position);
sortedPositions.put(key, position);
}
}

private static class PositionMaintainer
{
private final int[] increments;
Expand Down Expand Up @@ -292,6 +337,8 @@ private static class RowIterator implements CloseableIterator<Row>
private final Cursor cursor;
private final ByteBuffer metricsBuffer;
private final int maxIntermediateRows;
private final GroupByQueryConfig config;
private final boolean sort;

private final List<DimensionSpec> dimensionSpecs;
private final List<DimensionSelector> dimensions;
Expand All @@ -304,18 +351,25 @@ private static class RowIterator implements CloseableIterator<Row>
private List<ByteBuffer> unprocessedKeys;
private Iterator<Row> delegate;

public RowIterator(GroupByQuery query, final Cursor cursor, ByteBuffer metricsBuffer, GroupByQueryConfig config)
public RowIterator(
GroupByQuery query,
final Cursor cursor,
ByteBuffer metricsBuffer,
GroupByQueryConfig config,
boolean sort
)
{
this.query = query;
this.cursor = cursor;
this.metricsBuffer = metricsBuffer;

this.maxIntermediateRows = Math.min(
query.getContextValue(
CTX_KEY_MAX_INTERMEDIATE_ROWS,
config.getMaxIntermediateRows()
), config.getMaxIntermediateRows()
);
this.config = config;
this.sort = sort;

unprocessedKeys = null;
delegate = Iterators.emptyIterator();
Expand Down Expand Up @@ -362,7 +416,9 @@ public Row next()
}

final PositionMaintainer positionMaintainer = new PositionMaintainer(0, sizesRequired, metricsBuffer.remaining());
final RowUpdater rowUpdater = new RowUpdater(metricsBuffer, aggregators, positionMaintainer);
final RowUpdater rowUpdater = sort
? new SortedUpdater(metricsBuffer, aggregators, positionMaintainer)
: new UnsortedRowUpdater(metricsBuffer, aggregators, positionMaintainer);
if (unprocessedKeys != null) {
for (ByteBuffer key : unprocessedKeys) {
final List<ByteBuffer> unprocUnproc = rowUpdater.updateValues(key, ImmutableList.<DimensionSelector>of());
Expand Down