diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 1f424df17a1c0..2f0255f4f2aae 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -4140,6 +4140,12 @@ public CompletableFuture getLastDispatchablePosition() { if (lastDispatchablePosition != null) { return CompletableFuture.completedFuture(lastDispatchablePosition); } + Position lastPosition; + if (transactionBuffer instanceof TransactionBufferDisable) { + lastPosition = getLastPosition(); + } else { + lastPosition = getMaxReadPosition(); + } return ledger.getLastDispatchablePosition(entry -> { MessageMetadata md = entry.getMessageMetadata(); if (md == null) { @@ -4154,7 +4160,7 @@ public CompletableFuture getLastDispatchablePosition() { return !isTxnAborted(txnID, entry.getPosition()); } return true; - }, getMaxReadPosition()).thenApply(position -> { + }, lastPosition).thenApply(position -> { // Update lastDispatchablePosition to the given position updateLastDispatchablePosition(position); return position; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java index d4fd071fef8a7..f6e2ad04e5046 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java @@ -23,6 +23,7 @@ import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; @@ -108,7 +109,7 @@ public void syncMaxReadPositionForNormalPublish(Position position, boolean isMar @Override public Position getMaxReadPosition() { - return topic.getLastPosition(); + return PositionFactory.LATEST; } @Override