[fix][broker] PIP-379 Key_Shared implementation race condition causing out-of-order message delivery#23874
Conversation
(cherry picked from commit 005b5c0)
|
There's some remaining issue in the test where it fails in a different way. Will investigate later. |
Resolved the issue. The test contained a race condition. |
This reverts commit e40461f.
There was a problem hiding this comment.
I think we'd better solve the issue this way, which has less lock range, better performance, and easier to read
PersistentStickyKeyDispatcherMultipleConsumers.ReplayPositionFilter.java
private class ReplayPositionFilter implements Predicate<Position> {
// tracks the available permits for each consumer for the duration of the filter usage
// the filter is stateful and shouldn't be shared or reused later
private final Map<Consumer, MutableInt> availablePermitsMap = new HashMap<>();
+ private HashSet<Long> stuckKeys = new HashSet<>();
@Override
public boolean test(Position position) {
// if out of order delivery is allowed, then any position will be replayed
if (isAllowOutOfOrderDelivery()) {
return true;
}
// lookup the sticky key hash for the entry at the replay position
Long stickyKeyHash = redeliveryMessages.getHash(position.getLedgerId(), position.getEntryId());
if (stickyKeyHash == null) {
// the sticky key hash is missing for delayed messages, the filtering will happen at the time of
// dispatch after reading the entry from the ledger
if (log.isDebugEnabled()) {
log.debug("[{}] replay of entry at position {} doesn't contain sticky key hash.", name, position);
}
return true;
}
+ if (stuckKeys.contains(stickyKeyHash)) {
+ return false;
+ }
// find the consumer for the sticky key hash
Consumer consumer = selector.select(stickyKeyHash.intValue());
// skip replaying the message position if there's no assigned consumer
if (consumer == null) {
+ stuckKeys.add(stickyKeyHash);
return false;
}
// lookup the available permits for the consumer
MutableInt availablePermits =
availablePermitsMap.computeIfAbsent(consumer,
k -> new MutableInt(getAvailablePermits(consumer)));
// skip replaying the message position if the consumer has no available permits
if (availablePermits.intValue() <= 0) {
+ stuckKeys.add(stickyKeyHash);
return false;
}
if (drainingHashesRequired
&& drainingHashesTracker.shouldBlockStickyKeyHash(consumer, stickyKeyHash.intValue())) {
// the hash is draining and the consumer is not the draining consumer
+ stuckKeys.add(stickyKeyHash);
return false;
}
availablePermits.decrement();
return true;
}
}
}
@poorbarcode thanks, I'll check the details to see if it addresses the issue. |
@poorbarcode Yes, it looks good. Great solution. Just thinking if a race could happen at that stage (in sending/dispatching) when a draining hash is unblocked: |
This is the complete method: I guess that unblocking at this stage wouldn't cause a race where messages could get delivered out-of-order. At least I'm not seeing it, although I first thought that there could be such a case. |
Just to be sure, I added a similar solution in a5d2029. There's also a slight performance benefit that there won't be a need to run all checks when the hash is already blocked in one round. I used an efficient |
.../java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
Outdated
Show resolved
Hide resolved
…g out-of-order message delivery (apache#23874)
Fixes #23870
Motivation
See #23870 for details. It was possible to reproduce a racecondition that caused out-of-order message delivery.
This happened when a draining hash got unblocked by an ack while message replay filtering was in progress.
Modifications
Fix provided by @poorbarcode in comment #23874 (review)
Documentation
docdoc-requireddoc-not-neededdoc-complete