Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public Object get(final ByteBuffer buf, final int position)
final Lock lock = stripedLock.getAt(lockIndex(position)).readLock();
lock.lock();
try {
return sketchCache.get(buf).get(position);
return sketchCache.get(buf).get(position).copy();
}
finally {
lock.unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public synchronized void aggregate(final ByteBuffer buffer, final int position)
@Override
public synchronized Object get(final ByteBuffer buffer, final int position)
{
return sketches.get(buffer).get(position);
return sketches.get(buffer).get(position).compact();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,11 @@ public Object get(ByteBuffer buf, int position)
// | k (byte) | numLongs (int) | bitset (long[numLongs]) |
int sizeBytes = 1 + Integer.BYTES + (buf.getInt(position + 1) * Long.BYTES);
mutationBuffer.limit(position + sizeBytes);
return mutationBuffer.slice();

ByteBuffer resultCopy = ByteBuffer.allocate(sizeBytes);
resultCopy.put(mutationBuffer.slice());
resultCopy.rewind();
return resultCopy;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,16 @@ public interface BufferAggregator extends HotLoopCallee
*
* Converts the given byte buffer representation into an intermediate aggregate Object
*
* <b>Implementations must not change the position, limit or mark of the given buffer</b>
* <b>Implementations must not change the position, limit or mark of the given buffer.</b>
*
* <b>
* The object returned must not have any references to the given buffer (i.e., make a copy), since the
* underlying buffer is a shared resource and may be given to another processing thread
* while the objects returned by this aggregator are still in use.
* </b>
*
* <b>If the corresponding {@link AggregatorFactory#combine(Object, Object)} method for this aggregator
* expects its inputs to be mutable, then the object returned by this method must be mutable.</b>
*
* @param buf byte buffer storing the byte array representation of the aggregate
* @param position offset within the byte buffer at which the aggregate value is stored
Expand Down