Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,15 @@ boolean initializeLostBookieRecoveryDelay(int lostBookieRecoveryDelay)
*/
long getReplicasCheckCTime() throws ReplicationException.UnavailableException;

/**
 * Registers a callback that is notified asynchronously when the number of
 * under-replicated ledgers changes.
 *
 * @param cb
 *            callback whose {@code operationComplete} is invoked on a change notification
 * @throws ReplicationException.UnavailableException
 *             presumably when the notification cannot be registered with the
 *             underlying metadata store -- TODO confirm against implementations
 */
void notifyUnderReplicationLedgerChanged(GenericCallback<Void> cb)
throws ReplicationException.UnavailableException;

/**
* Receive notification asynchronously when the lostBookieRecoveryDelay value is Changed.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,5 +394,7 @@ public String getReplicationWorkerIdRereplicatingLedger(long ledgerId)
throws ReplicationException.UnavailableException {
throw new ReplicationException.UnavailableException("null");
}
// No-op implementation: the callback is neither stored nor invoked.
@Override
public void notifyUnderReplicationLedgerChanged(GenericCallback<Void> cb) {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.bookkeeper.util.SubTreeCache;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.AddWatchMode;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
Expand Down Expand Up @@ -866,6 +867,28 @@ public int getLostBookieRecoveryDelay() throws UnavailableException {
}
}

/**
 * Adds a ZooKeeper watch on {@code urLedgerPath} in
 * {@code AddWatchMode.PERSISTENT_RECURSIVE} mode and invokes {@code cb} for every
 * {@code NodeDeleted} event whose path matches {@code idExtractionPattern}.
 *
 * <p>NOTE(review): each call creates and adds a new {@code Watcher}; nothing visible
 * in this method de-duplicates or removes previously added watchers, so callers
 * should presumably register only once -- TODO confirm.
 *
 * @param cb
 *            callback invoked as {@code operationComplete(0, null)} for each matching deletion
 * @throws UnavailableException
 *             if adding the watch fails with a {@code KeeperException} or the
 *             calling thread is interrupted while adding it
 */
@Override
public void notifyUnderReplicationLedgerChanged(GenericCallback<Void> cb) throws UnavailableException {
LOG.debug("notifyUnderReplicationLedgerChanged()");
Watcher w = new Watcher() {
@Override
public void process(WatchedEvent e) {
// Only node deletions whose path matches idExtractionPattern are reported;
// every other event type or path is ignored.
if (e.getType() == Event.EventType.NodeDeleted && idExtractionPattern.matcher(e.getPath()).find()) {
cb.operationComplete(0, null);
}
}
};
try {
zkc.addWatch(urLedgerPath, w, AddWatchMode.PERSISTENT_RECURSIVE);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will register a lot of watcher on Zookeeper

} catch (KeeperException ke) {
LOG.error("Error while checking the state of underReplicated ledgers", ke);
throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
// Restore the interrupt flag before translating the failure for the caller.
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
}
}

@Override
public void notifyLostBookieRecoveryDelayChanged(GenericCallback<Void> cb) throws UnavailableException {
LOG.debug("notifyLostBookieRecoveryDelayChanged()");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_LEDGERS_HAVING_NO_REPLICA_OF_AN_ENTRY;
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY;
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_LEDGERS_SOFTLY_ADHERING_TO_PLACEMENT_POLICY;
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_REPLICATED_LEDGERS;
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_UNDERREPLICATED_LEDGERS_ELAPSED_RECOVERY_GRACE_PERIOD;
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS;
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS_GUAGE;
import static org.apache.bookkeeper.replication.ReplicationStats.PLACEMENT_POLICY_CHECK_TIME;
import static org.apache.bookkeeper.replication.ReplicationStats.REPLICAS_CHECK_TIME;
import static org.apache.bookkeeper.replication.ReplicationStats.UNDER_REPLICATED_LEDGERS_TOTAL_SIZE;
Expand All @@ -45,6 +47,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Multiset;
import com.google.common.collect.Sets;
Expand Down Expand Up @@ -163,6 +166,7 @@ public class Auditor implements AutoCloseable {
private final AtomicInteger numLedgersFoundHavingLessThanAQReplicasOfAnEntry;
private final AtomicInteger numLedgersHavingLessThanWQReplicasOfAnEntryGuageValue;
private final AtomicInteger numLedgersFoundHavingLessThanWQReplicasOfAnEntry;
private final AtomicInteger underReplicatedLedgersGuageValue;
private final long underreplicatedLedgerRecoveryGracePeriod;
private final int zkOpTimeoutMs;
private final Semaphore openLedgerNoRecoverySemaphore;
Expand Down Expand Up @@ -235,6 +239,11 @@ public class Auditor implements AutoCloseable {
help = "the number of delayed-bookie-audits cancelled"
)
private final Counter numDelayedBookieAuditsCancelled;
@StatsDoc(
name = NUM_REPLICATED_LEDGERS,
help = "the number of replicated ledgers"
)
private final Counter numReplicatedLedgers;
@StatsDoc(
name = NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY,
help = "Gauge for number of ledgers not adhering to placement policy found in placement policy check"
Expand Down Expand Up @@ -267,6 +276,11 @@ public class Auditor implements AutoCloseable {
+ ", this doesn't include ledgers counted towards numLedgersHavingLessThanAQReplicasOfAnEntry"
)
private final Gauge<Integer> numLedgersHavingLessThanWQReplicasOfAnEntry;
@StatsDoc(
name = NUM_UNDER_REPLICATED_LEDGERS_GUAGE,
help = "Gauge for num of underreplicated ledgers"
)
private final Gauge<Integer> numUnderReplicatedLedgers;

static BookKeeper createBookKeeperClient(ServerConfiguration conf) throws InterruptedException, IOException {
return createBookKeeperClient(conf, NullStatsLogger.INSTANCE);
Expand Down Expand Up @@ -364,6 +378,7 @@ public Auditor(final String bookieIdentifier,
this.openLedgerNoRecoverySemaphoreWaitTimeoutMSec =
conf.getAuditorAcquireConcurrentOpenLedgerOperationsTimeoutMSec();

this.underReplicatedLedgersGuageValue = new AtomicInteger(0);
numUnderReplicatedLedger = this.statsLogger.getOpStatsLogger(ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS);
underReplicatedLedgerTotalSize = this.statsLogger.getOpStatsLogger(UNDER_REPLICATED_LEDGERS_TOTAL_SIZE);
uRLPublishTimeForLostBookies = this.statsLogger
Expand All @@ -380,6 +395,7 @@ public Auditor(final String bookieIdentifier,
numBookieAuditsDelayed = this.statsLogger.getCounter(ReplicationStats.NUM_BOOKIE_AUDITS_DELAYED);
numDelayedBookieAuditsCancelled = this.statsLogger
.getCounter(ReplicationStats.NUM_DELAYED_BOOKIE_AUDITS_DELAYES_CANCELLED);
numReplicatedLedgers = this.statsLogger.getCounter(NUM_REPLICATED_LEDGERS);
numLedgersNotAdheringToPlacementPolicy = new Gauge<Integer>() {
@Override
public Integer getDefaultValue() {
Expand Down Expand Up @@ -460,6 +476,18 @@ public Integer getSample() {
};
this.statsLogger.registerGauge(ReplicationStats.NUM_LEDGERS_HAVING_LESS_THAN_WQ_REPLICAS_OF_AN_ENTRY,
numLedgersHavingLessThanWQReplicasOfAnEntry);
numUnderReplicatedLedgers = new Gauge<Integer>() {
@Override
public Integer getDefaultValue() {
return 0;
}

@Override
public Integer getSample() {
return underReplicatedLedgersGuageValue.get();
}
};
this.statsLogger.registerGauge(NUM_UNDER_REPLICATED_LEDGERS_GUAGE, numUnderReplicatedLedgers);

this.bkc = bkc;
this.ownBkc = ownBkc;
Expand Down Expand Up @@ -707,6 +735,15 @@ public void start() {
submitShutdownTask();
}

try {
this.ledgerUnderreplicationManager.notifyUnderReplicationLedgerChanged(
new UnderReplicatedLedgersChangedCb());
} catch (UnavailableException ue) {
LOG.error("Exception while registering for under-replicated ledgers change notification, so exiting",
ue);
submitShutdownTask();
}

scheduleBookieCheckTask();
scheduleCheckAllLedgersTask();
schedulePlacementPolicyCheckTask();
Expand Down Expand Up @@ -1014,6 +1051,16 @@ public void run() {
}), initialDelay, interval, TimeUnit.SECONDS);
}

/**
 * Callback handed to {@code notifyUnderReplicationLedgerChanged}. On every
 * notification it recounts the entries returned by
 * {@code listLedgersToRereplicate(null)}, stores that count in
 * {@code underReplicatedLedgersGuageValue}, and increments the
 * {@code numReplicatedLedgers} counter. The {@code rc} and {@code result}
 * arguments are not inspected.
 *
 * <p>NOTE(review): {@code Iterators.size} consumes the whole iterator, so each
 * notification walks every returned entry. If this runs on the thread that
 * delivers the notification (verify against the manager implementation), the
 * walk may delay or block further notifications.
 */
private class UnderReplicatedLedgersChangedCb implements GenericCallback<Void> {
@Override
public void operationComplete(int rc, Void result) {
// A null predicate is passed, i.e. no filter is supplied by this caller.
Iterator<UnderreplicatedLedger> underreplicatedLedgersInfo = ledgerUnderreplicationManager
.listLedgersToRereplicate(null);
// Refresh the value sampled by the under-replicated ledgers gauge.
underReplicatedLedgersGuageValue.set(Iterators.size(underreplicatedLedgersInfo));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Iterators.size will iterate the whole Znodes in an under-replicated path which brings heavy pressure on Zookeeper and lead to dead lock.

// One increment per notification received.
numReplicatedLedgers.inc();
}
}

private class LostBookieRecoveryDelayChangedCb implements GenericCallback<Void> {
@Override
public void operationComplete(int rc, Void result) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,6 @@ public interface ReplicationStats {
String REPLICATE_EXCEPTION = "exceptions";
String NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER = "NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER";
String NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION = "NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION";
String NUM_UNDER_REPLICATED_LEDGERS_GUAGE = "NUM_UNDER_REPLICATED_LEDGERS_GUAGE";
String NUM_REPLICATED_LEDGERS = "NUM_REPLICATED_LEDGERS";
}