diff --git a/tools/preconf-rpc/blocktracker/blocktracker.go b/tools/preconf-rpc/blocktracker/blocktracker.go index 00caced10..3c6780efa 100644 --- a/tools/preconf-rpc/blocktracker/blocktracker.go +++ b/tools/preconf-rpc/blocktracker/blocktracker.go @@ -85,12 +85,16 @@ func (b *blockTracker) LatestBlockNumber() uint64 { } func (b *blockTracker) NextBlockNumber() (uint64, time.Duration, error) { - block, found := b.blocks.Get(b.latestBlockNo.Load()) + latestBlockNo := b.latestBlockNo.Load() + block, found := b.blocks.Get(latestBlockNo) if !found { return 0, 0, errors.New("latest block not found in cache") } blockTime := time.Unix(int64(block.Time()), 0) - return b.latestBlockNo.Load() + 1, time.Until(blockTime.Add(12 * time.Second)), nil + if time.Since(blockTime) >= 11*time.Second { + return latestBlockNo + 2, time.Until(blockTime.Add(24 * time.Second)), nil + } + return latestBlockNo + 1, time.Until(blockTime.Add(12 * time.Second)), nil } func (b *blockTracker) CheckTxnInclusion( diff --git a/tools/preconf-rpc/handlers/handlers.go b/tools/preconf-rpc/handlers/handlers.go index df6023ccb..5d51d7436 100644 --- a/tools/preconf-rpc/handlers/handlers.go +++ b/tools/preconf-rpc/handlers/handlers.go @@ -83,7 +83,7 @@ func NewRPCMethodHandler( func (h *rpcMethodHandler) RegisterMethods(server *rpcserver.JSONRPCServer) { // Ethereum JSON-RPC methods overridden - server.RegisterHandler("eth_getBlockNumber", func(ctx context.Context, params ...any) (json.RawMessage, bool, error) { + server.RegisterHandler("eth_blockNumber", func(ctx context.Context, params ...any) (json.RawMessage, bool, error) { blockNumber := h.blockTracker.LatestBlockNumber() blockNumberJSON, err := json.Marshal(hexutil.Uint64(blockNumber)) @@ -411,8 +411,7 @@ func (h *rpcMethodHandler) handleGetTxReceipt(ctx context.Context, params ...any return nil, true, nil } - if txn.Status != sender.TxStatusFailed && - (txn.Status != sender.TxStatusPreConfirmed || h.blockTracker.LatestBlockNumber() > uint64(txn.BlockNumber)) { + if txn.Status != sender.TxStatusFailed && txn.Status != sender.TxStatusPreConfirmed { return nil, true, nil } @@ -571,11 +570,8 @@ func (h *rpcMethodHandler) handleMevCommitGetBalance(ctx context.Context, params balance, err := h.store.GetBalance(ctx, common.HexToAddress(account)) if err != nil { - h.logger.Error("Failed to get balance for account", "error", err, "account", account) - return nil, false, rpcserver.NewJSONErr( - rpcserver.CodeCustomError, - "failed to get balance for account", - ) + h.logger.Warn("Failed to get balance for account, returning 0", "error", err, "account", account) + balance = big.NewInt(0) } return json.RawMessage(fmt.Sprintf(`{"balance": "%s"}`, balance)), false, nil diff --git a/tools/preconf-rpc/notifier/notifier.go b/tools/preconf-rpc/notifier/notifier.go index 66ce1a6a6..185b134de 100644 --- a/tools/preconf-rpc/notifier/notifier.go +++ b/tools/preconf-rpc/notifier/notifier.go @@ -282,7 +282,7 @@ func (n *Notifier) StartTransactionNotifier( func (n *Notifier) NotifyTransactionStatus( txn *sender.Transaction, noOfAttempts int, - start time.Time, + timeTaken time.Duration, ) { n.queuedMu.Lock() defer n.queuedMu.Unlock() @@ -290,7 +290,7 @@ func (n *Notifier) NotifyTransactionStatus( n.queuedTxns = append(n.queuedTxns, txnInfo{ txn: txn, noOfAttempts: noOfAttempts, - timeTaken: time.Since(start).Round(time.Millisecond), + timeTaken: timeTaken, }) } diff --git a/tools/preconf-rpc/rpcserver/rpcserver.go b/tools/preconf-rpc/rpcserver/rpcserver.go index d4461cc2f..1d4a20af8 100644 --- a/tools/preconf-rpc/rpcserver/rpcserver.go +++ b/tools/preconf-rpc/rpcserver/rpcserver.go @@ -70,8 +70,6 @@ var cacheMethods = map[string]bool{ "eth_call": true, "eth_getCode": true, "eth_getStorageAt": true, - "eth_feeHistory": true, - "eth_gasPrice": true, "eth_getLogs": true, "net_version": true, } @@ -168,7 +166,7 @@ func (s *JSONRPCServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { start := time.Now() defer func() { - s.logger.Debug("Request processing time", "method", req.Method, "id", req.ID, "duration", time.Since(start)) + s.logger.Info("Request processing time", "method", req.Method, "id", req.ID, "duration", time.Since(start).String()) }() if cacheMethods[req.Method] { @@ -201,7 +199,7 @@ func (s *JSONRPCServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { } if cacheMethods[req.Method] && resp.Result != nil { key := cacheKey(req.Method, req.Params) - s.cache.Add(key, cacheEntry{ + _ = s.cache.Add(key, cacheEntry{ until: time.Now().Add(pickTTL(req.Method, *resp.Result)), data: *resp.Result, }) @@ -350,8 +348,6 @@ func pickTTL(method string, params json.RawMessage) time.Duration { return 24 * time.Hour case "eth_getCode": return 24 * time.Hour - case "eth_feeHistory": - return 3 * time.Second case "eth_call": // if block tag provided and hex number → immutable if strings.HasSuffix(string(params), "\"") { // cheap check @@ -359,7 +355,7 @@ func pickTTL(method string, params json.RawMessage) time.Duration { return 24 * time.Hour } } - return 1 * time.Second + return 1 * time.Millisecond default: return 2 * time.Second } diff --git a/tools/preconf-rpc/sender/sender.go b/tools/preconf-rpc/sender/sender.go index c444dfcef..a08d46b69 100644 --- a/tools/preconf-rpc/sender/sender.go +++ b/tools/preconf-rpc/sender/sender.go @@ -37,12 +37,12 @@ const ( ) const ( - blockTime = 12 // seconds, typical Ethereum block time - bidTimeout = 100 * time.Millisecond // timeout for bid operations - defaultConfidence = 90 // default confidence level for the next block - confidenceSecondAttempt = 95 // confidence level for the second attempt - confidenceSubsequentAttempts = 99 // confidence level for subsequent attempts - transactionTimeout = 10 * time.Minute // timeout for transaction processing + blockTime = 12 // seconds, typical Ethereum block time + bidTimeout = 3 * time.Second // timeout for bid operation + defaultConfidence = 90 // default confidence level for the next block + confidenceSecondAttempt = 95 // confidence level for the second attempt + confidenceSubsequentAttempts = 99 // confidence level for subsequent attempts + transactionTimeout = 10 * time.Minute // timeout for transaction processing ) var ( @@ -113,7 +113,7 @@ type txnAttempt struct { } type Notifier interface { - NotifyTransactionStatus(txn *Transaction, noOfAttempts int, start time.Time) + NotifyTransactionStatus(txn *Transaction, noOfAttempts int, timeTaken time.Duration) } type TxSender struct { @@ -135,6 +135,8 @@ type TxSender struct { txnAttemptHistory *lru.Cache[common.Hash, *txnAttempt] notifier Notifier fastTrack func(cmts []*bidderapiv1.Commitment, optedInSlot bool) bool + bidTimeout time.Duration + timeoutMtx sync.RWMutex } func noOpFastTrack(_ []*bidderapiv1.Commitment, _ bool) bool { @@ -177,6 +179,7 @@ func NewTxSender( txnAttemptHistory: txnAttemptHistory, notifier: notifier, fastTrack: fastTrack, + bidTimeout: bidTimeout, }, nil } @@ -307,6 +310,20 @@ func (t *TxSender) CancelTransaction(ctx context.Context, txnHash common.Hash) ( } } +func (t *TxSender) UpdateBidTimeout(timeout time.Duration) { + t.timeoutMtx.Lock() + defer t.timeoutMtx.Unlock() + + t.bidTimeout = timeout +} + +func (t *TxSender) getBidTimeout() time.Duration { + t.timeoutMtx.RLock() + defer t.timeoutMtx.RUnlock() + + return t.bidTimeout +} + func (t *TxSender) Start(ctx context.Context) chan struct{} { t.eg, t.egCtx = errgroup.WithContext(ctx) done := make(chan struct{}) @@ -374,7 +391,7 @@ func (t *TxSender) processQueuedTransactions(ctx context.Context) { t.logger.Info("No queued transactions to process") return } - t.logger.Info("Processing queued transactions", "count", len(txns)) + t.logger.Debug("Processing queued transactions", "count", len(txns)) for _, txn := range txns { txn := txn // capture range variable select { @@ -398,7 +415,7 @@ func (t *TxSender) processQueuedTransactions(ctx context.Context) { t.logger.Error("Failed to process transaction", "sender", txn.Sender.Hex(), "error", err) txn.Status = TxStatusFailed txn.Details = err.Error() - t.clearBlockAttemptHistory(txn) + t.clearBlockAttemptHistory(txn, time.Now()) return t.store.StoreTransaction(ctx, txn, nil) } return nil @@ -422,6 +439,7 @@ BID_LOOP: default: } + preConfirmed := false result, err = t.sendBid(ctx, txn) switch { case err != nil: @@ -449,13 +467,16 @@ BID_LOOP: txn.BlockNumber = int64(result.blockNumber) t.logger.Info( "Transaction fast-tracked based on commitments", + "transactionHash", txn.Hash().Hex(), "sender", txn.Sender.Hex(), "type", txn.Type, "blockNumber", result.blockNumber, "bidAmount", result.bidAmount.String(), ) - t.clearBlockAttemptHistory(txn) - break BID_LOOP + if err := t.store.StoreTransaction(ctx, txn, result.commitments); err != nil { + return fmt.Errorf("failed to store fast-tracked transaction: %w", err) + } + preConfirmed = true case result.optedInSlot: if result.noOfProviders == len(result.commitments) { // This means that all builders have committed to the bid and it @@ -465,20 +486,24 @@ BID_LOOP: txn.BlockNumber = int64(result.blockNumber) t.logger.Info( "Transaction pre-confirmed", + "transactionHash", txn.Hash().Hex(), "sender", txn.Sender.Hex(), "type", txn.Type, "blockNumber", result.blockNumber, "bidAmount", result.bidAmount.String(), ) - t.clearBlockAttemptHistory(txn) - break BID_LOOP + if err := t.store.StoreTransaction(ctx, txn, result.commitments); err != nil { + return fmt.Errorf("failed to store preconfirmed transaction: %w", err) + } + preConfirmed = true } default: } - if result.noOfProviders > len(result.commitments) { + if !preConfirmed && result.noOfProviders > len(result.commitments) { t.logger.Warn( "Not all builders committed to the bid", + "transactionHash", txn.Hash().Hex(), "noOfProviders", result.noOfProviders, "noOfCommitments", len(result.commitments), "sender", txn.Sender.Hex(), @@ -501,24 +526,30 @@ BID_LOOP: return fmt.Errorf("failed to check transaction inclusion: %w", err) } if included { - txn.Status = TxStatusConfirmed - txn.BlockNumber = int64(result.blockNumber) - t.logger.Info( - "Transaction confirmed for non opted-in slot", - "sender", txn.Sender.Hex(), - "type", txn.Type, - "blockNumber", result.blockNumber, - "bidAmount", result.bidAmount.String(), - ) - t.clearBlockAttemptHistory(txn) + if !preConfirmed { + txn.Status = TxStatusConfirmed + txn.BlockNumber = int64(result.blockNumber) + t.logger.Info( + "Transaction confirmed", + "transactionHash", txn.Hash().Hex(), + "sender", txn.Sender.Hex(), + "type", txn.Type, + "blockNumber", result.blockNumber, + "bidAmount", result.bidAmount.String(), + ) + if err := t.store.StoreTransaction(ctx, txn, result.commitments); err != nil { + return fmt.Errorf("failed to store preconfirmed transaction: %w", err) + } + } + endTime := time.Now() + if len(result.commitments) > 0 { + endTime = time.UnixMilli(result.commitments[len(result.commitments)-1].DispatchTimestamp) + } + t.clearBlockAttemptHistory(txn, endTime) break BID_LOOP } } - if err := t.store.StoreTransaction(ctx, txn, result.commitments); err != nil { - return fmt.Errorf("failed to store preconfirmed transaction: %w", err) - } - switch txn.Type { case TxTypeRegular: if err := t.store.DeductBalance(ctx, txn.Sender, result.bidAmount); err != nil { @@ -597,7 +628,7 @@ func (t *TxSender) sendBid( // Allow for certain level of tolerance w.r.t timestamps optedInSlot := math.Abs(float64(timeToOptIn)-float64(timeUntilNextBlock.Seconds())) < float64(blockTime/3) - cctx, cancel := context.WithTimeout(ctx, bidTimeout) + cctx, cancel := context.WithTimeout(ctx, t.getBidTimeout()) defer cancel() cost, err := t.calculatePriceForNextBlock(txn, bidBlockNo, prices, optedInSlot) @@ -663,7 +694,7 @@ func (t *TxSender) sendBid( WaitForOptIn: false, BlockNumber: uint64(bidBlockNo), RevertingTxHashes: []string{txn.Hash().Hex()}, - DecayDuration: bidTimeout * 2, + DecayDuration: t.getBidTimeout() * 2, }, ) if err != nil { @@ -782,7 +813,7 @@ func (t *TxSender) calculatePriceForNextBlock( ) } -func (t *TxSender) clearBlockAttemptHistory(txn *Transaction) { +func (t *TxSender) clearBlockAttemptHistory(txn *Transaction, endTime time.Time) { attempts, found := t.txnAttemptHistory.Get(txn.Hash()) if !found { return @@ -796,6 +827,7 @@ func (t *TxSender) clearBlockAttemptHistory(txn *Transaction) { t.logger.Info( "Clearing block attempt history for transaction", "hash", txn.Hash().Hex(), + "blockNumber", txn.BlockNumber, "blockAttempts", len(attempts.attempts), "startTime", attempts.startTime.Format(time.RFC3339), "startBlockNumber", attempts.attempts[0].blockNumber, @@ -804,5 +836,5 @@ func (t *TxSender) clearBlockAttemptHistory(txn *Transaction) { _ = t.txnAttemptHistory.Remove(txn.Hash()) - t.notifier.NotifyTransactionStatus(txn, totalAttempts, attempts.startTime) + t.notifier.NotifyTransactionStatus(txn, totalAttempts, endTime.Sub(attempts.startTime).Round(time.Millisecond)) } diff --git a/tools/preconf-rpc/sender/sender_test.go b/tools/preconf-rpc/sender/sender_test.go index e22263f8d..57f92f11f 100644 --- a/tools/preconf-rpc/sender/sender_test.go +++ b/tools/preconf-rpc/sender/sender_test.go @@ -261,7 +261,7 @@ type mockNotifier struct { notifications []string } -func (m *mockNotifier) NotifyTransactionStatus(txn *sender.Transaction, attempts int, start time.Time) { +func (m *mockNotifier) NotifyTransactionStatus(txn *sender.Transaction, attempts int, start time.Duration) { m.notifications = append(m.notifications, txn.Hash().Hex()) } @@ -399,6 +399,16 @@ func TestSender(t *testing.T) { t.Fatalf("expected 1 commitment, got %d", len(res.commitments)) } + checkOp := <-blockTracker.in + if checkOp.hash != tx1.Hash() { + t.Fatalf("expected transaction hash %s, got %s", tx1.Hash().Hex(), checkOp.hash.Hex()) + } + if checkOp.block != 1 { + t.Fatalf("expected block number 1, got %d", checkOp.block) + } + // Simulate transaction inclusion + blockTracker.out <- true + tx2 := &sender.Transaction{ Transaction: types.NewTransaction( 2, @@ -493,7 +503,7 @@ func TestSender(t *testing.T) { close(resC) bidder.out <- resC - checkOp := <-blockTracker.in + checkOp = <-blockTracker.in if checkOp.hash != tx2.Hash() { t.Fatalf("expected transaction hash %s, got %s", tx2.Hash().Hex(), checkOp.hash.Hex()) } diff --git a/tools/preconf-rpc/service/service.go b/tools/preconf-rpc/service/service.go index cf24c77f6..fe6b8e205 100644 --- a/tools/preconf-rpc/service/service.go +++ b/tools/preconf-rpc/service/service.go @@ -404,6 +404,52 @@ func New(config *Config) (*Service, error) { } }) + mux.HandleFunc("POST /subsidize", func(w http.ResponseWriter, r *http.Request) { + if err := checkAuthorization(r); err != nil { + http.Error(w, err.Error(), http.StatusUnauthorized) + return + } + + // Get account address and amount from URL params + account := r.URL.Query().Get("account") + if account == "" || !common.IsHexAddress(account) { + http.Error(w, "invalid or missing account address", http.StatusBadRequest) + return + } + + amountStr := r.URL.Query().Get("amount") + amount, ok := new(big.Int).SetString(amountStr, 10) + if !ok { + http.Error(w, "invalid amount", http.StatusBadRequest) + return + } + + if err := rpcstore.AddSubsidy(r.Context(), common.HexToAddress(account), amount); err != nil { + http.Error(w, fmt.Sprintf("failed to add subsidy: %v", err), http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("OK")) + }) + + mux.HandleFunc("POST /update_bid_timeout", func(w http.ResponseWriter, r *http.Request) { + if err := checkAuthorization(r); err != nil { + http.Error(w, err.Error(), http.StatusUnauthorized) + return + } + + timeoutStr := r.URL.Query().Get("timeout") + timeout, err := time.ParseDuration(timeoutStr) + if err != nil { + http.Error(w, "invalid timeout", http.StatusBadRequest) + return + } + sndr.UpdateBidTimeout(timeout) + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("OK")) + }) + srv := http.Server{ Addr: fmt.Sprintf(":%d", config.HTTPPort), Handler: mux, diff --git a/tools/preconf-rpc/store/store.go b/tools/preconf-rpc/store/store.go index 1fb34479d..f54d47f05 100644 --- a/tools/preconf-rpc/store/store.go +++ b/tools/preconf-rpc/store/store.go @@ -48,6 +48,12 @@ CREATE TABLE IF NOT EXISTS balances ( balance NUMERIC(24, 0) );` +var subsidiesTable = ` +CREATE TABLE IF NOT EXISTS subsidies ( + account TEXT PRIMARY KEY, + balance NUMERIC(24, 0) +);` + type rpcstore struct { db *sql.DB } @@ -57,6 +63,7 @@ func New(db *sql.DB) (*rpcstore, error) { transactionsTable, commitmentsTable, balancesTable, + subsidiesTable, } { _, err := db.Exec(table) if err != nil { @@ -279,7 +286,7 @@ func (s *rpcstore) StoreTransaction( insertCommitment := ` INSERT INTO commitments (commitment_digest, transaction_hash, provider_address, commitment_data) VALUES ($1, $2, $3, $4) - ON CONFLICT (commitment_digest) DO NOTHING; + ON CONFLICT (commitment_digest) DO UPDATE SET commitment_data = EXCLUDED.commitment_data; ` commitmentData, err := proto.Marshal(commitment) if err != nil { @@ -353,7 +360,7 @@ func (s *rpcstore) GetCurrentNonce(ctx context.Context, sender common.Address) u query := ` SELECT COALESCE(MAX(nonce), 0) FROM mcTransactions - WHERE sender = $1 AND status = 'pending'; + WHERE sender = $1 AND (status = 'pending' OR status = 'pre-confirmed'); ` row := s.db.QueryRowContext(ctx, query, sender.Hex()) var nextNonce uint64 @@ -480,3 +487,30 @@ func (s *rpcstore) GetBalance( return balanceInt, nil } + +func (s *rpcstore) AddSubsidy( + ctx context.Context, + account common.Address, + amount *big.Int, +) error { + if account == (common.Address{}) || amount == nil || amount.Sign() <= 0 { + return fmt.Errorf("invalid account or amount: account=%s, amount=%s", account.Hex(), amount.String()) + } + + query := ` + INSERT INTO subsidies (account, balance) + VALUES ($1, $2) + ON CONFLICT (account) DO UPDATE SET balance = subsidies.balance + $2 + WHERE subsidies.balance + $2 >= 0; + ` + + _, err := s.db.ExecContext(ctx, query, account.Hex(), amount.String()) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return fmt.Errorf("account %s not found or insufficient balance: %w", account.Hex(), ErrInsufficientBalance) + } + return fmt.Errorf("failed to add balance for account %s: %w", account.Hex(), err) + } + + return s.AddBalance(ctx, account, amount) +}