Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -858,7 +858,7 @@ private void startWALReaderThread(String threadName, Thread.UncaughtExceptionHan
new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
ChainWALEntryFilter readerFilter = new ChainWALEntryFilter(filters);
entryReader = new ReplicationSourceWALReaderThread(manager, replicationQueueInfo, queue,
startPosition, fs, conf, readerFilter, metrics);
startPosition, fs, conf, readerFilter, metrics, ReplicationSource.this);
Threads.setDaemonThreadRunning(entryReader, threadName
+ ".replicationSource.replicationWALReaderThread." + walGroupId + "," + peerClusterZnode,
handler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ public class ReplicationSourceWALReaderThread extends Thread {
private AtomicLong totalBufferUsed;
private long totalBufferQuota;

private ReplicationSource source;

/**
* Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
* entries, and puts them on a batch queue.
Expand All @@ -94,8 +96,8 @@ public class ReplicationSourceWALReaderThread extends Thread {
*/
public ReplicationSourceWALReaderThread(ReplicationSourceManager manager,
ReplicationQueueInfo replicationQueueInfo, PriorityBlockingQueue<Path> logQueue,
long startPosition,
FileSystem fs, Configuration conf, WALEntryFilter filter, MetricsSource metrics) {
long startPosition, FileSystem fs, Configuration conf, WALEntryFilter filter,
MetricsSource metrics, ReplicationSource source) {
this.replicationQueueInfo = replicationQueueInfo;
this.logQueue = logQueue;
this.lastReadPath = logQueue.peek();
Expand All @@ -118,6 +120,7 @@ public ReplicationSourceWALReaderThread(ReplicationSourceManager manager,
this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
this.metrics = metrics;
this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
this.source = source;
LOG.info("peerClusterZnode=" + replicationQueueInfo.getPeerClusterZnode()
+ ", ReplicationSourceWALReaderThread : " + replicationQueueInfo.getPeerId()
+ " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity
Expand All @@ -132,6 +135,10 @@ public void run() {
try (WALEntryStream entryStream =
new WALEntryStream(logQueue, fs, conf, lastReadPosition, metrics)) {
while (isReaderRunning()) { // loop here to keep reusing stream while we can
if (!source.isPeerEnabled()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just a safeguard to prevent accumulation of batches, right? I can't think of any other implications of the patch.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, and the accumulation of batches can lead to major problems, because those batches are counted against the overall buffer usage tracked by ReplicationSourceManager. If buffer usage reaches the quota limit, replication becomes stuck. And since buffer usage is checked at the ReplicationSourceManager level, there is effectively a single buffer shared by all peers. If one peer is disabled while other source peers are supposed to keep receiving replicated edits, those sources would also be stuck because of this, until an RS restart.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. I've been following the jira updates on the issue that Josh created.

Threads.sleep(sleepForRetries);
continue;
}
if (!checkQuota()) {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.IOException;
Expand All @@ -38,7 +40,12 @@
import java.util.NavigableMap;
import java.util.NoSuchElementException;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -81,7 +88,9 @@
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;

@RunWith(MockitoJUnitRunner.class)
@Category({ ReplicationTests.class, LargeTests.class })
Expand Down Expand Up @@ -359,10 +368,12 @@ public void testReplicationSourceWALReaderThread() throws Exception {

// start up a batcher
ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
ReplicationSource source = Mockito.mock(ReplicationSource.class);
when(source.isPeerEnabled()).thenReturn(true);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
ReplicationSourceWALReaderThread batcher =
new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(),walQueue, 0,
fs, conf, getDummyFilter(), new MetricsSource("1"));
fs, conf, getDummyFilter(), new MetricsSource("1"), source);
Path walPath = walQueue.peek();
batcher.start();
WALEntryBatch entryBatch = batcher.take();
Expand Down Expand Up @@ -398,10 +409,13 @@ fs, conf, new MetricsSource("1"))) {
}

ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
ReplicationSource source = Mockito.mock(ReplicationSource.class);
when(source.isPeerEnabled()).thenReturn(true);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
ReplicationSourceWALReaderThread reader =
new ReplicationSourceWALReaderThread(mockSourceManager, getRecoveredQueueInfo(),
walQueue, 0, fs, conf, getDummyFilter(), new MetricsSource("1"));
walQueue, 0, fs, conf, getDummyFilter(),
new MetricsSource("1"), source);
Path walPath = walQueue.toArray(new Path[2])[1];
reader.start();
WALEntryBatch entryBatch = reader.take();
Expand Down Expand Up @@ -456,10 +470,12 @@ public void testReplicationSourceWALReaderThreadWithFilter() throws Exception {
appendEntriesToLog(2);

ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
ReplicationSource source = Mockito.mock(ReplicationSource.class);
when(source.isPeerEnabled()).thenReturn(true);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
final ReplicationSourceWALReaderThread reader =
new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), walQueue,
0, fs, conf, filter, new MetricsSource("1"));
0, fs, conf, filter, new MetricsSource("1"), source);
reader.start();

WALEntryBatch entryBatch = reader.take();
Expand Down Expand Up @@ -490,10 +506,12 @@ public void testReplicationSourceWALReaderThreadWithFilterWhenLogRolled() throws
final long eof = getPosition(firstWAL);

ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
ReplicationSource source = Mockito.mock(ReplicationSource.class);
when(source.isPeerEnabled()).thenReturn(true);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
final ReplicationSourceWALReaderThread reader =
new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), walQueue,
0, fs, conf, filter, new MetricsSource("1"));
0, fs, conf, filter, new MetricsSource("1"), source);
reader.start();

// reader won't put any batch, even if EOF reached.
Expand Down Expand Up @@ -613,4 +631,65 @@ public void preLogRoll(Path oldPath, Path newPath) throws IOException {
currentPath = newPath;
}
}

/**
 * Checks that the WAL reader thread hands out no batch while
 * {@code ReplicationSource.isPeerEnabled()} reports {@code false}, and that the pending
 * {@code take()} completes with all appended entries once it reports {@code true}.
 *
 * The blocking {@code take()} runs on a dedicated single-thread executor so the test thread can
 * observe that it has not completed. That executor is always shut down at the end of the test,
 * and the final wait on the future is bounded so a regression fails instead of hanging.
 */
@Test
public void testReplicationSourceWALReaderDisabled()
    throws IOException, InterruptedException, ExecutionException {
  for (int i = 0; i < 3; i++) {
    // append and sync
    appendToLog("key" + i);
  }
  // get ending position
  long position;
  try (WALEntryStream entryStream =
      new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) {
    entryStream.next();
    entryStream.next();
    entryStream.next();
    position = entryStream.getPosition();
  }

  // start up a reader
  Path walPath = walQueue.peek();
  ReplicationSource source = Mockito.mock(ReplicationSource.class);
  when(source.getSourceMetrics()).thenReturn(new MetricsSource("1"));

  // The mock reads this flag on every call, so flipping it later "enables" the peer.
  final AtomicBoolean enabled = new AtomicBoolean(false);
  when(source.isPeerEnabled()).thenAnswer(new Answer<Boolean>() {
    @Override
    public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable {
      return enabled.get();
    }
  });

  ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
  when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
  final ReplicationSourceWALReaderThread reader =
      new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), walQueue,
          0, fs, conf, getDummyFilter(), new MetricsSource("1"), source);

  reader.start();
  // Keep a handle on the executor so its worker thread can be released in the finally block.
  final java.util.concurrent.ExecutorService executor = Executors.newSingleThreadExecutor();
  try {
    Future<WALEntryBatch> future = executor.submit(new Callable<WALEntryBatch>() {
      @Override
      public WALEntryBatch call() throws Exception {
        return reader.take();
      }
    });

    // make sure that the isPeerEnabled has been called several times
    verify(source, timeout(30000).atLeast(5)).isPeerEnabled();
    // confirm that we can read nothing if the peer is disabled
    assertFalse(future.isDone());
    // then enable the peer, we should get the batch
    enabled.set(true);
    WALEntryBatch entryBatch;
    try {
      // Bounded wait: an unbounded get() would hang the whole test run on a regression.
      entryBatch = future.get(30, java.util.concurrent.TimeUnit.SECONDS);
    } catch (java.util.concurrent.TimeoutException e) {
      throw new AssertionError("No batch was produced within 30s of enabling the peer", e);
    }

    // should've batched up our entries
    assertNotNull(entryBatch);
    assertEquals(3, entryBatch.getWalEntries().size());
    assertEquals(position, entryBatch.getLastWalPosition());
    assertEquals(walPath, entryBatch.getLastWalPath());
    assertEquals(3, entryBatch.getNbRowKeys());
  } finally {
    // Interrupts the take() call if it is still blocked and lets the worker thread exit.
    executor.shutdownNow();
  }
}
}