From 8f5f6000f07d067cdef33d9ad1d15f4c4e5bc82a Mon Sep 17 00:00:00 2001 From: shustsud Date: Wed, 6 Oct 2021 08:43:35 +0900 Subject: [PATCH 1/4] Add pre-check to over-replicated ledger GC --- .../ScanAndCompareGarbageCollector.java | 83 +++++++++++++++---- 1 file changed, 65 insertions(+), 18 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java index 6f67baec024..23fbc965c7e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java @@ -215,9 +215,17 @@ public void gc(GarbageCleaner garbageCleaner) { private Set removeOverReplicatedledgers(Set bkActiveledgers, final GarbageCleaner garbageCleaner) throws Exception { + // Check ledger ensembles before creating lock nodes. + // This is to reduce the number of lock node creations and deletions in ZK. + // The ensemble check is done again after the lock node is created. + final Set candidateOverReplicatedLedgers = preCheckOverReplicatedLedgers(bkActiveledgers); + if (candidateOverReplicatedLedgers.isEmpty()) { + return candidateOverReplicatedLedgers; + } + final Set overReplicatedLedgers = Sets.newHashSet(); final Semaphore semaphore = new Semaphore(MAX_CONCURRENT_METADATA_REQUESTS); - final CountDownLatch latch = new CountDownLatch(bkActiveledgers.size()); + final CountDownLatch latch = new CountDownLatch(candidateOverReplicatedLedgers.size()); // instantiate zookeeper client to initialize ledger manager @Cleanup @@ -229,7 +237,7 @@ private Set removeOverReplicatedledgers(Set bkActiveledgers, final G @Cleanup LedgerUnderreplicationManager lum = lmf.newLedgerUnderreplicationManager(); - for (final Long ledgerId : bkActiveledgers) { + for (final Long ledgerId : candidateOverReplicatedLedgers) { try { // check if the ledger is being replicated already by the replication worker if (lum.isLedgerBeingReplicated(ledgerId)) { @@ -245,23 +253,12 @@ private Set removeOverReplicatedledgers(Set bkActiveledgers, final G .whenComplete((metadata, exception) -> { try { if (exception == null) { - // do not delete a ledger that is not closed, since the ensemble might - // change again and include the current bookie while we are deleting it - if (!metadata.getValue().isClosed()) { - return; + if (isNotBookieIncludedInLedgerEnsembles(metadata)) { + // this bookie is not supposed to have this ledger, + // thus we can delete this ledger now + overReplicatedLedgers.add(ledgerId); + garbageCleaner.clean(ledgerId); } - SortedMap> ensembles = - metadata.getValue().getAllEnsembles(); - for (List ensemble : ensembles.values()) { - // check if this bookie is supposed to have this ledger - if (ensemble.contains(selfBookieAddress)) { - return; - } - } - // this bookie is not supposed to have this ledger, - // thus we can delete this ledger now - overReplicatedLedgers.add(ledgerId); - garbageCleaner.clean(ledgerId); } } finally { semaphore.release(); @@ -302,4 +299,54 @@ private static MetadataBookieDriver instantiateMetadataDriver(ServerConfiguratio } } + private Set preCheckOverReplicatedLedgers(Set bkActiveLedgers) throws InterruptedException { + final Set candidateOverReplicatedLedgers = Sets.newHashSet(); + final Semaphore semaphore = new Semaphore(MAX_CONCURRENT_METADATA_REQUESTS); + final CountDownLatch latch = new CountDownLatch(bkActiveLedgers.size()); + + for (final Long ledgerId : bkActiveLedgers) { + try { + semaphore.acquire(); + ledgerManager.readLedgerMetadata(ledgerId) + .whenComplete((metadata, exception) -> { + try { + if (exception == null) { + if (isNotBookieIncludedInLedgerEnsembles(metadata)) { + candidateOverReplicatedLedgers.add(ledgerId); + } + } + } finally { + semaphore.release(); + latch.countDown(); + } + }); + } catch (Throwable t) { + LOG.error("Exception when iterating through the ledgers to pre-check for over-replication", t); + latch.countDown(); + } + } + latch.await(); + LOG.info("Finished pre-check over-replicated ledgers. candidateOverReplicatedLedgersSize={}", + candidateOverReplicatedLedgers.size()); + return candidateOverReplicatedLedgers; + } + + private boolean isNotBookieIncludedInLedgerEnsembles(Versioned metadata) { + // do not delete a ledger that is not closed, since the ensemble might + // change again and include the current bookie while we are deleting it + if (!metadata.getValue().isClosed()) { + return false; + } + + SortedMap> ensembles = + metadata.getValue().getAllEnsembles(); + for (List ensemble : ensembles.values()) { + // check if this bookie is supposed to have this ledger + if (ensemble.contains(selfBookieAddress)) { + return false; + } + } + + return true; + } } From c3169007a13695f4461d681513d45302a801999a Mon Sep 17 00:00:00 2001 From: shustsud Date: Thu, 7 Oct 2021 09:27:13 +0900 Subject: [PATCH 2/4] Fixed log --- .../bookkeeper/bookie/ScanAndCompareGarbageCollector.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java index 23fbc965c7e..d86865cf135 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java @@ -326,8 +326,8 @@ private Set preCheckOverReplicatedLedgers(Set bkActiveLedgers) throw } } latch.await(); - LOG.info("Finished pre-check over-replicated ledgers. candidateOverReplicatedLedgersSize={}", - candidateOverReplicatedLedgers.size()); + LOG.info("Finished pre-check over-replicated ledgers. Over-replicated ledgers pre-check count: {}/{}", + candidateOverReplicatedLedgers.size(), bkActiveLedgers.size()); return candidateOverReplicatedLedgers; } From cb3452be17d0edc1f2d8a43affa7e8ef9748feee Mon Sep 17 00:00:00 2001 From: shustsud Date: Mon, 11 Oct 2021 13:18:52 +0900 Subject: [PATCH 3/4] Fixed pre-check --- .../ScanAndCompareGarbageCollector.java | 52 +++---------------- 1 file changed, 8 insertions(+), 44 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java index d86865cf135..0261d0314e5 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java @@ -215,17 +215,9 @@ public void gc(GarbageCleaner garbageCleaner) { private Set removeOverReplicatedledgers(Set bkActiveledgers, final GarbageCleaner garbageCleaner) throws Exception { - // Check ledger ensembles before creating lock nodes. - // This is to reduce the number of lock node creations and deletions in ZK. - // The ensemble check is done again after the lock node is created. - final Set candidateOverReplicatedLedgers = preCheckOverReplicatedLedgers(bkActiveledgers); - if (candidateOverReplicatedLedgers.isEmpty()) { - return candidateOverReplicatedLedgers; - } - final Set overReplicatedLedgers = Sets.newHashSet(); final Semaphore semaphore = new Semaphore(MAX_CONCURRENT_METADATA_REQUESTS); - final CountDownLatch latch = new CountDownLatch(candidateOverReplicatedLedgers.size()); + final CountDownLatch latch = new CountDownLatch(bkActiveledgers.size()); // instantiate zookeeper client to initialize ledger manager @Cleanup @@ -237,10 +229,14 @@ private Set removeOverReplicatedledgers(Set bkActiveledgers, final G @Cleanup LedgerUnderreplicationManager lum = lmf.newLedgerUnderreplicationManager(); - for (final Long ledgerId : candidateOverReplicatedLedgers) { + for (final Long ledgerId : bkActiveledgers) { try { - // check if the ledger is being replicated already by the replication worker - if (lum.isLedgerBeingReplicated(ledgerId)) { + // check ledger ensembles before creating lock nodes. + // this is to reduce the number of lock node creations and deletions in ZK. + // the ensemble check is done again after the lock node is created. + // also, check if the ledger is being replicated already by the replication worker + if (!isNotBookieIncludedInLedgerEnsembles(ledgerManager.readLedgerMetadata(ledgerId).get()) + || lum.isLedgerBeingReplicated(ledgerId)) { latch.countDown(); continue; } @@ -299,38 +295,6 @@ private static MetadataBookieDriver instantiateMetadataDriver(ServerConfiguratio } } - private Set preCheckOverReplicatedLedgers(Set bkActiveLedgers) throws InterruptedException { - final Set candidateOverReplicatedLedgers = Sets.newHashSet(); - final Semaphore semaphore = new Semaphore(MAX_CONCURRENT_METADATA_REQUESTS); - final CountDownLatch latch = new CountDownLatch(bkActiveLedgers.size()); - - for (final Long ledgerId : bkActiveLedgers) { - try { - semaphore.acquire(); - ledgerManager.readLedgerMetadata(ledgerId) - .whenComplete((metadata, exception) -> { - try { - if (exception == null) { - if (isNotBookieIncludedInLedgerEnsembles(metadata)) { - candidateOverReplicatedLedgers.add(ledgerId); - } - } - } finally { - semaphore.release(); - latch.countDown(); - } - }); - } catch (Throwable t) { - LOG.error("Exception when iterating through the ledgers to pre-check for over-replication", t); - latch.countDown(); - } - } - latch.await(); - LOG.info("Finished pre-check over-replicated ledgers. Over-replicated ledgers pre-check count: {}/{}", - candidateOverReplicatedLedgers.size(), bkActiveLedgers.size()); - return candidateOverReplicatedLedgers; - } - private boolean isNotBookieIncludedInLedgerEnsembles(Versioned metadata) { // do not delete a ledger that is not closed, since the ensemble might // change again and include the current bookie while we are deleting it From 728fbba98412dea4f52f9281f0dfa00611b27cf3 Mon Sep 17 00:00:00 2001 From: shustsud Date: Mon, 18 Oct 2021 16:51:06 +0900 Subject: [PATCH 4/4] Moved readLedgerMetadata --- .../bookkeeper/bookie/ScanAndCompareGarbageCollector.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java index 0261d0314e5..4c778a13b35 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java @@ -235,8 +235,8 @@ private Set removeOverReplicatedledgers(Set bkActiveledgers, final G // this is to reduce the number of lock node creations and deletions in ZK. // the ensemble check is done again after the lock node is created. // also, check if the ledger is being replicated already by the replication worker - if (!isNotBookieIncludedInLedgerEnsembles(ledgerManager.readLedgerMetadata(ledgerId).get()) - || lum.isLedgerBeingReplicated(ledgerId)) { + Versioned preCheckMetadata = ledgerManager.readLedgerMetadata(ledgerId).get(); + if (!isNotBookieIncludedInLedgerEnsembles(preCheckMetadata) || lum.isLedgerBeingReplicated(ledgerId)) { latch.countDown(); continue; }