Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,7 @@ public void terminate(String reason, Exception cause, boolean clearMetrics,
Threads.shutdown(initThread, this.sleepForRetries);
}
Collection<ReplicationSourceShipper> workers = workerThreads.values();

for (ReplicationSourceShipper worker : workers) {
worker.stopWorker();
if(worker.entryReader != null) {
Expand All @@ -710,6 +711,7 @@ public void terminate(String reason, Exception cause, boolean clearMetrics,
if (this.replicationEndpoint != null) {
this.replicationEndpoint.stop();
}

for (ReplicationSourceShipper worker : workers) {
if (worker.isAlive() || worker.entryReader.isAlive()) {
try {
Expand All @@ -728,6 +730,9 @@ public void terminate(String reason, Exception cause, boolean clearMetrics,
worker.entryReader.interrupt();
}
}
// If the worker is already stopped but there were still entries batched,
// we need to clear the buffer used for those non-processed entries.
worker.clearWALEntryBatch();
}

if (join) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.io.IOException;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.LongAccumulator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
Expand Down Expand Up @@ -54,7 +56,7 @@ public enum WorkerState {
private final Configuration conf;
protected final String walGroupId;
protected final PriorityBlockingQueue<Path> queue;
private final ReplicationSourceInterface source;
private final ReplicationSource source;

// Last position in the log that we sent to ZooKeeper
// It will be accessed by the stats thread so make it volatile
Expand All @@ -74,7 +76,7 @@ public enum WorkerState {
private final int shipEditsTimeout;

public ReplicationSourceShipper(Configuration conf, String walGroupId,
PriorityBlockingQueue<Path> queue, ReplicationSourceInterface source) {
PriorityBlockingQueue<Path> queue, ReplicationSource source) {
this.conf = conf;
this.walGroupId = walGroupId;
this.queue = queue;
Expand Down Expand Up @@ -125,6 +127,7 @@ public final void run() {
if (!isFinished()) {
setWorkerState(WorkerState.STOPPED);
} else {
source.workerThreads.remove(this.walGroupId);
postFinish();
}
}
Expand Down Expand Up @@ -330,4 +333,56 @@ public boolean sleepForRetries(String msg, int sleepMultiplier) {
}
return sleepMultiplier < maxRetriesMultiplier;
}

/**
 * Attempts to properly update <code>ReplicationSourceManager.totalBufferUsed</code>,
 * in case there were unprocessed entries batched by the reader to the shipper,
 * but the shipper didn't manage to ship those because the replication source is being terminated.
 * In that case, it drains the reader's batch queue and subtracts the size of the pending
 * entries from <code>ReplicationSourceManager.totalBufferUsed</code>, then publishes the new
 * value to the global metrics.
 * <p>
 * <b>NOTES</b>
 * <ol>
 * <li>This method should only be called upon replication source termination.
 * It blocks waiting for both shipper and reader threads to terminate (for at most
 * <code>shipEditsTimeout</code> milliseconds), to make sure there are no race conditions
 * when updating <code>ReplicationSourceManager.totalBufferUsed</code>. If the wait times out
 * or is interrupted, the buffer usage is left untouched.</li>
 * <li>It <b>does not</b> attempt to terminate reader and shipper threads. Those <b>must</b>
 * have been triggered interruption/termination prior to calling this method.</li>
 * <li>If no reader is attached to this shipper there is nothing to release and the method
 * returns immediately.</li>
 * </ol>
 */
void clearWALEntryBatch() {
  if (this.entryReader == null) {
    // Without a reader there is no batch queue, hence no buffer usage to give back.
    return;
  }
  long deadline = System.currentTimeMillis() + this.shipEditsTimeout;
  while (this.isAlive() || this.entryReader.isAlive()) {
    try {
      if (System.currentTimeMillis() >= deadline) {
        // One placeholder per argument: peer id, shipper liveness, reader liveness.
        LOG.warn("{} Shipper clearWALEntryBatch method timed out whilst waiting reader/shipper "
          + "thread to stop. Not cleaning buffer usage. Shipper alive: {}; Reader alive: {}",
          this.source.getPeerId(), this.isAlive(), this.entryReader.isAlive());
        return;
      }
      // Wait both shipper and reader threads to stop
      Thread.sleep(this.sleepForRetries);
    } catch (InterruptedException e) {
      LOG.warn("{} Interrupted while waiting {} to stop on clearWALEntryBatch. "
        + "Not cleaning buffer usage: {}", this.source.getPeerId(), this.getName(), e);
      return;
    }
  }
  // Both threads are stopped at this point, so the queue can be drained safely. Polling
  // removes each batch as it is visited instead of removing from the queue mid-iteration.
  long totalToDecrement = 0;
  WALEntryBatch batch;
  while ((batch = entryReader.entryBatchQueue.poll()) != null) {
    totalToDecrement += batch.getWalEntries().stream()
      .mapToLong(ReplicationSourceWALReader::getEntrySizeExcludeBulkLoad)
      .sum();
  }
  LOG.trace("Decrementing totalBufferUsed by {}B while stopping Replication WAL Readers.",
    totalToDecrement);
  long newBufferUsed = source.getSourceManager().getTotalBufferUsed()
    .addAndGet(-totalToDecrement);
  source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ class ReplicationSourceWALReader extends Thread {
private final WALEntryFilter filter;
private final ReplicationSource source;

private final BlockingQueue<WALEntryBatch> entryBatchQueue;
@InterfaceAudience.Private
final BlockingQueue<WALEntryBatch> entryBatchQueue;
// max (heap) size of each batch - multiply by number of batches in queue to get total
private final long replicationBatchSizeCapacity;
// max count of each batch - multiply by number of batches in queue to get total
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;


import java.io.IOException;
import java.util.ArrayList;
import java.util.OptionalLong;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -131,6 +133,8 @@ public void testDefaultSkipsMetaWAL() throws IOException {
when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
ReplicationSourceManager manager = mock(ReplicationSourceManager.class);
when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
Mockito.when(manager.getGlobalMetrics()).
thenReturn(mock(MetricsReplicationGlobalSourceSource.class));
String queueId = "qid";
RegionServerServices rss =
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
Expand Down Expand Up @@ -272,6 +276,47 @@ public void testTerminateTimeout() throws Exception {
}
}

/**
 * Queues one batch holding a single mocked WAL entry on a reader attached to a registered
 * shipper, terminates the source, and expects the manager's total buffer used counter to
 * read zero afterwards.
 */
@Test
public void testTerminateClearsBuffer() throws Exception {
  // Source wired to a mocked manager that exposes a real counter for buffer accounting.
  ReplicationSource source = new ReplicationSource();
  AtomicLong buffer = new AtomicLong();
  ReplicationSourceManager managerMock = mock(ReplicationSourceManager.class);
  MetricsReplicationGlobalSourceSource globalMetricsMock =
    mock(MetricsReplicationGlobalSourceSource.class);
  when(managerMock.getTotalBufferUsed()).thenReturn(buffer);
  when(managerMock.getGlobalMetrics()).thenReturn(globalMetricsMock);
  ReplicationPeer peerMock = mock(ReplicationPeer.class);
  when(peerMock.getPeerBandwidth()).thenReturn(0L);
  Configuration testConf = HBaseConfiguration.create();
  source.init(testConf, null, managerMock, null, peerMock, null,
    "testPeer", null, p -> OptionalLong.empty(), mock(MetricsSource.class));

  // Reader and shipper are never started; the shipper is only registered with the source.
  ReplicationSourceWALReader walReader = new ReplicationSourceWALReader(null,
    conf, null, 0, null, source);
  ReplicationSourceShipper walShipper =
    new ReplicationSourceShipper(conf, null, null, source);
  walShipper.entryReader = walReader;
  source.workerThreads.put("testPeer", walShipper);

  // Mocked WAL entry: non-empty edit carrying a single KeyValue cell.
  WALKeyImpl keyMock = mock(WALKeyImpl.class);
  when(keyMock.estimatedSerializedSizeOf()).thenReturn(1000L);
  ArrayList<Cell> cells = new ArrayList<>();
  cells.add(new KeyValue(Bytes.toBytes("0001"), Bytes.toBytes("f"),
    Bytes.toBytes("1"), Bytes.toBytes("v1")));
  WALEdit editMock = mock(WALEdit.class);
  when(editMock.isEmpty()).thenReturn(false);
  when(editMock.heapSize()).thenReturn(10000L);
  when(editMock.size()).thenReturn(0);
  when(editMock.getCells()).thenReturn(cells);
  WAL.Entry entryMock = mock(WAL.Entry.class);
  when(entryMock.getEdit()).thenReturn(editMock);
  when(entryMock.getKey()).thenReturn(keyMock);

  // Leave one unshipped batch sitting in the reader's queue, then terminate.
  WALEntryBatch pendingBatch = new WALEntryBatch(10, logDir);
  walReader.addEntryToBatch(pendingBatch, entryMock);
  walReader.entryBatchQueue.put(pendingBatch);
  source.terminate("test");

  assertEquals(0, source.getSourceManager().getTotalBufferUsed().get());
}

/**
* Tests that recovered queues are preserved on a regionserver shutdown.
* See HBASE-18192
Expand Down Expand Up @@ -471,6 +516,8 @@ private RegionServerServices setupForAbortTests(ReplicationSource rs, Configurat
when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
ReplicationSourceManager manager = mock(ReplicationSourceManager.class);
when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
Mockito.when(manager.getGlobalMetrics()).
thenReturn(mock(MetricsReplicationGlobalSourceSource.class));
String queueId = "qid";
RegionServerServices rss =
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
Expand Down