tl;dr Probabilistic algorithms can bite you in the....
Here's our little story.
We ingest reasonably large amounts of data in real time (approximately 15k events per second). We then query the last week of data over arbitrary intervals ranging in length from hours to days.
Among other things, we issue queries structured as follows:
{
  "queryType": "topN",
  "aggregations": [
    {
      "filter": {
        "dimension": "SOME_DIMENSION",
        "value": "some value",
        "type": "selector"
      },
      "aggregator": {
        "fieldName": "UNIQUE_FIELD",
        "type": "hyperUnique",
        "name": "uniques"
      },
      "type": "filtered"
    },
    (some other aggregations)
  ],
  "intervals": (some interval),
  "filter": {
    "dimension": "OTHER_DIMENSION",
    "value": "other value",
    "type": "selector"
  },
  "threshold": 100,
  "granularity": "all",
  "dataSource": "very_secretive_name",
  "metric": "uniques",
  "dimension": "KEY_DIMENSION"
}
At some point, we noticed a very strange anomaly in the results returned from Druid. A small percentage (~4%) of our queries returned almost-sorted results. For example, they could look like this:
[
  {
    "KEY_DIMENSION": "ID1",
    "uniques": 32454.549667124706,
    "other_aggregation": xxxxxxxx
  },
  {
    "KEY_DIMENSION": "ID2",
    "uniques": 29738.77111864203,
    "other_aggregation": xxxxxxxxx
  },
  (...)
  {
    "KEY_DIMENSION": "ID12_ANOMALY_START",
    "uniques": 13531.402343246148,
    "other_aggregation": xxxxxxxxx
  },
  (...)
  {
    "KEY_DIMENSION": "ID16_ANOMALY_END",
    "uniques": 10391.412343246148,
    "other_aggregation": xxxxxxxxx
  },
  {
    "KEY_DIMENSION": "ID14",
    "uniques": 20141.402343256151,
    "other_aggregation": xxxxxxxxx
  },
  (...)
Root cause:
I did quite a lot of digging through the inner workings of Druid TopN queries. One area that seemed particularly suspicious was the merge phase, in which results from different segments are merged together and sorted before being returned to the caller.
The key observation at this point was that even though we issue almost identical queries ordered by different metrics, the anomaly only ever appears when sorting by a hyperUnique metric. This pointed me to the implementation of HyperLogLogCollector (io/druid/query/aggregation/hyperloglog/HyperLogLogCollector.java).
Now I'm convinced the issue is almost certainly caused by an unstable implementation of HyperLogLogCollector.compareTo(). The HLL comparator behaves inconsistently with the order imposed by the estimated cardinalities. There are two unit tests verifying precisely that; unfortunately, both are too naive to detect such a rare yet important anomaly.
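To see why an inconsistent comparator produces exactly this symptom, here is a toy example in plain Java (nothing Druid-specific, all names invented for illustration): a comparator that orders by a coarse proxy of the value, much like the shortcuts we'll see below, leaves items within the same "bucket" in their incidental input order, so the output comes out only almost sorted.

import java.util.Arrays;
import java.util.Comparator;

// Toy demonstration (not Druid code): sorting with a comparator that is
// inconsistent with the true order yields "almost sorted" output.
public class AlmostSortedDemo {
    public static void main(String[] args) {
        Double[] uniques = {32454.5, 20141.4, 13531.4, 29738.7, 10391.4};
        // Coarse proxy: compare only the ten-thousands bucket of each value.
        Comparator<Double> byCoarseProxy =
                Comparator.comparingInt(v -> (int) (v / 10000));
        Arrays.sort(uniques, byCoarseProxy.reversed());
        // Within a bucket the original (arbitrary) order survives, so
        // 20141.4 ends up ahead of 29738.7 despite being smaller:
        System.out.println(Arrays.toString(uniques));
        // -> [32454.5, 20141.4, 29738.7, 13531.4, 10391.4]
    }
}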
Steps to reproduce:
Execute the test added by DreamLab@3dbb494 to io.druid.query.aggregation.hyperloglog.HyperLogLogCollectorTest. Find it attached below:
@Test
public void testCompareToShouldBehaveConsistentlyWithEstimatedCardinalitiesEvenInToughCases() throws Exception
{
  // given
  Random rand = new Random(0);
  // 'fn' is the test class's Guava hash function (Hashing.murmur3_128()),
  // inlined here so the snippet is self-contained
  HashFunction fn = Hashing.murmur3_128();
  HyperUniquesAggregatorFactory factory = new HyperUniquesAggregatorFactory("foo", "bar");
  Comparator comparator = factory.getComparator();
  for (int i = 0; i < 1000; ++i) {
    // given: two collectors fed random hashed longs, with random sizes
    HyperLogLogCollector leftCollector = HyperLogLogCollector.makeLatestCollector();
    int j = rand.nextInt(9000) + 5000;
    for (int l = 0; l < j; ++l) {
      leftCollector.add(fn.hashLong(rand.nextLong()).asBytes());
    }
    HyperLogLogCollector rightCollector = HyperLogLogCollector.makeLatestCollector();
    int k = rand.nextInt(9000) + 5000;
    for (int l = 0; l < k; ++l) {
      rightCollector.add(fn.hashLong(rand.nextLong()).asBytes());
    }

    // when
    final int orderedByCardinality = Double.compare(
        leftCollector.estimateCardinality(),
        rightCollector.estimateCardinality()
    );
    final int orderedByComparator = comparator.compare(leftCollector, rightCollector);

    // then, assert the hyperloglog comparator behaves consistently with the estimated cardinalities
    Assert.assertEquals(
        String.format(
            "orderedByComparator=%d, orderedByCardinality=%d,\n" +
            "Left={cardinality=%f, hll=%s},\n" +
            "Right={cardinality=%f, hll=%s},\n",
            orderedByComparator, orderedByCardinality,
            leftCollector.estimateCardinality(), leftCollector,
            rightCollector.estimateCardinality(), rightCollector
        ),
        orderedByCardinality,
        orderedByComparator
    );
  }
}
Time to dig deeper:
Closer inspection of the .compareTo method reveals that it uses two optimizations/heuristics (tried in the following order) before falling back to calculating estimatedCardinality directly; see the sketch after this list:
- Comparison of register offsets
- Comparison of the number of non-zero registers
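Here's a simplified, self-contained sketch of that comparison structure as I read it; the class and field names are illustrative, not Druid's exact API:

// Simplified sketch of the compareTo structure (illustrative names only):
// two cheap heuristics are tried before the exact estimate comparison.
final class CollectorSketch {
    int registerOffset;       // input to heuristic 1
    int numNonZeroRegisters;  // input to heuristic 2
    double estimate;          // the exact (cached) cardinality estimate

    int compareToLikeDruid(CollectorSketch other) {
        // Heuristic 1: a larger register offset is assumed to imply a
        // larger cardinality -- this is the one our failing test catches.
        if (registerOffset != other.registerOffset) {
            return Integer.compare(registerOffset, other.registerOffset);
        }
        // Heuristic 2: with equal offsets, more non-zero registers is
        // assumed to imply a larger cardinality.
        if (numNonZeroRegisters != other.numNonZeroRegisters) {
            return Integer.compare(numNonZeroRegisters, other.numNonZeroRegisters);
        }
        // Fallback: compare the actual estimates.
        return Double.compare(estimate, other.estimate);
    }
}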
To find out which of those is causing the problem, let's take a look at an example output produced by the above test:
java.lang.AssertionError: orderedByComparator=1, orderedByCardinality=-1,
Left={cardinality=13338.797458, hll=HyperLogLogCollector{initPosition=0, version=1, registerOffset=1, numNonZeroRegisters=1970, maxOverflowValue=16, maxOverflowRegister=911}},
Right={cardinality=13610.832917, hll=HyperLogLogCollector{initPosition=0, version=1, registerOffset=0, numNonZeroRegisters=2046, maxOverflowValue=16, maxOverflowRegister=1559}},
Expected :-1
Actual :1
As you can see, when compared by cardinalities, Right > Left. However, if we apply optimization 1 and compare registerOffsets instead, we return the wrong result!
So we've just empirically shown that optimization 1 should be removed. But why is that? And what about optimization 2: can we prove that it is correct?
Back to basics:
Let's take a step back and consider what HyperLogLog truly is at the most basic level. At its core, it's not much more than a harmonic mean of a bunch of numbers.
Suppose we have two sets of integers:
M1 = [5, 5, 5]
M2 = [6, 4, 6]
We shall call the integers M1_i and M2_i registers. We will be calculating the harmonic mean of powers of 2, with exponents given by M1_i and M2_i. We also introduce a fancy storage optimization: one additional integer stored for each of the sets, called the register offset. Now, instead of storing the exact values of the registers, we can store non-negative offsets from the register offset:
OFFSET1 = min(M1_i)
OFFSET2 = min(M2_i)
Hence we have,
OFFSET1 = 5; M1 = [0, 0, 0]
OFFSET2 = 4; M2 = [2, 0, 2]
Can we compare H1 and H2 by comparing their offsets?
If yes, then OFFSET2 = 4 < OFFSET1 = 5 => H2 < H1. Let's verify this claim:
H1 = 3 / (1/2^(5+0) + 1/2^(5+0) + 1/2^(5+0)) = 3 / (1/32 + 1/32 + 1/32) = 3 / (3/32) = 32
H2 = 3 / (1/2^(4+2) + 1/2^(4+0) + 1/2^(4+2)) = 3 / (1/64 + 1/16 + 1/64) = 3 / (6/64) = 32
Hence,
H1 == H2 !!!
No, we can't.
Can we compare H1 and H2 by comparing the number of non-zero registers?
If yes, then NUM_NON_ZERO1 = 0 < NUM_NON_ZERO2 = 2 => H1 < H2
As above, H1 == H2, therefore no, we can't.
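If you'd rather not trust my arithmetic, here are a few lines of plain Java (nothing Druid-specific) that reproduce the counterexample numerically:

// Numeric check of the counterexample above: both register sets
// yield the same harmonic mean of powers of 2.
public class HarmonicMeanCheck {
    // Harmonic mean of 2^(m_i) over the given registers.
    static double harmonicMeanOfPowers(int[] registers) {
        double sumOfInverses = 0.0;
        for (int m : registers) {
            sumOfInverses += 1.0 / Math.pow(2, m);
        }
        return registers.length / sumOfInverses;
    }

    public static void main(String[] args) {
        int[] m1 = {5, 5, 5}; // OFFSET1 = 5, stored registers [0, 0, 0]
        int[] m2 = {6, 4, 6}; // OFFSET2 = 4, stored registers [2, 0, 2]
        System.out.println(harmonicMeanOfPowers(m1)); // 32.0
        System.out.println(harmonicMeanOfPowers(m2)); // 32.0
    }
}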
Conclusion:
It appears that neither of the optimizations used in the current implementation can be considered safe, and both should be removed. I will try to open a pull request soon.
Performance implications:
It's hard to say exactly what impact the removal of those two optimizations will have on the performance of the comparator (microbenchmarking is currently blocked by #2432). However, my claim is that in the average case it shouldn't be slower at all, thanks to another optimization implemented in the HLL: caching of estimated cardinalities.
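To make that claim concrete, here's a minimal sketch of the caching idea (hypothetical names, not Druid's actual fields): the expensive estimate is computed lazily at most once between mutations, so a comparator that falls back to estimateCardinality() usually pays only a cheap double comparison.

// Minimal sketch of cardinality caching (hypothetical names): the estimate
// is computed once, reused until the collector is mutated again.
final class CachingCollector {
    private Double cachedEstimate; // null until computed; reset on add()

    double estimateCardinality() {
        if (cachedEstimate == null) {
            cachedEstimate = computeEstimateFromRegisters(); // expensive part
        }
        return cachedEstimate;
    }

    void add(byte[] hashedValue) {
        cachedEstimate = null; // any new value invalidates the cache
        // ... update registers with the hashed value ...
    }

    private double computeEstimateFromRegisters() {
        return 0.0; // stand-in for the real harmonic-mean estimation
    }
}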