From d37a5bf90bb7efcee21d27543fd14e7c0ed98faa Mon Sep 17 00:00:00 2001 From: Vinayak Hegde Date: Wed, 9 Jul 2025 22:52:35 +0530 Subject: [PATCH 1/4] HBASE-29441: ReplicationSourceShipper should return SUBMITTED instead of COMMITTED for empty WAL batches --- .../replication/regionserver/ReplicationSourceShipper.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index ee819faa77b8..f8ef9b22115f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -156,7 +156,7 @@ private void shipEdits(WALEntryBatch entryBatch) { List entries = entryBatch.getWalEntries(); int sleepMultiplier = 0; if (entries.isEmpty()) { - updateLogPosition(entryBatch, ReplicationResult.COMMITTED); + updateLogPosition(entryBatch, ReplicationResult.SUBMITTED); return; } int currentSize = (int) entryBatch.getHeapSize(); From 0ffeddc68ce1af36425b4242486668965f37c66b Mon Sep 17 00:00:00 2001 From: Vinayak Hegde Date: Mon, 14 Jul 2025 15:22:23 +0530 Subject: [PATCH 2/4] delegate the empty WAL processing decision to the replication endpoint --- .../ContinuousBackupReplicationEndpoint.java | 9 +++++ .../hbase/replication/EmptyEntriesPolicy.java | 34 +++++++++++++++++++ .../replication/ReplicationEndpoint.java | 18 ++++++++++ .../ReplicationSourceShipper.java | 18 +++++++++- 4 files changed, 78 insertions(+), 1 deletion(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/replication/EmptyEntriesPolicy.java diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java index bf3fbd531bfe..2442e0789a8d 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint; +import org.apache.hadoop.hbase.replication.EmptyEntriesPolicy; import org.apache.hadoop.hbase.replication.ReplicationResult; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -205,6 +206,14 @@ protected void doStart() { notifyStarted(); } + @Override + public EmptyEntriesPolicy getEmptyEntriesPolicy() { + // Since this endpoint writes to S3 asynchronously, an empty entry batch + // does not guarantee that all previously submitted entries were persisted. + // Hence, avoid committing the WAL position. + return EmptyEntriesPolicy.SUBMIT; + } + @Override public ReplicationResult replicate(ReplicateContext replicateContext) { final List entries = replicateContext.getEntries(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/EmptyEntriesPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/EmptyEntriesPolicy.java new file mode 100644 index 000000000000..5a5d8ab754c3 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/EmptyEntriesPolicy.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Policy that defines what a replication endpoint should do when the entry batch is empty. This is + * used to determine whether the replication source should consider an empty batch as: - + * {@code COMMIT}: Consider the position as fully committed, and update the WAL position. - + * {@code SUBMIT}: Treat it as submitted but not committed, i.e., do not advance the WAL position. + * Some endpoints may buffer entries (e.g., in open files on S3) and delay actual persistence. In + * such cases, an empty batch should not result in WAL position commit. + */ +@InterfaceAudience.Private +public enum EmptyEntriesPolicy { + COMMIT, + SUBMIT +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java index fc5c2bf62659..fbb6b6b9ef10 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java @@ -291,4 +291,22 @@ public int getTimeout() { * @throws IllegalStateException if this service's state isn't FAILED. 
*/ Throwable failureCause(); + + /** + * Defines the behavior when the replication source encounters an empty entry batch. + * <p>
+ * By default, this method returns {@link EmptyEntriesPolicy#COMMIT}, meaning the replication + * source can safely consider the WAL position as committed and move on. + * </p>
+ * <p>
+ * However, certain endpoints like backup or asynchronous S3 writers may delay persistence (e.g., + * writing to temporary files or buffers). In those cases, returning + * {@link EmptyEntriesPolicy#SUBMIT} avoids incorrectly advancing WAL position and risking data + * loss. + * </p>
+ * @return the {@link EmptyEntriesPolicy} to apply for empty entry batches. + */ + default EmptyEntriesPolicy getEmptyEntriesPolicy() { + return EmptyEntriesPolicy.COMMIT; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index f8ef9b22115f..e17afad00f84 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.replication.EmptyEntriesPolicy; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationResult; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -156,7 +157,15 @@ private void shipEdits(WALEntryBatch entryBatch) { List entries = entryBatch.getWalEntries(); int sleepMultiplier = 0; if (entries.isEmpty()) { - updateLogPosition(entryBatch, ReplicationResult.SUBMITTED); + /* + * Delegate to the endpoint to decide how to treat empty entry batches. In most replication + * flows, receiving an empty entry batch means that everything so far has been successfully + * replicated and committed — so it's safe to mark the WAL position as committed (COMMIT). + * However, some endpoints (e.g., asynchronous S3 backups) may buffer writes and delay actual + * persistence. In such cases, we must avoid committing the WAL position prematurely. 
+ */ + final ReplicationResult result = getReplicationResult(); + updateLogPosition(entryBatch, result); return; } int currentSize = (int) entryBatch.getHeapSize(); @@ -232,6 +241,13 @@ private void shipEdits(WALEntryBatch entryBatch) { } } + private ReplicationResult getReplicationResult() { + EmptyEntriesPolicy policy = source.getReplicationEndpoint().getEmptyEntriesPolicy(); + return (policy == EmptyEntriesPolicy.COMMIT) + ? ReplicationResult.COMMITTED + : ReplicationResult.SUBMITTED; + } + private void cleanUpHFileRefs(WALEdit edit) throws IOException { String peerId = source.getPeerId(); if (peerId.contains("-")) { From 9bcd1b230e19c5ff897bd9bdb2f470b87285d5cf Mon Sep 17 00:00:00 2001 From: Vinayak Hegde Date: Tue, 15 Jul 2025 12:27:54 +0530 Subject: [PATCH 3/4] add unit tests --- .../ReplicationSourceShipper.java | 9 ++- .../regionserver/TestReplicationSource.java | 63 +++++++++++++++++++ 2 files changed, 69 insertions(+), 3 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index e17afad00f84..9cc86d44f3b8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -151,9 +151,9 @@ protected void postFinish() { } /** - * Do the shipping logic + * Do the shipping logic. Package-private for test visibility only. Do not use outside tests. 
*/ - private void shipEdits(WALEntryBatch entryBatch) { + void shipEdits(WALEntryBatch entryBatch) { List entries = entryBatch.getWalEntries(); int sleepMultiplier = 0; if (entries.isEmpty()) { @@ -272,7 +272,10 @@ private void cleanUpHFileRefs(WALEdit edit) throws IOException { } } - private boolean updateLogPosition(WALEntryBatch batch, ReplicationResult replicated) { + /** + * Package-private for test visibility only. Do not use outside tests. + */ + boolean updateLogPosition(WALEntryBatch batch, ReplicationResult replicated) { boolean updated = false; // if end of file is true, then the logPositionAndCleanOldLogs method will remove the file // record on zk, so let's call it. The last wal position maybe zero if end of file is true and diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java index 37af52eb93b9..25eef51ff681 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java @@ -53,11 +53,13 @@ import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.replication.EmptyEntriesPolicy; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationQueueData; import org.apache.hadoop.hbase.replication.ReplicationQueueId; +import org.apache.hadoop.hbase.replication.ReplicationResult; import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.testclassification.MediumTests; import 
org.apache.hadoop.hbase.testclassification.ReplicationTests; @@ -492,6 +494,67 @@ public synchronized UUID getPeerUUID() { } + /** + * Custom ReplicationEndpoint that simulates an asynchronous target like S3 or cloud storage. In + * this case, empty entry batches should not cause WAL position to be committed immediately. + */ + public static class AsyncReplicationEndpoint extends DoNothingReplicationEndpoint { + @Override + public EmptyEntriesPolicy getEmptyEntriesPolicy() { + return EmptyEntriesPolicy.SUBMIT; + } + } + + /** + * Default synchronous ReplicationEndpoint that treats empty entry batches as a signal to commit + * WAL position, assuming all entries pushed before were safely replicated. + */ + public static class SyncReplicationEndpoint extends DoNothingReplicationEndpoint { + // Inherits default COMMIT behavior + } + + /** + * Verifies that ReplicationSourceShipper commits the WAL position when using a synchronous + * endpoint and the entry batch is empty. + */ + @Test + public void testEmptyBatchCommitsPositionForCommitEndpoint() { + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + ReplicationSource source = Mockito.mock(ReplicationSource.class); + Mockito.when(source.getReplicationEndpoint()).thenReturn(new SyncReplicationEndpoint()); + + ReplicationSourceShipper shipper = + Mockito.spy(new ReplicationSourceShipper(conf, "testGroup", source, null)); + + WALEntryBatch emptyBatch = new WALEntryBatch(0, new Path("test-wal")); + + shipper.shipEdits(emptyBatch); + + // With default (COMMIT) policy, empty entry batch should advance WAL position + Mockito.verify(shipper).updateLogPosition(emptyBatch, ReplicationResult.COMMITTED); + } + + /** + * Verifies that ReplicationSourceShipper does NOT commit the WAL position when using an + * asynchronous endpoint and the entry batch is empty. 
+ */ + @Test + public void testEmptyBatchSubmitsPositionForSubmitEndpoint() { + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + ReplicationSource source = Mockito.mock(ReplicationSource.class); + Mockito.when(source.getReplicationEndpoint()).thenReturn(new AsyncReplicationEndpoint()); + + ReplicationSourceShipper shipper = + Mockito.spy(new ReplicationSourceShipper(conf, "testGroup", source, null)); + + WALEntryBatch emptyBatch = new WALEntryBatch(0, new Path("test-wal")); + + shipper.shipEdits(emptyBatch); + + // With SUBMIT policy, empty entry batch should NOT advance WAL position + Mockito.verify(shipper).updateLogPosition(emptyBatch, ReplicationResult.SUBMITTED); + } + private RegionServerServices setupForAbortTests(ReplicationSource rs, Configuration conf, String endpointName) throws IOException { conf.setInt("replication.source.maxretriesmultiplier", 1); From be71bb096af01720034673045c5c6044a552ce86 Mon Sep 17 00:00:00 2001 From: Vinayak Hegde Date: Wed, 16 Jul 2025 09:20:39 +0530 Subject: [PATCH 4/4] restrict the package-private methods to tests only --- .../regionserver/ReplicationSourceShipper.java | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index 9cc86d44f3b8..f45c8762683a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.hbase.replication.ReplicationUtils.getAdaptiveTimeout; import static org.apache.hadoop.hbase.replication.ReplicationUtils.sleepForRetries; +import com.google.errorprone.annotations.RestrictedApi; import java.io.IOException; import 
java.util.List; import org.apache.hadoop.conf.Configuration; @@ -151,8 +152,12 @@ protected void postFinish() { } /** - * Do the shipping logic. Package-private for test visibility only. Do not use outside tests. + * Do the shipping logic. */ + @RestrictedApi( + explanation = "Package-private for test visibility only. Do not use outside tests.", + link = "", + allowedOnPath = "(.*/src/test/.*|.*/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java)") void shipEdits(WALEntryBatch entryBatch) { List entries = entryBatch.getWalEntries(); int sleepMultiplier = 0; @@ -272,9 +277,10 @@ private void cleanUpHFileRefs(WALEdit edit) throws IOException { } } - /** - * Package-private for test visibility only. Do not use outside tests. - */ + @RestrictedApi( + explanation = "Package-private for test visibility only. Do not use outside tests.", + link = "", + allowedOnPath = "(.*/src/test/.*|.*/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java)") boolean updateLogPosition(WALEntryBatch batch, ReplicationResult replicated) { boolean updated = false; // if end of file is true, then the logPositionAndCleanOldLogs method will remove the file