Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 45 additions & 7 deletions tools/preconf-rpc/sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,12 @@ const (
)

const (
blockTime = 12 // seconds, typical Ethereum block time
bidTimeout = 3 * time.Second // timeout for bid operations
defaultConfidence = 90 // default confidence level for the next block
confidenceSecondAttempt = 95 // confidence level for the second attempt
confidenceSubsequentAttempts = 99 // confidence level for subsequent attempts
blockTime = 12 // seconds, typical Ethereum block time
bidTimeout = 3 * time.Second // timeout for bid operations
defaultConfidence = 90 // default confidence level for the next block
confidenceSecondAttempt = 95 // confidence level for the second attempt
confidenceSubsequentAttempts = 99 // confidence level for subsequent attempts
transactionTimeout = 10 * time.Minute // timeout for transaction processing
)

var (
Expand All @@ -52,6 +53,7 @@ var (
ErrNegativeTransactionValue = errors.New("negative transaction value")
ErrZeroGasLimit = errors.New("zero gas limit")
ErrTransactionCancelled = errors.New("transaction cancelled by user")
ErrTimeoutExceeded = errors.New("timeout exceeded while waiting for transaction to be processed")
)

type Transaction struct {
Expand Down Expand Up @@ -125,6 +127,7 @@ type TxSender struct {
inflightTxns map[common.Hash]chan struct{}
inflightAccount map[common.Address]struct{}
inflightMu sync.RWMutex
processMu sync.RWMutex
txnAttemptHistory *lru.Cache[common.Hash, *txnAttempt]
}

Expand Down Expand Up @@ -218,8 +221,30 @@ func (t *TxSender) CancelTransaction(ctx context.Context, txnHash common.Hash) (
cancel, found := t.inflightTxns[txnHash]
t.inflightMu.RUnlock()
if !found {
t.logger.Warn("Transaction not found in flight", "hash", txnHash.Hex())
return false, nil
return func() (bool, error) {
// we need to hold the processMu lock till we check as a parallel goroutine
// might try to process the transaction and update its status
t.processMu.RLock()
defer t.processMu.RUnlock()

txn, err := t.store.GetTransactionByHash(ctx, txnHash)
if err == nil {
// if a transaction is not yet enqueued due to nonce order, we mark it
// cancelled directly in the store
if txn.Status == TxStatusPending {
txn.Status = TxStatusFailed
txn.Details = ErrTransactionCancelled.Error()
if err := t.store.StoreTransaction(ctx, txn, nil); err != nil {
t.logger.Error("Failed to store cancelled transaction", "hash", txnHash.Hex(), "error", err)
return false, fmt.Errorf("failed to store cancelled transaction: %w", err)
}
t.logger.Info("Transaction cancelled before processing", "hash", txnHash.Hex())
return true, nil
}
}
t.logger.Warn("Transaction not found", "hash", txnHash.Hex())
return false, nil
}()
}

t.logger.Info("Cancelling transaction", "hash", txnHash.Hex())
Expand Down Expand Up @@ -275,7 +300,9 @@ func (t *TxSender) Start(ctx context.Context) chan struct{} {
t.logger.Info("Context cancelled, stopping TxSender")
return ctx.Err()
case <-t.trigger:
t.processMu.Lock()
t.processQueuedTransactions(t.egCtx)
t.processMu.Unlock()
}
}
})
Expand Down Expand Up @@ -339,6 +366,8 @@ func (t *TxSender) processQueuedTransactions(ctx context.Context) {
case t.workerPool <- struct{}{}:
t.eg.Go(func() error {
defer func() { <-t.workerPool }()
defer t.triggerSender() // Trigger to reprocess after this transaction

canExecute, cancel := t.markInflight(txn)
if !canExecute {
// Transaction is already being processed or sender has an inflight transaction
Expand All @@ -351,6 +380,7 @@ func (t *TxSender) processQueuedTransactions(ctx context.Context) {
t.logger.Error("Failed to process transaction", "sender", txn.Sender.Hex(), "error", err)
txn.Status = TxStatusFailed
txn.Details = err.Error()
t.clearBlockAttemptHistory(txn.Hash())
return t.store.StoreTransaction(ctx, txn, nil)
}
return nil
Expand Down Expand Up @@ -541,6 +571,10 @@ func (t *TxSender) sendBid(
cost, err := t.calculatePriceForNextBlock(txn, bidBlockNo, prices, optedInSlot)
if err != nil {
t.logger.Error("Failed to calculate price for next block", "error", err)
if errors.Is(err, ErrTimeoutExceeded) {
t.logger.Warn("Timeout exceeded while trying to process transaction", "txnHash", txn.Hash().Hex())
return bidResult{}, ErrTimeoutExceeded
}
return bidResult{}, &errRetry{
err: fmt.Errorf("failed to calculate price: %w", err),
retryAfter: time.Second,
Expand Down Expand Up @@ -664,6 +698,10 @@ func (t *TxSender) calculatePriceForNextBlock(
}
}

if time.Since(attempts.startTime) > transactionTimeout {
return nil, ErrTimeoutExceeded
}

// default confidence level for the next block
confidence := defaultConfidence
isRetry := false
Expand Down
Loading