[fix] [ml] fix discontinuous ledger deletion #20898
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
…l/ManagedLedgerImpl.java Co-authored-by: Qiang Zhao <mattisonchao@apache.org>
mattisonchao
left a comment
LGTM :)
    }

    long slowestReaderLedgerId = -1;
    final long slowestNonDurationLedgerId = getTheSlowestNonDurationReadPosition().getLedgerId();
We can introduce a new API in the cursor container to get this position, and explain that `cursors.getSlowestReaderPosition()` can only get the nonDurable cursor, in the future via #17273. :)
Yes, we can improve it in the future
lifepuzzlefun
left a comment
Nice catch!
    LedgerInfo ls = ledgerInfoIterator.next();
    releaseReadHandleIfNoLongerRead(ls.getLedgerId(), slowestNonDurationLedgerId.getValue());
    }
Can we break out of loops early?
No, all the ledgers in the collection may no longer need to be read.
Can we break out of the loop early if `ledgerId >= slowestNonDurationLedgerId`?
Good suggestion, fixed.
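The early break agreed on in this thread can be illustrated with a minimal sketch. It assumes the ledgers are kept in a map sorted by ledger id, so that no ledger after the first one at or beyond the slowest position can qualify. The class `ReleaseHandlesSketch`, the method `ledgersToRelease`, and the `String` map values are hypothetical stand-ins, not code from this PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;

final class ReleaseHandlesSketch {
    /**
     * Collects the ids of ledgers whose read handles could be released:
     * every ledger strictly before the slowest non-durable read position.
     * The map is sorted by ledger id, which is what makes the early break safe.
     */
    static List<Long> ledgersToRelease(NavigableMap<Long, String> ledgers, long slowestLedgerId) {
        List<Long> released = new ArrayList<>();
        for (long ledgerId : ledgers.keySet()) {
            if (ledgerId >= slowestLedgerId) {
                // Ids only grow from here on, so nothing later can be released.
                break;
            }
            released.add(ledgerId);
        }
        return released;
    }
}
```

With ledgers 1 to 5 and a slowest position in ledger 3, only ledgers 1 and 2 are visited and returned; the loop never looks at ledgers 4 and 5.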
    // Handle the -1 size limit as "infinite" size quota
    - return config.getRetentionSizeInMB() >= 0
    -         && TOTAL_SIZE_UPDATER.get(this) - sizeToDelete >= config.getRetentionSizeInMB() * MegaByte;
    + return retentionSizeInMB >= 0 && TOTAL_SIZE_UPDATER.get(this) - sizeToDelete >= retentionSizeInMB * MegaByte;
Can we use an immutable value instead of `TOTAL_SIZE_UPDATER.get(this)`?
fixed
Sorry, I misunderstood this before. It is fixed now; could you take another look?
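One way to read the "immutable value" suggestion is to snapshot the live total size once and evaluate every retention check against that snapshot. The sketch below is written under that assumption. `SizeQuotaSketch`, `isOverSizeQuota`, `countDeletable`, and the `AtomicLong` standing in for `TOTAL_SIZE_UPDATER.get(this)` are all hypothetical names, not the code merged in this PR.

```java
import java.util.concurrent.atomic.AtomicLong;

final class SizeQuotaSketch {
    static final long MEGA_BYTE = 1024L * 1024L;

    /** Same shape as the check in the diff, but fed a snapshot instead of a live read. */
    static boolean isOverSizeQuota(long retentionSizeInMB, long totalSizeSnapshot, long sizeToDelete) {
        // A negative limit (-1) means "infinite" quota: never over quota.
        return retentionSizeInMB >= 0
                && totalSizeSnapshot - sizeToDelete >= retentionSizeInMB * MEGA_BYTE;
    }

    /** Counts how many leading ledgers could be deleted while staying at or above the quota. */
    static int countDeletable(AtomicLong liveTotalSize, long[] ledgerSizes, long retentionSizeInMB) {
        // Read the live counter exactly once; later concurrent writes cannot change the outcome.
        final long totalSizeSnapshot = liveTotalSize.get();
        long sizeToDelete = 0;
        int deletable = 0;
        for (long size : ledgerSizes) {
            sizeToDelete += size;
            if (!isOverSizeQuota(retentionSizeInMB, totalSizeSnapshot, sizeToDelete)) {
                break;
            }
            deletable++;
        }
        return deletable;
    }
}
```

Because the snapshot is a `final` local, every iteration compares against the same total, so two ledgers checked in the same pass can never be judged against different sizes.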
Technoboy-
left a comment
LGTM
gaoran10
left a comment
LGTM
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #20898 +/- ##
============================================
- Coverage 73.23% 73.09% -0.15%
- Complexity 3762 32129 +28367
============================================
Files 1874 1875 +1
Lines 139377 139403 +26
Branches 15328 15328
============================================
- Hits 102068 101890 -178
- Misses 29297 29450 +153
- Partials 8012 8063 +51
- The task `trim ledgers` runs in the thread `BkMainThreadPool.choose(ledgerName)`.
- The task `write entries to BK` runs in the thread `BkMainThreadPool.choose(ledgerId)`.
So the two tasks above may run concurrently.
The task `trim ledgers` works as follows:
- Find the ledgers that no longer need to be read; the result is `{Ledgers before the slowest read}`.
- Check whether the `{Ledgers before the slowest read}` fall outside the retention policy; the result is `{Ledgers to be deleted}`.
  - If the creation time of the ledger is earlier than the earliest retention time, mark it for deletion.
  - If, after deleting this ledger, the remaining ledgers are still larger than the retention size, mark it for deletion.
- Delete the `{Ledgers to be deleted}`.
**(Highlight)** There is a scenario in which the task `trim ledgers` deletes ledgers discontinuously, so that messages are consumed discontinuously:
- Context:
  - ledgers: `[{id=1, size=100}, {id=2, size=100}]`
  - retention size: 150
  - no cursor exists
- Check `ledger 1`: it is skipped by the retention check, because `(200 - 100) < 150`.
- One in-flight write finishes, so `calculateTotalSizeWrited()` now returns `300`.
- Check `ledger 2`: the retention check gives `(300 - 100) > 150`, so `ledger 2` is marked for deletion.
- Delete `ledger 2`.
- Create a new consumer. It receives messages from `[ledger-1, ledger-3]`, but `ledger-2` is skipped.
Once the retention constraint has been met, break out of the loop so that no later ledger can be marked for deletion.
(cherry picked from commit 782e91f)
(cherry picked from commit b87c0fb)
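### Motivation
- The task `trim ledgers` runs in the thread `BkMainThreadPool.choose(ledgerName)`.
- The task `write entries to BK` runs in the thread `BkMainThreadPool.choose(ledgerId)`.
So the two tasks above may run concurrently.
The task `trim ledgers` works as follows:
- Find the ledgers that no longer need to be read; the result is `{Ledgers before the slowest read}`.
- Check whether the `{Ledgers before the slowest read}` fall outside the retention policy; the result is `{Ledgers to be deleted}`.
  - If the creation time of the ledger is earlier than the earliest retention time, mark it for deletion.
  - If, after deleting this ledger, the remaining ledgers are still larger than the retention size, mark it for deletion.
- Delete the `{Ledgers to be deleted}`.
In `step-3-2`, the implementation is like this:
see: https://github.com/apache/pulsar/blob/branch-3.0/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L2671-L2698
**(Highlight)** There is a scenario in which the task `trim ledgers` deletes ledgers discontinuously, so that messages are consumed discontinuously:
- Context:
  - ledgers: `[{id=1, size=100}, {id=2, size=100}]`
  - retention size: 150
  - no cursor exists
- Check `ledger 1`: it is skipped by the retention check, because `(200 - 100) < 150`.
- One in-flight write finishes, so `calculateTotalSizeWrited()` now returns `300`.
- Check `ledger 2`: the retention check gives `(300 - 100) > 150`, so `ledger 2` is marked for deletion.
- Delete `ledger 2`.
- Create a new consumer. It receives messages from `[ledger-1, ledger-3]`, but `ledger-2` is skipped.
### Modifications
Once the retention constraint has been met, break the loop.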
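The race in the scenario, and the effect of breaking out of the loop, can be reproduced with a small single-threaded sketch. It is a simplification under stated assumptions: only the size-based retention check is modelled (time-based retention is ignored), the concurrent write is simulated by a supplier whose value grows after the first read, and `TrimSketch`, `Ledger`, `selectForDeletion`, and `totalThatGrowsAfterFirstRead` are hypothetical names rather than the real `ManagedLedgerImpl` code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

final class TrimSketch {
    /** Hypothetical stand-in for ledger metadata: id and size only. */
    static final class Ledger {
        final long id;
        final long size;

        Ledger(long id, long size) {
            this.id = id;
            this.size = size;
        }
    }

    /**
     * Returns the ids of the ledgers selected for deletion.
     * totalSize is read on every check, so its value may change mid-loop.
     * When stopAtFirstRetained is true, the loop ends at the first retained ledger.
     */
    static List<Long> selectForDeletion(List<Ledger> candidates, LongSupplier totalSize,
                                        long retentionSize, boolean stopAtFirstRetained) {
        List<Long> toDelete = new ArrayList<>();
        long sizeMarked = 0;
        for (Ledger ledger : candidates) {
            long remaining = totalSize.getAsLong() - sizeMarked - ledger.size;
            if (remaining >= retentionSize) {
                toDelete.add(ledger.id);
                sizeMarked += ledger.size;
            } else if (stopAtFirstRetained) {
                // The retention constraint is met: keep this ledger and all later ones.
                break;
            }
        }
        return toDelete;
    }

    /** Simulates an in-flight write finishing between two checks: 200 first, then 300. */
    static LongSupplier totalThatGrowsAfterFirstRead() {
        long[] reads = {0};
        return () -> reads[0]++ == 0 ? 200L : 300L;
    }
}
```

With ledgers `{id=1, size=100}` and `{id=2, size=100}` and a retention size of 150, calling `selectForDeletion` with `stopAtFirstRetained = false` selects only ledger 2, which is the gap described above; with `true`, nothing is selected and the ledger sequence stays contiguous.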