From 7206283685d4035ca3ce155df736e6be4dda7072 Mon Sep 17 00:00:00 2001 From: xiangying <1984997880@qq.com> Date: Thu, 16 Nov 2023 12:29:25 +0800 Subject: [PATCH 01/17] refactor: change the method signature of `checkIfTBRecoverCompletely` --- .../org/apache/pulsar/broker/service/Topic.java | 9 ++++----- .../service/nonpersistent/NonPersistentTopic.java | 2 +- .../broker/service/persistent/PersistentTopic.java | 4 ++-- .../transaction/buffer/TransactionBuffer.java | 13 ++++++------- .../buffer/impl/InMemTransactionBuffer.java | 2 +- .../buffer/impl/TransactionBufferDisable.java | 2 +- 6 files changed, 15 insertions(+), 17 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index c697639ff4fa1..03ebce1d736bd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -146,12 +146,11 @@ default void setEntryTimestamp(long entryTimestamp) { void removeProducer(Producer producer); /** - * Wait TransactionBuffer Recovers completely. - * Take snapshot after TB Recovers completely. - * @param isTxnEnabled - * @return a future which has completely if isTxn = false. Or a future return by takeSnapshot. + * Wait TransactionBuffer recovers completely. + * + * @return a future that will be completed after the transaction buffer recover completely. */ - CompletableFuture checkIfTransactionBufferRecoverCompletely(boolean isTxnEnabled); + CompletableFuture checkIfTransactionBufferRecoverCompletely(); /** * record add-latency. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 76e9f261ca6a9..29710b6577107 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -254,7 +254,7 @@ public void removeProducer(Producer producer) { } @Override - public CompletableFuture checkIfTransactionBufferRecoverCompletely(boolean isTxnEnabled) { + public CompletableFuture checkIfTransactionBufferRecoverCompletely() { return CompletableFuture.completedFuture(null); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 4d35d284d3299..d9b952eb5d0a8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -712,8 +712,8 @@ public CompletableFuture> addProducer(Producer producer, } @Override - public CompletableFuture checkIfTransactionBufferRecoverCompletely(boolean isTxnEnabled) { - return getTransactionBuffer().checkIfTBRecoverCompletely(isTxnEnabled); + public CompletableFuture checkIfTransactionBufferRecoverCompletely() { + return getTransactionBuffer().checkIfTBRecoverCompletely(); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java index 7eb5d6f789c22..1416b9e64753d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java @@ -188,14 +188,13 @@ public interface TransactionBuffer { TransactionBufferStats getStats(boolean lowWaterMarks); /** - * Wait TransactionBuffer Recovers completely. - * Take snapshot after TB Recovers completely. - * @param isTxn - * @return a future which has completely if isTxn = false. Or a future return by takeSnapshot. + * Wait TransactionBuffer recovers completely. + * + * @return a future that will be completed after the transaction buffer recover completely. */ - CompletableFuture checkIfTBRecoverCompletely(boolean isTxn); - - + default CompletableFuture checkIfTBRecoverCompletely() { + return CompletableFuture.completedFuture(null); + } long getOngoingTxnCount(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java index bc2dd58a5812e..0f14769cfbcaa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java @@ -397,7 +397,7 @@ public TransactionBufferStats getStats(boolean lowWaterMarks) { @Override - public CompletableFuture checkIfTBRecoverCompletely(boolean isTxn) { + public CompletableFuture checkIfTBRecoverCompletely() { return CompletableFuture.completedFuture(null); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java index 7c74b52951e28..6cd4e1887a10b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java @@ -115,7 +115,7 @@ public TransactionBufferStats getStats(boolean lowWaterMarks) { } @Override - public CompletableFuture checkIfTBRecoverCompletely(boolean isTxn) { + public CompletableFuture checkIfTBRecoverCompletely() { return CompletableFuture.completedFuture(null); } From 87f79c825db4d2e83ca0fc3dd998fb2a6802ba12 Mon Sep 17 00:00:00 2001 From: xiangying <1984997880@qq.com> Date: Thu, 16 Nov 2023 13:03:12 +0800 Subject: [PATCH 02/17] fix:Write first snapshot when send first transactional message instead of building producer. --- .../pulsar/broker/service/ServerCnx.java | 87 ++++++++----------- .../buffer/impl/TopicTransactionBuffer.java | 71 +++++++++------ 2 files changed, 80 insertions(+), 78 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 95f139dc11e43..0a8511192819e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1501,66 +1501,53 @@ protected void handleProducer(final CommandProducer cmdProducer) { }); schemaVersionFuture.thenAccept(schemaVersion -> { - topic.checkIfTransactionBufferRecoverCompletely(isTxnEnabled).thenAccept(future -> { - CompletionStage createInitSubFuture; - if (!Strings.isNullOrEmpty(initialSubscriptionName) - && topic.isPersistent() - && !topic.getSubscriptions().containsKey(initialSubscriptionName)) { - createInitSubFuture = service.isAllowAutoSubscriptionCreationAsync(topicName) - .thenCompose(isAllowAutoSubscriptionCreation -> { - if (!isAllowAutoSubscriptionCreation) { - return CompletableFuture.failedFuture( - new BrokerServiceException.NotAllowedException( - "Could not create the initial subscription due to" - + " the auto subscription creation is not allowed.")); - } - return topic.createSubscription(initialSubscriptionName, - InitialPosition.Earliest, false, null); - }); - } else { - createInitSubFuture = CompletableFuture.completedFuture(null); - } - - createInitSubFuture.whenComplete((sub, ex) -> { - if (ex != null) { - final Throwable rc = FutureUtil.unwrapCompletionException(ex); - if (rc instanceof BrokerServiceException.NotAllowedException) { - log.warn("[{}] {} initialSubscriptionName: {}, topic: {}", - remoteAddress, rc.getMessage(), initialSubscriptionName, topicName); - if (producerFuture.completeExceptionally(rc)) { - commandSender.sendErrorResponse(requestId, - ServerError.NotAllowedError, rc.getMessage()); + CompletionStage createInitSubFuture; + if (!Strings.isNullOrEmpty(initialSubscriptionName) + && topic.isPersistent() + && !topic.getSubscriptions().containsKey(initialSubscriptionName)) { + createInitSubFuture = service.isAllowAutoSubscriptionCreationAsync(topicName) + .thenCompose(isAllowAutoSubscriptionCreation -> { + if (!isAllowAutoSubscriptionCreation) { + return CompletableFuture.failedFuture( + new BrokerServiceException.NotAllowedException( + "Could not create the initial subscription due to the " + + "auto subscription creation is not allowed.")); } - producers.remove(producerId, producerFuture); - return; - } - String msg = - "Failed to create the initial subscription: " + ex.getCause().getMessage(); + return topic.createSubscription(initialSubscriptionName, + InitialPosition.Earliest, false, null); + }); + } else { + createInitSubFuture = CompletableFuture.completedFuture(null); + } + + createInitSubFuture.whenComplete((sub, ex) -> { + if (ex != null) { + final Throwable rc = FutureUtil.unwrapCompletionException(ex); + if (rc instanceof BrokerServiceException.NotAllowedException) { log.warn("[{}] {} initialSubscriptionName: {}, topic: {}", - remoteAddress, msg, initialSubscriptionName, topicName); - if (producerFuture.completeExceptionally(ex)) { + remoteAddress, rc.getMessage(), initialSubscriptionName, topicName); + if (producerFuture.completeExceptionally(rc)) { commandSender.sendErrorResponse(requestId, - BrokerServiceException.getClientErrorCode(ex), msg); + ServerError.NotAllowedError, rc.getMessage()); } producers.remove(producerId, producerFuture); return; } + String msg = + "Failed to create the initial subscription: " + ex.getCause().getMessage(); + log.warn("[{}] {} initialSubscriptionName: {}, topic: {}", + remoteAddress, msg, initialSubscriptionName, topicName); + if (producerFuture.completeExceptionally(ex)) { + commandSender.sendErrorResponse(requestId, + BrokerServiceException.getClientErrorCode(ex), msg); + } + producers.remove(producerId, producerFuture); + return; + } - buildProducerAndAddTopic(topic, producerId, producerName, requestId, isEncrypted, + buildProducerAndAddTopic(topic, producerId, producerName, requestId, isEncrypted, metadata, schemaVersion, epoch, userProvidedProducerName, topicName, producerAccessMode, topicEpoch, supportsPartialProducer, producerFuture); - }); - }).exceptionally(exception -> { - Throwable cause = exception.getCause(); - log.error("producerId {}, requestId {} : TransactionBuffer recover failed", - producerId, requestId, exception); - if (producerFuture.completeExceptionally(exception)) { - commandSender.sendErrorResponse(requestId, - ServiceUnitNotReadyException.getClientErrorCode(cause), - cause.getMessage()); - } - producers.remove(producerId, producerFuture); - return null; }); }); }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index 3c13be220869f..65d1cb9f01b5e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -56,6 +56,7 @@ import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.Markers; import org.apache.pulsar.common.util.Codec; +import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.RecoverTimeRecord; import org.jctools.queues.MessagePassingQueue; import org.jctools.queues.SpscArrayQueue; @@ -89,6 +90,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen private final int takeSnapshotIntervalTime; private final CompletableFuture transactionBufferFuture = new CompletableFuture<>(); + private CompletableFuture publishFuture = transactionBufferFuture.thenApply(__ -> PositionImpl.EARLIEST); /** * The map is used to store the lowWaterMarks which key is TC ID and value is lowWaterMark of the TC. @@ -208,34 +210,8 @@ public CompletableFuture getTransactionMeta(TxnID txnID) { } @Override - public CompletableFuture checkIfTBRecoverCompletely(boolean isTxnEnabled) { - if (!isTxnEnabled) { - return CompletableFuture.completedFuture(null); - } else { - CompletableFuture completableFuture = new CompletableFuture<>(); - transactionBufferFuture.thenRun(() -> { - if (checkIfNoSnapshot()) { - snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(maxReadPosition).thenRun(() -> { - if (changeToReadyStateFromNoSnapshot()) { - timer.newTimeout(TopicTransactionBuffer.this, - takeSnapshotIntervalTime, TimeUnit.MILLISECONDS); - } - completableFuture.complete(null); - }).exceptionally(exception -> { - log.error("Topic {} failed to take snapshot", this.topic.getName()); - completableFuture.completeExceptionally(exception); - return null; - }); - } else { - completableFuture.complete(null); - } - }).exceptionally(exception -> { - log.error("Topic {}: TransactionBuffer recover failed", this.topic.getName(), exception.getCause()); - completableFuture.completeExceptionally(exception.getCause()); - return null; - }); - return completableFuture; - } + public CompletableFuture checkIfTBRecoverCompletely() { + return transactionBufferFuture; } @Override @@ -255,6 +231,45 @@ public long getCommittedTxnCount() { @Override public CompletableFuture appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer) { + // Method `takeAbortedTxnsSnapshot` will be executed in the different thread. + // So we need to retain the buffer in this thread. It will be released after message persistent. + buffer.retain(); + CompletableFuture future = publishFuture.thenCompose(ignore -> { + if (checkIfNoSnapshot()) { + CompletableFuture completableFuture = new CompletableFuture<>(); + // `publishFuture` will be completed after message persistent, so there will not be two threads + // writing snapshots at the same time. + snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(maxReadPosition).thenRun(() -> { + if (changeToReadyStateFromNoSnapshot()) { + timer.newTimeout(TopicTransactionBuffer.this, + takeSnapshotIntervalTime, TimeUnit.MILLISECONDS); + completableFuture.complete(null); + } else { + log.error("[{}]Failed to change state of transaction buffer to Ready from NoSnapshot", + topic.getName()); + completableFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException( + "Transaction Buffer take first snapshot failed, the current state is: " + getState())); + } + }).exceptionally(exception -> { + log.error("Topic {} failed to take snapshot", this.topic.getName()); + completableFuture.completeExceptionally(exception); + return null; + }); + return completableFuture.thenCompose(__ -> internalAppendBufferToTxn(txnId, buffer)); + } else if (checkIfReady()) { + return internalAppendBufferToTxn(txnId, buffer); + } else { + // `publishFuture` will be completed after transaction buffer recover completely + // during initializing, so this case should not happen. + return FutureUtil.failedFuture(new BrokerServiceException.ServiceUnitNotReadyException( + "Transaction Buffer recover failed, the current state is: " + getState())); + } + }).whenComplete(((position, throwable) -> buffer.release())); + publishFuture = future; + return future; + } + + private CompletableFuture internalAppendBufferToTxn(TxnID txnId, ByteBuf buffer) { CompletableFuture completableFuture = new CompletableFuture<>(); Long lowWaterMark = lowWaterMarks.get(txnId.getMostSigBits()); if (lowWaterMark != null && lowWaterMark >= txnId.getLeastSigBits()) { From 134472e3437a4919441cad77f12a2a620b72fa68 Mon Sep 17 00:00:00 2001 From: xiangying <1984997880@qq.com> Date: Thu, 16 Nov 2023 13:20:23 +0800 Subject: [PATCH 03/17] test: Add test. 1. `testWriteSnapshotWhenFirstTxnMessageSend` mainly test what we want to do. -- Take first snapshot when sending first transaction message. 2. `testMessagePublishInOrder` and `testRefCountWhenAppendBufferToTxn` test the message publish as expected. -- in order and no memory leak. --- pulsar-broker/pom.xml | 7 + .../broker/transaction/TransactionTest.java | 2 +- .../buffer/TopicTransactionBufferTest.java | 154 ++++++++++++++++++ 3 files changed, 162 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml index 46f2eb95175c9..00de7e8ff21d3 100644 --- a/pulsar-broker/pom.xml +++ b/pulsar-broker/pom.xml @@ -451,6 +451,13 @@ pulsar-package-filesystem-storage ${project.version} + + org.powermock + powermock-api-mockito2 + 2.0.9 + test + + diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index 905da9379ec9e..185e9a6710526 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -1598,7 +1598,7 @@ public void testTBRecoverChangeStateError() throws InterruptedException, Timeout persistentTopic.set(new PersistentTopic("topic-a", managedLedger, brokerService)); try { // Do check. - persistentTopic.get().checkIfTransactionBufferRecoverCompletely(true).get(5, TimeUnit.SECONDS); + persistentTopic.get().checkIfTransactionBufferRecoverCompletely().get(5, TimeUnit.SECONDS); fail("Expect failure by TB closed, but it is finished."); } catch (ExecutionException executionException){ Throwable t = executionException.getCause(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java index a50363c861caa..c7b6a6a12df23 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java @@ -18,9 +18,14 @@ */ package org.apache.pulsar.broker.transaction.buffer; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import java.lang.reflect.Field; +import lombok.Cleanup; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.commons.lang3.RandomUtils; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.BrokerService; @@ -30,8 +35,13 @@ import org.apache.pulsar.broker.transaction.TransactionTestBase; import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer; import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.transaction.Transaction; +import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.client.impl.transaction.TransactionImpl; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; @@ -39,6 +49,7 @@ import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore; import org.awaitility.Awaitility; import org.mockito.Mockito; +import org.powermock.reflect.Whitebox; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -179,4 +190,147 @@ public void testCloseTransactionBufferWhenTimeout() throws Exception { Assert.assertTrue(f.isCompletedExceptionally()); } + /** + * This test verifies the state changes of a TransactionBuffer within a topic under different conditions. + * Initially, the TransactionBuffer is in a NoSnapshot state upon topic creation. + * It remains in the NoSnapshot state even after a normal message is sent. + * The state changes to Ready only after a transactional message is sent. + * The test also ensures that the TransactionBuffer can be correctly recovered after the topic is unloaded. + */ + @Test + public void testWriteSnapshotWhenFirstTxnMessageSend() throws Exception { + // 1. Prepare test environment. + String topic = "persistent://" + NAMESPACE1 + "/testWriteSnapshotWhenFirstTxnMessageSend"; + String txnMsg = "transaction message"; + String normalMsg = "normal message"; + admin.topics().createNonPartitionedTopic(topic); + PersistentTopic persistentTopic = (PersistentTopic) pulsarServiceList.get(0).getBrokerService() + .getTopic(topic, false) + .get() + .get(); + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .create(); + @Cleanup + Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("my-sub") + .subscribe(); + // 2. Test the state of transaction buffer after building producer with no new messages. + // The TransactionBuffer should be in NoSnapshot state before transaction message sent. + TopicTransactionBuffer topicTransactionBuffer = (TopicTransactionBuffer) persistentTopic.getTransactionBuffer(); + Awaitility.await().untilAsserted(() -> { + Assert.assertEquals(topicTransactionBuffer.getState(), TopicTransactionBufferState.State.NoSnapshot); + }); + // 3. Test the state of transaction buffer after sending normal messages. + // The TransactionBuffer should still be in NoSnapshot state after a normal message is sent. + producer.newMessage().value(normalMsg).send(); + Assert.assertEquals(topicTransactionBuffer.getState(), TopicTransactionBufferState.State.NoSnapshot); + // 4. Test the state of transaction buffer after sending transaction messages. + // The transaction buffer should be in Ready state at this time. + Transaction transaction = pulsarClient.newTransaction() + .withTransactionTimeout(5, TimeUnit.HOURS) + .build() + .get(); + producer.newMessage(transaction).value(txnMsg).send(); + Assert.assertEquals(topicTransactionBuffer.getState(), TopicTransactionBufferState.State.Ready); + // 5. Test transaction buffer can be recovered correctly. + // There are 4 message sent to this topic, 2 normal message and 2 transaction message |m1|m2-txn1|m3-txn1|m4|. + // Aborting the transaction and unload the topic and then redelivering unacked messages, + // only normal messages can be received. + transaction.abort().get(5, TimeUnit.SECONDS); + producer.newMessage().value(normalMsg).send(); + admin.topics().unload(topic); + PersistentTopic persistentTopic2 = (PersistentTopic) pulsarServiceList.get(0).getBrokerService() + .getTopic(topic, false) + .get() + .get(); + TopicTransactionBuffer topicTransactionBuffer2 = (TopicTransactionBuffer) persistentTopic2 + .getTransactionBuffer(); + Awaitility.await().untilAsserted(() -> { + Assert.assertEquals(topicTransactionBuffer2.getState(), TopicTransactionBufferState.State.Ready); + }); + consumer.redeliverUnacknowledgedMessages(); + for (int i = 0; i < 2; i++) { + Message message = consumer.receive(5, TimeUnit.SECONDS); + Assert.assertEquals(message.getValue(), normalMsg); + } + Message message = consumer.receive(5, TimeUnit.SECONDS); + Assert.assertNull(message); + } + + /** + * Send some messages before transaction buffer ready and then send some messages after transaction buffer ready, + * these messages should be received in order. + */ + @Test + public void testMessagePublishInOrder() throws Exception { + // 1. Prepare test environment. + String topic = "persistent://" + NAMESPACE1 + "/testMessagePublishInOrder" + RandomUtils.nextLong(); + admin.topics().createNonPartitionedTopic(topic); + PersistentTopic persistentTopic = (PersistentTopic) pulsarServiceList.get(0).getBrokerService() + .getTopic(topic, false) + .get() + .get(); + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.INT32) + .topic(topic) + .create(); + @Cleanup + Consumer consumer = pulsarClient.newConsumer(Schema.INT32) + .topic(topic) + .subscriptionName("sub") + .subscribe(); + Transaction transaction = pulsarClient.newTransaction() + .withTransactionTimeout(5, TimeUnit.HOURS) + .build().get(); + // 2. Set a new future in transaction buffer as `transactionBufferFuture` to stimulate whether the + // transaction buffer recover completely. + TopicTransactionBuffer topicTransactionBuffer = (TopicTransactionBuffer) persistentTopic.getTransactionBuffer(); + CompletableFuture completableFuture = new CompletableFuture<>(); + Whitebox.setInternalState(topicTransactionBuffer, "transactionBufferFuture", completableFuture); + Field stateField = TopicTransactionBufferState.class.getDeclaredField("state"); + stateField.setAccessible(true); + stateField.set(topicTransactionBuffer, TopicTransactionBufferState.State.Ready); + // Register this topic to the transaction in advance to avoid the sending request pending here. + ((TransactionImpl) transaction).registerProducedTopic(topic).get(5, TimeUnit.SECONDS); + // 3. Test the messages sent before transaction buffer ready is in order. + for (int i = 0; i < 50; i++) { + producer.newMessage(transaction).value(i).sendAsync(); + } + // 4. Test the messages sent after transaction buffer ready is in order. + completableFuture.complete(null); + for (int i = 50; i < 100; i++) { + producer.newMessage(transaction).value(i).sendAsync(); + } + transaction.commit().get(); + for (int i = 0; i < 100; i++) { + Message message = consumer.receive(5, TimeUnit.SECONDS); + Assert.assertEquals(message.getValue(), i); + } + } + + /** + * Test `testMessagePublishInOrder` will test the ref count work as expected with no exception. + * And this test is used to test the memory leak due to ref count. + */ + @Test + public void testRefCountWhenAppendBufferToTxn() throws Exception { + // 1. Prepare test resource + String topic = "persistent://" + NAMESPACE1 + "/testRefCountWhenAppendBufferToTxn"; + admin.topics().createNonPartitionedTopic(topic); + PersistentTopic persistentTopic = (PersistentTopic) pulsarServiceList.get(0).getBrokerService() + .getTopic(topic, false) + .get() + .get(); + TopicTransactionBuffer topicTransactionBuffer = (TopicTransactionBuffer) persistentTopic.getTransactionBuffer(); + // 2. Test reference count does not change in the method `appendBufferToTxn`. + ByteBuf byteBuf = Unpooled.buffer(); + topicTransactionBuffer.appendBufferToTxn(new TxnID(1, 1), 1L, byteBuf) + .get(5, TimeUnit.SECONDS); + Awaitility.await().untilAsserted(() -> Assert.assertEquals(byteBuf.refCnt(), 1)); + // 3. release resource + byteBuf.release(); + } } From bfc64e75615e94e6846ad0a6045876eec7a61732 Mon Sep 17 00:00:00 2001 From: xiangying <1984997880@qq.com> Date: Fri, 17 Nov 2023 23:54:34 +0800 Subject: [PATCH 04/17] test: improve testRefCountWhenAppendBufferToTxn Add cases for sending second transaction message adn send transaction message failed. --- .../buffer/TopicTransactionBufferTest.java | 32 ++++++++++++++++--- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java index c7b6a6a12df23..d7158b026d337 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.broker.transaction.buffer; +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.fail; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import java.lang.reflect.Field; @@ -43,6 +45,7 @@ import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.client.impl.transaction.TransactionImpl; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState; @@ -326,11 +329,32 @@ public void testRefCountWhenAppendBufferToTxn() throws Exception { .get(); TopicTransactionBuffer topicTransactionBuffer = (TopicTransactionBuffer) persistentTopic.getTransactionBuffer(); // 2. Test reference count does not change in the method `appendBufferToTxn`. - ByteBuf byteBuf = Unpooled.buffer(); - topicTransactionBuffer.appendBufferToTxn(new TxnID(1, 1), 1L, byteBuf) + // 2.1 Test sending first transaction message, this will take a snapshot. + ByteBuf byteBuf1 = Unpooled.buffer(); + topicTransactionBuffer.appendBufferToTxn(new TxnID(1, 1), 1L, byteBuf1) .get(5, TimeUnit.SECONDS); - Awaitility.await().untilAsserted(() -> Assert.assertEquals(byteBuf.refCnt(), 1)); + Awaitility.await().untilAsserted(() -> Assert.assertEquals(byteBuf1.refCnt(), 1)); + // 2.2 Test send the second transaction message, this will not take snapshots. + ByteBuf byteBuf2 = Unpooled.buffer(); + topicTransactionBuffer.appendBufferToTxn(new TxnID(1, 1), 1L, byteBuf1) + .get(5, TimeUnit.SECONDS); + Awaitility.await().untilAsserted(() -> Assert.assertEquals(byteBuf2.refCnt(), 1)); + // 2.3 Test sending message failed. + Field publishFutureField = TopicTransactionBuffer.class.getDeclaredField("publishFuture"); + publishFutureField.setAccessible(true); + publishFutureField.set(topicTransactionBuffer, FutureUtil.failedFuture(new Exception("fail"))); + ByteBuf byteBuf3 = Unpooled.buffer(); + try { + topicTransactionBuffer.appendBufferToTxn(new TxnID(1, 1), 1L, byteBuf1) + .get(5, TimeUnit.SECONDS); + fail(); + } catch (Exception e) { + assertEquals(e.getCause().getMessage(), "fail"); + } + Awaitility.await().untilAsserted(() -> Assert.assertEquals(byteBuf3.refCnt(), 1)); // 3. release resource - byteBuf.release(); + byteBuf1.release(); + byteBuf2.release(); + byteBuf3.release(); } } From c60f300bb11baf06e455d855feab4c17cbbe3b5b Mon Sep 17 00:00:00 2001 From: xiangying Date: Tue, 23 Jul 2024 16:31:27 +0800 Subject: [PATCH 05/17] Resolve conflict --- .../java/org/apache/pulsar/broker/service/ServerCnx.java | 2 +- .../transaction/buffer/impl/TopicTransactionBuffer.java | 3 ++- .../transaction/buffer/TopicTransactionBufferTest.java | 9 +-------- 3 files changed, 4 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index c2a5ffe3b8e5f..921d138be757e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -2206,7 +2206,7 @@ protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId) long requestId = getLastMessageId.getRequestId(); Topic topic = consumer.getSubscription().getTopic(); - topic.checkIfTransactionBufferRecoverCompletely(true) + topic.checkIfTransactionBufferRecoverCompletely() .thenCompose(__ -> topic.getLastDispatchablePosition()) .thenApply(lastPosition -> { int partitionIndex = TopicName.getPartitionIndex(topic.getName()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index 67085274a5b7a..a71e21341edae 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -91,7 +91,8 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen private final int takeSnapshotIntervalTime; private final CompletableFuture transactionBufferFuture = new CompletableFuture<>(); - private CompletableFuture publishFuture = transactionBufferFuture.thenApply(__ -> PositionImpl.EARLIEST); + private CompletableFuture publishFuture = transactionBufferFuture + .thenApply(__ -> PositionFactory.EARLIEST); /** * The map is used to store the lowWaterMarks which key is TC ID and value is lowWaterMark of the TC. diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java index 1ecfbe2119c8f..3cabb8aff7d30 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java @@ -26,11 +26,8 @@ import lombok.Cleanup; import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.when; -import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertTrue; -import static org.testng.AssertJUnit.fail; import io.opentelemetry.api.common.Attributes; import java.time.Duration; import java.util.Collections; @@ -41,7 +38,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import lombok.Cleanup; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.PositionFactory; @@ -59,7 +55,6 @@ import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Schema; @@ -242,9 +237,7 @@ public void testGetMaxPositionAfterTBReady() throws Exception { String topic = "persistent://" + NAMESPACE1 + "/testGetMaxReadyPositionAfterTBReady"; // 1.1 Mock component. TransactionBuffer transactionBuffer = Mockito.spy(TransactionBuffer.class); - when(transactionBuffer.checkIfTBRecoverCompletely(anyBoolean())) - // Handle producer will check transaction buffer recover completely. - .thenReturn(CompletableFuture.completedFuture(null)) + when(transactionBuffer.checkIfTBRecoverCompletely()) // If the Transaction buffer failed to recover, we can not get the correct last max read id. .thenReturn(CompletableFuture.failedFuture(new Throwable("Mock fail"))) // If the transaction buffer recover successfully, the max read position can be acquired successfully. From 078771c322d340dead7ce3b2395d6fd600a9a6bc Mon Sep 17 00:00:00 2001 From: xiangying Date: Tue, 23 Jul 2024 16:58:31 +0800 Subject: [PATCH 06/17] Resolve conflict --- docker/pulsar/pom.xml | 1 - 1 file changed, 1 deletion(-) diff --git a/docker/pulsar/pom.xml b/docker/pulsar/pom.xml index 6f2b436b3c9e7..228c2b810313d 100644 --- a/docker/pulsar/pom.xml +++ b/docker/pulsar/pom.xml @@ -1,4 +1,3 @@ -