From 7877771d9982ff5659588ac35ce768286cbcf199 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 6 Jun 2025 02:32:57 +0800 Subject: [PATCH 01/27] - --- .../apache/bookkeeper/client/BookKeeper.java | 67 ++++++++++++++++++- .../bookkeeper/client/LedgerOpenOp.java | 23 ++++++- .../replication/BookieAutoRecoveryTest.java | 59 +++++++++++++++- 3 files changed, 143 insertions(+), 6 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java index c78f8fbbca1..0c7dbf78367 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java @@ -1218,14 +1218,52 @@ public void asyncCreateLedgerAdv(final long ledgerId, */ public void asyncOpenLedger(final long lId, final DigestType digestType, final byte[] passwd, final OpenCallback cb, final Object ctx) { + asyncOpenLedger(lId, digestType, passwd, cb, ctx, false); + } + + /** + * Open existing ledger asynchronously for reading. + * + *

Opening a ledger with this method invokes fencing and recovery on the ledger + * if the ledger has not been closed. Fencing will block all other clients from + * writing to the ledger. Recovery will make sure that the ledger is closed + * before reading from it. + * + *

Recovery also makes sure that any entries which reached one bookie, but not a + * quorum, will be replicated to a quorum of bookies. This occurs in cases were + * the writer of a ledger crashes after sending a write request to one bookie but + * before being able to send it to the rest of the bookies in the quorum. + * + *

If the ledger is already closed, neither fencing nor recovery will be applied. + * + * @see LedgerHandle#asyncClose + * + * @param lId + * ledger identifier + * @param digestType + * digest type, either MAC or CRC32 + * @param passwd + * password + * @param ctx + * optional control object + * @param keepUpdateMetadata + * Whether update ledger metadata if the auto-recover component modified the ledger's ensemble. + */ + public void asyncOpenLedger(final long lId, final DigestType digestType, final byte[] passwd, + final OpenCallback cb, final Object ctx, boolean keepUpdateMetadata) { closeLock.readLock().lock(); try { if (closed) { cb.openComplete(BKException.Code.ClientClosedException, null, ctx); return; } - new LedgerOpenOp(BookKeeper.this, clientStats, - lId, digestType, passwd, cb, ctx).initiate(); + LedgerOpenOp ledgerOpenOp = new LedgerOpenOp(BookKeeper.this, clientStats, + lId, digestType, passwd, cb, ctx); + if (keepUpdateMetadata) { + ledgerOpenOp.initiateWithKeepUpdateMetadata(); + } else { + ledgerOpenOp.initiate(); + } } finally { closeLock.readLock().unlock(); } @@ -1293,13 +1331,36 @@ public void asyncOpenLedgerNoRecovery(final long lId, final DigestType digestTyp */ public LedgerHandle openLedger(long lId, DigestType digestType, byte[] passwd) throws BKException, InterruptedException { + return openLedger(lId, digestType, passwd, false); + } + + + /** + * Synchronous open ledger call. + * + * @see #asyncOpenLedger + * @param lId + * ledger identifier + * @param digestType + * digest type, either MAC or CRC32 + * @param passwd + * password + * + * @param keepUpdateMetadata + * Whether update ledger metadata if the auto-recover component modified the ledger's ensemble. 
+ * @return a handle to the open ledger + * @throws InterruptedException + * @throws BKException + */ + public LedgerHandle openLedger(long lId, DigestType digestType, byte[] passwd, boolean keepUpdateMetadata) + throws BKException, InterruptedException { CompletableFuture future = new CompletableFuture<>(); SyncOpenCallback result = new SyncOpenCallback(future); /* * Calls async open ledger */ - asyncOpenLedger(lId, digestType, passwd, result, null); + asyncOpenLedger(lId, digestType, passwd, result, null, keepUpdateMetadata); return SyncCallbackUtils.waitForResult(future); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java index 943aa8cd2a9..0cae3274503 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java @@ -57,6 +57,18 @@ class LedgerOpenOp { ReadOnlyLedgerHandle lh; final byte[] passwd; boolean doRecovery = true; + // The ledger metadata may be modified even if it has been closed, because the auto-recovery component may rewrite + // the ledger's metadata. Keep receiving a notification from ZK to avoid the following issue: an opened ledger + // handle in memory still accesses to a BK instance who has been decommissioned. The issue that solved happens as + // follows: + // 1. Client service open a readonly ledger handle, which has been closed. + // 2. All BKs that relates to the ledger have been decommissioned. + // 3. Auto recovery component moved the data into other BK instances who is alive. + // 4. The ledger handle in the client memory keeps connects to the BKs who in the original ensemble set, and the + // connection will always fail. 
+ // For minimum modification, to add a new configuration named "keepUpdateMetadata", users can use the + // new API to create a readonly ledger handle that will auto-updates metadata. + boolean keepUpdateMetadata = false; boolean administrativeOpen = false; long startTime; final OpStatsLogger openOpLogger; @@ -126,6 +138,15 @@ public void initiateWithoutRecovery() { initiate(); } + /** + * Different with {@link #initiate()}, the method keep update metadata once the auto-recover component modified + * the ensemble. + */ + public void initiateWithKeepUpdateMetadata() { + this.keepUpdateMetadata = true; + initiate(); + } + private CompletableFuture closeLedgerHandleAsync() { if (lh != null) { return lh.closeAsync(); @@ -176,7 +197,7 @@ private void openWithMetadata(Versioned versionedMetadata) { // get the ledger metadata back try { lh = new ReadOnlyLedgerHandle(bk.getClientCtx(), ledgerId, versionedMetadata, digestType, - passwd, !doRecovery); + passwd, !doRecovery || keepUpdateMetadata); } catch (GeneralSecurityException e) { LOG.error("Security exception while opening ledger: " + ledgerId, e); openComplete(BKException.Code.DigestNotInitializedException, null); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java index 20b5e6a0c61..95c441ade1c 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java @@ -24,10 +24,12 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import io.netty.buffer.ByteBuf; import java.io.IOException; import java.net.URI; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.SortedMap; import java.util.concurrent.CountDownLatch; @@ -36,6 +38,7 @@ 
import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.client.BookKeeperTestClient; import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.meta.LedgerManager; @@ -55,6 +58,7 @@ import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.data.Stat; +import org.awaitility.Awaitility; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -140,6 +144,51 @@ public void tearDown() throws Exception { } } + /** + * The purpose of this test: + * 1. Client service open a readonly ledger handle, which has been closed. + * 2. All BKs that relates to the ledger have been decommissioned. + * 3. Auto recovery component moved the data into other BK instances who is alive. + * 4. Verify: lhe ledger handle in the client memory keeps updating the ledger ensemble set, and the new read + * request works. 
+ */ + @Test + public void testOpenedLedgerHandleStillWorkAfterDecommissioning() throws Exception { + LedgerHandle lh = bkc.createLedger(2, 2, digestType, PASSWD); + lh.addEntry(data); + lh.close(); + List ensemble = lh.getLedgerMetadata().getAllEnsembles().get(0L); + BookieId bookieIdDoesNotInEnsemble = null; + for (BookieId bkAddr : bookieAddresses()) { + if (!ensemble.contains(bkAddr)) { + bookieIdDoesNotInEnsemble = bkAddr; + break; + } + } + LedgerHandle readonlyLh = bkc.openLedger(lh.getId(), digestType, PASSWD, false); + assertTrue(ensemble.size() == 2); + + killBookie(ensemble.get(0)); + verifyLedgerEnsembleMetadataAfterReplication(bookieIdDoesNotInEnsemble, + lh, 0); + + startNewBookie(); + int newBookieIndex = lastBookieIndex(); + BookieServer newBookieServer = serverByIndex(newBookieIndex); + killBookie(ensemble.get(1)); + verifyLedgerEnsembleMetadataAfterReplication(newBookieServer, + lh, 1); + + Awaitility.await().untilAsserted(() -> { + LedgerEntries ledgerEntries = readonlyLh.read(0, 0); + assertNotNull(ledgerEntries); + byte[] entryBytes = ledgerEntries.getEntry(0L).getEntryBytes(); + assertEquals(new String(data), new String(entryBytes)); + ledgerEntries.close(); + }); + readonlyLh.close(); + } + /** * Test verifies publish urLedger by Auditor and replication worker is * picking up the entries and finishing the rereplication of open ledger. 
@@ -590,13 +639,19 @@ private int getReplicaIndexInLedger(LedgerHandle lh, BookieId replicaToKill) { private void verifyLedgerEnsembleMetadataAfterReplication( BookieServer newBookieServer, LedgerHandle lh, int ledgerReplicaIndex) throws Exception { + verifyLedgerEnsembleMetadataAfterReplication(newBookieServer.getBookieId(), lh, ledgerReplicaIndex); + } + + private void verifyLedgerEnsembleMetadataAfterReplication( + BookieId newBookie, LedgerHandle lh, + int ledgerReplicaIndex) throws Exception { LedgerHandle openLedger = bkc .openLedger(lh.getId(), digestType, PASSWD); BookieId inetSocketAddress = openLedger.getLedgerMetadata().getAllEnsembles().get(0L) .get(ledgerReplicaIndex); - assertEquals("Rereplication has been failed and ledgerReplicaIndex :" - + ledgerReplicaIndex, newBookieServer.getBookieId(), + assertEquals("Replication has been failed and ledgerReplicaIndex :" + + ledgerReplicaIndex, newBookie, inetSocketAddress); openLedger.close(); } From cca3a03556613b306d13e09ba6ce9dc6671f40f9 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 6 Jun 2025 02:53:08 +0800 Subject: [PATCH 02/27] checkstyle --- .../apache/bookkeeper/replication/BookieAutoRecoveryTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java index 95c441ade1c..0e51fd8a4f0 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java @@ -24,12 +24,10 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import io.netty.buffer.ByteBuf; import java.io.IOException; import java.net.URI; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.SortedMap; import 
java.util.concurrent.CountDownLatch; From 475c9474f278fb0b18a057199944329e2da27278 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 6 Jun 2025 09:40:56 +0800 Subject: [PATCH 03/27] let all ledger handle enable watcher --- .../apache/bookkeeper/client/BookKeeper.java | 67 +------------------ .../bookkeeper/client/LedgerOpenOp.java | 33 +++------ .../replication/BookieAutoRecoveryTest.java | 2 +- 3 files changed, 15 insertions(+), 87 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java index 0c7dbf78367..c78f8fbbca1 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java @@ -1218,52 +1218,14 @@ public void asyncCreateLedgerAdv(final long ledgerId, */ public void asyncOpenLedger(final long lId, final DigestType digestType, final byte[] passwd, final OpenCallback cb, final Object ctx) { - asyncOpenLedger(lId, digestType, passwd, cb, ctx, false); - } - - /** - * Open existing ledger asynchronously for reading. - * - *

Opening a ledger with this method invokes fencing and recovery on the ledger - * if the ledger has not been closed. Fencing will block all other clients from - * writing to the ledger. Recovery will make sure that the ledger is closed - * before reading from it. - * - *

Recovery also makes sure that any entries which reached one bookie, but not a - * quorum, will be replicated to a quorum of bookies. This occurs in cases were - * the writer of a ledger crashes after sending a write request to one bookie but - * before being able to send it to the rest of the bookies in the quorum. - * - *

If the ledger is already closed, neither fencing nor recovery will be applied. - * - * @see LedgerHandle#asyncClose - * - * @param lId - * ledger identifier - * @param digestType - * digest type, either MAC or CRC32 - * @param passwd - * password - * @param ctx - * optional control object - * @param keepUpdateMetadata - * Whether update ledger metadata if the auto-recover component modified the ledger's ensemble. - */ - public void asyncOpenLedger(final long lId, final DigestType digestType, final byte[] passwd, - final OpenCallback cb, final Object ctx, boolean keepUpdateMetadata) { closeLock.readLock().lock(); try { if (closed) { cb.openComplete(BKException.Code.ClientClosedException, null, ctx); return; } - LedgerOpenOp ledgerOpenOp = new LedgerOpenOp(BookKeeper.this, clientStats, - lId, digestType, passwd, cb, ctx); - if (keepUpdateMetadata) { - ledgerOpenOp.initiateWithKeepUpdateMetadata(); - } else { - ledgerOpenOp.initiate(); - } + new LedgerOpenOp(BookKeeper.this, clientStats, + lId, digestType, passwd, cb, ctx).initiate(); } finally { closeLock.readLock().unlock(); } @@ -1331,36 +1293,13 @@ public void asyncOpenLedgerNoRecovery(final long lId, final DigestType digestTyp */ public LedgerHandle openLedger(long lId, DigestType digestType, byte[] passwd) throws BKException, InterruptedException { - return openLedger(lId, digestType, passwd, false); - } - - - /** - * Synchronous open ledger call. - * - * @see #asyncOpenLedger - * @param lId - * ledger identifier - * @param digestType - * digest type, either MAC or CRC32 - * @param passwd - * password - * - * @param keepUpdateMetadata - * Whether update ledger metadata if the auto-recover component modified the ledger's ensemble. 
- * @return a handle to the open ledger - * @throws InterruptedException - * @throws BKException - */ - public LedgerHandle openLedger(long lId, DigestType digestType, byte[] passwd, boolean keepUpdateMetadata) - throws BKException, InterruptedException { CompletableFuture future = new CompletableFuture<>(); SyncOpenCallback result = new SyncOpenCallback(future); /* * Calls async open ledger */ - asyncOpenLedger(lId, digestType, passwd, result, null, keepUpdateMetadata); + asyncOpenLedger(lId, digestType, passwd, result, null); return SyncCallbackUtils.waitForResult(future); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java index 0cae3274503..8bfd87a1f7f 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java @@ -57,18 +57,6 @@ class LedgerOpenOp { ReadOnlyLedgerHandle lh; final byte[] passwd; boolean doRecovery = true; - // The ledger metadata may be modified even if it has been closed, because the auto-recovery component may rewrite - // the ledger's metadata. Keep receiving a notification from ZK to avoid the following issue: an opened ledger - // handle in memory still accesses to a BK instance who has been decommissioned. The issue that solved happens as - // follows: - // 1. Client service open a readonly ledger handle, which has been closed. - // 2. All BKs that relates to the ledger have been decommissioned. - // 3. Auto recovery component moved the data into other BK instances who is alive. - // 4. The ledger handle in the client memory keeps connects to the BKs who in the original ensemble set, and the - // connection will always fail. 
- // For minimum modification, to add a new configuration named "keepUpdateMetadata", users can use the - // new API to create a readonly ledger handle that will auto-updates metadata. - boolean keepUpdateMetadata = false; boolean administrativeOpen = false; long startTime; final OpStatsLogger openOpLogger; @@ -138,15 +126,6 @@ public void initiateWithoutRecovery() { initiate(); } - /** - * Different with {@link #initiate()}, the method keep update metadata once the auto-recover component modified - * the ensemble. - */ - public void initiateWithKeepUpdateMetadata() { - this.keepUpdateMetadata = true; - initiate(); - } - private CompletableFuture closeLedgerHandleAsync() { if (lh != null) { return lh.closeAsync(); @@ -196,8 +175,18 @@ private void openWithMetadata(Versioned versionedMetadata) { // get the ledger metadata back try { + // The ledger metadata may be modified even if it has been closed, because the auto-recovery component may + // rewrite the ledger's metadata. Keep receiving a notification from ZK to avoid the following issue: an + // opened ledger handle in memory still accesses to a BK instance who has been decommissioned. The issue + // that solved happens as follows: + // 1. Client service open a readonly ledger handle, which has been closed. + // 2. All BKs that relates to the ledger have been decommissioned. + // 3. Auto recovery component moved the data into other BK instances who is alive. + // 4. The ledger handle in the client memory keeps connects to the BKs who in the original ensemble set, + // and the connection will always fail. 
+ // Therefore, all ledger handle need the watcher, lh = new ReadOnlyLedgerHandle(bk.getClientCtx(), ledgerId, versionedMetadata, digestType, - passwd, !doRecovery || keepUpdateMetadata); + passwd, true); } catch (GeneralSecurityException e) { LOG.error("Security exception while opening ledger: " + ledgerId, e); openComplete(BKException.Code.DigestNotInitializedException, null); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java index 0e51fd8a4f0..142b3b24317 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java @@ -163,7 +163,7 @@ public void testOpenedLedgerHandleStillWorkAfterDecommissioning() throws Excepti break; } } - LedgerHandle readonlyLh = bkc.openLedger(lh.getId(), digestType, PASSWD, false); + LedgerHandle readonlyLh = bkc.openLedger(lh.getId(), digestType, PASSWD); assertTrue(ensemble.size() == 2); killBookie(ensemble.get(0)); From b7c50ac0556003f5bdc6c4e4a66ec607665f0618 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 6 Jun 2025 09:42:05 +0800 Subject: [PATCH 04/27] let all ledger handle enable watcher --- .../main/java/org/apache/bookkeeper/client/LedgerOpenOp.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java index 8bfd87a1f7f..2dbb11665c4 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java @@ -184,7 +184,7 @@ private void openWithMetadata(Versioned versionedMetadata) { // 3. 
Auto recovery component moved the data into other BK instances who is alive. // 4. The ledger handle in the client memory keeps connects to the BKs who in the original ensemble set, // and the connection will always fail. - // Therefore, all ledger handle need the watcher, + // Therefore, whether the ledger handle is closed or not, it needs the watcher, lh = new ReadOnlyLedgerHandle(bk.getClientCtx(), ledgerId, versionedMetadata, digestType, passwd, true); } catch (GeneralSecurityException e) { From 2fd8c376debb6bbf0601ca4650a4f8b7d1c581a8 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 6 Jun 2025 16:19:21 +0800 Subject: [PATCH 05/27] fix tests --- .../replication/BookieAutoRecoveryTest.java | 57 +----- .../FullEnsembleDecommissionedTest.java | 184 ++++++++++++++++++ 2 files changed, 186 insertions(+), 55 deletions(-) create mode 100644 bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/FullEnsembleDecommissionedTest.java diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java index 142b3b24317..20b5e6a0c61 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java @@ -36,7 +36,6 @@ import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.client.BookKeeperTestClient; import org.apache.bookkeeper.client.LedgerHandle; -import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.meta.LedgerManager; @@ -56,7 +55,6 @@ import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.data.Stat; -import org.awaitility.Awaitility; import 
org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -142,51 +140,6 @@ public void tearDown() throws Exception { } } - /** - * The purpose of this test: - * 1. Client service open a readonly ledger handle, which has been closed. - * 2. All BKs that relates to the ledger have been decommissioned. - * 3. Auto recovery component moved the data into other BK instances who is alive. - * 4. Verify: lhe ledger handle in the client memory keeps updating the ledger ensemble set, and the new read - * request works. - */ - @Test - public void testOpenedLedgerHandleStillWorkAfterDecommissioning() throws Exception { - LedgerHandle lh = bkc.createLedger(2, 2, digestType, PASSWD); - lh.addEntry(data); - lh.close(); - List ensemble = lh.getLedgerMetadata().getAllEnsembles().get(0L); - BookieId bookieIdDoesNotInEnsemble = null; - for (BookieId bkAddr : bookieAddresses()) { - if (!ensemble.contains(bkAddr)) { - bookieIdDoesNotInEnsemble = bkAddr; - break; - } - } - LedgerHandle readonlyLh = bkc.openLedger(lh.getId(), digestType, PASSWD); - assertTrue(ensemble.size() == 2); - - killBookie(ensemble.get(0)); - verifyLedgerEnsembleMetadataAfterReplication(bookieIdDoesNotInEnsemble, - lh, 0); - - startNewBookie(); - int newBookieIndex = lastBookieIndex(); - BookieServer newBookieServer = serverByIndex(newBookieIndex); - killBookie(ensemble.get(1)); - verifyLedgerEnsembleMetadataAfterReplication(newBookieServer, - lh, 1); - - Awaitility.await().untilAsserted(() -> { - LedgerEntries ledgerEntries = readonlyLh.read(0, 0); - assertNotNull(ledgerEntries); - byte[] entryBytes = ledgerEntries.getEntry(0L).getEntryBytes(); - assertEquals(new String(data), new String(entryBytes)); - ledgerEntries.close(); - }); - readonlyLh.close(); - } - /** * Test verifies publish urLedger by Auditor and replication worker is * picking up the entries and finishing the rereplication of open ledger. 
@@ -637,19 +590,13 @@ private int getReplicaIndexInLedger(LedgerHandle lh, BookieId replicaToKill) { private void verifyLedgerEnsembleMetadataAfterReplication( BookieServer newBookieServer, LedgerHandle lh, int ledgerReplicaIndex) throws Exception { - verifyLedgerEnsembleMetadataAfterReplication(newBookieServer.getBookieId(), lh, ledgerReplicaIndex); - } - - private void verifyLedgerEnsembleMetadataAfterReplication( - BookieId newBookie, LedgerHandle lh, - int ledgerReplicaIndex) throws Exception { LedgerHandle openLedger = bkc .openLedger(lh.getId(), digestType, PASSWD); BookieId inetSocketAddress = openLedger.getLedgerMetadata().getAllEnsembles().get(0L) .get(ledgerReplicaIndex); - assertEquals("Replication has been failed and ledgerReplicaIndex :" - + ledgerReplicaIndex, newBookie, + assertEquals("Rereplication has been failed and ledgerReplicaIndex :" + + ledgerReplicaIndex, newBookieServer.getBookieId(), inetSocketAddress); openLedger.close(); } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/FullEnsembleDecommissionedTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/FullEnsembleDecommissionedTest.java new file mode 100644 index 00000000000..f0d23b7256c --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/FullEnsembleDecommissionedTest.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.replication; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.net.URI; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Optional; +import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.common.util.OrderedScheduler; +import org.apache.bookkeeper.meta.LedgerManager; +import org.apache.bookkeeper.meta.LedgerManagerFactory; +import org.apache.bookkeeper.meta.LedgerUnderreplicationManager; +import org.apache.bookkeeper.meta.MetadataClientDriver; +import org.apache.bookkeeper.meta.MetadataDrivers; +import org.apache.bookkeeper.net.BookieId; +import org.apache.bookkeeper.proto.BookieServer; +import org.apache.bookkeeper.stats.NullStatsLogger; +import org.apache.bookkeeper.test.BookKeeperClusterTestCase; +import org.awaitility.Awaitility; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Integration tests verifies the complete functionality of the + * Auditor-rereplication process: Auditor will publish the bookie failures, + * consequently ReplicationWorker will get the notifications and act on it. 
+ */ +public class FullEnsembleDecommissionedTest extends BookKeeperClusterTestCase { + private static final Logger LOG = LoggerFactory + .getLogger(FullEnsembleDecommissionedTest.class); + private static final byte[] PASSWD = "admin".getBytes(); + private static final byte[] data = "TESTDATA".getBytes(); + private static final String openLedgerRereplicationGracePeriod = "3000"; // milliseconds + + private DigestType digestType; + private MetadataClientDriver metadataClientDriver; + private LedgerManagerFactory mFactory; + private LedgerUnderreplicationManager underReplicationManager; + private LedgerManager ledgerManager; + private OrderedScheduler scheduler; + + private final String underreplicatedPath = "/ledgers/underreplication/ledgers"; + + public FullEnsembleDecommissionedTest() throws Exception{ + super(2); + + baseConf.setLedgerManagerFactoryClassName( + "org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory"); + baseConf.setOpenLedgerRereplicationGracePeriod(openLedgerRereplicationGracePeriod); + baseConf.setRwRereplicateBackoffMs(500); + baseClientConf.setLedgerManagerFactoryClassName( + "org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory"); + this.digestType = DigestType.MAC; + setAutoRecoveryEnabled(true); + } + + @Override + public void setUp() throws Exception { + super.setUp(); + baseConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri()); + baseClientConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri()); + + scheduler = OrderedScheduler.newSchedulerBuilder() + .name("test-scheduler") + .numThreads(1) + .build(); + + metadataClientDriver = MetadataDrivers.getClientDriver( + URI.create(baseClientConf.getMetadataServiceUri())); + metadataClientDriver.initialize( + baseClientConf, + scheduler, + NullStatsLogger.INSTANCE, + Optional.empty()); + + // initialize urReplicationManager + mFactory = metadataClientDriver.getLedgerManagerFactory(); + underReplicationManager = mFactory.newLedgerUnderreplicationManager(); + ledgerManager = 
mFactory.newLedgerManager(); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + + if (null != underReplicationManager) { + underReplicationManager.close(); + underReplicationManager = null; + } + if (null != ledgerManager) { + ledgerManager.close(); + ledgerManager = null; + } + if (null != metadataClientDriver) { + metadataClientDriver.close(); + metadataClientDriver = null; + } + if (null != scheduler) { + scheduler.shutdown(); + } + } + + /** + * The purpose of this test: + * 1. Client service open a readonly ledger handle, which has been closed. + * 2. All BKs that relates to the ledger have been decommissioned. + * 3. Auto recovery component moved the data into other BK instances who is alive. + * 4. Verify: lhe ledger handle in the client memory keeps updating the ledger ensemble set, and the new read + * request works. + */ + @Test + public void testOpenedLedgerHandleStillWorkAfterDecommissioning() throws Exception { + LedgerHandle lh = bkc.createLedger(2, 2, digestType, PASSWD); + assertTrue(lh.getLedgerMetadata().getAllEnsembles().get(0L).size() == 2); + lh.addEntry(data); + lh.close(); + List originalEnsemble = lh.getLedgerMetadata().getAllEnsembles().get(0L); + LedgerHandle readonlyLh = bkc.openLedger(lh.getId(), digestType, PASSWD); + assertTrue(originalEnsemble.size() == 2); + + startNewBookie(); + BookieServer newBookieServer3 = serverByIndex(lastBookieIndex()); + killBookie(originalEnsemble.get(0)); + waitAutoRecoveryFinished(lh.getId(), originalEnsemble.get(0), newBookieServer3.getBookieId()); + + startNewBookie(); + int newBookieIndex4 = lastBookieIndex(); + BookieServer newBookieServer4 = serverByIndex(newBookieIndex4); + killBookie(originalEnsemble.get(1)); + waitAutoRecoveryFinished(lh.getId(), originalEnsemble.get(1), newBookieServer4.getBookieId()); + + Awaitility.await().untilAsserted(() -> { + LedgerEntries ledgerEntries = readonlyLh.read(0, 0); + assertNotNull(ledgerEntries); + byte[] entryBytes = 
ledgerEntries.getEntry(0L).getEntryBytes(); + assertEquals(new String(data), new String(entryBytes)); + ledgerEntries.close(); + }); + readonlyLh.close(); + } + + private void waitAutoRecoveryFinished(long lId, BookieId originalBookie, + BookieId newBookie) throws Exception { + Awaitility.await().untilAsserted(() -> { + LedgerHandle openLedger = bkc.openLedger(lId, digestType, PASSWD); + NavigableMap> map = openLedger.getLedgerMetadata().getAllEnsembles(); + try { + for (Map.Entry> entry : map.entrySet()) { + assertFalse(entry.getValue().contains(originalBookie)); + assertTrue(entry.getValue().contains(newBookie)); + } + } finally { + openLedger.close(); + } + }); + } +} From 28205a2624fb96ece406904db2543177025652c9 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 6 Jun 2025 16:20:18 +0800 Subject: [PATCH 06/27] fix tests --- .../replication/FullEnsembleDecommissionedTest.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/FullEnsembleDecommissionedTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/FullEnsembleDecommissionedTest.java index f0d23b7256c..d36e438733b 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/FullEnsembleDecommissionedTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/FullEnsembleDecommissionedTest.java @@ -48,9 +48,7 @@ import org.slf4j.LoggerFactory; /** - * Integration tests verifies the complete functionality of the - * Auditor-rereplication process: Auditor will publish the bookie failures, - * consequently ReplicationWorker will get the notifications and act on it. + * Integration tests verifies the complete decommission tasks. 
*/ public class FullEnsembleDecommissionedTest extends BookKeeperClusterTestCase { private static final Logger LOG = LoggerFactory @@ -66,8 +64,6 @@ public class FullEnsembleDecommissionedTest extends BookKeeperClusterTestCase { private LedgerManager ledgerManager; private OrderedScheduler scheduler; - private final String underreplicatedPath = "/ledgers/underreplication/ledgers"; - public FullEnsembleDecommissionedTest() throws Exception{ super(2); From cdd83373bf6655e8dd54ed8fd00c4e24e053a8bd Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 6 Jun 2025 18:07:38 +0800 Subject: [PATCH 07/27] fix tests --- .../java/org/apache/bookkeeper/replication/AuditorElector.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java index f6b3a3a04f2..0585fa150a6 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java @@ -244,7 +244,7 @@ public void shutdown() throws InterruptedException { } // close auditor manager try { - submitShutdownTask().get(10, TimeUnit.SECONDS); + submitShutdownTask().get(60, TimeUnit.SECONDS); executor.shutdown(); } catch (ExecutionException e) { LOG.warn("Failed to close auditor manager", e); From 8702288d507e881930050b248f3f67aaab0d7073 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 6 Jun 2025 20:53:05 +0800 Subject: [PATCH 08/27] add test logs for debug --- .../bookkeeper/replication/AuditorElector.java | 2 +- .../replication/BookieAutoRecoveryTest.java | 13 ++++++++----- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java index 
0585fa150a6..f6b3a3a04f2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java @@ -244,7 +244,7 @@ public void shutdown() throws InterruptedException { } // close auditor manager try { - submitShutdownTask().get(60, TimeUnit.SECONDS); + submitShutdownTask().get(10, TimeUnit.SECONDS); executor.shutdown(); } catch (ExecutionException e) { LOG.warn("Failed to close auditor manager", e); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java index 20b5e6a0c61..c58ed016a95 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java @@ -32,6 +32,8 @@ import java.util.SortedMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import jdk.jpackage.internal.Log; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.client.BookKeeperTestClient; @@ -208,7 +210,8 @@ public void testClosedLedgers() throws Exception { assertNull("UrLedger already exists!", watchUrLedgerNode(getUrLedgerZNode(lh), latch)); } - + LOG.info("Ledgers wait for replication: {}", listOfLedgerHandle.stream().map(LedgerHandle::getId).collect( + Collectors.toList())); LOG.info("Killing Bookie :" + replicaToKillAddr); killBookie(replicaToKillAddr); @@ -634,13 +637,13 @@ private Stat watchUrLedgerNode(final String znode, @Override public void process(WatchedEvent event) { if (event.getType() == EventType.NodeDeleted) { - LOG.info("Received Ledger rereplication completion event :" - + event.getType()); + LOG.info("Received Ledger 
replication completion. event : {}, path: {}", + event.getType(), event.getPath()); latch.countDown(); } if (event.getType() == EventType.NodeCreated) { - LOG.info("Received urLedger publishing event :" - + event.getType()); + LOG.info("Received urLedger publishing event: {}, path: {}", + event.getType(), event.getPath()); latch.countDown(); } } From ac1fe9c66e3182008ade0384f1194e315e34cfdb Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 6 Jun 2025 21:29:50 +0800 Subject: [PATCH 09/27] add test logs for debug --- .../apache/bookkeeper/replication/BookieAutoRecoveryTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java index c58ed016a95..5b928db15a8 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java @@ -33,7 +33,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import jdk.jpackage.internal.Log; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.client.BookKeeperTestClient; From ead1c0e91e8efc00613779b7f98a90eed5d1f5fc Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 6 Jun 2025 22:34:22 +0800 Subject: [PATCH 10/27] add test logs for debug --- .../bookkeeper/replication/BookieAutoRecoveryTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java index 5b928db15a8..16b8fe1ac80 100644 --- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java @@ -636,13 +636,13 @@ private Stat watchUrLedgerNode(final String znode, @Override public void process(WatchedEvent event) { if (event.getType() == EventType.NodeDeleted) { - LOG.info("Received Ledger replication completion. event : {}, path: {}", - event.getType(), event.getPath()); + LOG.info("Received Ledger replication completion. event : {}, path: {}, latchCount: {}", + event.getType(), event.getPath(), latch.getCount()); latch.countDown(); } if (event.getType() == EventType.NodeCreated) { - LOG.info("Received urLedger publishing event: {}, path: {}", - event.getType(), event.getPath()); + LOG.info("Received urLedger publishing event: {}, path: {}, latchCount: {}", + event.getType(), event.getPath(), latch.getCount()); latch.countDown(); } } From 32280e71c3b2351615423cb9b405936d5e657b2d Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 6 Jun 2025 23:18:38 +0800 Subject: [PATCH 11/27] - --- .../apache/bookkeeper/replication/BookieAutoRecoveryTest.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java index 16b8fe1ac80..c412dffc07f 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java @@ -32,7 +32,6 @@ import java.util.SortedMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.client.BookKeeperTestClient; @@ 
-209,8 +208,7 @@ public void testClosedLedgers() throws Exception { assertNull("UrLedger already exists!", watchUrLedgerNode(getUrLedgerZNode(lh), latch)); } - LOG.info("Ledgers wait for replication: {}", listOfLedgerHandle.stream().map(LedgerHandle::getId).collect( - Collectors.toList())); + LOG.info("Killing Bookie :" + replicaToKillAddr); killBookie(replicaToKillAddr); From f8ade544a82146176ea0f1ceb5459554319c04c8 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 11 Jun 2025 10:50:15 +0800 Subject: [PATCH 12/27] add a new param keepUpdateMetadata when open a read-only ledger handle --- .../apache/bookkeeper/client/BookKeeper.java | 67 ++++++++++++++++++- .../bookkeeper/client/LedgerOpenOp.java | 26 ++++++- .../FullEnsembleDecommissionedTest.java | 2 +- 3 files changed, 89 insertions(+), 6 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java index c78f8fbbca1..0c7dbf78367 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java @@ -1218,14 +1218,52 @@ public void asyncCreateLedgerAdv(final long ledgerId, */ public void asyncOpenLedger(final long lId, final DigestType digestType, final byte[] passwd, final OpenCallback cb, final Object ctx) { + asyncOpenLedger(lId, digestType, passwd, cb, ctx, false); + } + + /** + * Open existing ledger asynchronously for reading. + * + *

Opening a ledger with this method invokes fencing and recovery on the ledger + * if the ledger has not been closed. Fencing will block all other clients from + * writing to the ledger. Recovery will make sure that the ledger is closed + * before reading from it. + * + *

Recovery also makes sure that any entries which reached one bookie, but not a + * quorum, will be replicated to a quorum of bookies. This occurs in cases where + * the writer of a ledger crashes after sending a write request to one bookie but + * before being able to send it to the rest of the bookies in the quorum. + * + *

If the ledger is already closed, neither fencing nor recovery will be applied. + * + * @see LedgerHandle#asyncClose + * + * @param lId + * ledger identifier + * @param digestType + * digest type, either MAC or CRC32 + * @param passwd + * password + * @param ctx + * optional control object + * @param keepUpdateMetadata + * Whether update ledger metadata if the auto-recover component modified the ledger's ensemble. + */ + public void asyncOpenLedger(final long lId, final DigestType digestType, final byte[] passwd, + final OpenCallback cb, final Object ctx, boolean keepUpdateMetadata) { closeLock.readLock().lock(); try { if (closed) { cb.openComplete(BKException.Code.ClientClosedException, null, ctx); return; } - new LedgerOpenOp(BookKeeper.this, clientStats, - lId, digestType, passwd, cb, ctx).initiate(); + LedgerOpenOp ledgerOpenOp = new LedgerOpenOp(BookKeeper.this, clientStats, + lId, digestType, passwd, cb, ctx); + if (keepUpdateMetadata) { + ledgerOpenOp.initiateWithKeepUpdateMetadata(); + } else { + ledgerOpenOp.initiate(); + } } finally { closeLock.readLock().unlock(); } @@ -1293,13 +1331,36 @@ public void asyncOpenLedgerNoRecovery(final long lId, final DigestType digestTyp */ public LedgerHandle openLedger(long lId, DigestType digestType, byte[] passwd) throws BKException, InterruptedException { + return openLedger(lId, digestType, passwd, false); + } + + + /** + * Synchronous open ledger call. + * + * @see #asyncOpenLedger + * @param lId + * ledger identifier + * @param digestType + * digest type, either MAC or CRC32 + * @param passwd + * password + * + * @param keepUpdateMetadata + * Whether update ledger metadata if the auto-recover component modified the ledger's ensemble. 
+ * @return a handle to the open ledger + * @throws InterruptedException + * @throws BKException + */ + public LedgerHandle openLedger(long lId, DigestType digestType, byte[] passwd, boolean keepUpdateMetadata) + throws BKException, InterruptedException { CompletableFuture future = new CompletableFuture<>(); SyncOpenCallback result = new SyncOpenCallback(future); /* * Calls async open ledger */ - asyncOpenLedger(lId, digestType, passwd, result, null); + asyncOpenLedger(lId, digestType, passwd, result, null, keepUpdateMetadata); return SyncCallbackUtils.waitForResult(future); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java index 2dbb11665c4..1101a7d4e1a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java @@ -57,6 +57,18 @@ class LedgerOpenOp { ReadOnlyLedgerHandle lh; final byte[] passwd; boolean doRecovery = true; + // The ledger metadata may be modified even if it has been closed, because the auto-recovery component may rewrite + // the ledger's metadata. Keep receiving a notification from ZK to avoid the following issue: an opened ledger + // handle in memory still accesses to a BK instance who has been decommissioned. The issue that solved happens as + // follows: + // 1. Client service open a readonly ledger handle, which has been closed. + // 2. All BKs that relates to the ledger have been decommissioned. + // 3. Auto recovery component moved the data into other BK instances who is alive. + // 4. The ledger handle in the client memory keeps connects to the BKs who in the original ensemble set, and the + // connection will always fail. 
+ // For minimum modification, to add a new configuration named "keepUpdateMetadata", users can use the + // new API to create a readonly ledger handle that will auto-updates metadata. + boolean keepUpdateMetadata = false; boolean administrativeOpen = false; long startTime; final OpStatsLogger openOpLogger; @@ -126,6 +138,15 @@ public void initiateWithoutRecovery() { initiate(); } + /** + * Different with {@link #initiate()}, the method keep update metadata once the auto-recover component modified + * the ensemble. + */ + public void initiateWithKeepUpdateMetadata() { + this.keepUpdateMetadata = true; + initiate(); + } + private CompletableFuture closeLedgerHandleAsync() { if (lh != null) { return lh.closeAsync(); @@ -184,9 +205,10 @@ private void openWithMetadata(Versioned versionedMetadata) { // 3. Auto recovery component moved the data into other BK instances who is alive. // 4. The ledger handle in the client memory keeps connects to the BKs who in the original ensemble set, // and the connection will always fail. 
- // Therefore, whether the ledger handle is closed or not, it needs the watcher, + // Therefore, if a user needs to the feature that update metadata automatically, he will set + // "keepUpdateMetadata" to "true", lh = new ReadOnlyLedgerHandle(bk.getClientCtx(), ledgerId, versionedMetadata, digestType, - passwd, true); + passwd, !doRecovery || keepUpdateMetadata); } catch (GeneralSecurityException e) { LOG.error("Security exception while opening ledger: " + ledgerId, e); openComplete(BKException.Code.DigestNotInitializedException, null); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/FullEnsembleDecommissionedTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/FullEnsembleDecommissionedTest.java index d36e438733b..9c2d221d328 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/FullEnsembleDecommissionedTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/FullEnsembleDecommissionedTest.java @@ -138,7 +138,7 @@ public void testOpenedLedgerHandleStillWorkAfterDecommissioning() throws Excepti lh.addEntry(data); lh.close(); List originalEnsemble = lh.getLedgerMetadata().getAllEnsembles().get(0L); - LedgerHandle readonlyLh = bkc.openLedger(lh.getId(), digestType, PASSWD); + LedgerHandle readonlyLh = bkc.openLedger(lh.getId(), digestType, PASSWD, true); assertTrue(originalEnsemble.size() == 2); startNewBookie(); From 3e0f58ad273d6ab34554de6ee6bc60afe3276dc0 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 8 Jul 2025 23:32:43 +0800 Subject: [PATCH 13/27] address comments --- .../bookkeeper/client/LedgerOpenOp.java | 6 ++- .../client/ReadOnlyLedgerHandle.java | 10 +++-- .../FullEnsembleDecommissionedTest.java | 38 +++++++++++++++++++ 3 files changed, 50 insertions(+), 4 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java index 
1101a7d4e1a..e45291f68a7 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java @@ -195,6 +195,7 @@ private void openWithMetadata(Versioned versionedMetadata) { } // get the ledger metadata back + final boolean needRecovery = !doRecovery || !metadata.isClosed(); try { // The ledger metadata may be modified even if it has been closed, because the auto-recovery component may // rewrite the ledger's metadata. Keep receiving a notification from ZK to avoid the following issue: an @@ -208,7 +209,7 @@ private void openWithMetadata(Versioned versionedMetadata) { // Therefore, if a user needs to the feature that update metadata automatically, he will set // "keepUpdateMetadata" to "true", lh = new ReadOnlyLedgerHandle(bk.getClientCtx(), ledgerId, versionedMetadata, digestType, - passwd, !doRecovery || keepUpdateMetadata); + passwd, !needRecovery); } catch (GeneralSecurityException e) { LOG.error("Security exception while opening ledger: " + ledgerId, e); openComplete(BKException.Code.DigestNotInitializedException, null); @@ -231,6 +232,9 @@ private void openWithMetadata(Versioned versionedMetadata) { public void safeOperationComplete(int rc, Void result) { if (rc == BKException.Code.OK) { openComplete(BKException.Code.OK, lh); + if (needRecovery && keepUpdateMetadata) { + lh.registerLedgerMetadataListener(); + } } else { closeLedgerHandleAsync().whenComplete((ignore, ex) -> { if (ex != null) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java index 95e8666660d..ffad02d8f7c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java @@ -95,14 +95,18 @@ public String toString() { 
ReadOnlyLedgerHandle(ClientContext clientCtx, long ledgerId, Versioned metadata, BookKeeper.DigestType digestType, byte[] password, - boolean watch) + boolean watchImmediately) throws GeneralSecurityException, NumberFormatException { super(clientCtx, ledgerId, metadata, digestType, password, WriteFlag.NONE); - if (watch) { - clientCtx.getLedgerManager().registerLedgerMetadataListener(ledgerId, this); + if (watchImmediately) { + registerLedgerMetadataListener(); } } + void registerLedgerMetadataListener() { + clientCtx.getLedgerManager().registerLedgerMetadataListener(ledgerId, this); + } + @Override public void close() throws InterruptedException, BKException { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/FullEnsembleDecommissionedTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/FullEnsembleDecommissionedTest.java index 9c2d221d328..4a31b617e58 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/FullEnsembleDecommissionedTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/FullEnsembleDecommissionedTest.java @@ -138,6 +138,44 @@ public void testOpenedLedgerHandleStillWorkAfterDecommissioning() throws Excepti lh.addEntry(data); lh.close(); List originalEnsemble = lh.getLedgerMetadata().getAllEnsembles().get(0L); + LedgerHandle readonlyLh = bkc.openLedger(lh.getId(), digestType, PASSWD, false); + assertTrue(originalEnsemble.size() == 2); + + startNewBookie(); + BookieServer newBookieServer3 = serverByIndex(lastBookieIndex()); + killBookie(originalEnsemble.get(0)); + waitAutoRecoveryFinished(lh.getId(), originalEnsemble.get(0), newBookieServer3.getBookieId()); + + startNewBookie(); + int newBookieIndex4 = lastBookieIndex(); + BookieServer newBookieServer4 = serverByIndex(newBookieIndex4); + killBookie(originalEnsemble.get(1)); + waitAutoRecoveryFinished(lh.getId(), originalEnsemble.get(1), newBookieServer4.getBookieId()); + + 
Awaitility.await().untilAsserted(() -> { + LedgerEntries ledgerEntries = readonlyLh.read(0, 0); + assertNotNull(ledgerEntries); + byte[] entryBytes = ledgerEntries.getEntry(0L).getEntryBytes(); + assertEquals(new String(data), new String(entryBytes)); + ledgerEntries.close(); + }); + readonlyLh.close(); + } + + /** + * The purpose of this test: + * 1. Client service open a readonly ledger handle with recovery, which has not been closed yet. + * 2. All BKs that relates to the ledger have been decommissioned. + * 3. Auto recovery component moved the data into other BK instances who is alive. + * 4. Verify: lhe ledger handle in the client memory keeps updating the ledger ensemble set, and the new read + * request works. + */ + @Test + public void testRecoverOpenLedgerHandleStillWorkAfterDecommissioning() throws Exception { + LedgerHandle lh = bkc.createLedger(2, 2, digestType, PASSWD); + assertTrue(lh.getLedgerMetadata().getAllEnsembles().get(0L).size() == 2); + lh.addEntry(data); + List originalEnsemble = lh.getLedgerMetadata().getAllEnsembles().get(0L); LedgerHandle readonlyLh = bkc.openLedger(lh.getId(), digestType, PASSWD, true); assertTrue(originalEnsemble.size() == 2); From 84b6f08bcdb00179dfc50c8fc55323f3201008f9 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 10 Jul 2025 14:38:59 +0800 Subject: [PATCH 14/27] address comment --- .../main/java/org/apache/bookkeeper/client/LedgerOpenOp.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java index e45291f68a7..8de7595fc58 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java @@ -195,7 +195,7 @@ private void openWithMetadata(Versioned versionedMetadata) { } // get the ledger metadata back - final boolean 
needRecovery = !doRecovery || !metadata.isClosed(); + final boolean watchImmediately = !doRecovery || metadata.isClosed(); try { // The ledger metadata may be modified even if it has been closed, because the auto-recovery component may // rewrite the ledger's metadata. Keep receiving a notification from ZK to avoid the following issue: an @@ -209,7 +209,7 @@ private void openWithMetadata(Versioned versionedMetadata) { // Therefore, if a user needs to the feature that update metadata automatically, he will set // "keepUpdateMetadata" to "true", lh = new ReadOnlyLedgerHandle(bk.getClientCtx(), ledgerId, versionedMetadata, digestType, - passwd, !needRecovery); + passwd, watchImmediately); } catch (GeneralSecurityException e) { LOG.error("Security exception while opening ledger: " + ledgerId, e); openComplete(BKException.Code.DigestNotInitializedException, null); From f0991636d78f6ce54af1d9f5ef1b84849596af33 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 10 Jul 2025 14:42:57 +0800 Subject: [PATCH 15/27] address comment --- .../main/java/org/apache/bookkeeper/client/LedgerOpenOp.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java index 8de7595fc58..a4b849bf6de 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java @@ -232,7 +232,7 @@ private void openWithMetadata(Versioned versionedMetadata) { public void safeOperationComplete(int rc, Void result) { if (rc == BKException.Code.OK) { openComplete(BKException.Code.OK, lh); - if (needRecovery && keepUpdateMetadata) { + if (!watchImmediately && keepUpdateMetadata) { lh.registerLedgerMetadataListener(); } } else { From f8ef86e552b3c5c303340c1eef23479612cbcf75 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 5 Aug 2025 
15:56:24 +0800 Subject: [PATCH 16/27] test CI --- .../replication/BookieAutoRecoveryTest.java | 542 +++++------------- 1 file changed, 153 insertions(+), 389 deletions(-) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java index c412dffc07f..169f7e78ab7 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java @@ -22,7 +22,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; import java.io.IOException; import java.net.URI; @@ -31,13 +30,10 @@ import java.util.Optional; import java.util.SortedMap; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper.DigestType; -import org.apache.bookkeeper.client.BookKeeperTestClient; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.common.util.OrderedScheduler; -import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.meta.LedgerManager; import org.apache.bookkeeper.meta.LedgerManagerFactory; import org.apache.bookkeeper.meta.LedgerUnderreplicationManager; @@ -140,6 +136,96 @@ public void tearDown() throws Exception { } } +// @Test +// public void testOpenLedgers1() throws Exception { +// testOpenLedgers(); +// } +// +// @Test +// public void testClosedLedgers1() throws Exception { +// testOpenLedgers(); +// } +// +// @Test +// public void testOpenLedgers2() throws Exception { +// testOpenLedgers(); +// } +// +// @Test +// public void testClosedLedgers2() throws Exception { +// testOpenLedgers(); +// } +// +// @Test +// public 
void testOpenLedgers3() throws Exception { +// testOpenLedgers(); +// } +// +// @Test +// public void testClosedLedgers3() throws Exception { +// testOpenLedgers(); +// } +// +// @Test +// public void testOpenLedgers4() throws Exception { +// testOpenLedgers(); +// } +// +// @Test +// public void testClosedLedgers4() throws Exception { +// testOpenLedgers(); +// } +// +// @Test +// public void testOpenLedgers5() throws Exception { +// testOpenLedgers(); +// } +// +// @Test +// public void testClosedLedgers5() throws Exception { +// testOpenLedgers(); +// } +// +// @Test +// public void testOpenLedgers6() throws Exception { +// testOpenLedgers(); +// } +// +// @Test +// public void testClosedLedgers6() throws Exception { +// testOpenLedgers(); +// } +// +// @Test +// public void testOpenLedgers7() throws Exception { +// testOpenLedgers(); +// } +// +// @Test +// public void testClosedLedgers7() throws Exception { +// testOpenLedgers(); +// } +// +// @Test +// public void testOpenLedgers8() throws Exception { +// testOpenLedgers(); +// } +// +// @Test +// public void testClosedLedgers8() throws Exception { +// testOpenLedgers(); +// } +// +// @Test +// public void testOpenLedgers9() throws Exception { +// testOpenLedgers(); +// } +// +// @Test +// public void testClosedLedgers9() throws Exception { +// testOpenLedgers(); +// } + /** * Test verifies publish urLedger by Auditor and replication worker is * picking up the entries and finishing the rereplication of open ledger. @@ -188,392 +274,70 @@ public void testOpenLedgers() throws Exception { listOfLedgerHandle.get(0), ledgerReplicaIndex); } - /** - * Test verifies publish urLedger by Auditor and replication worker is - * picking up the entries and finishing the rereplication of closed ledgers. 
- */ - @Test - public void testClosedLedgers() throws Exception { - List listOfReplicaIndex = new ArrayList(); - List listOfLedgerHandle = createLedgersAndAddEntries(1, 5); - closeLedgers(listOfLedgerHandle); - LedgerHandle lhandle = listOfLedgerHandle.get(0); - int ledgerReplicaIndex = 0; - BookieId replicaToKillAddr = lhandle.getLedgerMetadata().getAllEnsembles().get(0L).get(0); - - CountDownLatch latch = new CountDownLatch(listOfLedgerHandle.size()); - for (LedgerHandle lh : listOfLedgerHandle) { - ledgerReplicaIndex = getReplicaIndexInLedger(lh, replicaToKillAddr); - listOfReplicaIndex.add(ledgerReplicaIndex); - assertNull("UrLedger already exists!", - watchUrLedgerNode(getUrLedgerZNode(lh), latch)); - } - - LOG.info("Killing Bookie :" + replicaToKillAddr); - killBookie(replicaToKillAddr); - - // waiting to publish urLedger znode by Auditor - latch.await(); - - // Again watching the urLedger znode to know the replication status - latch = new CountDownLatch(listOfLedgerHandle.size()); - for (LedgerHandle lh : listOfLedgerHandle) { - String urLedgerZNode = getUrLedgerZNode(lh); - LOG.info("Watching on urLedgerPath:" + urLedgerZNode - + " to know the status of rereplication process"); - assertNotNull("UrLedger doesn't exists!", - watchUrLedgerNode(urLedgerZNode, latch)); - } - - // starting the replication service, so that he will be able to act as - // target bookie - startNewBookie(); - int newBookieIndex = lastBookieIndex(); - BookieServer newBookieServer = serverByIndex(newBookieIndex); - - if (LOG.isDebugEnabled()) { - LOG.debug("Waiting to finish the replication of failed bookie : " - + replicaToKillAddr); - } - - // waiting to finish replication - latch.await(); - - // grace period to update the urledger metadata in zookeeper - LOG.info("Waiting to update the urledger metadata in zookeeper"); - - for (int index = 0; index < listOfLedgerHandle.size(); index++) { - verifyLedgerEnsembleMetadataAfterReplication(newBookieServer, - listOfLedgerHandle.get(index), 
- listOfReplicaIndex.get(index)); - } - } - - /** - * Test stopping replica service while replication in progress. Considering - * when there is an exception will shutdown Auditor and RW processes. After - * restarting should be able to finish the re-replication activities - */ - @Test - public void testStopWhileReplicationInProgress() throws Exception { - int numberOfLedgers = 2; - List listOfReplicaIndex = new ArrayList(); - List listOfLedgerHandle = createLedgersAndAddEntries( - numberOfLedgers, 5); - closeLedgers(listOfLedgerHandle); - LedgerHandle handle = listOfLedgerHandle.get(0); - BookieId replicaToKillAddr = handle.getLedgerMetadata().getAllEnsembles().get(0L).get(0); - LOG.info("Killing Bookie:" + replicaToKillAddr); - - // Each ledger, there will be two events : create urLedger and after - // rereplication delete urLedger - CountDownLatch latch = new CountDownLatch(listOfLedgerHandle.size()); - for (int i = 0; i < listOfLedgerHandle.size(); i++) { - final String urLedgerZNode = getUrLedgerZNode(listOfLedgerHandle - .get(i)); - assertNull("UrLedger already exists!", - watchUrLedgerNode(urLedgerZNode, latch)); - int replicaIndexInLedger = getReplicaIndexInLedger( - listOfLedgerHandle.get(i), replicaToKillAddr); - listOfReplicaIndex.add(replicaIndexInLedger); - } - - LOG.info("Killing Bookie :" + replicaToKillAddr); - killBookie(replicaToKillAddr); - - // waiting to publish urLedger znode by Auditor - latch.await(); - - // Again watching the urLedger znode to know the replication status - latch = new CountDownLatch(listOfLedgerHandle.size()); - for (LedgerHandle lh : listOfLedgerHandle) { - String urLedgerZNode = getUrLedgerZNode(lh); - LOG.info("Watching on urLedgerPath:" + urLedgerZNode - + " to know the status of rereplication process"); - assertNotNull("UrLedger doesn't exists!", - watchUrLedgerNode(urLedgerZNode, latch)); - } - - // starting the replication service, so that he will be able to act as - // target bookie - startNewBookie(); - int 
newBookieIndex = lastBookieIndex(); - BookieServer newBookieServer = serverByIndex(newBookieIndex); - - if (LOG.isDebugEnabled()) { - LOG.debug("Waiting to finish the replication of failed bookie : " - + replicaToKillAddr); - } - while (true) { - if (latch.getCount() < numberOfLedgers || latch.getCount() <= 0) { - stopReplicationService(); - LOG.info("Latch Count is:" + latch.getCount()); - break; - } - // grace period to take breath - Thread.sleep(1000); - } - - startReplicationService(); - - LOG.info("Waiting to finish rereplication processes"); - latch.await(); - - // grace period to update the urledger metadata in zookeeper - LOG.info("Waiting to update the urledger metadata in zookeeper"); - - for (int index = 0; index < listOfLedgerHandle.size(); index++) { - verifyLedgerEnsembleMetadataAfterReplication(newBookieServer, - listOfLedgerHandle.get(index), - listOfReplicaIndex.get(index)); - } - } - - /** - * Verify the published urledgers of deleted ledgers(those ledgers where - * deleted after publishing as urledgers by Auditor) should be cleared off - * by the newly selected replica bookie. 
- */ - @Test - public void testNoSuchLedgerExists() throws Exception { - List listOfLedgerHandle = createLedgersAndAddEntries(2, 5); - CountDownLatch latch = new CountDownLatch(listOfLedgerHandle.size()); - for (LedgerHandle lh : listOfLedgerHandle) { - assertNull("UrLedger already exists!", - watchUrLedgerNode(getUrLedgerZNode(lh), latch)); - } - BookieId replicaToKillAddr = listOfLedgerHandle.get(0) - .getLedgerMetadata().getAllEnsembles() - .get(0L).get(0); - killBookie(replicaToKillAddr); - replicaToKillAddr = listOfLedgerHandle.get(0) - .getLedgerMetadata().getAllEnsembles() - .get(0L).get(0); - killBookie(replicaToKillAddr); - // waiting to publish urLedger znode by Auditor - latch.await(); - - latch = new CountDownLatch(listOfLedgerHandle.size()); - for (LedgerHandle lh : listOfLedgerHandle) { - assertNotNull("UrLedger doesn't exists!", - watchUrLedgerNode(getUrLedgerZNode(lh), latch)); - } - - // delete ledgers - for (LedgerHandle lh : listOfLedgerHandle) { - bkc.deleteLedger(lh.getId()); - } - startNewBookie(); - - // waiting to delete published urledgers, since it doesn't exists - latch.await(); - - for (LedgerHandle lh : listOfLedgerHandle) { - assertNull("UrLedger still exists after rereplication", - watchUrLedgerNode(getUrLedgerZNode(lh), latch)); - } - } - - /** - * Test that if an empty ledger loses the bookie not in the quorum for entry 0, it will - * still be openable when it loses enough bookies to lose a whole quorum. 
- */ - @Test - public void testEmptyLedgerLosesQuorumEventually() throws Exception { - LedgerHandle lh = bkc.createLedger(3, 2, 2, DigestType.CRC32, PASSWD); - CountDownLatch latch = new CountDownLatch(1); - String urZNode = getUrLedgerZNode(lh); - watchUrLedgerNode(urZNode, latch); - - BookieId replicaToKill = lh.getLedgerMetadata().getAllEnsembles().get(0L).get(2); - LOG.info("Killing last bookie, {}, in ensemble {}", replicaToKill, - lh.getLedgerMetadata().getAllEnsembles().get(0L)); - killBookie(replicaToKill); - startNewBookie(); - - getAuditor(10, TimeUnit.SECONDS).submitAuditTask().get(); // ensure auditor runs - - assertTrue("Should be marked as underreplicated", latch.await(5, TimeUnit.SECONDS)); - latch = new CountDownLatch(1); - Stat s = watchUrLedgerNode(urZNode, latch); // should be marked as replicated - if (s != null) { - assertTrue("Should be marked as replicated", latch.await(15, TimeUnit.SECONDS)); - } - - replicaToKill = lh.getLedgerMetadata().getAllEnsembles().get(0L).get(1); - LOG.info("Killing second bookie, {}, in ensemble {}", replicaToKill, - lh.getLedgerMetadata().getAllEnsembles().get(0L)); - killBookie(replicaToKill); - - getAuditor(10, TimeUnit.SECONDS).submitAuditTask().get(); // ensure auditor runs - - assertTrue("Should be marked as underreplicated", latch.await(5, TimeUnit.SECONDS)); - latch = new CountDownLatch(1); - s = watchUrLedgerNode(urZNode, latch); // should be marked as replicated - - startNewBookie(); - getAuditor(10, TimeUnit.SECONDS).submitAuditTask().get(); // ensure auditor runs - - if (s != null) { - assertTrue("Should be marked as replicated", latch.await(20, TimeUnit.SECONDS)); - } - - // should be able to open ledger without issue - bkc.openLedger(lh.getId(), DigestType.CRC32, PASSWD); - } - - /** - * Test verifies bookie recovery, the host (recorded via ipaddress in - * ledgermetadata). 
- */ - @Test - public void testLedgerMetadataContainsIpAddressAsBookieID() - throws Exception { - stopBKCluster(); - bkc = new BookKeeperTestClient(baseClientConf); - // start bookie with useHostNameAsBookieID=false, as old bookie - ServerConfiguration serverConf1 = newServerConfiguration(); - // start 2 more bookies with useHostNameAsBookieID=true - ServerConfiguration serverConf2 = newServerConfiguration(); - serverConf2.setUseHostNameAsBookieID(true); - ServerConfiguration serverConf3 = newServerConfiguration(); - serverConf3.setUseHostNameAsBookieID(true); - startAndAddBookie(serverConf1); - startAndAddBookie(serverConf2); - startAndAddBookie(serverConf3); - - List listOfLedgerHandle = createLedgersAndAddEntries(1, 5); - LedgerHandle lh = listOfLedgerHandle.get(0); - int ledgerReplicaIndex = 0; - final SortedMap> ensembles = lh.getLedgerMetadata().getAllEnsembles(); - final List bkAddresses = ensembles.get(0L); - BookieId replicaToKillAddr = bkAddresses.get(0); - for (BookieId bookieSocketAddress : bkAddresses) { - if (!isCreatedFromIp(bookieSocketAddress)) { - replicaToKillAddr = bookieSocketAddress; - LOG.info("Kill bookie which has registered using hostname"); - break; - } - } - - final String urLedgerZNode = getUrLedgerZNode(lh); - ledgerReplicaIndex = getReplicaIndexInLedger(lh, replicaToKillAddr); - - CountDownLatch latch = new CountDownLatch(1); - assertNull("UrLedger already exists!", - watchUrLedgerNode(urLedgerZNode, latch)); - - LOG.info("Killing Bookie :" + replicaToKillAddr); - killBookie(replicaToKillAddr); - - // waiting to publish urLedger znode by Auditor - latch.await(); - latch = new CountDownLatch(1); - LOG.info("Watching on urLedgerPath:" + urLedgerZNode - + " to know the status of rereplication process"); - assertNotNull("UrLedger doesn't exists!", - watchUrLedgerNode(urLedgerZNode, latch)); - - // starting the replication service, so that he will be able to act as - // target bookie - ServerConfiguration serverConf = 
newServerConfiguration(); - serverConf.setUseHostNameAsBookieID(false); - startAndAddBookie(serverConf); - - int newBookieIndex = lastBookieIndex(); - BookieServer newBookieServer = serverByIndex(newBookieIndex); - - if (LOG.isDebugEnabled()) { - LOG.debug("Waiting to finish the replication of failed bookie : " - + replicaToKillAddr); - } - latch.await(); - - // grace period to update the urledger metadata in zookeeper - LOG.info("Waiting to update the urledger metadata in zookeeper"); - - verifyLedgerEnsembleMetadataAfterReplication(newBookieServer, - listOfLedgerHandle.get(0), ledgerReplicaIndex); - - } - - /** - * Test verifies bookie recovery, the host (recorded via useHostName in - * ledgermetadata). - */ - @Test - public void testLedgerMetadataContainsHostNameAsBookieID() - throws Exception { - stopBKCluster(); - - bkc = new BookKeeperTestClient(baseClientConf); - // start bookie with useHostNameAsBookieID=false, as old bookie - ServerConfiguration serverConf1 = newServerConfiguration(); - // start 2 more bookies with useHostNameAsBookieID=true - ServerConfiguration serverConf2 = newServerConfiguration(); - serverConf2.setUseHostNameAsBookieID(true); - ServerConfiguration serverConf3 = newServerConfiguration(); - serverConf3.setUseHostNameAsBookieID(true); - startAndAddBookie(serverConf1); - startAndAddBookie(serverConf2); - startAndAddBookie(serverConf3); - - List listOfLedgerHandle = createLedgersAndAddEntries(1, 5); - LedgerHandle lh = listOfLedgerHandle.get(0); - int ledgerReplicaIndex = 0; - final SortedMap> ensembles = lh.getLedgerMetadata().getAllEnsembles(); - final List bkAddresses = ensembles.get(0L); - BookieId replicaToKillAddr = bkAddresses.get(0); - for (BookieId bookieSocketAddress : bkAddresses) { - if (isCreatedFromIp(bookieSocketAddress)) { - replicaToKillAddr = bookieSocketAddress; - LOG.info("Kill bookie which has registered using ipaddress"); - break; - } - } - - final String urLedgerZNode = getUrLedgerZNode(lh); - ledgerReplicaIndex = 
getReplicaIndexInLedger(lh, replicaToKillAddr); - - CountDownLatch latch = new CountDownLatch(1); - assertNull("UrLedger already exists!", - watchUrLedgerNode(urLedgerZNode, latch)); - - LOG.info("Killing Bookie :" + replicaToKillAddr); - killBookie(replicaToKillAddr); - - // waiting to publish urLedger znode by Auditor - latch.await(); - latch = new CountDownLatch(1); - LOG.info("Watching on urLedgerPath:" + urLedgerZNode - + " to know the status of rereplication process"); - assertNotNull("UrLedger doesn't exists!", - watchUrLedgerNode(urLedgerZNode, latch)); - - // creates new bkclient - bkc = new BookKeeperTestClient(baseClientConf); - // starting the replication service, so that he will be able to act as - // target bookie - ServerConfiguration serverConf = newServerConfiguration(); - serverConf.setUseHostNameAsBookieID(true); - startAndAddBookie(serverConf); - - int newBookieIndex = lastBookieIndex(); - BookieServer newBookieServer = serverByIndex(newBookieIndex); +// /** +// * Test verifies publish urLedger by Auditor and replication worker is +// * picking up the entries and finishing the rereplication of closed ledgers. 
+// */ +// @Test +// public void testClosedLedgers() throws Exception { +// List listOfReplicaIndex = new ArrayList(); +// List listOfLedgerHandle = createLedgersAndAddEntries(1, 5); +// closeLedgers(listOfLedgerHandle); +// LedgerHandle lhandle = listOfLedgerHandle.get(0); +// int ledgerReplicaIndex = 0; +// BookieId replicaToKillAddr = lhandle.getLedgerMetadata().getAllEnsembles().get(0L).get(0); +// +// CountDownLatch latch = new CountDownLatch(listOfLedgerHandle.size()); +// for (LedgerHandle lh : listOfLedgerHandle) { +// ledgerReplicaIndex = getReplicaIndexInLedger(lh, replicaToKillAddr); +// listOfReplicaIndex.add(ledgerReplicaIndex); +// String urLedgerZNode = getUrLedgerZNode(lh); +// LOG.info("===> Watching on urLedgerPath:" + urLedgerZNode +// + " to know the status of rereplication process"); +// assertNull("UrLedger already exists!", +// watchUrLedgerNode(urLedgerZNode, latch)); +// } +// +// LOG.info("Killing Bookie :" + replicaToKillAddr); +// killBookie(replicaToKillAddr); +// +// // waiting to publish urLedger znode by Auditor +// latch.await(); +// +// // Again watching the urLedger znode to know the replication status +// latch = new CountDownLatch(listOfLedgerHandle.size()); +// for (LedgerHandle lh : listOfLedgerHandle) { +// String urLedgerZNode = getUrLedgerZNode(lh); +// LOG.info("Watching on urLedgerPath:" + urLedgerZNode +// + " to know the status of rereplication process"); +// assertNotNull("UrLedger doesn't exists!", +// watchUrLedgerNode(urLedgerZNode, latch)); +// } +// +// // starting the replication service, so that he will be able to act as +// // target bookie +// startNewBookie(); +// int newBookieIndex = lastBookieIndex(); +// BookieServer newBookieServer = serverByIndex(newBookieIndex); +// +// if (LOG.isDebugEnabled()) { +// LOG.debug("Waiting to finish the replication of failed bookie : " +// + replicaToKillAddr); +// } +// +// // waiting to finish replication +// latch.await(); +// +// // grace period to update the urledger 
metadata in zookeeper +// LOG.info("Waiting to update the urledger metadata in zookeeper"); +// +// for (int index = 0; index < listOfLedgerHandle.size(); index++) { +// verifyLedgerEnsembleMetadataAfterReplication(newBookieServer, +// listOfLedgerHandle.get(index), +// listOfReplicaIndex.get(index)); +// } +// } - if (LOG.isDebugEnabled()) { - LOG.debug("Waiting to finish the replication of failed bookie : " - + replicaToKillAddr); - } - latch.await(); - - // grace period to update the urledger metadata in zookeeper - LOG.info("Waiting to update the urledger metadata in zookeeper"); - - verifyLedgerEnsembleMetadataAfterReplication(newBookieServer, - listOfLedgerHandle.get(0), ledgerReplicaIndex); - - } private int getReplicaIndexInLedger(LedgerHandle lh, BookieId replicaToKill) { SortedMap> ensembles = lh.getLedgerMetadata().getAllEnsembles(); From 4f0c89eb2412d3a6daba9d8aaa84d0289a52844e Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 5 Aug 2025 16:57:52 +0800 Subject: [PATCH 17/27] test CI --- .../replication/BookieAutoRecoveryTest.java | 183 ++++++++++-------- 1 file changed, 101 insertions(+), 82 deletions(-) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java index 169f7e78ab7..efb156486d0 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java @@ -226,93 +226,39 @@ public void tearDown() throws Exception { // testOpenLedgers(); // } - /** - * Test verifies publish urLedger by Auditor and replication worker is - * picking up the entries and finishing the rereplication of open ledger. 
- */ - @Test - public void testOpenLedgers() throws Exception { - List listOfLedgerHandle = createLedgersAndAddEntries(1, 5); - LedgerHandle lh = listOfLedgerHandle.get(0); - int ledgerReplicaIndex = 0; - BookieId replicaToKillAddr = lh.getLedgerMetadata().getAllEnsembles().get(0L).get(0); - - final String urLedgerZNode = getUrLedgerZNode(lh); - ledgerReplicaIndex = getReplicaIndexInLedger(lh, replicaToKillAddr); - - CountDownLatch latch = new CountDownLatch(1); - assertNull("UrLedger already exists!", - watchUrLedgerNode(urLedgerZNode, latch)); - - LOG.info("Killing Bookie :" + replicaToKillAddr); - killBookie(replicaToKillAddr); - - // waiting to publish urLedger znode by Auditor - latch.await(); - latch = new CountDownLatch(1); - LOG.info("Watching on urLedgerPath:" + urLedgerZNode - + " to know the status of rereplication process"); - assertNotNull("UrLedger doesn't exists!", - watchUrLedgerNode(urLedgerZNode, latch)); - - // starting the replication service, so that he will be able to act as - // target bookie - startNewBookie(); - int newBookieIndex = lastBookieIndex(); - BookieServer newBookieServer = serverByIndex(newBookieIndex); - - if (LOG.isDebugEnabled()) { - LOG.debug("Waiting to finish the replication of failed bookie : " - + replicaToKillAddr); - } - latch.await(); - - // grace period to update the urledger metadata in zookeeper - LOG.info("Waiting to update the urledger metadata in zookeeper"); - - verifyLedgerEnsembleMetadataAfterReplication(newBookieServer, - listOfLedgerHandle.get(0), ledgerReplicaIndex); - } - // /** // * Test verifies publish urLedger by Auditor and replication worker is -// * picking up the entries and finishing the rereplication of closed ledgers. +// * picking up the entries and finishing the rereplication of open ledger. 
// */ // @Test -// public void testClosedLedgers() throws Exception { -// List listOfReplicaIndex = new ArrayList(); +// public void testOpenLedgers() throws Exception { +// LOG.info("===> Testing open ledgers"); // List listOfLedgerHandle = createLedgersAndAddEntries(1, 5); -// closeLedgers(listOfLedgerHandle); -// LedgerHandle lhandle = listOfLedgerHandle.get(0); +// LedgerHandle lh = listOfLedgerHandle.get(0); // int ledgerReplicaIndex = 0; -// BookieId replicaToKillAddr = lhandle.getLedgerMetadata().getAllEnsembles().get(0L).get(0); +// BookieId replicaToKillAddr = lh.getLedgerMetadata().getAllEnsembles().get(0L).get(0); // -// CountDownLatch latch = new CountDownLatch(listOfLedgerHandle.size()); -// for (LedgerHandle lh : listOfLedgerHandle) { -// ledgerReplicaIndex = getReplicaIndexInLedger(lh, replicaToKillAddr); -// listOfReplicaIndex.add(ledgerReplicaIndex); -// String urLedgerZNode = getUrLedgerZNode(lh); -// LOG.info("===> Watching on urLedgerPath:" + urLedgerZNode -// + " to know the status of rereplication process"); -// assertNull("UrLedger already exists!", -// watchUrLedgerNode(urLedgerZNode, latch)); -// } +// final String urLedgerZNode = getUrLedgerZNode(lh); +// ledgerReplicaIndex = getReplicaIndexInLedger(lh, replicaToKillAddr); +// +// CountDownLatch latch = new CountDownLatch(1); +// LOG.info("===> 1 Watching on urLedgerPath:" + urLedgerZNode +// + " to know the status of rereplication process"); +// assertNull("UrLedger already exists!", +// watchUrLedgerNode(urLedgerZNode, latch)); // // LOG.info("Killing Bookie :" + replicaToKillAddr); // killBookie(replicaToKillAddr); // // // waiting to publish urLedger znode by Auditor +// LOG.info("===> 2 Watching on urLedgerPath:" + urLedgerZNode +// + " to know the status of rereplication process"); // latch.await(); -// -// // Again watching the urLedger znode to know the replication status -// latch = new CountDownLatch(listOfLedgerHandle.size()); -// for (LedgerHandle lh : listOfLedgerHandle) { -// 
String urLedgerZNode = getUrLedgerZNode(lh); -// LOG.info("Watching on urLedgerPath:" + urLedgerZNode -// + " to know the status of rereplication process"); -// assertNotNull("UrLedger doesn't exists!", -// watchUrLedgerNode(urLedgerZNode, latch)); -// } +// latch = new CountDownLatch(1); +// LOG.info("===> 3 Watching on urLedgerPath:" + urLedgerZNode +// + " to know the status of rereplication process"); +// assertNotNull("UrLedger doesn't exists!", +// watchUrLedgerNode(urLedgerZNode, latch)); // // // starting the replication service, so that he will be able to act as // // target bookie @@ -324,20 +270,93 @@ public void testOpenLedgers() throws Exception { // LOG.debug("Waiting to finish the replication of failed bookie : " // + replicaToKillAddr); // } -// -// // waiting to finish replication +// LOG.info("===> 4 Watching on urLedgerPath:" + urLedgerZNode +// + " to know the status of rereplication process"); // latch.await(); -// +// LOG.info("===> 5 Watching on urLedgerPath:" + urLedgerZNode +// + " to know the status of rereplication process"); // // grace period to update the urledger metadata in zookeeper // LOG.info("Waiting to update the urledger metadata in zookeeper"); // -// for (int index = 0; index < listOfLedgerHandle.size(); index++) { -// verifyLedgerEnsembleMetadataAfterReplication(newBookieServer, -// listOfLedgerHandle.get(index), -// listOfReplicaIndex.get(index)); -// } +// verifyLedgerEnsembleMetadataAfterReplication(newBookieServer, +// listOfLedgerHandle.get(0), ledgerReplicaIndex); +// LOG.info("===> Finished test open ledgers"); // } + /** + * Test verifies publish urLedger by Auditor and replication worker is + * picking up the entries and finishing the rereplication of closed ledgers. 
+ */ + @Test + public void testClosedLedgers() throws Exception { + LOG.info("===> Testing close ledgers"); + List listOfReplicaIndex = new ArrayList(); + List listOfLedgerHandle = createLedgersAndAddEntries(1, 5); + closeLedgers(listOfLedgerHandle); + LedgerHandle lhandle = listOfLedgerHandle.get(0); + int ledgerReplicaIndex = 0; + BookieId replicaToKillAddr = lhandle.getLedgerMetadata().getAllEnsembles().get(0L).get(0); + + String urLedgerZNode = null; + CountDownLatch latch = new CountDownLatch(listOfLedgerHandle.size()); + for (LedgerHandle lh : listOfLedgerHandle) { + ledgerReplicaIndex = getReplicaIndexInLedger(lh, replicaToKillAddr); + listOfReplicaIndex.add(ledgerReplicaIndex); + urLedgerZNode = getUrLedgerZNode(lh); + LOG.info("===> Watching on urLedgerPath:" + urLedgerZNode + + " to know the status of rereplication process"); + assertNull("UrLedger already exists!", + watchUrLedgerNode(urLedgerZNode, latch)); + } + + LOG.info("Killing Bookie :" + replicaToKillAddr); + LOG.info("===> 2 Watching on urLedgerPath:" + urLedgerZNode + + " to know the status of rereplication process"); + killBookie(replicaToKillAddr); + LOG.info("===> 3 Watching on urLedgerPath:" + urLedgerZNode + + " to know the status of rereplication process"); + + // waiting to publish urLedger znode by Auditor + latch.await(); + + // Again watching the urLedger znode to know the replication status + latch = new CountDownLatch(listOfLedgerHandle.size()); + for (LedgerHandle lh : listOfLedgerHandle) { + urLedgerZNode = getUrLedgerZNode(lh); + LOG.info("===> 4 Watching on urLedgerPath:" + urLedgerZNode + + " to know the status of rereplication process"); + assertNotNull("UrLedger doesn't exists!", + watchUrLedgerNode(urLedgerZNode, latch)); + } + + // starting the replication service, so that he will be able to act as + // target bookie + startNewBookie(); + int newBookieIndex = lastBookieIndex(); + BookieServer newBookieServer = serverByIndex(newBookieIndex); + + if (LOG.isDebugEnabled()) { + 
LOG.debug("Waiting to finish the replication of failed bookie : " + + replicaToKillAddr); + } + + // waiting to finish replication + LOG.info("===> 5 Watching on urLedgerPath:" + urLedgerZNode + + " to know the status of rereplication process"); + latch.await(); + LOG.info("===> 6 Watching on urLedgerPath:" + urLedgerZNode + + " to know the status of rereplication process"); + // grace period to update the urledger metadata in zookeeper + LOG.info("Waiting to update the urledger metadata in zookeeper"); + + for (int index = 0; index < listOfLedgerHandle.size(); index++) { + verifyLedgerEnsembleMetadataAfterReplication(newBookieServer, + listOfLedgerHandle.get(index), + listOfReplicaIndex.get(index)); + } + LOG.info("===> Finished test close ledgers"); + } + private int getReplicaIndexInLedger(LedgerHandle lh, BookieId replicaToKill) { SortedMap> ensembles = lh.getLedgerMetadata().getAllEnsembles(); From 65a22c0376bebad41540f810dd99d8630ce751c4 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 5 Aug 2025 17:36:52 +0800 Subject: [PATCH 18/27] test CI --- .../replication/BookieAutoRecoveryTest.java | 112 +++++++++--------- 1 file changed, 56 insertions(+), 56 deletions(-) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java index efb156486d0..b8aff7574cd 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java @@ -226,62 +226,62 @@ public void tearDown() throws Exception { // testOpenLedgers(); // } -// /** -// * Test verifies publish urLedger by Auditor and replication worker is -// * picking up the entries and finishing the rereplication of open ledger. 
-// */ -// @Test -// public void testOpenLedgers() throws Exception { -// LOG.info("===> Testing open ledgers"); -// List listOfLedgerHandle = createLedgersAndAddEntries(1, 5); -// LedgerHandle lh = listOfLedgerHandle.get(0); -// int ledgerReplicaIndex = 0; -// BookieId replicaToKillAddr = lh.getLedgerMetadata().getAllEnsembles().get(0L).get(0); -// -// final String urLedgerZNode = getUrLedgerZNode(lh); -// ledgerReplicaIndex = getReplicaIndexInLedger(lh, replicaToKillAddr); -// -// CountDownLatch latch = new CountDownLatch(1); -// LOG.info("===> 1 Watching on urLedgerPath:" + urLedgerZNode -// + " to know the status of rereplication process"); -// assertNull("UrLedger already exists!", -// watchUrLedgerNode(urLedgerZNode, latch)); -// -// LOG.info("Killing Bookie :" + replicaToKillAddr); -// killBookie(replicaToKillAddr); -// -// // waiting to publish urLedger znode by Auditor -// LOG.info("===> 2 Watching on urLedgerPath:" + urLedgerZNode -// + " to know the status of rereplication process"); -// latch.await(); -// latch = new CountDownLatch(1); -// LOG.info("===> 3 Watching on urLedgerPath:" + urLedgerZNode -// + " to know the status of rereplication process"); -// assertNotNull("UrLedger doesn't exists!", -// watchUrLedgerNode(urLedgerZNode, latch)); -// -// // starting the replication service, so that he will be able to act as -// // target bookie -// startNewBookie(); -// int newBookieIndex = lastBookieIndex(); -// BookieServer newBookieServer = serverByIndex(newBookieIndex); -// -// if (LOG.isDebugEnabled()) { -// LOG.debug("Waiting to finish the replication of failed bookie : " -// + replicaToKillAddr); -// } -// LOG.info("===> 4 Watching on urLedgerPath:" + urLedgerZNode -// + " to know the status of rereplication process"); -// latch.await(); -// LOG.info("===> 5 Watching on urLedgerPath:" + urLedgerZNode -// + " to know the status of rereplication process"); -// // grace period to update the urledger metadata in zookeeper -// LOG.info("Waiting to update 
the urledger metadata in zookeeper"); -// -// verifyLedgerEnsembleMetadataAfterReplication(newBookieServer, -// listOfLedgerHandle.get(0), ledgerReplicaIndex); -// LOG.info("===> Finished test open ledgers"); -// } + /** + * Test verifies publish urLedger by Auditor and replication worker is + * picking up the entries and finishing the rereplication of open ledger. + */ + @Test + public void testOpenLedgers() throws Exception { + LOG.info("===> Testing open ledgers"); + List listOfLedgerHandle = createLedgersAndAddEntries(1, 5); + LedgerHandle lh = listOfLedgerHandle.get(0); + int ledgerReplicaIndex = 0; + BookieId replicaToKillAddr = lh.getLedgerMetadata().getAllEnsembles().get(0L).get(0); + + final String urLedgerZNode = getUrLedgerZNode(lh); + ledgerReplicaIndex = getReplicaIndexInLedger(lh, replicaToKillAddr); + + CountDownLatch latch = new CountDownLatch(1); + LOG.info("===> 1 Watching on urLedgerPath:" + urLedgerZNode + + " to know the status of rereplication process"); + assertNull("UrLedger already exists!", + watchUrLedgerNode(urLedgerZNode, latch)); + + LOG.info("Killing Bookie :" + replicaToKillAddr); + killBookie(replicaToKillAddr); + + // waiting to publish urLedger znode by Auditor + LOG.info("===> 2 Watching on urLedgerPath:" + urLedgerZNode + + " to know the status of rereplication process"); + latch.await(); + latch = new CountDownLatch(1); + LOG.info("===> 3 Watching on urLedgerPath:" + urLedgerZNode + + " to know the status of rereplication process"); + assertNotNull("UrLedger doesn't exists!", + watchUrLedgerNode(urLedgerZNode, latch)); + + // starting the replication service, so that he will be able to act as + // target bookie + startNewBookie(); + int newBookieIndex = lastBookieIndex(); + BookieServer newBookieServer = serverByIndex(newBookieIndex); + + if (LOG.isDebugEnabled()) { + LOG.debug("Waiting to finish the replication of failed bookie : " + + replicaToKillAddr); + } + LOG.info("===> 4 Watching on urLedgerPath:" + urLedgerZNode + + " 
to know the status of rereplication process"); + latch.await(); + LOG.info("===> 5 Watching on urLedgerPath:" + urLedgerZNode + + " to know the status of rereplication process"); + // grace period to update the urledger metadata in zookeeper + LOG.info("Waiting to update the urledger metadata in zookeeper"); + + verifyLedgerEnsembleMetadataAfterReplication(newBookieServer, + listOfLedgerHandle.get(0), ledgerReplicaIndex); + LOG.info("===> Finished test open ledgers"); + } /** * Test verifies publish urLedger by Auditor and replication worker is From f018109e07077e03dc977f6c834a5c51c2fb9994 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 5 Aug 2025 20:10:42 +0800 Subject: [PATCH 19/27] test CI --- .../replication/BookieAutoRecoveryTest.java | 345 +++++++++++++----- 1 file changed, 255 insertions(+), 90 deletions(-) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java index b8aff7574cd..85a3e7597ed 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import java.io.IOException; import java.net.URI; @@ -30,10 +31,13 @@ import java.util.Optional; import java.util.SortedMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.client.BookKeeperTestClient; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.common.util.OrderedScheduler; +import 
org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.meta.LedgerManager; import org.apache.bookkeeper.meta.LedgerManagerFactory; import org.apache.bookkeeper.meta.LedgerUnderreplicationManager; @@ -136,96 +140,6 @@ public void tearDown() throws Exception { } } -// @Test -// public void testOpenLedgers1() throws Exception { -// testOpenLedgers(); -// } -// -// @Test -// public void testClosedLedgers1() throws Exception { -// testOpenLedgers(); -// } -// -// @Test -// public void testOpenLedgers2() throws Exception { -// testOpenLedgers(); -// } -// -// @Test -// public void testClosedLedgers2() throws Exception { -// testOpenLedgers(); -// } -// -// @Test -// public void testOpenLedgers3() throws Exception { -// testOpenLedgers(); -// } -// -// @Test -// public void testClosedLedgers3() throws Exception { -// testOpenLedgers(); -// } -// -// @Test -// public void testOpenLedgers4() throws Exception { -// testOpenLedgers(); -// } -// -// @Test -// public void testClosedLedgers4() throws Exception { -// testOpenLedgers(); -// } -// -// @Test -// public void testOpenLedgers5() throws Exception { -// testOpenLedgers(); -// } -// -// @Test -// public void testClosedLedgers5() throws Exception { -// testOpenLedgers(); -// } -// -// @Test -// public void testOpenLedgers6() throws Exception { -// testOpenLedgers(); -// } -// -// @Test -// public void testClosedLedgers6() throws Exception { -// testOpenLedgers(); -// } -// -// @Test -// public void testOpenLedgers7() throws Exception { -// testOpenLedgers(); -// } -// -// @Test -// public void testClosedLedgers7() throws Exception { -// testOpenLedgers(); -// } -// -// @Test -// public void testOpenLedgers8() throws Exception { -// testOpenLedgers(); -// } -// -// @Test -// public void testClosedLedgers8() throws Exception { -// testOpenLedgers(); -// } -// -// @Test -// public void testOpenLedgers9() throws Exception { -// testOpenLedgers(); -// } -// -// @Test -// public void testClosedLedgers9() 
throws Exception { -// testOpenLedgers(); -// } - /** * Test verifies publish urLedger by Auditor and replication worker is * picking up the entries and finishing the rereplication of open ledger. @@ -357,6 +271,257 @@ public void testClosedLedgers() throws Exception { LOG.info("===> Finished test close ledgers"); } + /** + * Verify the published urledgers of deleted ledgers(those ledgers where + * deleted after publishing as urledgers by Auditor) should be cleared off + * by the newly selected replica bookie. + */ + @Test + public void testNoSuchLedgerExists() throws Exception { + LOG.info("===> Starting testNoSuchLedgerExists"); + List listOfLedgerHandle = createLedgersAndAddEntries(2, 5); + CountDownLatch latch = new CountDownLatch(listOfLedgerHandle.size()); + for (LedgerHandle lh : listOfLedgerHandle) { + assertNull("UrLedger already exists!", + watchUrLedgerNode(getUrLedgerZNode(lh), latch)); + } + BookieId replicaToKillAddr = listOfLedgerHandle.get(0) + .getLedgerMetadata().getAllEnsembles() + .get(0L).get(0); + killBookie(replicaToKillAddr); + replicaToKillAddr = listOfLedgerHandle.get(0) + .getLedgerMetadata().getAllEnsembles() + .get(0L).get(0); + killBookie(replicaToKillAddr); + // waiting to publish urLedger znode by Auditor + latch.await(); + + latch = new CountDownLatch(listOfLedgerHandle.size()); + for (LedgerHandle lh : listOfLedgerHandle) { + assertNotNull("UrLedger doesn't exists!", + watchUrLedgerNode(getUrLedgerZNode(lh), latch)); + } + + // delete ledgers + for (LedgerHandle lh : listOfLedgerHandle) { + bkc.deleteLedger(lh.getId()); + } + startNewBookie(); + + // waiting to delete published urledgers, since it doesn't exists + latch.await(); + + for (LedgerHandle lh : listOfLedgerHandle) { + assertNull("UrLedger still exists after rereplication", + watchUrLedgerNode(getUrLedgerZNode(lh), latch)); + } + LOG.info("===> Finished testNoSuchLedgerExists"); + } + + /** + * Test that if an empty ledger loses the bookie not in the quorum for entry 0, 
it will + * still be openable when it loses enough bookies to lose a whole quorum. + */ + @Test + public void testEmptyLedgerLosesQuorumEventually() throws Exception { + LOG.info("===> Starting testEmptyLedgerLosesQuorumEventually"); + LedgerHandle lh = bkc.createLedger(3, 2, 2, DigestType.CRC32, PASSWD); + CountDownLatch latch = new CountDownLatch(1); + String urZNode = getUrLedgerZNode(lh); + watchUrLedgerNode(urZNode, latch); + + BookieId replicaToKill = lh.getLedgerMetadata().getAllEnsembles().get(0L).get(2); + LOG.info("Killing last bookie, {}, in ensemble {}", replicaToKill, + lh.getLedgerMetadata().getAllEnsembles().get(0L)); + killBookie(replicaToKill); + startNewBookie(); + + getAuditor(10, TimeUnit.SECONDS).submitAuditTask().get(); // ensure auditor runs + + assertTrue("Should be marked as underreplicated", latch.await(5, TimeUnit.SECONDS)); + latch = new CountDownLatch(1); + Stat s = watchUrLedgerNode(urZNode, latch); // should be marked as replicated + if (s != null) { + assertTrue("Should be marked as replicated", latch.await(15, TimeUnit.SECONDS)); + } + + replicaToKill = lh.getLedgerMetadata().getAllEnsembles().get(0L).get(1); + LOG.info("Killing second bookie, {}, in ensemble {}", replicaToKill, + lh.getLedgerMetadata().getAllEnsembles().get(0L)); + killBookie(replicaToKill); + + getAuditor(10, TimeUnit.SECONDS).submitAuditTask().get(); // ensure auditor runs + + assertTrue("Should be marked as underreplicated", latch.await(5, TimeUnit.SECONDS)); + latch = new CountDownLatch(1); + s = watchUrLedgerNode(urZNode, latch); // should be marked as replicated + + startNewBookie(); + getAuditor(10, TimeUnit.SECONDS).submitAuditTask().get(); // ensure auditor runs + + if (s != null) { + assertTrue("Should be marked as replicated", latch.await(20, TimeUnit.SECONDS)); + } + + // should be able to open ledger without issue + bkc.openLedger(lh.getId(), DigestType.CRC32, PASSWD); + LOG.info("===> Finished testEmptyLedgerLosesQuorumEventually"); + } + + /** + * 
Test verifies bookie recovery, the host (recorded via ipaddress in + * ledgermetadata). + */ + @Test + public void testLedgerMetadataContainsIpAddressAsBookieID() + throws Exception { + LOG.info("===> Starting testLedgerMetadataContainsIpAddressAsBookieID"); + stopBKCluster(); + bkc = new BookKeeperTestClient(baseClientConf); + // start bookie with useHostNameAsBookieID=false, as old bookie + ServerConfiguration serverConf1 = newServerConfiguration(); + // start 2 more bookies with useHostNameAsBookieID=true + ServerConfiguration serverConf2 = newServerConfiguration(); + serverConf2.setUseHostNameAsBookieID(true); + ServerConfiguration serverConf3 = newServerConfiguration(); + serverConf3.setUseHostNameAsBookieID(true); + startAndAddBookie(serverConf1); + startAndAddBookie(serverConf2); + startAndAddBookie(serverConf3); + + List listOfLedgerHandle = createLedgersAndAddEntries(1, 5); + LedgerHandle lh = listOfLedgerHandle.get(0); + int ledgerReplicaIndex = 0; + final SortedMap> ensembles = lh.getLedgerMetadata().getAllEnsembles(); + final List bkAddresses = ensembles.get(0L); + BookieId replicaToKillAddr = bkAddresses.get(0); + for (BookieId bookieSocketAddress : bkAddresses) { + if (!isCreatedFromIp(bookieSocketAddress)) { + replicaToKillAddr = bookieSocketAddress; + LOG.info("Kill bookie which has registered using hostname"); + break; + } + } + + final String urLedgerZNode = getUrLedgerZNode(lh); + ledgerReplicaIndex = getReplicaIndexInLedger(lh, replicaToKillAddr); + + CountDownLatch latch = new CountDownLatch(1); + assertNull("UrLedger already exists!", + watchUrLedgerNode(urLedgerZNode, latch)); + + LOG.info("Killing Bookie :" + replicaToKillAddr); + killBookie(replicaToKillAddr); + + // waiting to publish urLedger znode by Auditor + latch.await(); + latch = new CountDownLatch(1); + LOG.info("Watching on urLedgerPath:" + urLedgerZNode + + " to know the status of rereplication process"); + assertNotNull("UrLedger doesn't exists!", + 
watchUrLedgerNode(urLedgerZNode, latch)); + + // starting the replication service, so that he will be able to act as + // target bookie + ServerConfiguration serverConf = newServerConfiguration(); + serverConf.setUseHostNameAsBookieID(false); + startAndAddBookie(serverConf); + + int newBookieIndex = lastBookieIndex(); + BookieServer newBookieServer = serverByIndex(newBookieIndex); + + if (LOG.isDebugEnabled()) { + LOG.debug("Waiting to finish the replication of failed bookie : " + + replicaToKillAddr); + } + latch.await(); + + // grace period to update the urledger metadata in zookeeper + LOG.info("Waiting to update the urledger metadata in zookeeper"); + + verifyLedgerEnsembleMetadataAfterReplication(newBookieServer, + listOfLedgerHandle.get(0), ledgerReplicaIndex); + LOG.info("===> Finished testLedgerMetadataContainsIpAddressAsBookieID"); + } + + /** + * Test verifies bookie recovery, the host (recorded via useHostName in + * ledgermetadata). + */ + @Test + public void testLedgerMetadataContainsHostNameAsBookieID() + throws Exception { + LOG.info("===> Starting testLedgerMetadataContainsHostNameAsBookieID"); + stopBKCluster(); + + bkc = new BookKeeperTestClient(baseClientConf); + // start bookie with useHostNameAsBookieID=false, as old bookie + ServerConfiguration serverConf1 = newServerConfiguration(); + // start 2 more bookies with useHostNameAsBookieID=true + ServerConfiguration serverConf2 = newServerConfiguration(); + serverConf2.setUseHostNameAsBookieID(true); + ServerConfiguration serverConf3 = newServerConfiguration(); + serverConf3.setUseHostNameAsBookieID(true); + startAndAddBookie(serverConf1); + startAndAddBookie(serverConf2); + startAndAddBookie(serverConf3); + + List listOfLedgerHandle = createLedgersAndAddEntries(1, 5); + LedgerHandle lh = listOfLedgerHandle.get(0); + int ledgerReplicaIndex = 0; + final SortedMap> ensembles = lh.getLedgerMetadata().getAllEnsembles(); + final List bkAddresses = ensembles.get(0L); + BookieId replicaToKillAddr = 
bkAddresses.get(0); + for (BookieId bookieSocketAddress : bkAddresses) { + if (isCreatedFromIp(bookieSocketAddress)) { + replicaToKillAddr = bookieSocketAddress; + LOG.info("Kill bookie which has registered using ipaddress"); + break; + } + } + + final String urLedgerZNode = getUrLedgerZNode(lh); + ledgerReplicaIndex = getReplicaIndexInLedger(lh, replicaToKillAddr); + + CountDownLatch latch = new CountDownLatch(1); + assertNull("UrLedger already exists!", + watchUrLedgerNode(urLedgerZNode, latch)); + + LOG.info("Killing Bookie :" + replicaToKillAddr); + killBookie(replicaToKillAddr); + + // waiting to publish urLedger znode by Auditor + latch.await(); + latch = new CountDownLatch(1); + LOG.info("Watching on urLedgerPath:" + urLedgerZNode + + " to know the status of rereplication process"); + assertNotNull("UrLedger doesn't exists!", + watchUrLedgerNode(urLedgerZNode, latch)); + + // creates new bkclient + bkc = new BookKeeperTestClient(baseClientConf); + // starting the replication service, so that he will be able to act as + // target bookie + ServerConfiguration serverConf = newServerConfiguration(); + serverConf.setUseHostNameAsBookieID(true); + startAndAddBookie(serverConf); + + int newBookieIndex = lastBookieIndex(); + BookieServer newBookieServer = serverByIndex(newBookieIndex); + + if (LOG.isDebugEnabled()) { + LOG.debug("Waiting to finish the replication of failed bookie : " + + replicaToKillAddr); + } + latch.await(); + + // grace period to update the urledger metadata in zookeeper + LOG.info("Waiting to update the urledger metadata in zookeeper"); + + verifyLedgerEnsembleMetadataAfterReplication(newBookieServer, + listOfLedgerHandle.get(0), ledgerReplicaIndex); + LOG.info("===> Finished testLedgerMetadataContainsHostNameAsBookieID"); + } private int getReplicaIndexInLedger(LedgerHandle lh, BookieId replicaToKill) { SortedMap> ensembles = lh.getLedgerMetadata().getAllEnsembles(); From 3c01d1a2ee04daecd46968edd32042fa983aee5c Mon Sep 17 00:00:00 2001 From: 
fengyubiao Date: Tue, 5 Aug 2025 21:52:17 +0800 Subject: [PATCH 20/27] test CI --- .../org/apache/bookkeeper/replication/ReplicationWorker.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java index 19248631bc1..2b545af0050 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java @@ -760,7 +760,7 @@ public void run() { * Stop the replication worker service. */ public void shutdown() { - LOG.info("Shutting down replication worker"); + LOG.warn("Shutting down replication worker", new Exception("test CI")); synchronized (this) { if (!workerRunning) { @@ -768,7 +768,7 @@ public void shutdown() { } workerRunning = false; } - LOG.info("Shutting down ReplicationWorker"); + LOG.warn("Shutting down ReplicationWorker", new Exception("test CI")); this.pendingReplicationTimer.cancel(); try { this.workerThread.interrupt(); From 5407e1b9556eaaba00b314b95e1b8206453cae7b Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 5 Aug 2025 23:18:01 +0800 Subject: [PATCH 21/27] test CI --- .../java/org/apache/bookkeeper/bookie/BookieStateManager.java | 1 + 1 file changed, 1 insertion(+) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java index 38f8d026cae..d2c01afe8fa 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java @@ -267,6 +267,7 @@ public boolean isShuttingDown(){ @Override public void close() { + LOG.warn("Closing BookieStateManager", new Exception("test CI")); this.running = 
false; stateService.shutdown(); } From 39178f6680386a2014bf1b76a9e16258a3ce7dc1 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 6 Aug 2025 08:08:23 +0800 Subject: [PATCH 22/27] test CI --- .../replication/BookieAutoRecoveryTest.java | 130 +++++++++++++----- 1 file changed, 99 insertions(+), 31 deletions(-) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java index 85a3e7597ed..7ce1b3858da 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java @@ -96,6 +96,7 @@ public BookieAutoRecoveryTest() throws IOException, KeeperException, @Override public void setUp() throws Exception { + LOG.info("===> Start setUp"); super.setUp(); baseConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri()); baseClientConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri()); @@ -117,10 +118,12 @@ public void setUp() throws Exception { mFactory = metadataClientDriver.getLedgerManagerFactory(); underReplicationManager = mFactory.newLedgerUnderreplicationManager(); ledgerManager = mFactory.newLedgerManager(); + LOG.info("===> Finished setUp"); } @Override public void tearDown() throws Exception { + LOG.info("===> Start tearDown"); super.tearDown(); if (null != underReplicationManager) { @@ -138,6 +141,7 @@ public void tearDown() throws Exception { if (null != scheduler) { scheduler.shutdown(); } + LOG.info("===> Finished tearDown"); } /** @@ -146,7 +150,7 @@ public void tearDown() throws Exception { */ @Test public void testOpenLedgers() throws Exception { - LOG.info("===> Testing open ledgers"); + LOG.info("===> Start testOpenLedgers"); List listOfLedgerHandle = createLedgersAndAddEntries(1, 5); LedgerHandle lh = listOfLedgerHandle.get(0); int ledgerReplicaIndex = 0; @@ -156,8 +160,6 @@ 
public void testOpenLedgers() throws Exception { ledgerReplicaIndex = getReplicaIndexInLedger(lh, replicaToKillAddr); CountDownLatch latch = new CountDownLatch(1); - LOG.info("===> 1 Watching on urLedgerPath:" + urLedgerZNode - + " to know the status of rereplication process"); assertNull("UrLedger already exists!", watchUrLedgerNode(urLedgerZNode, latch)); @@ -165,11 +167,9 @@ public void testOpenLedgers() throws Exception { killBookie(replicaToKillAddr); // waiting to publish urLedger znode by Auditor - LOG.info("===> 2 Watching on urLedgerPath:" + urLedgerZNode - + " to know the status of rereplication process"); latch.await(); latch = new CountDownLatch(1); - LOG.info("===> 3 Watching on urLedgerPath:" + urLedgerZNode + LOG.info("Watching on urLedgerPath:" + urLedgerZNode + " to know the status of rereplication process"); assertNotNull("UrLedger doesn't exists!", watchUrLedgerNode(urLedgerZNode, latch)); @@ -184,17 +184,14 @@ public void testOpenLedgers() throws Exception { LOG.debug("Waiting to finish the replication of failed bookie : " + replicaToKillAddr); } - LOG.info("===> 4 Watching on urLedgerPath:" + urLedgerZNode - + " to know the status of rereplication process"); latch.await(); - LOG.info("===> 5 Watching on urLedgerPath:" + urLedgerZNode - + " to know the status of rereplication process"); + // grace period to update the urledger metadata in zookeeper LOG.info("Waiting to update the urledger metadata in zookeeper"); verifyLedgerEnsembleMetadataAfterReplication(newBookieServer, listOfLedgerHandle.get(0), ledgerReplicaIndex); - LOG.info("===> Finished test open ledgers"); + LOG.info("===> Finished testOpenLedgers"); } /** @@ -203,7 +200,7 @@ public void testOpenLedgers() throws Exception { */ @Test public void testClosedLedgers() throws Exception { - LOG.info("===> Testing close ledgers"); + LOG.info("===> Start testClosedLedgers"); List listOfReplicaIndex = new ArrayList(); List listOfLedgerHandle = createLedgersAndAddEntries(1, 5); 
closeLedgers(listOfLedgerHandle); @@ -211,24 +208,87 @@ public void testClosedLedgers() throws Exception { int ledgerReplicaIndex = 0; BookieId replicaToKillAddr = lhandle.getLedgerMetadata().getAllEnsembles().get(0L).get(0); - String urLedgerZNode = null; CountDownLatch latch = new CountDownLatch(listOfLedgerHandle.size()); for (LedgerHandle lh : listOfLedgerHandle) { ledgerReplicaIndex = getReplicaIndexInLedger(lh, replicaToKillAddr); listOfReplicaIndex.add(ledgerReplicaIndex); - urLedgerZNode = getUrLedgerZNode(lh); - LOG.info("===> Watching on urLedgerPath:" + urLedgerZNode + assertNull("UrLedger already exists!", + watchUrLedgerNode(getUrLedgerZNode(lh), latch)); + } + + LOG.info("Killing Bookie :" + replicaToKillAddr); + killBookie(replicaToKillAddr); + + // waiting to publish urLedger znode by Auditor + latch.await(); + + // Again watching the urLedger znode to know the replication status + latch = new CountDownLatch(listOfLedgerHandle.size()); + for (LedgerHandle lh : listOfLedgerHandle) { + String urLedgerZNode = getUrLedgerZNode(lh); + LOG.info("Watching on urLedgerPath:" + urLedgerZNode + " to know the status of rereplication process"); + assertNotNull("UrLedger doesn't exists!", + watchUrLedgerNode(urLedgerZNode, latch)); + } + + // starting the replication service, so that he will be able to act as + // target bookie + startNewBookie(); + int newBookieIndex = lastBookieIndex(); + BookieServer newBookieServer = serverByIndex(newBookieIndex); + + if (LOG.isDebugEnabled()) { + LOG.debug("Waiting to finish the replication of failed bookie : " + + replicaToKillAddr); + } + + // waiting to finish replication + latch.await(); + + // grace period to update the urledger metadata in zookeeper + LOG.info("Waiting to update the urledger metadata in zookeeper"); + + for (int index = 0; index < listOfLedgerHandle.size(); index++) { + verifyLedgerEnsembleMetadataAfterReplication(newBookieServer, + listOfLedgerHandle.get(index), + listOfReplicaIndex.get(index)); + } + 
LOG.info("===> Finished testClosedLedgers"); + } + + /** + * Test stopping replica service while replication in progress. Considering + * when there is an exception will shutdown Auditor and RW processes. After + * restarting should be able to finish the re-replication activities + */ + @Test + public void testStopWhileReplicationInProgress() throws Exception { + LOG.info("===> Start testStopWhileReplicationInProgress"); + int numberOfLedgers = 2; + List listOfReplicaIndex = new ArrayList(); + List listOfLedgerHandle = createLedgersAndAddEntries( + numberOfLedgers, 5); + closeLedgers(listOfLedgerHandle); + LedgerHandle handle = listOfLedgerHandle.get(0); + BookieId replicaToKillAddr = handle.getLedgerMetadata().getAllEnsembles().get(0L).get(0); + LOG.info("Killing Bookie:" + replicaToKillAddr); + + // Each ledger, there will be two events : create urLedger and after + // rereplication delete urLedger + CountDownLatch latch = new CountDownLatch(listOfLedgerHandle.size()); + for (int i = 0; i < listOfLedgerHandle.size(); i++) { + final String urLedgerZNode = getUrLedgerZNode(listOfLedgerHandle + .get(i)); assertNull("UrLedger already exists!", watchUrLedgerNode(urLedgerZNode, latch)); + int replicaIndexInLedger = getReplicaIndexInLedger( + listOfLedgerHandle.get(i), replicaToKillAddr); + listOfReplicaIndex.add(replicaIndexInLedger); } LOG.info("Killing Bookie :" + replicaToKillAddr); - LOG.info("===> 2 Watching on urLedgerPath:" + urLedgerZNode - + " to know the status of rereplication process"); killBookie(replicaToKillAddr); - LOG.info("===> 3 Watching on urLedgerPath:" + urLedgerZNode - + " to know the status of rereplication process"); // waiting to publish urLedger znode by Auditor latch.await(); @@ -236,8 +296,8 @@ public void testClosedLedgers() throws Exception { // Again watching the urLedger znode to know the replication status latch = new CountDownLatch(listOfLedgerHandle.size()); for (LedgerHandle lh : listOfLedgerHandle) { - urLedgerZNode = 
getUrLedgerZNode(lh); - LOG.info("===> 4 Watching on urLedgerPath:" + urLedgerZNode + String urLedgerZNode = getUrLedgerZNode(lh); + LOG.info("Watching on urLedgerPath:" + urLedgerZNode + " to know the status of rereplication process"); assertNotNull("UrLedger doesn't exists!", watchUrLedgerNode(urLedgerZNode, latch)); @@ -253,13 +313,21 @@ public void testClosedLedgers() throws Exception { LOG.debug("Waiting to finish the replication of failed bookie : " + replicaToKillAddr); } + while (true) { + if (latch.getCount() < numberOfLedgers || latch.getCount() <= 0) { + stopReplicationService(); + LOG.info("Latch Count is:" + latch.getCount()); + break; + } + // grace period to take breath + Thread.sleep(1000); + } - // waiting to finish replication - LOG.info("===> 5 Watching on urLedgerPath:" + urLedgerZNode - + " to know the status of rereplication process"); + startReplicationService(); + + LOG.info("Waiting to finish rereplication processes"); latch.await(); - LOG.info("===> 6 Watching on urLedgerPath:" + urLedgerZNode - + " to know the status of rereplication process"); + // grace period to update the urledger metadata in zookeeper LOG.info("Waiting to update the urledger metadata in zookeeper"); @@ -268,7 +336,7 @@ public void testClosedLedgers() throws Exception { listOfLedgerHandle.get(index), listOfReplicaIndex.get(index)); } - LOG.info("===> Finished test close ledgers"); + LOG.info("===> Finished testStopWhileReplicationInProgress"); } /** @@ -278,7 +346,7 @@ public void testClosedLedgers() throws Exception { */ @Test public void testNoSuchLedgerExists() throws Exception { - LOG.info("===> Starting testNoSuchLedgerExists"); + LOG.info("===> Start testNoSuchLedgerExists"); List listOfLedgerHandle = createLedgersAndAddEntries(2, 5); CountDownLatch latch = new CountDownLatch(listOfLedgerHandle.size()); for (LedgerHandle lh : listOfLedgerHandle) { @@ -324,7 +392,7 @@ public void testNoSuchLedgerExists() throws Exception { */ @Test public void 
testEmptyLedgerLosesQuorumEventually() throws Exception { - LOG.info("===> Starting testEmptyLedgerLosesQuorumEventually"); + LOG.info("===> Start testEmptyLedgerLosesQuorumEventually"); LedgerHandle lh = bkc.createLedger(3, 2, 2, DigestType.CRC32, PASSWD); CountDownLatch latch = new CountDownLatch(1); String urZNode = getUrLedgerZNode(lh); @@ -375,7 +443,7 @@ public void testEmptyLedgerLosesQuorumEventually() throws Exception { @Test public void testLedgerMetadataContainsIpAddressAsBookieID() throws Exception { - LOG.info("===> Starting testLedgerMetadataContainsIpAddressAsBookieID"); + LOG.info("===> Start testLedgerMetadataContainsIpAddressAsBookieID"); stopBKCluster(); bkc = new BookKeeperTestClient(baseClientConf); // start bookie with useHostNameAsBookieID=false, as old bookie @@ -451,7 +519,7 @@ public void testLedgerMetadataContainsIpAddressAsBookieID() @Test public void testLedgerMetadataContainsHostNameAsBookieID() throws Exception { - LOG.info("===> Starting testLedgerMetadataContainsHostNameAsBookieID"); + LOG.info("===> Start testLedgerMetadataContainsHostNameAsBookieID"); stopBKCluster(); bkc = new BookKeeperTestClient(baseClientConf); From 32bc5baec0c07b90e8c4eacfd94efb84b12c422c Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 6 Aug 2025 12:15:32 +0800 Subject: [PATCH 23/27] remove logs for CI --- .../bookkeeper/bookie/BookieStateManager.java | 1 - .../replication/ReplicationWorker.java | 4 +-- .../replication/BookieAutoRecoveryTest.java | 36 +++++++++---------- 3 files changed, 20 insertions(+), 21 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java index d2c01afe8fa..38f8d026cae 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java @@ -267,7 +267,6 @@ public boolean 
isShuttingDown(){ @Override public void close() { - LOG.warn("Closing BookieStateManager", new Exception("test CI")); this.running = false; stateService.shutdown(); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java index 2b545af0050..19248631bc1 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java @@ -760,7 +760,7 @@ public void run() { * Stop the replication worker service. */ public void shutdown() { - LOG.warn("Shutting down replication worker", new Exception("test CI")); + LOG.info("Shutting down replication worker"); synchronized (this) { if (!workerRunning) { @@ -768,7 +768,7 @@ public void shutdown() { } workerRunning = false; } - LOG.warn("Shutting down ReplicationWorker", new Exception("test CI")); + LOG.info("Shutting down ReplicationWorker"); this.pendingReplicationTimer.cancel(); try { this.workerThread.interrupt(); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java index 7ce1b3858da..d0ea91261a9 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java @@ -96,7 +96,7 @@ public BookieAutoRecoveryTest() throws IOException, KeeperException, @Override public void setUp() throws Exception { - LOG.info("===> Start setUp"); + LOG.info("Start setUp"); super.setUp(); baseConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri()); baseClientConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri()); @@ -118,12 +118,12 @@ public void setUp() throws Exception { mFactory = 
metadataClientDriver.getLedgerManagerFactory(); underReplicationManager = mFactory.newLedgerUnderreplicationManager(); ledgerManager = mFactory.newLedgerManager(); - LOG.info("===> Finished setUp"); + LOG.info("Finished setUp"); } @Override public void tearDown() throws Exception { - LOG.info("===> Start tearDown"); + LOG.info("Start tearDown"); super.tearDown(); if (null != underReplicationManager) { @@ -141,7 +141,7 @@ public void tearDown() throws Exception { if (null != scheduler) { scheduler.shutdown(); } - LOG.info("===> Finished tearDown"); + LOG.info("Finished tearDown"); } /** @@ -150,7 +150,7 @@ public void tearDown() throws Exception { */ @Test public void testOpenLedgers() throws Exception { - LOG.info("===> Start testOpenLedgers"); + LOG.info("Start testOpenLedgers"); List listOfLedgerHandle = createLedgersAndAddEntries(1, 5); LedgerHandle lh = listOfLedgerHandle.get(0); int ledgerReplicaIndex = 0; @@ -191,7 +191,7 @@ public void testOpenLedgers() throws Exception { verifyLedgerEnsembleMetadataAfterReplication(newBookieServer, listOfLedgerHandle.get(0), ledgerReplicaIndex); - LOG.info("===> Finished testOpenLedgers"); + LOG.info("Finished testOpenLedgers"); } /** @@ -200,7 +200,7 @@ public void testOpenLedgers() throws Exception { */ @Test public void testClosedLedgers() throws Exception { - LOG.info("===> Start testClosedLedgers"); + LOG.info("Start testClosedLedgers"); List listOfReplicaIndex = new ArrayList(); List listOfLedgerHandle = createLedgersAndAddEntries(1, 5); closeLedgers(listOfLedgerHandle); @@ -254,7 +254,7 @@ public void testClosedLedgers() throws Exception { listOfLedgerHandle.get(index), listOfReplicaIndex.get(index)); } - LOG.info("===> Finished testClosedLedgers"); + LOG.info("Finished testClosedLedgers"); } /** @@ -264,7 +264,7 @@ public void testClosedLedgers() throws Exception { */ @Test public void testStopWhileReplicationInProgress() throws Exception { - LOG.info("===> Start testStopWhileReplicationInProgress"); + 
LOG.info("Start testStopWhileReplicationInProgress"); int numberOfLedgers = 2; List listOfReplicaIndex = new ArrayList(); List listOfLedgerHandle = createLedgersAndAddEntries( @@ -336,7 +336,7 @@ public void testStopWhileReplicationInProgress() throws Exception { listOfLedgerHandle.get(index), listOfReplicaIndex.get(index)); } - LOG.info("===> Finished testStopWhileReplicationInProgress"); + LOG.info("Finished testStopWhileReplicationInProgress"); } /** @@ -346,7 +346,7 @@ public void testStopWhileReplicationInProgress() throws Exception { */ @Test public void testNoSuchLedgerExists() throws Exception { - LOG.info("===> Start testNoSuchLedgerExists"); + LOG.info("Start testNoSuchLedgerExists"); List listOfLedgerHandle = createLedgersAndAddEntries(2, 5); CountDownLatch latch = new CountDownLatch(listOfLedgerHandle.size()); for (LedgerHandle lh : listOfLedgerHandle) { @@ -383,7 +383,7 @@ public void testNoSuchLedgerExists() throws Exception { assertNull("UrLedger still exists after rereplication", watchUrLedgerNode(getUrLedgerZNode(lh), latch)); } - LOG.info("===> Finished testNoSuchLedgerExists"); + LOG.info("Finished testNoSuchLedgerExists"); } /** @@ -392,7 +392,7 @@ public void testNoSuchLedgerExists() throws Exception { */ @Test public void testEmptyLedgerLosesQuorumEventually() throws Exception { - LOG.info("===> Start testEmptyLedgerLosesQuorumEventually"); + LOG.info("Start testEmptyLedgerLosesQuorumEventually"); LedgerHandle lh = bkc.createLedger(3, 2, 2, DigestType.CRC32, PASSWD); CountDownLatch latch = new CountDownLatch(1); String urZNode = getUrLedgerZNode(lh); @@ -433,7 +433,7 @@ public void testEmptyLedgerLosesQuorumEventually() throws Exception { // should be able to open ledger without issue bkc.openLedger(lh.getId(), DigestType.CRC32, PASSWD); - LOG.info("===> Finished testEmptyLedgerLosesQuorumEventually"); + LOG.info("Finished testEmptyLedgerLosesQuorumEventually"); } /** @@ -443,7 +443,7 @@ public void testEmptyLedgerLosesQuorumEventually() 
throws Exception { @Test public void testLedgerMetadataContainsIpAddressAsBookieID() throws Exception { - LOG.info("===> Start testLedgerMetadataContainsIpAddressAsBookieID"); + LOG.info("Start testLedgerMetadataContainsIpAddressAsBookieID"); stopBKCluster(); bkc = new BookKeeperTestClient(baseClientConf); // start bookie with useHostNameAsBookieID=false, as old bookie @@ -509,7 +509,7 @@ public void testLedgerMetadataContainsIpAddressAsBookieID() verifyLedgerEnsembleMetadataAfterReplication(newBookieServer, listOfLedgerHandle.get(0), ledgerReplicaIndex); - LOG.info("===> Finished testLedgerMetadataContainsIpAddressAsBookieID"); + LOG.info("Finished testLedgerMetadataContainsIpAddressAsBookieID"); } /** @@ -519,7 +519,7 @@ public void testLedgerMetadataContainsIpAddressAsBookieID() @Test public void testLedgerMetadataContainsHostNameAsBookieID() throws Exception { - LOG.info("===> Start testLedgerMetadataContainsHostNameAsBookieID"); + LOG.info("Start testLedgerMetadataContainsHostNameAsBookieID"); stopBKCluster(); bkc = new BookKeeperTestClient(baseClientConf); @@ -588,7 +588,7 @@ public void testLedgerMetadataContainsHostNameAsBookieID() verifyLedgerEnsembleMetadataAfterReplication(newBookieServer, listOfLedgerHandle.get(0), ledgerReplicaIndex); - LOG.info("===> Finished testLedgerMetadataContainsHostNameAsBookieID"); + LOG.info("Finished testLedgerMetadataContainsHostNameAsBookieID"); } private int getReplicaIndexInLedger(LedgerHandle lh, BookieId replicaToKill) { From 3c6c49a611e9dafeda5667f8df0b943ea5583417 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 6 Aug 2025 15:54:03 +0800 Subject: [PATCH 24/27] test CI --- .../java/org/apache/bookkeeper/bookie/BookieStateManager.java | 1 + 1 file changed, 1 insertion(+) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java index 38f8d026cae..d2c01afe8fa 100644 --- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java @@ -267,6 +267,7 @@ public boolean isShuttingDown(){ @Override public void close() { + LOG.warn("Closing BookieStateManager", new Exception("test CI")); this.running = false; stateService.shutdown(); } From 229889e648ea06594ec155bb9952ef2d3bf208f0 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 6 Aug 2025 18:01:09 +0800 Subject: [PATCH 25/27] remove logs for CI --- .../java/org/apache/bookkeeper/bookie/BookieStateManager.java | 1 - 1 file changed, 1 deletion(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java index d2c01afe8fa..38f8d026cae 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java @@ -267,7 +267,6 @@ public boolean isShuttingDown(){ @Override public void close() { - LOG.warn("Closing BookieStateManager", new Exception("test CI")); this.running = false; stateService.shutdown(); } From dcc1861971b09d9320e02a99ba9f112fa6da4885 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 2 Sep 2025 12:33:48 +0800 Subject: [PATCH 26/27] address comment --- .../java/org/apache/bookkeeper/client/LedgerOpenOp.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java index a4b849bf6de..52c8190e1f2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java @@ -195,7 +195,11 @@ private void openWithMetadata(Versioned versionedMetadata) 
{ } // get the ledger metadata back - final boolean watchImmediately = !doRecovery || metadata.isClosed(); + // The cases that need to register the listener immediately are: + // 1. The ledger is not being opened with recovery, which is the original case. + // 2. The ledger is closed and needs to keep its metadata updated. There are other cases that do not need to + // register the listener, e.g. the ledger is being opened by the Auto-Recovery component. + final boolean watchImmediately = !doRecovery || (keepUpdateMetadata && metadata.isClosed()); try { // The ledger metadata may be modified even if it has been closed, because the auto-recovery component may // rewrite the ledger's metadata. Keep receiving a notification from ZK to avoid the following issue: an From b1b453c3d08548adb8a0db01c6b180062aa6fe95 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 9 Sep 2025 20:28:21 +0800 Subject: [PATCH 27/27] fix test --- .../bookkeeper/replication/FullEnsembleDecommissionedTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/FullEnsembleDecommissionedTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/FullEnsembleDecommissionedTest.java index 4a31b617e58..0b5c5a8e1c1 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/FullEnsembleDecommissionedTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/FullEnsembleDecommissionedTest.java @@ -138,7 +138,7 @@ public void testOpenedLedgerHandleStillWorkAfterDecommissioning() throws Excepti lh.addEntry(data); lh.close(); List originalEnsemble = lh.getLedgerMetadata().getAllEnsembles().get(0L); - LedgerHandle readonlyLh = bkc.openLedger(lh.getId(), digestType, PASSWD, false); + LedgerHandle readonlyLh = bkc.openLedger(lh.getId(), digestType, PASSWD, true); assertTrue(originalEnsemble.size() == 2); startNewBookie();