Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@
package org.apache.hadoop.hbase.replication.regionserver;

import org.apache.hadoop.metrics2.lib.MutableFastCounter;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
import org.apache.hadoop.metrics2.lib.MutableHistogram;
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
public class MetricsReplicationGlobalSourceSourceImpl
implements MetricsReplicationGlobalSourceSource {
implements MetricsReplicationGlobalSourceSource {
private static final String KEY_PREFIX = "source.";

private final MetricsReplicationSourceImpl rms;
Expand All @@ -49,6 +50,7 @@ public class MetricsReplicationGlobalSourceSourceImpl
private final MutableFastCounter completedRecoveryQueue;
private final MutableFastCounter failedRecoveryQueue;
private final MutableGaugeLong walReaderBufferUsageBytes;
private final MutableGaugeInt sourceInitializing;

public MetricsReplicationGlobalSourceSourceImpl(MetricsReplicationSourceImpl rms) {
this.rms = rms;
Expand Down Expand Up @@ -89,6 +91,7 @@ public MetricsReplicationGlobalSourceSourceImpl(MetricsReplicationSourceImpl rms

walReaderBufferUsageBytes = rms.getMetricsRegistry()
.getGauge(SOURCE_WAL_READER_EDITS_BUFFER, 0L);
sourceInitializing = rms.getMetricsRegistry().getGaugeInt(SOURCE_INITIALIZING, 0);
}

@Override public void setLastShippedAge(long age) {
Expand Down Expand Up @@ -198,6 +201,21 @@ public long getOldestWalAge() {
return 0;
}

/** Bumps the gauge tracking how many replication sources are currently initializing. */
@Override
public void incrSourceInitializing() {
  // A gauge (not a counter) because the value drops again once startup completes.
  this.sourceInitializing.incr(1);
}

/** Drops the gauge tracking how many replication sources are currently initializing. */
@Override
public void decrSourceInitializing() {
  this.sourceInitializing.decr(1);
}

/** @return the current number of replication sources that are still initializing. */
@Override
public int getSourceInitializing() {
  return this.sourceInitializing.value();
}

@Override
public void init() {
rms.init();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ public interface MetricsReplicationSourceSource extends BaseSource {
public static final String SOURCE_COMPLETED_LOGS = "source.completedLogs";
public static final String SOURCE_COMPLETED_RECOVERY_QUEUES = "source.completedRecoverQueues";
public static final String SOURCE_FAILED_RECOVERY_QUEUES = "source.failedRecoverQueues";
/* Used to track the age of oldest wal in ms since its creation time */
String OLDEST_WAL_AGE = "source.oldestWalAge";
// This is to track the num of replication sources getting initialized
public static final String SOURCE_INITIALIZING = "source.numInitializing";

void setLastShippedAge(long age);
void incrSizeOfLogQueue(int size);
Expand Down Expand Up @@ -80,4 +80,7 @@ public interface MetricsReplicationSourceSource extends BaseSource {
long getEditsFiltered();
void setOldestWalAge(long age);
long getOldestWalAge();
void incrSourceInitializing();
void decrSourceInitializing();
int getSourceInitializing();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.replication.regionserver;

import org.apache.hadoop.metrics2.lib.MutableFastCounter;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
import org.apache.hadoop.metrics2.lib.MutableHistogram;
import org.apache.yetus.audience.InterfaceAudience;
Expand All @@ -40,6 +41,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
private final String shippedHFilesKey;
private final String sizeOfHFileRefsQueueKey;
private final String oldestWalAgeKey;
private final String sourceInitializingKey;

private final MutableHistogram ageOfLastShippedOpHist;
private final MutableGaugeLong sizeOfLogQueueGauge;
Expand Down Expand Up @@ -67,6 +69,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
private final MutableFastCounter completedWAL;
private final MutableFastCounter completedRecoveryQueue;
private final MutableGaugeLong oldestWalAge;
private final MutableGaugeInt sourceInitializing;

public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, String id) {
this.rms = rms;
Expand Down Expand Up @@ -126,6 +129,9 @@ public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, Stri

oldestWalAgeKey = this.keyPrefix + "oldestWalAge";
oldestWalAge = rms.getMetricsRegistry().getGauge(oldestWalAgeKey, 0L);

sourceInitializingKey = this.keyPrefix + "isInitializing";
sourceInitializing = rms.getMetricsRegistry().getGaugeInt(sourceInitializingKey, 0);
}

@Override public void setLastShippedAge(long age) {
Expand Down Expand Up @@ -189,6 +195,7 @@ public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, Stri
rms.removeMetric(completedLogsKey);
rms.removeMetric(completedRecoveryKey);
rms.removeMetric(oldestWalAgeKey);
rms.removeMetric(sourceInitializingKey);
}

@Override
Expand Down Expand Up @@ -262,6 +269,20 @@ public void incrFailedRecoveryQueue() {/*no op*/}
return oldestWalAge.value();
}

/** Bumps this source's "initializing" gauge by one. */
@Override
public void incrSourceInitializing() {
  this.sourceInitializing.incr(1);
}

/** @return the current value of this source's "initializing" gauge. */
@Override
public int getSourceInitializing() {
  return this.sourceInitializing.value();
}

/** Drops this source's "initializing" gauge by one. */
// @Override placed on its own line for consistency with the sibling overrides in this class.
@Override
public void decrSourceInitializing() {
  sourceInitializing.decr(1);
}

@Override
public void init() {
rms.init();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import java.util.Collection;
import java.util.concurrent.ConcurrentMap;

import org.apache.hadoop.hbase.metrics.Interns;
import org.apache.hadoop.metrics2.MetricsException;
import org.apache.hadoop.metrics2.MetricsInfo;
Expand All @@ -30,7 +29,6 @@
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;

Expand Down Expand Up @@ -452,6 +450,40 @@ public MutableGaugeLong getGauge(String gaugeName, long potentialStartingValue)
return (MutableGaugeLong) metric;
}

/**
 * Get a MutableGaugeInt from the storage. If it is not there, atomically put it.
 *
 * @param gaugeName name of the gauge to create or get.
 * @param potentialStartingValue value of the new gauge if we have to create it.
 * @return the gauge registered under {@code gaugeName}.
 * @throws MetricsException if a metric is already registered under {@code gaugeName} but is
 *   not a {@link MutableGaugeInt}.
 */
public MutableGaugeInt getGaugeInt(String gaugeName, int potentialStartingValue) {
  // Try and get the gauge.
  MutableMetric metric = metricsMap.get(gaugeName);

  // If it's not there then try and put a new one in the storage.
  if (metric == null) {
    // Create the potential new gauge.
    MutableGaugeInt newGauge =
        new MutableGaugeInt(new MetricsInfoImpl(gaugeName, ""), potentialStartingValue);

    // Try and put the gauge in. This is atomic.
    metric = metricsMap.putIfAbsent(gaugeName, newGauge);

    // If the value we get back is null then the put was successful and we return the gauge
    // we just created; otherwise metric holds whatever another thread registered first.
    if (metric == null) {
      return newGauge;
    }
  }

  if (!(metric instanceof MutableGaugeInt)) {
    // Fixed typo in the message: was "MetricMutableGaugeInr".
    throw new MetricsException("Metric already exists in registry for metric name: " + gaugeName
        + " and not of type MetricMutableGaugeInt");
  }

  return (MutableGaugeInt) metric;
}

/**
* Get a MetricMutableCounterLong from the storage. If it is not there atomically put it.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,15 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.metrics.BaseSource;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.metrics.BaseSource;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;

/**
* This class is for maintaining the various replication statistics for a source and publishing them
Expand Down Expand Up @@ -62,7 +61,8 @@ public MetricsSource(String id) {
singleSourceSource =
CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
.getSource(id);
globalSourceSource = CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
globalSourceSource = CompatibilitySingletonFactory
.getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
singleSourceSourceByTable = new HashMap<>();
}

Expand Down Expand Up @@ -168,6 +168,22 @@ public void decrSizeOfLogQueue() {
globalSourceSource.decrSizeOfLogQueue(1);
}

/**
 * Increment the count of initializing sources, on both the per-source and the global metrics.
 */
public void incrSourceInitializing() {
  this.singleSourceSource.incrSourceInitializing();
  this.globalSourceSource.incrSourceInitializing();
}

/**
 * Decrement the count of initializing sources, on both the per-source and the global metrics.
 */
public void decrSourceInitializing() {
  this.singleSourceSource.decrSourceInitializing();
  this.globalSourceSource.decrSourceInitializing();
}

/**
* Add on the the number of log edits read
*
Expand Down Expand Up @@ -324,6 +340,14 @@ public long getReplicationDelay() {
}
}

/**
 * Get the source initializing counts.
 * NOTE: reads the per-source gauge, not the global one.
 * @return number of replication sources getting initialized
 */
public int getSourceInitializing() {
  return this.singleSourceSource.getSourceInitializing();
}

/**
* Get the slave peer ID
* @return peerID
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -535,15 +535,15 @@ private void initialize() {
sleepMultiplier++;
} else {
retryStartup.set(!this.abortOnError);
this.startupOngoing.set(false);
setSourceStartupStatus(false);
throw new RuntimeException("Exhausted retries to start replication endpoint.");
}
}
}

if (!this.isSourceActive()) {
retryStartup.set(!this.abortOnError);
this.startupOngoing.set(false);
setSourceStartupStatus(false);
throw new IllegalStateException("Source should be active.");
}

Expand All @@ -567,7 +567,7 @@ private void initialize() {

if(!this.isSourceActive()) {
retryStartup.set(!this.abortOnError);
this.startupOngoing.set(false);
setSourceStartupStatus(false);
throw new IllegalStateException("Source should be active.");
}
LOG.info("{} queueId={} (queues={}) is replicating from cluster={} to cluster={}",
Expand All @@ -578,7 +578,16 @@ private void initialize() {
for (String walGroupId: logQueue.getQueues().keySet()) {
tryStartNewShipper(walGroupId);
}
this.startupOngoing.set(false);
setSourceStartupStatus(false);
}

/**
 * Records whether this source is currently running its startup sequence and keeps the
 * "sources initializing" metrics in step with that state.
 * @param initializing true while startup is in progress; false once it completes or fails
 */
private synchronized void setSourceStartupStatus(boolean initializing) {
  startupOngoing.set(initializing);
  if (!initializing) {
    metrics.decrSourceInitializing();
  } else {
    metrics.incrSourceInitializing();
  }
}

@Override
Expand All @@ -587,7 +596,7 @@ public ReplicationSourceInterface startup() {
return this;
}
this.sourceRunning = true;
startupOngoing.set(true);
setSourceStartupStatus(true);
initThread = new Thread(this::initialize);
Threads.setDaemonThreadRunning(initThread,
Thread.currentThread().getName() + ".replicationSource," + this.queueId,
Expand All @@ -601,12 +610,12 @@ public ReplicationSourceInterface startup() {
do {
if(retryStartup.get()) {
this.sourceRunning = true;
startupOngoing.set(true);
setSourceStartupStatus(true);
retryStartup.set(false);
try {
initialize();
} catch(Throwable error){
sourceRunning = false;
setSourceStartupStatus(false);
uncaughtException(t, error, null, null);
retryStartup.set(!this.abortOnError);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ public static class FlakyReplicationEndpoint extends DoNothingReplicationEndpoin

@Override
public synchronized UUID getPeerUUID() {
if(count==0) {
if (count==0) {
count++;
throw new RuntimeException();
} else {
Expand All @@ -465,6 +465,18 @@ public synchronized UUID getPeerUUID() {

}

/**
 * Bad Endpoint with failing connection to peer on demand.
 */
public static class BadReplicationEndpoint extends DoNothingReplicationEndpoint {
  // volatile: the test thread flips this flag while the replication source's startup thread
  // polls getPeerUUID(); the write happens outside the monitor guarding the reader, so
  // without volatile the reader may never observe the update.
  static volatile boolean failing = true;

  @Override
  public synchronized UUID getPeerUUID() {
    return failing ? null : super.getPeerUUID();
  }
}

public static class FaultyReplicationEndpoint extends DoNothingReplicationEndpoint {

static int count = 0;
Expand Down Expand Up @@ -554,6 +566,25 @@ public void testAbortFalseOnError() throws IOException {
}
}

/**
 * Verifies the "source initializing" gauge: it should read 1 while the endpoint keeps
 * failing to connect to the peer, and drop back to 0 once initialization succeeds.
 */
@Test
public void testReplicationSourceInitializingMetric() throws IOException {
  Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
  conf.setBoolean("replication.source.regionserver.abort", false);
  ReplicationSource rs = new ReplicationSource();
  RegionServerServices rss = setupForAbortTests(rs, conf,
    BadReplicationEndpoint.class.getName());
  try {
    rs.startup();
    assertTrue(rs.isSourceActive());
    // Endpoint returns null peer UUID first, so the source stays in the initializing state.
    Waiter.waitFor(conf, 1000, () -> rs.getSourceMetrics().getSourceInitializing() == 1);
    // Let the endpoint succeed; initialization should complete and the gauge drop to 0.
    BadReplicationEndpoint.failing = false;
    Waiter.waitFor(conf, 1000, () -> rs.getSourceMetrics().getSourceInitializing() == 0);
  } finally {
    // Restore the static fixture flag so later tests using BadReplicationEndpoint start
    // from a known state.
    BadReplicationEndpoint.failing = true;
    rs.terminate("Done");
    rss.stop("Done");
  }
}

/**
* Test ReplicationSource keeps retrying startup indefinitely without blocking the main thread,
 * when <b>replication.source.regionserver.abort</b> is set to false.
Expand Down