diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java
index d4cc1e349952..0cc9f9fdec6e 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java
@@ -19,13 +19,14 @@
 package org.apache.hadoop.hbase.replication.regionserver;

 import org.apache.hadoop.metrics2.lib.MutableFastCounter;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
 import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
 import org.apache.hadoop.metrics2.lib.MutableHistogram;
 import org.apache.yetus.audience.InterfaceAudience;

 @InterfaceAudience.Private
 public class MetricsReplicationGlobalSourceSourceImpl
-  implements MetricsReplicationGlobalSourceSource {
+    implements MetricsReplicationGlobalSourceSource {
   private static final String KEY_PREFIX = "source.";

   private final MetricsReplicationSourceImpl rms;
@@ -49,6 +50,7 @@ public class MetricsReplicationGlobalSourceSourceImpl
   private final MutableFastCounter completedRecoveryQueue;
   private final MutableFastCounter failedRecoveryQueue;
   private final MutableGaugeLong walReaderBufferUsageBytes;
+  private final MutableGaugeInt sourceInitializing;

   public MetricsReplicationGlobalSourceSourceImpl(MetricsReplicationSourceImpl rms) {
     this.rms = rms;
@@ -89,6 +91,7 @@ public MetricsReplicationGlobalSourceSourceImpl(MetricsReplicationSourceImpl rms

     walReaderBufferUsageBytes = rms.getMetricsRegistry()
       .getGauge(SOURCE_WAL_READER_EDITS_BUFFER, 0L);
+    sourceInitializing = rms.getMetricsRegistry().getGaugeInt(SOURCE_INITIALIZING, 0);
   }

   @Override public void setLastShippedAge(long age) {
@@ -198,6 +201,21 @@ public long getOldestWalAge() {
     return 0;
   }

+  @Override
+  public void incrSourceInitializing() {
+    sourceInitializing.incr(1);
+  }
+
+  @Override
+  public void decrSourceInitializing() {
+    sourceInitializing.decr(1);
+  }
+
+  @Override
+  public int getSourceInitializing() {
+    return sourceInitializing.value();
+  }
+
   @Override public void init() {
     rms.init();
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
index ac396afdc022..490aec920033 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
@@ -49,8 +49,8 @@ public interface MetricsReplicationSourceSource extends BaseSource {
   public static final String SOURCE_COMPLETED_LOGS = "source.completedLogs";
   public static final String SOURCE_COMPLETED_RECOVERY_QUEUES = "source.completedRecoverQueues";
   public static final String SOURCE_FAILED_RECOVERY_QUEUES = "source.failedRecoverQueues";
-  /* Used to track the age of oldest wal in ms since its creation time */
-  String OLDEST_WAL_AGE = "source.oldestWalAge";
+  // Tracks the number of replication sources that are currently initializing
+  public static final String SOURCE_INITIALIZING = "source.numInitializing";

   void setLastShippedAge(long age);
   void incrSizeOfLogQueue(int size);
@@ -80,4 +80,7 @@ public interface MetricsReplicationSourceSource extends BaseSource {
   long getEditsFiltered();
   void setOldestWalAge(long age);
   long getOldestWalAge();
+  void incrSourceInitializing();
+  void decrSourceInitializing();
+  int getSourceInitializing();
 }
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
index 2ceb77b34ec3..18d536a65e24 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.replication.regionserver;

 import org.apache.hadoop.metrics2.lib.MutableFastCounter;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
 import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
 import org.apache.hadoop.metrics2.lib.MutableHistogram;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -40,6 +41,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
   private final String shippedHFilesKey;
   private final String sizeOfHFileRefsQueueKey;
   private final String oldestWalAgeKey;
+  private final String sourceInitializingKey;

   private final MutableHistogram ageOfLastShippedOpHist;
   private final MutableGaugeLong sizeOfLogQueueGauge;
@@ -67,6 +69,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
   private final MutableFastCounter completedWAL;
   private final MutableFastCounter completedRecoveryQueue;
   private final MutableGaugeLong oldestWalAge;
+  private final MutableGaugeInt sourceInitializing;

   public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, String id) {
     this.rms = rms;
@@ -126,6 +129,9 @@ public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, Stri

     oldestWalAgeKey = this.keyPrefix + "oldestWalAge";
     oldestWalAge = rms.getMetricsRegistry().getGauge(oldestWalAgeKey, 0L);
+
+    sourceInitializingKey = this.keyPrefix + "isInitializing";
+    sourceInitializing = rms.getMetricsRegistry().getGaugeInt(sourceInitializingKey, 0);
   }

   @Override public void setLastShippedAge(long age) {
@@ -189,6 +195,7 @@ public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, Stri
     rms.removeMetric(completedLogsKey);
     rms.removeMetric(completedRecoveryKey);
     rms.removeMetric(oldestWalAgeKey);
+    rms.removeMetric(sourceInitializingKey);
   }

   @Override
@@ -262,6 +269,20 @@ public void incrFailedRecoveryQueue() {/*no op*/}
     return oldestWalAge.value();
   }

+  @Override
+  public void incrSourceInitializing() {
+    sourceInitializing.incr(1);
+  }
+
+  @Override
+  public int getSourceInitializing() {
+    return sourceInitializing.value();
+  }
+
+  @Override public void decrSourceInitializing() {
+    sourceInitializing.decr(1);
+  }
+
   @Override public void init() {
     rms.init();
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/metrics2/lib/DynamicMetricsRegistry.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/metrics2/lib/DynamicMetricsRegistry.java
index 7e17ee9bf461..7a791c92bc1e 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/metrics2/lib/DynamicMetricsRegistry.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/metrics2/lib/DynamicMetricsRegistry.java
@@ -20,7 +20,6 @@
 import java.util.Collection;
 import java.util.concurrent.ConcurrentMap;
-
 import org.apache.hadoop.hbase.metrics.Interns;
 import org.apache.hadoop.metrics2.MetricsException;
 import org.apache.hadoop.metrics2.MetricsInfo;
@@ -30,7 +29,6 @@
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
 import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
@@ -452,6 +450,40 @@ public MutableGaugeLong getGauge(String gaugeName, long potentialStartingValue)
     return (MutableGaugeLong) metric;
   }

+  /**
+   * Get a MetricMutableGaugeInt from the storage. If it is not there atomically put it.
+   *
+   * @param gaugeName name of the gauge to create or get.
+   * @param potentialStartingValue value of the new gauge if we have to create it.
+   */
+  public MutableGaugeInt getGaugeInt(String gaugeName, int potentialStartingValue) {
+    // Try and get the gauge.
+    MutableMetric metric = metricsMap.get(gaugeName);
+
+    // If it's not there then try and put a new one in the storage.
+    if (metric == null) {
+      // Create the potential new gauge.
+      MutableGaugeInt newGauge = new MutableGaugeInt(new MetricsInfoImpl(gaugeName, ""),
+        potentialStartingValue);
+
+      // Try and put the gauge in. This is atomic.
+      metric = metricsMap.putIfAbsent(gaugeName, newGauge);
+
+      // If the value we get back is null then the put was successful and we return the new gauge.
+      // Otherwise metric contains whatever was already registered before our put completed.
+      if (metric == null) {
+        return newGauge;
+      }
+    }
+
+    if (!(metric instanceof MutableGaugeInt)) {
+      throw new MetricsException("Metric already exists in registry for metric name: " + gaugeName +
+        " and not of type MetricMutableGaugeInt");
+    }
+
+    return (MutableGaugeInt) metric;
+  }
+
   /**
    * Get a MetricMutableCounterLong from the storage. If it is not there atomically put it.
    *
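For reference, a minimal sketch of the get-or-create contract the new getGaugeInt helper above is meant to provide, assuming the patch applies as shown; the registry name below is illustrative and this snippet is not part of the patch.

import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;

public class GaugeIntSketch {
  public static void main(String[] args) {
    DynamicMetricsRegistry registry = new DynamicMetricsRegistry("ExampleRegistry");
    // First call creates and registers the gauge with the supplied starting value.
    MutableGaugeInt gauge = registry.getGaugeInt("source.numInitializing", 0);
    gauge.incr(1);
    // A second lookup under the same name returns the same instance, so the value is preserved;
    // asking for the same name as a different metric type would throw MetricsException.
    System.out.println("initializing = " + registry.getGaugeInt("source.numInitializing", 0).value());
  }
}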
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index 4ef98d7b75f3..a91963ec4616 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -21,16 +21,15 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.metrics.BaseSource;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.metrics.BaseSource;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;

 /**
  * This class is for maintaining the various replication statistics for a source and publishing them
@@ -62,7 +61,8 @@ public MetricsSource(String id) {
     singleSourceSource =
       CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
         .getSource(id);
-    globalSourceSource = CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
+    globalSourceSource = CompatibilitySingletonFactory
+      .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
     singleSourceSourceByTable = new HashMap<>();
   }

@@ -168,6 +168,22 @@ public void decrSizeOfLogQueue() {
     globalSourceSource.decrSizeOfLogQueue(1);
   }

+  /**
+   * Increment the count of initializing sources
+   */
+  public void incrSourceInitializing() {
+    singleSourceSource.incrSourceInitializing();
+    globalSourceSource.incrSourceInitializing();
+  }
+
+  /**
+   * Decrement the count of initializing sources
+   */
+  public void decrSourceInitializing() {
+    singleSourceSource.decrSourceInitializing();
+    globalSourceSource.decrSourceInitializing();
+  }
+
   /**
    * Add on the the number of log edits read
    *
@@ -324,6 +340,14 @@ public long getReplicationDelay() {
     }
   }

+  /**
+   * Get the count of initializing sources
+   * @return number of replication sources that are currently initializing
+   */
+  public int getSourceInitializing() {
+    return singleSourceSource.getSourceInitializing();
+  }
+
   /**
    * Get the slave peer ID
    * @return peerID
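A rough usage sketch of the new MetricsSource hooks, assuming the metrics compatibility implementation is on the classpath as it is inside a region server; the source id "1" is made up, and in practice the replication machinery calls these methods rather than user code.

import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;

public class SourceInitializingSketch {
  public static void main(String[] args) {
    MetricsSource metrics = new MetricsSource("1"); // illustrative peer/queue id
    metrics.incrSourceInitializing();               // a startup attempt begins
    System.out.println("initializing = " + metrics.getSourceInitializing()); // expected: 1
    metrics.decrSourceInitializing();               // the attempt finished or bailed out
    System.out.println("initializing = " + metrics.getSourceInitializing()); // expected: 0
  }
}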
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index f3fda674c824..9baf95480dfd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -535,7 +535,7 @@ private void initialize() {
         sleepMultiplier++;
       } else {
         retryStartup.set(!this.abortOnError);
-        this.startupOngoing.set(false);
+        setSourceStartupStatus(false);
         throw new RuntimeException("Exhausted retries to start replication endpoint.");
       }
     }
@@ -543,7 +543,7 @@ private void initialize() {

     if (!this.isSourceActive()) {
       retryStartup.set(!this.abortOnError);
-      this.startupOngoing.set(false);
+      setSourceStartupStatus(false);
       throw new IllegalStateException("Source should be active.");
     }

@@ -567,7 +567,7 @@ private void initialize() {

     if(!this.isSourceActive()) {
       retryStartup.set(!this.abortOnError);
-      this.startupOngoing.set(false);
+      setSourceStartupStatus(false);
       throw new IllegalStateException("Source should be active.");
     }
     LOG.info("{} queueId={} (queues={}) is replicating from cluster={} to cluster={}",
@@ -578,7 +578,16 @@ private void initialize() {
     for (String walGroupId: logQueue.getQueues().keySet()) {
       tryStartNewShipper(walGroupId);
     }
-    this.startupOngoing.set(false);
+    setSourceStartupStatus(false);
+  }
+
+  private synchronized void setSourceStartupStatus(boolean initializing) {
+    startupOngoing.set(initializing);
+    if (initializing) {
+      metrics.incrSourceInitializing();
+    } else {
+      metrics.decrSourceInitializing();
+    }
   }

   @Override
@@ -587,7 +596,7 @@ public ReplicationSourceInterface startup() {
       return this;
     }
     this.sourceRunning = true;
-    startupOngoing.set(true);
+    setSourceStartupStatus(true);
     initThread = new Thread(this::initialize);
     Threads.setDaemonThreadRunning(initThread,
       Thread.currentThread().getName() + ".replicationSource," + this.queueId,
@@ -601,12 +610,12 @@ public ReplicationSourceInterface startup() {
       do {
         if(retryStartup.get()) {
           this.sourceRunning = true;
-          startupOngoing.set(true);
+          setSourceStartupStatus(true);
           retryStartup.set(false);
           try {
             initialize();
           } catch(Throwable error){
-            sourceRunning = false;
+            setSourceStartupStatus(false);
             uncaughtException(t, error, null, null);
             retryStartup.set(!this.abortOnError);
           }
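A standalone illustration, not HBase code, of the bookkeeping that setSourceStartupStatus centralizes: every increment of the initializing gauge needs a matching decrement once a startup attempt ends, whether it completes or throws. The sketch uses a plain AtomicInteger as a stand-in for the gauge and a try/finally for brevity, whereas the patch decrements explicitly on each exit path.

import java.util.concurrent.atomic.AtomicInteger;

public class StartupGaugeSketch {
  // Stand-in for the source.numInitializing gauge.
  private static final AtomicInteger INITIALIZING = new AtomicInteger();

  static void startupAttempt(boolean endpointHealthy) {
    INITIALIZING.incrementAndGet();   // corresponds to setSourceStartupStatus(true)
    try {
      if (!endpointHealthy) {
        throw new RuntimeException("Exhausted retries to start replication endpoint.");
      }
      // on the success path the WAL shippers would be started here
    } finally {
      INITIALIZING.decrementAndGet(); // corresponds to setSourceStartupStatus(false)
    }
  }

  public static void main(String[] args) {
    try {
      startupAttempt(false);
    } catch (RuntimeException expected) {
      // a failed attempt still leaves the gauge balanced
    }
    startupAttempt(true);
    System.out.println("initializing now = " + INITIALIZING.get()); // expected: 0
  }
}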
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
index 86a71c95b818..f5d4f7782947 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
@@ -455,7 +455,7 @@ public static class FlakyReplicationEndpoint extends DoNothingReplicationEndpoin

     @Override
     public synchronized UUID getPeerUUID() {
-      if(count==0) {
+      if (count==0) {
         count++;
         throw new RuntimeException();
       } else {
@@ -465,6 +465,18 @@ public synchronized UUID getPeerUUID() {
   }

+  /**
+   * Bad endpoint whose connection to the peer can be made to fail on demand.
+   */
+  public static class BadReplicationEndpoint extends DoNothingReplicationEndpoint {
+    static boolean failing = true;
+
+    @Override
+    public synchronized UUID getPeerUUID() {
+      return failing ? null : super.getPeerUUID();
+    }
+  }
+
   public static class FaultyReplicationEndpoint extends DoNothingReplicationEndpoint {

     static int count = 0;
@@ -554,6 +566,25 @@ public void testAbortFalseOnError() throws IOException {
     }
   }

+  @Test
+  public void testReplicationSourceInitializingMetric() throws IOException {
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+    conf.setBoolean("replication.source.regionserver.abort", false);
+    ReplicationSource rs = new ReplicationSource();
+    RegionServerServices rss = setupForAbortTests(rs, conf,
+      BadReplicationEndpoint.class.getName());
+    try {
+      rs.startup();
+      assertTrue(rs.isSourceActive());
+      Waiter.waitFor(conf, 1000, () -> rs.getSourceMetrics().getSourceInitializing() == 1);
+      BadReplicationEndpoint.failing = false;
+      Waiter.waitFor(conf, 1000, () -> rs.getSourceMetrics().getSourceInitializing() == 0);
+    } finally {
+      rs.terminate("Done");
+      rss.stop("Done");
+    }
+  }
+
   /**
    * Test ReplicationSource keeps retrying startup indefinitely without blocking the main thread,
    * when eplication.source.regionserver.abort is set to false.