diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index c120b9fa719ec..1ab6ceab9a2a0 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -2681,10 +2681,7 @@ public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallb } try { - Map properties = lastMarkDeleteEntry != null ? lastMarkDeleteEntry.properties - : Collections.emptyMap(); - - internalAsyncMarkDelete(newMarkDeletePosition, properties, new MarkDeleteCallback() { + internalAsyncMarkDelete(newMarkDeletePosition, null, new MarkDeleteCallback() { @Override public void markDeleteComplete(Object ctx) { callback.deleteComplete(ctx); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index bfb9b6ecca1b3..f41269b49af9c 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -5926,6 +5926,106 @@ public void testConcurrentRead() throws Exception { assertEquals(future1.get(2, TimeUnit.SECONDS).get(0).getData(), "msg".getBytes()); } + @Test(timeOut = 20000) + public void testAsyncMarkDeleteNeverLoseProperties() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(3); + config.setRetentionTime(20, TimeUnit.SECONDS); + config.setRetentionSizeInMB(5); + + @Cleanup ManagedLedgerImpl ledger = + (ManagedLedgerImpl) factory.open("testAsyncMarkDeleteNeverLoseProperties", config); + @Cleanup ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c1"); + + int numMessages = 20; + List positions = new ArrayList<>(); + for (int i = 0; i < numMessages; i++) { + Position pos = ledger.addEntry("entry-1".getBytes(Encoding)); + positions.add(pos); + } + + String propertyKey = "test-property"; + CountDownLatch latch = new CountDownLatch(numMessages); + for (int i = 0; i < numMessages; i++) { + Map properties = new HashMap<>(); + properties.put(propertyKey, (long) i); + cursor.asyncMarkDelete(positions.get(i), properties, new MarkDeleteCallback() { + @Override + public void markDeleteComplete(Object ctx) { + latch.countDown(); + } + + @Override + public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { + fail("Mark delete should succeed"); + } + }, null); + } + + latch.await(); + + int lastIndex = numMessages - 1; + assertEquals(cursor.getMarkDeletedPosition(), positions.get(lastIndex)); + Map properties = cursor.getProperties(); + assertEquals(properties.size(), 1); + assertEquals(properties.get(propertyKey), lastIndex); + } + + @Test(timeOut = 20000) + public void testAsyncDeleteNeverLoseMarkDeleteProperties() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(11); + + @Cleanup ManagedLedgerImpl ledger = + (ManagedLedgerImpl) factory.open("testAsyncDeleteNeverLoseMarkDeleteProperty", config); + @Cleanup ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c1"); + + int numMessages = 10; + List positions = new ArrayList<>(); + for (int i = 0; i < numMessages; i++) { + Position pos = ledger.addEntry("entry-1".getBytes(Encoding)); + positions.add(pos); + } + + String propertyKey = "test-property"; + CountDownLatch latch = new CountDownLatch(numMessages); + for (int i = 0; i < numMessages - 1; i++) { + Map properties = new HashMap<>(); + properties.put(propertyKey, (long) i); + cursor.asyncMarkDelete(positions.get(i), properties, new MarkDeleteCallback() { + @Override + public void markDeleteComplete(Object ctx) { + latch.countDown(); + } + + @Override + public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { + fail("Mark delete should succeed"); + } + }, null); + } + + int lastIndex = numMessages - 1; + cursor.asyncDelete(positions.get(lastIndex), new DeleteCallback() { + @Override + public void deleteComplete(Object ctx) { + latch.countDown(); + } + + @Override + public void deleteFailed(ManagedLedgerException exception, Object ctx) { + fail("Delete should succeed"); + } + }, null); + + latch.await(); + + assertEquals(cursor.getMarkDeletedPosition(), positions.get(lastIndex)); + Map properties = cursor.getProperties(); + assertEquals(properties.size(), 1); + assertEquals(properties.get(propertyKey), lastIndex - 1); + } + class TestPulsarMockBookKeeper extends PulsarMockBookKeeper { Map ledgerErrors = new HashMap<>();