From 062cf2f688bb7c0dd7a7f1dec29bb81003f8be56 Mon Sep 17 00:00:00 2001 From: Sandeep Pal Date: Thu, 4 Mar 2021 16:47:04 -0800 Subject: [PATCH 1/4] HBASE-25627: HBase replication should have a metric to represent if the source is stuck getting initialized --- ...ricsReplicationGlobalSourceSourceImpl.java | 19 +++++++++- .../MetricsReplicationSourceSource.java | 6 ++-- .../MetricsReplicationSourceSourceImpl.java | 20 +++++++++++ .../regionserver/MetricsSource.java | 36 +++++++++++++++---- .../regionserver/ReplicationSource.java | 24 +++++++++---- .../regionserver/TestReplicationSource.java | 35 ++++++++++++++++++ 6 files changed, 124 insertions(+), 16 deletions(-) diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java index d4cc1e349952..62cea3c87bec 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java @@ -25,7 +25,7 @@ @InterfaceAudience.Private public class MetricsReplicationGlobalSourceSourceImpl - implements MetricsReplicationGlobalSourceSource { + implements MetricsReplicationGlobalSourceSource { private static final String KEY_PREFIX = "source."; private final MetricsReplicationSourceImpl rms; @@ -49,6 +49,7 @@ public class MetricsReplicationGlobalSourceSourceImpl private final MutableFastCounter completedRecoveryQueue; private final MutableFastCounter failedRecoveryQueue; private final MutableGaugeLong walReaderBufferUsageBytes; + private final MutableGaugeLong sourceInitializing; public MetricsReplicationGlobalSourceSourceImpl(MetricsReplicationSourceImpl rms) { this.rms = rms; @@ -89,6 +90,7 @@ public MetricsReplicationGlobalSourceSourceImpl(MetricsReplicationSourceImpl rms walReaderBufferUsageBytes = rms.getMetricsRegistry() .getGauge(SOURCE_WAL_READER_EDITS_BUFFER, 0L); + sourceInitializing = rms.getMetricsRegistry().getGauge(SOURCE_INITIALIZING, 0L); } @Override public void setLastShippedAge(long age) { @@ -198,6 +200,21 @@ public long getOldestWalAge() { return 0; } + @Override + public void incrSourceInitializing() { + sourceInitializing.incr(1L); + } + + @Override + public void decrSourceInitializing() { + sourceInitializing.decr(1L); + } + + @Override + public int getSourceInitializing() { + return (int)sourceInitializing.value(); + } + @Override public void init() { rms.init(); diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java index ac396afdc022..a4386d740157 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java @@ -49,8 +49,7 @@ public interface MetricsReplicationSourceSource extends BaseSource { public static final String SOURCE_COMPLETED_LOGS = "source.completedLogs"; public static final String SOURCE_COMPLETED_RECOVERY_QUEUES = "source.completedRecoverQueues"; public static final String SOURCE_FAILED_RECOVERY_QUEUES = "source.failedRecoverQueues"; - /* Used to track the age of oldest wal in ms since its creation time */ - String OLDEST_WAL_AGE = "source.oldestWalAge"; + public static final String SOURCE_INITIALIZING = "source.initializing"; void setLastShippedAge(long age); void incrSizeOfLogQueue(int size); @@ -80,4 +79,7 @@ public interface MetricsReplicationSourceSource extends BaseSource { long getEditsFiltered(); void setOldestWalAge(long age); long getOldestWalAge(); + void incrSourceInitializing(); + void decrSourceInitializing(); + int getSourceInitializing(); } diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java index 2ceb77b34ec3..1cdba0016581 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java @@ -40,6 +40,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou private final String shippedHFilesKey; private final String sizeOfHFileRefsQueueKey; private final String oldestWalAgeKey; + private final String sourceInitializingKey; private final MutableHistogram ageOfLastShippedOpHist; private final MutableGaugeLong sizeOfLogQueueGauge; @@ -67,6 +68,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou private final MutableFastCounter completedWAL; private final MutableFastCounter completedRecoveryQueue; private final MutableGaugeLong oldestWalAge; + private final MutableGaugeLong sourceInitializing; public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, String id) { this.rms = rms; @@ -126,6 +128,9 @@ public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, Stri oldestWalAgeKey = this.keyPrefix + "oldestWalAge"; oldestWalAge = rms.getMetricsRegistry().getGauge(oldestWalAgeKey, 0L); + + sourceInitializingKey = this.keyPrefix + "sourceInitializing"; + sourceInitializing = rms.getMetricsRegistry().getGauge(sourceInitializingKey, 0L); } @Override public void setLastShippedAge(long age) { @@ -189,6 +194,7 @@ public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, Stri rms.removeMetric(completedLogsKey); rms.removeMetric(completedRecoveryKey); rms.removeMetric(oldestWalAgeKey); + rms.removeMetric(sourceInitializingKey); } @Override @@ -262,6 +268,20 @@ public void incrFailedRecoveryQueue() {/*no op*/} return oldestWalAge.value(); } + @Override + public void incrSourceInitializing() { + sourceInitializing.incr(1L); + } + + @Override + public int getSourceInitializing() { + return (int)sourceInitializing.value(); + } + + @Override public void decrSourceInitializing() { + sourceInitializing.decr(1L); + } + @Override public void init() { rms.init(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java index 4ef98d7b75f3..53f6d5ccd553 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java @@ -21,16 +21,15 @@ import java.util.HashMap; import java.util.List; import java.util.Map; - +import org.apache.hadoop.hbase.CompatibilitySingletonFactory; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.metrics.BaseSource; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hbase.CompatibilitySingletonFactory; -import org.apache.hadoop.hbase.HBaseInterfaceAudience; -import org.apache.hadoop.hbase.metrics.BaseSource; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; /** * This class is for maintaining the various replication statistics for a source and publishing them @@ -62,7 +61,8 @@ public MetricsSource(String id) { singleSourceSource = CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class) .getSource(id); - globalSourceSource = CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class).getGlobalSource(); + globalSourceSource = CompatibilitySingletonFactory + .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource(); singleSourceSourceByTable = new HashMap<>(); } @@ -168,6 +168,22 @@ public void decrSizeOfLogQueue() { globalSourceSource.decrSizeOfLogQueue(1); } + /** + * Increment the count for initializing sources + */ + public void incrSourceInitializing() { + singleSourceSource.incrSourceInitializing(); + globalSourceSource.incrSourceInitializing(); + } + + /** + * Decrement the count for initializing sources + */ + public void decrSourceInitializing() { + singleSourceSource.decrSourceInitializing(); + globalSourceSource.decrSourceInitializing(); + } + /** * Add on the the number of log edits read * @@ -324,6 +340,14 @@ public long getReplicationDelay() { } } + /** + * Get the source initializing counts + * @return sizeOfLogQueue + */ + public int getSourceInitializing() { + return singleSourceSource.getSourceInitializing(); + } + /** * Get the slave peer ID * @return peerID diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index f3fda674c824..95511589f9d0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -535,7 +535,7 @@ private void initialize() { sleepMultiplier++; } else { retryStartup.set(!this.abortOnError); - this.startupOngoing.set(false); + setSourceStartupStatus(false); throw new RuntimeException("Exhausted retries to start replication endpoint."); } } @@ -543,7 +543,7 @@ private void initialize() { if (!this.isSourceActive()) { retryStartup.set(!this.abortOnError); - this.startupOngoing.set(false); + setSourceStartupStatus(false); throw new IllegalStateException("Source should be active."); } @@ -567,7 +567,7 @@ private void initialize() { if(!this.isSourceActive()) { retryStartup.set(!this.abortOnError); - this.startupOngoing.set(false); + setSourceStartupStatus(false); throw new IllegalStateException("Source should be active."); } LOG.info("{} queueId={} (queues={}) is replicating from cluster={} to cluster={}", @@ -578,7 +578,17 @@ private void initialize() { for (String walGroupId: logQueue.getQueues().keySet()) { tryStartNewShipper(walGroupId); } - this.startupOngoing.set(false); + setSourceStartupStatus(false); + } + + private synchronized void setSourceStartupStatus(boolean initializing) { + if (initializing) { + startupOngoing.set(true); + metrics.incrSourceInitializing(); + } else { + startupOngoing.set(false); + metrics.decrSourceInitializing(); + } } @Override @@ -587,7 +597,7 @@ public ReplicationSourceInterface startup() { return this; } this.sourceRunning = true; - startupOngoing.set(true); + setSourceStartupStatus(true); initThread = new Thread(this::initialize); Threads.setDaemonThreadRunning(initThread, Thread.currentThread().getName() + ".replicationSource," + this.queueId, @@ -601,12 +611,12 @@ public ReplicationSourceInterface startup() { do { if(retryStartup.get()) { this.sourceRunning = true; - startupOngoing.set(true); + setSourceStartupStatus(true); retryStartup.set(false); try { initialize(); } catch(Throwable error){ - sourceRunning = false; + setSourceStartupStatus(false); uncaughtException(t, error, null, null); retryStartup.set(!this.abortOnError); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java index 86a71c95b818..8fd48822ea6c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java @@ -465,6 +465,22 @@ public synchronized UUID getPeerUUID() { } + /** + * Bad Endpoint with failing connection to peer on demand. + */ + public static class BadReplicationEndpoint extends DoNothingReplicationEndpoint { + static boolean failing = true; + + @Override + public synchronized UUID getPeerUUID() { + if(failing) { + return null; + } else { + return super.getPeerUUID(); + } + } + } + public static class FaultyReplicationEndpoint extends DoNothingReplicationEndpoint { static int count = 0; @@ -554,6 +570,25 @@ public void testAbortFalseOnError() throws IOException { } } + @Test + public void testReplicationSourceInitializingMetric() throws IOException { + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + conf.setBoolean("replication.source.regionserver.abort", false); + ReplicationSource rs = new ReplicationSource(); + RegionServerServices rss = setupForAbortTests(rs, conf, + BadReplicationEndpoint.class.getName()); + try { + rs.startup(); + assertTrue(rs.isSourceActive()); + Waiter.waitFor(conf, 1000, () -> rs.getSourceMetrics().getSourceInitializing() == 1); + BadReplicationEndpoint.failing = false; + Waiter.waitFor(conf, 1000, () -> rs.getSourceMetrics().getSourceInitializing() == 0); + } finally { + rs.terminate("Done"); + rss.stop("Done"); + } + } + /** * Test ReplicationSource keeps retrying startup indefinitely without blocking the main thread, * when eplication.source.regionserver.abort is set to false. From 2b0461cf93b128b51a71f4889e269dc5e70a4d5d Mon Sep 17 00:00:00 2001 From: sandeepvinayak Date: Thu, 11 Mar 2021 01:13:15 -0800 Subject: [PATCH 2/4] Addressing comments --- ...ricsReplicationGlobalSourceSourceImpl.java | 11 +++--- .../MetricsReplicationSourceSource.java | 3 +- .../MetricsReplicationSourceSourceImpl.java | 13 ++++--- .../metrics2/lib/DynamicMetricsRegistry.java | 37 ++++++++++++++++++- .../regionserver/MetricsSource.java | 2 +- .../regionserver/ReplicationSource.java | 3 +- .../regionserver/TestReplicationSource.java | 4 +- 7 files changed, 54 insertions(+), 19 deletions(-) diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java index 62cea3c87bec..0cc9f9fdec6e 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication.regionserver; import org.apache.hadoop.metrics2.lib.MutableFastCounter; +import org.apache.hadoop.metrics2.lib.MutableGaugeInt; import org.apache.hadoop.metrics2.lib.MutableGaugeLong; import org.apache.hadoop.metrics2.lib.MutableHistogram; import org.apache.yetus.audience.InterfaceAudience; @@ -49,7 +50,7 @@ public class MetricsReplicationGlobalSourceSourceImpl private final MutableFastCounter completedRecoveryQueue; private final MutableFastCounter failedRecoveryQueue; private final MutableGaugeLong walReaderBufferUsageBytes; - private final MutableGaugeLong sourceInitializing; + private final MutableGaugeInt sourceInitializing; public MetricsReplicationGlobalSourceSourceImpl(MetricsReplicationSourceImpl rms) { this.rms = rms; @@ -90,7 +91,7 @@ public MetricsReplicationGlobalSourceSourceImpl(MetricsReplicationSourceImpl rms walReaderBufferUsageBytes = rms.getMetricsRegistry() .getGauge(SOURCE_WAL_READER_EDITS_BUFFER, 0L); - sourceInitializing = rms.getMetricsRegistry().getGauge(SOURCE_INITIALIZING, 0L); + sourceInitializing = rms.getMetricsRegistry().getGaugeInt(SOURCE_INITIALIZING, 0); } @Override public void setLastShippedAge(long age) { @@ -202,17 +203,17 @@ public long getOldestWalAge() { @Override public void incrSourceInitializing() { - sourceInitializing.incr(1L); + sourceInitializing.incr(1); } @Override public void decrSourceInitializing() { - sourceInitializing.decr(1L); + sourceInitializing.decr(1); } @Override public int getSourceInitializing() { - return (int)sourceInitializing.value(); + return sourceInitializing.value(); } @Override diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java index a4386d740157..490aec920033 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java @@ -49,7 +49,8 @@ public interface MetricsReplicationSourceSource extends BaseSource { public static final String SOURCE_COMPLETED_LOGS = "source.completedLogs"; public static final String SOURCE_COMPLETED_RECOVERY_QUEUES = "source.completedRecoverQueues"; public static final String SOURCE_FAILED_RECOVERY_QUEUES = "source.failedRecoverQueues"; - public static final String SOURCE_INITIALIZING = "source.initializing"; + // This is to track the num of replication sources getting initialized + public static final String SOURCE_INITIALIZING = "source.numInitializing"; void setLastShippedAge(long age); void incrSizeOfLogQueue(int size); diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java index 1cdba0016581..c1e14dc67b5f 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.replication.regionserver; import org.apache.hadoop.metrics2.lib.MutableFastCounter; +import org.apache.hadoop.metrics2.lib.MutableGaugeInt; import org.apache.hadoop.metrics2.lib.MutableGaugeLong; import org.apache.hadoop.metrics2.lib.MutableHistogram; import org.apache.yetus.audience.InterfaceAudience; @@ -68,7 +69,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou private final MutableFastCounter completedWAL; private final MutableFastCounter completedRecoveryQueue; private final MutableGaugeLong oldestWalAge; - private final MutableGaugeLong sourceInitializing; + private final MutableGaugeInt sourceInitializing; public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, String id) { this.rms = rms; @@ -129,8 +130,8 @@ public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, Stri oldestWalAgeKey = this.keyPrefix + "oldestWalAge"; oldestWalAge = rms.getMetricsRegistry().getGauge(oldestWalAgeKey, 0L); - sourceInitializingKey = this.keyPrefix + "sourceInitializing"; - sourceInitializing = rms.getMetricsRegistry().getGauge(sourceInitializingKey, 0L); + sourceInitializingKey = this.keyPrefix + "numInitializing"; + sourceInitializing = rms.getMetricsRegistry().getGaugeInt(sourceInitializingKey, 0); } @Override public void setLastShippedAge(long age) { @@ -270,16 +271,16 @@ public void incrFailedRecoveryQueue() {/*no op*/} @Override public void incrSourceInitializing() { - sourceInitializing.incr(1L); + sourceInitializing.incr(1); } @Override public int getSourceInitializing() { - return (int)sourceInitializing.value(); + return sourceInitializing.value(); } @Override public void decrSourceInitializing() { - sourceInitializing.decr(1L); + sourceInitializing.decr(1); } @Override diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/metrics2/lib/DynamicMetricsRegistry.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/metrics2/lib/DynamicMetricsRegistry.java index 7e17ee9bf461..326a659b81fe 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/metrics2/lib/DynamicMetricsRegistry.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/metrics2/lib/DynamicMetricsRegistry.java @@ -20,7 +20,6 @@ import java.util.Collection; import java.util.concurrent.ConcurrentMap; - import org.apache.hadoop.hbase.metrics.Interns; import org.apache.hadoop.metrics2.MetricsException; import org.apache.hadoop.metrics2.MetricsInfo; @@ -30,7 +29,6 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects; import org.apache.hbase.thirdparty.com.google.common.collect.Maps; @@ -452,6 +450,41 @@ public MutableGaugeLong getGauge(String gaugeName, long potentialStartingValue) return (MutableGaugeLong) metric; } + /** + * Get a MetricMutableGaugeInt from the storage. If it is not there atomically put it. + * + * @param gaugeName name of the gauge to create or get. + * @param potentialStartingValue value of the new gauge if we have to create it. + */ + public MutableGaugeInt getGaugeInt(String gaugeName, int potentialStartingValue) { + //Try and get the guage. + MutableMetric metric = metricsMap.get(gaugeName); + + //If it's not there then try and put a new one in the storage. + if (metric == null) { + + //Create the potential new gauge. + MutableGaugeInt newGauge = new MutableGaugeInt(new MetricsInfoImpl(gaugeName, ""), + potentialStartingValue); + + // Try and put the gauge in. This is atomic. + metric = metricsMap.putIfAbsent(gaugeName, newGauge); + + //If the value we get back is null then the put was successful and we will return that. + //otherwise gaugeLong should contain the thing that was in before the put could be completed. + if (metric == null) { + return newGauge; + } + } + + if (!(metric instanceof MutableGaugeInt)) { + throw new MetricsException("Metric already exists in registry for metric name: " + gaugeName + + " and not of type MetricMutableGaugeInr"); + } + + return (MutableGaugeInt) metric; + } + /** * Get a MetricMutableCounterLong from the storage. If it is not there atomically put it. * diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java index 53f6d5ccd553..a91963ec4616 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java @@ -342,7 +342,7 @@ public long getReplicationDelay() { /** * Get the source initializing counts - * @return sizeOfLogQueue + * @return number of replication sources getting initialized */ public int getSourceInitializing() { return singleSourceSource.getSourceInitializing(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 95511589f9d0..9baf95480dfd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -582,11 +582,10 @@ private void initialize() { } private synchronized void setSourceStartupStatus(boolean initializing) { + startupOngoing.set(initializing); if (initializing) { - startupOngoing.set(true); metrics.incrSourceInitializing(); } else { - startupOngoing.set(false); metrics.decrSourceInitializing(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java index 8fd48822ea6c..0b39758ee768 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java @@ -455,7 +455,7 @@ public static class FlakyReplicationEndpoint extends DoNothingReplicationEndpoin @Override public synchronized UUID getPeerUUID() { - if(count==0) { + if (count==0) { count++; throw new RuntimeException(); } else { @@ -473,7 +473,7 @@ public static class BadReplicationEndpoint extends DoNothingReplicationEndpoint @Override public synchronized UUID getPeerUUID() { - if(failing) { + if (failing) { return null; } else { return super.getPeerUUID(); From 0faa6e3a9870cc6132ad017f94ac472fb5af8d70 Mon Sep 17 00:00:00 2001 From: sandeepvinayak Date: Mon, 15 Mar 2021 17:52:00 -0700 Subject: [PATCH 3/4] Addressing comments --- .../regionserver/MetricsReplicationSourceSourceImpl.java | 2 +- .../org/apache/hadoop/metrics2/lib/DynamicMetricsRegistry.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java index c1e14dc67b5f..18d536a65e24 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java @@ -130,7 +130,7 @@ public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, Stri oldestWalAgeKey = this.keyPrefix + "oldestWalAge"; oldestWalAge = rms.getMetricsRegistry().getGauge(oldestWalAgeKey, 0L); - sourceInitializingKey = this.keyPrefix + "numInitializing"; + sourceInitializingKey = this.keyPrefix + "isInitializing"; sourceInitializing = rms.getMetricsRegistry().getGaugeInt(sourceInitializingKey, 0); } diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/metrics2/lib/DynamicMetricsRegistry.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/metrics2/lib/DynamicMetricsRegistry.java index 326a659b81fe..7be7e16c6ece 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/metrics2/lib/DynamicMetricsRegistry.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/metrics2/lib/DynamicMetricsRegistry.java @@ -471,7 +471,7 @@ public MutableGaugeInt getGaugeInt(String gaugeName, int potentialStartingValue) metric = metricsMap.putIfAbsent(gaugeName, newGauge); //If the value we get back is null then the put was successful and we will return that. - //otherwise gaugeLong should contain the thing that was in before the put could be completed. + //otherwise gaugeInt should contain the thing that was in before the put could be completed. if (metric == null) { return newGauge; } From 79f6323786c43f666716c5e31f9d0ab60cfdfc15 Mon Sep 17 00:00:00 2001 From: sandeepvinayak Date: Tue, 16 Mar 2021 01:11:14 -0700 Subject: [PATCH 4/4] Addressing comments --- .../apache/hadoop/metrics2/lib/DynamicMetricsRegistry.java | 1 - .../replication/regionserver/TestReplicationSource.java | 6 +----- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/metrics2/lib/DynamicMetricsRegistry.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/metrics2/lib/DynamicMetricsRegistry.java index 7be7e16c6ece..7a791c92bc1e 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/metrics2/lib/DynamicMetricsRegistry.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/metrics2/lib/DynamicMetricsRegistry.java @@ -462,7 +462,6 @@ public MutableGaugeInt getGaugeInt(String gaugeName, int potentialStartingValue) //If it's not there then try and put a new one in the storage. if (metric == null) { - //Create the potential new gauge. MutableGaugeInt newGauge = new MutableGaugeInt(new MetricsInfoImpl(gaugeName, ""), potentialStartingValue); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java index 0b39758ee768..f5d4f7782947 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java @@ -473,11 +473,7 @@ public static class BadReplicationEndpoint extends DoNothingReplicationEndpoint @Override public synchronized UUID getPeerUUID() { - if (failing) { - return null; - } else { - return super.getPeerUUID(); - } + return failing ? null : super.getPeerUUID(); } }