From 132cacc421513a2de83d5e74868eb009e6ca004c Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Mon, 17 Oct 2016 11:39:12 -0500 Subject: [PATCH] Workaround non-thread-safe use of HLL aggregators. (#3578) Despite the non-thread-safety of HyperLogLogCollector, it is actually currently used by multiple threads during realtime indexing. HyperUniquesAggregator's "aggregate" and "get" methods can be called simultaneously by OnheapIncrementalIndex, since its "doAggregate" and "getMetricObjectValue" methods are not synchronized. This means that the optimization of HyperLogLogCollector.fold in #3314 (saving and restoring position rather than duplicating the storage buffer of the right-hand side) could cause corruption in the face of concurrent writes. This patch works around the issue by duplicating the storage buffer in "get" before returning a collector. The returned collector still shares data with the original one, but the situation is no worse than before #3314. In the future we may want to consider making a thread safe version of HLLC that avoids these kinds of problems in realtime indexing. But for now I thought it was best to do a small change that restored the old behavior. --- .../aggregation/hyperloglog/HyperLogLogCollector.java | 9 +++++++-- .../aggregation/hyperloglog/HyperUniquesAggregator.java | 3 ++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperLogLogCollector.java b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperLogLogCollector.java index 63a38efb6442..84f1cf7a2bbe 100644 --- a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperLogLogCollector.java +++ b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperLogLogCollector.java @@ -39,10 +39,15 @@ * } * * - * This class is *not* multi-threaded. It can be passed among threads, but it is written with the assumption that + * This class is *not* multi-threaded. It can be passed among threads, but it is written with the assumption that * only one thread is ever calling methods on it. * - * If you have multiple threads calling methods on this concurrently, I hope you manage to get correct behavior + * If you have multiple threads calling methods on this concurrently, I hope you manage to get correct behavior. + * + * Note that despite the non-thread-safety of this class, it is actually currently used by multiple threads during + * realtime indexing. HyperUniquesAggregator's "aggregate" and "get" methods can be called simultaneously by + * OnheapIncrementalIndex, since its "doAggregate" and "getMetricObjectValue" methods are not synchronized. So, watch + * out for that. */ public abstract class HyperLogLogCollector implements Comparable { diff --git a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregator.java b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregator.java index ca1f45fc4329..41d7aeb9e29a 100644 --- a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregator.java @@ -57,7 +57,8 @@ public void reset() @Override public Object get() { - return collector; + // Workaround for OnheapIncrementalIndex's penchant for calling "aggregate" and "get" simultaneously. + return HyperLogLogCollector.makeCollector(collector.getStorageBuffer().duplicate()); } @Override