From d25cb6217757ca8a449f0d473f8182edc7cccb52 Mon Sep 17 00:00:00 2001 From: Alok Nerurkar Date: Fri, 1 Aug 2025 22:12:22 +0530 Subject: [PATCH 1/4] fix: preconf RPC CORS headers --- tools/preconf-rpc/rpcserver/rpcserver.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tools/preconf-rpc/rpcserver/rpcserver.go b/tools/preconf-rpc/rpcserver/rpcserver.go index ce2c81ac2..f424cb1ff 100644 --- a/tools/preconf-rpc/rpcserver/rpcserver.go +++ b/tools/preconf-rpc/rpcserver/rpcserver.go @@ -159,6 +159,7 @@ func (s *JSONRPCServer) writeResponse(w http.ResponseWriter, id any, result *jso Result: result, Error: nil, } + setCorsHeaders(w) w.Header().Set("Content-Type", "application/json") if err := json.NewEncoder(w).Encode(response); err != nil { http.Error(w, "Failed to write response", http.StatusInternalServerError) @@ -178,6 +179,7 @@ func (s *JSONRPCServer) writeError(w http.ResponseWriter, id any, code int, mess Data: nil, }, } + setCorsHeaders(w) w.Header().Set("Content-Type", "application/json") if err := json.NewEncoder(w).Encode(response); err != nil { http.Error(w, "Failed to write error response", http.StatusInternalServerError) @@ -206,6 +208,7 @@ func (s *JSONRPCServer) proxyRequest(w http.ResponseWriter, body []byte) { _ = resp.Body.Close() }() + setCorsHeaders(w) w.Header().Set("Content-Type", "application/json") w.WriteHeader(resp.StatusCode) rdr := io.LimitReader(resp.Body, defaultMaxBodySize) @@ -219,3 +222,9 @@ func (s *JSONRPCServer) proxyRequest(w http.ResponseWriter, body []byte) { return } } + +func setCorsHeaders(w http.ResponseWriter) { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Methods", "POST, OPTIONS") + w.Header().Set("Access-Control-Allow-Headers", "Content-Type") +} From ea1db6f35a3240c06eba3537058685ce76139eb4 Mon Sep 17 00:00:00 2001 From: Alok Date: Sat, 2 Aug 2025 16:36:01 +0530 Subject: [PATCH 2/4] fix: temp --- .../preconf-rpc/blocktracker/blocktracker.go | 52 +++++++++++-------- tools/preconf-rpc/pricer/pricer.go | 49 ++++++++++++++--- 2 files changed, 74 insertions(+), 27 deletions(-) diff --git a/tools/preconf-rpc/blocktracker/blocktracker.go b/tools/preconf-rpc/blocktracker/blocktracker.go index ebecc7b7b..8304a9668 100644 --- a/tools/preconf-rpc/blocktracker/blocktracker.go +++ b/tools/preconf-rpc/blocktracker/blocktracker.go @@ -4,6 +4,7 @@ import ( "context" "log/slog" "math/big" + "sync" "sync/atomic" "time" @@ -22,7 +23,7 @@ type blockTracker struct { blocks *lru.Cache[uint64, *types.Block] client EthClient log *slog.Logger - checkTrigger chan struct{} + checkCond *sync.Cond } func NewBlockTracker(client EthClient, log *slog.Logger) (*blockTracker, error) { @@ -36,7 +37,7 @@ func NewBlockTracker(client EthClient, log *slog.Logger) (*blockTracker, error) blocks: cache, client: client, log: log, - checkTrigger: make(chan struct{}, 1), + checkCond: sync.NewCond(&sync.Mutex{}), }, nil } @@ -73,11 +74,9 @@ func (b *blockTracker) Start(ctx context.Context) <-chan struct{} { } func (b *blockTracker) triggerCheck() { - select { - case b.checkTrigger <- struct{}{}: - default: - // Non-blocking send, if channel is full, we skip - } + b.checkCond.L.Lock() + b.checkCond.Broadcast() + b.checkCond.L.Unlock() } func (b *blockTracker) LatestBlockNumber() uint64 { @@ -89,21 +88,33 @@ func (b *blockTracker) CheckTxnInclusion( txHash common.Hash, blockNumber uint64, ) (bool, error) { -WaitForBlock: - for { - select { - case <-ctx.Done(): - return false, ctx.Err() - case <-b.checkTrigger: - if blockNumber <= b.latestBlockNo.Load() { - break WaitForBlock - } + if blockNumber <= b.latestBlockNo.Load() { + return b.checkTxnInclusion(ctx, txHash, blockNumber) + } + + waitCh := make(chan struct{}) + go func() { + b.checkCond.L.Lock() + defer b.checkCond.L.Unlock() + for blockNumber > b.latestBlockNo.Load() { + b.checkCond.Wait() } + close(waitCh) + }() + + select { + case <-ctx.Done(): + return false, ctx.Err() + case <-waitCh: + return b.checkTxnInclusion(ctx, txHash, blockNumber) } +} +func (b *blockTracker) checkTxnInclusion(ctx context.Context, txHash common.Hash, blockNumber uint64) (bool, error) { + var err error block, ok := b.blocks.Get(blockNumber) if !ok { - block, err := b.client.BlockByNumber(ctx, big.NewInt(int64(blockNumber))) + block, err = b.client.BlockByNumber(ctx, big.NewInt(int64(blockNumber))) if err != nil { b.log.Error("Failed to get block by number", "error", err, "blockNumber", blockNumber) return false, err @@ -111,10 +122,9 @@ WaitForBlock: _ = b.blocks.Add(blockNumber, block) } - for _, tx := range block.Transactions() { - if tx.Hash().Cmp(txHash) == 0 { - return true, nil - } + if txn := block.Transaction(txHash); txn != nil { + return true, nil } + return false, nil } diff --git a/tools/preconf-rpc/pricer/pricer.go b/tools/preconf-rpc/pricer/pricer.go index 892036340..27f05b5e1 100644 --- a/tools/preconf-rpc/pricer/pricer.go +++ b/tools/preconf-rpc/pricer/pricer.go @@ -5,7 +5,9 @@ import ( "encoding/json" "errors" "io" + "log/slog" "net/http" + "sync" "time" ) @@ -21,23 +23,58 @@ type BlockPrice struct { EstimatedPrices []EstimatedPrice `json:"estimatedPrices"` } -type BlockPrices struct { - MsSinceLastBlock int64 `json:"msSinceLastBlock"` +type blockPrices struct { CurrentBlockNumber int64 `json:"currentBlockNumber"` Prices []BlockPrice `json:"blockPrices"` } type BidPricer struct { - apiKey string + apiKey string + log *slog.Logger + mu sync.RWMutex // Protects currentEstimates + currentEstimates map[int64]float64 } -func NewPricer(apiKey string) *BidPricer { +func NewPricer(apiKey string, logger *slog.Logger) *BidPricer { return &BidPricer{ apiKey: apiKey, + log: logger, } } -func (b *BidPricer) EstimatePrice(ctx context.Context) (*BlockPrices, error) { +func (b *BidPricer) Start(ctx context.Context) <-chan struct{} { + done := make(chan struct{}) + go func() { + defer close(done) + ticker := time.NewTicker(2 * time.Second) // Adjust the ticker interval as needed + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if _, err := b.syncEstimate(ctx); err != nil { + b.log.Error("Failed to estimate price", "error", err) + } + } + } + }() + return done +} + +func (b *BidPricer) EstimatePrice(ctx context.Context) map[int64]float64 { + b.mu.RLock() + defer b.mu.RUnlock() + + estimates := make(map[int64]float64) + for blockNumber, price := range b.currentEstimates { + estimates[blockNumber] = price + } + return estimates +} + +func (b *BidPricer) SyncEstimate(ctx context.Context) error { client := &http.Client{ Timeout: 10 * time.Second, } @@ -69,7 +106,7 @@ func (b *BidPricer) EstimatePrice(ctx context.Context) (*BlockPrices, error) { return nil, err } - bp := new(BlockPrices) + bp := new(blockPrices) if err := json.Unmarshal(respBuf, bp); err != nil { return nil, err } From 727be0cd66f8ad620dda357a3c6d7848693b260b Mon Sep 17 00:00:00 2001 From: Alok Date: Mon, 4 Aug 2025 19:29:25 +0530 Subject: [PATCH 3/4] fix: temp --- .../preconf-rpc/blocktracker/blocktracker.go | 12 ++- .../blocktracker/blocktracker_test.go | 46 ++++++++- tools/preconf-rpc/pricer/pricer.go | 69 ++++++++++---- tools/preconf-rpc/pricer/pricer_test.go | 38 +++----- tools/preconf-rpc/sender/sender.go | 95 +++++++++++-------- 5 files changed, 171 insertions(+), 89 deletions(-) diff --git a/tools/preconf-rpc/blocktracker/blocktracker.go b/tools/preconf-rpc/blocktracker/blocktracker.go index 8304a9668..00caced10 100644 --- a/tools/preconf-rpc/blocktracker/blocktracker.go +++ b/tools/preconf-rpc/blocktracker/blocktracker.go @@ -2,6 +2,7 @@ package blocktracker import ( "context" + "errors" "log/slog" "math/big" "sync" @@ -64,8 +65,8 @@ func (b *blockTracker) Start(ctx context.Context) <-chan struct{} { } _ = b.blocks.Add(blockNo, block) b.latestBlockNo.Store(block.NumberU64()) - b.log.Debug("New block detected", "number", block.NumberU64(), "hash", block.Hash().Hex()) b.triggerCheck() + b.log.Debug("New block detected", "number", block.NumberU64(), "hash", block.Hash().Hex()) } } } @@ -83,6 +84,15 @@ func (b *blockTracker) LatestBlockNumber() uint64 { return b.latestBlockNo.Load() } +func (b *blockTracker) NextBlockNumber() (uint64, time.Duration, error) { + block, found := b.blocks.Get(b.latestBlockNo.Load()) + if !found { + return 0, 0, errors.New("latest block not found in cache") + } + blockTime := time.Unix(int64(block.Time()), 0) + return b.latestBlockNo.Load() + 1, time.Until(blockTime.Add(12 * time.Second)), nil +} + func (b *blockTracker) CheckTxnInclusion( ctx context.Context, txHash common.Hash, diff --git a/tools/preconf-rpc/blocktracker/blocktracker_test.go b/tools/preconf-rpc/blocktracker/blocktracker_test.go index 2f077248a..54f395de9 100644 --- a/tools/preconf-rpc/blocktracker/blocktracker_test.go +++ b/tools/preconf-rpc/blocktracker/blocktracker_test.go @@ -3,13 +3,15 @@ package blocktracker_test import ( "context" "hash" - "log/slog" "math/big" + "os" "testing" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/primev/mev-commit/tools/preconf-rpc/blocktracker" + "github.com/primev/mev-commit/x/util" "golang.org/x/crypto/sha3" ) @@ -74,7 +76,7 @@ func TestBlockTracker(t *testing.T) { blk1 := types.NewBlock( &types.Header{ Number: big.NewInt(100), - Time: 1622547800, + Time: uint64(time.Now().Unix()), }, &types.Body{Transactions: []*types.Transaction{tx1, tx2}}, nil, // No receipts @@ -84,7 +86,7 @@ func TestBlockTracker(t *testing.T) { blk2 := types.NewBlock( &types.Header{ Number: big.NewInt(101), - Time: 1622547900, + Time: uint64(time.Now().Add(12 * time.Second).Unix()), }, &types.Body{Transactions: []*types.Transaction{tx3}}, nil, // No receipts @@ -99,7 +101,7 @@ func TestBlockTracker(t *testing.T) { }, } - tracker, err := blocktracker.NewBlockTracker(client, slog.Default()) + tracker, err := blocktracker.NewBlockTracker(client, util.NewTestLogger(os.Stdout)) if err != nil { t.Fatalf("Failed to create block tracker: %v", err) } @@ -112,6 +114,26 @@ func TestBlockTracker(t *testing.T) { client.blockNumber <- 100 + start := time.Now() + for { + bidBlockNo, duration, err := tracker.NextBlockNumber() + if err == nil { + if bidBlockNo != 101 { + t.Fatalf("Expected next block number to be 101, got %d", bidBlockNo) + } + if duration <= 0 { + t.Fatalf("Expected positive duration, got %v", duration) + } + break + } else { + t.Logf("Waiting for next block number: %v", err) + } + if time.Since(start) > 5*time.Second { + t.Fatalf("Timeout waiting for next block number") + } + time.Sleep(100 * time.Millisecond) + } + included, err := tracker.CheckTxnInclusion(ctx, tx1.Hash(), 100) if err != nil { t.Fatalf("Error checking transaction inclusion: %v", err) @@ -128,6 +150,22 @@ func TestBlockTracker(t *testing.T) { client.blockNumber <- 101 + start = time.Now() + for { + bidBlockNo, duration, err := tracker.NextBlockNumber() + if err == nil { + if bidBlockNo == 102 && duration > 0 { + break + } + } else { + t.Logf("Waiting for next block number: %v", err) + } + if time.Since(start) > 5*time.Second { + t.Fatalf("Timeout waiting for next block number") + } + time.Sleep(100 * time.Millisecond) + } + included, err = tracker.CheckTxnInclusion(ctx, tx4.Hash(), 101) if err != nil { t.Fatalf("Error checking transaction inclusion: %v", err) diff --git a/tools/preconf-rpc/pricer/pricer.go b/tools/preconf-rpc/pricer/pricer.go index 27f05b5e1..0bd8c962a 100644 --- a/tools/preconf-rpc/pricer/pricer.go +++ b/tools/preconf-rpc/pricer/pricer.go @@ -29,17 +29,27 @@ type blockPrices struct { } type BidPricer struct { - apiKey string - log *slog.Logger - mu sync.RWMutex // Protects currentEstimates - currentEstimates map[int64]float64 + apiKey string + log *slog.Logger + mu sync.RWMutex // Protects currentEstimates + currentEstimates map[int64]float64 + currentBlockNumber int64 } -func NewPricer(apiKey string, logger *slog.Logger) *BidPricer { - return &BidPricer{ - apiKey: apiKey, - log: logger, +func NewPricer(apiKey string, logger *slog.Logger) (*BidPricer, error) { + bp := &BidPricer{ + apiKey: apiKey, + log: logger, + currentEstimates: make(map[int64]float64), } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + if err := bp.syncEstimate(ctx); err != nil { + return nil, err + } + return bp, nil } func (b *BidPricer) Start(ctx context.Context) <-chan struct{} { @@ -54,7 +64,7 @@ func (b *BidPricer) Start(ctx context.Context) <-chan struct{} { case <-ctx.Done(): return case <-ticker.C: - if _, err := b.syncEstimate(ctx); err != nil { + if err := b.syncEstimate(ctx); err != nil { b.log.Error("Failed to estimate price", "error", err) } } @@ -68,20 +78,20 @@ func (b *BidPricer) EstimatePrice(ctx context.Context) map[int64]float64 { defer b.mu.RUnlock() estimates := make(map[int64]float64) - for blockNumber, price := range b.currentEstimates { - estimates[blockNumber] = price + for confidence, price := range b.currentEstimates { + estimates[confidence] = price } return estimates } -func (b *BidPricer) SyncEstimate(ctx context.Context) error { +func (b *BidPricer) syncEstimate(ctx context.Context) error { client := &http.Client{ Timeout: 10 * time.Second, } req, err := http.NewRequestWithContext(ctx, "GET", apiURL, nil) if err != nil { - return nil, err + return err } if b.apiKey != "" { @@ -90,7 +100,7 @@ func (b *BidPricer) SyncEstimate(ctx context.Context) error { resp, err := client.Do(req) if err != nil { - return nil, err + return err } defer func() { @@ -98,22 +108,43 @@ func (b *BidPricer) SyncEstimate(ctx context.Context) error { }() if resp.StatusCode != http.StatusOK { - return nil, errors.New("failed to fetch price estimate: " + resp.Status) + return errors.New("failed to fetch price estimate: " + resp.Status) } respBuf, err := io.ReadAll(io.LimitReader(resp.Body, 1024*1024)) if err != nil { - return nil, err + return err } bp := new(blockPrices) if err := json.Unmarshal(respBuf, bp); err != nil { - return nil, err + return err } if len(bp.Prices) == 0 { - return nil, errors.New("no block prices available") + return errors.New("no block prices available") } - return bp, nil + if b.currentBlockNumber < bp.CurrentBlockNumber+1 { + for _, price := range bp.Prices { + if price.BlockNumber == bp.CurrentBlockNumber+1 { + b.mu.Lock() + for _, estimatedPrice := range price.EstimatedPrices { + switch estimatedPrice.Confidence { + case 90: + b.currentEstimates[int64(estimatedPrice.Confidence)] = estimatedPrice.PriorityFeePerGasGwei + case 95: + b.currentEstimates[int64(estimatedPrice.Confidence)] = estimatedPrice.PriorityFeePerGasGwei + case 99: + b.currentEstimates[int64(estimatedPrice.Confidence)] = estimatedPrice.PriorityFeePerGasGwei + } + } + b.currentBlockNumber = price.BlockNumber + b.mu.Unlock() + b.log.Debug("Updated current estimates", "blockNumber", price.BlockNumber, "estimates", b.currentEstimates) + } + } + } + + return nil } diff --git a/tools/preconf-rpc/pricer/pricer_test.go b/tools/preconf-rpc/pricer/pricer_test.go index fd897e6c4..4b43613de 100644 --- a/tools/preconf-rpc/pricer/pricer_test.go +++ b/tools/preconf-rpc/pricer/pricer_test.go @@ -2,44 +2,36 @@ package pricer_test import ( "context" + "io" "testing" "github.com/primev/mev-commit/tools/preconf-rpc/pricer" + "github.com/primev/mev-commit/x/util" ) func TestEstimatePrice(t *testing.T) { t.Parallel() - bp := pricer.NewPricer("") - - ctx := context.Background() - - prices, err := bp.EstimatePrice(ctx) + logger := util.NewTestLogger(io.Discard) + bp, err := pricer.NewPricer("", logger) if err != nil { - t.Fatalf("failed to estimate price: %v", err) - } - - if prices.CurrentBlockNumber == 0 { - t.Error("expected non-zero current block number") + t.Fatalf("failed to create pricer: %v", err) } - if len(prices.Prices) == 0 { - t.Error("expected at least one block price") - } + ctx := context.Background() - price := prices.Prices[0] + prices := bp.EstimatePrice(ctx) - if price.BlockNumber == 0 { - t.Error("expected non-zero block number in price") + if len(prices) != 3 { + t.Fatalf("expected 3 confidence levels, got %d", len(prices)) } - if len(price.EstimatedPrices) == 0 { - t.Error("expected at least one estimated price") - } - - for _, estPrice := range price.EstimatedPrices { - if estPrice.PriorityFeePerGasGwei <= 0 { - t.Errorf("expected positive priority fee per gas, got %f", estPrice.PriorityFeePerGasGwei) + for confidence, price := range prices { + if confidence <= 0 { + t.Errorf("expected positive confidence level, got %d", confidence) + } + if price <= 0 { + t.Errorf("expected positive price, got %f", price) } } diff --git a/tools/preconf-rpc/sender/sender.go b/tools/preconf-rpc/sender/sender.go index d8ab37f99..926444c69 100644 --- a/tools/preconf-rpc/sender/sender.go +++ b/tools/preconf-rpc/sender/sender.go @@ -15,7 +15,6 @@ import ( "github.com/ethereum/go-ethereum/core/types" lru "github.com/hashicorp/golang-lru/v2" bidderapiv1 "github.com/primev/mev-commit/p2p/gen/go/bidderapi/v1" - "github.com/primev/mev-commit/tools/preconf-rpc/pricer" optinbidder "github.com/primev/mev-commit/x/opt-in-bidder" "golang.org/x/sync/errgroup" ) @@ -88,11 +87,12 @@ type Bidder interface { } type Pricer interface { - EstimatePrice(ctx context.Context) (*pricer.BlockPrices, error) + EstimatePrice(ctx context.Context) map[int64]float64 } type BlockTracker interface { CheckTxnInclusion(ctx context.Context, txnHash common.Hash, blockNumber uint64) (bool, error) + NextBlockNumber() (uint64, time.Duration, error) } type Transferer interface { @@ -100,7 +100,7 @@ type Transferer interface { } type blockAttempt struct { - blockNumber int64 + blockNumber uint64 attempts int } @@ -424,8 +424,7 @@ BID_LOOP: "blockNumber", result.blockNumber, "bidAmount", result.bidAmount.String(), ) - blockTimeUsed := time.Since(result.startTime).Milliseconds() + result.msSinceLastBlock - if blockTimeUsed < (blockTime*1000 - 1000) { + if (result.timeUntillNextBlock - time.Second) > time.Since(result.startTime) { // If not all builders committed, we will retry the bid process // immediately if we have atleast 1 second left before the next block continue @@ -491,13 +490,13 @@ func (e *errRetry) Error() string { } type bidResult struct { - startTime time.Time - msSinceLastBlock int64 - noOfProviders int - blockNumber uint64 - optedInSlot bool - bidAmount *big.Int - commitments []*bidderapiv1.Commitment + startTime time.Time + timeUntillNextBlock time.Duration + noOfProviders int + blockNumber uint64 + optedInSlot bool + bidAmount *big.Int + commitments []*bidderapiv1.Commitment } func (t *TxSender) sendBid( @@ -513,22 +512,33 @@ func (t *TxSender) sendBid( timeToOptIn = blockTime * 32 } - prices, err := t.pricer.EstimatePrice(ctx) + start := time.Now() + bidBlockNo, timeUntilNextBlock, err := t.blockTracker.NextBlockNumber() if err != nil { - t.logger.Error("Failed to estimate transaction price", "error", err) + t.logger.Error("Failed to get next block number", "error", err) return bidResult{}, &errRetry{ - err: fmt.Errorf("failed to estimate transaction price: %w", err), + err: fmt.Errorf("failed to get next block number: %w", err), retryAfter: time.Second, } } - start := time.Now() - optedInSlot := math.Abs(float64(timeToOptIn)-float64(blockTime-(prices.MsSinceLastBlock/1000))) < float64(blockTime/2) + if timeUntilNextBlock <= time.Second { + t.logger.Warn("Next block time is too short, skipping bid", "timeUntilNextBlock", timeUntilNextBlock) + return bidResult{}, &errRetry{ + err: fmt.Errorf("next block time is too short: %s", timeUntilNextBlock), + retryAfter: time.Second, + } + } + + prices := t.pricer.EstimatePrice(ctx) + + // Allow for certain level of tolerance w.r.t timestamps + optedInSlot := math.Abs(float64(timeToOptIn)-float64(timeUntilNextBlock.Seconds())) < float64(blockTime/3) cctx, cancel := context.WithTimeout(ctx, bidTimeout) defer cancel() - cost, blockNo, err := t.calculatePriceForNextBlock(txn, prices, optedInSlot) + cost, err := t.calculatePriceForNextBlock(txn, bidBlockNo, prices, optedInSlot) if err != nil { t.logger.Error("Failed to calculate price for next block", "error", err) return bidResult{}, &errRetry{ @@ -585,7 +595,7 @@ func (t *TxSender) sendBid( strings.TrimPrefix(txn.Raw, "0x"), &optinbidder.BidOpts{ WaitForOptIn: false, - BlockNumber: uint64(blockNo), + BlockNumber: uint64(bidBlockNo), RevertingTxHashes: []string{txn.Hash().Hex()}, DecayDuration: bidTimeout * 2, }, @@ -596,10 +606,10 @@ func (t *TxSender) sendBid( } result := bidResult{ - commitments: make([]*bidderapiv1.Commitment, 0), - bidAmount: cost, - startTime: start, - msSinceLastBlock: prices.MsSinceLastBlock, + commitments: make([]*bidderapiv1.Commitment, 0), + bidAmount: cost, + startTime: start, + // msSinceLastBlock: prices.MsSinceLastBlock, } BID_LOOP: for { @@ -642,9 +652,10 @@ BID_LOOP: func (t *TxSender) calculatePriceForNextBlock( txn *Transaction, - prices *pricer.BlockPrices, + bidBlockNo uint64, + prices map[int64]float64, optedInSlot bool, -) (*big.Int, int64, error) { +) (*big.Int, error) { attempts, found := t.txnAttemptHistory.Get(txn.Hash()) if !found { attempts = &txnAttempt{ @@ -657,16 +668,20 @@ func (t *TxSender) calculatePriceForNextBlock( confidence := defaultConfidence isRetry := false - for _, attempt := range attempts.attempts { - if attempt.blockNumber == prices.CurrentBlockNumber+1 { + for i := len(attempts.attempts) - 1; i >= 0; i-- { + if attempts.attempts[i].blockNumber < bidBlockNo { + break // We only care about attempts for the current block + } + if attempts.attempts[i].blockNumber == bidBlockNo { isRetry = true - attempt.attempts++ + attempts.attempts[i].attempts++ switch { - case attempt.attempts == 2: + case attempts.attempts[i].attempts == 2: confidence = confidenceSecondAttempt - case attempt.attempts > 2: + case attempts.attempts[i].attempts > 2: confidence = confidenceSubsequentAttempts } + break // No need to check further attempts for the same block } } @@ -677,27 +692,23 @@ func (t *TxSender) calculatePriceForNextBlock( // If this is the first attempt for the next block, we add it to the history if !isRetry { attempts.attempts = append(attempts.attempts, &blockAttempt{ - blockNumber: prices.CurrentBlockNumber + 1, + blockNumber: bidBlockNo, attempts: 1, }) } _ = t.txnAttemptHistory.Add(txn.Hash(), attempts) - for _, price := range prices.Prices { - if price.BlockNumber == prices.CurrentBlockNumber+1 { - for _, estimatedPrice := range price.EstimatedPrices { - if estimatedPrice.Confidence == confidence { - // the gwei value is in float, so we need to convert it to wei before multiplying with gas limit - priceInWei := estimatedPrice.PriorityFeePerGasGwei * 1e9 // Convert Gwei to Wei - return new(big.Int).Mul(big.NewInt(int64(priceInWei)), big.NewInt(int64(txn.Gas()))), price.BlockNumber, nil - } - } + for conf, price := range prices { + if conf == int64(confidence) { + // the gwei value is in float, so we need to convert it to wei before multiplying with gas limit + priceInWei := price * 1e9 // Convert Gwei to Wei + return new(big.Int).Mul(big.NewInt(int64(priceInWei)), big.NewInt(int64(txn.Gas()))), nil } } - return nil, 0, fmt.Errorf( - "no estimated price found for block %d with confidence %d", prices.CurrentBlockNumber+1, confidence, + return nil, fmt.Errorf( + "no estimated price found for block %d with confidence %d", bidBlockNo, confidence, ) } From a746192fad6412843147f5cccb0ea2112a9e37d0 Mon Sep 17 00:00:00 2001 From: Alok Nerurkar Date: Tue, 5 Aug 2025 01:02:18 +0530 Subject: [PATCH 4/4] fix: preconf RPC fixes --- tools/preconf-rpc/handlers/handlers.go | 35 ++--- tools/preconf-rpc/sender/sender.go | 8 +- tools/preconf-rpc/sender/sender_test.go | 182 +++++++++++------------- tools/preconf-rpc/service/service.go | 5 +- 4 files changed, 100 insertions(+), 130 deletions(-) diff --git a/tools/preconf-rpc/handlers/handlers.go b/tools/preconf-rpc/handlers/handlers.go index b49313f5b..1363f68be 100644 --- a/tools/preconf-rpc/handlers/handlers.go +++ b/tools/preconf-rpc/handlers/handlers.go @@ -12,7 +12,6 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" bidderapiv1 "github.com/primev/mev-commit/p2p/gen/go/bidderapi/v1" - "github.com/primev/mev-commit/tools/preconf-rpc/pricer" "github.com/primev/mev-commit/tools/preconf-rpc/rpcserver" "github.com/primev/mev-commit/tools/preconf-rpc/sender" ) @@ -22,7 +21,7 @@ type Bidder interface { } type Pricer interface { - EstimatePrice(ctx context.Context) (*pricer.BlockPrices, error) + EstimatePrice(ctx context.Context) map[int64]float64 } type Store interface { @@ -135,14 +134,7 @@ func (h *rpcMethodHandler) RegisterMethods(server *rpcserver.JSONRPCServer) { return json.RawMessage(fmt.Sprintf(`{"timeInSecs": "%d"}`, timeToOptIn)), false, nil }) server.RegisterHandler("mevcommit_estimateDeposit", func(ctx context.Context, params ...any) (json.RawMessage, bool, error) { - blockPrices, err := h.pricer.EstimatePrice(ctx) - if err != nil { - h.logger.Error("Failed to estimate deposit price", "error", err) - return nil, false, rpcserver.NewJSONErr( - rpcserver.CodeCustomError, - "failed to estimate deposit price", - ) - } + blockPrices := h.pricer.EstimatePrice(ctx) cost := getNextBlockPrice(blockPrices) result := map[string]interface{}{ "bidAmount": cost.String(), @@ -161,14 +153,7 @@ func (h *rpcMethodHandler) RegisterMethods(server *rpcserver.JSONRPCServer) { return resultJSON, false, nil }) server.RegisterHandler("mevcommit_estimateBridge", func(ctx context.Context, params ...any) (json.RawMessage, bool, error) { - blockPrices, err := h.pricer.EstimatePrice(ctx) - if err != nil { - h.logger.Error("Failed to estimate bridge price", "error", err) - return nil, false, rpcserver.NewJSONErr( - rpcserver.CodeCustomError, - "failed to estimate bridge price", - ) - } + blockPrices := h.pricer.EstimatePrice(ctx) cost := getNextBlockPrice(blockPrices) bridgeCost := new(big.Int).Mul(cost, big.NewInt(2)) result := map[string]interface{}{ @@ -192,15 +177,11 @@ func (h *rpcMethodHandler) RegisterMethods(server *rpcserver.JSONRPCServer) { server.RegisterHandler("mevcommit_getBalance", h.handleMevCommitGetBalance) } -func getNextBlockPrice(blockPrices *pricer.BlockPrices) *big.Int { - for _, price := range blockPrices.Prices { - if price.BlockNumber == blockPrices.CurrentBlockNumber+1 { - for _, estimate := range price.EstimatedPrices { - if estimate.Confidence == 99 { - priceInWei := estimate.PriorityFeePerGasGwei * 1e9 - return new(big.Int).Mul(new(big.Int).SetUint64(uint64(priceInWei)), big.NewInt(21000)) - } - } +func getNextBlockPrice(blockPrices map[int64]float64) *big.Int { + for confidence, price := range blockPrices { + if confidence == 99 { + priceInWei := price * 1e9 // Convert Gwei to Wei + return new(big.Int).Mul(new(big.Int).SetUint64(uint64(priceInWei)), big.NewInt(21000)) } } diff --git a/tools/preconf-rpc/sender/sender.go b/tools/preconf-rpc/sender/sender.go index 926444c69..2331f03df 100644 --- a/tools/preconf-rpc/sender/sender.go +++ b/tools/preconf-rpc/sender/sender.go @@ -606,10 +606,10 @@ func (t *TxSender) sendBid( } result := bidResult{ - commitments: make([]*bidderapiv1.Commitment, 0), - bidAmount: cost, - startTime: start, - // msSinceLastBlock: prices.MsSinceLastBlock, + commitments: make([]*bidderapiv1.Commitment, 0), + bidAmount: cost, + startTime: start, + timeUntillNextBlock: timeUntilNextBlock, } BID_LOOP: for { diff --git a/tools/preconf-rpc/sender/sender_test.go b/tools/preconf-rpc/sender/sender_test.go index faa0b26c5..04da270ca 100644 --- a/tools/preconf-rpc/sender/sender_test.go +++ b/tools/preconf-rpc/sender/sender_test.go @@ -7,11 +7,11 @@ import ( "os" "sync" "testing" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" bidderapiv1 "github.com/primev/mev-commit/p2p/gen/go/bidderapi/v1" - "github.com/primev/mev-commit/tools/preconf-rpc/pricer" "github.com/primev/mev-commit/tools/preconf-rpc/sender" optinbidder "github.com/primev/mev-commit/x/opt-in-bidder" "github.com/primev/mev-commit/x/util" @@ -194,30 +194,18 @@ func (m *mockBidder) Bid( } type mockPricer struct { - out chan *pricer.BlockPrices - errOut chan error + out chan map[int64]float64 } -func (m *mockPricer) EstimatePrice( - ctx context.Context, -) (*pricer.BlockPrices, error) { - select { - case err := <-m.errOut: - if err != nil { - return nil, err - } - default: - // No error, continue - } - +func (m *mockPricer) EstimatePrice(ctx context.Context) map[int64]float64 { select { case prices := <-m.out: if prices == nil { - return nil, errors.New("nil price returned") + return nil } - return prices, nil + return prices case <-ctx.Done(): - return nil, ctx.Err() + return nil } } @@ -226,9 +214,17 @@ type op struct { block uint64 } +type blockNoOp struct { + block uint64 + timeTillNextBlock time.Duration +} + type mockBlockTracker struct { - in chan op - out chan bool + in chan op + out chan bool + bnIn chan struct{} + bnOut chan blockNoOp + bnErr chan error } func (m *mockBlockTracker) CheckTxnInclusion(ctx context.Context, txnHash common.Hash, blockNumber uint64) (bool, error) { @@ -244,6 +240,17 @@ func (m *mockBlockTracker) CheckTxnInclusion(ctx context.Context, txnHash common } } +func (m *mockBlockTracker) NextBlockNumber() (uint64, time.Duration, error) { + m.bnIn <- struct{}{} + + select { + case <-m.bnErr: + return 0, 0, errors.New("error getting next block number") + case op := <-m.bnOut: + return op.block, op.timeTillNextBlock, nil + } +} + type mockTransferer struct{} func (m *mockTransferer) Transfer(ctx context.Context, to common.Address, chainID *big.Int, amount *big.Int) error { @@ -255,8 +262,7 @@ func TestSender(t *testing.T) { st := newMockStore() testPricer := &mockPricer{ - out: make(chan *pricer.BlockPrices, 10), - errOut: make(chan error, 1), + out: make(chan map[int64]float64, 10), } bidder := &mockBidder{ optinEstimate: make(chan int64, 10), @@ -264,8 +270,11 @@ func TestSender(t *testing.T) { out: make(chan chan optinbidder.BidStatus, 10), } blockTracker := &mockBlockTracker{ - in: make(chan op, 10), - out: make(chan bool, 10), + in: make(chan op, 10), + out: make(chan bool, 10), + bnIn: make(chan struct{}, 10), + bnOut: make(chan blockNoOp, 10), + bnErr: make(chan error, 1), } sndr, err := sender.NewTxSender( @@ -310,34 +319,23 @@ func TestSender(t *testing.T) { // Simulate opted in block bidder.optinEstimate <- 2 - // simulate error and ensure retry happens - testPricer.errOut <- errors.New("simulated error for testing") + <-blockTracker.bnIn + blockTracker.bnErr <- errors.New("simulated error for testing") bidder.optinEstimate <- 7 + <-blockTracker.bnIn + + blockTracker.bnOut <- blockNoOp{ + block: 1, + timeTillNextBlock: 5 * time.Second, + } + // Simulate a price estimate - testPricer.out <- &pricer.BlockPrices{ - CurrentBlockNumber: 0, - MsSinceLastBlock: 1000, - Prices: []pricer.BlockPrice{ - { - BlockNumber: 1, - EstimatedPrices: []pricer.EstimatedPrice{ - { - Confidence: 90, - PriorityFeePerGasGwei: 1.0, - }, - { - Confidence: 95, - PriorityFeePerGasGwei: 1.5, - }, - { - Confidence: 99, - PriorityFeePerGasGwei: 2.0, - }, - }, - }, - }, + testPricer.out <- map[int64]float64{ + 90: 1.0, + 95: 1.5, + 99: 2.0, } // Simulate a bid response @@ -411,29 +409,17 @@ func TestSender(t *testing.T) { // Simulate non opted in block bidder.optinEstimate <- 20 + <-blockTracker.bnIn + blockTracker.bnOut <- blockNoOp{ + block: 2, + timeTillNextBlock: 5 * time.Second, + } + // Simulate a price estimate - testPricer.out <- &pricer.BlockPrices{ - CurrentBlockNumber: 1, - MsSinceLastBlock: 1000, - Prices: []pricer.BlockPrice{ - { - BlockNumber: 2, - EstimatedPrices: []pricer.EstimatedPrice{ - { - Confidence: 90, - PriorityFeePerGasGwei: 1.0, - }, - { - Confidence: 95, - PriorityFeePerGasGwei: 1.5, - }, - { - Confidence: 99, - PriorityFeePerGasGwei: 2.0, - }, - }, - }, - }, + testPricer.out <- map[int64]float64{ + 90: 1.0, + 95: 1.5, + 99: 2.0, } // Simulate a bid response @@ -457,29 +443,17 @@ func TestSender(t *testing.T) { // Simulate non opted in block bidder.optinEstimate <- 18 + <-blockTracker.bnIn + blockTracker.bnOut <- blockNoOp{ + block: 2, + timeTillNextBlock: 5 * time.Second, + } + // Simulate a price estimate - testPricer.out <- &pricer.BlockPrices{ - CurrentBlockNumber: 1, - MsSinceLastBlock: 1000, - Prices: []pricer.BlockPrice{ - { - BlockNumber: 2, - EstimatedPrices: []pricer.EstimatedPrice{ - { - Confidence: 90, - PriorityFeePerGasGwei: 1.0, - }, - { - Confidence: 95, - PriorityFeePerGasGwei: 1.5, - }, - { - Confidence: 99, - PriorityFeePerGasGwei: 2.0, - }, - }, - }, - }, + testPricer.out <- map[int64]float64{ + 90: 1.0, + 95: 1.5, + 99: 2.0, } // Simulate a bid response @@ -551,8 +525,7 @@ func TestCancelTransaction(t *testing.T) { st := newMockStore() testPricer := &mockPricer{ - out: make(chan *pricer.BlockPrices, 10), - errOut: make(chan error, 3), + out: make(chan map[int64]float64, 10), } bidder := &mockBidder{ optinEstimate: make(chan int64), @@ -560,8 +533,11 @@ func TestCancelTransaction(t *testing.T) { out: make(chan chan optinbidder.BidStatus, 10), } blockTracker := &mockBlockTracker{ - in: make(chan op, 10), - out: make(chan bool, 10), + in: make(chan op, 10), + out: make(chan bool, 10), + bnIn: make(chan struct{}, 10), + bnOut: make(chan blockNoOp, 10), + bnErr: make(chan error, 3), } sndr, err := sender.NewTxSender( @@ -603,8 +579,18 @@ func TestCancelTransaction(t *testing.T) { t.Fatalf("failed to enqueue transaction: %v", err) } - testPricer.errOut <- errors.New("simulated error for testing") - testPricer.errOut <- errors.New("simulated error for testing") + go func() { + for { + select { + case <-blockTracker.bnIn: + case <-ctx.Done(): + return + } + } + }() + + blockTracker.bnErr <- errors.New("simulated error for testing") + blockTracker.bnErr <- errors.New("simulated error for testing") bidder.optinEstimate <- 2 cancelled, err := sndr.CancelTransaction(ctx, tx1.Hash()) diff --git a/tools/preconf-rpc/service/service.go b/tools/preconf-rpc/service/service.go index f07a28273..926ae43d5 100644 --- a/tools/preconf-rpc/service/service.go +++ b/tools/preconf-rpc/service/service.go @@ -174,7 +174,10 @@ func New(config *Config) (*Service, error) { config.Logger.With("module", "rpcserver"), ) - bidpricer := pricer.NewPricer(config.PricerAPIKey) + bidpricer, err := pricer.NewPricer(config.PricerAPIKey, config.Logger.With("module", "bidpricer")) + if err != nil { + return nil, fmt.Errorf("failed to create bid pricer: %w", err) + } db, err := initDB(config) if err != nil {