From 18c41575f5d880f01a1cc2b79e352b54a97d787d Mon Sep 17 00:00:00 2001 From: Rose Jethani Date: Mon, 13 Oct 2025 23:11:54 +0530 Subject: [PATCH 1/8] fixes --- tools/indexer/cmd/main.go | 16 +++ tools/indexer/cmd/start.go | 158 +++++++++++-------------- tools/indexer/pkg/backfill/backfill.go | 121 ++++++++++--------- tools/indexer/pkg/beacon/client.go | 136 ++++++++++++++------- tools/indexer/pkg/config/config.go | 21 ++++ tools/indexer/pkg/database/starrock.go | 36 ++++-- 6 files changed, 290 insertions(+), 198 deletions(-) diff --git a/tools/indexer/cmd/main.go b/tools/indexer/cmd/main.go index faf32b315..29f03226c 100644 --- a/tools/indexer/cmd/main.go +++ b/tools/indexer/cmd/main.go @@ -85,6 +85,18 @@ var ( EnvVars: []string{"INDEXER_HTTP_TIMEOUT"}, Value: 15 * time.Second, }) + + optionRelayFlag = altsrc.NewBoolFlag(&cli.BoolFlag{ + Name: "relay", + Usage: "Whether to run in relay mode", + EnvVars: []string{"INDEXER_RELAY"}, + Value: false, + }) + optionRelaysJSON = altsrc.NewStringFlag(&cli.StringFlag{ + Name: "relays-json", + Usage: "JSON array overriding default relays (fields: relay_id,name,tag,url)", + EnvVars: []string{"INDEXER_RELAYS_JSON"}, + }) ) func createOptionsFromCLI(c *cli.Context) *config.Config { @@ -98,6 +110,8 @@ func createOptionsFromCLI(c *cli.Context) *config.Config { EtherscanKey: c.String("etherscan-key"), InfuraRPC: c.String("infura-rpc"), BeaconBase: c.String("beacon-base"), + RelayMode: c.Bool("relay"), + RelaysJSON: c.String("relays-json"), } } @@ -115,6 +129,8 @@ func main() { optionHTTPTimeout, optionOptInContract, optionEtherscanKey, + optionRelayFlag, + optionRelaysJSON, } app := &cli.App{ diff --git a/tools/indexer/cmd/start.go b/tools/indexer/cmd/start.go index 1f3a2b5d4..05fc70804 100644 --- a/tools/indexer/cmd/start.go +++ b/tools/indexer/cmd/start.go @@ -3,18 +3,20 @@ package main import ( "context" "fmt" - "log/slog" - "reflect" - "time" - _ "github.com/go-sql-driver/mysql" "github.com/hashicorp/go-retryablehttp" "github.com/primev/mev-commit/tools/indexer/pkg/backfill" "github.com/primev/mev-commit/tools/indexer/pkg/beacon" + "github.com/primev/mev-commit/tools/indexer/pkg/config" "github.com/primev/mev-commit/tools/indexer/pkg/database" "github.com/primev/mev-commit/tools/indexer/pkg/ethereum" httputil "github.com/primev/mev-commit/tools/indexer/pkg/http" "github.com/primev/mev-commit/tools/indexer/pkg/relay" + "log/slog" + "reflect" + "sync" + "sync/atomic" + "time" "github.com/urfave/cli/v2" ) @@ -36,21 +38,6 @@ func initializeDatabase(ctx context.Context, dbURL string, logger *slog.Logger) return db, nil } -func loadRelays(ctx context.Context, db *database.DB, logger *slog.Logger) ([]relay.Row, error) { - relays, err := relay.UpsertRelaysAndLoad(ctx, db) - if err != nil { - logger.Error("[RELAY] failed to load", "error", err) - return nil, err - } - - logger.Info("[RELAY] loaded active relays", "count", len(relays)) - for _, r := range relays { - logger.Info("[RELAY] relay found", "id", r.ID, "url", r.URL) - } - - return relays, nil -} - func getStartingBlockNumber(ctx context.Context, db *database.DB, httpc *retryablehttp.Client, infuraRPC string, logger *slog.Logger) (int64, error) { lastBN, found := db.LoadLastBlockNumber(ctx) @@ -138,8 +125,8 @@ func processNextBlock(ctx context.Context, c *cli.Context, db *database.DB, http "timestamp", ei.Timestamp, "proposer_index", safe(ei.ProposerIdx), "winning_relay", safe(ei.RelayTag), - "builder_pubkey_prefix", safe(ei.BuilderHex), - "producer_reward_eth", safe(ei.RewardEth), + "builder_pubkey_prefix", safe(ei.BuilderPublicKey), + "mev_reward_eth", safe(ei.MevRewardEth), ) if err := db.UpsertBlockFromExec(ctx, ei); err != nil { @@ -147,10 +134,13 @@ func processNextBlock(ctx context.Context, c *cli.Context, db *database.DB, http return lastBN } logger.Info("[DB] block saved successfully", "block", nextBN) + cfg := createOptionsFromCLI(c) - if err := processBidsForBlock(ctx, db, httpc, relays, ei, logger); err != nil { - logger.Error("failed to process bids", "error", err) - return lastBN + if cfg.RelayMode { + if err := processBidsForBlock(ctx, db, httpc, relays, ei, logger); err != nil { + logger.Error("failed to process bids", "error", err) + return lastBN + } } if err := launchValidatorTasks(ctx, c, db, httpc, ei, beaconBase, logger); err != nil { logger.Error("[VALIDATOR] failed to launch async tasks", "slot", ei.Slot, "error", err) @@ -162,68 +152,51 @@ func processNextBlock(ctx context.Context, c *cli.Context, db *database.DB, http } func processBidsForBlock(ctx context.Context, db *database.DB, httpc *retryablehttp.Client, relays []relay.Row, ei *beacon.ExecInfo, logger *slog.Logger) error { - - // Fetch and store bid data from all relays - totalBids := 0 - successfulRelays := 0 - const batchSize = 500 - for _, rr := range relays { - if err := ctx.Err(); err != nil { - logger.Warn("main context canceled, stopping relay processing") - return err - } - - bids, err := relay.FetchBuilderBlocksReceived(ctx, httpc, rr.URL, ei.Slot) - if err != nil { - // logger.Error("[RELAY] failed to fetch bids", "relay_id", rr.ID, "url", rr.URL, "error", err) - return fmt.Errorf("fetch bids: relay_id=%d url=%s slot=%d: %w", rr.ID, rr.URL, ei.Slot, err) - - } - - relayBids := 0 - batch := make([]database.BidRow, 0, batchSize) - - for _, bid := range bids { - + logger.Info("[BIDS] processing bids for block", "block", ei.BlockNumber, "slot", ei.Slot) + var wg sync.WaitGroup + var totalBids int64 + var successfulRelays int64 + for _, r := range relays { + r := r + wg.Add(1) + go func(rel relay.Row) { + defer wg.Done() if err := ctx.Err(); err != nil { - logger.Warn("[BIDS] main context canceled, stopping bid insertion") - return err + return } - - if row, ok := relay.BuildBidInsert(ei.Slot, rr.ID, bid); ok { - batch = append(batch, row) - - if len(batch) >= batchSize { - insCtx, cancel := context.WithTimeout(ctx, 5*time.Second) - if err := db.InsertBidsBatch(insCtx, batch); err != nil { - - logger.Error("[DB]batch insert failed", "slot", ei.Slot, "relay_id", rr.ID, "count", len(batch), "error", err) - } else { - relayBids += len(batch) - } - cancel() - batch = batch[:0] + bctx, bcancel := context.WithTimeout(ctx, 5*time.Second) + bids, berr := relay.FetchBuilderBlocksReceived(bctx, httpc, rel.URL, ei.Slot) + bcancel() + if berr != nil { + logger.Debug("bid fetch failed", "slot", ei.Slot, "relay", r.ID, "error", berr) + return + } + if len(bids) == 0 { + return + } + rows := make([]database.BidRow, 0, len(bids)) + for _, bid := range bids { + if row, ok := relay.BuildBidInsert(ei.Slot, rel.ID, bid); ok { + rows = append(rows, row) } } - } - - // final flush - if len(batch) > 0 { - flushCtx, flushCancel := context.WithTimeout(context.Background(), 5*time.Second) - if err := db.InsertBidsBatch(flushCtx, batch); err != nil { - logger.Error("[DB] batch insert failed", "slot", ei.Slot, "relay_id", rr.ID, "count", len(batch), "error", err) - } else { - relayBids += len(batch) + if len(rows) > 0 { + insCtx, insCancel := context.WithTimeout(ctx, 5*time.Second) + if ierr := db.InsertBidsBatch(insCtx, rows); ierr != nil { + logger.Error("bid insert failed", "slot", ei.Slot, "relay", r.ID, "error", ierr) + } + insCancel() } - flushCancel() - } - - if relayBids > 0 { - logger.Info("[BIDS] bids collected", "relay_id", rr.ID, "count", relayBids) - totalBids += relayBids - successfulRelays++ - } + atomic.AddInt64(&totalBids, int64(len(rows))) + atomic.AddInt64(&successfulRelays, 1) + logger.Info("[BIDS] ok", + "slot", ei.Slot, "relay_id", rel.ID, + "bids_in", len(bids), "rows_out", len(rows), + ) + }(r) } + wg.Wait() + logger.Info("[BIDS] summary", "block", ei.BlockNumber, "total_bids", totalBids, "successful_relays", successfulRelays) return nil } @@ -300,7 +273,24 @@ func startIndexer(c *cli.Context) error { "block_interval", c.Duration("block-interval"), "validator_delay", c.Duration("validator-delay")) ctx := c.Context - + var relays []relay.Row + if c.Bool(optionRelayFlag.Name) { + cfgRelays, err := config.ResolveRelays(c) + if err != nil { + return err + } + relays = make([]relay.Row, 0, len(cfgRelays)) + for _, r := range cfgRelays { + relays = append(relays, relay.Row{ID: r.Relay_id, URL: r.URL}) + } + initLogger.Info("[RELAY] enabled", "count", len(relays)) + } else { + initLogger.Info("[RELAY] disabled") + relays = make([]relay.Row, 0, len(config.RelaysDefault)) + for _, r := range config.RelaysDefault { + relays = append(relays, relay.Row{ID: r.Relay_id, URL: r.URL}) + } + } db, err := initializeDatabase(ctx, dbURL, initLogger) if err != nil { return err @@ -315,12 +305,6 @@ func startIndexer(c *cli.Context) error { httpc := httputil.NewHTTPClient(c.Duration("http-timeout")) initLogger.Info("[HTTP] client initialized", "timeout", c.Duration("http-timeout")) - // Load relay configurations - relays, err := loadRelays(ctx, db, initLogger) - if err != nil { - return err - } - // Get starting block number lastBN, err := getStartingBlockNumber(ctx, db, httpc, infuraRPC, initLogger) if err != nil { diff --git a/tools/indexer/pkg/backfill/backfill.go b/tools/indexer/pkg/backfill/backfill.go index 5fa43f107..260930dd1 100644 --- a/tools/indexer/pkg/backfill/backfill.go +++ b/tools/indexer/pkg/backfill/backfill.go @@ -21,7 +21,6 @@ type SlotData struct { ValidatorPubkey []byte ProposerIdx *int64 } - func RunAll(ctx context.Context, db *database.DB, httpc *retryablehttp.Client, cfg *config.Config, relays []relay.Row) error { logger := slog.With("component", "backfill") logger.Info("Starting streaming backfill") @@ -29,11 +28,16 @@ func RunAll(ctx context.Context, db *database.DB, httpc *retryablehttp.Client, c if err := ctx.Err(); err != nil { return err } + lastSlotNumber, _ := db.GetMaxSlotNumber(ctx) startSlot := lastSlotNumber - cfg.BackfillLookback + if startSlot < 0 { + startSlot = 0 + } + + batch := int64(cfg.BackfillBatch) + totalBatches := (cfg.BackfillLookback + batch - 1) / batch - batch := cfg.BackfillBatch - totalBatches := (cfg.BackfillLookback + int64(batch) - 1) / int64(batch) logger.Info("Starting backfill", "start_slot", startSlot, "end_slot_exclusive", lastSlotNumber, @@ -44,74 +48,81 @@ func RunAll(ctx context.Context, db *database.DB, httpc *retryablehttp.Client, c batchSz := int64(cfg.BackfillBatch) var processed int64 + for batchIdx := int64(0); batchIdx < totalBatches; batchIdx++ { - batchStart := startSlot + batchIdx*int64(batch) + batchStart := startSlot + batchIdx*batch batchEnd := batchStart + batchSz if batchEnd > lastSlotNumber { batchEnd = lastSlotNumber } + logger.Info("Batch begin", "batch", batchIdx+1, "of", totalBatches, "range", fmt.Sprintf("[%d,%d)", batchStart, batchEnd), ) + for slot := batchStart; slot < batchEnd; slot++ { tTotal := time.Now() - conversCtx, cancel := context.WithTimeout(ctx, 3*time.Second) - blockNumber, err := ethereum.SlotToExecutionBlockNumber(conversCtx, httpc, cfg.BeaconBase, slot) + + convCtx, cancel := context.WithTimeout(ctx, 3*time.Second) + blockNumber, err := ethereum.SlotToExecutionBlockNumber(convCtx, httpc, cfg.BeaconBase, slot) cancel() if err != nil { logger.Error("Failed to convert slot to block number", "slot", slot, "error", err) continue } - if blockNumber != 0 { - - if err := ctx.Err(); err != nil { - return err - } + if blockNumber == 0 { + continue + } - fetchCtx, cancel := context.WithTimeout(ctx, 5*time.Second) - ei, ferr := beacon.FetchBeaconExecutionBlock(fetchCtx, httpc, cfg.BeaconBase, blockNumber) - cancel() - if ferr != nil || ei == nil { - logger.Error("beacon fetch failed", "block", blockNumber, "error", ferr) - continue - } + fetchCtx, fetchCancel := context.WithTimeout(ctx, 5*time.Second) + ei, ferr := beacon.FetchBeaconExecutionBlock(fetchCtx, httpc, cfg.BeaconBase, blockNumber) + fetchCancel() + if ferr != nil || ei == nil { + logger.Error("beacon fetch failed", "block", blockNumber, "error", ferr) + continue + } - if err := db.UpsertBlockFromExec(ctx, ei); err != nil { - logger.Error("block upsert failed", "slot", ei.Slot, "error", err) - continue - } + if err := db.UpsertBlockFromExec(ctx, ei); err != nil { + logger.Error("block upsert failed", "slot", ei.Slot, "error", err) + continue + } - var vpub []byte - if ei.ProposerIdx != nil { - vctx, vcancel := context.WithTimeout(ctx, 5*time.Second) - v, verr := beacon.FetchValidatorPubkey(vctx, httpc, cfg.BeaconBase, *ei.ProposerIdx) - vcancel() - if verr != nil { - logger.Error("validator fetch failed", "slot", ei.Slot, "error", verr) - } else if len(v) > 0 { - vpub = v - - // Save validator pubkey - if err := db.UpdateValidatorPubkey(ctx, ei.Slot, vpub); err != nil { - logger.Error("validator update failed", "slot", ei.Slot, "error", err) + // Validator pubkey + opt-in status + var vpub []byte + if ei.ProposerIdx != nil { + vctx, vcancel := context.WithTimeout(ctx, 5*time.Second) + v, verr := beacon.FetchValidatorPubkey(vctx, httpc, cfg.BeaconBase, *ei.ProposerIdx) + vcancel() + if verr != nil { + logger.Error("validator fetch failed", "slot", ei.Slot, "error", verr) + } else if len(v) > 0 { + vpub = v + // Save validator pubkey + if err := db.UpdateValidatorPubkey(ctx, ei.Slot, vpub); err != nil { + logger.Error("validator update failed", "slot", ei.Slot, "error", err) + } else { + opted, oerr := ethereum.CallAreOptedInAtBlock(httpc.HTTPClient, cfg, ei.BlockNumber, vpub) + if oerr != nil { + logger.Error("opt-in check failed", "slot", ei.Slot, "error", oerr) } else { - opted, oerr := ethereum.CallAreOptedInAtBlock(httpc.HTTPClient, cfg, ei.BlockNumber, vpub) - if oerr != nil { - logger.Error("opt-in check failed", "slot", ei.Slot, "error", oerr) - } else { - updCtx, updCancel := context.WithTimeout(ctx, 3*time.Second) - if uerr := db.UpdateValidatorOptInStatus(updCtx, ei.Slot, opted); uerr != nil { - logger.Error("opt-in update failed", "slot", ei.Slot, "error", uerr) - } - updCancel() + updCtx, updCancel := context.WithTimeout(ctx, 3*time.Second) + if uerr := db.UpdateValidatorOptInStatus(updCtx, ei.Slot, opted); uerr != nil { + logger.Error("opt-in update failed", "slot", ei.Slot, "error", uerr) } + updCancel() } } } + } + + // Relay mode: fetch & insert bids + if cfg.RelayMode { tBids := time.Now() var wg sync.WaitGroup + for _, r := range relays { + r := r // capture loop var wg.Add(1) go func(rel relay.Row) { defer wg.Done() @@ -123,10 +134,9 @@ func RunAll(ctx context.Context, db *database.DB, httpc *retryablehttp.Client, c bids, berr := relay.FetchBuilderBlocksReceived(bctx, httpc, rel.URL, ei.Slot) bcancel() if berr != nil { - logger.Debug("bid fetch failed", "slot", ei.Slot, "relay", r.ID, "error", berr) + logger.Debug("bid fetch failed", "slot", ei.Slot, "relay", rel.ID, "error", berr) return } - if len(bids) == 0 { return } @@ -141,26 +151,29 @@ func RunAll(ctx context.Context, db *database.DB, httpc *retryablehttp.Client, c if len(rows) > 0 { insCtx, insCancel := context.WithTimeout(ctx, 5*time.Second) if ierr := db.InsertBidsBatch(insCtx, rows); ierr != nil { - logger.Error("bid insert failed", "slot", ei.Slot, "relay", r.ID, "error", ierr) + logger.Error("bid insert failed", "slot", ei.Slot, "relay", rel.ID, "error", ierr) } insCancel() } }(r) } + wg.Wait() bidsMs := time.Since(tBids).Milliseconds() logger.Info("Bids fetch and insert", "bids_ms", bidsMs) - processed++ - totalMS := time.Since(tTotal).Milliseconds() - logger.Info("total time taken", "total_ms", totalMS) } - logger.Info("Batch end", - "batch", batchIdx+1, - "processed_slots_so_far", processed, - ) + processed++ + totalMS := time.Since(tTotal).Milliseconds() + logger.Info("total time taken", "total_ms", totalMS) } + + logger.Info("Batch end", + "batch", batchIdx+1, + "processed_slots_so_far", processed, + ) } + logger.Info("Backfill completed", "total_slots_processed", processed) return nil } diff --git a/tools/indexer/pkg/beacon/client.go b/tools/indexer/pkg/beacon/client.go index f05249394..466bebbcc 100644 --- a/tools/indexer/pkg/beacon/client.go +++ b/tools/indexer/pkg/beacon/client.go @@ -13,19 +13,20 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/hashicorp/go-retryablehttp" - httputil "github.com/primev/mev-commit/tools/indexer/pkg/http" ) type ExecInfo struct { - BlockNumber int64 - Slot int64 - ProposerIdx *int64 - Timestamp *time.Time - RelayTag *string - BuilderHex *string - FeeRecHex *string - RewardEth *float64 + BlockNumber int64 + Slot int64 + ProposerIdx *int64 + Timestamp *time.Time + RelayTag *string + BuilderPublicKey *string + ProposerFeeRecHex *string + MevRewardEth *float64 + ProposerRewardEth *float64 + FeeRecipient *string } func FetchBeaconExecutionBlock(ctx context.Context, httpc *retryablehttp.Client, beaconBase string, blockNum int64) (*ExecInfo, error) { @@ -85,38 +86,58 @@ func FetchBeaconExecutionBlock(ctx context.Context, httpc *retryablehttp.Client, out.RelayTag = &s } if s, ok := rel["builderPubkey"].(string); ok { - out.BuilderHex = &s + out.BuilderPublicKey = &s } if s, ok := rel["producerFeeRecipient"].(string); ok { - out.FeeRecHex = &s + out.ProposerFeeRecHex = &s } } - // reward eth from blockMevReward or producerReward + // reward eth from blockMevReward if v, ok := j["blockMevReward"]; ok { switch t := v.(type) { case float64: f := t - if f > 1e10 { - f = f / 1e18 // wei -> ETH + if f > 1e10 { // likely wei + f = f / 1e18 } - out.RewardEth = &f + out.MevRewardEth = &f case string: if strings.HasPrefix(t, "0x") { if bi, ok := new(big.Int).SetString(t[2:], 16); ok { f, _ := new(big.Rat).SetFrac(bi, big.NewInt(1e18)).Float64() - out.RewardEth = &f + out.MevRewardEth = &f } } else if f, err := strconv.ParseFloat(t, 64); err == nil { - out.RewardEth = &f + out.MevRewardEth = &f } } - } else if v, ok := j["producerReward"]; ok { - if f, ok := v.(float64); ok { - out.RewardEth = &f + } + + // producerReward → out.ProposerRewardEth (ETH units) + if v, ok := j["producerReward"]; ok { + switch t := v.(type) { + case float64: + f := t + if f > 1e10 { + f = f / 1e18 + } + out.ProposerRewardEth = &f + case string: + if strings.HasPrefix(t, "0x") { + if bi, ok := new(big.Int).SetString(t[2:], 16); ok { + f, _ := new(big.Rat).SetFrac(bi, big.NewInt(1e18)).Float64() + out.ProposerRewardEth = &f + } + } else if f, err := strconv.ParseFloat(t, 64); err == nil { + out.ProposerRewardEth = &f + } } } + if fr, ok := j["feeRecipient"].(string); ok && strings.TrimSpace(fr) != "" { + out.FeeRecipient = &fr + } // sanity if out.Slot == 0 { return nil, fmt.Errorf("exec block missing posConsensus.slot for %d", blockNum) @@ -149,67 +170,92 @@ func FetchValidatorPubkey(ctx context.Context, httpc *retryablehttp.Client, beac // to fetch blocks from Alchemy RPC func fetchBlockFromRPC(httpc *retryablehttp.Client, rpcURL string, blockNumber int64) (*ExecInfo, error) { - underlyingClient := httpc.HTTPClient - // Get block data from Alchemy payload := map[string]any{ "jsonrpc": "2.0", "id": 1, "method": "eth_getBlockByNumber", - "params": []any{fmt.Sprintf("0x%x", blockNumber), true}, // true for full transaction objects + "params": []any{fmt.Sprintf("0x%x", blockNumber), false}, // false = no full txs (faster) } - buf, _ := json.Marshal(payload) - req, _ := http.NewRequest("POST", rpcURL, bytes.NewReader(buf)) + + req, _ := retryablehttp.NewRequest("POST", rpcURL, bytes.NewReader(buf)) req.Header.Set("Content-Type", "application/json") - resp, err := underlyingClient.Do(req) + resp, err := httpc.Do(req) if err != nil { return nil, err } - defer func() { _ = resp.Body.Close() }() + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("rpc HTTP %d", resp.StatusCode) + } var result struct { Result struct { Number string `json:"number"` - Timestamp string `json:"timestamp"` - Miner string `json:"miner"` + Timestamp string `json:"timestamp"` // hex + Miner string `json:"miner"` // some nodes + Author string `json:"author"` // others use author } `json:"result"` + Error *struct { + Code int `json:"code"` + Message string `json:"message"` + } `json:"error,omitempty"` } if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { return nil, err } - + if result.Error != nil { + return nil, fmt.Errorf("rpc error %d: %s", result.Error.Code, result.Error.Message) + } if result.Result.Number == "" { return nil, fmt.Errorf("block not found") } - // Convert hex timestamp to time - timestampHex := result.Result.Timestamp[2:] // Remove 0x - timestamp, _ := strconv.ParseInt(timestampHex, 16, 64) - blockTime := time.Unix(timestamp, 0) + // timestamp hex -> time + tsHex := strings.TrimPrefix(result.Result.Timestamp, "0x") + secs, err := strconv.ParseInt(tsHex, 16, 64) + if err != nil { + return nil, fmt.Errorf("bad timestamp: %w", err) + } + t := time.Unix(secs, 0).UTC() - return &ExecInfo{ + // fee recipient (coinbase) from EL header + fr := result.Result.Miner + if fr == "" { + fr = result.Result.Author + } + + out := &ExecInfo{ BlockNumber: blockNumber, - Timestamp: &blockTime, - }, nil + Timestamp: &t, + } + if strings.TrimSpace(fr) != "" { + out.FeeRecipient = &fr + } + return out, nil } -func FetchCombinedBlockData(ctx context.Context, httpc *retryablehttp.Client, rpcURL string, beaconBase string, blockNumber int64) (*ExecInfo, error) { - // Get execution block from Alchemy (always available) +func FetchCombinedBlockData(ctx context.Context, httpc *retryablehttp.Client, rpcURL, beaconBase string, blockNumber int64) (*ExecInfo, error) { execBlock, err := fetchBlockFromRPC(httpc, rpcURL, blockNumber) if err != nil { return nil, err } - beaconData, _ := FetchBeaconExecutionBlock(ctx, httpc, beaconBase, blockNumber) - // Merge data - use Alchemy as primary, beacon as supplement - if beaconData != nil { + // Best-effort: enrich from beacon (slot, proposer, relay, rewards, etc.) + if beaconData, _ := FetchBeaconExecutionBlock(ctx, httpc, beaconBase, blockNumber); beaconData != nil { execBlock.Slot = beaconData.Slot execBlock.ProposerIdx = beaconData.ProposerIdx execBlock.RelayTag = beaconData.RelayTag - execBlock.RewardEth = beaconData.RewardEth - execBlock.BuilderHex = beaconData.BuilderHex - execBlock.FeeRecHex = beaconData.FeeRecHex + execBlock.BuilderPublicKey = beaconData.BuilderPublicKey + execBlock.ProposerFeeRecHex = beaconData.ProposerFeeRecHex + execBlock.MevRewardEth = beaconData.MevRewardEth + execBlock.ProposerRewardEth = beaconData.ProposerRewardEth + + // If RPC didn't provide fee recipient for any reason, fall back to beacon (if present) + if execBlock.FeeRecipient == nil && beaconData.FeeRecipient != nil { + execBlock.FeeRecipient = beaconData.FeeRecipient + } } return execBlock, nil } diff --git a/tools/indexer/pkg/config/config.go b/tools/indexer/pkg/config/config.go index 227e30610..15b3e4633 100644 --- a/tools/indexer/pkg/config/config.go +++ b/tools/indexer/pkg/config/config.go @@ -1,6 +1,10 @@ package config import ( + "encoding/json" + "fmt" + "github.com/urfave/cli/v2" + "strings" "time" ) @@ -18,6 +22,21 @@ var RelaysDefault = []Relay{ {Relay_id: 4, Name: "Bloxroute Regulated", Tag: "bloxroute-regulated-relay", URL: "https://bloxroute.regulated.blxrbdn.com"}, } +func ResolveRelays(c *cli.Context) ([]Relay, error) { + s := strings.TrimSpace(c.String("relays-json")) + if s == "" { + return RelaysDefault, nil + } + var v []Relay + if err := json.Unmarshal([]byte(s), &v); err != nil { + return nil, fmt.Errorf("invalid --relays-json: %w", err) + } + if len(v) == 0 { + return nil, fmt.Errorf("--relays-json provided but empty") + } + return v, nil +} + type Config struct { BlockTick time.Duration ValidatorWait time.Duration @@ -31,4 +50,6 @@ type Config struct { EtherscanKey string InfuraRPC string BeaconBase string + RelayMode bool + RelaysJSON string } diff --git a/tools/indexer/pkg/database/starrock.go b/tools/indexer/pkg/database/starrock.go index 258970a16..35f358019 100644 --- a/tools/indexer/pkg/database/starrock.go +++ b/tools/indexer/pkg/database/starrock.go @@ -161,7 +161,7 @@ func (db *DB) UpsertBlockFromExec(ctx context.Context, ei *beacon.ExecInfo) erro ctx2, cancel := context.WithTimeout(ctx, 3*time.Second) defer cancel() - var timestamp, proposerIndex, relayTag, rewardEth, builderPubkeyPrefix, feeRecHex string + var timestamp, proposerIndex, relayTag, builderPubkeyPrefix, proposerFeeRecHex, mevRewardEth, feeRecHex, proposerRewardEth string if ei.Timestamp != nil { timestamp = fmt.Sprintf("'%s'", ei.Timestamp.Format("2006-01-02 15:04:05")) @@ -180,29 +180,41 @@ func (db *DB) UpsertBlockFromExec(ctx context.Context, ei *beacon.ExecInfo) erro } else { relayTag = "NULL" } - if ei.BuilderHex != nil { - builderPubkeyPrefix = fmt.Sprintf("'%s'", (*ei.BuilderHex)) + if ei.BuilderPublicKey != nil { + builderPubkeyPrefix = fmt.Sprintf("'%s'", (*ei.BuilderPublicKey)) } else { builderPubkeyPrefix = "NULL" } - if ei.FeeRecHex != nil { - feeRecHex = fmt.Sprintf("'%s'", (*ei.FeeRecHex)) + if ei.ProposerFeeRecHex != nil { + proposerFeeRecHex = fmt.Sprintf("'%s'", (*ei.ProposerFeeRecHex)) + } else { + proposerFeeRecHex = "NULL" + } + if ei.MevRewardEth != nil { + mevRewardEth = fmt.Sprintf("%.6f", *ei.MevRewardEth) + } else { + mevRewardEth = "NULL" + } + if ei.FeeRecipient != nil { + feeRecHex = fmt.Sprintf("'%s'", (*ei.FeeRecipient)) } else { feeRecHex = "NULL" } - if ei.RewardEth != nil { - rewardEth = fmt.Sprintf("%.6f", *ei.RewardEth) + if ei.ProposerRewardEth != nil { + proposerRewardEth = fmt.Sprintf("%.6f", *ei.ProposerRewardEth) } else { - rewardEth = "NULL" + proposerRewardEth = "NULL" } query := fmt.Sprintf(` INSERT INTO blocks( slot, block_number, timestamp, proposer_index, - winning_relay, producer_reward_eth, winning_builder_pubkey, fee_recipient -) VALUES (%d, %d, %s, %s, %s, %s, %s, %s)`, - ei.Slot, ei.BlockNumber, timestamp, proposerIndex, relayTag, rewardEth, builderPubkeyPrefix, feeRecHex) - + winning_relay, winning_builder_pubkey, proposer_fee_recipient, mev_reward, proposer_reward_eth, fee_recipient +) VALUES (%d, %d, %s, %s, %s, %s, %s, %s, %s, %s)`, + ei.Slot, ei.BlockNumber, timestamp, proposerIndex, + relayTag, builderPubkeyPrefix, proposerFeeRecHex, + mevRewardEth, proposerRewardEth, feeRecHex, + ) _, err := db.conn.ExecContext(ctx2, query) if err != nil { return fmt.Errorf("upsert block slot=%d: %w", ei.Slot, err) From f35dca629e400e5c00279f0179c7d19454abff83 Mon Sep 17 00:00:00 2001 From: Rose Jethani Date: Mon, 13 Oct 2025 23:47:33 +0530 Subject: [PATCH 2/8] fixes --- tools/indexer/pkg/backfill/backfill.go | 169 +++++++++++++------------ tools/indexer/pkg/beacon/client.go | 10 +- 2 files changed, 89 insertions(+), 90 deletions(-) diff --git a/tools/indexer/pkg/backfill/backfill.go b/tools/indexer/pkg/backfill/backfill.go index 260930dd1..5a4c9a63d 100644 --- a/tools/indexer/pkg/backfill/backfill.go +++ b/tools/indexer/pkg/backfill/backfill.go @@ -21,6 +21,7 @@ type SlotData struct { ValidatorPubkey []byte ProposerIdx *int64 } + func RunAll(ctx context.Context, db *database.DB, httpc *retryablehttp.Client, cfg *config.Config, relays []relay.Row) error { logger := slog.With("component", "backfill") logger.Info("Starting streaming backfill") @@ -71,109 +72,109 @@ func RunAll(ctx context.Context, db *database.DB, httpc *retryablehttp.Client, c logger.Error("Failed to convert slot to block number", "slot", slot, "error", err) continue } - if blockNumber == 0 { - continue - } - - fetchCtx, fetchCancel := context.WithTimeout(ctx, 5*time.Second) - ei, ferr := beacon.FetchBeaconExecutionBlock(fetchCtx, httpc, cfg.BeaconBase, blockNumber) - fetchCancel() - if ferr != nil || ei == nil { - logger.Error("beacon fetch failed", "block", blockNumber, "error", ferr) - continue - } + if blockNumber != 0 { + if err := ctx.Err(); err != nil { + return err + } + fetchCtx, fetchCancel := context.WithTimeout(ctx, 5*time.Second) + ei, ferr := beacon.FetchBeaconExecutionBlock(fetchCtx, httpc, cfg.BeaconBase, blockNumber) + fetchCancel() + if ferr != nil || ei == nil { + logger.Error("beacon fetch failed", "block", blockNumber, "error", ferr) + continue + } - if err := db.UpsertBlockFromExec(ctx, ei); err != nil { - logger.Error("block upsert failed", "slot", ei.Slot, "error", err) - continue - } + if err := db.UpsertBlockFromExec(ctx, ei); err != nil { + logger.Error("block upsert failed", "slot", ei.Slot, "error", err) + continue + } - // Validator pubkey + opt-in status - var vpub []byte - if ei.ProposerIdx != nil { - vctx, vcancel := context.WithTimeout(ctx, 5*time.Second) - v, verr := beacon.FetchValidatorPubkey(vctx, httpc, cfg.BeaconBase, *ei.ProposerIdx) - vcancel() - if verr != nil { - logger.Error("validator fetch failed", "slot", ei.Slot, "error", verr) - } else if len(v) > 0 { - vpub = v - // Save validator pubkey - if err := db.UpdateValidatorPubkey(ctx, ei.Slot, vpub); err != nil { - logger.Error("validator update failed", "slot", ei.Slot, "error", err) - } else { - opted, oerr := ethereum.CallAreOptedInAtBlock(httpc.HTTPClient, cfg, ei.BlockNumber, vpub) - if oerr != nil { - logger.Error("opt-in check failed", "slot", ei.Slot, "error", oerr) + // Validator pubkey + opt-in status + var vpub []byte + if ei.ProposerIdx != nil { + vctx, vcancel := context.WithTimeout(ctx, 5*time.Second) + v, verr := beacon.FetchValidatorPubkey(vctx, httpc, cfg.BeaconBase, *ei.ProposerIdx) + vcancel() + if verr != nil { + logger.Error("validator fetch failed", "slot", ei.Slot, "error", verr) + } else if len(v) > 0 { + vpub = v + // Save validator pubkey + if err := db.UpdateValidatorPubkey(ctx, ei.Slot, vpub); err != nil { + logger.Error("validator update failed", "slot", ei.Slot, "error", err) } else { - updCtx, updCancel := context.WithTimeout(ctx, 3*time.Second) - if uerr := db.UpdateValidatorOptInStatus(updCtx, ei.Slot, opted); uerr != nil { - logger.Error("opt-in update failed", "slot", ei.Slot, "error", uerr) + opted, oerr := ethereum.CallAreOptedInAtBlock(httpc.HTTPClient, cfg, ei.BlockNumber, vpub) + if oerr != nil { + logger.Error("opt-in check failed", "slot", ei.Slot, "error", oerr) + } else { + updCtx, updCancel := context.WithTimeout(ctx, 3*time.Second) + if uerr := db.UpdateValidatorOptInStatus(updCtx, ei.Slot, opted); uerr != nil { + logger.Error("opt-in update failed", "slot", ei.Slot, "error", uerr) + } + updCancel() } - updCancel() } } } - } - // Relay mode: fetch & insert bids - if cfg.RelayMode { - tBids := time.Now() - var wg sync.WaitGroup - - for _, r := range relays { - r := r // capture loop var - wg.Add(1) - go func(rel relay.Row) { - defer wg.Done() - if err := ctx.Err(); err != nil { - return - } + // Relay mode: fetch & insert bids + if cfg.RelayMode { + tBids := time.Now() + var wg sync.WaitGroup + + for _, r := range relays { + r := r // capture loop var + wg.Add(1) + go func(rel relay.Row) { + defer wg.Done() + if err := ctx.Err(); err != nil { + return + } - bctx, bcancel := context.WithTimeout(ctx, 5*time.Second) - bids, berr := relay.FetchBuilderBlocksReceived(bctx, httpc, rel.URL, ei.Slot) - bcancel() - if berr != nil { - logger.Debug("bid fetch failed", "slot", ei.Slot, "relay", rel.ID, "error", berr) - return - } - if len(bids) == 0 { - return - } + bctx, bcancel := context.WithTimeout(ctx, 5*time.Second) + bids, berr := relay.FetchBuilderBlocksReceived(bctx, httpc, rel.URL, ei.Slot) + bcancel() + if berr != nil { + logger.Debug("bid fetch failed", "slot", ei.Slot, "relay", rel.ID, "error", berr) + return + } + if len(bids) == 0 { + return + } - rows := make([]database.BidRow, 0, len(bids)) - for _, bid := range bids { - if row, ok := relay.BuildBidInsert(ei.Slot, rel.ID, bid); ok { - rows = append(rows, row) + rows := make([]database.BidRow, 0, len(bids)) + for _, bid := range bids { + if row, ok := relay.BuildBidInsert(ei.Slot, rel.ID, bid); ok { + rows = append(rows, row) + } } - } - if len(rows) > 0 { - insCtx, insCancel := context.WithTimeout(ctx, 5*time.Second) - if ierr := db.InsertBidsBatch(insCtx, rows); ierr != nil { - logger.Error("bid insert failed", "slot", ei.Slot, "relay", rel.ID, "error", ierr) + if len(rows) > 0 { + insCtx, insCancel := context.WithTimeout(ctx, 5*time.Second) + if ierr := db.InsertBidsBatch(insCtx, rows); ierr != nil { + logger.Error("bid insert failed", "slot", ei.Slot, "relay", rel.ID, "error", ierr) + } + insCancel() } - insCancel() - } - }(r) + }(r) + } + + wg.Wait() + bidsMs := time.Since(tBids).Milliseconds() + logger.Info("Bids fetch and insert", "bids_ms", bidsMs) } - wg.Wait() - bidsMs := time.Since(tBids).Milliseconds() - logger.Info("Bids fetch and insert", "bids_ms", bidsMs) + processed++ + totalMS := time.Since(tTotal).Milliseconds() + logger.Info("total time taken", "total_ms", totalMS) } - processed++ - totalMS := time.Since(tTotal).Milliseconds() - logger.Info("total time taken", "total_ms", totalMS) + logger.Info("Batch end", + "batch", batchIdx+1, + "processed_slots_so_far", processed, + ) } - - logger.Info("Batch end", - "batch", batchIdx+1, - "processed_slots_so_far", processed, - ) } - logger.Info("Backfill completed", "total_slots_processed", processed) return nil } diff --git a/tools/indexer/pkg/beacon/client.go b/tools/indexer/pkg/beacon/client.go index 466bebbcc..438c078ce 100644 --- a/tools/indexer/pkg/beacon/client.go +++ b/tools/indexer/pkg/beacon/client.go @@ -174,7 +174,7 @@ func fetchBlockFromRPC(httpc *retryablehttp.Client, rpcURL string, blockNumber i "jsonrpc": "2.0", "id": 1, "method": "eth_getBlockByNumber", - "params": []any{fmt.Sprintf("0x%x", blockNumber), false}, // false = no full txs (faster) + "params": []any{fmt.Sprintf("0x%x", blockNumber), true}, } buf, _ := json.Marshal(payload) @@ -193,9 +193,9 @@ func fetchBlockFromRPC(httpc *retryablehttp.Client, rpcURL string, blockNumber i var result struct { Result struct { Number string `json:"number"` - Timestamp string `json:"timestamp"` // hex - Miner string `json:"miner"` // some nodes - Author string `json:"author"` // others use author + Timestamp string `json:"timestamp"` + Miner string `json:"miner"` + Author string `json:"author"` } `json:"result"` Error *struct { Code int `json:"code"` @@ -221,7 +221,6 @@ func fetchBlockFromRPC(httpc *retryablehttp.Client, rpcURL string, blockNumber i } t := time.Unix(secs, 0).UTC() - // fee recipient (coinbase) from EL header fr := result.Result.Miner if fr == "" { fr = result.Result.Author @@ -242,7 +241,6 @@ func FetchCombinedBlockData(ctx context.Context, httpc *retryablehttp.Client, rp return nil, err } - // Best-effort: enrich from beacon (slot, proposer, relay, rewards, etc.) if beaconData, _ := FetchBeaconExecutionBlock(ctx, httpc, beaconBase, blockNumber); beaconData != nil { execBlock.Slot = beaconData.Slot execBlock.ProposerIdx = beaconData.ProposerIdx From d67a75689f546edce22efe26435201f7e8828e2a Mon Sep 17 00:00:00 2001 From: Rose Jethani Date: Tue, 14 Oct 2025 00:01:59 +0530 Subject: [PATCH 3/8] fixed lint issues --- tools/indexer/pkg/beacon/client.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tools/indexer/pkg/beacon/client.go b/tools/indexer/pkg/beacon/client.go index 438c078ce..2a42740ff 100644 --- a/tools/indexer/pkg/beacon/client.go +++ b/tools/indexer/pkg/beacon/client.go @@ -185,7 +185,9 @@ func fetchBlockFromRPC(httpc *retryablehttp.Client, rpcURL string, blockNumber i if err != nil { return nil, err } - defer resp.Body.Close() + defer func() { + _ = resp.Body.Close() + }() if resp.StatusCode != http.StatusOK { return nil, fmt.Errorf("rpc HTTP %d", resp.StatusCode) } From 4b9cd697eab2daa258d7c3b110b9bc48e68fef79 Mon Sep 17 00:00:00 2001 From: Rose Jethani Date: Fri, 17 Oct 2025 18:03:36 +0530 Subject: [PATCH 4/8] fixes --- tools/indexer/cmd/main.go | 2 +- tools/indexer/cmd/start.go | 155 ++++------------------ tools/indexer/pkg/backfill/backfill.go | 173 +++++++++++++++---------- tools/indexer/pkg/config/config.go | 2 +- 4 files changed, 134 insertions(+), 198 deletions(-) diff --git a/tools/indexer/cmd/main.go b/tools/indexer/cmd/main.go index 29f03226c..520ea6eac 100644 --- a/tools/indexer/cmd/main.go +++ b/tools/indexer/cmd/main.go @@ -110,7 +110,7 @@ func createOptionsFromCLI(c *cli.Context) *config.Config { EtherscanKey: c.String("etherscan-key"), InfuraRPC: c.String("infura-rpc"), BeaconBase: c.String("beacon-base"), - RelayMode: c.Bool("relay"), + RelayData: c.Bool("relay"), RelaysJSON: c.String("relays-json"), } } diff --git a/tools/indexer/cmd/start.go b/tools/indexer/cmd/start.go index 05fc70804..8d6ede8d0 100644 --- a/tools/indexer/cmd/start.go +++ b/tools/indexer/cmd/start.go @@ -2,7 +2,10 @@ package main import ( "context" - "fmt" + "log/slog" + "reflect" + "time" + _ "github.com/go-sql-driver/mysql" "github.com/hashicorp/go-retryablehttp" "github.com/primev/mev-commit/tools/indexer/pkg/backfill" @@ -12,11 +15,6 @@ import ( "github.com/primev/mev-commit/tools/indexer/pkg/ethereum" httputil "github.com/primev/mev-commit/tools/indexer/pkg/http" "github.com/primev/mev-commit/tools/indexer/pkg/relay" - "log/slog" - "reflect" - "sync" - "sync/atomic" - "time" "github.com/urfave/cli/v2" ) @@ -24,16 +22,16 @@ import ( func initializeDatabase(ctx context.Context, dbURL string, logger *slog.Logger) (*database.DB, error) { db, err := database.Connect(ctx, dbURL, 20, 5) if err != nil { - logger.Error("[DB] connection failed", "error", err) + logger.Error("database connection failed", "error", err) return nil, err } - logger.Info("[DB] connected to StarRocks database") + logger.Info("database connected to StarRocks database") if err := db.EnsureStateTable(ctx); err != nil { - logger.Error("[DB] failed to ensure state table", "error", err) + logger.Error("database failed to ensure state table", "error", err) return nil, err } - logger.Info("[DB] state table ready") + logger.Info("database state table ready") return db, nil } @@ -69,16 +67,16 @@ func runBackfillIfConfigured(ctx context.Context, c *cli.Context, db *database.D logger.Info("indexer configuration", "lookback", c.Int("backfill-lookback"), "batch", c.Int("backfill-batch")) if c.Int("backfill-lookback") > 0 { - logger.Info("[BACKFILL] running one-time backfill", + logger.Info("running one-time backfill", "lookback", c.Int("backfill-lookback"), "batch", c.Int("backfill-batch")) if err := backfill.RunAll(ctx, db, httpc, createOptionsFromCLI(c), relays); err != nil { - logger.Error("[BACKFILL] failed", "error", err) + logger.Error("failed to backfill", "error", err) } else { - logger.Info("[BACKFILL] completed startup backfill") + logger.Info("completed startup backfill") } } else { - logger.Info("[BACKFILL] skipped", "reason", "backfill-lookback=0") + logger.Info("backfill skipped", "reason", "backfill-lookback=0") } } @@ -91,11 +89,11 @@ func runMainLoop(ctx context.Context, c *cli.Context, db *database.DB, httpc *re for { select { case <-ctx.Done(): - logger.Info("[SHUTDOWN] graceful shutdown initiated", "reason", ctx.Err()) + logger.Info("shutdown graceful shutdown initiated", "reason", ctx.Err()) if err := db.SaveLastBlockNumber(ctx, lastBN); err != nil { - logger.Error("[SHUTDOWN] failed to save last block number", "error", err) + logger.Error("shutdown failed to save last block number", "error", err) } - logger.Info("[SHUTDOWN] indexer stopped", "block", lastBN) + logger.Info("shutdown indexer stopped", "block", lastBN) return nil case <-mainTicker.C: @@ -115,7 +113,7 @@ func processNextBlock(ctx context.Context, c *cli.Context, db *database.DB, http ei, err := beacon.FetchCombinedBlockData(ctx, httpc, infuraRPC, beaconBase, nextBN) if err != nil || ei == nil { - logger.Warn("[BLOCK] not available yet", "block", nextBN, "error", err) + logger.Warn("block not available yet", "block", nextBN, "error", err) return lastBN } @@ -130,20 +128,20 @@ func processNextBlock(ctx context.Context, c *cli.Context, db *database.DB, http ) if err := db.UpsertBlockFromExec(ctx, ei); err != nil { - logger.Error("[DB] failed to save block", "block", nextBN, "error", err) + logger.Error("failed to save block", "block", nextBN, "error", err) return lastBN } - logger.Info("[DB] block saved successfully", "block", nextBN) + logger.Info("block saved successfully", "block", nextBN) cfg := createOptionsFromCLI(c) - if cfg.RelayMode { - if err := processBidsForBlock(ctx, db, httpc, relays, ei, logger); err != nil { + if cfg.RelayData { + if err := backfill.ProcessBidsForBlock(ctx, db, httpc, relays, ei, logger); err != nil { logger.Error("failed to process bids", "error", err) return lastBN } } - if err := launchValidatorTasks(ctx, c, db, httpc, ei, beaconBase, logger); err != nil { - logger.Error("[VALIDATOR] failed to launch async tasks", "slot", ei.Slot, "error", err) + if err := backfill.LaunchValidatorTasks(ctx, cfg, db, httpc, ei, beaconBase, logger); err != nil { + logger.Error("failed to launch async validator tasks", "slot", ei.Slot, "error", err) return lastBN } @@ -151,112 +149,15 @@ func processNextBlock(ctx context.Context, c *cli.Context, db *database.DB, http return nextBN } -func processBidsForBlock(ctx context.Context, db *database.DB, httpc *retryablehttp.Client, relays []relay.Row, ei *beacon.ExecInfo, logger *slog.Logger) error { - logger.Info("[BIDS] processing bids for block", "block", ei.BlockNumber, "slot", ei.Slot) - var wg sync.WaitGroup - var totalBids int64 - var successfulRelays int64 - for _, r := range relays { - r := r - wg.Add(1) - go func(rel relay.Row) { - defer wg.Done() - if err := ctx.Err(); err != nil { - return - } - bctx, bcancel := context.WithTimeout(ctx, 5*time.Second) - bids, berr := relay.FetchBuilderBlocksReceived(bctx, httpc, rel.URL, ei.Slot) - bcancel() - if berr != nil { - logger.Debug("bid fetch failed", "slot", ei.Slot, "relay", r.ID, "error", berr) - return - } - if len(bids) == 0 { - return - } - rows := make([]database.BidRow, 0, len(bids)) - for _, bid := range bids { - if row, ok := relay.BuildBidInsert(ei.Slot, rel.ID, bid); ok { - rows = append(rows, row) - } - } - if len(rows) > 0 { - insCtx, insCancel := context.WithTimeout(ctx, 5*time.Second) - if ierr := db.InsertBidsBatch(insCtx, rows); ierr != nil { - logger.Error("bid insert failed", "slot", ei.Slot, "relay", r.ID, "error", ierr) - } - insCancel() - } - atomic.AddInt64(&totalBids, int64(len(rows))) - atomic.AddInt64(&successfulRelays, 1) - logger.Info("[BIDS] ok", - "slot", ei.Slot, "relay_id", rel.ID, - "bids_in", len(bids), "rows_out", len(rows), - ) - }(r) - } - wg.Wait() - - logger.Info("[BIDS] summary", "block", ei.BlockNumber, "total_bids", totalBids, "successful_relays", successfulRelays) - return nil -} - func saveBlockProgress(db *database.DB, blockNum int64, logger *slog.Logger) { gctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if err := db.SaveLastBlockNumber(gctx, blockNum); err != nil { - logger.Error("[PROGRESS] failed to save block number", "block", blockNum, "error", err) - } else { - logger.Info("[PROGRESS] advanced to block", "block", blockNum) - } - -} - -func launchValidatorTasks(ctx context.Context, c *cli.Context, db *database.DB, httpc *retryablehttp.Client, ei *beacon.ExecInfo, beaconBase string, logger *slog.Logger) error { // Async validator pubkey fetch - if ei.ProposerIdx == nil { - return nil - } - - vctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - vpub, err := beacon.FetchValidatorPubkey(vctx, httpc, beaconBase, *ei.ProposerIdx) - if err != nil { - return fmt.Errorf("fetch validator pubkey: %w", err) - } - - if len(vpub) > 0 { - if err := db.UpdateValidatorPubkey(vctx, ei.Slot, vpub); err != nil { - logger.Error("[VALIDATOR] failed to save pubkey", "slot", ei.Slot, "error", err) - } else { - logger.Info("[VALIDATOR] pubkey saved", "proposer", *ei.ProposerIdx, "slot", ei.Slot) - } - } - - // Wait for validator pubkey to be available - getCtx, getCancel := context.WithTimeout(context.Background(), 5*time.Second) - vpk, err := db.GetValidatorPubkeyWithRetry(getCtx, ei.Slot, 3, time.Second) - getCancel() - - if err != nil { - logger.Error("[VALIDATOR] pubkey not available", "slot", ei.Slot, "error", err) - return fmt.Errorf("save validator pubkey: %w", err) - } - - opted, err := ethereum.CallAreOptedInAtBlock(httpc.HTTPClient, createOptionsFromCLI(c), ei.BlockNumber, vpk) - if err != nil { - return fmt.Errorf("check opt-in status: %w", err) - } - - updCtx, updCancel := context.WithTimeout(context.Background(), 3*time.Second) - err = db.UpdateValidatorOptInStatus(updCtx, ei.Slot, opted) - updCancel() - if err != nil { - return fmt.Errorf("save opt-in status: %w", err) + logger.Error("progress failed to save block number", "block", blockNum, "error", err) } else { - logger.Info("[OPT-IN] validator opt-in status", "slot", ei.Slot, "opted_in", opted) + logger.Info("progress advanced to block", "block", blockNum) } - return nil } @@ -283,9 +184,9 @@ func startIndexer(c *cli.Context) error { for _, r := range cfgRelays { relays = append(relays, relay.Row{ID: r.Relay_id, URL: r.URL}) } - initLogger.Info("[RELAY] enabled", "count", len(relays)) + initLogger.Info("relay enabled", "count", len(relays)) } else { - initLogger.Info("[RELAY] disabled") + initLogger.Info("relay disabled") relays = make([]relay.Row, 0, len(config.RelaysDefault)) for _, r := range config.RelaysDefault { relays = append(relays, relay.Row{ID: r.Relay_id, URL: r.URL}) @@ -297,13 +198,13 @@ func startIndexer(c *cli.Context) error { } defer func() { if cerr := db.Close(); cerr != nil { - initLogger.Error("[DB] close failed", "error", cerr) + initLogger.Error("database close failed", "error", cerr) } }() // Initialize HTTP client httpc := httputil.NewHTTPClient(c.Duration("http-timeout")) - initLogger.Info("[HTTP] client initialized", "timeout", c.Duration("http-timeout")) + initLogger.Info("http client initialized", "timeout", c.Duration("http-timeout")) // Get starting block number lastBN, err := getStartingBlockNumber(ctx, db, httpc, infuraRPC, initLogger) diff --git a/tools/indexer/pkg/backfill/backfill.go b/tools/indexer/pkg/backfill/backfill.go index 5a4c9a63d..594f796df 100644 --- a/tools/indexer/pkg/backfill/backfill.go +++ b/tools/indexer/pkg/backfill/backfill.go @@ -5,6 +5,7 @@ import ( "fmt" "log/slog" "sync" + "sync/atomic" "time" "github.com/hashicorp/go-retryablehttp" @@ -22,6 +23,103 @@ type SlotData struct { ProposerIdx *int64 } +func LaunchValidatorTasks(ctx context.Context, cfg *config.Config, db *database.DB, httpc *retryablehttp.Client, ei *beacon.ExecInfo, beaconBase string, logger *slog.Logger) error { // Async validator pubkey fetch + if ei.ProposerIdx == nil { + return nil + } + + vctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + vpub, err := beacon.FetchValidatorPubkey(vctx, httpc, beaconBase, *ei.ProposerIdx) + if err != nil { + return fmt.Errorf("fetch validator pubkey: %w", err) + } + + if len(vpub) > 0 { + if err := db.UpdateValidatorPubkey(vctx, ei.Slot, vpub); err != nil { + logger.Error("validator failed to save pubkey", "slot", ei.Slot, "error", err) + } else { + logger.Info("validator pubkey saved", "proposer", *ei.ProposerIdx, "slot", ei.Slot) + } + } + + // Wait for validator pubkey to be available + getCtx, getCancel := context.WithTimeout(context.Background(), 5*time.Second) + vpk, err := db.GetValidatorPubkeyWithRetry(getCtx, ei.Slot, 3, time.Second) + getCancel() + + if err != nil { + logger.Error("validator pubkey not available", "slot", ei.Slot, "error", err) + return fmt.Errorf("save validator pubkey: %w", err) + } + + opted, err := ethereum.CallAreOptedInAtBlock(httpc.HTTPClient, cfg, ei.BlockNumber, vpk) + if err != nil { + return fmt.Errorf("check opt-in status: %w", err) + } + + updCtx, updCancel := context.WithTimeout(context.Background(), 3*time.Second) + err = db.UpdateValidatorOptInStatus(updCtx, ei.Slot, opted) + updCancel() + if err != nil { + return fmt.Errorf("save opt-in status: %w", err) + } else { + logger.Info("validator opt-in status", "slot", ei.Slot, "opted_in", opted) + } + return nil + +} + +func ProcessBidsForBlock(ctx context.Context, db *database.DB, httpc *retryablehttp.Client, relays []relay.Row, ei *beacon.ExecInfo, logger *slog.Logger) error { + logger.Info("processing bids for block", "block", ei.BlockNumber, "slot", ei.Slot) + var wg sync.WaitGroup + var totalBids int64 + var successfulRelays int64 + for _, r := range relays { + r := r + wg.Add(1) + go func(rel relay.Row) { + defer wg.Done() + if err := ctx.Err(); err != nil { + return + } + bctx, bcancel := context.WithTimeout(ctx, 5*time.Second) + bids, berr := relay.FetchBuilderBlocksReceived(bctx, httpc, rel.URL, ei.Slot) + bcancel() + if berr != nil { + logger.Debug("bid fetch failed", "slot", ei.Slot, "relay", r.ID, "error", berr) + return + } + if len(bids) == 0 { + return + } + rows := make([]database.BidRow, 0, len(bids)) + for _, bid := range bids { + if row, ok := relay.BuildBidInsert(ei.Slot, rel.ID, bid); ok { + rows = append(rows, row) + } + } + if len(rows) > 0 { + insCtx, insCancel := context.WithTimeout(ctx, 5*time.Second) + if ierr := db.InsertBidsBatch(insCtx, rows); ierr != nil { + logger.Error("bid insert failed", "slot", ei.Slot, "relay", r.ID, "error", ierr) + } + insCancel() + } + atomic.AddInt64(&totalBids, int64(len(rows))) + atomic.AddInt64(&successfulRelays, 1) + logger.Info("bid insert ok", + "slot", ei.Slot, "relay_id", rel.ID, + "bids_in", len(bids), "rows_out", len(rows), + ) + }(r) + } + wg.Wait() + + logger.Info("summary", "block", ei.BlockNumber, "total_bids", totalBids, "successful_relays", successfulRelays) + return nil +} + func RunAll(ctx context.Context, db *database.DB, httpc *retryablehttp.Client, cfg *config.Config, relays []relay.Row) error { logger := slog.With("component", "backfill") logger.Info("Starting streaming backfill") @@ -90,78 +188,15 @@ func RunAll(ctx context.Context, db *database.DB, httpc *retryablehttp.Client, c } // Validator pubkey + opt-in status - var vpub []byte - if ei.ProposerIdx != nil { - vctx, vcancel := context.WithTimeout(ctx, 5*time.Second) - v, verr := beacon.FetchValidatorPubkey(vctx, httpc, cfg.BeaconBase, *ei.ProposerIdx) - vcancel() - if verr != nil { - logger.Error("validator fetch failed", "slot", ei.Slot, "error", verr) - } else if len(v) > 0 { - vpub = v - // Save validator pubkey - if err := db.UpdateValidatorPubkey(ctx, ei.Slot, vpub); err != nil { - logger.Error("validator update failed", "slot", ei.Slot, "error", err) - } else { - opted, oerr := ethereum.CallAreOptedInAtBlock(httpc.HTTPClient, cfg, ei.BlockNumber, vpub) - if oerr != nil { - logger.Error("opt-in check failed", "slot", ei.Slot, "error", oerr) - } else { - updCtx, updCancel := context.WithTimeout(ctx, 3*time.Second) - if uerr := db.UpdateValidatorOptInStatus(updCtx, ei.Slot, opted); uerr != nil { - logger.Error("opt-in update failed", "slot", ei.Slot, "error", uerr) - } - updCancel() - } - } - } - } + if err := LaunchValidatorTasks(ctx, cfg, db, httpc, ei, cfg.BeaconBase, logger); err != nil { + logger.Error("failed to launch async validator tasks", "slot", ei.Slot, "error", err) + } // Relay mode: fetch & insert bids - if cfg.RelayMode { - tBids := time.Now() - var wg sync.WaitGroup - - for _, r := range relays { - r := r // capture loop var - wg.Add(1) - go func(rel relay.Row) { - defer wg.Done() - if err := ctx.Err(); err != nil { - return - } - - bctx, bcancel := context.WithTimeout(ctx, 5*time.Second) - bids, berr := relay.FetchBuilderBlocksReceived(bctx, httpc, rel.URL, ei.Slot) - bcancel() - if berr != nil { - logger.Debug("bid fetch failed", "slot", ei.Slot, "relay", rel.ID, "error", berr) - return - } - if len(bids) == 0 { - return - } - - rows := make([]database.BidRow, 0, len(bids)) - for _, bid := range bids { - if row, ok := relay.BuildBidInsert(ei.Slot, rel.ID, bid); ok { - rows = append(rows, row) - } - } - - if len(rows) > 0 { - insCtx, insCancel := context.WithTimeout(ctx, 5*time.Second) - if ierr := db.InsertBidsBatch(insCtx, rows); ierr != nil { - logger.Error("bid insert failed", "slot", ei.Slot, "relay", rel.ID, "error", ierr) - } - insCancel() - } - }(r) + if cfg.RelayData { + if err := ProcessBidsForBlock(ctx, db, httpc, relays, ei, logger); err != nil { + logger.Error("failed to process bids", "error", err) } - - wg.Wait() - bidsMs := time.Since(tBids).Milliseconds() - logger.Info("Bids fetch and insert", "bids_ms", bidsMs) } processed++ diff --git a/tools/indexer/pkg/config/config.go b/tools/indexer/pkg/config/config.go index 15b3e4633..20be03624 100644 --- a/tools/indexer/pkg/config/config.go +++ b/tools/indexer/pkg/config/config.go @@ -50,6 +50,6 @@ type Config struct { EtherscanKey string InfuraRPC string BeaconBase string - RelayMode bool + RelayData bool RelaysJSON string } From 182b3895b6990ede4f59d6863b38ca5904fa4099 Mon Sep 17 00:00:00 2001 From: Rose Jethani Date: Fri, 17 Oct 2025 18:29:55 +0530 Subject: [PATCH 5/8] fixes --- tools/indexer/cmd/start.go | 6 +- tools/indexer/pkg/backfill/backfill.go | 103 +--------------------- tools/indexer/pkg/ingest/ingest.go | 115 +++++++++++++++++++++++++ 3 files changed, 121 insertions(+), 103 deletions(-) create mode 100644 tools/indexer/pkg/ingest/ingest.go diff --git a/tools/indexer/cmd/start.go b/tools/indexer/cmd/start.go index 8d6ede8d0..42751beff 100644 --- a/tools/indexer/cmd/start.go +++ b/tools/indexer/cmd/start.go @@ -15,7 +15,7 @@ import ( "github.com/primev/mev-commit/tools/indexer/pkg/ethereum" httputil "github.com/primev/mev-commit/tools/indexer/pkg/http" "github.com/primev/mev-commit/tools/indexer/pkg/relay" - + "github.com/primev/mev-commit/tools/indexer/pkg/ingest" "github.com/urfave/cli/v2" ) @@ -135,12 +135,12 @@ func processNextBlock(ctx context.Context, c *cli.Context, db *database.DB, http cfg := createOptionsFromCLI(c) if cfg.RelayData { - if err := backfill.ProcessBidsForBlock(ctx, db, httpc, relays, ei, logger); err != nil { + if err := ingest.ProcessBidsForBlock(ctx, db, httpc, relays, ei, logger); err != nil { logger.Error("failed to process bids", "error", err) return lastBN } } - if err := backfill.LaunchValidatorTasks(ctx, cfg, db, httpc, ei, beaconBase, logger); err != nil { + if err := ingest.LaunchValidatorTasks(ctx, cfg, db, httpc, ei, beaconBase, logger); err != nil { logger.Error("failed to launch async validator tasks", "slot", ei.Slot, "error", err) return lastBN } diff --git a/tools/indexer/pkg/backfill/backfill.go b/tools/indexer/pkg/backfill/backfill.go index 594f796df..756c9399f 100644 --- a/tools/indexer/pkg/backfill/backfill.go +++ b/tools/indexer/pkg/backfill/backfill.go @@ -4,8 +4,6 @@ import ( "context" "fmt" "log/slog" - "sync" - "sync/atomic" "time" "github.com/hashicorp/go-retryablehttp" @@ -14,6 +12,7 @@ import ( "github.com/primev/mev-commit/tools/indexer/pkg/database" "github.com/primev/mev-commit/tools/indexer/pkg/ethereum" "github.com/primev/mev-commit/tools/indexer/pkg/relay" + "github.com/primev/mev-commit/tools/indexer/pkg/ingest" ) type SlotData struct { @@ -23,102 +22,6 @@ type SlotData struct { ProposerIdx *int64 } -func LaunchValidatorTasks(ctx context.Context, cfg *config.Config, db *database.DB, httpc *retryablehttp.Client, ei *beacon.ExecInfo, beaconBase string, logger *slog.Logger) error { // Async validator pubkey fetch - if ei.ProposerIdx == nil { - return nil - } - - vctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - vpub, err := beacon.FetchValidatorPubkey(vctx, httpc, beaconBase, *ei.ProposerIdx) - if err != nil { - return fmt.Errorf("fetch validator pubkey: %w", err) - } - - if len(vpub) > 0 { - if err := db.UpdateValidatorPubkey(vctx, ei.Slot, vpub); err != nil { - logger.Error("validator failed to save pubkey", "slot", ei.Slot, "error", err) - } else { - logger.Info("validator pubkey saved", "proposer", *ei.ProposerIdx, "slot", ei.Slot) - } - } - - // Wait for validator pubkey to be available - getCtx, getCancel := context.WithTimeout(context.Background(), 5*time.Second) - vpk, err := db.GetValidatorPubkeyWithRetry(getCtx, ei.Slot, 3, time.Second) - getCancel() - - if err != nil { - logger.Error("validator pubkey not available", "slot", ei.Slot, "error", err) - return fmt.Errorf("save validator pubkey: %w", err) - } - - opted, err := ethereum.CallAreOptedInAtBlock(httpc.HTTPClient, cfg, ei.BlockNumber, vpk) - if err != nil { - return fmt.Errorf("check opt-in status: %w", err) - } - - updCtx, updCancel := context.WithTimeout(context.Background(), 3*time.Second) - err = db.UpdateValidatorOptInStatus(updCtx, ei.Slot, opted) - updCancel() - if err != nil { - return fmt.Errorf("save opt-in status: %w", err) - } else { - logger.Info("validator opt-in status", "slot", ei.Slot, "opted_in", opted) - } - return nil - -} - -func ProcessBidsForBlock(ctx context.Context, db *database.DB, httpc *retryablehttp.Client, relays []relay.Row, ei *beacon.ExecInfo, logger *slog.Logger) error { - logger.Info("processing bids for block", "block", ei.BlockNumber, "slot", ei.Slot) - var wg sync.WaitGroup - var totalBids int64 - var successfulRelays int64 - for _, r := range relays { - r := r - wg.Add(1) - go func(rel relay.Row) { - defer wg.Done() - if err := ctx.Err(); err != nil { - return - } - bctx, bcancel := context.WithTimeout(ctx, 5*time.Second) - bids, berr := relay.FetchBuilderBlocksReceived(bctx, httpc, rel.URL, ei.Slot) - bcancel() - if berr != nil { - logger.Debug("bid fetch failed", "slot", ei.Slot, "relay", r.ID, "error", berr) - return - } - if len(bids) == 0 { - return - } - rows := make([]database.BidRow, 0, len(bids)) - for _, bid := range bids { - if row, ok := relay.BuildBidInsert(ei.Slot, rel.ID, bid); ok { - rows = append(rows, row) - } - } - if len(rows) > 0 { - insCtx, insCancel := context.WithTimeout(ctx, 5*time.Second) - if ierr := db.InsertBidsBatch(insCtx, rows); ierr != nil { - logger.Error("bid insert failed", "slot", ei.Slot, "relay", r.ID, "error", ierr) - } - insCancel() - } - atomic.AddInt64(&totalBids, int64(len(rows))) - atomic.AddInt64(&successfulRelays, 1) - logger.Info("bid insert ok", - "slot", ei.Slot, "relay_id", rel.ID, - "bids_in", len(bids), "rows_out", len(rows), - ) - }(r) - } - wg.Wait() - - logger.Info("summary", "block", ei.BlockNumber, "total_bids", totalBids, "successful_relays", successfulRelays) - return nil -} func RunAll(ctx context.Context, db *database.DB, httpc *retryablehttp.Client, cfg *config.Config, relays []relay.Row) error { logger := slog.With("component", "backfill") @@ -188,13 +91,13 @@ func RunAll(ctx context.Context, db *database.DB, httpc *retryablehttp.Client, c } // Validator pubkey + opt-in status - if err := LaunchValidatorTasks(ctx, cfg, db, httpc, ei, cfg.BeaconBase, logger); err != nil { + if err := ingest.LaunchValidatorTasks(ctx, cfg, db, httpc, ei, cfg.BeaconBase, logger); err != nil { logger.Error("failed to launch async validator tasks", "slot", ei.Slot, "error", err) } // Relay mode: fetch & insert bids if cfg.RelayData { - if err := ProcessBidsForBlock(ctx, db, httpc, relays, ei, logger); err != nil { + if err := ingest.ProcessBidsForBlock(ctx, db, httpc, relays, ei, logger); err != nil { logger.Error("failed to process bids", "error", err) } } diff --git a/tools/indexer/pkg/ingest/ingest.go b/tools/indexer/pkg/ingest/ingest.go new file mode 100644 index 000000000..f8db365d2 --- /dev/null +++ b/tools/indexer/pkg/ingest/ingest.go @@ -0,0 +1,115 @@ + +package ingest + +import ( + "context" + "fmt" + "log/slog" + "sync" + "sync/atomic" + "time" + + "github.com/hashicorp/go-retryablehttp" + "github.com/primev/mev-commit/tools/indexer/pkg/beacon" + "github.com/primev/mev-commit/tools/indexer/pkg/config" + "github.com/primev/mev-commit/tools/indexer/pkg/database" + "github.com/primev/mev-commit/tools/indexer/pkg/ethereum" + "github.com/primev/mev-commit/tools/indexer/pkg/relay" +) + +func LaunchValidatorTasks(ctx context.Context, cfg *config.Config, db *database.DB, httpc *retryablehttp.Client, ei *beacon.ExecInfo, beaconBase string, logger *slog.Logger) error { // Async validator pubkey fetch + if ei.ProposerIdx == nil { + return nil + } + + vctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + vpub, err := beacon.FetchValidatorPubkey(vctx, httpc, beaconBase, *ei.ProposerIdx) + if err != nil { + return fmt.Errorf("fetch validator pubkey: %w", err) + } + + if len(vpub) > 0 { + if err := db.UpdateValidatorPubkey(vctx, ei.Slot, vpub); err != nil { + logger.Error("validator failed to save pubkey", "slot", ei.Slot, "error", err) + } else { + logger.Info("validator pubkey saved", "proposer", *ei.ProposerIdx, "slot", ei.Slot) + } + } + + // Wait for validator pubkey to be available + getCtx, getCancel := context.WithTimeout(context.Background(), 5*time.Second) + vpk, err := db.GetValidatorPubkeyWithRetry(getCtx, ei.Slot, 3, time.Second) + getCancel() + + if err != nil { + logger.Error("validator pubkey not available", "slot", ei.Slot, "error", err) + return fmt.Errorf("save validator pubkey: %w", err) + } + + opted, err := ethereum.CallAreOptedInAtBlock(httpc.HTTPClient, cfg, ei.BlockNumber, vpk) + if err != nil { + return fmt.Errorf("check opt-in status: %w", err) + } + + updCtx, updCancel := context.WithTimeout(context.Background(), 3*time.Second) + err = db.UpdateValidatorOptInStatus(updCtx, ei.Slot, opted) + updCancel() + if err != nil { + return fmt.Errorf("save opt-in status: %w", err) + } else { + logger.Info("validator opt-in status", "slot", ei.Slot, "opted_in", opted) + } + return nil + +} + +func ProcessBidsForBlock(ctx context.Context, db *database.DB, httpc *retryablehttp.Client, relays []relay.Row, ei *beacon.ExecInfo, logger *slog.Logger) error { + logger.Info("processing bids for block", "block", ei.BlockNumber, "slot", ei.Slot) + var wg sync.WaitGroup + var totalBids int64 + var successfulRelays int64 + for _, r := range relays { + r := r + wg.Add(1) + go func(rel relay.Row) { + defer wg.Done() + if err := ctx.Err(); err != nil { + return + } + bctx, bcancel := context.WithTimeout(ctx, 5*time.Second) + bids, berr := relay.FetchBuilderBlocksReceived(bctx, httpc, rel.URL, ei.Slot) + bcancel() + if berr != nil { + logger.Debug("bid fetch failed", "slot", ei.Slot, "relay", r.ID, "error", berr) + return + } + if len(bids) == 0 { + return + } + rows := make([]database.BidRow, 0, len(bids)) + for _, bid := range bids { + if row, ok := relay.BuildBidInsert(ei.Slot, rel.ID, bid); ok { + rows = append(rows, row) + } + } + if len(rows) > 0 { + insCtx, insCancel := context.WithTimeout(ctx, 5*time.Second) + if ierr := db.InsertBidsBatch(insCtx, rows); ierr != nil { + logger.Error("bid insert failed", "slot", ei.Slot, "relay", r.ID, "error", ierr) + } + insCancel() + } + atomic.AddInt64(&totalBids, int64(len(rows))) + atomic.AddInt64(&successfulRelays, 1) + logger.Info("bid insert ok", + "slot", ei.Slot, "relay_id", rel.ID, + "bids_in", len(bids), "rows_out", len(rows), + ) + }(r) + } + wg.Wait() + + logger.Info("summary", "block", ei.BlockNumber, "total_bids", totalBids, "successful_relays", successfulRelays) + return nil +} \ No newline at end of file From fbc415e8930ef1ca7c9bbf3059716ec2d131340a Mon Sep 17 00:00:00 2001 From: Rose Jethani Date: Fri, 17 Oct 2025 18:33:56 +0530 Subject: [PATCH 6/8] fixes --- tools/indexer/cmd/main.go | 1 - tools/indexer/cmd/start.go | 2 +- tools/indexer/pkg/backfill/backfill.go | 3 +-- tools/indexer/pkg/ingest/ingest.go | 3 +-- 4 files changed, 3 insertions(+), 6 deletions(-) diff --git a/tools/indexer/cmd/main.go b/tools/indexer/cmd/main.go index 520ea6eac..60838e238 100644 --- a/tools/indexer/cmd/main.go +++ b/tools/indexer/cmd/main.go @@ -123,7 +123,6 @@ func main() { optionBeaconBase, optionBlockInterval, optionValidatorDelay, - optionBackfillLookback, optionBackfillBatch, optionHTTPTimeout, diff --git a/tools/indexer/cmd/start.go b/tools/indexer/cmd/start.go index 42751beff..54a31a80d 100644 --- a/tools/indexer/cmd/start.go +++ b/tools/indexer/cmd/start.go @@ -14,8 +14,8 @@ import ( "github.com/primev/mev-commit/tools/indexer/pkg/database" "github.com/primev/mev-commit/tools/indexer/pkg/ethereum" httputil "github.com/primev/mev-commit/tools/indexer/pkg/http" - "github.com/primev/mev-commit/tools/indexer/pkg/relay" "github.com/primev/mev-commit/tools/indexer/pkg/ingest" + "github.com/primev/mev-commit/tools/indexer/pkg/relay" "github.com/urfave/cli/v2" ) diff --git a/tools/indexer/pkg/backfill/backfill.go b/tools/indexer/pkg/backfill/backfill.go index 756c9399f..e58f9c7ab 100644 --- a/tools/indexer/pkg/backfill/backfill.go +++ b/tools/indexer/pkg/backfill/backfill.go @@ -11,8 +11,8 @@ import ( "github.com/primev/mev-commit/tools/indexer/pkg/config" "github.com/primev/mev-commit/tools/indexer/pkg/database" "github.com/primev/mev-commit/tools/indexer/pkg/ethereum" - "github.com/primev/mev-commit/tools/indexer/pkg/relay" "github.com/primev/mev-commit/tools/indexer/pkg/ingest" + "github.com/primev/mev-commit/tools/indexer/pkg/relay" ) type SlotData struct { @@ -22,7 +22,6 @@ type SlotData struct { ProposerIdx *int64 } - func RunAll(ctx context.Context, db *database.DB, httpc *retryablehttp.Client, cfg *config.Config, relays []relay.Row) error { logger := slog.With("component", "backfill") logger.Info("Starting streaming backfill") diff --git a/tools/indexer/pkg/ingest/ingest.go b/tools/indexer/pkg/ingest/ingest.go index f8db365d2..669afa743 100644 --- a/tools/indexer/pkg/ingest/ingest.go +++ b/tools/indexer/pkg/ingest/ingest.go @@ -1,4 +1,3 @@ - package ingest import ( @@ -112,4 +111,4 @@ func ProcessBidsForBlock(ctx context.Context, db *database.DB, httpc *retryableh logger.Info("summary", "block", ei.BlockNumber, "total_bids", totalBids, "successful_relays", successfulRelays) return nil -} \ No newline at end of file +} From e69770344690ecd5ca4e0f9226c0bd0f83cf43f7 Mon Sep 17 00:00:00 2001 From: Rose Jethani Date: Tue, 21 Oct 2025 18:10:10 +0530 Subject: [PATCH 7/8] changed the backfill logic --- tools/indexer/cmd/main.go | 46 +-- tools/indexer/cmd/start.go | 13 +- tools/indexer/pkg/backfill/backfill.go | 474 ++++++++++++++++++++++--- tools/indexer/pkg/backfill/clients.go | 43 +++ tools/indexer/pkg/backfill/helpers.go | 164 +++++++++ tools/indexer/pkg/beacon/client.go | 2 + tools/indexer/pkg/config/config.go | 21 +- tools/indexer/pkg/database/starrock.go | 248 +++++++++---- tools/indexer/pkg/ethereum/client.go | 8 +- tools/indexer/pkg/ingest/ingest.go | 1 - tools/indexer/pkg/relay/client.go | 25 +- 11 files changed, 850 insertions(+), 195 deletions(-) create mode 100644 tools/indexer/pkg/backfill/clients.go create mode 100644 tools/indexer/pkg/backfill/helpers.go diff --git a/tools/indexer/cmd/main.go b/tools/indexer/cmd/main.go index 60838e238..1a24c38aa 100644 --- a/tools/indexer/cmd/main.go +++ b/tools/indexer/cmd/main.go @@ -2,18 +2,16 @@ package main import ( "context" - "fmt" + "os" + "os/signal" + "syscall" + "time" _ "github.com/go-sql-driver/mysql" "github.com/primev/mev-commit/tools/indexer/pkg/config" "github.com/urfave/cli/v2" "github.com/urfave/cli/v2/altsrc" - - "os" - "os/signal" - "syscall" - "time" ) var ( @@ -34,15 +32,10 @@ var ( EnvVars: []string{"INDEXER_OPT_IN_CONTRACT"}, Value: "0x821798d7b9d57dF7Ed7616ef9111A616aB19ed64", }) - optionEtherscanKey = altsrc.NewStringFlag(&cli.StringFlag{ - Name: "etherscan-key", - Usage: "Etherscan API key", - EnvVars: []string{"INDEXER_ETHERSCAN_KEY"}, - }) - optionInfuraRPC = altsrc.NewStringFlag(&cli.StringFlag{ - Name: "infura-rpc", - Usage: "Infura RPC URL", - EnvVars: []string{"INDEXER_INFURA_RPC"}, + optionAlchemyRPC = altsrc.NewStringFlag(&cli.StringFlag{ + Name: "alchemy-rpc", + Usage: "Alchemy RPC URL", + EnvVars: []string{"INDEXER_ALCHEMY_RPC"}, Required: true, }) optionBeaconBase = altsrc.NewStringFlag(&cli.StringFlag{ @@ -97,6 +90,18 @@ var ( Usage: "JSON array overriding default relays (fields: relay_id,name,tag,url)", EnvVars: []string{"INDEXER_RELAYS_JSON"}, }) + optionRatedAPIKey = altsrc.NewStringFlag(&cli.StringFlag{ + Name: "rated-api-key", + Usage: "Rated Network API key", + EnvVars: []string{"INDEXER_RATED_API_KEY"}, + // don't mark Required here to keep runtime flexible; we'll validate only when needed + }) + + optionQuickNodeBase = altsrc.NewStringFlag(&cli.StringFlag{ + Name: "quicknode-base", + Usage: "QuickNode Beacon API base, e.g. https://.quiknode.pro/", + EnvVars: []string{"INDEXER_QUICKNODE_BASE"}, + }) ) func createOptionsFromCLI(c *cli.Context) *config.Config { @@ -107,11 +112,12 @@ func createOptionsFromCLI(c *cli.Context) *config.Config { BackfillBatch: c.Int("backfill-batch"), HTTPTimeout: c.Duration("http-timeout"), OptInContract: c.String("opt-in-contract"), - EtherscanKey: c.String("etherscan-key"), - InfuraRPC: c.String("infura-rpc"), + AlchemyRPC: c.String("alchemy-rpc"), BeaconBase: c.String("beacon-base"), RelayData: c.Bool("relay"), RelaysJSON: c.String("relays-json"), + RatedAPIKey: c.String("rated-api-key"), + QuickNodeBase: c.String("quicknode-base"), } } @@ -119,7 +125,7 @@ func main() { flags := []cli.Flag{ optionConfig, optionDatabaseURL, - optionInfuraRPC, + optionAlchemyRPC, optionBeaconBase, optionBlockInterval, optionValidatorDelay, @@ -127,9 +133,10 @@ func main() { optionBackfillBatch, optionHTTPTimeout, optionOptInContract, - optionEtherscanKey, optionRelayFlag, optionRelaysJSON, + optionRatedAPIKey, + optionQuickNodeBase, } app := &cli.App{ @@ -163,5 +170,4 @@ func main() { if err := app.RunContext(ctx, os.Args); err != nil { _, _ = fmt.Fprintf(app.Writer, "exited with error: %v\n", err) } - } diff --git a/tools/indexer/cmd/start.go b/tools/indexer/cmd/start.go index 54a31a80d..7809e80ca 100644 --- a/tools/indexer/cmd/start.go +++ b/tools/indexer/cmd/start.go @@ -16,6 +16,7 @@ import ( httputil "github.com/primev/mev-commit/tools/indexer/pkg/http" "github.com/primev/mev-commit/tools/indexer/pkg/ingest" "github.com/primev/mev-commit/tools/indexer/pkg/relay" + "github.com/urfave/cli/v2" ) @@ -70,7 +71,7 @@ func runBackfillIfConfigured(ctx context.Context, c *cli.Context, db *database.D logger.Info("running one-time backfill", "lookback", c.Int("backfill-lookback"), "batch", c.Int("backfill-batch")) - if err := backfill.RunAll(ctx, db, httpc, createOptionsFromCLI(c), relays); err != nil { + if err := backfill.RunAllWithRatedAPI(ctx, db, httpc, createOptionsFromCLI(c)); err != nil { logger.Error("failed to backfill", "error", err) } else { logger.Info("completed startup backfill") @@ -101,6 +102,7 @@ func runMainLoop(ctx context.Context, c *cli.Context, db *database.DB, httpc *re } } } + func safe(p interface{}) interface{} { v := reflect.ValueOf(p) if !v.IsValid() || v.IsNil() { @@ -108,6 +110,7 @@ func safe(p interface{}) interface{} { } return v.Elem().Interface() } + func processNextBlock(ctx context.Context, c *cli.Context, db *database.DB, httpc *retryablehttp.Client, relays []relay.Row, infuraRPC, beaconBase string, lastBN int64, logger *slog.Logger) int64 { nextBN := lastBN + 1 @@ -158,15 +161,13 @@ func saveBlockProgress(db *database.DB, blockNum int64, logger *slog.Logger) { } else { logger.Info("progress advanced to block", "block", blockNum) } - } func startIndexer(c *cli.Context) error { - initLogger := slog.With("component", "init") dbURL := c.String(optionDatabaseURL.Name) - infuraRPC := c.String(optionInfuraRPC.Name) + alchemyRPC := c.String(optionAlchemyRPC.Name) beaconBase := c.String(optionBeaconBase.Name) initLogger.Info("starting blockchain indexer with StarRocks database") @@ -207,7 +208,7 @@ func startIndexer(c *cli.Context) error { initLogger.Info("http client initialized", "timeout", c.Duration("http-timeout")) // Get starting block number - lastBN, err := getStartingBlockNumber(ctx, db, httpc, infuraRPC, initLogger) + lastBN, err := getStartingBlockNumber(ctx, db, httpc, alchemyRPC, initLogger) if err != nil { return err } @@ -217,5 +218,5 @@ func startIndexer(c *cli.Context) error { // Run backfill if configured go runBackfillIfConfigured(ctx, c, db, httpc, relays, initLogger) - return runMainLoop(ctx, c, db, httpc, relays, infuraRPC, beaconBase, lastBN, initLogger) + return runMainLoop(ctx, c, db, httpc, relays, alchemyRPC, beaconBase, lastBN, initLogger) } diff --git a/tools/indexer/pkg/backfill/backfill.go b/tools/indexer/pkg/backfill/backfill.go index e58f9c7ab..48d391c98 100644 --- a/tools/indexer/pkg/backfill/backfill.go +++ b/tools/indexer/pkg/backfill/backfill.go @@ -2,17 +2,20 @@ package backfill import ( "context" + "encoding/hex" + "encoding/json" "fmt" + "io" "log/slog" + "strconv" + "strings" + "sync" "time" "github.com/hashicorp/go-retryablehttp" "github.com/primev/mev-commit/tools/indexer/pkg/beacon" "github.com/primev/mev-commit/tools/indexer/pkg/config" "github.com/primev/mev-commit/tools/indexer/pkg/database" - "github.com/primev/mev-commit/tools/indexer/pkg/ethereum" - "github.com/primev/mev-commit/tools/indexer/pkg/ingest" - "github.com/primev/mev-commit/tools/indexer/pkg/relay" ) type SlotData struct { @@ -22,96 +25,457 @@ type SlotData struct { ProposerIdx *int64 } -func RunAll(ctx context.Context, db *database.DB, httpc *retryablehttp.Client, cfg *config.Config, relays []relay.Row) error { - logger := slog.With("component", "backfill") - logger.Info("Starting streaming backfill") +// RatedSlotResponse represents the response from Rated Network API blocks endpoint +type RatedSlotResponse struct { + Epoch int64 `json:"epoch"` + ConsensusSlot int64 `json:"consensusSlot"` + ConsensusBlockRoot string `json:"consensusBlockRoot"` + ExecutionBlockNumber int64 `json:"executionBlockNumber"` + ExecutionBlockHash string `json:"executionBlockHash"` + ValidatorIndex int64 `json:"validatorIndex"` + FeeRecipient string `json:"feeRecipient"` + TotalTransactions int `json:"totalTransactions"` + TotalGasUsed int64 `json:"totalGasUsed"` + BaseFeePerGas int64 `json:"baseFeePerGas"` + BaselineMev int64 `json:"baselineMev"` + BaselineMevWei string `json:"baselineMevWei"` + ExecutionRewards int64 `json:"executionRewards"` + ExecutionRewardsWei string `json:"executionRewardsWei"` + ConsensusRewards int64 `json:"consensusRewards"` + TotalRewards int64 `json:"totalRewards"` + BlockTimestamp string `json:"blockTimestamp"` + Relays []string `json:"relays"` + BlockBuilderPubkeys []string `json:"blockBuilderPubkeys"` + ExecutionProposerDuty string `json:"executionProposerDuty"` + TotalPriorityFeesValidator int64 `json:"totalPriorityFeesValidator"` +} + +// RatedValidatorResponse represents validator details from Rated API +type RatedValidatorResponse struct { + ValidatorIndex int64 `json:"validatorIndex"` + ValidatorPubkey string `json:"validatorPubkey"` + Pool string `json:"pool"` + Network string `json:"network"` +} + +// FetchSlotsBatch fetches a batch of slots from Rated API +func (r *RatedAPIClient) FetchSlotsBatch(ctx context.Context, startSlot, endSlot int64) ([]RatedSlotResponse, error) { + if endSlot <= startSlot { + return nil, nil + } + to := endSlot - 1 + if to < startSlot { + to = startSlot + } + pageLimit := endSlot - startSlot + if pageLimit > 1000 { + pageLimit = 1000 + } + // to := endSlot - 1 + nextURL := fmt.Sprintf("%s/blocks?from=%d&to=%d&limit=%d&offset=0", r.baseURL, startSlot, to, pageLimit) + + req, err := retryablehttp.NewRequestWithContext(ctx, "GET", nextURL, nil) + if err != nil { + return nil, fmt.Errorf("creating request: %w", err) + } + + req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", r.apiKey)) + req.Header.Set("Accept", "application/json") + + resp, err := r.httpc.Do(req) + if err != nil { + return nil, fmt.Errorf("executing request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + return nil, fmt.Errorf("API returned status %d", resp.StatusCode) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("reading response: %w", err) + } + + var env map[string]json.RawMessage + if err := json.Unmarshal(body, &env); err != nil { + snip := string(body) + if len(snip) > 512 { + snip = snip[:512] + "..." + } + return nil, fmt.Errorf("decoding envelope: %w; body starts: %s", err, snip) + } + resRaw, ok := env["results"] + if !ok || len(resRaw) == 0 { + return []RatedSlotResponse{}, nil + } + + var flat []json.RawMessage + if err := json.Unmarshal(resRaw, &flat); err != nil { + + var nested [][]json.RawMessage + if err2 := json.Unmarshal(resRaw, &nested); err2 != nil { + snip := string(resRaw) + if len(snip) > 512 { + snip = snip[:512] + "..." + } + return nil, fmt.Errorf("decoding results: %v; nested err: %v; results starts: %s", err, err2, snip) + } + + for _, grp := range nested { + flat = append(flat, grp...) + } + } + + out := make([]RatedSlotResponse, 0, len(flat)) + for _, raw := range flat { + var m map[string]any + if err := json.Unmarshal(raw, &m); err != nil { + continue + } + var s RatedSlotResponse + s.Epoch = asInt64(m["epoch"]) + s.ConsensusSlot = asInt64(m["consensusSlot"]) + s.ConsensusBlockRoot = asString(m["consensusBlockRoot"]) + s.ExecutionBlockNumber = asInt64(m["executionBlockNumber"]) + s.ExecutionBlockHash = asString(m["executionBlockHash"]) + s.ValidatorIndex = asInt64(m["validatorIndex"]) + s.FeeRecipient = asString(m["feeRecipient"]) + s.TotalTransactions = int(asInt64(m["totalTransactions"])) + s.TotalGasUsed = asInt64(m["totalGasUsed"]) + s.BaseFeePerGas = asInt64(m["baseFeePerGas"]) + s.BaselineMev = asInt64(m["baselineMev"]) + s.BaselineMevWei = asDecimalString(m["baselineMevWei"]) + s.ExecutionRewards = asInt64(m["executionRewards"]) + s.ExecutionRewardsWei = asDecimalString(m["executionRewardsWei"]) + s.ConsensusRewards = asInt64(m["consensusRewards"]) + s.TotalRewards = asInt64(m["totalRewards"]) + s.BlockTimestamp = asString(m["blockTimestamp"]) + s.Relays = asStringSlice(m["relays"]) + s.BlockBuilderPubkeys = asStringSlice(m["blockBuilderPubkeys"]) + s.ExecutionProposerDuty = asString(m["executionProposerDuty"]) + s.TotalPriorityFeesValidator = asInt64(m["totalPriorityFeesValidator"]) + out = append(out, s) + } + + return out, nil +} + +func (q *QuickNodeClient) FetchValidatorPubkeysBatch( + ctx context.Context, + indices []int64, + missingOut *[]int64, +) (map[int64]string, error) { + out := make(map[int64]string, len(indices)) + if len(indices) == 0 { + return out, nil + } + + // de-dupe and freeze order + seen := make(map[int64]struct{}, len(indices)) + uniq := make([]int64, 0, len(indices)) + for _, idx := range indices { + if _, ok := seen[idx]; !ok { + seen[idx] = struct{}{} + uniq = append(uniq, idx) + } + } + + type chunkRes struct { + data map[int64]string + err error + seen []int64 + } + + chunks := make([][]int64, 0, (len(uniq)+q.chunkSize-1)/q.chunkSize) + for i := 0; i < len(uniq); i += q.chunkSize { + j := i + q.chunkSize + if j > len(uniq) { + j = len(uniq) + } + chunks = append(chunks, uniq[i:j]) + } + + sem := make(chan struct{}, q.concurrent) + resCh := make(chan chunkRes, len(chunks)) + var wg sync.WaitGroup + + for _, chunk := range chunks { + sem <- struct{}{} + wg.Add(1) + ch := chunk + go func() { + defer wg.Done() + defer func() { <-sem }() + + // Build URL with repeated id params. + ids := make([]string, len(ch)) + for i, v := range ch { + ids[i] = strconv.FormatInt(v, 10) + } + idQuery := "id=" + strings.Join(ids, "&id=") + fullURL := fmt.Sprintf("%s/eth/v1/beacon/states/head/validators?%s", q.base, idQuery) + + // Per-request timeout + reqCtx, cancel := context.WithTimeout(ctx, q.timeout) + defer cancel() + + req, err := retryablehttp.NewRequestWithContext(reqCtx, "GET", fullURL, nil) + if err != nil { + resCh <- chunkRes{err: err, seen: ch} + return + } + req.Header.Set("accept", "application/json") + + resp, err := q.httpc.Do(req) + if err != nil { + resCh <- chunkRes{err: err, seen: ch} + return + } + defer resp.Body.Close() + + body, rerr := io.ReadAll(resp.Body) + if rerr != nil { + resCh <- chunkRes{err: rerr, seen: ch} + return + } + if resp.StatusCode != 200 { + snip := string(body) + if len(snip) > 240 { + snip = snip[:240] + "..." + } + resCh <- chunkRes{err: fmt.Errorf("quicknode %d: %s", resp.StatusCode, snip), seen: ch} + return + } + + var parsed struct { + Data []struct { + Index string `json:"index"` + Validator struct { + Pubkey string `json:"pubkey"` + } `json:"validator"` + } `json:"data"` + } + if err := json.Unmarshal(body, &parsed); err != nil { + resCh <- chunkRes{err: fmt.Errorf("parse quicknode: %w", err), seen: ch} + return + } + + m := make(map[int64]string, len(parsed.Data)) + for _, row := range parsed.Data { + idx, _ := strconv.ParseInt(row.Index, 10, 64) + pk := strings.TrimSpace(row.Validator.Pubkey) + if idx > 0 && pk != "" { + // ensure 0x prefix + if !strings.HasPrefix(pk, "0x") { + pk = "0x" + pk + } + m[idx] = pk + } + } + resCh <- chunkRes{data: m, seen: ch} + }() + } + + wg.Wait() + close(resCh) + + errs := make([]error, 0) + seenAll := make(map[int64]struct{}, len(uniq)) + for _, idx := range uniq { + seenAll[idx] = struct{}{} + } + found := make(map[int64]struct{}) + + for r := range resCh { + if r.err != nil { + errs = append(errs, r.err) + continue + } + for k, v := range r.data { + out[k] = v + found[k] = struct{}{} + } + } + + if missingOut != nil { + miss := make([]int64, 0) + for idx := range seenAll { + if _, ok := found[idx]; !ok { + miss = append(miss, idx) + } + } + *missingOut = miss + } + + if len(errs) > 0 { + return out, fmt.Errorf("quicknode batch had %d error(s), e.g. %v", len(errs), errs[0]) + } + return out, nil +} + +// RunAllWithRatedAPI runs the optimized backfill using Rated Network API +func RunAllWithRatedAPI(ctx context.Context, db *database.DB, httpc *retryablehttp.Client, + cfg *config.Config, +) error { + qn := NewQuickNodeClient(httpc, cfg.QuickNodeBase) + logger := slog.With("component", "backfill-rated-optimized") + logger.Info("Starting optimized streaming backfill with Rated Network API") if err := ctx.Err(); err != nil { return err } - lastSlotNumber, _ := db.GetMaxSlotNumber(ctx) + ratedClient := NewRatedAPIClient(httpc, cfg.RatedAPIKey) + + lastSlotNumber, err := db.GetMinSlotNumber(ctx) + if err != nil { + lastSlotNumber = 0 + } + startSlot := lastSlotNumber - cfg.BackfillLookback if startSlot < 0 { startSlot = 0 } - batch := int64(cfg.BackfillBatch) - totalBatches := (cfg.BackfillLookback + batch - 1) / batch + ratedBatchSize := int64(cfg.BackfillBatch) + totalSlots := lastSlotNumber - startSlot + totalBatches := (totalSlots + ratedBatchSize - 1) / ratedBatchSize - logger.Info("Starting backfill", + logger.Info("Starting Rated API optimized backfill", "start_slot", startSlot, - "end_slot_exclusive", lastSlotNumber, - "lookback_slots", cfg.BackfillLookback, - "batch_size", cfg.BackfillBatch, + "end_slot", lastSlotNumber, + "total_slots", totalSlots, + "batch_size", ratedBatchSize, "total_batches", totalBatches, + "estimated_duration", fmt.Sprintf("~%d minutes", totalBatches*3/2), ) - batchSz := int64(cfg.BackfillBatch) - var processed int64 + var totalProcessed int64 + var totalFailed int64 + startTime := time.Now() + // Process in batches for batchIdx := int64(0); batchIdx < totalBatches; batchIdx++ { - batchStart := startSlot + batchIdx*batch - batchEnd := batchStart + batchSz + batchStart := startSlot + batchIdx*ratedBatchSize + batchEnd := batchStart + ratedBatchSize if batchEnd > lastSlotNumber { batchEnd = lastSlotNumber } - logger.Info("Batch begin", + logger.Info("Processing batch", "batch", batchIdx+1, "of", totalBatches, "range", fmt.Sprintf("[%d,%d)", batchStart, batchEnd), + "progress", fmt.Sprintf("%.1f%%", float64(batchIdx)/float64(totalBatches)*100), ) - for slot := batchStart; slot < batchEnd; slot++ { - tTotal := time.Now() + // Fetch slots batch + tBatch := time.Now() + slots, err := ratedClient.FetchSlotsBatch(ctx, batchStart, batchEnd) + if err != nil { + logger.Error("Failed to fetch batch", "batch", batchIdx, "error", err) - convCtx, cancel := context.WithTimeout(ctx, 3*time.Second) - blockNumber, err := ethereum.SlotToExecutionBlockNumber(convCtx, httpc, cfg.BeaconBase, slot) - cancel() + // Retry once with backoff + time.Sleep(5 * time.Second) + slots, err = ratedClient.FetchSlotsBatch(ctx, batchStart, batchEnd) if err != nil { - logger.Error("Failed to convert slot to block number", "slot", slot, "error", err) + logger.Error("Retry failed, skipping batch", "batch", batchIdx) + totalFailed += batchEnd - batchStart continue } - if blockNumber != 0 { - if err := ctx.Err(); err != nil { - return err - } - fetchCtx, fetchCancel := context.WithTimeout(ctx, 5*time.Second) - ei, ferr := beacon.FetchBeaconExecutionBlock(fetchCtx, httpc, cfg.BeaconBase, blockNumber) - fetchCancel() - if ferr != nil || ei == nil { - logger.Error("beacon fetch failed", "block", blockNumber, "error", ferr) - continue - } + } - if err := db.UpsertBlockFromExec(ctx, ei); err != nil { - logger.Error("block upsert failed", "slot", ei.Slot, "error", err) - continue - } + valid := make([]RatedSlotResponse, 0, len(slots)) + idxSet := make(map[int64]struct{}) + for _, s := range slots { + if s.ExecutionBlockNumber == 0 { + continue + } + valid = append(valid, s) + if s.ValidatorIndex > 0 { + idxSet[s.ValidatorIndex] = struct{}{} + } + } + indices := make([]int64, 0, len(idxSet)) + for i := range idxSet { + indices = append(indices, i) + } - // Validator pubkey + opt-in status - if err := ingest.LaunchValidatorTasks(ctx, cfg, db, httpc, ei, cfg.BeaconBase, logger); err != nil { - logger.Error("failed to launch async validator tasks", "slot", ei.Slot, "error", err) + missing := []int64{} + // batch-fetch pubkeys from QuickNode (chunked) + pubByIdx, err := qn.FetchValidatorPubkeysBatch(ctx, indices, &missing) + if err != nil { + logger.Error("QuickNode fetch validators failed", "error", err, "indices", len(indices)) + pubByIdx = map[int64]string{} + } + execInfos := make([]*beacon.ExecInfo, 0, len(valid)) + for _, s := range valid { + ei := ConvertRatedToExecInfo(s) + if pk, ok := pubByIdx[s.ValidatorIndex]; ok && pk != "" { + if b, err := hex.DecodeString(strings.TrimPrefix(pk, "0x")); err == nil { + ei.ValidatorPubkey = b } - // Relay mode: fetch & insert bids - if cfg.RelayData { - if err := ingest.ProcessBidsForBlock(ctx, db, httpc, relays, ei, logger); err != nil { - logger.Error("failed to process bids", "error", err) - } + } + execInfos = append(execInfos, ei) + } + + // upsert blocks (StarRocks PK) + if err := db.BatchUpsertBlocksFromExec(ctx, execInfos); err != nil { + logger.Error("Batch block insert failed", "error", err) + for _, ei := range execInfos { + if err := db.UpsertBlockFromExec(ctx, ei); err != nil { + logger.Error("Individual block insert failed", "slot", ei.Slot, "error", err) } + } + } - processed++ - totalMS := time.Since(tTotal).Milliseconds() - logger.Info("total time taken", "total_ms", totalMS) + // Process entire batch + pairs := make([]struct { + Slot int64 + Pubkey string + }, 0, len(execInfos)) + for _, ei := range execInfos { + if len(ei.ValidatorPubkey) == 0 { + continue } + hexpk := hex.EncodeToString(ei.ValidatorPubkey) + if !strings.HasPrefix(hexpk, "0x") { + hexpk = "0x" + hexpk + } + pairs = append(pairs, struct { + Slot int64 + Pubkey string + }{Slot: ei.Slot, Pubkey: hexpk}) + } + if err := db.UpsertBlockPubkeysDirect(ctx, pairs); err != nil { + logger.Error("upsert block pubkeys direct failed", "error", err) + } - logger.Info("Batch end", - "batch", batchIdx+1, - "processed_slots_so_far", processed, - ) + batchDuration := time.Since(tBatch) + + logger.Info("Batch completed", + "batch", batchIdx+1, + "slots_in_batch", len(slots), + "duration", batchDuration, + "slots_per_second", float64(len(slots))/batchDuration.Seconds(), + ) + + // Check for cancellation + if ctx.Err() != nil { + logger.Info("Backfill cancelled") + break } } - logger.Info("Backfill completed", "total_slots_processed", processed) + + totalDuration := time.Since(startTime) + logger.Info("Backfill completed", + "total_processed", totalProcessed, + "total_failed", totalFailed, + "total_duration", totalDuration, + "average_slots_per_second", float64(totalProcessed)/totalDuration.Seconds(), + "success_rate", fmt.Sprintf("%.2f%%", float64(totalProcessed)/float64(totalSlots)*100), + ) + return nil } diff --git a/tools/indexer/pkg/backfill/clients.go b/tools/indexer/pkg/backfill/clients.go new file mode 100644 index 000000000..35be2d5aa --- /dev/null +++ b/tools/indexer/pkg/backfill/clients.go @@ -0,0 +1,43 @@ +package backfill + +import ( + "strings" + "time" + + "github.com/hashicorp/go-retryablehttp" +) + +// RatedAPIClient handles communication with Rated Network API +type RatedAPIClient struct { + httpc *retryablehttp.Client + apiKey string + baseURL string +} + +// NewRatedAPIClient creates a new Rated API client +func NewRatedAPIClient(httpc *retryablehttp.Client, apiKey string) *RatedAPIClient { + return &RatedAPIClient{ + httpc: httpc, + apiKey: apiKey, + baseURL: "https://api.rated.network/v1/eth", + } +} + +type QuickNodeClient struct { + httpc *retryablehttp.Client + base string + chunkSize int // ids per call; default 300 + concurrent int // concurrent calls; default 6 + timeout time.Duration +} + +func NewQuickNodeClient(httpc *retryablehttp.Client, base string) *QuickNodeClient { + base = strings.TrimRight(base, "/") + return &QuickNodeClient{ + httpc: httpc, + base: base, + chunkSize: 300, // safe default; increase if URLs stay < 8–10KB + concurrent: 6, // keep it polite + timeout: 8 * time.Second, + } +} diff --git a/tools/indexer/pkg/backfill/helpers.go b/tools/indexer/pkg/backfill/helpers.go new file mode 100644 index 000000000..5ee3ea182 --- /dev/null +++ b/tools/indexer/pkg/backfill/helpers.go @@ -0,0 +1,164 @@ +package backfill + +import ( + "encoding/json" + "math/big" + "strconv" + "strings" + "time" + + "github.com/primev/mev-commit/tools/indexer/pkg/beacon" +) + +func asInt64(v any) int64 { + switch t := v.(type) { + case float64: + return int64(t) + case string: + if t == "" { + return 0 + } + if strings.HasPrefix(t, "0x") || strings.HasPrefix(t, "0X") { + n, _ := strconv.ParseInt(t[2:], 16, 64) + return n + } + n, _ := strconv.ParseInt(t, 10, 64) + return n + default: + return 0 + } +} + +func asString(v any) string { + if v == nil { + return "" + } + if s, ok := v.(string); ok { + return s + } + b, _ := json.Marshal(v) + return string(b) +} + +func asStringSlice(v any) []string { + if v == nil { + return nil + } + switch arr := v.(type) { + case []any: + out := make([]string, 0, len(arr)) + for _, it := range arr { + out = append(out, asString(it)) + } + return out + case []string: + return arr + default: + return nil + } +} + +func asDecimalString(v any) string { + switch t := v.(type) { + case string: + return t + case float64: + // convert float64 (JSON numbers) to integer string without decimals + return strconv.FormatInt(int64(t), 10) + case nil: + return "" + default: + return asString(v) + } +} + +func ConvertRatedToExecInfo(slot RatedSlotResponse) *beacon.ExecInfo { + ei := &beacon.ExecInfo{ + BlockNumber: slot.ExecutionBlockNumber, + Slot: slot.ConsensusSlot, + } + + // Validator index + if slot.ValidatorIndex > 0 { + idx := slot.ValidatorIndex + ei.ProposerIdx = &idx + } + + // Parse timestamp + if slot.BlockTimestamp != "" { + if t, err := time.Parse("2006-01-02T15:04:05", slot.BlockTimestamp); err == nil { + utc := t.UTC() + ei.Timestamp = &utc + } + } + + // Relay information + if len(slot.Relays) > 0 { + relayTag := slot.Relays[0] + ei.RelayTag = &relayTag + } + + // Builder public key + if len(slot.BlockBuilderPubkeys) > 0 { + builderPubkey := slot.BlockBuilderPubkeys[0] + ei.BuilderPublicKey = &builderPubkey + } + + // Fee recipient + if slot.FeeRecipient != "" { + cleaned := cleanHexString(slot.FeeRecipient) + ei.FeeRecipient = &cleaned + ei.ProposerFeeRecHex = &cleaned + } + + // MEV reward in ETH + if slot.ExecutionRewardsWei != "" && slot.ExecutionRewardsWei != "0" { + if mevEth := weiToEth(slot.ExecutionRewardsWei); mevEth > 0 { + ei.MevRewardEth = &mevEth + ei.ProposerRewardEth = &mevEth + } + } else if slot.BaselineMevWei != "" && slot.BaselineMevWei != "0" { + if mevEth := weiToEth(slot.BaselineMevWei); mevEth > 0 { + ei.MevRewardEth = &mevEth + } + } + + return ei +} + +func cleanHexString(s string) string { + s = strings.TrimSpace(s) + if strings.HasPrefix(s, "\\\\x") { + return "0x" + s[3:] + } + if strings.HasPrefix(s, "\\x") { + return "0x" + s[2:] + } + if !strings.HasPrefix(s, "0x") { + return "0x" + s + } + return s +} + +func weiToEth(weiStr string) float64 { + weiStr = strings.TrimSpace(weiStr) + if weiStr == "" || weiStr == "0" { + return 0 + } + + wei, ok := new(big.Int).SetString(weiStr, 10) + if !ok { + if strings.HasPrefix(weiStr, "0x") { + wei, ok = new(big.Int).SetString(weiStr[2:], 16) + if !ok { + return 0 + } + } else { + return 0 + } + } + + eth := new(big.Rat).SetFrac(wei, big.NewInt(1e18)) + f, _ := eth.Float64() + return f +} diff --git a/tools/indexer/pkg/beacon/client.go b/tools/indexer/pkg/beacon/client.go index 2a42740ff..1feeb5aec 100644 --- a/tools/indexer/pkg/beacon/client.go +++ b/tools/indexer/pkg/beacon/client.go @@ -27,6 +27,7 @@ type ExecInfo struct { MevRewardEth *float64 ProposerRewardEth *float64 FeeRecipient *string + ValidatorPubkey []byte } func FetchBeaconExecutionBlock(ctx context.Context, httpc *retryablehttp.Client, beaconBase string, blockNum int64) (*ExecInfo, error) { @@ -237,6 +238,7 @@ func fetchBlockFromRPC(httpc *retryablehttp.Client, rpcURL string, blockNumber i } return out, nil } + func FetchCombinedBlockData(ctx context.Context, httpc *retryablehttp.Client, rpcURL, beaconBase string, blockNumber int64) (*ExecInfo, error) { execBlock, err := fetchBlockFromRPC(httpc, rpcURL, blockNumber) if err != nil { diff --git a/tools/indexer/pkg/config/config.go b/tools/indexer/pkg/config/config.go index 20be03624..154226d1c 100644 --- a/tools/indexer/pkg/config/config.go +++ b/tools/indexer/pkg/config/config.go @@ -3,23 +3,23 @@ package config import ( "encoding/json" "fmt" - "github.com/urfave/cli/v2" "strings" "time" + + "github.com/urfave/cli/v2" ) type Relay struct { Relay_id int64 - Name string - Tag string - URL string + + URL string } var RelaysDefault = []Relay{ - {Relay_id: 1, Name: "Titan", Tag: "titan-relay", URL: "https://regional.titanrelay.xyz"}, - {Relay_id: 2, Name: "Aestus", Tag: "aestus-relay", URL: "https://aestus.live"}, - {Relay_id: 3, Name: "Bloxroute Max Profit", Tag: "bloxroute-max-profit-relay", URL: "https://bloxroute.max-profit.blxrbdn.com"}, - {Relay_id: 4, Name: "Bloxroute Regulated", Tag: "bloxroute-regulated-relay", URL: "https://bloxroute.regulated.blxrbdn.com"}, + {Relay_id: 1, URL: "https://regional.titanrelay.xyz"}, + {Relay_id: 2, URL: "https://aestus.live"}, + {Relay_id: 3, URL: "https://bloxroute.max-profit.blxrbdn.com"}, + {Relay_id: 4, URL: "https://bloxroute.regulated.blxrbdn.com"}, } func ResolveRelays(c *cli.Context) ([]Relay, error) { @@ -47,9 +47,10 @@ type Config struct { BaseRetryDelay time.Duration HTTPTimeout time.Duration OptInContract string - EtherscanKey string - InfuraRPC string + AlchemyRPC string BeaconBase string RelayData bool RelaysJSON string + RatedAPIKey string + QuickNodeBase string } diff --git a/tools/indexer/pkg/database/starrock.go b/tools/indexer/pkg/database/starrock.go index 35f358019..7a3836717 100644 --- a/tools/indexer/pkg/database/starrock.go +++ b/tools/indexer/pkg/database/starrock.go @@ -10,7 +10,6 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" _ "github.com/go-sql-driver/mysql" "github.com/primev/mev-commit/tools/indexer/pkg/beacon" - "github.com/primev/mev-commit/tools/indexer/pkg/config" ) type DB struct { @@ -47,11 +46,12 @@ func Connect(ctx context.Context, dsn string, maxConns, minConns int) (*DB, erro } return &DB{conn: conn}, nil - } + func (db *DB) Close() error { return db.conn.Close() } + func (db *DB) GetMaxSlotNumber(ctx context.Context) (int64, error) { ctx2, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() @@ -59,6 +59,16 @@ func (db *DB) GetMaxSlotNumber(ctx context.Context) (int64, error) { err := db.conn.QueryRowContext(ctx2, `SELECT COALESCE(MAX(slot),0) FROM blocks`).Scan(&slot) return slot, err } + +func (db *DB) GetMinSlotNumber(ctx context.Context) (int64, error) { + ctx2, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + var slot int64 + err := db.conn.QueryRowContext(ctx2, `SELECT COALESCE(MIN(slot), 0) FROM blocks`).Scan(&slot) + return slot, err +} + func (db *DB) EnsureStateTable(ctx context.Context) error { ctx2, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() @@ -90,6 +100,7 @@ func (db *DB) EnsureStateTable(ctx context.Context) error { return nil } + func (db *DB) GetMaxBlockNumber(ctx context.Context) (int64, error) { ctx2, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() @@ -98,14 +109,7 @@ func (db *DB) GetMaxBlockNumber(ctx context.Context) (int64, error) { err := db.conn.QueryRowContext(ctx2, `SELECT COALESCE(MAX(block_number),0) FROM blocks`).Scan(&bn) return bn, err } -func (db *DB) GetValidatorPubkey(ctx context.Context, slot int64) ([]byte, error) { - ctx2, cancel := context.WithTimeout(ctx, 3*time.Second) - defer cancel() - var vpk []byte - err := db.conn.QueryRowContext(ctx2, `SELECT validator_pubkey FROM blocks WHERE slot=?`, slot).Scan(&vpk) - return vpk, err -} func (db *DB) LoadLastBlockNumber(ctx context.Context) (int64, bool) { ctx2, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() @@ -131,28 +135,6 @@ func (db *DB) SaveLastBlockNumber(ctx context.Context, bn int64) error { return nil } -func (db *DB) UpsertRelays(ctx context.Context, relays []config.Relay) error { - if len(relays) == 0 { - return nil - } - - ctx2, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() - - // StarRocks batch insert approach - var values []string - for _, r := range relays { - value := fmt.Sprintf("(%d, '%s', '%s', '%s', 1)", r.Relay_id, r.Name, r.Tag, r.URL) - values = append(values, value) - } - - query := fmt.Sprintf(`INSERT INTO relays (relay_id, name, tag, base_url, is_active) VALUES %s`, - strings.Join(values, ",")) - - _, err := db.conn.ExecContext(ctx2, query) - return err -} - func (db *DB) UpsertBlockFromExec(ctx context.Context, ei *beacon.ExecInfo) error { if ei == nil || ei.BlockNumber == 0 { return fmt.Errorf("upsert block: nil exec info or block_number=0") @@ -168,27 +150,25 @@ func (db *DB) UpsertBlockFromExec(ctx context.Context, ei *beacon.ExecInfo) erro } else { timestamp = "NULL" } - if ei.ProposerIdx != nil { proposerIndex = fmt.Sprintf("%d", *ei.ProposerIdx) } else { proposerIndex = "NULL" } - if ei.RelayTag != nil { relayTag = fmt.Sprintf("'%s'", *ei.RelayTag) } else { - relayTag = "NULL" + relayTag = "''" } if ei.BuilderPublicKey != nil { builderPubkeyPrefix = fmt.Sprintf("'%s'", (*ei.BuilderPublicKey)) } else { - builderPubkeyPrefix = "NULL" + builderPubkeyPrefix = "''" } if ei.ProposerFeeRecHex != nil { proposerFeeRecHex = fmt.Sprintf("'%s'", (*ei.ProposerFeeRecHex)) } else { - proposerFeeRecHex = "NULL" + proposerFeeRecHex = "''" } if ei.MevRewardEth != nil { mevRewardEth = fmt.Sprintf("%.6f", *ei.MevRewardEth) @@ -198,12 +178,12 @@ func (db *DB) UpsertBlockFromExec(ctx context.Context, ei *beacon.ExecInfo) erro if ei.FeeRecipient != nil { feeRecHex = fmt.Sprintf("'%s'", (*ei.FeeRecipient)) } else { - feeRecHex = "NULL" + feeRecHex = "''" } if ei.ProposerRewardEth != nil { proposerRewardEth = fmt.Sprintf("%.6f", *ei.ProposerRewardEth) } else { - proposerRewardEth = "NULL" + proposerRewardEth = "''" } query := fmt.Sprintf(` @@ -271,12 +251,10 @@ func (db *DB) InsertBidsBatch(ctx context.Context, rows []BidRow) error { if i > 0 { sb.WriteString(",") } - blockNumSQL := "NULL" if r.BlockNum != nil { blockNumSQL = fmt.Sprintf("%d", *r.BlockNum) } - tsMSSQL := "NULL" if r.TsMS != nil { tsMSSQL = fmt.Sprintf("%d", *r.TsMS) @@ -290,41 +268,11 @@ func (db *DB) InsertBidsBatch(ctx context.Context, rows []BidRow) error { return err } -func (db *DB) GetActiveRelays(ctx context.Context) ([]struct { - ID int64 - URL string -}, error) { - ctx2, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - - rows, err := db.conn.QueryContext(ctx2, `SELECT relay_id, base_url FROM relays WHERE is_active = 1`) - if err != nil { - return nil, err - } - defer func() { _ = rows.Close() }() - - var results []struct { - ID int64 - URL string - } - for rows.Next() { - var id int64 - var url string - if err := rows.Scan(&id, &url); err != nil { - continue // Skip bad rows - } - results = append(results, struct { - ID int64 - URL string - }{ID: id, URL: url}) - } - return results, rows.Err() -} - func (db *DB) GetRecentMissingBlocks(ctx context.Context, lookback int64, batch int) ([]struct { Slot int64 BlockNumber int64 -}, error) { +}, error, +) { ctx2, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() @@ -405,7 +353,8 @@ func (db *DB) GetValidatorsNeedingOptInCheck(ctx context.Context, lookback int64 Slot int64 BlockNumber int64 ValidatorPubkey []byte -}, error) { +}, error, +) { ctx2, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() @@ -454,7 +403,7 @@ func (db *DB) UpdateValidatorOptInStatus(ctx context.Context, slot int64, opted v := 0 if opted { v = 1 - } // TINYINT(1) in StarRocks + } q := fmt.Sprintf( "UPDATE blocks SET validator_opted_in=%d WHERE slot=%d", v, slot, @@ -480,3 +429,154 @@ func (db *DB) GetValidatorPubkeyWithRetry(ctx context.Context, slot int64, retri } return nil, fmt.Errorf("validator pubkey not available after %d retries", retries) } + +func (db *DB) BatchUpsertBlocksFromExec(ctx context.Context, execInfos []*beacon.ExecInfo) error { + if len(execInfos) == 0 { + return nil + } + + const maxRowsPerInsert = 500 + + type row struct { + sql string + } + + rows := make([]row, 0, len(execInfos)) + + for _, ei := range execInfos { + if ei == nil || ei.BlockNumber == 0 { + continue + } + + var timestamp, proposerIndex, relayTag, builderPubkeyPrefix, + proposerFeeRecHex, mevRewardEth, feeRecHex, proposerRewardEth string + + if ei.Timestamp != nil { + timestamp = fmt.Sprintf("'%s'", ei.Timestamp.Format("2006-01-02 15:04:05")) + } else { + timestamp = "NULL" + } + if ei.ProposerIdx != nil { + proposerIndex = fmt.Sprintf("%d", *ei.ProposerIdx) + } else { + proposerIndex = "NULL" + } + if ei.RelayTag != nil { + relayTag = fmt.Sprintf("'%s'", *ei.RelayTag) + } else { + relayTag = "''" + } + if ei.BuilderPublicKey != nil { + builderPubkeyPrefix = fmt.Sprintf("'%s'", *ei.BuilderPublicKey) + } else { + builderPubkeyPrefix = "''" + } + if ei.ProposerFeeRecHex != nil { + proposerFeeRecHex = fmt.Sprintf("'%s'", *ei.ProposerFeeRecHex) + } else { + proposerFeeRecHex = "''" + } + if ei.MevRewardEth != nil { + mevRewardEth = fmt.Sprintf("%.6f", *ei.MevRewardEth) + } else { + mevRewardEth = "''" + } + if ei.FeeRecipient != nil { + feeRecHex = fmt.Sprintf("'%s'", *ei.FeeRecipient) + } else { + feeRecHex = "''" + } + if ei.ProposerRewardEth != nil { + proposerRewardEth = fmt.Sprintf("%.6f", *ei.ProposerRewardEth) + } else { + proposerRewardEth = "''" + } + + value := fmt.Sprintf("(%d, %d, %s, %s, %s, %s, %s, %s, %s, %s)", + ei.Slot, + ei.BlockNumber, + timestamp, + proposerIndex, + relayTag, + builderPubkeyPrefix, + proposerFeeRecHex, + mevRewardEth, + proposerRewardEth, + feeRecHex, + ) + + rows = append(rows, row{sql: value}) + } + + if len(rows) == 0 { + return nil + } + + // Insert in chunks + for i := 0; i < len(rows); i += maxRowsPerInsert { + j := i + maxRowsPerInsert + if j > len(rows) { + j = len(rows) + } + + query := ` +INSERT INTO blocks( + slot, block_number, timestamp, proposer_index, + winning_relay, winning_builder_pubkey, proposer_fee_recipient, + mev_reward, proposer_reward_eth, fee_recipient +) VALUES ` + strings.Join(func(ss []row) []string { + out := make([]string, len(ss)) + for k := range ss { + out[k] = ss[k].sql + } + return out + }(rows[i:j]), ",") + + // short timeout per batch + ctx2, cancel := context.WithTimeout(ctx, 5*time.Second) + _, err := db.conn.ExecContext(ctx2, query) + cancel() + if err != nil { + return fmt.Errorf("batch upsert blocks [%d:%d]: %w", i, j, err) + } + } + + return nil +} + +// INSERT into StarRocks PK table acts as UPSERT on slot +func (db *DB) UpsertBlockPubkeysDirect(ctx context.Context, pairs []struct { + Slot int64 + Pubkey string +}, +) error { + if len(pairs) == 0 { + return nil + } + const maxRows = 1000 + vals := make([]string, 0, len(pairs)) + for _, p := range pairs { + if p.Slot == 0 || p.Pubkey == "" { + continue + } + vals = append(vals, fmt.Sprintf("(%d, '%s')", p.Slot, p.Pubkey)) + } + if len(vals) == 0 { + return nil + } + + for i := 0; i < len(vals); i += maxRows { + j := i + maxRows + if j > len(vals) { + j = len(vals) + } + q := "INSERT INTO blocks (slot, validator_pubkey) VALUES " + strings.Join(vals[i:j], ",") + ctx2, cancel := context.WithTimeout(ctx, 10*time.Second) + _, err := db.conn.ExecContext(ctx2, q) + cancel() + if err != nil { + return fmt.Errorf("upsert block pubkeys [%d:%d]: %w", i, j, err) + } + } + return nil +} diff --git a/tools/indexer/pkg/ethereum/client.go b/tools/indexer/pkg/ethereum/client.go index 31b3b8d71..080128cd2 100644 --- a/tools/indexer/pkg/ethereum/client.go +++ b/tools/indexer/pkg/ethereum/client.go @@ -2,19 +2,17 @@ package ethereum import ( "bytes" - "encoding/json" "fmt" "math/big" + "net/http" + "strconv" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethclient" "github.com/primev/mev-commit/tools/indexer/pkg/config" - "net/http" - "strconv" - "github.com/primev/mev-commit/contracts-abi/clients/ValidatorOptInRouter" ) @@ -22,7 +20,7 @@ func CallAreOptedInAtBlock(httpc *http.Client, cfg *config.Config, blockNum int6 if len(pubkey) == 0 { return false, fmt.Errorf("empty pubkey") } - client, err := ethclient.Dial(cfg.InfuraRPC) + client, err := ethclient.Dial(cfg.AlchemyRPC) if err != nil { return false, err } diff --git a/tools/indexer/pkg/ingest/ingest.go b/tools/indexer/pkg/ingest/ingest.go index 669afa743..4d0125095 100644 --- a/tools/indexer/pkg/ingest/ingest.go +++ b/tools/indexer/pkg/ingest/ingest.go @@ -60,7 +60,6 @@ func LaunchValidatorTasks(ctx context.Context, cfg *config.Config, db *database. logger.Info("validator opt-in status", "slot", ei.Slot, "opted_in", opted) } return nil - } func ProcessBidsForBlock(ctx context.Context, db *database.DB, httpc *retryablehttp.Client, relays []relay.Row, ei *beacon.ExecInfo, logger *slog.Logger) error { diff --git a/tools/indexer/pkg/relay/client.go b/tools/indexer/pkg/relay/client.go index 1eafb7d9f..70a102128 100644 --- a/tools/indexer/pkg/relay/client.go +++ b/tools/indexer/pkg/relay/client.go @@ -5,19 +5,16 @@ import ( "encoding/json" "fmt" "math/big" - + "strconv" "strings" "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/hashicorp/go-retryablehttp" - "github.com/primev/mev-commit/tools/indexer/pkg/config" "github.com/primev/mev-commit/tools/indexer/pkg/database" httputil "github.com/primev/mev-commit/tools/indexer/pkg/http" - - "strconv" ) type Row struct { @@ -58,7 +55,6 @@ func parseBigString(v any) (string, bool) { } func BuildBidInsert(slot int64, relayID int64, bid map[string]any) (database.BidRow, bool) { - if slot <= 0 || relayID <= 0 { return database.BidRow{}, false } @@ -126,25 +122,6 @@ func BuildBidInsert(slot int64, relayID int64, bid map[string]any) (database.Bid }, true } -func UpsertRelaysAndLoad(ctx context.Context, db *database.DB) ([]Row, error) { - // upsert defaults from code - if err := db.UpsertRelays(ctx, config.RelaysDefault); err != nil { - return nil, err - } - ctx2, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - dbResults, err := db.GetActiveRelays(ctx2) - if err != nil { - return nil, err - } - - var rws []Row - for _, result := range dbResults { - rws = append(rws, Row{ID: result.ID, URL: result.URL}) - } - return rws, nil -} - func FetchBuilderBlocksReceived(ctx context.Context, httpc *retryablehttp.Client, relayBase string, slot int64) ([]map[string]any, error) { url := fmt.Sprintf("%s/relay/v1/data/bidtraces/builder_blocks_received?slot=%d", strings.TrimRight(relayBase, "/"), slot) From 393c61de448a165a2c79644ca0e3a04eecdedb0f Mon Sep 17 00:00:00 2001 From: Rose Jethani Date: Tue, 21 Oct 2025 18:29:37 +0530 Subject: [PATCH 8/8] changed the backfill logic --- tools/indexer/pkg/backfill/backfill.go | 4 ++-- tools/indexer/pkg/database/starrock.go | 12 +++++------- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/tools/indexer/pkg/backfill/backfill.go b/tools/indexer/pkg/backfill/backfill.go index 48d391c98..870b54692 100644 --- a/tools/indexer/pkg/backfill/backfill.go +++ b/tools/indexer/pkg/backfill/backfill.go @@ -86,7 +86,7 @@ func (r *RatedAPIClient) FetchSlotsBatch(ctx context.Context, startSlot, endSlot if err != nil { return nil, fmt.Errorf("executing request: %w", err) } - defer resp.Body.Close() + defer func() { _ = resp.Body.Close() }() if resp.StatusCode != 200 { return nil, fmt.Errorf("API returned status %d", resp.StatusCode) @@ -232,7 +232,7 @@ func (q *QuickNodeClient) FetchValidatorPubkeysBatch( resCh <- chunkRes{err: err, seen: ch} return } - defer resp.Body.Close() + defer func() { _ = resp.Body.Close() }() body, rerr := io.ReadAll(resp.Body) if rerr != nil { diff --git a/tools/indexer/pkg/database/starrock.go b/tools/indexer/pkg/database/starrock.go index 7a3836717..26c67bf08 100644 --- a/tools/indexer/pkg/database/starrock.go +++ b/tools/indexer/pkg/database/starrock.go @@ -183,7 +183,7 @@ func (db *DB) UpsertBlockFromExec(ctx context.Context, ei *beacon.ExecInfo) erro if ei.ProposerRewardEth != nil { proposerRewardEth = fmt.Sprintf("%.6f", *ei.ProposerRewardEth) } else { - proposerRewardEth = "''" + proposerRewardEth = "NULL" } query := fmt.Sprintf(` @@ -404,10 +404,8 @@ func (db *DB) UpdateValidatorOptInStatus(ctx context.Context, slot int64, opted if opted { v = 1 } - q := fmt.Sprintf( - "UPDATE blocks SET validator_opted_in=%d WHERE slot=%d", - v, slot, - ) + + q := fmt.Sprintf("INSERT INTO blocks (slot, validator_opted_in) VALUES (%d, %d)", slot, v) _, err := db.conn.ExecContext(ctx2, q) return err } @@ -479,7 +477,7 @@ func (db *DB) BatchUpsertBlocksFromExec(ctx context.Context, execInfos []*beacon if ei.MevRewardEth != nil { mevRewardEth = fmt.Sprintf("%.6f", *ei.MevRewardEth) } else { - mevRewardEth = "''" + mevRewardEth = "NULL" } if ei.FeeRecipient != nil { feeRecHex = fmt.Sprintf("'%s'", *ei.FeeRecipient) @@ -489,7 +487,7 @@ func (db *DB) BatchUpsertBlocksFromExec(ctx context.Context, execInfos []*beacon if ei.ProposerRewardEth != nil { proposerRewardEth = fmt.Sprintf("%.6f", *ei.ProposerRewardEth) } else { - proposerRewardEth = "''" + proposerRewardEth = "NULL" } value := fmt.Sprintf("(%d, %d, %s, %s, %s, %s, %s, %s, %s, %s)",