
Conversation

@lhotari (Member) commented Mar 1, 2021

Motivation

Currently ConcurrentOpenHashMap.forEach keeps a read lock on each section for the duration of the processing (whenever it has to fall back from the optimistic read). This can lead to various issues when I/O operations are performed in the processing function, which is a common pattern in the Pulsar code base. The I/O usually goes to ZooKeeper, and it leads to threads piling up waiting for locks in ConcurrentOpenHashMap when reads and writes for the same section happen in concurrently executing threads.
It is also possible that long-lived locks lead to deadlocks.

This is the current implementation of ConcurrentOpenHashMap.forEach:

public void forEach(BiConsumer<? super K, ? super V> processor) {
    for (Section<K, V> s : sections) {
        s.forEach(processor);
    }
}

and the referenced Section.forEach:

public void forEach(BiConsumer<? super K, ? super V> processor) {
    long stamp = tryOptimisticRead();
    Object[] table = this.table;
    boolean acquiredReadLock = false;
    try {
        // Validate no rehashing
        if (!validate(stamp)) {
            // Fallback to read lock
            stamp = readLock();
            acquiredReadLock = true;
            table = this.table;
        }
        // Go through all the buckets for this section
        for (int bucket = 0; bucket < table.length; bucket += 2) {
            K storedKey = (K) table[bucket];
            V storedValue = (V) table[bucket + 1];
            if (!acquiredReadLock && !validate(stamp)) {
                // Fallback to acquiring read lock
                stamp = readLock();
                acquiredReadLock = true;
                storedKey = (K) table[bucket];
                storedValue = (V) table[bucket + 1];
            }
            if (storedKey != DeletedKey && storedKey != EmptyKey) {
                processor.accept(storedKey, storedValue);
            }
        }
    } finally {
        if (acquiredReadLock) {
            unlockRead(stamp);
        }
    }
}

Modifications

  • Add a new method ConcurrentOpenHashMap.forEachInSnapshot which makes a copy of the entries in each section before processing them (the Entry holder used in the snippet is sketched after this list).
    public void forEachInSnapshot(BiConsumer<? super K, ? super V> processor) {
        for (Section<K, V> s : sections) {
            List<Entry<K, V>> entries = new ArrayList<>(s.size);
            s.forEach((k, v) -> entries.add(new Entry<>(k, v)));
            entries.forEach(entry -> processor.accept(entry.key, entry.value));
        }
    }
  • Replace all current usages of forEach with forEachInSnapshot
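
A minimal sketch of the Entry holder referenced in the snippet above (a plain key/value pair; the field names come from the snippet, while the exact modifiers are assumptions):

private static class Entry<K, V> {
    final K key;
    final V value;

    Entry(K key, V value) {
        this.key = key;
        this.value = value;
    }
}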

There are tradeoffs involved. Some memory allocations have to be made to take the copy. However, allocations aren't a real issue here: JVMs are efficient at handling such short-lived allocations, and these allocations don't introduce new bottlenecks or performance issues.
The other tradeoff is related to data consistency: an entry might already have been removed from the collection by the time it is processed. This is also a non-issue in most cases, since relying on membership in the collection to determine state would be a bad design in the first place.

Since the previous forEach implementation remains, it's possible to choose between forEach and forEachInSnapshot for each specific use case. If that flexibility isn't needed, the forEach implementation could be replaced with the snapshotting implementation completely.

@eolivelli (Contributor) left a comment

Overall I like this work and I am +1 to the new method.

But I am not sure we can touch so many sensitive parts of the codebase in one single patch.
We could break things without knowing, and it will be hard to track down the cause.

I don't know which is the best strategy to incorporate this work safely.

@lhotari (Member, Author) commented Mar 1, 2021

But I am not sure we can touch so many sensitive parts of the codebase in one single patch.
We could break things without knowing, and it will be hard to track down the cause.

I don't know which is the best strategy to incorporate this work safely.

That's a valid concern when a major change is made. However, the problems that the long-lived locks cause are nondeterministic and much harder to track down. One of the issues with the StampedLocks used by ConcurrentOpenHashMap is that a deadlock doesn't get detected by the JVM. I made a thread dump of an experiment where I caused a deadlock by modifying the testDeadlockPreventionWithForEachInSnapshot test to use forEach and a timeout (@Test(timeOut = 10000L)). I cherry-picked #9766 to get a standard thread dump with lock information; the StampedLock information doesn't seem to be covered there.
StampedLocks aren't reentrant, so that is yet another way deadlocks can occur: even a single thread can deadlock against itself.
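
A minimal, standalone illustration of that single-thread case (plain JDK code, not Pulsar's; it mirrors a processing function that tries to write to the same section whose read lock is held by forEach):

import java.util.concurrent.locks.StampedLock;

public class SingleThreadStampedDeadlock {
    public static void main(String[] args) {
        StampedLock lock = new StampedLock();
        long readStamp = lock.readLock();   // e.g. the fallback read lock taken by Section.forEach
        // StampedLock is not reentrant: writeLock() waits for all readers to release,
        // including the read lock held by this very thread, so this call never returns.
        long writeStamp = lock.writeLock(); // blocks forever -> single-thread deadlock
        lock.unlockWrite(writeStamp);
        lock.unlockRead(readStamp);
    }
}

This also shows why the JVM's deadlock detection doesn't report it: StampedLock doesn't track an owner thread the way monitors and ReentrantLock do.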

@lhotari (Member, Author) commented Mar 1, 2021

Here's an example of a suspicious stack trace from one of the ReplicatorTest timeouts (full thread dump):

"ForkJoinPool.commonPool-worker-2" daemon prio=5 tid=503 in Object.wait()
java.lang.Thread.State: WAITING (on object monitor)
        at sun.misc.Unsafe.park(Native Method)
        at java.util.concurrent.locks.StampedLock.acquireRead(StampedLock.java:1286)
        at java.util.concurrent.locks.StampedLock.readLock(StampedLock.java:428)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:367)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.onPoliciesUpdate(PersistentTopic.java:2170)
        at org.apache.pulsar.broker.service.BrokerService.lambda$null$62(BrokerService.java:1674)
        at org.apache.pulsar.broker.service.BrokerService$$Lambda$922/1818690322.accept(Unknown Source)
        at java.util.Optional.ifPresent(Optional.java:159)
        at org.apache.pulsar.broker.service.BrokerService.lambda$null$63(BrokerService.java:1674)
        at org.apache.pulsar.broker.service.BrokerService$$Lambda$921/148576384.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
        at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683)
        at java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010)
        at org.apache.pulsar.broker.service.BrokerService.lambda$onUpdate$64(BrokerService.java:1669)
        at org.apache.pulsar.broker.service.BrokerService$$Lambda$920/1723403002.accept(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:387)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.broker.service.BrokerService.onUpdate(BrokerService.java:1665)
        at org.apache.pulsar.broker.service.BrokerService.onUpdate(BrokerService.java:175)
        at org.apache.pulsar.zookeeper.ZooKeeperDataCache.lambda$4(ZooKeeperDataCache.java:138)
        at org.apache.pulsar.zookeeper.ZooKeeperDataCache$$Lambda$538/1964134182.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
        at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.pulsar.zookeeper.ZooKeeperCache.lambda$18(ZooKeeperCache.java:386)
        at org.apache.pulsar.zookeeper.ZooKeeperCache$$Lambda$188/1103281887.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
        at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.pulsar.zookeeper.ZooKeeperCache.lambda$14(ZooKeeperCache.java:366)
        at org.apache.pulsar.zookeeper.ZooKeeperCache$$Lambda$198/880015051.run(Unknown Source)
        at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)

whenever Policies get updated, this code will get executed:

@Override
public void onUpdate(String path, Policies data, Stat stat) {
    final NamespaceName namespace = NamespaceName.get(NamespaceBundleFactory.getNamespaceFromPoliciesPath(path));
    log.info("{} updating with {}", path, data);
    topics.forEach((name, topicFuture) -> {
        if (namespace.includes(TopicName.get(name))) {
            // If the topic is already created, immediately apply the updated policies, otherwise once the topic is
            // created it'll apply the policies update
            topicFuture.thenAccept(topic -> {
                if (log.isDebugEnabled()) {
                    log.debug("Notifying topic that policies have changed: {}", name);
                }
                topic.ifPresent(t -> t.onPoliciesUpdate(data));
            });
        }
    });
    // sometimes, some brokers don't receive policies-update watch and miss to remove replication-cluster and still
    // own the bundle. That can cause data-loss for TODO: git-issue
    unloadDeletedReplNamespace(data, namespace);
}

One can find the other possible participants of the deadlock by searching for "ConcurrentOpenHashMap" in the thread dump file.

This type of issue stopped occurring in ReplicatorTest after applying the changes in this PR.

When looking at the existing code in ConcurrentOpenHashMap$Section.forEach, it seems that there wouldn't be a major difference in data consistency when using the snapshot approach. forEach uses optimistic reads: if a rehashing happens, it continues to iterate over the table that was in place before the rehashing, and the read lock gets acquired only when some write happens during the read. The ReplicatorTest failures show actual problems, so there seems to be a clear need to reduce the duration of locks while processing items held in ConcurrentOpenHashMaps.

@lhotari lhotari force-pushed the lh-reduce-concurrentopenhashmap-locking branch from ff62064 to 576b13b on March 1, 2021 17:37
@merlimat (Contributor) commented Mar 1, 2021

There are tradeoffs involved. Some memory allocations have to be made to take the copy. However, allocations aren't a real issue here: JVMs are efficient at handling such short-lived allocations, and these allocations don't introduce new bottlenecks or performance issues.

The primary goal of having these custom concurrent map implementations was actually to avoid memory allocations to the maximum possible extent :)

Even if these allocations are short-lived, they still contribute to generating garbage that fills up the new-gen space at a faster rate. That in turn causes more "in-flight", reasonably short-lived objects to get caught in a collection event and copied over to old-gen spaces, where they'll have to be compacted later on.

The class also already has 2 methods, keys() and values(), that take copies, though they were put there mostly to simplify unit tests.

The I/O usually goes to ZooKeeper, and it leads to threads piling up waiting for locks in ConcurrentOpenHashMap when reads and writes for the same section happen in concurrently executing threads.

We shouldn't be doing blocking calls from within the process method. I think we should rather refactor that code instead.

@lhotari (Member, Author) commented Mar 2, 2021

The primary goal of having these custom concurrent map implementations was actually to avoid memory allocations to the maximum possible extent :)

Even if these allocations are short-lived, they still contribute to generating garbage that fills up the new-gen space at a faster rate. That in turn causes more "in-flight", reasonably short-lived objects to get caught in a collection event and copied over to old-gen spaces, where they'll have to be compacted later on.

Yes, it makes sense to avoid memory allocations in general, but here there could be a reason to make a compromise in order to make Pulsar more stable.

In this case, the memory allocated in forEachInSnapshot is a tiny fraction of all the memory allocation happening. The ConcurrentOpenHashMap.forEach calls aren't on the fast path of processing, where a lot of garbage could be generated. The amount of memory allocated is on the order of a few kilobytes per second, since it's only a few object references that consume memory (about 28 bytes per item: 16+4+4 bytes for the Entry, plus 4 bytes for each slot in the ArrayList). Is GC pressure really a problem in this case, given how .forEach is used in the Pulsar code base?
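
As a rough, illustrative calculation based on that ~28-byte-per-item figure (the 10,000-entry map size is a made-up example): snapshotting such a map would allocate roughly 10,000 × 28 B ≈ 280 KB of short-lived objects per full scan.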

We shouldn't be doing blocking calls from within the process method. I think we should rather refactor that code instead.

This is easier said than done: refactoring the code in that way is a major effort and won't happen quickly. There's plenty of code that makes blocking calls within forEach or putIfAbsent methods, and my assumption is that ZooKeeper is involved in a lot of those cases.

After reviewing how the current forEach method relies on optimistic locking, I don't think there is a risk from a data-consistency perspective in switching to the forEachInSnapshot method for processing elements of ConcurrentOpenHashMaps. My major concern initially was that the behavior would be a lot different; that's no longer a concern for me.

I have added a few signs of deadlocks from ReplicatorTest in comments of this PR. All such issues go away after applying the changes in this PR. That's why I'm confident that this PR is useful.

In the past, I have investigated problems in real environments where the symptoms observed in the thread dumps look similar to the thread dumps from stalled ReplicatorTests. A certain set of problems seems to happen only in Pulsar deployments with a large number of topics. When there are a lot of topics, the .forEach iterations take a long time, and there's a much higher chance that the nondeterministic deadlocks occur. That is something the changes in this PR would prevent.

Perhaps we could evaluate each individual usage of .forEach, case by case, and decide whether to use .forEach or .forEachInSnapshot? When there's a chance that the processing uses blocking methods, we would choose .forEachInSnapshot, and if it's computation-only, .forEach would be used. @merlimat WDYT?

@lhotari (Member, Author) commented Mar 2, 2021

/pulsarbot run-failure-checks

@eolivelli (Contributor)

@lhotari
What about changing to the new method only in the cases that are relevant to the deadlocks you see in the test?

@lhotari (Member, Author) commented Mar 2, 2021

What about changing to the new method only in the cases that are relevant to the deadlocks you see in the test?

@eolivelli Why limit to the cases that happen to show up? Matteo commented earlier: "We shouldn't be doing blocking calls from within the process method. I think we should rather refactor that code instead." Based on this rule, it can be determined from the code whether a certain usage of .forEach needs addressing or not.
If it's possible to refactor the code to not use blocking calls, that should be done. Since it's really hard to get rid of the blocking calls in the processing methods, the only remaining choice is to switch to .forEachInSnapshot instead of .forEach.
Does that make sense?

@lhotari (Member, Author) commented Mar 2, 2021

/pulsarbot run-failure-checks

@merlimat (Contributor) commented Mar 2, 2021

In this case, the memory allocated in forEachInSnapshot is a tiny fraction of all the memory allocation happening. The ConcurrentOpenHashMap.forEach calls aren't on the fast path of processing, where a lot of garbage could be generated. The amount of memory allocated is on the order of a few kilobytes per second, since it's only a few object references that consume memory (about 28 bytes per item: 16+4+4 bytes for the Entry, plus 4 bytes for each slot in the ArrayList). Is GC pressure really a problem in this case, given how .forEach is used in the Pulsar code base?

This map is used in many places throughout the code base and, depending on the use cases, there can be instances with a lot of entries or many instances with few entries. Also, the rate of this scanning through the map could be quite high in some cases.

In light of that, I'm not convinced that a model where we always defensively copy the entire map when we need to do a map scan is a good general option, or a good replacement in the current code, where most places assumed the scanning would not be expensive (allocations-wise), even though there might be a few specific cases in which it's appropriate.

If we do have blocking in the code that processes the map items, and that cannot be easily removed, I believe a better alternative would be to make the scanning more flexible.

For example, we're not expecting to do a scan over a consistent snapshot of the map (for the simple reason that the map is broken down into independent sections). We only need to maintain the consistency between a key and its associated value.

With that in mind, we don't strictly need to get a read lock for the entire duration of scanning the whole section.
We could instead use different approaches:

  • Grab the stamp once per section and upgrade to read-lock only if there are writes occurring
  • Do the same as a get, with stamp and fallback to read-lock for each item we're reading (sketched below)

Another alternative could be to measure the time spent in the process function and decide, based on that, whether to make a copy or keep the read lock during the scan.
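
For illustration, a minimal sketch of what the second option could look like inside Section.forEach (assuming, as in the existing code, that Section extends StampedLock; this is only the idea, not the eventual patch):

public void forEach(BiConsumer<? super K, ? super V> processor) {
    long stamp = tryOptimisticRead();
    Object[] table = this.table;

    for (int bucket = 0; bucket < table.length; bucket += 2) {
        K storedKey = (K) table[bucket];
        V storedValue = (V) table[bucket + 1];

        if (!validate(stamp)) {
            // A write happened: re-read just this entry under the read lock,
            // then release the lock again before invoking the processor
            stamp = readLock();
            try {
                table = this.table;
                if (bucket >= table.length) {
                    // guard in case the table was swapped for a smaller one
                    break;
                }
                storedKey = (K) table[bucket];
                storedValue = (V) table[bucket + 1];
            } finally {
                unlockRead(stamp);
            }
            stamp = tryOptimisticRead();
        }

        if (storedKey != DeletedKey && storedKey != EmptyKey) {
            // the processor runs without any lock held on the section
            processor.accept(storedKey, storedValue);
        }
    }
}

With this shape the processor never runs while the section's read lock is held, at the cost of possibly skipping or repeating entries if a rehash happens mid-scan.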

@lhotari (Member, Author) commented Mar 3, 2021

This map is used in many places throughout the code base and, depending on the use cases, there can be instances with a lot of entries or many instances with few entries. Also, the rate of this scanning through the map could be quite high in some cases.

That's the reason for my earlier suggestion that we evaluate each ConcurrentOpenHashMap.forEach call individually and decide on a case-by-case basis how to fix the handling. In some cases the existing forEach call could remain, and in the cases involving I/O (or "blocking") or mutations of other lockable resources, we would switch to forEachInSnapshot.

In light of that, I'm not convinced that a model where we always defensively copy the entire map when we need to do a map scan is a good general option, or a good replacement in the current code, where most places assumed the scanning would not be expensive (allocations-wise), even though there might be a few specific cases in which it's appropriate.

It's not really a defensive copy of the entire map. The copy is done one section at a time. Memory allocation and GC are extremely efficient in modern JVMs. Since it's about 28 bytes per entry, there would have to be millions of entries for the memory allocation overhead to have any noticeable significance. If necessary, this could be benchmarked.
Since there aren't millions of entries, but usually a few thousand entries, it has even less significance. This is why I don't see a reason to avoid the copying in this case.

If we do have blocking in the code that processes the map items, and that cannot be easily removed,

We do have I/O operations (interactions with ZooKeeper) in most of the code that processes the map items, and they cannot be easily removed.

I believe a better alternative would be to make the scanning more flexible.
...
Grab the stamp once per section and upgrade to read-lock only if there are writes occurring

The .forEach method in the master branch is already implemented this way ("grab the stamp once per section and upgrade to read-lock only if there are writes occurring").
It reduces the duration of the locks in the case where there are no writes. When there are writes, the .forEach processing will hold the read lock until all items have been processed. This is one of the reasons why the problems are rare with a small number of items or a low amount of write operations. However, such a solution won't completely prevent deadlocks if the model is such that it could lead to a deadlock that isn't detected or resolved after some timeout: a deadlock will eventually happen even if the probability is low. Currently the Pulsar broker locks up and doesn't resume operations if there's a deadlock; that's what makes the problem severe. Since the locks could be in specific sections of a ConcurrentOpenHashMap, it leads to partial unavailability which is hard to detect.

Improving Pulsar stability is a high priority and this should be addressed. Please don't take me wrong: I don't think this PR resolves all such issues. I know that it will only resolve the issues that are caused by a deadlock which can be avoided by not locking at all while processing items of a ConcurrentOpenHashMap.

It would be interesting if @Vanlightly could take a look from a formal verification / modelling perspective on what would be practical in this case. @merlimat WDYT?

@eolivelli (Contributor)

@lhotari
Let me step back, in order to understand the issue better.

If we execute the loop over the sections always in the same order and acquire/release the locks in the appropriate order,
how can we fall into a deadlock?

@lhotari (Member, Author) commented Mar 3, 2021

how can we fall into a deadlock?

Not an easy question. I don't have an answer to that. Perhaps @Vanlightly could help us?

@eolivelli (Contributor)

I see now, this is a case of deadlock: we are issuing a blocking call to ZooKeeper from inside a forEach, and the ZooKeeper library handles every operation in a single thread.
So the chain is pretty hard to understand.

"pulsar-backlog-quota-checker-146-1"  prio=5 tid=258 TIMED_WAITING
java.lang.Thread.State: TIMED_WAITING
        at sun.misc.Unsafe.park(Native Method)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1709)
        at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
        at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1788)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
        at org.apache.pulsar.zookeeper.ZooKeeperDataCache.get(ZooKeeperDataCache.java:97)
        at org.apache.pulsar.broker.service.BacklogQuotaManager.getBacklogQuota(BacklogQuotaManager.java:66)
        at org.apache.pulsar.broker.service.BacklogQuotaManager.getBacklogQuota(BacklogQuotaManager.java:78)
        at org.apache.pulsar.broker.service.BacklogQuotaManager.getBacklogQuotaLimit(BacklogQuotaManager.java:94)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.isBacklogExceeded(PersistentTopic.java:2227)
        at org.apache.pulsar.broker.service.BrokerService.lambda$monitorBacklogQuota$53(BrokerService.java:1502)
        at org.apache.pulsar.broker.service.BrokerService$$Lambda$453/1035521358.accept(Unknown Source)
        at org.apache.pulsar.broker.service.BrokerService$$Lambda$858/1296923805.accept(Unknown Source)
        at java.util.Optional.ifPresent(Optional.java:159)
        at org.apache.pulsar.broker.service.BrokerService.lambda$forEachTopic$52(BrokerService.java:1490)
        at org.apache.pulsar.broker.service.BrokerService$$Lambda$454/965168673.accept(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:387)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.broker.service.BrokerService.forEachTopic(BrokerService.java:1488)
        at org.apache.pulsar.broker.service.BrokerService.monitorBacklogQuota(BrokerService.java:1499)
        at org.apache.pulsar.broker.service.BrokerService$$Lambda$280/2017773859.run(Unknown Source)
        at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
        at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)

@Vanlightly (Contributor)

I've not spent much time in the Pulsar codebase, so don't know much about the wider context. But if it helps, I could model this ConcurrentOpenHashMap in TLA+ and see if it has any deadlock scenarios. Couldn't give any commitment on timing though.

@merlimat (Contributor) commented Mar 3, 2021

It's not really a defensive copy of the entire map. The copy is done one section at a time.

By defensive, I mean that we'd be taking a copy of the entire map (section by section) each time we need to traverse it, even though there would be no need to do it (e.g. when a section is not being modified). In the case of a map which is not updated super-frequently, that would impose a sizeable overhead.

Memory allocation and GC are extremely efficient in modern JVMs.

I've been battling GC and allocations for many years now, and while that might be true to a certain extent, there are also many bad GC behaviors that we have fixed by making changes like switching from the regular ConcurrentHashMap to this custom implementation.

Since it's about 28 bytes per entry, there would have to be millions of entries for the memory allocation overhead to have any noticeable significance. If necessary, this could be benchmarked.
Since there aren't millions of entries, but usually a few thousand entries, it has even less significance. This is why I don't see a reason to avoid the copying in this case.

The problem is that there can easily be millions of entries (aggregated from hundreds of thousands of maps) on a single instance. That has been the case in many production deployments for many years.

We do have I/O operations (interactions with ZooKeeper) in most of the code that processes the map items, and they cannot be easily removed.

Sure, but we should take care of them (e.g. during the refactoring to port these components to use MetadataStore instead of direct ZK access).

Also, I was commenting that we could do it such that we automatically switch to individual locks (instead of a whole-section lock) if the processing is taking more than X amount of time.

It reduces the duration of the locks in the case where there are no writes. When there are writes, the .forEach processing will hold the read lock until all items have been processed. This is one of the reasons why the problems are rare with a small number of items or a low amount of write operations. However, such a solution won't completely prevent deadlocks if the model is such that it could lead to a deadlock that isn't detected or resolved after some timeout: a deadlock will eventually happen even if the probability is low. Currently the Pulsar broker locks up and doesn't resume operations if there's a deadlock; that's what makes the problem severe. Since the locks could be in specific sections of a ConcurrentOpenHashMap, it leads to partial unavailability which is hard to detect.

While it's clear that a slow scan will provoke a spike in write latency into the map, I'm still not clear on the exact case of deadlock here.

Typically the way to get deadlocked on the map is if, while doing a scan, you're trying to access it again from a different thread (or even same thread, since the lock is non-reentrant) and that blocks the processing function.

I see now, this is a case of deadlock: we are issuing a blocking call to ZooKeeper from inside a forEach, and the ZooKeeper library handles every operation in a single thread.
So the chain is pretty hard to understand.

@eolivelli Even if we do a ZK blocking call (which we shouldn't), that doesn't necessarily mean that there will be a deadlock. It will only be the case if the ZK callback thread is blocked trying to read/write on the same map. Do you have the other threads' stacks?

To conclude, in my opinion the easiest/lower-risk/lower-overhead change here would be to switch the forEach() to do the stamped+read-lock combination for each entry, instead of the current per-section.

That will ensure that the process function is always called without holding a lock on the map. It can lead to spurious inconsistencies during re-hashing, but I don't think that will be a problem. And anyway, that can be solved by requesting the current implementation's behavior for those specific cases.

@merlimat (Contributor) commented Mar 3, 2021

It can lead to spurious inconsistencies during re-hashing,

Scratch that. It's actually possible to do it with the current concurrency level, no funky stuff.

@lhotari (Member, Author) commented Mar 3, 2021

To conclude, in my opinion the easiest/lower-risk/lower-overhead change here would be to switch the forEach() to do the stamped+read-lock combination for each entry, instead of the current per-section.

Thank you for the detailed feedback, @merlimat. This makes sense. I believe that it would solve most of the problems. I'll revisit this PR with that approach.

@merlimat (Contributor) commented Mar 3, 2021

I have a WIP solution already. Need to look at it a bit more to see if I'm missing anything, and then I'll create a PR.

@lhotari (Member, Author) commented Mar 3, 2021

I have a WIP solution already. Need to look at it a bit more to see if I'm missing anything, and then I'll create a PR.

Ok great. I'm looking forward to that.
I'll close this PR.

@lhotari lhotari closed this Mar 3, 2021
@eolivelli (Contributor)

@merlimat

@eolivelli Even if we do a ZK blocking call (which we shouldn't), that doesn't necessarily mean that there will be a deadlock. It will only be the case if the ZK callback thread is blocked trying to read/write on the same map. Do you have the other threads' stacks?

I didn't paste the full dump of the competing stack traces, sorry.
Btw, I believe that it is no longer so important to the discussion. Thanks for answering my comment.

@merlimat (Contributor) commented Mar 3, 2021

@eolivelli @lhotari Created #9787
