From 002e478a5009e8b0d0ce92fdb7c0aaaf571a1fd0 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 20 Jan 2021 16:50:35 +0800 Subject: [PATCH 1/4] Support fetching metadata from entry data in publish callback --- .../apache/bookkeeper/mledger/AsyncCallbacks.java | 4 +++- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 4 ++-- .../apache/bookkeeper/mledger/impl/OpAddEntry.java | 9 ++++----- .../bookkeeper/mledger/impl/ManagedCursorTest.java | 3 ++- .../mledger/impl/ManagedLedgerBkTest.java | 4 +++- .../mledger/impl/ManagedLedgerErrorsTest.java | 8 +++++--- .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 13 +++++++------ .../mledger/impl/NonDurableCursorTest.java | 4 +++- .../org/apache/pulsar/broker/service/Topic.java | 3 +++ .../broker/service/persistent/PersistentTopic.java | 3 ++- .../buffer/impl/TopicTransactionBuffer.java | 8 ++++---- .../PersistentDispatcherFailoverConsumerTest.java | 3 ++- .../pulsar/broker/service/PersistentTopicTest.java | 3 ++- .../apache/pulsar/broker/service/ServerCnxTest.java | 4 +++- .../service/persistent/MessageDuplicationTest.java | 12 ++++++------ .../pulsar/testclient/ManagedLedgerWriter.java | 2 +- .../coordinator/impl/MLTransactionLogImpl.java | 2 +- 17 files changed, 53 insertions(+), 36 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java index 62568031edb09..7205b776a9753 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java @@ -21,6 +21,8 @@ import java.util.List; import java.util.Map; import java.util.Optional; + +import io.netty.buffer.ByteBuf; import org.apache.bookkeeper.common.annotation.InterfaceAudience; import org.apache.bookkeeper.common.annotation.InterfaceStability; @@ -64,7 +66,7 @@ interface DeleteCursorCallback { } interface AddEntryCallback { - void addComplete(Position position, Object ctx); + void addComplete(Position position, ByteBuf entryData, Object ctx); void addFailed(ManagedLedgerException exception, Object ctx); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 9264e152040bf..7241ee5017209 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -593,7 +593,7 @@ class Result { asyncAddEntry(data, offset, length, new AddEntryCallback() { @Override - public void addComplete(Position position, Object ctx) { + public void addComplete(Position position, ByteBuf entryData, Object ctx) { result.position = position; counter.countDown(); } @@ -628,7 +628,7 @@ class Result { asyncAddEntry(data, numberOfMessages, offset, length, new AddEntryCallback() { @Override - public void addComplete(Position position, Object ctx) { + public void addComplete(Position position, ByteBuf entryData, Object ctx) { result.position = position; counter.countDown(); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java index d4ab73dff9a3f..d16546115f466 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java @@ -192,21 +192,20 @@ public void safeRun() { entry.release(); } - // We are done using the byte buffer - ReferenceCountUtil.release(data); - PositionImpl lastEntry = PositionImpl.get(ledger.getId(), entryId); ManagedLedgerImpl.ENTRIES_ADDED_COUNTER_UPDATER.incrementAndGet(ml); ml.lastConfirmedEntry = lastEntry; if (closeWhenDone) { + ReferenceCountUtil.release(data); log.info("[{}] Closing ledger {} for being full", ml.getName(), ledger.getId()); ledger.asyncClose(this, ctx); } else { updateLatency(); AddEntryCallback cb = callbackUpdater.getAndSet(this, null); if (cb != null) { - cb.addComplete(lastEntry, ctx); + cb.addComplete(lastEntry, data.asReadOnly(), ctx); + ReferenceCountUtil.release(data); ml.notifyCursors(); this.recycle(); } @@ -229,7 +228,7 @@ public void closeComplete(int rc, LedgerHandle lh, Object ctx) { AddEntryCallback cb = callbackUpdater.getAndSet(this, null); if (cb != null) { - cb.addComplete(PositionImpl.get(lh.getId(), entryId), ctx); + cb.addComplete(PositionImpl.get(lh.getId(), entryId), null, ctx); ml.notifyCursors(); this.recycle(); } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index ffd62b3b23f96..28d0eb2a6f633 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -57,6 +57,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; @@ -1000,7 +1001,7 @@ public void addFailed(ManagedLedgerException exception, Object ctx) { } @Override - public void addComplete(Position position, Object ctx) { + public void addComplete(Position position, ByteBuf entryData, Object ctx) { lastPosition.set(position); c1.asyncMarkDelete(position, new MarkDeleteCallback() { @Override diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java index d5273b901b6be..c187a180fb311 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java @@ -33,6 +33,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; + +import io.netty.buffer.ByteBuf; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.BookKeeperTestClient; import org.apache.bookkeeper.client.api.DigestType; @@ -484,7 +486,7 @@ public void managedLedgerClosed() throws Exception { ledger1.asyncAddEntry(("entry-" + i).getBytes(), new AddEntryCallback() { @Override - public void addComplete(Position position, Object ctx) { + public void addComplete(Position position, ByteBuf entryData, Object ctx) { latch.countDown(); } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java index 977346a66ff20..ba5f1725b0a2e 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java @@ -27,6 +27,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReference; + +import io.netty.buffer.ByteBuf; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.api.DigestType; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; @@ -332,7 +334,7 @@ public void addFailed(ManagedLedgerException exception, Object ctx) { // not-ok } - public void addComplete(Position position, Object ctx) { + public void addComplete(Position position, ByteBuf entryData, Object ctx) { // ok } }, null); @@ -342,7 +344,7 @@ public void addFailed(ManagedLedgerException exception, Object ctx) { promise.complete(null); } - public void addComplete(Position position, Object ctx) { + public void addComplete(Position position, ByteBuf entryData, Object ctx) { promise.completeExceptionally(new Exception("should have failed")); } }, null); @@ -445,7 +447,7 @@ public void recoverLongTimeAfterMultipleWriteErrors() throws Exception { AddEntryCallback cb = new AddEntryCallback() { @Override - public void addComplete(Position position, Object ctx) { + public void addComplete(Position position, ByteBuf entryData, Object ctx) { counter.countDown(); } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 4c14fa8a48cb1..cdaa931934cc6 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -35,6 +35,7 @@ import static org.testng.Assert.fail; import com.google.common.base.Charsets; import com.google.common.collect.Sets; +import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.util.concurrent.DefaultThreadFactory; import java.lang.reflect.Field; @@ -295,7 +296,7 @@ public void openCursorComplete(ManagedCursor cursor, Object ctx) { ledger.asyncAddEntry("test".getBytes(Encoding), new AddEntryCallback() { @Override - public void addComplete(Position position, Object ctx) { + public void addComplete(Position position, ByteBuf entryData, Object ctx) { @SuppressWarnings("unchecked") Pair pair = (Pair) ctx; ManagedLedger ledger = pair.getLeft(); @@ -548,7 +549,7 @@ public void asyncAddEntryWithoutError() throws Exception { ledger.asyncAddEntry("dummy-entry-1".getBytes(Encoding), new AddEntryCallback() { @Override - public void addComplete(Position position, Object ctx) { + public void addComplete(Position position, ByteBuf entryData, Object ctx) { assertNull(ctx); counter.countDown(); @@ -577,7 +578,7 @@ public void doubleAsyncAddEntryWithoutError() throws Exception { final String content = "dummy-entry-" + i; ledger.asyncAddEntry(content.getBytes(Encoding), new AddEntryCallback() { @Override - public void addComplete(Position position, Object ctx) { + public void addComplete(Position position, ByteBuf entryData, Object ctx) { assertNotNull(ctx); log.info("Successfully added {}", content); @@ -607,7 +608,7 @@ public void asyncAddEntryWithError() throws Exception { ledger.asyncAddEntry("dummy-entry-1".getBytes(Encoding), new AddEntryCallback() { @Override - public void addComplete(Position position, Object ctx) { + public void addComplete(Position position, ByteBuf entryData, Object ctx) { fail("Should have failed"); } @@ -832,7 +833,7 @@ public void testAsyncAddEntryAndSyncClose() throws Exception { String content = "entry-" + i; ledger.asyncAddEntry(content.getBytes(Encoding), new AddEntryCallback() { @Override - public void addComplete(Position position, Object ctx) { + public void addComplete(Position position, ByteBuf entryData, Object ctx) { counter.countDown(); } @@ -2615,7 +2616,7 @@ public void asyncClose(org.apache.bookkeeper.client.AsyncCallback.CloseCallback ledger.asyncAddEntry(data.getBytes(), new AddEntryCallback() { @Override - public void addComplete(Position position, Object ctx) { + public void addComplete(Position position, ByteBuf entryData, Object ctx) { addSuccess.set(true); latch.countDown(); } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java index 7a1503570bbb4..8ce209b29b296 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java @@ -37,6 +37,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; + +import io.netty.buffer.ByteBuf; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback; @@ -478,7 +480,7 @@ public void addFailed(ManagedLedgerException exception, Object ctx) { } @Override - public void addComplete(Position position, Object ctx) { + public void addComplete(Position position, ByteBuf entryData, Object ctx) { lastPosition.set(position); c1.asyncMarkDelete(position, new MarkDeleteCallback() { @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index f04b7d655a954..a18e78f36b516 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -76,6 +76,9 @@ default long getOriginalSequenceId() { void completed(Exception e, long ledgerId, long entryId); + default void setMetadataFromEntryData(ByteBuf entryData) { + } + default long getHighestSequenceId() { return -1L; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index cba18613e6ff4..76c58aaf25fad 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -449,12 +449,13 @@ private void decrementPendingWriteOpsAndCheck() { } @Override - public void addComplete(Position pos, Object ctx) { + public void addComplete(Position pos, ByteBuf entryData, Object ctx) { PublishContext publishContext = (PublishContext) ctx; PositionImpl position = (PositionImpl) pos; // Message has been successfully persisted messageDeduplication.recordMessagePersisted(publishContext, position); + publishContext.setMetadataFromEntryData(entryData); publishContext.completed(null, position.getLedgerId(), position.getEntryId()); // in order to sync the max position when cursor read entries transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index d6f8bdf02479f..4c364acde9994 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -62,7 +62,7 @@ public CompletableFuture appendBufferToTxn(TxnID txnId, long sequenceI CompletableFuture completableFuture = new CompletableFuture<>(); topic.getManagedLedger().asyncAddEntry(buffer, new AsyncCallbacks.AddEntryCallback() { @Override - public void addComplete(Position position, Object ctx) { + public void addComplete(Position position, ByteBuf entryData, Object ctx) { synchronized (TopicTransactionBuffer.this) { if (!ongoingTxns.containsKey(txnId)) { ongoingTxns.put(txnId, (PositionImpl) position); @@ -99,7 +99,7 @@ public CompletableFuture commitTxn(TxnID txnID, long lowWaterMark) { topic.getManagedLedger().asyncAddEntry(commitMarker, new AsyncCallbacks.AddEntryCallback() { @Override - public void addComplete(Position position, Object ctx) { + public void addComplete(Position position, ByteBuf entryData, Object ctx) { synchronized (TopicTransactionBuffer.this) { updateMaxReadPosition(txnID); handleLowWaterMark(txnID, lowWaterMark); @@ -126,7 +126,7 @@ public CompletableFuture abortTxn(TxnID txnID, long lowWaterMark) { ByteBuf abortMarker = Markers.newTxnAbortMarker(-1L, txnID.getMostSigBits(), txnID.getLeastSigBits()); topic.getManagedLedger().asyncAddEntry(abortMarker, new AsyncCallbacks.AddEntryCallback() { @Override - public void addComplete(Position position, Object ctx) { + public void addComplete(Position position, ByteBuf entryData, Object ctx) { synchronized (TopicTransactionBuffer.this) { aborts.put(txnID, (PositionImpl) position); updateMaxReadPosition(txnID); @@ -152,7 +152,7 @@ private void handleLowWaterMark(TxnID txnID, long lowWaterMark) { txnID.getMostSigBits(), txnID.getLeastSigBits()); topic.getManagedLedger().asyncAddEntry(abortMarker, new AsyncCallbacks.AddEntryCallback() { @Override - public void addComplete(Position position, Object ctx) { + public void addComplete(Position position, ByteBuf entryData, Object ctx) { synchronized (TopicTransactionBuffer.this) { aborts.put(firstTxn, (PositionImpl) position); updateMaxReadPosition(firstTxn); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java index e0e302ffd94cf..3497b73e5f87e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java @@ -238,7 +238,8 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { doAnswer(new Answer() { @Override public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - ((AddEntryCallback) invocationOnMock.getArguments()[1]).addComplete(new PositionImpl(1, 1), null); + ((AddEntryCallback) invocationOnMock.getArguments()[1]).addComplete( + new PositionImpl(1, 1), null, null); return null; } }).when(ledgerMock).asyncAddEntry(any(byte[].class), any(AddEntryCallback.class), any()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 57a98cd2f80ab..864405a6e7714 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -1235,6 +1235,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { @Override public Object answer(InvocationOnMock invocationOnMock) throws Throwable { ((AddEntryCallback) invocationOnMock.getArguments()[1]).addComplete(new PositionImpl(1, 1), + null, invocationOnMock.getArguments()[2]); return null; } @@ -1717,7 +1718,7 @@ public void testBacklogCursor() throws Exception { ByteBuf entry = getMessageWithMetadata(content.getBytes()); ledger.asyncAddEntry(entry, new AddEntryCallback() { @Override - public void addComplete(Position position, Object ctx) { + public void addComplete(Position position, ByteBuf entryData, Object ctx) { latch.countDown(); entry.release(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 4bdbacb4c648b..13ef36c319d59 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -1531,7 +1531,9 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { doAnswer(new Answer() { @Override public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - ((AddEntryCallback) invocationOnMock.getArguments()[1]).addComplete(new PositionImpl(-1, -1), + ((AddEntryCallback) invocationOnMock.getArguments()[1]).addComplete( + new PositionImpl(-1, -1), + null, invocationOnMock.getArguments()[2]); return null; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java index 0d44b8954b31e..11f98bda69030 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java @@ -189,7 +189,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { Topic.PublishContext publishContext2 = getPublishContext(producerName2, 1); persistentTopic.publishMessage(byteBuf1, publishContext1); - persistentTopic.addComplete(new PositionImpl(0, 1), publishContext1); + persistentTopic.addComplete(new PositionImpl(0, 1), null, publishContext1); verify(managedLedger, times(1)).asyncAddEntry(any(ByteBuf.class), any(), any()); Long lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1); assertTrue(lastSequenceIdPushed != null); @@ -199,7 +199,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { assertEquals(lastSequenceIdPushed.longValue(), 0); persistentTopic.publishMessage(byteBuf2, publishContext2); - persistentTopic.addComplete(new PositionImpl(0, 2), publishContext2); + persistentTopic.addComplete(new PositionImpl(0, 2), null, publishContext2); verify(managedLedger, times(2)).asyncAddEntry(any(ByteBuf.class), any(), any()); lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName2); assertTrue(lastSequenceIdPushed != null); @@ -211,7 +211,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { byteBuf1 = getMessage(producerName1, 1); publishContext1 = getPublishContext(producerName1, 1); persistentTopic.publishMessage(byteBuf1, publishContext1); - persistentTopic.addComplete(new PositionImpl(0, 3), publishContext1); + persistentTopic.addComplete(new PositionImpl(0, 3), null, publishContext1); verify(managedLedger, times(3)).asyncAddEntry(any(ByteBuf.class), any(), any()); lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1); assertTrue(lastSequenceIdPushed != null); @@ -223,7 +223,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { byteBuf1 = getMessage(producerName1, 5); publishContext1 = getPublishContext(producerName1, 5); persistentTopic.publishMessage(byteBuf1, publishContext1); - persistentTopic.addComplete(new PositionImpl(0, 4), publishContext1); + persistentTopic.addComplete(new PositionImpl(0, 4), null, publishContext1); verify(managedLedger, times(4)).asyncAddEntry(any(ByteBuf.class), any(), any()); lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1); assertTrue(lastSequenceIdPushed != null); @@ -263,7 +263,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { verify(publishContext1, times(1)).completed(any(MessageDeduplication.MessageDupUnknownException.class), eq(-1L), eq(-1L)); // complete seq 6 message eventually - persistentTopic.addComplete(new PositionImpl(0, 5), publishContext1); + persistentTopic.addComplete(new PositionImpl(0, 5), null, publishContext1); // simulate failure byteBuf1 = getMessage(producerName1, 7); @@ -300,7 +300,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { publishContext1 = getPublishContext(producerName1, 8); persistentTopic.publishMessage(byteBuf1, publishContext1); verify(managedLedger, times(7)).asyncAddEntry(any(ByteBuf.class), any(), any()); - persistentTopic.addComplete(new PositionImpl(0, 5), publishContext1); + persistentTopic.addComplete(new PositionImpl(0, 5), null, publishContext1); lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1); assertTrue(lastSequenceIdPushed != null); assertEquals(lastSequenceIdPushed.longValue(), 8); diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java index 7cb36330a5a8a..890e052029fff 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java @@ -235,7 +235,7 @@ public void run() { final AddEntryCallback addEntryCallback = new AddEntryCallback() { @Override - public void addComplete(Position position, Object ctx) { + public void addComplete(Position position, ByteBuf entryData, Object ctx) { long sendTime = (Long) (ctx); messagesSent.increment(); bytesSent.add(payloadData.length); diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java index 333963907aaef..626a687fd8895 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java @@ -118,7 +118,7 @@ public CompletableFuture append(TransactionMetadataEntry transactionMe transactionMetadataEntry.writeTo(buf); managedLedger.asyncAddEntry(buf, new AsyncCallbacks.AddEntryCallback() { @Override - public void addComplete(Position position, Object ctx) { + public void addComplete(Position position, ByteBuf entryData, Object ctx) { buf.release(); completableFuture.complete(position); } From 5a5369a7b3158bd22b6afea497ef825ddb69a416 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 22 Jan 2021 10:39:38 +0800 Subject: [PATCH 2/4] Fix data not released --- .../java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java index d16546115f466..742ed1e1eb5c8 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java @@ -208,6 +208,8 @@ public void safeRun() { ReferenceCountUtil.release(data); ml.notifyCursors(); this.recycle(); + } else { + ReferenceCountUtil.release(data); } } } From d609e413a4cb1e602b8fc909d00062cd45265812 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 22 Jan 2021 12:11:49 +0800 Subject: [PATCH 3/4] Add tests for AddEntryCallback#addComplete --- .../mledger/impl/ManagedLedgerTest.java | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index cdaa931934cc6..f6262487b1659 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -39,6 +39,7 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.util.concurrent.DefaultThreadFactory; import java.lang.reflect.Field; +import java.nio.ReadOnlyBufferException; import java.nio.charset.Charset; import java.security.GeneralSecurityException; import java.util.ArrayList; @@ -540,6 +541,14 @@ public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) { counter.await(); } + private byte[] copyBytesFromByteBuf(final ByteBuf buf) { + final int index = buf.readerIndex(); + final byte[] bytes = new byte[buf.readableBytes()]; + buf.getBytes(index, bytes); + buf.readerIndex(index); + return bytes; + } + @Test(timeOut = 20000) public void asyncAddEntryWithoutError() throws Exception { ManagedLedger ledger = factory.open("my_test_ledger"); @@ -547,10 +556,19 @@ public void asyncAddEntryWithoutError() throws Exception { final CountDownLatch counter = new CountDownLatch(1); - ledger.asyncAddEntry("dummy-entry-1".getBytes(Encoding), new AddEntryCallback() { + final byte[] bytes = "dummy-entry-1".getBytes(Encoding); + ledger.asyncAddEntry(bytes, new AddEntryCallback() { @Override public void addComplete(Position position, ByteBuf entryData, Object ctx) { assertNull(ctx); + assertEquals(copyBytesFromByteBuf(entryData), bytes); + + // `entryData` is read-only so that write related methods will throw ReadOnlyBufferException + try { + entryData.array(); + } catch (Exception e) { + assertTrue(e instanceof ReadOnlyBufferException); + } counter.countDown(); } @@ -580,6 +598,7 @@ public void doubleAsyncAddEntryWithoutError() throws Exception { @Override public void addComplete(Position position, ByteBuf entryData, Object ctx) { assertNotNull(ctx); + assertEquals(copyBytesFromByteBuf(entryData), content.getBytes(Encoding)); log.info("Successfully added {}", content); done.countDown(); From 9f7809e676189d28c8622782fe49a4bbc6c64e30 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 22 Jan 2021 15:27:10 +0800 Subject: [PATCH 4/4] Add tests for PublishContext#setMetadataFromEntryData --- .../broker/service/PersistentTopicTest.java | 28 +++++++++++++++++-- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 864405a6e7714..52a67cb89ed19 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -289,6 +289,14 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { @Test public void testPublishMessage() throws Exception { + doAnswer(invocationOnMock -> { + final ByteBuf payload = (ByteBuf) invocationOnMock.getArguments()[0]; + final AddEntryCallback callback = (AddEntryCallback) invocationOnMock.getArguments()[1]; + final Topic.PublishContext ctx = (Topic.PublishContext) invocationOnMock.getArguments()[2]; + callback.addComplete(PositionImpl.latest, payload, ctx); + return null; + }).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), any(AddEntryCallback.class), any()); + PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); /* * MessageMetadata.Builder messageMetadata = MessageMetadata.newBuilder(); @@ -299,9 +307,23 @@ public void testPublishMessage() throws Exception { final CountDownLatch latch = new CountDownLatch(1); - topic.publishMessage(payload, (exception, ledgerId, entryId) -> { - latch.countDown(); - }); + final Topic.PublishContext publishContext = new Topic.PublishContext() { + @Override + public void completed(Exception e, long ledgerId, long entryId) { + assertEquals(ledgerId, PositionImpl.latest.getLedgerId()); + assertEquals(entryId, PositionImpl.latest.getEntryId()); + latch.countDown(); + } + + @Override + public void setMetadataFromEntryData(ByteBuf entryData) { + // This method must be invoked before `completed` + assertEquals(latch.getCount(), 1); + assertEquals(entryData.array(), payload.array()); + } + }; + + topic.publishMessage(payload, publishContext); assertTrue(latch.await(1, TimeUnit.SECONDS)); }