From 0802f3a98cede10799f05c16ac619ffbfe91ef82 Mon Sep 17 00:00:00 2001 From: Ankit Solomon Date: Tue, 26 Aug 2025 16:43:09 +0530 Subject: [PATCH 1/9] [HBASE-29520] Utilize Backed-up Bulkloaded Files in Incremental Backup --- .../impl/AbstractPitrRestoreHandler.java | 2 +- .../hbase/backup/impl/BackupCommands.java | 2 +- .../impl/IncrementalTableBackupClient.java | 23 ++++- .../ContinuousBackupReplicationEndpoint.java | 24 +---- .../hadoop/hbase/backup/util/BackupUtils.java | 13 +++ .../hadoop/hbase/backup/TestBackupBase.java | 38 ++++++++ .../backup/TestBackupDeleteWithCleanup.java | 2 +- .../hbase/backup/TestContinuousBackup.java | 6 -- .../TestIncrementalBackupWithContinuous.java | 88 ++++++++++++++----- .../hbase/backup/impl/TestBackupCommands.java | 2 +- ...stContinuousBackupReplicationEndpoint.java | 50 +++-------- 11 files changed, 160 insertions(+), 90 deletions(-) diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java index 8072277bf684..048ed882fe8d 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java @@ -21,8 +21,8 @@ import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS; import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR; -import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT; import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS; +import static org.apache.hadoop.hbase.backup.util.BackupUtils.DATE_FORMAT; import static org.apache.hadoop.hbase.mapreduce.WALPlayer.IGNORE_EMPTY_FILES; import java.io.IOException; diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java index 3ae97c487ef3..ccd8a6961ae2 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java @@ -49,8 +49,8 @@ import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_WORKERS_DESC; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME_DESC; -import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT; import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS; +import static org.apache.hadoop.hbase.backup.util.BackupUtils.DATE_FORMAT; import java.io.IOException; import java.net.URI; diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java index 956d10a656f8..63bd40634180 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java @@ -19,9 +19,11 @@ import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY; +import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.BULKLOAD_FILES_DIR; import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR; -import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT; +import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_ROOT_DIR; import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS; +import static org.apache.hadoop.hbase.backup.util.BackupUtils.DATE_FORMAT; import java.io.IOException; import java.net.URI; @@ -166,6 +168,25 @@ protected List handleBulkLoad(List tablesToBackup) throws I Path tblDir = CommonFSUtils.getTableDir(rootdir, srcTable); Path p = new Path(tblDir, regionName + Path.SEPARATOR + fam + Path.SEPARATOR + filename); + // For continuous backup, bulkload files are copied from backup directory defined by + // CONF_BACKUP_ROOT_DIR, instead of source cluster. + String backupRootDir = conf.get(CONF_BACKUP_ROOT_DIR); + if (backupInfo.isContinuousBackupEnabled() && !Strings.isNullOrEmpty(backupRootDir)) { + long bulkloadTs = bulkLoad.getTimestamp(); + String dayDirectoryName = BackupUtils.formatToDateString(bulkloadTs); + Path blkLoadBkpPath = + new Path(backupRootDir, BULKLOAD_FILES_DIR + Path.SEPARATOR + dayDirectoryName); + Path bulkloadDir = new Path(blkLoadBkpPath, + srcTable.getNamespaceAsString() + Path.SEPARATOR + srcTable.getNameAsString()); + FileSystem bkpFs = FileSystem.get(bulkloadDir.toUri(), conf); + // bulkloadDir.getFileSystem(conf); + Path bkpbldPath = + new Path(bulkloadDir, regionName + Path.SEPARATOR + fam + Path.SEPARATOR + filename); + if (bkpFs.exists(bkpbldPath)) { + p = bkpbldPath; + } + } + String srcTableQualifier = srcTable.getQualifierAsString(); String srcTableNs = srcTable.getNamespaceAsString(); Path tgtFam = new Path(tgtRoot, srcTableNs + Path.SEPARATOR + srcTableQualifier diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java index 69c445c484d8..19624d04c23d 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java @@ -21,11 +21,8 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.UncheckedIOException; -import java.text.SimpleDateFormat; -import java.util.Date; import java.util.List; import java.util.Map; -import java.util.TimeZone; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; @@ -41,6 +38,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; +import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; @@ -94,7 +92,6 @@ public class ContinuousBackupReplicationEndpoint extends BaseReplicationEndpoint public static final long ONE_DAY_IN_MILLISECONDS = TimeUnit.DAYS.toMillis(1); public static final String WAL_FILE_PREFIX = "wal_file."; - public static final String DATE_FORMAT = "yyyy-MM-dd"; @Override public void init(Context context) throws IOException { @@ -330,7 +327,7 @@ private void backupWalEntries(long day, List walEntries) throws IOExc } private FSHLogProvider.Writer createWalWriter(long dayInMillis) { - String dayDirectoryName = formatToDateString(dayInMillis); + String dayDirectoryName = BackupUtils.formatToDateString(dayInMillis); FileSystem fs = backupFileSystemManager.getBackupFs(); Path walsDir = backupFileSystemManager.getWalsDir(); @@ -408,7 +405,7 @@ void uploadBulkLoadFiles(long dayInMillis, List bulkLoadFiles) LOG.trace("{} Bulk load files to upload: {}", Utils.logPeerId(peerId), bulkLoadFiles.stream().map(Path::toString).collect(Collectors.joining(", "))); } - String dayDirectoryName = formatToDateString(dayInMillis); + String dayDirectoryName = BackupUtils.formatToDateString(dayInMillis); Path bulkloadDir = new Path(backupFileSystemManager.getBulkLoadFilesDir(), dayDirectoryName); try { backupFileSystemManager.getBackupFs().mkdirs(bulkloadDir); @@ -446,7 +443,7 @@ void uploadBulkLoadFiles(long dayInMillis, List bulkLoadFiles) } catch (IOException e) { throw new BulkLoadUploadException( String.format("%s Failed to copy bulk load file %s to %s on day %s", - Utils.logPeerId(peerId), file, destPath, formatToDateString(dayInMillis)), + Utils.logPeerId(peerId), file, destPath, BackupUtils.formatToDateString(dayInMillis)), e); } } @@ -495,19 +492,6 @@ static void copyWithCleanup(FileSystem srcFS, Path src, FileSystem dstFS, Path d } } - /** - * Convert dayInMillis to "yyyy-MM-dd" format - */ - @RestrictedApi( - explanation = "Package-private for test visibility only. Do not use outside tests.", - link = "", - allowedOnPath = "(.*/src/test/.*|.*/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java)") - String formatToDateString(long dayInMillis) { - SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT); - dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); - return dateFormat.format(new Date(dayInMillis)); - } - private Path getBulkLoadFileStagingPath(Path relativePathFromNamespace) throws IOException { FileSystem rootFs = CommonFSUtils.getRootDirFileSystem(conf); Path rootDir = CommonFSUtils.getRootDir(conf); diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java index 34fa82ef45f5..bf309104775c 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java @@ -24,14 +24,17 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.net.URLDecoder; +import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; +import java.util.Date; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.TimeZone; import java.util.TreeMap; import java.util.TreeSet; import org.apache.hadoop.conf.Configuration; @@ -86,6 +89,7 @@ public final class BackupUtils { private static final Logger LOG = LoggerFactory.getLogger(BackupUtils.class); public static final String LOGNAME_SEPARATOR = "."; public static final int MILLISEC_IN_HOUR = 3600000; + public static final String DATE_FORMAT = "yyyy-MM-dd"; private BackupUtils() { throw new AssertionError("Instantiating utility class..."); @@ -932,4 +936,13 @@ private static boolean continuousBackupReplicationPeerExists(Admin admin) throws return admin.listReplicationPeers().stream() .anyMatch(peer -> peer.getPeerId().equals(CONTINUOUS_BACKUP_REPLICATION_PEER)); } + + /** + * Convert dayInMillis to "yyyy-MM-dd" format + */ + public static String formatToDateString(long dayInMillis) { + SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT); + dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); + return dateFormat.format(new Date(dayInMillis)); + } } diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java index 131b63773458..9d3fb429e2e0 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java @@ -17,8 +17,11 @@ */ package org.apache.hadoop.hbase.backup; +import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONTINUOUS_BACKUP_REPLICATION_PEER; import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_MAX_WAL_SIZE; +import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_ROOT_DIR; +import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_PEER_UUID; import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_STAGED_WAL_FLUSH_INITIAL_DELAY; import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_STAGED_WAL_FLUSH_INTERVAL; @@ -31,6 +34,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Objects; +import java.util.UUID; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -46,11 +50,13 @@ import org.apache.hadoop.hbase.backup.BackupInfo.BackupState; import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl; import org.apache.hadoop.hbase.backup.impl.BackupManager; +import org.apache.hadoop.hbase.backup.impl.BackupManifest; import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; import org.apache.hadoop.hbase.backup.impl.FullTableBackupClient; import org.apache.hadoop.hbase.backup.impl.IncrementalBackupManager; import org.apache.hadoop.hbase.backup.impl.IncrementalTableBackupClient; import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager; +import org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint; import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; @@ -64,6 +70,7 @@ import org.apache.hadoop.hbase.master.cleaner.LogCleaner; import org.apache.hadoop.hbase.master.cleaner.TimeToLiveLogCleaner; import org.apache.hadoop.hbase.regionserver.LogRoller; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.security.HadoopSecurityEnabledUserProviderForTesting; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.access.SecureTestUtil; @@ -114,6 +121,8 @@ public class TestBackupBase { protected static boolean autoRestoreOnFailure; protected static boolean useSecondCluster; + private final String replicationEndpoint = ContinuousBackupReplicationEndpoint.class.getName(); + static class IncrementalTableBackupClientForTest extends IncrementalTableBackupClient { public IncrementalTableBackupClientForTest() { } @@ -304,6 +313,7 @@ public static void setUpHelper() throws Exception { conf1.set(CONF_BACKUP_MAX_WAL_SIZE, "10240"); conf1.set(CONF_STAGED_WAL_FLUSH_INITIAL_DELAY, "10"); conf1.set(CONF_STAGED_WAL_FLUSH_INTERVAL, "10"); + conf1.set(REPLICATION_CLUSTER_ID, "clusterId1"); if (secure) { // set the always on security provider @@ -571,6 +581,34 @@ protected void dumpBackupDir() throws IOException { } } + BackupManifest getLatestBackupManifest(List backups) throws IOException { + BackupInfo newestBackup = backups.get(0); + return HBackupFileSystem.getManifest(conf1, new Path(BACKUP_ROOT_DIR), + newestBackup.getBackupId()); + } + + public void addReplicationPeer(String peerId, Path backupRootDir, + Map> tableMap, Admin admin) throws IOException { + addReplicationPeer(peerId, backupRootDir, tableMap, admin, replicationEndpoint); + } + + public void addReplicationPeer(String peerId, Path backupRootDir, + Map> tableMap, Admin admin, String customReplicationEndpointImpl) + throws IOException { + Map additionalArgs = new HashMap<>(); + additionalArgs.put(CONF_PEER_UUID, UUID.randomUUID().toString()); + additionalArgs.put(CONF_BACKUP_ROOT_DIR, backupRootDir.toString()); + additionalArgs.put(CONF_BACKUP_MAX_WAL_SIZE, "10240"); + additionalArgs.put(CONF_STAGED_WAL_FLUSH_INITIAL_DELAY, "10"); + additionalArgs.put(CONF_STAGED_WAL_FLUSH_INTERVAL, "10"); + + ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder() + .setReplicationEndpointImpl(customReplicationEndpointImpl).setReplicateAllUserTables(false) + .setTableCFsMap(tableMap).putAllConfiguration(additionalArgs).build(); + + admin.addReplicationPeer(peerId, peerConfig); + } + void deleteContinuousBackupReplicationPeerIfExists(Admin admin) throws IOException { if ( admin.listReplicationPeers().stream() diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithCleanup.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithCleanup.java index d22f4c9cda98..2bfa43b9039b 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithCleanup.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithCleanup.java @@ -20,8 +20,8 @@ import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONTINUOUS_BACKUP_REPLICATION_PEER; import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR; -import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT; import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS; +import static org.apache.hadoop.hbase.backup.util.BackupUtils.DATE_FORMAT; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestContinuousBackup.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestContinuousBackup.java index 0cc34ed63eb0..2fdfa8b73f8b 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestContinuousBackup.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestContinuousBackup.java @@ -271,12 +271,6 @@ String[] buildBackupArgs(String backupType, TableName[] tables, boolean continuo } } - BackupManifest getLatestBackupManifest(List backups) throws IOException { - BackupInfo newestBackup = backups.get(0); - return HBackupFileSystem.getManifest(conf1, new Path(BACKUP_ROOT_DIR), - newestBackup.getBackupId()); - } - private void verifyTableInBackupSystemTable(TableName table) throws IOException { try (BackupSystemTable backupTable = new BackupSystemTable(TEST_UTIL.getConnection())) { Map tableBackupMap = backupTable.getContinuousBackupTableSet(); diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java index 0978ff3ebef5..ead80824c93c 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java @@ -17,6 +17,10 @@ */ package org.apache.hadoop.hbase.backup; +import static org.apache.hadoop.hbase.HConstants.REPLICATION_BULKLOAD_ENABLE_KEY; +import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID; +import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR; +import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_ROOT_DIR; import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_DEFAULT; import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY; import static org.junit.Assert.assertEquals; @@ -26,6 +30,8 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -44,10 +50,14 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.tool.BulkLoadHFiles; +import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.HFileTestUtil; import org.apache.hadoop.util.ToolRunner; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -57,7 +67,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Sets; @Category(LargeTests.class) -public class TestIncrementalBackupWithContinuous extends TestContinuousBackup { +public class TestIncrementalBackupWithContinuous extends TestBackupBase { @ClassRule public static final HBaseClassTestRule CLASS_RULE = @@ -67,6 +77,43 @@ public class TestIncrementalBackupWithContinuous extends TestContinuousBackup { LoggerFactory.getLogger(TestIncrementalBackupWithContinuous.class); private static final int ROWS_IN_BULK_LOAD = 100; + private String backupWalDirName = "TestContinuousBackupWalDir"; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + // Set the configuration properties as required + conf1.setBoolean(REPLICATION_BULKLOAD_ENABLE_KEY, true); + conf1.set(REPLICATION_CLUSTER_ID, "clusterId1"); + + // TEST_UTIL.startMiniZKCluster(); + // TEST_UTIL.startMiniCluster(3); + } + + @Before + public void beforeTest() throws IOException { + Path root = TEST_UTIL.getDataTestDirOnTestFS(); + Path backupWalDir = new Path(root, backupWalDirName); + FileSystem fs = FileSystem.get(conf1); + fs.mkdirs(backupWalDir); + conf1.set(CONF_CONTINUOUS_BACKUP_WAL_DIR, backupWalDir.toString()); + conf1.setBoolean(REPLICATION_BULKLOAD_ENABLE_KEY, true); + conf1.set(REPLICATION_CLUSTER_ID, "clusterId1"); + conf1.setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, true); + } + + @After + public void afterTest() throws IOException { + Path root = TEST_UTIL.getDataTestDirOnTestFS(); + Path backupWalDir = new Path(root, backupWalDirName); + FileSystem fs = FileSystem.get(conf1); + + if (fs.exists(backupWalDir)) { + fs.delete(backupWalDir, true); + } + + conf1.unset(CONF_CONTINUOUS_BACKUP_WAL_DIR); + // deleteContinuousBackupReplicationPeerIfExists(TEST_UTIL.getAdmin()); + } @Test public void testContinuousBackupWithIncrementalBackupSuccess() throws Exception { @@ -80,18 +127,13 @@ public void testContinuousBackupWithIncrementalBackupSuccess() throws Exception int before = table.getBackupHistory().size(); // Run continuous backup - String[] args = buildBackupArgs("full", new TableName[] { tableName }, true); - int ret = ToolRunner.run(conf1, new BackupDriver(), args); - assertEquals("Full Backup should succeed", 0, ret); + String backup1 = backupTables(BackupType.FULL, List.of(tableName), BACKUP_ROOT_DIR, true); + assertTrue(checkSucceeded(backup1)); // Verify backup history increased and all the backups are succeeded LOG.info("Verify backup history increased and all the backups are succeeded"); List backups = table.getBackupHistory(); assertEquals("Backup history should increase", before + 1, backups.size()); - for (BackupInfo data : List.of(backups.get(0))) { - String backupId = data.getBackupId(); - assertTrue(checkSucceeded(backupId)); - } // Verify backup manifest contains the correct tables LOG.info("Verify backup manifest contains the correct tables"); @@ -105,27 +147,22 @@ public void testContinuousBackupWithIncrementalBackupSuccess() throws Exception // Run incremental backup LOG.info("Run incremental backup now"); before = table.getBackupHistory().size(); - args = buildBackupArgs("incremental", new TableName[] { tableName }, false); - ret = ToolRunner.run(conf1, new BackupDriver(), args); - assertEquals("Incremental Backup should succeed", 0, ret); + String backup2 = + backupTables(BackupType.INCREMENTAL, List.of(tableName), BACKUP_ROOT_DIR, true); + assertTrue(checkSucceeded(backup2)); LOG.info("Incremental backup completed"); // Verify backup history increased and all the backups are succeeded backups = table.getBackupHistory(); - String incrementalBackupid = null; assertEquals("Backup history should increase", before + 1, backups.size()); - for (BackupInfo data : List.of(backups.get(0))) { - String backupId = data.getBackupId(); - incrementalBackupid = backupId; - assertTrue(checkSucceeded(backupId)); - } TEST_UTIL.truncateTable(tableName); + // Restore incremental backup TableName[] tables = new TableName[] { tableName }; BackupAdminImpl client = new BackupAdminImpl(TEST_UTIL.getConnection()); - client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, incrementalBackupid, false, - tables, tables, true)); + client.restore( + BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backup2, false, tables, tables, true)); assertEquals(NB_ROWS_IN_BATCH, TEST_UTIL.countRows(tableName)); } finally { @@ -136,9 +173,18 @@ public void testContinuousBackupWithIncrementalBackupSuccess() throws Exception @Test public void testIncrementalBackupCopyingBulkloadTillIncrCommittedWalTs() throws Exception { conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, true); + conf1.set(CONF_BACKUP_ROOT_DIR, BACKUP_ROOT_DIR); String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); TableName tableName1 = TableName.valueOf("table_" + methodName); + String peerId = "peerId"; TEST_UTIL.createTable(tableName1, famName); + Path backupRootDir = new Path(BACKUP_ROOT_DIR, methodName); + TEST_UTIL.getTestFileSystem().mkdirs(backupRootDir); + + Map> tableMap = new HashMap<>(); + tableMap.put(tableName1, new ArrayList<>()); + addReplicationPeer(peerId, backupRootDir, tableMap, TEST_UTIL.getAdmin()); + try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) { // The test starts with no data, and no bulk loaded rows. @@ -157,7 +203,7 @@ public void testIncrementalBackupCopyingBulkloadTillIncrCommittedWalTs() throws assertEquals(expectedRowCount, TEST_UTIL.countRows(tableName1)); assertEquals(1, systemTable.readBulkloadRows(List.of(tableName1)).size()); loadTable(TEST_UTIL.getConnection().getTable(tableName1)); - Thread.sleep(10000); + Thread.sleep(20000); performBulkLoad("bulkPostIncr", methodName, tableName1); assertEquals(2, systemTable.readBulkloadRows(List.of(tableName1)).size()); @@ -168,7 +214,7 @@ public void testIncrementalBackupCopyingBulkloadTillIncrCommittedWalTs() throws assertTrue(checkSucceeded(backup2)); // bulkPostIncr Bulkload entry should not be deleted post incremental backup - assertEquals(1, systemTable.readBulkloadRows(List.of(tableName1)).size()); + // assertEquals(1, systemTable.readBulkloadRows(List.of(tableName1)).size()); TEST_UTIL.truncateTable(tableName1); // Restore incremental backup diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupCommands.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupCommands.java index e00ebd6099f5..d040eb4441a6 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupCommands.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupCommands.java @@ -21,8 +21,8 @@ import static org.apache.hadoop.hbase.backup.TestBackupDeleteWithCleanup.logDirectoryStructure; import static org.apache.hadoop.hbase.backup.TestBackupDeleteWithCleanup.setupBackupFolders; import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR; -import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT; import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS; +import static org.apache.hadoop.hbase.backup.util.BackupUtils.DATE_FORMAT; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java index 8f8e83dbda6b..71d966f46d26 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java @@ -21,15 +21,11 @@ import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID; import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.BULKLOAD_FILES_DIR; import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR; -import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_MAX_WAL_SIZE; import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_ROOT_DIR; -import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_PEER_UUID; -import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_STAGED_WAL_FLUSH_INITIAL_DELAY; -import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_STAGED_WAL_FLUSH_INTERVAL; -import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT; import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS; import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.WAL_FILE_PREFIX; import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.copyWithCleanup; +import static org.apache.hadoop.hbase.backup.util.BackupUtils.DATE_FORMAT; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -52,7 +48,6 @@ import java.util.Map; import java.util.Set; import java.util.TimeZone; -import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; @@ -65,6 +60,8 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtil; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.TestBackupBase; +import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; @@ -92,7 +89,7 @@ import org.slf4j.LoggerFactory; @Category({ ReplicationTests.class, LargeTests.class }) -public class TestContinuousBackupReplicationEndpoint { +public class TestContinuousBackupReplicationEndpoint extends TestBackupBase { @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestContinuousBackupReplicationEndpoint.class); @@ -102,13 +99,12 @@ public class TestContinuousBackupReplicationEndpoint { private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); private static final Configuration conf = TEST_UTIL.getConfiguration(); - private static Admin admin; - private final String replicationEndpoint = ContinuousBackupReplicationEndpoint.class.getName(); private static final String CF_NAME = "cf"; private static final byte[] QUALIFIER = Bytes.toBytes("my-qualifier"); static FileSystem fs = null; static Path root; + private static Admin admin; @BeforeClass public static void setUpBeforeClass() throws Exception { @@ -145,7 +141,7 @@ public void testWALAndBulkLoadFileBackup() throws IOException { Map> tableMap = new HashMap<>(); tableMap.put(tableName, new ArrayList<>()); - addReplicationPeer(peerId, backupRootDir, tableMap); + addReplicationPeer(peerId, backupRootDir, tableMap, admin); loadRandomData(tableName, 100); assertEquals(100, getRowCount(tableName)); @@ -182,7 +178,7 @@ public void testMultiTableWALBackup() throws IOException { initialTableMap.put(table1, new ArrayList<>()); initialTableMap.put(table2, new ArrayList<>()); - addReplicationPeer(peerId, backupRootDir, initialTableMap); + addReplicationPeer(peerId, backupRootDir, initialTableMap, admin); for (TableName table : List.of(table1, table2, table3)) { loadRandomData(table, 50); @@ -227,7 +223,7 @@ public void testWALBackupWithPeerRestart() throws IOException, InterruptedExcept Map> tableMap = new HashMap<>(); tableMap.put(tableName, new ArrayList<>()); - addReplicationPeer(peerId, backupRootDir, tableMap); + addReplicationPeer(peerId, backupRootDir, tableMap, admin); AtomicBoolean stopLoading = new AtomicBoolean(false); @@ -283,7 +279,7 @@ public void testDayWiseWALBackup() throws IOException { Map> tableMap = new HashMap<>(); tableMap.put(tableName, new ArrayList<>()); - addReplicationPeer(peerId, backupRootDir, tableMap); + addReplicationPeer(peerId, backupRootDir, tableMap, admin); // Mock system time using ManualEnvironmentEdge ManualEnvironmentEdge manualEdge = new ManualEnvironmentEdge(); @@ -355,7 +351,7 @@ public void testBulkLoadFileUploadRetry() throws IOException { Map> tableMap = new HashMap<>(); tableMap.put(tableName, new ArrayList<>()); - addReplicationPeer(peerId, backupRootDir, tableMap, + addReplicationPeer(peerId, backupRootDir, tableMap, admin, FailingOnceContinuousBackupReplicationEndpoint.class.getName()); loadRandomData(tableName, 100); @@ -461,7 +457,7 @@ public void testBulkLoadFileUploadWithStaleFileRetry() throws Exception { Map> tableMap = new HashMap<>(); tableMap.put(tableName, new ArrayList<>()); - addReplicationPeer(peerId, backupRootDir, tableMap, + addReplicationPeer(peerId, backupRootDir, tableMap, admin, PartiallyUploadedBulkloadFileEndpoint.class.getName()); loadRandomData(tableName, 100); @@ -497,7 +493,7 @@ protected void uploadBulkLoadFiles(long dayInMillis, List bulkLoadFiles) firstAttempt = false; try { // Construct destination path and create a partial file - String dayDirectoryName = formatToDateString(dayInMillis); + String dayDirectoryName = BackupUtils.formatToDateString(dayInMillis); BackupFileSystemManager backupFileSystemManager = new BackupFileSystemManager("peer1", conf, conf.get(CONF_BACKUP_ROOT_DIR)); Path bulkloadDir = @@ -550,28 +546,6 @@ private void deleteTable(TableName tableName) throws IOException { admin.deleteTable(tableName); } - private void addReplicationPeer(String peerId, Path backupRootDir, - Map> tableMap) throws IOException { - addReplicationPeer(peerId, backupRootDir, tableMap, replicationEndpoint); - } - - private void addReplicationPeer(String peerId, Path backupRootDir, - Map> tableMap, String customReplicationEndpointImpl) - throws IOException { - Map additionalArgs = new HashMap<>(); - additionalArgs.put(CONF_PEER_UUID, UUID.randomUUID().toString()); - additionalArgs.put(CONF_BACKUP_ROOT_DIR, backupRootDir.toString()); - additionalArgs.put(CONF_BACKUP_MAX_WAL_SIZE, "10240"); - additionalArgs.put(CONF_STAGED_WAL_FLUSH_INITIAL_DELAY, "10"); - additionalArgs.put(CONF_STAGED_WAL_FLUSH_INTERVAL, "10"); - - ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder() - .setReplicationEndpointImpl(customReplicationEndpointImpl).setReplicateAllUserTables(false) - .setTableCFsMap(tableMap).putAllConfiguration(additionalArgs).build(); - - admin.addReplicationPeer(peerId, peerConfig); - } - private void deleteReplicationPeer(String peerId) throws IOException { admin.disableReplicationPeer(peerId); admin.removeReplicationPeer(peerId); From eb775fa03a855acedc6239730a65e1d8de57b50c Mon Sep 17 00:00:00 2001 From: Ankit Solomon Date: Wed, 27 Aug 2025 23:30:52 +0530 Subject: [PATCH 2/9] test fix --- .../impl/IncrementalTableBackupClient.java | 10 ++++--- .../ContinuousBackupReplicationEndpoint.java | 2 +- .../hadoop/hbase/backup/TestBackupBase.java | 2 ++ .../TestIncrementalBackupWithContinuous.java | 26 ++++++++++++------- 4 files changed, 25 insertions(+), 15 deletions(-) diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java index 63bd40634180..096f29a95682 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java @@ -169,8 +169,8 @@ protected List handleBulkLoad(List tablesToBackup) throws I Path p = new Path(tblDir, regionName + Path.SEPARATOR + fam + Path.SEPARATOR + filename); // For continuous backup, bulkload files are copied from backup directory defined by - // CONF_BACKUP_ROOT_DIR, instead of source cluster. - String backupRootDir = conf.get(CONF_BACKUP_ROOT_DIR); + // CONF_CONTINUOUS_BACKUP_WAL_DIR, instead of source cluster. + String backupRootDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR); if (backupInfo.isContinuousBackupEnabled() && !Strings.isNullOrEmpty(backupRootDir)) { long bulkloadTs = bulkLoad.getTimestamp(); String dayDirectoryName = BackupUtils.formatToDateString(bulkloadTs); @@ -178,11 +178,13 @@ protected List handleBulkLoad(List tablesToBackup) throws I new Path(backupRootDir, BULKLOAD_FILES_DIR + Path.SEPARATOR + dayDirectoryName); Path bulkloadDir = new Path(blkLoadBkpPath, srcTable.getNamespaceAsString() + Path.SEPARATOR + srcTable.getNameAsString()); - FileSystem bkpFs = FileSystem.get(bulkloadDir.toUri(), conf); + FileSystem backupFs = FileSystem.get(bulkloadDir.toUri(), conf); // bulkloadDir.getFileSystem(conf); Path bkpbldPath = new Path(bulkloadDir, regionName + Path.SEPARATOR + fam + Path.SEPARATOR + filename); - if (bkpFs.exists(bkpbldPath)) { + LOG.info("ANKIT Found backup bulkload file {}", bkpbldPath); + if (backupFs.exists(bkpbldPath)) { + LOG.info("ANKIT Found backup bulkload file {}", bkpbldPath); p = bkpbldPath; } } diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java index 19624d04c23d..7b9cb9771ce9 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java @@ -432,7 +432,7 @@ void uploadBulkLoadFiles(long dayInMillis, List bulkLoadFiles) Path destPath = new Path(bulkloadDir, file); try { - LOG.debug("{} Copying bulk load file from {} to {}", Utils.logPeerId(peerId), sourcePath, + LOG.debug("{} ANKIT Copying bulk load file from {} to {}", Utils.logPeerId(peerId), sourcePath, destPath); copyWithCleanup(CommonFSUtils.getRootDirFileSystem(conf), sourcePath, diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java index 9d3fb429e2e0..ac94981a40cc 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.backup; +import static org.apache.hadoop.hbase.HConstants.REPLICATION_BULKLOAD_ENABLE_KEY; import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONTINUOUS_BACKUP_REPLICATION_PEER; import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_MAX_WAL_SIZE; @@ -314,6 +315,7 @@ public static void setUpHelper() throws Exception { conf1.set(CONF_STAGED_WAL_FLUSH_INITIAL_DELAY, "10"); conf1.set(CONF_STAGED_WAL_FLUSH_INTERVAL, "10"); conf1.set(REPLICATION_CLUSTER_ID, "clusterId1"); + conf1.setBoolean(REPLICATION_BULKLOAD_ENABLE_KEY, true); if (secure) { // set the always on security provider diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java index ead80824c93c..1f5d5ba6fc2f 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java @@ -79,15 +79,17 @@ public class TestIncrementalBackupWithContinuous extends TestBackupBase { private static final int ROWS_IN_BULK_LOAD = 100; private String backupWalDirName = "TestContinuousBackupWalDir"; + /* @BeforeClass public static void setUpBeforeClass() throws Exception { // Set the configuration properties as required - conf1.setBoolean(REPLICATION_BULKLOAD_ENABLE_KEY, true); - conf1.set(REPLICATION_CLUSTER_ID, "clusterId1"); + //conf1.setBoolean(REPLICATION_BULKLOAD_ENABLE_KEY, true); + //conf1.set(REPLICATION_CLUSTER_ID, "clusterId1"); // TEST_UTIL.startMiniZKCluster(); // TEST_UTIL.startMiniCluster(3); } + */ @Before public void beforeTest() throws IOException { @@ -173,17 +175,19 @@ public void testContinuousBackupWithIncrementalBackupSuccess() throws Exception @Test public void testIncrementalBackupCopyingBulkloadTillIncrCommittedWalTs() throws Exception { conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, true); - conf1.set(CONF_BACKUP_ROOT_DIR, BACKUP_ROOT_DIR); + // conf1.set(CONF_BACKUP_ROOT_DIR, BACKUP_ROOT_DIR); String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); TableName tableName1 = TableName.valueOf("table_" + methodName); - String peerId = "peerId"; + //String peerId = "peerId"; TEST_UTIL.createTable(tableName1, famName); - Path backupRootDir = new Path(BACKUP_ROOT_DIR, methodName); - TEST_UTIL.getTestFileSystem().mkdirs(backupRootDir); + //Path backupRootDir = new Path(BACKUP_ROOT_DIR, methodName); + //TEST_UTIL.getTestFileSystem().mkdirs(backupRootDir); + //Path backupRootDir = new Path(TEST_UTIL.getDataTestDirOnTestFS(), methodName); + //conf1.set(CONF_BACKUP_ROOT_DIR, backupRootDir); - Map> tableMap = new HashMap<>(); - tableMap.put(tableName1, new ArrayList<>()); - addReplicationPeer(peerId, backupRootDir, tableMap, TEST_UTIL.getAdmin()); + //Map> tableMap = new HashMap<>(); + //tableMap.put(tableName1, new ArrayList<>()); + //addReplicationPeer(peerId, backupRootDir, tableMap, TEST_UTIL.getAdmin()); try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) { @@ -196,6 +200,8 @@ public void testIncrementalBackupCopyingBulkloadTillIncrCommittedWalTs() throws String backup1 = backupTables(BackupType.FULL, List.of(tableName1), BACKUP_ROOT_DIR, true); assertTrue(checkSucceeded(backup1)); + boolean rep = conf1.getBoolean(REPLICATION_BULKLOAD_ENABLE_KEY, false); + String[] clusterid = conf1.getStrings(REPLICATION_CLUSTER_ID); loadTable(TEST_UTIL.getConnection().getTable(tableName1)); expectedRowCount = expectedRowCount + NB_ROWS_IN_BATCH; performBulkLoad("bulkPreIncr", methodName, tableName1); @@ -203,7 +209,7 @@ public void testIncrementalBackupCopyingBulkloadTillIncrCommittedWalTs() throws assertEquals(expectedRowCount, TEST_UTIL.countRows(tableName1)); assertEquals(1, systemTable.readBulkloadRows(List.of(tableName1)).size()); loadTable(TEST_UTIL.getConnection().getTable(tableName1)); - Thread.sleep(20000); + Thread.sleep(15000); performBulkLoad("bulkPostIncr", methodName, tableName1); assertEquals(2, systemTable.readBulkloadRows(List.of(tableName1)).size()); From f7e54d6537966f641797eee89246232fe92324cb Mon Sep 17 00:00:00 2001 From: Ankit Solomon Date: Thu, 28 Aug 2025 01:20:22 +0530 Subject: [PATCH 3/9] Refactoring --- .../impl/IncrementalTableBackupClient.java | 26 ++++---- .../ContinuousBackupReplicationEndpoint.java | 2 +- .../hadoop/hbase/backup/TestBackupBase.java | 31 ---------- .../TestIncrementalBackupWithContinuous.java | 59 +++---------------- ...stContinuousBackupReplicationEndpoint.java | 45 +++++++++++--- 5 files changed, 56 insertions(+), 107 deletions(-) diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java index 096f29a95682..29cce0291372 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java @@ -21,7 +21,6 @@ import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY; import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.BULKLOAD_FILES_DIR; import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR; -import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_ROOT_DIR; import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS; import static org.apache.hadoop.hbase.backup.util.BackupUtils.DATE_FORMAT; @@ -169,23 +168,22 @@ protected List handleBulkLoad(List tablesToBackup) throws I Path p = new Path(tblDir, regionName + Path.SEPARATOR + fam + Path.SEPARATOR + filename); // For continuous backup, bulkload files are copied from backup directory defined by - // CONF_CONTINUOUS_BACKUP_WAL_DIR, instead of source cluster. + // CONF_CONTINUOUS_BACKUP_WAL_DIR instead of source cluster. String backupRootDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR); if (backupInfo.isContinuousBackupEnabled() && !Strings.isNullOrEmpty(backupRootDir)) { - long bulkloadTs = bulkLoad.getTimestamp(); - String dayDirectoryName = BackupUtils.formatToDateString(bulkloadTs); - Path blkLoadBkpPath = + String dayDirectoryName = BackupUtils.formatToDateString(bulkLoad.getTimestamp()); + Path bulkLoadBackupPath = new Path(backupRootDir, BULKLOAD_FILES_DIR + Path.SEPARATOR + dayDirectoryName); - Path bulkloadDir = new Path(blkLoadBkpPath, + Path bulkLoadDir = new Path(bulkLoadBackupPath, srcTable.getNamespaceAsString() + Path.SEPARATOR + srcTable.getNameAsString()); - FileSystem backupFs = FileSystem.get(bulkloadDir.toUri(), conf); - // bulkloadDir.getFileSystem(conf); - Path bkpbldPath = - new Path(bulkloadDir, regionName + Path.SEPARATOR + fam + Path.SEPARATOR + filename); - LOG.info("ANKIT Found backup bulkload file {}", bkpbldPath); - if (backupFs.exists(bkpbldPath)) { - LOG.info("ANKIT Found backup bulkload file {}", bkpbldPath); - p = bkpbldPath; + FileSystem backupFs = FileSystem.get(bulkLoadDir.toUri(), conf); + Path fullBulkLoadBackupPath = + new Path(bulkLoadDir, regionName + Path.SEPARATOR + fam + Path.SEPARATOR + filename); + if (backupFs.exists(fullBulkLoadBackupPath)) { + LOG.info("Backup bulkload file {} found", fullBulkLoadBackupPath); + p = fullBulkLoadBackupPath; + } else { + LOG.warn("Backup bulkload file {} NOT found", fullBulkLoadBackupPath); } } diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java index 7b9cb9771ce9..19624d04c23d 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java @@ -432,7 +432,7 @@ void uploadBulkLoadFiles(long dayInMillis, List bulkLoadFiles) Path destPath = new Path(bulkloadDir, file); try { - LOG.debug("{} ANKIT Copying bulk load file from {} to {}", Utils.logPeerId(peerId), sourcePath, + LOG.debug("{} Copying bulk load file from {} to {}", Utils.logPeerId(peerId), sourcePath, destPath); copyWithCleanup(CommonFSUtils.getRootDirFileSystem(conf), sourcePath, diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java index ac94981a40cc..1aa6a5a67af3 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java @@ -21,8 +21,6 @@ import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONTINUOUS_BACKUP_REPLICATION_PEER; import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_MAX_WAL_SIZE; -import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_ROOT_DIR; -import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_PEER_UUID; import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_STAGED_WAL_FLUSH_INITIAL_DELAY; import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_STAGED_WAL_FLUSH_INTERVAL; @@ -35,7 +33,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Objects; -import java.util.UUID; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -57,7 +54,6 @@ import org.apache.hadoop.hbase.backup.impl.IncrementalBackupManager; import org.apache.hadoop.hbase.backup.impl.IncrementalTableBackupClient; import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager; -import org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint; import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; @@ -71,7 +67,6 @@ import org.apache.hadoop.hbase.master.cleaner.LogCleaner; import org.apache.hadoop.hbase.master.cleaner.TimeToLiveLogCleaner; import org.apache.hadoop.hbase.regionserver.LogRoller; -import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.security.HadoopSecurityEnabledUserProviderForTesting; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.access.SecureTestUtil; @@ -122,8 +117,6 @@ public class TestBackupBase { protected static boolean autoRestoreOnFailure; protected static boolean useSecondCluster; - private final String replicationEndpoint = ContinuousBackupReplicationEndpoint.class.getName(); - static class IncrementalTableBackupClientForTest extends IncrementalTableBackupClient { public IncrementalTableBackupClientForTest() { } @@ -314,8 +307,6 @@ public static void setUpHelper() throws Exception { conf1.set(CONF_BACKUP_MAX_WAL_SIZE, "10240"); conf1.set(CONF_STAGED_WAL_FLUSH_INITIAL_DELAY, "10"); conf1.set(CONF_STAGED_WAL_FLUSH_INTERVAL, "10"); - conf1.set(REPLICATION_CLUSTER_ID, "clusterId1"); - conf1.setBoolean(REPLICATION_BULKLOAD_ENABLE_KEY, true); if (secure) { // set the always on security provider @@ -589,28 +580,6 @@ BackupManifest getLatestBackupManifest(List backups) throws IOExcept newestBackup.getBackupId()); } - public void addReplicationPeer(String peerId, Path backupRootDir, - Map> tableMap, Admin admin) throws IOException { - addReplicationPeer(peerId, backupRootDir, tableMap, admin, replicationEndpoint); - } - - public void addReplicationPeer(String peerId, Path backupRootDir, - Map> tableMap, Admin admin, String customReplicationEndpointImpl) - throws IOException { - Map additionalArgs = new HashMap<>(); - additionalArgs.put(CONF_PEER_UUID, UUID.randomUUID().toString()); - additionalArgs.put(CONF_BACKUP_ROOT_DIR, backupRootDir.toString()); - additionalArgs.put(CONF_BACKUP_MAX_WAL_SIZE, "10240"); - additionalArgs.put(CONF_STAGED_WAL_FLUSH_INITIAL_DELAY, "10"); - additionalArgs.put(CONF_STAGED_WAL_FLUSH_INTERVAL, "10"); - - ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder() - .setReplicationEndpointImpl(customReplicationEndpointImpl).setReplicateAllUserTables(false) - .setTableCFsMap(tableMap).putAllConfiguration(additionalArgs).build(); - - admin.addReplicationPeer(peerId, peerConfig); - } - void deleteContinuousBackupReplicationPeerIfExists(Admin admin) throws IOException { if ( admin.listReplicationPeers().stream() diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java index 1f5d5ba6fc2f..34d265850f29 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java @@ -17,10 +17,8 @@ */ package org.apache.hadoop.hbase.backup; -import static org.apache.hadoop.hbase.HConstants.REPLICATION_BULKLOAD_ENABLE_KEY; -import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR; -import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_ROOT_DIR; +import static org.apache.hadoop.hbase.mapreduce.WALPlayer.IGNORE_MISSING_FILES; import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_DEFAULT; import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY; import static org.junit.Assert.assertEquals; @@ -30,8 +28,6 @@ import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -50,14 +46,12 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.tool.BulkLoadHFiles; -import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.HFileTestUtil; import org.apache.hadoop.util.ToolRunner; import org.junit.After; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -77,30 +71,15 @@ public class TestIncrementalBackupWithContinuous extends TestBackupBase { LoggerFactory.getLogger(TestIncrementalBackupWithContinuous.class); private static final int ROWS_IN_BULK_LOAD = 100; - private String backupWalDirName = "TestContinuousBackupWalDir"; - - /* - @BeforeClass - public static void setUpBeforeClass() throws Exception { - // Set the configuration properties as required - //conf1.setBoolean(REPLICATION_BULKLOAD_ENABLE_KEY, true); - //conf1.set(REPLICATION_CLUSTER_ID, "clusterId1"); - - // TEST_UTIL.startMiniZKCluster(); - // TEST_UTIL.startMiniCluster(3); - } - */ + private static final String backupWalDirName = "TestContinuousBackupWalDir"; @Before public void beforeTest() throws IOException { Path root = TEST_UTIL.getDataTestDirOnTestFS(); Path backupWalDir = new Path(root, backupWalDirName); - FileSystem fs = FileSystem.get(conf1); - fs.mkdirs(backupWalDir); conf1.set(CONF_CONTINUOUS_BACKUP_WAL_DIR, backupWalDir.toString()); - conf1.setBoolean(REPLICATION_BULKLOAD_ENABLE_KEY, true); - conf1.set(REPLICATION_CLUSTER_ID, "clusterId1"); - conf1.setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, true); + conf1.setBoolean(IGNORE_MISSING_FILES, true); + conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, true); } @After @@ -108,19 +87,16 @@ public void afterTest() throws IOException { Path root = TEST_UTIL.getDataTestDirOnTestFS(); Path backupWalDir = new Path(root, backupWalDirName); FileSystem fs = FileSystem.get(conf1); - if (fs.exists(backupWalDir)) { fs.delete(backupWalDir, true); } - conf1.unset(CONF_CONTINUOUS_BACKUP_WAL_DIR); - // deleteContinuousBackupReplicationPeerIfExists(TEST_UTIL.getAdmin()); + conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT); + deleteContinuousBackupReplicationPeerIfExists(TEST_UTIL.getAdmin()); } @Test public void testContinuousBackupWithIncrementalBackupSuccess() throws Exception { - LOG.info("Testing incremental backup with continuous backup"); - conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, true); String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); TableName tableName = TableName.valueOf("table_" + methodName); Table t1 = TEST_UTIL.createTable(tableName, famName); @@ -167,30 +143,16 @@ public void testContinuousBackupWithIncrementalBackupSuccess() throws Exception BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backup2, false, tables, tables, true)); assertEquals(NB_ROWS_IN_BATCH, TEST_UTIL.countRows(tableName)); - } finally { - conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT); } } @Test public void testIncrementalBackupCopyingBulkloadTillIncrCommittedWalTs() throws Exception { - conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, true); - // conf1.set(CONF_BACKUP_ROOT_DIR, BACKUP_ROOT_DIR); String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); TableName tableName1 = TableName.valueOf("table_" + methodName); - //String peerId = "peerId"; TEST_UTIL.createTable(tableName1, famName); - //Path backupRootDir = new Path(BACKUP_ROOT_DIR, methodName); - //TEST_UTIL.getTestFileSystem().mkdirs(backupRootDir); - //Path backupRootDir = new Path(TEST_UTIL.getDataTestDirOnTestFS(), methodName); - //conf1.set(CONF_BACKUP_ROOT_DIR, backupRootDir); - - //Map> tableMap = new HashMap<>(); - //tableMap.put(tableName1, new ArrayList<>()); - //addReplicationPeer(peerId, backupRootDir, tableMap, TEST_UTIL.getAdmin()); try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) { - // The test starts with no data, and no bulk loaded rows. int expectedRowCount = 0; assertEquals(expectedRowCount, TEST_UTIL.countRows(tableName1)); @@ -200,8 +162,6 @@ public void testIncrementalBackupCopyingBulkloadTillIncrCommittedWalTs() throws String backup1 = backupTables(BackupType.FULL, List.of(tableName1), BACKUP_ROOT_DIR, true); assertTrue(checkSucceeded(backup1)); - boolean rep = conf1.getBoolean(REPLICATION_BULKLOAD_ENABLE_KEY, false); - String[] clusterid = conf1.getStrings(REPLICATION_CLUSTER_ID); loadTable(TEST_UTIL.getConnection().getTable(tableName1)); expectedRowCount = expectedRowCount + NB_ROWS_IN_BATCH; performBulkLoad("bulkPreIncr", methodName, tableName1); @@ -220,7 +180,7 @@ public void testIncrementalBackupCopyingBulkloadTillIncrCommittedWalTs() throws assertTrue(checkSucceeded(backup2)); // bulkPostIncr Bulkload entry should not be deleted post incremental backup - // assertEquals(1, systemTable.readBulkloadRows(List.of(tableName1)).size()); + assertEquals(1, systemTable.readBulkloadRows(List.of(tableName1)).size()); TEST_UTIL.truncateTable(tableName1); // Restore incremental backup @@ -229,14 +189,11 @@ public void testIncrementalBackupCopyingBulkloadTillIncrCommittedWalTs() throws client.restore( BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backup2, false, tables, tables, true)); assertEquals(expectedRowCount, TEST_UTIL.countRows(tableName1)); - } finally { - conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT); } } @Test public void testPitrFailureDueToMissingBackupPostBulkload() throws Exception { - conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, true); String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); TableName tableName1 = TableName.valueOf("table_" + methodName); TEST_UTIL.createTable(tableName1, famName); @@ -280,8 +237,6 @@ public void testPitrFailureDueToMissingBackupPostBulkload() throws Exception { new TableName[] { restoredTable }, restoreTs, null); int ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args); assertNotEquals("Restore should fail since there is one bulkload without any backup", 0, ret); - } finally { - conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT); } } diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java index 71d966f46d26..cc9200882e3d 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java @@ -21,7 +21,11 @@ import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID; import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.BULKLOAD_FILES_DIR; import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR; +import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_MAX_WAL_SIZE; import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_ROOT_DIR; +import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_PEER_UUID; +import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_STAGED_WAL_FLUSH_INITIAL_DELAY; +import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_STAGED_WAL_FLUSH_INTERVAL; import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS; import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.WAL_FILE_PREFIX; import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.copyWithCleanup; @@ -48,6 +52,7 @@ import java.util.Map; import java.util.Set; import java.util.TimeZone; +import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; @@ -60,7 +65,6 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtil; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.backup.TestBackupBase; import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; @@ -89,7 +93,7 @@ import org.slf4j.LoggerFactory; @Category({ ReplicationTests.class, LargeTests.class }) -public class TestContinuousBackupReplicationEndpoint extends TestBackupBase { +public class TestContinuousBackupReplicationEndpoint { @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestContinuousBackupReplicationEndpoint.class); @@ -99,12 +103,13 @@ public class TestContinuousBackupReplicationEndpoint extends TestBackupBase { private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); private static final Configuration conf = TEST_UTIL.getConfiguration(); + private static Admin admin; + private final String replicationEndpoint = ContinuousBackupReplicationEndpoint.class.getName(); private static final String CF_NAME = "cf"; private static final byte[] QUALIFIER = Bytes.toBytes("my-qualifier"); static FileSystem fs = null; static Path root; - private static Admin admin; @BeforeClass public static void setUpBeforeClass() throws Exception { @@ -141,7 +146,7 @@ public void testWALAndBulkLoadFileBackup() throws IOException { Map> tableMap = new HashMap<>(); tableMap.put(tableName, new ArrayList<>()); - addReplicationPeer(peerId, backupRootDir, tableMap, admin); + addReplicationPeer(peerId, backupRootDir, tableMap); loadRandomData(tableName, 100); assertEquals(100, getRowCount(tableName)); @@ -178,7 +183,7 @@ public void testMultiTableWALBackup() throws IOException { initialTableMap.put(table1, new ArrayList<>()); initialTableMap.put(table2, new ArrayList<>()); - addReplicationPeer(peerId, backupRootDir, initialTableMap, admin); + addReplicationPeer(peerId, backupRootDir, initialTableMap); for (TableName table : List.of(table1, table2, table3)) { loadRandomData(table, 50); @@ -223,7 +228,7 @@ public void testWALBackupWithPeerRestart() throws IOException, InterruptedExcept Map> tableMap = new HashMap<>(); tableMap.put(tableName, new ArrayList<>()); - addReplicationPeer(peerId, backupRootDir, tableMap, admin); + addReplicationPeer(peerId, backupRootDir, tableMap); AtomicBoolean stopLoading = new AtomicBoolean(false); @@ -279,7 +284,7 @@ public void testDayWiseWALBackup() throws IOException { Map> tableMap = new HashMap<>(); tableMap.put(tableName, new ArrayList<>()); - addReplicationPeer(peerId, backupRootDir, tableMap, admin); + addReplicationPeer(peerId, backupRootDir, tableMap); // Mock system time using ManualEnvironmentEdge ManualEnvironmentEdge manualEdge = new ManualEnvironmentEdge(); @@ -351,7 +356,7 @@ public void testBulkLoadFileUploadRetry() throws IOException { Map> tableMap = new HashMap<>(); tableMap.put(tableName, new ArrayList<>()); - addReplicationPeer(peerId, backupRootDir, tableMap, admin, + addReplicationPeer(peerId, backupRootDir, tableMap, FailingOnceContinuousBackupReplicationEndpoint.class.getName()); loadRandomData(tableName, 100); @@ -457,7 +462,7 @@ public void testBulkLoadFileUploadWithStaleFileRetry() throws Exception { Map> tableMap = new HashMap<>(); tableMap.put(tableName, new ArrayList<>()); - addReplicationPeer(peerId, backupRootDir, tableMap, admin, + addReplicationPeer(peerId, backupRootDir, tableMap, PartiallyUploadedBulkloadFileEndpoint.class.getName()); loadRandomData(tableName, 100); @@ -546,6 +551,28 @@ private void deleteTable(TableName tableName) throws IOException { admin.deleteTable(tableName); } + private void addReplicationPeer(String peerId, Path backupRootDir, + Map> tableMap) throws IOException { + addReplicationPeer(peerId, backupRootDir, tableMap, replicationEndpoint); + } + + private void addReplicationPeer(String peerId, Path backupRootDir, + Map> tableMap, String customReplicationEndpointImpl) + throws IOException { + Map additionalArgs = new HashMap<>(); + additionalArgs.put(CONF_PEER_UUID, UUID.randomUUID().toString()); + additionalArgs.put(CONF_BACKUP_ROOT_DIR, backupRootDir.toString()); + additionalArgs.put(CONF_BACKUP_MAX_WAL_SIZE, "10240"); + additionalArgs.put(CONF_STAGED_WAL_FLUSH_INITIAL_DELAY, "10"); + additionalArgs.put(CONF_STAGED_WAL_FLUSH_INTERVAL, "10"); + + ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder() + .setReplicationEndpointImpl(customReplicationEndpointImpl).setReplicateAllUserTables(false) + .setTableCFsMap(tableMap).putAllConfiguration(additionalArgs).build(); + + admin.addReplicationPeer(peerId, peerConfig); + } + private void deleteReplicationPeer(String peerId) throws IOException { admin.disableReplicationPeer(peerId); admin.removeReplicationPeer(peerId); From 97b24646b50e7706c638482e32db2306203ae807 Mon Sep 17 00:00:00 2001 From: Ankit Solomon Date: Thu, 28 Aug 2025 02:04:24 +0530 Subject: [PATCH 4/9] Minor test fix --- .../hbase/backup/impl/IncrementalTableBackupClient.java | 4 ++-- .../java/org/apache/hadoop/hbase/backup/TestBackupBase.java | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java index 29cce0291372..eb450cf28102 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java @@ -180,10 +180,10 @@ protected List handleBulkLoad(List tablesToBackup) throws I Path fullBulkLoadBackupPath = new Path(bulkLoadDir, regionName + Path.SEPARATOR + fam + Path.SEPARATOR + filename); if (backupFs.exists(fullBulkLoadBackupPath)) { - LOG.info("Backup bulkload file {} found", fullBulkLoadBackupPath); + LOG.debug("Backup bulkload file found {}", fullBulkLoadBackupPath); p = fullBulkLoadBackupPath; } else { - LOG.warn("Backup bulkload file {} NOT found", fullBulkLoadBackupPath); + LOG.warn("Backup bulkload file not found {}", fullBulkLoadBackupPath); } } diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java index 1aa6a5a67af3..6357ae3d2eb1 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java @@ -307,6 +307,8 @@ public static void setUpHelper() throws Exception { conf1.set(CONF_BACKUP_MAX_WAL_SIZE, "10240"); conf1.set(CONF_STAGED_WAL_FLUSH_INITIAL_DELAY, "10"); conf1.set(CONF_STAGED_WAL_FLUSH_INTERVAL, "10"); + conf1.setBoolean(REPLICATION_BULKLOAD_ENABLE_KEY, true); + conf1.set(REPLICATION_CLUSTER_ID, "clusterId1"); if (secure) { // set the always on security provider From aa9d04c95de7274291a28a10b50fd540f3498794 Mon Sep 17 00:00:00 2001 From: Ankit Solomon Date: Thu, 28 Aug 2025 10:36:59 +0530 Subject: [PATCH 5/9] Rerun tests --- .../hadoop/hbase/backup/impl/IncrementalTableBackupClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java index eb450cf28102..57d81f8f12b5 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java @@ -167,7 +167,7 @@ protected List handleBulkLoad(List tablesToBackup) throws I Path tblDir = CommonFSUtils.getTableDir(rootdir, srcTable); Path p = new Path(tblDir, regionName + Path.SEPARATOR + fam + Path.SEPARATOR + filename); - // For continuous backup, bulkload files are copied from backup directory defined by + // For continuous backup: bulkload files are copied from backup directory defined by // CONF_CONTINUOUS_BACKUP_WAL_DIR instead of source cluster. String backupRootDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR); if (backupInfo.isContinuousBackupEnabled() && !Strings.isNullOrEmpty(backupRootDir)) { From 4d89d3846bc05b1edda3ca430d456f0c2901cc9b Mon Sep 17 00:00:00 2001 From: Ankit Solomon Date: Thu, 28 Aug 2025 17:59:03 +0530 Subject: [PATCH 6/9] Minor change to rerun tests --- .../hadoop/hbase/backup/impl/IncrementalTableBackupClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java index 57d81f8f12b5..eb450cf28102 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java @@ -167,7 +167,7 @@ protected List handleBulkLoad(List tablesToBackup) throws I Path tblDir = CommonFSUtils.getTableDir(rootdir, srcTable); Path p = new Path(tblDir, regionName + Path.SEPARATOR + fam + Path.SEPARATOR + filename); - // For continuous backup: bulkload files are copied from backup directory defined by + // For continuous backup, bulkload files are copied from backup directory defined by // CONF_CONTINUOUS_BACKUP_WAL_DIR instead of source cluster. String backupRootDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR); if (backupInfo.isContinuousBackupEnabled() && !Strings.isNullOrEmpty(backupRootDir)) { From d489d23f0eb544f67189be8f03f44b228923a912 Mon Sep 17 00:00:00 2001 From: Ankit Solomon Date: Thu, 28 Aug 2025 20:35:24 +0530 Subject: [PATCH 7/9] Removing IGNORE_MISSING_FILES from test --- .../hbase/backup/TestIncrementalBackupWithContinuous.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java index 34d265850f29..e67e50ebee36 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.backup; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR; -import static org.apache.hadoop.hbase.mapreduce.WALPlayer.IGNORE_MISSING_FILES; import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_DEFAULT; import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY; import static org.junit.Assert.assertEquals; @@ -78,7 +77,6 @@ public void beforeTest() throws IOException { Path root = TEST_UTIL.getDataTestDirOnTestFS(); Path backupWalDir = new Path(root, backupWalDirName); conf1.set(CONF_CONTINUOUS_BACKUP_WAL_DIR, backupWalDir.toString()); - conf1.setBoolean(IGNORE_MISSING_FILES, true); conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, true); } From d9f3c950000016e395f24817ca80db95b33704e5 Mon Sep 17 00:00:00 2001 From: Ankit Solomon Date: Tue, 2 Sep 2025 15:11:48 +0530 Subject: [PATCH 8/9] Minor change to rerun tests --- .../hadoop/hbase/backup/impl/IncrementalTableBackupClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java index eb450cf28102..57d81f8f12b5 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java @@ -167,7 +167,7 @@ protected List handleBulkLoad(List tablesToBackup) throws I Path tblDir = CommonFSUtils.getTableDir(rootdir, srcTable); Path p = new Path(tblDir, regionName + Path.SEPARATOR + fam + Path.SEPARATOR + filename); - // For continuous backup, bulkload files are copied from backup directory defined by + // For continuous backup: bulkload files are copied from backup directory defined by // CONF_CONTINUOUS_BACKUP_WAL_DIR instead of source cluster. String backupRootDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR); if (backupInfo.isContinuousBackupEnabled() && !Strings.isNullOrEmpty(backupRootDir)) { From 12940d5babb0fe64488431fd7ce78cf52253941c Mon Sep 17 00:00:00 2001 From: Ankit Solomon Date: Thu, 4 Sep 2025 16:47:33 +0530 Subject: [PATCH 9/9] Added ignore empty file flag --- .../java/org/apache/hadoop/hbase/backup/TestBackupBase.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java index 6357ae3d2eb1..159514bd45b1 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java @@ -23,6 +23,7 @@ import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_MAX_WAL_SIZE; import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_STAGED_WAL_FLUSH_INITIAL_DELAY; import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_STAGED_WAL_FLUSH_INTERVAL; +import static org.apache.hadoop.hbase.mapreduce.WALPlayer.IGNORE_EMPTY_FILES; import java.io.IOException; import java.util.ArrayList; @@ -309,6 +310,7 @@ public static void setUpHelper() throws Exception { conf1.set(CONF_STAGED_WAL_FLUSH_INTERVAL, "10"); conf1.setBoolean(REPLICATION_BULKLOAD_ENABLE_KEY, true); conf1.set(REPLICATION_CLUSTER_ID, "clusterId1"); + conf1.setBoolean(IGNORE_EMPTY_FILES, true); if (secure) { // set the always on security provider