diff --git a/go.work.sum b/go.work.sum index 86ab8f23c..7e9c788a8 100644 --- a/go.work.sum +++ b/go.work.sum @@ -818,6 +818,7 @@ github.com/chromedp/chromedp v0.9.2 h1:dKtNz4kApb06KuSXoTQIyUC2TrA0fhGDwNZf3bcgf github.com/chromedp/chromedp v0.9.2/go.mod h1:LkSXJKONWTCHAfQasKFUZI+mxqS4tZqhmtGzzhLsnLs= github.com/chromedp/sysutil v1.0.0 h1:+ZxhTpfpZlmchB58ih/LBHX52ky7w2VhQVKQMucy3Ic= github.com/chromedp/sysutil v1.0.0/go.mod h1:kgWmDdq8fTzXYcKIBqIYvRRTnYb9aNS9moAV0xufSww= +github.com/chzyer/logex v1.1.10 h1:Swpa1K6QvQznwJRcfTfQJmTE72DqScAa40E+fbHEXEE= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/logex v1.2.0/go.mod h1:9+9sk7u7pGNWYMkh0hdiL++6OeibzJccyQU4p4MedaY= github.com/chzyer/logex v1.2.1 h1:XHDu3E6q+gdHgsdTPH6ImJMIp436vR6MPtH8gP05QzM= @@ -826,6 +827,7 @@ github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5P github.com/chzyer/readline v1.5.0/go.mod h1:x22KAscuvRqlLoK9CsoYsmxoXZMMFVyOl86cAH8qUic= github.com/chzyer/readline v1.5.1 h1:upd/6fQk4src78LMRzh5vItIt361/o4uq553V8B5sGI= github.com/chzyer/readline v1.5.1/go.mod h1:Eh+b79XXUwfKfcPLepksvw2tcLE/Ct21YObkaSkeBlk= +github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1 h1:q763qf9huN11kDQavWsoZXJNW3xEE4JJyHa5Q25/sd8= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/chzyer/test v0.0.0-20210722231415-061457976a23/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/chzyer/test v1.0.0 h1:p3BQDXSxOhOG0P9z6/hGnII4LGiEPOYBhs8asl/fC04= @@ -1337,8 +1339,6 @@ github.com/iris-contrib/pongo2 v0.0.1 h1:zGP7pW51oi5eQZMIlGA3I+FHY9/HOQWDB+572yi github.com/iris-contrib/schema v0.0.1/go.mod h1:urYA3uvUNG1TIIjOSCzHr9/LmbQo8LrOcOqfqxa4hXw= github.com/iris-contrib/schema v0.0.6 h1:CPSBLyx2e91H2yJzPuhGuifVRnZBBJ3pCOMbOvPZaTw= github.com/iris-contrib/schema v0.0.6/go.mod h1:iYszG0IOsuIsfzjymw1kMzTL8YQcCWlm65f3wX8J5iA= -github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= -github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0o= github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= github.com/jedisct1/go-minisign v0.0.0-20230811132847-661be99b8267 h1:TMtDYDHKYY15rFihtRfck/bfFqNfvcabqvXAFQfAUpY= @@ -2131,7 +2131,6 @@ golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= -golang.org/x/sync v0.13.0 h1:AauUjRAJ9OSnvULf/ARrrVywoJDy0YS2AwQ98I37610= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -2204,7 +2203,6 @@ golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20= golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2 h1:IRJeR9r1pYWsHKTRe/IInb7lYvbBVIqOgsX/u0mbOWY= golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2/go.mod h1:TeRTkGYfJXctD9OcfyVLyj2J3IxLnKwHJR8f4D8a3YE= golang.org/x/telemetry v0.0.0-20240521205824-bda55230c457 h1:zf5N6UOrA487eEFacMePxjXAJctxKmyjKUsjA11Uzuk= @@ -2222,6 +2220,7 @@ golang.org/x/term v0.25.0/go.mod h1:RPyXicDX+6vLxogjjRxjgD2TKtmAO6NZBsBRfrOLu7M= golang.org/x/term v0.28.0/go.mod h1:Sw/lC2IAUZ92udQNf3WodGtn4k/XoLyZoh8v/8uiwek= golang.org/x/term v0.29.0 h1:L6pJp37ocefwRRtYPKSWOWzOtWSxVajvz2ldH/xi3iU= golang.org/x/term v0.29.0/go.mod h1:6bl4lRlvVuDgSf3179VpIxBF0o10JUpXWOnI7nErv7s= +golang.org/x/term v0.31.0 h1:erwDkOK1Msy6offm1mOgvspSkslFnIGsFnxOKoufg3o= golang.org/x/term v0.31.0/go.mod h1:R4BeIy7D95HzImkxGkTW1UQTtP54tio2RyHz7PwK0aw= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= diff --git a/tools/go.mod b/tools/go.mod index 131ff8d50..c210fb0b8 100644 --- a/tools/go.mod +++ b/tools/go.mod @@ -23,6 +23,7 @@ require ( require ( github.com/DATA-DOG/go-sqlmock v1.5.2 + github.com/go-sql-driver/mysql v1.9.3 github.com/google/go-cmp v0.6.0 github.com/testcontainers/testcontainers-go v0.27.0 golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 @@ -31,7 +32,9 @@ require ( require ( dario.cat/mergo v1.0.0 // indirect + filippo.io/edwards25519 v1.1.0 // indirect github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect + github.com/BurntSushi/toml v1.4.0 // indirect github.com/Microsoft/hcsshim v0.11.4 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/containerd/containerd v1.7.11 // indirect diff --git a/tools/go.sum b/tools/go.sum index dcdab0538..8f8bd4347 100644 --- a/tools/go.sum +++ b/tools/go.sum @@ -2,10 +2,14 @@ buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.32.0-2024022118033 buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.32.0-20240221180331-f05a6f4403ce.1/go.mod h1:tiTMKD8j6Pd/D2WzREoweufjzaJKHZg35f/VGcZ2v3I= dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk= dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk= +filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= +filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 h1:bvDV9vkmnHYOMsOr4WLk+Vo07yKIzd94sVoIqshQ4bU= github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24/go.mod h1:8o94RPi1/7XTJvwPpRSzSUedZrtlirdB3r9Z20bi2f8= github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8= github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= +github.com/BurntSushi/toml v1.4.0 h1:kuoIxZQy2WRRk1pttg9asf+WVv6tWQuBNVmK8+nqPr0= +github.com/BurntSushi/toml v1.4.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho= github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU= github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU= github.com/DataDog/zstd v1.5.5 h1:oWf5W7GtOLgp6bciQYDmhHHjdhYkALu6S/5Ni9ZgSvQ= @@ -109,6 +113,8 @@ github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiU github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE= github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78= +github.com/go-sql-driver/mysql v1.9.3 h1:U/N249h2WzJ3Ukj8SowVFjdtZKfu9vlLZxjPXV1aweo= +github.com/go-sql-driver/mysql v1.9.3/go.mod h1:qn46aNg1333BRMNU69Lq93t8du/dwxI64Gl8i5p1WMU= github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw= github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= diff --git a/tools/indexer/cmd/main.go b/tools/indexer/cmd/main.go new file mode 100644 index 000000000..5cc43e8fc --- /dev/null +++ b/tools/indexer/cmd/main.go @@ -0,0 +1,152 @@ +package main + +import ( + "context" + + "fmt" + + _ "github.com/go-sql-driver/mysql" + "github.com/primev/mev-commit/tools/indexer/pkg/config" + "github.com/urfave/cli/v2" + "github.com/urfave/cli/v2/altsrc" + + "os" + "os/signal" + "syscall" + "time" +) + +var ( + optionConfig = &cli.StringFlag{ + Name: "config", + Usage: "Path to config file", + EnvVars: []string{"INDEXER_CONFIG"}, + } + optionDatabaseURL = altsrc.NewStringFlag(&cli.StringFlag{ + Name: "database-url", + Usage: "Database connection URL", + EnvVars: []string{"INDEXER_DATABASE_URL"}, + Required: true, + }) + optionOptInContract = altsrc.NewStringFlag(&cli.StringFlag{ + Name: "opt-in-contract", + Usage: "Opt-in contract address", + EnvVars: []string{"INDEXER_OPT_IN_CONTRACT"}, + Value: "0x821798d7b9d57dF7Ed7616ef9111A616aB19ed64", + }) + optionEtherscanKey = altsrc.NewStringFlag(&cli.StringFlag{ + Name: "etherscan-key", + Usage: "Etherscan API key", + EnvVars: []string{"INDEXER_ETHERSCAN_KEY"}, + }) + optionInfuraRPC = altsrc.NewStringFlag(&cli.StringFlag{ + Name: "infura-rpc", + Usage: "Infura RPC URL", + EnvVars: []string{"INDEXER_INFURA_RPC"}, + Required: true, + }) + optionBeaconBase = altsrc.NewStringFlag(&cli.StringFlag{ + Name: "beacon-base", + Usage: "Beacon API base URL", + EnvVars: []string{"INDEXER_BEACON_BASE"}, + Value: "https://beaconcha.in/api/v1", + }) + optionBlockInterval = altsrc.NewDurationFlag(&cli.DurationFlag{ + Name: "block-interval", + Usage: "interval between block processing", + EnvVars: []string{"INDEXER_BLOCK_INTERVAL"}, + Value: 12 * time.Second, + }) + + optionValidatorDelay = altsrc.NewDurationFlag(&cli.DurationFlag{ + Name: "validator-delay", + Usage: "delay before fetching validator data", + EnvVars: []string{"INDEXER_VALIDATOR_DELAY"}, + Value: 1500 * time.Millisecond, + }) + + optionBackfillLookback = altsrc.NewIntFlag(&cli.IntFlag{ + Name: "backfill-lookback", + Usage: "number of slots to look back for backfill", + EnvVars: []string{"INDEXER_BACKFILL_LOOKBACK"}, + Value: 10000000, + }) + + optionBackfillBatch = altsrc.NewIntFlag(&cli.IntFlag{ + Name: "backfill-batch", + Usage: "batch size for backfill operations", + EnvVars: []string{"INDEXER_BACKFILL_BATCH"}, + Value: 5, + }) + + optionHTTPTimeout = altsrc.NewDurationFlag(&cli.DurationFlag{ + Name: "http-timeout", + Usage: "HTTP client timeout", + EnvVars: []string{"INDEXER_HTTP_TIMEOUT"}, + Value: 15 * time.Second, + }) +) + +func createOptionsFromCLI(c *cli.Context) *config.Config { + return &config.Config{ + BlockTick: c.Duration("block-interval"), + ValidatorWait: c.Duration("validator-delay"), + BackfillLookback: int64(c.Int("backfill-lookback")), + BackfillBatch: c.Int("backfill-batch"), + HTTPTimeout: c.Duration("http-timeout"), + OptInContract: c.String("opt-in-contract"), + EtherscanKey: c.String("etherscan-key"), + InfuraRPC: c.String("infura-rpc"), + BeaconBase: c.String("beacon-base"), + } +} + +func main() { + flags := []cli.Flag{ + optionConfig, + optionDatabaseURL, + optionInfuraRPC, + optionBeaconBase, + optionBlockInterval, + optionValidatorDelay, + + optionBackfillLookback, + optionBackfillBatch, + optionHTTPTimeout, + optionOptInContract, + optionEtherscanKey, + } + + app := &cli.App{ + Name: "mev-indexer", + Usage: "Builder/observer indexer", + Commands: []*cli.Command{{ + Name: "start", + Usage: "Start the indexer", + Flags: flags, + Before: altsrc.InitInputSourceWithContext( + flags, altsrc.NewYamlSourceFromFlagFunc("config"), + ), + Action: func(c *cli.Context) error { + return startIndexer(c) + }, + }}, + } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + sigc := make(chan os.Signal, 1) + signal.Notify(sigc, syscall.SIGINT, syscall.SIGTERM) + go func() { + <-sigc + _, _ = fmt.Fprintln(app.Writer, "received interrupt signal, exiting... Force exit with Ctrl+C") + cancel() + <-sigc + _, _ = fmt.Fprintln(app.Writer, "force exiting...") + os.Exit(1) + }() + + if err := app.RunContext(ctx, os.Args); err != nil { + _, _ = fmt.Fprintf(app.Writer, "exited with error: %v\n", err) + } + +} diff --git a/tools/indexer/cmd/start.go b/tools/indexer/cmd/start.go new file mode 100644 index 000000000..1f3a2b5d4 --- /dev/null +++ b/tools/indexer/cmd/start.go @@ -0,0 +1,336 @@ +package main + +import ( + "context" + "fmt" + "log/slog" + "reflect" + "time" + + _ "github.com/go-sql-driver/mysql" + "github.com/hashicorp/go-retryablehttp" + "github.com/primev/mev-commit/tools/indexer/pkg/backfill" + "github.com/primev/mev-commit/tools/indexer/pkg/beacon" + "github.com/primev/mev-commit/tools/indexer/pkg/database" + "github.com/primev/mev-commit/tools/indexer/pkg/ethereum" + httputil "github.com/primev/mev-commit/tools/indexer/pkg/http" + "github.com/primev/mev-commit/tools/indexer/pkg/relay" + + "github.com/urfave/cli/v2" +) + +func initializeDatabase(ctx context.Context, dbURL string, logger *slog.Logger) (*database.DB, error) { + db, err := database.Connect(ctx, dbURL, 20, 5) + if err != nil { + logger.Error("[DB] connection failed", "error", err) + return nil, err + } + logger.Info("[DB] connected to StarRocks database") + + if err := db.EnsureStateTable(ctx); err != nil { + logger.Error("[DB] failed to ensure state table", "error", err) + return nil, err + } + logger.Info("[DB] state table ready") + + return db, nil +} + +func loadRelays(ctx context.Context, db *database.DB, logger *slog.Logger) ([]relay.Row, error) { + relays, err := relay.UpsertRelaysAndLoad(ctx, db) + if err != nil { + logger.Error("[RELAY] failed to load", "error", err) + return nil, err + } + + logger.Info("[RELAY] loaded active relays", "count", len(relays)) + for _, r := range relays { + logger.Info("[RELAY] relay found", "id", r.ID, "url", r.URL) + } + + return relays, nil +} + +func getStartingBlockNumber(ctx context.Context, db *database.DB, httpc *retryablehttp.Client, infuraRPC string, logger *slog.Logger) (int64, error) { + lastBN, found := db.LoadLastBlockNumber(ctx) + + if !found || lastBN == 0 { + logger.Info("no previous state found, checking database for latest block") + var err error + lastBN, err = db.GetMaxBlockNumber(ctx) + if err != nil { + logger.Error("database query failed", "error", err) + } + } + + if lastBN == 0 { + logger.Info("getting latest block from Ethereum RPC...") + + latestBlock, err := ethereum.GetLatestBlockNumber(httpc.HTTPClient, infuraRPC) + if err != nil { + logger.Error("failed to get latest block from RPC", "error", err) + return 0, err + } + + lastBN = latestBlock - 10 // Start 10 blocks behind to ensure data availability + logger.Info("starting from block", "block", lastBN, "latest", latestBlock) + } + return lastBN, nil +} + +func runBackfillIfConfigured(ctx context.Context, c *cli.Context, db *database.DB, httpc *retryablehttp.Client, relays []relay.Row, logger *slog.Logger) { + logger.Info("indexer configuration", "lookback", c.Int("backfill-lookback"), "batch", c.Int("backfill-batch")) + + if c.Int("backfill-lookback") > 0 { + logger.Info("[BACKFILL] running one-time backfill", + "lookback", c.Int("backfill-lookback"), + "batch", c.Int("backfill-batch")) + if err := backfill.RunAll(ctx, db, httpc, createOptionsFromCLI(c), relays); err != nil { + logger.Error("[BACKFILL] failed", "error", err) + } else { + logger.Info("[BACKFILL] completed startup backfill") + } + } else { + logger.Info("[BACKFILL] skipped", "reason", "backfill-lookback=0") + } +} + +func runMainLoop(ctx context.Context, c *cli.Context, db *database.DB, httpc *retryablehttp.Client, relays []relay.Row, infuraRPC, beaconBase string, startBN int64, logger *slog.Logger) error { + mainTicker := time.NewTicker(c.Duration("block-interval")) + defer mainTicker.Stop() + + lastBN := startBN + + for { + select { + case <-ctx.Done(): + logger.Info("[SHUTDOWN] graceful shutdown initiated", "reason", ctx.Err()) + if err := db.SaveLastBlockNumber(ctx, lastBN); err != nil { + logger.Error("[SHUTDOWN] failed to save last block number", "error", err) + } + logger.Info("[SHUTDOWN] indexer stopped", "block", lastBN) + return nil + + case <-mainTicker.C: + lastBN = processNextBlock(ctx, c, db, httpc, relays, infuraRPC, beaconBase, lastBN, logger) + } + } +} +func safe(p interface{}) interface{} { + v := reflect.ValueOf(p) + if !v.IsValid() || v.IsNil() { + return nil + } + return v.Elem().Interface() +} +func processNextBlock(ctx context.Context, c *cli.Context, db *database.DB, httpc *retryablehttp.Client, relays []relay.Row, infuraRPC, beaconBase string, lastBN int64, logger *slog.Logger) int64 { + nextBN := lastBN + 1 + + ei, err := beacon.FetchCombinedBlockData(ctx, httpc, infuraRPC, beaconBase, nextBN) + if err != nil || ei == nil { + logger.Warn("[BLOCK] not available yet", "block", nextBN, "error", err) + return lastBN + } + + logger.Info("processing block", + "block", nextBN, + "slot", ei.Slot, + "timestamp", ei.Timestamp, + "proposer_index", safe(ei.ProposerIdx), + "winning_relay", safe(ei.RelayTag), + "builder_pubkey_prefix", safe(ei.BuilderHex), + "producer_reward_eth", safe(ei.RewardEth), + ) + + if err := db.UpsertBlockFromExec(ctx, ei); err != nil { + logger.Error("[DB] failed to save block", "block", nextBN, "error", err) + return lastBN + } + logger.Info("[DB] block saved successfully", "block", nextBN) + + if err := processBidsForBlock(ctx, db, httpc, relays, ei, logger); err != nil { + logger.Error("failed to process bids", "error", err) + return lastBN + } + if err := launchValidatorTasks(ctx, c, db, httpc, ei, beaconBase, logger); err != nil { + logger.Error("[VALIDATOR] failed to launch async tasks", "slot", ei.Slot, "error", err) + return lastBN + } + + saveBlockProgress(db, nextBN, logger) + return nextBN +} + +func processBidsForBlock(ctx context.Context, db *database.DB, httpc *retryablehttp.Client, relays []relay.Row, ei *beacon.ExecInfo, logger *slog.Logger) error { + + // Fetch and store bid data from all relays + totalBids := 0 + successfulRelays := 0 + const batchSize = 500 + for _, rr := range relays { + if err := ctx.Err(); err != nil { + logger.Warn("main context canceled, stopping relay processing") + return err + } + + bids, err := relay.FetchBuilderBlocksReceived(ctx, httpc, rr.URL, ei.Slot) + if err != nil { + // logger.Error("[RELAY] failed to fetch bids", "relay_id", rr.ID, "url", rr.URL, "error", err) + return fmt.Errorf("fetch bids: relay_id=%d url=%s slot=%d: %w", rr.ID, rr.URL, ei.Slot, err) + + } + + relayBids := 0 + batch := make([]database.BidRow, 0, batchSize) + + for _, bid := range bids { + + if err := ctx.Err(); err != nil { + logger.Warn("[BIDS] main context canceled, stopping bid insertion") + return err + } + + if row, ok := relay.BuildBidInsert(ei.Slot, rr.ID, bid); ok { + batch = append(batch, row) + + if len(batch) >= batchSize { + insCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + if err := db.InsertBidsBatch(insCtx, batch); err != nil { + + logger.Error("[DB]batch insert failed", "slot", ei.Slot, "relay_id", rr.ID, "count", len(batch), "error", err) + } else { + relayBids += len(batch) + } + cancel() + batch = batch[:0] + } + } + } + + // final flush + if len(batch) > 0 { + flushCtx, flushCancel := context.WithTimeout(context.Background(), 5*time.Second) + if err := db.InsertBidsBatch(flushCtx, batch); err != nil { + logger.Error("[DB] batch insert failed", "slot", ei.Slot, "relay_id", rr.ID, "count", len(batch), "error", err) + } else { + relayBids += len(batch) + } + flushCancel() + } + + if relayBids > 0 { + logger.Info("[BIDS] bids collected", "relay_id", rr.ID, "count", relayBids) + totalBids += relayBids + successfulRelays++ + } + } + logger.Info("[BIDS] summary", "block", ei.BlockNumber, "total_bids", totalBids, "successful_relays", successfulRelays) + return nil +} + +func saveBlockProgress(db *database.DB, blockNum int64, logger *slog.Logger) { + gctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if err := db.SaveLastBlockNumber(gctx, blockNum); err != nil { + logger.Error("[PROGRESS] failed to save block number", "block", blockNum, "error", err) + } else { + logger.Info("[PROGRESS] advanced to block", "block", blockNum) + } + +} + +func launchValidatorTasks(ctx context.Context, c *cli.Context, db *database.DB, httpc *retryablehttp.Client, ei *beacon.ExecInfo, beaconBase string, logger *slog.Logger) error { // Async validator pubkey fetch + if ei.ProposerIdx == nil { + return nil + } + + vctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + vpub, err := beacon.FetchValidatorPubkey(vctx, httpc, beaconBase, *ei.ProposerIdx) + if err != nil { + return fmt.Errorf("fetch validator pubkey: %w", err) + } + + if len(vpub) > 0 { + if err := db.UpdateValidatorPubkey(vctx, ei.Slot, vpub); err != nil { + logger.Error("[VALIDATOR] failed to save pubkey", "slot", ei.Slot, "error", err) + } else { + logger.Info("[VALIDATOR] pubkey saved", "proposer", *ei.ProposerIdx, "slot", ei.Slot) + } + } + + // Wait for validator pubkey to be available + getCtx, getCancel := context.WithTimeout(context.Background(), 5*time.Second) + vpk, err := db.GetValidatorPubkeyWithRetry(getCtx, ei.Slot, 3, time.Second) + getCancel() + + if err != nil { + logger.Error("[VALIDATOR] pubkey not available", "slot", ei.Slot, "error", err) + return fmt.Errorf("save validator pubkey: %w", err) + } + + opted, err := ethereum.CallAreOptedInAtBlock(httpc.HTTPClient, createOptionsFromCLI(c), ei.BlockNumber, vpk) + if err != nil { + return fmt.Errorf("check opt-in status: %w", err) + } + + updCtx, updCancel := context.WithTimeout(context.Background(), 3*time.Second) + err = db.UpdateValidatorOptInStatus(updCtx, ei.Slot, opted) + updCancel() + if err != nil { + return fmt.Errorf("save opt-in status: %w", err) + } else { + logger.Info("[OPT-IN] validator opt-in status", "slot", ei.Slot, "opted_in", opted) + } + return nil + +} + +func startIndexer(c *cli.Context) error { + + initLogger := slog.With("component", "init") + + dbURL := c.String(optionDatabaseURL.Name) + infuraRPC := c.String(optionInfuraRPC.Name) + beaconBase := c.String(optionBeaconBase.Name) + + initLogger.Info("starting blockchain indexer with StarRocks database") + initLogger.Info("configuration loaded", + "block_interval", c.Duration("block-interval"), + "validator_delay", c.Duration("validator-delay")) + ctx := c.Context + + db, err := initializeDatabase(ctx, dbURL, initLogger) + if err != nil { + return err + } + defer func() { + if cerr := db.Close(); cerr != nil { + initLogger.Error("[DB] close failed", "error", cerr) + } + }() + + // Initialize HTTP client + httpc := httputil.NewHTTPClient(c.Duration("http-timeout")) + initLogger.Info("[HTTP] client initialized", "timeout", c.Duration("http-timeout")) + + // Load relay configurations + relays, err := loadRelays(ctx, db, initLogger) + if err != nil { + return err + } + + // Get starting block number + lastBN, err := getStartingBlockNumber(ctx, db, httpc, infuraRPC, initLogger) + if err != nil { + return err + } + + initLogger.Info("starting from block number", "block", lastBN) + initLogger.Info("indexer configuration", "lookback", c.Int("backfill-lookback"), "batch", c.Int("backfill-batch")) + + // Run backfill if configured + go runBackfillIfConfigured(ctx, c, db, httpc, relays, initLogger) + return runMainLoop(ctx, c, db, httpc, relays, infuraRPC, beaconBase, lastBN, initLogger) +} diff --git a/tools/indexer/pkg/backfill/backfill.go b/tools/indexer/pkg/backfill/backfill.go new file mode 100644 index 000000000..a9d31c23c --- /dev/null +++ b/tools/indexer/pkg/backfill/backfill.go @@ -0,0 +1,122 @@ +package backfill + +import ( + "context" + "fmt" + "log/slog" + "time" + + "github.com/hashicorp/go-retryablehttp" + "github.com/primev/mev-commit/tools/indexer/pkg/beacon" + "github.com/primev/mev-commit/tools/indexer/pkg/config" + "github.com/primev/mev-commit/tools/indexer/pkg/database" + "github.com/primev/mev-commit/tools/indexer/pkg/ethereum" + "github.com/primev/mev-commit/tools/indexer/pkg/relay" +) + +type SlotData struct { + Slot int64 + BlockNumber int64 + ValidatorPubkey []byte + ProposerIdx *int64 +} + +func RunAll(ctx context.Context, db *database.DB, httpc *retryablehttp.Client, cfg *config.Config, relays []relay.Row) error { + logger := slog.With("component", "backfill") + logger.Info("Starting streaming backfill") + + if err := ctx.Err(); err != nil { + return err + } + + blocks, err := db.GetRecentMissingBlocks(ctx, cfg.BackfillLookback, cfg.BackfillBatch) + if err != nil { + return fmt.Errorf("get missing blocks: %w", err) + } + + for _, b := range blocks { + if err := ctx.Err(); err != nil { + return err + } + + fetchCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + ei, ferr := beacon.FetchBeaconExecutionBlock(fetchCtx, httpc, cfg.BeaconBase, b.BlockNumber) + cancel() + if ferr != nil || ei == nil { + logger.Error("beacon fetch failed", "block", b.BlockNumber, "error", ferr) + continue + } + + if err := db.UpsertBlockFromExec(ctx, ei); err != nil { + logger.Error("block upsert failed", "slot", ei.Slot, "error", err) + continue + } + + var vpub []byte + if ei.ProposerIdx != nil { + vctx, vcancel := context.WithTimeout(ctx, 5*time.Second) + v, verr := beacon.FetchValidatorPubkey(vctx, httpc, cfg.BeaconBase, *ei.ProposerIdx) + vcancel() + if verr != nil { + logger.Error("validator fetch failed", "slot", ei.Slot, "error", verr) + } else if len(v) > 0 { + vpub = v + + // Save validator pubkey + if err := db.UpdateValidatorPubkey(ctx, ei.Slot, vpub); err != nil { + logger.Error("validator update failed", "slot", ei.Slot, "error", err) + } else { + + opted, oerr := ethereum.CallAreOptedInAtBlock(httpc.HTTPClient, cfg, ei.BlockNumber, vpub) + + if oerr != nil { + logger.Error("opt-in check failed", "slot", ei.Slot, "error", oerr) + } else { + updCtx, updCancel := context.WithTimeout(ctx, 3*time.Second) + if uerr := db.UpdateValidatorOptInStatus(updCtx, ei.Slot, opted); uerr != nil { + logger.Error("opt-in update failed", "slot", ei.Slot, "error", uerr) + } + updCancel() + } + } + } + } + + for _, r := range relays { + if err := ctx.Err(); err != nil { + return err + } + + bctx, bcancel := context.WithTimeout(ctx, 5*time.Second) + bids, berr := relay.FetchBuilderBlocksReceived(bctx, httpc, r.URL, ei.Slot) + bcancel() + if berr != nil { + logger.Debug("bid fetch failed", "slot", ei.Slot, "relay", r.ID, "error", berr) + continue + } + + if len(bids) == 0 { + continue + } + + rows := make([]database.BidRow, 0, len(bids)) + for _, bid := range bids { + if row, ok := relay.BuildBidInsert(ei.Slot, r.ID, bid); ok { + rows = append(rows, row) + } + } + + if len(rows) > 0 { + insCtx, insCancel := context.WithTimeout(ctx, 5*time.Second) + if ierr := db.InsertBidsBatch(insCtx, rows); ierr != nil { + logger.Error("bid insert failed", "slot", ei.Slot, "relay", r.ID, "error", ierr) + } + insCancel() + } + } + logger.Debug("slot processed", "slot", ei.Slot) + } + + logger.Info("Backfill completed", "blocks_processed", len(blocks)) + return nil +} diff --git a/tools/indexer/pkg/beacon/client.go b/tools/indexer/pkg/beacon/client.go new file mode 100644 index 000000000..2b7011143 --- /dev/null +++ b/tools/indexer/pkg/beacon/client.go @@ -0,0 +1,223 @@ +package beacon + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "math/big" + "net/http" + "strconv" + "strings" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/hashicorp/go-retryablehttp" + "github.com/primev/mev-commit/tools/indexer/pkg/ethereum" + httputil "github.com/primev/mev-commit/tools/indexer/pkg/http" +) + +type ExecInfo struct { + BlockNumber int64 + Slot int64 + ProposerIdx *int64 + Timestamp *time.Time + RelayTag *string + BuilderHex *string + FeeRecHex *string + RewardEth *float64 +} + +func FetchBeaconExecutionBlock(ctx context.Context, httpc *retryablehttp.Client, beaconBase string, blockNum int64) (*ExecInfo, error) { + url := fmt.Sprintf("%s/execution/block/%d", beaconBase, blockNum) + + if _, has := ctx.Deadline(); !has { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, 5*time.Second) + defer cancel() + } + + var wrap struct { + Data []map[string]any `json:"data"` + } + if err := httputil.FetchJSON(ctx, httpc, url, &wrap); err != nil || len(wrap.Data) == 0 { + return nil, fmt.Errorf("no exec block %d", blockNum) + } + j := wrap.Data[0] + out := &ExecInfo{BlockNumber: blockNum} + + // posConsensus.slot & proposerIndex + if pc, ok := j["posConsensus"].(map[string]any); ok { + if v, ok := pc["slot"].(float64); ok { + out.Slot = int64(v) + } else if s, ok := pc["slot"].(string); ok { + if n, err := strconv.ParseInt(s, 10, 64); err == nil { + out.Slot = n + } + } + if v, ok := pc["proposerIndex"].(float64); ok { + x := int64(v) + out.ProposerIdx = &x + } else if s, ok := pc["proposerIndex"].(string); ok { + if n, err := strconv.ParseInt(s, 10, 64); err == nil { + out.ProposerIdx = &n + } + } + } + + // timestamp + if v, ok := j["timestamp"]; ok { + switch t := v.(type) { + case float64: + u := time.Unix(int64(t), 0).UTC() + out.Timestamp = &u + case string: + if n, err := strconv.ParseInt(t, 10, 64); err == nil { + u := time.Unix(n, 0).UTC() + out.Timestamp = &u + } + } + } + + // relay + if rel, ok := j["relay"].(map[string]any); ok { + if s, ok := rel["tag"].(string); ok { + out.RelayTag = &s + } + if s, ok := rel["builderPubkey"].(string); ok { + out.BuilderHex = &s + } + if s, ok := rel["producerFeeRecipient"].(string); ok { + out.FeeRecHex = &s + } + } + + // reward eth from blockMevReward or producerReward + if v, ok := j["blockMevReward"]; ok { + switch t := v.(type) { + case float64: + f := t + if f > 1e10 { + f = f / 1e18 // wei -> ETH + } + out.RewardEth = &f + case string: + if strings.HasPrefix(t, "0x") { + if bi, ok := new(big.Int).SetString(t[2:], 16); ok { + f, _ := new(big.Rat).SetFrac(bi, big.NewInt(1e18)).Float64() + out.RewardEth = &f + } + } else if f, err := strconv.ParseFloat(t, 64); err == nil { + out.RewardEth = &f + } + } + } else if v, ok := j["producerReward"]; ok { + if f, ok := v.(float64); ok { + out.RewardEth = &f + } + } + + // sanity + if out.Slot == 0 { + return nil, fmt.Errorf("exec block missing posConsensus.slot for %d", blockNum) + } + return out, nil +} + +// validator pubkey from proposer index +func FetchValidatorPubkey(ctx context.Context, httpc *retryablehttp.Client, beaconBase string, proposerIndex int64) ([]byte, error) { + url := fmt.Sprintf("%s/validator/%d", beaconBase, proposerIndex) + + if _, has := ctx.Deadline(); !has { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, 5*time.Second) + defer cancel() + } + var resp struct { + Data struct { + Pubkey string `json:"pubkey"` + } `json:"data"` + } + if err := httputil.FetchJSON(ctx, httpc, url, &resp); err != nil { + return nil, err + } + if strings.TrimSpace(resp.Data.Pubkey) == "" { + return nil, fmt.Errorf("validator %d pubkey empty", proposerIndex) + } + return common.FromHex(resp.Data.Pubkey), nil +} + +// to fetch blocks from Alchemy RPC +func fetchBlockFromRPC(httpc *retryablehttp.Client, rpcURL string, blockNumber int64) (*ExecInfo, error) { + underlyingClient := httpc.HTTPClient + // Get block data from Alchemy + payload := map[string]any{ + "jsonrpc": "2.0", + "id": 1, + "method": "eth_getBlockByNumber", + "params": []any{fmt.Sprintf("0x%x", blockNumber), true}, // true for full transaction objects + } + + buf, _ := json.Marshal(payload) + req, _ := http.NewRequest("POST", rpcURL, bytes.NewReader(buf)) + req.Header.Set("Content-Type", "application/json") + + resp, err := underlyingClient.Do(req) + if err != nil { + return nil, err + } + defer func() { _ = resp.Body.Close() }() + + var result struct { + Result struct { + Number string `json:"number"` + Timestamp string `json:"timestamp"` + Miner string `json:"miner"` + } `json:"result"` + } + + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + return nil, err + } + + if result.Result.Number == "" { + return nil, fmt.Errorf("block not found") + } + + // Convert hex timestamp to time + timestampHex := result.Result.Timestamp[2:] // Remove 0x + timestamp, _ := strconv.ParseInt(timestampHex, 16, 64) + blockTime := time.Unix(timestamp, 0) + + return &ExecInfo{ + BlockNumber: blockNumber, + Timestamp: &blockTime, + }, nil +} +func FetchCombinedBlockData(ctx context.Context, httpc *retryablehttp.Client, rpcURL string, beaconBase string, blockNumber int64) (*ExecInfo, error) { + // Get execution block from Alchemy (always available) + execBlock, err := fetchBlockFromRPC(httpc, rpcURL, blockNumber) + if err != nil { + return nil, err + } + + // Convert block number to slot for beacon chain query + slotNumber := ethereum.BlockNumberToSlot(blockNumber) + + beaconData, _ := FetchBeaconExecutionBlock(ctx, httpc, beaconBase, blockNumber) + + // Merge data - use Alchemy as primary, beacon as supplement + if beaconData != nil { + execBlock.Slot = beaconData.Slot + execBlock.ProposerIdx = beaconData.ProposerIdx + execBlock.RelayTag = beaconData.RelayTag + execBlock.RewardEth = beaconData.RewardEth + execBlock.BuilderHex = beaconData.BuilderHex + execBlock.FeeRecHex = beaconData.FeeRecHex + } else { + + execBlock.Slot = slotNumber + } + + return execBlock, nil +} diff --git a/tools/indexer/pkg/config/config.go b/tools/indexer/pkg/config/config.go new file mode 100644 index 000000000..227e30610 --- /dev/null +++ b/tools/indexer/pkg/config/config.go @@ -0,0 +1,34 @@ +package config + +import ( + "time" +) + +type Relay struct { + Relay_id int64 + Name string + Tag string + URL string +} + +var RelaysDefault = []Relay{ + {Relay_id: 1, Name: "Titan", Tag: "titan-relay", URL: "https://regional.titanrelay.xyz"}, + {Relay_id: 2, Name: "Aestus", Tag: "aestus-relay", URL: "https://aestus.live"}, + {Relay_id: 3, Name: "Bloxroute Max Profit", Tag: "bloxroute-max-profit-relay", URL: "https://bloxroute.max-profit.blxrbdn.com"}, + {Relay_id: 4, Name: "Bloxroute Regulated", Tag: "bloxroute-regulated-relay", URL: "https://bloxroute.regulated.blxrbdn.com"}, +} + +type Config struct { + BlockTick time.Duration + ValidatorWait time.Duration + BackfillEvery time.Duration + BackfillLookback int64 + BackfillBatch int + MaxRetries int + BaseRetryDelay time.Duration + HTTPTimeout time.Duration + OptInContract string + EtherscanKey string + InfuraRPC string + BeaconBase string +} diff --git a/tools/indexer/pkg/database/starrock.go b/tools/indexer/pkg/database/starrock.go new file mode 100644 index 000000000..da4929d89 --- /dev/null +++ b/tools/indexer/pkg/database/starrock.go @@ -0,0 +1,455 @@ +package database + +import ( + "context" + "database/sql" + "fmt" + "strings" + "time" + + "github.com/ethereum/go-ethereum/common/hexutil" + _ "github.com/go-sql-driver/mysql" + "github.com/primev/mev-commit/tools/indexer/pkg/beacon" + "github.com/primev/mev-commit/tools/indexer/pkg/config" +) + +type DB struct { + conn *sql.DB +} +type BidInsert struct { + Slot int64 + RelayID int64 + BuilderHex string + ProposerHex string + FeeRecHex string + ValStr string + BlockNum *int64 + TsMS *int64 +} + +func Connect(ctx context.Context, dsn string, maxConns, minConns int) (*DB, error) { + conn, err := sql.Open("mysql", dsn) + if err != nil { + return nil, fmt.Errorf("failed to open StarRocks connection: %w", err) + } + + // Configure connection pool + conn.SetMaxOpenConns(maxConns) + conn.SetMaxIdleConns(minConns) + conn.SetConnMaxLifetime(time.Hour) + conn.SetConnMaxIdleTime(30 * time.Minute) + pingCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + if err := conn.PingContext(pingCtx); err != nil { + _ = conn.Close() + + return nil, fmt.Errorf("StarRocks ping failed: %v", err) + } + + return &DB{conn: conn}, nil + +} +func (db *DB) Close() error { + return db.conn.Close() +} + +func (db *DB) EnsureStateTable(ctx context.Context) error { + ctx2, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + ddl := ` + CREATE TABLE IF NOT EXISTS ingestor_state ( + id TINYINT, + last_block_number BIGINT + ) ENGINE=OLAP + DUPLICATE KEY(id) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + )` + + if _, err := db.conn.ExecContext(ctx2, ddl); err != nil { + return fmt.Errorf("failed to create state table: %w", err) + } + + var count int + err := db.conn.QueryRowContext(ctx2, `SELECT COUNT(*) FROM ingestor_state WHERE id = 1`).Scan(&count) + if err != nil || count == 0 { + _, err = db.conn.ExecContext(ctx2, + `INSERT INTO ingestor_state (id, last_block_number) VALUES (1, 0)`) + if err != nil { + return fmt.Errorf("failed to insert initial state: %w", err) + } + } + + return nil +} +func (db *DB) GetMaxBlockNumber(ctx context.Context) (int64, error) { + ctx2, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + var bn int64 + err := db.conn.QueryRowContext(ctx2, `SELECT COALESCE(MAX(block_number),0) FROM blocks`).Scan(&bn) + return bn, err +} +func (db *DB) GetValidatorPubkey(ctx context.Context, slot int64) ([]byte, error) { + ctx2, cancel := context.WithTimeout(ctx, 3*time.Second) + defer cancel() + + var vpk []byte + err := db.conn.QueryRowContext(ctx2, `SELECT validator_pubkey FROM blocks WHERE slot=?`, slot).Scan(&vpk) + return vpk, err +} +func (db *DB) LoadLastBlockNumber(ctx context.Context) (int64, bool) { + ctx2, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + var bn int64 + err := db.conn.QueryRowContext(ctx2, + `SELECT last_block_number FROM ingestor_state WHERE id = 1 LIMIT 1`).Scan(&bn) + if err != nil { + return 0, false + } + return bn, true +} + +func (db *DB) SaveLastBlockNumber(ctx context.Context, bn int64) error { + ctx2, cancel := context.WithTimeout(ctx, 3*time.Second) + defer cancel() + + q2 := fmt.Sprintf(`INSERT INTO ingestor_state (id, last_block_number) VALUES (1, %d)`, bn) + if _, err := db.conn.ExecContext(ctx2, q2); err != nil { + return fmt.Errorf("save last_block_number failed (insert): %w", err) + } + + return nil +} + +func (db *DB) UpsertRelays(ctx context.Context, relays []config.Relay) error { + if len(relays) == 0 { + return nil + } + + ctx2, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + // StarRocks batch insert approach + var values []string + for _, r := range relays { + value := fmt.Sprintf("(%d, '%s', '%s', '%s', 1)", r.Relay_id, r.Name, r.Tag, r.URL) + values = append(values, value) + } + + query := fmt.Sprintf(`INSERT INTO relays (relay_id, name, tag, base_url, is_active) VALUES %s`, + strings.Join(values, ",")) + + _, err := db.conn.ExecContext(ctx2, query) + return err +} + +func (db *DB) UpsertBlockFromExec(ctx context.Context, ei *beacon.ExecInfo) error { + if ei == nil || ei.BlockNumber == 0 { + return fmt.Errorf("upsert block: nil exec info or block_number=0") + } + + ctx2, cancel := context.WithTimeout(ctx, 3*time.Second) + defer cancel() + + var timestamp, proposerIndex, relayTag, rewardEth string + + if ei.Timestamp != nil { + timestamp = fmt.Sprintf("'%s'", ei.Timestamp.Format("2006-01-02 15:04:05")) + } else { + timestamp = "NULL" + } + + if ei.ProposerIdx != nil { + proposerIndex = fmt.Sprintf("%d", *ei.ProposerIdx) + } else { + proposerIndex = "NULL" + } + + if ei.RelayTag != nil { + relayTag = fmt.Sprintf("'%s'", *ei.RelayTag) + } else { + relayTag = "NULL" + } + + if ei.RewardEth != nil { + rewardEth = fmt.Sprintf("%.6f", *ei.RewardEth) + } else { + rewardEth = "NULL" + } + + query := fmt.Sprintf(` +INSERT INTO blocks( + slot, block_number, timestamp, proposer_index, + winning_relay, producer_reward_eth +) VALUES (%d, %d, %s, %s, %s, %s)`, + ei.Slot, ei.BlockNumber, timestamp, proposerIndex, relayTag, rewardEth) + + _, err := db.conn.ExecContext(ctx2, query) + if err != nil { + return fmt.Errorf("upsert block slot=%d: %w", ei.Slot, err) + } + return nil +} + +func (db *DB) UpdateValidatorPubkey(ctx context.Context, slot int64, vpub []byte) error { + if slot == 0 { + return fmt.Errorf("update validator: slot=0") + } + if len(vpub) == 0 { + return nil + } + + ctx2, cancel := context.WithTimeout(ctx, 3*time.Second) + defer cancel() + vhex := hexutil.Encode(vpub) + + q := fmt.Sprintf("INSERT INTO blocks (slot, validator_pubkey) VALUES (%d, '%s')", slot, vhex) + + if _, err := db.conn.ExecContext(ctx2, q); err != nil { + return fmt.Errorf("update validator slot=%d: %w", slot, err) + } + + return nil +} + +// Minimal batching: builds one multi-VALUES INSERT. + +type BidRow struct { + Slot, RelayID int64 + Builder, Proposer, FeeRec string + ValStr string + BlockNum, TsMS *int64 +} + +func (db *DB) InsertBidsBatch(ctx context.Context, rows []BidRow) error { + if len(rows) == 0 { + return nil + } + + ctx2, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + var sb strings.Builder + sb.WriteString(` + INSERT INTO bids( + slot, relay_id, builder_pubkey, proposer_pubkey, + proposer_fee_recipient, value_wei, block_number, timestamp_ms + ) VALUES `) + + for i, r := range rows { + if i > 0 { + sb.WriteString(",") + } + + blockNumSQL := "NULL" + if r.BlockNum != nil { + blockNumSQL = fmt.Sprintf("%d", *r.BlockNum) + } + + tsMSSQL := "NULL" + if r.TsMS != nil { + tsMSSQL = fmt.Sprintf("%d", *r.TsMS) + } + + fmt.Fprintf(&sb, "(%d,%d,'%s','%s','%s','%s',%s,%s)", + r.Slot, r.RelayID, r.Builder, r.Proposer, r.FeeRec, r.ValStr, blockNumSQL, tsMSSQL) + } + + _, err := db.conn.ExecContext(ctx2, sb.String()) + return err +} + +func (db *DB) GetActiveRelays(ctx context.Context) ([]struct { + ID int64 + URL string +}, error) { + ctx2, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + rows, err := db.conn.QueryContext(ctx2, `SELECT relay_id, base_url FROM relays WHERE is_active = 1`) + if err != nil { + return nil, err + } + defer func() { _ = rows.Close() }() + + var results []struct { + ID int64 + URL string + } + for rows.Next() { + var id int64 + var url string + if err := rows.Scan(&id, &url); err != nil { + continue // Skip bad rows + } + results = append(results, struct { + ID int64 + URL string + }{ID: id, URL: url}) + } + return results, rows.Err() +} + +func (db *DB) GetRecentMissingBlocks(ctx context.Context, lookback int64, batch int) ([]struct { + Slot int64 + BlockNumber int64 +}, error) { + ctx2, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + if lookback < 0 || batch < 0 || batch > 10000 { + return nil, fmt.Errorf("invalid parameters: lookback=%d, batch=%d", lookback, batch) + } + + query := fmt.Sprintf(` + WITH recent AS ( + SELECT COALESCE(MAX(slot), 0) AS s FROM blocks + ) + SELECT slot, block_number + FROM blocks, recent + WHERE slot > recent.s - %d + AND block_number IS NOT NULL + AND (winning_relay IS NULL + OR winning_builder_pubkey IS NULL + OR fee_recipient IS NULL + OR producer_reward_eth IS NULL + OR timestamp IS NULL + OR proposer_index IS NULL) + ORDER BY slot DESC + LIMIT %d`, lookback, batch) + + rows, err := db.conn.QueryContext(ctx2, query) + if err != nil { + return nil, err + } + defer func() { _ = rows.Close() }() + + var results []struct { + Slot int64 + BlockNumber int64 + } + for rows.Next() { + var slot, bn int64 + if err := rows.Scan(&slot, &bn); err != nil { + continue + } + results = append(results, struct { + Slot int64 + BlockNumber int64 + }{Slot: slot, BlockNumber: bn}) + } + return results, rows.Err() +} + +func (db *DB) GetRecentSlotsWithBlocks(ctx context.Context, lookback int64, batch int) ([]int64, error) { + ctx2, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + q := fmt.Sprintf(` +WITH recent AS (SELECT COALESCE(MAX(slot),0) AS s FROM blocks) +SELECT DISTINCT slot +FROM blocks, recent +WHERE slot > recent.s - ? + AND block_number IS NOT NULL +ORDER BY slot DESC +LIMIT %d`, batch) + rows, err := db.conn.QueryContext(ctx2, q, lookback) + if err != nil { + return nil, err + } + defer func() { _ = rows.Close() }() + + var slots []int64 + for rows.Next() { + var slot int64 + if err := rows.Scan(&slot); err != nil { + continue + } + slots = append(slots, slot) + } + return slots, rows.Err() +} + +func (db *DB) GetValidatorsNeedingOptInCheck(ctx context.Context, lookback int64, batch int) ([]struct { + Slot int64 + BlockNumber int64 + ValidatorPubkey []byte +}, error) { + ctx2, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + q := fmt.Sprintf(` +WITH recent AS (SELECT COALESCE(MAX(slot),0) AS s FROM blocks) +SELECT slot, block_number, validator_pubkey +FROM blocks, recent +WHERE slot > recent.s - ? + AND block_number IS NOT NULL + AND validator_pubkey IS NOT NULL + AND validator_opted_in IS NULL +ORDER BY slot DESC +LIMIT %d`, batch) + rows, err := db.conn.QueryContext(ctx2, q, lookback) + if err != nil { + return nil, err + } + defer func() { _ = rows.Close() }() + + var results []struct { + Slot int64 + BlockNumber int64 + ValidatorPubkey []byte + } + for rows.Next() { + var slot, bn int64 + var vpk []byte + if err := rows.Scan(&slot, &bn, &vpk); err != nil { + continue + } + results = append(results, struct { + Slot int64 + BlockNumber int64 + ValidatorPubkey []byte + }{ + Slot: slot, BlockNumber: bn, ValidatorPubkey: vpk, + }) + } + return results, rows.Err() +} + +func (db *DB) UpdateValidatorOptInStatus(ctx context.Context, slot int64, opted bool) error { + ctx2, cancel := context.WithTimeout(ctx, 3*time.Second) + defer cancel() + + v := 0 + if opted { + v = 1 + } // TINYINT(1) in StarRocks + q := fmt.Sprintf( + "UPDATE blocks SET validator_opted_in=%d WHERE slot=%d AND validator_opted_in IS NULL", + v, slot, + ) + _, err := db.conn.ExecContext(ctx2, q) + return err +} + +func (db *DB) GetValidatorPubkeyWithRetry(ctx context.Context, slot int64, retries int, retryDelay time.Duration) ([]byte, error) { + var vpk []byte + for i := 0; i < retries; i++ { + ctx2, cancel := context.WithTimeout(ctx, 3*time.Second) + err := db.conn.QueryRowContext(ctx2, `SELECT validator_pubkey FROM blocks WHERE slot=?`, slot).Scan(&vpk) + cancel() + + if err == nil && len(vpk) > 0 { + return vpk, nil + } + + if i < retries-1 { + time.Sleep(retryDelay) + } + } + return nil, fmt.Errorf("validator pubkey not available after %d retries", retries) +} diff --git a/tools/indexer/pkg/ethereum/client.go b/tools/indexer/pkg/ethereum/client.go new file mode 100644 index 000000000..31b3b8d71 --- /dev/null +++ b/tools/indexer/pkg/ethereum/client.go @@ -0,0 +1,76 @@ +package ethereum + +import ( + "bytes" + + "encoding/json" + "fmt" + "math/big" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/primev/mev-commit/tools/indexer/pkg/config" + + "net/http" + "strconv" + + "github.com/primev/mev-commit/contracts-abi/clients/ValidatorOptInRouter" +) + +func CallAreOptedInAtBlock(httpc *http.Client, cfg *config.Config, blockNum int64, pubkey []byte) (bool, error) { + if len(pubkey) == 0 { + return false, fmt.Errorf("empty pubkey") + } + client, err := ethclient.Dial(cfg.InfuraRPC) + if err != nil { + return false, err + } + contract, err := validatoroptinrouter.NewValidatoroptinrouter(common.HexToAddress(cfg.OptInContract), client) + if err != nil { + return false, err + } + + result, err := contract.AreValidatorsOptedIn(&bind.CallOpts{BlockNumber: big.NewInt(blockNum)}, [][]byte{pubkey}) + if err != nil { + return false, err + } + + if len(result) == 0 { + return false, nil + } + o := result[0] + return o.IsVanillaOptedIn || o.IsAvsOptedIn || o.IsMiddlewareOptedIn, nil +} + +// GetLatestBlockNumber gets the latest block number from Ethereum RPC +func GetLatestBlockNumber(httpc *http.Client, rpcURL string) (int64, error) { + payload := map[string]any{ + "jsonrpc": "2.0", + "id": 1, + "method": "eth_blockNumber", + "params": []any{}, + } + + buf, _ := json.Marshal(payload) + req, _ := http.NewRequest("POST", rpcURL, bytes.NewReader(buf)) + req.Header.Set("Content-Type", "application/json") + + resp, err := httpc.Do(req) + if err != nil { + return 0, err + } + defer func() { _ = resp.Body.Close() }() + + var result struct { + Result string `json:"result"` + } + + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + return 0, err + } + + // Convert hex to int64 + blockNum, err := strconv.ParseInt(result.Result[2:], 16, 64) + return blockNum, err +} diff --git a/tools/indexer/pkg/ethereum/conversions.go b/tools/indexer/pkg/ethereum/conversions.go new file mode 100644 index 000000000..962bacf5a --- /dev/null +++ b/tools/indexer/pkg/ethereum/conversions.go @@ -0,0 +1,14 @@ +package ethereum + +func BlockNumberToSlot(blockNumber int64) int64 { + // Ethereum mainnet merge happened at slot 4700013 (block 15537394) + const MERGE_BLOCK = 15537394 + const MERGE_SLOT = 4700013 + + if blockNumber < MERGE_BLOCK { + return 0 // Pre-merge blocks don't have valid slots + } + + // Post-merge: roughly 1 slot per block + return MERGE_SLOT + (blockNumber - MERGE_BLOCK) +} diff --git a/tools/indexer/pkg/http/client.go b/tools/indexer/pkg/http/client.go new file mode 100644 index 000000000..945d6d451 --- /dev/null +++ b/tools/indexer/pkg/http/client.go @@ -0,0 +1,39 @@ +package http + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/hashicorp/go-retryablehttp" +) + +func NewHTTPClient(timeout time.Duration) *retryablehttp.Client { + client := retryablehttp.NewClient() + client.HTTPClient.Timeout = timeout + client.RetryMax = 3 + client.RetryWaitMin = 200 * time.Millisecond + client.RetryWaitMax = 2 * time.Second + client.Logger = nil + return client +} + +func FetchJSON(ctx context.Context, client *retryablehttp.Client, url string, out any) error { + req, err := retryablehttp.NewRequestWithContext(ctx, "GET", url, nil) + if err != nil { + return err + } + + resp, err := client.Do(req) + if err != nil { + return err + } + defer func() { _ = resp.Body.Close() }() + + if resp.StatusCode != 200 { + return fmt.Errorf("HTTP %d", resp.StatusCode) + } + + return json.NewDecoder(resp.Body).Decode(out) +} diff --git a/tools/indexer/pkg/relay/client.go b/tools/indexer/pkg/relay/client.go new file mode 100644 index 000000000..1eafb7d9f --- /dev/null +++ b/tools/indexer/pkg/relay/client.go @@ -0,0 +1,159 @@ +package relay + +import ( + "context" + "encoding/json" + "fmt" + "math/big" + + "strings" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/hashicorp/go-retryablehttp" + "github.com/primev/mev-commit/tools/indexer/pkg/config" + + "github.com/primev/mev-commit/tools/indexer/pkg/database" + httputil "github.com/primev/mev-commit/tools/indexer/pkg/http" + + "strconv" +) + +type Row struct { + ID int64 + URL string +} + +func parseBigString(v any) (string, bool) { + switch t := v.(type) { + case nil: + return "", false + case string: + z := strings.ReplaceAll(strings.TrimSpace(t), ",", "") + if z == "" { + return "", false + } + + if strings.HasPrefix(z, "0x") || strings.HasPrefix(z, "0X") { + bi, err := hexutil.DecodeBig(z) + if err != nil { + return "", false + } + return bi.String(), true + } + + // For decimal strings + if _, ok := new(big.Int).SetString(z, 10); ok { + return z, true + } + return "", false + case float64: + return strconv.FormatFloat(t, 'f', 0, 64), true + case json.Number: + return t.String(), true + default: + return fmt.Sprintf("%v", t), true + } +} + +func BuildBidInsert(slot int64, relayID int64, bid map[string]any) (database.BidRow, bool) { + + if slot <= 0 || relayID <= 0 { + return database.BidRow{}, false + } + + // helper to read alternative keys from different relay schemas + get := func(keys ...string) any { + for _, k := range keys { + if v, ok := bid[k]; ok { + return v + } + } + return nil + } + + // Parse fields + builder := common.FromHex(fmt.Sprint(get("builder_pubkey", "builderPubkey", "builder"))) + proposer := common.FromHex(fmt.Sprint(get("proposer_pubkey", "proposerPubkey"))) + feeRec := common.FromHex(fmt.Sprint(get("proposer_fee_recipient", "proposerFeeRecipient", "feeRecipient"))) + + valStr, ok := parseBigString(get("value", "value_wei", "valueWei")) + if !ok || valStr == "" { + return database.BidRow{}, false // skip if no value + } + + var blockNum *int64 + if v := get("block_number", "blockNumber"); v != nil { + switch t := v.(type) { + case float64: + x := int64(t) + blockNum = &x + case string: + if strings.HasPrefix(t, "0x") || strings.HasPrefix(t, "0X") { + if bi, ok := new(big.Int).SetString(t[2:], 16); ok { + x := bi.Int64() + blockNum = &x + } + } else if n, err := strconv.ParseInt(t, 10, 64); err == nil { + blockNum = &n + } + } + } + + var tsMS *int64 + if v := get("timestamp_ms", "timestampMs", "time_ms", "time"); v != nil { + switch t := v.(type) { + case float64: + x := int64(t) + tsMS = &x + case string: + if n, err := strconv.ParseInt(t, 10, 64); err == nil { + tsMS = &n + } + } + } + + return database.BidRow{ + Slot: slot, + RelayID: relayID, + Builder: hexutil.Encode(builder), + Proposer: hexutil.Encode(proposer), + FeeRec: hexutil.Encode(feeRec), + ValStr: valStr, + BlockNum: blockNum, + TsMS: tsMS, + }, true +} + +func UpsertRelaysAndLoad(ctx context.Context, db *database.DB) ([]Row, error) { + // upsert defaults from code + if err := db.UpsertRelays(ctx, config.RelaysDefault); err != nil { + return nil, err + } + ctx2, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + dbResults, err := db.GetActiveRelays(ctx2) + if err != nil { + return nil, err + } + + var rws []Row + for _, result := range dbResults { + rws = append(rws, Row{ID: result.ID, URL: result.URL}) + } + return rws, nil +} + +func FetchBuilderBlocksReceived(ctx context.Context, httpc *retryablehttp.Client, relayBase string, slot int64) ([]map[string]any, error) { + url := fmt.Sprintf("%s/relay/v1/data/bidtraces/builder_blocks_received?slot=%d", strings.TrimRight(relayBase, "/"), slot) + + ctx2, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + var arr []map[string]any + if err := httputil.FetchJSON(ctx2, httpc, url, &arr); err != nil { + return nil, err + } + + return arr, nil +}