From 03d2d82d050a63cc852e28b92e0d07fbd88384e2 Mon Sep 17 00:00:00 2001 From: Wellington Ramos Chevreuil Date: Mon, 14 Sep 2020 11:13:14 +0100 Subject: [PATCH 1/4] HBASE-24877 Add option to avoid aborting RS process upon uncaught exceptions happen on replication source --- .../regionserver/ReplicationSource.java | 123 +++++++++++++----- .../ReplicationSourceShipper.java | 3 +- .../regionserver/TestReplicationSource.java | 94 +++++++++++++ 3 files changed, 190 insertions(+), 30 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index c495376c87f0..f87fca07ee72 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -35,8 +35,10 @@ import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Predicate; + import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -122,6 +124,17 @@ public class ReplicationSource implements ReplicationSourceInterface { // ReplicationEndpoint which will handle the actual replication private volatile ReplicationEndpoint replicationEndpoint; + private boolean abortOnError; + //This is needed for the startup loop to identify when there's already + //an initialization happening (but not finished yet), + //so that it doesn't try submit another initialize thread. + //NOTE: this should only be set to false at the end of initialize method, prior to return. + private AtomicBoolean startupOngoing = new AtomicBoolean(false); + //Flag that signalizes uncaught error happening while starting up the source + //and a retry should be attempted + private AtomicBoolean retryStartup = new AtomicBoolean(false); + + /** * A filter (or a chain of filters) for WAL entries; filters out edits. */ @@ -132,6 +145,7 @@ public class ReplicationSource implements ReplicationSourceInterface { private long defaultBandwidth; private long currentBandwidth; private WALFileLengthProvider walFileLengthProvider; + @VisibleForTesting protected final ConcurrentHashMap workerThreads = new ConcurrentHashMap<>(); @@ -220,6 +234,10 @@ public void init(Configuration conf, FileSystem fs, ReplicationSourceManager man this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0); this.totalBufferUsed = manager.getTotalBufferUsed(); this.walFileLengthProvider = walFileLengthProvider; + + this.abortOnError = this.conf.getBoolean("replication.source.regionserver.abort", + true); + LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId, replicationPeer.getId(), this.currentBandwidth); } @@ -241,6 +259,9 @@ public void enqueueLog(Path wal) { PriorityBlockingQueue queue = queues.get(logPrefix); if (queue == null) { queue = new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator()); + // make sure that we do not use an empty queue when setting up a ReplicationSource, otherwise + // the shipper may quit immediately + queue.put(wal); queues.put(logPrefix, queue); if (this.isSourceActive() && this.walEntryFilter != null) { // new wal group observed after source startup, start a new worker thread to track it @@ -248,8 +269,9 @@ public void enqueueLog(Path wal) { // still not launched, so it's necessary to check workerThreads before start the worker tryStartNewShipper(logPrefix, queue); } + } else { + queue.put(wal); } - queue.put(wal); if (LOG.isTraceEnabled()) { LOG.trace("{} Added wal {} to queue of source {}.", logPeerId(), logPrefix, this.replicationQueueInfo.getQueueId()); @@ -354,24 +376,30 @@ private void initializeWALEntryFilter(UUID peerClusterId) { } private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue queue) { - ReplicationSourceShipper worker = createNewShipper(walGroupId, queue); - ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker); - if (extant != null) { - if(LOG.isDebugEnabled()) { - LOG.debug("{} Someone has beat us to start a worker thread for wal group {}", logPeerId(), - walGroupId); - } - } else { - if(LOG.isDebugEnabled()) { - LOG.debug("{} Starting up worker for wal group {}", logPeerId(), walGroupId); + workerThreads.compute(walGroupId, (key, value) -> { + if (value != null) { + if (LOG.isDebugEnabled()) { + LOG.debug( + "{} Someone has beat us to start a worker thread for wal group {}", + logPeerId(), key); + } + return value; + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("{} Starting up worker for wal group {}", logPeerId(), key); + } + ReplicationSourceShipper worker = createNewShipper(walGroupId, queue); + ReplicationSourceWALReader walReader = + createNewWALReader(walGroupId, queue, worker.getStartPosition()); + Threads.setDaemonThreadRunning( + walReader, Thread.currentThread().getName() + + ".replicationSource.wal-reader." + walGroupId + "," + queueId, + (t,e) -> this.uncaughtException(t, e, this.manager, this.getPeerId())); + worker.setWALReader(walReader); + worker.startup((t,e) -> this.uncaughtException(t, e, this.manager, this.getPeerId())); + return worker; } - ReplicationSourceWALReader walReader = - createNewWALReader(walGroupId, queue, worker.getStartPosition()); - Threads.setDaemonThreadRunning(walReader, Thread.currentThread().getName() + - ".replicationSource.wal-reader." + walGroupId + "," + queueId, this::uncaughtException); - worker.setWALReader(walReader); - worker.startup(this::uncaughtException); - } + }); } @Override @@ -443,11 +471,28 @@ WALEntryFilter getWalEntryFilter() { return walEntryFilter; } - protected final void uncaughtException(Thread t, Throwable e) { + protected final void uncaughtException(Thread t, Throwable e, + ReplicationSourceManager manager, String peerId) { RSRpcServices.exitIfOOME(e); LOG.error("Unexpected exception in {} currentPath={}", t.getName(), getCurrentPath(), e); - server.abort("Unexpected exception in " + t.getName(), e); + if(abortOnError){ + server.abort("Unexpected exception in " + t.getName(), e); + } + if(manager != null){ + while (true) { + try { + LOG.info("Refreshing replication sources now due to previous error on thread: {}", + t.getName()); + manager.refreshSources(peerId); + break; + } catch (IOException e1) { + LOG.error("Replication sources refresh failed.", e1); + sleepForRetries("Sleeping before try refreshing sources again", + maxRetriesMultiplier); + } + } + } } @Override @@ -546,12 +591,18 @@ private void initialize() { replicationEndpoint.stop(); if (sleepForRetries("Error starting ReplicationEndpoint", sleepMultiplier)) { sleepMultiplier++; + } else { + retryStartup.set(!this.abortOnError); + this.startupOngoing.set(false); + throw new RuntimeException("Exhausted retries to start replication endpoint."); } } } if (!this.isSourceActive()) { - return; + retryStartup.set(!this.abortOnError); + this.startupOngoing.set(false); + throw new IllegalStateException("Source should be active."); } sleepMultiplier = 1; @@ -572,8 +623,10 @@ private void initialize() { } } - if (!this.isSourceActive()) { - return; + if(!this.isSourceActive()) { + retryStartup.set(!this.abortOnError); + this.startupOngoing.set(false); + throw new IllegalStateException("Source should be active."); } LOG.info("{} Source: {}, is now replicating from cluster: {}; to peer cluster: {};", logPeerId(), this.replicationQueueInfo.getQueueId(), clusterId, peerClusterId); @@ -585,16 +638,28 @@ private void initialize() { PriorityBlockingQueue queue = entry.getValue(); tryStartNewShipper(walGroupId, queue); } + this.startupOngoing.set(false); } @Override public void startup() { - // mark we are running now - this.sourceRunning = true; - initThread = new Thread(this::initialize); - Threads.setDaemonThreadRunning(initThread, - Thread.currentThread().getName() + ".replicationSource," + this.queueId, - this::uncaughtException); + retryStartup.set(true); + do { + if(retryStartup.get()) { + this.sourceRunning = true; + startupOngoing.set(true); + retryStartup.set(false); + // mark we are running now + initThread = new Thread(this::initialize); + Threads.setDaemonThreadRunning(initThread, + Thread.currentThread().getName() + ".replicationSource," + this.queueId, + (t,e) -> { + sourceRunning = false; + uncaughtException(t, e, null, null); + retryStartup.set(!this.abortOnError); + }); + } + } while ((this.startupOngoing.get() || this.retryStartup.get()) && !this.abortOnError); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index 92646d2cac45..b171eb4d78df 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -280,7 +280,8 @@ private boolean updateLogPosition(WALEntryBatch batch) { public void startup(UncaughtExceptionHandler handler) { String name = Thread.currentThread().getName(); Threads.setDaemonThreadRunning(this, - name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(), handler); + name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(), + handler::uncaughtException); } Path getCurrentPath() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java index 15f202f06467..a31cd8341907 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.replication.regionserver; import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.META_WAL_PROVIDER_ID; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -396,6 +397,26 @@ protected void doStop() { } } + /** + * Deadend Endpoint. Does nothing. + */ + public static class FaultyReplicationEndpoint extends DoNothingReplicationEndpoint { + + static int count = 0; + + @Override + public synchronized UUID getPeerUUID() { + LOG.info(">>>>>>>> " + count); + if(count==0) { + count++; + throw new RuntimeException(); + } else { + return super.getPeerUUID(); + } + } + + } + /** * Test HBASE-20497 * Moved here from TestReplicationSource because doesn't need cluster. @@ -423,5 +444,78 @@ public void testRecoveredReplicationSourceShipperGetPosition() throws Exception new RecoveredReplicationSourceShipper(conf, walGroupId, queue, source, storage); assertEquals(1001L, shipper.getStartPosition()); } + + /** + * Test ReplicationSource retries startup once an uncaught exception happens + * during initialization and eplication.source.regionserver.abort is set to false. + */ + @Test + public void testAbortFalseOnError() throws IOException { + ReplicationSource rs = new ReplicationSource(); + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + conf.setInt("replication.source.maxretriesmultiplier", 1); + conf.setBoolean("replication.source.regionserver.abort", false); + ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); + Mockito.when(mockPeer.getConfiguration()).thenReturn(conf); + Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); + ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class); + FaultyReplicationEndpoint.count = 0; + Mockito.when(peerConfig.getReplicationEndpointImpl()). + thenReturn(FaultyReplicationEndpoint.class.getName()); + Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); + ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); + Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); + String queueId = "qid"; + RegionServerServices rss = + TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); + rs.init(conf, null, manager, null, mockPeer, rss, queueId, null, + p -> OptionalLong.empty(), new MetricsSource(queueId)); + try { + rs.startup(); + assertTrue(rs.isSourceActive()); + assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue()); + rs.enqueueLog(new Path("a.1" + META_WAL_PROVIDER_ID)); + assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue()); + rs.enqueueLog(new Path("a.1")); + assertEquals(1, rs.getSourceMetrics().getSizeOfLogQueue()); + } finally { + rs.terminate("Done"); + rss.stop("Done"); + } + } + + /** + * Test ReplicationSource retries startup once an uncaught exception happens + * during initialization and replication.source.regionserver.abort is set to false. + */ + @Test + public void testAbortTrueOnError() throws IOException { + ReplicationSource rs = new ReplicationSource(); + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + conf.setInt("replication.source.maxretriesmultiplier", 1); + ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); + Mockito.when(mockPeer.getConfiguration()).thenReturn(conf); + Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); + ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class); + FaultyReplicationEndpoint.count = 0; + Mockito.when(peerConfig.getReplicationEndpointImpl()). + thenReturn(FaultyReplicationEndpoint.class.getName()); + Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); + ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); + Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); + String queueId = "qid"; + RegionServerServices rss = + TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); + rs.init(conf, null, manager, null, mockPeer, rss, queueId, null, + p -> OptionalLong.empty(), new MetricsSource(queueId)); + try { + rs.startup(); + Waiter.waitFor(conf, 1000, () -> rss.isAborted()); + assertFalse(rs.isSourceActive()); + } finally { + rs.terminate("Done"); + rss.stop("Done"); + } + } } From a3b302cf6a2ab857f947ef7dcea7e59da91c4ec9 Mon Sep 17 00:00:00 2001 From: Wellington Chevreuil Date: Tue, 15 Sep 2020 10:43:10 +0100 Subject: [PATCH 2/4] removing extra unnecessary logging --- .../hbase/replication/regionserver/TestReplicationSource.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java index a31cd8341907..d790c5f612b4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java @@ -406,7 +406,6 @@ public static class FaultyReplicationEndpoint extends DoNothingReplicationEndpoi @Override public synchronized UUID getPeerUUID() { - LOG.info(">>>>>>>> " + count); if(count==0) { count++; throw new RuntimeException(); From 1db06dd50f9b269d7616a4070831bd4546008cf1 Mon Sep 17 00:00:00 2001 From: Wellington Ramos Chevreuil Date: Tue, 29 Sep 2020 10:00:57 +0100 Subject: [PATCH 3/4] HBASE-24877 addendum: additional checks to avoid one extra possible race control in the initialize loop (#2400) Signed-off-by: Duo Zhang Signed-off-by: Josh Elser --- .../regionserver/ReplicationSource.java | 46 ++++-- .../regionserver/TestReplicationSource.java | 155 ++++++++++-------- 2 files changed, 119 insertions(+), 82 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index f87fca07ee72..f74bc056bf9a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -134,7 +134,6 @@ public class ReplicationSource implements ReplicationSourceInterface { //and a retry should be attempted private AtomicBoolean retryStartup = new AtomicBoolean(false); - /** * A filter (or a chain of filters) for WAL entries; filters out edits. */ @@ -643,23 +642,34 @@ private void initialize() { @Override public void startup() { - retryStartup.set(true); - do { - if(retryStartup.get()) { - this.sourceRunning = true; - startupOngoing.set(true); - retryStartup.set(false); - // mark we are running now - initThread = new Thread(this::initialize); - Threads.setDaemonThreadRunning(initThread, - Thread.currentThread().getName() + ".replicationSource," + this.queueId, - (t,e) -> { - sourceRunning = false; - uncaughtException(t, e, null, null); - retryStartup.set(!this.abortOnError); - }); - } - } while ((this.startupOngoing.get() || this.retryStartup.get()) && !this.abortOnError); + // mark we are running now + this.sourceRunning = true; + startupOngoing.set(true); + initThread = new Thread(this::initialize); + Threads.setDaemonThreadRunning(initThread, + Thread.currentThread().getName() + ".replicationSource," + this.queueId, + (t,e) -> { + //if first initialization attempt failed, and abortOnError is false, we will + //keep looping in this thread until initialize eventually succeeds, + //while the server main startup one can go on with its work. + sourceRunning = false; + uncaughtException(t, e, null, null); + retryStartup.set(!this.abortOnError); + do { + if(retryStartup.get()) { + this.sourceRunning = true; + startupOngoing.set(true); + retryStartup.set(false); + try { + initialize(); + } catch(Throwable error){ + sourceRunning = false; + uncaughtException(t, error, null, null); + retryStartup.set(!this.abortOnError); + } + } + } while ((this.startupOngoing.get() || this.retryStartup.get()) && !this.abortOnError); + }); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java index d790c5f612b4..83b06124f922 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java @@ -22,7 +22,11 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import java.io.IOException; +import java.util.ArrayList; import java.util.OptionalLong; import java.util.UUID; import java.util.concurrent.ExecutorService; @@ -119,15 +123,15 @@ public void testDefaultSkipsMetaWAL() throws IOException { ReplicationSource rs = new ReplicationSource(); Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); conf.setInt("replication.source.maxretriesmultiplier", 1); - ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); - Mockito.when(mockPeer.getConfiguration()).thenReturn(conf); - Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); - ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class); - Mockito.when(peerConfig.getReplicationEndpointImpl()). + ReplicationPeer mockPeer = mock(ReplicationPeer.class); + when(mockPeer.getConfiguration()).thenReturn(conf); + when(mockPeer.getPeerBandwidth()).thenReturn(0L); + ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class); + when(peerConfig.getReplicationEndpointImpl()). thenReturn(DoNothingReplicationEndpoint.class.getName()); - Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); - ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); - Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); + when(mockPeer.getPeerConfig()).thenReturn(peerConfig); + ReplicationSourceManager manager = mock(ReplicationSourceManager.class); + when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); String queueId = "qid"; RegionServerServices rss = TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); @@ -157,15 +161,15 @@ public void testWALEntryFilter() throws IOException { ReplicationSource rs = new ReplicationSource(); UUID uuid = UUID.randomUUID(); Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); - ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); - Mockito.when(mockPeer.getConfiguration()).thenReturn(conf); - Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); - ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class); - Mockito.when(peerConfig.getReplicationEndpointImpl()). + ReplicationPeer mockPeer = mock(ReplicationPeer.class); + when(mockPeer.getConfiguration()).thenReturn(conf); + when(mockPeer.getPeerBandwidth()).thenReturn(0L); + ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class); + when(peerConfig.getReplicationEndpointImpl()). thenReturn(DoNothingReplicationEndpoint.class.getName()); - Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); - ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); - Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); + when(mockPeer.getPeerConfig()).thenReturn(peerConfig); + ReplicationSourceManager manager = mock(ReplicationSourceManager.class); + when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); String queueId = "qid"; RegionServerServices rss = TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); @@ -251,12 +255,12 @@ public void testTerminateTimeout() throws Exception { replicationEndpoint = new DoNothingReplicationEndpoint(); try { replicationEndpoint.start(); - ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); - Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); + ReplicationPeer mockPeer = mock(ReplicationPeer.class); + when(mockPeer.getPeerBandwidth()).thenReturn(0L); Configuration testConf = HBaseConfiguration.create(); testConf.setInt("replication.source.maxretriesmultiplier", 1); - ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); - Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); + ReplicationSourceManager manager = mock(ReplicationSourceManager.class); + when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); source.init(testConf, null, manager, null, mockPeer, null, "testPeer", null, p -> OptionalLong.empty(), null); ExecutorService executor = Executors.newSingleThreadExecutor(); @@ -400,7 +404,7 @@ protected void doStop() { /** * Deadend Endpoint. Does nothing. */ - public static class FaultyReplicationEndpoint extends DoNothingReplicationEndpoint { + public static class FlakyReplicationEndpoint extends DoNothingReplicationEndpoint { static int count = 0; @@ -416,6 +420,17 @@ public synchronized UUID getPeerUUID() { } + public static class FaultyReplicationEndpoint extends DoNothingReplicationEndpoint { + + static int count = 0; + + @Override + public synchronized UUID getPeerUUID() { + throw new RuntimeException(); + } + + } + /** * Test HBASE-20497 * Moved here from TestReplicationSource because doesn't need cluster. @@ -427,15 +442,15 @@ public void testRecoveredReplicationSourceShipperGetPosition() throws Exception ServerName deadServer = ServerName.valueOf("www.deadServer.com", 12006, 1524679704419L); PriorityBlockingQueue queue = new PriorityBlockingQueue<>(); queue.put(new Path("/www/html/test")); - RecoveredReplicationSource source = Mockito.mock(RecoveredReplicationSource.class); - Server server = Mockito.mock(Server.class); - Mockito.when(server.getServerName()).thenReturn(serverName); - Mockito.when(source.getServer()).thenReturn(server); - Mockito.when(source.getServerWALsBelongTo()).thenReturn(deadServer); - ReplicationQueueStorage storage = Mockito.mock(ReplicationQueueStorage.class); - Mockito.when(storage.getWALPosition(Mockito.eq(serverName), Mockito.any(), Mockito.any())) + RecoveredReplicationSource source = mock(RecoveredReplicationSource.class); + Server server = mock(Server.class); + when(server.getServerName()).thenReturn(serverName); + when(source.getServer()).thenReturn(server); + when(source.getServerWALsBelongTo()).thenReturn(deadServer); + ReplicationQueueStorage storage = mock(ReplicationQueueStorage.class); + when(storage.getWALPosition(Mockito.eq(serverName), Mockito.any(), Mockito.any())) .thenReturn(1001L); - Mockito.when(storage.getWALPosition(Mockito.eq(deadServer), Mockito.any(), Mockito.any())) + when(storage.getWALPosition(Mockito.eq(deadServer), Mockito.any(), Mockito.any())) .thenReturn(-1L); Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); conf.setInt("replication.source.maxretriesmultiplier", -1); @@ -444,31 +459,38 @@ public void testRecoveredReplicationSourceShipperGetPosition() throws Exception assertEquals(1001L, shipper.getStartPosition()); } + private RegionServerServices setupForAbortTests(ReplicationSource rs, Configuration conf, + String endpointName) throws IOException { + conf.setInt("replication.source.maxretriesmultiplier", 1); + ReplicationPeer mockPeer = mock(ReplicationPeer.class); + when(mockPeer.getConfiguration()).thenReturn(conf); + when(mockPeer.getPeerBandwidth()).thenReturn(0L); + ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class); + FaultyReplicationEndpoint.count = 0; + when(peerConfig.getReplicationEndpointImpl()). + thenReturn(endpointName); + when(mockPeer.getPeerConfig()).thenReturn(peerConfig); + ReplicationSourceManager manager = mock(ReplicationSourceManager.class); + when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); + String queueId = "qid"; + RegionServerServices rss = + TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); + rs.init(conf, null, manager, null, mockPeer, rss, queueId, null, + p -> OptionalLong.empty(), new MetricsSource(queueId)); + return rss; + } + /** * Test ReplicationSource retries startup once an uncaught exception happens * during initialization and eplication.source.regionserver.abort is set to false. */ @Test public void testAbortFalseOnError() throws IOException { - ReplicationSource rs = new ReplicationSource(); Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); - conf.setInt("replication.source.maxretriesmultiplier", 1); conf.setBoolean("replication.source.regionserver.abort", false); - ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); - Mockito.when(mockPeer.getConfiguration()).thenReturn(conf); - Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); - ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class); - FaultyReplicationEndpoint.count = 0; - Mockito.when(peerConfig.getReplicationEndpointImpl()). - thenReturn(FaultyReplicationEndpoint.class.getName()); - Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); - ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); - Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); - String queueId = "qid"; - RegionServerServices rss = - TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); - rs.init(conf, null, manager, null, mockPeer, rss, queueId, null, - p -> OptionalLong.empty(), new MetricsSource(queueId)); + ReplicationSource rs = new ReplicationSource(); + RegionServerServices rss = setupForAbortTests(rs, conf, + FlakyReplicationEndpoint.class.getName()); try { rs.startup(); assertTrue(rs.isSourceActive()); @@ -483,30 +505,35 @@ public void testAbortFalseOnError() throws IOException { } } + /** + * Test ReplicationSource keeps retrying startup indefinitely without blocking the main thread, + * when eplication.source.regionserver.abort is set to false. + */ + @Test + public void testAbortFalseOnErrorDoesntBlockMainThread() throws IOException { + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + ReplicationSource rs = new ReplicationSource(); + RegionServerServices rss = setupForAbortTests(rs, conf, + FaultyReplicationEndpoint.class.getName()); + try { + rs.startup(); + assertTrue(true); + } finally { + rs.terminate("Done"); + rss.stop("Done"); + } + } + /** * Test ReplicationSource retries startup once an uncaught exception happens - * during initialization and replication.source.regionserver.abort is set to false. + * during initialization and replication.source.regionserver.abort is set to true. */ @Test public void testAbortTrueOnError() throws IOException { - ReplicationSource rs = new ReplicationSource(); Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); - conf.setInt("replication.source.maxretriesmultiplier", 1); - ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); - Mockito.when(mockPeer.getConfiguration()).thenReturn(conf); - Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); - ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class); - FaultyReplicationEndpoint.count = 0; - Mockito.when(peerConfig.getReplicationEndpointImpl()). - thenReturn(FaultyReplicationEndpoint.class.getName()); - Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); - ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); - Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); - String queueId = "qid"; - RegionServerServices rss = - TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); - rs.init(conf, null, manager, null, mockPeer, rss, queueId, null, - p -> OptionalLong.empty(), new MetricsSource(queueId)); + ReplicationSource rs = new ReplicationSource(); + RegionServerServices rss = setupForAbortTests(rs, conf, + FlakyReplicationEndpoint.class.getName()); try { rs.startup(); Waiter.waitFor(conf, 1000, () -> rss.isAborted()); From 65d958640a7ac5740996c74897244e54e8f08630 Mon Sep 17 00:00:00 2001 From: Wellington Chevreuil Date: Mon, 12 Oct 2020 14:36:53 +0100 Subject: [PATCH 4/4] resolved compilation errors from last merge --- .../regionserver/TestReplicationSource.java | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java index 6ccca96d90b0..275ade22a718 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java @@ -22,6 +22,9 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import java.io.IOException; import java.util.OptionalLong; import java.util.UUID; @@ -438,13 +441,13 @@ public void testRecoveredReplicationSourceShipperGetPosition() throws Exception ServerName deadServer = ServerName.valueOf("www.deadServer.com", 12006, 1524679704419L); PriorityBlockingQueue queue = new PriorityBlockingQueue<>(); queue.put(new Path("/www/html/test")); - RecoveredReplicationSource source = Mockito.mock(RecoveredReplicationSource.class); - Server server = Mockito.mock(Server.class); - Mockito.when(server.getServerName()).thenReturn(serverName); - Mockito.when(source.getServer()).thenReturn(server); - Mockito.when(source.getServerWALsBelongTo()).thenReturn(deadServer); - ReplicationQueueStorage storage = Mockito.mock(ReplicationQueueStorage.class); - Mockito.when(storage.getWALPosition(Mockito.eq(serverName), Mockito.any(), Mockito.any())) + RecoveredReplicationSource source = mock(RecoveredReplicationSource.class); + Server server = mock(Server.class); + when(server.getServerName()).thenReturn(serverName); + when(source.getServer()).thenReturn(server); + when(source.getServerWALsBelongTo()).thenReturn(deadServer); + ReplicationQueueStorage storage = mock(ReplicationQueueStorage.class); + when(storage.getWALPosition(Mockito.eq(serverName), Mockito.any(), Mockito.any())) .thenReturn(1001L); when(storage.getWALPosition(Mockito.eq(deadServer), Mockito.any(), Mockito.any())) .thenReturn(-1L);