From 41a2a26cb271da855ffaed11a61faa3d2fd45c70 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Wed, 3 Aug 2016 11:24:58 -0700 Subject: [PATCH] HLL: Avoid some allocations when possible. - HLLC.fold avoids duplicating the other buffer by saving and restoring its position. - HLLC.makeCollector(buffer) no longer duplicates incoming BBs. - Updated call sites where appropriate to duplicate BBs passed to HLLC. --- .../indexer/DetermineHashedPartitionsJob.java | 4 +- .../CardinalityAggregatorFactory.java | 16 ++- .../CardinalityBufferAggregator.java | 27 ++-- .../query/aggregation/hyperloglog/HLLCV0.java | 9 +- .../query/aggregation/hyperloglog/HLLCV1.java | 6 +- .../hyperloglog/HyperLogLogCollector.java | 132 ++++++++++-------- .../HyperUniquesAggregatorFactory.java | 16 ++- .../HyperUniquesBufferAggregator.java | 19 ++- .../hyperloglog/HyperLogLogCollectorTest.java | 8 +- 9 files changed, 137 insertions(+), 100 deletions(-) diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java index 954be0effe72..c4a7a023fd70 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java @@ -318,7 +318,9 @@ protected void reduce( { HyperLogLogCollector aggregate = HyperLogLogCollector.makeLatestCollector(); for (BytesWritable value : values) { - aggregate.fold(ByteBuffer.wrap(value.getBytes(), 0, value.getLength())); + aggregate.fold( + HyperLogLogCollector.makeCollector(ByteBuffer.wrap(value.getBytes(), 0, value.getLength())) + ); } Interval interval = config.getGranularitySpec().getSegmentGranularity().bucket(new DateTime(key.get())); intervals.add(interval); diff --git a/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java index 6531d47b6bfe..ee06c65f34b6 100644 --- a/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java @@ -173,16 +173,20 @@ public AggregatorFactory apply(String input) @Override public Object deserialize(Object object) { + final ByteBuffer buffer; + if (object instanceof byte[]) { - return HyperLogLogCollector.makeCollector(ByteBuffer.wrap((byte[]) object)); + buffer = ByteBuffer.wrap((byte[]) object); } else if (object instanceof ByteBuffer) { - return HyperLogLogCollector.makeCollector((ByteBuffer) object); + // Be conservative, don't assume we own this buffer. + buffer = ((ByteBuffer) object).duplicate(); } else if (object instanceof String) { - return HyperLogLogCollector.makeCollector( - ByteBuffer.wrap(Base64.decodeBase64(StringUtils.toUtf8((String) object))) - ); + buffer = ByteBuffer.wrap(Base64.decodeBase64(StringUtils.toUtf8((String) object))); + } else { + return object; } - return object; + + return HyperLogLogCollector.makeCollector(buffer); } @Override diff --git a/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityBufferAggregator.java b/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityBufferAggregator.java index 2f8f6fe27c1e..c791dc650f43 100644 --- a/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityBufferAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityBufferAggregator.java @@ -53,16 +53,23 @@ public void init(ByteBuffer buf, int position) @Override public void aggregate(ByteBuffer buf, int position) { - final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector( - (ByteBuffer) buf.duplicate().position(position).limit( - position - + HyperLogLogCollector.getLatestNumBytesForDenseStorage() - ) - ); - if (byRow) { - CardinalityAggregator.hashRow(selectorList, collector); - } else { - CardinalityAggregator.hashValues(selectorList, collector); + // Save position, limit and restore later instead of allocating a new ByteBuffer object + final int oldPosition = buf.position(); + final int oldLimit = buf.limit(); + buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage()); + buf.position(position); + + try { + final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf); + if (byRow) { + CardinalityAggregator.hashRow(selectorList, collector); + } else { + CardinalityAggregator.hashValues(selectorList, collector); + } + } + finally { + buf.limit(oldLimit); + buf.position(oldPosition); } } diff --git a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HLLCV0.java b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HLLCV0.java index dcadf894bea8..1c2f702dd07c 100644 --- a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HLLCV0.java +++ b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HLLCV0.java @@ -35,14 +35,7 @@ public class HLLCV0 extends HyperLogLogCollector public static final int HEADER_NUM_BYTES = 3; public static final int NUM_BYTES_FOR_DENSE_STORAGE = NUM_BYTES_FOR_BUCKETS + HEADER_NUM_BYTES; - private static final ByteBuffer defaultStorageBuffer = ByteBuffer.wrap(new byte[]{0, 0, 0}).asReadOnlyBuffer(); - - protected HLLCV0() - { - super(defaultStorageBuffer); - } - - protected HLLCV0(ByteBuffer buffer) + HLLCV0(ByteBuffer buffer) { super(buffer); } diff --git a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HLLCV1.java b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HLLCV1.java index 5746869f8965..b994eab7174d 100644 --- a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HLLCV1.java +++ b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HLLCV1.java @@ -44,12 +44,12 @@ public class HLLCV1 extends HyperLogLogCollector private static final ByteBuffer defaultStorageBuffer = ByteBuffer.wrap(new byte[]{VERSION, 0, 0, 0, 0, 0, 0}) .asReadOnlyBuffer(); - protected HLLCV1() + HLLCV1() { - super(defaultStorageBuffer); + super(defaultStorageBuffer.duplicate()); } - protected HLLCV1(ByteBuffer buffer) + HLLCV1(ByteBuffer buffer) { super(buffer); } diff --git a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperLogLogCollector.java b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperLogLogCollector.java index 4803b2ca765e..63a38efb6442 100644 --- a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperLogLogCollector.java +++ b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperLogLogCollector.java @@ -35,7 +35,7 @@ * * * for (int i = 1; i < 20; ++i) { - * System.out.printf("i[%,d], val[%,d] => error[%f%%]%n", i, 2 << i, 104 / Math.sqrt(2 << i)); + * System.out.printf("i[%,d], val[%,d] => error[%f%%]%n", i, 2 << i, 104 / Math.sqrt(2 << i)); * } * * @@ -90,6 +90,17 @@ public static HyperLogLogCollector makeLatestCollector() return new HLLCV1(); } + /** + * Create a wrapper object around an HLL sketch contained within a buffer. The position and limit of + * the buffer may be changed; if you do not want this to happen, you can duplicate the buffer before + * passing it in. + * + * The mark and byte order of the buffer will not be modified. + * + * @param buffer buffer containing an HLL sketch starting at its position and ending at its limit + * + * @return HLLC wrapper object + */ public static HyperLogLogCollector makeCollector(ByteBuffer buffer) { int remaining = buffer.remaining(); @@ -129,6 +140,11 @@ public static double applyCorrection(double e, int zeroCount) return e; } + public static double estimateByteBuffer(ByteBuffer buf) + { + return makeCollector(buf.duplicate()).estimateCardinality(); + } + private static double estimateSparse( final ByteBuffer buf, final byte minNum, @@ -212,7 +228,7 @@ private static boolean isSparse(ByteBuffer buffer) public HyperLogLogCollector(ByteBuffer byteBuffer) { - storageBuffer = byteBuffer.duplicate(); + storageBuffer = byteBuffer; initPosition = byteBuffer.position(); estimatedCardinality = null; } @@ -281,7 +297,7 @@ public void add(byte[] hashedValue) byte lookupVal = ByteBitLookup.lookup[UnsignedBytes.toInt(hashedValue[i])]; switch (lookupVal) { case 0: - positionOf1 += (byte)8; + positionOf1 += (byte) 8; continue; default: positionOf1 += lookupVal; @@ -353,65 +369,74 @@ public HyperLogLogCollector fold(HyperLogLogCollector other) other = HyperLogLogCollector.makeCollector(tmpBuffer); } - final ByteBuffer otherBuffer = other.storageBuffer.asReadOnlyBuffer(); - final byte otherOffset = other.getRegisterOffset(); + final ByteBuffer otherBuffer = other.storageBuffer; - byte myOffset = getRegisterOffset(); - short numNonZero = getNumNonZeroRegisters(); + // Save position and restore later to avoid allocations due to duplicating the otherBuffer object. + final int otherPosition = otherBuffer.position(); - final int offsetDiff = myOffset - otherOffset; - if (offsetDiff < 0) { - throw new ISE("offsetDiff[%d] < 0, shouldn't happen because of swap.", offsetDiff); - } + try { + final byte otherOffset = other.getRegisterOffset(); - final int myPayloadStart = getPayloadBytePosition(); - otherBuffer.position(other.getPayloadBytePosition()); + byte myOffset = getRegisterOffset(); + short numNonZero = getNumNonZeroRegisters(); - if (isSparse(otherBuffer)) { - while (otherBuffer.hasRemaining()) { - final int payloadStartPosition = otherBuffer.getShort() - other.getNumHeaderBytes(); - numNonZero += mergeAndStoreByteRegister( - storageBuffer, - myPayloadStart + payloadStartPosition, - offsetDiff, - otherBuffer.get() - ); + final int offsetDiff = myOffset - otherOffset; + if (offsetDiff < 0) { + throw new ISE("offsetDiff[%d] < 0, shouldn't happen because of swap.", offsetDiff); } - if (numNonZero == NUM_BUCKETS) { - numNonZero = decrementBuckets(); - setRegisterOffset(++myOffset); - setNumNonZeroRegisters(numNonZero); - } - } else { // dense - int position = getPayloadBytePosition(); - while (otherBuffer.hasRemaining()) { - numNonZero += mergeAndStoreByteRegister( - storageBuffer, - position, - offsetDiff, - otherBuffer.get() - ); - position++; - } - if (numNonZero == NUM_BUCKETS) { - numNonZero = decrementBuckets(); - setRegisterOffset(++myOffset); - setNumNonZeroRegisters(numNonZero); + + final int myPayloadStart = getPayloadBytePosition(); + otherBuffer.position(other.getPayloadBytePosition()); + + if (isSparse(otherBuffer)) { + while (otherBuffer.hasRemaining()) { + final int payloadStartPosition = otherBuffer.getShort() - other.getNumHeaderBytes(); + numNonZero += mergeAndStoreByteRegister( + storageBuffer, + myPayloadStart + payloadStartPosition, + offsetDiff, + otherBuffer.get() + ); + } + if (numNonZero == NUM_BUCKETS) { + numNonZero = decrementBuckets(); + setRegisterOffset(++myOffset); + setNumNonZeroRegisters(numNonZero); + } + } else { // dense + int position = getPayloadBytePosition(); + while (otherBuffer.hasRemaining()) { + numNonZero += mergeAndStoreByteRegister( + storageBuffer, + position, + offsetDiff, + otherBuffer.get() + ); + position++; + } + if (numNonZero == NUM_BUCKETS) { + numNonZero = decrementBuckets(); + setRegisterOffset(++myOffset); + setNumNonZeroRegisters(numNonZero); + } } - } - // no need to call setRegisterOffset(myOffset) here, since it gets updated every time myOffset is incremented - setNumNonZeroRegisters(numNonZero); + // no need to call setRegisterOffset(myOffset) here, since it gets updated every time myOffset is incremented + setNumNonZeroRegisters(numNonZero); - // this will add the max overflow and also recheck if offset needs to be shifted - add(other.getMaxOverflowRegister(), other.getMaxOverflowValue()); + // this will add the max overflow and also recheck if offset needs to be shifted + add(other.getMaxOverflowRegister(), other.getMaxOverflowValue()); - return this; + return this; + } + finally { + otherBuffer.position(otherPosition); + } } public HyperLogLogCollector fold(ByteBuffer buffer) { - return fold(makeCollector(buffer)); + return fold(makeCollector(buffer.duplicate())); } public ByteBuffer toByteBuffer() @@ -494,11 +519,6 @@ public double estimateCardinality() return estimatedCardinality; } - public double estimateByteBuffer(ByteBuffer buf) - { - return makeCollector(buf).estimateCardinality(); - } - @Override public boolean equals(Object o) { @@ -521,7 +541,7 @@ public boolean equals(Object o) final ByteBuffer denseStorageBuffer; if (storageBuffer.remaining() != getNumBytesForDenseStorage()) { - HyperLogLogCollector denseCollector = HyperLogLogCollector.makeCollector(storageBuffer); + HyperLogLogCollector denseCollector = HyperLogLogCollector.makeCollector(storageBuffer.duplicate()); denseCollector.convertToDenseStorage(); denseStorageBuffer = denseCollector.storageBuffer; } else { @@ -529,7 +549,7 @@ public boolean equals(Object o) } if (otherBuffer.remaining() != getNumBytesForDenseStorage()) { - HyperLogLogCollector otherCollector = HyperLogLogCollector.makeCollector(otherBuffer); + HyperLogLogCollector otherCollector = HyperLogLogCollector.makeCollector(otherBuffer.duplicate()); otherCollector.convertToDenseStorage(); otherBuffer = otherCollector.storageBuffer; } diff --git a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregatorFactory.java index 852b4fa323d8..5744b762bd2e 100644 --- a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregatorFactory.java @@ -147,16 +147,20 @@ public List getRequiredColumns() @Override public Object deserialize(Object object) { + final ByteBuffer buffer; + if (object instanceof byte[]) { - return HyperLogLogCollector.makeCollector(ByteBuffer.wrap((byte[]) object)); + buffer = ByteBuffer.wrap((byte[]) object); } else if (object instanceof ByteBuffer) { - return HyperLogLogCollector.makeCollector((ByteBuffer) object); + // Be conservative, don't assume we own this buffer. + buffer = ((ByteBuffer) object).duplicate(); } else if (object instanceof String) { - return HyperLogLogCollector.makeCollector( - ByteBuffer.wrap(Base64.decodeBase64(StringUtils.toUtf8((String) object))) - ); + buffer = ByteBuffer.wrap(Base64.decodeBase64(StringUtils.toUtf8((String) object))); + } else { + return object; } - return object; + + return HyperLogLogCollector.makeCollector(buffer); } @Override diff --git a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesBufferAggregator.java b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesBufferAggregator.java index 7e8902c2ec22..98cba7dce450 100644 --- a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesBufferAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesBufferAggregator.java @@ -55,12 +55,19 @@ public void aggregate(ByteBuffer buf, int position) return; } - HyperLogLogCollector.makeCollector( - (ByteBuffer) buf.duplicate().position(position).limit( - position - + HyperLogLogCollector.getLatestNumBytesForDenseStorage() - ) - ).fold(collector); + // Save position, limit and restore later instead of allocating a new ByteBuffer object + final int oldPosition = buf.position(); + final int oldLimit = buf.limit(); + buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage()); + buf.position(position); + + try { + HyperLogLogCollector.makeCollector(buf).fold(collector); + } + finally { + buf.limit(oldLimit); + buf.position(oldPosition); + } } @Override diff --git a/processing/src/test/java/io/druid/query/aggregation/hyperloglog/HyperLogLogCollectorTest.java b/processing/src/test/java/io/druid/query/aggregation/hyperloglog/HyperLogLogCollectorTest.java index 25bd2ee82906..de42eaa0e5a6 100644 --- a/processing/src/test/java/io/druid/query/aggregation/hyperloglog/HyperLogLogCollectorTest.java +++ b/processing/src/test/java/io/druid/query/aggregation/hyperloglog/HyperLogLogCollectorTest.java @@ -314,15 +314,15 @@ public void testBufferSwap() throws Exception ByteBuffer smallerOffset = makeCollectorBuffer(0, (byte) 0x20, 0x00); ByteBuffer buffer = ByteBuffer.allocate(HyperLogLogCollector.getLatestNumBytesForDenseStorage()); - HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buffer); + HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buffer.duplicate()); // make sure the original buffer gets modified collector.fold(biggerOffset); - Assert.assertEquals(collector, HyperLogLogCollector.makeCollector(buffer)); + Assert.assertEquals(collector, HyperLogLogCollector.makeCollector(buffer.duplicate())); // make sure the original buffer gets modified collector.fold(smallerOffset); - Assert.assertEquals(collector, HyperLogLogCollector.makeCollector(buffer)); + Assert.assertEquals(collector, HyperLogLogCollector.makeCollector(buffer.duplicate())); } @Test @@ -673,7 +673,7 @@ public void testSparseEstimation() throws Exception } Assert.assertEquals( - collector.estimateCardinality(), collector.estimateByteBuffer(collector.toByteBuffer()), 0.0d + collector.estimateCardinality(), HyperLogLogCollector.estimateByteBuffer(collector.toByteBuffer()), 0.0d ); }