Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,9 @@ protected void reduce(
{
HyperLogLogCollector aggregate = HyperLogLogCollector.makeLatestCollector();
for (BytesWritable value : values) {
aggregate.fold(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()));
aggregate.fold(
HyperLogLogCollector.makeCollector(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()))
);
}
Interval interval = config.getGranularitySpec().getSegmentGranularity().bucket(new DateTime(key.get()));
intervals.add(interval);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,16 +173,20 @@ public AggregatorFactory apply(String input)
@Override
public Object deserialize(Object object)
{
final ByteBuffer buffer;

if (object instanceof byte[]) {
return HyperLogLogCollector.makeCollector(ByteBuffer.wrap((byte[]) object));
buffer = ByteBuffer.wrap((byte[]) object);
} else if (object instanceof ByteBuffer) {
return HyperLogLogCollector.makeCollector((ByteBuffer) object);
// Be conservative, don't assume we own this buffer.
buffer = ((ByteBuffer) object).duplicate();
} else if (object instanceof String) {
return HyperLogLogCollector.makeCollector(
ByteBuffer.wrap(Base64.decodeBase64(StringUtils.toUtf8((String) object)))
);
buffer = ByteBuffer.wrap(Base64.decodeBase64(StringUtils.toUtf8((String) object)));
} else {
return object;
}
return object;

return HyperLogLogCollector.makeCollector(buffer);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,23 @@ public void init(ByteBuffer buf, int position)
@Override
public void aggregate(ByteBuffer buf, int position)
{
final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(
(ByteBuffer) buf.duplicate().position(position).limit(
position
+ HyperLogLogCollector.getLatestNumBytesForDenseStorage()
)
);
if (byRow) {
CardinalityAggregator.hashRow(selectorList, collector);
} else {
CardinalityAggregator.hashValues(selectorList, collector);
// Save position, limit and restore later instead of allocating a new ByteBuffer object
final int oldPosition = buf.position();
final int oldLimit = buf.limit();
buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage());
buf.position(position);

try {
final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf);
if (byRow) {
CardinalityAggregator.hashRow(selectorList, collector);
} else {
CardinalityAggregator.hashValues(selectorList, collector);
}
}
finally {
buf.limit(oldLimit);
buf.position(oldPosition);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,7 @@ public class HLLCV0 extends HyperLogLogCollector
public static final int HEADER_NUM_BYTES = 3;
public static final int NUM_BYTES_FOR_DENSE_STORAGE = NUM_BYTES_FOR_BUCKETS + HEADER_NUM_BYTES;

private static final ByteBuffer defaultStorageBuffer = ByteBuffer.wrap(new byte[]{0, 0, 0}).asReadOnlyBuffer();

protected HLLCV0()
{
super(defaultStorageBuffer);
}

protected HLLCV0(ByteBuffer buffer)
HLLCV0(ByteBuffer buffer)
{
super(buffer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ public class HLLCV1 extends HyperLogLogCollector
private static final ByteBuffer defaultStorageBuffer = ByteBuffer.wrap(new byte[]{VERSION, 0, 0, 0, 0, 0, 0})
.asReadOnlyBuffer();

protected HLLCV1()
HLLCV1()
{
super(defaultStorageBuffer);
super(defaultStorageBuffer.duplicate());
}

protected HLLCV1(ByteBuffer buffer)
HLLCV1(ByteBuffer buffer)
{
super(buffer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
*
* <code>
* for (int i = 1; i &lt; 20; ++i) {
* System.out.printf("i[%,d], val[%,d] =&gt; error[%f%%]%n", i, 2 &lt;&lt; i, 104 / Math.sqrt(2 &lt;&lt; i));
* System.out.printf("i[%,d], val[%,d] =&gt; error[%f%%]%n", i, 2 &lt;&lt; i, 104 / Math.sqrt(2 &lt;&lt; i));
* }
* </code>
*
Expand Down Expand Up @@ -90,6 +90,17 @@ public static HyperLogLogCollector makeLatestCollector()
return new HLLCV1();
}

/**
* Create a wrapper object around an HLL sketch contained within a buffer. The position and limit of
* the buffer may be changed; if you do not want this to happen, you can duplicate the buffer before
* passing it in.
*
* The mark and byte order of the buffer will not be modified.
*
* @param buffer buffer containing an HLL sketch starting at its position and ending at its limit
*
* @return HLLC wrapper object
*/
public static HyperLogLogCollector makeCollector(ByteBuffer buffer)
{
int remaining = buffer.remaining();
Expand Down Expand Up @@ -129,6 +140,11 @@ public static double applyCorrection(double e, int zeroCount)
return e;
}

public static double estimateByteBuffer(ByteBuffer buf)
{
return makeCollector(buf.duplicate()).estimateCardinality();
}

private static double estimateSparse(
final ByteBuffer buf,
final byte minNum,
Expand Down Expand Up @@ -212,7 +228,7 @@ private static boolean isSparse(ByteBuffer buffer)

public HyperLogLogCollector(ByteBuffer byteBuffer)
{
storageBuffer = byteBuffer.duplicate();
storageBuffer = byteBuffer;
initPosition = byteBuffer.position();
estimatedCardinality = null;
}
Expand Down Expand Up @@ -281,7 +297,7 @@ public void add(byte[] hashedValue)
byte lookupVal = ByteBitLookup.lookup[UnsignedBytes.toInt(hashedValue[i])];
switch (lookupVal) {
case 0:
positionOf1 += (byte)8;
positionOf1 += (byte) 8;
continue;
default:
positionOf1 += lookupVal;
Expand Down Expand Up @@ -353,65 +369,74 @@ public HyperLogLogCollector fold(HyperLogLogCollector other)
other = HyperLogLogCollector.makeCollector(tmpBuffer);
}

final ByteBuffer otherBuffer = other.storageBuffer.asReadOnlyBuffer();
final byte otherOffset = other.getRegisterOffset();
final ByteBuffer otherBuffer = other.storageBuffer;

byte myOffset = getRegisterOffset();
short numNonZero = getNumNonZeroRegisters();
// Save position and restore later to avoid allocations due to duplicating the otherBuffer object.
final int otherPosition = otherBuffer.position();

final int offsetDiff = myOffset - otherOffset;
if (offsetDiff < 0) {
throw new ISE("offsetDiff[%d] < 0, shouldn't happen because of swap.", offsetDiff);
}
try {
final byte otherOffset = other.getRegisterOffset();

final int myPayloadStart = getPayloadBytePosition();
otherBuffer.position(other.getPayloadBytePosition());
byte myOffset = getRegisterOffset();
short numNonZero = getNumNonZeroRegisters();

if (isSparse(otherBuffer)) {
while (otherBuffer.hasRemaining()) {
final int payloadStartPosition = otherBuffer.getShort() - other.getNumHeaderBytes();
numNonZero += mergeAndStoreByteRegister(
storageBuffer,
myPayloadStart + payloadStartPosition,
offsetDiff,
otherBuffer.get()
);
final int offsetDiff = myOffset - otherOffset;
if (offsetDiff < 0) {
throw new ISE("offsetDiff[%d] < 0, shouldn't happen because of swap.", offsetDiff);
}
if (numNonZero == NUM_BUCKETS) {
numNonZero = decrementBuckets();
setRegisterOffset(++myOffset);
setNumNonZeroRegisters(numNonZero);
}
} else { // dense
int position = getPayloadBytePosition();
while (otherBuffer.hasRemaining()) {
numNonZero += mergeAndStoreByteRegister(
storageBuffer,
position,
offsetDiff,
otherBuffer.get()
);
position++;
}
if (numNonZero == NUM_BUCKETS) {
numNonZero = decrementBuckets();
setRegisterOffset(++myOffset);
setNumNonZeroRegisters(numNonZero);

final int myPayloadStart = getPayloadBytePosition();
otherBuffer.position(other.getPayloadBytePosition());

if (isSparse(otherBuffer)) {
while (otherBuffer.hasRemaining()) {
final int payloadStartPosition = otherBuffer.getShort() - other.getNumHeaderBytes();
numNonZero += mergeAndStoreByteRegister(
storageBuffer,
myPayloadStart + payloadStartPosition,
offsetDiff,
otherBuffer.get()
);
}
if (numNonZero == NUM_BUCKETS) {
numNonZero = decrementBuckets();
setRegisterOffset(++myOffset);
setNumNonZeroRegisters(numNonZero);
}
} else { // dense
int position = getPayloadBytePosition();
while (otherBuffer.hasRemaining()) {
numNonZero += mergeAndStoreByteRegister(
storageBuffer,
position,
offsetDiff,
otherBuffer.get()
);
position++;
}
if (numNonZero == NUM_BUCKETS) {
numNonZero = decrementBuckets();
setRegisterOffset(++myOffset);
setNumNonZeroRegisters(numNonZero);
}
}
}

// no need to call setRegisterOffset(myOffset) here, since it gets updated every time myOffset is incremented
setNumNonZeroRegisters(numNonZero);
// no need to call setRegisterOffset(myOffset) here, since it gets updated every time myOffset is incremented
setNumNonZeroRegisters(numNonZero);

// this will add the max overflow and also recheck if offset needs to be shifted
add(other.getMaxOverflowRegister(), other.getMaxOverflowValue());
// this will add the max overflow and also recheck if offset needs to be shifted
add(other.getMaxOverflowRegister(), other.getMaxOverflowValue());

return this;
return this;
}
finally {
otherBuffer.position(otherPosition);
}
}

public HyperLogLogCollector fold(ByteBuffer buffer)
{
return fold(makeCollector(buffer));
return fold(makeCollector(buffer.duplicate()));
}

public ByteBuffer toByteBuffer()
Expand Down Expand Up @@ -494,11 +519,6 @@ public double estimateCardinality()
return estimatedCardinality;
}

public double estimateByteBuffer(ByteBuffer buf)
{
return makeCollector(buf).estimateCardinality();
}

@Override
public boolean equals(Object o)
{
Expand All @@ -521,15 +541,15 @@ public boolean equals(Object o)

final ByteBuffer denseStorageBuffer;
if (storageBuffer.remaining() != getNumBytesForDenseStorage()) {
HyperLogLogCollector denseCollector = HyperLogLogCollector.makeCollector(storageBuffer);
HyperLogLogCollector denseCollector = HyperLogLogCollector.makeCollector(storageBuffer.duplicate());
denseCollector.convertToDenseStorage();
denseStorageBuffer = denseCollector.storageBuffer;
} else {
denseStorageBuffer = storageBuffer;
}

if (otherBuffer.remaining() != getNumBytesForDenseStorage()) {
HyperLogLogCollector otherCollector = HyperLogLogCollector.makeCollector(otherBuffer);
HyperLogLogCollector otherCollector = HyperLogLogCollector.makeCollector(otherBuffer.duplicate());
otherCollector.convertToDenseStorage();
otherBuffer = otherCollector.storageBuffer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,16 +147,20 @@ public List<AggregatorFactory> getRequiredColumns()
@Override
public Object deserialize(Object object)
{
final ByteBuffer buffer;

if (object instanceof byte[]) {
return HyperLogLogCollector.makeCollector(ByteBuffer.wrap((byte[]) object));
buffer = ByteBuffer.wrap((byte[]) object);
} else if (object instanceof ByteBuffer) {
return HyperLogLogCollector.makeCollector((ByteBuffer) object);
// Be conservative, don't assume we own this buffer.
buffer = ((ByteBuffer) object).duplicate();
} else if (object instanceof String) {
return HyperLogLogCollector.makeCollector(
ByteBuffer.wrap(Base64.decodeBase64(StringUtils.toUtf8((String) object)))
);
buffer = ByteBuffer.wrap(Base64.decodeBase64(StringUtils.toUtf8((String) object)));
} else {
return object;
}
return object;

return HyperLogLogCollector.makeCollector(buffer);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,19 @@ public void aggregate(ByteBuffer buf, int position)
return;
}

HyperLogLogCollector.makeCollector(
(ByteBuffer) buf.duplicate().position(position).limit(
position
+ HyperLogLogCollector.getLatestNumBytesForDenseStorage()
)
).fold(collector);
// Save position, limit and restore later instead of allocating a new ByteBuffer object
final int oldPosition = buf.position();
final int oldLimit = buf.limit();
buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage());
buf.position(position);

try {
HyperLogLogCollector.makeCollector(buf).fold(collector);
}
finally {
buf.limit(oldLimit);
buf.position(oldPosition);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,15 +314,15 @@ public void testBufferSwap() throws Exception
ByteBuffer smallerOffset = makeCollectorBuffer(0, (byte) 0x20, 0x00);

ByteBuffer buffer = ByteBuffer.allocate(HyperLogLogCollector.getLatestNumBytesForDenseStorage());
HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buffer);
HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buffer.duplicate());

// make sure the original buffer gets modified
collector.fold(biggerOffset);
Assert.assertEquals(collector, HyperLogLogCollector.makeCollector(buffer));
Assert.assertEquals(collector, HyperLogLogCollector.makeCollector(buffer.duplicate()));

// make sure the original buffer gets modified
collector.fold(smallerOffset);
Assert.assertEquals(collector, HyperLogLogCollector.makeCollector(buffer));
Assert.assertEquals(collector, HyperLogLogCollector.makeCollector(buffer.duplicate()));
}

@Test
Expand Down Expand Up @@ -673,7 +673,7 @@ public void testSparseEstimation() throws Exception
}

Assert.assertEquals(
collector.estimateCardinality(), collector.estimateByteBuffer(collector.toByteBuffer()), 0.0d
collector.estimateCardinality(), HyperLogLogCollector.estimateByteBuffer(collector.toByteBuffer()), 0.0d
);
}

Expand Down