From 143a492b107a43378be71b2f7a5e203aa79b60dc Mon Sep 17 00:00:00 2001 From: Alok Date: Wed, 17 Sep 2025 13:10:40 +0530 Subject: [PATCH 1/6] fix: payload update handling in CL --- cl/singlenode/singlenode.go | 122 +++++++++++++++++++++---------- cl/singlenode/singlenode_test.go | 13 ++-- 2 files changed, 89 insertions(+), 46 deletions(-) diff --git a/cl/singlenode/singlenode.go b/cl/singlenode/singlenode.go index 5dc5c1380..a3bc0ccfc 100644 --- a/cl/singlenode/singlenode.go +++ b/cl/singlenode/singlenode.go @@ -18,6 +18,7 @@ import ( "github.com/primev/mev-commit/cl/singlenode/payloadstore" localstate "github.com/primev/mev-commit/cl/singlenode/state" "github.com/primev/mev-commit/cl/types" + "github.com/primev/mev-commit/cl/util" ) const ( @@ -53,15 +54,18 @@ type SingleNodeApp struct { blockBuilder BlockBuilder // stateManager is a local state manager for block production // it's not anticipated to use DB as all the state already in geth client - stateManager *localstate.LocalStateManager - payloadRepo types.PayloadRepository - payloadServer *api.PayloadServer - appCtx context.Context - cancel context.CancelFunc - wg sync.WaitGroup - connectionStatus sync.Mutex - connectionRefused bool - rpcClient *rpc.Client + stateManager *localstate.LocalStateManager + payloadRepo types.PayloadRepository + payloadServer *api.PayloadServer + appCtx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + connectionStatus sync.Mutex + connectionRefused bool + rpcClient *rpc.Client + payloadChan chan *types.PayloadInfo + runLoopStopped chan struct{} + payloadUploadStopped chan struct{} } // NewSingleNodeApp creates and initializes a new SingleNodeApp. @@ -144,16 +148,19 @@ func NewSingleNodeApp( } return &SingleNodeApp{ - logger: logger, - cfg: cfg, - blockBuilder: bb, - stateManager: stateMgr, - payloadRepo: pRepo, - payloadServer: payloadServer, - appCtx: ctx, - cancel: cancel, - connectionRefused: false, - rpcClient: rpcClient, + logger: logger, + cfg: cfg, + blockBuilder: bb, + stateManager: stateMgr, + payloadRepo: pRepo, + payloadServer: payloadServer, + appCtx: ctx, + cancel: cancel, + connectionRefused: false, + rpcClient: rpcClient, + payloadChan: make(chan *types.PayloadInfo, 10), + runLoopStopped: make(chan struct{}), + payloadUploadStopped: make(chan struct{}), }, nil } @@ -201,6 +208,16 @@ func (app *SingleNodeApp) healthHandler(w http.ResponseWriter, r *http.Request) return } + select { + case <-app.runLoopStopped: + http.Error(w, "run loop has stopped", http.StatusServiceUnavailable) + return + case <-app.payloadUploadStopped: + http.Error(w, "payload upload loop has stopped", http.StatusServiceUnavailable) + return + default: + } + w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte("OK")) } @@ -247,8 +264,41 @@ func (app *SingleNodeApp) Start() { go func() { defer app.wg.Done() defer app.logger.Info("SingleNodeApp run loop finished.") + defer close(app.runLoopStopped) app.runLoop() }() + + if app.payloadRepo != nil { + app.wg.Add(1) + go func() { + defer app.wg.Done() + defer app.logger.Info("Payload upload loop finished.") + defer close(app.payloadUploadStopped) + + for { + select { + case <-app.appCtx.Done(): + return + case payloadInfo := <-app.payloadChan: + if payloadInfo == nil { + continue + } + if err := util.RetryWithBackoff(app.appCtx, 3, app.logger.With("payload", payloadInfo.PayloadID), func() error { + return app.payloadRepo.SavePayload(app.appCtx, payloadInfo) + }); err != nil { + app.logger.Error( + "Failed to save payload to database after retries", + "payload_id", payloadInfo.PayloadID, + "error", err, + ) + return + } else { + app.logger.Info("Payload details submitted to database for saving", "payload_id", payloadInfo.PayloadID) + } + } + } + }() + } } // shutdownWithError handles errors during the run loop and initiates a shutdown. @@ -328,27 +378,6 @@ func (app *SingleNodeApp) produceBlock() error { blockHeight = 0 } - if app.payloadRepo != nil { - payloadInfo := &types.PayloadInfo{ - PayloadID: currentState.PayloadID, - ExecutionPayload: currentState.ExecutionPayload, - BlockHeight: blockHeight, - } - saveCtx, saveCancel := context.WithTimeout(app.appCtx, 200*time.Millisecond) - defer saveCancel() - - if err := app.payloadRepo.SavePayload(saveCtx, payloadInfo); err != nil { - app.logger.Error( - "Failed to save payload to database", - "payload_id", currentState.PayloadID, - "error", err, - ) - return fmt.Errorf("failed to save payload to database: %w", err) - } else { - app.logger.Info("Payload details submitted to database for saving", "payload_id", currentState.PayloadID) - } - } - // Step 2: Finalize the block app.logger.Info( "finalizing block", @@ -358,6 +387,19 @@ func (app *SingleNodeApp) produceBlock() error { if err := app.blockBuilder.FinalizeBlock(app.appCtx, currentState.PayloadID, currentState.ExecutionPayload, ""); err != nil { return fmt.Errorf("failed to finalize block: %w", err) } + + if app.payloadRepo != nil { + // Non-blocking send to the payload channel + select { + case app.payloadChan <- &types.PayloadInfo{ + PayloadID: currentState.PayloadID, + ExecutionPayload: currentState.ExecutionPayload, + BlockHeight: blockHeight, + }: + case <-app.appCtx.Done(): + return app.appCtx.Err() + } + } return nil } diff --git a/cl/singlenode/singlenode_test.go b/cl/singlenode/singlenode_test.go index 73e6ddd86..769c7c130 100644 --- a/cl/singlenode/singlenode_test.go +++ b/cl/singlenode/singlenode_test.go @@ -315,12 +315,13 @@ func TestStartStop(t *testing.T) { // Create app with minimal configuration for testing app := &SingleNodeApp{ - logger: logger, - cfg: Config{HealthAddr: ":0"}, - appCtx: ctx, - cancel: cancel, - blockBuilder: mockBuilder, - stateManager: stateMgr, + logger: logger, + cfg: Config{HealthAddr: ":0"}, + appCtx: ctx, + cancel: cancel, + blockBuilder: mockBuilder, + stateManager: stateMgr, + runLoopStopped: make(chan struct{}), } app.Start() From 6374030c522331dd307bf1404395f1e1ae7e95f6 Mon Sep 17 00:00:00 2001 From: Alok Nerurkar Date: Thu, 18 Sep 2025 00:30:12 +0530 Subject: [PATCH 2/6] fix: revert async payload upload --- cl/singlenode/singlenode.go | 113 +++++++++++++----------------------- 1 file changed, 40 insertions(+), 73 deletions(-) diff --git a/cl/singlenode/singlenode.go b/cl/singlenode/singlenode.go index a3bc0ccfc..87af369d1 100644 --- a/cl/singlenode/singlenode.go +++ b/cl/singlenode/singlenode.go @@ -18,7 +18,6 @@ import ( "github.com/primev/mev-commit/cl/singlenode/payloadstore" localstate "github.com/primev/mev-commit/cl/singlenode/state" "github.com/primev/mev-commit/cl/types" - "github.com/primev/mev-commit/cl/util" ) const ( @@ -54,18 +53,16 @@ type SingleNodeApp struct { blockBuilder BlockBuilder // stateManager is a local state manager for block production // it's not anticipated to use DB as all the state already in geth client - stateManager *localstate.LocalStateManager - payloadRepo types.PayloadRepository - payloadServer *api.PayloadServer - appCtx context.Context - cancel context.CancelFunc - wg sync.WaitGroup - connectionStatus sync.Mutex - connectionRefused bool - rpcClient *rpc.Client - payloadChan chan *types.PayloadInfo - runLoopStopped chan struct{} - payloadUploadStopped chan struct{} + stateManager *localstate.LocalStateManager + payloadRepo types.PayloadRepository + payloadServer *api.PayloadServer + appCtx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + connectionStatus sync.Mutex + connectionRefused bool + rpcClient *rpc.Client + runLoopStopped chan struct{} } // NewSingleNodeApp creates and initializes a new SingleNodeApp. @@ -148,19 +145,17 @@ func NewSingleNodeApp( } return &SingleNodeApp{ - logger: logger, - cfg: cfg, - blockBuilder: bb, - stateManager: stateMgr, - payloadRepo: pRepo, - payloadServer: payloadServer, - appCtx: ctx, - cancel: cancel, - connectionRefused: false, - rpcClient: rpcClient, - payloadChan: make(chan *types.PayloadInfo, 10), - runLoopStopped: make(chan struct{}), - payloadUploadStopped: make(chan struct{}), + logger: logger, + cfg: cfg, + blockBuilder: bb, + stateManager: stateMgr, + payloadRepo: pRepo, + payloadServer: payloadServer, + appCtx: ctx, + cancel: cancel, + connectionRefused: false, + rpcClient: rpcClient, + runLoopStopped: make(chan struct{}), }, nil } @@ -212,9 +207,6 @@ func (app *SingleNodeApp) healthHandler(w http.ResponseWriter, r *http.Request) case <-app.runLoopStopped: http.Error(w, "run loop has stopped", http.StatusServiceUnavailable) return - case <-app.payloadUploadStopped: - http.Error(w, "payload upload loop has stopped", http.StatusServiceUnavailable) - return default: } @@ -267,38 +259,6 @@ func (app *SingleNodeApp) Start() { defer close(app.runLoopStopped) app.runLoop() }() - - if app.payloadRepo != nil { - app.wg.Add(1) - go func() { - defer app.wg.Done() - defer app.logger.Info("Payload upload loop finished.") - defer close(app.payloadUploadStopped) - - for { - select { - case <-app.appCtx.Done(): - return - case payloadInfo := <-app.payloadChan: - if payloadInfo == nil { - continue - } - if err := util.RetryWithBackoff(app.appCtx, 3, app.logger.With("payload", payloadInfo.PayloadID), func() error { - return app.payloadRepo.SavePayload(app.appCtx, payloadInfo) - }); err != nil { - app.logger.Error( - "Failed to save payload to database after retries", - "payload_id", payloadInfo.PayloadID, - "error", err, - ) - return - } else { - app.logger.Info("Payload details submitted to database for saving", "payload_id", payloadInfo.PayloadID) - } - } - } - }() - } } // shutdownWithError handles errors during the run loop and initiates a shutdown. @@ -378,6 +338,25 @@ func (app *SingleNodeApp) produceBlock() error { blockHeight = 0 } + if app.payloadRepo != nil { + // Save payload to repository + saveCtx, saveCancel := context.WithTimeout(app.appCtx, 400*time.Millisecond) + defer saveCancel() + + if err := app.payloadRepo.SavePayload(saveCtx, &types.PayloadInfo{ + PayloadID: currentState.PayloadID, + ExecutionPayload: currentState.ExecutionPayload, + BlockHeight: blockHeight, + }); err != nil { + return fmt.Errorf("failed to save payload: %w", err) + } + app.logger.Info( + "payload saved to repository", + "payload_id", currentState.PayloadID, + "block_height", blockHeight, + ) + } + // Step 2: Finalize the block app.logger.Info( "finalizing block", @@ -388,18 +367,6 @@ func (app *SingleNodeApp) produceBlock() error { return fmt.Errorf("failed to finalize block: %w", err) } - if app.payloadRepo != nil { - // Non-blocking send to the payload channel - select { - case app.payloadChan <- &types.PayloadInfo{ - PayloadID: currentState.PayloadID, - ExecutionPayload: currentState.ExecutionPayload, - BlockHeight: blockHeight, - }: - case <-app.appCtx.Done(): - return app.appCtx.Err() - } - } return nil } From 30ce189e4384c0910c3df017b65a95c0186bf00b Mon Sep 17 00:00:00 2001 From: Shawn <44221603+shaspitz@users.noreply.github.com> Date: Wed, 17 Sep 2025 12:48:31 -0700 Subject: [PATCH 3/6] follower health (#790) --- cl/cmd/singlenode/main.go | 6 +++ cl/singlenode/follower/follower.go | 74 +++++++++++++++++++++++++----- 2 files changed, 69 insertions(+), 11 deletions(-) diff --git a/cl/cmd/singlenode/main.go b/cl/cmd/singlenode/main.go index dc1386aa3..b259e73ab 100644 --- a/cl/cmd/singlenode/main.go +++ b/cl/cmd/singlenode/main.go @@ -413,11 +413,17 @@ func startFollowerNode(c *cli.Context) error { } bb := blockbuilder.NewMemberBlockBuilder(engineCL, logger.With("component", "BlockBuilder")) + healthAddr := c.String(healthAddrPortFlag.Name) + if healthAddr == "" { + return fmt.Errorf("health-addr is required") + } + followerNode, err := follower.NewFollower( logger, repo, syncBatchSize, bb, + healthAddr, ) if err != nil { logger.Error("Failed to initialize Follower", "error", err) diff --git a/cl/singlenode/follower/follower.go b/cl/singlenode/follower/follower.go index 12bd5bbb7..5abea967e 100644 --- a/cl/singlenode/follower/follower.go +++ b/cl/singlenode/follower/follower.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "log/slog" + "net/http" "sync" "time" @@ -13,12 +14,16 @@ import ( ) type Follower struct { - logger *slog.Logger - sharedDB payloadDB - syncBatchSize uint64 - payloadCh chan types.PayloadInfo - bbMutex sync.RWMutex - bb blockBuilder + logger *slog.Logger + sharedDB payloadDB + syncBatchSize uint64 + payloadCh chan types.PayloadInfo + bbMutex sync.RWMutex + bb blockBuilder + healthAddr string + syncStopped chan struct{} + handlePayloadsStopped chan struct{} + healthStopped chan struct{} } const ( @@ -41,6 +46,7 @@ func NewFollower( sharedDB payloadDB, syncBatchSize uint64, bb blockBuilder, + healthAddr string, ) (*Follower, error) { if sharedDB == nil { return nil, errors.New("payload repository not provided") @@ -49,11 +55,15 @@ func NewFollower( return nil, errors.New("sync batch size must be greater than 0") } return &Follower{ - logger: logger, - sharedDB: sharedDB, - syncBatchSize: syncBatchSize, - payloadCh: make(chan types.PayloadInfo), - bb: bb, + logger: logger, + sharedDB: sharedDB, + syncBatchSize: syncBatchSize, + payloadCh: make(chan types.PayloadInfo), + bb: bb, + healthAddr: healthAddr, + healthStopped: make(chan struct{}), + syncStopped: make(chan struct{}), + handlePayloadsStopped: make(chan struct{}), }, nil } @@ -61,11 +71,34 @@ func (f *Follower) Start(ctx context.Context) <-chan struct{} { done := make(chan struct{}) eg, egCtx := errgroup.WithContext(ctx) + + eg.Go(func() error { + defer close(f.healthStopped) + mux := http.NewServeMux() + mux.HandleFunc("/health", f.healthHandler) + server := &http.Server{Addr: f.healthAddr, Handler: mux} + f.logger.Info("Health endpoint listening", "address", f.healthAddr) + + go func() { + <-egCtx.Done() + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + _ = server.Shutdown(ctx) + }() + + if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + return err + } + return nil + }) + eg.Go(func() error { + defer close(f.handlePayloadsStopped) return f.handlePayloads(egCtx) }) eg.Go(func() error { + defer close(f.syncStopped) f.logger.Info("Starting sync from shared DB") return f.syncFromSharedDB(egCtx) }) @@ -80,6 +113,25 @@ func (f *Follower) Start(ctx context.Context) <-chan struct{} { return done } +func (f *Follower) healthHandler(w http.ResponseWriter, r *http.Request) { + + select { + case <-f.healthStopped: + http.Error(w, "health loop has stopped", http.StatusServiceUnavailable) + return + case <-f.syncStopped: + http.Error(w, "sync from shared DB has stopped", http.StatusServiceUnavailable) + return + case <-f.handlePayloadsStopped: + http.Error(w, "handle payloads loop has stopped", http.StatusServiceUnavailable) + return + default: + } + + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("OK")) +} + func (f *Follower) syncFromSharedDB(ctx context.Context) error { if f.getExecutionHead() == nil { if err := f.setExecutionHeadFromRPC(ctx); err != nil { From 8b0e5da7790ab818342a8917fdae1c19ae82abe6 Mon Sep 17 00:00:00 2001 From: Alok Nerurkar Date: Thu, 18 Sep 2025 01:19:43 +0530 Subject: [PATCH 4/6] fix: restore timeout --- cl/singlenode/singlenode.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cl/singlenode/singlenode.go b/cl/singlenode/singlenode.go index 87af369d1..34434019f 100644 --- a/cl/singlenode/singlenode.go +++ b/cl/singlenode/singlenode.go @@ -340,7 +340,7 @@ func (app *SingleNodeApp) produceBlock() error { if app.payloadRepo != nil { // Save payload to repository - saveCtx, saveCancel := context.WithTimeout(app.appCtx, 400*time.Millisecond) + saveCtx, saveCancel := context.WithTimeout(app.appCtx, 200*time.Millisecond) defer saveCancel() if err := app.payloadRepo.SavePayload(saveCtx, &types.PayloadInfo{ From 35db6dfb4b43850a2dbd4b98b3b36acbd2a6bbd2 Mon Sep 17 00:00:00 2001 From: Shawn <44221603+shaspitz@users.noreply.github.com> Date: Wed, 17 Sep 2025 13:06:19 -0700 Subject: [PATCH 5/6] Update follower_test.go --- cl/singlenode/follower/follower_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/cl/singlenode/follower/follower_test.go b/cl/singlenode/follower/follower_test.go index 046132365..812852b12 100644 --- a/cl/singlenode/follower/follower_test.go +++ b/cl/singlenode/follower/follower_test.go @@ -81,7 +81,7 @@ func TestFollower_syncFromSharedDB(t *testing.T) { syncBatchSize := uint64(100) bb := newMockBlockBuilder() - follower, err := follower.NewFollower(logger, payloadRepo, syncBatchSize, bb) + follower, err := follower.NewFollower(logger, payloadRepo, syncBatchSize, bb, ":8080") if err != nil { t.Fatal(err) } @@ -166,7 +166,7 @@ func TestFollower_syncFromSharedDB_NoRows(t *testing.T) { syncBatchSize := uint64(100) bb := newMockBlockBuilder() - follower, err := follower.NewFollower(logger, payloadRepo, syncBatchSize, bb) + follower, err := follower.NewFollower(logger, payloadRepo, syncBatchSize, bb, ":8080") if err != nil { t.Fatal(err) } @@ -280,7 +280,7 @@ func TestFollower_syncFromSharedDB_MultipleIterations(t *testing.T) { syncBatchSize := uint64(20) bb := newMockBlockBuilder() - follower, err := follower.NewFollower(logger, payloadRepo, syncBatchSize, bb) + follower, err := follower.NewFollower(logger, payloadRepo, syncBatchSize, bb, ":8080") if err != nil { t.Fatal(err) } @@ -364,7 +364,7 @@ func TestFollower_Start_SimulateNewChain(t *testing.T) { syncBatchSize := uint64(100) bb := newMockBlockBuilder() - follower, err := follower.NewFollower(logger, payloadRepo, syncBatchSize, bb) + follower, err := follower.NewFollower(logger, payloadRepo, syncBatchSize, bb, ":8080") if err != nil { t.Fatal(err) } @@ -442,7 +442,7 @@ func TestFollower_Start_SyncExistingChain(t *testing.T) { syncBatchSize := uint64(20) bb := newMockBlockBuilder() - follower, err := follower.NewFollower(logger, payloadRepo, syncBatchSize, bb) + follower, err := follower.NewFollower(logger, payloadRepo, syncBatchSize, bb, ":8080") if err != nil { t.Fatal(err) } From 8060b13c2f550086059e5f18c1aca1c7bc22cf54 Mon Sep 17 00:00:00 2001 From: Shawn <44221603+shaspitz@users.noreply.github.com> Date: Wed, 17 Sep 2025 13:16:04 -0700 Subject: [PATCH 6/6] fix: use different health ports since tests run in parallel --- cl/singlenode/follower/follower_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cl/singlenode/follower/follower_test.go b/cl/singlenode/follower/follower_test.go index 812852b12..7744e96d2 100644 --- a/cl/singlenode/follower/follower_test.go +++ b/cl/singlenode/follower/follower_test.go @@ -166,7 +166,7 @@ func TestFollower_syncFromSharedDB_NoRows(t *testing.T) { syncBatchSize := uint64(100) bb := newMockBlockBuilder() - follower, err := follower.NewFollower(logger, payloadRepo, syncBatchSize, bb, ":8080") + follower, err := follower.NewFollower(logger, payloadRepo, syncBatchSize, bb, ":8081") if err != nil { t.Fatal(err) } @@ -280,7 +280,7 @@ func TestFollower_syncFromSharedDB_MultipleIterations(t *testing.T) { syncBatchSize := uint64(20) bb := newMockBlockBuilder() - follower, err := follower.NewFollower(logger, payloadRepo, syncBatchSize, bb, ":8080") + follower, err := follower.NewFollower(logger, payloadRepo, syncBatchSize, bb, ":8082") if err != nil { t.Fatal(err) } @@ -364,7 +364,7 @@ func TestFollower_Start_SimulateNewChain(t *testing.T) { syncBatchSize := uint64(100) bb := newMockBlockBuilder() - follower, err := follower.NewFollower(logger, payloadRepo, syncBatchSize, bb, ":8080") + follower, err := follower.NewFollower(logger, payloadRepo, syncBatchSize, bb, ":8083") if err != nil { t.Fatal(err) } @@ -442,7 +442,7 @@ func TestFollower_Start_SyncExistingChain(t *testing.T) { syncBatchSize := uint64(20) bb := newMockBlockBuilder() - follower, err := follower.NewFollower(logger, payloadRepo, syncBatchSize, bb, ":8080") + follower, err := follower.NewFollower(logger, payloadRepo, syncBatchSize, bb, ":8084") if err != nil { t.Fatal(err) }