[fix] [broker] Fix isolated group not work problem.#21096
hangc0276 merged 8 commits into apache:master
Conversation
hangc0276
left a comment
Please add a test to protect this logic.
Codecov Report

❌ Patch coverage is

```
@@             Coverage Diff              @@
##             master   #21096      +/-   ##
=============================================
+ Coverage     33.46%   73.18%   +39.72%
- Complexity    12179    32436    +20257
=============================================
  Files          1623     1887      +264
  Lines        127399   139985    +12586
  Branches      13929    15413     +1484
=============================================
+ Hits          42631   102449    +59818
+ Misses        79158    29440    -49718
- Partials       5610     8096     +2486
```
|
```java
Optional<BookiesRackConfiguration> optional =
        bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH).get(
                metaOpTimeout, TimeUnit.MILLISECONDS);
```
Can the zookeeper thread reach here? Just to confirm it will not introduce a deadlock.
I discussed this with hang, and we will use a graceful way to avoid the sync operation. I will push another commit later.
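The deadlock concern raised above can be illustrated with a minimal, hypothetical sketch (illustrative names, not the actual Pulsar or ZooKeeper classes): if the thread that is supposed to complete the future (e.g. the ZooKeeper event thread) itself blocks on `future.get()`, it waits on work that only it can run, so the blocking call can never succeed.

```java
import java.util.concurrent.*;

// Hypothetical sketch of the deadlock concern: a single "event" thread that
// blocks on a future it alone is responsible for completing.
public class CallbackDeadlockSketch {

    static String blockingGetOnCallbackThread() throws Exception {
        ExecutorService zkThread = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> config = new CompletableFuture<>();
            // This task blocks on the future from the single "event" thread.
            Future<String> task = zkThread.submit(() -> {
                try {
                    return config.get(200, TimeUnit.MILLISECONDS);
                } catch (TimeoutException e) {
                    return "timed-out";
                }
            });
            // The completion is queued BEHIND the blocked task, so it can
            // only run after the blocked get() has already timed out.
            zkThread.submit(() -> config.complete("rack-config"));
            return task.get();
        } finally {
            zkThread.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(blockingGetOnCallbackThread()); // prints "timed-out"
    }
}
```

With a real ZooKeeper client the blocked call would simply hang rather than time out, which is why the discussion below moves to a fully asynchronous update.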
hangc0276
left a comment
LGTM. Please take a look at the failed tests, thanks.
codelipenghui
left a comment
I would like to provide another approach for handling the cached rack configuration update:

- Make `allGroupsBookieMapping` a `volatile` variable
- Update the cached rack configuration asynchronously

In `initialize`:

```java
bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH).thenAccept(opt -> {
    opt.ifPresent(bookiesRackConfiguration -> cachedRackConfiguration = bookiesRackConfiguration);
}).exceptionally(ex -> {
    log.warn("Failed to load bookies rack configuration while initialize the PlacementPolicy.");
    return null;
});
```

In `getExcludedBookiesWithIsolationGroups`:

```java
CompletableFuture<Optional<BookiesRackConfiguration>> future =
        bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH);
BookiesRackConfiguration allGroupsBookieMapping;
future.thenAccept(opt ->
    opt.ifPresent(bookiesRackConfiguration -> cachedRackConfiguration = bookiesRackConfiguration)
).exceptionally(ex -> {
    log.warn("The newest bookie rack config is not available now.");
    return null;
});
allGroupsBookieMapping = cachedRackConfiguration;
if (allGroupsBookieMapping == null) {
    return excludedBookies;
}
```

When the broker restarts, we might lose the rack configurations, but it avoids any potential deadlocks.
The initialize method is called when starting the broker, before the broker service becomes available to clients, so we will not lose the bookie rack configuration in most cases.
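The pattern proposed above can be reduced to a small, hypothetical sketch (field and method names are illustrative, not the actual Pulsar code): keep the last known rack configuration in a `volatile` field, refresh it asynchronously, and have callers read only the cached value so nobody ever blocks on the metadata store.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the "volatile cache + async refresh" pattern.
public class CachedRackConfig {
    private volatile String cachedRackConfiguration; // last successfully read config

    void refreshAsync(CompletableFuture<Optional<String>> storeRead) {
        storeRead.thenAccept(opt -> opt.ifPresent(cfg -> cachedRackConfiguration = cfg))
                .exceptionally(ex -> null); // on failure, keep the stale value
    }

    // Callers read whatever is cached; right after a restart this can be null,
    // in which case no bookies are excluded (the trade-off discussed above).
    String currentOrNull() {
        return cachedRackConfiguration;
    }

    public static void main(String[] args) {
        CachedRackConfig cache = new CachedRackConfig();
        cache.refreshAsync(CompletableFuture.completedFuture(Optional.of("group-a")));
        System.out.println(cache.currentOrNull()); // "group-a"
        // A failed refresh keeps the previous value rather than clearing it.
        CompletableFuture<Optional<String>> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("store unavailable"));
        cache.refreshAsync(failed);
        System.out.println(cache.currentOrNull()); // still "group-a"
    }
}
```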
```java
            cachedRackConfiguration);
    allGroupsBookieMapping = cachedRackConfiguration;
} else {
    throw new KeeperException.NoNodeException(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH);
```
How will the bookie client handle this exception, and what is the expected behavior from the producer/consumer's perspective? Will the producer/consumer be closed by this exception?
No, we already catch this exception at line 284.
It makes sense; we should print a debug log when `allGroupsBookieMapping` is null.

Yes, sure.

There is a case that needs to be discussed: we only update the rack config when the

@horizonzy We need to delete the cached rack configuration.

What do you think of it?

@horizonzy Does checking the exception type work? If the exception is a NoNode exception, update the cached rack configuration to

It doesn't work. The zk metadata store handles it separately and does not throw exceptions.
```java
bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH);
bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH)
        .thenAccept(opt -> cachedRackConfiguration = opt.orElse(null)).exceptionally(e -> {
            log.warn("Failed to update the newest bookies rack config.");
```
Change the log msg to `log.warn("Failed to update the newest bookies rack config, and use the cached rack configuration: {}", cachedRackConfiguration)`
This is an async operation; it won't affect the result of the current invocation, and printing this log may be misleading.
/pulsarbot run-failure-checks
### Modifications

When upgrading the Pulsar version from 2.9.2 to 2.10.3, the isolated group feature no longer worked. We eventually found the problem: in `IsolatedBookieEnsemblePlacementPolicy`, when it gets the bookie rack from the metadata store cache, it uses `future.isDone()` to avoid a sync operation. If the future is incomplete, it returns empty blacklists. The cache may expire due to the Caffeine cache `getExpireAfterWriteMillis` config; if the cache expires, the future may be incomplete. (#21095 will correct that behavior.) In 2.9.2, the data was fetched from the metadata store synchronously, and we should keep that behavior.
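The root cause described in the Modifications section can be shown with a minimal, hypothetical sketch (illustrative names, not the actual Pulsar classes): guarding the cache read with `future.isDone()` silently disables isolation whenever the cached entry has expired and is still reloading, while a sync read with a timeout (the 2.9.2 behavior) waits for the configuration.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the bug: isDone() vs. a blocking read with timeout.
public class IsDoneSketch {

    // 2.10.x-style read: give up immediately if the future is not complete.
    static Optional<String> buggyGet(CompletableFuture<Optional<String>> f) {
        if (f.isDone() && !f.isCompletedExceptionally()) {
            return f.join();
        }
        return Optional.empty(); // no config -> empty blacklist -> isolation off
    }

    // 2.9.2-style read: block (with a timeout) until the config is available.
    static Optional<String> syncGet(CompletableFuture<Optional<String>> f,
                                    long timeoutMs) throws Exception {
        return f.get(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<Optional<String>> loading = new CompletableFuture<>();
        System.out.println(buggyGet(loading).isPresent());       // false: config "lost"
        loading.complete(Optional.of("rack-config"));            // store responds
        System.out.println(syncGet(loading, 1000).isPresent());  // true: config found
    }
}
```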
@horizonzy Can you help create a PR to cherry-pick this change to

Ok.
### Documentation

- doc
- doc-required
- doc-not-needed
- doc-complete

### Matching PR in forked repository

PR in forked repository: