diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index d1fe9776e079d..a5c09d2892342 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1616,66 +1616,53 @@ protected void handleProducer(final CommandProducer cmdProducer) { }); schemaVersionFuture.thenAccept(schemaVersion -> { - topic.checkIfTransactionBufferRecoverCompletely(isTxnEnabled).thenAccept(future -> { - CompletionStage createInitSubFuture; - if (!Strings.isNullOrEmpty(initialSubscriptionName) - && topic.isPersistent() - && !topic.getSubscriptions().containsKey(initialSubscriptionName)) { - createInitSubFuture = service.isAllowAutoSubscriptionCreationAsync(topicName) - .thenCompose(isAllowAutoSubscriptionCreation -> { - if (!isAllowAutoSubscriptionCreation) { - return CompletableFuture.failedFuture( - new BrokerServiceException.NotAllowedException( - "Could not create the initial subscription due to" - + " the auto subscription creation is not allowed.")); - } - return topic.createSubscription(initialSubscriptionName, - InitialPosition.Earliest, false, null); - }); - } else { - createInitSubFuture = CompletableFuture.completedFuture(null); - } - - createInitSubFuture.whenComplete((sub, ex) -> { - if (ex != null) { - final Throwable rc = FutureUtil.unwrapCompletionException(ex); - if (rc instanceof BrokerServiceException.NotAllowedException) { - log.warn("[{}] {} initialSubscriptionName: {}, topic: {}", - remoteAddress, rc.getMessage(), initialSubscriptionName, topicName); - if (producerFuture.completeExceptionally(rc)) { - commandSender.sendErrorResponse(requestId, - ServerError.NotAllowedError, rc.getMessage()); + CompletionStage createInitSubFuture; + if (!Strings.isNullOrEmpty(initialSubscriptionName) + && topic.isPersistent() + && 
!topic.getSubscriptions().containsKey(initialSubscriptionName)) { + createInitSubFuture = service.isAllowAutoSubscriptionCreationAsync(topicName) + .thenCompose(isAllowAutoSubscriptionCreation -> { + if (!isAllowAutoSubscriptionCreation) { + return CompletableFuture.failedFuture( + new BrokerServiceException.NotAllowedException( + "Could not create the initial subscription due to the " + + "auto subscription creation is not allowed.")); } - producers.remove(producerId, producerFuture); - return; - } - String msg = - "Failed to create the initial subscription: " + ex.getCause().getMessage(); + return topic.createSubscription(initialSubscriptionName, + InitialPosition.Earliest, false, null); + }); + } else { + createInitSubFuture = CompletableFuture.completedFuture(null); + } + + createInitSubFuture.whenComplete((sub, ex) -> { + if (ex != null) { + final Throwable rc = FutureUtil.unwrapCompletionException(ex); + if (rc instanceof BrokerServiceException.NotAllowedException) { log.warn("[{}] {} initialSubscriptionName: {}, topic: {}", - remoteAddress, msg, initialSubscriptionName, topicName); - if (producerFuture.completeExceptionally(ex)) { + remoteAddress, rc.getMessage(), initialSubscriptionName, topicName); + if (producerFuture.completeExceptionally(rc)) { commandSender.sendErrorResponse(requestId, - BrokerServiceException.getClientErrorCode(ex), msg); + ServerError.NotAllowedError, rc.getMessage()); } producers.remove(producerId, producerFuture); return; } + String msg = + "Failed to create the initial subscription: " + ex.getCause().getMessage(); + log.warn("[{}] {} initialSubscriptionName: {}, topic: {}", + remoteAddress, msg, initialSubscriptionName, topicName); + if (producerFuture.completeExceptionally(ex)) { + commandSender.sendErrorResponse(requestId, + BrokerServiceException.getClientErrorCode(ex), msg); + } + producers.remove(producerId, producerFuture); + return; + } - buildProducerAndAddTopic(topic, producerId, producerName, requestId, isEncrypted, + 
buildProducerAndAddTopic(topic, producerId, producerName, requestId, isEncrypted, metadata, schemaVersion, epoch, userProvidedProducerName, topicName, producerAccessMode, topicEpoch, supportsPartialProducer, producerFuture); - }); - }).exceptionally(exception -> { - Throwable cause = exception.getCause(); - log.error("producerId {}, requestId {} : TransactionBuffer recover failed", - producerId, requestId, exception); - if (producerFuture.completeExceptionally(exception)) { - commandSender.sendErrorResponse(requestId, - ServiceUnitNotReadyException.getClientErrorCode(cause), - cause.getMessage()); - } - producers.remove(producerId, producerFuture); - return null; }); }); }); @@ -2249,7 +2236,7 @@ protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId) long requestId = getLastMessageId.getRequestId(); Topic topic = consumer.getSubscription().getTopic(); - topic.checkIfTransactionBufferRecoverCompletely(true) + topic.checkIfTransactionBufferRecoverCompletely() .thenCompose(__ -> topic.getLastDispatchablePosition()) .thenApply(lastPosition -> { int partitionIndex = TopicName.getPartitionIndex(topic.getName()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index 50a28c7979277..3ec09e9bfcd28 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -146,12 +146,11 @@ default void setEntryTimestamp(long entryTimestamp) { void removeProducer(Producer producer); /** - * Wait TransactionBuffer Recovers completely. - * Take snapshot after TB Recovers completely. - * @param isTxnEnabled isTxnEnabled - * @return a future which has completely if isTxn = false. Or a future return by takeSnapshot. + * Wait TransactionBuffer recovers completely. + * + * @return a future that will be completed after the transaction buffer recover completely. 
*/ - CompletableFuture checkIfTransactionBufferRecoverCompletely(boolean isTxnEnabled); + CompletableFuture checkIfTransactionBufferRecoverCompletely(); /** * record add-latency. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 9456870589191..1b98ee2f8306d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -258,7 +258,7 @@ public boolean isReplicationBacklogExist() { } @Override - public CompletableFuture checkIfTransactionBufferRecoverCompletely(boolean isTxnEnabled) { + public CompletableFuture checkIfTransactionBufferRecoverCompletely() { return CompletableFuture.completedFuture(null); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 146ac05d695d5..d814e7ce11599 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -836,8 +836,8 @@ public CompletableFuture> addProducer(Producer producer, } @Override - public CompletableFuture checkIfTransactionBufferRecoverCompletely(boolean isTxnEnabled) { - return getTransactionBuffer().checkIfTBRecoverCompletely(isTxnEnabled); + public CompletableFuture checkIfTransactionBufferRecoverCompletely() { + return getTransactionBuffer().checkIfTBRecoverCompletely(); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java index 
b379c4d1db10c..874f4c1c28a02 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java @@ -187,14 +187,11 @@ public interface TransactionBuffer { TransactionBufferStats getStats(boolean lowWaterMarks); /** - * Wait TransactionBuffer Recovers completely. - * Take snapshot after TB Recovers completely. - * @param isTxn - * @return a future which has completely if isTxn = false. Or a future return by takeSnapshot. + * Wait TransactionBuffer recovers completely. + * + * @return a future that will be completed after the transaction buffer recover completely. */ - CompletableFuture checkIfTBRecoverCompletely(boolean isTxn); - - + CompletableFuture checkIfTBRecoverCompletely(); long getOngoingTxnCount(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java index ae755f0715ee2..4da7a48e96c51 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java @@ -411,7 +411,7 @@ public TransactionBufferStats getStats(boolean lowWaterMarks) { @Override - public CompletableFuture checkIfTBRecoverCompletely(boolean isTxn) { + public CompletableFuture checkIfTBRecoverCompletely() { return CompletableFuture.completedFuture(null); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index 7561457d11f8e..2f90ff8922a81 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java 
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -57,6 +57,7 @@ import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.Markers; import org.apache.pulsar.common.util.Codec; +import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.RecoverTimeRecord; import org.jctools.queues.MessagePassingQueue; import org.jctools.queues.SpscArrayQueue; @@ -89,8 +90,12 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen private final int takeSnapshotIntervalTime; + private final CompletableFuture transactionBufferFuture = new CompletableFuture<>(); + private CompletableFuture publishFuture = getTransactionBufferFuture() + .thenApply(__ -> PositionFactory.EARLIEST); + /** * The map is used to store the lowWaterMarks which key is TC ID and value is lowWaterMark of the TC. */ @@ -138,14 +143,14 @@ public void recoverComplete() { if (!changeToReadyState()) { log.error("[{}]Transaction buffer recover fail, current state: {}", topic.getName(), getState()); - transactionBufferFuture.completeExceptionally + getTransactionBufferFuture().completeExceptionally (new BrokerServiceException.ServiceUnitNotReadyException( "Transaction buffer recover failed to change the status to Ready," + "current state is: " + getState())); } else { timer.newTimeout(TopicTransactionBuffer.this, takeSnapshotIntervalTime, TimeUnit.MILLISECONDS); - transactionBufferFuture.complete(null); + getTransactionBufferFuture().complete(null); recoverTime.setRecoverEndTime(System.currentTimeMillis()); } } @@ -158,7 +163,7 @@ public void noNeedToRecover() { if (!changeToNoSnapshotState()) { log.error("[{}]Transaction buffer recover fail", topic.getName()); } else { - transactionBufferFuture.complete(null); + getTransactionBufferFuture().complete(null); recoverTime.setRecoverEndTime(System.currentTimeMillis()); } } @@ -196,10 +201,10 @@ public void 
recoverExceptionally(Throwable e) { // if transaction buffer recover fail throw PulsarClientException, // we need to change the PulsarClientException to ServiceUnitNotReadyException, // the tc do op will retry - transactionBufferFuture.completeExceptionally + getTransactionBufferFuture().completeExceptionally (new BrokerServiceException.ServiceUnitNotReadyException(e.getMessage(), e)); } else { - transactionBufferFuture.completeExceptionally(e); + getTransactionBufferFuture().completeExceptionally(e); } recoverTime.setRecoverEndTime(System.currentTimeMillis()); topic.close(true); @@ -212,35 +217,19 @@ public CompletableFuture getTransactionMeta(TxnID txnID) { return CompletableFuture.completedFuture(null); } + @VisibleForTesting + public CompletableFuture getPublishFuture() { + return publishFuture; + } + + @VisibleForTesting + public CompletableFuture getTransactionBufferFuture() { + return transactionBufferFuture; + } + @Override - public CompletableFuture checkIfTBRecoverCompletely(boolean isTxnEnabled) { - if (!isTxnEnabled) { - return CompletableFuture.completedFuture(null); - } else { - CompletableFuture completableFuture = new CompletableFuture<>(); - transactionBufferFuture.thenRun(() -> { - if (checkIfNoSnapshot()) { - snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(maxReadPosition).thenRun(() -> { - if (changeToReadyStateFromNoSnapshot()) { - timer.newTimeout(TopicTransactionBuffer.this, - takeSnapshotIntervalTime, TimeUnit.MILLISECONDS); - } - completableFuture.complete(null); - }).exceptionally(exception -> { - log.error("Topic {} failed to take snapshot", this.topic.getName()); - completableFuture.completeExceptionally(exception); - return null; - }); - } else { - completableFuture.complete(null); - } - }).exceptionally(exception -> { - log.error("Topic {}: TransactionBuffer recover failed", this.topic.getName(), exception.getCause()); - completableFuture.completeExceptionally(exception.getCause()); - return null; - }); - return completableFuture; 
- } + public CompletableFuture checkIfTBRecoverCompletely() { + return getTransactionBufferFuture(); } @Override @@ -260,6 +249,45 @@ public long getCommittedTxnCount() { @Override public CompletableFuture appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer) { + // Method `takeAbortedTxnsSnapshot` will be executed in the different thread. + // So we need to retain the buffer in this thread. It will be released after message persistent. + buffer.retain(); + CompletableFuture future = getPublishFuture().thenCompose(ignore -> { + if (checkIfNoSnapshot()) { + CompletableFuture completableFuture = new CompletableFuture<>(); + // `publishFuture` will be completed after message persistent, so there will not be two threads + // writing snapshots at the same time. + snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(maxReadPosition).thenRun(() -> { + if (changeToReadyStateFromNoSnapshot()) { + timer.newTimeout(TopicTransactionBuffer.this, + takeSnapshotIntervalTime, TimeUnit.MILLISECONDS); + completableFuture.complete(null); + } else { + log.error("[{}]Failed to change state of transaction buffer to Ready from NoSnapshot", + topic.getName()); + completableFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException( + "Transaction Buffer take first snapshot failed, the current state is: " + getState())); + } + }).exceptionally(exception -> { + log.error("Topic {} failed to take snapshot", this.topic.getName()); + completableFuture.completeExceptionally(exception); + return null; + }); + return completableFuture.thenCompose(__ -> internalAppendBufferToTxn(txnId, buffer)); + } else if (checkIfReady()) { + return internalAppendBufferToTxn(txnId, buffer); + } else { + // `publishFuture` will be completed after transaction buffer recover completely + // during initializing, so this case should not happen. 
+ return FutureUtil.failedFuture(new BrokerServiceException.ServiceUnitNotReadyException( + "Transaction Buffer recover failed, the current state is: " + getState())); + } + }).whenComplete(((position, throwable) -> buffer.release())); + publishFuture = future; + return future; + } + + private CompletableFuture internalAppendBufferToTxn(TxnID txnId, ByteBuf buffer) { CompletableFuture completableFuture = new CompletableFuture<>(); Long lowWaterMark = lowWaterMarks.get(txnId.getMostSigBits()); if (lowWaterMark != null && lowWaterMark >= txnId.getLeastSigBits()) { @@ -314,7 +342,7 @@ public CompletableFuture commitTxn(TxnID txnID, long lowWaterMark) { } CompletableFuture completableFuture = new CompletableFuture<>(); //Wait TB recover completely. - transactionBufferFuture.thenRun(() -> { + getTransactionBufferFuture().thenRun(() -> { ByteBuf commitMarker = Markers.newTxnCommitMarker(-1L, txnID.getMostSigBits(), txnID.getLeastSigBits()); try { @@ -356,7 +384,7 @@ public CompletableFuture abortTxn(TxnID txnID, long lowWaterMark) { } CompletableFuture completableFuture = new CompletableFuture<>(); //Wait TB recover completely. - transactionBufferFuture.thenRun(() -> { + getTransactionBufferFuture().thenRun(() -> { //no message sent, need not to add abort mark by txn timeout. 
if (!checkIfReady()) { completableFuture.complete(null); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java index d0efc47c49544..d4fd071fef8a7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java @@ -132,7 +132,7 @@ public TransactionBufferStats getStats(boolean lowWaterMarks) { } @Override - public CompletableFuture checkIfTBRecoverCompletely(boolean isTxn) { + public CompletableFuture checkIfTBRecoverCompletely() { return CompletableFuture.completedFuture(null); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index 246ab5ef26a8f..3b3eaf7bb2292 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -595,6 +595,11 @@ public void testTakeSnapshotBeforeBuildTxnProducer() throws Exception { .topic(topic).enableBatching(true) .create(); + Transaction transaction = pulsarClient.newTransaction() + .withTransactionTimeout(5, TimeUnit.SECONDS).build().get(); + producer.newMessage(transaction).send(); + transaction.abort().get(); + Awaitility.await().untilAsserted(() -> { Message message1 = reader.readNext(); TransactionBufferSnapshot snapshot1 = message1.getValue(); @@ -608,7 +613,7 @@ public void testTakeSnapshotBeforeBuildTxnProducer() throws Exception { Awaitility.await().untilAsserted(() -> { Message message1 = reader.readNext(); TransactionBufferSnapshot snapshot1 = message1.getValue(); - 
Assert.assertEquals(snapshot1.getMaxReadPositionEntryId(), 1); + Assert.assertEquals(snapshot1.getMaxReadPositionEntryId(), 3); }); } @@ -716,7 +721,7 @@ public void testMaxReadPositionForNormalPublish() throws Exception { .sendTimeout(0, TimeUnit.SECONDS) .create(); - Awaitility.await().untilAsserted(() -> Assert.assertTrue(topicTransactionBuffer.checkIfReady())); + Awaitility.await().untilAsserted(() -> Assert.assertTrue(topicTransactionBuffer.checkIfNoSnapshot())); //test publishing txn messages will not change maxReadPosition if don`t commit or abort. Transaction transaction = pulsarClient.newTransaction() .withTransactionTimeout(5, TimeUnit.SECONDS).build().get(); @@ -1657,7 +1662,7 @@ public void testTBRecoverChangeStateError() throws InterruptedException, Timeout persistentTopic.set(new PersistentTopic("topic-a", managedLedger, brokerService)); try { // Do check. - persistentTopic.get().checkIfTransactionBufferRecoverCompletely(true).get(5, TimeUnit.SECONDS); + persistentTopic.get().checkIfTransactionBufferRecoverCompletely().get(5, TimeUnit.SECONDS); fail("Expect failure by TB closed, but it is finished."); } catch (ExecutionException executionException){ Throwable t = executionException.getCause(); @@ -1815,8 +1820,6 @@ public void testTBSnapshotWriter() throws Exception { .createAsync(); getTopic("persistent://" + topic + "-partition-0"); Thread.sleep(3000); - // the producer shouldn't be created, because the transaction buffer snapshot writer future didn't finish. - assertFalse(producerFuture.isDone()); // The topic will be closed, because the transaction buffer snapshot writer future is failed, // the failed writer future will be removed, the producer will be reconnected and work well. 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java index dea79f391e39a..1ab97eb457a05 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java @@ -18,13 +18,15 @@ */ package org.apache.pulsar.broker.transaction.buffer; +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.fail; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import lombok.Cleanup; import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.when; -import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertTrue; -import static org.testng.AssertJUnit.fail; import io.opentelemetry.api.common.Attributes; import java.time.Duration; import java.util.Collections; @@ -35,11 +37,13 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import lombok.Cleanup; + import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.commons.lang3.RandomUtils; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.BrokerService; @@ -50,16 +54,23 @@ import org.apache.pulsar.broker.transaction.TransactionTestBase; import 
org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer; import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState; +import org.apache.pulsar.broker.transaction.buffer.utils.TransactionBufferTestImpl; +import org.apache.pulsar.broker.transaction.buffer.utils.TransactionBufferTestProvider; import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.TopicMessageId; import org.apache.pulsar.client.api.transaction.Transaction; +import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.client.impl.transaction.TransactionImpl; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.TopicMessageIdImpl; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; @@ -228,9 +239,7 @@ public void testGetMaxPositionAfterTBReady() throws Exception { String topic = "persistent://" + NAMESPACE1 + "/testGetMaxReadyPositionAfterTBReady"; // 1.1 Mock component. TransactionBuffer transactionBuffer = Mockito.spy(TransactionBuffer.class); - when(transactionBuffer.checkIfTBRecoverCompletely(anyBoolean())) - // Handle producer will check transaction buffer recover completely. - .thenReturn(CompletableFuture.completedFuture(null)) + when(transactionBuffer.checkIfTBRecoverCompletely()) // If the Transaction buffer failed to recover, we can not get the correct last max read id. 
.thenReturn(CompletableFuture.failedFuture(new Throwable("Mock fail"))) // If the transaction buffer recover successfully, the max read position can be acquired successfully. @@ -405,4 +414,174 @@ private void assertGetLastMessageId(Consumer consumer, MessageIdImpl expected assertEquals(expected.getLedgerId(), actual.getLedgerId()); } + /** + * This test verifies the state changes of a TransactionBuffer within a topic under different conditions. + * Initially, the TransactionBuffer is in a NoSnapshot state upon topic creation. + * It remains in the NoSnapshot state even after a normal message is sent. + * The state changes to Ready only after a transactional message is sent. + * The test also ensures that the TransactionBuffer can be correctly recovered after the topic is unloaded. + */ + @Test + public void testWriteSnapshotWhenFirstTxnMessageSend() throws Exception { + // 1. Prepare test environment. + String topic = "persistent://" + NAMESPACE1 + "/testWriteSnapshotWhenFirstTxnMessageSend"; + String txnMsg = "transaction message"; + String normalMsg = "normal message"; + admin.topics().createNonPartitionedTopic(topic); + PersistentTopic persistentTopic = (PersistentTopic) pulsarServiceList.get(0).getBrokerService() + .getTopic(topic, false) + .get() + .get(); + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .create(); + @Cleanup + Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("my-sub") + .subscribe(); + // 2. Test the state of transaction buffer after building producer with no new messages. + // The TransactionBuffer should be in NoSnapshot state before transaction message sent. + TopicTransactionBuffer topicTransactionBuffer = (TopicTransactionBuffer) persistentTopic.getTransactionBuffer(); + Awaitility.await().untilAsserted(() -> { + Assert.assertEquals(topicTransactionBuffer.getState(), TopicTransactionBufferState.State.NoSnapshot); + }); + // 3. 
Test the state of transaction buffer after sending normal messages. + // The TransactionBuffer should still be in NoSnapshot state after a normal message is sent. + producer.newMessage().value(normalMsg).send(); + Assert.assertEquals(topicTransactionBuffer.getState(), TopicTransactionBufferState.State.NoSnapshot); + // 4. Test the state of transaction buffer after sending transaction messages. + // The transaction buffer should be in Ready state at this time. + Transaction transaction = pulsarClient.newTransaction() + .withTransactionTimeout(5, TimeUnit.HOURS) + .build() + .get(); + producer.newMessage(transaction).value(txnMsg).send(); + Assert.assertEquals(topicTransactionBuffer.getState(), TopicTransactionBufferState.State.Ready); + // 5. Test transaction buffer can be recovered correctly. + // There are 4 message sent to this topic, 2 normal message and 2 transaction message |m1|m2-txn1|m3-txn1|m4|. + // Aborting the transaction and unload the topic and then redelivering unacked messages, + // only normal messages can be received. 
+ transaction.abort().get(5, TimeUnit.SECONDS); + producer.newMessage().value(normalMsg).send(); + admin.topics().unload(topic); + PersistentTopic persistentTopic2 = (PersistentTopic) pulsarServiceList.get(0).getBrokerService() + .getTopic(topic, false) + .get() + .get(); + TopicTransactionBuffer topicTransactionBuffer2 = (TopicTransactionBuffer) persistentTopic2 + .getTransactionBuffer(); + Awaitility.await().untilAsserted(() -> { + Assert.assertEquals(topicTransactionBuffer2.getState(), TopicTransactionBufferState.State.Ready); + }); + consumer.redeliverUnacknowledgedMessages(); + for (int i = 0; i < 2; i++) { + Message message = consumer.receive(5, TimeUnit.SECONDS); + Assert.assertEquals(message.getValue(), normalMsg); + } + Message message = consumer.receive(5, TimeUnit.SECONDS); + Assert.assertNull(message); + } + + /** + * Send some messages before transaction buffer ready and then send some messages after transaction buffer ready, + * these messages should be received in order. + */ + @Test + public void testMessagePublishInOrder() throws Exception { + // 1. Prepare test environment. + this.pulsarServiceList.forEach(pulsarService -> { + pulsarService.setTransactionBufferProvider(new TransactionBufferTestProvider()); + }); + String topic = "persistent://" + NAMESPACE1 + "/testMessagePublishInOrder" + RandomUtils.nextLong(); + admin.topics().createNonPartitionedTopic(topic); + PersistentTopic persistentTopic = (PersistentTopic) pulsarServiceList.get(0).getBrokerService() + .getTopic(topic, false) + .get() + .get(); + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.INT32) + .topic(topic) + .create(); + @Cleanup + Consumer consumer = pulsarClient.newConsumer(Schema.INT32) + .topic(topic) + .subscriptionName("sub") + .subscribe(); + Transaction transaction = pulsarClient.newTransaction() + .withTransactionTimeout(5, TimeUnit.HOURS) + .build().get(); + + // 2. 
Set a new future in transaction buffer as `transactionBufferFuture` to simulate whether the + // transaction buffer recover completely. + TransactionBufferTestImpl topicTransactionBuffer = (TransactionBufferTestImpl) persistentTopic + .getTransactionBuffer(); + CompletableFuture completableFuture = new CompletableFuture<>(); + CompletableFuture originalFuture = topicTransactionBuffer.getPublishFuture(); + topicTransactionBuffer.setPublishFuture(completableFuture); + topicTransactionBuffer.setState(TopicTransactionBufferState.State.Ready); + // Register this topic to the transaction in advance to avoid the sending request pending here. + ((TransactionImpl) transaction).registerProducedTopic(topic).get(5, TimeUnit.SECONDS); + // 3. Test the messages sent before transaction buffer ready is in order. + for (int i = 0; i < 50; i++) { + producer.newMessage(transaction).value(i).sendAsync(); + } + // 4. Test the messages sent after transaction buffer ready is in order. + completableFuture.complete(originalFuture.get()); + for (int i = 50; i < 100; i++) { + producer.newMessage(transaction).value(i).sendAsync(); + } + transaction.commit().get(); + for (int i = 0; i < 100; i++) { + Message message = consumer.receive(5, TimeUnit.SECONDS); + Assert.assertEquals(message.getValue(), i); + } + } + + /** + * Test `testMessagePublishInOrder` will test the ref count work as expected with no exception. + * And this test is used to test the memory leak due to ref count. + */ + @Test + public void testRefCountWhenAppendBufferToTxn() throws Exception { + // 1. 
Prepare test resource + this.pulsarServiceList.forEach(pulsarService -> { + pulsarService.setTransactionBufferProvider(new TransactionBufferTestProvider()); + }); + String topic = "persistent://" + NAMESPACE1 + "/testRefCountWhenAppendBufferToTxn"; + admin.topics().createNonPartitionedTopic(topic); + PersistentTopic persistentTopic = (PersistentTopic) pulsarServiceList.get(0).getBrokerService() + .getTopic(topic, false) + .get() + .get(); + TransactionBufferTestImpl topicTransactionBuffer = (TransactionBufferTestImpl) persistentTopic + .getTransactionBuffer(); + // 2. Test reference count does not change in the method `appendBufferToTxn`. + // Each step passes the same buffer it later asserts on, so every refCnt check is meaningful. + // 2.1 Test sending first transaction message, this will take a snapshot. + ByteBuf byteBuf1 = Unpooled.buffer(); + topicTransactionBuffer.appendBufferToTxn(new TxnID(1, 1), 1L, byteBuf1) + .get(5, TimeUnit.SECONDS); + Awaitility.await().untilAsserted(() -> Assert.assertEquals(byteBuf1.refCnt(), 1)); + // 2.2 Test send the second transaction message, this will not take snapshots. + ByteBuf byteBuf2 = Unpooled.buffer(); + topicTransactionBuffer.appendBufferToTxn(new TxnID(1, 1), 1L, byteBuf2) + .get(5, TimeUnit.SECONDS); + Awaitility.await().untilAsserted(() -> Assert.assertEquals(byteBuf2.refCnt(), 1)); + // 2.3 Test sending message failed. + topicTransactionBuffer.setPublishFuture(FutureUtil.failedFuture(new Exception("fail"))); + ByteBuf byteBuf3 = Unpooled.buffer(); + try { + topicTransactionBuffer.appendBufferToTxn(new TxnID(1, 1), 1L, byteBuf3) + .get(5, TimeUnit.SECONDS); + fail(); + } catch (Exception e) { + assertEquals(e.getCause().getMessage(), "fail"); + } + Awaitility.await().untilAsserted(() -> Assert.assertEquals(byteBuf3.refCnt(), 1)); + // 3.
release resource + byteBuf1.release(); + byteBuf2.release(); + byteBuf3.release(); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java index eb7b24c7326dc..0b50f91fd403c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java @@ -195,6 +195,15 @@ public void testSyncNormalPositionWhenTBRecover(boolean clientEnableTransaction, .topic(topicName) .create(); + if (clientEnableTransaction) { + Transaction transaction = pulsarClient.newTransaction() + .withTransactionTimeout(5, TimeUnit.HOURS) + .build() + .get(); + producer.newMessage(transaction).send(); + transaction.commit().get(); + } + PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0).getBrokerService() .getTopic(TopicName.get(topicName).toString(), false).get().get(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/utils/TransactionBufferTestImpl.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/utils/TransactionBufferTestImpl.java new file mode 100644 index 0000000000000..7ee14ffc3378e --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/utils/TransactionBufferTestImpl.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.transaction.buffer.utils; + +import lombok.Setter; +import org.apache.bookkeeper.mledger.Position; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer; + +import java.util.concurrent.CompletableFuture; +/** TopicTransactionBuffer whose getters return test-injected values when set, else the superclass values. */ +public class TransactionBufferTestImpl extends TopicTransactionBuffer { + @Setter + public CompletableFuture transactionBufferFuture = null; + @Setter + public State state = null; + @Setter + public CompletableFuture publishFuture = null; + /** Creates the buffer for the given topic by delegating to the superclass constructor. */ + public TransactionBufferTestImpl(PersistentTopic topic) { + super(topic); + } + /** Returns the injected transactionBufferFuture if one was set, otherwise the superclass future. */ + @Override + public CompletableFuture getTransactionBufferFuture() { + return transactionBufferFuture == null ? super.getTransactionBufferFuture() : transactionBufferFuture; + } + /** Returns the injected state if one was set, otherwise the superclass state. */ + @Override + public State getState() { + return state == null ? super.getState() : state; + } + /** Returns the injected publishFuture if one was set, otherwise the superclass future. */ + @Override + public CompletableFuture getPublishFuture() { + return publishFuture == null ? 
super.getPublishFuture() : publishFuture; + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/utils/TransactionBufferTestProvider.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/utils/TransactionBufferTestProvider.java new file mode 100644 index 0000000000000..7bc93c0e7cf25 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/utils/TransactionBufferTestProvider.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.transaction.buffer.utils; + +import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer; +import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider; +/** TransactionBufferProvider that creates a TransactionBufferTestImpl for each topic passed to it. */ +public class TransactionBufferTestProvider implements TransactionBufferProvider { + /** Wraps the given topic, cast to PersistentTopic, in a new TransactionBufferTestImpl. 
*/ + @Override + public TransactionBuffer newTransactionBuffer(Topic originTopic) { + return new TransactionBufferTestImpl((PersistentTopic) originTopic); + } +} +