From 9a0ed8bd2900bc4f3c91ea0ab2292dba6523608b Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Tue, 20 Jan 2026 09:28:00 +0800 Subject: [PATCH 1/2] [fix][broker] Add testAsyncDeleteNeverLoseMarkDeleteProperties() to reproduce the issue --- .../mledger/impl/ManagedCursorTest.java | 100 ++++++++++++++++++ 1 file changed, 100 insertions(+) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index bfb9b6ecca1b3..0bdb1f75c8d20 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -5926,6 +5926,106 @@ public void testConcurrentRead() throws Exception { assertEquals(future1.get(2, TimeUnit.SECONDS).get(0).getData(), "msg".getBytes()); } + @Test(timeOut = 20000) + public void testAsyncMarkDeleteNeverLoseProperties() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(3); + config.setRetentionTime(20, TimeUnit.SECONDS); + config.setRetentionSizeInMB(5); + + @Cleanup ManagedLedgerImpl ledger = + (ManagedLedgerImpl) factory.open("testAsyncMarkDeleteNeverLoseProperties", config); + @Cleanup ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c1"); + + int numMessages = 20; + List positions = new ArrayList<>(); + for (int i = 0; i < numMessages; i++) { + Position pos = ledger.addEntry("entry-1".getBytes(Encoding)); + positions.add(pos); + } + + String propertyKey = "test-property"; + CountDownLatch latch = new CountDownLatch(numMessages); + for (int i = 0; i < numMessages; i++) { + Map properties = new HashMap<>(); + properties.put(propertyKey, (long) i); + cursor.asyncMarkDelete(positions.get(i), properties, new MarkDeleteCallback() { + @Override + public void markDeleteComplete(Object ctx) { + latch.countDown(); + } + + @Override + public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { + fail("Mark delete should succeed"); + } + }, null); + } + + latch.await(); + + int lastIndex = numMessages - 1; + assertEquals(cursor.getMarkDeletedPosition(), positions.get(lastIndex)); + Map properties = cursor.getProperties(); + assertEquals(properties.size(), 1); + assertEquals(properties.get(propertyKey), lastIndex); + } + + @Test(timeOut = 20000) + public void testAsyncDeleteNeverLoseMarkDeleteProperties() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(11); + + @Cleanup ManagedLedgerImpl ledger = + (ManagedLedgerImpl) factory.open("testAsyncDeleteNeverLoseMarkDeleteProperty", config); + @Cleanup ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c1"); + + int numMessages = 10; + List positions = new ArrayList<>(); + for (int i = 0; i < numMessages; i++) { + Position pos = ledger.addEntry("entry-1".getBytes(Encoding)); + positions.add(pos); + } + + String propertyKey = "test-property"; + CountDownLatch latch = new CountDownLatch(numMessages); + for (int i = 0; i < numMessages - 1; i++) { + Map properties = new HashMap<>(); + properties.put(propertyKey, (long) i); + cursor.asyncMarkDelete(positions.get(i), properties, new MarkDeleteCallback() { + @Override + public void markDeleteComplete(Object ctx) { + latch.countDown(); + } + + @Override + public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { + fail("Mark delete should succeed"); + } + }, null); + } + + int lastIndex = numMessages - 1; + cursor.asyncDelete(positions.get(lastIndex), new DeleteCallback() { + @Override + public void deleteComplete(Object ctx) { + latch.countDown(); + } + + @Override + public void deleteFailed(ManagedLedgerException exception, Object ctx) { + fail("Delete should succeed"); + } + }, null); + + latch.await(); + + assertEquals(cursor.getMarkDeletedPosition(), positions.get(lastIndex)); + Map properties = cursor.getProperties(); + assertEquals(properties.size(), 1); + assertEquals(properties.get(propertyKey), lastIndex); + } + class TestPulsarMockBookKeeper extends PulsarMockBookKeeper { Map ledgerErrors = new HashMap<>(); From 36ce6366db139dbc85dd9318f4a0148abf3ad62f Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Tue, 20 Jan 2026 09:34:10 +0800 Subject: [PATCH 2/2] [fix][broker] Fix the issue --- .../apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 5 +---- .../apache/bookkeeper/mledger/impl/ManagedCursorTest.java | 2 +- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index c120b9fa719ec..1ab6ceab9a2a0 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -2681,10 +2681,7 @@ public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallb } try { - Map properties = lastMarkDeleteEntry != null ? lastMarkDeleteEntry.properties - : Collections.emptyMap(); - - internalAsyncMarkDelete(newMarkDeletePosition, properties, new MarkDeleteCallback() { + internalAsyncMarkDelete(newMarkDeletePosition, null, new MarkDeleteCallback() { @Override public void markDeleteComplete(Object ctx) { callback.deleteComplete(ctx); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 0bdb1f75c8d20..f41269b49af9c 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -6023,7 +6023,7 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) { assertEquals(cursor.getMarkDeletedPosition(), positions.get(lastIndex)); Map properties = cursor.getProperties(); assertEquals(properties.size(), 1); - assertEquals(properties.get(propertyKey), lastIndex); + assertEquals(properties.get(propertyKey), lastIndex - 1); } class TestPulsarMockBookKeeper extends PulsarMockBookKeeper {