diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BackupFileSystemManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BackupFileSystemManager.java
new file mode 100644
index 000000000000..225d32172766
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BackupFileSystemManager.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.replication;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Initializes and organizes the backup directories for continuously backed-up Write-Ahead Logs
+ * (WALs) and bulk-loaded files under the specified backup root directory.
+ */
+@InterfaceAudience.Private
+public class BackupFileSystemManager {
+ private static final Logger LOG = LoggerFactory.getLogger(BackupFileSystemManager.class);
+
+ public static final String WALS_DIR = "WALs";
+ public static final String BULKLOAD_FILES_DIR = "bulk-load-files";
+ private final String peerId;
+ private final FileSystem backupFs;
+ private final Path backupRootDir;
+ private final Path walsDir;
+ private final Path bulkLoadFilesDir;
+
+ public BackupFileSystemManager(String peerId, Configuration conf, String backupRootDirStr)
+ throws IOException {
+ this.peerId = peerId;
+ this.backupRootDir = new Path(backupRootDirStr);
+ this.backupFs = FileSystem.get(backupRootDir.toUri(), conf);
+ this.walsDir = createDirectory(WALS_DIR);
+ this.bulkLoadFilesDir = createDirectory(BULKLOAD_FILES_DIR);
+ }
+
+ private Path createDirectory(String dirName) throws IOException {
+ Path dirPath = new Path(backupRootDir, dirName);
+ backupFs.mkdirs(dirPath);
+ LOG.info("{} Initialized directory: {}", Utils.logPeerId(peerId), dirPath);
+ return dirPath;
+ }
+
+ public Path getWalsDir() {
+ return walsDir;
+ }
+
+ public Path getBulkLoadFilesDir() {
+ return bulkLoadFilesDir;
+ }
+
+ public FileSystem getBackupFs() {
+ return backupFs;
+ }
+}
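
For illustration only, a minimal usage sketch (not part of the patch) of how BackupFileSystemManager lays out the backup root; the peer id and backup path below are placeholder values:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager;

    public class BackupLayoutSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Placeholder peer id and local backup root; a real deployment would point this at
        // HDFS or an object store URI instead.
        BackupFileSystemManager manager =
          new BackupFileSystemManager("peer1", conf, "file:///tmp/continuous-backup");

        // The constructor eagerly creates <root>/WALs and <root>/bulk-load-files.
        Path wals = manager.getWalsDir();
        Path bulkLoads = manager.getBulkLoadFilesDir();
        System.out.println("WAL backups go under:       " + wals);
        System.out.println("Bulk load backups go under: " + bulkLoads);
      }
    }
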
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BulkLoadProcessor.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BulkLoadProcessor.java
new file mode 100644
index 000000000000..6e1271313bcd
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BulkLoadProcessor.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
+
+/**
+ * Processes bulk load files from Write-Ahead Log (WAL) entries for HBase replication.
+ *
+ * This utility class extracts and constructs the file paths of bulk-loaded files based on WAL
+ * entries. It processes bulk load descriptors and their associated store descriptors to generate
+ * the paths for each bulk-loaded file.
+ *
+ * The class is designed for scenarios where replicable bulk load operations need to be parsed
+ * and their file paths determined programmatically.
+ *
+ */
+@InterfaceAudience.Private
+public final class BulkLoadProcessor {
+ private BulkLoadProcessor() {
+ }
+
+ public static List<Path> processBulkLoadFiles(List<WAL.Entry> walEntries) throws IOException {
+ List<Path> bulkLoadFilePaths = new ArrayList<>();
+
+ for (WAL.Entry entry : walEntries) {
+ WALEdit edit = entry.getEdit();
+ for (Cell cell : edit.getCells()) {
+ if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
+ TableName tableName = entry.getKey().getTableName();
+ String namespace = tableName.getNamespaceAsString();
+ String table = tableName.getQualifierAsString();
+ bulkLoadFilePaths.addAll(processBulkLoadDescriptor(cell, namespace, table));
+ }
+ }
+ }
+ return bulkLoadFilePaths;
+ }
+
+ private static List<Path> processBulkLoadDescriptor(Cell cell, String namespace, String table)
+ throws IOException {
+ List<Path> bulkLoadFilePaths = new ArrayList<>();
+ WALProtos.BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
+
+ if (bld == null || !bld.getReplicate() || bld.getEncodedRegionName() == null) {
+ return bulkLoadFilePaths; // Skip if not replicable
+ }
+
+ String regionName = bld.getEncodedRegionName().toStringUtf8();
+ for (WALProtos.StoreDescriptor storeDescriptor : bld.getStoresList()) {
+ bulkLoadFilePaths
+ .addAll(processStoreDescriptor(storeDescriptor, namespace, table, regionName));
+ }
+
+ return bulkLoadFilePaths;
+ }
+
+ private static List<Path> processStoreDescriptor(WALProtos.StoreDescriptor storeDescriptor,
+ String namespace, String table, String regionName) {
+ List<Path> paths = new ArrayList<>();
+ String columnFamily = storeDescriptor.getFamilyName().toStringUtf8();
+
+ for (String storeFile : storeDescriptor.getStoreFileList()) {
+ paths.add(new Path(namespace,
+ new Path(table, new Path(regionName, new Path(columnFamily, storeFile)))));
+ }
+
+ return paths;
+ }
+}
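
To make the path construction above concrete, a small sketch (illustrative values only, not part of the patch) of the relative layout BulkLoadProcessor produces for one store file, namespace/table/region/columnFamily/storeFile:

    import org.apache.hadoop.fs.Path;

    public class BulkLoadPathSketch {
      public static void main(String[] args) {
        // Hypothetical values; in the endpoint these come from the WAL key and the
        // BulkLoadDescriptor/StoreDescriptor protobufs.
        String namespace = "default";
        String table = "usertable";
        String regionName = "b0c7a1f2d3e4a5b6c7d8e9f0a1b2c3d4";
        String columnFamily = "cf";
        String storeFile = "f3a1c2d4e5b6a7c8";

        // Same nesting used in processStoreDescriptor().
        Path relative = new Path(namespace,
          new Path(table, new Path(regionName, new Path(columnFamily, storeFile))));

        // Prints the nested relative path, e.g. default/usertable/<region>/cf/<storeFile>
        System.out.println(relative);
      }
    }
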
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
new file mode 100644
index 000000000000..c973af8102e7
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.replication;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
+import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
+import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.ReplicationResult;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.wal.FSHLogProvider;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ContinuousBackupReplicationEndpoint replicates WAL entries to backup storage. It groups WAL
+ * entries by day, stages them in per-day WAL files, and flushes those files both on a schedule and
+ * whenever a file exceeds the configured maximum size. It also copies bulk-loaded files to the
+ * backup location and keeps replication offsets in sync with what has actually been persisted.
+ */
+@InterfaceAudience.Private
+public class ContinuousBackupReplicationEndpoint extends BaseReplicationEndpoint {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ContinuousBackupReplicationEndpoint.class);
+ public static final String CONF_PEER_UUID = "hbase.backup.wal.replication.peerUUID";
+ public static final String CONF_BACKUP_ROOT_DIR = "hbase.backup.root.dir";
+ public static final String CONF_BACKUP_MAX_WAL_SIZE = "hbase.backup.max.wal.size";
+ public static final long DEFAULT_MAX_WAL_SIZE = 128 * 1024 * 1024;
+
+ public static final String CONF_STAGED_WAL_FLUSH_INITIAL_DELAY =
+ "hbase.backup.staged.wal.flush.initial.delay.seconds";
+ public static final int DEFAULT_STAGED_WAL_FLUSH_INITIAL_DELAY_SECONDS = 5 * 60; // 5 minutes
+ public static final String CONF_STAGED_WAL_FLUSH_INTERVAL =
+ "hbase.backup.staged.wal.flush.interval.seconds";
+ public static final int DEFAULT_STAGED_WAL_FLUSH_INTERVAL_SECONDS = 5 * 60; // 5 minutes
+ public static final int EXECUTOR_TERMINATION_TIMEOUT_SECONDS = 60; // TODO: configurable??
+
+ private final Map<Long, FSHLogProvider.Writer> walWriters = new ConcurrentHashMap<>();
+ private final ReentrantLock lock = new ReentrantLock();
+
+ private ReplicationSourceInterface replicationSource;
+ private Configuration conf;
+ private BackupFileSystemManager backupFileSystemManager;
+ private UUID peerUUID;
+ private String peerId;
+ private ScheduledExecutorService flushExecutor;
+
+ public static final long ONE_DAY_IN_MILLISECONDS = TimeUnit.DAYS.toMillis(1);
+ public static final String WAL_FILE_PREFIX = "wal_file.";
+ public static final String DATE_FORMAT = "yyyy-MM-dd";
+
+ @Override
+ public void init(Context context) throws IOException {
+ super.init(context);
+ this.replicationSource = context.getReplicationSource();
+ this.peerId = context.getPeerId();
+ this.conf = HBaseConfiguration.create(context.getConfiguration());
+
+ initializePeerUUID();
+ initializeBackupFileSystemManager();
+ startWalFlushExecutor();
+ LOG.info("{} Initialization complete", Utils.logPeerId(peerId));
+ }
+
+ private void initializePeerUUID() throws IOException {
+ String peerUUIDStr = conf.get(CONF_PEER_UUID);
+ if (peerUUIDStr == null || peerUUIDStr.isEmpty()) {
+ throw new IOException("Peer UUID is not specified. Please configure " + CONF_PEER_UUID);
+ }
+ try {
+ this.peerUUID = UUID.fromString(peerUUIDStr);
+ LOG.info("{} Peer UUID initialized to {}", Utils.logPeerId(peerId), peerUUID);
+ } catch (IllegalArgumentException e) {
+ throw new IOException("Invalid Peer UUID format: " + peerUUIDStr, e);
+ }
+ }
+
+ private void initializeBackupFileSystemManager() throws IOException {
+ String backupRootDir = conf.get(CONF_BACKUP_ROOT_DIR);
+ if (backupRootDir == null || backupRootDir.isEmpty()) {
+ throw new IOException(
+ "Backup root directory is not specified. Configure " + CONF_BACKUP_ROOT_DIR);
+ }
+
+ try {
+ this.backupFileSystemManager = new BackupFileSystemManager(peerId, conf, backupRootDir);
+ LOG.info("{} BackupFileSystemManager initialized successfully for {}",
+ Utils.logPeerId(peerId), backupRootDir);
+ } catch (IOException e) {
+ throw new IOException("Failed to initialize BackupFileSystemManager", e);
+ }
+ }
+
+ private void startWalFlushExecutor() {
+ int initialDelay = conf.getInt(CONF_STAGED_WAL_FLUSH_INITIAL_DELAY,
+ DEFAULT_STAGED_WAL_FLUSH_INITIAL_DELAY_SECONDS);
+ int flushInterval =
+ conf.getInt(CONF_STAGED_WAL_FLUSH_INTERVAL, DEFAULT_STAGED_WAL_FLUSH_INTERVAL_SECONDS);
+
+ flushExecutor = Executors.newSingleThreadScheduledExecutor();
+ flushExecutor.scheduleAtFixedRate(this::flushAndBackupSafely, initialDelay, flushInterval,
+ TimeUnit.SECONDS);
+ LOG.info("{} Scheduled WAL flush executor started with initial delay {}s and interval {}s",
+ Utils.logPeerId(peerId), initialDelay, flushInterval);
+ }
+
+ private void flushAndBackupSafely() {
+ lock.lock();
+ try {
+ LOG.info("{} Periodic WAL flush triggered", Utils.logPeerId(peerId));
+ flushWriters();
+ replicationSource.persistOffsets();
+ LOG.info("{} Periodic WAL flush and offset persistence completed successfully",
+ Utils.logPeerId(peerId));
+ } catch (IOException e) {
+ LOG.error("{} Error during WAL flush: {}", Utils.logPeerId(peerId), e.getMessage(), e);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private void flushWriters() throws IOException {
+ LOG.info("{} Flushing {} WAL writers", Utils.logPeerId(peerId), walWriters.size());
+ for (Map.Entry<Long, FSHLogProvider.Writer> entry : walWriters.entrySet()) {
+ FSHLogProvider.Writer writer = entry.getValue();
+ if (writer != null) {
+ LOG.debug("{} Closing WAL writer for day: {}", Utils.logPeerId(peerId), entry.getKey());
+ try {
+ writer.close();
+ LOG.debug("{} Successfully closed WAL writer for day: {}", Utils.logPeerId(peerId),
+ entry.getKey());
+ } catch (IOException e) {
+ LOG.error("{} Failed to close WAL writer for day: {}. Error: {}", Utils.logPeerId(peerId),
+ entry.getKey(), e.getMessage(), e);
+ throw e;
+ }
+ }
+ }
+ walWriters.clear();
+ LOG.info("{} WAL writers flushed and cleared", Utils.logPeerId(peerId));
+ }
+
+ @Override
+ public UUID getPeerUUID() {
+ return peerUUID;
+ }
+
+ @Override
+ public void start() {
+ LOG.info("{} Starting ContinuousBackupReplicationEndpoint", Utils.logPeerId(peerId));
+ startAsync();
+ }
+
+ @Override
+ protected void doStart() {
+ LOG.info("{} ContinuousBackupReplicationEndpoint started successfully.",
+ Utils.logPeerId(peerId));
+ notifyStarted();
+ }
+
+ @Override
+ public ReplicationResult replicate(ReplicateContext replicateContext) {
+ final List<WAL.Entry> entries = replicateContext.getEntries();
+ if (entries.isEmpty()) {
+ LOG.debug("{} No WAL entries to replicate", Utils.logPeerId(peerId));
+ return ReplicationResult.SUBMITTED;
+ }
+
+ LOG.debug("{} Received {} WAL entries for replication", Utils.logPeerId(peerId),
+ entries.size());
+
+ Map<Long, List<WAL.Entry>> groupedEntries = groupEntriesByDay(entries);
+ LOG.debug("{} Grouped WAL entries by day: {}", Utils.logPeerId(peerId),
+ groupedEntries.keySet());
+
+ lock.lock();
+ try {
+ for (Map.Entry<Long, List<WAL.Entry>> entry : groupedEntries.entrySet()) {
+ LOG.debug("{} Backing up {} WAL entries for day {}", Utils.logPeerId(peerId),
+ entry.getValue().size(), entry.getKey());
+ backupWalEntries(entry.getKey(), entry.getValue());
+ }
+
+ if (isAnyWriterFull()) {
+ LOG.debug("{} Some WAL writers reached max size, triggering flush",
+ Utils.logPeerId(peerId));
+ flushWriters();
+ LOG.debug("{} Replication committed after WAL flush", Utils.logPeerId(peerId));
+ return ReplicationResult.COMMITTED;
+ }
+
+ LOG.debug("{} Replication submitted successfully", Utils.logPeerId(peerId));
+ return ReplicationResult.SUBMITTED;
+ } catch (IOException e) {
+ LOG.error("{} Replication failed. Error details: {}", Utils.logPeerId(peerId), e.getMessage(),
+ e);
+ return ReplicationResult.FAILED;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private Map<Long, List<WAL.Entry>> groupEntriesByDay(List<WAL.Entry> entries) {
+ return entries.stream().collect(
+ Collectors.groupingBy(entry -> (entry.getKey().getWriteTime() / ONE_DAY_IN_MILLISECONDS)
+ * ONE_DAY_IN_MILLISECONDS));
+ }
+
+ private boolean isAnyWriterFull() {
+ return walWriters.values().stream().anyMatch(this::isWriterFull);
+ }
+
+ private boolean isWriterFull(FSHLogProvider.Writer writer) {
+ long maxWalSize = conf.getLong(CONF_BACKUP_MAX_WAL_SIZE, DEFAULT_MAX_WAL_SIZE);
+ return writer.getLength() >= maxWalSize;
+ }
+
+ private void backupWalEntries(long day, List<WAL.Entry> walEntries) throws IOException {
+ LOG.debug("{} Starting backup of {} WAL entries for day {}", Utils.logPeerId(peerId),
+ walEntries.size(), day);
+
+ try {
+ FSHLogProvider.Writer walWriter = walWriters.computeIfAbsent(day, this::createWalWriter);
+ List<Path> bulkLoadFiles = BulkLoadProcessor.processBulkLoadFiles(walEntries);
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("{} Processed {} bulk load files for WAL entries", Utils.logPeerId(peerId),
+ bulkLoadFiles.size());
+ LOG.trace("{} Bulk load files: {}", Utils.logPeerId(peerId),
+ bulkLoadFiles.stream().map(Path::toString).collect(Collectors.joining(", ")));
+ }
+
+ for (WAL.Entry entry : walEntries) {
+ walWriter.append(entry);
+ }
+ walWriter.sync(true);
+ uploadBulkLoadFiles(bulkLoadFiles);
+ } catch (UncheckedIOException e) {
+ String errorMsg = Utils.logPeerId(peerId) + " Failed to get or create WAL Writer for " + day;
+ LOG.error("{} Backup failed for day {}. Error: {}", Utils.logPeerId(peerId), day,
+ e.getMessage(), e);
+ throw new IOException(errorMsg, e);
+ }
+ }
+
+ private FSHLogProvider.Writer createWalWriter(long dayInMillis) {
+ // Convert dayInMillis to "yyyy-MM-dd" format
+ SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+ String dayDirectoryName = dateFormat.format(new Date(dayInMillis));
+
+ FileSystem fs = backupFileSystemManager.getBackupFs();
+ Path walsDir = backupFileSystemManager.getWalsDir();
+
+ try {
+ // Create a directory for the day
+ Path dayDir = new Path(walsDir, dayDirectoryName);
+ fs.mkdirs(dayDir);
+
+ // Generate a unique WAL file name
+ long currentTime = EnvironmentEdgeManager.getDelegate().currentTime();
+ String walFileName = WAL_FILE_PREFIX + currentTime + "." + UUID.randomUUID();
+ Path walFilePath = new Path(dayDir, walFileName);
+
+ // Initialize the WAL writer
+ FSHLogProvider.Writer writer =
+ ObjectStoreProtobufWalWriter.class.getDeclaredConstructor().newInstance();
+ writer.init(fs, walFilePath, conf, true, WALUtil.getWALBlockSize(conf, fs, walFilePath),
+ StreamSlowMonitor.create(conf, walFileName));
+
+ LOG.info("{} WAL writer created: {}", Utils.logPeerId(peerId), walFilePath);
+ return writer;
+ } catch (Exception e) {
+ throw new UncheckedIOException(
+ Utils.logPeerId(peerId) + " Failed to initialize WAL Writer for day: " + dayDirectoryName,
+ new IOException(e));
+ }
+ }
+
+ @Override
+ public void stop() {
+ LOG.info("{} Stopping ContinuousBackupReplicationEndpoint...", Utils.logPeerId(peerId));
+ stopAsync();
+ }
+
+ @Override
+ protected void doStop() {
+ close();
+ LOG.info("{} ContinuousBackupReplicationEndpoint stopped successfully.",
+ Utils.logPeerId(peerId));
+ notifyStopped();
+ }
+
+ private void close() {
+ LOG.info("{} Closing WAL replication component...", Utils.logPeerId(peerId));
+ shutdownFlushExecutor();
+ lock.lock();
+ try {
+ flushWriters();
+ replicationSource.persistOffsets();
+ } catch (IOException e) {
+ LOG.error("{} Failed to Flush Open Wal Writers: {}", Utils.logPeerId(peerId), e.getMessage(),
+ e);
+ } finally {
+ lock.unlock();
+ LOG.info("{} WAL replication component closed.", Utils.logPeerId(peerId));
+ }
+ }
+
+ private void uploadBulkLoadFiles(List<Path> bulkLoadFiles) throws IOException {
+ LOG.debug("{} Starting upload of {} bulk load files", Utils.logPeerId(peerId),
+ bulkLoadFiles.size());
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("{} Bulk load files to upload: {}", Utils.logPeerId(peerId),
+ bulkLoadFiles.stream().map(Path::toString).collect(Collectors.joining(", ")));
+ }
+ for (Path file : bulkLoadFiles) {
+ Path sourcePath = getBulkLoadFileStagingPath(file);
+ Path destPath = new Path(backupFileSystemManager.getBulkLoadFilesDir(), file);
+
+ try {
+ LOG.debug("{} Copying bulk load file from {} to {}", Utils.logPeerId(peerId), sourcePath,
+ destPath);
+
+ FileUtil.copy(CommonFSUtils.getRootDirFileSystem(conf), sourcePath,
+ backupFileSystemManager.getBackupFs(), destPath, false, conf);
+
+ LOG.info("{} Bulk load file {} successfully backed up to {}", Utils.logPeerId(peerId), file,
+ destPath);
+ } catch (IOException e) {
+ LOG.error("{} Failed to back up bulk load file {}: {}", Utils.logPeerId(peerId), file,
+ e.getMessage(), e);
+ throw e;
+ }
+ }
+
+ LOG.debug("{} Completed upload of bulk load files", Utils.logPeerId(peerId));
+ }
+
+ private Path getBulkLoadFileStagingPath(Path relativePathFromNamespace) throws IOException {
+ FileSystem rootFs = CommonFSUtils.getRootDirFileSystem(conf);
+ Path rootDir = CommonFSUtils.getRootDir(conf);
+ Path baseNSDir = new Path(HConstants.BASE_NAMESPACE_DIR);
+ Path baseNamespaceDir = new Path(rootDir, baseNSDir);
+ Path hFileArchiveDir =
+ new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY, baseNSDir));
+
+ LOG.debug("{} Searching for bulk load file: {} in paths: {}, {}", Utils.logPeerId(peerId),
+ relativePathFromNamespace, baseNamespaceDir, hFileArchiveDir);
+
+ Path result =
+ findExistingPath(rootFs, baseNamespaceDir, hFileArchiveDir, relativePathFromNamespace);
+
+ if (result == null) {
+ LOG.error("{} No bulk loaded file found in relative path: {}", Utils.logPeerId(peerId),
+ relativePathFromNamespace);
+ throw new IOException(
+ "No Bulk loaded file found in relative path: " + relativePathFromNamespace);
+ }
+
+ LOG.debug("{} Bulk load file found at {}", Utils.logPeerId(peerId), result);
+ return result;
+ }
+
+ private static Path findExistingPath(FileSystem rootFs, Path baseNamespaceDir,
+ Path hFileArchiveDir, Path filePath) throws IOException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Checking for bulk load file at: {} and {}", new Path(baseNamespaceDir, filePath),
+ new Path(hFileArchiveDir, filePath));
+ }
+
+ for (Path candidate : new Path[] { new Path(baseNamespaceDir, filePath),
+ new Path(hFileArchiveDir, filePath) }) {
+ if (rootFs.exists(candidate)) {
+ LOG.debug("Found bulk load file at: {}", candidate);
+ return candidate;
+ }
+ }
+ return null;
+ }
+
+ private void shutdownFlushExecutor() {
+ if (flushExecutor != null) {
+ LOG.info("{} Initiating WAL flush executor shutdown.", Utils.logPeerId(peerId));
+
+ flushExecutor.shutdown();
+ try {
+ if (
+ !flushExecutor.awaitTermination(EXECUTOR_TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)
+ ) {
+ LOG.warn("{} Flush executor did not terminate within timeout, forcing shutdown.",
+ Utils.logPeerId(peerId));
+ flushExecutor.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ flushExecutor.shutdownNow();
+ LOG.warn("{} Flush executor shutdown was interrupted.", Utils.logPeerId(peerId), e);
+ }
+ LOG.info("{} WAL flush thread stopped.", Utils.logPeerId(peerId));
+ }
+ }
+}
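
As a rough sketch of the bucketing used by groupEntriesByDay() and createWalWriter() above (standalone, not part of the patch): the WAL write time is floored to a day boundary in epoch milliseconds, and that key is then rendered as a yyyy-MM-dd directory name.

    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.concurrent.TimeUnit;

    public class DayBucketSketch {
      private static final long ONE_DAY_IN_MILLISECONDS = TimeUnit.DAYS.toMillis(1);

      public static void main(String[] args) {
        long writeTime = 1735741234567L; // hypothetical WAL entry write time (epoch millis)

        // Same truncation as groupEntriesByDay(): floor to the start of the (UTC) day.
        long dayKey = (writeTime / ONE_DAY_IN_MILLISECONDS) * ONE_DAY_IN_MILLISECONDS;

        // createWalWriter() formats that key as the per-day directory name. Note that
        // SimpleDateFormat uses the JVM default time zone when rendering the date.
        String dayDir = new SimpleDateFormat("yyyy-MM-dd").format(new Date(dayKey));
        System.out.println(dayKey + " -> " + dayDir);
      }
    }
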
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ObjectStoreProtobufWalWriter.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ObjectStoreProtobufWalWriter.java
new file mode 100644
index 000000000000..27f4fbdc027e
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ObjectStoreProtobufWalWriter.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.replication;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
+import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
+import org.apache.hadoop.hbase.util.AtomicUtils;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * A custom implementation of {@link ProtobufLogWriter} that provides support for writing
+ * protobuf-based WAL (Write-Ahead Log) entries to object store-backed files.
+ *
+ * This class overrides the {@link ProtobufLogWriter#sync(boolean)} and
+ * {@link ProtobufLogWriter#initOutput(FileSystem, Path, boolean, int, short, long, StreamSlowMonitor, boolean)}
+ * methods to ensure compatibility with object stores, skipping the HFLUSH and HSYNC capability
+ * checks. Many object stores do not advertise these capabilities, so bypassing the checks keeps
+ * the writer usable in such environments.
+ *
+ */
+@InterfaceAudience.Private
+public class ObjectStoreProtobufWalWriter extends ProtobufLogWriter {
+ private final AtomicLong syncedLength = new AtomicLong(0);
+
+ @Override
+ public void sync(boolean forceSync) throws IOException {
+ FSDataOutputStream fsDataOutputstream = this.output;
+ if (fsDataOutputstream == null) {
+ return; // Presume closed
+ }
+ // Special case for Hadoop S3: Unlike traditional file systems, where flush() ensures data is
+ // durably written, in Hadoop S3, flush() only writes data to the internal buffer and does not
+ // immediately persist it to S3. The actual upload to S3 happens asynchronously, typically when
+ // a block is full or when close() is called, which finalizes the upload process.
+ fsDataOutputstream.flush();
+ AtomicUtils.updateMax(this.syncedLength, fsDataOutputstream.getPos());
+ }
+
+ @Override
+ protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
+ short replication, long blockSize, StreamSlowMonitor monitor, boolean noLocalWrite)
+ throws IOException {
+ try {
+ super.initOutput(fs, path, overwritable, bufferSize, replication, blockSize, monitor,
+ noLocalWrite);
+ } catch (CommonFSUtils.StreamLacksCapabilityException e) {
+ // Some object stores do not support the HFLUSH and HSYNC capabilities, so the
+ // StreamLacksCapabilityException is intentionally swallowed here to keep the writer
+ // usable against such stores.
+ }
+ }
+}
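
For background on the capability checks being skipped above, the standard Hadoop StreamCapabilities API can be used to probe whether an output stream advertises hflush/hsync. This probe is illustrative only and is not something the patch itself performs; the path is a placeholder:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.StreamCapabilities;

    public class CapabilityProbe {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path("file:///tmp/capability-probe"); // placeholder path
        try (FSDataOutputStream out = path.getFileSystem(conf).create(path, true)) {
          // HDFS streams normally report both capabilities; many object store streams report
          // neither, which is why ObjectStoreProtobufWalWriter tolerates their absence.
          System.out.println("hflush supported: " + out.hasCapability(StreamCapabilities.HFLUSH));
          System.out.println("hsync supported:  " + out.hasCapability(StreamCapabilities.HSYNC));
        }
      }
    }
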
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/Utils.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/Utils.java
new file mode 100644
index 000000000000..69365674acca
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/Utils.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.replication;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public final class Utils {
+ private Utils() {
+ }
+
+ public static String logPeerId(String peerId) {
+ return "[Source for peer " + peerId + "]:";
+ }
+}
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java
new file mode 100644
index 000000000000..cd1f758f7607
--- /dev/null
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.replication;
+
+import static org.apache.hadoop.hbase.HConstants.REPLICATION_BULKLOAD_ENABLE_KEY;
+import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID;
+import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.BULKLOAD_FILES_DIR;
+import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_MAX_WAL_SIZE;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_ROOT_DIR;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_PEER_UUID;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_STAGED_WAL_FLUSH_INITIAL_DELAY;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_STAGED_WAL_FLUSH_INTERVAL;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.WAL_FILE_PREFIX;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.mapreduce.WALPlayer;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
+import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
+import org.apache.hadoop.hbase.util.HFileTestUtil;
+import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestContinuousBackupReplicationEndpoint {
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestContinuousBackupReplicationEndpoint.class);
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestContinuousBackupReplicationEndpoint.class);
+
+ private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+ private static final Configuration conf = TEST_UTIL.getConfiguration();
+ private static Admin admin;
+
+ private final String replicationEndpoint = ContinuousBackupReplicationEndpoint.class.getName();
+ private static final String CF_NAME = "cf";
+ private static final byte[] QUALIFIER = Bytes.toBytes("my-qualifier");
+ static FileSystem fs = null;
+ static Path root;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ // Set the configuration properties as required
+ conf.setBoolean(REPLICATION_BULKLOAD_ENABLE_KEY, true);
+ conf.set(REPLICATION_CLUSTER_ID, "clusterId1");
+
+ TEST_UTIL.startMiniZKCluster();
+ TEST_UTIL.startMiniCluster(3);
+ fs = FileSystem.get(conf);
+ root = TEST_UTIL.getDataTestDirOnTestFS();
+ admin = TEST_UTIL.getAdmin();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ if (fs != null) {
+ fs.close();
+ }
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testWALAndBulkLoadFileBackup() throws IOException {
+ String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
+ TableName tableName = TableName.valueOf("table_" + methodName);
+ String peerId = "peerId";
+
+ createTable(tableName);
+
+ Path backupRootDir = new Path(root, methodName);
+ fs.mkdirs(backupRootDir);
+
+ Map<TableName, List<String>> tableMap = new HashMap<>();
+ tableMap.put(tableName, new ArrayList<>());
+
+ addReplicationPeer(peerId, backupRootDir, tableMap);
+
+ loadRandomData(tableName, 100);
+ assertEquals(100, getRowCount(tableName));
+
+ Path dir = TEST_UTIL.getDataTestDirOnTestFS("testBulkLoadByFamily");
+ generateHFiles(dir);
+ bulkLoadHFiles(tableName, dir);
+ assertEquals(1100, getRowCount(tableName));
+
+ waitForReplication(15000);
+ deleteReplicationPeer(peerId);
+
+ verifyBackup(backupRootDir.toString(), true, Map.of(tableName, 1100));
+
+ deleteTable(tableName);
+ }
+
+ @Test
+ public void testMultiTableWALBackup() throws IOException {
+ String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
+ TableName table1 = TableName.valueOf("table_" + methodName + "1");
+ TableName table2 = TableName.valueOf("table_" + methodName + "2");
+ TableName table3 = TableName.valueOf("table_" + methodName + "3");
+ String peerId = "peerMulti";
+
+ for (TableName table : List.of(table1, table2, table3)) {
+ createTable(table);
+ }
+
+ Path backupRootDir = new Path(root, methodName);
+ fs.mkdirs(backupRootDir);
+
+ Map<TableName, List<String>> initialTableMap = new HashMap<>();
+ initialTableMap.put(table1, new ArrayList<>());
+ initialTableMap.put(table2, new ArrayList<>());
+
+ addReplicationPeer(peerId, backupRootDir, initialTableMap);
+
+ for (TableName table : List.of(table1, table2, table3)) {
+ loadRandomData(table, 50);
+ assertEquals(50, getRowCount(table));
+ }
+
+ waitForReplication(15000);
+
+ // Update the Replication Peer to Include table3
+ admin.updateReplicationPeerConfig(peerId,
+ ReplicationPeerConfig.newBuilder(admin.getReplicationPeerConfig(peerId))
+ .setTableCFsMap(
+ Map.of(table1, new ArrayList<>(), table2, new ArrayList<>(), table3, new ArrayList<>()))
+ .build());
+
+ for (TableName table : List.of(table1, table2, table3)) {
+ loadRandomData(table, 50);
+ assertEquals(100, getRowCount(table));
+ }
+
+ waitForReplication(15000);
+ deleteReplicationPeer(peerId);
+
+ verifyBackup(backupRootDir.toString(), false, Map.of(table1, 100, table2, 100, table3, 50));
+
+ for (TableName table : List.of(table1, table2, table3)) {
+ deleteTable(table);
+ }
+ }
+
+ @Test
+ public void testWALBackupWithPeerRestart() throws IOException, InterruptedException {
+ String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
+ TableName tableName = TableName.valueOf("table_" + methodName);
+ String peerId = "peerId";
+
+ createTable(tableName);
+
+ Path backupRootDir = new Path(root, methodName);
+ fs.mkdirs(backupRootDir);
+
+ Map<TableName, List<String>> tableMap = new HashMap<>();
+ tableMap.put(tableName, new ArrayList<>());
+
+ addReplicationPeer(peerId, backupRootDir, tableMap);
+
+ AtomicBoolean stopLoading = new AtomicBoolean(false);
+
+ // Start a separate thread to load data continuously
+ Thread dataLoaderThread = new Thread(() -> {
+ try {
+ while (!stopLoading.get()) {
+ loadRandomData(tableName, 10);
+ Thread.sleep(1000); // Simulate delay
+ }
+ } catch (Exception e) {
+ LOG.error("Data loading thread encountered an error", e);
+ }
+ });
+
+ dataLoaderThread.start();
+
+ // Main thread enables and disables replication peer
+ try {
+ for (int i = 0; i < 5; i++) {
+ LOG.info("Disabling replication peer...");
+ admin.disableReplicationPeer(peerId);
+ Thread.sleep(2000);
+
+ LOG.info("Enabling replication peer...");
+ admin.enableReplicationPeer(peerId);
+ Thread.sleep(2000);
+ }
+ } finally {
+ stopLoading.set(true); // Stop the data loader thread
+ dataLoaderThread.join();
+ }
+
+ waitForReplication(20000);
+ deleteReplicationPeer(peerId);
+
+ verifyBackup(backupRootDir.toString(), false, Map.of(tableName, getRowCount(tableName)));
+
+ deleteTable(tableName);
+ }
+
+ @Test
+ public void testDayWiseWALBackup() throws IOException {
+ String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
+ TableName tableName = TableName.valueOf("table_" + methodName);
+ String peerId = "peerId";
+
+ createTable(tableName);
+
+ Path backupRootDir = new Path(root, methodName);
+ fs.mkdirs(backupRootDir);
+
+ Map<TableName, List<String>> tableMap = new HashMap<>();
+ tableMap.put(tableName, new ArrayList<>());
+
+ addReplicationPeer(peerId, backupRootDir, tableMap);
+
+ // Mock system time using ManualEnvironmentEdge
+ ManualEnvironmentEdge manualEdge = new ManualEnvironmentEdge();
+ EnvironmentEdgeManagerTestHelper.injectEdge(manualEdge);
+
+ long currentTime = System.currentTimeMillis();
+ long oneDayBackTime = currentTime - ONE_DAY_IN_MILLISECONDS;
+
+ SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+ String expectedPrevDayDir = dateFormat.format(new Date(oneDayBackTime));
+ String expectedCurrentDayDir = dateFormat.format(new Date(currentTime));
+
+ manualEdge.setValue(oneDayBackTime);
+ loadRandomData(tableName, 100);
+ assertEquals(100, getRowCount(tableName));
+
+ manualEdge.setValue(currentTime);
+ loadRandomData(tableName, 100);
+ assertEquals(200, getRowCount(tableName));
+
+ // Reset time mocking
+ EnvironmentEdgeManagerTestHelper.reset();
+
+ waitForReplication(15000);
+ deleteReplicationPeer(peerId);
+
+ verifyBackup(backupRootDir.toString(), false, Map.of(tableName, 200));
+
+ // Verify that WALs are stored in two directories, one for each day
+ Path walDir = new Path(backupRootDir, WALS_DIR);
+ Set<String> walDirectories = new HashSet<>();
+
+ FileStatus[] fileStatuses = fs.listStatus(walDir);
+ for (FileStatus fileStatus : fileStatuses) {
+ if (fileStatus.isDirectory()) {
+ String dirName = fileStatus.getPath().getName();
+ walDirectories.add(dirName);
+ }
+ }
+
+ assertEquals("WALs should be stored in exactly two directories", 2, walDirectories.size());
+ assertTrue("Expected previous day's WAL directory missing",
+ walDirectories.contains(expectedPrevDayDir));
+ assertTrue("Expected current day's WAL directory missing",
+ walDirectories.contains(expectedCurrentDayDir));
+
+ deleteTable(tableName);
+ }
+
+ private void createTable(TableName tableName) throws IOException {
+ ColumnFamilyDescriptor columnFamilyDescriptor =
+ ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(CF_NAME)).setScope(1).build();
+ TableDescriptor tableDescriptor =
+ TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(columnFamilyDescriptor).build();
+
+ if (!admin.tableExists(tableName)) {
+ admin.createTable(tableDescriptor);
+ }
+ }
+
+ private void deleteTable(TableName tableName) throws IOException {
+ admin.disableTable(tableName);
+ admin.truncateTable(tableName, false);
+ admin.disableTable(tableName);
+ admin.deleteTable(tableName);
+ }
+
+ private void addReplicationPeer(String peerId, Path backupRootDir,
+ Map<TableName, List<String>> tableMap) throws IOException {
+ Map<String, String> additionalArgs = new HashMap<>();
+ additionalArgs.put(CONF_PEER_UUID, UUID.randomUUID().toString());
+ additionalArgs.put(CONF_BACKUP_ROOT_DIR, backupRootDir.toString());
+ additionalArgs.put(CONF_BACKUP_MAX_WAL_SIZE, "10240");
+ additionalArgs.put(CONF_STAGED_WAL_FLUSH_INITIAL_DELAY, "10");
+ additionalArgs.put(CONF_STAGED_WAL_FLUSH_INTERVAL, "10");
+
+ ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
+ .setReplicationEndpointImpl(replicationEndpoint).setReplicateAllUserTables(false)
+ .setTableCFsMap(tableMap).putAllConfiguration(additionalArgs).build();
+
+ admin.addReplicationPeer(peerId, peerConfig);
+ }
+
+ private void deleteReplicationPeer(String peerId) throws IOException {
+ admin.disableReplicationPeer(peerId);
+ admin.removeReplicationPeer(peerId);
+ }
+
+ private void loadRandomData(TableName tableName, int totalRows) throws IOException {
+ int rowSize = 32;
+ try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
+ TEST_UTIL.loadRandomRows(table, Bytes.toBytes(CF_NAME), rowSize, totalRows);
+ }
+ }
+
+ private void bulkLoadHFiles(TableName tableName, Path inputDir) throws IOException {
+ TEST_UTIL.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, true);
+
+ try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
+ BulkLoadHFiles loader = new BulkLoadHFilesTool(TEST_UTIL.getConfiguration());
+ loader.bulkLoad(table.getName(), inputDir);
+ } finally {
+ TEST_UTIL.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, false);
+ }
+ }
+
+ private void bulkLoadHFiles(TableName tableName, Map<byte[], List<Path>> family2Files)
+ throws IOException {
+ TEST_UTIL.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, true);
+
+ try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
+ BulkLoadHFiles loader = new BulkLoadHFilesTool(TEST_UTIL.getConfiguration());
+ loader.bulkLoad(table.getName(), family2Files);
+ } finally {
+ TEST_UTIL.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, false);
+ }
+ }
+
+ private void generateHFiles(Path outputDir) throws IOException {
+ String hFileName = "MyHFile";
+ int numRows = 1000;
+ outputDir = outputDir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
+
+ byte[] from = Bytes.toBytes(CF_NAME + "begin");
+ byte[] to = Bytes.toBytes(CF_NAME + "end");
+
+ Path familyDir = new Path(outputDir, CF_NAME);
+ HFileTestUtil.createHFile(TEST_UTIL.getConfiguration(), fs, new Path(familyDir, hFileName),
+ Bytes.toBytes(CF_NAME), QUALIFIER, from, to, numRows);
+ }
+
+ private void waitForReplication(int durationInMillis) {
+ LOG.info("Waiting for replication to complete for {} ms", durationInMillis);
+ try {
+ Thread.sleep(durationInMillis);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Thread was interrupted while waiting", e);
+ }
+ }
+
+ /**
+ * Verifies the backup process by: 1. Checking whether any WAL (Write-Ahead Log) files were
+ * generated in the backup directory. 2. Checking whether any bulk-loaded files were generated in
+ * the backup directory. 3. Replaying the WAL and bulk-loaded files (if present) to restore data
+ * and check consistency by verifying that the restored data matches the expected row count for
+ * each table.
+ */
+ private void verifyBackup(String backupRootDir, boolean hasBulkLoadFiles,
+ Map<TableName, Integer> tablesWithExpectedRows) throws IOException {
+ verifyWALBackup(backupRootDir);
+ if (hasBulkLoadFiles) {
+ verifyBulkLoadBackup(backupRootDir);
+ }
+
+ for (Map.Entry<TableName, Integer> entry : tablesWithExpectedRows.entrySet()) {
+ TableName tableName = entry.getKey();
+ int expectedRows = entry.getValue();
+
+ admin.disableTable(tableName);
+ admin.truncateTable(tableName, false);
+ assertEquals(0, getRowCount(tableName));
+
+ replayWALs(new Path(backupRootDir, WALS_DIR).toString(), tableName);
+ replayBulkLoadHFilesIfPresent(new Path(backupRootDir, BULKLOAD_FILES_DIR).toString(),
+ tableName);
+ assertEquals(expectedRows, getRowCount(tableName));
+ }
+ }
+
+ private void verifyWALBackup(String backupRootDir) throws IOException {
+ Path walDir = new Path(backupRootDir, WALS_DIR);
+ assertTrue("WAL directory does not exist!", fs.exists(walDir));
+
+ RemoteIterator<LocatedFileStatus> fileStatusIterator = fs.listFiles(walDir, true);
+ List<Path> walFiles = new ArrayList<>();
+
+ while (fileStatusIterator.hasNext()) {
+ LocatedFileStatus fileStatus = fileStatusIterator.next();
+ Path filePath = fileStatus.getPath();
+
+ // Check if the file starts with the expected WAL prefix
+ if (!fileStatus.isDirectory() && filePath.getName().startsWith(WAL_FILE_PREFIX)) {
+ walFiles.add(filePath);
+ }
+ }
+
+ assertNotNull("No WAL files found!", walFiles);
+ assertFalse("Expected some WAL files but found none!", walFiles.isEmpty());
+ }
+
+ private void verifyBulkLoadBackup(String backupRootDir) throws IOException {
+ Path bulkLoadFilesDir = new Path(backupRootDir, BULKLOAD_FILES_DIR);
+ assertTrue("BulkLoad Files directory does not exist!", fs.exists(bulkLoadFilesDir));
+
+ FileStatus[] bulkLoadFiles = fs.listStatus(bulkLoadFilesDir);
+ assertNotNull("No Bulk load files found!", bulkLoadFiles);
+ assertTrue("Expected some Bulk load files but found none!", bulkLoadFiles.length > 0);
+ }
+
+ private void replayWALs(String walDir, TableName tableName) {
+ WALPlayer player = new WALPlayer();
+ try {
+ assertEquals(0, ToolRunner.run(TEST_UTIL.getConfiguration(), player,
+ new String[] { walDir, tableName.getQualifierAsString() }));
+ } catch (Exception e) {
+ fail("Failed to replay WALs properly: " + e.getMessage());
+ }
+ }
+
+ private void replayBulkLoadHFilesIfPresent(String bulkLoadDir, TableName tableName) {
+ try {
+ Path tableBulkLoadDir = new Path(bulkLoadDir + "/default/" + tableName);
+ if (fs.exists(tableBulkLoadDir)) {
+ RemoteIterator<LocatedFileStatus> fileStatusIterator = fs.listFiles(tableBulkLoadDir, true);
+ List<Path> bulkLoadFiles = new ArrayList<>();
+
+ while (fileStatusIterator.hasNext()) {
+ LocatedFileStatus fileStatus = fileStatusIterator.next();
+ Path filePath = fileStatus.getPath();
+
+ if (!fileStatus.isDirectory()) {
+ bulkLoadFiles.add(filePath);
+ }
+ }
+ bulkLoadHFiles(tableName, Map.of(Bytes.toBytes(CF_NAME), bulkLoadFiles));
+ }
+ } catch (Exception e) {
+ fail("Failed to replay BulkLoad HFiles properly: " + e.getMessage());
+ }
+ }
+
+ private int getRowCount(TableName tableName) throws IOException {
+ try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
+ return HBaseTestingUtil.countRows(table);
+ }
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
index 5edd5b3e8c92..fc5c2bf62659 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
@@ -29,6 +29,7 @@
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
@@ -51,6 +52,7 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener {
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
class Context {
+ private final ReplicationSourceInterface replicationSource;
private final Server server;
private final Configuration localConf;
private final Configuration conf;
@@ -63,10 +65,12 @@ class Context {
private final Abortable abortable;
@InterfaceAudience.Private
- public Context(final Server server, final Configuration localConf, final Configuration conf,
- final FileSystem fs, final String peerId, final UUID clusterId,
- final ReplicationPeer replicationPeer, final MetricsSource metrics,
- final TableDescriptors tableDescriptors, final Abortable abortable) {
+ public Context(final ReplicationSourceInterface replicationSource, final Server server,
+ final Configuration localConf, final Configuration conf, final FileSystem fs,
+ final String peerId, final UUID clusterId, final ReplicationPeer replicationPeer,
+ final MetricsSource metrics, final TableDescriptors tableDescriptors,
+ final Abortable abortable) {
+ this.replicationSource = replicationSource;
this.server = server;
this.localConf = localConf;
this.conf = conf;
@@ -79,6 +83,10 @@ public Context(final Server server, final Configuration localConf, final Configu
this.abortable = abortable;
}
+ public ReplicationSourceInterface getReplicationSource() {
+ return replicationSource;
+ }
+
public Server getServer() {
return server;
}
@@ -208,7 +216,7 @@ public int getTimeout() {
* the context are assumed to be persisted in the target cluster.
* @param replicateContext a context where WAL entries and other parameters can be obtained.
*/
- boolean replicate(ReplicateContext replicateContext);
+ ReplicationResult replicate(ReplicateContext replicateContext);
// The below methods are inspired by Guava Service. See
// https://github.com/google/guava/wiki/ServiceExplained for overview of Guava Service.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationResult.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationResult.java
new file mode 100644
index 000000000000..03ed0ce6799f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationResult.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+public enum ReplicationResult {
+ /* Batch has been replicated and persisted successfully. */
+ COMMITTED,
+
+ /* Batch has been submitted for replication, but not persisted yet. */
+ SUBMITTED,
+
+ /* Batch replication failed and should be retried. */
+ FAILED
+}
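
A loose sketch (not part of the patch) of how a caller might act on the three values; the real handling lives in ReplicationSource.logPositionAndCleanOldLogs() and the shipper and differs in detail. The persistOffsets/retryBatch hooks below are hypothetical stand-ins:

    import org.apache.hadoop.hbase.replication.ReplicationResult;

    public class ReplicationResultHandlingSketch {
      static void handle(ReplicationResult result, Runnable persistOffsets, Runnable retryBatch) {
        switch (result) {
          case COMMITTED:
            // The endpoint made the batch durable, so WAL offsets can be advanced right away.
            persistOffsets.run();
            break;
          case SUBMITTED:
            // The endpoint only staged the batch; keep the offsets in memory until a later
            // flush (e.g. the endpoint calling persistOffsets()) confirms durability.
            break;
          case FAILED:
            // Nothing was persisted; ship the same batch again.
            retryBatch.run();
            break;
        }
      }

      public static void main(String[] args) {
        handle(ReplicationResult.SUBMITTED, () -> System.out.println("persist"),
          () -> System.out.println("retry"));
      }
    }
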
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/VerifyWALEntriesReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/VerifyWALEntriesReplicationEndpoint.java
index 229cec57e976..a9674407bd2a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/VerifyWALEntriesReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/VerifyWALEntriesReplicationEndpoint.java
@@ -59,10 +59,10 @@ private void checkCell(Cell cell) {
}
@Override
- public boolean replicate(ReplicateContext replicateContext) {
+ public ReplicationResult replicate(ReplicateContext replicateContext) {
replicateContext.entries.stream().map(WAL.Entry::getEdit).flatMap(e -> e.getCells().stream())
.forEach(this::checkCell);
- return true;
+ return ReplicationResult.COMMITTED;
}
@Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index 6bdc97732644..4f9a4909d784 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -48,6 +48,7 @@
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.ReplicationResult;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
@@ -424,7 +425,7 @@ private long parallelReplicate(ReplicateContext replicateContext, List> batches = createBatches(replicateContext.getEntries());
@@ -458,7 +459,7 @@ public boolean replicate(ReplicateContext replicateContext) {
try {
// replicate the batches to sink side.
parallelReplicate(replicateContext, batches);
- return true;
+ return ReplicationResult.COMMITTED;
} catch (IOException ioe) {
if (ioe instanceof RemoteException) {
if (dropOnDeletedTables && isTableNotFoundException(ioe)) {
@@ -467,14 +468,14 @@ public boolean replicate(ReplicateContext replicateContext) {
batches = filterNotExistTableEdits(batches);
if (batches.isEmpty()) {
LOG.warn("After filter not exist table's edits, 0 edits to replicate, just return");
- return true;
+ return ReplicationResult.COMMITTED;
}
} else if (dropOnDeletedColumnFamilies && isNoSuchColumnFamilyException(ioe)) {
batches = filterNotExistColumnFamilyEdits(batches);
if (batches.isEmpty()) {
LOG.warn("After filter not exist column family's edits, 0 edits to replicate, "
+ "just return");
- return true;
+ return ReplicationResult.COMMITTED;
}
} else {
LOG.warn("{} Peer encountered RemoteException, rechecking all sinks: ", logPeerId(),
@@ -506,7 +507,7 @@ public boolean replicate(ReplicateContext replicateContext) {
}
}
}
- return false; // in case we exited before replicating
+ return ReplicationResult.FAILED; // in case we exited before replicating
}
protected boolean isPeerEnabled() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 094fa4aaa786..d6d59a39b527 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -58,6 +58,7 @@
import org.apache.hadoop.hbase.replication.ReplicationQueueData;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationResult;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Bytes;
@@ -166,6 +167,8 @@ public class ReplicationSource implements ReplicationSourceInterface {
*/
private final List<WALEntryFilter> baseFilterOutWALEntries;
+ private final Map<String, WALEntryBatch> lastEntryBatch = new ConcurrentHashMap<>();
+
ReplicationSource() {
// Default, filters *in* all WALs but meta WALs & filters *out* all WALEntries of System Tables.
this(p -> !AbstractFSWALProvider.isMetaFile(p),
@@ -318,8 +321,8 @@ private void initAndStartReplicationEndpoint(ReplicationEndpoint replicationEndp
if (server instanceof HRegionServer) {
tableDescriptors = ((HRegionServer) server).getTableDescriptors();
}
- replicationEndpoint
- .init(new ReplicationEndpoint.Context(server, conf, replicationPeer.getConfiguration(), fs,
+ replicationEndpoint.init(
+ new ReplicationEndpoint.Context(this, server, conf, replicationPeer.getConfiguration(), fs,
replicationPeer.getId(), clusterId, replicationPeer, metrics, tableDescriptors, server));
replicationEndpoint.start();
replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS);
@@ -861,4 +864,32 @@ public String logPeerId() {
public long getTotalReplicatedEdits() {
return totalReplicatedEdits.get();
}
+
+ @Override
+ public void logPositionAndCleanOldLogs(WALEntryBatch entryBatch, ReplicationResult replicated) {
+ String walName = entryBatch.getLastWalPath().getName();
+ String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(walName);
+
+ synchronized (lastEntryBatch) { // Synchronize addition and processing
+ lastEntryBatch.put(walPrefix, entryBatch);
+
+ if (replicated == ReplicationResult.COMMITTED) {
+ processAndClearEntries();
+ }
+ }
+ }
+
+ public void persistOffsets() {
+ synchronized (lastEntryBatch) {
+ processAndClearEntries();
+ }
+ }
+
+ private void processAndClearEntries() {
+ // Process all entries
+ lastEntryBatch
+ .forEach((prefix, batch) -> getSourceManager().logPositionAndCleanOldLogs(this, batch));
+ // Clear all processed entries
+ lastEntryBatch.clear();
+ }
}
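
The bookkeeping added above is easier to follow in isolation: each shipped batch overwrites the staged entry for its WAL prefix, and nothing reaches ReplicationSourceManager until a COMMITTED batch arrives or persistOffsets() is invoked, at which point everything staged is flushed and cleared. The following self-contained sketch models that behaviour with the same map-plus-synchronized pattern; it is an illustration, not HBase code.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.function.BiConsumer;

    /** Simplified model of the offset staging now done by ReplicationSource. */
    public class StagedOffsets<B> {
      private final Map<String, B> lastBatchPerWalPrefix = new ConcurrentHashMap<>();
      private final BiConsumer<String, B> persister;

      public StagedOffsets(BiConsumer<String, B> persister) {
        this.persister = persister;
      }

      /** Called after every shipped batch; flushes only when the batch was committed. */
      public void record(String walPrefix, B batch, boolean committed) {
        synchronized (lastBatchPerWalPrefix) {
          lastBatchPerWalPrefix.put(walPrefix, batch);
          if (committed) {
            flush();
          }
        }
      }

      /** Force-persist everything staged so far, e.g. once a sink acknowledges asynchronously. */
      public void persistOffsets() {
        synchronized (lastBatchPerWalPrefix) {
          flush();
        }
      }

      private void flush() {
        // Hand every staged batch to the persister, then drop the staged state.
        lastBatchPerWalPrefix.forEach(persister);
        lastBatchPerWalPrefix.clear();
      }
    }

Because only the latest batch per WAL prefix is retained, a deferred flush persists the most recent safe position; intermediate positions are intentionally dropped.
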
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 69ad2887064a..f482cc73e717 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -34,6 +34,7 @@
import org.apache.hadoop.hbase.replication.ReplicationQueueData;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationResult;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
@@ -207,7 +208,11 @@ default boolean isRecovered() {
* @param entryBatch the wal entry batch we just shipped
* @return The instance of queueStorage used by this ReplicationSource.
*/
- default void logPositionAndCleanOldLogs(WALEntryBatch entryBatch) {
- getSourceManager().logPositionAndCleanOldLogs(this, entryBatch);
+ default void logPositionAndCleanOldLogs(WALEntryBatch entryBatch, ReplicationResult replicated) {
+
+ }
+
+ default void persistOffsets() {
+
}
}
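
persistOffsets() exists for endpoints whose sink acknowledges data asynchronously (such as the continuous-backup endpoint this series introduces): staged WAL positions can be pushed to storage once durability is confirmed, independent of any individual replicate() call. A hedged sketch of such a caller is shown below; the periodic trigger is an assumption for illustration, and how the endpoint obtains its ReplicationSourceInterface (this patch threads the source through ReplicationEndpoint.Context) is left to the caller.

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;

    /** Illustrative periodic flusher for an endpoint that commits to its sink in the background. */
    public class OffsetFlusher {
      private final ReplicationSourceInterface source;
      private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();

      public OffsetFlusher(ReplicationSourceInterface source) {
        this.source = source;
      }

      /** Once earlier batches are durable on the sink, persist the staged WAL offsets. */
      public void start(long intervalSeconds) {
        scheduler.scheduleAtFixedRate(source::persistOffsets, intervalSeconds, intervalSeconds,
          TimeUnit.SECONDS);
      }

      public void stop() {
        scheduler.shutdown();
      }
    }
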
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 6d0730d76b6e..ee819faa77b8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -28,6 +28,7 @@
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.ReplicationResult;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -155,7 +156,7 @@ private void shipEdits(WALEntryBatch entryBatch) {
List<Entry> entries = entryBatch.getWalEntries();
int sleepMultiplier = 0;
if (entries.isEmpty()) {
- updateLogPosition(entryBatch);
+ updateLogPosition(entryBatch, ReplicationResult.COMMITTED);
return;
}
int currentSize = (int) entryBatch.getHeapSize();
@@ -182,21 +183,23 @@ private void shipEdits(WALEntryBatch entryBatch) {
long startTimeNs = System.nanoTime();
// send the edits to the endpoint. Will block until the edits are shipped and acknowledged
- boolean replicated = source.getReplicationEndpoint().replicate(replicateContext);
+ ReplicationResult replicated = source.getReplicationEndpoint().replicate(replicateContext);
long endTimeNs = System.nanoTime();
- if (!replicated) {
+ if (replicated == ReplicationResult.FAILED) {
continue;
} else {
sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
}
- // Clean up hfile references
- for (Entry entry : entries) {
- cleanUpHFileRefs(entry.getEdit());
- LOG.trace("shipped entry {}: ", entry);
+ if (replicated == ReplicationResult.COMMITTED) {
+ // Clean up hfile references
+ for (Entry entry : entries) {
+ cleanUpHFileRefs(entry.getEdit());
+ LOG.trace("shipped entry {}: ", entry);
+ }
}
// Log and clean up WAL logs
- updateLogPosition(entryBatch);
+ updateLogPosition(entryBatch, replicated);
// offsets totalBufferUsed by deducting shipped batchSize (excludes bulk load size)
// this sizeExcludeBulkLoad has to use same calculation that when calling
@@ -253,7 +256,7 @@ private void cleanUpHFileRefs(WALEdit edit) throws IOException {
}
}
- private boolean updateLogPosition(WALEntryBatch batch) {
+ private boolean updateLogPosition(WALEntryBatch batch, ReplicationResult replicated) {
boolean updated = false;
// if end of file is true, then the logPositionAndCleanOldLogs method will remove the file
// record on zk, so let's call it. The last wal position maybe zero if end of file is true and
@@ -263,7 +266,7 @@ private boolean updateLogPosition(WALEntryBatch batch) {
batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath)
|| batch.getLastWalPosition() != currentPosition
) {
- source.logPositionAndCleanOldLogs(batch);
+ source.logPositionAndCleanOldLogs(batch, replicated);
updated = true;
}
// if end of file is true, then we can just skip to the next file in queue.
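
Taken together, the shipper now distinguishes three outcomes: FAILED keeps the batch and retries it after backoff, COMMITTED cleans up hfile references and records a position that is persisted immediately, and any other result records the position but leaves it staged until a later COMMITTED batch or an explicit persistOffsets() call. The small helper below restates that decision as code; it is a reader's summary, not part of the patch.

    import org.apache.hadoop.hbase.replication.ReplicationResult;

    /** Summary (as code) of how ReplicationSourceShipper treats each replicate() outcome. */
    public final class ShipperOutcome {

      private ShipperOutcome() {
      }

      /** True when the batch must be re-shipped rather than advanced past. */
      public static boolean shouldRetry(ReplicationResult result) {
        return result == ReplicationResult.FAILED;
      }

      /** True when hfile references may be dropped and the WAL offset persisted right away. */
      public static boolean safeToPersistNow(ReplicationResult result) {
        return result == ReplicationResult.COMMITTED;
      }
    }
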
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java
index b97a08c01c38..a32ce78b0c78 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java
@@ -30,6 +30,7 @@
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationResult;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
@@ -63,7 +64,7 @@ public void peerConfigUpdated(ReplicationPeerConfig rpc) {
}
@Override
- public boolean replicate(ReplicateContext replicateContext) {
+ public ReplicationResult replicate(ReplicateContext replicateContext) {
if (!delegator.canReplicateToSameCluster()) {
// Only when the replication is inter cluster replication we need to
// convert the visibility tags to
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DummyReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DummyReplicationEndpoint.java
index e6a39e7fede1..f0e627316cd4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DummyReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DummyReplicationEndpoint.java
@@ -42,8 +42,8 @@ public WALEntryFilter getWALEntryfilter() {
}
@Override
- public boolean replicate(ReplicateContext replicateContext) {
- return true;
+ public ReplicationResult replicate(ReplicateContext replicateContext) {
+ return ReplicationResult.COMMITTED;
}
@Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java
index f54c39316997..a8c76033d02d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java
@@ -81,7 +81,7 @@ public UUID getPeerUUID() {
}
@Override
- public boolean replicate(ReplicateContext replicateContext) {
+ public ReplicationResult replicate(ReplicateContext replicateContext) {
synchronized (WRITER) {
try {
for (Entry entry : replicateContext.getEntries()) {
@@ -92,7 +92,7 @@ public boolean replicate(ReplicateContext replicateContext) {
throw new UncheckedIOException(e);
}
}
- return true;
+ return ReplicationResult.COMMITTED;
}
@Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java
index 058564dc0ecf..d9a75b8ca8a3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java
@@ -59,8 +59,8 @@ public void setUp() throws Exception {
when(replicationPeer.getPeerConfig()).thenReturn(peerConfig);
when(peerConfig.getClusterKey()).thenReturn("hbase+zk://server1:2181/hbase");
ReplicationEndpoint.Context context =
- new ReplicationEndpoint.Context(null, UTIL.getConfiguration(), UTIL.getConfiguration(), null,
- null, null, replicationPeer, null, null, null);
+ new ReplicationEndpoint.Context(null, null, UTIL.getConfiguration(), UTIL.getConfiguration(),
+ null, null, null, replicationPeer, null, null, null);
endpoint = new DummyHBaseReplicationEndpoint();
endpoint.init(context);
}
@@ -199,8 +199,8 @@ protected Collection<ServerName> fetchPeerAddresses() {
}
@Override
- public boolean replicate(ReplicateContext replicateContext) {
- return false;
+ public ReplicationResult replicate(ReplicateContext replicateContext) {
+ return ReplicationResult.FAILED;
}
@Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNonHBaseReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNonHBaseReplicationEndpoint.java
index 70cae18b4561..c98b46c8e4be 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNonHBaseReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNonHBaseReplicationEndpoint.java
@@ -127,9 +127,9 @@ public WALEntryFilter getWALEntryfilter() {
}
@Override
- public boolean replicate(ReplicateContext replicateContext) {
+ public ReplicationResult replicate(ReplicateContext replicateContext) {
REPLICATED.set(true);
- return true;
+ return ReplicationResult.COMMITTED;
}
@Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index 70a6d73c6202..f53d9acc24f0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -415,7 +415,7 @@ public ReplicationEndpointTest() {
}
@Override
- public boolean replicate(ReplicateContext replicateContext) {
+ public ReplicationResult replicate(ReplicateContext replicateContext) {
replicateCount.incrementAndGet();
replicatedEntries.addAll(replicateContext.getEntries());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index 057a9f3567f5..77cd5da8de0a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -463,10 +463,10 @@ public UUID getPeerUUID() {
}
@Override
- public boolean replicate(ReplicateContext replicateContext) {
+ public ReplicationResult replicate(ReplicateContext replicateContext) {
replicateCount.incrementAndGet();
lastEntries = new ArrayList<>(replicateContext.entries);
- return true;
+ return ReplicationResult.COMMITTED;
}
@Override
@@ -526,12 +526,12 @@ public void init(Context context) throws IOException {
}
@Override
- public boolean replicate(ReplicateContext context) {
+ public ReplicationResult replicate(ReplicateContext context) {
try {
Thread.sleep(duration);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- return false;
+ return ReplicationResult.FAILED;
}
return super.replicate(context);
}
@@ -548,9 +548,9 @@ public InterClusterReplicationEndpointForTest() {
}
@Override
- public boolean replicate(ReplicateContext replicateContext) {
- boolean success = super.replicate(replicateContext);
- if (success) {
+ public ReplicationResult replicate(ReplicateContext replicateContext) {
+ ReplicationResult success = super.replicate(replicateContext);
+ if (success == ReplicationResult.COMMITTED) {
replicateCount.addAndGet(replicateContext.entries.size());
}
return success;
@@ -577,7 +577,7 @@ public static class ReplicationEndpointReturningFalse extends ReplicationEndpoin
static AtomicBoolean replicated = new AtomicBoolean(false);
@Override
- public boolean replicate(ReplicateContext replicateContext) {
+ public ReplicationResult replicate(ReplicateContext replicateContext) {
try {
// check row
doAssert(row);
@@ -589,7 +589,7 @@ public boolean replicate(ReplicateContext replicateContext) {
LOG.info("Replicated " + Bytes.toString(row) + ", count=" + replicateCount.get());
replicated.set(replicateCount.get() > COUNT); // first 10 times, we return false
- return replicated.get();
+ return replicated.get() ? ReplicationResult.COMMITTED : ReplicationResult.FAILED;
}
}
@@ -598,14 +598,14 @@ public static class ReplicationEndpointWithWALEntryFilter extends ReplicationEnd
static AtomicReference<Exception> ex = new AtomicReference<>(null);
@Override
- public boolean replicate(ReplicateContext replicateContext) {
+ public ReplicationResult replicate(ReplicateContext replicateContext) {
try {
super.replicate(replicateContext);
doAssert(row);
} catch (Exception e) {
ex.set(e);
}
- return true;
+ return ReplicationResult.COMMITTED;
}
@Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyCellsReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyCellsReplicationEndpoint.java
index b990916ae75f..50b0911970a3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyCellsReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyCellsReplicationEndpoint.java
@@ -71,7 +71,7 @@ public class TestVerifyCellsReplicationEndpoint {
public static final class EndpointForTest extends VerifyWALEntriesReplicationEndpoint {
@Override
- public boolean replicate(ReplicateContext replicateContext) {
+ public ReplicationResult replicate(ReplicateContext replicateContext) {
LOG.info(replicateContext.getEntries().toString());
replicateContext.entries.stream().map(WAL.Entry::getEdit).map(WALEdit::getCells)
.forEachOrdered(CELLS::addAll);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestHBaseInterClusterReplicationEndpointFilterEdits.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestHBaseInterClusterReplicationEndpointFilterEdits.java
index 7b108f5ca148..92e7c8290f0a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestHBaseInterClusterReplicationEndpointFilterEdits.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestHBaseInterClusterReplicationEndpointFilterEdits.java
@@ -83,8 +83,8 @@ public static void setUpBeforeClass() throws Exception {
when(rpc.isSerial()).thenReturn(false);
when(replicationPeer.getPeerConfig()).thenReturn(rpc);
when(rpc.getClusterKey()).thenReturn("hbase+zk://localhost:2181");
- Context context = new Context(null, UTIL.getConfiguration(), UTIL.getConfiguration(), null,
- null, null, replicationPeer, null, null, null);
+ Context context = new Context(null, null, UTIL.getConfiguration(), UTIL.getConfiguration(),
+ null, null, null, replicationPeer, null, null, null);
endpoint = new HBaseInterClusterReplicationEndpoint();
endpoint.init(context);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java
index 66f04dca36d5..d7b5bcdcccbf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java
@@ -38,6 +38,7 @@
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationResult;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -94,7 +95,7 @@ public UUID getPeerUUID() {
}
@Override
- public boolean replicate(ReplicateContext replicateContext) {
+ public ReplicationResult replicate(ReplicateContext replicateContext) {
synchronized (WRITER) {
try {
for (Entry entry : replicateContext.getEntries()) {
@@ -105,7 +106,7 @@ public boolean replicate(ReplicateContext replicateContext) {
throw new UncheckedIOException(e);
}
}
- return true;
+ return ReplicationResult.COMMITTED;
}
@Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 663b444dc4e4..c99f25380de4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -56,6 +56,7 @@
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationResult;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
@@ -94,13 +95,13 @@ public static final class ReplicationEndpointForTest extends DummyReplicationEnd
private String clusterKey;
@Override
- public boolean replicate(ReplicateContext replicateContext) {
+ public ReplicationResult replicate(ReplicateContext replicateContext) {
// if you want to block the replication, for example, do not want the recovered source to be
// removed
if (clusterKey.endsWith("error")) {
throw new RuntimeException("Inject error");
}
- return true;
+ return ReplicationResult.COMMITTED;
}
@Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
index 979db712ef34..cdbd1c73a2a7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
@@ -32,6 +32,7 @@
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationResult;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -218,7 +219,7 @@ public static void setEntriesCount(int i) {
}
@Override
- public boolean replicate(ReplicateContext replicateContext) {
+ public ReplicationResult replicate(ReplicateContext replicateContext) {
try {
await();
} catch (InterruptedException e) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java
index 7d5a5627d2c0..ffbc0d2cee5a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java
@@ -65,6 +65,7 @@
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationResult;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.SecurityTests;
@@ -473,8 +474,8 @@ public VisibilityReplicationEndPointForTest(ReplicationEndpoint endpoint,
}
@Override
- public boolean replicate(ReplicateContext replicateContext) {
- boolean ret = super.replicate(replicateContext);
+ public ReplicationResult replicate(ReplicateContext replicateContext) {
+ ReplicationResult ret = super.replicate(replicateContext);
lastEntries = replicateContext.getEntries();
replicateCount.incrementAndGet();
return ret;