@@ -27,6 +27,7 @@

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
@@ -101,9 +102,15 @@ protected int executeRestore(boolean check, TableName[] fromTables, TableName[]
return -5;
}

PointInTimeRestoreRequest pointInTimeRestoreRequest = new PointInTimeRestoreRequest.Builder()
.withBackupRootDir(backupRootDir).withCheck(check).withFromTables(fromTables)
.withToTables(toTables).withOverwrite(isOverwrite).withToDateTime(endTime).build();
// TODO: Currently hardcoding keepOriginalSplits=false and restoreRootDir via tmp dir.
// These should come from user input (same issue exists in normal restore).
// Expose them as configurable options in future.
PointInTimeRestoreRequest pointInTimeRestoreRequest =
new PointInTimeRestoreRequest.Builder().withBackupRootDir(backupRootDir).withCheck(check)
.withFromTables(fromTables).withToTables(toTables).withOverwrite(isOverwrite)
.withToDateTime(endTime).withKeepOriginalSplits(false).withRestoreRootDir(
Contributor:
nit: should withKeepOriginalSplits be configurable? In what case should it be true?

Contributor (Author):
Yes, this should be configurable. But it is hardcoded in the existing code as well, so we need to fix it in both places, and I think that deserves a separate ticket. For the moment I added this comment:

// TODO: Currently hardcoding keepOriginalSplits=false and restoreRootDir via tmp dir.
      // These should come from user input (same issue exists in normal restore).
      // Expose them as configurable options in future.
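
For that follow-up, a possible shape would be to read both values from the job configuration, falling back to the current hardcoded defaults. This is only a sketch assuming the surrounding executeRestore context (conf, backupRootDir, etc. in scope); the option key names are placeholders, not something this PR introduces:

      // Placeholder config keys, defaults match today's hardcoded behavior
      boolean keepOriginalSplits =
        conf.getBoolean("hbase.backup.pitr.keep.original.splits", false);
      String restoreRootDir = conf.get("hbase.backup.pitr.restore.root.dir",
        BackupUtils.getTmpRestoreOutputDir(FileSystem.get(conf), conf).toString());

      PointInTimeRestoreRequest pointInTimeRestoreRequest =
        new PointInTimeRestoreRequest.Builder().withBackupRootDir(backupRootDir).withCheck(check)
          .withFromTables(fromTables).withToTables(toTables).withOverwrite(isOverwrite)
          .withToDateTime(endTime).withKeepOriginalSplits(keepOriginalSplits)
          .withRestoreRootDir(restoreRootDir).build();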

BackupUtils.getTmpRestoreOutputDir(FileSystem.get(conf), conf).toString())
.build();

client.pointInTimeRestore(pointInTimeRestoreRequest);
} catch (Exception e) {
@@ -27,25 +27,33 @@
public final class PointInTimeRestoreRequest {

private final String backupRootDir;
private final String restoreRootDir;
private final boolean check;
private final TableName[] fromTables;
private final TableName[] toTables;
private final boolean overwrite;
private final long toDateTime;
private final boolean isKeepOriginalSplits;

private PointInTimeRestoreRequest(Builder builder) {
this.backupRootDir = builder.backupRootDir;
this.restoreRootDir = builder.restoreRootDir;
this.check = builder.check;
this.fromTables = builder.fromTables;
this.toTables = builder.toTables;
this.overwrite = builder.overwrite;
this.toDateTime = builder.toDateTime;
this.isKeepOriginalSplits = builder.isKeepOriginalSplits;
}

public String getBackupRootDir() {
return backupRootDir;
}

public String getRestoreRootDir() {
return restoreRootDir;
}

public boolean isCheck() {
return check;
}
@@ -66,19 +74,30 @@ public long getToDateTime() {
return toDateTime;
}

public boolean isKeepOriginalSplits() {
return isKeepOriginalSplits;
}

public static class Builder {
private String backupRootDir;
private String restoreRootDir;
private boolean check = false;
private TableName[] fromTables;
private TableName[] toTables;
private boolean overwrite = false;
private long toDateTime;
private boolean isKeepOriginalSplits;

public Builder withBackupRootDir(String backupRootDir) {
this.backupRootDir = backupRootDir;
return this;
}

public Builder withRestoreRootDir(String restoreRootDir) {
this.restoreRootDir = restoreRootDir;
return this;
}

public Builder withCheck(boolean check) {
this.check = check;
return this;
@@ -104,6 +123,11 @@ public Builder withToDateTime(long dateTime) {
return this;
}

public Builder withKeepOriginalSplits(boolean isKeepOriginalSplits) {
this.isKeepOriginalSplits = isKeepOriginalSplits;
return this;
}

public PointInTimeRestoreRequest build() {
return new PointInTimeRestoreRequest(this);
}
@@ -20,8 +20,8 @@
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
import static org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.WALS_DIR;
import static org.apache.hadoop.hbase.backup.util.BackupUtils.DATE_FORMAT;
import static org.apache.hadoop.hbase.mapreduce.WALPlayer.IGNORE_EMPTY_FILES;

@@ -30,6 +30,7 @@
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
@@ -41,9 +42,12 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
import org.apache.hadoop.hbase.backup.PointInTimeRestoreRequest;
import org.apache.hadoop.hbase.backup.RestoreJob;
import org.apache.hadoop.hbase.backup.RestoreRequest;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.backup.util.BulkFilesCollector;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.mapreduce.WALInputFormat;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
@@ -305,6 +309,63 @@ private void restoreTableWithWalReplay(TableName sourceTable, TableName targetTa

backupAdmin.restore(restoreRequest);
replayWal(sourceTable, targetTable, backupMetadata.getStartTs(), endTime);

reBulkloadFiles(sourceTable, targetTable, backupMetadata.getStartTs(), endTime,
request.isKeepOriginalSplits(), request.getRestoreRootDir());
}

/**
* Re-applies/re-bulkloads store files discovered from WALs into the target table.
* <p>
* <b>Note:</b> this method re-uses the same {@link RestoreJob} MapReduce job that we originally
* implemented for performing full and incremental backup restores. The MR job (obtained via
* {@link BackupRestoreFactory#getRestoreJob(Configuration)}) is used here to perform an HFile
* bulk-load of the discovered store files into {@code targetTable}.
* @param sourceTable source table name (used for locating bulk files and logging)
* @param targetTable destination table to bulk-load the HFiles into
* @param startTime start of WAL range (ms)
* @param endTime end of WAL range (ms)
* @param keepOriginalSplits pass-through flag to control whether original region splits are
* preserved
* @param restoreRootDir local/DFS path under which temporary and output dirs are created
* @throws IOException on IO or job failure
*/
private void reBulkloadFiles(TableName sourceTable, TableName targetTable, long startTime,
long endTime, boolean keepOriginalSplits, String restoreRootDir) throws IOException {

Configuration conf = HBaseConfiguration.create(conn.getConfiguration());
conf.setBoolean(RestoreJob.KEEP_ORIGINAL_SPLITS_KEY, keepOriginalSplits);

String walBackupDir = conn.getConfiguration().get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
Path walDirPath = new Path(walBackupDir);
conf.set(RestoreJob.BACKUP_ROOT_PATH_KEY, walDirPath.toString());

RestoreJob restoreService = BackupRestoreFactory.getRestoreJob(conf);

List<Path> bulkloadFiles =
collectBulkFiles(sourceTable, targetTable, startTime, endTime, new Path(restoreRootDir));

if (bulkloadFiles.isEmpty()) {
LOG.info("No bulk-load files found for {} in time range {}-{}. Skipping bulkload restore.",
sourceTable, startTime, endTime);
return;
}

Path[] pathsArray = bulkloadFiles.toArray(new Path[0]);

try {
// Use the existing RestoreJob MR job (the same MapReduce job used for full/incremental
// restores) to perform the HFile bulk-load of the discovered store files into `targetTable`.
restoreService.run(pathsArray, new TableName[] { sourceTable }, new Path(restoreRootDir),
new TableName[] { targetTable }, false);
LOG.info("Re-bulkload completed for {}", targetTable);
} catch (Exception e) {
String errorMessage =
String.format("Re-bulkload failed for %s: %s", targetTable, e.getMessage());
LOG.error(errorMessage, e);
throw new IOException(errorMessage, e);
}
}

/**
@@ -329,6 +390,29 @@ private void replayWal(TableName sourceTable, TableName targetTable, long startT
executeWalReplay(validDirs, sourceTable, targetTable, startTime, endTime);
}

private List<Path> collectBulkFiles(TableName sourceTable, TableName targetTable, long startTime,
long endTime, Path restoreRootDir) throws IOException {

String walBackupDir = conn.getConfiguration().get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
Path walDirPath = new Path(walBackupDir);
LOG.info(
"Starting WAL bulk-file collection for source: {}, target: {}, time range: {} - {}, WAL backup dir: {}, restore root: {}",
sourceTable, targetTable, startTime, endTime, walDirPath, restoreRootDir);

List<String> validDirs =
getValidWalDirs(conn.getConfiguration(), walDirPath, startTime, endTime);
if (validDirs.isEmpty()) {
LOG.warn("No valid WAL directories found for range {} - {}. Skipping bulk-file collection.",
startTime, endTime);
return Collections.emptyList();
}

String walDirsCsv = String.join(",", validDirs);

return BulkFilesCollector.collectFromWalDirs(HBaseConfiguration.create(conn.getConfiguration()),
walDirsCsv, restoreRootDir, sourceTable, targetTable, startTime, endTime);
}

/**
* Fetches valid WAL directories based on the given time range.
*/
@@ -356,7 +440,7 @@ private List<String> getValidWalDirs(Configuration conf, Path walBackupDir, long
validDirs.add(dayDir.getPath().toString());
}
} catch (ParseException e) {
LOG.warn("Skipping invalid directory name: " + dirName, e);
LOG.warn("Skipping invalid directory name: {}", dirName, e);
Copilot AI (Sep 23, 2025):
[nitpick] This change from concatenated string to parameterized logging is good, but the comment on line 440 should be updated to reflect the corrected logging style for consistency.

}
}
return validDirs;
@@ -81,7 +81,7 @@
import org.apache.hadoop.hbase.backup.BackupRestoreConstants.BackupCommand;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.HBackupFileSystem;
import org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager;
import org.apache.hadoop.hbase.backup.util.BackupFileSystemManager;
import org.apache.hadoop.hbase.backup.util.BackupSet;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
@@ -19,9 +19,9 @@

import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY;
import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.BULKLOAD_FILES_DIR;
import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
import static org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.BULKLOAD_FILES_DIR;
import static org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.WALS_DIR;
import static org.apache.hadoop.hbase.backup.util.BackupUtils.DATE_FORMAT;

import java.io.IOException;