diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/PointInTimeRestoreDriver.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/PointInTimeRestoreDriver.java
index abdf52f14302..19159eeba921 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/PointInTimeRestoreDriver.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/PointInTimeRestoreDriver.java
@@ -27,6 +27,7 @@
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
@@ -101,9 +102,15 @@ protected int executeRestore(boolean check, TableName[] fromTables, TableName[]
return -5;
}
- PointInTimeRestoreRequest pointInTimeRestoreRequest = new PointInTimeRestoreRequest.Builder()
- .withBackupRootDir(backupRootDir).withCheck(check).withFromTables(fromTables)
- .withToTables(toTables).withOverwrite(isOverwrite).withToDateTime(endTime).build();
+ // TODO: Currently hardcoding keepOriginalSplits=false and restoreRootDir via tmp dir.
+ // These should come from user input (same issue exists in normal restore).
+ // Expose them as configurable options in the future.
+ PointInTimeRestoreRequest pointInTimeRestoreRequest =
+ new PointInTimeRestoreRequest.Builder().withBackupRootDir(backupRootDir).withCheck(check)
+ .withFromTables(fromTables).withToTables(toTables).withOverwrite(isOverwrite)
+ .withToDateTime(endTime).withKeepOriginalSplits(false).withRestoreRootDir(
+ BackupUtils.getTmpRestoreOutputDir(FileSystem.get(conf), conf).toString())
+ .build();
client.pointInTimeRestore(pointInTimeRestoreRequest);
} catch (Exception e) {
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/PointInTimeRestoreRequest.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/PointInTimeRestoreRequest.java
index f2462a1cfd18..d7f69c05b683 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/PointInTimeRestoreRequest.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/PointInTimeRestoreRequest.java
@@ -27,25 +27,33 @@
public final class PointInTimeRestoreRequest {
private final String backupRootDir;
+ private final String restoreRootDir;
private final boolean check;
private final TableName[] fromTables;
private final TableName[] toTables;
private final boolean overwrite;
private final long toDateTime;
+ private final boolean isKeepOriginalSplits;
private PointInTimeRestoreRequest(Builder builder) {
this.backupRootDir = builder.backupRootDir;
+ this.restoreRootDir = builder.restoreRootDir;
this.check = builder.check;
this.fromTables = builder.fromTables;
this.toTables = builder.toTables;
this.overwrite = builder.overwrite;
this.toDateTime = builder.toDateTime;
+ this.isKeepOriginalSplits = builder.isKeepOriginalSplits;
}
public String getBackupRootDir() {
return backupRootDir;
}
+ public String getRestoreRootDir() {
+ return restoreRootDir;
+ }
+
public boolean isCheck() {
return check;
}
@@ -66,19 +74,30 @@ public long getToDateTime() {
return toDateTime;
}
+ public boolean isKeepOriginalSplits() {
+ return isKeepOriginalSplits;
+ }
+
public static class Builder {
private String backupRootDir;
+ private String restoreRootDir;
private boolean check = false;
private TableName[] fromTables;
private TableName[] toTables;
private boolean overwrite = false;
private long toDateTime;
+ private boolean isKeepOriginalSplits;
public Builder withBackupRootDir(String backupRootDir) {
this.backupRootDir = backupRootDir;
return this;
}
+ public Builder withRestoreRootDir(String restoreRootDir) {
+ this.restoreRootDir = restoreRootDir;
+ return this;
+ }
+
public Builder withCheck(boolean check) {
this.check = check;
return this;
@@ -104,6 +123,11 @@ public Builder withToDateTime(long dateTime) {
return this;
}
+ public Builder withKeepOriginalSplits(boolean isKeepOriginalSplits) {
+ this.isKeepOriginalSplits = isKeepOriginalSplits;
+ return this;
+ }
+
public PointInTimeRestoreRequest build() {
return new PointInTimeRestoreRequest(this);
}
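
For illustration, a minimal sketch (not part of the patch) of how a caller might assemble the extended request; the paths, table names, and timestamp are placeholder assumptions.

```java
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.PointInTimeRestoreRequest;

final class PitrRequestSketch {
  // All values below are illustrative placeholders, not defaults from this patch.
  static PointInTimeRestoreRequest buildExample() {
    return new PointInTimeRestoreRequest.Builder()
      .withBackupRootDir("hdfs:///backup/root")               // backup image root
      .withRestoreRootDir("hdfs:///user/hbase/.tmp-restore")  // new field in this patch
      .withFromTables(new TableName[] { TableName.valueOf("ns:source") })
      .withToTables(new TableName[] { TableName.valueOf("ns:target") })
      .withToDateTime(1735689600000L)                         // restore point, ms since epoch
      .withKeepOriginalSplits(false)                          // new field in this patch
      .withOverwrite(false)
      .build();
  }
}
```
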
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java
index d7ffbb580938..ce6c4d4dc683 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java
@@ -20,8 +20,8 @@
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
-import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
+import static org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.WALS_DIR;
import static org.apache.hadoop.hbase.backup.util.BackupUtils.DATE_FORMAT;
import static org.apache.hadoop.hbase.mapreduce.WALPlayer.IGNORE_EMPTY_FILES;
@@ -30,6 +30,7 @@
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
@@ -41,9 +42,12 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
import org.apache.hadoop.hbase.backup.PointInTimeRestoreRequest;
+import org.apache.hadoop.hbase.backup.RestoreJob;
import org.apache.hadoop.hbase.backup.RestoreRequest;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.backup.util.BulkFilesCollector;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.mapreduce.WALInputFormat;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
@@ -305,6 +309,63 @@ private void restoreTableWithWalReplay(TableName sourceTable, TableName targetTa
backupAdmin.restore(restoreRequest);
replayWal(sourceTable, targetTable, backupMetadata.getStartTs(), endTime);
+
+ reBulkloadFiles(sourceTable, targetTable, backupMetadata.getStartTs(), endTime,
+ request.isKeepOriginalSplits(), request.getRestoreRootDir());
+ }
+
+ /**
+ * Re-applies/re-bulkloads store files discovered from WALs into the target table.
+ *
+ * Note: this method re-uses the same {@link RestoreJob} MapReduce job that we originally
+ * implemented for performing full and incremental backup restores. The MR job (obtained via
+ * {@link BackupRestoreFactory#getRestoreJob(Configuration)}) is used here to perform an HFile
+ * bulk-load of the discovered store files into {@code targetTable}.
+ * @param sourceTable source table name (used for locating bulk files and logging)
+ * @param targetTable destination table to bulk-load the HFiles into
+ * @param startTime start of WAL range (ms)
+ * @param endTime end of WAL range (ms)
+ * @param keepOriginalSplits pass-through flag to control whether original region splits are
+ * preserved
+ * @param restoreRootDir local/DFS path under which temporary and output dirs are created
+ * @throws IOException on IO or job failure
+ */
+ private void reBulkloadFiles(TableName sourceTable, TableName targetTable, long startTime,
+ long endTime, boolean keepOriginalSplits, String restoreRootDir) throws IOException {
+
+ Configuration conf = HBaseConfiguration.create(conn.getConfiguration());
+ conf.setBoolean(RestoreJob.KEEP_ORIGINAL_SPLITS_KEY, keepOriginalSplits);
+
+ String walBackupDir = conn.getConfiguration().get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
+ Path walDirPath = new Path(walBackupDir);
+ conf.set(RestoreJob.BACKUP_ROOT_PATH_KEY, walDirPath.toString());
+
+ RestoreJob restoreService = BackupRestoreFactory.getRestoreJob(conf);
+
+ List<Path> bulkloadFiles =
+ collectBulkFiles(sourceTable, targetTable, startTime, endTime, new Path(restoreRootDir));
+
+ if (bulkloadFiles.isEmpty()) {
+ LOG.info("No bulk-load files found for {} in time range {}-{}. Skipping bulkload restore.",
+ sourceTable, startTime, endTime);
+ return;
+ }
+
+ Path[] pathsArray = bulkloadFiles.toArray(new Path[0]);
+
+ try {
+ // Use the existing RestoreJob MR job (the same MapReduce job used for
+ // full/incremental restores) to perform the HFile bulk-load of the discovered
+ // store files into `targetTable`.
+ restoreService.run(pathsArray, new TableName[] { sourceTable }, new Path(restoreRootDir),
+ new TableName[] { targetTable }, false);
+ LOG.info("Re-bulkload completed for {}", targetTable);
+ } catch (Exception e) {
+ String errorMessage =
+ String.format("Re-bulkload failed for %s: %s", targetTable, e.getMessage());
+ LOG.error(errorMessage, e);
+ throw new IOException(errorMessage, e);
+ }
}
/**
@@ -329,6 +390,29 @@ private void replayWal(TableName sourceTable, TableName targetTable, long startT
executeWalReplay(validDirs, sourceTable, targetTable, startTime, endTime);
}
+ private List<Path> collectBulkFiles(TableName sourceTable, TableName targetTable, long startTime,
+ long endTime, Path restoreRootDir) throws IOException {
+
+ String walBackupDir = conn.getConfiguration().get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
+ Path walDirPath = new Path(walBackupDir);
+ LOG.info(
+ "Starting WAL bulk-file collection for source: {}, target: {}, time range: {} - {}, WAL backup dir: {}, restore root: {}",
+ sourceTable, targetTable, startTime, endTime, walDirPath, restoreRootDir);
+
+ List<String> validDirs =
+ getValidWalDirs(conn.getConfiguration(), walDirPath, startTime, endTime);
+ if (validDirs.isEmpty()) {
+ LOG.warn("No valid WAL directories found for range {} - {}. Skipping bulk-file collection.",
+ startTime, endTime);
+ return Collections.emptyList();
+ }
+
+ String walDirsCsv = String.join(",", validDirs);
+
+ return BulkFilesCollector.collectFromWalDirs(HBaseConfiguration.create(conn.getConfiguration()),
+ walDirsCsv, restoreRootDir, sourceTable, targetTable, startTime, endTime);
+ }
+
/**
* Fetches valid WAL directories based on the given time range.
*/
@@ -356,7 +440,7 @@ private List<String> getValidWalDirs(Configuration conf, Path walBackupDir, long
validDirs.add(dayDir.getPath().toString());
}
} catch (ParseException e) {
- LOG.warn("Skipping invalid directory name: " + dirName, e);
+ LOG.warn("Skipping invalid directory name: {}", dirName, e);
}
}
return validDirs;
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
index a30530a98fc6..f70bf627d176 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
@@ -81,7 +81,7 @@
import org.apache.hadoop.hbase.backup.BackupRestoreConstants.BackupCommand;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.HBackupFileSystem;
-import org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager;
+import org.apache.hadoop.hbase.backup.util.BackupFileSystemManager;
import org.apache.hadoop.hbase.backup.util.BackupSet;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
index a8ff5153c36f..c2aa0aa17fd1 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
@@ -19,9 +19,9 @@
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY;
-import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.BULKLOAD_FILES_DIR;
-import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
+import static org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.BULKLOAD_FILES_DIR;
+import static org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.WALS_DIR;
import static org.apache.hadoop.hbase.backup.util.BackupUtils.DATE_FORMAT;
import java.io.IOException;
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/BulkLoadCollectorJob.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/BulkLoadCollectorJob.java
new file mode 100644
index 000000000000..b752c7f78e01
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/BulkLoadCollectorJob.java
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.mapreduce;
+
+import static org.apache.hadoop.hbase.mapreduce.WALPlayer.TABLES_KEY;
+import static org.apache.hadoop.hbase.mapreduce.WALPlayer.TABLE_MAP_KEY;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.util.BackupFileSystemManager;
+import org.apache.hadoop.hbase.backup.util.BulkLoadProcessor;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.mapreduce.WALInputFormat;
+import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * MapReduce job that scans WAL backups and extracts referenced bulk-load store-file paths.
+ *
+ * This job is intended to be used when you want a list of HFiles / store-files referenced by WAL
+ * bulk-load descriptors. It emits a de-duplicated list of full paths (one per line) by default
+ * using the {@link DedupReducer}.
+ *
+ *
+ * Usage (CLI):
+ * {@code BulkLoadCollector <inputDirs> <bulkFilesOut> [<tables> [<tableMap>]]}
+ *
+ */
+@InterfaceAudience.Private
+public class BulkLoadCollectorJob extends Configured implements Tool {
+ private static final Logger LOG = LoggerFactory.getLogger(BulkLoadCollectorJob.class);
+
+ public static final String NAME = "BulkLoadCollector";
+ public static final String DEFAULT_REDUCERS = "1";
+
+ public BulkLoadCollectorJob() {
+ }
+
+ protected BulkLoadCollectorJob(final Configuration c) {
+ super(c);
+ }
+
+ /**
+ * Mapper that extracts relative bulk-load paths from a WAL entry (via {@code BulkLoadProcessor}),
+ * resolves them to full paths (via
+ * {@code BackupFileSystemManager#resolveBulkLoadFullPath(Path, Path)}), and emits each full path
+ * as the map key (Text). Uses the same table-filtering semantics as other WAL mappers: if no
+ * tables are configured, all tables are processed; otherwise only the configured table set is
+ * processed. Map output: (Text fullPathString, NullWritable)
+ */
+ public static class BulkLoadCollectorMapper extends Mapper<WALKey, WALEdit, Text, NullWritable> {
+ private final Map<TableName, TableName> tables = new TreeMap<>();
+ private final Text out = new Text();
+
+ @Override
+ protected void map(WALKey key, WALEdit value, Context context)
+ throws IOException, InterruptedException {
+ if (key == null) {
+ if (LOG.isTraceEnabled()) LOG.trace("map: received null WALKey, skipping");
+ return;
+ }
+ if (value == null) {
+ if (LOG.isTraceEnabled())
+ LOG.trace("map: received null WALEdit for table={}, skipping", safeTable(key));
+ return;
+ }
+
+ TableName tname = key.getTableName();
+
+ // table filtering
+ if (!(tables.isEmpty() || tables.containsKey(tname))) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("map: skipping table={} because it is not in configured table list", tname);
+ }
+ return;
+ }
+
+ // Extract relative store-file paths referenced by this WALEdit.
+ // Delegates parsing to BulkLoadProcessor so parsing logic is centralized.
+ List<Path> relativePaths = BulkLoadProcessor.processBulkLoadFiles(key, value);
+ if (relativePaths.isEmpty()) return;
+
+ // Determine WAL input path for this split (used to compute date/prefix for full path)
+ Path walInputPath;
+ try {
+ walInputPath =
+ new Path(((WALInputFormat.WALSplit) context.getInputSplit()).getLogFileName());
+ } catch (ClassCastException cce) {
+ String splitClass =
+ (context.getInputSplit() == null) ? "null" : context.getInputSplit().getClass().getName();
+ LOG.warn(
+ "map: unexpected InputSplit type (not WALSplit) - cannot determine WAL input path; context.getInputSplit() class={}",
+ splitClass, cce);
+ throw new IOException("Unexpected InputSplit type: expected WALSplit but got " + splitClass,
+ cce);
+ }
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("map: walInputPath={} table={} relativePathsCount={}", walInputPath, tname,
+ relativePaths.size());
+ }
+
+ // Build full path for each relative path and emit it.
+ for (Path rel : relativePaths) {
+ Path full = BackupFileSystemManager.resolveBulkLoadFullPath(walInputPath, rel);
+ out.set(full.toString());
+ context.write(out, NullWritable.get());
+ context.getCounter("BulkCollector", "StoreFilesEmitted").increment(1);
+ }
+ }
+
+ @Override
+ protected void setup(Context context) throws IOException {
+ String[] tableMap = context.getConfiguration().getStrings(TABLE_MAP_KEY);
+ String[] tablesToUse = context.getConfiguration().getStrings(TABLES_KEY);
+ if (tableMap == null) {
+ tableMap = tablesToUse;
+ }
+ if (tablesToUse == null) {
+ // user requested all tables; tables map remains empty to indicate "all"
+ return;
+ }
+
+ if (tablesToUse.length != tableMap.length) {
+ throw new IOException("Incorrect table mapping specified.");
+ }
+
+ int i = 0;
+ for (String table : tablesToUse) {
+ TableName from = TableName.valueOf(table);
+ TableName to = TableName.valueOf(tableMap[i++]);
+ tables.put(from, to);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("setup: configuring mapping {} -> {}", from, to);
+ }
+ }
+ }
+
+ private String safeTable(WALKey key) {
+ try {
+ return key == null ? "<unknown>" : key.getTableName().toString();
+ } catch (Exception e) {
+ return "<unknown>";
+ }
+ }
+ }
+
+ /**
+ * Reducer that deduplicates full-path keys emitted by the mappers. It writes each unique key
+ * exactly once. Reduce input: (Text fullPathString, Iterable<NullWritable>) Reduce output: (Text
+ * fullPathString, NullWritable)
+ */
+ public static class DedupReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
+ @Override
+ protected void reduce(Text key, Iterable<NullWritable> values, Context ctx)
+ throws IOException, InterruptedException {
+ // Write the unique path once.
+ ctx.write(key, NullWritable.get());
+ }
+ }
+
+ /**
+ * Create and configure a Job instance for bulk-file collection.
+ * @param args CLI args expected to be: inputDirs bulkFilesOut [tables] [tableMap]
+ * @throws IOException on misconfiguration
+ */
+ public Job createSubmittableJob(String[] args) throws IOException {
+ Configuration conf = getConf();
+
+ setupTime(conf, WALInputFormat.START_TIME_KEY);
+ setupTime(conf, WALInputFormat.END_TIME_KEY);
+
+ if (args == null || args.length < 2) {
+ throw new IOException(
+ "Usage: [ []]");
+ }
+
+ String inputDirs = args[0];
+ String bulkFilesOut = args[1];
+
+ // tables are optional (args[2])
+ String[] tables = (args.length == 2) ? new String[] {} : args[2].split(",");
+ String[] tableMap;
+ if (args.length > 3) {
+ tableMap = args[3].split(",");
+ if (tableMap.length != tables.length) {
+ throw new IOException("The same number of tables and mapping must be provided.");
+ }
+ } else {
+ // if no mapping is specified, map each table to itself
+ tableMap = tables;
+ }
+
+ LOG.info("createSubmittableJob: inputDirs='{}' bulkFilesOut='{}' tables={} tableMap={}",
+ inputDirs, bulkFilesOut, String.join(",", tables), String.join(",", tableMap));
+
+ conf.setStrings(TABLES_KEY, tables);
+ conf.setStrings(TABLE_MAP_KEY, tableMap);
+ conf.set(FileInputFormat.INPUT_DIR, inputDirs);
+
+ // create and return the actual Job configured for bulk-file discovery
+ return BulkLoadCollectorJob.createSubmittableJob(conf, inputDirs, bulkFilesOut);
+ }
+
+ /**
+ * Low-level job wiring. Creates the Job instance and sets input, mapper, reducer and output.
+ * @param conf configuration used for the job
+ * @param inputDirs WAL input directories (comma-separated)
+ * @param bulkFilesOut output directory to write discovered full-paths
+ * @throws IOException on invalid args
+ */
+ private static Job createSubmittableJob(Configuration conf, String inputDirs, String bulkFilesOut)
+ throws IOException {
+ if (bulkFilesOut == null || bulkFilesOut.isEmpty()) {
+ throw new IOException("bulkFilesOut (output dir) must be provided.");
+ }
+ if (inputDirs == null || inputDirs.isEmpty()) {
+ throw new IOException("inputDirs (WAL input dir) must be provided.");
+ }
+
+ Job job = Job.getInstance(conf, NAME + "_" + EnvironmentEdgeManager.currentTime());
+ job.setJarByClass(BulkLoadCollectorJob.class);
+
+ // Input: use same WALInputFormat used by WALPlayer so we parse WALs consistently
+ job.setInputFormatClass(WALInputFormat.class);
+ FileInputFormat.setInputDirRecursive(job, true);
+ FileInputFormat.setInputPaths(job, inputDirs);
+
+ // Mapper: extract and emit full bulk-load file paths (Text, NullWritable)
+ job.setMapperClass(BulkLoadCollectorMapper.class);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(NullWritable.class);
+
+ // Reducer: deduplicate the full-path keys
+ job.setReducerClass(DedupReducer.class);
+ // default to a single reducer (single deduped file); callers can set mapreduce.job.reduces
+ int reducers = conf.getInt("mapreduce.job.reduces", Integer.parseInt(DEFAULT_REDUCERS));
+ job.setNumReduceTasks(reducers);
+
+ // Output: write plain text lines (one path per line)
+ job.setOutputFormatClass(TextOutputFormat.class);
+ FileOutputFormat.setOutputPath(job, new Path(bulkFilesOut));
+
+ LOG.info("createSubmittableJob: created job name='{}' reducers={}", job.getJobName(), reducers);
+
+ String codecCls = WALCellCodec.getWALCellCodecClass(conf).getName();
+ try {
+ TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
+ Class.forName(codecCls));
+ } catch (Exception e) {
+ throw new IOException("Cannot determine wal codec class " + codecCls, e);
+ }
+ return job;
+ }
+
+ /**
+ * Parse a time option. Supports the user-friendly ISO-like format
+ * {@code yyyy-MM-dd'T'HH:mm:ss.SS} or milliseconds since epoch. If the option is not present,
+ * this method is a no-op.
+ * @param conf configuration containing option
+ * @param option key to read (e.g. WALInputFormat.START_TIME_KEY)
+ * @throws IOException on parse failure
+ */
+ private void setupTime(Configuration conf, String option) throws IOException {
+ String val = conf.get(option);
+ if (val == null) {
+ return;
+ }
+ long ms;
+ try {
+ // first try to parse in user-friendly form
+ ms = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SS").parse(val).getTime();
+ } catch (ParseException pe) {
+ try {
+ // then see if a number (milliseconds) was specified
+ ms = Long.parseLong(val);
+ } catch (NumberFormatException nfe) {
+ throw new IOException(
+ option + " must be specified either in the form 2001-02-20T16:35:06.99 "
+ + "or as number of milliseconds");
+ }
+ }
+ conf.setLong(option, ms);
+ }
+
+ /**
+ * CLI entry point.
+ * @param args job arguments (see {@link #usage(String)})
+ * @throws Exception on job failure
+ */
+ public static void main(String[] args) throws Exception {
+ int ret = ToolRunner.run(new BulkLoadCollectorJob(HBaseConfiguration.create()), args);
+ System.exit(ret);
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ if (args.length < 2) {
+ usage("Wrong number of arguments: " + args.length);
+ System.exit(-1);
+ }
+
+ Job job = createSubmittableJob(args);
+ return job.waitForCompletion(true) ? 0 : 1;
+ }
+
+ /**
+ * Print usage/help for the BulkLoadCollectorJob CLI/driver.
+ *
+ * args layout:
+ * args[0] = input directory (required)
+ * args[1] = output directory (required)
+ * args[2] = tables (comma-separated) (optional)
+ * args[3] = tableMappings (comma-separated) (optional; must match tables length)
+ *
+ */
+ private void usage(final String errorMsg) {
+ if (errorMsg != null && !errorMsg.isEmpty()) {
+ System.err.println("ERROR: " + errorMsg);
+ }
+
+ System.err.println(
+ "Usage: " + NAME + " [ []]");
+ System.err.println(
+ " <inputDirs>     directory of WALs to scan (comma-separated list accepted)");
+ System.err.println(
+ " <bulkFilesOut>  directory to write discovered store-file paths (output)");
+ System.err.println(
+ " <tables>        optional comma-separated list of tables to include; if omitted, all tables are processed");
+ System.err.println(
+ " <tableMap>      optional comma-separated list of mapped target tables; must match number of tables");
+
+ System.err.println();
+ System.err.println("Time range options (either milliseconds or yyyy-MM-dd'T'HH:mm:ss.SS):");
+ System.err.println(" -D" + WALInputFormat.START_TIME_KEY + "=[date|ms]");
+ System.err.println(" -D" + WALInputFormat.END_TIME_KEY + "=[date|ms]");
+
+ System.err.println();
+ System.err.println("Configuration alternatives (can be provided via -D):");
+ System.err
+ .println(" -D" + TABLES_KEY + "= (alternative to arg[2])");
+ System.err
+ .println(" -D" + TABLE_MAP_KEY + "= (alternative to arg[3])");
+ System.err.println(
+ " -Dmapreduce.job.reduces= (number of reducers; default 1)");
+ System.err.println();
+
+ System.err.println("Performance hints:");
+ System.err.println(" For large inputs consider disabling speculative execution:");
+ System.err
+ .println(" -Dmapreduce.map.speculative=false -Dmapreduce.reduce.speculative=false");
+
+ System.err.println();
+ System.err.println("Example:");
+ System.err.println(
+ " " + NAME + " /wals/input /out/bulkfiles ns:tbl1,ns:tbl2 ns:tbl1_mapped,ns:tbl2_mapped");
+ }
+}
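
As a rough usage sketch (not part of the patch), the new collector can be driven through ToolRunner the same way its main() does; all directories, table names, and timestamps below are placeholders.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.backup.mapreduce.BulkLoadCollectorJob;
import org.apache.hadoop.hbase.mapreduce.WALInputFormat;
import org.apache.hadoop.util.ToolRunner;

final class BulkLoadCollectorSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Optional time range in milliseconds (setupTime() also accepts yyyy-MM-dd'T'HH:mm:ss.SS).
    conf.setLong(WALInputFormat.START_TIME_KEY, 1735603200000L);
    conf.setLong(WALInputFormat.END_TIME_KEY, 1735689600000L);
    // args layout: <inputDirs> <bulkFilesOut> [<tables> [<tableMap>]] -- placeholder values.
    int rc = ToolRunner.run(conf, new BulkLoadCollectorJob(), new String[] {
      "hdfs:///backup/root/WALs", "hdfs:///tmp/bulk-files-out", "ns:tbl1", "ns:tbl1" });
    System.exit(rc);
  }
}
```
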
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
index 7a2fce4c418a..4711cba46680 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
@@ -61,11 +61,10 @@ public void run(Path[] dirPaths, TableName[] tableNames, Path restoreRootDir,
String dirs = StringUtils.join(dirPaths, ",");
if (LOG.isDebugEnabled()) {
- LOG.debug("Restore " + (fullBackupRestore ? "full" : "incremental")
- + " backup from directory " + dirs + " from hbase tables "
- + StringUtils.join(tableNames, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND)
- + " to tables "
- + StringUtils.join(newTableNames, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND));
+ LOG.debug("Restore {} from directory {} from hbase tables {} to tables {}",
+ fullBackupRestore ? "full backup" : "incremental backup / bulkload files (as part of PITR)",
+ dirs, StringUtils.join(tableNames, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND),
+ StringUtils.join(newTableNames, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND));
}
for (int i = 0; i < tableNames.length; i++) {
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BackupFileSystemManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BackupFileSystemManager.java
deleted file mode 100644
index 225d32172766..000000000000
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BackupFileSystemManager.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.backup.replication;
-
-import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Initializes and organizes backup directories for continuous Write-Ahead Logs (WALs) and
- * bulk-loaded files within the specified backup root directory.
- */
-@InterfaceAudience.Private
-public class BackupFileSystemManager {
- private static final Logger LOG = LoggerFactory.getLogger(BackupFileSystemManager.class);
-
- public static final String WALS_DIR = "WALs";
- public static final String BULKLOAD_FILES_DIR = "bulk-load-files";
- private final String peerId;
- private final FileSystem backupFs;
- private final Path backupRootDir;
- private final Path walsDir;
- private final Path bulkLoadFilesDir;
-
- public BackupFileSystemManager(String peerId, Configuration conf, String backupRootDirStr)
- throws IOException {
- this.peerId = peerId;
- this.backupRootDir = new Path(backupRootDirStr);
- this.backupFs = FileSystem.get(backupRootDir.toUri(), conf);
- this.walsDir = createDirectory(WALS_DIR);
- this.bulkLoadFilesDir = createDirectory(BULKLOAD_FILES_DIR);
- }
-
- private Path createDirectory(String dirName) throws IOException {
- Path dirPath = new Path(backupRootDir, dirName);
- backupFs.mkdirs(dirPath);
- LOG.info("{} Initialized directory: {}", Utils.logPeerId(peerId), dirPath);
- return dirPath;
- }
-
- public Path getWalsDir() {
- return walsDir;
- }
-
- public Path getBulkLoadFilesDir() {
- return bulkLoadFilesDir;
- }
-
- public FileSystem getBackupFs() {
- return backupFs;
- }
-}
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
index 19624d04c23d..19cd2733af7b 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
@@ -38,7 +38,9 @@
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.backup.util.BackupFileSystemManager;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.backup.util.BulkLoadProcessor;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupFileSystemManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupFileSystemManager.java
new file mode 100644
index 000000000000..a616eb69e47f
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupFileSystemManager.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.util;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.backup.replication.Utils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Initializes and organizes backup directories for continuous Write-Ahead Logs (WALs) and
+ * bulk-loaded files within the specified backup root directory.
+ */
+@InterfaceAudience.Private
+public class BackupFileSystemManager {
+ private static final Logger LOG = LoggerFactory.getLogger(BackupFileSystemManager.class);
+
+ public static final String WALS_DIR = "WALs";
+ public static final String BULKLOAD_FILES_DIR = "bulk-load-files";
+ private final String peerId;
+ private final FileSystem backupFs;
+ private final Path backupRootDir;
+ private final Path walsDir;
+ private final Path bulkLoadFilesDir;
+
+ public BackupFileSystemManager(String peerId, Configuration conf, String backupRootDirStr)
+ throws IOException {
+ this.peerId = peerId;
+ this.backupRootDir = new Path(backupRootDirStr);
+ this.backupFs = FileSystem.get(backupRootDir.toUri(), conf);
+ this.walsDir = createDirectory(WALS_DIR);
+ this.bulkLoadFilesDir = createDirectory(BULKLOAD_FILES_DIR);
+ }
+
+ private Path createDirectory(String dirName) throws IOException {
+ Path dirPath = new Path(backupRootDir, dirName);
+ backupFs.mkdirs(dirPath);
+ LOG.info("{} Initialized directory: {}", Utils.logPeerId(peerId), dirPath);
+ return dirPath;
+ }
+
+ public Path getWalsDir() {
+ return walsDir;
+ }
+
+ public Path getBulkLoadFilesDir() {
+ return bulkLoadFilesDir;
+ }
+
+ public FileSystem getBackupFs() {
+ return backupFs;
+ }
+
+ public static final class WalPathInfo {
+ private final Path prefixBeforeWALs;
+ private final String dateSegment;
+
+ public WalPathInfo(Path prefixBeforeWALs, String dateSegment) {
+ this.prefixBeforeWALs = prefixBeforeWALs;
+ this.dateSegment = dateSegment;
+ }
+
+ public Path getPrefixBeforeWALs() {
+ return prefixBeforeWALs;
+ }
+
+ public String getDateSegment() {
+ return dateSegment;
+ }
+ }
+
+ /**
+ * Validate the walPath has the expected structure: .../WALs/<date>/<wal-file> and return
+ * WalPathInfo(prefixBeforeWALs, dateSegment).
+ * @throws IOException if the path is not in expected format
+ */
+ public static WalPathInfo extractWalPathInfo(Path walPath) throws IOException {
+ if (walPath == null) {
+ throw new IllegalArgumentException("walPath must not be null");
+ }
+
+ Path dateDir = walPath.getParent(); // .../WALs/<date>
+ if (dateDir == null) {
+ throw new IOException("Invalid WAL path: missing date directory. Path: " + walPath);
+ }
+
+ Path walsDir = dateDir.getParent(); // .../WALs
+ if (walsDir == null) {
+ throw new IOException("Invalid WAL path: missing WALs directory. Path: " + walPath);
+ }
+
+ String walsDirName = walsDir.getName();
+ if (!WALS_DIR.equals(walsDirName)) {
+ throw new IOException("Invalid WAL path: expected '" + WALS_DIR + "' segment but found '"
+ + walsDirName + "'. Path: " + walPath);
+ }
+
+ String dateSegment = dateDir.getName();
+ if (dateSegment == null || dateSegment.isEmpty()) {
+ throw new IOException("Invalid WAL path: date segment is empty. Path: " + walPath);
+ }
+
+ Path prefixBeforeWALs = walsDir.getParent(); // might be null if path is like "/WALs/..."
+ return new WalPathInfo(prefixBeforeWALs, dateSegment);
+ }
+
+ /**
+ * Resolve the full bulk-load file path corresponding to a relative bulk-load path referenced from
+ * a WAL file path. For a WAL path like: /some/prefix/.../WALs/23-08-2025/some-wal-file and a
+ * relative bulk path like: namespace/table/region/family/file, this returns:
+ * /some/prefix/.../bulk-load-files/23-08-2025/namespace/table/region/family/file
+ * @param walPath the Path to the WAL file (must contain the {@link #WALS_DIR} segment
+ * followed by date)
+ * @param relativeBulkPath the relative bulk-load file Path
+ * @return resolved full Path for the bulk-load file
+ * @throws IOException if the WAL path does not contain the expected segments
+ */
+ public static Path resolveBulkLoadFullPath(Path walPath, Path relativeBulkPath)
+ throws IOException {
+ WalPathInfo info = extractWalPathInfo(walPath);
+
+ Path prefixBeforeWALs = info.getPrefixBeforeWALs();
+ String dateSegment = info.getDateSegment();
+
+ Path full; // Build final path:
+ // <prefixBeforeWALs>/bulk-load-files/<dateSegment>/<relativeBulkPath>
+ if (prefixBeforeWALs == null || prefixBeforeWALs.toString().isEmpty()) {
+ full = new Path(BULKLOAD_FILES_DIR, new Path(dateSegment, relativeBulkPath));
+ } else {
+ full = new Path(new Path(prefixBeforeWALs, BULKLOAD_FILES_DIR),
+ new Path(dateSegment, relativeBulkPath));
+ }
+ return full;
+ }
+}
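
A small sketch (not part of the patch) of the path mapping performed by the new resolveBulkLoadFullPath helper, following the layout described in its Javadoc; the concrete prefix is a placeholder.

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.backup.util.BackupFileSystemManager;

final class BulkLoadPathSketch {
  public static void main(String[] args) throws Exception {
    // WAL path layout: <prefix>/WALs/<date>/<wal-file>; the prefix here is illustrative.
    Path walPath = new Path("/backup/root/WALs/23-08-2025/some-wal-file");
    Path relativeBulkPath = new Path("ns/table/region/family/file");
    Path full = BackupFileSystemManager.resolveBulkLoadFullPath(walPath, relativeBulkPath);
    // Prints: /backup/root/bulk-load-files/23-08-2025/ns/table/region/family/file
    System.out.println(full);
  }
}
```
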
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BulkFilesCollector.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BulkFilesCollector.java
new file mode 100644
index 000000000000..718a662abb7b
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BulkFilesCollector.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.util;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.mapreduce.BulkLoadCollectorJob;
+import org.apache.hadoop.hbase.mapreduce.WALInputFormat;
+import org.apache.hadoop.hbase.mapreduce.WALPlayer;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.util.Tool;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility to run BulkLoadCollectorJob over a comma-separated list of WAL directories and return a
+ * deduplicated list of discovered bulk-load file paths.
+ */
+@InterfaceAudience.Private
+public final class BulkFilesCollector {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BulkFilesCollector.class);
+
+ private BulkFilesCollector() {
+ /* static only */ }
+
+ /**
+ * Convenience overload: collector will create and configure BulkLoadCollectorJob internally.
+ * @param conf cluster/configuration used to initialize job and access FS
+ * @param walDirsCsv comma-separated WAL directories
+ * @param restoreRootDir parent path under which temporary output dir will be created
+ * @param sourceTable source table name (for args/logging)
+ * @param targetTable target table name (for args/logging)
+ * @param startTime start time (ms) to set in the job config (WALInputFormat.START_TIME_KEY)
+ * @param endTime end time (ms) to set in the job config (WALInputFormat.END_TIME_KEY)
+ * @return deduplicated list of Paths discovered by the collector
+ * @throws IOException on IO or job failure
+ */
+ public static List<Path> collectFromWalDirs(Configuration conf, String walDirsCsv,
+ Path restoreRootDir, TableName sourceTable, TableName targetTable, long startTime, long endTime)
+ throws IOException {
+
+ // prepare job Tool
+ Configuration jobConf = new Configuration(conf);
+ if (startTime > 0) jobConf.setLong(WALInputFormat.START_TIME_KEY, startTime);
+ if (endTime > 0) jobConf.setLong(WALInputFormat.END_TIME_KEY, endTime);
+
+ // ignore empty WAL files by default to make collection robust
+ jobConf.setBoolean(WALPlayer.IGNORE_EMPTY_FILES, true);
+
+ BulkLoadCollectorJob bulkCollector = new BulkLoadCollectorJob();
+ bulkCollector.setConf(jobConf);
+
+ return collectFromWalDirs(conf, walDirsCsv, restoreRootDir, sourceTable, targetTable,
+ bulkCollector);
+ }
+
+ /**
+ * Primary implementation: runs the provided Tool (BulkLoadCollectorJob) with args
+ * "<walDirsCsv> <bulkFilesOut> <sourceTable> <targetTable>" and returns a deduplicated list of Paths.
+ */
+ public static List<Path> collectFromWalDirs(Configuration conf, String walDirsCsv,
+ Path restoreRootDir, TableName sourceTable, TableName targetTable, Tool bulkCollector)
+ throws IOException {
+
+ if (walDirsCsv == null || walDirsCsv.trim().isEmpty()) {
+ throw new IOException(
+ "walDirsCsv must be a non-empty comma-separated list of WAL directories");
+ }
+
+ List<String> walDirs =
+ Arrays.stream(walDirsCsv.split(",")).map(String::trim).filter(s -> !s.isEmpty()).toList();
+
+ if (walDirs.isEmpty()) {
+ throw new IOException("walDirsCsv did not contain any entries: '" + walDirsCsv + "'");
+ }
+
+ List<String> existing = new ArrayList<>();
+ for (String d : walDirs) {
+ Path p = new Path(d);
+ try {
+ FileSystem fsForPath = p.getFileSystem(conf);
+ if (fsForPath.exists(p)) {
+ existing.add(d);
+ } else {
+ LOG.debug("WAL dir does not exist: {}", d);
+ }
+ } catch (IOException e) {
+ // If getting FS or checking existence fails, treat as missing but log the cause.
+ LOG.warn("Error checking WAL dir {}: {}", d, e.toString());
+ }
+ }
+
+ // If any of the provided walDirs are missing, fail with an informative message.
+ List<String> missing = new ArrayList<>(walDirs);
+ missing.removeAll(existing);
+
+ if (!missing.isEmpty()) {
+ throw new IOException(
+ "Some of the provided WAL paths do not exist: " + String.join(", ", missing));
+ }
+
+ // Create unique temporary output dir under restoreRootDir, e.g.
+ // <restoreRootDir>/_wal_collect_<sourceTableQualifier><timestamp>
+ final String unique = String.format("_wal_collect_%s%d", sourceTable.getQualifierAsString(),
+ EnvironmentEdgeManager.currentTime());
+ final Path bulkFilesOut = new Path(restoreRootDir, unique);
+
+ FileSystem fs = bulkFilesOut.getFileSystem(conf);
+
+ try {
+ // If bulkFilesOut exists for some reason, delete it.
+ if (fs.exists(bulkFilesOut)) {
+ LOG.info("Temporary bulkload file collect output directory {} already exists - deleting.",
+ bulkFilesOut);
+ fs.delete(bulkFilesOut, true);
+ }
+
+ final String[] args = new String[] { walDirsCsv, bulkFilesOut.toString(),
+ sourceTable.getNameAsString(), targetTable.getNameAsString() };
+
+ LOG.info("Running bulk collector Tool with args: {}", (Object) args);
+
+ int exitCode;
+ try {
+ exitCode = bulkCollector.run(args);
+ } catch (Exception e) {
+ LOG.error("Error during BulkLoadCollectorJob for {}: {}", sourceTable, e.getMessage(), e);
+ throw new IOException("Exception during BulkLoadCollectorJob collect", e);
+ }
+
+ if (exitCode != 0) {
+ throw new IOException("Bulk collector Tool returned non-zero exit code: " + exitCode);
+ }
+
+ LOG.info("BulkLoadCollectorJob collect completed successfully for {}", sourceTable);
+
+ // read and dedupe
+ List<Path> results = readBulkFilesListFromOutput(fs, bulkFilesOut);
+ LOG.info("BulkFilesCollector: discovered {} unique bulk-load files", results.size());
+ return results;
+ } finally {
+ // best-effort cleanup
+ try {
+ if (fs.exists(bulkFilesOut)) {
+ boolean deleted = fs.delete(bulkFilesOut, true);
+ if (!deleted) {
+ LOG.warn("Could not delete temporary bulkFilesOut directory {}", bulkFilesOut);
+ } else {
+ LOG.debug("Deleted temporary bulkFilesOut directory {}", bulkFilesOut);
+ }
+ }
+ } catch (IOException ioe) {
+ LOG.warn("Exception while deleting temporary bulkload file collect output dir {}: {}",
+ bulkFilesOut, ioe.getMessage(), ioe);
+ }
+ }
+ }
+
+ // reads all non-hidden files under bulkFilesOut, collects lines in insertion order, returns Paths
+ private static List<Path> readBulkFilesListFromOutput(FileSystem fs, Path bulkFilesOut)
+ throws IOException {
+ if (!fs.exists(bulkFilesOut)) {
+ LOG.warn("BulkFilesCollector: bulkFilesOut directory does not exist: {}", bulkFilesOut);
+ return new ArrayList<>();
+ }
+
+ RemoteIterator<LocatedFileStatus> it = fs.listFiles(bulkFilesOut, true);
+ Set<String> dedupe = new LinkedHashSet<>();
+
+ while (it.hasNext()) {
+ LocatedFileStatus status = it.next();
+ Path p = status.getPath();
+ String name = p.getName();
+ // skip hidden/system files like _SUCCESS or _logs
+ if (name.startsWith("_") || name.startsWith(".")) continue;
+
+ try (FSDataInputStream in = fs.open(p);
+ BufferedReader br = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
+ String line;
+ while ((line = br.readLine()) != null) {
+ line = line.trim();
+ if (line.isEmpty()) continue;
+ dedupe.add(line);
+ }
+ }
+ }
+
+ List<Path> result = new ArrayList<>(dedupe.size());
+ for (String s : dedupe)
+ result.add(new Path(s));
+
+ LOG.info("Collected {} unique bulk-load store files.", result.size());
+ return result;
+ }
+}
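
For reference, a minimal sketch (not part of the patch) of calling the new utility directly, mirroring how AbstractPitrRestoreHandler#collectBulkFiles uses it; the WAL directories, restore root, tables, and time range are placeholder assumptions.

```java
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.util.BulkFilesCollector;

final class BulkFilesCollectorSketch {
  // WAL directories, restore root, table names, and timestamps are placeholders.
  static List<Path> collect() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    String walDirsCsv = "/backup/root/WALs/22-08-2025,/backup/root/WALs/23-08-2025";
    return BulkFilesCollector.collectFromWalDirs(conf, walDirsCsv,
      new Path("/tmp/pitr-restore"),     // a temporary output dir is created under this path
      TableName.valueOf("ns:source"), TableName.valueOf("ns:target"),
      1735603200000L, 1735689600000L);   // WAL time range in ms
  }
}
```
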
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BulkLoadProcessor.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BulkLoadProcessor.java
similarity index 63%
rename from hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BulkLoadProcessor.java
rename to hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BulkLoadProcessor.java
index 6e1271313bcd..4ab8bfb104e4 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BulkLoadProcessor.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BulkLoadProcessor.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hbase.backup.replication;
+package org.apache.hadoop.hbase.backup.util;
import java.io.IOException;
import java.util.ArrayList;
@@ -26,19 +26,16 @@
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
/**
- * Processes bulk load files from Write-Ahead Log (WAL) entries for HBase replication.
+ * Processes bulk load files from Write-Ahead Log (WAL) entries.
*
- * This utility class extracts and constructs the file paths of bulk-loaded files based on WAL
- * entries. It processes bulk load descriptors and their associated store descriptors to generate
- * the paths for each bulk-loaded file.
- *
- * The class is designed for scenarios where replicable bulk load operations need to be parsed and
- * their file paths need to be determined programmatically.
+ * Used by backup/restore and replication flows to discover HFiles referenced by bulk-load WALEdits.
+ * Returned {@link Path}s are constructed from the namespace/table/region/family/file components.
*
*/
@InterfaceAudience.Private
@@ -46,20 +43,41 @@ public final class BulkLoadProcessor {
private BulkLoadProcessor() {
}
+ /**
+ * Extract bulk-load file {@link Path}s from a list of {@link WAL.Entry}.
+ * @param walEntries list of WAL entries.
+ * @return list of Paths in discovery order; empty list if none
+ * @throws IOException if descriptor parsing fails
+ */
public static List<Path> processBulkLoadFiles(List<WAL.Entry> walEntries) throws IOException {
List<Path> bulkLoadFilePaths = new ArrayList<>();
for (WAL.Entry entry : walEntries) {
- WALEdit edit = entry.getEdit();
- for (Cell cell : edit.getCells()) {
- if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
- TableName tableName = entry.getKey().getTableName();
- String namespace = tableName.getNamespaceAsString();
- String table = tableName.getQualifierAsString();
- bulkLoadFilePaths.addAll(processBulkLoadDescriptor(cell, namespace, table));
- }
+ bulkLoadFilePaths.addAll(processBulkLoadFiles(entry.getKey(), entry.getEdit()));
+ }
+ return bulkLoadFilePaths;
+ }
+
+ /**
+ * Extract bulk-load file {@link Path}s from a single WAL entry.
+ * @param key WALKey containing table information; if null returns empty list
+ * @param edit WALEdit to scan; if null returns empty list
+ * @return list of Paths referenced by bulk-load descriptor(s) in this edit; may be empty or
+ * contain duplicates
+ * @throws IOException if descriptor parsing fails
+ */
+ public static List<Path> processBulkLoadFiles(WALKey key, WALEdit edit) throws IOException {
+ List<Path> bulkLoadFilePaths = new ArrayList<>();
+
+ for (Cell cell : edit.getCells()) {
+ if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
+ TableName tableName = key.getTableName();
+ String namespace = tableName.getNamespaceAsString();
+ String table = tableName.getQualifierAsString();
+ bulkLoadFilePaths.addAll(processBulkLoadDescriptor(cell, namespace, table));
}
}
+
return bulkLoadFilePaths;
}
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/PITRTestUtil.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/PITRTestUtil.java
index ae26cf960501..24f5237866db 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/PITRTestUtil.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/PITRTestUtil.java
@@ -28,9 +28,17 @@
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
+import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -104,4 +112,32 @@ public static int getRowCount(HBaseTestingUtil testUtil, TableName tableName) th
return HBaseTestingUtil.countRows(table);
}
}
+
+ public static void generateHFiles(Path outputDir, Configuration conf, String cfName)
+ throws IOException {
+ String hFileName = "MyHFile";
+ int numRows = 1000;
+
+ FileSystem fs = FileSystem.get(conf);
+ outputDir = outputDir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
+
+ byte[] from = Bytes.toBytes(cfName + "begin");
+ byte[] to = Bytes.toBytes(cfName + "end");
+
+ Path familyDir = new Path(outputDir, cfName);
+ HFileTestUtil.createHFile(conf, fs, new Path(familyDir, hFileName), Bytes.toBytes(cfName),
+ Bytes.toBytes("qualifier"), from, to, numRows);
+ }
+
+ public static void bulkLoadHFiles(TableName tableName, Path inputDir, Connection conn,
+ Configuration conf) throws IOException {
+ conf.setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, true);
+
+ try (Table table = conn.getTable(tableName)) {
+ BulkLoadHFiles loader = new BulkLoadHFilesTool(conf);
+ loader.bulkLoad(table.getName(), inputDir);
+ } finally {
+ conf.setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, false);
+ }
+ }
}
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithCleanup.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithCleanup.java
index 8bd2fe4cc78c..c6c6f5e9799e 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithCleanup.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithCleanup.java
@@ -19,9 +19,9 @@
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONTINUOUS_BACKUP_REPLICATION_PEER;
-import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.BULKLOAD_FILES_DIR;
-import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
+import static org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.BULKLOAD_FILES_DIR;
+import static org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.WALS_DIR;
import static org.apache.hadoop.hbase.backup.util.BackupUtils.DATE_FORMAT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -42,7 +42,7 @@
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
-import org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager;
+import org.apache.hadoop.hbase.backup.util.BackupFileSystemManager;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.ToolRunner;
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java
index a1ce9c97a687..54667752f01b 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java
@@ -28,6 +28,7 @@
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.ToolRunner;
import org.junit.AfterClass;
@@ -36,6 +37,21 @@
import org.junit.Test;
import org.junit.experimental.categories.Category;
+/**
+ * Integration-style tests for Point-in-Time Restore (PITR).
+ *
+ * These tests exercise the full backup / continuous backup / restore flow:
+ * <ul>
+ * <li>create backups at multiple historical points in time (via {@code BackupDriver})</li>
+ * <li>exercise WAL-based replication/continuous backup</li>
+ * <li>validate Point-in-Time Restore behavior (successful restores, failure cases)</li>
+ * </ul>
+ *
+ * NOTE: Some tests also create HFiles and perform HBase bulk-loads (HFile -> table) so the restore
+ * flow is validated when bulk-loaded storefiles are present in WALs. This ensures the
+ * BulkLoadCollector/BulkFilesCollector logic (discovering bulk-loaded store files referenced from
+ * WAL bulk-load descriptors) is exercised by the test suite.
+ *
+ */
@Category(LargeTests.class)
public class TestPointInTimeRestore extends TestBackupBase {
@ClassRule
@@ -67,8 +83,8 @@ private static void setUpBackups() throws Exception {
// Simulate a backup taken 20 days ago
EnvironmentEdgeManager
.injectEdge(() -> System.currentTimeMillis() - 20 * ONE_DAY_IN_MILLISECONDS);
- PITRTestUtil.loadRandomData(TEST_UTIL, table1, famName, 1000); // Insert initial data into
- // table1
+ // Insert initial data into table1
+ PITRTestUtil.loadRandomData(TEST_UTIL, table1, famName, 1000);
// Perform a full backup for table1 with continuous backup enabled
String[] args =
@@ -80,6 +96,12 @@ private static void setUpBackups() throws Exception {
EnvironmentEdgeManager
.injectEdge(() -> System.currentTimeMillis() - 15 * ONE_DAY_IN_MILLISECONDS);
PITRTestUtil.loadRandomData(TEST_UTIL, table1, famName, 1000); // Add more data to table1
+
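+ // Bulk-load generated HFiles into table1 so the WAL records a bulk-load descriptor that the
+ // PITR restore flow has to pick up.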
+ Path dir = TEST_UTIL.getDataTestDirOnTestFS("testBulkLoadByFamily");
+ PITRTestUtil.generateHFiles(dir, TEST_UTIL.getConfiguration(), Bytes.toString(famName));
+ PITRTestUtil.bulkLoadHFiles(table1, dir, TEST_UTIL.getConnection(),
+ TEST_UTIL.getConfiguration());
+
PITRTestUtil.loadRandomData(TEST_UTIL, table2, famName, 500); // Insert data into table2
PITRTestUtil.waitForReplication(); // Ensure replication is complete
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupCommands.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupCommands.java
index 15ab2b2bdbe1..30ee495df55b 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupCommands.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupCommands.java
@@ -20,9 +20,9 @@
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
import static org.apache.hadoop.hbase.backup.TestBackupDeleteWithCleanup.logDirectoryStructure;
import static org.apache.hadoop.hbase.backup.TestBackupDeleteWithCleanup.setupBackupFolders;
-import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.BULKLOAD_FILES_DIR;
-import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
+import static org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.BULKLOAD_FILES_DIR;
+import static org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.WALS_DIR;
import static org.apache.hadoop.hbase.backup.util.BackupUtils.DATE_FORMAT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/mapreduce/TestBulkLoadCollectorJob.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/mapreduce/TestBulkLoadCollectorJob.java
new file mode 100644
index 000000000000..20295d7e4ea3
--- /dev/null
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/mapreduce/TestBulkLoadCollectorJob.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Collections;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.util.BulkLoadProcessor;
+import org.apache.hadoop.hbase.mapreduce.WALInputFormat;
+import org.apache.hadoop.hbase.mapreduce.WALPlayer;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+/**
+ * Unit tests for BulkLoadCollectorJob (mapper, reducer and job creation/validation).
+ */
+@Category({ MapReduceTests.class, LargeTests.class })
+public class TestBulkLoadCollectorJob {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestBulkLoadCollectorJob.class);
+
+ private Configuration conf;
+
+ @Before
+ public void setUp() {
+ // fresh configuration for each test
+ conf = HBaseConfiguration.create();
+ }
+
+ @After
+ public void tearDown() {
+ // nothing for now
+ }
+
+ /**
+ * Ensures {@link BulkLoadCollectorJob#createSubmittableJob(String[])} correctly configures
+ * input/output paths and parses time options into the job configuration.
+ */
+ @Test
+ public void testCreateSubmittableJobValid() throws Exception {
+ // set a start time option to make sure setupTime runs and applies it
+ String dateStr = "2001-02-20T16:35:06.99";
+ conf.set(WALInputFormat.START_TIME_KEY, dateStr);
+
+ BulkLoadCollectorJob jobDriver = new BulkLoadCollectorJob(conf);
+ String inputDirs = new Path("file:/wals/input").toString();
+ String outDir = new Path("file:/out/bulk").toString();
+ Job job = jobDriver.createSubmittableJob(new String[] { inputDirs, outDir });
+
+ // Input path set
+ Path[] inPaths = FileInputFormat.getInputPaths(job);
+ assertEquals(1, inPaths.length);
+ assertEquals(inputDirs, inPaths[0].toString());
+
+ // Output path set
+ Path out = FileOutputFormat.getOutputPath(job);
+ assertEquals(new Path(outDir), out);
+
+ // Ensure the conf had START_TIME_KEY parsed to a long (setupTime executed)
+ long parsed = conf.getLong(WALInputFormat.START_TIME_KEY, -1L);
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SS");
+ long expected = sdf.parse(dateStr).getTime();
+ assertEquals(expected, parsed);
+ }
+
+ /**
+ * Verifies that {@link BulkLoadCollectorJob#createSubmittableJob(String[])} throws an IOException
+ * when called with insufficient or null arguments.
+ */
+ @Test(expected = IOException.class)
+ public void testCreateSubmittableJob_throwsForInsufficientArgs() throws Exception {
+ BulkLoadCollectorJob jobDriver = new BulkLoadCollectorJob(conf);
+ // this call must throw IOException for the test to pass
+ jobDriver.createSubmittableJob(new String[] { "file:/only/one/arg" });
+ }
+
+ @Test(expected = IOException.class)
+ public void testCreateSubmittableJob_throwsForNullArgs() throws Exception {
+ BulkLoadCollectorJob jobDriver = new BulkLoadCollectorJob(conf);
+ // this call must throw IOException for the test to pass
+ jobDriver.createSubmittableJob(null);
+ }
+
+ /**
+ * Verifies that {@link BulkLoadCollectorJob.BulkLoadCollectorMapper} ignores WAL entries whose
+ * table is not present in the configured tables map.
+ */
+ @Test
+ public void testMapperIgnoresWhenTableNotInMap() throws Exception {
+ // Prepare mapper and a mocked MapReduce context
+ BulkLoadCollectorJob.BulkLoadCollectorMapper mapper =
+ new BulkLoadCollectorJob.BulkLoadCollectorMapper();
+ @SuppressWarnings("unchecked")
+ Mapper<WALKey, WALEdit, Text, NullWritable>.Context ctx = mock(Mapper.Context.class);
+
+ // Build a Configuration that only allows a single table: ns:allowed
+ // Note: TABLES_KEY / TABLE_MAP_KEY are the same constants read by the mapper's setup(...)
+ Configuration cfgForTest = new Configuration(conf);
+ cfgForTest.setStrings(WALPlayer.TABLES_KEY, "ns:allowed");
+ cfgForTest.setStrings(WALPlayer.TABLE_MAP_KEY, "ns:allowed"); // maps to itself
+
+ // Have the mocked context return our test configuration when mapper.setup() runs
+ when(ctx.getConfiguration()).thenReturn(cfgForTest);
+ mapper.setup(ctx);
+
+ // Create a WALKey for a table that is NOT in the allowed map (ns:other)
+ WALKey keyForOtherTable = mock(WALKey.class);
+ when(keyForOtherTable.getTableName()).thenReturn(TableName.valueOf("ns:other"));
+ WALEdit walEdit = mock(WALEdit.class);
+
+ // Static-mock BulkLoadProcessor so that, even if it were invoked unexpectedly, it would return
+ // a non-empty list; we still assert below that no writes reached the context.
+ try (MockedStatic<BulkLoadProcessor> proc = Mockito.mockStatic(BulkLoadProcessor.class)) {
+ proc.when(() -> BulkLoadProcessor.processBulkLoadFiles(any(), any()))
+ .thenReturn(Collections.singletonList(new Path("x")));
+
+ // Invoke mapper - because the table is not allowed, mapper should do nothing
+ mapper.map(keyForOtherTable, walEdit, ctx);
+
+ // Assert: mapper did not write any output to the context
+ verify(ctx, never()).write(any(Text.class), any(NullWritable.class));
+ }
+ }
+
+ /**
+ * Verifies that {@link BulkLoadCollectorJob.BulkLoadCollectorMapper} safely handles null inputs.
+ *
+ * The mapper should ignore WAL entries when either the WAL key or the WALEdit value is null, and
+ * must not emit any output in those cases.
+ *
+ * @throws Exception on test failure
+ */
+ @Test
+ public void testMapperHandlesNullKeyOrValue() throws Exception {
+ BulkLoadCollectorJob.BulkLoadCollectorMapper mapper =
+ new BulkLoadCollectorJob.BulkLoadCollectorMapper();
+ @SuppressWarnings("unchecked")
+ Mapper<WALKey, WALEdit, Text, NullWritable>.Context ctx = mock(Mapper.Context.class);
+ when(ctx.getConfiguration()).thenReturn(conf);
+ mapper.setup(ctx);
+
+ // null key
+ mapper.map(null, mock(WALEdit.class), ctx);
+ // null value
+ mapper.map(mock(WALKey.class), null, ctx);
+
+ // ensure no writes
+ verify(ctx, never()).write(any(Text.class), any(NullWritable.class));
+ }
+
+ /**
+ * Verifies that {@link BulkLoadCollectorJob.DedupReducer} writes each unique key exactly once.
+ */
+ @Test
+ public void testDedupReducerWritesOnce() throws Exception {
+ BulkLoadCollectorJob.DedupReducer reducer = new BulkLoadCollectorJob.DedupReducer();
+ @SuppressWarnings("unchecked")
+ Reducer<Text, NullWritable, Text, NullWritable>.Context ctx = mock(Reducer.Context.class);
+
+ Text key = new Text("/some/path");
+
+ // Simulate three duplicate values for the same key; reducer should still write the key once.
+ Iterable<NullWritable> vals =
+ Arrays.asList(NullWritable.get(), NullWritable.get(), NullWritable.get());
+
+ reducer.reduce(key, vals, ctx);
+
+ // verify exactly once write with the same key
+ verify(ctx, times(1)).write(eq(key), eq(NullWritable.get()));
+ }
+}
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/mapreduce/TestBulkLoadCollectorJobIntegration.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/mapreduce/TestBulkLoadCollectorJobIntegration.java
new file mode 100644
index 000000000000..b72f931732a4
--- /dev/null
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/mapreduce/TestBulkLoadCollectorJobIntegration.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.util.BulkLoadProcessor;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
+import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.wal.FSHLogProvider;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
+
+/**
+ * Integration-like unit test for BulkLoadCollectorJob.
+ * <ul>
+ * <li>Creates a WAL containing a BULK_LOAD descriptor (written via ProtobufLogWriter).</li>
+ * <li>Runs BulkLoadCollectorJob over that WAL directory.</li>
+ * <li>Verifies the job emits the expected store-file path.</li>
+ * </ul>
+ */
+@Category({ MapReduceTests.class, LargeTests.class })
+public class TestBulkLoadCollectorJobIntegration {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestBulkLoadCollectorJobIntegration.class);
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestBulkLoadCollectorJobIntegration.class);
+ private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+ private static Configuration conf;
+ private static FileSystem fs;
+ private static Path hbaseDir;
+ static final TableName tableName = TableName.valueOf(getName());
+ static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
+ private static final byte[] family = Bytes.toBytes("column");
+ private static Path logDir;
+ protected MultiVersionConcurrencyControl mvcc;
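+ // Replication scopes handed to WALKeyImpl; left empty because scope handling is not under test.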
+ protected static NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+
+ private static String getName() {
+ return "TestBulkLoadCollectorJobIntegration";
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ if (hbaseDir != null && fs != null) {
+ fs.delete(hbaseDir, true);
+ }
+ mvcc = new MultiVersionConcurrencyControl();
+ }
+
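+ /** Starts a mini DFS cluster and prepares a deterministic WAL directory on the test filesystem. */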
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ conf = TEST_UTIL.getConfiguration();
+ conf.setInt("dfs.blocksize", 1024 * 1024);
+ conf.setInt("dfs.replication", 1);
+
+ // Start a mini DFS cluster
+ TEST_UTIL.startMiniDFSCluster(3);
+
+ conf = TEST_UTIL.getConfiguration();
+ fs = TEST_UTIL.getDFSCluster().getFileSystem();
+
+ hbaseDir = TEST_UTIL.createRootDir();
+
+ // Use a deterministic test WAL directory under the test filesystem
+ logDir = new Path(TEST_UTIL.getDataTestDirOnTestFS(), "WALs/23-11-2024");
+ fs.mkdirs(logDir);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ if (fs != null && hbaseDir != null) {
+ fs.delete(hbaseDir, true);
+ }
+ TEST_UTIL.shutdownMiniDFSCluster();
+ }
+
+ /**
+ * Test that BulkLoadCollectorJob discovers and emits store-file paths from WAL files created
+ * using WALFactory (no RegionServer needed).
+ */
+ @Test
+ public void testBulkLoadCollectorEmitsStoreFilesFromWAL() throws Exception {
+ // Create WAL entry with BULK_LOAD descriptor
+ final String storeFileName = "storefile-abc.hfile";
+ WAL.Entry entry =
+ createBulkLoadWalEntry(info.getEncodedName(), Bytes.toString(family), storeFileName);
+
+ // Verify the processor would extract relative paths
+ List<Path> relativePaths =
+ BulkLoadProcessor.processBulkLoadFiles(entry.getKey(), entry.getEdit());
+ LOG.debug("BulkLoadProcessor returned {} relative path(s): {}", relativePaths.size(),
+ relativePaths);
+ assertEquals("Expected exactly one relative path from BulkLoadProcessor", 1,
+ relativePaths.size());
+
+ // Build WAL file path and write WAL using ProtobufLogWriter into logDir
+ String walFileName = "wal-" + EnvironmentEdgeManager.currentTime();
+ Path walFilePath = new Path(logDir, walFileName);
+ fs.mkdirs(logDir);
+
+ FSHLogProvider.Writer writer = null;
+ try {
+ writer = new ProtobufLogWriter();
+ long blockSize = WALUtil.getWALBlockSize(conf, fs, walFilePath);
+ writer.init(fs, walFilePath, conf, true, blockSize,
+ StreamSlowMonitor.create(conf, walFileName));
+ writer.append(entry);
+ writer.sync(true);
+ writer.close();
+ } catch (Exception e) {
+ throw new IOException("Failed to write WAL via ProtobufLogWriter", e);
+ } finally {
+ try {
+ if (writer != null) {
+ writer.close();
+ }
+ } catch (Exception ignore) {
+ }
+ }
+
+ // Assert WAL file exists and has content
+ boolean exists = fs.exists(walFilePath);
+ long len = exists ? fs.getFileStatus(walFilePath).getLen() : -1L;
+ assertTrue("WAL file should exist at " + walFilePath, exists);
+ assertTrue("WAL file should have non-zero length, actual=" + len, len > 0);
+
+ // Run the MR job
+ Path walInputDir = logDir;
+ Path outDir = new Path("/tmp/test-bulk-files-output-" + System.currentTimeMillis());
+
+ int res = ToolRunner.run(TEST_UTIL.getConfiguration(),
+ new BulkLoadCollectorJob(TEST_UTIL.getConfiguration()),
+ new String[] { walInputDir.toString(), outDir.toString() });
+ assertEquals("BulkLoadCollectorJob should exit with code 0", 0, res);
+
+ // Inspect job output
+ FileSystem dfs = TEST_UTIL.getDFSCluster().getFileSystem();
+ assertTrue("Output directory should exist", dfs.exists(outDir));
+
+ List<Path> partFiles = Arrays.stream(dfs.listStatus(outDir)).map(FileStatus::getPath)
+ .filter(p -> p.getName().startsWith("part-")).toList();
+
+ assertFalse("Expect at least one part file in output", partFiles.isEmpty());
+
+ // Read all lines (collect while stream is open)
+ List<String> lines = partFiles.stream().flatMap(p -> {
+ try (FSDataInputStream in = dfs.open(p);
+ BufferedReader r = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
+ List<String> fileLines = r.lines().toList();
+ return fileLines.stream();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }).toList();
+
+ assertFalse("Job should have emitted at least one storefile path", lines.isEmpty());
+
+ boolean found = lines.stream().anyMatch(l -> l.contains(storeFileName));
+ assertTrue(
+ "Expected emitted path to contain store file name: " + storeFileName + " ; got: " + lines,
+ found);
+
+ // cleanup
+ dfs.delete(outDir, true);
+ }
+
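+ /**
+ * Builds a WAL entry whose WALEdit carries a BULK_LOAD descriptor referencing the given store
+ * files under the given column family.
+ */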
+ private WAL.Entry createBulkLoadWalEntry(String regionName, String family, String... storeFiles) {
+
+ WALProtos.StoreDescriptor.Builder storeDescBuilder =
+ WALProtos.StoreDescriptor.newBuilder().setFamilyName(ByteString.copyFromUtf8(family))
+ .setStoreHomeDir(family).addAllStoreFile(Arrays.asList(storeFiles));
+
+ WALProtos.BulkLoadDescriptor.Builder bulkDescBuilder = WALProtos.BulkLoadDescriptor.newBuilder()
+ .setReplicate(true).setEncodedRegionName(ByteString.copyFromUtf8(regionName))
+ .setTableName(ProtobufUtil.toProtoTableName(tableName)).setBulkloadSeqNum(1000)
+ .addStores(storeDescBuilder);
+
+ byte[] valueBytes = bulkDescBuilder.build().toByteArray();
+
+ WALEdit edit = new WALEdit();
+ Cell cell = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setType(Cell.Type.Put)
+ .setRow(new byte[] { 1 }).setFamily(WALEdit.METAFAMILY).setQualifier(WALEdit.BULK_LOAD)
+ .setValue(valueBytes).build();
+ edit.add(cell);
+
+ long ts = EnvironmentEdgeManager.currentTime();
+ WALKeyImpl key = getWalKeyImpl(ts, scopes);
+ return new WAL.Entry(key, edit);
+ }
+
+ protected WALKeyImpl getWalKeyImpl(final long time, NavigableMap<byte[], Integer> scopes) {
+ return new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, time, mvcc, scopes);
+ }
+}
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java
index cc9200882e3d..78991a463da1 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java
@@ -19,8 +19,6 @@
import static org.apache.hadoop.hbase.HConstants.REPLICATION_BULKLOAD_ENABLE_KEY;
import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID;
-import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.BULKLOAD_FILES_DIR;
-import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_MAX_WAL_SIZE;
import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_ROOT_DIR;
import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_PEER_UUID;
@@ -29,6 +27,8 @@
import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.WAL_FILE_PREFIX;
import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.copyWithCleanup;
+import static org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.BULKLOAD_FILES_DIR;
+import static org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.WALS_DIR;
import static org.apache.hadoop.hbase.backup.util.BackupUtils.DATE_FORMAT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -65,6 +65,7 @@
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.util.BackupFileSystemManager;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/util/TestBackupFileSystemManager.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/util/TestBackupFileSystemManager.java
new file mode 100644
index 000000000000..2cca13bf19ca
--- /dev/null
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/util/TestBackupFileSystemManager.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Unit tests for {@link BackupFileSystemManager}.
+ */
+@Category(SmallTests.class)
+public class TestBackupFileSystemManager {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestBackupFileSystemManager.class);
+
+ @Rule
+ public TemporaryFolder tmp = new TemporaryFolder();
+
+ /**
+ * extractWalPathInfo: happy path where WALs dir has a prefix. e.g.
+ * /some/prefix/WALs/2025-09-14/some-wal
+ */
+ @Test
+ public void testExtractWalPathInfo_withPrefix() throws Exception {
+ Path walPath =
+ new Path("/some/prefix/" + BackupFileSystemManager.WALS_DIR + "/2025-09-14/wal-000");
+ BackupFileSystemManager.WalPathInfo info = BackupFileSystemManager.extractWalPathInfo(walPath);
+
+ assertNotNull("WalPathInfo should not be null", info);
+ // prefixBeforeWALs should be "/some/prefix"
+ assertEquals("/some/prefix", info.getPrefixBeforeWALs().toString());
+ assertEquals("2025-09-14", info.getDateSegment());
+ }
+
+ /**
+ * extractWalPathInfo: case where WALs is at root (leading slash). e.g. /WALs/2025-09-14/some-wal
+ * Expect prefixBeforeWALs to be "/" (root) or non-null; resolution should still work.
+ */
+ @Test
+ public void testExtractWalPathInfo_rootWALs() throws Exception {
+ Path walPath = new Path("/" + BackupFileSystemManager.WALS_DIR + "/2025-09-14/wal-123");
+ BackupFileSystemManager.WalPathInfo info = BackupFileSystemManager.extractWalPathInfo(walPath);
+
+ assertNotNull(info);
+ // parent of "/WALs" in Hadoop Path is "/" (root). Ensure date segment parsed.
+ assertEquals("2025-09-14", info.getDateSegment());
+ assertNotNull("prefixBeforeWALs should not be null for root-style path",
+ info.getPrefixBeforeWALs());
+ // prefix is expected to be "/" (root); be tolerant and only require a non-empty value
+ assertTrue(info.getPrefixBeforeWALs().toString().equals("/")
+ || !info.getPrefixBeforeWALs().toString().isEmpty());
+ }
+
+ /**
+ * extractWalPathInfo: null input should throw IllegalArgumentException.
+ */
+ @Test(expected = IllegalArgumentException.class)
+ public void testExtractWalPathInfo_nullPath() throws Exception {
+ BackupFileSystemManager.extractWalPathInfo(null);
+ }
+
+ /**
+ * extractWalPathInfo: missing date directory -> should throw IOException. Example: path that has
+ * no parent for the wal file.
+ */
+ @Test(expected = IOException.class)
+ public void testExtractWalPathInfo_missingDateDir() throws Exception {
+ // A single segment path (no parents) e.g. "walfile"
+ Path walPath = new Path("walfile");
+ BackupFileSystemManager.extractWalPathInfo(walPath);
+ }
+
+ /**
+ * extractWalPathInfo: WALs segment name mismatch -> should throw IOException. e.g.
+ * /prefix/NOT_WALs/2025/wal
+ */
+ @Test(expected = IOException.class)
+ public void testExtractWalPathInfo_wrongWALsegment() throws Exception {
+ Path walPath = new Path("/prefix/NOT_WALS/2025/wal");
+ BackupFileSystemManager.extractWalPathInfo(walPath);
+ }
+
+ /**
+ * resolveBulkLoadFullPath: normal case with prefix.
+ */
+ @Test
+ public void testResolveBulkLoadFullPath_withPrefix() throws Exception {
+ Path walPath =
+ new Path("/some/prefix/" + BackupFileSystemManager.WALS_DIR + "/2025-08-30/wal-1");
+ Path relative = new Path("namespace/table/region/family/file1");
+ Path full = BackupFileSystemManager.resolveBulkLoadFullPath(walPath, relative);
+
+ // expected: /some/prefix/bulk-load-files/2025-08-30/namespace/table/region/family/file1
+ String expected = "/some/prefix/" + BackupFileSystemManager.BULKLOAD_FILES_DIR
+ + "/2025-08-30/namespace/table/region/family/file1";
+ assertEquals(expected, full.toString());
+ }
+
+ /**
+ * resolveBulkLoadFullPath: when WALs is directly under root (prefix is root), ensure the path is
+ * resolved under /bulk-load-files/&lt;date&gt;/...
+ */
+ @Test
+ public void testResolveBulkLoadFullPath_rootWALs() throws Exception {
+ Path walPath = new Path("/" + BackupFileSystemManager.WALS_DIR + "/2025-08-30/wal-2");
+ Path relative = new Path("ns/tbl/r/f");
+ Path full = BackupFileSystemManager.resolveBulkLoadFullPath(walPath, relative);
+
+ String expected = "/" + BackupFileSystemManager.BULKLOAD_FILES_DIR + "/2025-08-30/ns/tbl/r/f";
+ assertEquals(expected, full.toString());
+ }
+
+ /**
+ * Integration-style test: the constructor should create directories under the provided backup root. Uses
+ * a temporary folder (local fs).
+ */
+ @Test
+ public void testConstructorCreatesDirectories() throws Exception {
+ File root = tmp.newFolder("backupRoot");
+ String rootPath = root.getAbsolutePath();
+
+ Configuration conf = HBaseConfiguration.create();
+ BackupFileSystemManager mgr = new BackupFileSystemManager("peer-1", conf, rootPath);
+
+ FileSystem fs = mgr.getBackupFs();
+ Path wals = mgr.getWalsDir();
+ Path bulk = mgr.getBulkLoadFilesDir();
+
+ assertTrue("WALs dir should exist", fs.exists(wals));
+ assertTrue("bulk-load-files dir should exist", fs.exists(bulk));
+ }
+}
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestBulkLoadProcessor.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/util/TestBulkLoadProcessor.java
similarity index 69%
rename from hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestBulkLoadProcessor.java
rename to hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/util/TestBulkLoadProcessor.java
index 9837f9e926d2..1d3b8ab09eaa 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestBulkLoadProcessor.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/util/TestBulkLoadProcessor.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hbase.backup.replication;
+package org.apache.hadoop.hbase.backup.util;
import static org.apache.hadoop.hbase.wal.WALEdit.METAFAMILY;
import static org.junit.Assert.assertEquals;
@@ -119,6 +119,18 @@ public void testProcessBulkLoadFiles_validEntry() throws IOException {
assertTrue(paths.get(1).toString().contains("ns/tbl/region123/cf1/file2"));
}
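+ /**
+ * Verifies that a valid bulk load entry yields the expected file paths when processed through
+ * the single-entry (WALKey, WALEdit) API.
+ */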
+ @Test
+ public void testProcessBulkLoadFiles_validEntry_singleEntryApi() throws IOException {
+ WAL.Entry entry = createBulkLoadWalEntry(TableName.valueOf("ns", "tbl"), "region123", true,
+ "cf1", "file1", "file2");
+
+ List<Path> paths = BulkLoadProcessor.processBulkLoadFiles(entry.getKey(), entry.getEdit());
+
+ assertEquals(2, paths.size());
+ assertTrue(paths.get(0).toString().contains("ns/tbl/region123/cf1/file1"));
+ assertTrue(paths.get(1).toString().contains("ns/tbl/region123/cf1/file2"));
+ }
+
/**
* Verifies that a non-replicable bulk load entry is ignored.
*/
@@ -132,6 +144,16 @@ public void testProcessBulkLoadFiles_nonReplicableSkipped() throws IOException {
assertTrue(paths.isEmpty());
}
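+ /**
+ * Verifies that a non-replicable bulk load entry is ignored by the single-entry
+ * (WALKey, WALEdit) API.
+ */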
+ @Test
+ public void testProcessBulkLoadFiles_nonReplicableSkipped_singleEntryApi() throws IOException {
+ WAL.Entry entry =
+ createBulkLoadWalEntry(TableName.valueOf("ns", "tbl"), "region123", false, "cf1", "file1");
+
+ List<Path> paths = BulkLoadProcessor.processBulkLoadFiles(entry.getKey(), entry.getEdit());
+
+ assertTrue(paths.isEmpty());
+ }
+
/**
* Verifies that entries without the BULK_LOAD qualifier are ignored.
*/
@@ -146,6 +168,16 @@ public void testProcessBulkLoadFiles_noBulkLoadQualifier() throws IOException {
assertTrue(paths.isEmpty());
}
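+ /**
+ * Verifies that an edit without the BULK_LOAD qualifier is ignored by the single-entry
+ * (WALKey, WALEdit) API.
+ */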
+ @Test
+ public void testProcessBulkLoadFiles_noBulkLoadQualifier_singleEntryApi() throws IOException {
+ WALEdit edit = new WALEdit();
+ WALKeyImpl key = new WALKeyImpl(new byte[] {}, TableName.valueOf("ns", "tbl"), 0L, 0L, null);
+
+ List<Path> paths = BulkLoadProcessor.processBulkLoadFiles(key, edit);
+
+ assertTrue(paths.isEmpty());
+ }
+
/**
* Verifies that multiple WAL entries with different column families produce the correct set of
* file paths.
@@ -163,4 +195,38 @@ public void testProcessBulkLoadFiles_multipleFamilies() throws IOException {
assertTrue(paths.stream().anyMatch(p -> p.toString().contains("cf1/file1")));
assertTrue(paths.stream().anyMatch(p -> p.toString().contains("cf2/fileA")));
}
+
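+ /**
+ * Verifies that entries with different column families, processed one at a time through the
+ * single-entry (WALKey, WALEdit) API, produce the correct combined set of file paths.
+ */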
+ @Test
+ public void testProcessBulkLoadFiles_multipleFamilies_singleEntryApi() throws IOException {
+ WAL.Entry entry =
+ createBulkLoadWalEntry(TableName.valueOf("ns", "tbl"), "regionXYZ", true, "cf1", "file1");
+ WAL.Entry entry2 =
+ createBulkLoadWalEntry(TableName.valueOf("ns", "tbl"), "regionXYZ", true, "cf2", "fileA");
+
+ List<Path> paths1 = BulkLoadProcessor.processBulkLoadFiles(entry.getKey(), entry.getEdit());
+ List<Path> paths2 = BulkLoadProcessor.processBulkLoadFiles(entry2.getKey(), entry2.getEdit());
+
+ // combine to mimic processing multiple entries
+ paths1.addAll(paths2);
+
+ assertEquals(2, paths1.size());
+ assertTrue(paths1.stream().anyMatch(p -> p.toString().contains("cf1/file1")));
+ assertTrue(paths1.stream().anyMatch(p -> p.toString().contains("cf2/fileA")));
+ }
+
+ /**
+ * Sanity check: list-based API should still work and return the same results as invoking the
+ * single-entry API for the same entry (ensures delegation/backwards compatibility).
+ */
+ @Test
+ public void testProcessBulkLoadFiles_listApi_delegatesToSingle() throws IOException {
+ WAL.Entry entry =
+ createBulkLoadWalEntry(TableName.valueOf("ns", "tbl"), "region123", true, "cf1", "file1");
+
+ List<Path> single = BulkLoadProcessor.processBulkLoadFiles(entry.getKey(), entry.getEdit());
+ List<Path> listApi = BulkLoadProcessor.processBulkLoadFiles(Collections.singletonList(entry));
+
+ assertEquals(single.size(), listApi.size());
+ assertTrue(listApi.get(0).toString().contains("ns/tbl/region123/cf1/file1"));
+ }
}
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
index b5c1d39a550c..b2bda03c44ac 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
@@ -66,7 +66,7 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
/**
* {@link InputSplit} for {@link WAL} files. Each split represent exactly one log file.
*/
- static class WALSplit extends InputSplit implements Writable {
+ public static class WALSplit extends InputSplit implements Writable {
private String logFileName;
private long fileSize;
private long startTime;