From e618d69672e463f6172acfa7bf74fcfff0d94fb8 Mon Sep 17 00:00:00 2001 From: Rose Jethani Date: Thu, 9 Oct 2025 17:07:46 +0530 Subject: [PATCH] optimizations --- tools/indexer/cmd/main.go | 4 +- tools/indexer/pkg/backfill/backfill.go | 192 +++++++++++++--------- tools/indexer/pkg/beacon/client.go | 10 +- tools/indexer/pkg/database/starrock.go | 29 +++- tools/indexer/pkg/ethereum/conversions.go | 51 +++++- 5 files changed, 186 insertions(+), 100 deletions(-) diff --git a/tools/indexer/cmd/main.go b/tools/indexer/cmd/main.go index 5cc43e8fc..faf32b315 100644 --- a/tools/indexer/cmd/main.go +++ b/tools/indexer/cmd/main.go @@ -69,14 +69,14 @@ var ( Name: "backfill-lookback", Usage: "number of slots to look back for backfill", EnvVars: []string{"INDEXER_BACKFILL_LOOKBACK"}, - Value: 10000000, + Value: 50400, }) optionBackfillBatch = altsrc.NewIntFlag(&cli.IntFlag{ Name: "backfill-batch", Usage: "batch size for backfill operations", EnvVars: []string{"INDEXER_BACKFILL_BATCH"}, - Value: 5, + Value: 100, }) optionHTTPTimeout = altsrc.NewDurationFlag(&cli.DurationFlag{ diff --git a/tools/indexer/pkg/backfill/backfill.go b/tools/indexer/pkg/backfill/backfill.go index a9d31c23c..5fa43f107 100644 --- a/tools/indexer/pkg/backfill/backfill.go +++ b/tools/indexer/pkg/backfill/backfill.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log/slog" + "sync" "time" "github.com/hashicorp/go-retryablehttp" @@ -28,95 +29,138 @@ func RunAll(ctx context.Context, db *database.DB, httpc *retryablehttp.Client, c if err := ctx.Err(); err != nil { return err } - - blocks, err := db.GetRecentMissingBlocks(ctx, cfg.BackfillLookback, cfg.BackfillBatch) - if err != nil { - return fmt.Errorf("get missing blocks: %w", err) - } - - for _, b := range blocks { - if err := ctx.Err(); err != nil { - return err + lastSlotNumber, _ := db.GetMaxSlotNumber(ctx) + startSlot := lastSlotNumber - cfg.BackfillLookback + + batch := cfg.BackfillBatch + totalBatches := (cfg.BackfillLookback + int64(batch) - 1) / int64(batch) + logger.Info("Starting backfill", + "start_slot", startSlot, + "end_slot_exclusive", lastSlotNumber, + "lookback_slots", cfg.BackfillLookback, + "batch_size", cfg.BackfillBatch, + "total_batches", totalBatches, + ) + + batchSz := int64(cfg.BackfillBatch) + var processed int64 + for batchIdx := int64(0); batchIdx < totalBatches; batchIdx++ { + batchStart := startSlot + batchIdx*int64(batch) + batchEnd := batchStart + batchSz + if batchEnd > lastSlotNumber { + batchEnd = lastSlotNumber } + logger.Info("Batch begin", + "batch", batchIdx+1, "of", totalBatches, + "range", fmt.Sprintf("[%d,%d)", batchStart, batchEnd), + ) + for slot := batchStart; slot < batchEnd; slot++ { + tTotal := time.Now() + conversCtx, cancel := context.WithTimeout(ctx, 3*time.Second) + blockNumber, err := ethereum.SlotToExecutionBlockNumber(conversCtx, httpc, cfg.BeaconBase, slot) + cancel() + if err != nil { + logger.Error("Failed to convert slot to block number", "slot", slot, "error", err) + continue + } + if blockNumber != 0 { - fetchCtx, cancel := context.WithTimeout(ctx, 5*time.Second) - ei, ferr := beacon.FetchBeaconExecutionBlock(fetchCtx, httpc, cfg.BeaconBase, b.BlockNumber) - cancel() - if ferr != nil || ei == nil { - logger.Error("beacon fetch failed", "block", b.BlockNumber, "error", ferr) - continue - } + if err := ctx.Err(); err != nil { + return err + } - if err := db.UpsertBlockFromExec(ctx, ei); err != nil { - logger.Error("block upsert failed", "slot", ei.Slot, "error", err) - continue - } + fetchCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + ei, ferr := beacon.FetchBeaconExecutionBlock(fetchCtx, httpc, cfg.BeaconBase, blockNumber) + cancel() + if ferr != nil || ei == nil { + logger.Error("beacon fetch failed", "block", blockNumber, "error", ferr) + continue + } + + if err := db.UpsertBlockFromExec(ctx, ei); err != nil { + logger.Error("block upsert failed", "slot", ei.Slot, "error", err) + continue + } - var vpub []byte - if ei.ProposerIdx != nil { - vctx, vcancel := context.WithTimeout(ctx, 5*time.Second) - v, verr := beacon.FetchValidatorPubkey(vctx, httpc, cfg.BeaconBase, *ei.ProposerIdx) - vcancel() - if verr != nil { - logger.Error("validator fetch failed", "slot", ei.Slot, "error", verr) - } else if len(v) > 0 { - vpub = v - - // Save validator pubkey - if err := db.UpdateValidatorPubkey(ctx, ei.Slot, vpub); err != nil { - logger.Error("validator update failed", "slot", ei.Slot, "error", err) - } else { - - opted, oerr := ethereum.CallAreOptedInAtBlock(httpc.HTTPClient, cfg, ei.BlockNumber, vpub) - - if oerr != nil { - logger.Error("opt-in check failed", "slot", ei.Slot, "error", oerr) - } else { - updCtx, updCancel := context.WithTimeout(ctx, 3*time.Second) - if uerr := db.UpdateValidatorOptInStatus(updCtx, ei.Slot, opted); uerr != nil { - logger.Error("opt-in update failed", "slot", ei.Slot, "error", uerr) + var vpub []byte + if ei.ProposerIdx != nil { + vctx, vcancel := context.WithTimeout(ctx, 5*time.Second) + v, verr := beacon.FetchValidatorPubkey(vctx, httpc, cfg.BeaconBase, *ei.ProposerIdx) + vcancel() + if verr != nil { + logger.Error("validator fetch failed", "slot", ei.Slot, "error", verr) + } else if len(v) > 0 { + vpub = v + + // Save validator pubkey + if err := db.UpdateValidatorPubkey(ctx, ei.Slot, vpub); err != nil { + logger.Error("validator update failed", "slot", ei.Slot, "error", err) + } else { + opted, oerr := ethereum.CallAreOptedInAtBlock(httpc.HTTPClient, cfg, ei.BlockNumber, vpub) + if oerr != nil { + logger.Error("opt-in check failed", "slot", ei.Slot, "error", oerr) + } else { + updCtx, updCancel := context.WithTimeout(ctx, 3*time.Second) + if uerr := db.UpdateValidatorOptInStatus(updCtx, ei.Slot, opted); uerr != nil { + logger.Error("opt-in update failed", "slot", ei.Slot, "error", uerr) + } + updCancel() + } } - updCancel() } } - } - } + tBids := time.Now() + var wg sync.WaitGroup + for _, r := range relays { + wg.Add(1) + go func(rel relay.Row) { + defer wg.Done() + if err := ctx.Err(); err != nil { + return + } - for _, r := range relays { - if err := ctx.Err(); err != nil { - return err - } + bctx, bcancel := context.WithTimeout(ctx, 5*time.Second) + bids, berr := relay.FetchBuilderBlocksReceived(bctx, httpc, rel.URL, ei.Slot) + bcancel() + if berr != nil { + logger.Debug("bid fetch failed", "slot", ei.Slot, "relay", r.ID, "error", berr) + return + } - bctx, bcancel := context.WithTimeout(ctx, 5*time.Second) - bids, berr := relay.FetchBuilderBlocksReceived(bctx, httpc, r.URL, ei.Slot) - bcancel() - if berr != nil { - logger.Debug("bid fetch failed", "slot", ei.Slot, "relay", r.ID, "error", berr) - continue - } + if len(bids) == 0 { + return + } - if len(bids) == 0 { - continue - } + rows := make([]database.BidRow, 0, len(bids)) + for _, bid := range bids { + if row, ok := relay.BuildBidInsert(ei.Slot, rel.ID, bid); ok { + rows = append(rows, row) + } + } - rows := make([]database.BidRow, 0, len(bids)) - for _, bid := range bids { - if row, ok := relay.BuildBidInsert(ei.Slot, r.ID, bid); ok { - rows = append(rows, row) + if len(rows) > 0 { + insCtx, insCancel := context.WithTimeout(ctx, 5*time.Second) + if ierr := db.InsertBidsBatch(insCtx, rows); ierr != nil { + logger.Error("bid insert failed", "slot", ei.Slot, "relay", r.ID, "error", ierr) + } + insCancel() + } + }(r) } + wg.Wait() + bidsMs := time.Since(tBids).Milliseconds() + logger.Info("Bids fetch and insert", "bids_ms", bidsMs) + processed++ + totalMS := time.Since(tTotal).Milliseconds() + logger.Info("total time taken", "total_ms", totalMS) } - if len(rows) > 0 { - insCtx, insCancel := context.WithTimeout(ctx, 5*time.Second) - if ierr := db.InsertBidsBatch(insCtx, rows); ierr != nil { - logger.Error("bid insert failed", "slot", ei.Slot, "relay", r.ID, "error", ierr) - } - insCancel() - } + logger.Info("Batch end", + "batch", batchIdx+1, + "processed_slots_so_far", processed, + ) } - logger.Debug("slot processed", "slot", ei.Slot) } - - logger.Info("Backfill completed", "blocks_processed", len(blocks)) + logger.Info("Backfill completed", "total_slots_processed", processed) return nil } diff --git a/tools/indexer/pkg/beacon/client.go b/tools/indexer/pkg/beacon/client.go index 2b7011143..f05249394 100644 --- a/tools/indexer/pkg/beacon/client.go +++ b/tools/indexer/pkg/beacon/client.go @@ -13,7 +13,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/hashicorp/go-retryablehttp" - "github.com/primev/mev-commit/tools/indexer/pkg/ethereum" + httputil "github.com/primev/mev-commit/tools/indexer/pkg/http" ) @@ -200,10 +200,6 @@ func FetchCombinedBlockData(ctx context.Context, httpc *retryablehttp.Client, rp if err != nil { return nil, err } - - // Convert block number to slot for beacon chain query - slotNumber := ethereum.BlockNumberToSlot(blockNumber) - beaconData, _ := FetchBeaconExecutionBlock(ctx, httpc, beaconBase, blockNumber) // Merge data - use Alchemy as primary, beacon as supplement @@ -214,10 +210,6 @@ func FetchCombinedBlockData(ctx context.Context, httpc *retryablehttp.Client, rp execBlock.RewardEth = beaconData.RewardEth execBlock.BuilderHex = beaconData.BuilderHex execBlock.FeeRecHex = beaconData.FeeRecHex - } else { - - execBlock.Slot = slotNumber } - return execBlock, nil } diff --git a/tools/indexer/pkg/database/starrock.go b/tools/indexer/pkg/database/starrock.go index da4929d89..258970a16 100644 --- a/tools/indexer/pkg/database/starrock.go +++ b/tools/indexer/pkg/database/starrock.go @@ -52,7 +52,13 @@ func Connect(ctx context.Context, dsn string, maxConns, minConns int) (*DB, erro func (db *DB) Close() error { return db.conn.Close() } - +func (db *DB) GetMaxSlotNumber(ctx context.Context) (int64, error) { + ctx2, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + var slot int64 + err := db.conn.QueryRowContext(ctx2, `SELECT COALESCE(MAX(slot),0) FROM blocks`).Scan(&slot) + return slot, err +} func (db *DB) EnsureStateTable(ctx context.Context) error { ctx2, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() @@ -155,7 +161,7 @@ func (db *DB) UpsertBlockFromExec(ctx context.Context, ei *beacon.ExecInfo) erro ctx2, cancel := context.WithTimeout(ctx, 3*time.Second) defer cancel() - var timestamp, proposerIndex, relayTag, rewardEth string + var timestamp, proposerIndex, relayTag, rewardEth, builderPubkeyPrefix, feeRecHex string if ei.Timestamp != nil { timestamp = fmt.Sprintf("'%s'", ei.Timestamp.Format("2006-01-02 15:04:05")) @@ -174,7 +180,16 @@ func (db *DB) UpsertBlockFromExec(ctx context.Context, ei *beacon.ExecInfo) erro } else { relayTag = "NULL" } - + if ei.BuilderHex != nil { + builderPubkeyPrefix = fmt.Sprintf("'%s'", (*ei.BuilderHex)) + } else { + builderPubkeyPrefix = "NULL" + } + if ei.FeeRecHex != nil { + feeRecHex = fmt.Sprintf("'%s'", (*ei.FeeRecHex)) + } else { + feeRecHex = "NULL" + } if ei.RewardEth != nil { rewardEth = fmt.Sprintf("%.6f", *ei.RewardEth) } else { @@ -184,9 +199,9 @@ func (db *DB) UpsertBlockFromExec(ctx context.Context, ei *beacon.ExecInfo) erro query := fmt.Sprintf(` INSERT INTO blocks( slot, block_number, timestamp, proposer_index, - winning_relay, producer_reward_eth -) VALUES (%d, %d, %s, %s, %s, %s)`, - ei.Slot, ei.BlockNumber, timestamp, proposerIndex, relayTag, rewardEth) + winning_relay, producer_reward_eth, winning_builder_pubkey, fee_recipient +) VALUES (%d, %d, %s, %s, %s, %s, %s, %s)`, + ei.Slot, ei.BlockNumber, timestamp, proposerIndex, relayTag, rewardEth, builderPubkeyPrefix, feeRecHex) _, err := db.conn.ExecContext(ctx2, query) if err != nil { @@ -429,7 +444,7 @@ func (db *DB) UpdateValidatorOptInStatus(ctx context.Context, slot int64, opted v = 1 } // TINYINT(1) in StarRocks q := fmt.Sprintf( - "UPDATE blocks SET validator_opted_in=%d WHERE slot=%d AND validator_opted_in IS NULL", + "UPDATE blocks SET validator_opted_in=%d WHERE slot=%d", v, slot, ) _, err := db.conn.ExecContext(ctx2, q) diff --git a/tools/indexer/pkg/ethereum/conversions.go b/tools/indexer/pkg/ethereum/conversions.go index 962bacf5a..0da5cda26 100644 --- a/tools/indexer/pkg/ethereum/conversions.go +++ b/tools/indexer/pkg/ethereum/conversions.go @@ -1,14 +1,49 @@ package ethereum -func BlockNumberToSlot(blockNumber int64) int64 { - // Ethereum mainnet merge happened at slot 4700013 (block 15537394) - const MERGE_BLOCK = 15537394 - const MERGE_SLOT = 4700013 +import ( + "context" + "fmt" + "time" - if blockNumber < MERGE_BLOCK { - return 0 // Pre-merge blocks don't have valid slots + "github.com/hashicorp/go-retryablehttp" + httputil "github.com/primev/mev-commit/tools/indexer/pkg/http" +) + +// SlotToExecutionBlockNumber converts a beacon chain slot to an execution layer block number +// using the beaconcha.in API with retryable HTTP client and context support. +func SlotToExecutionBlockNumber(ctx context.Context, httpc *retryablehttp.Client, beaconBase string, slot int64) (int64, error) { + url := fmt.Sprintf("%s/slot/%d", beaconBase, slot) + + if _, has := ctx.Deadline(); !has { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, 5*time.Second) + defer cancel() + } + + var wrap struct { + Status string `json:"status"` + Data struct { + ExecutionBlockNumber int64 `json:"exec_block_number"` + Status string `json:"status"` + } `json:"data"` + } + + if err := httputil.FetchJSON(ctx, httpc, url, &wrap); err != nil { + return 0, fmt.Errorf("failed to fetch slot %d: %w", slot, err) + } + + if wrap.Status != "OK" { + return 0, fmt.Errorf("API returned status: %s for slot %d", wrap.Status, slot) + } + + // Check if slot was missed + if wrap.Data.Status != "1" { + return 0, nil // Missed slot, not an error + } + + if wrap.Data.ExecutionBlockNumber == 0 { + return 0, fmt.Errorf("no execution block number for slot %d", slot) } - // Post-merge: roughly 1 slot per block - return MERGE_SLOT + (blockNumber - MERGE_BLOCK) + return wrap.Data.ExecutionBlockNumber, nil }