Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -342,96 +342,102 @@ public void add(short bucket, byte positionOf1)
}
}

/**
 * Folds the contents of {@code other} into this collector.
 *
 * This always returns {@code this}; {@code other} may be changed in the process, but is not broken by it.
 *
 * NOTE(review): this span appears to interleave the pre-change and post-change lines of a diff
 * (see the duplicate declarations of {@code otherBuffer} and {@code otherOffset} below). The added
 * comments describe the variant that works through {@code collector1} / {@code collector2}.
 *
 * @param other the collector to fold in; if it is null or its storage buffer has no remaining bytes,
 *              this collector is returned as-is
 * @return this collector
 * @throws ISE if the target's register offset turns out to be lower than the source's offset
 */
public HyperLogLogCollector fold(HyperLogLogCollector other)
{
if (other == null || other.storageBuffer.remaining() == 0) {
return this;
}

if (storageBuffer.isReadOnly()) {
convertToMutableByteBuffer();
}

if (storageBuffer.remaining() != getNumBytesForDenseStorage()) {
convertToDenseStorage();
}

estimatedCardinality = null;

if (getRegisterOffset() < other.getRegisterOffset()) {
// "Swap" the buffers so that we are folding into the one with the higher offset
final ByteBuffer tmpBuffer = ByteBuffer.allocate(storageBuffer.remaining());
tmpBuffer.put(storageBuffer.asReadOnlyBuffer());
tmpBuffer.clear();

storageBuffer.duplicate().put(other.storageBuffer.asReadOnlyBuffer());
// collector1 is the fold target (the collector whose register offset is not lower than the other's),
// collector2 is the source that gets merged into it
final boolean needSwap = getRegisterOffset() < other.getRegisterOffset();
final HyperLogLogCollector collector1 = needSwap ? other : this;
final HyperLogLogCollector collector2 = needSwap ? this : other;

other = HyperLogLogCollector.makeCollector(tmpBuffer);
// the target must use dense storage; if it already does but is read-only, make it mutable
ByteBuffer current = collector1.storageBuffer;
if (current.remaining() != getNumBytesForDenseStorage()) {
collector1.convertToDenseStorage();
} else if (current.isReadOnly()) {
collector1.convertToMutableByteBuffer();
}

final ByteBuffer otherBuffer = other.storageBuffer;
final ByteBuffer buffer = collector1.storageBuffer;
final ByteBuffer otherBuffer = collector2.storageBuffer;

// Save position and restore later to avoid allocations due to duplicating the otherBuffer object.
final int otherPosition = otherBuffer.position();

try {
final byte otherOffset = other.getRegisterOffset();
final byte otherOffset = collector2.getRegisterOffset();

byte myOffset = getRegisterOffset();
short numNonZero = getNumNonZeroRegisters();
byte myOffset = collector1.getRegisterOffset();
short numNonZero = collector1.getNumNonZeroRegisters();

final int offsetDiff = myOffset - otherOffset;
if (offsetDiff < 0) {
throw new ISE("offsetDiff[%d] < 0, shouldn't happen because of swap.", offsetDiff);
}

final int myPayloadStart = getPayloadBytePosition();
otherBuffer.position(other.getPayloadBytePosition());
final int myPayloadStart = collector1.getPayloadBytePosition();
otherBuffer.position(collector2.getPayloadBytePosition());

if (isSparse(otherBuffer)) {
// sparse source: each entry is read as a short position followed by one register byte
while (otherBuffer.hasRemaining()) {
final int payloadStartPosition = otherBuffer.getShort() - other.getNumHeaderBytes();
final int payloadStartPosition = otherBuffer.getShort() - collector2.getNumHeaderBytes();
numNonZero += mergeAndStoreByteRegister(
storageBuffer,
buffer,
myPayloadStart + payloadStartPosition,
offsetDiff,
otherBuffer.get()
);
}
if (numNonZero == NUM_BUCKETS) {
numNonZero = decrementBuckets();
setRegisterOffset(++myOffset);
setNumNonZeroRegisters(numNonZero);
numNonZero = collector1.decrementBuckets();
collector1.setRegisterOffset(++myOffset);
collector1.setNumNonZeroRegisters(numNonZero);
}
} else { // dense
// dense source: register bytes are read sequentially and merged at consecutive target positions
int position = getPayloadBytePosition();
int position = collector1.getPayloadBytePosition();
while (otherBuffer.hasRemaining()) {
numNonZero += mergeAndStoreByteRegister(
storageBuffer,
buffer,
position,
offsetDiff,
otherBuffer.get()
);
position++;
}
if (numNonZero == NUM_BUCKETS) {
numNonZero = decrementBuckets();
setRegisterOffset(++myOffset);
setNumNonZeroRegisters(numNonZero);
numNonZero = collector1.decrementBuckets();
collector1.setRegisterOffset(++myOffset);
collector1.setNumNonZeroRegisters(numNonZero);
}
}

// no need to call setRegisterOffset(myOffset) here, since it gets updated every time myOffset is incremented
setNumNonZeroRegisters(numNonZero);
collector1.setNumNonZeroRegisters(numNonZero);

// this will add the max overflow and also recheck if offset needs to be shifted
add(other.getMaxOverflowRegister(), other.getMaxOverflowValue());

return this;
}
finally {
collector1.add(collector2.getMaxOverflowRegister(), collector2.getMaxOverflowValue());
} finally {
otherBuffer.position(otherPosition);
}

if (needSwap) {
// "Swap" the buffers so that we are folding into the one with the higher offset
// (the merge above was written into `other`'s buffer, so the result is copied back into `this`)
if (otherBuffer.remaining() == buffer.remaining()) {
otherBuffer.put(buffer.asReadOnlyBuffer());
// the relative put advanced the position; restore it
otherBuffer.position(otherPosition);
} else {
// sizes differ, so the existing buffer cannot be overwritten in place; replace it with a fresh copy
final ByteBuffer tmpBuffer = ByteBuffer.allocate(buffer.remaining());
tmpBuffer.put(buffer.asReadOnlyBuffer());
tmpBuffer.clear();

collector2.storageBuffer = tmpBuffer; // collector2 == `this`
}
}
return this;
}

public HyperLogLogCollector fold(ByteBuffer buffer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,11 +316,11 @@ public void testBufferSwap() throws Exception
ByteBuffer buffer = ByteBuffer.allocate(HyperLogLogCollector.getLatestNumBytesForDenseStorage());
HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buffer.duplicate());

// make sure the original buffer gets modified
// make sure the original buffer gets modified (this is not always a valid assumption)
collector.fold(biggerOffset);
Assert.assertEquals(collector, HyperLogLogCollector.makeCollector(buffer.duplicate()));

// make sure the original buffer gets modified
// make sure the original buffer gets modified (this is not always a valid assumption)
collector.fold(smallerOffset);
Assert.assertEquals(collector, HyperLogLogCollector.makeCollector(buffer.duplicate()));
}
Expand Down