Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,13 @@ public HyperLogLogCollector fold(@Nullable HyperLogLogCollector other)

storageBuffer.duplicate().put(other.storageBuffer.asReadOnlyBuffer());

if (other.storageBuffer.remaining() != other.getNumBytesForDenseStorage()) {
// The other buffer was sparse, densify it
final int newLImit = storageBuffer.position() + other.storageBuffer.remaining();
storageBuffer.limit(newLImit);
convertToDenseStorage();
}

other = HyperLogLogCollector.makeCollector(tmpBuffer);
}

Expand Down
113 changes: 96 additions & 17 deletions hll/src/test/java/org/apache/druid/hll/HyperLogLogCollectorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
Expand All @@ -30,21 +31,36 @@
import org.junit.Test;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.security.MessageDigest;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Predicate;

/**
*
*/
public class HyperLogLogCollectorTest
{
private static final Logger log = new Logger(HyperLogLogCollectorTest.class);

private final HashFunction fn = Hashing.murmur3_128();

private static void fillBuckets(HyperLogLogCollector collector, byte startOffset, byte endOffset)
{
  // Add all 2048 buckets at each value from startOffset through endOffset (inclusive),
  // which shifts the collector's registerOffset as the registers saturate.
  for (byte offset = startOffset; offset <= endOffset; offset++) {
    for (short bucket = 0; bucket < 2048; ++bucket) {
      collector.add(bucket, offset);
    }
  }
}

@Test
public void testFolding()
{
Expand Down Expand Up @@ -78,14 +94,13 @@ public void testFolding()
}
}


/**
* This is a very long running test, disabled by default.
* It is meant to catch issues when combining a large number of HLL objects.
*
* It compares adding all the values to one HLL vs.
* splitting up values into HLLs of 100 values each, and folding those HLLs into a single main HLL.
*
*
* When reaching very large cardinalities (>> 50,000,000), offsets are mismatched between the main HLL and the ones
* with 100 values, requiring a floating max as described in
* http://druid.io/blog/2014/02/18/hyperloglog-optimizations-for-real-world-systems.html
Expand Down Expand Up @@ -502,7 +517,8 @@ private short computeNumNonZero(byte theByte)
return retVal;
}

@Ignore @Test // This test can help when finding potential combinations that are weird, but it's non-deterministic
@Ignore
@Test // This test can help when finding potential combinations that are weird, but it's non-deterministic
public void testFoldingwithDifferentOffsets()
{
// final Random random = new Random(37); // this seed will cause this test to fail because of slightly larger errors
Expand Down Expand Up @@ -533,7 +549,8 @@ public void testFoldingwithDifferentOffsets()
}
}

@Ignore @Test
@Ignore
@Test
public void testFoldingwithDifferentOffsets2() throws Exception
{
final Random random = new Random(0);
Expand Down Expand Up @@ -707,6 +724,81 @@ public void testMaxOverflow()
Assert.assertEquals(0, collector.getNumNonZeroRegisters());
}

@Test
public void testRegisterSwapWithSparse()
{
  final HyperLogLogCollector sparseCollector = HyperLogLogCollector.makeLatestCollector();
  // Fill every bucket except bucket 0, leaving the collector one register short of full.
  int bucket = 1;
  while (bucket < HyperLogLogCollector.NUM_BUCKETS) {
    sparseCollector.add((short) bucket, (byte) 1);
    Assert.assertEquals(bucket, sparseCollector.getNumNonZeroRegisters());
    Assert.assertEquals(0, sparseCollector.getRegisterOffset());
    bucket++;
  }
  Assert.assertEquals(
      15615.219683654448D,
      HyperLogLogCollector.makeCollector(sparseCollector.toByteBuffer().asReadOnlyBuffer())
                          .estimateCardinality(),
      1e-5D
  );

  // A hash landing in bucket 0 with value 1 fills the last register and bumps the offset.
  final byte[] rawHash = new byte[10];
  rawHash[0] = 1;
  sparseCollector.add(rawHash);
  Assert.assertEquals(0, sparseCollector.getNumNonZeroRegisters());
  Assert.assertEquals(1, sparseCollector.getRegisterOffset());

  // The distribution is deliberately degenerate, so an estimate of 0 is acceptable here.
  Assert.assertEquals(
      0.0D,
      HyperLogLogCollector.makeCollector(sparseCollector.toByteBuffer().asReadOnlyBuffer())
                          .estimateCardinality(),
      1e-5D
  );
  final ByteBuffer sparseBuffer = sparseCollector.toByteBuffer();
  Assert.assertEquals(sparseCollector.getNumHeaderBytes(), sparseBuffer.remaining());

  // Fold the now-sparse collector into a dense one and verify the result densifies cleanly.
  final HyperLogLogCollector dense = HyperLogLogCollector.makeLatestCollector();
  for (int i = 0; i < HyperLogLogCollector.NUM_BUCKETS - 1; i++) {
    dense.add((short) i, (byte) 1);
  }

  Assert.assertEquals(HyperLogLogCollector.NUM_BUCKETS - 1, dense.getNumNonZeroRegisters());
  final HyperLogLogCollector merged = dense.fold(HyperLogLogCollector.makeCollector(sparseBuffer));
  Assert.assertNotNull(merged.toByteBuffer());
  Assert.assertEquals(merged.getStorageBuffer().remaining(), dense.getNumBytesForDenseStorage());
}

// Example of a terrible sampling filter. Don't use this method
@Test
public void testCanFillUpOnMod()
{
  final HashFunction hashFn = Hashing.murmur3_128();
  final HyperLogLogCollector hyperLogLogCollector = HyperLogLogCollector.makeLatestCollector();
  final byte[] seed = new byte[10];
  seed[0] = 1;
  hyperLogLogCollector.add(seed);
  final Random random = new Random(347893248701078L);
  long loops = 0;
  // Keep a ~1% "sample": only values whose hashed int is 43 mod 100.
  final Predicate<Integer> pass = i -> {
    // ByteOrder.nativeOrder() on lots of systems is ByteOrder.LITTLE_ENDIAN
    final ByteBuffer bb = ByteBuffer.wrap(hashFn.hashInt(i).asBytes()).order(ByteOrder.LITTLE_ENDIAN);
    return (bb.getInt() % 100) == 43;
  };
  final long loopLimit = 1_000_000_000L;
  do {
    final int candidate = random.nextInt();
    // Note: the loop condition (and thus ++loops) is evaluated on every iteration,
    // whether or not the candidate passed the filter.
    if (pass.test(candidate)) {
      final Hasher hasher = hashFn.newHasher();
      hasher.putInt(candidate);
      hyperLogLogCollector.add(hasher.hash().asBytes());
    }
  } while (hyperLogLogCollector.getNumNonZeroRegisters() > 0 && ++loops < loopLimit);
  Assert.assertNotEquals(loopLimit, loops);
  Assert.assertEquals(hyperLogLogCollector.getNumHeaderBytes(), hyperLogLogCollector.toByteBuffer().remaining());
}

@Test
public void testMergeMaxOverflow()
{
Expand Down Expand Up @@ -736,19 +828,6 @@ public void testMergeMaxOverflow()
Assert.assertEquals(67, collector.getMaxOverflowValue());
}


/**
 * Adds all 2048 buckets at every value from {@code startOffset} through {@code endOffset}
 * (inclusive), saturating the registers so the collector's registerOffset shifts upward.
 */
private static void fillBuckets(HyperLogLogCollector collector, byte startOffset, byte endOffset)
{
byte offset = startOffset;
while (offset <= endOffset) {
// fill buckets to shift registerOffset
for (short bucket = 0; bucket < 2048; ++bucket) {
collector.add(bucket, offset);
}
offset++;
}
}

@Test
public void testFoldOrder()
{
Expand Down