Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions tools/indexer/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,14 @@ var (
Name: "backfill-lookback",
Usage: "number of slots to look back for backfill",
EnvVars: []string{"INDEXER_BACKFILL_LOOKBACK"},
Value: 10000000,
Value: 50400,
})

optionBackfillBatch = altsrc.NewIntFlag(&cli.IntFlag{
Name: "backfill-batch",
Usage: "batch size for backfill operations",
EnvVars: []string{"INDEXER_BACKFILL_BATCH"},
Value: 5,
Value: 100,
})

optionHTTPTimeout = altsrc.NewDurationFlag(&cli.DurationFlag{
Expand Down
192 changes: 118 additions & 74 deletions tools/indexer/pkg/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"log/slog"
"sync"
"time"

"github.com/hashicorp/go-retryablehttp"
Expand All @@ -28,95 +29,138 @@ func RunAll(ctx context.Context, db *database.DB, httpc *retryablehttp.Client, c
if err := ctx.Err(); err != nil {
return err
}

blocks, err := db.GetRecentMissingBlocks(ctx, cfg.BackfillLookback, cfg.BackfillBatch)
if err != nil {
return fmt.Errorf("get missing blocks: %w", err)
}

for _, b := range blocks {
if err := ctx.Err(); err != nil {
return err
lastSlotNumber, _ := db.GetMaxSlotNumber(ctx)
startSlot := lastSlotNumber - cfg.BackfillLookback

batch := cfg.BackfillBatch
totalBatches := (cfg.BackfillLookback + int64(batch) - 1) / int64(batch)
logger.Info("Starting backfill",
"start_slot", startSlot,
"end_slot_exclusive", lastSlotNumber,
"lookback_slots", cfg.BackfillLookback,
"batch_size", cfg.BackfillBatch,
"total_batches", totalBatches,
)

batchSz := int64(cfg.BackfillBatch)
var processed int64
for batchIdx := int64(0); batchIdx < totalBatches; batchIdx++ {
batchStart := startSlot + batchIdx*int64(batch)
batchEnd := batchStart + batchSz
if batchEnd > lastSlotNumber {
batchEnd = lastSlotNumber
}
logger.Info("Batch begin",
"batch", batchIdx+1, "of", totalBatches,
"range", fmt.Sprintf("[%d,%d)", batchStart, batchEnd),
)
for slot := batchStart; slot < batchEnd; slot++ {
tTotal := time.Now()
conversCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
blockNumber, err := ethereum.SlotToExecutionBlockNumber(conversCtx, httpc, cfg.BeaconBase, slot)
cancel()
if err != nil {
logger.Error("Failed to convert slot to block number", "slot", slot, "error", err)
continue
}
if blockNumber != 0 {

fetchCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
ei, ferr := beacon.FetchBeaconExecutionBlock(fetchCtx, httpc, cfg.BeaconBase, b.BlockNumber)
cancel()
if ferr != nil || ei == nil {
logger.Error("beacon fetch failed", "block", b.BlockNumber, "error", ferr)
continue
}
if err := ctx.Err(); err != nil {
return err
}

if err := db.UpsertBlockFromExec(ctx, ei); err != nil {
logger.Error("block upsert failed", "slot", ei.Slot, "error", err)
continue
}
fetchCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
ei, ferr := beacon.FetchBeaconExecutionBlock(fetchCtx, httpc, cfg.BeaconBase, blockNumber)
cancel()
if ferr != nil || ei == nil {
logger.Error("beacon fetch failed", "block", blockNumber, "error", ferr)
continue
}

if err := db.UpsertBlockFromExec(ctx, ei); err != nil {
logger.Error("block upsert failed", "slot", ei.Slot, "error", err)
continue
}

var vpub []byte
if ei.ProposerIdx != nil {
vctx, vcancel := context.WithTimeout(ctx, 5*time.Second)
v, verr := beacon.FetchValidatorPubkey(vctx, httpc, cfg.BeaconBase, *ei.ProposerIdx)
vcancel()
if verr != nil {
logger.Error("validator fetch failed", "slot", ei.Slot, "error", verr)
} else if len(v) > 0 {
vpub = v

// Save validator pubkey
if err := db.UpdateValidatorPubkey(ctx, ei.Slot, vpub); err != nil {
logger.Error("validator update failed", "slot", ei.Slot, "error", err)
} else {

opted, oerr := ethereum.CallAreOptedInAtBlock(httpc.HTTPClient, cfg, ei.BlockNumber, vpub)

if oerr != nil {
logger.Error("opt-in check failed", "slot", ei.Slot, "error", oerr)
} else {
updCtx, updCancel := context.WithTimeout(ctx, 3*time.Second)
if uerr := db.UpdateValidatorOptInStatus(updCtx, ei.Slot, opted); uerr != nil {
logger.Error("opt-in update failed", "slot", ei.Slot, "error", uerr)
var vpub []byte
if ei.ProposerIdx != nil {
vctx, vcancel := context.WithTimeout(ctx, 5*time.Second)
v, verr := beacon.FetchValidatorPubkey(vctx, httpc, cfg.BeaconBase, *ei.ProposerIdx)
vcancel()
if verr != nil {
logger.Error("validator fetch failed", "slot", ei.Slot, "error", verr)
} else if len(v) > 0 {
vpub = v

// Save validator pubkey
if err := db.UpdateValidatorPubkey(ctx, ei.Slot, vpub); err != nil {
logger.Error("validator update failed", "slot", ei.Slot, "error", err)
} else {
opted, oerr := ethereum.CallAreOptedInAtBlock(httpc.HTTPClient, cfg, ei.BlockNumber, vpub)
if oerr != nil {
logger.Error("opt-in check failed", "slot", ei.Slot, "error", oerr)
} else {
updCtx, updCancel := context.WithTimeout(ctx, 3*time.Second)
if uerr := db.UpdateValidatorOptInStatus(updCtx, ei.Slot, opted); uerr != nil {
logger.Error("opt-in update failed", "slot", ei.Slot, "error", uerr)
}
updCancel()
}
}
updCancel()
}
}
}
}
tBids := time.Now()
var wg sync.WaitGroup
for _, r := range relays {
wg.Add(1)
go func(rel relay.Row) {
defer wg.Done()
if err := ctx.Err(); err != nil {
return
}

for _, r := range relays {
if err := ctx.Err(); err != nil {
return err
}
bctx, bcancel := context.WithTimeout(ctx, 5*time.Second)
bids, berr := relay.FetchBuilderBlocksReceived(bctx, httpc, rel.URL, ei.Slot)
bcancel()
if berr != nil {
logger.Debug("bid fetch failed", "slot", ei.Slot, "relay", r.ID, "error", berr)
return
}

bctx, bcancel := context.WithTimeout(ctx, 5*time.Second)
bids, berr := relay.FetchBuilderBlocksReceived(bctx, httpc, r.URL, ei.Slot)
bcancel()
if berr != nil {
logger.Debug("bid fetch failed", "slot", ei.Slot, "relay", r.ID, "error", berr)
continue
}
if len(bids) == 0 {
return
}

if len(bids) == 0 {
continue
}
rows := make([]database.BidRow, 0, len(bids))
for _, bid := range bids {
if row, ok := relay.BuildBidInsert(ei.Slot, rel.ID, bid); ok {
rows = append(rows, row)
}
}

rows := make([]database.BidRow, 0, len(bids))
for _, bid := range bids {
if row, ok := relay.BuildBidInsert(ei.Slot, r.ID, bid); ok {
rows = append(rows, row)
if len(rows) > 0 {
insCtx, insCancel := context.WithTimeout(ctx, 5*time.Second)
if ierr := db.InsertBidsBatch(insCtx, rows); ierr != nil {
logger.Error("bid insert failed", "slot", ei.Slot, "relay", r.ID, "error", ierr)
}
insCancel()
}
}(r)
}
wg.Wait()
bidsMs := time.Since(tBids).Milliseconds()
logger.Info("Bids fetch and insert", "bids_ms", bidsMs)
processed++
totalMS := time.Since(tTotal).Milliseconds()
logger.Info("total time taken", "total_ms", totalMS)
}

if len(rows) > 0 {
insCtx, insCancel := context.WithTimeout(ctx, 5*time.Second)
if ierr := db.InsertBidsBatch(insCtx, rows); ierr != nil {
logger.Error("bid insert failed", "slot", ei.Slot, "relay", r.ID, "error", ierr)
}
insCancel()
}
logger.Info("Batch end",
"batch", batchIdx+1,
"processed_slots_so_far", processed,
)
}
logger.Debug("slot processed", "slot", ei.Slot)
}

logger.Info("Backfill completed", "blocks_processed", len(blocks))
logger.Info("Backfill completed", "total_slots_processed", processed)
return nil
}
10 changes: 1 addition & 9 deletions tools/indexer/pkg/beacon/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/hashicorp/go-retryablehttp"
"github.com/primev/mev-commit/tools/indexer/pkg/ethereum"

httputil "github.com/primev/mev-commit/tools/indexer/pkg/http"
)

Expand Down Expand Up @@ -200,10 +200,6 @@ func FetchCombinedBlockData(ctx context.Context, httpc *retryablehttp.Client, rp
if err != nil {
return nil, err
}

// Convert block number to slot for beacon chain query
slotNumber := ethereum.BlockNumberToSlot(blockNumber)

beaconData, _ := FetchBeaconExecutionBlock(ctx, httpc, beaconBase, blockNumber)

// Merge data - use Alchemy as primary, beacon as supplement
Expand All @@ -214,10 +210,6 @@ func FetchCombinedBlockData(ctx context.Context, httpc *retryablehttp.Client, rp
execBlock.RewardEth = beaconData.RewardEth
execBlock.BuilderHex = beaconData.BuilderHex
execBlock.FeeRecHex = beaconData.FeeRecHex
} else {

execBlock.Slot = slotNumber
}

return execBlock, nil
}
29 changes: 22 additions & 7 deletions tools/indexer/pkg/database/starrock.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,13 @@ func Connect(ctx context.Context, dsn string, maxConns, minConns int) (*DB, erro
func (db *DB) Close() error {
return db.conn.Close()
}

func (db *DB) GetMaxSlotNumber(ctx context.Context) (int64, error) {
ctx2, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
var slot int64
err := db.conn.QueryRowContext(ctx2, `SELECT COALESCE(MAX(slot),0) FROM blocks`).Scan(&slot)
return slot, err
}
func (db *DB) EnsureStateTable(ctx context.Context) error {
ctx2, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
Expand Down Expand Up @@ -155,7 +161,7 @@ func (db *DB) UpsertBlockFromExec(ctx context.Context, ei *beacon.ExecInfo) erro
ctx2, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()

var timestamp, proposerIndex, relayTag, rewardEth string
var timestamp, proposerIndex, relayTag, rewardEth, builderPubkeyPrefix, feeRecHex string

if ei.Timestamp != nil {
timestamp = fmt.Sprintf("'%s'", ei.Timestamp.Format("2006-01-02 15:04:05"))
Expand All @@ -174,7 +180,16 @@ func (db *DB) UpsertBlockFromExec(ctx context.Context, ei *beacon.ExecInfo) erro
} else {
relayTag = "NULL"
}

if ei.BuilderHex != nil {
builderPubkeyPrefix = fmt.Sprintf("'%s'", (*ei.BuilderHex))
} else {
builderPubkeyPrefix = "NULL"
}
if ei.FeeRecHex != nil {
feeRecHex = fmt.Sprintf("'%s'", (*ei.FeeRecHex))
} else {
feeRecHex = "NULL"
}
if ei.RewardEth != nil {
rewardEth = fmt.Sprintf("%.6f", *ei.RewardEth)
} else {
Expand All @@ -184,9 +199,9 @@ func (db *DB) UpsertBlockFromExec(ctx context.Context, ei *beacon.ExecInfo) erro
query := fmt.Sprintf(`
INSERT INTO blocks(
slot, block_number, timestamp, proposer_index,
winning_relay, producer_reward_eth
) VALUES (%d, %d, %s, %s, %s, %s)`,
ei.Slot, ei.BlockNumber, timestamp, proposerIndex, relayTag, rewardEth)
winning_relay, producer_reward_eth, winning_builder_pubkey, fee_recipient
) VALUES (%d, %d, %s, %s, %s, %s, %s, %s)`,
ei.Slot, ei.BlockNumber, timestamp, proposerIndex, relayTag, rewardEth, builderPubkeyPrefix, feeRecHex)

_, err := db.conn.ExecContext(ctx2, query)
if err != nil {
Expand Down Expand Up @@ -429,7 +444,7 @@ func (db *DB) UpdateValidatorOptInStatus(ctx context.Context, slot int64, opted
v = 1
} // TINYINT(1) in StarRocks
q := fmt.Sprintf(
"UPDATE blocks SET validator_opted_in=%d WHERE slot=%d AND validator_opted_in IS NULL",
"UPDATE blocks SET validator_opted_in=%d WHERE slot=%d",
v, slot,
)
_, err := db.conn.ExecContext(ctx2, q)
Expand Down
51 changes: 43 additions & 8 deletions tools/indexer/pkg/ethereum/conversions.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,49 @@
package ethereum

func BlockNumberToSlot(blockNumber int64) int64 {
// Ethereum mainnet merge happened at slot 4700013 (block 15537394)
const MERGE_BLOCK = 15537394
const MERGE_SLOT = 4700013
import (
"context"
"fmt"
"time"

if blockNumber < MERGE_BLOCK {
return 0 // Pre-merge blocks don't have valid slots
"github.com/hashicorp/go-retryablehttp"
httputil "github.com/primev/mev-commit/tools/indexer/pkg/http"
)

// SlotToExecutionBlockNumber converts a beacon chain slot to an execution layer block number
// using the beaconcha.in API with retryable HTTP client and context support.
func SlotToExecutionBlockNumber(ctx context.Context, httpc *retryablehttp.Client, beaconBase string, slot int64) (int64, error) {
url := fmt.Sprintf("%s/slot/%d", beaconBase, slot)

if _, has := ctx.Deadline(); !has {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, 5*time.Second)
defer cancel()
}

var wrap struct {
Status string `json:"status"`
Data struct {
ExecutionBlockNumber int64 `json:"exec_block_number"`
Status string `json:"status"`
} `json:"data"`
}

if err := httputil.FetchJSON(ctx, httpc, url, &wrap); err != nil {
return 0, fmt.Errorf("failed to fetch slot %d: %w", slot, err)
}

if wrap.Status != "OK" {
return 0, fmt.Errorf("API returned status: %s for slot %d", wrap.Status, slot)
}

// Check if slot was missed
if wrap.Data.Status != "1" {
return 0, nil // Missed slot, not an error
}

if wrap.Data.ExecutionBlockNumber == 0 {
return 0, fmt.Errorf("no execution block number for slot %d", slot)
}

// Post-merge: roughly 1 slot per block
return MERGE_SLOT + (blockNumber - MERGE_BLOCK)
return wrap.Data.ExecutionBlockNumber, nil
}
Loading