Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions tools/preconf-rpc/blocktracker/blocktracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,16 @@ func (b *blockTracker) LatestBlockNumber() uint64 {
}

func (b *blockTracker) NextBlockNumber() (uint64, time.Duration, error) {
block, found := b.blocks.Get(b.latestBlockNo.Load())
latestBlockNo := b.latestBlockNo.Load()
block, found := b.blocks.Get(latestBlockNo)
if !found {
return 0, 0, errors.New("latest block not found in cache")
}
blockTime := time.Unix(int64(block.Time()), 0)
return b.latestBlockNo.Load() + 1, time.Until(blockTime.Add(12 * time.Second)), nil
if time.Since(blockTime) >= 11*time.Second {
return latestBlockNo + 2, time.Until(blockTime.Add(24 * time.Second)), nil
}
return latestBlockNo + 1, time.Until(blockTime.Add(12 * time.Second)), nil
}

func (b *blockTracker) CheckTxnInclusion(
Expand Down
12 changes: 4 additions & 8 deletions tools/preconf-rpc/handlers/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func NewRPCMethodHandler(

func (h *rpcMethodHandler) RegisterMethods(server *rpcserver.JSONRPCServer) {
// Ethereum JSON-RPC methods overridden
server.RegisterHandler("eth_getBlockNumber", func(ctx context.Context, params ...any) (json.RawMessage, bool, error) {
server.RegisterHandler("eth_blockNumber", func(ctx context.Context, params ...any) (json.RawMessage, bool, error) {
blockNumber := h.blockTracker.LatestBlockNumber()

blockNumberJSON, err := json.Marshal(hexutil.Uint64(blockNumber))
Expand Down Expand Up @@ -411,8 +411,7 @@ func (h *rpcMethodHandler) handleGetTxReceipt(ctx context.Context, params ...any
return nil, true, nil
}

if txn.Status != sender.TxStatusFailed &&
(txn.Status != sender.TxStatusPreConfirmed || h.blockTracker.LatestBlockNumber() > uint64(txn.BlockNumber)) {
if txn.Status != sender.TxStatusFailed && txn.Status != sender.TxStatusPreConfirmed {
return nil, true, nil
}

Expand Down Expand Up @@ -571,11 +570,8 @@ func (h *rpcMethodHandler) handleMevCommitGetBalance(ctx context.Context, params

balance, err := h.store.GetBalance(ctx, common.HexToAddress(account))
if err != nil {
h.logger.Error("Failed to get balance for account", "error", err, "account", account)
return nil, false, rpcserver.NewJSONErr(
rpcserver.CodeCustomError,
"failed to get balance for account",
)
h.logger.Warn("Failed to get balance for account, returning 0", "error", err, "account", account)
balance = big.NewInt(0)
}

return json.RawMessage(fmt.Sprintf(`{"balance": "%s"}`, balance)), false, nil
Expand Down
4 changes: 2 additions & 2 deletions tools/preconf-rpc/notifier/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,15 +282,15 @@ func (n *Notifier) StartTransactionNotifier(
func (n *Notifier) NotifyTransactionStatus(
txn *sender.Transaction,
noOfAttempts int,
start time.Time,
timeTaken time.Duration,
) {
n.queuedMu.Lock()
defer n.queuedMu.Unlock()

n.queuedTxns = append(n.queuedTxns, txnInfo{
txn: txn,
noOfAttempts: noOfAttempts,
timeTaken: time.Since(start).Round(time.Millisecond),
timeTaken: timeTaken,
})
}

Expand Down
10 changes: 3 additions & 7 deletions tools/preconf-rpc/rpcserver/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,6 @@ var cacheMethods = map[string]bool{
"eth_call": true,
"eth_getCode": true,
"eth_getStorageAt": true,
"eth_feeHistory": true,
"eth_gasPrice": true,
"eth_getLogs": true,
"net_version": true,
}
Expand Down Expand Up @@ -168,7 +166,7 @@ func (s *JSONRPCServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {

start := time.Now()
defer func() {
s.logger.Debug("Request processing time", "method", req.Method, "id", req.ID, "duration", time.Since(start))
s.logger.Info("Request processing time", "method", req.Method, "id", req.ID, "duration", time.Since(start).String())
}()

if cacheMethods[req.Method] {
Expand Down Expand Up @@ -201,7 +199,7 @@ func (s *JSONRPCServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
if cacheMethods[req.Method] && resp.Result != nil {
key := cacheKey(req.Method, req.Params)
s.cache.Add(key, cacheEntry{
_ = s.cache.Add(key, cacheEntry{
until: time.Now().Add(pickTTL(req.Method, *resp.Result)),
data: *resp.Result,
})
Expand Down Expand Up @@ -350,16 +348,14 @@ func pickTTL(method string, params json.RawMessage) time.Duration {
return 24 * time.Hour
case "eth_getCode":
return 24 * time.Hour
case "eth_feeHistory":
return 3 * time.Second
case "eth_call":
// if block tag provided and hex number → immutable
if strings.HasSuffix(string(params), "\"") { // cheap check
if strings.Contains(string(params), "\"0x") && !strings.Contains(string(params), "\"latest\"") {
return 24 * time.Hour
}
}
return 1 * time.Second
return 1 * time.Millisecond
default:
return 2 * time.Second
}
Expand Down
96 changes: 64 additions & 32 deletions tools/preconf-rpc/sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ const (
)

const (
blockTime = 12 // seconds, typical Ethereum block time
bidTimeout = 100 * time.Millisecond // timeout for bid operations
defaultConfidence = 90 // default confidence level for the next block
confidenceSecondAttempt = 95 // confidence level for the second attempt
confidenceSubsequentAttempts = 99 // confidence level for subsequent attempts
transactionTimeout = 10 * time.Minute // timeout for transaction processing
blockTime = 12 // seconds, typical Ethereum block time
bidTimeout = 3 * time.Second // timeout for bid operation
defaultConfidence = 90 // default confidence level for the next block
confidenceSecondAttempt = 95 // confidence level for the second attempt
confidenceSubsequentAttempts = 99 // confidence level for subsequent attempts
transactionTimeout = 10 * time.Minute // timeout for transaction processing
)

var (
Expand Down Expand Up @@ -113,7 +113,7 @@ type txnAttempt struct {
}

type Notifier interface {
NotifyTransactionStatus(txn *Transaction, noOfAttempts int, start time.Time)
NotifyTransactionStatus(txn *Transaction, noOfAttempts int, timeTaken time.Duration)
}

type TxSender struct {
Expand All @@ -135,6 +135,8 @@ type TxSender struct {
txnAttemptHistory *lru.Cache[common.Hash, *txnAttempt]
notifier Notifier
fastTrack func(cmts []*bidderapiv1.Commitment, optedInSlot bool) bool
bidTimeout time.Duration
timeoutMtx sync.RWMutex
}

func noOpFastTrack(_ []*bidderapiv1.Commitment, _ bool) bool {
Expand Down Expand Up @@ -177,6 +179,7 @@ func NewTxSender(
txnAttemptHistory: txnAttemptHistory,
notifier: notifier,
fastTrack: fastTrack,
bidTimeout: bidTimeout,
}, nil
}

Expand Down Expand Up @@ -307,6 +310,20 @@ func (t *TxSender) CancelTransaction(ctx context.Context, txnHash common.Hash) (
}
}

func (t *TxSender) UpdateBidTimeout(timeout time.Duration) {
t.timeoutMtx.Lock()
defer t.timeoutMtx.Unlock()

t.bidTimeout = timeout
}

func (t *TxSender) getBidTimeout() time.Duration {
t.timeoutMtx.RLock()
defer t.timeoutMtx.RUnlock()

return t.bidTimeout
}

func (t *TxSender) Start(ctx context.Context) chan struct{} {
t.eg, t.egCtx = errgroup.WithContext(ctx)
done := make(chan struct{})
Expand Down Expand Up @@ -374,7 +391,7 @@ func (t *TxSender) processQueuedTransactions(ctx context.Context) {
t.logger.Info("No queued transactions to process")
return
}
t.logger.Info("Processing queued transactions", "count", len(txns))
t.logger.Debug("Processing queued transactions", "count", len(txns))
for _, txn := range txns {
txn := txn // capture range variable
select {
Expand All @@ -398,7 +415,7 @@ func (t *TxSender) processQueuedTransactions(ctx context.Context) {
t.logger.Error("Failed to process transaction", "sender", txn.Sender.Hex(), "error", err)
txn.Status = TxStatusFailed
txn.Details = err.Error()
t.clearBlockAttemptHistory(txn)
t.clearBlockAttemptHistory(txn, time.Now())
return t.store.StoreTransaction(ctx, txn, nil)
}
return nil
Expand All @@ -422,6 +439,7 @@ BID_LOOP:
default:
}

preConfirmed := false
result, err = t.sendBid(ctx, txn)
switch {
case err != nil:
Expand Down Expand Up @@ -449,13 +467,16 @@ BID_LOOP:
txn.BlockNumber = int64(result.blockNumber)
t.logger.Info(
"Transaction fast-tracked based on commitments",
"transactionHash", txn.Hash().Hex(),
"sender", txn.Sender.Hex(),
"type", txn.Type,
"blockNumber", result.blockNumber,
"bidAmount", result.bidAmount.String(),
)
t.clearBlockAttemptHistory(txn)
break BID_LOOP
if err := t.store.StoreTransaction(ctx, txn, result.commitments); err != nil {
return fmt.Errorf("failed to store fast-tracked transaction: %w", err)
}
preConfirmed = true
case result.optedInSlot:
if result.noOfProviders == len(result.commitments) {
// This means that all builders have committed to the bid and it
Expand All @@ -465,20 +486,24 @@ BID_LOOP:
txn.BlockNumber = int64(result.blockNumber)
t.logger.Info(
"Transaction pre-confirmed",
"transactionHash", txn.Hash().Hex(),
"sender", txn.Sender.Hex(),
"type", txn.Type,
"blockNumber", result.blockNumber,
"bidAmount", result.bidAmount.String(),
)
t.clearBlockAttemptHistory(txn)
break BID_LOOP
if err := t.store.StoreTransaction(ctx, txn, result.commitments); err != nil {
return fmt.Errorf("failed to store preconfirmed transaction: %w", err)
}
preConfirmed = true
}
default:
}

if result.noOfProviders > len(result.commitments) {
if !preConfirmed && result.noOfProviders > len(result.commitments) {
t.logger.Warn(
"Not all builders committed to the bid",
"transactionHash", txn.Hash().Hex(),
"noOfProviders", result.noOfProviders,
"noOfCommitments", len(result.commitments),
"sender", txn.Sender.Hex(),
Expand All @@ -501,24 +526,30 @@ BID_LOOP:
return fmt.Errorf("failed to check transaction inclusion: %w", err)
}
if included {
txn.Status = TxStatusConfirmed
txn.BlockNumber = int64(result.blockNumber)
t.logger.Info(
"Transaction confirmed for non opted-in slot",
"sender", txn.Sender.Hex(),
"type", txn.Type,
"blockNumber", result.blockNumber,
"bidAmount", result.bidAmount.String(),
)
t.clearBlockAttemptHistory(txn)
if !preConfirmed {
txn.Status = TxStatusConfirmed
txn.BlockNumber = int64(result.blockNumber)
t.logger.Info(
"Transaction confirmed",
"transactionHash", txn.Hash().Hex(),
"sender", txn.Sender.Hex(),
"type", txn.Type,
"blockNumber", result.blockNumber,
"bidAmount", result.bidAmount.String(),
)
if err := t.store.StoreTransaction(ctx, txn, result.commitments); err != nil {
return fmt.Errorf("failed to store preconfirmed transaction: %w", err)
}
}
endTime := time.Now()
if len(result.commitments) > 0 {
endTime = time.UnixMilli(result.commitments[len(result.commitments)-1].DispatchTimestamp)
}
t.clearBlockAttemptHistory(txn, endTime)
break BID_LOOP
}
}

if err := t.store.StoreTransaction(ctx, txn, result.commitments); err != nil {
return fmt.Errorf("failed to store preconfirmed transaction: %w", err)
}

switch txn.Type {
case TxTypeRegular:
if err := t.store.DeductBalance(ctx, txn.Sender, result.bidAmount); err != nil {
Expand Down Expand Up @@ -597,7 +628,7 @@ func (t *TxSender) sendBid(
// Allow for certain level of tolerance w.r.t timestamps
optedInSlot := math.Abs(float64(timeToOptIn)-float64(timeUntilNextBlock.Seconds())) < float64(blockTime/3)

cctx, cancel := context.WithTimeout(ctx, bidTimeout)
cctx, cancel := context.WithTimeout(ctx, t.getBidTimeout())
defer cancel()

cost, err := t.calculatePriceForNextBlock(txn, bidBlockNo, prices, optedInSlot)
Expand Down Expand Up @@ -663,7 +694,7 @@ func (t *TxSender) sendBid(
WaitForOptIn: false,
BlockNumber: uint64(bidBlockNo),
RevertingTxHashes: []string{txn.Hash().Hex()},
DecayDuration: bidTimeout * 2,
DecayDuration: t.getBidTimeout() * 2,
},
)
if err != nil {
Expand Down Expand Up @@ -782,7 +813,7 @@ func (t *TxSender) calculatePriceForNextBlock(
)
}

func (t *TxSender) clearBlockAttemptHistory(txn *Transaction) {
func (t *TxSender) clearBlockAttemptHistory(txn *Transaction, endTime time.Time) {
attempts, found := t.txnAttemptHistory.Get(txn.Hash())
if !found {
return
Expand All @@ -796,6 +827,7 @@ func (t *TxSender) clearBlockAttemptHistory(txn *Transaction) {
t.logger.Info(
"Clearing block attempt history for transaction",
"hash", txn.Hash().Hex(),
"blockNumber", txn.BlockNumber,
"blockAttempts", len(attempts.attempts),
"startTime", attempts.startTime.Format(time.RFC3339),
"startBlockNumber", attempts.attempts[0].blockNumber,
Expand All @@ -804,5 +836,5 @@ func (t *TxSender) clearBlockAttemptHistory(txn *Transaction) {

_ = t.txnAttemptHistory.Remove(txn.Hash())

t.notifier.NotifyTransactionStatus(txn, totalAttempts, attempts.startTime)
t.notifier.NotifyTransactionStatus(txn, totalAttempts, endTime.Sub(attempts.startTime).Round(time.Millisecond))
}
14 changes: 12 additions & 2 deletions tools/preconf-rpc/sender/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ type mockNotifier struct {
notifications []string
}

func (m *mockNotifier) NotifyTransactionStatus(txn *sender.Transaction, attempts int, start time.Time) {
func (m *mockNotifier) NotifyTransactionStatus(txn *sender.Transaction, attempts int, start time.Duration) {
m.notifications = append(m.notifications, txn.Hash().Hex())
}

Expand Down Expand Up @@ -399,6 +399,16 @@ func TestSender(t *testing.T) {
t.Fatalf("expected 1 commitment, got %d", len(res.commitments))
}

checkOp := <-blockTracker.in
if checkOp.hash != tx1.Hash() {
t.Fatalf("expected transaction hash %s, got %s", tx1.Hash().Hex(), checkOp.hash.Hex())
}
if checkOp.block != 1 {
t.Fatalf("expected block number 1, got %d", checkOp.block)
}
// Simulate transaction inclusion
blockTracker.out <- true

tx2 := &sender.Transaction{
Transaction: types.NewTransaction(
2,
Expand Down Expand Up @@ -493,7 +503,7 @@ func TestSender(t *testing.T) {
close(resC)
bidder.out <- resC

checkOp := <-blockTracker.in
checkOp = <-blockTracker.in
if checkOp.hash != tx2.Hash() {
t.Fatalf("expected transaction hash %s, got %s", tx2.Hash().Hex(), checkOp.hash.Hex())
}
Expand Down
Loading
Loading