From ddba1291083868c2d5925874d790785a8d5d81de Mon Sep 17 00:00:00 2001 From: Vinayak Hegde Date: Fri, 13 Jun 2025 21:32:06 +0530 Subject: [PATCH 1/5] HBASE-29219: Ignore Empty WAL Files While Consuming Backed-Up WAL Files --- .../hbase/mapreduce/WALInputFormat.java | 11 ++++- .../hadoop/hbase/mapreduce/WALPlayer.java | 2 + .../hbase/mapreduce/TestWALInputFormat.java | 47 +++++++++++++++++++ 3 files changed, 58 insertions(+), 2 deletions(-) diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java index 8d6e91633f7a..a49cdca5a845 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java @@ -328,14 +328,21 @@ List getSplits(final JobContext context, final String startKey, fina throw e; } } + + boolean ignoreEmptyFiles = + conf.getBoolean(WALPlayer.IGNORE_EMPTY_FILES, WALPlayer.DEFAULT_IGNORE_EMPTY_FILES); List splits = new ArrayList(allFiles.size()); for (FileStatus file : allFiles) { + if (ignoreEmptyFiles && file.getLen() == 0) { + LOG.warn("Ignoring empty file: " + file.getPath()); + continue; + } splits.add(new WALSplit(file.getPath().toString(), file.getLen(), startTime, endTime)); } return splits; } - private Path[] getInputPaths(Configuration conf) { + Path[] getInputPaths(Configuration conf) { String inpDirs = conf.get(FileInputFormat.INPUT_DIR); return StringUtils .stringToPath(inpDirs.split(conf.get(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ","))); @@ -349,7 +356,7 @@ private Path[] getInputPaths(Configuration conf) { * equal to this value else we will filter out the file. If name does not seem to * have a timestamp, we will just return it w/o filtering. */ - private List getFiles(FileSystem fs, Path dir, long startTime, long endTime) + List getFiles(FileSystem fs, Path dir, long startTime, long endTime) throws IOException { List result = new ArrayList<>(); LOG.debug("Scanning " + dir.toString() + " for WAL files"); diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java index 5e2dc0902e0d..9edcba5b3414 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java @@ -80,6 +80,8 @@ public class WALPlayer extends Configured implements Tool { public final static String INPUT_FILES_SEPARATOR_KEY = "wal.input.separator"; public final static String IGNORE_MISSING_FILES = "wal.input.ignore.missing.files"; public final static String MULTI_TABLES_SUPPORT = "wal.multi.tables.support"; + public final static String IGNORE_EMPTY_FILES = "wal.input.ignore.empty.files"; + public final static boolean DEFAULT_IGNORE_EMPTY_FILES = true; protected static final String tableSeparator = ";"; diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALInputFormat.java index 70602a371668..b31ea014977c 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALInputFormat.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALInputFormat.java @@ -18,16 +18,23 @@ package org.apache.hadoop.hbase.mapreduce; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.testclassification.MapReduceTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -74,4 +81,44 @@ public void testAddFile() { WALInputFormat.addFile(lfss, lfs, now, now); assertEquals(8, lfss.size()); } + + @Test + public void testEmptyFileIsIgnoredWhenConfigured() throws IOException, InterruptedException { + List splits = getSplitsForEmptyFile(true); + assertTrue("Empty file should be ignored when IGNORE_EMPTY_FILES is true", splits.isEmpty()); + } + + @Test + public void testEmptyFileIsIncludedWhenNotIgnored() throws IOException, InterruptedException { + List splits = getSplitsForEmptyFile(false); + assertEquals("Empty file should be included when IGNORE_EMPTY_FILES is false", 1, + splits.size()); + } + + private List getSplitsForEmptyFile(boolean ignoreEmptyFiles) + throws IOException, InterruptedException { + Configuration conf = new Configuration(); + conf.setBoolean(WALPlayer.IGNORE_EMPTY_FILES, ignoreEmptyFiles); + + JobContext jobContext = Mockito.mock(JobContext.class); + Mockito.when(jobContext.getConfiguration()).thenReturn(conf); + + LocatedFileStatus emptyFile = Mockito.mock(LocatedFileStatus.class); + Mockito.when(emptyFile.getLen()).thenReturn(0L); + Mockito.when(emptyFile.getPath()).thenReturn(new Path("/empty.wal")); + + WALInputFormat inputFormat = new WALInputFormat() { + @Override + Path[] getInputPaths(Configuration conf) { + return new Path[] { new Path("/input") }; + } + + @Override + List getFiles(FileSystem fs, Path inputPath, long startTime, long endTime) { + return Collections.singletonList(emptyFile); + } + }; + + return inputFormat.getSplits(jobContext, "", ""); + } } From 7b16f1714175df5f4e9131ceaa65a70f4f32dfd9 Mon Sep 17 00:00:00 2001 From: Vinayak Hegde Date: Sat, 14 Jun 2025 22:07:48 +0530 Subject: [PATCH 2/5] change default behavior of ignoring wal files --- .../main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java index 9edcba5b3414..f13063e5e561 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java @@ -81,7 +81,7 @@ public class WALPlayer extends Configured implements Tool { public final static String IGNORE_MISSING_FILES = "wal.input.ignore.missing.files"; public final static String MULTI_TABLES_SUPPORT = "wal.multi.tables.support"; public final static String IGNORE_EMPTY_FILES = "wal.input.ignore.empty.files"; - public final static boolean DEFAULT_IGNORE_EMPTY_FILES = true; + public final static boolean DEFAULT_IGNORE_EMPTY_FILES = false; protected static final String tableSeparator = ";"; From c17942a3066e03e8f37546964e92a69cbe4db470 Mon Sep 17 00:00:00 2001 From: Vinayak Hegde Date: Mon, 16 Jun 2025 12:32:41 +0530 Subject: [PATCH 3/5] add tests to WALPlayer class --- .../hbase/backup/impl/BackupAdminImpl.java | 2 + .../hadoop/hbase/mapreduce/TestWALPlayer.java | 43 +++++++++++++++++++ 2 files changed, 45 insertions(+) diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java index 1e91258ba6cc..e82d9804f9dc 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java @@ -23,6 +23,7 @@ import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR; import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT; import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS; +import static org.apache.hadoop.hbase.mapreduce.WALPlayer.IGNORE_EMPTY_FILES; import java.io.IOException; import java.text.ParseException; @@ -846,6 +847,7 @@ private Tool initializeWalPlayer(long startTime, long endTime) { Configuration conf = HBaseConfiguration.create(conn.getConfiguration()); conf.setLong(WALInputFormat.START_TIME_KEY, startTime); conf.setLong(WALInputFormat.END_TIME_KEY, endTime); + conf.setBoolean(IGNORE_EMPTY_FILES, true); Tool walPlayer = new WALPlayer(); walPlayer.setConf(conf); return walPlayer; diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java index b39d04802c98..77764fe63d3a 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java @@ -22,6 +22,7 @@ import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; @@ -35,6 +36,7 @@ import java.util.ArrayList; import java.util.concurrent.ThreadLocalRandom; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; @@ -338,4 +340,45 @@ public void testMainMethod() throws Exception { } + @Test + public void testIgnoreEmptyWALFiles() throws Exception { + // Create an empty WAL file in a test input directory + FileSystem dfs = TEST_UTIL.getDFSCluster().getFileSystem(); + Path inputDir = new Path("/empty-wal-dir"); + dfs.mkdirs(inputDir); + + Path emptyWAL = new Path(inputDir, "empty.wal"); + FSDataOutputStream out = dfs.create(emptyWAL); + out.close(); // Creates a 0-byte file + + assertTrue("Empty WAL file should exist", dfs.exists(emptyWAL)); + assertEquals("WAL file should be 0 bytes", 0, dfs.getFileStatus(emptyWAL).getLen()); + + // Run WALPlayer with IGNORE_EMPTY_FILES = true + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + conf.setBoolean(WALPlayer.IGNORE_EMPTY_FILES, true); + + int exitCode = ToolRunner.run(conf, new WALPlayer(conf), new String[] { inputDir.toString() }); + + assertEquals("WALPlayer should exit cleanly even with empty files", 0, exitCode); + } + + @Test + public void testFailOnEmptyWALFilesWhenNotIgnored() throws Exception { + // Create an empty WAL file again + FileSystem dfs = TEST_UTIL.getDFSCluster().getFileSystem(); + Path inputDir = new Path("/fail-empty-wal-dir"); + dfs.mkdirs(inputDir); + Path emptyWAL = new Path(inputDir, "empty.wal"); + FSDataOutputStream out = dfs.create(emptyWAL); + out.close(); + + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + conf.setBoolean(WALPlayer.IGNORE_EMPTY_FILES, false); + + int exitCode = ToolRunner.run(conf, new WALPlayer(conf), new String[] { inputDir.toString() }); + + assertNotEquals("WALPlayer should fail on empty files when not ignored", 0, exitCode); + } + } From 261d0662ed4b34c7c9a3d7f4ce3679cf2e27a27c Mon Sep 17 00:00:00 2001 From: Vinayak Hegde Date: Tue, 17 Jun 2025 21:18:25 +0530 Subject: [PATCH 4/5] remove duplicate code in TestWAlPlayer --- .../hadoop/hbase/mapreduce/TestWALPlayer.java | 31 ++++++++++--------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java index 77764fe63d3a..7818f8d2f739 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java @@ -32,6 +32,7 @@ import java.io.ByteArrayOutputStream; import java.io.File; +import java.io.IOException; import java.io.PrintStream; import java.util.ArrayList; import java.util.concurrent.ThreadLocalRandom; @@ -342,43 +343,45 @@ public void testMainMethod() throws Exception { @Test public void testIgnoreEmptyWALFiles() throws Exception { - // Create an empty WAL file in a test input directory + Path inputDir = createEmptyWALFile("empty-wal-dir"); FileSystem dfs = TEST_UTIL.getDFSCluster().getFileSystem(); - Path inputDir = new Path("/empty-wal-dir"); - dfs.mkdirs(inputDir); - Path emptyWAL = new Path(inputDir, "empty.wal"); - FSDataOutputStream out = dfs.create(emptyWAL); - out.close(); // Creates a 0-byte file assertTrue("Empty WAL file should exist", dfs.exists(emptyWAL)); assertEquals("WAL file should be 0 bytes", 0, dfs.getFileStatus(emptyWAL).getLen()); - // Run WALPlayer with IGNORE_EMPTY_FILES = true Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); conf.setBoolean(WALPlayer.IGNORE_EMPTY_FILES, true); int exitCode = ToolRunner.run(conf, new WALPlayer(conf), new String[] { inputDir.toString() }); - assertEquals("WALPlayer should exit cleanly even with empty files", 0, exitCode); } @Test public void testFailOnEmptyWALFilesWhenNotIgnored() throws Exception { - // Create an empty WAL file again + Path inputDir = createEmptyWALFile("fail-empty-wal-dir"); FileSystem dfs = TEST_UTIL.getDFSCluster().getFileSystem(); - Path inputDir = new Path("/fail-empty-wal-dir"); - dfs.mkdirs(inputDir); Path emptyWAL = new Path(inputDir, "empty.wal"); - FSDataOutputStream out = dfs.create(emptyWAL); - out.close(); + + assertTrue("Empty WAL file should exist", dfs.exists(emptyWAL)); + assertEquals("WAL file should be 0 bytes", 0, dfs.getFileStatus(emptyWAL).getLen()); Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); conf.setBoolean(WALPlayer.IGNORE_EMPTY_FILES, false); int exitCode = ToolRunner.run(conf, new WALPlayer(conf), new String[] { inputDir.toString() }); - assertNotEquals("WALPlayer should fail on empty files when not ignored", 0, exitCode); } + private Path createEmptyWALFile(String walDir) throws IOException { + FileSystem dfs = TEST_UTIL.getDFSCluster().getFileSystem(); + Path inputDir = new Path("/" + walDir); + dfs.mkdirs(inputDir); + + Path emptyWAL = new Path(inputDir, "empty.wal"); + FSDataOutputStream out = dfs.create(emptyWAL); + out.close(); // Explicitly closing the stream + + return inputDir; + } } From d8f931afd3b247c50bc3c9555565adbdfe91efa1 Mon Sep 17 00:00:00 2001 From: Vinayak Hegde Date: Tue, 24 Jun 2025 21:21:46 +0530 Subject: [PATCH 5/5] add javadocs to new configuration --- .../apache/hadoop/hbase/mapreduce/WALPlayer.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java index f13063e5e561..cea6da97649d 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java @@ -80,6 +80,22 @@ public class WALPlayer extends Configured implements Tool { public final static String INPUT_FILES_SEPARATOR_KEY = "wal.input.separator"; public final static String IGNORE_MISSING_FILES = "wal.input.ignore.missing.files"; public final static String MULTI_TABLES_SUPPORT = "wal.multi.tables.support"; + + /** + * Configuration flag that controls how the WALPlayer handles empty input WAL files. + *

+ * If set to {@code true}, the WALPlayer will silently ignore empty files that cannot be parsed as + * valid WAL files. This is useful in scenarios where such files are expected (e.g., due to + * partial writes or cleanup operations). + *

+ *

+ * If set to {@code false} (default), the WALPlayer will throw an exception when it encounters an + * empty or un-parsable WAL file. This is useful for catching unexpected data issues early. + *

+ *

+ * Default value: {@link #DEFAULT_IGNORE_EMPTY_FILES} ({@code false}) + *

+ */ public final static String IGNORE_EMPTY_FILES = "wal.input.ignore.empty.files"; public final static boolean DEFAULT_IGNORE_EMPTY_FILES = false;