From d2beb06e21a651dc029595e4bac36dd9256f19ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Tue, 17 Jan 2023 13:36:54 +0100 Subject: [PATCH 1/3] [fix][txn] Catch and log runtime exceptions in async operations --- .../apache/pulsar/common/util/FutureUtil.java | 22 +++++ .../impl/MLTransactionMetadataStore.java | 99 +++++++++---------- .../impl/TxnLogBufferedWriter.java | 73 +++++++------- 3 files changed, 106 insertions(+), 88 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java index 19e3a62cc92af..eaba6857331a0 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java @@ -26,6 +26,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -258,4 +259,25 @@ public static CompletionException wrapToCompletionException(Throwable throwable) return new CompletionException(throwable); } } + + /** + * Creates a new {@link CompletableFuture} instance catching + * potential exceptions and completing the future exceptionally. 
+ * + * @param runnable the runnable to execute + * @param executor the executor to use for executing the runnable + * @param completableFuture the future to complete in case of exceptions + * @return + */ + + public static void safeRunAsync(Runnable runnable, + Executor executor, + CompletableFuture completableFuture) { + CompletableFuture + .runAsync(runnable, executor) + .exceptionally((throwable) -> { + completableFuture.completeExceptionally(throwable); + return null; + }); + } } diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java index 53a515ff99164..cc468c790c9e0 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java @@ -115,7 +115,7 @@ public CompletableFuture init(TransactionRecoverTracke + tcID.toString() + " change state to Initializing error when init it")); } else { recoverTime.setRecoverStartTime(System.currentTimeMillis()); - internalPinnedExecutor.execute(() -> transactionLog.replayAsync(new TransactionLogReplayCallback() { + FutureUtil.safeRunAsync(() -> transactionLog.replayAsync(new TransactionLogReplayCallback() { @Override public void replayComplete() { recoverTracker.appendOpenTransactionToTimeoutTracker(); @@ -203,7 +203,7 @@ public void handleMetadataEntry(Position position, TransactionMetadataEntry tran log.error(e.getMessage(), e); } } - })); + }), internalPinnedExecutor, completableFuture); } return completableFuture; } @@ -227,60 +227,59 @@ public CompletableFuture getTxnMeta(TxnID txnID) { @Override public CompletableFuture newTransaction(long timeOut) { - if (this.maxActiveTransactionsPerCoordinator == 0 - || 
this.maxActiveTransactionsPerCoordinator > txnMetaMap.size()) { - CompletableFuture completableFuture = new CompletableFuture<>(); - internalPinnedExecutor.execute(() -> { - if (!checkIfReady()) { - completableFuture.completeExceptionally(new CoordinatorException - .TransactionMetadataStoreStateException(tcID, State.Ready, getState(), "new Transaction")); - return; - } - - long mostSigBits = tcID.getId(); - long leastSigBits = sequenceIdGenerator.generateSequenceId(); - TxnID txnID = new TxnID(mostSigBits, leastSigBits); - long currentTimeMillis = System.currentTimeMillis(); - TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry() - .setTxnidMostBits(mostSigBits) - .setTxnidLeastBits(leastSigBits) - .setStartTime(currentTimeMillis) - .setTimeoutMs(timeOut) - .setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW) - .setLastModificationTime(currentTimeMillis) - .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId()); - transactionLog.append(transactionMetadataEntry) - .whenComplete((position, throwable) -> { - if (throwable != null) { - completableFuture.completeExceptionally(throwable); - } else { - appendLogCount.increment(); - TxnMeta txn = new TxnMetaImpl(txnID, currentTimeMillis, timeOut); - List positions = new ArrayList<>(); - positions.add(position); - Pair> pair = MutablePair.of(txn, positions); - txnMetaMap.put(leastSigBits, pair); - this.timeoutTracker.addTransaction(leastSigBits, timeOut); - createdTransactionCount.increment(); - completableFuture.complete(txnID); - } - }); - }); - return completableFuture; - } else { + if (this.maxActiveTransactionsPerCoordinator != 0 && + this.maxActiveTransactionsPerCoordinator <= txnMetaMap.size()) { return FutureUtil.failedFuture(new CoordinatorException.ReachMaxActiveTxnException("New txn op " + "reach max active txn! 
tcId : " + getTransactionCoordinatorID().getId())); } + CompletableFuture completableFuture = new CompletableFuture<>(); + FutureUtil.safeRunAsync(() -> { + if (!checkIfReady()) { + completableFuture.completeExceptionally(new CoordinatorException + .TransactionMetadataStoreStateException(tcID, State.Ready, getState(), "new Transaction")); + return; + } + + long mostSigBits = tcID.getId(); + long leastSigBits = sequenceIdGenerator.generateSequenceId(); + TxnID txnID = new TxnID(mostSigBits, leastSigBits); + long currentTimeMillis = System.currentTimeMillis(); + TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry() + .setTxnidMostBits(mostSigBits) + .setTxnidLeastBits(leastSigBits) + .setStartTime(currentTimeMillis) + .setTimeoutMs(timeOut) + .setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW) + .setLastModificationTime(currentTimeMillis) + .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId()); + transactionLog.append(transactionMetadataEntry) + .whenComplete((position, throwable) -> { + if (throwable != null) { + completableFuture.completeExceptionally(throwable); + } else { + appendLogCount.increment(); + TxnMeta txn = new TxnMetaImpl(txnID, currentTimeMillis, timeOut); + List positions = new ArrayList<>(); + positions.add(position); + Pair> pair = MutablePair.of(txn, positions); + txnMetaMap.put(leastSigBits, pair); + this.timeoutTracker.addTransaction(leastSigBits, timeOut); + createdTransactionCount.increment(); + completableFuture.complete(txnID); + } + }); + }, internalPinnedExecutor, completableFuture); + return completableFuture; } @Override public CompletableFuture addProducedPartitionToTxn(TxnID txnID, List partitions) { CompletableFuture promise = new CompletableFuture<>(); - internalPinnedExecutor.execute(() -> { + FutureUtil.safeRunAsync(() -> { if (!checkIfReady()) { promise .completeExceptionally(new CoordinatorException.TransactionMetadataStoreStateException(tcID, - State.Ready, getState(), "add 
produced partition")); + State.Ready, getState(), "add produced partition")); return; } getTxnPositionPair(txnID).thenCompose(txnMetaListPair -> { @@ -313,7 +312,7 @@ public CompletableFuture addProducedPartitionToTxn(TxnID txnID, List addProducedPartitionToTxn(TxnID txnID, List addAckedPartitionToTxn(TxnID txnID, List txnSubscriptions) { CompletableFuture promise = new CompletableFuture<>(); - internalPinnedExecutor.execute(() -> { + FutureUtil.safeRunAsync(() -> { if (!checkIfReady()) { promise.completeExceptionally(new CoordinatorException .TransactionMetadataStoreStateException(tcID, State.Ready, getState(), "add acked partition")); @@ -357,7 +356,7 @@ public CompletableFuture addAckedPartitionToTxn(TxnID txnID, promise.completeExceptionally(ex); return null; }); - }); + }, internalPinnedExecutor, promise); return promise; } @@ -365,7 +364,7 @@ public CompletableFuture addAckedPartitionToTxn(TxnID txnID, public CompletableFuture updateTxnStatus(TxnID txnID, TxnStatus newStatus, TxnStatus expectedStatus, boolean isTimeout) { CompletableFuture promise = new CompletableFuture<>(); - internalPinnedExecutor.execute(() -> { + FutureUtil.safeRunAsync(() -> { if (!checkIfReady()) { promise.completeExceptionally(new CoordinatorException .TransactionMetadataStoreStateException(tcID, @@ -426,7 +425,7 @@ public CompletableFuture updateTxnStatus(TxnID txnID, TxnStatus newStatus, promise.completeExceptionally(ex); return null; }); - }); + }, internalPinnedExecutor, promise); return promise; } diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java index a87c040031f9a..5ad50088ffbde 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java +++ 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java @@ -39,6 +39,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.Position; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; +import org.apache.pulsar.common.util.FutureUtil; /*** * See PIP-160: https://github.com/apache/pulsar/issues/15516. @@ -214,13 +215,13 @@ public void asyncAddData(T data, AddDataCallback callback, Object ctx){ AsyncAddArgs.newInstance(callback, ctx, System.currentTimeMillis(), byteBuf)); return; } - singleThreadExecutorForWrite.execute(() -> { - try { - internalAsyncAddData(data, callback, ctx); - } catch (Exception e){ - log.warn("Execute 'internalAsyncAddData' fail", e); - } - }); + CompletableFuture + .runAsync( + () -> internalAsyncAddData(data, callback, ctx), singleThreadExecutorForWrite) + .exceptionally(e -> { + log.warn("Execute 'internalAsyncAddData' fail", e); + return null; + }); } /** @@ -271,21 +272,21 @@ private void internalAsyncAddData(T data, AddDataCallback callback, Object ctx){ } private void trigFlushByTimingTask(){ - singleThreadExecutorForWrite.execute(() -> { - try { - if (flushContext.asyncAddArgsList.isEmpty()) { - return; - } - metrics.triggerFlushByByMaxDelay(flushContext.asyncAddArgsList.size(), bytesSize, - System.currentTimeMillis() - flushContext.asyncAddArgsList.get(0).addedTime); - doFlush(); - } catch (Exception e){ - log.error("Trig flush by timing task fail.", e); - } finally { - // Start the next timing task. 
- nextTimingTrigger(); - } - }); + CompletableFuture + .runAsync(() -> { + if (flushContext.asyncAddArgsList.isEmpty()) { + return; + } + metrics.triggerFlushByByMaxDelay(flushContext.asyncAddArgsList.size(), bytesSize, + System.currentTimeMillis() - flushContext.asyncAddArgsList.get(0).addedTime); + doFlush(); + }, singleThreadExecutorForWrite) + .whenComplete((ignore, e) -> { + if (e != null) { + log.warn("Execute 'trigFlushByTimingTask' fail", e); + } + nextTimingTrigger(); + }); } /** @@ -379,24 +380,20 @@ public CompletableFuture close() { } CompletableFuture closeFuture = new CompletableFuture(); // Cancel pending tasks and release resources. - singleThreadExecutorForWrite.execute(() -> { - try { - // If some requests are flushed, BK will trigger these callbacks, and the remaining requests in should - // fail. - failureCallbackByContextAndRecycle(flushContext, - new ManagedLedgerException.ManagedLedgerFencedException( + FutureUtil.safeRunAsync(() -> { + // If some requests are flushed, BK will trigger these callbacks, and the remaining requests in should + // fail. + failureCallbackByContextAndRecycle(flushContext, + new ManagedLedgerException.ManagedLedgerFencedException( new Exception("Transaction log buffered write has closed") - )); - // Cancel the timing task. - if (!timeout.isCancelled()){ - this.timeout.cancel(); - } - STATE_UPDATER.set(this, State.CLOSED); - closeFuture.complete(null); - } catch (Exception e){ - closeFuture.completeExceptionally(e); + )); + // Cancel the timing task. 
+ if (!timeout.isCancelled()) { + this.timeout.cancel(); } - }); + STATE_UPDATER.set(this, State.CLOSED); + closeFuture.complete(null); + }, singleThreadExecutorForWrite, closeFuture); return closeFuture; } From ce0a85bf28f064c1e25d1417663cc2beda5f3d5f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Tue, 17 Jan 2023 17:33:48 +0100 Subject: [PATCH 2/3] javadoc --- .../main/java/org/apache/pulsar/common/util/FutureUtil.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java index eaba6857331a0..162ef1e52ffd6 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java @@ -261,8 +261,8 @@ public static CompletionException wrapToCompletionException(Throwable throwable) } /** - * Creates a new {@link CompletableFuture} instance catching - * potential exceptions and completing the future exceptionally. + * Executes an operation using the supplied {@link Executor} + * and notifies failures on the supplied {@link CompletableFuture}. 
* * @param runnable the runnable to execute * @param executor the executor to use for executing the runnable From 5327fc20ad40f5b6a83174ed5461c3d623028967 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Wed, 18 Jan 2023 10:14:27 +0100 Subject: [PATCH 3/3] style --- .../coordinator/impl/MLTransactionMetadataStore.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java index cc468c790c9e0..aa6afcee3aed1 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java @@ -227,8 +227,8 @@ public CompletableFuture getTxnMeta(TxnID txnID) { @Override public CompletableFuture newTransaction(long timeOut) { - if (this.maxActiveTransactionsPerCoordinator != 0 && - this.maxActiveTransactionsPerCoordinator <= txnMetaMap.size()) { + if (this.maxActiveTransactionsPerCoordinator != 0 + && this.maxActiveTransactionsPerCoordinator <= txnMetaMap.size()) { return FutureUtil.failedFuture(new CoordinatorException.ReachMaxActiveTxnException("New txn op " + "reach max active txn! tcId : " + getTransactionCoordinatorID().getId())); }