diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index de96a317205bb..57081863a145e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -2061,23 +2061,28 @@ protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId) long requestId = getLastMessageId.getRequestId(); Topic topic = consumer.getSubscription().getTopic(); - Position lastPosition = topic.getLastPosition(); - int partitionIndex = TopicName.getPartitionIndex(topic.getName()); - - Position markDeletePosition = null; - if (consumer.getSubscription() instanceof PersistentSubscription) { - markDeletePosition = ((PersistentSubscription) consumer.getSubscription()).getCursor() - .getMarkDeletedPosition(); - } - - getLargestBatchIndexWhenPossible( - topic, - (PositionImpl) lastPosition, - (PositionImpl) markDeletePosition, - partitionIndex, - requestId, - consumer.getSubscription().getName()); + topic.checkIfTransactionBufferRecoverCompletely(true).thenRun(() -> { + Position lastPosition = ((PersistentTopic) topic).getMaxReadPosition(); + int partitionIndex = TopicName.getPartitionIndex(topic.getName()); + + Position markDeletePosition = null; + if (consumer.getSubscription() instanceof PersistentSubscription) { + markDeletePosition = ((PersistentSubscription) consumer.getSubscription()).getCursor() + .getMarkDeletedPosition(); + } + getLargestBatchIndexWhenPossible( + topic, + (PositionImpl) lastPosition, + (PositionImpl) markDeletePosition, + partitionIndex, + requestId, + consumer.getSubscription().getName()); + }).exceptionally(e -> { + writeAndFlush(Commands.newError(getLastMessageId.getRequestId(), + ServerError.UnknownError, "Failed to recover Transaction Buffer.")); + return null; + }); } else { writeAndFlush(Commands.newError(getLastMessageId.getRequestId(), ServerError.MetadataError, "Consumer not found")); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 3f75c02d48d4b..a260c70112abf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -332,7 +332,7 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS this.transactionBuffer = brokerService.getPulsar() .getTransactionBufferProvider().newTransactionBuffer(this); } else { - this.transactionBuffer = new TransactionBufferDisable(); + this.transactionBuffer = new TransactionBufferDisable(this); } transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry()); if (ledger instanceof ShadowManagedLedgerImpl) { @@ -421,7 +421,7 @@ public CompletableFuture initialize() { this.transactionBuffer = brokerService.getPulsar() .getTransactionBufferProvider().newTransactionBuffer(this); } else { - this.transactionBuffer = new TransactionBufferDisable(); + this.transactionBuffer = new TransactionBufferDisable(this); } shadowSourceTopic = null; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java index 56b49f98efe8e..32355ce1b8cb8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java @@ -211,9 +211,12 @@ public TransactionBufferReader newReader(long sequenceId) throws final ConcurrentMap buffers; final Map> txnIndex; + private final Topic topic; + public InMemTransactionBuffer(Topic topic) { this.buffers = new ConcurrentHashMap<>(); this.txnIndex = new HashMap<>(); + this.topic = topic; } @Override @@ -371,7 +374,7 @@ public void syncMaxReadPositionForNormalPublish(PositionImpl position) { @Override public PositionImpl getMaxReadPosition() { - return PositionImpl.LATEST; + return (PositionImpl) topic.getLastPosition(); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index 8a76362f5a35f..c7cd1f647a4f3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -443,8 +443,7 @@ void updateMaxReadPosition(TxnID txnID) { ongoingTxns.remove(txnID); if (!ongoingTxns.isEmpty()) { PositionImpl position = ongoingTxns.get(ongoingTxns.firstKey()); - //max read position is less than first ongoing transaction message position, so entryId -1 - maxReadPosition = PositionImpl.get(position.getLedgerId(), position.getEntryId() - 1); + maxReadPosition = ((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(position); } else { maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java index 22ba8e2d2e8ef..a75a2912e51a4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java @@ -25,6 +25,7 @@ import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.service.BrokerServiceException; +import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer; import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader; import org.apache.pulsar.broker.transaction.buffer.TransactionMeta; @@ -38,6 +39,10 @@ */ @Slf4j public class TransactionBufferDisable implements TransactionBuffer { + private final Topic topic; + public TransactionBufferDisable(Topic topic) { + this.topic = topic; + } @Override public CompletableFuture getTransactionMeta(TxnID txnID) { @@ -90,7 +95,7 @@ public void syncMaxReadPositionForNormalPublish(PositionImpl position) { @Override public PositionImpl getMaxReadPosition() { - return PositionImpl.LATEST; + return (PositionImpl) topic.getLastPosition(); } @Override