diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index c120b9fa719ec..2763f130290d7 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -2257,37 +2257,51 @@ public void asyncMarkDelete(final Position position, Map propertie log.debug("[{}] Mark delete cursor {} up to position: {}", ledger.getName(), name, position); } + // Snapshot all positions into local variables to avoid race condition. Position newPosition = ackBatchPosition(position); + Position moveForwardPosition = newPosition; Position markDeletePos = markDeletePosition; Position lastConfirmedEntry = ledger.getLastConfirmedEntry(); - if (lastConfirmedEntry.compareTo(newPosition) < 0) { - boolean shouldCursorMoveForward = false; - try { - long ledgerEntries = ledger.getLedgerInfo(markDeletePos.getLedgerId()).get().getEntries(); + boolean shouldCursorMoveForward = false; + try { + if (lastConfirmedEntry.getLedgerId() >= newPosition.getLedgerId()) { + LedgerInfo curMarkDeleteLedgerInfo = ledger.getLedgerInfo(newPosition.getLedgerId()).get(); + Long nextValidLedger = ledger.getNextValidLedger(newPosition.getLedgerId()); + shouldCursorMoveForward = (nextValidLedger != null) + && (curMarkDeleteLedgerInfo != null + && newPosition.getEntryId() + 1 >= curMarkDeleteLedgerInfo.getEntries()); + if (shouldCursorMoveForward) { + moveForwardPosition = PositionFactory.create(nextValidLedger, -1); + } + } else { Long nextValidLedger = ledger.getNextValidLedger(lastConfirmedEntry.getLedgerId()); - shouldCursorMoveForward = nextValidLedger != null - && (markDeletePos.getEntryId() + 1 >= ledgerEntries) - && (newPosition.getLedgerId() == nextValidLedger); - } catch (Exception e) { - log.warn("Failed to get ledger entries while setting mark-delete-position", e); + shouldCursorMoveForward = (nextValidLedger != null) + && (newPosition.getLedgerId() == nextValidLedger) + && (newPosition.getEntryId() == -1); } + } catch (Exception e) { + log.warn("Failed to get ledger entries while setting mark-delete-position", e); + } - if (shouldCursorMoveForward) { - log.info("[{}] move mark-delete-position from {} to {} since all the entries have been consumed", - ledger.getName(), markDeletePos, newPosition); - } else { - if (log.isDebugEnabled()) { - log.debug("[{}] Failed mark delete due to invalid markDelete {} is ahead of last-confirmed-entry {}" - + " for cursor [{}]", ledger.getName(), position, lastConfirmedEntry, name); - } - callback.markDeleteFailed(new ManagedLedgerException("Invalid mark deleted position"), ctx); - return; + if (shouldCursorMoveForward) { + log.info("[{}] move cursor {} mark-delete-position from {} to {} since all the entries have been consumed," + + " last-confirmed-entry: {}, attempt position: {}, ack-batch position: {}", ledger.getName(), + name, markDeletePos, moveForwardPosition, lastConfirmedEntry, position, newPosition); + } else if (lastConfirmedEntry.compareTo(newPosition) < 0) { + // If newPosition>lastConfirmedEntry and current ledger entries are not fully consumed, + // in which case the newPosition is an invalid mark delete position. + if (log.isDebugEnabled()) { + log.debug("[{}] Failed mark delete due to invalid markDelete {} is ahead of last-confirmed-entry {}" + + " for cursor [{}], attempt position: {}", ledger.getName(), newPosition, lastConfirmedEntry, + name, position); } + callback.markDeleteFailed(new ManagedLedgerException("Invalid mark deleted position"), ctx); + return; } lock.writeLock().lock(); try { - newPosition = setAcknowledgedPosition(newPosition); + moveForwardPosition = setAcknowledgedPosition(moveForwardPosition); } catch (IllegalArgumentException e) { callback.markDeleteFailed(getManagedLedgerException(e), ctx); return; @@ -2298,11 +2312,11 @@ public void asyncMarkDelete(final Position position, Map propertie // Apply rate limiting to mark-delete operations if (markDeleteLimiter != null && !markDeleteLimiter.tryAcquire()) { isDirty = true; - updateLastMarkDeleteEntryToLatest(newPosition, properties); + updateLastMarkDeleteEntryToLatest(moveForwardPosition, properties); callback.markDeleteComplete(ctx); return; } - internalAsyncMarkDelete(newPosition, properties, callback, ctx, null); + internalAsyncMarkDelete(moveForwardPosition, properties, callback, ctx, null); } private Position ackBatchPosition(Position position) { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index bfb9b6ecca1b3..c4e694a7f12e5 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -37,6 +37,7 @@ import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.expectThrows; import static org.testng.Assert.fail; import com.google.common.collect.Lists; import com.google.common.collect.Range; @@ -456,7 +457,7 @@ void testSwitchLedgerFailed() throws Exception { ml.delete(); } - @Test + @Test(timeOut = 20000) void testPersistentMarkDeleteIfSwitchCursorLedgerFailed() throws Exception { final int entryCount = 10; final String cursorName = "c1"; @@ -496,9 +497,7 @@ void testPersistentMarkDeleteIfSwitchCursorLedgerFailed() throws Exception { // Assert persist mark deleted position to ZK was successful. Position slowestReadPosition = ml.getCursors().getSlowestCursorPosition(); - assertTrue(slowestReadPosition.getLedgerId() >= lastEntry.getLedgerId()); - assertTrue(slowestReadPosition.getEntryId() >= lastEntry.getEntryId()); - assertEquals(cursor.getPersistentMarkDeletedPosition(), lastEntry); + assertThat(slowestReadPosition).isGreaterThanOrEqualTo(lastEntry); assertThat(cursor.getPersistentMarkDeletedPosition()).isGreaterThanOrEqualTo(lastEntry); assertThat(cursor.getMarkDeletedPosition()).isGreaterThanOrEqualTo(lastEntry); @@ -1837,7 +1836,7 @@ void errorCreatingCursor() throws Exception { } } - @Test + @Test(timeOut = 20000) void failDuringRecoveryWithEmptyLedger() throws Exception { ManagedLedger ledger = factory.open("my_test_ledger"); ManagedCursor cursor = ledger.openCursor("cursor"); @@ -1853,6 +1852,7 @@ void failDuringRecoveryWithEmptyLedger() throws Exception { // Re-open ledger = factory.open("my_test_ledger"); cursor = ledger.openCursor("cursor"); + // Move markDeletePosition to nextLedger:-1 since current ledger entries are consumed. cursor.markDelete(p3); // Force-reopen so the recovery will be forced to read from ledger @@ -1861,11 +1861,15 @@ void failDuringRecoveryWithEmptyLedger() throws Exception { @Cleanup("shutdown") ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc, conf); - ledger = factory2.open("my_test_ledger"); - cursor = ledger.openCursor("cursor"); + ManagedLedger newLedger = factory2.open("my_test_ledger"); + cursor = newLedger.openCursor("cursor"); - // Cursor was rolled back to p2 because of the ledger recovery failure - assertEquals(cursor.getMarkDeletedPosition(), p2); + // Previous empty ledger would be deleted, and create a new ledger. + assertThat(newLedger.getLedgersInfo().size()).isEqualTo(1); + + // Cursor move to newestEmptyLedgerId:-1 since all entries are consumed. + assertThat(cursor.getMarkDeletedPosition()).isEqualByComparingTo( + PositionFactory.create(newLedger.getLedgersInfo().lastKey(), -1)); } @Test(timeOut = 20000) @@ -2099,8 +2103,8 @@ void testSkipEntries(boolean useOpenRangeSet) throws Exception { c1.skipEntries(1, IndividualDeletedEntries.Exclude); assertEquals(c1.getReadPosition(), pos); - pos = ledger.addEntry("dummy-entry-1".getBytes(Encoding)); - pos = ledger.addEntry("dummy-entry-2".getBytes(Encoding)); + ledger.addEntry("dummy-entry-1".getBytes(Encoding)); + ledger.addEntry("dummy-entry-2".getBytes(Encoding)); // Wait new empty ledger created completely. Awaitility.await().untilAsserted(() -> { @@ -2115,13 +2119,17 @@ void testSkipEntries(boolean useOpenRangeSet) throws Exception { c1.skipEntries(1, IndividualDeletedEntries.Exclude); assertEquals(c1.getNumberOfEntries(), 0); assertEquals(c1.getReadPosition(), PositionFactory.create(ledger.currentLedger.getId(), 0)); - assertEquals(c1.getMarkDeletedPosition(), pos); + assertThat(c1.getMarkDeletedPosition()).isEqualByComparingTo( + PositionFactory.create(ledger.ledgers.lastKey(), -1)); // skip entries across ledgers for (int i = 0; i < 6; i++) { - pos = ledger.addEntry("dummy-entry".getBytes(Encoding)); + ledger.addEntry("dummy-entry".getBytes(Encoding)); } + // Wait new empty ledger created completely. + Awaitility.await().untilAsserted(() -> assertThat(ledger.getLedgersInfo().size()).isEqualTo(4)); + c1.skipEntries(5, IndividualDeletedEntries.Exclude); assertEquals(c1.getNumberOfEntries(), 1); @@ -2133,7 +2141,8 @@ void testSkipEntries(boolean useOpenRangeSet) throws Exception { Awaitility.await().untilAsserted(() -> { assertEquals(c1.getReadPosition().getEntryId(), 0); }); - assertEquals(c1.getMarkDeletedPosition(), pos); + assertThat(c1.getMarkDeletedPosition()).isEqualByComparingTo( + PositionFactory.create(ledger.ledgers.lastKey(), -1)); } @Test(timeOut = 20000, dataProvider = "useOpenRangeSet") @@ -2161,7 +2170,7 @@ void testSkipEntriesWithIndividualDeletedMessages(boolean useOpenRangeSet) throw c1.skipEntries(3, IndividualDeletedEntries.Exclude); assertEquals(c1.getNumberOfEntries(), 0); assertEquals(c1.getReadPosition(), PositionFactory.create(pos5.getLedgerId() + 1, 0)); - assertEquals(c1.getMarkDeletedPosition(), pos5); + assertEquals(c1.getMarkDeletedPosition(), PositionFactory.create(pos5.getLedgerId() + 1, -1)); Position pos6 = ledger.addEntry("dummy-entry-1".getBytes(Encoding)); Position pos7 = ledger.addEntry("dummy-entry-2".getBytes(Encoding)); @@ -5926,6 +5935,318 @@ public void testConcurrentRead() throws Exception { assertEquals(future1.get(2, TimeUnit.SECONDS).get(0).getData(), "msg".getBytes()); } + @Test(timeOut = 20000) + public void testAsyncMarkDeleteMoveToNextLedgerInNonRolloverScenario() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + int maxEntriesPerLedger = 10; + config.setMaxEntriesPerLedger(maxEntriesPerLedger); + ManagedLedger ledger = factory.open("async_mark_delete_move_to_next_ledger_in_non_rollover_test", config); + final ManagedCursor c1 = ledger.openCursor("c1"); + + final int addEntryNum = 101; + final CountDownLatch addEntryLatch = new CountDownLatch(addEntryNum); + AtomicReference secondLastPosition = new AtomicReference<>(); + AtomicReference lastPosition = new AtomicReference<>(); + for (int i = 0; i < addEntryNum; i++) { + String entryStr = "entry-" + i; + int index = i; + ledger.asyncAddEntry(entryStr.getBytes(Encoding), new AddEntryCallback() { + @Override + public void addFailed(ManagedLedgerException exception, Object ctx) { + } + + @Override + public void addComplete(Position position, ByteBuf entryData, Object ctx) { + if (index == addEntryNum - 2) { + secondLastPosition.set(position); + } + lastPosition.set(position); + addEntryLatch.countDown(); + } + }, null); + } + addEntryLatch.await(); + + assertEquals(ledger.getNumberOfEntries(), addEntryNum); + assertEquals(secondLastPosition.get().getEntryId(), maxEntriesPerLedger - 1); + assertEquals(lastPosition.get().getEntryId(), addEntryNum % maxEntriesPerLedger - 1); + + final CountDownLatch c1MarkDeleteLatch = new CountDownLatch(1); + c1.asyncMarkDelete(secondLastPosition.get(), new MarkDeleteCallback() { + @Override + public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { + } + + @Override + public void markDeleteComplete(Object ctx) { + c1MarkDeleteLatch.countDown(); + } + }, null); + c1MarkDeleteLatch.await(); + + assertEquals(c1.getNumberOfEntries(), 1); + // Should move to lastPositionLedgerId:-1 since entries in secondLastPositionLedger are all consumed. + assertEquals(c1.getMarkDeletedPosition(), + PositionFactory.create(lastPosition.get().getLedgerId(), -1)); + + // Reopen + @Cleanup("shutdown") ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc); + ledger = factory2.open("async_mark_delete_move_to_next_ledger_in_non_rollover_test"); + ManagedCursor c2 = ledger.openCursor("c1"); + + assertEquals(c2.getNumberOfEntries(), 1); + assertEquals(c2.getMarkDeletedPosition(), + PositionFactory.create(lastPosition.get().getLedgerId(), -1)); + + final CountDownLatch c2MarkDeleteLatch = new CountDownLatch(1); + c2.asyncMarkDelete(lastPosition.get(), new MarkDeleteCallback() { + @Override + public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { + } + + @Override + public void markDeleteComplete(Object ctx) { + c2MarkDeleteLatch.countDown(); + } + }, null); + c2MarkDeleteLatch.await(); + + assertEquals(c2.getNumberOfEntries(), 0); + // Should move to nextLedgerId:-1 since entries in lastPositionLedger are all consumed. + // Here we can guarantee next ledger is created since the asyncOpenLedger guarantees this order. + assertThat(c2.getMarkDeletedPosition()).isEqualByComparingTo( + PositionFactory.create(ledger.getLedgersInfo().lastKey(), -1)); + } + + @Test(timeOut = 20000) + public void testAsyncMarkDeleteMayMoveToNextLedgerInRolloverScenario() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + // Make sure acked ledgers will never be trimmed in this test. + config.setMaxEntriesPerLedger(10); + config.setRetentionTime(1, TimeUnit.MINUTES); + config.setRetentionSizeInMB(5); + ManagedLedger ledger = factory.open("async_mark_delete_may_move_to_next_ledger_in_non_rollover_test", config); + final ManagedCursor c1 = ledger.openCursor("c1"); + final AtomicReference lastPosition = new AtomicReference<>(); + + final int num = 100; + final CountDownLatch addEntryLatch = new CountDownLatch(num); + for (int i = 0; i < num; i++) { + ledger.asyncAddEntry("entry".getBytes(Encoding), new AddEntryCallback() { + @Override + public void addFailed(ManagedLedgerException exception, Object ctx) { + } + + @Override + public void addComplete(Position position, ByteBuf entryData, Object ctx) { + lastPosition.set(position); + c1.asyncMarkDelete(position, new MarkDeleteCallback() { + @Override + public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { + } + + @Override + public void markDeleteComplete(Object ctx) { + addEntryLatch.countDown(); + } + }, null); + } + }, null); + } + addEntryLatch.await(); + + assertEquals(c1.getNumberOfEntries(), 0); + assertThat(c1.getMarkDeletedPosition()).isGreaterThanOrEqualTo(lastPosition.get()); + + // Reopen + @Cleanup("shutdown") ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc); + ManagedLedger newLedger = ManagedLedgerTestUtil.retry( + () -> factory2.open("async_mark_delete_may_move_to_next_ledger_in_non_rollover_test")); + ManagedCursor c2 = newLedger.openCursor("c1"); + + assertEquals(c2.getNumberOfEntries(), 0); + assertThat(c2.getMarkDeletedPosition()).isEqualByComparingTo( + PositionFactory.create(newLedger.getLedgersInfo().lastKey(), -1)); + } + + @Test(timeOut = 20000) + public void testAsyncMarkDeleteMoveToNextLedgerOneByOne() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + int maxEntriesPerLedger = 10; + config.setMaxEntriesPerLedger(maxEntriesPerLedger); + ManagedLedger ledger = factory.open("async_mark_delete_move_to_next_ledger_one_by_one_test", config); + final ManagedCursor c1 = ledger.openCursor("c1"); + final AtomicReference lastPosition = new AtomicReference<>(); + + final int entryNum = 101; + final CountDownLatch addEntryLatch = new CountDownLatch(entryNum); + Deque positions = new ConcurrentLinkedDeque<>(); + for (int i = 0; i < entryNum; i++) { + ledger.asyncAddEntry("entry".getBytes(Encoding), new AddEntryCallback() { + @Override + public void addFailed(ManagedLedgerException exception, Object ctx) { + } + + @Override + public void addComplete(Position position, ByteBuf entryData, Object ctx) { + lastPosition.set(position); + positions.offer(position); + addEntryLatch.countDown(); + } + }, null); + } + addEntryLatch.await(); + + for (int i = 0; i < entryNum; i++) { + CountDownLatch markDeleteLatch = new CountDownLatch(1); + Position position = positions.poll(); + c1.asyncMarkDelete(position, new MarkDeleteCallback() { + @Override + public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { + } + + @Override + public void markDeleteComplete(Object ctx) { + markDeleteLatch.countDown(); + } + }, null); + markDeleteLatch.await(); + assertEquals(c1.getNumberOfEntries(), entryNum - i - 1); + if ((i + 1) % maxEntriesPerLedger == 0) { + assertThat(c1.getMarkDeletedPosition()).isGreaterThan(position); + } else { + assertThat(c1.getMarkDeletedPosition()).isEqualByComparingTo(position); + } + } + + // Reopen + @Cleanup("shutdown") ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc); + ManagedLedger newLedger = ManagedLedgerTestUtil.retry( + () -> factory2.open("async_mark_delete_move_to_next_ledger_one_by_one_test")); + ManagedCursor c2 = newLedger.openCursor("c1"); + assertEquals(c2.getNumberOfEntries(), 0); + assertThat(c2.getMarkDeletedPosition()).isEqualByComparingTo( + PositionFactory.create(newLedger.getLedgersInfo().lastKey(), -1)); + } + + @Test(timeOut = 20000) + public void testAsyncMarkDeleteNextLedgerMinusOneEntryIdPosition() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + int maxEntriesPerLedger = 10; + config.setMaxEntriesPerLedger(maxEntriesPerLedger); + ManagedLedger ledger = factory.open("async_mark_delete_next_ledger_minus_one_entry_id_test", config); + final ManagedCursor c1 = ledger.openCursor("c1"); + + final int entryNum = 100; + for (int i = 0; i < entryNum / maxEntriesPerLedger; i++) { + final CountDownLatch addEntryLatch = new CountDownLatch(maxEntriesPerLedger); + for (int j = 0; j < maxEntriesPerLedger; j++) { + ledger.asyncAddEntry("entry".getBytes(Encoding), new AddEntryCallback() { + @Override + public void addFailed(ManagedLedgerException exception, Object ctx) { + } + + @Override + public void addComplete(Position position, ByteBuf entryData, Object ctx) { + addEntryLatch.countDown(); + } + }, null); + } + addEntryLatch.await(); + + // Wait for new ledger creation completed. + Awaitility.await().untilAsserted(() -> assertThat(ledger.getLedgersInfo().size()).isEqualTo(2)); + + long ledgerId = ledger.getLedgersInfo().firstEntry().getValue().getLedgerId(); + Position firstEntryPosition = PositionFactory.create(ledgerId, 0); + c1.markDelete(firstEntryPosition); + assertThat(c1.getMarkDeletedPosition()).isEqualTo(firstEntryPosition); + + Long nextLedgerId = ledger.getLedgersInfo().lastEntry().getKey(); + Position markDeletePosition = PositionFactory.create(nextLedgerId, -1); + CountDownLatch markDeleteLatch = new CountDownLatch(1); + c1.asyncMarkDelete(markDeletePosition, new MarkDeleteCallback() { + @Override + public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { + } + + @Override + public void markDeleteComplete(Object ctx) { + markDeleteLatch.countDown(); + } + }, null); + markDeleteLatch.await(); + assertEquals(c1.getNumberOfEntries(), 0); + assertThat(c1.getMarkDeletedPosition()).isEqualByComparingTo(markDeletePosition); + } + } + + @Test(timeOut = 20000) + void testMarkDeletePreviousLacManyTimesInRolloverScenario() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(1); + // Set retention in order to not trim ledger. + config.setRetentionTime(20, TimeUnit.SECONDS); + config.setRetentionSizeInMB(1); + + ManagedLedgerImpl ledger = + (ManagedLedgerImpl) factory.open("testMarkDeletePreviousLacManyTimesInRolloverScenario", config); + ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c1"); + + Position position = ledger.addEntry("entry1".getBytes(Encoding)); + cursor.markDelete(position); + + // Wait for new ledger created. + Awaitility.await().untilAsserted(() -> assertThat(ledger.getLedgersInfo().size()).isEqualTo(2)); + + int markDeleteTimes = 10; + for (int i = 0; i < markDeleteTimes; i++) { + cursor.markDelete(position); + } + + assertThat(cursor.getMarkDeletedPosition()).isEqualByComparingTo( + PositionFactory.create(ledger.getLedgersInfo().lastKey(), -1)); + } + + @Test + void testMarkDeletePositionNeverExceedLacOrNextLedgerMinusOneEntryIdPos() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + int maxEntriesPerLedger = 1; + config.setMaxEntriesPerLedger(maxEntriesPerLedger); + config.setRetentionTime(0, TimeUnit.MILLISECONDS); + config.setRetentionSizeInMB(0); + + ManagedLedgerImpl ledger = + (ManagedLedgerImpl) factory.open("testMarkDeletePositionNeverExceedLacOrNextLedgerMinusOneEntryIdPos", + config); + ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c1"); + + Position p1 = ledger.addEntry("entry-1".getBytes(Encoding)); + Position p2 = ledger.addEntry("entry-2".getBytes(Encoding)); + Position p3 = ledger.addEntry("entry-3".getBytes(Encoding)); + + // Wait for new ledger created. + Awaitility.await().untilAsserted(() -> assertThat(ledger.getLedgersInfo().size()).isEqualTo(4)); + long emptyLedgerId = ledger.getLedgersInfo().lastEntry().getValue().getLedgerId(); + + cursor.markDelete(p1); + assertThat(cursor.getMarkDeletedPosition()).isEqualTo(PositionFactory.create(p2.getLedgerId(), -1)); + + // If requestMarkDeleteLedgerId == lacLedgerId, and current ledger is all consumed, and next ledger exists, + // we move markDeletePosition to nextLedgerId:-1. This is reasonable since we can also mark delete + // nextLedgerId:-1, which is also larger than lac. + Position lacPlusOnePos = PositionFactory.create(p2.getLedgerId(), 1); + cursor.markDelete(lacPlusOnePos); + assertThat(cursor.getMarkDeletedPosition()).isEqualTo(PositionFactory.create(p3.getLedgerId(), -1)); + + // But we can not mark delete emptyLedgerId:1, since it is a valid entry position. + Position invalidMarkDeletePos = PositionFactory.create(emptyLedgerId, 1); + Throwable thrown = expectThrows(ManagedLedgerException.class, () -> { + cursor.markDelete(invalidMarkDeletePos); + }); + assertThat(thrown.getMessage()).contains("Invalid mark deleted position"); + } + class TestPulsarMockBookKeeper extends PulsarMockBookKeeper { Map ledgerErrors = new HashMap<>(); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 12bbd66670b50..89e4bec853d2b 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -2289,7 +2289,7 @@ public void testRetention() throws Exception { assertTrue(ml.getTotalSize() > "shortmessage".getBytes().length); } - @Test(enabled = true) + @Test(timeOut = 20000) public void testNoRetention() throws Exception { @Cleanup("shutdown") ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc); @@ -2302,20 +2302,32 @@ public void testNoRetention() throws Exception { ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open("noretention_test_ledger", config); ManagedCursor c1 = ml.openCursor("c1noretention"); ml.addEntry("iamaverylongmessagethatshouldnotberetained".getBytes()); + + // Wait for new ledger creation completed + Awaitility.await() + .untilAsserted(() -> assertThat(ml.getLedgersInfo().size()).isEqualTo(2)); + c1.skipEntries(1, IndividualDeletedEntries.Exclude); ml.close(); // reopen ml - ml = (ManagedLedgerImpl) factory.open("noretention_test_ledger", config); - c1 = ml.openCursor("c1noretention"); - ml.addEntry("shortmessage".getBytes()); + ManagedLedgerImpl newMl = (ManagedLedgerImpl) factory.open("noretention_test_ledger", config); + c1 = newMl.openCursor("c1noretention"); + newMl.addEntry("shortmessage".getBytes()); + + // Wait for new ledger creation completed + Awaitility.await() + .untilAsserted(() -> assertThat(newMl.getLedgersInfo().size()).isEqualTo(2)); + c1.skipEntries(1, IndividualDeletedEntries.Exclude); - // sleep for trim - Thread.sleep(1000); - ml.close(); - assertTrue(ml.getLedgersInfoAsList().size() <= 1); - assertTrue(ml.getTotalSize() <= "shortmessage".getBytes().length); + // Wait for ledger trim process completed + Awaitility.await() + .untilAsserted(() -> assertThat(newMl.getLedgersInfo().size()).isEqualTo(1)); + + newMl.close(); + + assertThat(newMl.getTotalSize()).isEqualTo(0); } @Test @@ -5263,14 +5275,15 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { // 3. Add Entry 2. Triggers second rollover process. // This implicitly calls maybeUpdateCursorBeforeTrimmingConsumedLedger due to rollover - Position p = ledger.addEntry("entry-2".getBytes(Encoding)); + ledger.addEntry("entry-2".getBytes(Encoding)); // Wait for background tasks (metadata callback and trim) to complete. // We expect only one ledger (Rollover and trim happened). Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> ledger.getLedgersInfo().size() == 1); - // All ledgers are trimmed, left one empty ledger, trim process moves markDeletedPosition to p.getLedgerId():0 - assertEquals(cursor.getMarkDeletedPosition(), PositionFactory.create(p.getLedgerId(), 0)); - assertEquals(cursor.getPersistentMarkDeletedPosition(), PositionFactory.create(p.getLedgerId(), 0)); + long emptyLedgerId = ledger.getLedgersInfo().lastEntry().getValue().getLedgerId(); + // All ledgers are trimmed, left one empty ledger, trim process moves markDeletedPosition to emptyLedgerId:-1. + assertEquals(cursor.getMarkDeletedPosition(), PositionFactory.create(emptyLedgerId, -1)); + assertEquals(cursor.getPersistentMarkDeletedPosition(), PositionFactory.create(emptyLedgerId, -1)); // Verify properties are preserved after cursor reset assertEquals(cursor.getProperties(), properties); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java index 19383d50f2e06..c07f6777723e9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java @@ -1370,11 +1370,10 @@ public void testConsumerBacklogEvictionTimeQuotaWithEmptyLedger() throws Excepti rolloverStats(); Awaitility.await().untilAsserted(() -> { - // Cause the last ledger is empty, it is not possible to skip first ledger, - // so the number of ledgers will keep unchanged, and backlog is clear + // Mark deleting moves cursor position to nextLedgerId:-1, and backlog is clear. PersistentTopicInternalStats latestInternalStats = admin.topics().getInternalStats(topic); - assertEquals(latestInternalStats.ledgers.size(), 2); - assertEquals(latestInternalStats.ledgers.get(1).entries, 0); + assertEquals(latestInternalStats.ledgers.size(), 1); + assertEquals(latestInternalStats.ledgers.get(0).entries, 0); TopicStats latestStats = getTopicStats(topic); assertEquals(latestStats.getSubscriptions().get(subName).getMsgBacklog(), 0); }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java index beb054d86f5e6..2203a18cb5786 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java @@ -94,7 +94,6 @@ public void testMessageExpiryAfterTopicUnload() throws Exception { } FutureUtil.waitForAll(sendFutureList).get(); MessageIdImpl firstMessageId = (MessageIdImpl) sendFutureList.get(0).get(); - MessageIdImpl lastMessageId = (MessageIdImpl) sendFutureList.get(sendFutureList.size() - 1).get(); producer.close(); // unload a reload the topic // this action created a new ledger @@ -113,12 +112,15 @@ public void testMessageExpiryAfterTopicUnload() throws Exception { .pollDelay(3, TimeUnit.SECONDS).untilAsserted(() -> { this.runMessageExpiryCheck(); log.info("***** run message expiry now"); - // verify that the markDeletePosition was moved forward, and exacly to the last message + // verify that the markDeletePosition was moved forward, and exactly to the last message PersistentTopicInternalStats internalStatsAfterExpire = admin.topics().getInternalStats(topicName); CursorStats statsAfterExpire = internalStatsAfterExpire.cursors.get(subscriptionName); log.info("markDeletePosition after expire {}", statsAfterExpire.markDeletePosition); - assertEquals(statsAfterExpire.markDeletePosition, PositionFactory.create(lastMessageId.getLedgerId(), - lastMessageId.getEntryId()).toString()); + // New ledger created, move markDeletePosition to currentLedgerId:-1 + long currentLedgerId = + internalStatsAfterExpire.ledgers.get(internalStatsAfterExpire.ledgers.size() - 1).ledgerId; + assertEquals(statsAfterExpire.markDeletePosition, + PositionFactory.create(currentLedgerId, -1).toString()); }); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java index 5707a6cca60b8..65e3aee0e9a96 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java @@ -425,7 +425,7 @@ void testMessageExpiryWithTimestampNonRecoverableException() throws Exception { assertEquals(lastLedgerInfo.getEntries(), 0); assertEquals(ledgers.size(), totalEntries / entriesPerLedger + 1); - // this will make sure that all entries should be deleted + // This will make sure that all entries should be deleted, and move markDeletePosition to nextLedgerId:-1. Thread.sleep(TimeUnit.SECONDS.toMillis(ttlSeconds)); bkc.deleteLedger(ledgers.get(0).getLedgerId()); @@ -438,9 +438,9 @@ void testMessageExpiryWithTimestampNonRecoverableException() throws Exception { assertTrue(monitor.expireMessages(ttlSeconds)); Awaitility.await().untilAsserted(() -> { Position markDeletePosition = c1.getMarkDeletedPosition(); - // The markDeletePosition points to the last entry of the previous ledger in lastLedgerInfo. - assertEquals(markDeletePosition.getLedgerId(), lastLedgerInfo.getLedgerId() - 1); - assertEquals(markDeletePosition.getEntryId(), entriesPerLedger - 1); + // The markDeletePosition already moved to nextLedgerId:-1 + assertEquals(markDeletePosition.getLedgerId(), lastLedgerInfo.getLedgerId()); + assertEquals(markDeletePosition.getEntryId(), -1); }); c1.close(); @@ -481,7 +481,7 @@ void testMessageExpiryAsyncWithTimestampNonRecoverableException() throws Excepti assertEquals(lastLedgerInfo.getEntries(), 0); assertEquals(ledgers.size(), totalEntries / entriesPerLedger + 1); - // this will make sure that all entries should be deleted + // This will make sure that all entries should be deleted, and move markDeletePosition to nextLedgerId:-1. Thread.sleep(TimeUnit.SECONDS.toMillis(ttlSeconds)); bkc.deleteLedger(ledgers.get(0).getLedgerId()); @@ -494,9 +494,9 @@ void testMessageExpiryAsyncWithTimestampNonRecoverableException() throws Excepti assertTrue(monitor.expireMessagesAsync(ttlSeconds).get()); Awaitility.await().untilAsserted(() -> { Position markDeletePosition = c1.getMarkDeletedPosition(); - // The markDeletePosition points to the last entry of the previous ledger in lastLedgerInfo. - assertEquals(markDeletePosition.getLedgerId(), lastLedgerInfo.getLedgerId() - 1); - assertEquals(markDeletePosition.getEntryId(), entriesPerLedger - 1); + // The markDeletePosition already moved to nextLedgerId:-1 + assertEquals(markDeletePosition.getLedgerId(), lastLedgerInfo.getLedgerId()); + assertEquals(markDeletePosition.getEntryId(), -1); }); c1.close(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicProtectedMethodsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicProtectedMethodsTest.java index bba0d2050d860..bcc613b3198ba 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicProtectedMethodsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicProtectedMethodsTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.service.persistent; +import static org.assertj.core.api.Assertions.assertThat; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; import java.util.concurrent.CompletableFuture; @@ -92,7 +93,7 @@ public void testEstimatedTimeBasedBacklogQuotaCheckWhenNoBacklog() throws Except admin.topics().skipAllMessages(tp, "s1"); Awaitility.await().untilAsserted(() -> { assertEquals(cursor.getNumberOfEntriesInBacklog(true), 0); - assertEquals(cursor.getMarkDeletedPosition(), ml.getLastConfirmedEntry()); + assertThat(cursor.getMarkDeletedPosition()).isGreaterThanOrEqualTo(ml.getLastConfirmedEntry()); }); CompletableFuture completableFuture = new CompletableFuture(); ml.trimConsumedLedgersInBackground(completableFuture); @@ -100,7 +101,7 @@ public void testEstimatedTimeBasedBacklogQuotaCheckWhenNoBacklog() throws Except Awaitility.await().untilAsserted(() -> { assertEquals(ml.getLedgersInfo().size(), 1); assertEquals(cursor.getNumberOfEntriesInBacklog(true), 0); - assertEquals(cursor.getMarkDeletedPosition(), ml.getLastConfirmedEntry()); + assertThat(cursor.getMarkDeletedPosition()).isGreaterThanOrEqualTo(ml.getLastConfirmedEntry()); }); // Verify: "persistentTopic.estimatedTimeBasedBacklogQuotaCheck" will not get a NullPointerException. diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java index f5e79c8a81218..896684224abaa 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java @@ -48,6 +48,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.LongRunningProcessStatus; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; @@ -55,6 +56,7 @@ import org.apache.pulsar.client.api.RawMessage; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.RawMessageImpl; import org.apache.pulsar.client.impl.ReaderImpl; import org.apache.pulsar.common.api.proto.MessageIdData; @@ -749,7 +751,7 @@ public void testReadCompleteMessagesDuringTopicUnloading() throws Exception { .create(); // Create a consumer to generate a durable cursor, to avoid deleting the ledger before the reader receives. @Cleanup - org.apache.pulsar.client.api.Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + Consumer consumer = pulsarClient.newConsumer(Schema.STRING) .subscriptionName("sub_" + UUID.randomUUID()) .topic(topic) .subscribe(); @@ -821,14 +823,20 @@ public void testReadCompactedLatestMessageWithInclusive() throws Exception { lastMessage.join(); admin.topics().unload(topic); admin.topics().triggerCompaction(topic); + final MessageId finalLastMessage = lastMessage.get(); Awaitility.await().untilAsserted(() -> { PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic); Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1); Assert.assertEquals(stats.compactedLedger.entries, numMessages); Assert.assertEquals(admin.topics().getStats(topic) .getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(), 0); - Assert.assertEquals(stats.lastConfirmedEntry, - stats.cursors.get(COMPACTION_SUBSCRIPTION).markDeletePosition); + long ledgerId = ((MessageIdImpl) finalLastMessage).getLedgerId(); + long entryId = ((MessageIdImpl) finalLastMessage).getEntryId(); + Assert.assertEquals(stats.lastConfirmedEntry, PositionFactory.create(ledgerId, entryId).toString()); + // New ledger created, move markDeletePosition to currentLedgerId:-1 + long currentLedgerId = stats.ledgers.get(stats.ledgers.size() - 1).ledgerId; + Assert.assertEquals(stats.cursors.get(COMPACTION_SUBSCRIPTION).markDeletePosition, + PositionFactory.create(currentLedgerId, -1).toString()); }); Awaitility.await()