Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
4498ed2
init
shaspitz Sep 10, 2025
4262039
address todo
shaspitz Sep 10, 2025
ed55b2e
wip
shaspitz Sep 10, 2025
7d6417e
prog
shaspitz Sep 10, 2025
39e8fb0
fix tests
shaspitz Sep 10, 2025
c39866d
Update follower_test.go
shaspitz Sep 10, 2025
d493d47
Update follower_test.go
shaspitz Sep 10, 2025
4dfae98
TestFollower_syncFromSharedDB_MultipleIterations
shaspitz Sep 10, 2025
701825e
Update follower_test.go
shaspitz Sep 10, 2025
68ea80a
TestFollower_queryPayloadsFromSharedDB
shaspitz Sep 10, 2025
40c5361
go mod sync
shaspitz Sep 10, 2025
5877785
Update follower_test.go
shaspitz Sep 10, 2025
205c84d
sleepRespectingContext
shaspitz Sep 10, 2025
afa6e58
fix TestFollower_Start_SimulateNewChain
shaspitz Sep 10, 2025
4a3aeab
TestFollower_Start_SyncExistingChain
shaspitz Sep 10, 2025
c44bd1c
fix races
shaspitz Sep 10, 2025
564ae76
init engine api
shaspitz Sep 11, 2025
39d1b6c
integrate block builder
shaspitz Sep 11, 2025
71333e2
Update main.go
shaspitz Sep 11, 2025
3d50c89
go mod tidy
shaspitz Sep 11, 2025
f3ad52e
single function for reading db
shaspitz Sep 11, 2025
66f7004
rm lastProcessed
shaspitz Sep 11, 2025
9728b3f
GetLatestHeight doesnt return pointer
shaspitz Sep 11, 2025
3b53161
cleanup main.go
shaspitz Sep 11, 2025
589edba
rm channel buffer
shaspitz Sep 11, 2025
decdcef
go mod tidy
shaspitz Sep 11, 2025
3ba9a0d
Update follower.go
shaspitz Sep 12, 2025
51265ee
sync test
shaspitz Sep 12, 2025
ba862f4
dont reinit postgres
shaspitz Sep 12, 2025
1b70643
logs
shaspitz Sep 12, 2025
fc1fd84
debugs
shaspitz Sep 12, 2025
1319bad
Update follower.go
shaspitz Sep 12, 2025
6f21ca7
Merge branch 'main' into follower
kant777 Sep 12, 2025
7dd4ac3
remove stale store code
shaspitz Sep 12, 2025
ccd85cc
return errors
shaspitz Sep 12, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cl/blockbuilder/blockbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (bb *BlockBuilder) GetPayload(ctx context.Context) error {
if bb.executionHead == nil {
bb.logger.Info("executionHead is nil, it'll be set by RPC. CL is likely being restarted")
err = util.RetryWithBackoff(ctx, maxAttempts, bb.logger, func() error {
innerErr := bb.setExecutionHeadFromRPC(ctx)
innerErr := bb.SetExecutionHeadFromRPC(ctx)
if innerErr != nil {
bb.logger.Warn(
"Failed to set execution head from rpc, retrying...",
Expand Down Expand Up @@ -493,7 +493,7 @@ func (bb *BlockBuilder) updateForkChoice(ctx context.Context, fcs engine.Forkcho
})
}

func (bb *BlockBuilder) setExecutionHeadFromRPC(ctx context.Context) error {
func (bb *BlockBuilder) SetExecutionHeadFromRPC(ctx context.Context) error {
header, err := bb.engineCl.HeaderByNumber(ctx, nil) // nil for the latest block
if err != nil {
return fmt.Errorf("failed to get the latest block header: %w", err)
Expand Down
126 changes: 66 additions & 60 deletions cl/cmd/singlenode/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ import (
"syscall"
"time"

"github.com/primev/mev-commit/cl/blockbuilder"
"github.com/primev/mev-commit/cl/ethclient"
"github.com/primev/mev-commit/cl/singlenode"
"github.com/primev/mev-commit/cl/singlenode/membernode"
"github.com/primev/mev-commit/cl/singlenode/follower"
"github.com/primev/mev-commit/cl/singlenode/payloadstore"
"github.com/primev/mev-commit/x/util"
"github.com/urfave/cli/v2"
"github.com/urfave/cli/v2/altsrc"
Expand Down Expand Up @@ -211,29 +214,12 @@ var (
Value: 5 * time.Millisecond,
})

// Member node specific flags
leaderAPIURLFlag = altsrc.NewStringFlag(&cli.StringFlag{
Name: "leader-api-url",
Usage: "Leader node API URL for member nodes (e.g., 'http://leader:9090')",
EnvVars: []string{"MEMBER_LEADER_API_URL"},
Category: categoryMember,
Action: func(_ *cli.Context, s string) error {
if s == "" {
return nil // Will be validated in member command
}
if _, err := url.Parse(s); err != nil {
return fmt.Errorf("invalid leader-api-url: %v", err)
}
return nil
},
})

pollIntervalFlag = altsrc.NewDurationFlag(&cli.DurationFlag{
Name: "poll-interval",
Usage: "Interval for polling leader node for new payloads (e.g., '1s')",
EnvVars: []string{"MEMBER_POLL_INTERVAL"},
Value: 1 * time.Second,
Category: categoryMember,
// Follower node specific flags
syncBatchSizeFlag = altsrc.NewUint64Flag(&cli.Uint64Flag{
Name: "sync-batch-size",
Usage: "Number of payloads per request to the EL during sync",
EnvVars: []string{"FOLLOWER_SYNC_BATCH_SIZE"},
Value: 100,
})
)

Expand All @@ -256,7 +242,7 @@ func main() {
txPoolPollingIntervalFlag,
}

memberFlags := []cli.Flag{
followerFlags := []cli.Flag{
configFlag,
instanceIDFlag,
ethClientURLFlag,
Expand All @@ -265,8 +251,8 @@ func main() {
logLevelFlag,
logTagsFlag,
healthAddrPortFlag,
leaderAPIURLFlag,
pollIntervalFlag,
postgresDSNFlag,
syncBatchSizeFlag,
}

app := &cli.App{
Expand All @@ -290,10 +276,10 @@ func main() {
},
},
{
Name: "member",
Name: "follower",
Usage: "Start as member node (follows leader)",
Flags: memberFlags,
Before: altsrc.InitInputSourceWithContext(memberFlags,
Flags: followerFlags,
Before: altsrc.InitInputSourceWithContext(followerFlags,
func(c *cli.Context) (altsrc.InputSourceContext, error) {
configFile := c.String(configFlag.Name)
if configFile != "" {
Expand All @@ -302,7 +288,7 @@ func main() {
return &altsrc.MapInputSource{}, nil
}),
Action: func(c *cli.Context) error {
return startMemberNode(c)
return startFollowerNode(c)
},
},
// Keep the old "start" command for backward compatibility
Expand Down Expand Up @@ -380,12 +366,7 @@ func startLeaderNode(c *cli.Context) error {
return nil
}

func startMemberNode(c *cli.Context) error {
leaderURL := c.String(leaderAPIURLFlag.Name)
if leaderURL == "" {
return fmt.Errorf("leader-api-url is required for member nodes")
}

func startFollowerNode(c *cli.Context) error {
logger, err := util.NewLogger(
c.String(logLevelFlag.Name),
c.String(logFmtFlag.Name),
Expand All @@ -395,36 +376,61 @@ func startMemberNode(c *cli.Context) error {
if err != nil {
return fmt.Errorf("failed to create logger: %w", err)
}
logger = logger.With("app", "snode", "role", "member")

cfg := membernode.Config{
InstanceID: c.String(instanceIDFlag.Name),
LeaderAPIURL: leaderURL,
EthClientURL: c.String(ethClientURLFlag.Name),
JWTSecret: c.String(jwtSecretFlag.Name),
HealthAddr: c.String(healthAddrPortFlag.Name),
PollInterval: c.Duration(pollIntervalFlag.Name),
}
logger = logger.With("app", "snode", "role", "follower")

logger.Info("Starting member node", "config", cfg)
logger.Info("Starting follower node")

// Create a root context that can be cancelled for graceful shutdown
rootCtx, rootCancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer rootCancel()

memberNode, err := membernode.NewMemberNodeApp(rootCtx, cfg, logger)
postgresDSN := c.String(postgresDSNFlag.Name)
if postgresDSN == "" {
return fmt.Errorf("postgresDSN is required")
}
repo, err := payloadstore.NewPostgresFollower(rootCtx, postgresDSN, logger)
if err != nil {
logger.Error("Failed to initialize MemberNodeApp", "error", err)
return err
return fmt.Errorf("failed to initialize payload repository: %w", err)
}
syncBatchSize := c.Uint64(syncBatchSizeFlag.Name)
if syncBatchSize == 0 {
return fmt.Errorf("sync-batch-size is required")
}
ethClientURL := c.String(ethClientURLFlag.Name)
if ethClientURL == "" {
return fmt.Errorf("eth-client-url is required")
}
jwtSecret := c.String(jwtSecretFlag.Name)
if jwtSecret == "" {
return fmt.Errorf("jwt-secret is required")
}
jwtBytes, err := hex.DecodeString(jwtSecret)
if err != nil {
return fmt.Errorf("failed to decode JWT secret: %w", err)
}
engineCL, err := ethclient.NewAuthClient(rootCtx, ethClientURL, jwtBytes)
if err != nil {
return fmt.Errorf("failed to create Ethereum engine client: %w", err)
}
bb := blockbuilder.NewMemberBlockBuilder(engineCL, logger.With("component", "BlockBuilder"))

memberNode.Start()

<-rootCtx.Done()

logger.Info("Shutdown signal received, stopping member node...")
memberNode.Stop()
followerNode, err := follower.NewFollower(
logger,
repo,
syncBatchSize,
bb,
)
if err != nil {
logger.Error("Failed to initialize Follower", "error", err)
return err
}

logger.Info("Member node shutdown completed.")
return nil
done := followerNode.Start(rootCtx)
select {
case <-done:
logger.Info("Follower node shutdown completed.")
return nil
case <-rootCtx.Done():
logger.Info("Follower node shutdown completed.")
return nil
}
}
14 changes: 8 additions & 6 deletions cl/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/lib/pq v1.10.9
github.com/redis/go-redis/v9 v9.6.1
github.com/urfave/cli/v2 v2.27.5
golang.org/x/sync v0.11.0
golang.org/x/tools v0.29.0
)

Expand All @@ -34,7 +35,9 @@ require (
github.com/go-ole/go-ole v1.3.0 // indirect
github.com/gofrs/flock v0.8.1 // indirect
github.com/holiman/uint256 v1.3.2 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/mattn/go-runewidth v0.0.16 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/mmcloughlin/addchain v0.4.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nxadm/tail v1.4.11 // indirect
Expand All @@ -44,6 +47,7 @@ require (
github.com/pion/transport/v2 v2.2.5 // indirect
github.com/pion/transport/v3 v3.0.2 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/rs/cors v1.8.3 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect
github.com/stretchr/objx v0.5.2 // indirect
Expand All @@ -52,8 +56,8 @@ require (
github.com/tklauser/numcpus v0.7.0 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
golang.org/x/mod v0.22.0 // indirect
golang.org/x/sync v0.11.0 // indirect
rsc.io/tmplfunc v0.0.3 // indirect
)

Expand All @@ -64,23 +68,21 @@ require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 // indirect
github.com/ethereum/go-ethereum v1.15.11
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
github.com/gorilla/websocket v1.5.3 // indirect
github.com/heyvito/go-leader v0.1.0
github.com/klauspost/compress v1.17.9 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/primev/mev-commit/x v0.0.0-20241029202458-b151c03fa49e
github.com/primev/mev-commit/x v0.0.1
github.com/prometheus/client_golang v1.19.1
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.55.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/rs/cors v1.8.3 // indirect
github.com/stretchr/testify v1.10.0
github.com/vmihailenco/msgpack/v5 v5.4.1
golang.org/x/crypto v0.35.0 // indirect
golang.org/x/sys v0.30.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/primev/mev-commit/x => ../x
2 changes: 0 additions & 2 deletions cl/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,6 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/primev/mev-commit/x v0.0.0-20241029202458-b151c03fa49e h1:4UCC8PDllbWQNGuliOw3rxcmH79CUn6+xZhVGx3qZnQ=
github.com/primev/mev-commit/x v0.0.0-20241029202458-b151c03fa49e/go.mod h1:EEJMyKLa7ZLqTghwo1PlvHJPDW4MVl5WzBnhE6f5jDM=
github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE=
github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho=
github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E=
Expand Down
19 changes: 19 additions & 0 deletions cl/singlenode/follower/export.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package follower

import (
"context"

"github.com/primev/mev-commit/cl/types"
)

func (f *Follower) PayloadCh() <-chan types.PayloadInfo {
return f.payloadCh
}

func (f *Follower) SyncFromSharedDB(ctx context.Context) error {
return f.syncFromSharedDB(ctx)
}

func (f *Follower) GetExecutionHead() *types.ExecutionHead {
return f.getExecutionHead()
}
Loading
Loading