Member

lhotari commented Apr 9, 2021

Motivation

AtomicIntegerFieldUpdater should be used to update volatile int fields when the value is incremented or decremented. This prevents "lost update" issues.
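
To illustrate the "lost update" concern, here is a minimal sketch, not taken from the Pulsar code base. The class `PermitCounterDemo` and its fields are hypothetical. It compares a direct `+=` on a volatile int, with no lock held, against `AtomicIntegerFieldUpdater.addAndGet` on a second volatile int.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

public class PermitCounterDemo {
    // Hypothetical stand-ins for a permit counter; not Pulsar code.
    private volatile int plainPermits;
    private volatile int atomicPermits;

    private static final AtomicIntegerFieldUpdater<PermitCounterDemo> ATOMIC_PERMITS_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(PermitCounterDemo.class, "atomicPermits");

    void addPlain(int n) {
        // Volatile read, add, volatile write: three steps, so concurrent
        // callers that hold no lock can overwrite each other's result.
        plainPermits += n;
    }

    void addAtomic(int n) {
        // Single atomic read-modify-write.
        ATOMIC_PERMITS_UPDATER.addAndGet(this, n);
    }

    int plain() {
        return plainPermits;
    }

    int atomic() {
        return atomicPermits;
    }

    /** Starts the given number of threads; each adds 1 to both counters perThread times. */
    static PermitCounterDemo run(int threads, int perThread) {
        PermitCounterDemo demo = new PermitCounterDemo();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    demo.addPlain(1);
                    demo.addAtomic(1);
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return demo;
    }

    public static void main(String[] args) {
        PermitCounterDemo demo = run(4, 100_000);
        System.out.println("expected = " + (4 * 100_000));
        System.out.println("atomic   = " + demo.atomic()); // always equals expected
        System.out.println("plain    = " + demo.plain());  // can be lower if updates were lost
    }
}
```

The plain counter is only at risk when no common lock is held around the update. Whether that is the case in the dispatcher is exactly what the review below discusses.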

Modifications

Use existing TOTAL_AVAILABLE_PERMITS_UPDATER to mutate totalAvailablePermits field.

Co-authored-by: Devin Bost <devinbost@users.noreply.github.com>

lhotari changed the title from "Use AtomicIntegerFieldUpdater to mutate volatile int totalAvailablePermits" to "Use AtomicIntegerFieldUpdater to mutate volatile int totalAvailablePermits in PersistentDispatcherMultipleConsumers class" on Apr 9, 2021
Member Author

lhotari commented Apr 9, 2021

This PR was co-authored with @devinbost. Thanks to @devinbost for pinpointing the code location around totalAvailablePermits in the PersistentDispatcherMultipleConsumers class. This fix wouldn't have happened without @devinbost's persistent investigation work. This change might fix issues such as #6054.

@codelipenghui @merlimat @eolivelli @rdhabalia Please review this change.

Member Author

lhotari commented Apr 9, 2021

By the way, this problem existed only in the PersistentDispatcherMultipleConsumers class. Similar code in NonPersistentDispatcherMultipleConsumers didn't have the issue:

@Override
public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
    if (consumerSet.removeAll(consumer) == 1) {
        consumerList.remove(consumer);
        log.info("Removed consumer {}", consumer);
        if (consumerList.isEmpty()) {
            if (closeFuture != null) {
                log.info("[{}] All consumers removed. Subscription is disconnected", name);
                closeFuture.complete(null);
            }
            TOTAL_AVAILABLE_PERMITS_UPDATER.set(this, 0);
        }
    } else {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Trying to remove a non-connected consumer: {}", name, consumer);
        }
        TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -consumer.getAvailablePermits());
    }
}

@Override
public boolean isConsumerConnected() {
    return !consumerList.isEmpty();
}

@Override
public CopyOnWriteArrayList<Consumer> getConsumers() {
    return consumerList;
}

@Override
public synchronized boolean canUnsubscribe(Consumer consumer) {
    return consumerList.size() == 1 && consumerSet.contains(consumer);
}

@Override
public CompletableFuture<Void> close() {
    IS_CLOSED_UPDATER.set(this, TRUE);
    return disconnectAllConsumers();
}

@Override
public synchronized void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
    if (!consumerSet.contains(consumer)) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Ignoring flow control from disconnected consumer {}", name, consumer);
        }
        return;
    }
    TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, additionalNumberOfMessages);
    if (log.isDebugEnabled()) {
        log.debug("[{}] Trigger new read after receiving flow control message", consumer);
    }
}

Member Author

lhotari commented Apr 9, 2021

/pulsarbot run-failure-checks

Contributor

eolivelli left a comment

LGTM

eolivelli added this to the 2.8.0 milestone Apr 9, 2021
Member Author

lhotari commented Apr 9, 2021

/pulsarbot run-failure-checks

         closeFuture.complete(null);
     }
-    totalAvailablePermits = 0;
+    TOTAL_AVAILABLE_PERMITS_UPDATER.set(this, 0);

Contributor

This would be equivalent to totalAvailablePermits = 0.
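
A small sketch of why the two forms behave the same for an unconditional reset: `AtomicIntegerFieldUpdater.set` performs a volatile store to the field, which is what a direct assignment to a volatile field also does. The class `ResetPermitsDemo` is hypothetical and only mirrors the field and updater names from the diff.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

public class ResetPermitsDemo {
    // Hypothetical holder class; only the field and updater names follow the diff.
    private volatile int totalAvailablePermits;

    private static final AtomicIntegerFieldUpdater<ResetPermitsDemo> TOTAL_AVAILABLE_PERMITS_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(ResetPermitsDemo.class, "totalAvailablePermits");

    ResetPermitsDemo(int initialPermits) {
        this.totalAvailablePermits = initialPermits;
    }

    void resetDirectly() {
        // Plain volatile write; no read is involved, so nothing can be lost.
        totalAvailablePermits = 0;
    }

    void resetViaUpdater() {
        // Also an unconditional volatile write, expressed through the updater.
        TOTAL_AVAILABLE_PERMITS_UPDATER.set(this, 0);
    }

    int permits() {
        return totalAvailablePermits;
    }

    public static void main(String[] args) {
        ResetPermitsDemo direct = new ResetPermitsDemo(5);
        ResetPermitsDemo viaUpdater = new ResetPermitsDemo(5);
        direct.resetDirectly();
        viaUpdater.resetViaUpdater();
        System.out.println(direct.permits() + " " + viaUpdater.permits());
    }
}
```

The updater only changes behavior for read-modify-write operations such as `addAndGet`; for a constant reset it is a matter of style.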

Member Author

True. I tried to make it consistent with the style used in NonPersistentDispatcherMultipleConsumers.

     redeliveryTracker.addIfAbsent(PositionImpl.get(ledgerId, entryId));
 });
-totalAvailablePermits -= consumer.getAvailablePermits();
+TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -consumer.getAvailablePermits());

Contributor

The increments and decrements of totalAvailablePermits should already be safe because they are all done while holding the mutex on the dispatcher object.

Member Author

lhotari commented Apr 9, 2021

Yes, you are right. It seems that all updates already happen within synchronized methods.
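
The reviewer's point can be sketched as follows, assuming a simplified, hypothetical `MiniDispatcher` rather than the real Pulsar class: when every mutation runs in a synchronized method on the same object, a plain `+=` or `-=` cannot lose updates, because only one thread at a time holds the monitor.

```java
public class SynchronizedPermitsDemo {
    // Hypothetical, simplified dispatcher; only the locking pattern follows the discussion.
    static class MiniDispatcher {
        private volatile int totalAvailablePermits;

        synchronized void consumerFlow(int additionalNumberOfMessages) {
            // Safe compound update: the caller holds this object's monitor.
            totalAvailablePermits += additionalNumberOfMessages;
        }

        synchronized void removeConsumer(int consumerPermits) {
            totalAvailablePermits -= consumerPermits;
        }

        int getTotalAvailablePermits() {
            // Volatile read; does not take the monitor.
            return totalAvailablePermits;
        }
    }

    /** Each thread adds 2 and removes 1 per iteration, so the net change is +1. */
    static int run(int threads, int iterations) {
        MiniDispatcher dispatcher = new MiniDispatcher();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < iterations; j++) {
                    dispatcher.consumerFlow(2);
                    dispatcher.removeConsumer(1);
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return dispatcher.getTotalAvailablePermits();
    }

    public static void main(String[] args) {
        // Equals threads * iterations because no update is lost.
        System.out.println(run(4, 50_000));
    }
}
```

This guarantee covers only the code paths that actually take the monitor. A method that reads or writes the field without synchronization is outside it.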

Member Author

@merlimat I re-opened this PR. The readMoreEntries method isn't synchronized, and it is called from this location without synchronization:

public void unblockDispatchersOnUnAckMessages(List<PersistentDispatcherMultipleConsumers> dispatcherList) {
    lock.writeLock().lock();
    try {
        dispatcherList.forEach(dispatcher -> {
            dispatcher.unBlockDispatcherOnUnackedMsgs();
            executor().execute(() -> dispatcher.readMoreEntries());
            log.info("[{}] Dispatcher is unblocked", dispatcher.getName());
            blockedDispatchers.remove(dispatcher);
        });
    } finally {
        lock.writeLock().unlock();
    }
}

@devinbost
Contributor

/pulsarbot run-failure-checks

Member Author

lhotari commented Apr 9, 2021

I'm closing this PR because, as @merlimat pointed out, all mutations are already safe: they are done in synchronized methods of the PersistentDispatcherMultipleConsumers class.

lhotari closed this Apr 9, 2021
lhotari reopened this Apr 28, 2021
…rmits

- value can get corrupted if value is directly mutated
lhotari force-pushed the lh-fix-totalAvailablePermits-mutations branch from 52e9d6c to 5f1f947 Compare April 28, 2021 03:29
Member Author

lhotari commented Apr 28, 2021

Currently, the PersistentStreamingDispatcherMultipleConsumers.readMoreEntries method isn't synchronized:

@Override
public void readMoreEntries() {
    // totalAvailablePermits may be updated by other threads
    int currentTotalAvailablePermits = totalAvailablePermits;

Is there a reason why it's not a synchronized method? In one location the method is called within a synchronized block like this:

@Override
public void run(Timeout timeout) throws Exception {
    if (log.isDebugEnabled()) {
        log.debug("[{}] Timer triggered", dispatcher.getName());
    }
    if (timeout.isCancelled()) {
        return;
    }
    synchronized (dispatcher) {
        currentTimeoutTarget = -1;
        timeout = null;
        dispatcher.readMoreEntries();
    }
}

Synchronization is missing in this location:

public void unblockDispatchersOnUnAckMessages(List<PersistentDispatcherMultipleConsumers> dispatcherList) {
    lock.writeLock().lock();
    try {
        dispatcherList.forEach(dispatcher -> {
            dispatcher.unBlockDispatcherOnUnackedMsgs();
            executor().execute(() -> dispatcher.readMoreEntries());
            log.info("[{}] Dispatcher is unblocked", dispatcher.getName());
            blockedDispatchers.remove(dispatcher);
        });
    } finally {
        lock.writeLock().unlock();
    }
}
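
As a rough sketch of the difference between the two call sites, the hypothetical `MiniDispatcher` below records whether the calling thread held the dispatcher monitor when `readMoreEntries` ran. The executor setup and all names other than `readMoreEntries` are assumptions made for this illustration. Note that the write lock in the snippet above is taken by the submitting thread, so it does not cover the task that later runs on the executor thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class GuardedExecuteDemo {
    // Hypothetical stand-in for the dispatcher; it only records how it was called.
    static class MiniDispatcher {
        volatile boolean lastCallHeldMonitor;

        void readMoreEntries() { // deliberately not synchronized
            lastCallHeldMonitor = Thread.holdsLock(this);
        }
    }

    /** Runs readMoreEntries on an executor thread, optionally inside synchronized (dispatcher). */
    static boolean callOnExecutor(boolean guardWithMonitor) {
        MiniDispatcher dispatcher = new MiniDispatcher();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            if (guardWithMonitor) {
                executor.execute(() -> {
                    synchronized (dispatcher) {
                        dispatcher.readMoreEntries();
                    }
                });
            } else {
                executor.execute(() -> dispatcher.readMoreEntries());
            }
        } finally {
            executor.shutdown(); // already submitted tasks still run
        }
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("task did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return dispatcher.lastCallHeldMonitor;
    }

    public static void main(String[] args) {
        System.out.println("unguarded call held monitor: " + callOnExecutor(false));
        System.out.println("guarded call held monitor:   " + callOnExecutor(true));
    }
}
```

Wrapping each call site is one option; the alternative is to make the method itself synchronized so that no caller can forget the guard.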

Member Author

lhotari commented Apr 28, 2021

Here is another location where the readMoreEntries call is explicitly wrapped in a synchronized block:

topic.getBrokerService().executor().schedule(() -> {
    synchronized (PersistentDispatcherMultipleConsumers.this) {
        if (!havePendingRead) {
            log.info("[{}] Retrying read operation", name);
            readMoreEntries();
        } else {
            log.info("[{}] Skipping read retry: havePendingRead {}", name, havePendingRead, exception);
        }
    }
}, waitTimeMillis, TimeUnit.MILLISECONDS);

I'm running an experiment to see whether the tests pass when the readMoreEntries method is made synchronized.

Member Author

lhotari commented Apr 28, 2021

I opened a separate PR #10413 to make the readMoreEntries method synchronized.
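
A minimal sketch of what making the method synchronized implies for the existing call sites, using a hypothetical `MiniDispatcher` and not the code from #10413. Java monitors are reentrant, so a caller that already wraps the call in `synchronized (dispatcher)` keeps working, and a caller that does not wrap it acquires the monitor on entry.

```java
public class SynchronizedReadDemo {
    // Hypothetical stand-in for the dispatcher; not the actual change.
    static class MiniDispatcher {
        synchronized boolean readMoreEntries() {
            // Always true here: entering a synchronized method acquires the monitor.
            return Thread.holdsLock(this);
        }
    }

    /** Call site without its own synchronized block. */
    static boolean callDirectly() {
        MiniDispatcher dispatcher = new MiniDispatcher();
        return dispatcher.readMoreEntries();
    }

    /** Call site that already holds the monitor; re-entering it does not block. */
    static boolean callFromSynchronizedBlock() {
        MiniDispatcher dispatcher = new MiniDispatcher();
        synchronized (dispatcher) {
            return dispatcher.readMoreEntries();
        }
    }

    public static void main(String[] args) {
        System.out.println("direct call holds monitor:  " + callDirectly());
        System.out.println("wrapped call holds monitor: " + callFromSynchronizedBlock());
    }
}
```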

Member Author

lhotari commented Apr 28, 2021

This PR is not needed since #10413 was merged.

lhotari closed this Apr 28, 2021