diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java index 39d1d740c61..0f2b2c009a1 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java @@ -185,6 +185,11 @@ public Auditor getAuditor() { return auditorElector.getAuditor(); } + @VisibleForTesting + public ReplicationWorker getReplicationWorker() { + return replicationWorker; + } + /** Is auto-recovery service running? */ public boolean isAutoRecoveryRunning() { return running; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java index 6509cdd7996..d897e77ea7c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java @@ -25,6 +25,8 @@ import static org.apache.bookkeeper.replication.ReplicationStats.REPLICATE_EXCEPTION; import static org.apache.bookkeeper.replication.ReplicationStats.REPLICATION_WORKER_SCOPE; import static org.apache.bookkeeper.replication.ReplicationStats.REREPLICATE_OP; + +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; @@ -244,6 +246,11 @@ public void run() { LOG.error("UnavailableException " + "while replicating fragments", e); waitBackOffTime(rwRereplicateBackoffMs); + if (Thread.currentThread().isInterrupted()) { + LOG.error("Interrupted while replicating fragments"); + shutdown(); + return; + } } } LOG.info("ReplicationWorker exited loop!"); @@ -646,7 +653,8 @@ public void shutdown() { /** * Gives the running status of ReplicationWorker. */ - boolean isRunning() { + @VisibleForTesting + public boolean isRunning() { return workerRunning && workerThread.isAlive(); } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java index 05937fe43e9..a8e89dcf0cf 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java @@ -403,8 +403,12 @@ public void testEmptyLedgerLosesQuorumEventually() throws Exception { assertTrue("Should be marked as underreplicated", latch.await(5, TimeUnit.SECONDS)); latch = new CountDownLatch(1); s = watchUrLedgerNode(urZNode, latch); // should be marked as replicated + + startNewBookie(); + getAuditor(10, TimeUnit.SECONDS).submitAuditTask().get(); // ensure auditor runs + if (s != null) { - assertTrue("Should be marked as replicated", latch.await(5, TimeUnit.SECONDS)); + assertTrue("Should be marked as replicated", latch.await(20, TimeUnit.SECONDS)); } // should be able to open ledger without issue diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java index aa4b510809b..1b14f172550 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java @@ -73,6 +73,7 @@ import org.apache.bookkeeper.proto.BookieServer; import org.apache.bookkeeper.replication.Auditor; import org.apache.bookkeeper.replication.AutoRecoveryMain; +import org.apache.bookkeeper.replication.ReplicationWorker; import org.apache.bookkeeper.server.Main; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.util.DiskChecker; @@ -426,6 +427,11 @@ public ServerConfiguration getBkConf(BookieId addr) throws Exception { public ServerConfiguration killBookie(BookieId addr) throws Exception { Optional tester = byAddress(addr); if (tester.isPresent()) { + if (tester.get().autoRecovery != null + && tester.get().autoRecovery.getAuditor() != null + && tester.get().autoRecovery.getAuditor().isRunning()) { + LOG.warn("Killing bookie {} who is the current Auditor", addr); + } servers.remove(tester.get()); tester.get().shutdown(); return tester.get().getConfiguration(); @@ -763,7 +769,20 @@ public Auditor getAuditor(int timeout, TimeUnit unit) throws Exception { while (System.nanoTime() < timeoutAt) { for (ServerTester t : servers) { Auditor a = t.getAuditor(); - if (a != null) { + ReplicationWorker replicationWorker = t.getReplicationWorker(); + + // found a candidate Auditor + ReplicationWorker + if (a != null && a.isRunning() + && replicationWorker != null && replicationWorker.isRunning()) { + int deathWatchInterval = t.getConfiguration().getDeathWatchInterval(); + Thread.sleep(deathWatchInterval + 1000); + } + + // double check, because in the meantime AutoRecoveryDeathWatcher may have killed the + // AutoRecovery daemon + if (a != null && a.isRunning() + && replicationWorker != null && replicationWorker.isRunning()) { + LOG.info("Found Auditor Bookie {}", t.server.getBookieId()); return a; } } @@ -899,6 +918,14 @@ Auditor getAuditor() { } } + ReplicationWorker getReplicationWorker() { + if (autoRecovery != null) { + return autoRecovery.getReplicationWorker(); + } else { + return null; + } + } + ServerConfiguration getConfiguration() { return conf; }