Merged
File: ReplicationSource.java
@@ -30,6 +30,7 @@
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -65,7 +66,6 @@
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 
 /**
@@ -260,6 +260,11 @@ public void enqueueLog(Path wal) {
     }
   }
 
+  @InterfaceAudience.Private
+  public Map<String, PriorityBlockingQueue<Path>> getQueues() {
+    return logQueue.getQueues();
+  }
+
   @Override
   public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
       throws ReplicationException {
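A note on the getQueues() hook added above: it exposes, per wal group id, the ordered backlog of WAL files the source still has to replicate. Below is a minimal standalone sketch of that shape, using plain Strings in place of org.apache.hadoop.fs.Path so it runs without Hadoop on the classpath; the group id and WAL names are made up for illustration.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;

public class WalQueueShapeSketch {
  public static void main(String[] args) {
    // One priority queue of WAL paths per wal group, mirroring the
    // Map<String, PriorityBlockingQueue<Path>> that getQueues() returns.
    Map<String, PriorityBlockingQueue<String>> queues = new ConcurrentHashMap<>();
    PriorityBlockingQueue<String> group = new PriorityBlockingQueue<>();
    group.add("wal.1614200000000");
    group.add("wal.1614100000000");
    queues.put("walgroup-rs1", group);
    // The queue keeps the WAL names ordered, so the oldest file sits at the
    // head; that head is what the EOF-handling code inspects via queue.peek().
    System.out.println(queues.get("walgroup-rs1").peek()); // wal.1614100000000
  }
}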
File: ReplicationSourceWALReader.java
@@ -41,7 +41,6 @@
 import org.apache.yetus.audience.InterfaceStability;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
 
Expand Down Expand Up @@ -123,44 +122,64 @@ public ReplicationSourceWALReader(FileSystem fs, Configuration conf,
@Override
public void run() {
int sleepMultiplier = 1;
while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
try (WALEntryStream entryStream =
new WALEntryStream(logQueue, conf, currentPosition,
source.getWALFileLengthProvider(), source.getServerWALsBelongTo(),
source.getSourceMetrics(), walGroupId)) {
while (isReaderRunning()) { // loop here to keep reusing stream while we can
if (!source.isPeerEnabled()) {
Threads.sleep(sleepForRetries);
continue;
}
if (!checkQuota()) {
continue;
WALEntryBatch batch = null;
WALEntryStream entryStream = null;
try {
// we only loop back here if something fatal happened to our stream
while (isReaderRunning()) {
try {
entryStream =
new WALEntryStream(logQueue, conf, currentPosition, source.getWALFileLengthProvider(),
source.getServerWALsBelongTo(), source.getSourceMetrics(), walGroupId);
while (isReaderRunning()) { // loop here to keep reusing stream while we can
if (!source.isPeerEnabled()) {
Threads.sleep(sleepForRetries);
continue;
}
if (!checkQuota()) {
continue;
}

batch = createBatch(entryStream);
batch = readWALEntries(entryStream, batch);
currentPosition = entryStream.getPosition();
if (batch == null) {
// either the queue have no WAL to read
// or got no new entries (didn't advance position in WAL)
handleEmptyWALEntryBatch();
entryStream.reset(); // reuse stream
} else {
addBatchToShippingQueue(batch);
}
}
WALEntryBatch batch = readWALEntries(entryStream);
currentPosition = entryStream.getPosition();
if (batch != null) {
// need to propagate the batch even it has no entries since it may carry the last
// sequence id information for serial replication.
LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
entryBatchQueue.put(batch);
} catch (IOException e) { // stream related
if (handleEofException(e, batch)) {
sleepMultiplier = 1;
} else { // got no entries and didn't advance position in WAL
handleEmptyWALEntryBatch();
entryStream.reset(); // reuse stream
} else {
LOG.warn("Failed to read stream of replication entries", e);
if (sleepMultiplier < maxRetriesMultiplier) {
sleepMultiplier++;
}
Threads.sleep(sleepForRetries * sleepMultiplier);
}
} catch (InterruptedException e) {
LOG.trace("Interrupted while sleeping between WAL reads");
Thread.currentThread().interrupt();
} finally {
entryStream.close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See earlier branch-1 review as to why the try blocks have been restructured here. lgtm

}
} catch (IOException e) { // stream related
if (!handleEofException(e)) {
LOG.warn("Failed to read stream of replication entries", e);
if (sleepMultiplier < maxRetriesMultiplier) {
sleepMultiplier ++;
}
Threads.sleep(sleepForRetries * sleepMultiplier);
}
} catch (InterruptedException e) {
LOG.trace("Interrupted while sleeping between WAL reads");
Thread.currentThread().interrupt();
}
} catch (IOException e) {
if (sleepMultiplier < maxRetriesMultiplier) {
LOG.debug("Failed to read stream of replication entries: " + e);
sleepMultiplier++;
} else {
LOG.error("Failed to read stream of replication entries", e);
}
Threads.sleep(sleepForRetries * sleepMultiplier);
} catch (InterruptedException e) {
LOG.trace("Interrupted while sleeping between WAL reads");
Thread.currentThread().interrupt();
}
}

@@ -189,14 +208,19 @@ protected static final boolean switched(WALEntryStream entryStream, Path path) {
     return newPath == null || !path.getName().equals(newPath.getName());
   }
 
-  protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
-      throws IOException, InterruptedException {
+  // We need to get the WALEntryBatch from the caller so we can add entries to it.
+  // This is required in case there is any exception while reading entries: we do
+  // not want to lose the entries already in the batch.
+  protected WALEntryBatch readWALEntries(WALEntryStream entryStream,
+      WALEntryBatch batch) throws IOException, InterruptedException {
     Path currentPath = entryStream.getCurrentPath();
     if (!entryStream.hasNext()) {
       // check whether we have switched a file
       if (currentPath != null && switched(entryStream, currentPath)) {
         return WALEntryBatch.endOfFile(currentPath);
       } else {
+        // This would mean either there are no more files in the queue
+        // or there is no new data yet in the current wal
         return null;
       }
     }
@@ -208,7 +232,7 @@ protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
       // when reading from the entry stream first time we will enter here
       currentPath = entryStream.getCurrentPath();
     }
-    WALEntryBatch batch = createBatch(entryStream);
+    batch.setLastWalPath(currentPath);
     for (;;) {
       Entry entry = entryStream.next();
       batch.setLastWalPosition(entryStream.getPosition());
@@ -236,6 +260,7 @@ private void handleEmptyWALEntryBatch() throws InterruptedException {
     if (logQueue.getQueue(walGroupId).isEmpty()) {
       // we're done with current queue, either this is a recovered queue, or it is the special group
       // for a sync replication peer and the peer has been transited to DA or S state.
+      LOG.debug("Stopping the replication source wal reader");
       setReaderRunning(false);
       // shuts down shipper thread immediately
       entryBatchQueue.put(WALEntryBatch.NO_MORE_DATA);
@@ -245,22 +270,38 @@
   }
 
   /**
-   * if we get an EOF due to a zero-length log, and there are other logs in queue
-   * (highly likely we've closed the current log), and autorecovery is
-   * enabled, then dump the log
+   * Handles an EOFException from the WAL entry stream. EOFExceptions should be
+   * handled carefully because there is a risk of data loss if entries are never
+   * replicated, so we should always try to ship the existing batch of entries here.
+   * If there was only one log in the queue before the EOF, we ship the empty batch
+   * here and, since the reader is still active, the reader will be stopped in its
+   * next iteration.
+   * If there was more than one log in the queue before the EOF, we ship the existing
+   * batch and reset the wal path and position to the log that hit the EOF, so the
+   * shipper can remove logs from the replication queue.
+   * @return true only if the IOE can be handled
    */
-  private boolean handleEofException(IOException e) {
+  private boolean handleEofException(IOException e, WALEntryBatch batch)
+      throws InterruptedException {
     PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
     // Dump the log even if logQueue size is 1 if the source is from recovered Source
    // since we don't add current log to recovered source queue so it is safe to remove.
-    if ((e instanceof EOFException || e.getCause() instanceof EOFException) &&
-        (source.isRecovered() || queue.size() > 1) && this.eofAutoRecovery) {
+    if ((e instanceof EOFException || e.getCause() instanceof EOFException)
+        && (source.isRecovered() || queue.size() > 1)
+        && this.eofAutoRecovery) {
+      Path head = queue.peek();
       try {
-        if (fs.getFileStatus(queue.peek()).getLen() == 0) {
-          LOG.warn("Forcing removal of 0 length log in queue: {}", queue.peek());
+        if (fs.getFileStatus(head).getLen() == 0) {
+          // head of the queue is an empty log file
+          LOG.warn("Forcing removal of 0 length log in queue: {}", head);
           logQueue.remove(walGroupId);
           currentPosition = 0;
+          // After we have removed the WAL from the queue, we should try shipping the
+          // existing batch of entries, and set the wal position and path to the wal
+          // just dequeued so the shipper can correctly remove logs from the zk
+          batch.setLastWalPath(head);
+          batch.setLastWalPosition(currentPosition);
+          addBatchToShippingQueue(batch);
           return true;
         }
       } catch (IOException ioe) {
@@ -270,6 +311,20 @@ private boolean handleEofException(IOException e) {
     return false;
   }
 
+  /**
+   * Adds the batch to the shipping queue.
+   * @param batch batch of entries to ship
+   * @throws InterruptedException if interrupted while adding the batch to the queue
+   * @throws IOException if an I/O error occurs in the stream
+   */
+  private void addBatchToShippingQueue(WALEntryBatch batch)
+      throws InterruptedException, IOException {
+    // need to propagate the batch even if it has no entries since it may carry the last
+    // sequence id information for serial replication.
+    LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
+    entryBatchQueue.put(batch);
+  }
+
   public Path getCurrentPath() {
     // if we've read some WAL entries, get the Path we read from
     WALEntryBatch batchQueueHead = entryBatchQueue.peek();
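To make the recovery rule in the handleEofException javadoc concrete, here is a standalone sketch of just that decision, with the HBase types stubbed out; the method and variable names are illustrative, not the reader's API. The head WAL may only be dropped when the exception chain contains an EOFException, auto-recovery is enabled, removal is safe (a recovered source, or more than one WAL queued), and the head file is empty.

import java.io.EOFException;
import java.io.IOException;
import java.util.concurrent.PriorityBlockingQueue;

public class EofRecoverySketch {
  static boolean shouldDropHeadWal(IOException e, PriorityBlockingQueue<String> queue,
      boolean sourceIsRecovered, boolean eofAutoRecovery, long headFileLength) {
    boolean eof = e instanceof EOFException || e.getCause() instanceof EOFException;
    // On a live source the current WAL is still being written, so a single
    // queued WAL must never be dropped; a recovered source has no current WAL.
    boolean safeToRemoveHead = sourceIsRecovered || queue.size() > 1;
    return eof && safeToRemoveHead && eofAutoRecovery && headFileLength == 0;
  }

  public static void main(String[] args) {
    PriorityBlockingQueue<String> queue = new PriorityBlockingQueue<>();
    queue.add("wal.1");
    queue.add("wal.2");
    IOException wrapped = new IOException(new EOFException("truncated wal"));
    // Two WALs queued, zero-length head: drop it, then ship the batch pointing
    // at the dropped WAL so the shipper can clean up the replication queue.
    System.out.println(shouldDropHeadWal(wrapped, queue, false, true, 0L)); // true
    queue.poll();
    // One WAL left on a live source: keep it, new data may still arrive.
    System.out.println(shouldDropHeadWal(wrapped, queue, false, true, 0L)); // false
  }
}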
File: SerialReplicationSourceWALReader.java
@@ -50,7 +50,7 @@ public SerialReplicationSourceWALReader(FileSystem fs, Configuration conf,
   }
 
   @Override
-  protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
+  protected WALEntryBatch readWALEntries(WALEntryStream entryStream, WALEntryBatch batch)
       throws IOException, InterruptedException {
     Path currentPath = entryStream.getCurrentPath();
     if (!entryStream.hasNext()) {
@@ -70,7 +70,7 @@ protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
       currentPath = entryStream.getCurrentPath();
     }
     long positionBefore = entryStream.getPosition();
-    WALEntryBatch batch = createBatch(entryStream);
+    batch = createBatch(entryStream);
     for (;;) {
       Entry entry = entryStream.peek();
       boolean doFiltering = true;
File: WALEntryBatch.java
@@ -94,6 +94,10 @@ public Path getLastWalPath() {
     return lastWalPath;
   }
 
+  public void setLastWalPath(Path lastWalPath) {
+    this.lastWalPath = lastWalPath;
+  }
+
   /**
    * @return the position in the last WAL that was read.
    */
File: WALEntryStream.java
@@ -80,7 +80,7 @@ class WALEntryStream implements Closeable {
    * @param walFileLengthProvider provides the length of the WAL file
    * @param serverName the server name which all WALs belong to
    * @param metrics the replication metrics
-   * @throws IOException
+   * @throws IOException if an I/O error occurs while opening the stream
    */
   public WALEntryStream(ReplicationSourceLogQueue logQueue, Configuration conf,
       long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName,
@@ -368,7 +368,9 @@ private void openReader(Path path) throws IOException {
       handleFileNotFound(path, fnfe);
     } catch (RemoteException re) {
       IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
-      if (!(ioe instanceof FileNotFoundException)) throw ioe;
+      if (!(ioe instanceof FileNotFoundException)) {
+        throw ioe;
+      }
       handleFileNotFound(path, (FileNotFoundException)ioe);
     } catch (LeaseNotRecoveredException lnre) {
       // HBASE-15019 the WAL was not closed due to some hiccup.
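The unwrap pattern in the openReader hunk above is worth seeing in isolation. A small sketch of the same pattern (requires hadoop-common on the classpath; the message string is made up): RemoteException.unwrapRemoteException reconstructs the named exception type when it matches one of the lookup types, and only a FileNotFoundException gets the special handling; anything else is rethrown.

import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.hadoop.ipc.RemoteException;

public class RemoteExceptionUnwrapSketch {
  public static void main(String[] args) throws IOException {
    // Simulate an RPC-side FileNotFoundException wrapped in a RemoteException.
    RemoteException re =
        new RemoteException(FileNotFoundException.class.getName(), "wal moved to oldWALs");
    IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
    if (!(ioe instanceof FileNotFoundException)) {
      throw ioe; // not a FileNotFoundException: propagate, as openReader does
    }
    // openReader would call handleFileNotFound(path, (FileNotFoundException) ioe) here.
    System.out.println("would handle missing wal: " + ioe.getMessage());
  }
}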
File: TestReplicationBase.java
@@ -20,10 +20,10 @@
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -44,18 +44,20 @@
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
/**
@@ -87,6 +89,8 @@ public class TestReplicationBase {
       NB_ROWS_IN_BATCH * 10;
   protected static final long SLEEP_TIME = 500;
   protected static final int NB_RETRIES = 50;
+  protected static AtomicInteger replicateCount = new AtomicInteger();
+  protected static volatile List<WAL.Entry> replicatedEntries = Lists.newArrayList();
 
   protected static final TableName tableName = TableName.valueOf("test");
   protected static final byte[] famName = Bytes.toBytes("f");
@@ -281,7 +285,8 @@ private boolean peerExist(String peerId) throws IOException {
   public void setUpBase() throws Exception {
     if (!peerExist(PEER_ID2)) {
       ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder()
-          .setClusterKey(UTIL2.getClusterKey()).setSerial(isSerialPeer());
+          .setClusterKey(UTIL2.getClusterKey()).setSerial(isSerialPeer())
+          .setReplicationEndpointImpl(ReplicationEndpointTest.class.getName());
       if (isSyncPeer()) {
         FileSystem fs2 = UTIL2.getTestFileSystem();
         // The remote wal dir is not important as we do not use it in DA state, here we only need to
@@ -378,4 +383,20 @@ public static void tearDownAfterClass() throws Exception {
     UTIL2.shutdownMiniCluster();
     UTIL1.shutdownMiniCluster();
   }
+
+  /**
+   * Custom replication endpoint to keep track of replication status for tests.
+   */
+  public static class ReplicationEndpointTest extends HBaseInterClusterReplicationEndpoint {
+    public ReplicationEndpointTest() {
+      replicateCount.set(0);
+    }
+
+    @Override public boolean replicate(ReplicateContext replicateContext) {
+      replicateCount.incrementAndGet();
+      replicatedEntries.addAll(replicateContext.getEntries());
+
+      return super.replicate(replicateContext);
+    }
+  }
 }
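The tracking pattern used by ReplicationEndpointTest above, sketched standalone with the HBase types stubbed out (the names here are illustrative): a static AtomicInteger counts replicate() calls across endpoint instances, and a shared list accumulates shipped entries so the test thread can assert on them.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class TrackingEndpointSketch {
  static final AtomicInteger replicateCount = new AtomicInteger();
  static final List<String> replicatedEntries = new ArrayList<>();

  // Stand-in for replicate(ReplicateContext) in the test endpoint above.
  static boolean replicate(List<String> entries) {
    replicateCount.incrementAndGet();
    replicatedEntries.addAll(entries); // single-threaded here; the real field is volatile
    return true; // the real endpoint then delegates to super.replicate(...)
  }

  public static void main(String[] args) {
    replicate(Arrays.asList("entry-1", "entry-2"));
    replicate(Arrays.asList("entry-3"));
    // Prints: 2 batches, 3 entries
    System.out.println(replicateCount.get() + " batches, " + replicatedEntries.size() + " entries");
  }
}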