-
Notifications
You must be signed in to change notification settings - Fork 3.7k
fix lastCumulativeAck.messageId npe when PersistentAcknowledgmentsGroupingTracker.flushSync #12343
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix lastCumulativeAck.messageId npe when PersistentAcknowledgmentsGroupingTracker.flushSync #12343
Conversation
|
/pulsarbot run-failure-checks |
...nt/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
Outdated
Show resolved
Hide resolved
...nt/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
Outdated
Show resolved
Hide resolved
a963849 to
e65e64d
Compare
aa42602 to
b13d274
Compare
|
@eolivelli PTAL agagin |
|
@eolivelli PTAL |
…upingTracker.flushSync
b13d274 to
06d41bb
Compare
|
/pulsarbot run-failure-checks |
1 similar comment
|
/pulsarbot run-failure-checks |
|
@eolivelli Could you please help review this PR again? |
|
@gaozhangmin:Thanks for your contribution. For this PR, do we need to update docs? |
|
Regarding the halted executor, I believe @lhotari is adding support to ensure that these exceptions would not stop the scheduled task. |
michaeljmarshall
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering how the client gets into a state where we have cumulativeAckFlushRequired set to true, but there is no "last ack". Is there a chance that we're updating the state incorrectly?
| this.consumer.unAckedChunkedMessageIdSequenceMap.remove(lastCumulativeAck.messageId); | ||
| shouldFlush = true; | ||
| cumulativeAckFlushRequired = false; | ||
| final MessageIdImpl messageIdOfLastAck = lastCumulativeAck.messageId; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you explain why we enter this code block when lastCumulativeAck.messageId is null? It seems like cumulativeAckFlushRequired should be false. I'm fine with adding a null check, but it seems like there could be another issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The previous logic showed, stats were updated only when this.consumer.unAckedChunkedMessageIdSequenceMap.remove successfully.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gaozhangmin When reading the code, it seems that messageId should never be null. As @michaeljmarshall pointed out, there could be another issue hiding here.
It might be a thread safety issue which could cause other problems. @gaozhangmin Did you check how messageId could become null?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PR #11607 fixes another case where lastCumulativeAck.messageId was null. @BewareMyPower What are your thoughts about the reason why the field is null in the first place? Could there be a thread safety issue that won't go away by adding null checks?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gaozhangmin do you have a chance to analyse the reason? My concern is that adding a null check might just silently ignore a potential thread safety issue which might lead to inconsistency behavior.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@lhotari As I've explained in my PR, I think there is a possible thread safety problem like
if (lastCumulativeAck.messageId == null) { // 1. messageId is not null
return false;
}
// 2. messageId was modified to null in another thread
if (messageId.compareTo(lastCumulativeAck.messageId) <= 0) { // 3. messageId is null nowI think the root cause is the design of LastCumulativeAck class. It's not well encapsulated. Though its fields are all private, the class is an inner class so the outer class can access the members directly. And we can see the direct access in many places, which makes it hard to analyze whether all the accesses are thread safe.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the root cause is the design of LastCumulativeAck class
@BewareMyPower I also think that LastCumulativeAck class is the source of problems. Using the Netty Recycler introduces the thread safety issue. Using an immutable class design for LastCumulativeAck and removing the use of Netty recycler would be something that I'd recommend for fixing the issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, I will implement immutable class design.
What's about this pr. should we close it? @lhotari
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gaozhangmin @lhotari I picked up this issue today. After looking deeper into this issue, I found another bug and it would be better to synchronize the update operations of LastCumulativeAck. See #16072, PTAL when you have time.
@michaeljmarshall The PR #12853 has already been merged to master. |
|
/pulsarbot run-failure-checks |
|
@gaozhangmin is this a bug fix (no need to update docs)? |
|
@gaozhangmin:Thanks for providing doc info! |
Yes |
|
@gaozhangmin when submitting a PR, can you help provide a doc label (tick the box) in the PR template which contains info about doc? This helps others know more about the changes. Thanks |
|
/pulsarbot run-failure-checks |
|
Yes I would close this pr |
### Motivation There were several issues caused by the thread safe issue of `LastCumulativeAck`, see: - apache#10586 - apache#12343 The root cause is that `LastCumulativeAck` could be accessed by different threads, especially in `flushAsync` method. But the fields are accessed directly and no thread safety can be guaranteed. In addition, the current `LastCumulativeAck` class was added in apache#8996 to hold two object references, but this modification is wrong. Before apache#8996, there are two CAS operations in `doCumulativeAck` method in case it's called concurretly. Though the composite CAS operation is not atomic. However, after apache#8996, only CAS operation was performed but it's compared with a `LastCumulativeAck` object, not the two fields (`messageId` and `bitSetRecyclable`). ### Modifications To solve the thread safety issue, this PR move the `LastCumulativeAck` out of the `PersistentAcknowledgmentsGroupingTracker` to disable directly access to the internal fields. Then, two synchronized methods were added to guarantee the thread safety: - `update`: Guarantee the safe write operations. It also recycles the `BitSetRecyclable` object before assigning new values. - `moveOwnershipTo`: This method moves the ownership to another `LastCumulativeAck` object, which will be responsible to recycle the `BitSetRecyclable` field after that. With the methods above, each time `flushAsync` is called, move the ownership of `lastCumulativeAck` field to another thread local field to send the ACK command and recycle the `BitSetRecyclable` field. - `lastCumulativeAck` updates the latest message ID and bit set, the update operations can be performed by multiple threads and `lastCumulativeAck` saves the latest value. - `threadLocalLastCumulativeAckToFlush` only acts as a temporary cache to the latest value in `flushAsync`.
### Motivation There were several issues caused by the thread safe issue of `LastCumulativeAck`, see: - apache#10586 - apache#12343 The root cause is that `LastCumulativeAck` could be accessed by different threads, especially in `flushAsync` method. But the fields are accessed directly and no thread safety can be guaranteed. In addition, the current `LastCumulativeAck` class was added in apache#8996 to hold two object references, but this modification is wrong. Before apache#8996, there are two CAS operations in `doCumulativeAck` method in case it's called concurretly. Though the composite CAS operation is not atomic. However, after apache#8996, only CAS operation was performed but it's compared with a `LastCumulativeAck` object, not the two fields (`messageId` and `bitSetRecyclable`). ### Modifications To solve the thread safety issue, this PR move the `LastCumulativeAck` out of the `PersistentAcknowledgmentsGroupingTracker` to disable directly access to the internal fields. Then, two synchronized methods were added to guarantee the thread safety: - `update`: Guarantee the safe write operations. It also recycles the `BitSetRecyclable` object before assigning new values. - `moveOwnershipTo`: This method moves the ownership to another `LastCumulativeAck` object, which will be responsible to recycle the `BitSetRecyclable` field after that. With the methods above, each time `flushAsync` is called, move the ownership of `lastCumulativeAck` field to another thread local field to send the ACK command and recycle the `BitSetRecyclable` field. - `lastCumulativeAck` updates the latest message ID and bit set, the update operations can be performed by multiple threads and `lastCumulativeAck` saves the latest value. - `threadLocalLastCumulativeAckToFlush` only acts as a temporary cache to the latest value in `flushAsync`.
### Motivation There were several issues caused by the thread safe issue of `LastCumulativeAck`, see: - apache#10586 - apache#12343 The root cause is that `LastCumulativeAck` could be accessed by different threads, especially in `flushAsync` method. But the fields are accessed directly and no thread safety can be guaranteed. In addition, the current `LastCumulativeAck` class was added in apache#8996 to hold two object references, but this modification is wrong. Before apache#8996, there are two CAS operations in `doCumulativeAck` method in case it's called concurretly. Though the composite CAS operation is not atomic. However, after apache#8996, only CAS operation was performed but it's compared with a `LastCumulativeAck` object, not the two fields (`messageId` and `bitSetRecyclable`). ### Modifications To solve the thread safety issue, this PR move the `LastCumulativeAck` out of the `PersistentAcknowledgmentsGroupingTracker` to disable directly access to the internal fields. Then, the following synchronized methods were added to guarantee the thread safety: - `update`: Guarantee the safe write operations. It also recycles the `BitSetRecyclable` object before assigning new values. - `moveOwnershipTo`: This method moves the ownership to another `LastCumulativeAck` object. After that, the `update` operation on this object won't recycle the `BitSetRecyclable` field. - `restoreOwnershipIfEmpty`: Restore the ownership from another `LastCumulativeAck` object. With the methods above, each time `flushAsync` is called, move the ownership of `lastCumulativeAck` field to another thread local field to send the ACK command. After that, restore the ownership to `lastCumulativeAck` unless it has been updated in other threads.
### Motivation There were several issues caused by the thread safe issue of `LastCumulativeAck`, see: - apache#10586 - apache#12343 The root cause is that `LastCumulativeAck` could be accessed by different threads, especially in `flushAsync` method. But the fields are accessed directly and no thread safety can be guaranteed. In addition, the current `LastCumulativeAck` class was added in apache#8996 to hold two object references, but this modification is wrong. Before apache#8996, there are two CAS operations in `doCumulativeAck` method in case it's called concurretly. Though the composite CAS operation is not atomic. However, after apache#8996, only CAS operation was performed but it's compared with a `LastCumulativeAck` object, not the two fields (`messageId` and `bitSetRecyclable`). There is another issue that it uses a flag `cumulativeAckFlushRequired` to mark if `lastCumulativeAck` should flush. However, if `flushAsync` was called concurrently, both would send ACK commands to broker. ### Modifications To solve the thread safety issue, this PR move the `LastCumulativeAck` out of the `PersistentAcknowledgmentsGroupingTracker` to disable directly access to the internal fields. Then, the following synchronized methods were added to guarantee the thread safety: - `update`: Guarantee the safe write operations. It also recycles the `BitSetRecyclable` object before assigning new values and indicates itself can be flushed. - `flush`: If it can be flushed, return a thread local `LastCumulativeAck` instance that contains the message ID and the bit set. Then mark it as no need to flush. In addition, since the `messageId` field is volatile, the `getMessageId` method can always retrieve the latest reference. Based on the new design, we can only maintain a `LastCumulativeAck` field in `PersistentAcknowledgmentsGroupingTracker` and call the related methods in `doCumulativeAck` and `flushAsync`. It also fixes the problem that two concurrent `flushAsync` calls might send the same ACK command twice.
### Motivation There were several issues caused by the thread safe issue of `LastCumulativeAck`, see: - apache#10586 - apache#12343 The root cause is that `LastCumulativeAck` could be accessed by different threads, especially in `flushAsync` method. But the fields are accessed directly and no thread safety can be guaranteed. In addition, the current `LastCumulativeAck` class was added in apache#8996 to hold two object references, but this modification is wrong. Before apache#8996, there are two CAS operations in `doCumulativeAck` method in case it's called concurretly. Though the composite CAS operation is not atomic. However, after apache#8996, only CAS operation was performed but it's compared with a `LastCumulativeAck` object, not the two fields (`messageId` and `bitSetRecyclable`). There is another issue that it uses a flag `cumulativeAckFlushRequired` to mark if `lastCumulativeAck` should flush. However, if `flushAsync` was called concurrently, both would send ACK commands to broker. ### Modifications To solve the thread safety issue, this PR move the `LastCumulativeAck` out of the `PersistentAcknowledgmentsGroupingTracker` to disable directly access to the internal fields. Then, the following synchronized methods were added to guarantee the thread safety: - `update`: Guarantee the safe write operations. It also recycles the `BitSetRecyclable` object before assigning new values and indicates itself can be flushed. - `flush`: If it can be flushed, return a thread local `LastCumulativeAck` instance that contains the message ID and the bit set. The bit set is deep copied to avoid the original reference being recycled in another `update` call. In addition, since the `messageId` field is volatile, the `getMessageId` method can always retrieve the latest reference. `LastCumulativeAckTest` is added to verify the sematics above. Based on the new design, we can only maintain a `LastCumulativeAck` field in `PersistentAcknowledgmentsGroupingTracker` and call the related methods in `doCumulativeAck` and `flushAsync`. It also fixes the problem that two concurrent `flushAsync` calls might send the same ACK command twice. Remove unused field Don't reset in LastCumulativeAck#flush
…6072) ### Motivation There were several issues caused by the thread safe issue of `LastCumulativeAck`, see: - #10586 - #12343 The root cause is that `LastCumulativeAck` could be accessed by different threads, especially in `flushAsync` method. But the fields are accessed directly and no thread safety can be guaranteed. In addition, the current `LastCumulativeAck` class was added in #8996 to hold two object references, but this modification is wrong. Before #8996, there are two CAS operations in `doCumulativeAck` method in case it's called concurretly. Though the composite CAS operation is not atomic. However, after #8996, only CAS operation was performed but it's compared with a `LastCumulativeAck` object, not the two fields (`messageId` and `bitSetRecyclable`). There is another issue that it uses a flag `cumulativeAckFlushRequired` to mark if `lastCumulativeAck` should flush. However, if `flushAsync` was called concurrently, both would send ACK commands to broker. ### Modifications To solve the thread safety issue, this PR move the `LastCumulativeAck` out of the `PersistentAcknowledgmentsGroupingTracker` to disable directly access to the internal fields. Then, the following synchronized methods were added to guarantee the thread safety: - `update`: Guarantee the safe write operations. It also recycles the `BitSetRecyclable` object before assigning new values and indicates itself can be flushed. - `flush`: If it can be flushed, return a thread local `LastCumulativeAck` instance that contains the message ID and the bit set. The bit set is deep copied to avoid the original reference being recycled in another `update` call. In addition, since the `messageId` field is volatile, the `getMessageId` method can always retrieve the latest reference. `LastCumulativeAckTest` is added to verify the sematics above. Based on the new design, we can only maintain a `LastCumulativeAck` field in `PersistentAcknowledgmentsGroupingTracker` and call the related methods in `doCumulativeAck` and `flushAsync`. It also fixes the problem that two concurrent `flushAsync` calls might send the same ACK command twice.
…6072) ### Motivation There were several issues caused by the thread safe issue of `LastCumulativeAck`, see: - #10586 - #12343 The root cause is that `LastCumulativeAck` could be accessed by different threads, especially in `flushAsync` method. But the fields are accessed directly and no thread safety can be guaranteed. In addition, the current `LastCumulativeAck` class was added in #8996 to hold two object references, but this modification is wrong. Before #8996, there are two CAS operations in `doCumulativeAck` method in case it's called concurretly. Though the composite CAS operation is not atomic. However, after #8996, only CAS operation was performed but it's compared with a `LastCumulativeAck` object, not the two fields (`messageId` and `bitSetRecyclable`). There is another issue that it uses a flag `cumulativeAckFlushRequired` to mark if `lastCumulativeAck` should flush. However, if `flushAsync` was called concurrently, both would send ACK commands to broker. ### Modifications To solve the thread safety issue, this PR move the `LastCumulativeAck` out of the `PersistentAcknowledgmentsGroupingTracker` to disable directly access to the internal fields. Then, the following synchronized methods were added to guarantee the thread safety: - `update`: Guarantee the safe write operations. It also recycles the `BitSetRecyclable` object before assigning new values and indicates itself can be flushed. - `flush`: If it can be flushed, return a thread local `LastCumulativeAck` instance that contains the message ID and the bit set. The bit set is deep copied to avoid the original reference being recycled in another `update` call. In addition, since the `messageId` field is volatile, the `getMessageId` method can always retrieve the latest reference. `LastCumulativeAckTest` is added to verify the sematics above. Based on the new design, we can only maintain a `LastCumulativeAck` field in `PersistentAcknowledgmentsGroupingTracker` and call the related methods in `doCumulativeAck` and `flushAsync`. It also fixes the problem that two concurrent `flushAsync` calls might send the same ACK command twice. (cherry picked from commit 936d6fd)
…6072) ### Motivation There were several issues caused by the thread safe issue of `LastCumulativeAck`, see: - #10586 - #12343 The root cause is that `LastCumulativeAck` could be accessed by different threads, especially in `flushAsync` method. But the fields are accessed directly and no thread safety can be guaranteed. In addition, the current `LastCumulativeAck` class was added in #8996 to hold two object references, but this modification is wrong. Before #8996, there are two CAS operations in `doCumulativeAck` method in case it's called concurretly. Though the composite CAS operation is not atomic. However, after #8996, only CAS operation was performed but it's compared with a `LastCumulativeAck` object, not the two fields (`messageId` and `bitSetRecyclable`). There is another issue that it uses a flag `cumulativeAckFlushRequired` to mark if `lastCumulativeAck` should flush. However, if `flushAsync` was called concurrently, both would send ACK commands to broker. ### Modifications To solve the thread safety issue, this PR move the `LastCumulativeAck` out of the `PersistentAcknowledgmentsGroupingTracker` to disable directly access to the internal fields. Then, the following synchronized methods were added to guarantee the thread safety: - `update`: Guarantee the safe write operations. It also recycles the `BitSetRecyclable` object before assigning new values and indicates itself can be flushed. - `flush`: If it can be flushed, return a thread local `LastCumulativeAck` instance that contains the message ID and the bit set. The bit set is deep copied to avoid the original reference being recycled in another `update` call. In addition, since the `messageId` field is volatile, the `getMessageId` method can always retrieve the latest reference. `LastCumulativeAckTest` is added to verify the sematics above. Based on the new design, we can only maintain a `LastCumulativeAck` field in `PersistentAcknowledgmentsGroupingTracker` and call the related methods in `doCumulativeAck` and `flushAsync`. It also fixes the problem that two concurrent `flushAsync` calls might send the same ACK command twice. (cherry picked from commit 936d6fd)
…ache#16072) ### Motivation There were several issues caused by the thread safe issue of `LastCumulativeAck`, see: - apache#10586 - apache#12343 The root cause is that `LastCumulativeAck` could be accessed by different threads, especially in `flushAsync` method. But the fields are accessed directly and no thread safety can be guaranteed. In addition, the current `LastCumulativeAck` class was added in apache#8996 to hold two object references, but this modification is wrong. Before apache#8996, there are two CAS operations in `doCumulativeAck` method in case it's called concurretly. Though the composite CAS operation is not atomic. However, after apache#8996, only CAS operation was performed but it's compared with a `LastCumulativeAck` object, not the two fields (`messageId` and `bitSetRecyclable`). There is another issue that it uses a flag `cumulativeAckFlushRequired` to mark if `lastCumulativeAck` should flush. However, if `flushAsync` was called concurrently, both would send ACK commands to broker. ### Modifications To solve the thread safety issue, this PR move the `LastCumulativeAck` out of the `PersistentAcknowledgmentsGroupingTracker` to disable directly access to the internal fields. Then, the following synchronized methods were added to guarantee the thread safety: - `update`: Guarantee the safe write operations. It also recycles the `BitSetRecyclable` object before assigning new values and indicates itself can be flushed. - `flush`: If it can be flushed, return a thread local `LastCumulativeAck` instance that contains the message ID and the bit set. The bit set is deep copied to avoid the original reference being recycled in another `update` call. In addition, since the `messageId` field is volatile, the `getMessageId` method can always retrieve the latest reference. `LastCumulativeAckTest` is added to verify the sematics above. Based on the new design, we can only maintain a `LastCumulativeAck` field in `PersistentAcknowledgmentsGroupingTracker` and call the related methods in `doCumulativeAck` and `flushAsync`. It also fixes the problem that two concurrent `flushAsync` calls might send the same ACK command twice. (cherry picked from commit 936d6fd) (cherry picked from commit 5eefdf1)
…6072) ### Motivation There were several issues caused by the thread safe issue of `LastCumulativeAck`, see: - #10586 - #12343 The root cause is that `LastCumulativeAck` could be accessed by different threads, especially in `flushAsync` method. But the fields are accessed directly and no thread safety can be guaranteed. In addition, the current `LastCumulativeAck` class was added in #8996 to hold two object references, but this modification is wrong. Before #8996, there are two CAS operations in `doCumulativeAck` method in case it's called concurretly. Though the composite CAS operation is not atomic. However, after #8996, only CAS operation was performed but it's compared with a `LastCumulativeAck` object, not the two fields (`messageId` and `bitSetRecyclable`). There is another issue that it uses a flag `cumulativeAckFlushRequired` to mark if `lastCumulativeAck` should flush. However, if `flushAsync` was called concurrently, both would send ACK commands to broker. ### Modifications To solve the thread safety issue, this PR move the `LastCumulativeAck` out of the `PersistentAcknowledgmentsGroupingTracker` to disable directly access to the internal fields. Then, the following synchronized methods were added to guarantee the thread safety: - `update`: Guarantee the safe write operations. It also recycles the `BitSetRecyclable` object before assigning new values and indicates itself can be flushed. - `flush`: If it can be flushed, return a thread local `LastCumulativeAck` instance that contains the message ID and the bit set. The bit set is deep copied to avoid the original reference being recycled in another `update` call. In addition, since the `messageId` field is volatile, the `getMessageId` method can always retrieve the latest reference. `LastCumulativeAckTest` is added to verify the sematics above. Based on the new design, we can only maintain a `LastCumulativeAck` field in `PersistentAcknowledgmentsGroupingTracker` and call the related methods in `doCumulativeAck` and `flushAsync`. It also fixes the problem that two concurrent `flushAsync` calls might send the same ACK command twice. (cherry picked from commit 936d6fd)

Reader ACK would be stuck:

This was caused by:
flushAsyncis executed byscheduledTask, Any thrown exception or error reaching the executor causes the executor to halt。The exception is as below: