Prohibit assigning concurrent maps into Map-typed variables and fields and fix a race condition in CoordinatorRuleManager#6898
Conversation
…s; Fix a race condition in CoordinatorRuleManager; improve logic in DirectDruidClient and ResourcePool
| private static final Logger log = new Logger(DirectDruidClient.class); | ||
|
|
||
| private static final Map<Class<? extends Query>, Pair<JavaType, JavaType>> typesMap = new ConcurrentHashMap<>(); | ||
| private static final ConcurrentHashMap<Class<? extends Query>, Pair<JavaType, JavaType>> typesMap = |
There was a problem hiding this comment.
Change ConcurrentHashMap to ConcurrentMap.
There was a problem hiding this comment.
Actually, it's often should be the opposite: ConcurrentHashMap should be deliberately used instead of ConcurrentMap whenever compute(), computeIfAbsent(), etc. called on the map, because ConcurrentHashMap guarantees atomicity and linearizability of such actions, but ConcurrentMap doesn't. E. g. ConcurrentSkipListMap merely guarantees that if two concurrent threads call computeIfAbsent() on the same key at the same time, the program won't crash with IllegalStateException or ConcurrentModificationException, but the lambdas could be computed in parallel and it's unknown which wins.
I will go though this PR and change types.
There was a problem hiding this comment.
Got it, thanks for the explanation.
| @@ -159,14 +158,15 @@ public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> c | |||
|
|
|||
| Pair<JavaType, JavaType> types = typesMap.get(query.getClass()); | |||
| if (types == null) { | |||
There was a problem hiding this comment.
Since using computeIfAbsent, this if (types == null) can be removed.
There was a problem hiding this comment.
It could improve concurrency, see #4397 (comment). I'll add a comment.
There was a problem hiding this comment.
Hmm, computeIfAbsent() may call get() again(ConcurrentHashMap doesn't, ConcurrentSkipListMap does), so I think we shouldn't use computeIfAbsent(), using put() is ok here.
There was a problem hiding this comment.
If two queries of the same new type are run in parallel, there could be a race between them. Maybe it could be tolerated here because computation (body of the lambda) could be run in parallel and re-run for the same type with no harm, but computeIfAbsent() is clearer.
| private ExecutorService listenerExecutor; | ||
|
|
||
| private final Map<NodeType, NodeTypeWatcher> nodeTypeWatchers = new ConcurrentHashMap<>(); | ||
| private final ConcurrentHashMap<NodeType, NodeTypeWatcher> nodeTypeWatchers = new ConcurrentHashMap<>(); |
There was a problem hiding this comment.
Same, change ConcurrentHashMap to ConcurrentMap.
There was a problem hiding this comment.
| private final IndexMerger indexMerger; | ||
| private final Cache cache; | ||
| private final Map<SegmentIdWithShardSpec, Sink> sinks = new ConcurrentHashMap<>(); | ||
| private final ConcurrentHashMap<SegmentIdWithShardSpec, Sink> sinks = new ConcurrentHashMap<>(); |
There was a problem hiding this comment.
Same, change ConcurrentHashMap to ConcurrentMap.
There was a problem hiding this comment.
yeah, I guess this one looks like it could just be ConcurrentMap
| return retVal; | ||
| ConcurrentMap<SegmentId, String> segments = currentlyProcessingSegments.get(tier); | ||
| List<String> segmentsAndHosts = new ArrayList<>(); | ||
| segments.forEach((segmentId, serverId) -> segmentsAndHosts.add(segmentId + " ON " + serverId)); |
There was a problem hiding this comment.
I wonder why you didn't use StringUtils.format.
There was a problem hiding this comment.
Simple string concatenation should be faster because it doesn't involve parsing of the format string.
…erge() is called on a ConcurrentHashMap, it's stored in a ConcurrentHashMap-typed variable, not ConcurrentMap; add comments explaining get()-before-computeIfAbsent() optimization; refactor Counters; fix a race condition in Intialization.java
|
@QiuMM in a newer commit I've added comments and enforced that @jihoonson FYI I refactored |
gianm
left a comment
There was a problem hiding this comment.
LGTM. I had a couple of comments and questions but nothing critical.
| <constraint name="x" within="" contains="" /> | ||
| <constraint name="y" nameOfExprType="java\.util\.concurrent\.ConcurrentMap" expressionTypes="java.util.concurrent.ConcurrentMap" exprTypeWithinHierarchy="true" within="" contains="" /> | ||
| </searchConfiguration> | ||
| <searchConfiguration name="A ConcurrentHashMap on which compute() is called should be assinged into variables of ConcurrentHashMap type, not ConcurrentMap" text="$x$.compute($y$, $z$)" recursive="true" caseInsensitive="true" type="JAVA"> |
There was a problem hiding this comment.
Consider including the rationale in this message: it is not obvious that it's because ConcurrentMap does not guarantee atomicity.
There was a problem hiding this comment.
This field is not assumed to be a message, it's a configuration name, I think I already overuse them. Probably neither desktop IntelliJ nor TeamCity CI are prepared for something multiline in this field.
| public static <K> int incrementAndGetInt(ConcurrentHashMap<K, AtomicInteger> counters, K key) | ||
| { | ||
| return intCounters.computeIfAbsent(key, k -> new AtomicInteger()).addAndGet(val); | ||
| // get() before computeIfAbsent() is an optimization to avoid locking in computeIfAbsent() if not needed. |
There was a problem hiding this comment.
Any idea why ConcurrentHashMap does not already employ an optimization like this?
There was a problem hiding this comment.
That's a throughput vs. scalability tradeoff, + lack of information. We are potentially doing two operations instead of one, and avoid locking in some cases instead.
At some sites where computeIfAbsent() is actually expected to find the key absent and compute the value most of the time, the get() guard just makes things worse.
There is also an area where it's hard for me to say what approach is better, is that when the map is big and computeIfAbsent() constitutes significant part of the app's CPU consumption (the bigger the map and the hotter computeIfAbsent() call is, the more likely that it's better to not guard computeIfAbsent() with get()). I think it's never nearly the case on Druid nodes that computeIfAbsent() is hot, but I could be wrong.
There was a problem hiding this comment.
From the ConcurrentHashMap's part, it would be useful if computeIfAbsentMoreScalableButMaybeDoingExtraWork() existed, where they don't recompute hash bucket twice and just walk the collision chain twice. But it's easy to imagine why such method doesn't exist.
…zation; IdentityHashMap optimization
11057ee to
14307c3
Compare
|
It looks like some unit tests are failing now with similar messages. Maybe a recent change broke something? |
| private ExecutorService listenerExecutor; | ||
|
|
||
| private final Map<NodeType, NodeTypeWatcher> nodeTypeWatchers = new ConcurrentHashMap<>(); | ||
| private final ConcurrentHashMap<NodeType, NodeTypeWatcher> nodeTypeWatchers = new ConcurrentHashMap<>(); |
There was a problem hiding this comment.
| private final IndexMerger indexMerger; | ||
| private final Cache cache; | ||
| private final Map<SegmentIdWithShardSpec, Sink> sinks = new ConcurrentHashMap<>(); | ||
| private final ConcurrentHashMap<SegmentIdWithShardSpec, Sink> sinks = new ConcurrentHashMap<>(); |
There was a problem hiding this comment.
yeah, I guess this one looks like it could just be ConcurrentMap
Concurrent maps (i. e.
ConcurrentHashMapandConcurrentSkipListMap) should be assigned into variables of their respective type orConcurrentMap, but not justMap.Why this is important could be seen in
CoordinatorRuleManager, where it's pretty obvious that codehas a race condition, but previously when the type of the variable was
Mapit was not obvious.This race condition in
CoordinatorRuleManageris fixed in this PR. Also, improved logic inDirectDruidClientandResourcePool.