From 21d418e9c3c3c9b37b6cac48c8210d227f7a9999 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Sun, 2 May 2021 20:27:12 +0800 Subject: [PATCH 1/2] Fix publish callback's entry data is null during ledger rollover --- .../bookkeeper/mledger/impl/OpAddEntry.java | 15 ++++++++------- .../mledger/impl/ManagedLedgerTest.java | 17 +++++++++++------ 2 files changed, 19 insertions(+), 13 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java index fc3054ecdf92d..ba0ec05384bde 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java @@ -197,21 +197,18 @@ public void safeRun() { ml.lastConfirmedEntry = lastEntry; if (closeWhenDone) { - ReferenceCountUtil.release(data); log.info("[{}] Closing ledger {} for being full", ml.getName(), ledger.getId()); + // `data` will be released in `closeComplete` ledger.asyncClose(this, ctx); } else { updateLatency(); AddEntryCallback cb = callbackUpdater.getAndSet(this, null); if (cb != null) { cb.addComplete(lastEntry, data.asReadOnly(), ctx); - ReferenceCountUtil.release(data); ml.notifyCursors(); ml.notifyWaitingEntryCallBacks(); - this.recycle(); - } else { - ReferenceCountUtil.release(data); } + releaseAndRecycle(); } } @@ -231,11 +228,11 @@ public void closeComplete(int rc, LedgerHandle lh, Object ctx) { AddEntryCallback cb = callbackUpdater.getAndSet(this, null); if (cb != null) { - cb.addComplete(PositionImpl.get(lh.getId(), entryId), null, ctx); + cb.addComplete(PositionImpl.get(lh.getId(), entryId), data.asReadOnly(), ctx); ml.notifyCursors(); ml.notifyWaitingEntryCallBacks(); - this.recycle(); } + releaseAndRecycle(); } private void updateLatency() { @@ -346,4 +343,8 @@ public String toString() { '}'; } + private void releaseAndRecycle() { + ReferenceCountUtil.release(data); + recycle(); + } } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 80189820721a9..9bca66351f3d0 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -552,13 +552,15 @@ private byte[] copyBytesFromByteBuf(final ByteBuf buf) { @Test(timeOut = 20000) public void asyncAddEntryWithoutError() throws Exception { - ManagedLedger ledger = factory.open("my_test_ledger"); + ManagedLedger ledger = factory.open("my_test_ledger", + new ManagedLedgerConfig().setMaxEntriesPerLedger(2)); ledger.openCursor("test-cursor"); - final CountDownLatch counter = new CountDownLatch(1); + final int count = 4; + final CountDownLatch counter = new CountDownLatch(count); final byte[] bytes = "dummy-entry-1".getBytes(Encoding); - ledger.asyncAddEntry(bytes, new AddEntryCallback() { + AddEntryCallback callback = new AddEntryCallback() { @Override public void addComplete(Position position, ByteBuf entryData, Object ctx) { assertNull(ctx); @@ -579,11 +581,14 @@ public void addFailed(ManagedLedgerException exception, Object ctx) { fail(exception.getMessage()); } - }, null); + }; + for (int i = 0; i < count; i++) { + ledger.asyncAddEntry(bytes, callback, null); + } counter.await(); - assertEquals(ledger.getNumberOfEntries(), 1); - assertEquals(ledger.getTotalSize(), "dummy-entry-1".getBytes(Encoding).length); + assertEquals(ledger.getNumberOfEntries(), count); + assertEquals(ledger.getTotalSize(), "dummy-entry-1".getBytes(Encoding).length * count); } @Test(timeOut = 20000) From d0598977349ce356b833925002bebcd7752b9256 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Sun, 2 May 2021 20:42:27 +0800 Subject: [PATCH 2/2] Don't call recycle() when the callback is null --- .../bookkeeper/mledger/impl/OpAddEntry.java | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java index ba0ec05384bde..e30c578fb556d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java @@ -207,8 +207,11 @@ public void safeRun() { cb.addComplete(lastEntry, data.asReadOnly(), ctx); ml.notifyCursors(); ml.notifyWaitingEntryCallBacks(); + ReferenceCountUtil.release(data); + this.recycle(); + } else { + ReferenceCountUtil.release(data); } - releaseAndRecycle(); } } @@ -231,8 +234,11 @@ public void closeComplete(int rc, LedgerHandle lh, Object ctx) { cb.addComplete(PositionImpl.get(lh.getId(), entryId), data.asReadOnly(), ctx); ml.notifyCursors(); ml.notifyWaitingEntryCallBacks(); + ReferenceCountUtil.release(data); + this.recycle(); + } else { + ReferenceCountUtil.release(data); } - releaseAndRecycle(); } private void updateLatency() { @@ -342,9 +348,4 @@ public String toString() { ", dataLength=" + dataLength + '}'; } - - private void releaseAndRecycle() { - ReferenceCountUtil.release(data); - recycle(); - } }