From 524f7b00dc0a5075a734b80249658af59f8e2e48 Mon Sep 17 00:00:00 2001
From: Rushabh
Date: Thu, 28 Jan 2021 07:23:16 -0800
Subject: [PATCH 1/2] [HBASE-25536] Remove 0 length wal file from logQueue if it belongs to old sources.

---
 .../replication/regionserver/ReplicationSourceWALReader.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index a6d87870b495..4296c7f0c79c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -248,7 +248,7 @@ private void handleEmptyWALEntryBatch() throws InterruptedException {
   // enabled, then dump the log
   private void handleEofException(IOException e) {
     if ((e instanceof EOFException || e.getCause() instanceof EOFException) &&
-      logQueue.size() > 1 && this.eofAutoRecovery) {
+      (source.isRecovered() || logQueue.size() > 1) && this.eofAutoRecovery) {
       try {
         if (fs.getFileStatus(logQueue.peek()).getLen() == 0) {
           LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek());

From 07fb533c7d3d2f53e107ae113ad3a7a36ca32176 Mon Sep 17 00:00:00 2001
From: Rushabh
Date: Thu, 28 Jan 2021 12:34:36 -0800
Subject: [PATCH 2/2] [HBASE-25536] Adding test case

---
 .../ReplicationSourceWALReader.java           |  2 ++
 .../regionserver/TestWALEntryStream.java      | 30 +++++++++++++++++++
 2 files changed, 32 insertions(+)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 4296c7f0c79c..be262a6d9504 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -247,6 +247,8 @@ private void handleEmptyWALEntryBatch() throws InterruptedException {
   // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is
   // enabled, then dump the log
   private void handleEofException(IOException e) {
+    // Dump the log even if logQueue size is 1 if the source is a recovered source,
+    // since we don't add the current log to a recovered source's queue, so it is safe to remove.
     if ((e instanceof EOFException || e.getCause() instanceof EOFException) &&
       (source.isRecovered() || logQueue.size() > 1) && this.eofAutoRecovery) {
       try {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index 63e7a8b90496..1db9c175e922 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -42,6 +42,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -652,4 +653,33 @@ public void testReadBeyondCommittedLength() throws IOException, InterruptedExcep
       assertFalse(entryStream.hasNext());
     }
   }
+
+  /*
+    Test removal of 0 length log from logQueue if the source is a recovered source and
+    size of logQueue is only 1.
+   */
+  @Test
+  public void testEOFExceptionForRecoveredQueue() throws Exception {
+    PriorityBlockingQueue<Path> queue = new PriorityBlockingQueue<>();
+    // Create a 0 length log.
+    Path emptyLog = new Path("emptyLog");
+    FSDataOutputStream fsdos = fs.create(emptyLog);
+    fsdos.close();
+    assertEquals(0, fs.getFileStatus(emptyLog).getLen());
+    queue.add(emptyLog);
+
+    Configuration conf = new Configuration(CONF);
+    // Override the max retries multiplier to fail fast.
+    conf.setInt("replication.source.maxretriesmultiplier", 1);
+    conf.setBoolean("replication.source.eof.autorecovery", true);
+    // Create a reader thread with source as recovered source.
+    ReplicationSource source = mockReplicationSource(true, conf);
+    when(source.isPeerEnabled()).thenReturn(true);
+    ReplicationSourceWALReader reader =
+      new ReplicationSourceWALReader(fs, conf, queue, 0, getDummyFilter(), source);
+    reader.run();
+    // ReplicationSourceWALReader#handleEofException is expected to
+    // remove the empty log from logQueue.
+    assertEquals(0, queue.size());
+  }
 }