From 4734422353d7c4acc384e97c9566be6c14a3356e Mon Sep 17 00:00:00 2001 From: fanjianye Date: Tue, 28 Oct 2025 10:49:45 +0800 Subject: [PATCH 1/2] fix getMaxReadPosition in TransactionBufferDisable should return latest --- .../transaction/buffer/impl/TransactionBufferDisable.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java index d4fd071fef8a7..f6e2ad04e5046 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java @@ -23,6 +23,7 @@ import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; @@ -108,7 +109,7 @@ public void syncMaxReadPositionForNormalPublish(Position position, boolean isMar @Override public Position getMaxReadPosition() { - return topic.getLastPosition(); + return PositionFactory.LATEST; } @Override From 9df0df53eb80bb32b4cfe50f4be87a7ec43b7e92 Mon Sep 17 00:00:00 2001 From: fanjianye Date: Mon, 3 Nov 2025 14:54:50 +0800 Subject: [PATCH 2/2] fix getLastPosition in handleGetLastMessageId() --- .../pulsar/broker/service/persistent/PersistentTopic.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 1f424df17a1c0..2f0255f4f2aae 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -4140,6 +4140,12 @@ public CompletableFuture getLastDispatchablePosition() { if (lastDispatchablePosition != null) { return CompletableFuture.completedFuture(lastDispatchablePosition); } + Position lastPosition; + if (transactionBuffer instanceof TransactionBufferDisable) { + lastPosition = getLastPosition(); + } else { + lastPosition = getMaxReadPosition(); + } return ledger.getLastDispatchablePosition(entry -> { MessageMetadata md = entry.getMessageMetadata(); if (md == null) { @@ -4154,7 +4160,7 @@ public CompletableFuture getLastDispatchablePosition() { return !isTxnAborted(txnID, entry.getPosition()); } return true; - }, getMaxReadPosition()).thenApply(position -> { + }, lastPosition).thenApply(position -> { // Update lastDispatchablePosition to the given position updateLastDispatchablePosition(position); return position;