Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import org.apache.pulsar.broker.transaction.exception.coordinator.TransactionCoordinatorException;
import org.apache.pulsar.broker.transaction.recover.TransactionRecoverTrackerImpl;
Expand All @@ -54,10 +53,7 @@
import org.apache.pulsar.client.api.transaction.TransactionBufferClientException.RequestTimeoutException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
Expand Down Expand Up @@ -114,57 +110,6 @@ public TransactionMetadataStoreService(TransactionMetadataStoreProvider transact
this.internalPinnedExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory);
}

@Deprecated
public void start() {
pulsarService.getNamespaceService().addNamespaceBundleOwnershipListener(new NamespaceBundleOwnershipListener() {

@Override
public void onLoad(NamespaceBundle bundle) {
pulsarService.getNamespaceService().getOwnedTopicListForNamespaceBundle(bundle)
.whenComplete((topics, ex) -> {
if (ex == null) {
for (String topic : topics) {
TopicName name = TopicName.get(topic);
if (SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getLocalName()
.equals(TopicName.get(name.getPartitionedTopicName()).getLocalName())
&& name.isPartitioned()) {
handleTcClientConnect(TransactionCoordinatorID.get(name.getPartitionIndex()));
}
}
} else {
LOG.error("Failed to get owned topic list when triggering on-loading bundle {}.",
bundle, ex);
}
});
}

@Override
public void unLoad(NamespaceBundle bundle) {
pulsarService.getNamespaceService().getOwnedTopicListForNamespaceBundle(bundle)
.whenComplete((topics, ex) -> {
if (ex == null) {
for (String topic : topics) {
TopicName name = TopicName.get(topic);
if (SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getLocalName()
.equals(TopicName.get(name.getPartitionedTopicName()).getLocalName())
&& name.isPartitioned()) {
removeTransactionMetadataStore(
TransactionCoordinatorID.get(name.getPartitionIndex()));
}
}
} else {
LOG.error("Failed to get owned topic list error when triggering un-loading bundle {}.",
bundle, ex);
}
});
}
@Override
public boolean test(NamespaceBundle namespaceBundle) {
return namespaceBundle.getNamespaceObject().equals(NamespaceName.SYSTEM_NAMESPACE);
}
});
}

public CompletableFuture<Void> handleTcClientConnect(TransactionCoordinatorID tcId) {
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
internalPinnedExecutor.execute(() -> {
Expand Down Expand Up @@ -381,16 +326,15 @@ public void endTransaction(TxnID txnID, int txnAction, boolean isTimeout,
future.complete(null);
return;
}
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
if (!isRetryableException(realCause)) {
if (!isRetryableException(ex)) {
LOG.error("End transaction fail! TxnId : {}, "
+ "TxnAction : {}", txnID, txnAction, realCause);
+ "TxnAction : {}", txnID, txnAction, ex);
future.completeExceptionally(ex);
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("EndTxnInTransactionBuffer retry! TxnId : {}, "
+ "TxnAction : {}", txnID, txnAction, realCause);
+ "TxnAction : {}", txnID, txnAction, ex);
}
transactionOpRetryTimer.newTimeout(timeout ->
endTransaction(txnID, txnAction, isTimeout, future),
Expand Down Expand Up @@ -436,7 +380,7 @@ public void endTransactionForTimeout(TxnID txnID) {
return null;
}
}).exceptionally(e -> {
if (isRetryableException(e.getCause())) {
if (isRetryableException(e)) {
endTransaction(txnID, TxnAction.ABORT_VALUE, true);
} else {
if (LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -487,15 +431,16 @@ private CompletableFuture<Void> endTxnInTransactionBuffer(TxnID txnID, int txnAc
});
}

private static boolean isRetryableException(Throwable e) {
return (e instanceof TransactionMetadataStoreStateException
|| e instanceof RequestTimeoutException
|| e instanceof ManagedLedgerException
|| e instanceof BrokerPersistenceException
|| e instanceof LookupException
|| e instanceof ReachMaxPendingOpsException
|| e instanceof ConnectException)
&& !(e instanceof ManagedLedgerException.ManagedLedgerFencedException);
private static boolean isRetryableException(Throwable ex) {
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
return (realCause instanceof TransactionMetadataStoreStateException
|| realCause instanceof RequestTimeoutException
|| realCause instanceof ManagedLedgerException
|| realCause instanceof BrokerPersistenceException
|| realCause instanceof LookupException
|| realCause instanceof ReachMaxPendingOpsException
|| realCause instanceof ConnectException)
&& !(realCause instanceof ManagedLedgerException.ManagedLedgerFencedException);
}

private CompletableFuture<Void> endTxnInTransactionMetadataStore(TxnID txnID, int txnAction) {
Expand All @@ -517,7 +462,7 @@ public Map<TransactionCoordinatorID, TransactionMetadataStore> getStores() {
return Collections.unmodifiableMap(stores);
}

public synchronized void close () {
public void close () {
this.internalPinnedExecutor.shutdown();
stores.forEach((tcId, metadataStore) -> {
metadataStore.closeAsync().whenComplete((v, ex) -> {
Expand Down