From 80e65ce0057c507c61ff5a6738f642ed049eee7c Mon Sep 17 00:00:00 2001
From: Vinayak Hegde
Date: Fri, 11 Apr 2025 12:56:06 +0530
Subject: [PATCH 1/7] Store bulkload files in daywise bucket as well

---
 .../ContinuousBackupReplicationEndpoint.java  | 24 ++++++++++++++----
 ...stContinuousBackupReplicationEndpoint.java | 20 ++++++++++++++--
 2 files changed, 36 insertions(+), 8 deletions(-)

diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
index 34fcd76bf9c5..eeacc8fbf34c 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
@@ -23,6 +23,7 @@
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
+import java.util.TimeZone;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
@@ -304,7 +305,7 @@ private void backupWalEntries(long day, List<WAL.Entry> walEntries) throws IOExc
         walWriter.append(entry);
       }
       walWriter.sync(true);
-      uploadBulkLoadFiles(bulkLoadFiles);
+      uploadBulkLoadFiles(day, bulkLoadFiles);
     } catch (UncheckedIOException e) {
       String errorMsg = Utils.logPeerId(peerId) + " Failed to get or create WAL Writer for " + day;
       LOG.error("{} Backup failed for day {}. Error: {}", Utils.logPeerId(peerId), day,
@@ -314,9 +315,7 @@ private void backupWalEntries(long day, List<WAL.Entry> walEntries) throws IOExc
   }
 
   private FSHLogProvider.Writer createWalWriter(long dayInMillis) {
-    // Convert dayInMillis to "yyyy-MM-dd" format
-    SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
-    String dayDirectoryName = dateFormat.format(new Date(dayInMillis));
+    String dayDirectoryName = formatToDateString(dayInMillis);
     FileSystem fs = backupFileSystemManager.getBackupFs();
     Path walsDir = backupFileSystemManager.getWalsDir();
 
@@ -376,7 +375,7 @@ private void close() {
     }
   }
 
-  private void uploadBulkLoadFiles(List<Path> bulkLoadFiles) throws IOException {
+  private void uploadBulkLoadFiles(long dayInMillis, List<Path> bulkLoadFiles) throws IOException {
     LOG.debug("{} Starting upload of {} bulk load files", Utils.logPeerId(peerId),
       bulkLoadFiles.size());
 
@@ -384,9 +383,13 @@ private void uploadBulkLoadFiles(List<Path> bulkLoadFiles) throws IOException {
       LOG.trace("{} Bulk load files to upload: {}", Utils.logPeerId(peerId),
         bulkLoadFiles.stream().map(Path::toString).collect(Collectors.joining(", ")));
     }
+    String dayDirectoryName = formatToDateString(dayInMillis);
+    Path bulkloadDir = new Path(backupFileSystemManager.getBulkLoadFilesDir(), dayDirectoryName);
+    backupFileSystemManager.getBackupFs().mkdirs(bulkloadDir);
+
     for (Path file : bulkLoadFiles) {
       Path sourcePath = getBulkLoadFileStagingPath(file);
-      Path destPath = new Path(backupFileSystemManager.getBulkLoadFilesDir(), file);
+      Path destPath = new Path(bulkloadDir, file);
 
       try {
         LOG.debug("{} Copying bulk load file from {} to {}", Utils.logPeerId(peerId), sourcePath,
@@ -407,6 +410,15 @@ private void uploadBulkLoadFiles(List<Path> bulkLoadFiles) throws IOException {
     LOG.debug("{} Completed upload of bulk load files", Utils.logPeerId(peerId));
   }
 
+  /**
+   * Convert dayInMillis to "yyyy-MM-dd" format
+   */
+  private String formatToDateString(long dayInMillis) {
+    SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
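+    // Use UTC so day bucketing is consistent regardless of the server's local timezone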
+    dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+    return dateFormat.format(new Date(dayInMillis));
+  }
+
   private Path getBulkLoadFileStagingPath(Path relativePathFromNamespace) throws IOException {
     FileSystem rootFs = CommonFSUtils.getRootDirFileSystem(conf);
     Path rootDir = CommonFSUtils.getRootDir(conf);

diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java
index cd1f758f7607..253675f85d97 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java
@@ -44,6 +44,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TimeZone;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.conf.Configuration;
@@ -282,6 +283,7 @@ public void testDayWiseWALBackup() throws IOException {
     long oneDayBackTime = currentTime - ONE_DAY_IN_MILLISECONDS;
 
     SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+    dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
     String expectedPrevDayDir = dateFormat.format(new Date(oneDayBackTime));
     String expectedCurrentDayDir = dateFormat.format(new Date(currentTime));
 
@@ -437,8 +439,22 @@ private void verifyBackup(String backupRootDir, boolean hasBulkLoadFiles,
       assertEquals(0, getRowCount(tableName));
 
       replayWALs(new Path(backupRootDir, WALS_DIR).toString(), tableName);
-      replayBulkLoadHFilesIfPresent(new Path(backupRootDir, BULKLOAD_FILES_DIR).toString(),
-        tableName);
+
+      // Replay bulk-loaded HFiles if present
+      try {
+        Path bulkloadDir = new Path(backupRootDir, BULKLOAD_FILES_DIR);
+        if (fs.exists(bulkloadDir)) {
+          FileStatus[] directories = fs.listStatus(bulkloadDir);
+          for (FileStatus dirStatus : directories) {
+            if (dirStatus.isDirectory()) {
+              replayBulkLoadHFilesIfPresent(dirStatus.getPath().toString(), tableName);
+            }
+          }
+        }
+      } catch (Exception e) {
+        fail("Failed to replay BulkLoad HFiles properly: " + e.getMessage());
+      }
+
       assertEquals(expectedRows, getRowCount(tableName));
     }
   }

From bdc0de5c0f494fb52517faafb1896965a7a0c874 Mon Sep 17 00:00:00 2001
From: Vinayak Hegde
Date: Wed, 21 May 2025 11:48:39 +0530
Subject: [PATCH 2/7] Integrate backup WAL cleanup logic with the delete command

---
 .../hbase/backup/impl/BackupCommands.java     | 147 ++++++++++++++
 .../hbase/backup/impl/BackupSystemTable.java  |  21 ++
 .../backup/TestBackupDeleteWithCleanup.java   | 184 ++++++++++++++++++
 3 files changed, 352 insertions(+)
 create mode 100644 hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithCleanup.java

diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
index 804dc7141a19..e36bce773baf 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hbase.backup.impl;
 
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONTINUOUS_BACKUP_REPLICATION_PEER;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_BACKUP_LIST_DESC;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_BANDWIDTH;
@@ -47,18 +49,26 @@
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_WORKERS_DESC;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME_DESC;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
 
 import java.io.IOException;
 import java.net.URI;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.TimeZone;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -71,6 +81,7 @@
 import org.apache.hadoop.hbase.backup.BackupRestoreConstants.BackupCommand;
 import org.apache.hadoop.hbase.backup.BackupType;
 import org.apache.hadoop.hbase.backup.HBackupFileSystem;
+import org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager;
 import org.apache.hadoop.hbase.backup.util.BackupSet;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.client.Connection;
@@ -649,6 +660,8 @@ public void execute() throws IOException {
       } else if (cmdline.hasOption(OPTION_LIST)) {
         executeDeleteListOfBackups(cmdline, isForceDelete);
       }
+
+      cleanUpUnusedBackupWALs();
     }
 
     private void executeDeleteOlderThan(CommandLine cmdline, boolean isForceDelete)
@@ -876,6 +889,140 @@ private boolean canAnyOtherBackupCover(List<BackupInfo> allBackups, BackupInfo c
       return false;
     }
 
+    /**
+     * Cleans up Write-Ahead Logs (WALs) that are no longer required for PITR after a successful
+     * backup deletion.
+     */
+    private void cleanUpUnusedBackupWALs() throws IOException {
+      Configuration conf = getConf() != null ? getConf() : HBaseConfiguration.create();
+      String backupWalDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
+
+      if (backupWalDir == null || backupWalDir.isEmpty()) {
+        System.out.println("No WAL directory specified for continuous backup. Skipping cleanup.");
+        return;
+      }
+
+      try (Connection conn = ConnectionFactory.createConnection(conf);
+        BackupSystemTable sysTable = new BackupSystemTable(conn)) {
+        // Get list of tables under continuous backup
+        Map<TableName, Long> continuousBackupTables = sysTable.getContinuousBackupTableSet();
+        if (continuousBackupTables.isEmpty()) {
+          System.out.println("No continuous backups configured. Skipping WAL cleanup.");
+          return;
+        }
+
+        // Find the earliest timestamp after which WALs are still needed
+        long cutoffTimestamp = determineWALCleanupCutoffTime(sysTable);
+        if (cutoffTimestamp == 0) {
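+          // A cutoff of 0 means no COMPLETE FULL backup exists, so no WAL can be safely removed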
Skipping WAL cleanup."); + return; + } + + // Update metadata before actual cleanup to avoid inconsistencies + updateBackupTableStartTimes(sysTable, cutoffTimestamp); + + // Delete WAL files older than cutoff timestamp + deleteOldWALFiles(conf, backupWalDir, cutoffTimestamp); + + } + } + + /** + * Determines the cutoff time for cleaning WAL files. + * @param sysTable Backup system table + * @return cutoff timestamp or 0 if not found + */ + private long determineWALCleanupCutoffTime(BackupSystemTable sysTable) throws IOException { + List backupInfos = sysTable.getBackupInfos(BackupState.COMPLETE); + Collections.reverse(backupInfos); // Start from oldest + + for (BackupInfo backupInfo : backupInfos) { + if (BackupType.FULL.equals(backupInfo.getType())) { + return backupInfo.getStartTs(); + } + } + return 0; + } + + /** + * Updates the start time for continuous backups if older than cutoff timestamp. + * @param sysTable Backup system table + * @param cutoffTimestamp Timestamp before which WALs are no longer needed + */ + private void updateBackupTableStartTimes(BackupSystemTable sysTable, long cutoffTimestamp) + throws IOException { + + Map backupTables = sysTable.getContinuousBackupTableSet(); + Set tablesToUpdate = new HashSet<>(); + + for (Map.Entry entry : backupTables.entrySet()) { + if (entry.getValue() < cutoffTimestamp) { + tablesToUpdate.add(entry.getKey()); + } + } + + if (!tablesToUpdate.isEmpty()) { + sysTable.updateContinuousBackupTableSet(tablesToUpdate, cutoffTimestamp); + } + } + + /** + * Cleans up old WAL and bulk-loaded files based on the determined cutoff timestamp. + */ + private void deleteOldWALFiles(Configuration conf, String backupWalDir, long cutoffTime) + throws IOException { + System.out.println("Starting WAL cleanup in backup directory: " + backupWalDir + + " with cutoff time: " + cutoffTime); + + BackupFileSystemManager manager = + new BackupFileSystemManager(CONTINUOUS_BACKUP_REPLICATION_PEER, conf, backupWalDir); + FileSystem fs = manager.getBackupFs(); + Path walDir = manager.getWalsDir(); + Path bulkloadDir = manager.getBulkLoadFilesDir(); + + SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT); + dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); + + System.out.println("Listing directories under: " + walDir); + + FileStatus[] directories = fs.listStatus(walDir); + + for (FileStatus dirStatus : directories) { + if (!dirStatus.isDirectory()) { + continue; // Skip files, we only want directories + } + + Path dirPath = dirStatus.getPath(); + String dirName = dirPath.getName(); + + try { + long dayStart = parseDayDirectory(dirName, dateFormat); + System.out + .println("Checking WAL directory: " + dirName + " (Start Time: " + dayStart + ")"); + + // If WAL files of that day are older than cutoff time, delete them + if (dayStart + ONE_DAY_IN_MILLISECONDS - 1 < cutoffTime) { + System.out.println("Deleting outdated WAL directory: " + dirPath); + fs.delete(dirPath, true); + fs.delete(new Path(bulkloadDir, dirName), true); + } + } catch (ParseException e) { + System.out.println("WARNING: Failed to parse directory name '" + dirName + + "'. Skipping. Error: " + e.getMessage()); + } catch (IOException e) { + System.out.println("WARNING: Failed to delete directory '" + dirPath + + "'. Skipping. 
+            + "'. Skipping. Error: " + e.getMessage());
+        }
+      }
+
+      System.out.println("Completed WAL cleanup for backup directory: " + backupWalDir);
+    }
+
+    private long parseDayDirectory(String dayDir, SimpleDateFormat dateFormat)
+      throws ParseException {
+      return dateFormat.parse(dayDir).getTime();
+    }
+
     @Override
     protected void printUsage() {
       System.out.println(DELETE_CMD_USAGE);

diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
index 1bdf6179585e..ab356cfbf3b5 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
@@ -1056,6 +1056,27 @@ public void addContinuousBackupTableSet(Set<TableName> tables, long startTimesta
     }
   }
 
+  /**
+   * Updates the system table with the new start timestamps for continuous backup tables.
+   * @param tablesToUpdate    The set of tables that need their start timestamps updated.
+   * @param newStartTimestamp The new start timestamp to be set.
+   */
+  public void updateContinuousBackupTableSet(Set<TableName> tablesToUpdate, long newStartTimestamp)
+    throws IOException {
+    try (Table table = connection.getTable(tableName)) {
+      Put put = new Put(rowkey(CONTINUOUS_BACKUP_SET));
+
+      for (TableName tableName : tablesToUpdate) {
+        put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes(tableName.getNameAsString()),
+          Bytes.toBytes(newStartTimestamp));
+      }
+
+      table.put(put);
+      LOG.info("Successfully updated start timestamps for {} tables in the backup system table.",
+        tablesToUpdate.size());
+    }
+  }
+
   /**
    * Removes tables from the global continuous backup set. Only removes entries that currently exist
    * in the backup system table.

diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithCleanup.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithCleanup.java
new file mode 100644
index 000000000000..bd57ded455c8
--- /dev/null
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithCleanup.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
+import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.BULKLOAD_FILES_DIR;
+import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Map;
+import java.util.Set;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
+@Category(LargeTests.class)
+public class TestBackupDeleteWithCleanup extends TestBackupBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestBackupDeleteWithCleanup.class);
+
+  String backupWalDirName = "TestBackupDeleteWithCleanup";
+
+  @Test
+  public void testBackupDeleteWithCleanupLogic() throws Exception {
+    Path root = TEST_UTIL.getDataTestDirOnTestFS();
+    Path backupWalDir = new Path(root, backupWalDirName);
+    conf1.set(CONF_CONTINUOUS_BACKUP_WAL_DIR, backupWalDir.toString());
+    FileSystem fs = FileSystem.get(conf1);
+    fs.mkdirs(backupWalDir);
+
+    // Step 1: Setup Backup Folders
+    long currentTime = EnvironmentEdgeManager.getDelegate().currentTime();
+    setupBackupFolders(fs, backupWalDir, currentTime);
+
+    // Log the directory structure before cleanup
+    logDirectoryStructure(fs, backupWalDir, "Directory structure BEFORE cleanup:");
+
+    // Step 2: Simulate Backup Creation
+    BackupSystemTable backupSystemTable = new BackupSystemTable(TEST_UTIL.getConnection());
+    backupSystemTable.addContinuousBackupTableSet(Set.of(table1),
+      currentTime - (2 * ONE_DAY_IN_MILLISECONDS));
+
+    EnvironmentEdgeManager
+      .injectEdge(() -> System.currentTimeMillis() - (2 * ONE_DAY_IN_MILLISECONDS));
+    String backupId = fullTableBackup(Lists.newArrayList(table1));
+    assertTrue(checkSucceeded(backupId));
+
+    String anotherBackupId = fullTableBackup(Lists.newArrayList(table1));
+    assertTrue(checkSucceeded(anotherBackupId));
+
+    // Step 3: Run Delete Command
+    int ret =
+      ToolRunner.run(conf1, new BackupDriver(), new String[] { "delete", "-l", backupId, "-fd" });
+    assertEquals(0, ret);
+
+    // Log the directory structure after cleanup
+    logDirectoryStructure(fs, backupWalDir, "Directory structure AFTER cleanup:");
+
+    // Step 4: Verify Cleanup
+    verifyBackupCleanup(fs, backupWalDir, currentTime);
+
+    // Step 5: Verify System Table Update
+    verifySystemTableUpdate(backupSystemTable, currentTime);
+  }
+
+  private static void setupBackupFolders(FileSystem fs, Path backupWalDir, long currentTime)
+    throws IOException {
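+    // Lays out the WALs and bulkload-files roots, each with one "yyyy-MM-dd" folder per day,
+    // going back five days from currentTime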
+    Path walsDir = new Path(backupWalDir, WALS_DIR);
+    Path bulkLoadDir = new Path(backupWalDir, BULKLOAD_FILES_DIR);
+
+    fs.mkdirs(walsDir);
+    fs.mkdirs(bulkLoadDir);
+
+    SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+
+    for (int i = 0; i < 5; i++) {
+      String dateStr = dateFormat.format(new Date(currentTime - (i * ONE_DAY_IN_MILLISECONDS)));
+      fs.mkdirs(new Path(walsDir, dateStr));
+      fs.mkdirs(new Path(bulkLoadDir, dateStr));
+    }
+  }
+
+  private static void verifyBackupCleanup(FileSystem fs, Path backupWalDir, long currentTime)
+    throws IOException {
+    Path walsDir = new Path(backupWalDir, WALS_DIR);
+    Path bulkLoadDir = new Path(backupWalDir, BULKLOAD_FILES_DIR);
+    SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+
+    // Expect folders older than 3 days to be deleted
+    for (int i = 3; i < 5; i++) {
+      String oldDateStr = dateFormat.format(new Date(currentTime - (i * ONE_DAY_IN_MILLISECONDS)));
+      Path walPath = new Path(walsDir, oldDateStr);
+      Path bulkLoadPath = new Path(bulkLoadDir, oldDateStr);
+
+      assertFalse("Old WAL directory (" + walPath + ") should be deleted, but it exists!",
+        fs.exists(walPath));
+      assertFalse("Old BulkLoad directory (" + bulkLoadPath + ") should be deleted, but it exists!",
+        fs.exists(bulkLoadPath));
+    }
+
+    // Expect folders within the last 3 days to exist
+    for (int i = 0; i < 3; i++) {
+      String recentDateStr =
+        dateFormat.format(new Date(currentTime - (i * ONE_DAY_IN_MILLISECONDS)));
+      Path walPath = new Path(walsDir, recentDateStr);
+      Path bulkLoadPath = new Path(bulkLoadDir, recentDateStr);
+
+      assertTrue("Recent WAL directory (" + walPath + ") should exist, but it is missing!",
+        fs.exists(walPath));
+      assertTrue(
+        "Recent BulkLoad directory (" + bulkLoadPath + ") should exist, but it is missing!",
+        fs.exists(bulkLoadPath));
+    }
+  }
+
+  private void verifySystemTableUpdate(BackupSystemTable backupSystemTable, long currentTime)
+    throws IOException {
+    Map<TableName, Long> updatedTables = backupSystemTable.getContinuousBackupTableSet();
+
+    for (Map.Entry<TableName, Long> entry : updatedTables.entrySet()) {
+      long updatedStartTime = entry.getValue();
+
+      // Ensure that the updated start time is not earlier than the expected cutoff time
+      assertTrue("System table update failed!",
+        updatedStartTime >= (currentTime - (3 * ONE_DAY_IN_MILLISECONDS)));
+    }
+  }
+
+  private static void logDirectoryStructure(FileSystem fs, Path dir, String message)
+    throws IOException {
+    System.out.println(message);
+    listDirectory(fs, dir, " ");
+  }
+
+  private static void listDirectory(FileSystem fs, Path dir, String indent) throws IOException {
+    if (!fs.exists(dir)) {
+      System.out.println(indent + "[Missing] " + dir);
+      return;
+    }
+    FileStatus[] files = fs.listStatus(dir);
+    System.out.println(indent + dir);
+    for (FileStatus file : files) {
+      if (file.isDirectory()) {
+        listDirectory(fs, file.getPath(), indent + "  ");
+      } else {
+        System.out.println(indent + "  " + file.getPath());
+      }
+    }
+  }
+}

From 331ca512ed6a9ab1e94e3429ce1f9d96d1cbef15 Mon Sep 17 00:00:00 2001
From: Vinayak Hegde
Date: Tue, 3 Jun 2025 19:15:56 +0530
Subject: [PATCH 3/7] address the review comments

---
 .../org/apache/hadoop/hbase/backup/impl/BackupCommands.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
index e36bce773baf..2fddf5fe8e9a 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
@@ -91,6 +91,7 @@
 import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
+import org.apache.hbase.thirdparty.com.google.common.base.Strings;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
 import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
@@ -897,7 +898,7 @@ private void cleanUpUnusedBackupWALs() throws IOException {
       Configuration conf = getConf() != null ? getConf() : HBaseConfiguration.create();
       String backupWalDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
 
-      if (backupWalDir == null || backupWalDir.isEmpty()) {
+      if (Strings.isNullOrEmpty(backupWalDir)) {
         System.out.println("No WAL directory specified for continuous backup. Skipping cleanup.");
         return;
       }

From 86ba7e2461c1c8abc0cf1da62f4d5b060b5b8627 Mon Sep 17 00:00:00 2001
From: Vinayak Hegde
Date: Wed, 4 Jun 2025 12:56:52 +0530
Subject: [PATCH 4/7] address the review comments

---
 .../org/apache/hadoop/hbase/backup/impl/BackupCommands.java | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
index 2fddf5fe8e9a..7adba5791738 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
@@ -903,8 +903,7 @@ private void cleanUpUnusedBackupWALs() throws IOException {
         return;
       }
 
-      try (Connection conn = ConnectionFactory.createConnection(conf);
-        BackupSystemTable sysTable = new BackupSystemTable(conn)) {
+      try (BackupSystemTable sysTable = new BackupSystemTable(conn)) {
         // Get list of tables under continuous backup
         Map<TableName, Long> continuousBackupTables = sysTable.getContinuousBackupTableSet();
         if (continuousBackupTables.isEmpty()) {

From 3655d484e60e3534c7aac8f89f0e4ed0de22bfe3 Mon Sep 17 00:00:00 2001
From: Vinayak Hegde
Date: Wed, 4 Jun 2025 22:23:35 +0530
Subject: [PATCH 5/7] address the review comments

---
 .../apache/hadoop/hbase/backup/impl/BackupSystemTable.java | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
index ab356cfbf3b5..3adc43d2cfed 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
@@ -1063,6 +1063,11 @@ public void addContinuousBackupTableSet(Set<TableName> tables, long startTimesta
    */
   public void updateContinuousBackupTableSet(Set<TableName> tablesToUpdate, long newStartTimestamp)
     throws IOException {
+    if (tablesToUpdate == null || tablesToUpdate.isEmpty()) {
+      LOG.warn("No tables provided for updating start timestamps.");
+      return;
+    }
+
     try (Table table = connection.getTable(tableName)) {
       Put put = new Put(rowkey(CONTINUOUS_BACKUP_SET));
 

From 73f0e1c176087fd2fc0da6a40a4ad4c3bed53f77 Mon Sep 17 00:00:00 2001
From: Vinayak Hegde
Date: Tue, 10 Jun 2025 12:28:41 +0530
Subject: [PATCH 6/7] add more unit tests to cover all cases

---
 hbase-backup/pom.xml                          |   6 +
 .../hbase/backup/impl/BackupCommands.java     |   6 +-
 .../backup/TestBackupDeleteWithCleanup.java   |   6 +-
 .../hbase/backup/impl/TestBackupCommands.java | 177 ++++++++++++++++++
 4 files changed, 189 insertions(+), 6 deletions(-)
 create mode 100644 hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupCommands.java

diff --git a/hbase-backup/pom.xml b/hbase-backup/pom.xml
index 24c4ca4390f9..daf25a8a68fa 100644
--- a/hbase-backup/pom.xml
+++ b/hbase-backup/pom.xml
@@ -163,6 +163,12 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-inline</artifactId>
+      <version>4.11.0</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
index 7adba5791738..11b6890ed038 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
@@ -932,7 +932,7 @@ private void cleanUpUnusedBackupWALs() throws IOException {
      * @param sysTable Backup system table
      * @return cutoff timestamp or 0 if not found
      */
-    private long determineWALCleanupCutoffTime(BackupSystemTable sysTable) throws IOException {
+    long determineWALCleanupCutoffTime(BackupSystemTable sysTable) throws IOException {
       List<BackupInfo> backupInfos = sysTable.getBackupInfos(BackupState.COMPLETE);
       Collections.reverse(backupInfos); // Start from oldest
 
@@ -949,7 +949,7 @@ private long determineWALCleanupCutoffTime(BackupSystemTable sysTable) throws IO
      * @param sysTable        Backup system table
      * @param cutoffTimestamp Timestamp before which WALs are no longer needed
      */
-    private void updateBackupTableStartTimes(BackupSystemTable sysTable, long cutoffTimestamp)
+    void updateBackupTableStartTimes(BackupSystemTable sysTable, long cutoffTimestamp)
       throws IOException {
 
       Map<TableName, Long> backupTables = sysTable.getContinuousBackupTableSet();
@@ -969,7 +969,7 @@ private void updateBackupTableStartTimes(BackupSystemTable sysTable, long cutoff
     /**
      * Cleans up old WAL and bulk-loaded files based on the determined cutoff timestamp.
      */
-    private void deleteOldWALFiles(Configuration conf, String backupWalDir, long cutoffTime)
+    void deleteOldWALFiles(Configuration conf, String backupWalDir, long cutoffTime)
       throws IOException {
       System.out.println("Starting WAL cleanup in backup directory: " + backupWalDir
         + " with cutoff time: " + cutoffTime);

diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithCleanup.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithCleanup.java
index bd57ded455c8..6d76ac4e89bf 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithCleanup.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithCleanup.java
@@ -98,7 +98,7 @@ public void testBackupDeleteWithCleanupLogic() throws Exception {
     verifySystemTableUpdate(backupSystemTable, currentTime);
   }
 
-  private static void setupBackupFolders(FileSystem fs, Path backupWalDir, long currentTime)
+  public static void setupBackupFolders(FileSystem fs, Path backupWalDir, long currentTime)
     throws IOException {
     Path walsDir = new Path(backupWalDir, WALS_DIR);
     Path bulkLoadDir = new Path(backupWalDir, BULKLOAD_FILES_DIR);
@@ -160,13 +160,13 @@ private void verifySystemTableUpdate(BackupSystemTable backupSystemTable, long c
     }
   }
 
-  private static void logDirectoryStructure(FileSystem fs, Path dir, String message)
+  public static void logDirectoryStructure(FileSystem fs, Path dir, String message)
     throws IOException {
     System.out.println(message);
     listDirectory(fs, dir, " ");
   }
 
-  private static void listDirectory(FileSystem fs, Path dir, String indent) throws IOException {
+  public static void listDirectory(FileSystem fs, Path dir, String indent) throws IOException {
     if (!fs.exists(dir)) {
       System.out.println(indent + "[Missing] " + dir);
       return;

diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupCommands.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupCommands.java
new file mode 100644
index 000000000000..b2ebbd640bbd
--- /dev/null
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupCommands.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.impl;
+
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
+import static org.apache.hadoop.hbase.backup.TestBackupDeleteWithCleanup.logDirectoryStructure;
+import static org.apache.hadoop.hbase.backup.TestBackupDeleteWithCleanup.setupBackupFolders;
+import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.BULKLOAD_FILES_DIR;
+import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimeZone;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupInfo;
+import org.apache.hadoop.hbase.backup.BackupType;
+import org.apache.hadoop.hbase.backup.TestBackupBase;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestBackupCommands extends TestBackupBase {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestBackupCommands.class);
+
+  String backupWalDirName = "TestBackupWalDir";
+
+  /**
+   * Tests whether determineWALCleanupCutoffTime returns the correct FULL backup start timestamp.
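+   * The cutoff is the start time of the oldest remaining COMPLETE FULL backup; WAL days entirely
+   * before it are eligible for deletion.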
+   */
+  @Test
+  public void testDetermineWALCleanupCutoffTimeOfCleanupCommand() throws IOException {
+    // GIVEN
+    BackupSystemTable sysTable = mock(BackupSystemTable.class);
+
+    BackupInfo full1 = new BackupInfo();
+    full1.setType(BackupType.FULL);
+    full1.setStartTs(1111L);
+    full1.setState(BackupInfo.BackupState.COMPLETE);
+
+    BackupInfo inc = new BackupInfo();
+    inc.setType(BackupType.INCREMENTAL);
+    inc.setStartTs(2222L);
+    inc.setState(BackupInfo.BackupState.COMPLETE);
+
+    BackupInfo full2 = new BackupInfo();
+    full2.setType(BackupType.FULL);
+    full2.setStartTs(3333L);
+    full2.setState(BackupInfo.BackupState.COMPLETE);
+
+    // Ordered as newest to oldest, will be reversed in the method
+    List<BackupInfo> backupInfos = List.of(full2, inc, full1);
+    when(sysTable.getBackupInfos(BackupInfo.BackupState.COMPLETE))
+      .thenReturn(new ArrayList<>(backupInfos));
+
+    // WHEN
+    BackupCommands.DeleteCommand command = new BackupCommands.DeleteCommand(conf1, null);
+    long cutoff = command.determineWALCleanupCutoffTime(sysTable);
+
+    // THEN
+    assertEquals("Expected oldest FULL backup timestamp", 1111L, cutoff);
+  }
+
+  @Test
+  public void testUpdateBackupTableStartTimesOfCleanupCommand() throws IOException {
+    // GIVEN
+    BackupSystemTable mockSysTable = mock(BackupSystemTable.class);
+
+    TableName tableA = TableName.valueOf("ns", "tableA");
+    TableName tableB = TableName.valueOf("ns", "tableB");
+    TableName tableC = TableName.valueOf("ns", "tableC");
+
+    long cutoffTimestamp = 1_000_000L;
+
+    // Simulate current table start times
+    Map<TableName, Long> tableSet = Map.of(tableA, 900_000L, // Before cutoff → should be updated
+      tableB, 1_100_000L, // After cutoff → should NOT be updated
+      tableC, 800_000L // Before cutoff → should be updated
+    );
+
+    when(mockSysTable.getContinuousBackupTableSet()).thenReturn(tableSet);
+
+    // WHEN
+    BackupCommands.DeleteCommand command = new BackupCommands.DeleteCommand(conf1, null);
+    command.updateBackupTableStartTimes(mockSysTable, cutoffTimestamp);
+
+    // THEN
+    Set<TableName> expectedUpdated = Set.of(tableA, tableC);
+    verify(mockSysTable).updateContinuousBackupTableSet(expectedUpdated, cutoffTimestamp);
+  }
+
+  @Test
+  public void testDeleteOldWALFilesOfCleanupCommand() throws IOException {
+    // GIVEN
+    Path root = TEST_UTIL.getDataTestDirOnTestFS();
+    Path backupWalDir = new Path(root, backupWalDirName);
+    conf1.set(CONF_CONTINUOUS_BACKUP_WAL_DIR, backupWalDir.toString());
+
+    FileSystem fs = FileSystem.get(conf1);
+    fs.mkdirs(backupWalDir);
+
+    long currentTime = EnvironmentEdgeManager.getDelegate().currentTime();
+    setupBackupFolders(fs, backupWalDir, currentTime); // Create 5 days of WAL/bulk folders
+
+    logDirectoryStructure(fs, backupWalDir, "Before cleanup:");
+
+    // Delete files older than 2 days from current time
+    long cutoffTime = currentTime - (2 * ONE_DAY_IN_MILLISECONDS);
+
+    // WHEN
+    BackupCommands.DeleteCommand command = new BackupCommands.DeleteCommand(conf1, null);
+    command.deleteOldWALFiles(conf1, backupWalDir.toString(), cutoffTime);
+
+    logDirectoryStructure(fs, backupWalDir, "After cleanup:");
+
+    // THEN
+    verifyCleanupOutcome(fs, backupWalDir, currentTime, cutoffTime);
+  }
+
+  private static void verifyCleanupOutcome(FileSystem fs, Path backupWalDir, long currentTime,
+    long cutoffTime) throws IOException {
+    Path walsDir = new Path(backupWalDir, WALS_DIR);
+    Path bulkLoadDir = new Path(backupWalDir, BULKLOAD_FILES_DIR);
+    SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+    dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+    for (int i = 0; i < 5; i++) {
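+      // A day's folders are deletable only if the entire day (start + 24h - 1 ms) precedes the cutoff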
+      long dayTime = currentTime - (i * ONE_DAY_IN_MILLISECONDS);
+      String dayDir = dateFormat.format(new Date(dayTime));
+      Path walPath = new Path(walsDir, dayDir);
+      Path bulkPath = new Path(bulkLoadDir, dayDir);
+
+      if (dayTime + ONE_DAY_IN_MILLISECONDS - 1 < cutoffTime) {
+        assertFalse("Old WAL dir should be deleted: " + walPath, fs.exists(walPath));
+        assertFalse("Old BulkLoad dir should be deleted: " + bulkPath, fs.exists(bulkPath));
+      } else {
+        assertTrue("Recent WAL dir should exist: " + walPath, fs.exists(walPath));
+        assertTrue("Recent BulkLoad dir should exist: " + bulkPath, fs.exists(bulkPath));
+      }
+    }
+  }
+}

From e3c24f04e64a256238faa09468140e2a8fb4da0b Mon Sep 17 00:00:00 2001
From: Vinayak Hegde
Date: Tue, 10 Jun 2025 22:05:13 +0530
Subject: [PATCH 7/7] address the review comments

---
 hbase-backup/pom.xml | 1 -
 1 file changed, 1 deletion(-)

diff --git a/hbase-backup/pom.xml b/hbase-backup/pom.xml
index daf25a8a68fa..45b5c999324b 100644
--- a/hbase-backup/pom.xml
+++ b/hbase-backup/pom.xml
@@ -166,7 +166,6 @@
     <dependency>
       <groupId>org.mockito</groupId>
      <artifactId>mockito-inline</artifactId>
-      <version>4.11.0</version>
       <scope>test</scope>
     </dependency>
   </dependencies>