From 36818f7ef14182652c0e1a50dd491c5afe7486f7 Mon Sep 17 00:00:00 2001 From: Rushabh Shah Date: Mon, 19 Jul 2021 12:59:56 -0400 Subject: [PATCH 1/6] HBASE-26093 Replication is stuck due to zero length wal file in oldWALs directory --- .../ReplicationSourceWALReader.java | 33 +++++++++---- .../regionserver/WALEntryStream.java | 3 +- .../regionserver/TestBasicWALEntryStream.java | 49 +++++++++++++++++++ 3 files changed, 73 insertions(+), 12 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index ca4118480772..eaf6c8d63ef4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -124,10 +124,12 @@ public void run() { int sleepMultiplier = 1; while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream WALEntryBatch batch = null; - try (WALEntryStream entryStream = + WALEntryStream entryStream = null; + try { + entryStream = new WALEntryStream(logQueue, conf, currentPosition, - source.getWALFileLengthProvider(), source.getServerWALsBelongTo(), - source.getSourceMetrics(), walGroupId)) { + source.getWALFileLengthProvider(), source.getServerWALsBelongTo(), + source.getSourceMetrics(), walGroupId); while (isReaderRunning()) { // loop here to keep reusing stream while we can batch = null; if (!source.isPeerEnabled()) { @@ -156,7 +158,7 @@ public void run() { sleepMultiplier = 1; } } catch (WALEntryFilterRetryableException | IOException e) { // stream related - if (!handleEofException(e, batch)) { + if (!handleEofException(e, batch, entryStream)) { LOG.warn("Failed to read stream of replication entries", e); if (sleepMultiplier < maxRetriesMultiplier) { sleepMultiplier++; 
@@ -166,6 +168,14 @@ public void run() { } catch (InterruptedException e) { LOG.trace("Interrupted while sleeping between WAL reads or adding WAL batch to ship queue"); Thread.currentThread().interrupt(); + } finally { + try { + if (entryStream != null) { + entryStream.close(); + } + } catch (IOException ioe) { + LOG.warn("Exception while closing WALEntryStream", ioe); + } } } } @@ -268,17 +278,20 @@ private WALEntryBatch tryAdvanceStreamAndCreateWALBatch(WALEntryStream entryStre * logs from replication queue * @return true only the IOE can be handled */ - private boolean handleEofException(Exception e, WALEntryBatch batch) { + private boolean handleEofException(Exception e, WALEntryBatch batch, WALEntryStream entryStream) { PriorityBlockingQueue queue = logQueue.getQueue(walGroupId); // Dump the log even if logQueue size is 1 if the source is from recovered Source // since we don't add current log to recovered source queue so it is safe to remove. if ((e instanceof EOFException || e.getCause() instanceof EOFException) && (source.isRecovered() || queue.size() > 1) && this.eofAutoRecovery) { - Path head = queue.peek(); + Path path = queue.peek(); try { - if (fs.getFileStatus(head).getLen() == 0) { - // head of the queue is an empty log file - LOG.warn("Forcing removal of 0 length log in queue: {}", head); + if (!fs.exists(path)) { + // There is a chance that wal has moved to oldWALs directory, so look there also. 
+ path = entryStream.getArchivedLog(path); + } + if (fs.getFileStatus(path).getLen() == 0) { + LOG.warn("Forcing removal of 0 length log in queue: {}", path); logQueue.remove(walGroupId); currentPosition = 0; if (batch != null) { @@ -289,7 +302,7 @@ private boolean handleEofException(Exception e, WALEntryBatch batch) { return true; } } catch (IOException ioe) { - LOG.warn("Couldn't get file length information about log " + queue.peek(), ioe); + LOG.warn("Couldn't get file length information about log " + path, ioe); } catch (InterruptedException ie) { LOG.trace("Interrupted while adding WAL batch to ship queue"); Thread.currentThread().interrupt(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java index 53cd0845a9cf..07e7592ffee7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java @@ -316,9 +316,8 @@ private boolean openNextLog() throws IOException { return false; } - private Path getArchivedLog(Path path) throws IOException { + Path getArchivedLog(Path path) throws IOException { Path walRootDir = CommonFSUtils.getWALRootDir(conf); - // Try found the log in old dir Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); Path archivedLogLocation = new Path(oldLogDir, path.getName()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java index ad77c9d90e69..c8909d67d3f7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java +++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java @@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; @@ -716,4 +717,52 @@ public void testCleanClosedWALs() throws Exception { assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs()); } } + + /** + * Tests that we handle EOFException properly if the wal has moved to oldWALs directory. + * @throws Exception exception + */ + @Test + public void testEOFExceptionInOldWALsDirectory() throws Exception { + assertEquals(1, logQueue.getQueueSize(fakeWalGroupId)); + AbstractFSWAL abstractWAL = (AbstractFSWAL)log; + Path emptyLogFile = abstractWAL.getCurrentFileName(); + log.rollWriter(true); + + // AsyncFSWAL and FSHLog both move the log from WALs to oldWALs directory asynchronously. + // Wait for in flight wal close count to become 0. This makes sure that empty wal is moved to + // oldWALs directory. + Waiter.waitFor(CONF, 5000, + (Waiter.Predicate) () ->abstractWAL.getInflightWALCloseCount() == 0); + // There will 2 logs in the queue. + assertEquals(2, logQueue.getQueueSize(fakeWalGroupId)); + + Configuration localConf = new Configuration(CONF); + localConf.setInt("replication.source.maxretriesmultiplier", 1); + localConf.setBoolean("replication.source.eof.autorecovery", true); + + try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, localConf, 0, log, + null, logQueue.getMetrics(), fakeWalGroupId)) { + // Get the archived dir path for the first wal. + Path archivePath = entryStream.getArchivedLog(emptyLogFile); + // Make sure that the wal path is not the same as archived Dir path. 
+ assertNotEquals(emptyLogFile.toString(), archivePath.toString()); + assertTrue(fs.exists(archivePath)); + fs.truncate(archivePath, 0); + // make sure the size of the wal file is 0. + assertEquals(0, fs.getFileStatus(archivePath).getLen()); + } + + ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class); + ReplicationSource source = Mockito.mock(ReplicationSource.class); + when(source.isPeerEnabled()).thenReturn(true); + when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); + + // Start the reader thread. + createReader(false, localConf); + // Wait for the replication queue size to be 1. This means that we have handled + // 0 length wal from oldWALs directory. + Waiter.waitFor(localConf, 10000, + (Waiter.Predicate) () -> logQueue.getQueueSize(fakeWalGroupId) == 1); + } } From c76235715f11734cc001b0cae0bca082e576c2b7 Mon Sep 17 00:00:00 2001 From: Rushabh Shah Date: Mon, 19 Jul 2021 16:59:46 -0400 Subject: [PATCH 2/6] HBASE-26093 Addressing CR comments --- .../regionserver/ReplicationSourceWALReader.java | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index eaf6c8d63ef4..fa6ec7c6c13d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -26,6 +26,7 @@ import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -169,13 +170,8 @@ public void run() { 
LOG.trace("Interrupted while sleeping between WAL reads or adding WAL batch to ship queue"); Thread.currentThread().interrupt(); } finally { - try { - if (entryStream != null) { - entryStream.close(); - } - } catch (IOException ioe) { - LOG.warn("Exception while closing WALEntryStream", ioe); - } + IOUtils.closeQuietly(entryStream, + e -> LOG.warn("Exception while closing WALEntryStream", e)); } } } From a0d092bc42113064e8c8ca36bc32c72a21e83059 Mon Sep 17 00:00:00 2001 From: Rushabh Shah Date: Tue, 20 Jul 2021 16:44:55 -0400 Subject: [PATCH 3/6] HBASE-26093 Addressing review comments --- .../ReplicationSourceWALReader.java | 19 +++++------- .../regionserver/WALEntryStream.java | 3 +- .../regionserver/TestBasicWALEntryStream.java | 29 +++++++++---------- 3 files changed, 22 insertions(+), 29 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index fa6ec7c6c13d..6d0a962d361b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -26,7 +26,6 @@ import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -35,6 +34,7 @@ import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALKey; @@ -125,12 
+125,10 @@ public void run() { int sleepMultiplier = 1; while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream WALEntryBatch batch = null; - WALEntryStream entryStream = null; - try { - entryStream = + try (WALEntryStream entryStream = new WALEntryStream(logQueue, conf, currentPosition, - source.getWALFileLengthProvider(), source.getServerWALsBelongTo(), - source.getSourceMetrics(), walGroupId); + source.getWALFileLengthProvider(), source.getServerWALsBelongTo(), + source.getSourceMetrics(), walGroupId)) { while (isReaderRunning()) { // loop here to keep reusing stream while we can batch = null; if (!source.isPeerEnabled()) { @@ -159,7 +157,7 @@ public void run() { sleepMultiplier = 1; } } catch (WALEntryFilterRetryableException | IOException e) { // stream related - if (!handleEofException(e, batch, entryStream)) { + if (!handleEofException(e, batch)) { LOG.warn("Failed to read stream of replication entries", e); if (sleepMultiplier < maxRetriesMultiplier) { sleepMultiplier++; @@ -169,9 +167,6 @@ public void run() { } catch (InterruptedException e) { LOG.trace("Interrupted while sleeping between WAL reads or adding WAL batch to ship queue"); Thread.currentThread().interrupt(); - } finally { - IOUtils.closeQuietly(entryStream, - e -> LOG.warn("Exception while closing WALEntryStream", e)); } } } @@ -274,7 +269,7 @@ private WALEntryBatch tryAdvanceStreamAndCreateWALBatch(WALEntryStream entryStre * logs from replication queue * @return true only the IOE can be handled */ - private boolean handleEofException(Exception e, WALEntryBatch batch, WALEntryStream entryStream) { + private boolean handleEofException(Exception e, WALEntryBatch batch) { PriorityBlockingQueue queue = logQueue.getQueue(walGroupId); // Dump the log even if logQueue size is 1 if the source is from recovered Source // since we don't add current log to recovered source queue so it is safe to remove. 
@@ -284,7 +279,7 @@ private boolean handleEofException(Exception e, WALEntryBatch batch, WALEntryStr try { if (!fs.exists(path)) { // There is a chance that wal has moved to oldWALs directory, so look there also. - path = entryStream.getArchivedLog(path); + path = AbstractFSWALProvider.getArchivedLogPath(path, conf); } if (fs.getFileStatus(path).getLen() == 0) { LOG.warn("Forcing removal of 0 length log in queue: {}", path); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java index 07e7592ffee7..53cd0845a9cf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java @@ -316,8 +316,9 @@ private boolean openNextLog() throws IOException { return false; } - Path getArchivedLog(Path path) throws IOException { + private Path getArchivedLog(Path path) throws IOException { Path walRootDir = CommonFSUtils.getWALRootDir(conf); + // Try found the log in old dir Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); Path archivedLogLocation = new Path(oldLogDir, path.getName()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java index c8909d67d3f7..ec7e48764f52 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java @@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import 
org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALEdit; @@ -733,31 +734,27 @@ public void testEOFExceptionInOldWALsDirectory() throws Exception { // Wait for in flight wal close count to become 0. This makes sure that empty wal is moved to // oldWALs directory. Waiter.waitFor(CONF, 5000, - (Waiter.Predicate) () ->abstractWAL.getInflightWALCloseCount() == 0); + (Waiter.Predicate) () -> abstractWAL.getInflightWALCloseCount() == 0); // There will 2 logs in the queue. assertEquals(2, logQueue.getQueueSize(fakeWalGroupId)); - Configuration localConf = new Configuration(CONF); - localConf.setInt("replication.source.maxretriesmultiplier", 1); - localConf.setBoolean("replication.source.eof.autorecovery", true); - - try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, localConf, 0, log, - null, logQueue.getMetrics(), fakeWalGroupId)) { - // Get the archived dir path for the first wal. - Path archivePath = entryStream.getArchivedLog(emptyLogFile); - // Make sure that the wal path is not the same as archived Dir path. - assertNotEquals(emptyLogFile.toString(), archivePath.toString()); - assertTrue(fs.exists(archivePath)); - fs.truncate(archivePath, 0); - // make sure the size of the wal file is 0. - assertEquals(0, fs.getFileStatus(archivePath).getLen()); - } + // Get the archived dir path for the first wal. + Path archivePath = AbstractFSWALProvider.getArchivedLogPath(emptyLogFile, CONF); + // Make sure that the wal path is not the same as archived Dir path. + assertNotEquals(emptyLogFile.toString(), archivePath.toString()); + assertTrue(fs.exists(archivePath)); + fs.truncate(archivePath, 0); + // make sure the size of the wal file is 0. 
+ assertEquals(0, fs.getFileStatus(archivePath).getLen()); ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class); ReplicationSource source = Mockito.mock(ReplicationSource.class); when(source.isPeerEnabled()).thenReturn(true); when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); + Configuration localConf = new Configuration(CONF); + localConf.setInt("replication.source.maxretriesmultiplier", 1); + localConf.setBoolean("replication.source.eof.autorecovery", true); // Start the reader thread. createReader(false, localConf); // Wait for the replication queue size to be 1. This means that we have handled From 7a220d66180a1f72a211077abcad5843577af7ef Mon Sep 17 00:00:00 2001 From: Rushabh Shah Date: Wed, 21 Jul 2021 09:03:16 -0400 Subject: [PATCH 4/6] HBASE-26093 Addressing review comments --- .../ReplicationSourceWALReader.java | 2 +- .../regionserver/WALEntryStream.java | 30 ++-------------- .../hbase/wal/AbstractFSWALProvider.java | 34 ++++++++++++++++++- .../regionserver/TestBasicWALEntryStream.java | 2 +- 4 files changed, 38 insertions(+), 30 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index 6d0a962d361b..c84f2e1a258f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -279,7 +279,7 @@ private boolean handleEofException(Exception e, WALEntryBatch batch) { try { if (!fs.exists(path)) { // There is a chance that wal has moved to oldWALs directory, so look there also. 
- path = AbstractFSWALProvider.getArchivedLogPath(path, conf); + path = AbstractFSWALProvider.getArchivedLog(path, conf); } if (fs.getFileStatus(path).getLen() == 0) { LOG.warn("Forcing removal of 0 length log in queue: {}", path); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java index 53cd0845a9cf..51826678a797 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WAL.Reader; import org.apache.hadoop.hbase.wal.WALFactory; @@ -316,34 +317,9 @@ private boolean openNextLog() throws IOException { return false; } - private Path getArchivedLog(Path path) throws IOException { - Path walRootDir = CommonFSUtils.getWALRootDir(conf); - - // Try found the log in old dir - Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); - Path archivedLogLocation = new Path(oldLogDir, path.getName()); - if (fs.exists(archivedLogLocation)) { - LOG.info("Log " + path + " was moved to " + archivedLogLocation); - return archivedLogLocation; - } - - // Try found the log in the seperate old log dir - oldLogDir = - new Path(walRootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME) - .append(Path.SEPARATOR).append(serverName.getServerName()).toString()); - archivedLogLocation = new Path(oldLogDir, path.getName()); - if (fs.exists(archivedLogLocation)) { - LOG.info("Log " + path + " was moved to " + archivedLogLocation); - return 
archivedLogLocation; - } - - LOG.error("Couldn't locate log: " + path); - return path; - } - private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException { // If the log was archived, continue reading from there - Path archivedLog = getArchivedLog(path); + Path archivedLog = AbstractFSWALProvider.getArchivedLog(path, conf); if (!path.equals(archivedLog)) { openReader(archivedLog); } else { @@ -408,7 +384,7 @@ private void resetReader() throws IOException { seek(); } catch (FileNotFoundException fnfe) { // If the log was archived, continue reading from there - Path archivedLog = getArchivedLog(currentPath); + Path archivedLog = AbstractFSWALProvider.getArchivedLog(currentPath, conf); if (!currentPath.equals(archivedLog)) { openReader(archivedLog); } else { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java index 4b74e10b35f4..374f2c8a0df4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hbase.wal; - import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; @@ -500,6 +499,39 @@ public static Path getArchivedLogPath(Path path, Configuration conf) throws IOEx } } + /** + * Get the archived WAL file path + * @param path - active WAL file path + * @param conf - configuration + * @return archived path if exists, path - otherwise + * @throws IOException exception + */ + public static Path getArchivedLog(Path path, Configuration conf) throws IOException { + Path walRootDir = CommonFSUtils.getWALRootDir(conf); + FileSystem fs = path.getFileSystem(conf); + // Try finding the log in old dir + Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); + Path archivedLogLocation = new 
Path(oldLogDir, path.getName()); + if (fs.exists(archivedLogLocation)) { + LOG.info("Log " + path + " was moved to " + archivedLogLocation); + return archivedLogLocation; + } + + ServerName serverName = getServerNameFromWALDirectoryName(path); + // Try finding the log in separate old log dir + oldLogDir = + new Path(walRootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME) + .append(Path.SEPARATOR).append(serverName.getServerName()).toString()); + archivedLogLocation = new Path(oldLogDir, path.getName()); + if (fs.exists(archivedLogLocation)) { + LOG.info("Log " + path + " was moved to " + archivedLogLocation); + return archivedLogLocation; + } + + LOG.error("Couldn't locate log: " + path); + return path; + } + /** * Opens WAL reader with retries and additional exception handling * @param path path to WAL file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java index ec7e48764f52..75eb98991003 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java @@ -739,7 +739,7 @@ public void testEOFExceptionInOldWALsDirectory() throws Exception { assertEquals(2, logQueue.getQueueSize(fakeWalGroupId)); // Get the archived dir path for the first wal. - Path archivePath = AbstractFSWALProvider.getArchivedLogPath(emptyLogFile, CONF); + Path archivePath = AbstractFSWALProvider.getArchivedLog(emptyLogFile, CONF); // Make sure that the wal path is not the same as archived Dir path. 
assertNotEquals(emptyLogFile.toString(), archivePath.toString()); assertTrue(fs.exists(archivePath)); From c53892b240e4b5be6dbfd366a0e03ec18173d1cf Mon Sep 17 00:00:00 2001 From: Rushabh Shah Date: Wed, 21 Jul 2021 10:37:55 -0400 Subject: [PATCH 5/6] HBASE-26093 Addressing review comments --- .../regionserver/ReplicationSourceWALReader.java | 5 +++-- .../hbase/replication/regionserver/WALEntryStream.java | 9 ++++----- .../apache/hadoop/hbase/wal/AbstractFSWALProvider.java | 8 ++++---- .../regionserver/TestBasicWALEntryStream.java | 4 ++-- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index c84f2e1a258f..9af91e5a59f1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -279,9 +279,10 @@ private boolean handleEofException(Exception e, WALEntryBatch batch) { try { if (!fs.exists(path)) { // There is a chance that wal has moved to oldWALs directory, so look there also. - path = AbstractFSWALProvider.getArchivedLog(path, conf); + path = AbstractFSWALProvider.findArchivedLog(path, conf); + // path is null if it couldn't find archive path. 
} - if (fs.getFileStatus(path).getLen() == 0) { + if (path != null && fs.getFileStatus(path).getLen() == 0) { LOG.warn("Forcing removal of 0 length log in queue: {}", path); logQueue.remove(walGroupId); currentPosition = 0; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java index 51826678a797..f04819d4f828 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java @@ -27,7 +27,6 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; import org.apache.hadoop.hbase.util.CancelableProgressable; @@ -319,8 +318,8 @@ private boolean openNextLog() throws IOException { private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException { // If the log was archived, continue reading from there - Path archivedLog = AbstractFSWALProvider.getArchivedLog(path, conf); - if (!path.equals(archivedLog)) { + Path archivedLog = AbstractFSWALProvider.findArchivedLog(path, conf); + if (archivedLog != null) { openReader(archivedLog); } else { throw fnfe; @@ -384,8 +383,8 @@ private void resetReader() throws IOException { seek(); } catch (FileNotFoundException fnfe) { // If the log was archived, continue reading from there - Path archivedLog = AbstractFSWALProvider.getArchivedLog(currentPath, conf); - if (!currentPath.equals(archivedLog)) { + Path archivedLog = AbstractFSWALProvider.findArchivedLog(currentPath, conf); + if (archivedLog != null) { openReader(archivedLog); } else { throw fnfe; diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java index 374f2c8a0df4..893a32603a5e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java @@ -500,13 +500,13 @@ public static Path getArchivedLogPath(Path path, Configuration conf) throws IOEx } /** - * Get the archived WAL file path + * Find the archived WAL file path if the WAL cannot be located in the WALs dir. * @param path - active WAL file path * @param conf - configuration - * @return archived path if exists, path - otherwise + * @return archived path if exists, null - otherwise * @throws IOException exception */ - public static Path getArchivedLog(Path path, Configuration conf) throws IOException { + public static Path findArchivedLog(Path path, Configuration conf) throws IOException { Path walRootDir = CommonFSUtils.getWALRootDir(conf); FileSystem fs = path.getFileSystem(conf); // Try finding the log in old dir @@ -529,7 +529,7 @@ public static Path getArchivedLog(Path path, Configuration conf) throws IOExcept } LOG.error("Couldn't locate log: " + path); - return path; + return null; } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java index 75eb98991003..b07b5b42dc97 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java @@ -739,9 +739,9 @@ public void testEOFExceptionInOldWALsDirectory() throws Exception { assertEquals(2, logQueue.getQueueSize(fakeWalGroupId)); // Get the archived dir path for the first wal. 
- Path archivePath = AbstractFSWALProvider.getArchivedLog(emptyLogFile, CONF); + Path archivePath = AbstractFSWALProvider.findArchivedLog(emptyLogFile, CONF); // Make sure that the wal path is not the same as archived Dir path. - assertNotEquals(emptyLogFile.toString(), archivePath.toString()); + assertNotNull(archivePath); assertTrue(fs.exists(archivePath)); fs.truncate(archivePath, 0); // make sure the size of the wal file is 0. From 845044070a65765b271978780d5f76e00d34e6d5 Mon Sep 17 00:00:00 2001 From: Rushabh Shah Date: Wed, 21 Jul 2021 17:00:19 -0400 Subject: [PATCH 6/6] HBASE-26093 Fixing test failures --- .../org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java index 893a32603a5e..ffe5c422a5b3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java @@ -507,6 +507,10 @@ public static Path getArchivedLogPath(Path path, Configuration conf) throws IOEx * @throws IOException exception */ public static Path findArchivedLog(Path path, Configuration conf) throws IOException { + // If the path contains oldWALs keyword then exit early. + if (path.toString().contains(HConstants.HREGION_OLDLOGDIR_NAME)) { + return null; + } Path walRootDir = CommonFSUtils.getWALRootDir(conf); FileSystem fs = path.getFileSystem(conf); // Try finding the log in old dir