diff --git a/tools/preconf-rpc/blocktracker/blocktracker.go b/tools/preconf-rpc/blocktracker/blocktracker.go index c9287f414..256d43a2b 100644 --- a/tools/preconf-rpc/blocktracker/blocktracker.go +++ b/tools/preconf-rpc/blocktracker/blocktracker.go @@ -91,15 +91,19 @@ func (b *blockTracker) Start(ctx context.Context) <-chan struct{} { b.log.Error("Block not found in cache", "blockNumber", bNo) continue } + txnsToClear := make([]common.Hash, 0) + b.txnToCheckMu.Lock() for txHash, resultCh := range b.txnsToCheck { if txn := block.Transaction(txHash); txn != nil { resultCh <- bNo close(resultCh) - b.txnToCheckMu.Lock() - delete(b.txnsToCheck, txHash) - b.txnToCheckMu.Unlock() + txnsToClear = append(txnsToClear, txHash) } } + for _, txHash := range txnsToClear { + delete(b.txnsToCheck, txHash) + } + b.txnToCheckMu.Unlock() } } }) diff --git a/tools/preconf-rpc/handlers/handlers.go b/tools/preconf-rpc/handlers/handlers.go index 7c0acf4f6..a5dc80a85 100644 --- a/tools/preconf-rpc/handlers/handlers.go +++ b/tools/preconf-rpc/handlers/handlers.go @@ -18,6 +18,7 @@ import ( const ( bridgeLimitWei = 1000000000000000000 // 1 ETH + defaultSubsidy = 10000000000000000 // 0.01 ETH ) type positionConstraintKey struct{} @@ -37,6 +38,9 @@ type Store interface { GetTransactionLogs(ctx context.Context, txnHash common.Hash) ([]*types.Log, error) GetBalance(ctx context.Context, account common.Address) (*big.Int, error) GetCurrentNonce(ctx context.Context, account common.Address) uint64 + HasBalance(ctx context.Context, account common.Address, amount *big.Int) bool + AlreadySubsidized(ctx context.Context, account common.Address) bool + AddSubsidy(ctx context.Context, account common.Address, amount *big.Int) error } type BlockTracker interface { @@ -47,6 +51,7 @@ type BlockTracker interface { type Sender interface { Enqueue(ctx context.Context, txn *sender.Transaction) error CancelTransaction(ctx context.Context, txHash common.Hash) (bool, error) + WaitForReceiptAvailable(ctx context.Context, txHash common.Hash) <-chan struct{} } func SetPositionConstraint(ctx context.Context, constraint *bidderapiv1.PositionConstraint) context.Context { @@ -135,9 +140,17 @@ func (h *rpcMethodHandler) RegisterMethods(server *rpcserver.JSONRPCServer) { return maxPriorityFeeJSON, false, nil }) server.RegisterHandler("eth_sendRawTransaction", h.handleSendRawTx) + server.RegisterHandler("eth_sendRawTransactionSync", h.handleSendRawTxSync) server.RegisterHandler("eth_getTransactionReceipt", h.handleGetTxReceipt) server.RegisterHandler("eth_getTransactionCount", h.handleGetTxCount) server.RegisterHandler("eth_getBlockByHash", h.handleGetBlockByHash) + // Prevent spam + server.RegisterHandler("eth_sendBundle", func(ctx context.Context, params ...any) (json.RawMessage, bool, error) { + return nil, false, rpcserver.NewJSONErr( + rpcserver.CodeInvalidRequest, + "eth_sendBundle is not supported", + ) + }) // Custom methods for MEV Commit server.RegisterHandler("mevcommit_optInBlock", func(ctx context.Context, params ...any) (json.RawMessage, bool, error) { timeToOptIn, err := h.bidder.Estimate() @@ -189,6 +202,25 @@ func (h *rpcMethodHandler) RegisterMethods(server *rpcserver.JSONRPCServer) { h.logger.Debug("Estimated bridge price", "bidAmount", bridgeCost, "bridgeAddress", h.bridgeAddress.Hex()) return resultJSON, false, nil }) + server.RegisterHandler("mevcommit_estimateBidPricePerGas", func(ctx context.Context, params ...any) (json.RawMessage, bool, error) { + blockPrices := h.pricer.EstimatePrice(ctx) + + minPrice, maxPrice := getMinMaxPrice(blockPrices) + result := map[string]interface{}{ + "minGasPrice": minPrice.String(), + "maxGasPrice": maxPrice.String(), + } + resultJSON, err := json.Marshal(result) + if err != nil { + h.logger.Error("Failed to marshal gas price estimate to JSON", "error", err) + return nil, false, rpcserver.NewJSONErr( + rpcserver.CodeCustomError, + "failed to marshal gas price estimate", + ) + } + return resultJSON, false, nil + }) + server.RegisterHandler("mevcommit_cancelTransaction", h.handleCancelTransaction) server.RegisterHandler("mevcommit_getTransactionCommitments", h.handleGetTxCommitments) server.RegisterHandler("mevcommit_getBalance", h.handleMevCommitGetBalance) @@ -205,6 +237,24 @@ func getNextBlockPrice(blockPrices map[int64]float64) *big.Int { return big.NewInt(0) // Return zero if no suitable estimate is found } +func getMinMaxPrice(blockPrices map[int64]float64) (*big.Int, *big.Int) { + minPrice := big.NewInt(0) + maxPrice := big.NewInt(0) + + for confidence, price := range blockPrices { + if confidence == 90 { + minPriceInWei := price * 1e9 // Convert Gwei to Wei + minPrice = new(big.Int).SetUint64(uint64(minPriceInWei)) + } + if confidence == 99 { + maxPriceInWei := price * 1e9 // Convert Gwei to Wei + maxPrice = new(big.Int).SetUint64(uint64(maxPriceInWei)) + } + } + + return minPrice, maxPrice +} + func (h *rpcMethodHandler) handleGetBlockByHash( ctx context.Context, params ...any, @@ -369,6 +419,10 @@ func (h *rpcMethodHandler) handleSendRawTx( } } + if err := h.subsidizeOnce(ctx, txSender); err != nil { + h.logger.Warn("Failed to subsidize user", "error", err, "sender", txSender.Hex()) + } + txnToEnqueue := &sender.Transaction{ Transaction: txn, Raw: rawTxHex, @@ -385,7 +439,7 @@ func (h *rpcMethodHandler) handleSendRawTx( h.logger.Error("Failed to enqueue transaction for sending", "error", err, "sender", txSender.Hex()) return nil, false, rpcserver.NewJSONErr( rpcserver.CodeCustomError, - "failed to enqueue transaction for sending", + err.Error(), ) } @@ -401,6 +455,110 @@ func (h *rpcMethodHandler) handleSendRawTx( return txHashJSON, false, nil } +func (h *rpcMethodHandler) handleSendRawTxSync( + ctx context.Context, + params ...any, +) (json.RawMessage, bool, error) { + if len(params) != 1 { + return nil, false, rpcserver.NewJSONErr( + rpcserver.CodeInvalidRequest, + "sendRawTx requires exactly one parameter", + ) + } + if params[0] == nil { + return nil, false, rpcserver.NewJSONErr( + rpcserver.CodeParseError, + "sendRawTx parameter cannot be null", + ) + } + + rawTxHex := params[0].(string) + if len(rawTxHex) < 2 || rawTxHex[:2] != "0x" { + return nil, false, rpcserver.NewJSONErr( + rpcserver.CodeParseError, + "sendRawTx parameter must be a hex string starting with '0x'", + ) + } + + decodedTxn, err := hex.DecodeString(rawTxHex[2:]) + if err != nil { + return nil, false, rpcserver.NewJSONErr( + rpcserver.CodeParseError, + "sendRawTx parameter must be a valid hex string", + ) + } + + txn := new(types.Transaction) + if err := txn.UnmarshalBinary(decodedTxn); err != nil { + return nil, false, rpcserver.NewJSONErr( + rpcserver.CodeParseError, + "sendRawTx parameter must be a valid transaction", + ) + } + + txSender, err := types.Sender(types.LatestSignerForChainID(txn.ChainId()), txn) + if err != nil { + h.logger.Error("Failed to get transaction sender", "error", err) + return nil, false, rpcserver.NewJSONErr( + rpcserver.CodeCustomError, + "failed to get transaction sender", + ) + } + + txType := sender.TxTypeRegular + switch { + case txn.To().Cmp(h.depositAddress) == 0: + txType = sender.TxTypeDeposit + case txn.To().Cmp(h.bridgeAddress) == 0: + txType = sender.TxTypeInstantBridge + if txn.Value().Cmp(big.NewInt(bridgeLimitWei)) > 0 { + h.logger.Error("Bridge transaction with value greater than limit", "value", txn.Value().String()) + return nil, false, rpcserver.NewJSONErr( + rpcserver.CodeCustomError, + fmt.Sprintf("bridge transaction value exceeds limit %d wei", bridgeLimitWei), + ) + } + } + + if err := h.subsidizeOnce(ctx, txSender); err != nil { + h.logger.Warn("Failed to subsidize user", "error", err, "sender", txSender.Hex()) + } + + txnToEnqueue := &sender.Transaction{ + Transaction: txn, + Raw: rawTxHex, + Sender: txSender, + Type: txType, + } + constraint, ok := getPositionConstraint(ctx) + if ok { + txnToEnqueue.Constraint = constraint + } + + err = h.sndr.Enqueue(ctx, txnToEnqueue) + if err != nil { + h.logger.Error("Failed to enqueue transaction for sending", "error", err, "sender", txSender.Hex()) + return nil, false, rpcserver.NewJSONErr( + rpcserver.CodeCustomError, + err.Error(), + ) + } + + waitCh := h.sndr.WaitForReceiptAvailable(ctx, txn.Hash()) + select { + case <-waitCh: + // Receipt is now available + case <-ctx.Done(): + h.logger.Error("Context cancelled while waiting for transaction receipt", "txHash", txn.Hash().Hex()) + return nil, false, rpcserver.NewJSONErr( + rpcserver.CodeCustomError, + "context cancelled while waiting for transaction receipt", + ) + } + + return h.handleGetTxReceipt(ctx, txn.Hash().Hex()) +} + func (h *rpcMethodHandler) handleGetTxReceipt(ctx context.Context, params ...any) (json.RawMessage, bool, error) { if len(params) != 1 { return nil, false, rpcserver.NewJSONErr( @@ -443,6 +601,15 @@ func (h *rpcMethodHandler) handleGetTxReceipt(ctx context.Context, params ...any ) } + commitments, err := h.store.GetTransactionCommitments(ctx, txHash) + if err != nil && txn.Status != sender.TxStatusFailed { + h.logger.Error("Failed to get transaction commitments", "error", err, "txHash", txHash) + return nil, false, rpcserver.NewJSONErr( + rpcserver.CodeCustomError, + "failed to get transaction commitments", + ) + } + result := map[string]interface{}{ "type": hexutil.Uint(txn.Transaction.Type()), "transactionHash": txn.Hash().Hex(), @@ -463,6 +630,7 @@ func (h *rpcMethodHandler) handleGetTxReceipt(ctx context.Context, params ...any result["status"] = hexutil.Uint64(types.ReceiptStatusSuccessful) result["blockHash"] = txn.Hash().Hex() result["blockNumber"] = hexutil.EncodeBig(big.NewInt(txn.BlockNumber)) + result["maxBidAmount"] = getFinalBidAmount(commitments).String() } receiptJSON, err := json.Marshal(result) @@ -654,3 +822,27 @@ func (r *rpcMethodHandler) handleCancelTransaction(ctx context.Context, params . r.logger.Info("Transaction cancelled successfully", "txHash", txHash) return json.RawMessage(fmt.Sprintf(`{"cancelled": true, "txHash": "%s"}`, txHash.Hex())), false, nil } + +func (r *rpcMethodHandler) subsidizeOnce(ctx context.Context, account common.Address) error { + if r.store.HasBalance(ctx, account, big.NewInt(1)) { + return nil + } + if r.store.AlreadySubsidized(ctx, account) { + return nil + } + r.logger.Info("Subsidizing account for first transaction", "account", account.Hex()) + return r.store.AddSubsidy(ctx, account, big.NewInt(defaultSubsidy)) +} + +func getFinalBidAmount(cmts []*bidderapiv1.Commitment) *big.Int { + finalBid := big.NewInt(0) + for _, cmt := range cmts { + bidAmount, ok := new(big.Int).SetString(cmt.BidAmount, 10) + if ok { + if bidAmount.Cmp(finalBid) > 0 { + finalBid = bidAmount + } + } + } + return finalBid +} diff --git a/tools/preconf-rpc/sender/sender.go b/tools/preconf-rpc/sender/sender.go index 35b0174d1..c7aa93ff8 100644 --- a/tools/preconf-rpc/sender/sender.go +++ b/tools/preconf-rpc/sender/sender.go @@ -57,6 +57,8 @@ var ( ErrTransactionCancelled = errors.New("transaction cancelled by user") ErrTimeoutExceeded = errors.New("timeout exceeded while waiting for transaction to be processed") ErrMaxAttemptsPerBlockExceeded = errors.New("maximum attempts exceeded for transaction in the current block") + ErrNonceTooHigh = errors.New("nonce too high") + ErrNonceTooLow = errors.New("nonce too low") ) type Transaction struct { @@ -105,6 +107,7 @@ type BlockTracker interface { WaitForTxnInclusion(txnHash common.Hash) chan uint64 NextBlockNumber() (uint64, time.Duration, error) LatestBlockNumber() uint64 + AccountNonce(ctx context.Context, account common.Address) (uint64, error) } type Transferer interface { @@ -152,6 +155,8 @@ type TxSender struct { fastTrack func(cmts []*bidderapiv1.Commitment, optedInSlot bool) bool bidTimeout time.Duration timeoutMtx sync.RWMutex + receiptSignal map[common.Hash][]chan struct{} + receiptMtx sync.Mutex } func noOpFastTrack(_ []*bidderapiv1.Commitment, _ bool) bool { @@ -192,6 +197,7 @@ func NewTxSender( simulator: simulator, fastTrack: noOpFastTrack, bidTimeout: bidTimeout, + receiptSignal: make(map[common.Hash][]chan struct{}), }, nil } @@ -217,9 +223,22 @@ func validateTransaction(tx *Transaction) error { return nil } -func (t *TxSender) hasLowerNonce(ctx context.Context, tx *Transaction) bool { - currentNonce := t.store.GetCurrentNonce(ctx, tx.Sender) - return tx.Nonce() < currentNonce +func (t *TxSender) hasCorrectNonce(ctx context.Context, tx *Transaction) error { + currentNonce := t.store.GetCurrentNonce(ctx, tx.Sender) + 1 + backendNonce, err := t.blockTracker.AccountNonce(ctx, tx.Sender) + if err == nil { + if backendNonce > currentNonce { + currentNonce = backendNonce + } + } + switch { + case tx.Nonce() < currentNonce: + return ErrNonceTooLow + case tx.Nonce() > currentNonce: + return ErrNonceTooHigh + } + + return nil } func (t *TxSender) triggerSender() { @@ -240,8 +259,8 @@ func (t *TxSender) Enqueue(ctx context.Context, tx *Transaction) error { return err } - if t.hasLowerNonce(ctx, tx) { - return errors.New("transaction has a lower nonce than the current highest nonce") + if err := t.hasCorrectNonce(ctx, tx); err != nil { + return err } if err := t.store.AddQueuedTransaction(ctx, tx); err != nil { @@ -253,6 +272,34 @@ func (t *TxSender) Enqueue(ctx context.Context, tx *Transaction) error { return nil } +func (t *TxSender) WaitForReceiptAvailable(ctx context.Context, txnHash common.Hash) <-chan struct{} { + t.receiptMtx.Lock() + defer t.receiptMtx.Unlock() + + signal, found := t.receiptSignal[txnHash] + if !found { + signal = []chan struct{}{} + } + newSignal := make(chan struct{}) + signal = append(signal, newSignal) + t.receiptSignal[txnHash] = signal + return newSignal +} + +func (t *TxSender) signalReceiptAvailable(txnHash common.Hash) { + t.receiptMtx.Lock() + defer t.receiptMtx.Unlock() + + signals, found := t.receiptSignal[txnHash] + if !found { + return + } + for _, sig := range signals { + close(sig) + } + delete(t.receiptSignal, txnHash) +} + func (t *TxSender) CancelTransaction(ctx context.Context, txnHash common.Hash) (bool, error) { t.inflightMu.RLock() cancel, found := t.inflightTxns[txnHash] @@ -432,6 +479,7 @@ func (t *TxSender) processQueuedTransactions(ctx context.Context) { txn.Status = TxStatusFailed txn.Details = err.Error() t.clearBlockAttemptHistory(txn, time.Now()) + defer t.signalReceiptAvailable(txn.Hash()) return t.store.StoreTransaction(ctx, txn, nil, nil) } return nil @@ -487,6 +535,7 @@ BID_LOOP: if err := t.store.StoreTransaction(ctx, txn, txn.commitments, txn.logs); err != nil { return fmt.Errorf("failed to store preconfirmed transaction: %w", err) } + t.signalReceiptAvailable(txn.Hash()) } retryTicker.Reset(result.timeUntillNextBlock + 1*time.Second) default: @@ -521,6 +570,7 @@ BID_LOOP: if err := t.store.StoreTransaction(ctx, txn, txn.commitments, txn.logs); err != nil { return fmt.Errorf("failed to store preconfirmed transaction: %w", err) } + t.signalReceiptAvailable(txn.Hash()) } endTime := time.Now() if len(txn.commitments) > 0 { @@ -682,17 +732,6 @@ func (t *TxSender) sendBid( if !isRetry { logs, err := t.simulator.Simulate(ctx, txn.Raw) if err != nil { - if t.blockTracker.LatestBlockNumber() < bidBlockNo { - logger.Warn( - "Simulation failed, but block may not be mined yet, will retry", - "error", err, - "blockNumber", bidBlockNo, - ) - return bidResult{}, &errRetry{ - err: fmt.Errorf("simulation may have failed due to unmined block: %w", err), - retryAfter: time.Second, - } - } logger.Error("Failed to simulate transaction", "error", err, "blockNumber", bidBlockNo) return bidResult{}, fmt.Errorf("failed to simulate transaction: %w", err) } @@ -764,6 +803,7 @@ BID_LOOP: if err := t.store.StoreTransaction(ctx, txn, txn.commitments, txn.logs); err != nil { logger.Error("Failed to store fast-tracked transaction", "error", err) } + t.signalReceiptAvailable(txn.Hash()) } case bidder.BidStatusCancelled: logger.Warn("Bid context cancelled by the bidder") diff --git a/tools/preconf-rpc/sender/sender_test.go b/tools/preconf-rpc/sender/sender_test.go index c9929a5c2..de1b00ef9 100644 --- a/tools/preconf-rpc/sender/sender_test.go +++ b/tools/preconf-rpc/sender/sender_test.go @@ -253,6 +253,10 @@ func (m *mockBlockTracker) LatestBlockNumber() uint64 { return 0 } +func (m *mockBlockTracker) AccountNonce(ctx context.Context, account common.Address) (uint64, error) { + return 0, nil +} + type mockTransferer struct{} func (m *mockTransferer) Transfer(ctx context.Context, to common.Address, chainID *big.Int, amount *big.Int) error { @@ -334,6 +338,8 @@ func TestSender(t *testing.T) { t.Fatalf("failed to enqueue transaction: %v", err) } + waitCh := sndr.WaitForReceiptAvailable(ctx, tx1.Hash()) + // Simulate opted in block bidderImpl.optinEstimate <- 2 @@ -386,6 +392,8 @@ func TestSender(t *testing.T) { close(resC) bidderImpl.out <- resC + <-waitCh + res := <-st.preconfirmedTxns if res.txn == nil { t.Fatal("expected a preconfirmed transaction, got nil") diff --git a/tools/preconf-rpc/store/store.go b/tools/preconf-rpc/store/store.go index 3c694f53f..039ef614b 100644 --- a/tools/preconf-rpc/store/store.go +++ b/tools/preconf-rpc/store/store.go @@ -596,3 +596,31 @@ func (s *rpcstore) AddSubsidy( return s.AddBalance(ctx, account, amount) } + +func (s *rpcstore) AlreadySubsidized( + ctx context.Context, + account common.Address, +) bool { + if account == (common.Address{}) { + return false + } + + query := ` + SELECT balance + FROM subsidies + WHERE account = $1; + ` + row := s.db.QueryRowContext(ctx, query, account.Hex()) + var currentBalanceString string + err := row.Scan(¤tBalanceString) + if err != nil { + return false + } + + currentBalance, ok := new(big.Int).SetString(currentBalanceString, 10) + if !ok { + return false + } + + return currentBalance.Sign() > 0 +}