Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,12 @@ private Set<Long> removeOverReplicatedledgers(Set<Long> bkActiveledgers, final G

for (final Long ledgerId : bkActiveledgers) {
try {
// check if the ledger is being replicated already by the replication worker
if (lum.isLedgerBeingReplicated(ledgerId)) {
// check ledger ensembles before creating lock nodes.
// this is to reduce the number of lock node creations and deletions in ZK.
// the ensemble check is done again after the lock node is created.
// also, check if the ledger is being replicated already by the replication worker
Versioned<LedgerMetadata> preCheckMetadata = ledgerManager.readLedgerMetadata(ledgerId).get();
if (!isNotBookieIncludedInLedgerEnsembles(preCheckMetadata) || lum.isLedgerBeingReplicated(ledgerId)) {
latch.countDown();
continue;
}
Expand All @@ -245,23 +249,12 @@ private Set<Long> removeOverReplicatedledgers(Set<Long> bkActiveledgers, final G
.whenComplete((metadata, exception) -> {
try {
if (exception == null) {
// do not delete a ledger that is not closed, since the ensemble might
// change again and include the current bookie while we are deleting it
if (!metadata.getValue().isClosed()) {
return;
if (isNotBookieIncludedInLedgerEnsembles(metadata)) {
// this bookie is not supposed to have this ledger,
// thus we can delete this ledger now
overReplicatedLedgers.add(ledgerId);
garbageCleaner.clean(ledgerId);
}
SortedMap<Long, ? extends List<BookieId>> ensembles =
metadata.getValue().getAllEnsembles();
for (List<BookieId> ensemble : ensembles.values()) {
// check if this bookie is supposed to have this ledger
if (ensemble.contains(selfBookieAddress)) {
return;
}
}
// this bookie is not supposed to have this ledger,
// thus we can delete this ledger now
overReplicatedLedgers.add(ledgerId);
garbageCleaner.clean(ledgerId);
}
} finally {
semaphore.release();
Expand Down Expand Up @@ -302,4 +295,22 @@ private static MetadataBookieDriver instantiateMetadataDriver(ServerConfiguratio
}
}

private boolean isNotBookieIncludedInLedgerEnsembles(Versioned<LedgerMetadata> metadata) {
// do not delete a ledger that is not closed, since the ensemble might
// change again and include the current bookie while we are deleting it
if (!metadata.getValue().isClosed()) {
return false;
}

SortedMap<Long, ? extends List<BookieId>> ensembles =
metadata.getValue().getAllEnsembles();
for (List<BookieId> ensemble : ensembles.values()) {
// check if this bookie is supposed to have this ledger
if (ensemble.contains(selfBookieAddress)) {
return false;
}
}

return true;
}
}