@lhotari (Member) commented Apr 28, 2021

Motivation

There are concurrency issues in the PersistentDispatcherMultipleConsumers and PersistentStreamingDispatcherMultipleConsumers classes. Some symptoms or previous fixes are #5311, #6054, #6255, #7266, and #9789.

Currently, the PersistentStreamingDispatcherMultipleConsumers.readMoreEntries method isn't synchronized:

    @Override
    public void readMoreEntries() {
        // totalAvailablePermits may be updated by other threads
        int currentTotalAvailablePermits = totalAvailablePermits;

In one location the method is called within a synchronized block like this:

    @Override
    public void run(Timeout timeout) throws Exception {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Timer triggered", dispatcher.getName());
        }
        if (timeout.isCancelled()) {
            return;
        }
        synchronized (dispatcher) {
            currentTimeoutTarget = -1;
            timeout = null;
            dispatcher.readMoreEntries();
        }
    }

In another location, the readMoreEntries call is explicitly wrapped in a synchronized block:

    topic.getBrokerService().executor().schedule(() -> {
        synchronized (PersistentDispatcherMultipleConsumers.this) {
            if (!havePendingRead) {
                log.info("[{}] Retrying read operation", name);
                readMoreEntries();
            } else {
                log.info("[{}] Skipping read retry: havePendingRead {}", name, havePendingRead, exception);
            }
        }
    }, waitTimeMillis, TimeUnit.MILLISECONDS);

Synchronization is missing in this location (and a few other locations):

    public void unblockDispatchersOnUnAckMessages(List<PersistentDispatcherMultipleConsumers> dispatcherList) {
        lock.writeLock().lock();
        try {
            dispatcherList.forEach(dispatcher -> {
                dispatcher.unBlockDispatcherOnUnackedMsgs();
                executor().execute(() -> dispatcher.readMoreEntries());
                log.info("[{}] Dispatcher is unblocked", dispatcher.getName());
                blockedDispatchers.remove(dispatcher);
            });
        } finally {
            lock.writeLock().unlock();
        }
    }

It seems that it would be more consistent to make the readMoreEntries method a synchronized method.

Modifications

  • Make the readMoreEntries method in the PersistentDispatcherMultipleConsumers and PersistentStreamingDispatcherMultipleConsumers classes a synchronized method.
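
To illustrate the idea, here is a minimal sketch (not the Pulsar code) of what a synchronized readMoreEntries buys: call sites that forget their own synchronized block are still serialized, and call sites that already hold the dispatcher's monitor simply re-enter it. SketchDispatcher, SynchronizedReadSketch, runDemo, and the permit/entry counters are hypothetical names made up for this example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Runnable sketch; every class and method name here is hypothetical.
class SynchronizedReadSketch {

    // Drives one dispatcher from unsynchronized and already-synchronized
    // call sites and returns how many entries were read in total.
    static int runDemo(int permits, int threads, int callsPerThread) {
        SketchDispatcher dispatcher = new SketchDispatcher(permits);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            // Call site without its own synchronized block.
            executor.execute(() -> {
                for (int i = 0; i < callsPerThread; i++) {
                    dispatcher.readMoreEntries();
                }
            });
        }
        // Call site that already holds the monitor; the synchronized
        // method re-enters the same lock instead of deadlocking.
        synchronized (dispatcher) {
            dispatcher.readMoreEntries();
        }
        executor.shutdown();
        try {
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return dispatcher.getEntriesRead();
    }

    public static void main(String[] args) {
        System.out.println("entries read: " + runDemo(1000, 4, 500));
    }
}

// Stand-in for a dispatcher: one permit is consumed per entry read.
class SketchDispatcher {
    private int totalAvailablePermits;
    private int entriesRead;

    SketchDispatcher(int permits) {
        this.totalAvailablePermits = permits;
    }

    // Equivalent to wrapping the body in synchronized (this).
    public synchronized void readMoreEntries() {
        if (totalAvailablePermits > 0) {
            totalAvailablePermits--;
            entriesRead++;
        }
    }

    public synchronized int getEntriesRead() {
        return entriesRead;
    }
}
```

With 1000 permits and 2001 calls in total, the check-then-decrement inside readMoreEntries can never overshoot, so the demo reports exactly 1000 entries read. Without the synchronized keyword, the same check-then-act sequence would be a data race.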

@lhotari (Member, Author) commented Apr 28, 2021

@eolivelli @devinbost @merlimat @codelipenghui @rdhabalia Please review this change that could be a solution to some concurrency issues in PersistentDispatcherMultipleConsumers and PersistentStreamingDispatcherMultipleConsumers classes.

@devinbost (Contributor)

You know you've got my support, though I do wonder if there was a specific reason why it wasn't synchronized already. One of the others with a deeper knowledge of the architecture might have some insight there.

@devinbost (Contributor)

After thinking about this, I noticed a few things.

  1. readMoreEntries() is recursive, but synchronized locks in Java are reentrant, so synchronizing the method won't create an immediate deadlock.
  2. As noted, readMoreEntries() is already called from within a synchronized block in more than one location, so a lock is already held there. If those calls come from different threads, we will introduce lock contention, which will have a performance impact. Also, since synchronized on a method is equivalent to synchronized (this), we could be taking a broader lock than necessary. In that case, finer-grained local locks would be less expensive.
  3. We want to be mindful of the impact on latency because message dispatch is blocked by this path. However, unless the impact is huge, we should be okay because message consumption is buffered by permits. So, except for the initial dispatch (where the client doesn't already have messages in its queue), most applications won't be affected by an increase in latency here.
  4. It could be argued that many local locks consume more memory than a single lock, but there are ways of mitigating the memory impact (such as by using AtomicIntegerFieldUpdater).

So, I think it becomes a question of latency vs memory impact and whether more local locks should be used instead.
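
As a rough illustration of point 4, here is a sketch of a permit counter guarded by AtomicIntegerFieldUpdater instead of a lock. The updater is a single static object shared by all instances, so each instance carries only a volatile int rather than a separate AtomicInteger or lock object. PermitCounterSketch, addPermits, acquireUpTo, and available are hypothetical names for this example, not the dispatcher's actual API.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Hypothetical permit counter; a sketch, not the dispatcher implementation.
class PermitCounterSketch {

    // One shared updater for all instances of this class.
    private static final AtomicIntegerFieldUpdater<PermitCounterSketch> PERMITS_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(PermitCounterSketch.class, "totalAvailablePermits");

    // The updated field must be a non-static volatile int.
    private volatile int totalAvailablePermits;

    // Atomically adds permits, for example when a consumer sends a flow request.
    void addPermits(int permits) {
        PERMITS_UPDATER.addAndGet(this, permits);
    }

    // Atomically takes up to max permits and returns how many were taken.
    int acquireUpTo(int max) {
        while (true) {
            int current = PERMITS_UPDATER.get(this);
            int taken = Math.min(current, max);
            if (taken <= 0) {
                return 0;
            }
            // Retry if another thread changed the counter in the meantime.
            if (PERMITS_UPDATER.compareAndSet(this, current, current - taken)) {
                return taken;
            }
        }
    }

    int available() {
        return PERMITS_UPDATER.get(this);
    }

    public static void main(String[] args) {
        PermitCounterSketch counter = new PermitCounterSketch();
        counter.addPermits(10);
        System.out.println("taken: " + counter.acquireUpTo(4));
        System.out.println("left: " + counter.available());
    }
}
```

This only protects the counter itself. Any logic that must read the counter and act on other dispatcher state in one step would still need a lock around that whole step, which is the trade-off being discussed here.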

@merlimat merlimat added the type/bug The PR fixed a bug or issue reported a bug label Apr 28, 2021
@merlimat merlimat added this to the 2.8.0 milestone Apr 28, 2021
@merlimat (Contributor) left a comment

👍 Initially, readMoreEntries() was a private method and was always called with the lock already held.

@eolivelli (Contributor)

@devinbost this patch did not apply. See the related patch for branch 2.7.

eolivelli pushed a commit that referenced this pull request May 13, 2021