From 1431e0b0183a5ab921fbcfdc2ea05feac2db9db8 Mon Sep 17 00:00:00 2001 From: Alok Date: Wed, 13 Aug 2025 17:47:01 +0530 Subject: [PATCH 1/2] fix: issues from RPC testing --- tools/preconf-rpc/sender/sender.go | 41 ++++++++++++++++++++++++++++-- 1 file changed, 39 insertions(+), 2 deletions(-) diff --git a/tools/preconf-rpc/sender/sender.go b/tools/preconf-rpc/sender/sender.go index 2331f03df..0e7501041 100644 --- a/tools/preconf-rpc/sender/sender.go +++ b/tools/preconf-rpc/sender/sender.go @@ -52,6 +52,7 @@ var ( ErrNegativeTransactionValue = errors.New("negative transaction value") ErrZeroGasLimit = errors.New("zero gas limit") ErrTransactionCancelled = errors.New("transaction cancelled by user") + ErrTimeoutExceeded = errors.New("timeout exceeded while waiting for transaction to be processed") ) type Transaction struct { @@ -125,6 +126,7 @@ type TxSender struct { inflightTxns map[common.Hash]chan struct{} inflightAccount map[common.Address]struct{} inflightMu sync.RWMutex + processMu sync.RWMutex txnAttemptHistory *lru.Cache[common.Hash, *txnAttempt] } @@ -218,8 +220,30 @@ func (t *TxSender) CancelTransaction(ctx context.Context, txnHash common.Hash) ( cancel, found := t.inflightTxns[txnHash] t.inflightMu.RUnlock() if !found { - t.logger.Warn("Transaction not found in flight", "hash", txnHash.Hex()) - return false, nil + return func() (bool, error) { + // we need to hold the processMu lock till we check as a parallel goroutine + // might try to process the transaction and update its status + t.processMu.RLock() + defer t.processMu.RUnlock() + + txn, err := t.store.GetTransactionByHash(ctx, txnHash) + if err == nil { + // if a transaction is not yet enqueued due to nonce order, we mark it + // cancelled directly in the store + if txn.Status == TxStatusPending { + txn.Status = TxStatusFailed + txn.Details = ErrTransactionCancelled.Error() + if err := t.store.StoreTransaction(ctx, txn, nil); err != nil { + t.logger.Error("Failed to store cancelled transaction", "hash", txnHash.Hex(), "error", err) + return false, fmt.Errorf("failed to store cancelled transaction: %w", err) + } + t.logger.Info("Transaction cancelled before processing", "hash", txnHash.Hex()) + return true, nil + } + } + t.logger.Warn("Transaction not found", "hash", txnHash.Hex()) + return false, nil + }() } t.logger.Info("Cancelling transaction", "hash", txnHash.Hex()) @@ -275,7 +299,9 @@ func (t *TxSender) Start(ctx context.Context) chan struct{} { t.logger.Info("Context cancelled, stopping TxSender") return ctx.Err() case <-t.trigger: + t.processMu.Lock() t.processQueuedTransactions(t.egCtx) + t.processMu.Unlock() } } }) @@ -339,6 +365,8 @@ func (t *TxSender) processQueuedTransactions(ctx context.Context) { case t.workerPool <- struct{}{}: t.eg.Go(func() error { defer func() { <-t.workerPool }() + defer t.triggerSender() // Trigger to reprocess after this transaction + canExecute, cancel := t.markInflight(txn) if !canExecute { // Transaction is already being processed or sender has an inflight transaction @@ -351,6 +379,7 @@ func (t *TxSender) processQueuedTransactions(ctx context.Context) { t.logger.Error("Failed to process transaction", "sender", txn.Sender.Hex(), "error", err) txn.Status = TxStatusFailed txn.Details = err.Error() + t.clearBlockAttemptHistory(txn.Hash()) return t.store.StoreTransaction(ctx, txn, nil) } return nil @@ -541,6 +570,10 @@ func (t *TxSender) sendBid( cost, err := t.calculatePriceForNextBlock(txn, bidBlockNo, prices, optedInSlot) if err != nil { t.logger.Error("Failed to calculate price for next block", "error", err) + if errors.Is(err, ErrTimeoutExceeded) { + t.logger.Warn("Timeout exceeded while trying to process transaction", "txnHash", txn.Hash().Hex()) + return bidResult{}, ErrTimeoutExceeded + } return bidResult{}, &errRetry{ err: fmt.Errorf("failed to calculate price: %w", err), retryAfter: time.Second, @@ -664,6 +697,10 @@ func (t *TxSender) calculatePriceForNextBlock( } } + if time.Since(attempts.startTime) > 10*time.Minute { + return nil, ErrTimeoutExceeded + } + // default confidence level for the next block confidence := defaultConfidence isRetry := false From 9dda6a7f9bba33a254e145e4d1e9bd0fc9cb1b73 Mon Sep 17 00:00:00 2001 From: Alok Date: Wed, 13 Aug 2025 20:30:11 +0530 Subject: [PATCH 2/2] fix: issues from RPC testing --- tools/preconf-rpc/sender/sender.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/tools/preconf-rpc/sender/sender.go b/tools/preconf-rpc/sender/sender.go index 0e7501041..992dca739 100644 --- a/tools/preconf-rpc/sender/sender.go +++ b/tools/preconf-rpc/sender/sender.go @@ -37,11 +37,12 @@ const ( ) const ( - blockTime = 12 // seconds, typical Ethereum block time - bidTimeout = 3 * time.Second // timeout for bid operations - defaultConfidence = 90 // default confidence level for the next block - confidenceSecondAttempt = 95 // confidence level for the second attempt - confidenceSubsequentAttempts = 99 // confidence level for subsequent attempts + blockTime = 12 // seconds, typical Ethereum block time + bidTimeout = 3 * time.Second // timeout for bid operations + defaultConfidence = 90 // default confidence level for the next block + confidenceSecondAttempt = 95 // confidence level for the second attempt + confidenceSubsequentAttempts = 99 // confidence level for subsequent attempts + transactionTimeout = 10 * time.Minute // timeout for transaction processing ) var ( @@ -697,7 +698,7 @@ func (t *TxSender) calculatePriceForNextBlock( } } - if time.Since(attempts.startTime) > 10*time.Minute { + if time.Since(attempts.startTime) > transactionTimeout { return nil, ErrTimeoutExceeded }