@@ -1004,7 +1004,6 @@ private void deleteAllBackupWALFiles(Configuration conf, String backupWalDir)
new BackupFileSystemManager(CONTINUOUS_BACKUP_REPLICATION_PEER, conf, backupWalDir);
FileSystem fs = manager.getBackupFs();
Path walDir = manager.getWalsDir();
- Path bulkloadDir = manager.getBulkLoadFilesDir();

// Delete contents under WAL directory
if (fs.exists(walDir)) {
@@ -1015,15 +1014,6 @@ private void deleteAllBackupWALFiles(Configuration conf, String backupWalDir)
System.out.println("Deleted all contents under WAL directory: " + walDir);
}

- // Delete contents under bulk load directory
- if (fs.exists(bulkloadDir)) {
- FileStatus[] bulkContents = fs.listStatus(bulkloadDir);
- for (FileStatus item : bulkContents) {
- fs.delete(item.getPath(), true); // recursive delete of each child
- }
- System.out.println("Deleted all contents under Bulk Load directory: " + bulkloadDir);
- }
-
} catch (IOException e) {
System.out.println("WARNING: Failed to delete contents under backup directories: "
+ backupWalDir + ". Error: " + e.getMessage());
@@ -1032,7 +1022,7 @@ private void deleteAllBackupWALFiles(Configuration conf, String backupWalDir)
}

/**
- * Cleans up old WAL and bulk-loaded files based on the determined cutoff timestamp.
+ * Cleans up old WAL files based on the determined cutoff timestamp.
*/
void deleteOldWALFiles(Configuration conf, String backupWalDir, long cutoffTime)
throws IOException {
@@ -1043,7 +1033,6 @@ void deleteOldWALFiles(Configuration conf, String backupWalDir, long cutoffTime)
new BackupFileSystemManager(CONTINUOUS_BACKUP_REPLICATION_PEER, conf, backupWalDir);
FileSystem fs = manager.getBackupFs();
Path walDir = manager.getWalsDir();
- Path bulkloadDir = manager.getBulkLoadFilesDir();

SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
@@ -1069,7 +1058,6 @@ void deleteOldWALFiles(Configuration conf, String backupWalDir, long cutoffTime)
if (dayStart + ONE_DAY_IN_MILLISECONDS - 1 < cutoffTime) {
System.out.println("Deleting outdated WAL directory: " + dirPath);
fs.delete(dirPath, true);
- fs.delete(new Path(bulkloadDir, dirName), true);
}
} catch (ParseException e) {
System.out.println("WARNING: Failed to parse directory name '" + dirName
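The retention check driving the deletions above works on whole UTC days: a dated WAL directory is removed only when the entire day it names ends before the cutoff timestamp. Below is a minimal, self-contained sketch of that check, assuming DATE_FORMAT is the "yyyy-MM-dd" pattern referenced elsewhere in this patch and ONE_DAY_IN_MILLISECONDS mirrors the constant used above; it is an illustration, not the project's code.

    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.TimeZone;
    import java.util.concurrent.TimeUnit;

    public class WalRetentionCheck {
      // Assumed to match the patch's DATE_FORMAT and ONE_DAY_IN_MILLISECONDS.
      static final String DATE_FORMAT = "yyyy-MM-dd";
      static final long ONE_DAY_IN_MILLISECONDS = TimeUnit.DAYS.toMillis(1);

      // True only when the last millisecond of the day named by dirName (interpreted in UTC)
      // is older than the cutoff, matching the dayStart + ONE_DAY_IN_MILLISECONDS - 1 < cutoffTime test.
      static boolean isOutdated(String dirName, long cutoffTime) throws ParseException {
        SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
        dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
        long dayStart = dateFormat.parse(dirName).getTime();
        return dayStart + ONE_DAY_IN_MILLISECONDS - 1 < cutoffTime;
      }

      public static void main(String[] args) throws ParseException {
        long cutoff = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(30); // e.g. a 30-day retention window
        System.out.println(isOutdated("2024-01-01", cutoff)); // true once the whole day precedes the cutoff
      }
    }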
@@ -190,14 +190,6 @@ private void handleContinuousBackup(Admin admin) throws IOException {
// set overall backup status: complete. Here we make sure the backup is marked complete;
// after this checkpoint, even if the cancel process starts, the backup will be allowed to finish
backupInfo.setState(BackupState.COMPLETE);
-
- if (!conf.getBoolean("hbase.replication.bulkload.enabled", false)) {
- System.out.println("NOTE: Bulkload replication is not enabled. "
- + "Bulk loaded files will not be backed up as part of continuous backup. "
- + "To ensure bulk loaded files are included in the backup, please enable bulkload replication "
- + "(hbase.replication.bulkload.enabled=true) and configure other necessary settings "
- + "to properly enable bulkload replication.");
- }
}

private void handleNonContinuousBackup(Admin admin) throws IOException {
@@ -26,28 +26,25 @@
import org.slf4j.LoggerFactory;

/**
- * Initializes and organizes backup directories for continuous Write-Ahead Logs (WALs) and
- * bulk-loaded files within the specified backup root directory.
+ * Initializes and organizes backup directories for continuous Write-Ahead Log (WAL) files within
+ * the specified backup root directory.
*/
@InterfaceAudience.Private
public class BackupFileSystemManager {
private static final Logger LOG = LoggerFactory.getLogger(BackupFileSystemManager.class);

public static final String WALS_DIR = "WALs";
- public static final String BULKLOAD_FILES_DIR = "bulk-load-files";
private final String peerId;
private final FileSystem backupFs;
private final Path backupRootDir;
private final Path walsDir;
- private final Path bulkLoadFilesDir;

public BackupFileSystemManager(String peerId, Configuration conf, String backupRootDirStr)
throws IOException {
this.peerId = peerId;
this.backupRootDir = new Path(backupRootDirStr);
this.backupFs = FileSystem.get(backupRootDir.toUri(), conf);
this.walsDir = createDirectory(WALS_DIR);
- this.bulkLoadFilesDir = createDirectory(BULKLOAD_FILES_DIR);
}

private Path createDirectory(String dirName) throws IOException {
@@ -61,10 +58,6 @@ public Path getWalsDir() {
return walsDir;
}

- public Path getBulkLoadFilesDir() {
- return bulkLoadFilesDir;
- }
-
public FileSystem getBackupFs() {
return backupFs;
}
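For reference, a minimal usage sketch of the slimmed-down BackupFileSystemManager: after this change it exposes only the backup filesystem and the WALs directory. The peer id and backup root below are made-up values, and the import for BackupFileSystemManager itself is omitted because its package is not shown in this diff.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class BackupFsManagerSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Illustrative peer id and backup root; substitute real values.
        BackupFileSystemManager manager =
          new BackupFileSystemManager("continuous_backup_peer", conf, "hdfs://backup-ns/hbase-backups");
        FileSystem backupFs = manager.getBackupFs(); // filesystem backing the backup root
        Path walsDir = manager.getWalsDir();         // <backup-root>/WALs (WALS_DIR above)
        System.out.println("WAL backups go under: " + walsDir);
        // getBulkLoadFilesDir() and the bulk-load-files directory no longer exist.
      }
    }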

This file was deleted.

@@ -33,10 +33,8 @@
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
- import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -45,7 +43,6 @@
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationResult;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
- import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.FSHLogProvider;
import org.apache.hadoop.hbase.wal.WAL;
@@ -56,8 +53,8 @@
/**
* ContinuousBackupReplicationEndpoint is responsible for replicating WAL entries to a backup
* storage. It organizes WAL entries by day and periodically flushes the data, ensuring that WAL
- * files do not exceed the configured size. The class includes mechanisms for handling the WAL
- * files, performing bulk load backups, and ensuring that the replication process is safe.
+ * files do not exceed the configured size. The class includes mechanisms for handling the WAL files
+ * and ensuring that the replication process is safe.
*/
@InterfaceAudience.Private
public class ContinuousBackupReplicationEndpoint extends BaseReplicationEndpoint {
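The class Javadoc above says WAL entries are organized by day before being flushed to backup storage. The following is a rough, hypothetical sketch of that bucketing idea, grouping entry write times by the UTC day they fall into; the class and method names are illustrative, not the endpoint's actual internals.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;

    public class DayBucketingSketch {
      private static final long ONE_DAY_MS = TimeUnit.DAYS.toMillis(1);

      // Groups WAL-entry write times (epoch millis) by the UTC day they belong to,
      // so each day's entries can be written through their own writer and flushed independently.
      static Map<Long, List<Long>> groupByDay(List<Long> writeTimes) {
        Map<Long, List<Long>> entriesPerDay = new HashMap<>();
        for (long writeTime : writeTimes) {
          long dayStart = writeTime - (writeTime % ONE_DAY_MS); // UTC midnight of that day
          entriesPerDay.computeIfAbsent(dayStart, d -> new ArrayList<>()).add(writeTime);
        }
        return entriesPerDay;
      }

      public static void main(String[] args) {
        // Two timestamps on the same UTC day and one on a later day end up in two buckets.
        System.out.println(groupByDay(List.of(1700000000000L, 1700005000000L, 1700100000000L)));
      }
    }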
@@ -292,20 +289,11 @@ private void backupWalEntries(long day, List<WAL.Entry> walEntries) throws IOExc

try {
FSHLogProvider.Writer walWriter = walWriters.computeIfAbsent(day, this::createWalWriter);
- List<Path> bulkLoadFiles = BulkLoadProcessor.processBulkLoadFiles(walEntries);
-
- if (LOG.isTraceEnabled()) {
- LOG.trace("{} Processed {} bulk load files for WAL entries", Utils.logPeerId(peerId),
- bulkLoadFiles.size());
- LOG.trace("{} Bulk load files: {}", Utils.logPeerId(peerId),
- bulkLoadFiles.stream().map(Path::toString).collect(Collectors.joining(", ")));
- }

for (WAL.Entry entry : walEntries) {
walWriter.append(entry);
}
walWriter.sync(true);
- uploadBulkLoadFiles(day, bulkLoadFiles);
} catch (UncheckedIOException e) {
String errorMsg = Utils.logPeerId(peerId) + " Failed to get or create WAL Writer for " + day;
LOG.error("{} Backup failed for day {}. Error: {}", Utils.logPeerId(peerId), day,
@@ -375,41 +363,6 @@ private void close() {
}
}

- private void uploadBulkLoadFiles(long dayInMillis, List<Path> bulkLoadFiles) throws IOException {
- LOG.debug("{} Starting upload of {} bulk load files", Utils.logPeerId(peerId),
- bulkLoadFiles.size());
-
- if (LOG.isTraceEnabled()) {
- LOG.trace("{} Bulk load files to upload: {}", Utils.logPeerId(peerId),
- bulkLoadFiles.stream().map(Path::toString).collect(Collectors.joining(", ")));
- }
- String dayDirectoryName = formatToDateString(dayInMillis);
- Path bulkloadDir = new Path(backupFileSystemManager.getBulkLoadFilesDir(), dayDirectoryName);
- backupFileSystemManager.getBackupFs().mkdirs(bulkloadDir);
-
- for (Path file : bulkLoadFiles) {
- Path sourcePath = getBulkLoadFileStagingPath(file);
- Path destPath = new Path(bulkloadDir, file);
-
- try {
- LOG.debug("{} Copying bulk load file from {} to {}", Utils.logPeerId(peerId), sourcePath,
- destPath);
-
- FileUtil.copy(CommonFSUtils.getRootDirFileSystem(conf), sourcePath,
- backupFileSystemManager.getBackupFs(), destPath, false, conf);
-
- LOG.info("{} Bulk load file {} successfully backed up to {}", Utils.logPeerId(peerId), file,
- destPath);
- } catch (IOException e) {
- LOG.error("{} Failed to back up bulk load file {}: {}", Utils.logPeerId(peerId), file,
- e.getMessage(), e);
- throw e;
- }
- }
-
- LOG.debug("{} Completed upload of bulk load files", Utils.logPeerId(peerId));
- }
-
/**
* Convert dayInMillis to "yyyy-MM-dd" format
*/
@@ -419,48 +372,6 @@ private String formatToDateString(long dayInMillis) {
return dateFormat.format(new Date(dayInMillis));
}

- private Path getBulkLoadFileStagingPath(Path relativePathFromNamespace) throws IOException {
- FileSystem rootFs = CommonFSUtils.getRootDirFileSystem(conf);
- Path rootDir = CommonFSUtils.getRootDir(conf);
- Path baseNSDir = new Path(HConstants.BASE_NAMESPACE_DIR);
- Path baseNamespaceDir = new Path(rootDir, baseNSDir);
- Path hFileArchiveDir =
- new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY, baseNSDir));
-
- LOG.debug("{} Searching for bulk load file: {} in paths: {}, {}", Utils.logPeerId(peerId),
- relativePathFromNamespace, baseNamespaceDir, hFileArchiveDir);
-
- Path result =
- findExistingPath(rootFs, baseNamespaceDir, hFileArchiveDir, relativePathFromNamespace);
-
- if (result == null) {
- LOG.error("{} No bulk loaded file found in relative path: {}", Utils.logPeerId(peerId),
- relativePathFromNamespace);
- throw new IOException(
- "No Bulk loaded file found in relative path: " + relativePathFromNamespace);
- }
-
- LOG.debug("{} Bulk load file found at {}", Utils.logPeerId(peerId), result);
- return result;
- }
-
- private static Path findExistingPath(FileSystem rootFs, Path baseNamespaceDir,
- Path hFileArchiveDir, Path filePath) throws IOException {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Checking for bulk load file at: {} and {}", new Path(baseNamespaceDir, filePath),
- new Path(hFileArchiveDir, filePath));
- }
-
- for (Path candidate : new Path[] { new Path(baseNamespaceDir, filePath),
- new Path(hFileArchiveDir, filePath) }) {
- if (rootFs.exists(candidate)) {
- LOG.debug("Found bulk load file at: {}", candidate);
- return candidate;
- }
- }
- return null;
- }
-
private void shutdownFlushExecutor() {
if (flushExecutor != null) {
LOG.info("{} Initiating WAL flush executor shutdown.", Utils.logPeerId(peerId));