-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[fix][Java Client] Fix thread safety issue of LastCumulativeAck
#16072
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fix][Java Client] Fix thread safety issue of LastCumulativeAck
#16072
Conversation
5c48830 to
ba30863
Compare
|
Now all tests passed, PTAL, @lhotari @gaozhangmin @congbobo184 @mattisonchao @Technoboy- @codelipenghui |
Demogorgon314
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Just left two minor comments.
...nt/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
Show resolved
Hide resolved
...nt/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
Outdated
Show resolved
Hide resolved
...nt/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
Outdated
Show resolved
Hide resolved
...nt/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
Outdated
Show resolved
Hide resolved
...nt/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
Outdated
Show resolved
Hide resolved
...nt/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
Outdated
Show resolved
Hide resolved
c2b911a to
38ddc7d
Compare
|
I've updated this PR with a refactored change and the PR description has also been updated, PTAL again @Demogorgon314 @congbobo184 |
### Motivation There were several issues caused by the thread safe issue of `LastCumulativeAck`, see: - apache#10586 - apache#12343 The root cause is that `LastCumulativeAck` could be accessed by different threads, especially in `flushAsync` method. But the fields are accessed directly and no thread safety can be guaranteed. In addition, the current `LastCumulativeAck` class was added in apache#8996 to hold two object references, but this modification is wrong. Before apache#8996, there are two CAS operations in `doCumulativeAck` method in case it's called concurretly. Though the composite CAS operation is not atomic. However, after apache#8996, only CAS operation was performed but it's compared with a `LastCumulativeAck` object, not the two fields (`messageId` and `bitSetRecyclable`). There is another issue that it uses a flag `cumulativeAckFlushRequired` to mark if `lastCumulativeAck` should flush. However, if `flushAsync` was called concurrently, both would send ACK commands to broker. ### Modifications To solve the thread safety issue, this PR move the `LastCumulativeAck` out of the `PersistentAcknowledgmentsGroupingTracker` to disable directly access to the internal fields. Then, the following synchronized methods were added to guarantee the thread safety: - `update`: Guarantee the safe write operations. It also recycles the `BitSetRecyclable` object before assigning new values and indicates itself can be flushed. - `flush`: If it can be flushed, return a thread local `LastCumulativeAck` instance that contains the message ID and the bit set. The bit set is deep copied to avoid the original reference being recycled in another `update` call. In addition, since the `messageId` field is volatile, the `getMessageId` method can always retrieve the latest reference. `LastCumulativeAckTest` is added to verify the sematics above. Based on the new design, we can only maintain a `LastCumulativeAck` field in `PersistentAcknowledgmentsGroupingTracker` and call the related methods in `doCumulativeAck` and `flushAsync`. It also fixes the problem that two concurrent `flushAsync` calls might send the same ACK command twice. Remove unused field Don't reset in LastCumulativeAck#flush
ac95266 to
89cf62a
Compare
...nt/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
Show resolved
Hide resolved
...nt/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
Show resolved
Hide resolved
congbobo184
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice work!
…6072) ### Motivation There were several issues caused by the thread safe issue of `LastCumulativeAck`, see: - #10586 - #12343 The root cause is that `LastCumulativeAck` could be accessed by different threads, especially in `flushAsync` method. But the fields are accessed directly and no thread safety can be guaranteed. In addition, the current `LastCumulativeAck` class was added in #8996 to hold two object references, but this modification is wrong. Before #8996, there are two CAS operations in `doCumulativeAck` method in case it's called concurretly. Though the composite CAS operation is not atomic. However, after #8996, only CAS operation was performed but it's compared with a `LastCumulativeAck` object, not the two fields (`messageId` and `bitSetRecyclable`). There is another issue that it uses a flag `cumulativeAckFlushRequired` to mark if `lastCumulativeAck` should flush. However, if `flushAsync` was called concurrently, both would send ACK commands to broker. ### Modifications To solve the thread safety issue, this PR move the `LastCumulativeAck` out of the `PersistentAcknowledgmentsGroupingTracker` to disable directly access to the internal fields. Then, the following synchronized methods were added to guarantee the thread safety: - `update`: Guarantee the safe write operations. It also recycles the `BitSetRecyclable` object before assigning new values and indicates itself can be flushed. - `flush`: If it can be flushed, return a thread local `LastCumulativeAck` instance that contains the message ID and the bit set. The bit set is deep copied to avoid the original reference being recycled in another `update` call. In addition, since the `messageId` field is volatile, the `getMessageId` method can always retrieve the latest reference. `LastCumulativeAckTest` is added to verify the sematics above. Based on the new design, we can only maintain a `LastCumulativeAck` field in `PersistentAcknowledgmentsGroupingTracker` and call the related methods in `doCumulativeAck` and `flushAsync`. It also fixes the problem that two concurrent `flushAsync` calls might send the same ACK command twice. (cherry picked from commit 936d6fd)
…6072) ### Motivation There were several issues caused by the thread safe issue of `LastCumulativeAck`, see: - #10586 - #12343 The root cause is that `LastCumulativeAck` could be accessed by different threads, especially in `flushAsync` method. But the fields are accessed directly and no thread safety can be guaranteed. In addition, the current `LastCumulativeAck` class was added in #8996 to hold two object references, but this modification is wrong. Before #8996, there are two CAS operations in `doCumulativeAck` method in case it's called concurretly. Though the composite CAS operation is not atomic. However, after #8996, only CAS operation was performed but it's compared with a `LastCumulativeAck` object, not the two fields (`messageId` and `bitSetRecyclable`). There is another issue that it uses a flag `cumulativeAckFlushRequired` to mark if `lastCumulativeAck` should flush. However, if `flushAsync` was called concurrently, both would send ACK commands to broker. ### Modifications To solve the thread safety issue, this PR move the `LastCumulativeAck` out of the `PersistentAcknowledgmentsGroupingTracker` to disable directly access to the internal fields. Then, the following synchronized methods were added to guarantee the thread safety: - `update`: Guarantee the safe write operations. It also recycles the `BitSetRecyclable` object before assigning new values and indicates itself can be flushed. - `flush`: If it can be flushed, return a thread local `LastCumulativeAck` instance that contains the message ID and the bit set. The bit set is deep copied to avoid the original reference being recycled in another `update` call. In addition, since the `messageId` field is volatile, the `getMessageId` method can always retrieve the latest reference. `LastCumulativeAckTest` is added to verify the sematics above. Based on the new design, we can only maintain a `LastCumulativeAck` field in `PersistentAcknowledgmentsGroupingTracker` and call the related methods in `doCumulativeAck` and `flushAsync`. It also fixes the problem that two concurrent `flushAsync` calls might send the same ACK command twice. (cherry picked from commit 936d6fd)
…ache#16072) ### Motivation There were several issues caused by the thread safe issue of `LastCumulativeAck`, see: - apache#10586 - apache#12343 The root cause is that `LastCumulativeAck` could be accessed by different threads, especially in `flushAsync` method. But the fields are accessed directly and no thread safety can be guaranteed. In addition, the current `LastCumulativeAck` class was added in apache#8996 to hold two object references, but this modification is wrong. Before apache#8996, there are two CAS operations in `doCumulativeAck` method in case it's called concurretly. Though the composite CAS operation is not atomic. However, after apache#8996, only CAS operation was performed but it's compared with a `LastCumulativeAck` object, not the two fields (`messageId` and `bitSetRecyclable`). There is another issue that it uses a flag `cumulativeAckFlushRequired` to mark if `lastCumulativeAck` should flush. However, if `flushAsync` was called concurrently, both would send ACK commands to broker. ### Modifications To solve the thread safety issue, this PR move the `LastCumulativeAck` out of the `PersistentAcknowledgmentsGroupingTracker` to disable directly access to the internal fields. Then, the following synchronized methods were added to guarantee the thread safety: - `update`: Guarantee the safe write operations. It also recycles the `BitSetRecyclable` object before assigning new values and indicates itself can be flushed. - `flush`: If it can be flushed, return a thread local `LastCumulativeAck` instance that contains the message ID and the bit set. The bit set is deep copied to avoid the original reference being recycled in another `update` call. In addition, since the `messageId` field is volatile, the `getMessageId` method can always retrieve the latest reference. `LastCumulativeAckTest` is added to verify the sematics above. Based on the new design, we can only maintain a `LastCumulativeAck` field in `PersistentAcknowledgmentsGroupingTracker` and call the related methods in `doCumulativeAck` and `flushAsync`. It also fixes the problem that two concurrent `flushAsync` calls might send the same ACK command twice. (cherry picked from commit 936d6fd) (cherry picked from commit 5eefdf1)
…6072) ### Motivation There were several issues caused by the thread safe issue of `LastCumulativeAck`, see: - #10586 - #12343 The root cause is that `LastCumulativeAck` could be accessed by different threads, especially in `flushAsync` method. But the fields are accessed directly and no thread safety can be guaranteed. In addition, the current `LastCumulativeAck` class was added in #8996 to hold two object references, but this modification is wrong. Before #8996, there are two CAS operations in `doCumulativeAck` method in case it's called concurretly. Though the composite CAS operation is not atomic. However, after #8996, only CAS operation was performed but it's compared with a `LastCumulativeAck` object, not the two fields (`messageId` and `bitSetRecyclable`). There is another issue that it uses a flag `cumulativeAckFlushRequired` to mark if `lastCumulativeAck` should flush. However, if `flushAsync` was called concurrently, both would send ACK commands to broker. ### Modifications To solve the thread safety issue, this PR move the `LastCumulativeAck` out of the `PersistentAcknowledgmentsGroupingTracker` to disable directly access to the internal fields. Then, the following synchronized methods were added to guarantee the thread safety: - `update`: Guarantee the safe write operations. It also recycles the `BitSetRecyclable` object before assigning new values and indicates itself can be flushed. - `flush`: If it can be flushed, return a thread local `LastCumulativeAck` instance that contains the message ID and the bit set. The bit set is deep copied to avoid the original reference being recycled in another `update` call. In addition, since the `messageId` field is volatile, the `getMessageId` method can always retrieve the latest reference. `LastCumulativeAckTest` is added to verify the sematics above. Based on the new design, we can only maintain a `LastCumulativeAck` field in `PersistentAcknowledgmentsGroupingTracker` and call the related methods in `doCumulativeAck` and `flushAsync`. It also fixes the problem that two concurrent `flushAsync` calls might send the same ACK command twice. (cherry picked from commit 936d6fd)
Motivation
There were several issues caused by the thread safe issue of
LastCumulativeAck, see:The root cause is that
LastCumulativeAckcould be accessed bydifferent threads, especially in
flushAsyncmethod. But the fields areaccessed directly and no thread safety can be guaranteed.
In addition, the current
LastCumulativeAckclass was added in#8996 to hold two object
references, but this modification is wrong.
Before #8996, there are two CAS operations in
doCumulativeAckmethodin case it's called concurretly. Though the composite CAS operation is
not atomic.
However, after #8996, only CAS operation was performed but it's compared
with a
LastCumulativeAckobject, not the two fields (messageIdandbitSetRecyclable).There is another issue that it uses a flag
cumulativeAckFlushRequiredto mark if
lastCumulativeAckshould flush. However, ifflushAsyncwas called concurrently, both would send ACK commands to broker.
Modifications
To solve the thread safety issue, this PR move the
LastCumulativeAckout of the
PersistentAcknowledgmentsGroupingTrackerto disabledirectly access to the internal fields. Then, the following synchronized
methods were added to guarantee the thread safety:
update: Guarantee the safe write operations. It also recycles theBitSetRecyclableobject before assigning new values and indicatesitself can be flushed.
flush: If it can be flushed, return a thread localLastCumulativeAckinstance that contains the message ID and the bitset. The bit set is deep copied to avoid the original reference being
recycled in another
updatecall.In addition, since the
messageIdfield is volatile, thegetMessageIdmethod can always retrieve the latest reference.
LastCumulativeAckTestis added to verify the sematics above.Based on the new design, we can only maintain a
LastCumulativeAckfield in
PersistentAcknowledgmentsGroupingTrackerand call the relatedmethods in
doCumulativeAckandflushAsync. It also fixes the problemthat two concurrent
flushAsynccalls might send the same ACK commandtwice.
Documentation
Check the box below or label this PR directly.
Need to update docs?
doc-required(Your PR needs to update docs and you will update later)
doc-not-needed(Please explain why)
doc(Your PR contains doc changes)
doc-complete(Docs have been already added)