@@ -52,11 +52,10 @@
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
-import javax.annotation.Nullable;
 import java.io.Closeable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -143,15 +142,38 @@ public void close() throws IOException
     );
   }
 
+  // for hash map
+  private static class IntArrayWrapper
+  {
+    private final int[] array;
+
+    private IntArrayWrapper(int[] array)
+    {
+      this.array = array;
+    }
+
+    @Override
+    public final int hashCode()
+    {
+      return Arrays.hashCode(array);
+    }
+
+    @Override
+    public final boolean equals(Object o)
+    {
+      return Arrays.equals(array, ((IntArrayWrapper) o).array);
+    }
+  }
+
   private static class RowUpdater
   {
     private final ByteBuffer metricValues;
     private final BufferAggregator[] aggregators;
     private final PositionMaintainer positionMaintainer;
 
-    private final Map<ByteBuffer, Integer> positions = Maps.newTreeMap();
+    private final Map<int[], Integer> positions = Maps.newTreeMap(Ints.lexicographicalComparator());
     // GroupBy queries tend to do a lot of reads from this. We co-store a hash map to make those reads go faster.
-    private final Map<ByteBuffer, Integer> positionsHash = Maps.newHashMap();
+    private final Map<IntArrayWrapper, Integer> positionsHash = Maps.newHashMap();
 
     public RowUpdater(
         ByteBuffer metricValues,
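
Aside: the IntArrayWrapper above is needed because raw Java arrays use identity-based hashCode()/equals(), so they cannot serve directly as HashMap keys, while the sorted positions map can keep raw int[] keys because a TreeMap only needs a Comparator (here Ints.lexicographicalComparator()). A minimal, self-contained sketch of that distinction — not Druid code; names other than IntArrayWrapper are illustrative:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Illustration only: content-based equality for int[] keys in a HashMap.
public class IntArrayKeyDemo
{
  static final class IntArrayWrapper
  {
    private final int[] array;

    IntArrayWrapper(int[] array)
    {
      this.array = array;
    }

    @Override
    public int hashCode()
    {
      return Arrays.hashCode(array);
    }

    @Override
    public boolean equals(Object o)
    {
      return o instanceof IntArrayWrapper && Arrays.equals(array, ((IntArrayWrapper) o).array);
    }
  }

  public static void main(String[] args)
  {
    // Raw int[] keys hash by identity, so a lookup with an equal-content array misses.
    Map<int[], Integer> byIdentity = new HashMap<>();
    byIdentity.put(new int[]{1, 2}, 0);
    System.out.println(byIdentity.get(new int[]{1, 2})); // null

    // Wrapping the array gives content-based lookups, as positionsHash needs.
    Map<IntArrayWrapper, Integer> byContent = new HashMap<>();
    byContent.put(new IntArrayWrapper(new int[]{1, 2}), 0);
    System.out.println(byContent.get(new IntArrayWrapper(new int[]{1, 2}))); // 0

    // A sorted map can keep raw int[] keys because it only needs a comparator,
    // similar to Maps.newTreeMap(Ints.lexicographicalComparator()) in the diff.
    Map<int[], Integer> sorted = new TreeMap<>((a, b) -> {
      for (int i = 0; i < Math.min(a.length, b.length); i++) {
        int c = Integer.compare(a[i], b[i]);
        if (c != 0) {
          return c;
        }
      }
      return Integer.compare(a.length, b.length);
    });
    sorted.put(new int[]{3, 4}, 1);
    System.out.println(sorted.get(new int[]{3, 4})); // 1
  }
}
```

This mirrors the two-map layout in the diff: sorted iteration over raw arrays, fast content-based lookups through the wrapper.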
@@ -169,31 +191,35 @@ public int getNumRows()
       return positions.size();
     }
 
-    public Map<ByteBuffer, Integer> getPositions()
+    public Map<int[], Integer> getPositions()
     {
       return positions;
     }
 
-    private List<ByteBuffer> updateValues(
-        ByteBuffer key,
+    private List<int[]> updateValues(
+        int[] key,
         List<DimensionSelector> dims
     )
     {
-      if (dims.size() > 0) {
-        List<ByteBuffer> retVal = null;
-        List<ByteBuffer> unaggregatedBuffers = null;
+      int size = dims.size();
+      int index = key.length - size;
+      if (size > 0) {
+        List<int[]> retVal = null;
+        List<int[]> unaggregatedBuffers = null;
 
         final DimensionSelector dimSelector = dims.get(0);
         final IndexedInts row = dimSelector.getRow();
         if (row == null || row.size() == 0) {
-          ByteBuffer newKey = key.duplicate();
-          newKey.putInt(dimSelector.getValueCardinality());
-          unaggregatedBuffers = updateValues(newKey, dims.subList(1, dims.size()));
+          key[index] = dimSelector.getValueCardinality();
+          unaggregatedBuffers = updateValues(key, dims.subList(1, size));
+        } else if (row.size() == 1) {
+          key[index] = row.get(0);
+          unaggregatedBuffers = updateValues(key, dims.subList(1, size));
         } else {
           for (Integer dimValue : row) {
-            ByteBuffer newKey = key.duplicate();
-            newKey.putInt(dimValue);
-            unaggregatedBuffers = updateValues(newKey, dims.subList(1, dims.size()));
+            int[] newKey = Arrays.copyOf(key, key.length);
Contributor:

This changes behavior. In the /master impl only one underlying copy of the data exists; here, you are expanding the quantity of heap objects.

We are already having heap problems on our highly utilized nodes. If it is possible to NOT create more heap objects during this workflow, that would be nice.

Contributor (author):

I think in most cases key.duplicate() (1 ref, 5 ints, 1 long, 1 boolean) uses more memory than a simple int[].

Contributor:

You'd have to have 7 (I think) dimensions before the heap use of int[] exceeds ByteBuffer. Fewer than that and the int[] would actually be cheaper. Hard to say which is better without data, but my guess is that most real-world groupBy queries are roughly at or below 7 dimensions.

Contributor:

btw, in v2, groupBy is reworked to avoid this recursive structure, which reduces allocations and stack depth.

+            newKey[index] = dimValue;
+            unaggregatedBuffers = updateValues(newKey, dims.subList(1, size));
           }
         }
         if (unaggregatedBuffers != null) {
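
To put rough numbers on the trade-off debated in the thread above, here is a back-of-envelope sketch. The constants are assumptions for a typical 64-bit HotSpot JVM with compressed oops — roughly 48 bytes for a duplicated heap ByteBuffer (whose backing byte[] is shared), a 16-byte array header, and 8-byte object alignment — and will vary by JVM:

```java
// Rough arithmetic only; the constants below are assumptions, not measurements.
public class KeyFootprintSketch
{
  // ByteBuffer.duplicate() allocates a new wrapper object (header plus
  // mark/position/limit/capacity/address/hb/offset fields) but shares the byte[].
  static long duplicatedByteBufferBytes()
  {
    return 48; // approximate, JVM-dependent
  }

  // A copied int[] key: ~16-byte array header + 4 bytes per dimension, padded to 8.
  static long copiedIntArrayBytes(int numDims)
  {
    long raw = 16 + 4L * numDims;
    return (raw + 7) / 8 * 8;
  }

  public static void main(String[] args)
  {
    for (int dims = 1; dims <= 12; dims++) {
      System.out.printf(
          "%2d dims: int[] copy ~%d bytes, ByteBuffer.duplicate() ~%d bytes%n",
          dims, copiedIntArrayBytes(dims), duplicatedByteBufferBytes()
      );
    }
  }
}
```

Under those assumptions the copied int[] stays at or below the duplicate's ~48 bytes through about 8 dimensions, which is in the same ballpark as the "~7 dimensions" estimate above; note also that the new code only copies the key in the multi-valued branch, while the single-value and missing-value branches reuse the caller's array.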
@@ -204,23 +230,20 @@ private List<ByteBuffer> updateValues(
         }
         return retVal;
       } else {
-        key.clear();
-        Integer position = positionsHash.get(key);
+        IntArrayWrapper wrapper = new IntArrayWrapper(key);
+
+        Integer position = positionsHash.get(wrapper);
         int[] increments = positionMaintainer.getIncrements();
         int thePosition;
 
         if (position == null) {
-          ByteBuffer keyCopy = ByteBuffer.allocate(key.limit());
-          keyCopy.put(key.asReadOnlyBuffer());
-          keyCopy.clear();
-
           position = positionMaintainer.getNext();
           if (position == null) {
-            return Lists.newArrayList(keyCopy);
+            return Lists.newArrayList(key);
           }
 
-          positions.put(keyCopy, position);
-          positionsHash.put(keyCopy, position);
+          positions.put(key, position);
+          positionsHash.put(wrapper, position);
           thePosition = position;
           for (int i = 0; i < aggregators.length; ++i) {
             aggregators[i].init(metricValues, thePosition);
@@ -295,13 +318,13 @@ private static class RowIterator implements CloseableIterator<Row>

     private final List<DimensionSpec> dimensionSpecs;
     private final List<DimensionSelector> dimensions;
-    private final ArrayList<String> dimNames;
+    private final List<String> dimNames;
     private final List<AggregatorFactory> aggregatorSpecs;
     private final BufferAggregator[] aggregators;
     private final String[] metricNames;
     private final int[] sizesRequired;
 
-    private List<ByteBuffer> unprocessedKeys;
+    private List<int[]> unprocessedKeys;
     private Iterator<Row> delegate;
 
     public RowIterator(GroupByQuery query, final Cursor cursor, ByteBuffer metricsBuffer, GroupByQueryConfig config)
@@ -364,16 +387,16 @@ public Row next()
       final PositionMaintainer positionMaintainer = new PositionMaintainer(0, sizesRequired, metricsBuffer.remaining());
       final RowUpdater rowUpdater = new RowUpdater(metricsBuffer, aggregators, positionMaintainer);
       if (unprocessedKeys != null) {
-        for (ByteBuffer key : unprocessedKeys) {
-          final List<ByteBuffer> unprocUnproc = rowUpdater.updateValues(key, ImmutableList.<DimensionSelector>of());
+        for (int[] key : unprocessedKeys) {
+          final List<int[]> unprocUnproc = rowUpdater.updateValues(key, ImmutableList.<DimensionSelector>of());
           if (unprocUnproc != null) {
             throw new ISE("Not enough memory to process the request.");
           }
         }
         cursor.advance();
       }
       while (!cursor.isDone() && rowUpdater.getNumRows() < maxIntermediateRows) {
-        ByteBuffer key = ByteBuffer.allocate(dimensions.size() * Ints.BYTES);
+        int[] key = new int[dimensions.size()];
 
         unprocessedKeys = rowUpdater.updateValues(key, dimensions);
         if (unprocessedKeys != null) {
@@ -393,20 +416,20 @@ public Row next()
       delegate = FunctionalIterator
           .create(rowUpdater.getPositions().entrySet().iterator())
           .transform(
-              new Function<Map.Entry<ByteBuffer, Integer>, Row>()
+              new Function<Map.Entry<int[], Integer>, Row>()
               {
                 private final DateTime timestamp = cursor.getTime();
                 private final int[] increments = positionMaintainer.getIncrements();
 
                 @Override
-                public Row apply(@Nullable Map.Entry<ByteBuffer, Integer> input)
+                public Row apply(Map.Entry<int[], Integer> input)
                 {
                   Map<String, Object> theEvent = Maps.newLinkedHashMap();
 
-                  ByteBuffer keyBuffer = input.getKey().duplicate();
+                  int[] keyArray = input.getKey();
                   for (int i = 0; i < dimensions.size(); ++i) {
                     final DimensionSelector dimSelector = dimensions.get(i);
-                    final int dimVal = keyBuffer.getInt();
+                    final int dimVal = keyArray[i];
                     if (dimSelector.getValueCardinality() != dimVal) {
                       theEvent.put(dimNames.get(i), dimSelector.lookupName(dimVal));
                     }
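
Finally, for readers trying to picture the recursive structure the last review comment mentions, here is a simplified, standalone sketch (not Druid code; all names are illustrative) of how updateValues() fills one slot of the int[] key per dimension, copies the key only for multi-valued dimensions, and encodes a missing value as the dimension's cardinality:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified illustration of the recursive key expansion; not Druid code.
public class KeyExpansionDemo
{
  /**
   * dims.get(i) holds the row's values for dimension i (empty = no value);
   * cardinalities.get(i) is that dimension's value cardinality, used as the
   * "no value" sentinel. Each complete key is assigned the next row position.
   */
  static void expand(
      int[] key,
      int index,
      List<List<Integer>> dims,
      List<Integer> cardinalities,
      Map<String, Integer> positions
  )
  {
    if (index == key.length) {
      positions.putIfAbsent(Arrays.toString(key), positions.size());
      return;
    }
    List<Integer> row = dims.get(index);
    if (row.isEmpty()) {
      key[index] = cardinalities.get(index); // sentinel: dimension absent in this row
      expand(key, index + 1, dims, cardinalities, positions);
    } else if (row.size() == 1) {
      key[index] = row.get(0);               // common case: reuse the same array, no copy
      expand(key, index + 1, dims, cardinalities, positions);
    } else {
      for (int value : row) {                // multi-valued: one key copy per value
        int[] copy = Arrays.copyOf(key, key.length);
        copy[index] = value;
        expand(copy, index + 1, dims, cardinalities, positions);
      }
    }
  }

  public static void main(String[] args)
  {
    Map<String, Integer> positions = new HashMap<>();
    // dim0 is multi-valued (0 and 1), dim1 has no value, dim2 has a single value (4).
    expand(
        new int[3],
        0,
        Arrays.asList(Arrays.asList(0, 1), Collections.<Integer>emptyList(), Arrays.asList(4)),
        Arrays.asList(10, 10, 10),
        positions
    );
    positions.forEach((key, slot) -> System.out.println(key + " -> slot " + slot));
  }
}
```

The cardinality sentinel is what the Row-building code at the end of the diff skips with the `if (dimSelector.getValueCardinality() != dimVal)` check.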