From 7ff1a16753c85a94488cbd675aa8d34f5a330681 Mon Sep 17 00:00:00 2001 From: technoboy Date: Sun, 24 Apr 2022 10:36:42 +0800 Subject: [PATCH] Remove unused start method in tx-metadata-store. --- .../TransactionMetadataStoreService.java | 85 ++++--------------- 1 file changed, 15 insertions(+), 70 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java index 569f590034268..8aff792efec6c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java @@ -41,7 +41,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.bookkeeper.mledger.ManagedLedgerException; -import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener; import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; import org.apache.pulsar.broker.transaction.exception.coordinator.TransactionCoordinatorException; import org.apache.pulsar.broker.transaction.recover.TransactionRecoverTrackerImpl; @@ -54,10 +53,7 @@ import org.apache.pulsar.client.api.transaction.TransactionBufferClientException.RequestTimeoutException; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.common.api.proto.TxnAction; -import org.apache.pulsar.common.naming.NamespaceBundle; -import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.SystemTopicNames; -import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; @@ -114,57 +110,6 @@ public TransactionMetadataStoreService(TransactionMetadataStoreProvider transact this.internalPinnedExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory); } - @Deprecated - public void start() { - pulsarService.getNamespaceService().addNamespaceBundleOwnershipListener(new NamespaceBundleOwnershipListener() { - - @Override - public void onLoad(NamespaceBundle bundle) { - pulsarService.getNamespaceService().getOwnedTopicListForNamespaceBundle(bundle) - .whenComplete((topics, ex) -> { - if (ex == null) { - for (String topic : topics) { - TopicName name = TopicName.get(topic); - if (SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getLocalName() - .equals(TopicName.get(name.getPartitionedTopicName()).getLocalName()) - && name.isPartitioned()) { - handleTcClientConnect(TransactionCoordinatorID.get(name.getPartitionIndex())); - } - } - } else { - LOG.error("Failed to get owned topic list when triggering on-loading bundle {}.", - bundle, ex); - } - }); - } - - @Override - public void unLoad(NamespaceBundle bundle) { - pulsarService.getNamespaceService().getOwnedTopicListForNamespaceBundle(bundle) - .whenComplete((topics, ex) -> { - if (ex == null) { - for (String topic : topics) { - TopicName name = TopicName.get(topic); - if (SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getLocalName() - .equals(TopicName.get(name.getPartitionedTopicName()).getLocalName()) - && name.isPartitioned()) { - removeTransactionMetadataStore( - TransactionCoordinatorID.get(name.getPartitionIndex())); - } - } - } else { - LOG.error("Failed to get owned topic list error when triggering un-loading bundle {}.", - bundle, ex); - } - }); - } - @Override - public boolean test(NamespaceBundle namespaceBundle) { - return namespaceBundle.getNamespaceObject().equals(NamespaceName.SYSTEM_NAMESPACE); - } - }); - } - public CompletableFuture handleTcClientConnect(TransactionCoordinatorID tcId) { CompletableFuture completableFuture = new CompletableFuture<>(); internalPinnedExecutor.execute(() -> { @@ -381,16 +326,15 @@ public void endTransaction(TxnID txnID, int txnAction, boolean isTimeout, future.complete(null); return; } - Throwable realCause = FutureUtil.unwrapCompletionException(ex); - if (!isRetryableException(realCause)) { + if (!isRetryableException(ex)) { LOG.error("End transaction fail! TxnId : {}, " - + "TxnAction : {}", txnID, txnAction, realCause); + + "TxnAction : {}", txnID, txnAction, ex); future.completeExceptionally(ex); return; } if (LOG.isDebugEnabled()) { LOG.debug("EndTxnInTransactionBuffer retry! TxnId : {}, " - + "TxnAction : {}", txnID, txnAction, realCause); + + "TxnAction : {}", txnID, txnAction, ex); } transactionOpRetryTimer.newTimeout(timeout -> endTransaction(txnID, txnAction, isTimeout, future), @@ -436,7 +380,7 @@ public void endTransactionForTimeout(TxnID txnID) { return null; } }).exceptionally(e -> { - if (isRetryableException(e.getCause())) { + if (isRetryableException(e)) { endTransaction(txnID, TxnAction.ABORT_VALUE, true); } else { if (LOG.isDebugEnabled()) { @@ -487,15 +431,16 @@ private CompletableFuture endTxnInTransactionBuffer(TxnID txnID, int txnAc }); } - private static boolean isRetryableException(Throwable e) { - return (e instanceof TransactionMetadataStoreStateException - || e instanceof RequestTimeoutException - || e instanceof ManagedLedgerException - || e instanceof BrokerPersistenceException - || e instanceof LookupException - || e instanceof ReachMaxPendingOpsException - || e instanceof ConnectException) - && !(e instanceof ManagedLedgerException.ManagedLedgerFencedException); + private static boolean isRetryableException(Throwable ex) { + Throwable realCause = FutureUtil.unwrapCompletionException(ex); + return (realCause instanceof TransactionMetadataStoreStateException + || realCause instanceof RequestTimeoutException + || realCause instanceof ManagedLedgerException + || realCause instanceof BrokerPersistenceException + || realCause instanceof LookupException + || realCause instanceof ReachMaxPendingOpsException + || realCause instanceof ConnectException) + && !(realCause instanceof ManagedLedgerException.ManagedLedgerFencedException); } private CompletableFuture endTxnInTransactionMetadataStore(TxnID txnID, int txnAction) { @@ -517,7 +462,7 @@ public Map getStores() { return Collections.unmodifiableMap(stores); } - public synchronized void close () { + public void close () { this.internalPinnedExecutor.shutdown(); stores.forEach((tcId, metadataStore) -> { metadataStore.closeAsync().whenComplete((v, ex) -> {