diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java index fc3054ecdf92d..e30c578fb556d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java @@ -197,17 +197,17 @@ public void safeRun() { ml.lastConfirmedEntry = lastEntry; if (closeWhenDone) { - ReferenceCountUtil.release(data); log.info("[{}] Closing ledger {} for being full", ml.getName(), ledger.getId()); + // `data` will be released in `closeComplete` ledger.asyncClose(this, ctx); } else { updateLatency(); AddEntryCallback cb = callbackUpdater.getAndSet(this, null); if (cb != null) { cb.addComplete(lastEntry, data.asReadOnly(), ctx); - ReferenceCountUtil.release(data); ml.notifyCursors(); ml.notifyWaitingEntryCallBacks(); + ReferenceCountUtil.release(data); this.recycle(); } else { ReferenceCountUtil.release(data); @@ -231,10 +231,13 @@ public void closeComplete(int rc, LedgerHandle lh, Object ctx) { AddEntryCallback cb = callbackUpdater.getAndSet(this, null); if (cb != null) { - cb.addComplete(PositionImpl.get(lh.getId(), entryId), null, ctx); + cb.addComplete(PositionImpl.get(lh.getId(), entryId), data.asReadOnly(), ctx); ml.notifyCursors(); ml.notifyWaitingEntryCallBacks(); + ReferenceCountUtil.release(data); this.recycle(); + } else { + ReferenceCountUtil.release(data); } } @@ -345,5 +348,4 @@ public String toString() { ", dataLength=" + dataLength + '}'; } - } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 80189820721a9..9bca66351f3d0 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -552,13 +552,15 @@ private byte[] copyBytesFromByteBuf(final ByteBuf buf) { @Test(timeOut = 20000) public void asyncAddEntryWithoutError() throws Exception { - ManagedLedger ledger = factory.open("my_test_ledger"); + ManagedLedger ledger = factory.open("my_test_ledger", + new ManagedLedgerConfig().setMaxEntriesPerLedger(2)); ledger.openCursor("test-cursor"); - final CountDownLatch counter = new CountDownLatch(1); + final int count = 4; + final CountDownLatch counter = new CountDownLatch(count); final byte[] bytes = "dummy-entry-1".getBytes(Encoding); - ledger.asyncAddEntry(bytes, new AddEntryCallback() { + AddEntryCallback callback = new AddEntryCallback() { @Override public void addComplete(Position position, ByteBuf entryData, Object ctx) { assertNull(ctx); @@ -579,11 +581,14 @@ public void addFailed(ManagedLedgerException exception, Object ctx) { fail(exception.getMessage()); } - }, null); + }; + for (int i = 0; i < count; i++) { + ledger.asyncAddEntry(bytes, callback, null); + } counter.await(); - assertEquals(ledger.getNumberOfEntries(), 1); - assertEquals(ledger.getTotalSize(), "dummy-entry-1".getBytes(Encoding).length); + assertEquals(ledger.getNumberOfEntries(), count); + assertEquals(ledger.getTotalSize(), "dummy-entry-1".getBytes(Encoding).length * count); } @Test(timeOut = 20000)