diff --git a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperLogLogCollector.java b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperLogLogCollector.java index 63a38efb6442..72f1877a0b99 100644 --- a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperLogLogCollector.java +++ b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperLogLogCollector.java @@ -342,72 +342,66 @@ public void add(short bucket, byte positionOf1) } } + // this will always return `this`, with possibly changing (but not breaking) `other` public HyperLogLogCollector fold(HyperLogLogCollector other) { if (other == null || other.storageBuffer.remaining() == 0) { return this; } - if (storageBuffer.isReadOnly()) { - convertToMutableByteBuffer(); - } - - if (storageBuffer.remaining() != getNumBytesForDenseStorage()) { - convertToDenseStorage(); - } - estimatedCardinality = null; - if (getRegisterOffset() < other.getRegisterOffset()) { - // "Swap" the buffers so that we are folding into the one with the higher offset - final ByteBuffer tmpBuffer = ByteBuffer.allocate(storageBuffer.remaining()); - tmpBuffer.put(storageBuffer.asReadOnlyBuffer()); - tmpBuffer.clear(); - - storageBuffer.duplicate().put(other.storageBuffer.asReadOnlyBuffer()); + final boolean needSwap = getRegisterOffset() < other.getRegisterOffset(); + final HyperLogLogCollector collector1 = needSwap ? other : this; + final HyperLogLogCollector collector2 = needSwap ? this : other; - other = HyperLogLogCollector.makeCollector(tmpBuffer); + ByteBuffer current = collector1.storageBuffer; + if (current.remaining() != getNumBytesForDenseStorage()) { + collector1.convertToDenseStorage(); + } else if (current.isReadOnly()) { + collector1.convertToMutableByteBuffer(); } - final ByteBuffer otherBuffer = other.storageBuffer; + final ByteBuffer buffer = collector1.storageBuffer; + final ByteBuffer otherBuffer = collector2.storageBuffer; // Save position and restore later to avoid allocations due to duplicating the otherBuffer object. final int otherPosition = otherBuffer.position(); try { - final byte otherOffset = other.getRegisterOffset(); + final byte otherOffset = collector2.getRegisterOffset(); - byte myOffset = getRegisterOffset(); - short numNonZero = getNumNonZeroRegisters(); + byte myOffset = collector1.getRegisterOffset(); + short numNonZero = collector1.getNumNonZeroRegisters(); final int offsetDiff = myOffset - otherOffset; if (offsetDiff < 0) { throw new ISE("offsetDiff[%d] < 0, shouldn't happen because of swap.", offsetDiff); } - final int myPayloadStart = getPayloadBytePosition(); - otherBuffer.position(other.getPayloadBytePosition()); + final int myPayloadStart = collector1.getPayloadBytePosition(); + otherBuffer.position(collector2.getPayloadBytePosition()); if (isSparse(otherBuffer)) { while (otherBuffer.hasRemaining()) { - final int payloadStartPosition = otherBuffer.getShort() - other.getNumHeaderBytes(); + final int payloadStartPosition = otherBuffer.getShort() - collector2.getNumHeaderBytes(); numNonZero += mergeAndStoreByteRegister( - storageBuffer, + buffer, myPayloadStart + payloadStartPosition, offsetDiff, otherBuffer.get() ); } if (numNonZero == NUM_BUCKETS) { - numNonZero = decrementBuckets(); - setRegisterOffset(++myOffset); - setNumNonZeroRegisters(numNonZero); + numNonZero = collector1.decrementBuckets(); + collector1.setRegisterOffset(++myOffset); + collector1.setNumNonZeroRegisters(numNonZero); } } else { // dense - int position = getPayloadBytePosition(); + int position = collector1.getPayloadBytePosition(); while (otherBuffer.hasRemaining()) { numNonZero += mergeAndStoreByteRegister( - storageBuffer, + buffer, position, offsetDiff, otherBuffer.get() @@ -415,23 +409,35 @@ public HyperLogLogCollector fold(HyperLogLogCollector other) position++; } if (numNonZero == NUM_BUCKETS) { - numNonZero = decrementBuckets(); - setRegisterOffset(++myOffset); - setNumNonZeroRegisters(numNonZero); + numNonZero = collector1.decrementBuckets(); + collector1.setRegisterOffset(++myOffset); + collector1.setNumNonZeroRegisters(numNonZero); } } // no need to call setRegisterOffset(myOffset) here, since it gets updated every time myOffset is incremented - setNumNonZeroRegisters(numNonZero); + collector1.setNumNonZeroRegisters(numNonZero); // this will add the max overflow and also recheck if offset needs to be shifted - add(other.getMaxOverflowRegister(), other.getMaxOverflowValue()); - - return this; - } - finally { + collector1.add(collector2.getMaxOverflowRegister(), collector2.getMaxOverflowValue()); + } finally { otherBuffer.position(otherPosition); } + + if (needSwap) { + // "Swap" the buffers so that we are folding into the one with the higher offset + if (otherBuffer.remaining() == buffer.remaining()) { + otherBuffer.put(buffer.asReadOnlyBuffer()); + otherBuffer.position(otherPosition); + } else { + final ByteBuffer tmpBuffer = ByteBuffer.allocate(buffer.remaining()); + tmpBuffer.put(buffer.asReadOnlyBuffer()); + tmpBuffer.clear(); + + collector2.storageBuffer = tmpBuffer; // collector2 == `this` + } + } + return this; } public HyperLogLogCollector fold(ByteBuffer buffer) diff --git a/processing/src/test/java/io/druid/query/aggregation/hyperloglog/HyperLogLogCollectorTest.java b/processing/src/test/java/io/druid/query/aggregation/hyperloglog/HyperLogLogCollectorTest.java index de42eaa0e5a6..124c145bc4ca 100644 --- a/processing/src/test/java/io/druid/query/aggregation/hyperloglog/HyperLogLogCollectorTest.java +++ b/processing/src/test/java/io/druid/query/aggregation/hyperloglog/HyperLogLogCollectorTest.java @@ -316,11 +316,11 @@ public void testBufferSwap() throws Exception ByteBuffer buffer = ByteBuffer.allocate(HyperLogLogCollector.getLatestNumBytesForDenseStorage()); HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buffer.duplicate()); - // make sure the original buffer gets modified + // make sure the original buffer gets modified (this is not always valid assumption) collector.fold(biggerOffset); Assert.assertEquals(collector, HyperLogLogCollector.makeCollector(buffer.duplicate())); - // make sure the original buffer gets modified + // make sure the original buffer gets modified (this is not always valid assumption) collector.fold(smallerOffset); Assert.assertEquals(collector, HyperLogLogCollector.makeCollector(buffer.duplicate())); }