Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -258,4 +259,25 @@ public static CompletionException wrapToCompletionException(Throwable throwable)
return new CompletionException(throwable);
}
}

/**
* Executes an operation using the supplied {@link Executor}
* and notify failures on the supplied {@link CompletableFuture}.
*
* @param runnable the runnable to execute
* @param executor the executor to use for executing the runnable
* @param completableFuture the future to complete in case of exceptions
* @return
*/

public static void safeRunAsync(Runnable runnable,
Comment thread
nicoloboschi marked this conversation as resolved.
Executor executor,
CompletableFuture completableFuture) {
CompletableFuture
.runAsync(runnable, executor)
.exceptionally((throwable) -> {
completableFuture.completeExceptionally(throwable);
Comment thread
nicoloboschi marked this conversation as resolved.
return null;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public CompletableFuture<TransactionMetadataStore> init(TransactionRecoverTracke
+ tcID.toString() + " change state to Initializing error when init it"));
} else {
recoverTime.setRecoverStartTime(System.currentTimeMillis());
internalPinnedExecutor.execute(() -> transactionLog.replayAsync(new TransactionLogReplayCallback() {
FutureUtil.safeRunAsync(() -> transactionLog.replayAsync(new TransactionLogReplayCallback() {
@Override
public void replayComplete() {
recoverTracker.appendOpenTransactionToTimeoutTracker();
Expand Down Expand Up @@ -203,7 +203,7 @@ public void handleMetadataEntry(Position position, TransactionMetadataEntry tran
log.error(e.getMessage(), e);
}
}
}));
}), internalPinnedExecutor, completableFuture);
}
return completableFuture;
}
Expand All @@ -227,60 +227,59 @@ public CompletableFuture<TxnMeta> getTxnMeta(TxnID txnID) {

@Override
public CompletableFuture<TxnID> newTransaction(long timeOut) {
if (this.maxActiveTransactionsPerCoordinator == 0
|| this.maxActiveTransactionsPerCoordinator > txnMetaMap.size()) {
CompletableFuture<TxnID> completableFuture = new CompletableFuture<>();
internalPinnedExecutor.execute(() -> {
if (!checkIfReady()) {
completableFuture.completeExceptionally(new CoordinatorException
.TransactionMetadataStoreStateException(tcID, State.Ready, getState(), "new Transaction"));
return;
}

long mostSigBits = tcID.getId();
long leastSigBits = sequenceIdGenerator.generateSequenceId();
TxnID txnID = new TxnID(mostSigBits, leastSigBits);
long currentTimeMillis = System.currentTimeMillis();
TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
.setTxnidMostBits(mostSigBits)
.setTxnidLeastBits(leastSigBits)
.setStartTime(currentTimeMillis)
.setTimeoutMs(timeOut)
.setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW)
.setLastModificationTime(currentTimeMillis)
.setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
transactionLog.append(transactionMetadataEntry)
.whenComplete((position, throwable) -> {
if (throwable != null) {
completableFuture.completeExceptionally(throwable);
} else {
appendLogCount.increment();
TxnMeta txn = new TxnMetaImpl(txnID, currentTimeMillis, timeOut);
List<Position> positions = new ArrayList<>();
positions.add(position);
Pair<TxnMeta, List<Position>> pair = MutablePair.of(txn, positions);
txnMetaMap.put(leastSigBits, pair);
this.timeoutTracker.addTransaction(leastSigBits, timeOut);
createdTransactionCount.increment();
completableFuture.complete(txnID);
}
});
});
return completableFuture;
} else {
if (this.maxActiveTransactionsPerCoordinator != 0
&& this.maxActiveTransactionsPerCoordinator <= txnMetaMap.size()) {
return FutureUtil.failedFuture(new CoordinatorException.ReachMaxActiveTxnException("New txn op "
+ "reach max active txn! tcId : " + getTransactionCoordinatorID().getId()));
}
CompletableFuture<TxnID> completableFuture = new CompletableFuture<>();
FutureUtil.safeRunAsync(() -> {
if (!checkIfReady()) {
completableFuture.completeExceptionally(new CoordinatorException
.TransactionMetadataStoreStateException(tcID, State.Ready, getState(), "new Transaction"));
return;
}

long mostSigBits = tcID.getId();
long leastSigBits = sequenceIdGenerator.generateSequenceId();
TxnID txnID = new TxnID(mostSigBits, leastSigBits);
long currentTimeMillis = System.currentTimeMillis();
TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
.setTxnidMostBits(mostSigBits)
.setTxnidLeastBits(leastSigBits)
.setStartTime(currentTimeMillis)
.setTimeoutMs(timeOut)
.setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW)
.setLastModificationTime(currentTimeMillis)
.setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
transactionLog.append(transactionMetadataEntry)
.whenComplete((position, throwable) -> {
if (throwable != null) {
completableFuture.completeExceptionally(throwable);
} else {
appendLogCount.increment();
TxnMeta txn = new TxnMetaImpl(txnID, currentTimeMillis, timeOut);
List<Position> positions = new ArrayList<>();
positions.add(position);
Pair<TxnMeta, List<Position>> pair = MutablePair.of(txn, positions);
txnMetaMap.put(leastSigBits, pair);
this.timeoutTracker.addTransaction(leastSigBits, timeOut);
createdTransactionCount.increment();
completableFuture.complete(txnID);
}
});
}, internalPinnedExecutor, completableFuture);
return completableFuture;
}

@Override
public CompletableFuture<Void> addProducedPartitionToTxn(TxnID txnID, List<String> partitions) {
CompletableFuture<Void> promise = new CompletableFuture<>();
internalPinnedExecutor.execute(() -> {
FutureUtil.safeRunAsync(() -> {
if (!checkIfReady()) {
promise
.completeExceptionally(new CoordinatorException.TransactionMetadataStoreStateException(tcID,
State.Ready, getState(), "add produced partition"));
State.Ready, getState(), "add produced partition"));
return;
}
getTxnPositionPair(txnID).thenCompose(txnMetaListPair -> {
Expand Down Expand Up @@ -313,15 +312,15 @@ public CompletableFuture<Void> addProducedPartitionToTxn(TxnID txnID, List<Strin
promise.completeExceptionally(ex);
return null;
});
});
}, internalPinnedExecutor, promise);
return promise;
}

@Override
public CompletableFuture<Void> addAckedPartitionToTxn(TxnID txnID,
List<TransactionSubscription> txnSubscriptions) {
CompletableFuture<Void> promise = new CompletableFuture<>();
internalPinnedExecutor.execute(() -> {
FutureUtil.safeRunAsync(() -> {
if (!checkIfReady()) {
promise.completeExceptionally(new CoordinatorException
.TransactionMetadataStoreStateException(tcID, State.Ready, getState(), "add acked partition"));
Expand Down Expand Up @@ -357,15 +356,15 @@ public CompletableFuture<Void> addAckedPartitionToTxn(TxnID txnID,
promise.completeExceptionally(ex);
return null;
});
});
}, internalPinnedExecutor, promise);
return promise;
}

@Override
public CompletableFuture<Void> updateTxnStatus(TxnID txnID, TxnStatus newStatus,
TxnStatus expectedStatus, boolean isTimeout) {
CompletableFuture<Void> promise = new CompletableFuture<>();
internalPinnedExecutor.execute(() -> {
FutureUtil.safeRunAsync(() -> {
if (!checkIfReady()) {
promise.completeExceptionally(new CoordinatorException
.TransactionMetadataStoreStateException(tcID,
Expand Down Expand Up @@ -426,7 +425,7 @@ public CompletableFuture<Void> updateTxnStatus(TxnID txnID, TxnStatus newStatus,
promise.completeExceptionally(ex);
return null;
});
});
}, internalPinnedExecutor, promise);
return promise;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.util.FutureUtil;

/***
* See PIP-160: https://github.com/apache/pulsar/issues/15516.
Expand Down Expand Up @@ -214,13 +215,13 @@ public void asyncAddData(T data, AddDataCallback callback, Object ctx){
AsyncAddArgs.newInstance(callback, ctx, System.currentTimeMillis(), byteBuf));
return;
}
singleThreadExecutorForWrite.execute(() -> {
try {
internalAsyncAddData(data, callback, ctx);
} catch (Exception e){
log.warn("Execute 'internalAsyncAddData' fail", e);
}
});
CompletableFuture
.runAsync(
() -> internalAsyncAddData(data, callback, ctx), singleThreadExecutorForWrite)
.exceptionally(e -> {
log.warn("Execute 'internalAsyncAddData' fail", e);
return null;
});
}

/**
Expand Down Expand Up @@ -271,21 +272,21 @@ private void internalAsyncAddData(T data, AddDataCallback callback, Object ctx){
}

private void trigFlushByTimingTask(){
singleThreadExecutorForWrite.execute(() -> {
try {
if (flushContext.asyncAddArgsList.isEmpty()) {
return;
}
metrics.triggerFlushByByMaxDelay(flushContext.asyncAddArgsList.size(), bytesSize,
System.currentTimeMillis() - flushContext.asyncAddArgsList.get(0).addedTime);
doFlush();
} catch (Exception e){
log.error("Trig flush by timing task fail.", e);
} finally {
// Start the next timing task.
nextTimingTrigger();
}
});
CompletableFuture
.runAsync(() -> {
if (flushContext.asyncAddArgsList.isEmpty()) {
return;
}
metrics.triggerFlushByByMaxDelay(flushContext.asyncAddArgsList.size(), bytesSize,
System.currentTimeMillis() - flushContext.asyncAddArgsList.get(0).addedTime);
doFlush();
}, singleThreadExecutorForWrite)
.whenComplete((ignore, e) -> {
if (e != null) {
log.warn("Execute 'trigFlushByTimingTask' fail", e);
}
nextTimingTrigger();
});
}

/**
Expand Down Expand Up @@ -379,24 +380,20 @@ public CompletableFuture<Void> close() {
}
CompletableFuture closeFuture = new CompletableFuture();
// Cancel pending tasks and release resources.
singleThreadExecutorForWrite.execute(() -> {
try {
// If some requests are flushed, BK will trigger these callbacks, and the remaining requests in should
// fail.
failureCallbackByContextAndRecycle(flushContext,
new ManagedLedgerException.ManagedLedgerFencedException(
FutureUtil.safeRunAsync(() -> {
// If some requests are flushed, BK will trigger these callbacks, and the remaining requests in should
// fail.
failureCallbackByContextAndRecycle(flushContext,
new ManagedLedgerException.ManagedLedgerFencedException(
new Exception("Transaction log buffered write has closed")
));
// Cancel the timing task.
if (!timeout.isCancelled()){
this.timeout.cancel();
}
STATE_UPDATER.set(this, State.CLOSED);
closeFuture.complete(null);
} catch (Exception e){
closeFuture.completeExceptionally(e);
));
// Cancel the timing task.
if (!timeout.isCancelled()) {
this.timeout.cancel();
}
});
STATE_UPDATER.set(this, State.CLOSED);
closeFuture.complete(null);
}, singleThreadExecutorForWrite, closeFuture);
return closeFuture;
}

Expand Down