diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java index ac468d31563..c24f9208023 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java @@ -236,6 +236,15 @@ boolean initializeLostBookieRecoveryDelay(int lostBookieRecoveryDelay) */ long getReplicasCheckCTime() throws ReplicationException.UnavailableException; + /** + * Receive notification asynchronously when the num of under-replicated ledgers Changed. + * + * @param cb + * @throws ReplicationException.UnavailableException + */ + void notifyUnderReplicationLedgerChanged(GenericCallback cb) + throws ReplicationException.UnavailableException; + /** * Receive notification asynchronously when the lostBookieRecoveryDelay value is Changed. * diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/NullMetadataBookieDriver.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/NullMetadataBookieDriver.java index 3a4247f6c61..4a1e6e419fa 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/NullMetadataBookieDriver.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/NullMetadataBookieDriver.java @@ -394,5 +394,7 @@ public String getReplicationWorkerIdRereplicatingLedger(long ledgerId) throws ReplicationException.UnavailableException { throw new ReplicationException.UnavailableException("null"); } + @Override + public void notifyUnderReplicationLedgerChanged(GenericCallback cb) {} } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java index b340e0721cb..9dcc81c1622 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java @@ -59,6 +59,7 @@ import org.apache.bookkeeper.util.BookKeeperConstants; import org.apache.bookkeeper.util.SubTreeCache; import org.apache.bookkeeper.util.ZkUtils; +import org.apache.zookeeper.AddWatchMode; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.Code; @@ -866,6 +867,28 @@ public int getLostBookieRecoveryDelay() throws UnavailableException { } } + @Override + public void notifyUnderReplicationLedgerChanged(GenericCallback cb) throws UnavailableException { + LOG.debug("notifyUnderReplicationLedgerChanged()"); + Watcher w = new Watcher() { + @Override + public void process(WatchedEvent e) { + if (e.getType() == Event.EventType.NodeDeleted && idExtractionPattern.matcher(e.getPath()).find()) { + cb.operationComplete(0, null); + } + } + }; + try { + zkc.addWatch(urLedgerPath, w, AddWatchMode.PERSISTENT_RECURSIVE); + } catch (KeeperException ke) { + LOG.error("Error while checking the state of underReplicated ledgers", ke); + throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie); + } + } + @Override public void notifyLostBookieRecoveryDelayChanged(GenericCallback cb) throws UnavailableException { LOG.debug("notifyLostBookieRecoveryDelayChanged()"); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java index 8c69812ff94..81c75f68dd4 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java @@ -34,8 +34,10 @@ import static org.apache.bookkeeper.replication.ReplicationStats.NUM_LEDGERS_HAVING_NO_REPLICA_OF_AN_ENTRY; import static org.apache.bookkeeper.replication.ReplicationStats.NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY; import static org.apache.bookkeeper.replication.ReplicationStats.NUM_LEDGERS_SOFTLY_ADHERING_TO_PLACEMENT_POLICY; +import static org.apache.bookkeeper.replication.ReplicationStats.NUM_REPLICATED_LEDGERS; import static org.apache.bookkeeper.replication.ReplicationStats.NUM_UNDERREPLICATED_LEDGERS_ELAPSED_RECOVERY_GRACE_PERIOD; import static org.apache.bookkeeper.replication.ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS; +import static org.apache.bookkeeper.replication.ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS_GUAGE; import static org.apache.bookkeeper.replication.ReplicationStats.PLACEMENT_POLICY_CHECK_TIME; import static org.apache.bookkeeper.replication.ReplicationStats.REPLICAS_CHECK_TIME; import static org.apache.bookkeeper.replication.ReplicationStats.UNDER_REPLICATED_LEDGERS_TOTAL_SIZE; @@ -45,6 +47,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; import com.google.common.collect.HashMultiset; +import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Multiset; import com.google.common.collect.Sets; @@ -163,6 +166,7 @@ public class Auditor implements AutoCloseable { private final AtomicInteger numLedgersFoundHavingLessThanAQReplicasOfAnEntry; private final AtomicInteger numLedgersHavingLessThanWQReplicasOfAnEntryGuageValue; private final AtomicInteger numLedgersFoundHavingLessThanWQReplicasOfAnEntry; + private final AtomicInteger underReplicatedLedgersGuageValue; private final long underreplicatedLedgerRecoveryGracePeriod; private final int zkOpTimeoutMs; private final Semaphore openLedgerNoRecoverySemaphore; @@ -235,6 +239,11 @@ public class Auditor implements AutoCloseable { help = "the number of delayed-bookie-audits cancelled" ) private final Counter numDelayedBookieAuditsCancelled; + @StatsDoc( + name = NUM_REPLICATED_LEDGERS, + help = "the number of replicated ledgers" + ) + private final Counter numReplicatedLedgers; @StatsDoc( name = NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY, help = "Gauge for number of ledgers not adhering to placement policy found in placement policy check" @@ -267,6 +276,11 @@ public class Auditor implements AutoCloseable { + ", this doesn't include ledgers counted towards numLedgersHavingLessThanAQReplicasOfAnEntry" ) private final Gauge numLedgersHavingLessThanWQReplicasOfAnEntry; + @StatsDoc( + name = NUM_UNDER_REPLICATED_LEDGERS_GUAGE, + help = "Gauge for num of underreplicated ledgers" + ) + private final Gauge numUnderReplicatedLedgers; static BookKeeper createBookKeeperClient(ServerConfiguration conf) throws InterruptedException, IOException { return createBookKeeperClient(conf, NullStatsLogger.INSTANCE); @@ -364,6 +378,7 @@ public Auditor(final String bookieIdentifier, this.openLedgerNoRecoverySemaphoreWaitTimeoutMSec = conf.getAuditorAcquireConcurrentOpenLedgerOperationsTimeoutMSec(); + this.underReplicatedLedgersGuageValue = new AtomicInteger(0); numUnderReplicatedLedger = this.statsLogger.getOpStatsLogger(ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS); underReplicatedLedgerTotalSize = this.statsLogger.getOpStatsLogger(UNDER_REPLICATED_LEDGERS_TOTAL_SIZE); uRLPublishTimeForLostBookies = this.statsLogger @@ -380,6 +395,7 @@ public Auditor(final String bookieIdentifier, numBookieAuditsDelayed = this.statsLogger.getCounter(ReplicationStats.NUM_BOOKIE_AUDITS_DELAYED); numDelayedBookieAuditsCancelled = this.statsLogger .getCounter(ReplicationStats.NUM_DELAYED_BOOKIE_AUDITS_DELAYES_CANCELLED); + numReplicatedLedgers = this.statsLogger.getCounter(NUM_REPLICATED_LEDGERS); numLedgersNotAdheringToPlacementPolicy = new Gauge() { @Override public Integer getDefaultValue() { @@ -460,6 +476,18 @@ public Integer getSample() { }; this.statsLogger.registerGauge(ReplicationStats.NUM_LEDGERS_HAVING_LESS_THAN_WQ_REPLICAS_OF_AN_ENTRY, numLedgersHavingLessThanWQReplicasOfAnEntry); + numUnderReplicatedLedgers = new Gauge() { + @Override + public Integer getDefaultValue() { + return 0; + } + + @Override + public Integer getSample() { + return underReplicatedLedgersGuageValue.get(); + } + }; + this.statsLogger.registerGauge(NUM_UNDER_REPLICATED_LEDGERS_GUAGE, numUnderReplicatedLedgers); this.bkc = bkc; this.ownBkc = ownBkc; @@ -707,6 +735,15 @@ public void start() { submitShutdownTask(); } + try { + this.ledgerUnderreplicationManager.notifyUnderReplicationLedgerChanged( + new UnderReplicatedLedgersChangedCb()); + } catch (UnavailableException ue) { + LOG.error("Exception while registering for under-replicated ledgers change notification, so exiting", + ue); + submitShutdownTask(); + } + scheduleBookieCheckTask(); scheduleCheckAllLedgersTask(); schedulePlacementPolicyCheckTask(); @@ -1014,6 +1051,16 @@ public void run() { }), initialDelay, interval, TimeUnit.SECONDS); } + private class UnderReplicatedLedgersChangedCb implements GenericCallback { + @Override + public void operationComplete(int rc, Void result) { + Iterator underreplicatedLedgersInfo = ledgerUnderreplicationManager + .listLedgersToRereplicate(null); + underReplicatedLedgersGuageValue.set(Iterators.size(underreplicatedLedgersInfo)); + numReplicatedLedgers.inc(); + } + } + private class LostBookieRecoveryDelayChangedCb implements GenericCallback { @Override public void operationComplete(int rc, Void result) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java index d04fcf04279..74b76b23b22 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java @@ -64,4 +64,6 @@ public interface ReplicationStats { String REPLICATE_EXCEPTION = "exceptions"; String NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER = "NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER"; String NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION = "NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION"; + String NUM_UNDER_REPLICATED_LEDGERS_GUAGE = "NUM_UNDER_REPLICATED_LEDGERS_GUAGE"; + String NUM_REPLICATED_LEDGERS = "NUM_REPLICATED_LEDGERS"; }