diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
index 954be0effe72..c4a7a023fd70 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
@@ -318,7 +318,9 @@ protected void reduce(
{
HyperLogLogCollector aggregate = HyperLogLogCollector.makeLatestCollector();
for (BytesWritable value : values) {
- aggregate.fold(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()));
+ aggregate.fold(
+ HyperLogLogCollector.makeCollector(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()))
+ );
}
Interval interval = config.getGranularitySpec().getSegmentGranularity().bucket(new DateTime(key.get()));
intervals.add(interval);
diff --git a/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java
index 6531d47b6bfe..ee06c65f34b6 100644
--- a/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java
+++ b/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java
@@ -173,16 +173,20 @@ public AggregatorFactory apply(String input)
@Override
public Object deserialize(Object object)
{
+ final ByteBuffer buffer;
+
if (object instanceof byte[]) {
- return HyperLogLogCollector.makeCollector(ByteBuffer.wrap((byte[]) object));
+ buffer = ByteBuffer.wrap((byte[]) object);
} else if (object instanceof ByteBuffer) {
- return HyperLogLogCollector.makeCollector((ByteBuffer) object);
+ // Be conservative, don't assume we own this buffer.
+ buffer = ((ByteBuffer) object).duplicate();
} else if (object instanceof String) {
- return HyperLogLogCollector.makeCollector(
- ByteBuffer.wrap(Base64.decodeBase64(StringUtils.toUtf8((String) object)))
- );
+ buffer = ByteBuffer.wrap(Base64.decodeBase64(StringUtils.toUtf8((String) object)));
+ } else {
+ return object;
}
- return object;
+
+ return HyperLogLogCollector.makeCollector(buffer);
}
@Override
diff --git a/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityBufferAggregator.java b/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityBufferAggregator.java
index 2f8f6fe27c1e..c791dc650f43 100644
--- a/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityBufferAggregator.java
+++ b/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityBufferAggregator.java
@@ -53,16 +53,23 @@ public void init(ByteBuffer buf, int position)
@Override
public void aggregate(ByteBuffer buf, int position)
{
- final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(
- (ByteBuffer) buf.duplicate().position(position).limit(
- position
- + HyperLogLogCollector.getLatestNumBytesForDenseStorage()
- )
- );
- if (byRow) {
- CardinalityAggregator.hashRow(selectorList, collector);
- } else {
- CardinalityAggregator.hashValues(selectorList, collector);
+    // Save the buffer's position and limit, and restore them afterwards, instead of allocating a new ByteBuffer object
+ final int oldPosition = buf.position();
+ final int oldLimit = buf.limit();
+ buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage());
+ buf.position(position);
+
+ try {
+ final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf);
+ if (byRow) {
+ CardinalityAggregator.hashRow(selectorList, collector);
+ } else {
+ CardinalityAggregator.hashValues(selectorList, collector);
+ }
+ }
+ finally {
+ buf.limit(oldLimit);
+ buf.position(oldPosition);
}
}
diff --git a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HLLCV0.java b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HLLCV0.java
index dcadf894bea8..1c2f702dd07c 100644
--- a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HLLCV0.java
+++ b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HLLCV0.java
@@ -35,14 +35,7 @@ public class HLLCV0 extends HyperLogLogCollector
public static final int HEADER_NUM_BYTES = 3;
public static final int NUM_BYTES_FOR_DENSE_STORAGE = NUM_BYTES_FOR_BUCKETS + HEADER_NUM_BYTES;
- private static final ByteBuffer defaultStorageBuffer = ByteBuffer.wrap(new byte[]{0, 0, 0}).asReadOnlyBuffer();
-
- protected HLLCV0()
- {
- super(defaultStorageBuffer);
- }
-
- protected HLLCV0(ByteBuffer buffer)
+ HLLCV0(ByteBuffer buffer)
{
super(buffer);
}
diff --git a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HLLCV1.java b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HLLCV1.java
index 5746869f8965..b994eab7174d 100644
--- a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HLLCV1.java
+++ b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HLLCV1.java
@@ -44,12 +44,12 @@ public class HLLCV1 extends HyperLogLogCollector
private static final ByteBuffer defaultStorageBuffer = ByteBuffer.wrap(new byte[]{VERSION, 0, 0, 0, 0, 0, 0})
.asReadOnlyBuffer();
- protected HLLCV1()
+ HLLCV1()
{
- super(defaultStorageBuffer);
+ super(defaultStorageBuffer.duplicate());
}
- protected HLLCV1(ByteBuffer buffer)
+ HLLCV1(ByteBuffer buffer)
{
super(buffer);
}
diff --git a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperLogLogCollector.java b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperLogLogCollector.java
index 4803b2ca765e..63a38efb6442 100644
--- a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperLogLogCollector.java
+++ b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperLogLogCollector.java
@@ -35,7 +35,7 @@
*
*
* for (int i = 1; i < 20; ++i) {
- * System.out.printf("i[%,d], val[%,d] => error[%f%%]%n", i, 2 << i, 104 / Math.sqrt(2 << i));
+ * System.out.printf("i[%,d], val[%,d] => error[%f%%]%n", i, 2 << i, 104 / Math.sqrt(2 << i));
* }
*
*
@@ -90,6 +90,17 @@ public static HyperLogLogCollector makeLatestCollector()
return new HLLCV1();
}
+ /**
+ * Create a wrapper object around an HLL sketch contained within a buffer. The position and limit of
+ * the buffer may be changed; if you do not want this to happen, you can duplicate the buffer before
+ * passing it in.
+ *
+ * The mark and byte order of the buffer will not be modified.
+ *
+ * @param buffer buffer containing an HLL sketch starting at its position and ending at its limit
+ *
+ * @return HLLC wrapper object
+ */
public static HyperLogLogCollector makeCollector(ByteBuffer buffer)
{
int remaining = buffer.remaining();
@@ -129,6 +140,11 @@ public static double applyCorrection(double e, int zeroCount)
return e;
}
+ public static double estimateByteBuffer(ByteBuffer buf)
+ {
+ return makeCollector(buf.duplicate()).estimateCardinality();
+ }
+
private static double estimateSparse(
final ByteBuffer buf,
final byte minNum,
@@ -212,7 +228,7 @@ private static boolean isSparse(ByteBuffer buffer)
public HyperLogLogCollector(ByteBuffer byteBuffer)
{
- storageBuffer = byteBuffer.duplicate();
+ storageBuffer = byteBuffer;
initPosition = byteBuffer.position();
estimatedCardinality = null;
}
@@ -281,7 +297,7 @@ public void add(byte[] hashedValue)
byte lookupVal = ByteBitLookup.lookup[UnsignedBytes.toInt(hashedValue[i])];
switch (lookupVal) {
case 0:
- positionOf1 += (byte)8;
+ positionOf1 += (byte) 8;
continue;
default:
positionOf1 += lookupVal;
@@ -353,65 +369,74 @@ public HyperLogLogCollector fold(HyperLogLogCollector other)
other = HyperLogLogCollector.makeCollector(tmpBuffer);
}
- final ByteBuffer otherBuffer = other.storageBuffer.asReadOnlyBuffer();
- final byte otherOffset = other.getRegisterOffset();
+ final ByteBuffer otherBuffer = other.storageBuffer;
- byte myOffset = getRegisterOffset();
- short numNonZero = getNumNonZeroRegisters();
+    // Save the position and restore it afterwards, to avoid the allocation that duplicating otherBuffer would incur.
+ final int otherPosition = otherBuffer.position();
- final int offsetDiff = myOffset - otherOffset;
- if (offsetDiff < 0) {
- throw new ISE("offsetDiff[%d] < 0, shouldn't happen because of swap.", offsetDiff);
- }
+ try {
+ final byte otherOffset = other.getRegisterOffset();
- final int myPayloadStart = getPayloadBytePosition();
- otherBuffer.position(other.getPayloadBytePosition());
+ byte myOffset = getRegisterOffset();
+ short numNonZero = getNumNonZeroRegisters();
- if (isSparse(otherBuffer)) {
- while (otherBuffer.hasRemaining()) {
- final int payloadStartPosition = otherBuffer.getShort() - other.getNumHeaderBytes();
- numNonZero += mergeAndStoreByteRegister(
- storageBuffer,
- myPayloadStart + payloadStartPosition,
- offsetDiff,
- otherBuffer.get()
- );
+ final int offsetDiff = myOffset - otherOffset;
+ if (offsetDiff < 0) {
+ throw new ISE("offsetDiff[%d] < 0, shouldn't happen because of swap.", offsetDiff);
}
- if (numNonZero == NUM_BUCKETS) {
- numNonZero = decrementBuckets();
- setRegisterOffset(++myOffset);
- setNumNonZeroRegisters(numNonZero);
- }
- } else { // dense
- int position = getPayloadBytePosition();
- while (otherBuffer.hasRemaining()) {
- numNonZero += mergeAndStoreByteRegister(
- storageBuffer,
- position,
- offsetDiff,
- otherBuffer.get()
- );
- position++;
- }
- if (numNonZero == NUM_BUCKETS) {
- numNonZero = decrementBuckets();
- setRegisterOffset(++myOffset);
- setNumNonZeroRegisters(numNonZero);
+
+ final int myPayloadStart = getPayloadBytePosition();
+ otherBuffer.position(other.getPayloadBytePosition());
+
+ if (isSparse(otherBuffer)) {
+ while (otherBuffer.hasRemaining()) {
+ final int payloadStartPosition = otherBuffer.getShort() - other.getNumHeaderBytes();
+ numNonZero += mergeAndStoreByteRegister(
+ storageBuffer,
+ myPayloadStart + payloadStartPosition,
+ offsetDiff,
+ otherBuffer.get()
+ );
+ }
+ if (numNonZero == NUM_BUCKETS) {
+ numNonZero = decrementBuckets();
+ setRegisterOffset(++myOffset);
+ setNumNonZeroRegisters(numNonZero);
+ }
+ } else { // dense
+ int position = getPayloadBytePosition();
+ while (otherBuffer.hasRemaining()) {
+ numNonZero += mergeAndStoreByteRegister(
+ storageBuffer,
+ position,
+ offsetDiff,
+ otherBuffer.get()
+ );
+ position++;
+ }
+ if (numNonZero == NUM_BUCKETS) {
+ numNonZero = decrementBuckets();
+ setRegisterOffset(++myOffset);
+ setNumNonZeroRegisters(numNonZero);
+ }
}
- }
- // no need to call setRegisterOffset(myOffset) here, since it gets updated every time myOffset is incremented
- setNumNonZeroRegisters(numNonZero);
+ // no need to call setRegisterOffset(myOffset) here, since it gets updated every time myOffset is incremented
+ setNumNonZeroRegisters(numNonZero);
- // this will add the max overflow and also recheck if offset needs to be shifted
- add(other.getMaxOverflowRegister(), other.getMaxOverflowValue());
+ // this will add the max overflow and also recheck if offset needs to be shifted
+ add(other.getMaxOverflowRegister(), other.getMaxOverflowValue());
- return this;
+ return this;
+ }
+ finally {
+ otherBuffer.position(otherPosition);
+ }
}
public HyperLogLogCollector fold(ByteBuffer buffer)
{
- return fold(makeCollector(buffer));
+ return fold(makeCollector(buffer.duplicate()));
}
public ByteBuffer toByteBuffer()
@@ -494,11 +519,6 @@ public double estimateCardinality()
return estimatedCardinality;
}
- public double estimateByteBuffer(ByteBuffer buf)
- {
- return makeCollector(buf).estimateCardinality();
- }
-
@Override
public boolean equals(Object o)
{
@@ -521,7 +541,7 @@ public boolean equals(Object o)
final ByteBuffer denseStorageBuffer;
if (storageBuffer.remaining() != getNumBytesForDenseStorage()) {
- HyperLogLogCollector denseCollector = HyperLogLogCollector.makeCollector(storageBuffer);
+ HyperLogLogCollector denseCollector = HyperLogLogCollector.makeCollector(storageBuffer.duplicate());
denseCollector.convertToDenseStorage();
denseStorageBuffer = denseCollector.storageBuffer;
} else {
@@ -529,7 +549,7 @@ public boolean equals(Object o)
}
if (otherBuffer.remaining() != getNumBytesForDenseStorage()) {
- HyperLogLogCollector otherCollector = HyperLogLogCollector.makeCollector(otherBuffer);
+ HyperLogLogCollector otherCollector = HyperLogLogCollector.makeCollector(otherBuffer.duplicate());
otherCollector.convertToDenseStorage();
otherBuffer = otherCollector.storageBuffer;
}
diff --git a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregatorFactory.java
index 852b4fa323d8..5744b762bd2e 100644
--- a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregatorFactory.java
+++ b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregatorFactory.java
@@ -147,16 +147,20 @@ public List getRequiredColumns()
@Override
public Object deserialize(Object object)
{
+ final ByteBuffer buffer;
+
if (object instanceof byte[]) {
- return HyperLogLogCollector.makeCollector(ByteBuffer.wrap((byte[]) object));
+ buffer = ByteBuffer.wrap((byte[]) object);
} else if (object instanceof ByteBuffer) {
- return HyperLogLogCollector.makeCollector((ByteBuffer) object);
+ // Be conservative, don't assume we own this buffer.
+ buffer = ((ByteBuffer) object).duplicate();
} else if (object instanceof String) {
- return HyperLogLogCollector.makeCollector(
- ByteBuffer.wrap(Base64.decodeBase64(StringUtils.toUtf8((String) object)))
- );
+ buffer = ByteBuffer.wrap(Base64.decodeBase64(StringUtils.toUtf8((String) object)));
+ } else {
+ return object;
}
- return object;
+
+ return HyperLogLogCollector.makeCollector(buffer);
}
@Override
diff --git a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesBufferAggregator.java b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesBufferAggregator.java
index 7e8902c2ec22..98cba7dce450 100644
--- a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesBufferAggregator.java
+++ b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesBufferAggregator.java
@@ -55,12 +55,19 @@ public void aggregate(ByteBuffer buf, int position)
return;
}
- HyperLogLogCollector.makeCollector(
- (ByteBuffer) buf.duplicate().position(position).limit(
- position
- + HyperLogLogCollector.getLatestNumBytesForDenseStorage()
- )
- ).fold(collector);
+    // Save the buffer's position and limit, and restore them afterwards, instead of allocating a new ByteBuffer object
+ final int oldPosition = buf.position();
+ final int oldLimit = buf.limit();
+ buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage());
+ buf.position(position);
+
+ try {
+ HyperLogLogCollector.makeCollector(buf).fold(collector);
+ }
+ finally {
+ buf.limit(oldLimit);
+ buf.position(oldPosition);
+ }
}
@Override
diff --git a/processing/src/test/java/io/druid/query/aggregation/hyperloglog/HyperLogLogCollectorTest.java b/processing/src/test/java/io/druid/query/aggregation/hyperloglog/HyperLogLogCollectorTest.java
index 25bd2ee82906..de42eaa0e5a6 100644
--- a/processing/src/test/java/io/druid/query/aggregation/hyperloglog/HyperLogLogCollectorTest.java
+++ b/processing/src/test/java/io/druid/query/aggregation/hyperloglog/HyperLogLogCollectorTest.java
@@ -314,15 +314,15 @@ public void testBufferSwap() throws Exception
ByteBuffer smallerOffset = makeCollectorBuffer(0, (byte) 0x20, 0x00);
ByteBuffer buffer = ByteBuffer.allocate(HyperLogLogCollector.getLatestNumBytesForDenseStorage());
- HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buffer);
+ HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buffer.duplicate());
// make sure the original buffer gets modified
collector.fold(biggerOffset);
- Assert.assertEquals(collector, HyperLogLogCollector.makeCollector(buffer));
+ Assert.assertEquals(collector, HyperLogLogCollector.makeCollector(buffer.duplicate()));
// make sure the original buffer gets modified
collector.fold(smallerOffset);
- Assert.assertEquals(collector, HyperLogLogCollector.makeCollector(buffer));
+ Assert.assertEquals(collector, HyperLogLogCollector.makeCollector(buffer.duplicate()));
}
@Test
@@ -673,7 +673,7 @@ public void testSparseEstimation() throws Exception
}
Assert.assertEquals(
- collector.estimateCardinality(), collector.estimateByteBuffer(collector.toByteBuffer()), 0.0d
+ collector.estimateCardinality(), HyperLogLogCollector.estimateByteBuffer(collector.toByteBuffer()), 0.0d
);
}