Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,11 @@ public Auditor getAuditor() {
return auditorElector.getAuditor();
}

@VisibleForTesting
public ReplicationWorker getReplicationWorker() {
return replicationWorker;
}

/** Is auto-recovery service running? */
public boolean isAutoRecoveryRunning() {
return running;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import static org.apache.bookkeeper.replication.ReplicationStats.REPLICATE_EXCEPTION;
import static org.apache.bookkeeper.replication.ReplicationStats.REPLICATION_WORKER_SCOPE;
import static org.apache.bookkeeper.replication.ReplicationStats.REREPLICATE_OP;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
Expand Down Expand Up @@ -244,6 +246,11 @@ public void run() {
LOG.error("UnavailableException "
+ "while replicating fragments", e);
waitBackOffTime(rwRereplicateBackoffMs);
if (Thread.currentThread().isInterrupted()) {
LOG.error("Interrupted while replicating fragments");
shutdown();
return;
}
}
}
LOG.info("ReplicationWorker exited loop!");
Expand Down Expand Up @@ -646,7 +653,8 @@ public void shutdown() {
/**
* Gives the running status of ReplicationWorker.
*/
boolean isRunning() {
@VisibleForTesting
public boolean isRunning() {
return workerRunning && workerThread.isAlive();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,8 +403,12 @@ public void testEmptyLedgerLosesQuorumEventually() throws Exception {
assertTrue("Should be marked as underreplicated", latch.await(5, TimeUnit.SECONDS));
latch = new CountDownLatch(1);
s = watchUrLedgerNode(urZNode, latch); // should be marked as replicated

startNewBookie();
getAuditor(10, TimeUnit.SECONDS).submitAuditTask().get(); // ensure auditor runs

if (s != null) {
assertTrue("Should be marked as replicated", latch.await(5, TimeUnit.SECONDS));
assertTrue("Should be marked as replicated", latch.await(20, TimeUnit.SECONDS));
}

// should be able to open ledger without issue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.replication.Auditor;
import org.apache.bookkeeper.replication.AutoRecoveryMain;
import org.apache.bookkeeper.replication.ReplicationWorker;
import org.apache.bookkeeper.server.Main;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.DiskChecker;
Expand Down Expand Up @@ -426,6 +427,11 @@ public ServerConfiguration getBkConf(BookieId addr) throws Exception {
public ServerConfiguration killBookie(BookieId addr) throws Exception {
Optional<ServerTester> tester = byAddress(addr);
if (tester.isPresent()) {
if (tester.get().autoRecovery != null
&& tester.get().autoRecovery.getAuditor() != null
&& tester.get().autoRecovery.getAuditor().isRunning()) {
LOG.warn("Killing bookie {} who is the current Auditor", addr);
}
servers.remove(tester.get());
tester.get().shutdown();
return tester.get().getConfiguration();
Expand Down Expand Up @@ -763,7 +769,20 @@ public Auditor getAuditor(int timeout, TimeUnit unit) throws Exception {
while (System.nanoTime() < timeoutAt) {
for (ServerTester t : servers) {
Auditor a = t.getAuditor();
if (a != null) {
ReplicationWorker replicationWorker = t.getReplicationWorker();

// found a candidate Auditor + ReplicationWorker
if (a != null && a.isRunning()
&& replicationWorker != null && replicationWorker.isRunning()) {
int deathWatchInterval = t.getConfiguration().getDeathWatchInterval();
Thread.sleep(deathWatchInterval + 1000);
}

// double check, because in the meantime AutoRecoveryDeathWatcher may have killed the
// AutoRecovery daemon
if (a != null && a.isRunning()
&& replicationWorker != null && replicationWorker.isRunning()) {
LOG.info("Found Auditor Bookie {}", t.server.getBookieId());
return a;
}
}
Expand Down Expand Up @@ -899,6 +918,14 @@ Auditor getAuditor() {
}
}

ReplicationWorker getReplicationWorker() {
if (autoRecovery != null) {
return autoRecovery.getReplicationWorker();
} else {
return null;
}
}

ServerConfiguration getConfiguration() {
return conf;
}
Expand Down