From 59db2d7d0beef7b790ca1002bbe7036564399288 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Mon, 8 Jul 2024 16:46:10 +0800 Subject: [PATCH 1/6] do not take snapshot when no txn. --- .../buffer/impl/TopicTransactionBuffer.java | 27 ++++++------ .../buffer/TopicTransactionBufferTest.java | 42 +++++++++++++++++++ 2 files changed, 54 insertions(+), 15 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index b4662e5fa83ed..5de1bf6eed0b5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -219,19 +219,7 @@ public CompletableFuture checkIfTBRecoverCompletely(boolean isTxnEnabled) } else { CompletableFuture completableFuture = new CompletableFuture<>(); transactionBufferFuture.thenRun(() -> { - if (checkIfNoSnapshot()) { - snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(maxReadPosition).thenRun(() -> { - if (changeToReadyStateFromNoSnapshot()) { - timer.newTimeout(TopicTransactionBuffer.this, - takeSnapshotIntervalTime, TimeUnit.MILLISECONDS); - } - completableFuture.complete(null); - }).exceptionally(exception -> { - log.error("Topic {} failed to take snapshot", this.topic.getName()); - completableFuture.completeExceptionally(exception); - return null; - }); - } else { + if (checkIfNoSnapshot() || checkIfReady()) { completableFuture.complete(null); } }).exceptionally(exception -> { @@ -436,17 +424,26 @@ private void handleLowWaterMark(TxnID txnID, long lowWaterMark) { } } + private void takeAbortedTxnSnapshot(Position maxReadPosition) { + this.snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(this.maxReadPosition) + .thenRun(() -> { + if (checkIfNoSnapshot()) { + changeToReadyStateFromNoSnapshot(); + } + }); + } + private void takeSnapshotByChangeTimes() { if (changeMaxReadPositionCount.get() >= takeSnapshotIntervalNumber) { this.changeMaxReadPositionCount.set(0); - this.snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(this.maxReadPosition); + takeAbortedTxnSnapshot(this.maxReadPosition); } } private void takeSnapshotByTimeout() { if (changeMaxReadPositionCount.get() > 0) { this.changeMaxReadPositionCount.set(0); - this.snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(this.maxReadPosition); + takeAbortedTxnSnapshot(this.maxReadPosition); } this.timer.newTimeout(TopicTransactionBuffer.this, takeSnapshotIntervalTime, TimeUnit.MILLISECONDS); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java index dea79f391e39a..378bc09141f52 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java @@ -343,6 +343,48 @@ public void testGetLastMessageIdsWithOngoingTransactions() throws Exception { assertGetLastMessageId(consumer, expectedLastMessageID1); } + @Test + public void testNormalProductionNoSnapshot() throws Exception { + String topic = "persistent://" + NAMESPACE1 + "/testNormalProductionNoSnapshot"; + @Cleanup + Producer producer = pulsarClient.newProducer() + .topic(topic) + .create(); + + PersistentTopic topicRef = null; + for (int i = 0; i < pulsarServiceList.size(); i++) { + try { + topicRef = (PersistentTopic) pulsarServiceList.get(i) + .getBrokerService().getTopic(topic, true).get().get(); + break; + } catch (Exception e) { + } + } + TopicTransactionBuffer transactionBuffer = (TopicTransactionBuffer) topicRef.getTransactionBuffer(); + + // check the transaction buffer state is NoSnapshot + Assert.assertEquals(transactionBuffer.getState(), TopicTransactionBufferState.State.NoSnapshot); + + // send 5 original messages. + for (int i = 0; i < 5; i++) { + producer.newMessage().send(); + } + + // check the transaction buffer state is NoSnapshot + Assert.assertEquals(transactionBuffer.getState(), TopicTransactionBufferState.State.NoSnapshot); + + // create consumer to get last message id, trigger the snapshot + @Cleanup + Consumer consumer = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName("sub") + .subscribe(); + consumer.getLastMessageIds(); + + // check the transaction buffer state is Ready + Assert.assertEquals(transactionBuffer.getState(), TopicTransactionBufferState.State.NoSnapshot); + } + /** * produce 3 messages and then trigger a ledger switch, * then create a transaction and send a transactional message. From 3408b019d81b8e42f1e555cc3048f8cd5550d6d9 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Tue, 9 Jul 2024 10:13:40 +0800 Subject: [PATCH 2/6] take snapshot before the first txn message. --- .../broker/transaction/buffer/impl/TopicTransactionBuffer.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index 5de1bf6eed0b5..290196ef4ce3f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -256,6 +256,9 @@ public CompletableFuture appendBufferToTxn(TxnID txnId, long sequenceI + "Please use a new transaction to send message.")); return completableFuture; } + if (checkIfNoSnapshot()) { + takeAbortedTxnSnapshot(this.maxReadPosition); + } topic.getManagedLedger().asyncAddEntry(buffer, new AsyncCallbacks.AddEntryCallback() { @Override public void addComplete(Position position, ByteBuf entryData, Object ctx) { From 20a436172057d44278b48690fc118779f92d4a55 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Mon, 15 Jul 2024 10:08:57 +0800 Subject: [PATCH 3/6] fix. --- .../transaction/buffer/impl/TopicTransactionBuffer.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index 290196ef4ce3f..17b03dadf4d7c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -221,6 +221,9 @@ public CompletableFuture checkIfTBRecoverCompletely(boolean isTxnEnabled) transactionBufferFuture.thenRun(() -> { if (checkIfNoSnapshot() || checkIfReady()) { completableFuture.complete(null); + } else { + completableFuture.completeExceptionally(new BrokerServiceException + .ServiceUnitNotReadyException("TransactionBuffer recover failed")); } }).exceptionally(exception -> { log.error("Topic {}: TransactionBuffer recover failed", this.topic.getName(), exception.getCause()); @@ -428,7 +431,7 @@ private void handleLowWaterMark(TxnID txnID, long lowWaterMark) { } private void takeAbortedTxnSnapshot(Position maxReadPosition) { - this.snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(this.maxReadPosition) + this.snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(maxReadPosition) .thenRun(() -> { if (checkIfNoSnapshot()) { changeToReadyStateFromNoSnapshot(); From 6ae479734fb3e43ff831eee806cf9915fa3382ff Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Mon, 15 Jul 2024 10:27:53 +0800 Subject: [PATCH 4/6] fix. --- .../buffer/impl/TopicTransactionBuffer.java | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index 17b03dadf4d7c..5767fb2d440bf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -260,7 +260,8 @@ public CompletableFuture appendBufferToTxn(TxnID txnId, long sequenceI return completableFuture; } if (checkIfNoSnapshot()) { - takeAbortedTxnSnapshot(this.maxReadPosition); + this.snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(maxReadPosition) + .thenRun(() -> changeToReadyStateFromNoSnapshot()).join(); } topic.getManagedLedger().asyncAddEntry(buffer, new AsyncCallbacks.AddEntryCallback() { @Override @@ -430,26 +431,17 @@ private void handleLowWaterMark(TxnID txnID, long lowWaterMark) { } } - private void takeAbortedTxnSnapshot(Position maxReadPosition) { - this.snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(maxReadPosition) - .thenRun(() -> { - if (checkIfNoSnapshot()) { - changeToReadyStateFromNoSnapshot(); - } - }); - } - private void takeSnapshotByChangeTimes() { if (changeMaxReadPositionCount.get() >= takeSnapshotIntervalNumber) { this.changeMaxReadPositionCount.set(0); - takeAbortedTxnSnapshot(this.maxReadPosition); + this.snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(this.maxReadPosition); } } private void takeSnapshotByTimeout() { if (changeMaxReadPositionCount.get() > 0) { this.changeMaxReadPositionCount.set(0); - takeAbortedTxnSnapshot(this.maxReadPosition); + this.snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(this.maxReadPosition); } this.timer.newTimeout(TopicTransactionBuffer.this, takeSnapshotIntervalTime, TimeUnit.MILLISECONDS); From b1d48225c3ca7c66c84a3e178363ec99fa9c8afe Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Wed, 17 Jul 2024 15:43:50 +0800 Subject: [PATCH 5/6] change. --- .../pulsar/broker/service/ServerCnx.java | 114 +++++++++--------- .../apache/pulsar/broker/service/Topic.java | 6 + .../nonpersistent/NonPersistentTopic.java | 5 + .../service/persistent/PersistentTopic.java | 5 + .../transaction/buffer/TransactionBuffer.java | 6 +- .../buffer/impl/TopicTransactionBuffer.java | 17 ++- 6 files changed, 92 insertions(+), 61 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 4933aee974d08..63a3709db4b97 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1583,67 +1583,69 @@ protected void handleProducer(final CommandProducer cmdProducer) { }); schemaVersionFuture.thenAccept(schemaVersion -> { - topic.checkIfTransactionBufferRecoverCompletely(isTxnEnabled).thenAccept(future -> { - CompletionStage createInitSubFuture; - if (!Strings.isNullOrEmpty(initialSubscriptionName) - && topic.isPersistent() - && !topic.getSubscriptions().containsKey(initialSubscriptionName)) { - createInitSubFuture = service.isAllowAutoSubscriptionCreationAsync(topicName) - .thenCompose(isAllowAutoSubscriptionCreation -> { - if (!isAllowAutoSubscriptionCreation) { - return CompletableFuture.failedFuture( - new BrokerServiceException.NotAllowedException( - "Could not create the initial subscription due to" - + " the auto subscription creation is not allowed.")); - } - return topic.createSubscription(initialSubscriptionName, - InitialPosition.Earliest, false, null); - }); - } else { - createInitSubFuture = CompletableFuture.completedFuture(null); - } + topic.checkIfTransactionBufferRecoverCompletely(isTxnEnabled) + .thenCompose(future -> topic.takeFirstSnapshotIfNeed()) + .thenAccept(future -> { + CompletionStage createInitSubFuture; + if (!Strings.isNullOrEmpty(initialSubscriptionName) + && topic.isPersistent() + && !topic.getSubscriptions().containsKey(initialSubscriptionName)) { + createInitSubFuture = service.isAllowAutoSubscriptionCreationAsync(topicName) + .thenCompose(isAllowAutoSubscriptionCreation -> { + if (!isAllowAutoSubscriptionCreation) { + return CompletableFuture.failedFuture( + new BrokerServiceException.NotAllowedException( + "Could not create the initial subscription due to" + + " the auto subscription creation is not allowed.")); + } + return topic.createSubscription(initialSubscriptionName, + InitialPosition.Earliest, false, null); + }); + } else { + createInitSubFuture = CompletableFuture.completedFuture(null); + } - createInitSubFuture.whenComplete((sub, ex) -> { - if (ex != null) { - final Throwable rc = FutureUtil.unwrapCompletionException(ex); - if (rc instanceof BrokerServiceException.NotAllowedException) { - log.warn("[{}] {} initialSubscriptionName: {}, topic: {}", - remoteAddress, rc.getMessage(), initialSubscriptionName, topicName); - if (producerFuture.completeExceptionally(rc)) { - commandSender.sendErrorResponse(requestId, - ServerError.NotAllowedError, rc.getMessage()); + createInitSubFuture.whenComplete((sub, ex) -> { + if (ex != null) { + final Throwable rc = FutureUtil.unwrapCompletionException(ex); + if (rc instanceof BrokerServiceException.NotAllowedException) { + log.warn("[{}] {} initialSubscriptionName: {}, topic: {}", + remoteAddress, rc.getMessage(), initialSubscriptionName, topicName); + if (producerFuture.completeExceptionally(rc)) { + commandSender.sendErrorResponse(requestId, + ServerError.NotAllowedError, rc.getMessage()); + } + producers.remove(producerId, producerFuture); + return; + } + String msg = + "Failed to create the initial subscription: " + ex.getCause().getMessage(); + log.warn("[{}] {} initialSubscriptionName: {}, topic: {}", + remoteAddress, msg, initialSubscriptionName, topicName); + if (producerFuture.completeExceptionally(ex)) { + commandSender.sendErrorResponse(requestId, + BrokerServiceException.getClientErrorCode(ex), msg); + } + producers.remove(producerId, producerFuture); + return; } - producers.remove(producerId, producerFuture); - return; - } - String msg = - "Failed to create the initial subscription: " + ex.getCause().getMessage(); - log.warn("[{}] {} initialSubscriptionName: {}, topic: {}", - remoteAddress, msg, initialSubscriptionName, topicName); - if (producerFuture.completeExceptionally(ex)) { + + buildProducerAndAddTopic(topic, producerId, producerName, requestId, isEncrypted, + metadata, schemaVersion, epoch, userProvidedProducerName, topicName, + producerAccessMode, topicEpoch, supportsPartialProducer, producerFuture); + }); + }).exceptionally(exception -> { + Throwable cause = exception.getCause(); + log.error("producerId {}, requestId {} : TransactionBuffer recover failed", + producerId, requestId, exception); + if (producerFuture.completeExceptionally(exception)) { commandSender.sendErrorResponse(requestId, - BrokerServiceException.getClientErrorCode(ex), msg); + ServiceUnitNotReadyException.getClientErrorCode(cause), + cause.getMessage()); } producers.remove(producerId, producerFuture); - return; - } - - buildProducerAndAddTopic(topic, producerId, producerName, requestId, isEncrypted, - metadata, schemaVersion, epoch, userProvidedProducerName, topicName, - producerAccessMode, topicEpoch, supportsPartialProducer, producerFuture); - }); - }).exceptionally(exception -> { - Throwable cause = exception.getCause(); - log.error("producerId {}, requestId {} : TransactionBuffer recover failed", - producerId, requestId, exception); - if (producerFuture.completeExceptionally(exception)) { - commandSender.sendErrorResponse(requestId, - ServiceUnitNotReadyException.getClientErrorCode(cause), - cause.getMessage()); - } - producers.remove(producerId, producerFuture); - return null; - }); + return null; + }); }); }); return backlogQuotaCheckFuture; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index 50a28c7979277..3cce23e4d574a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -153,6 +153,12 @@ default void setEntryTimestamp(long entryTimestamp) { */ CompletableFuture checkIfTransactionBufferRecoverCompletely(boolean isTxnEnabled); + /** + * Take snapshot if needed. + * @return a future represents the result of take snapshot operation. + */ + CompletableFuture takeFirstSnapshotIfNeed(); + /** * record add-latency. */ diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 3801ac7f3ee82..3882ac9176d2e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -261,6 +261,11 @@ public CompletableFuture checkIfTransactionBufferRecoverCompletely(boolean return CompletableFuture.completedFuture(null); } + @Override + public CompletableFuture takeFirstSnapshotIfNeed() { + return CompletableFuture.completedFuture(null); + } + @Override public CompletableFuture subscribe(SubscriptionOption option) { return internalSubscribe(option.getCnx(), option.getSubscriptionName(), option.getConsumerId(), diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 07deb1168072a..e818da8e8c50a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -845,6 +845,11 @@ public CompletableFuture checkIfTransactionBufferRecoverCompletely(boolean return getTransactionBuffer().checkIfTBRecoverCompletely(isTxnEnabled); } + @Override + public CompletableFuture takeFirstSnapshotIfNeed() { + return getTransactionBuffer().takeFirstSnapshotIfNeed(); + } + @Override protected CompletableFuture incrementTopicEpoch(Optional currentEpoch) { long newEpoch = currentEpoch.orElse(-1L) + 1; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java index b379c4d1db10c..ee586b5e22374 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java @@ -194,7 +194,11 @@ public interface TransactionBuffer { */ CompletableFuture checkIfTBRecoverCompletely(boolean isTxn); - + /** + * Take snapshot if needed. + * @return a future represents the result of take snapshot operation. + */ + CompletableFuture takeFirstSnapshotIfNeed(); long getOngoingTxnCount(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index 5767fb2d440bf..2858cc959487b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -234,6 +234,19 @@ public CompletableFuture checkIfTBRecoverCompletely(boolean isTxnEnabled) } } + @Override + public CompletableFuture takeFirstSnapshotIfNeed() { + CompletableFuture completableFuture = new CompletableFuture<>(); + if (checkIfNoSnapshot()) { + this.snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(maxReadPosition) + .thenRun(() -> changeToReadyStateFromNoSnapshot()) + .thenAccept(__ -> completableFuture.complete(null)); + } else { + completableFuture.complete(null); + } + return completableFuture; + } + @Override public long getOngoingTxnCount() { return this.ongoingTxns.size(); @@ -259,10 +272,6 @@ public CompletableFuture appendBufferToTxn(TxnID txnId, long sequenceI + "Please use a new transaction to send message.")); return completableFuture; } - if (checkIfNoSnapshot()) { - this.snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(maxReadPosition) - .thenRun(() -> changeToReadyStateFromNoSnapshot()).join(); - } topic.getManagedLedger().asyncAddEntry(buffer, new AsyncCallbacks.AddEntryCallback() { @Override public void addComplete(Position position, ByteBuf entryData, Object ctx) { From d314a8a96adf3cc8e639ce42f08ff94ca35c59a2 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Wed, 17 Jul 2024 15:52:37 +0800 Subject: [PATCH 6/6] fix. --- .../java/org/apache/pulsar/broker/service/ServerCnx.java | 2 +- .../main/java/org/apache/pulsar/broker/service/Topic.java | 3 ++- .../broker/service/nonpersistent/NonPersistentTopic.java | 2 +- .../pulsar/broker/service/persistent/PersistentTopic.java | 4 ++-- .../pulsar/broker/transaction/buffer/TransactionBuffer.java | 3 ++- .../transaction/buffer/impl/InMemTransactionBuffer.java | 5 +++++ .../transaction/buffer/impl/TopicTransactionBuffer.java | 4 ++-- .../transaction/buffer/impl/TransactionBufferDisable.java | 5 +++++ 8 files changed, 20 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 63a3709db4b97..a7f13870ae055 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1584,7 +1584,7 @@ protected void handleProducer(final CommandProducer cmdProducer) { schemaVersionFuture.thenAccept(schemaVersion -> { topic.checkIfTransactionBufferRecoverCompletely(isTxnEnabled) - .thenCompose(future -> topic.takeFirstSnapshotIfNeed()) + .thenCompose(future -> topic.takeFirstSnapshotIfNeed(isTxnEnabled)) .thenAccept(future -> { CompletionStage createInitSubFuture; if (!Strings.isNullOrEmpty(initialSubscriptionName) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index 3cce23e4d574a..661335685accf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -155,9 +155,10 @@ default void setEntryTimestamp(long entryTimestamp) { /** * Take snapshot if needed. + * @param isTxnEnabled isTxnEnabled * @return a future represents the result of take snapshot operation. */ - CompletableFuture takeFirstSnapshotIfNeed(); + CompletableFuture takeFirstSnapshotIfNeed(boolean isTxnEnabled); /** * record add-latency. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 3882ac9176d2e..bb58d1d8825c4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -262,7 +262,7 @@ public CompletableFuture checkIfTransactionBufferRecoverCompletely(boolean } @Override - public CompletableFuture takeFirstSnapshotIfNeed() { + public CompletableFuture takeFirstSnapshotIfNeed(boolean enableTxn) { return CompletableFuture.completedFuture(null); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index e818da8e8c50a..1900faf1187d4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -846,8 +846,8 @@ public CompletableFuture checkIfTransactionBufferRecoverCompletely(boolean } @Override - public CompletableFuture takeFirstSnapshotIfNeed() { - return getTransactionBuffer().takeFirstSnapshotIfNeed(); + public CompletableFuture takeFirstSnapshotIfNeed(boolean isTxnEnabled) { + return getTransactionBuffer().takeFirstSnapshotIfNeed(isTxnEnabled); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java index ee586b5e22374..07e3aaf4f5969 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java @@ -196,9 +196,10 @@ public interface TransactionBuffer { /** * Take snapshot if needed. + * @param enableTxn * @return a future represents the result of take snapshot operation. */ - CompletableFuture takeFirstSnapshotIfNeed(); + CompletableFuture takeFirstSnapshotIfNeed(boolean enableTxn); long getOngoingTxnCount(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java index ae755f0715ee2..338ac24ad3d98 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java @@ -415,6 +415,11 @@ public CompletableFuture checkIfTBRecoverCompletely(boolean isTxn) { return CompletableFuture.completedFuture(null); } + @Override + public CompletableFuture takeFirstSnapshotIfNeed(boolean enableTxn) { + return CompletableFuture.completedFuture(null); + } + @Override public long getOngoingTxnCount() { return this.buffers.values().stream() diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index 2858cc959487b..a181cea2fa210 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -235,9 +235,9 @@ public CompletableFuture checkIfTBRecoverCompletely(boolean isTxnEnabled) } @Override - public CompletableFuture takeFirstSnapshotIfNeed() { + public CompletableFuture takeFirstSnapshotIfNeed(boolean enableTxn) { CompletableFuture completableFuture = new CompletableFuture<>(); - if (checkIfNoSnapshot()) { + if (enableTxn && checkIfNoSnapshot()) { this.snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(maxReadPosition) .thenRun(() -> changeToReadyStateFromNoSnapshot()) .thenAccept(__ -> completableFuture.complete(null)); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java index d0efc47c49544..3a2ac632f4ba1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java @@ -136,6 +136,11 @@ public CompletableFuture checkIfTBRecoverCompletely(boolean isTxn) { return CompletableFuture.completedFuture(null); } + @Override + public CompletableFuture takeFirstSnapshotIfNeed(boolean enableTxn) { + return CompletableFuture.completedFuture(null); + } + @Override public long getOngoingTxnCount() { return 0;