From 632e2251956504c343e26159afeb1dc3759af439 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 4 Nov 2025 22:33:06 +0800 Subject: [PATCH 01/12] [fix][broker]Transaction messages can never be sent successfully if concurrently executed transaction buffer snapshot --- .../buffer/impl/TopicTransactionBuffer.java | 19 ++-- .../impl/TopicTransactionBufferState.java | 13 ++- .../transaction/TransactionConsumeTest.java | 102 ++++++++++++++++++ 3 files changed, 125 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index 0c777afaa26c1..9f2cac69f81cd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -108,6 +108,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen private final AbortedTxnProcessor.SnapshotType snapshotType; private final MaxReadPositionCallBack maxReadPositionCallBack; + private CompletableFuture firstSnapshottingFuture; private static AbortedTxnProcessor createSnapshotProcessor(PersistentTopic topic) { return topic.getBrokerService().getPulsar().getConfiguration().isTransactionBufferSegmentedSnapshotEnabled() @@ -272,28 +273,32 @@ public CompletableFuture appendBufferToTxn(TxnID txnId, long sequenceI // Method `takeAbortedTxnsSnapshot` will be executed in the different thread. // So we need to retain the buffer in this thread. It will be released after message persistent.
buffer.retain(); - CompletableFuture future = getPublishFuture().thenCompose(ignore -> { - if (checkIfNoSnapshot()) { - CompletableFuture completableFuture = new CompletableFuture<>(); + CompletableFuture future = getTransactionBufferFuture().thenCompose(ignore -> { + if (changeToFirstSnapshotting()) { + synchronized (TopicTransactionBuffer.this) { + firstSnapshottingFuture = new CompletableFuture<>(); + } // `publishFuture` will be completed after message persistent, so there will not be two threads // writing snapshots at the same time. snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(maxReadPosition).thenRun(() -> { if (changeToReadyStateFromNoSnapshot()) { timer.newTimeout(TopicTransactionBuffer.this, takeSnapshotIntervalTime, TimeUnit.MILLISECONDS); - completableFuture.complete(null); + firstSnapshottingFuture.complete(null); } else { log.error("[{}]Failed to change state of transaction buffer to Ready from NoSnapshot", topic.getName()); - completableFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException( + firstSnapshottingFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException( "Transaction Buffer take first snapshot failed, the current state is: " + getState())); } }).exceptionally(exception -> { log.error("Topic {} failed to take snapshot", this.topic.getName()); - completableFuture.completeExceptionally(exception); + firstSnapshottingFuture.completeExceptionally(exception); return null; }); - return completableFuture.thenCompose(__ -> internalAppendBufferToTxn(txnId, buffer)); + return firstSnapshottingFuture.thenCompose(__ -> internalAppendBufferToTxn(txnId, buffer)); + } else if (checkIfFirstSnapshotting()) { + return firstSnapshottingFuture.thenCompose(__ -> internalAppendBufferToTxn(txnId, buffer)); } else if (checkIfReady()) { return internalAppendBufferToTxn(txnId, buffer); } else { diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java index 92ab1d07b690d..edbb8929ce6d0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java @@ -33,7 +33,8 @@ public enum State { Initializing, Ready, Close, - NoSnapshot + NoSnapshot, + FirstSnapshotting } private static final AtomicReferenceFieldUpdater STATE_UPDATER = @@ -59,13 +60,21 @@ protected boolean changeToInitializingState() { } protected boolean changeToReadyStateFromNoSnapshot() { - return STATE_UPDATER.compareAndSet(this, State.NoSnapshot, State.Ready); + return STATE_UPDATER.compareAndSet(this, State.FirstSnapshotting, State.Ready); + } + + protected boolean changeToFirstSnapshotting() { + return STATE_UPDATER.compareAndSet(this, State.NoSnapshot, State.FirstSnapshotting); } protected void changeToCloseState() { STATE_UPDATER.set(this, State.Close); } + public boolean checkIfFirstSnapshotting() { + return STATE_UPDATER.get(this) == State.FirstSnapshotting; + } + public boolean checkIfReady() { return STATE_UPDATER.get(this) == State.Ready; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java index a7e2aac5174b2..88fb3137bd0dc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java @@ -19,7 +19,10 @@ package org.apache.pulsar.broker.transaction; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.testng.Assert.assertEquals; +import static 
org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import com.google.common.collect.Sets; import io.netty.buffer.ByteBuf; @@ -28,18 +31,24 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; +import java.util.Set; +import java.util.WeakHashMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.PositionFactory; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.MessageRedeliveryController; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; @@ -62,6 +71,7 @@ import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; /** @@ -417,4 +427,96 @@ public void testAckChunkMessage() throws Exception { Assert.assertEquals(admin.topics().getStats(CONSUME_TOPIC).getSubscriptions().get(subName) .getUnackedMessages(), 0); } + + @DataProvider + public Object[][] doCommitTxn() { + return new Object[][] { + {true}, + {false} + }; + } + + @Test(dataProvider = "doCommitTxn", timeOut = 60_000, 
invocationCount = 3) + public void testFirstTnxBufferSnapshotAndRecoveryConcurrently(boolean doCommitTxn) throws Exception { + String topic = BrokerTestUtil.newUniqueName("persistent://public/txn/tp"); + // Create many clients and publish with transaction, which will trigger transaction buffer snapshot + // concurrently. + int producerCount = 10; + List clientList = new ArrayList<>(); + List> producerList = new ArrayList<>(); + List> sendResults = new ArrayList<>(); + List pendingTnxList = new ArrayList<>(); + for (int i = 0; i < producerCount; i++) { + clientList.add(PulsarClient.builder() + .serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl()) + .enableTransaction(true) + .build()); + } + for (int i = 0; i < producerCount; i++) { + producerList.add(clientList.get(i).newProducer(Schema.STRING).topic(topic).create()); + } + Consumer consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic) + .subscriptionName("s1").subscribe(); + for (int i = 0; i < producerCount; i++) { + Transaction transaction = clientList.get(i).newTransaction() + .withTransactionTimeout(5, TimeUnit.HOURS) + .build().get(); + pendingTnxList.add(transaction); + final int index = i; + Producer producer = producerList.get(i); + new Thread(() -> { + sendResults.add(producer.newMessage(transaction).value(index + "").sendAsync()); + }).start(); + } + + // Verify that the transaction buffer snapshot succeed. 
+ AtomicReference topicTransactionBuffer = new AtomicReference<>(); + for (PulsarService pulsar : pulsarServiceList) { + if (pulsar.getBrokerService().getTopics().containsKey(topic)) { + PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService() + .getTopic(topic, false).get().get(); + topicTransactionBuffer.set((TopicTransactionBuffer) persistentTopic.getTransactionBuffer()); + break; + } + } + Awaitility.await().untilAsserted(() -> { + assertNotNull(topicTransactionBuffer.get()); + assertEquals(topicTransactionBuffer.get().getState().toString(), "Ready"); + assertTrue(topicTransactionBuffer.get().getTransactionBufferFuture().isDone()); + assertFalse(topicTransactionBuffer.get().getTransactionBufferFuture().isCompletedExceptionally()); + }); + + // Verify that all messages are sent successfully. + for (int i = 0; i < producerCount; i++) { + sendResults.get(i).get(); + if (doCommitTxn) { + pendingTnxList.get(i).commit(); + } else { + pendingTnxList.get(i).abort(); + } + } + Set msgReceived = new HashSet<>(); + while (true) { + Message msg = consumer.receive(2, TimeUnit.SECONDS); + if (msg == null) { + break; + } + msgReceived.add(msg.getValue()); + } + if (doCommitTxn) { + for (int i = 0; i < producerCount; i++) { + assertTrue(msgReceived.contains(i + "")); + } + } else { + assertTrue(msgReceived.isEmpty()); + } + + // cleanup. 
+ consumer.close(); + for (int i = 0; i < producerCount; i++) { + producerList.get(i).close(); + clientList.get(i).close(); + } + admin.topics().delete(topic, false); + } } From 386d5b9ba57bdeca5a9728b92a3a1d502fafdc7f Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 5 Nov 2025 00:28:11 +0800 Subject: [PATCH 02/12] checkstyle --- .../broker/transaction/buffer/impl/TopicTransactionBuffer.java | 3 ++- .../pulsar/broker/transaction/TransactionConsumeTest.java | 1 - 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index 9f2cac69f81cd..36723ce341aa5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -288,7 +288,8 @@ public CompletableFuture appendBufferToTxn(TxnID txnId, long sequenceI } else { log.error("[{}]Failed to change state of transaction buffer to Ready from NoSnapshot", topic.getName()); - firstSnapshottingFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException( + firstSnapshottingFuture.completeExceptionally(new BrokerServiceException + .ServiceUnitNotReadyException( "Transaction Buffer take first snapshot failed, the current state is: " + getState())); } }).exceptionally(exception -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java index 88fb3137bd0dc..16ce35214dc95 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java @@ -32,7 
+32,6 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.WeakHashMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; From 6d4c0f5917f0feeb81c91a6ca19f36a640f8e8f0 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 6 Nov 2025 01:56:41 +0800 Subject: [PATCH 03/12] fix publish out of order --- .../impl/PendingAppendingTxnBufferTask.java | 36 +++++ .../buffer/impl/TopicTransactionBuffer.java | 153 ++++++++++++------ .../buffer/TopicTransactionBufferTest.java | 13 +- .../utils/TransactionBufferTestImpl.java | 15 ++ 4 files changed, 154 insertions(+), 63 deletions(-) create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/PendingAppendingTxnBufferTask.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/PendingAppendingTxnBufferTask.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/PendingAppendingTxnBufferTask.java new file mode 100644 index 0000000000000..3fcccf71f3397 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/PendingAppendingTxnBufferTask.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.transaction.buffer.impl; + +import io.netty.buffer.ByteBuf; +import java.util.concurrent.CompletableFuture; +import lombok.AllArgsConstructor; +import lombok.Getter; +import org.apache.bookkeeper.mledger.Position; +import org.apache.pulsar.client.api.transaction.TxnID; + +@Getter +@AllArgsConstructor +public class PendingAppendingTxnBufferTask { + + private final TxnID txnId; + private final long sequenceId; + private final ByteBuf buffer; + private CompletableFuture pendingPublishFuture; +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index 36723ce341aa5..08c312641058e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -23,6 +23,7 @@ import io.netty.util.Timeout; import io.netty.util.Timer; import io.netty.util.TimerTask; +import java.util.LinkedList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -92,9 +93,6 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen private final CompletableFuture transactionBufferFuture = new CompletableFuture<>(); - private CompletableFuture publishFuture = getTransactionBufferFuture() - .thenApply(__ -> PositionFactory.EARLIEST); - /** * The map is used to store the lowWaterMarks which key is TC ID and value is lowWaterMark of the TC. 
*/ @@ -108,7 +106,9 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen private final AbortedTxnProcessor.SnapshotType snapshotType; private final MaxReadPositionCallBack maxReadPositionCallBack; - private CompletableFuture firstSnapshottingFuture; + private volatile CompletableFuture firstSnapshottingFuture = new CompletableFuture<>(); + /** if the first snapshot is in progress, it will pending following publishing tasks **/ + private final LinkedList pendingAppendingTxnBufferTasks = new LinkedList<>(); private static AbortedTxnProcessor createSnapshotProcessor(PersistentTopic topic) { return topic.getBrokerService().getPulsar().getConfiguration().isTransactionBufferSegmentedSnapshotEnabled() @@ -233,16 +233,6 @@ public CompletableFuture getTransactionMeta(TxnID txnID) { return CompletableFuture.completedFuture(null); } - @VisibleForTesting - public void setPublishFuture(CompletableFuture publishFuture) { - this.publishFuture = publishFuture; - } - - @VisibleForTesting - public CompletableFuture getPublishFuture() { - return publishFuture; - } - @VisibleForTesting public CompletableFuture getTransactionBufferFuture() { return transactionBufferFuture; @@ -270,50 +260,109 @@ public long getCommittedTxnCount() { @Override public CompletableFuture appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer) { - // Method `takeAbortedTxnsSnapshot` will be executed in the different thread. - // So we need to retain the buffer in this thread. It will be released after message persistent. - buffer.retain(); - CompletableFuture future = getTransactionBufferFuture().thenCompose(ignore -> { - if (changeToFirstSnapshotting()) { - synchronized (TopicTransactionBuffer.this) { - firstSnapshottingFuture = new CompletableFuture<>(); + synchronized (pendingAppendingTxnBufferTasks) { + // The first snapshot is in progress, the following publish tasks will be pending. 
+ if (!pendingAppendingTxnBufferTasks.isEmpty()) { + CompletableFuture res = new CompletableFuture<>(); + pendingAppendingTxnBufferTasks.offer(new PendingAppendingTxnBufferTask(txnId, sequenceId, buffer, res)); + return res; + } + + // `publishFuture` will be completed after transaction buffer recover completely + // during initializing, so this case should not happen. + if (!checkIfReady() && !checkIfNoSnapshot() && !checkIfFirstSnapshotting()) { + return FutureUtil.failedFuture(new BrokerServiceException.ServiceUnitNotReadyException( + "Transaction Buffer recover failed, the current state is: " + getState())); + } + + // The transaction buffer is ready to write. + if (checkIfReady()) { + return internalAppendBufferToTxn(txnId, buffer, sequenceId); + } + + // Pending the current publishing and trigger new snapshot if needed. + CompletableFuture res = new CompletableFuture<>(); + pendingAppendingTxnBufferTasks.offer(new PendingAppendingTxnBufferTask(txnId, sequenceId, buffer, res)); + // Trigger the first snapshot. + getTransactionBufferFuture().whenComplete((ignore1, ex1) -> { + PendingAppendingTxnBufferTask pendingTask1 = null; + if (ex1 != null) { + synchronized (pendingAppendingTxnBufferTasks) { + while ((pendingTask1 = pendingAppendingTxnBufferTasks.pop()) != null) { + pendingTask1.getPendingPublishFuture().completeExceptionally(ex1); + } + return; + } } - // `publishFuture` will be completed after message persistent, so there will not be two threads - // writing snapshots at the same time. 
- snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(maxReadPosition).thenRun(() -> { - if (changeToReadyStateFromNoSnapshot()) { - timer.newTimeout(TopicTransactionBuffer.this, - takeSnapshotIntervalTime, TimeUnit.MILLISECONDS); - firstSnapshottingFuture.complete(null); - } else { - log.error("[{}]Failed to change state of transaction buffer to Ready from NoSnapshot", + if (changeToFirstSnapshotting()) { + log.info("[{}] Start to take the first snapshot", topic.getName()); + // Flush pending publishing after the first snapshot finished. + takeFirstSnapshot().whenComplete((ignore2, ex2) -> { + log.error("[{}] Failed to take the first snapshot, flushing failed publishing requests", + topic.getName(), ex2); + if (ex2 != null) { + synchronized (pendingAppendingTxnBufferTasks) { + PendingAppendingTxnBufferTask pendingTask2 = null; + while ((pendingTask2 = pendingAppendingTxnBufferTasks.pop()) != null) { + pendingTask2.getPendingPublishFuture().completeExceptionally(ex2); + } + return; + } + } + log.info("[{}] Finished to take the first snapshot, flushing publishing requests", topic.getName()); - firstSnapshottingFuture.completeExceptionally(new BrokerServiceException - .ServiceUnitNotReadyException( - "Transaction Buffer take first snapshot failed, the current state is: " + getState())); - } - }).exceptionally(exception -> { - log.error("Topic {} failed to take snapshot", this.topic.getName()); - firstSnapshottingFuture.completeExceptionally(exception); - return null; - }); - return firstSnapshottingFuture.thenCompose(__ -> internalAppendBufferToTxn(txnId, buffer)); - } else if (checkIfFirstSnapshotting()) { - return firstSnapshottingFuture.thenCompose(__ -> internalAppendBufferToTxn(txnId, buffer)); - } else if (checkIfReady()) { - return internalAppendBufferToTxn(txnId, buffer); + PendingAppendingTxnBufferTask pendingTask2 = null; + synchronized (pendingAppendingTxnBufferTasks) { + while ((pendingTask2 = pendingAppendingTxnBufferTasks.pop()) != null) { + final 
ByteBuf data = pendingTask2.getBuffer(); + // Method `internalAppendBufferToTxn` will be executed in the different thread. + // So we need to retain the buffer in this thread. It will be released after message + // persistent. + data.retain(); + final CompletableFuture pendingFuture = + pendingTask2.getPendingPublishFuture(); + internalAppendBufferToTxn(pendingTask2.getTxnId(), pendingTask2.getBuffer(), + pendingTask2.getSequenceId()).whenComplete((positionAdded, ex3) -> { + data.release(); + if (ex3 != null) { + pendingFuture.completeExceptionally(ex3); + return; + } + pendingFuture.complete(positionAdded); + }); + } + } + }); + } + }); + return res; + } + } + + private CompletableFuture takeFirstSnapshot() { + CompletableFuture firstSnapshottingFuture = new CompletableFuture<>(); + snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(maxReadPosition).thenRun(() -> { + if (changeToReadyStateFromNoSnapshot()) { + timer.newTimeout(TopicTransactionBuffer.this, + takeSnapshotIntervalTime, TimeUnit.MILLISECONDS); + firstSnapshottingFuture.complete(null); } else { - // `publishFuture` will be completed after transaction buffer recover completely - // during initializing, so this case should not happen. 
- return FutureUtil.failedFuture(new BrokerServiceException.ServiceUnitNotReadyException( - "Transaction Buffer recover failed, the current state is: " + getState())); + log.error("[{}]Failed to change state of transaction buffer to Ready from NoSnapshot", + topic.getName()); + firstSnapshottingFuture.completeExceptionally(new BrokerServiceException + .ServiceUnitNotReadyException( + "Transaction Buffer take first snapshot failed, the current state is: " + getState())); } - }).whenComplete(((position, throwable) -> buffer.release())); - setPublishFuture(future); - return future; + }).exceptionally(exception -> { + log.error("Topic {} failed to take snapshot", this.topic.getName()); + firstSnapshottingFuture.completeExceptionally(exception); + return null; + }); + return firstSnapshottingFuture; } - private CompletableFuture internalAppendBufferToTxn(TxnID txnId, ByteBuf buffer) { + @VisibleForTesting + protected CompletableFuture internalAppendBufferToTxn(TxnID txnId, ByteBuf buffer, long seq) { CompletableFuture completableFuture = new CompletableFuture<>(); Long lowWaterMark = lowWaterMarks.get(txnId.getMostSigBits()); if (lowWaterMark != null && lowWaterMark >= txnId.getLeastSigBits()) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java index 5a54b37a637dd..23dc60529ea6a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java @@ -38,7 +38,6 @@ import lombok.Cleanup; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerException; -import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.PositionFactory; import 
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.commons.lang3.RandomUtils; @@ -513,14 +512,6 @@ public void testMessagePublishInOrder() throws Exception { .withTransactionTimeout(5, TimeUnit.HOURS) .build().get(); - // 2. Set a new future in transaction buffer as `transactionBufferFuture` to simulate whether the - // transaction buffer recover completely. - TransactionBufferTestImpl topicTransactionBuffer = (TransactionBufferTestImpl) persistentTopic - .getTransactionBuffer(); - CompletableFuture completableFuture = new CompletableFuture<>(); - CompletableFuture originalFuture = topicTransactionBuffer.getPublishFuture(); - topicTransactionBuffer.setPublishFuture(completableFuture); - topicTransactionBuffer.setState(TopicTransactionBufferState.State.Ready); // Register this topic to the transaction in advance to avoid the sending request pending here. ((TransactionImpl) transaction).registerProducedTopic(topic).get(5, TimeUnit.SECONDS); // 3. Test the messages sent before transaction buffer ready is in order. @@ -528,7 +519,6 @@ public void testMessagePublishInOrder() throws Exception { producer.newMessage(transaction).value(i).sendAsync(); } // 4. Test the messages sent after transaction buffer ready is in order. - completableFuture.complete(originalFuture.get()); for (int i = 50; i < 100; i++) { producer.newMessage(transaction).value(i).sendAsync(); } @@ -569,7 +559,7 @@ public void testRefCountWhenAppendBufferToTxn() throws Exception { .get(5, TimeUnit.SECONDS); Awaitility.await().untilAsserted(() -> Assert.assertEquals(byteBuf2.refCnt(), 1)); // 2.3 Test sending message failed. 
- topicTransactionBuffer.setPublishFuture(FutureUtil.failedFuture(new Exception("fail"))); + topicTransactionBuffer.setFollowingInternalAppendBufferToTxnFail(true); ByteBuf byteBuf3 = Unpooled.buffer(); try { topicTransactionBuffer.appendBufferToTxn(new TxnID(1, 1), 1L, byteBuf1) @@ -579,6 +569,7 @@ public void testRefCountWhenAppendBufferToTxn() throws Exception { assertEquals(e.getCause().getMessage(), "fail"); } Awaitility.await().untilAsserted(() -> Assert.assertEquals(byteBuf3.refCnt(), 1)); + topicTransactionBuffer.setFollowingInternalAppendBufferToTxnFail(false); // 3. release resource byteBuf1.release(); byteBuf2.release(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/utils/TransactionBufferTestImpl.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/utils/TransactionBufferTestImpl.java index b1168d0850166..903fe2ee2b22b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/utils/TransactionBufferTestImpl.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/utils/TransactionBufferTestImpl.java @@ -18,14 +18,21 @@ */ package org.apache.pulsar.broker.transaction.buffer.utils; +import io.netty.buffer.ByteBuf; +import java.util.concurrent.CompletableFuture; import lombok.Setter; +import org.apache.bookkeeper.mledger.Position; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer; +import org.apache.pulsar.client.api.transaction.TxnID; public class TransactionBufferTestImpl extends TopicTransactionBuffer { @Setter public State state = null; + @Setter + private boolean followingInternalAppendBufferToTxnFail; + public TransactionBufferTestImpl(PersistentTopic topic) { super(topic); } @@ -34,4 +41,12 @@ public TransactionBufferTestImpl(PersistentTopic topic) { public State getState() { return state == null ? 
super.getState() : state; } + + @Override + protected CompletableFuture internalAppendBufferToTxn(TxnID txnId, ByteBuf buffer, long seq) { + if (followingInternalAppendBufferToTxnFail) { + return CompletableFuture.failedFuture(new RuntimeException("fail")); + } + return super.internalAppendBufferToTxn(txnId, buffer, seq); + } } From 237f2b4a022baa9e3c28968c9a46c73eb67ca18d Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 6 Nov 2025 10:03:45 +0800 Subject: [PATCH 04/12] checkstyle --- .../broker/transaction/buffer/impl/TopicTransactionBuffer.java | 2 +- .../broker/transaction/buffer/TopicTransactionBufferTest.java | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index 08c312641058e..88a11782cbcea 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -107,7 +107,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen private final AbortedTxnProcessor.SnapshotType snapshotType; private final MaxReadPositionCallBack maxReadPositionCallBack; private volatile CompletableFuture firstSnapshottingFuture = new CompletableFuture<>(); - /** if the first snapshot is in progress, it will pending following publishing tasks **/ + /** if the first snapshot is in progress, it will pending following publishing tasks. 
**/ private final LinkedList pendingAppendingTxnBufferTasks = new LinkedList<>(); private static AbortedTxnProcessor createSnapshotProcessor(PersistentTopic topic) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java index 23dc60529ea6a..3afd5e7d6dfbc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java @@ -68,7 +68,6 @@ import org.apache.pulsar.client.impl.TopicMessageIdImpl; import org.apache.pulsar.client.impl.transaction.TransactionImpl; import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; From b87a8c74b2575f5f644b0cb3b2884383d26d736c Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 6 Nov 2025 11:35:54 +0800 Subject: [PATCH 05/12] fix bugs --- .../buffer/impl/TopicTransactionBuffer.java | 65 ++++++++++++------- 1 file changed, 43 insertions(+), 22 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index 88a11782cbcea..dabd871b91122 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -264,6 +264,7 @@ public CompletableFuture appendBufferToTxn(TxnID txnId, long sequenceI // The first snapshot is in 
progress, the following publish tasks will be pending. if (!pendingAppendingTxnBufferTasks.isEmpty()) { CompletableFuture res = new CompletableFuture<>(); + buffer.retain(); pendingAppendingTxnBufferTasks.offer(new PendingAppendingTxnBufferTask(txnId, sequenceId, buffer, res)); return res; } @@ -282,6 +283,7 @@ public CompletableFuture appendBufferToTxn(TxnID txnId, long sequenceI // Pending the current publishing and trigger new snapshot if needed. CompletableFuture res = new CompletableFuture<>(); + buffer.retain(); pendingAppendingTxnBufferTasks.offer(new PendingAppendingTxnBufferTask(txnId, sequenceId, buffer, res)); // Trigger the first snapshot. getTransactionBufferFuture().whenComplete((ignore1, ex1) -> { @@ -289,6 +291,7 @@ public CompletableFuture appendBufferToTxn(TxnID txnId, long sequenceI if (ex1 != null) { synchronized (pendingAppendingTxnBufferTasks) { while ((pendingTask1 = pendingAppendingTxnBufferTasks.pop()) != null) { + pendingTask1.getBuffer().release(); pendingTask1.getPendingPublishFuture().completeExceptionally(ex1); } return; @@ -298,38 +301,56 @@ public CompletableFuture appendBufferToTxn(TxnID txnId, long sequenceI log.info("[{}] Start to take the first snapshot", topic.getName()); // Flush pending publishing after the first snapshot finished. 
takeFirstSnapshot().whenComplete((ignore2, ex2) -> { - log.error("[{}] Failed to take the first snapshot, flushing failed publishing requests", - topic.getName(), ex2); if (ex2 != null) { + log.error("[{}] Failed to take the first snapshot, flushing failed publishing requests", + topic.getName(), ex2); synchronized (pendingAppendingTxnBufferTasks) { PendingAppendingTxnBufferTask pendingTask2 = null; while ((pendingTask2 = pendingAppendingTxnBufferTasks.pop()) != null) { + pendingTask2.getBuffer().release(); pendingTask2.getPendingPublishFuture().completeExceptionally(ex2); } return; } } - log.info("[{}] Finished to take the first snapshot, flushing publishing requests", - topic.getName()); + log.info("[{}] Finished to take the first snapshot, flushing publishing {} requests", + topic.getName(), pendingAppendingTxnBufferTasks.size()); PendingAppendingTxnBufferTask pendingTask2 = null; - synchronized (pendingAppendingTxnBufferTasks) { - while ((pendingTask2 = pendingAppendingTxnBufferTasks.pop()) != null) { - final ByteBuf data = pendingTask2.getBuffer(); - // Method `internalAppendBufferToTxn` will be executed in the different thread. - // So we need to retain the buffer in this thread. It will be released after message - // persistent. - data.retain(); - final CompletableFuture pendingFuture = - pendingTask2.getPendingPublishFuture(); - internalAppendBufferToTxn(pendingTask2.getTxnId(), pendingTask2.getBuffer(), - pendingTask2.getSequenceId()).whenComplete((positionAdded, ex3) -> { - data.release(); - if (ex3 != null) { - pendingFuture.completeExceptionally(ex3); - return; - } - pendingFuture.complete(positionAdded); - }); + try { + synchronized (pendingAppendingTxnBufferTasks) { + while ((pendingTask2 = pendingAppendingTxnBufferTasks.pop()) != null) { + final ByteBuf data = pendingTask2.getBuffer(); + // Method `internalAppendBufferToTxn` will be executed in the different thread. + // So we need to retain the buffer in this thread. 
It will be released after message + // persistent. + final CompletableFuture pendingFuture = + pendingTask2.getPendingPublishFuture(); + internalAppendBufferToTxn(pendingTask2.getTxnId(), pendingTask2.getBuffer(), + pendingTask2.getSequenceId()) + .whenComplete((positionAdded, ex3) -> { + if (ex3 != null) { + pendingFuture.completeExceptionally(ex3); + return; + } + pendingFuture.complete(positionAdded); + data.release(); + }); + } + } + } catch (Exception e) { + // If there are some error when adding entries or caching entries, this log will be printed. + log.error("[{}] Failed to flush pending publishing requests after taking the first" + + " snapshot, please raise a github issue.", + topic.getName(), e); + if (pendingTask2 != null) { + pendingTask2.getBuffer().release(); + pendingTask2.getPendingPublishFuture().completeExceptionally(e); + } + synchronized (pendingAppendingTxnBufferTasks) { + while ((pendingTask2 = pendingAppendingTxnBufferTasks.pop()) != null) { + pendingTask2.getBuffer().release(); + pendingTask2.getPendingPublishFuture().completeExceptionally(ex2); + } } } }); From 4914fb1ea0b2db48df61ef6bd0854ed45947cd9d Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 6 Nov 2025 15:10:32 +0800 Subject: [PATCH 06/12] fix bugs --- .../buffer/impl/TopicTransactionBuffer.java | 20 ++++++++++++++----- .../impl/TopicTransactionBufferState.java | 4 ++++ 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index dabd871b91122..0929daa242c2e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -290,7 +290,7 @@ public CompletableFuture appendBufferToTxn(TxnID txnId, long 
sequenceI PendingAppendingTxnBufferTask pendingTask1 = null; if (ex1 != null) { synchronized (pendingAppendingTxnBufferTasks) { - while ((pendingTask1 = pendingAppendingTxnBufferTasks.pop()) != null) { + while ((pendingTask1 = pendingAppendingTxnBufferTasks.poll()) != null) { pendingTask1.getBuffer().release(); pendingTask1.getPendingPublishFuture().completeExceptionally(ex1); } @@ -306,7 +306,7 @@ public CompletableFuture appendBufferToTxn(TxnID txnId, long sequenceI topic.getName(), ex2); synchronized (pendingAppendingTxnBufferTasks) { PendingAppendingTxnBufferTask pendingTask2 = null; - while ((pendingTask2 = pendingAppendingTxnBufferTasks.pop()) != null) { + while ((pendingTask2 = pendingAppendingTxnBufferTasks.poll()) != null) { pendingTask2.getBuffer().release(); pendingTask2.getPendingPublishFuture().completeExceptionally(ex2); } @@ -318,7 +318,7 @@ public CompletableFuture appendBufferToTxn(TxnID txnId, long sequenceI PendingAppendingTxnBufferTask pendingTask2 = null; try { synchronized (pendingAppendingTxnBufferTasks) { - while ((pendingTask2 = pendingAppendingTxnBufferTasks.pop()) != null) { + while ((pendingTask2 = pendingAppendingTxnBufferTasks.poll()) != null) { final ByteBuf data = pendingTask2.getBuffer(); // Method `internalAppendBufferToTxn` will be executed in the different thread. // So we need to retain the buffer in this thread. 
It will be released after message @@ -347,7 +347,7 @@ public CompletableFuture appendBufferToTxn(TxnID txnId, long sequenceI pendingTask2.getPendingPublishFuture().completeExceptionally(e); } synchronized (pendingAppendingTxnBufferTasks) { - while ((pendingTask2 = pendingAppendingTxnBufferTasks.pop()) != null) { + while ((pendingTask2 = pendingAppendingTxnBufferTasks.poll()) != null) { pendingTask2.getBuffer().release(); pendingTask2.getPendingPublishFuture().completeExceptionally(ex2); } @@ -626,7 +626,17 @@ public CompletableFuture clearSnapshot() { @Override public CompletableFuture closeAsync() { - changeToCloseState(); + synchronized (pendingAppendingTxnBufferTasks) { + if (!checkIfClosed()) { + PendingAppendingTxnBufferTask pendingTask = null; + while ((pendingTask = pendingAppendingTxnBufferTasks.poll()) != null) { + pendingTask.getPendingPublishFuture().completeExceptionally( + new BrokerServiceException.ServiceUnitNotReadyException("Topic is closed")); + pendingTask.getBuffer().release(); + } + } + changeToCloseState(); + } return this.snapshotAbortedTxnProcessor.closeAsync(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java index edbb8929ce6d0..e9199c434a811 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java @@ -83,6 +83,10 @@ public boolean checkIfNoSnapshot() { return STATE_UPDATER.get(this) == State.NoSnapshot; } + public boolean checkIfClosed() { + return STATE_UPDATER.get(this) == State.Close; + } + public State getState() { return STATE_UPDATER.get(this); } From 125d7c49305676624dd5e7f1b44807aa959e22fb Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 6 Nov 2025 21:08:03 +0800 
Subject: [PATCH 07/12] deal with comments --- .../buffer/impl/TopicTransactionBuffer.java | 48 ++++++++----------- .../impl/TopicTransactionBufferState.java | 4 ++ 2 files changed, 25 insertions(+), 27 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index 0929daa242c2e..b23d79c2a8c7f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -271,7 +271,9 @@ public CompletableFuture appendBufferToTxn(TxnID txnId, long sequenceI // `publishFuture` will be completed after transaction buffer recover completely // during initializing, so this case should not happen. - if (!checkIfReady() && !checkIfNoSnapshot() && !checkIfFirstSnapshotting()) { + if (!checkIfReady() && !checkIfNoSnapshot() && !checkIfFirstSnapshotting() && !checkIfInitializing()) { + log.error("[{}] unexpected state: {} when try to take the first transaction buffer snapshot", + topic.getName(), getState()); return FutureUtil.failedFuture(new BrokerServiceException.ServiceUnitNotReadyException( "Transaction Buffer recover failed, the current state is: " + getState())); } @@ -285,17 +287,22 @@ public CompletableFuture appendBufferToTxn(TxnID txnId, long sequenceI CompletableFuture res = new CompletableFuture<>(); buffer.retain(); pendingAppendingTxnBufferTasks.offer(new PendingAppendingTxnBufferTask(txnId, sequenceId, buffer, res)); + + final java.util.function.Consumer failPendingTasks = throwable -> { + synchronized (pendingAppendingTxnBufferTasks) { + PendingAppendingTxnBufferTask pendingTask = null; + while ((pendingTask = pendingAppendingTxnBufferTasks.poll()) != null) { + pendingTask.getBuffer().release(); + 
pendingTask.getPendingPublishFuture().completeExceptionally(throwable); + } + } + }; + // Trigger the first snapshot. getTransactionBufferFuture().whenComplete((ignore1, ex1) -> { - PendingAppendingTxnBufferTask pendingTask1 = null; if (ex1 != null) { - synchronized (pendingAppendingTxnBufferTasks) { - while ((pendingTask1 = pendingAppendingTxnBufferTasks.poll()) != null) { - pendingTask1.getBuffer().release(); - pendingTask1.getPendingPublishFuture().completeExceptionally(ex1); - } - return; - } + failPendingTasks.accept(ex1); + return; } if (changeToFirstSnapshotting()) { log.info("[{}] Start to take the first snapshot", topic.getName()); @@ -304,14 +311,8 @@ public CompletableFuture appendBufferToTxn(TxnID txnId, long sequenceI if (ex2 != null) { log.error("[{}] Failed to take the first snapshot, flushing failed publishing requests", topic.getName(), ex2); - synchronized (pendingAppendingTxnBufferTasks) { - PendingAppendingTxnBufferTask pendingTask2 = null; - while ((pendingTask2 = pendingAppendingTxnBufferTasks.poll()) != null) { - pendingTask2.getBuffer().release(); - pendingTask2.getPendingPublishFuture().completeExceptionally(ex2); - } - return; - } + failPendingTasks.accept(ex1); + return; } log.info("[{}] Finished to take the first snapshot, flushing publishing {} requests", topic.getName(), pendingAppendingTxnBufferTasks.size()); @@ -320,15 +321,13 @@ public CompletableFuture appendBufferToTxn(TxnID txnId, long sequenceI synchronized (pendingAppendingTxnBufferTasks) { while ((pendingTask2 = pendingAppendingTxnBufferTasks.poll()) != null) { final ByteBuf data = pendingTask2.getBuffer(); - // Method `internalAppendBufferToTxn` will be executed in the different thread. - // So we need to retain the buffer in this thread. It will be released after message - // persistent. 
final CompletableFuture pendingFuture = pendingTask2.getPendingPublishFuture(); internalAppendBufferToTxn(pendingTask2.getTxnId(), pendingTask2.getBuffer(), pendingTask2.getSequenceId()) .whenComplete((positionAdded, ex3) -> { if (ex3 != null) { + data.release(); pendingFuture.completeExceptionally(ex3); return; } @@ -340,18 +339,13 @@ public CompletableFuture appendBufferToTxn(TxnID txnId, long sequenceI } catch (Exception e) { // If there are some error when adding entries or caching entries, this log will be printed. log.error("[{}] Failed to flush pending publishing requests after taking the first" - + " snapshot, please raise a github issue.", + + " snapshot.", topic.getName(), e); if (pendingTask2 != null) { pendingTask2.getBuffer().release(); pendingTask2.getPendingPublishFuture().completeExceptionally(e); } - synchronized (pendingAppendingTxnBufferTasks) { - while ((pendingTask2 = pendingAppendingTxnBufferTasks.poll()) != null) { - pendingTask2.getBuffer().release(); - pendingTask2.getPendingPublishFuture().completeExceptionally(ex2); - } - } + failPendingTasks.accept(ex1); } }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java index e9199c434a811..9a8f2041bf4e4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java @@ -71,6 +71,10 @@ protected void changeToCloseState() { STATE_UPDATER.set(this, State.Close); } + public boolean checkIfInitializing() { + return STATE_UPDATER.get(this) == State.Initializing; + } + public boolean checkIfFirstSnapshotting() { return STATE_UPDATER.get(this) == State.FirstSnapshotting; } From c794413958849f556aa4f296ff51de332d4aaa95 Mon Sep 17 00:00:00 2001 From: 
fengyubiao Date: Fri, 7 Nov 2025 01:49:03 +0800 Subject: [PATCH 08/12] fix bugs --- .../buffer/impl/TopicTransactionBuffer.java | 80 +++++++++++-------- 1 file changed, 47 insertions(+), 33 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index b23d79c2a8c7f..22abd58ccf3a3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -298,9 +298,44 @@ public CompletableFuture appendBufferToTxn(TxnID txnId, long sequenceI } }; + final Runnable flushPendingTasks = () -> { + PendingAppendingTxnBufferTask pendingTask = null; + try { + synchronized (pendingAppendingTxnBufferTasks) { + while ((pendingTask = pendingAppendingTxnBufferTasks.poll()) != null) { + final ByteBuf data = pendingTask.getBuffer(); + final CompletableFuture pendingFuture = + pendingTask.getPendingPublishFuture(); + internalAppendBufferToTxn(pendingTask.getTxnId(), pendingTask.getBuffer(), + pendingTask.getSequenceId()) + .whenComplete((positionAdded, ex3) -> { + if (ex3 != null) { + data.release(); + pendingFuture.completeExceptionally(ex3); + return; + } + pendingFuture.complete(positionAdded); + data.release(); + }); + } + } + } catch (Exception e) { + // If there are some error when adding entries or caching entries, this log will be printed. + log.error("[{}] Failed to flush pending publishing requests after taking the first" + + " snapshot.", + topic.getName(), e); + if (pendingTask != null) { + pendingTask.getBuffer().release(); + pendingTask.getPendingPublishFuture().completeExceptionally(e); + } + failPendingTasks.accept(e); + } + }; + // Trigger the first snapshot. 
- getTransactionBufferFuture().whenComplete((ignore1, ex1) -> { + transactionBufferFuture.whenComplete((ignore1, ex1) -> { if (ex1 != null) { + log.error("[{}] Transaction buffer recover failed", topic.getName(), ex1); failPendingTasks.accept(ex1); return; } @@ -311,43 +346,22 @@ public CompletableFuture appendBufferToTxn(TxnID txnId, long sequenceI if (ex2 != null) { log.error("[{}] Failed to take the first snapshot, flushing failed publishing requests", topic.getName(), ex2); - failPendingTasks.accept(ex1); + failPendingTasks.accept(ex2); return; } log.info("[{}] Finished to take the first snapshot, flushing publishing {} requests", topic.getName(), pendingAppendingTxnBufferTasks.size()); - PendingAppendingTxnBufferTask pendingTask2 = null; - try { - synchronized (pendingAppendingTxnBufferTasks) { - while ((pendingTask2 = pendingAppendingTxnBufferTasks.poll()) != null) { - final ByteBuf data = pendingTask2.getBuffer(); - final CompletableFuture pendingFuture = - pendingTask2.getPendingPublishFuture(); - internalAppendBufferToTxn(pendingTask2.getTxnId(), pendingTask2.getBuffer(), - pendingTask2.getSequenceId()) - .whenComplete((positionAdded, ex3) -> { - if (ex3 != null) { - data.release(); - pendingFuture.completeExceptionally(ex3); - return; - } - pendingFuture.complete(positionAdded); - data.release(); - }); - } - } - } catch (Exception e) { - // If there are some error when adding entries or caching entries, this log will be printed. 
- log.error("[{}] Failed to flush pending publishing requests after taking the first" - + " snapshot.", - topic.getName(), e); - if (pendingTask2 != null) { - pendingTask2.getBuffer().release(); - pendingTask2.getPendingPublishFuture().completeExceptionally(e); - } - failPendingTasks.accept(ex1); - } + flushPendingTasks.run(); }); + } else if (checkIfReady()) { + log.info("[{}] No need to take the first snapshot, flushing publishing {} requests", + topic.getName(), pendingAppendingTxnBufferTasks.size()); + flushPendingTasks.run(); + } else { + log.error("[{}] Transaction buffer recover failed, current state is {}", topic.getName(), + getState()); + failPendingTasks.accept(new BrokerServiceException.ServiceUnitNotReadyException( + "Transaction Buffer recover failed, the current state is: " + getState())); } }); return res; From 679d7e329973ad123798ec9a980abcb318760485 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 10 Nov 2025 10:04:18 +0800 Subject: [PATCH 09/12] address comments --- .../impl/PendingAppendingTxnBufferTask.java | 36 ------------------- .../buffer/impl/TopicTransactionBuffer.java | 26 +++++++------- .../utils/TransactionBufferTestImpl.java | 2 +- 3 files changed, 15 insertions(+), 49 deletions(-) delete mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/PendingAppendingTxnBufferTask.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/PendingAppendingTxnBufferTask.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/PendingAppendingTxnBufferTask.java deleted file mode 100644 index 3fcccf71f3397..0000000000000 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/PendingAppendingTxnBufferTask.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.broker.transaction.buffer.impl; - -import io.netty.buffer.ByteBuf; -import java.util.concurrent.CompletableFuture; -import lombok.AllArgsConstructor; -import lombok.Getter; -import org.apache.bookkeeper.mledger.Position; -import org.apache.pulsar.client.api.transaction.TxnID; - -@Getter -@AllArgsConstructor -public class PendingAppendingTxnBufferTask { - - private final TxnID txnId; - private final long sequenceId; - private final ByteBuf buffer; - private CompletableFuture pendingPublishFuture; -} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index 22abd58ccf3a3..8ec5783158b40 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -258,6 +258,9 @@ public long getCommittedTxnCount() { return this.txnCommittedCounter.sum(); } + private record PendingAppendingTxnBufferTask(TxnID txnId, long sequenceId, ByteBuf buffer, + CompletableFuture pendingPublishFuture) {} + @Override public 
CompletableFuture appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer) { synchronized (pendingAppendingTxnBufferTasks) { @@ -292,8 +295,8 @@ public CompletableFuture appendBufferToTxn(TxnID txnId, long sequenceI synchronized (pendingAppendingTxnBufferTasks) { PendingAppendingTxnBufferTask pendingTask = null; while ((pendingTask = pendingAppendingTxnBufferTasks.poll()) != null) { - pendingTask.getBuffer().release(); - pendingTask.getPendingPublishFuture().completeExceptionally(throwable); + pendingTask.buffer.release(); + pendingTask.pendingPublishFuture.completeExceptionally(throwable); } } }; @@ -303,19 +306,18 @@ public CompletableFuture appendBufferToTxn(TxnID txnId, long sequenceI try { synchronized (pendingAppendingTxnBufferTasks) { while ((pendingTask = pendingAppendingTxnBufferTasks.poll()) != null) { - final ByteBuf data = pendingTask.getBuffer(); + final ByteBuf data = pendingTask.buffer; final CompletableFuture pendingFuture = - pendingTask.getPendingPublishFuture(); - internalAppendBufferToTxn(pendingTask.getTxnId(), pendingTask.getBuffer(), - pendingTask.getSequenceId()) + pendingTask.pendingPublishFuture; + internalAppendBufferToTxn(pendingTask.txnId, pendingTask.buffer, + pendingTask.sequenceId) .whenComplete((positionAdded, ex3) -> { + data.release(); if (ex3 != null) { - data.release(); pendingFuture.completeExceptionally(ex3); return; } pendingFuture.complete(positionAdded); - data.release(); }); } } @@ -325,8 +327,8 @@ public CompletableFuture appendBufferToTxn(TxnID txnId, long sequenceI + " snapshot.", topic.getName(), e); if (pendingTask != null) { - pendingTask.getBuffer().release(); - pendingTask.getPendingPublishFuture().completeExceptionally(e); + pendingTask.buffer.release(); + pendingTask.pendingPublishFuture.completeExceptionally(e); } failPendingTasks.accept(e); } @@ -638,9 +640,9 @@ public CompletableFuture closeAsync() { if (!checkIfClosed()) { PendingAppendingTxnBufferTask pendingTask = null; while ((pendingTask = 
pendingAppendingTxnBufferTasks.poll()) != null) { - pendingTask.getPendingPublishFuture().completeExceptionally( + pendingTask.pendingPublishFuture.completeExceptionally( new BrokerServiceException.ServiceUnitNotReadyException("Topic is closed")); - pendingTask.getBuffer().release(); + pendingTask.buffer.release(); } } changeToCloseState(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/utils/TransactionBufferTestImpl.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/utils/TransactionBufferTestImpl.java index 903fe2ee2b22b..f1a003ff194d4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/utils/TransactionBufferTestImpl.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/utils/TransactionBufferTestImpl.java @@ -45,7 +45,7 @@ public State getState() { @Override protected CompletableFuture internalAppendBufferToTxn(TxnID txnId, ByteBuf buffer, long seq) { if (followingInternalAppendBufferToTxnFail) { - return CompletableFuture.failedFuture(new RuntimeException("fail")); + return CompletableFuture.failedFuture(new RuntimeException("failed because an injected error for test")); } return super.internalAppendBufferToTxn(txnId, buffer, seq); } From a339af5c69173f6cfbe8fab68d6020bf4c94c380 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 10 Nov 2025 10:34:01 +0800 Subject: [PATCH 10/12] address comments --- .../broker/transaction/buffer/impl/TopicTransactionBuffer.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index 8ec5783158b40..eba34bd8e1661 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -106,7 +106,6 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen private final AbortedTxnProcessor.SnapshotType snapshotType; private final MaxReadPositionCallBack maxReadPositionCallBack; - private volatile CompletableFuture firstSnapshottingFuture = new CompletableFuture<>(); /** if the first snapshot is in progress, it will pending following publishing tasks. **/ private final LinkedList pendingAppendingTxnBufferTasks = new LinkedList<>(); From 788bd3140212ff2b5a7400b7a8d588c65ff79068 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 10 Nov 2025 22:16:00 +0800 Subject: [PATCH 11/12] address comments --- .../buffer/impl/TopicTransactionBuffer.java | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index eba34bd8e1661..2df6e717981e4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -258,7 +258,13 @@ public long getCommittedTxnCount() { } private record PendingAppendingTxnBufferTask(TxnID txnId, long sequenceId, ByteBuf buffer, - CompletableFuture pendingPublishFuture) {} + CompletableFuture pendingPublishFuture) { + + void fail(Throwable throwable) { + buffer.release(); + pendingPublishFuture.completeExceptionally(throwable); + } + } @Override public CompletableFuture appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer) { @@ -294,8 +300,7 @@ public CompletableFuture appendBufferToTxn(TxnID txnId, long sequenceI synchronized (pendingAppendingTxnBufferTasks) { PendingAppendingTxnBufferTask
pendingTask = null; while ((pendingTask = pendingAppendingTxnBufferTasks.poll()) != null) { - pendingTask.buffer.release(); - pendingTask.pendingPublishFuture.completeExceptionally(throwable); + pendingTask.fail(throwable); } } }; @@ -326,8 +331,7 @@ public CompletableFuture appendBufferToTxn(TxnID txnId, long sequenceI + " snapshot.", topic.getName(), e); if (pendingTask != null) { - pendingTask.buffer.release(); - pendingTask.pendingPublishFuture.completeExceptionally(e); + pendingTask.fail(e); } failPendingTasks.accept(e); } @@ -638,10 +642,9 @@ public CompletableFuture closeAsync() { synchronized (pendingAppendingTxnBufferTasks) { if (!checkIfClosed()) { PendingAppendingTxnBufferTask pendingTask = null; + Throwable t = new BrokerServiceException.ServiceUnitNotReadyException("Topic is closed"); while ((pendingTask = pendingAppendingTxnBufferTasks.poll()) != null) { - pendingTask.pendingPublishFuture.completeExceptionally( - new BrokerServiceException.ServiceUnitNotReadyException("Topic is closed")); - pendingTask.buffer.release(); + pendingTask.fail(t); } } changeToCloseState(); From 29e8d32d993263a4f9047f968957a818f4ac7002 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 11 Nov 2025 09:49:34 +0800 Subject: [PATCH 12/12] fix tests --- .../broker/transaction/buffer/TopicTransactionBufferTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java index 3afd5e7d6dfbc..d76a5a88dbd64 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java @@ -563,9 +563,9 @@ public void testRefCountWhenAppendBufferToTxn() throws Exception { try { 
topicTransactionBuffer.appendBufferToTxn(new TxnID(1, 1), 1L, byteBuf1) .get(5, TimeUnit.SECONDS); - fail(); + fail("this appending should fail because we injected an error"); } catch (Exception e) { - assertEquals(e.getCause().getMessage(), "fail"); + assertEquals(e.getCause().getMessage(), "failed because an injected error for test"); } Awaitility.await().untilAsserted(() -> Assert.assertEquals(byteBuf3.refCnt(), 1)); topicTransactionBuffer.setFollowingInternalAppendBufferToTxnFail(false);