diff --git a/tools/preconf-rpc/sender/sender.go b/tools/preconf-rpc/sender/sender.go index 2331f03df..992dca739 100644 --- a/tools/preconf-rpc/sender/sender.go +++ b/tools/preconf-rpc/sender/sender.go @@ -37,11 +37,12 @@ const ( ) const ( - blockTime = 12 // seconds, typical Ethereum block time - bidTimeout = 3 * time.Second // timeout for bid operations - defaultConfidence = 90 // default confidence level for the next block - confidenceSecondAttempt = 95 // confidence level for the second attempt - confidenceSubsequentAttempts = 99 // confidence level for subsequent attempts + blockTime = 12 // seconds, typical Ethereum block time + bidTimeout = 3 * time.Second // timeout for bid operations + defaultConfidence = 90 // default confidence level for the next block + confidenceSecondAttempt = 95 // confidence level for the second attempt + confidenceSubsequentAttempts = 99 // confidence level for subsequent attempts + transactionTimeout = 10 * time.Minute // timeout for transaction processing ) var ( @@ -52,6 +53,7 @@ var ( ErrNegativeTransactionValue = errors.New("negative transaction value") ErrZeroGasLimit = errors.New("zero gas limit") ErrTransactionCancelled = errors.New("transaction cancelled by user") + ErrTimeoutExceeded = errors.New("timeout exceeded while waiting for transaction to be processed") ) type Transaction struct { @@ -125,6 +127,7 @@ type TxSender struct { inflightTxns map[common.Hash]chan struct{} inflightAccount map[common.Address]struct{} inflightMu sync.RWMutex + processMu sync.RWMutex txnAttemptHistory *lru.Cache[common.Hash, *txnAttempt] } @@ -218,8 +221,30 @@ func (t *TxSender) CancelTransaction(ctx context.Context, txnHash common.Hash) ( cancel, found := t.inflightTxns[txnHash] t.inflightMu.RUnlock() if !found { - t.logger.Warn("Transaction not found in flight", "hash", txnHash.Hex()) - return false, nil + return func() (bool, error) { + // we need to hold the processMu lock till we check as a parallel goroutine + // might try to process the transaction and update its status + t.processMu.RLock() + defer t.processMu.RUnlock() + + txn, err := t.store.GetTransactionByHash(ctx, txnHash) + if err == nil { + // if a transaction is not yet enqueued due to nonce order, we mark it + // cancelled directly in the store + if txn.Status == TxStatusPending { + txn.Status = TxStatusFailed + txn.Details = ErrTransactionCancelled.Error() + if err := t.store.StoreTransaction(ctx, txn, nil); err != nil { + t.logger.Error("Failed to store cancelled transaction", "hash", txnHash.Hex(), "error", err) + return false, fmt.Errorf("failed to store cancelled transaction: %w", err) + } + t.logger.Info("Transaction cancelled before processing", "hash", txnHash.Hex()) + return true, nil + } + } + t.logger.Warn("Transaction not found", "hash", txnHash.Hex()) + return false, nil + }() } t.logger.Info("Cancelling transaction", "hash", txnHash.Hex()) @@ -275,7 +300,9 @@ func (t *TxSender) Start(ctx context.Context) chan struct{} { t.logger.Info("Context cancelled, stopping TxSender") return ctx.Err() case <-t.trigger: + t.processMu.Lock() t.processQueuedTransactions(t.egCtx) + t.processMu.Unlock() } } }) @@ -339,6 +366,8 @@ func (t *TxSender) processQueuedTransactions(ctx context.Context) { case t.workerPool <- struct{}{}: t.eg.Go(func() error { defer func() { <-t.workerPool }() + defer t.triggerSender() // Trigger to reprocess after this transaction + canExecute, cancel := t.markInflight(txn) if !canExecute { // Transaction is already being processed or sender has an inflight transaction @@ -351,6 +380,7 @@ func (t *TxSender) processQueuedTransactions(ctx context.Context) { t.logger.Error("Failed to process transaction", "sender", txn.Sender.Hex(), "error", err) txn.Status = TxStatusFailed txn.Details = err.Error() + t.clearBlockAttemptHistory(txn.Hash()) return t.store.StoreTransaction(ctx, txn, nil) } return nil @@ -541,6 +571,10 @@ func (t *TxSender) sendBid( cost, err := t.calculatePriceForNextBlock(txn, bidBlockNo, prices, optedInSlot) if err != nil { t.logger.Error("Failed to calculate price for next block", "error", err) + if errors.Is(err, ErrTimeoutExceeded) { + t.logger.Warn("Timeout exceeded while trying to process transaction", "txnHash", txn.Hash().Hex()) + return bidResult{}, ErrTimeoutExceeded + } return bidResult{}, &errRetry{ err: fmt.Errorf("failed to calculate price: %w", err), retryAfter: time.Second, @@ -664,6 +698,10 @@ func (t *TxSender) calculatePriceForNextBlock( } } + if time.Since(attempts.startTime) > transactionTimeout { + return nil, ErrTimeoutExceeded + } + // default confidence level for the next block confidence := defaultConfidence isRetry := false