diff --git a/hbase-backup/pom.xml b/hbase-backup/pom.xml
index 24c4ca4390f9..45b5c999324b 100644
--- a/hbase-backup/pom.xml
+++ b/hbase-backup/pom.xml
@@ -163,6 +163,11 @@
junit
test
+
+ org.mockito
+ mockito-inline
+ test
+
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
index 804dc7141a19..11b6890ed038 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.backup.impl;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONTINUOUS_BACKUP_REPLICATION_PEER;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_BACKUP_LIST_DESC;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_BANDWIDTH;
@@ -47,18 +49,26 @@
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_WORKERS_DESC;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME_DESC;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
import java.io.IOException;
import java.net.URI;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -71,6 +81,7 @@
import org.apache.hadoop.hbase.backup.BackupRestoreConstants.BackupCommand;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.HBackupFileSystem;
+import org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager;
import org.apache.hadoop.hbase.backup.util.BackupSet;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Connection;
@@ -80,6 +91,7 @@
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
+import org.apache.hbase.thirdparty.com.google.common.base.Strings;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
@@ -649,6 +661,8 @@ public void execute() throws IOException {
} else if (cmdline.hasOption(OPTION_LIST)) {
executeDeleteListOfBackups(cmdline, isForceDelete);
}
+
+ cleanUpUnusedBackupWALs();
}
private void executeDeleteOlderThan(CommandLine cmdline, boolean isForceDelete)
@@ -876,6 +890,139 @@ private boolean canAnyOtherBackupCover(List allBackups, BackupInfo c
return false;
}
+ /**
+ * Cleans up Write-Ahead Logs (WALs) that are no longer required for PITR after a successful
+ * backup deletion.
+ */
+ private void cleanUpUnusedBackupWALs() throws IOException {
+ Configuration conf = getConf() != null ? getConf() : HBaseConfiguration.create();
+ String backupWalDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
+
+ if (Strings.isNullOrEmpty(backupWalDir)) {
+ System.out.println("No WAL directory specified for continuous backup. Skipping cleanup.");
+ return;
+ }
+
+ try (BackupSystemTable sysTable = new BackupSystemTable(conn)) {
+ // Get list of tables under continuous backup
+ Map continuousBackupTables = sysTable.getContinuousBackupTableSet();
+ if (continuousBackupTables.isEmpty()) {
+ System.out.println("No continuous backups configured. Skipping WAL cleanup.");
+ return;
+ }
+
+ // Find the earliest timestamp after which WALs are still needed
+ long cutoffTimestamp = determineWALCleanupCutoffTime(sysTable);
+ if (cutoffTimestamp == 0) {
+ System.err.println("ERROR: No valid full backup found. Skipping WAL cleanup.");
+ return;
+ }
+
+ // Update metadata before actual cleanup to avoid inconsistencies
+ updateBackupTableStartTimes(sysTable, cutoffTimestamp);
+
+ // Delete WAL files older than cutoff timestamp
+ deleteOldWALFiles(conf, backupWalDir, cutoffTimestamp);
+
+ }
+ }
+
+ /**
+ * Determines the cutoff time for cleaning WAL files.
+ * @param sysTable Backup system table
+ * @return cutoff timestamp or 0 if not found
+ */
+ long determineWALCleanupCutoffTime(BackupSystemTable sysTable) throws IOException {
+ List backupInfos = sysTable.getBackupInfos(BackupState.COMPLETE);
+ Collections.reverse(backupInfos); // Start from oldest
+
+ for (BackupInfo backupInfo : backupInfos) {
+ if (BackupType.FULL.equals(backupInfo.getType())) {
+ return backupInfo.getStartTs();
+ }
+ }
+ return 0;
+ }
+
+ /**
+ * Updates the start time for continuous backups if older than cutoff timestamp.
+ * @param sysTable Backup system table
+ * @param cutoffTimestamp Timestamp before which WALs are no longer needed
+ */
+ void updateBackupTableStartTimes(BackupSystemTable sysTable, long cutoffTimestamp)
+ throws IOException {
+
+ Map backupTables = sysTable.getContinuousBackupTableSet();
+ Set tablesToUpdate = new HashSet<>();
+
+ for (Map.Entry entry : backupTables.entrySet()) {
+ if (entry.getValue() < cutoffTimestamp) {
+ tablesToUpdate.add(entry.getKey());
+ }
+ }
+
+ if (!tablesToUpdate.isEmpty()) {
+ sysTable.updateContinuousBackupTableSet(tablesToUpdate, cutoffTimestamp);
+ }
+ }
+
+ /**
+ * Cleans up old WAL and bulk-loaded files based on the determined cutoff timestamp.
+ */
+ void deleteOldWALFiles(Configuration conf, String backupWalDir, long cutoffTime)
+ throws IOException {
+ System.out.println("Starting WAL cleanup in backup directory: " + backupWalDir
+ + " with cutoff time: " + cutoffTime);
+
+ BackupFileSystemManager manager =
+ new BackupFileSystemManager(CONTINUOUS_BACKUP_REPLICATION_PEER, conf, backupWalDir);
+ FileSystem fs = manager.getBackupFs();
+ Path walDir = manager.getWalsDir();
+ Path bulkloadDir = manager.getBulkLoadFilesDir();
+
+ SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+ dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+ System.out.println("Listing directories under: " + walDir);
+
+ FileStatus[] directories = fs.listStatus(walDir);
+
+ for (FileStatus dirStatus : directories) {
+ if (!dirStatus.isDirectory()) {
+ continue; // Skip files, we only want directories
+ }
+
+ Path dirPath = dirStatus.getPath();
+ String dirName = dirPath.getName();
+
+ try {
+ long dayStart = parseDayDirectory(dirName, dateFormat);
+ System.out
+ .println("Checking WAL directory: " + dirName + " (Start Time: " + dayStart + ")");
+
+ // If WAL files of that day are older than cutoff time, delete them
+ if (dayStart + ONE_DAY_IN_MILLISECONDS - 1 < cutoffTime) {
+ System.out.println("Deleting outdated WAL directory: " + dirPath);
+ fs.delete(dirPath, true);
+ fs.delete(new Path(bulkloadDir, dirName), true);
+ }
+ } catch (ParseException e) {
+ System.out.println("WARNING: Failed to parse directory name '" + dirName
+ + "'. Skipping. Error: " + e.getMessage());
+ } catch (IOException e) {
+ System.out.println("WARNING: Failed to delete directory '" + dirPath
+ + "'. Skipping. Error: " + e.getMessage());
+ }
+ }
+
+ System.out.println("Completed WAL cleanup for backup directory: " + backupWalDir);
+ }
+
+ private long parseDayDirectory(String dayDir, SimpleDateFormat dateFormat)
+ throws ParseException {
+ return dateFormat.parse(dayDir).getTime();
+ }
+
@Override
protected void printUsage() {
System.out.println(DELETE_CMD_USAGE);
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
index 1bdf6179585e..3adc43d2cfed 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
@@ -1056,6 +1056,32 @@ public void addContinuousBackupTableSet(Set tables, long startTimesta
}
}
+ /**
+ * Updates the system table with the new start timestamps for continuous backup tables.
+ * @param tablesToUpdate The set of tables that need their start timestamps updated.
+ * @param newStartTimestamp The new start timestamp to be set.
+ */
+ public void updateContinuousBackupTableSet(Set tablesToUpdate, long newStartTimestamp)
+ throws IOException {
+ if (tablesToUpdate == null || tablesToUpdate.isEmpty()) {
+ LOG.warn("No tables provided for updating start timestamps.");
+ return;
+ }
+
+ try (Table table = connection.getTable(tableName)) {
+ Put put = new Put(rowkey(CONTINUOUS_BACKUP_SET));
+
+ for (TableName tableName : tablesToUpdate) {
+ put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes(tableName.getNameAsString()),
+ Bytes.toBytes(newStartTimestamp));
+ }
+
+ table.put(put);
+ LOG.info("Successfully updated start timestamps for {} tables in the backup system table.",
+ tablesToUpdate.size());
+ }
+ }
+
/**
* Removes tables from the global continuous backup set. Only removes entries that currently exist
* in the backup system table.
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
index 34fcd76bf9c5..eeacc8fbf34c 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
@@ -23,6 +23,7 @@
import java.util.Date;
import java.util.List;
import java.util.Map;
+import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
@@ -304,7 +305,7 @@ private void backupWalEntries(long day, List walEntries) throws IOExc
walWriter.append(entry);
}
walWriter.sync(true);
- uploadBulkLoadFiles(bulkLoadFiles);
+ uploadBulkLoadFiles(day, bulkLoadFiles);
} catch (UncheckedIOException e) {
String errorMsg = Utils.logPeerId(peerId) + " Failed to get or create WAL Writer for " + day;
LOG.error("{} Backup failed for day {}. Error: {}", Utils.logPeerId(peerId), day,
@@ -314,9 +315,7 @@ private void backupWalEntries(long day, List walEntries) throws IOExc
}
private FSHLogProvider.Writer createWalWriter(long dayInMillis) {
- // Convert dayInMillis to "yyyy-MM-dd" format
- SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
- String dayDirectoryName = dateFormat.format(new Date(dayInMillis));
+ String dayDirectoryName = formatToDateString(dayInMillis);
FileSystem fs = backupFileSystemManager.getBackupFs();
Path walsDir = backupFileSystemManager.getWalsDir();
@@ -376,7 +375,7 @@ private void close() {
}
}
- private void uploadBulkLoadFiles(List bulkLoadFiles) throws IOException {
+ private void uploadBulkLoadFiles(long dayInMillis, List bulkLoadFiles) throws IOException {
LOG.debug("{} Starting upload of {} bulk load files", Utils.logPeerId(peerId),
bulkLoadFiles.size());
@@ -384,9 +383,13 @@ private void uploadBulkLoadFiles(List bulkLoadFiles) throws IOException {
LOG.trace("{} Bulk load files to upload: {}", Utils.logPeerId(peerId),
bulkLoadFiles.stream().map(Path::toString).collect(Collectors.joining(", ")));
}
+ String dayDirectoryName = formatToDateString(dayInMillis);
+ Path bulkloadDir = new Path(backupFileSystemManager.getBulkLoadFilesDir(), dayDirectoryName);
+ backupFileSystemManager.getBackupFs().mkdirs(bulkloadDir);
+
for (Path file : bulkLoadFiles) {
Path sourcePath = getBulkLoadFileStagingPath(file);
- Path destPath = new Path(backupFileSystemManager.getBulkLoadFilesDir(), file);
+ Path destPath = new Path(bulkloadDir, file);
try {
LOG.debug("{} Copying bulk load file from {} to {}", Utils.logPeerId(peerId), sourcePath,
@@ -407,6 +410,15 @@ private void uploadBulkLoadFiles(List bulkLoadFiles) throws IOException {
LOG.debug("{} Completed upload of bulk load files", Utils.logPeerId(peerId));
}
+ /**
+ * Convert dayInMillis to "yyyy-MM-dd" format
+ */
+ private String formatToDateString(long dayInMillis) {
+ SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+ dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+ return dateFormat.format(new Date(dayInMillis));
+ }
+
private Path getBulkLoadFileStagingPath(Path relativePathFromNamespace) throws IOException {
FileSystem rootFs = CommonFSUtils.getRootDirFileSystem(conf);
Path rootDir = CommonFSUtils.getRootDir(conf);
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithCleanup.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithCleanup.java
new file mode 100644
index 000000000000..6d76ac4e89bf
--- /dev/null
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithCleanup.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
+import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.BULKLOAD_FILES_DIR;
+import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Map;
+import java.util.Set;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
+@Category(LargeTests.class)
+public class TestBackupDeleteWithCleanup extends TestBackupBase {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestBackupDeleteWithCleanup.class);
+
+ String backupWalDirName = "TestBackupDeleteWithCleanup";
+
+ @Test
+ public void testBackupDeleteWithCleanupLogic() throws Exception {
+ Path root = TEST_UTIL.getDataTestDirOnTestFS();
+ Path backupWalDir = new Path(root, backupWalDirName);
+ conf1.set(CONF_CONTINUOUS_BACKUP_WAL_DIR, backupWalDir.toString());
+ FileSystem fs = FileSystem.get(conf1);
+ fs.mkdirs(backupWalDir);
+
+ // Step 1: Setup Backup Folders
+ long currentTime = EnvironmentEdgeManager.getDelegate().currentTime();
+ setupBackupFolders(fs, backupWalDir, currentTime);
+
+ // Log the directory structure before cleanup
+ logDirectoryStructure(fs, backupWalDir, "Directory structure BEFORE cleanup:");
+
+ // Step 2: Simulate Backup Creation
+ BackupSystemTable backupSystemTable = new BackupSystemTable(TEST_UTIL.getConnection());
+ backupSystemTable.addContinuousBackupTableSet(Set.of(table1),
+ currentTime - (2 * ONE_DAY_IN_MILLISECONDS));
+
+ EnvironmentEdgeManager
+ .injectEdge(() -> System.currentTimeMillis() - (2 * ONE_DAY_IN_MILLISECONDS));
+ String backupId = fullTableBackup(Lists.newArrayList(table1));
+ assertTrue(checkSucceeded(backupId));
+
+ String anotherBackupId = fullTableBackup(Lists.newArrayList(table1));
+ assertTrue(checkSucceeded(anotherBackupId));
+
+ // Step 3: Run Delete Command
+ int ret =
+ ToolRunner.run(conf1, new BackupDriver(), new String[] { "delete", "-l", backupId, "-fd" });
+ assertEquals(0, ret);
+
+ // Log the directory structure after cleanup
+ logDirectoryStructure(fs, backupWalDir, "Directory structure AFTER cleanup:");
+
+ // Step 4: Verify Cleanup
+ verifyBackupCleanup(fs, backupWalDir, currentTime);
+
+ // Step 5: Verify System Table Update
+ verifySystemTableUpdate(backupSystemTable, currentTime);
+ }
+
+ public static void setupBackupFolders(FileSystem fs, Path backupWalDir, long currentTime)
+ throws IOException {
+ Path walsDir = new Path(backupWalDir, WALS_DIR);
+ Path bulkLoadDir = new Path(backupWalDir, BULKLOAD_FILES_DIR);
+
+ fs.mkdirs(walsDir);
+ fs.mkdirs(bulkLoadDir);
+
+ SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+
+ for (int i = 0; i < 5; i++) {
+ String dateStr = dateFormat.format(new Date(currentTime - (i * ONE_DAY_IN_MILLISECONDS)));
+ fs.mkdirs(new Path(walsDir, dateStr));
+ fs.mkdirs(new Path(bulkLoadDir, dateStr));
+ }
+ }
+
+ private static void verifyBackupCleanup(FileSystem fs, Path backupWalDir, long currentTime)
+ throws IOException {
+ Path walsDir = new Path(backupWalDir, WALS_DIR);
+ Path bulkLoadDir = new Path(backupWalDir, BULKLOAD_FILES_DIR);
+ SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+
+ // Expect folders older than 3 days to be deleted
+ for (int i = 3; i < 5; i++) {
+ String oldDateStr = dateFormat.format(new Date(currentTime - (i * ONE_DAY_IN_MILLISECONDS)));
+ Path walPath = new Path(walsDir, oldDateStr);
+ Path bulkLoadPath = new Path(bulkLoadDir, oldDateStr);
+ assertFalse("Old WAL directory (" + walPath + ") should be deleted, but it exists!",
+ fs.exists(walPath));
+ assertFalse("Old BulkLoad directory (" + bulkLoadPath + ") should be deleted, but it exists!",
+ fs.exists(bulkLoadPath));
+ }
+
+ // Expect folders within the last 3 days to exist
+ for (int i = 0; i < 3; i++) {
+ String recentDateStr =
+ dateFormat.format(new Date(currentTime - (i * ONE_DAY_IN_MILLISECONDS)));
+ Path walPath = new Path(walsDir, recentDateStr);
+ Path bulkLoadPath = new Path(bulkLoadDir, recentDateStr);
+
+ assertTrue("Recent WAL directory (" + walPath + ") should exist, but it is missing!",
+ fs.exists(walPath));
+ assertTrue(
+ "Recent BulkLoad directory (" + bulkLoadPath + ") should exist, but it is missing!",
+ fs.exists(bulkLoadPath));
+ }
+ }
+
+ private void verifySystemTableUpdate(BackupSystemTable backupSystemTable, long currentTime)
+ throws IOException {
+ Map updatedTables = backupSystemTable.getContinuousBackupTableSet();
+
+ for (Map.Entry entry : updatedTables.entrySet()) {
+ long updatedStartTime = entry.getValue();
+
+ // Ensure that the updated start time is not earlier than the expected cutoff time
+ assertTrue("System table update failed!",
+ updatedStartTime >= (currentTime - (3 * ONE_DAY_IN_MILLISECONDS)));
+ }
+ }
+
+ public static void logDirectoryStructure(FileSystem fs, Path dir, String message)
+ throws IOException {
+ System.out.println(message);
+ listDirectory(fs, dir, " ");
+ }
+
+ public static void listDirectory(FileSystem fs, Path dir, String indent) throws IOException {
+ if (!fs.exists(dir)) {
+ System.out.println(indent + "[Missing] " + dir);
+ return;
+ }
+ FileStatus[] files = fs.listStatus(dir);
+ System.out.println(indent + dir);
+ for (FileStatus file : files) {
+ if (file.isDirectory()) {
+ listDirectory(fs, file.getPath(), indent + " ");
+ } else {
+ System.out.println(indent + " " + file.getPath());
+ }
+ }
+ }
+}
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupCommands.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupCommands.java
new file mode 100644
index 000000000000..b2ebbd640bbd
--- /dev/null
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupCommands.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.impl;
+
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
+import static org.apache.hadoop.hbase.backup.TestBackupDeleteWithCleanup.logDirectoryStructure;
+import static org.apache.hadoop.hbase.backup.TestBackupDeleteWithCleanup.setupBackupFolders;
+import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.BULKLOAD_FILES_DIR;
+import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimeZone;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupInfo;
+import org.apache.hadoop.hbase.backup.BackupType;
+import org.apache.hadoop.hbase.backup.TestBackupBase;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestBackupCommands extends TestBackupBase {
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestBackupCommands.class);
+
+ String backupWalDirName = "TestBackupWalDir";
+
+ /**
+ * Tests whether determineWALCleanupCutoffTime returns the correct FULL backup start timestamp.
+ */
+ @Test
+ public void testDetermineWALCleanupCutoffTimeOfCleanupCommand() throws IOException {
+ // GIVEN
+ BackupSystemTable sysTable = mock(BackupSystemTable.class);
+
+ BackupInfo full1 = new BackupInfo();
+ full1.setType(BackupType.FULL);
+ full1.setStartTs(1111L);
+ full1.setState(BackupInfo.BackupState.COMPLETE);
+
+ BackupInfo inc = new BackupInfo();
+ inc.setType(BackupType.INCREMENTAL);
+ inc.setStartTs(2222L);
+ inc.setState(BackupInfo.BackupState.COMPLETE);
+
+ BackupInfo full2 = new BackupInfo();
+ full2.setType(BackupType.FULL);
+ full2.setStartTs(3333L);
+ full2.setState(BackupInfo.BackupState.COMPLETE);
+
+ // Ordered as newest to oldest, will be reversed in the method
+ List backupInfos = List.of(full2, inc, full1);
+ when(sysTable.getBackupInfos(BackupInfo.BackupState.COMPLETE))
+ .thenReturn(new ArrayList<>(backupInfos));
+
+ // WHEN
+ BackupCommands.DeleteCommand command = new BackupCommands.DeleteCommand(conf1, null);
+ long cutoff = command.determineWALCleanupCutoffTime(sysTable);
+
+ // THEN
+ assertEquals("Expected oldest FULL backup timestamp", 1111L, cutoff);
+ }
+
+ @Test
+ public void testUpdateBackupTableStartTimesOfCleanupCommand() throws IOException {
+ // GIVEN
+ BackupSystemTable mockSysTable = mock(BackupSystemTable.class);
+
+ TableName tableA = TableName.valueOf("ns", "tableA");
+ TableName tableB = TableName.valueOf("ns", "tableB");
+ TableName tableC = TableName.valueOf("ns", "tableC");
+
+ long cutoffTimestamp = 1_000_000L;
+
+ // Simulate current table start times
+ Map tableSet = Map.of(tableA, 900_000L, // Before cutoff → should be updated
+ tableB, 1_100_000L, // After cutoff → should NOT be updated
+ tableC, 800_000L // Before cutoff → should be updated
+ );
+
+ when(mockSysTable.getContinuousBackupTableSet()).thenReturn(tableSet);
+
+ // WHEN
+ BackupCommands.DeleteCommand command = new BackupCommands.DeleteCommand(conf1, null);
+ command.updateBackupTableStartTimes(mockSysTable, cutoffTimestamp);
+
+ // THEN
+ Set expectedUpdated = Set.of(tableA, tableC);
+ verify(mockSysTable).updateContinuousBackupTableSet(expectedUpdated, cutoffTimestamp);
+ }
+
+ @Test
+ public void testDeleteOldWALFilesOfCleanupCommand() throws IOException {
+ // GIVEN
+ Path root = TEST_UTIL.getDataTestDirOnTestFS();
+ Path backupWalDir = new Path(root, backupWalDirName);
+ conf1.set(CONF_CONTINUOUS_BACKUP_WAL_DIR, backupWalDir.toString());
+
+ FileSystem fs = FileSystem.get(conf1);
+ fs.mkdirs(backupWalDir);
+
+ long currentTime = EnvironmentEdgeManager.getDelegate().currentTime();
+ setupBackupFolders(fs, backupWalDir, currentTime); // Create 5 days of WAL/bulk folders
+
+ logDirectoryStructure(fs, backupWalDir, "Before cleanup:");
+
+ // Delete files older than 2 days from current time
+ long cutoffTime = currentTime - (2 * ONE_DAY_IN_MILLISECONDS);
+
+ // WHEN
+ BackupCommands.DeleteCommand command = new BackupCommands.DeleteCommand(conf1, null);
+ command.deleteOldWALFiles(conf1, backupWalDir.toString(), cutoffTime);
+
+ logDirectoryStructure(fs, backupWalDir, "After cleanup:");
+
+ // THEN
+ verifyCleanupOutcome(fs, backupWalDir, currentTime, cutoffTime);
+ }
+
+ private static void verifyCleanupOutcome(FileSystem fs, Path backupWalDir, long currentTime,
+ long cutoffTime) throws IOException {
+ Path walsDir = new Path(backupWalDir, WALS_DIR);
+ Path bulkLoadDir = new Path(backupWalDir, BULKLOAD_FILES_DIR);
+ SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+ dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+ for (int i = 0; i < 5; i++) {
+ long dayTime = currentTime - (i * ONE_DAY_IN_MILLISECONDS);
+ String dayDir = dateFormat.format(new Date(dayTime));
+ Path walPath = new Path(walsDir, dayDir);
+ Path bulkPath = new Path(bulkLoadDir, dayDir);
+
+ if (dayTime + ONE_DAY_IN_MILLISECONDS - 1 < cutoffTime) {
+ assertFalse("Old WAL dir should be deleted: " + walPath, fs.exists(walPath));
+ assertFalse("Old BulkLoad dir should be deleted: " + bulkPath, fs.exists(bulkPath));
+ } else {
+ assertTrue("Recent WAL dir should exist: " + walPath, fs.exists(walPath));
+ assertTrue("Recent BulkLoad dir should exist: " + bulkPath, fs.exists(bulkPath));
+ }
+ }
+ }
+}
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java
index cd1f758f7607..253675f85d97 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java
@@ -44,6 +44,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
@@ -282,6 +283,7 @@ public void testDayWiseWALBackup() throws IOException {
long oneDayBackTime = currentTime - ONE_DAY_IN_MILLISECONDS;
SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+ dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
String expectedPrevDayDir = dateFormat.format(new Date(oneDayBackTime));
String expectedCurrentDayDir = dateFormat.format(new Date(currentTime));
@@ -437,8 +439,22 @@ private void verifyBackup(String backupRootDir, boolean hasBulkLoadFiles,
assertEquals(0, getRowCount(tableName));
replayWALs(new Path(backupRootDir, WALS_DIR).toString(), tableName);
- replayBulkLoadHFilesIfPresent(new Path(backupRootDir, BULKLOAD_FILES_DIR).toString(),
- tableName);
+
+ // replay Bulk loaded HFiles if Present
+ try {
+ Path bulkloadDir = new Path(backupRootDir, BULKLOAD_FILES_DIR);
+ if (fs.exists(bulkloadDir)) {
+ FileStatus[] directories = fs.listStatus(bulkloadDir);
+ for (FileStatus dirStatus : directories) {
+ if (dirStatus.isDirectory()) {
+ replayBulkLoadHFilesIfPresent(dirStatus.getPath().toString(), tableName);
+ }
+ }
+ }
+ } catch (Exception e) {
+ fail("Failed to replay BulkLoad HFiles properly: " + e.getMessage());
+ }
+
assertEquals(expectedRows, getRowCount(tableName));
}
}