diff --git a/cl/cmd/singlenode/main.go b/cl/cmd/singlenode/main.go index dc1386aa3..b259e73ab 100644 --- a/cl/cmd/singlenode/main.go +++ b/cl/cmd/singlenode/main.go @@ -413,11 +413,17 @@ func startFollowerNode(c *cli.Context) error { } bb := blockbuilder.NewMemberBlockBuilder(engineCL, logger.With("component", "BlockBuilder")) + healthAddr := c.String(healthAddrPortFlag.Name) + if healthAddr == "" { + return fmt.Errorf("health-addr is required") + } + followerNode, err := follower.NewFollower( logger, repo, syncBatchSize, bb, + healthAddr, ) if err != nil { logger.Error("Failed to initialize Follower", "error", err) diff --git a/cl/singlenode/follower/follower.go b/cl/singlenode/follower/follower.go index 12bd5bbb7..5abea967e 100644 --- a/cl/singlenode/follower/follower.go +++ b/cl/singlenode/follower/follower.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "log/slog" + "net/http" "sync" "time" @@ -13,12 +14,16 @@ import ( ) type Follower struct { - logger *slog.Logger - sharedDB payloadDB - syncBatchSize uint64 - payloadCh chan types.PayloadInfo - bbMutex sync.RWMutex - bb blockBuilder + logger *slog.Logger + sharedDB payloadDB + syncBatchSize uint64 + payloadCh chan types.PayloadInfo + bbMutex sync.RWMutex + bb blockBuilder + healthAddr string + syncStopped chan struct{} + handlePayloadsStopped chan struct{} + healthStopped chan struct{} } const ( @@ -41,6 +46,7 @@ func NewFollower( sharedDB payloadDB, syncBatchSize uint64, bb blockBuilder, + healthAddr string, ) (*Follower, error) { if sharedDB == nil { return nil, errors.New("payload repository not provided") @@ -49,11 +55,15 @@ func NewFollower( return nil, errors.New("sync batch size must be greater than 0") } return &Follower{ - logger: logger, - sharedDB: sharedDB, - syncBatchSize: syncBatchSize, - payloadCh: make(chan types.PayloadInfo), - bb: bb, + logger: logger, + sharedDB: sharedDB, + syncBatchSize: syncBatchSize, + payloadCh: make(chan types.PayloadInfo), + bb: bb, + healthAddr: healthAddr, + healthStopped: make(chan struct{}), + syncStopped: make(chan struct{}), + handlePayloadsStopped: make(chan struct{}), }, nil } @@ -61,11 +71,34 @@ func (f *Follower) Start(ctx context.Context) <-chan struct{} { done := make(chan struct{}) eg, egCtx := errgroup.WithContext(ctx) + + eg.Go(func() error { + defer close(f.healthStopped) + mux := http.NewServeMux() + mux.HandleFunc("/health", f.healthHandler) + server := &http.Server{Addr: f.healthAddr, Handler: mux} + f.logger.Info("Health endpoint listening", "address", f.healthAddr) + + go func() { + <-egCtx.Done() + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + _ = server.Shutdown(ctx) + }() + + if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + return err + } + return nil + }) + eg.Go(func() error { + defer close(f.handlePayloadsStopped) return f.handlePayloads(egCtx) }) eg.Go(func() error { + defer close(f.syncStopped) f.logger.Info("Starting sync from shared DB") return f.syncFromSharedDB(egCtx) }) @@ -80,6 +113,25 @@ func (f *Follower) Start(ctx context.Context) <-chan struct{} { return done } +func (f *Follower) healthHandler(w http.ResponseWriter, r *http.Request) { + + select { + case <-f.healthStopped: + http.Error(w, "health loop has stopped", http.StatusServiceUnavailable) + return + case <-f.syncStopped: + http.Error(w, "sync from shared DB has stopped", http.StatusServiceUnavailable) + return + case <-f.handlePayloadsStopped: + http.Error(w, "handle payloads loop has stopped", http.StatusServiceUnavailable) + return + default: + } + + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("OK")) +} + func (f *Follower) syncFromSharedDB(ctx context.Context) error { if f.getExecutionHead() == nil { if err := f.setExecutionHeadFromRPC(ctx); err != nil { diff --git a/cl/singlenode/follower/follower_test.go b/cl/singlenode/follower/follower_test.go index 046132365..7744e96d2 100644 --- a/cl/singlenode/follower/follower_test.go +++ b/cl/singlenode/follower/follower_test.go @@ -81,7 +81,7 @@ func TestFollower_syncFromSharedDB(t *testing.T) { syncBatchSize := uint64(100) bb := newMockBlockBuilder() - follower, err := follower.NewFollower(logger, payloadRepo, syncBatchSize, bb) + follower, err := follower.NewFollower(logger, payloadRepo, syncBatchSize, bb, ":8080") if err != nil { t.Fatal(err) } @@ -166,7 +166,7 @@ func TestFollower_syncFromSharedDB_NoRows(t *testing.T) { syncBatchSize := uint64(100) bb := newMockBlockBuilder() - follower, err := follower.NewFollower(logger, payloadRepo, syncBatchSize, bb) + follower, err := follower.NewFollower(logger, payloadRepo, syncBatchSize, bb, ":8081") if err != nil { t.Fatal(err) } @@ -280,7 +280,7 @@ func TestFollower_syncFromSharedDB_MultipleIterations(t *testing.T) { syncBatchSize := uint64(20) bb := newMockBlockBuilder() - follower, err := follower.NewFollower(logger, payloadRepo, syncBatchSize, bb) + follower, err := follower.NewFollower(logger, payloadRepo, syncBatchSize, bb, ":8082") if err != nil { t.Fatal(err) } @@ -364,7 +364,7 @@ func TestFollower_Start_SimulateNewChain(t *testing.T) { syncBatchSize := uint64(100) bb := newMockBlockBuilder() - follower, err := follower.NewFollower(logger, payloadRepo, syncBatchSize, bb) + follower, err := follower.NewFollower(logger, payloadRepo, syncBatchSize, bb, ":8083") if err != nil { t.Fatal(err) } @@ -442,7 +442,7 @@ func TestFollower_Start_SyncExistingChain(t *testing.T) { syncBatchSize := uint64(20) bb := newMockBlockBuilder() - follower, err := follower.NewFollower(logger, payloadRepo, syncBatchSize, bb) + follower, err := follower.NewFollower(logger, payloadRepo, syncBatchSize, bb, ":8084") if err != nil { t.Fatal(err) } diff --git a/cl/singlenode/singlenode.go b/cl/singlenode/singlenode.go index 5dc5c1380..34434019f 100644 --- a/cl/singlenode/singlenode.go +++ b/cl/singlenode/singlenode.go @@ -62,6 +62,7 @@ type SingleNodeApp struct { connectionStatus sync.Mutex connectionRefused bool rpcClient *rpc.Client + runLoopStopped chan struct{} } // NewSingleNodeApp creates and initializes a new SingleNodeApp. @@ -154,6 +155,7 @@ func NewSingleNodeApp( cancel: cancel, connectionRefused: false, rpcClient: rpcClient, + runLoopStopped: make(chan struct{}), }, nil } @@ -201,6 +203,13 @@ func (app *SingleNodeApp) healthHandler(w http.ResponseWriter, r *http.Request) return } + select { + case <-app.runLoopStopped: + http.Error(w, "run loop has stopped", http.StatusServiceUnavailable) + return + default: + } + w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte("OK")) } @@ -247,6 +256,7 @@ func (app *SingleNodeApp) Start() { go func() { defer app.wg.Done() defer app.logger.Info("SingleNodeApp run loop finished.") + defer close(app.runLoopStopped) app.runLoop() }() } @@ -329,24 +339,22 @@ func (app *SingleNodeApp) produceBlock() error { } if app.payloadRepo != nil { - payloadInfo := &types.PayloadInfo{ - PayloadID: currentState.PayloadID, - ExecutionPayload: currentState.ExecutionPayload, - BlockHeight: blockHeight, - } + // Save payload to repository saveCtx, saveCancel := context.WithTimeout(app.appCtx, 200*time.Millisecond) defer saveCancel() - if err := app.payloadRepo.SavePayload(saveCtx, payloadInfo); err != nil { - app.logger.Error( - "Failed to save payload to database", - "payload_id", currentState.PayloadID, - "error", err, - ) - return fmt.Errorf("failed to save payload to database: %w", err) - } else { - app.logger.Info("Payload details submitted to database for saving", "payload_id", currentState.PayloadID) + if err := app.payloadRepo.SavePayload(saveCtx, &types.PayloadInfo{ + PayloadID: currentState.PayloadID, + ExecutionPayload: currentState.ExecutionPayload, + BlockHeight: blockHeight, + }); err != nil { + return fmt.Errorf("failed to save payload: %w", err) } + app.logger.Info( + "payload saved to repository", + "payload_id", currentState.PayloadID, + "block_height", blockHeight, + ) } // Step 2: Finalize the block @@ -358,6 +366,7 @@ func (app *SingleNodeApp) produceBlock() error { if err := app.blockBuilder.FinalizeBlock(app.appCtx, currentState.PayloadID, currentState.ExecutionPayload, ""); err != nil { return fmt.Errorf("failed to finalize block: %w", err) } + return nil } diff --git a/cl/singlenode/singlenode_test.go b/cl/singlenode/singlenode_test.go index 73e6ddd86..769c7c130 100644 --- a/cl/singlenode/singlenode_test.go +++ b/cl/singlenode/singlenode_test.go @@ -315,12 +315,13 @@ func TestStartStop(t *testing.T) { // Create app with minimal configuration for testing app := &SingleNodeApp{ - logger: logger, - cfg: Config{HealthAddr: ":0"}, - appCtx: ctx, - cancel: cancel, - blockBuilder: mockBuilder, - stateManager: stateMgr, + logger: logger, + cfg: Config{HealthAddr: ":0"}, + appCtx: ctx, + cancel: cancel, + blockBuilder: mockBuilder, + stateManager: stateMgr, + runLoopStopped: make(chan struct{}), } app.Start()