From de13f4d3c3c9be2de5ec73485c01d8e064ab64a9 Mon Sep 17 00:00:00 2001 From: Shawn <44221603+shaspitz@users.noreply.github.com> Date: Wed, 17 Sep 2025 12:39:20 -0700 Subject: [PATCH] follower health --- cl/cmd/singlenode/main.go | 6 +++ cl/singlenode/follower/follower.go | 74 +++++++++++++++++++++++++----- 2 files changed, 69 insertions(+), 11 deletions(-) diff --git a/cl/cmd/singlenode/main.go b/cl/cmd/singlenode/main.go index dc1386aa3..b259e73ab 100644 --- a/cl/cmd/singlenode/main.go +++ b/cl/cmd/singlenode/main.go @@ -413,11 +413,17 @@ func startFollowerNode(c *cli.Context) error { } bb := blockbuilder.NewMemberBlockBuilder(engineCL, logger.With("component", "BlockBuilder")) + healthAddr := c.String(healthAddrPortFlag.Name) + if healthAddr == "" { + return fmt.Errorf("health-addr is required") + } + followerNode, err := follower.NewFollower( logger, repo, syncBatchSize, bb, + healthAddr, ) if err != nil { logger.Error("Failed to initialize Follower", "error", err) diff --git a/cl/singlenode/follower/follower.go b/cl/singlenode/follower/follower.go index 12bd5bbb7..5abea967e 100644 --- a/cl/singlenode/follower/follower.go +++ b/cl/singlenode/follower/follower.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "log/slog" + "net/http" "sync" "time" @@ -13,12 +14,16 @@ import ( ) type Follower struct { - logger *slog.Logger - sharedDB payloadDB - syncBatchSize uint64 - payloadCh chan types.PayloadInfo - bbMutex sync.RWMutex - bb blockBuilder + logger *slog.Logger + sharedDB payloadDB + syncBatchSize uint64 + payloadCh chan types.PayloadInfo + bbMutex sync.RWMutex + bb blockBuilder + healthAddr string + syncStopped chan struct{} + handlePayloadsStopped chan struct{} + healthStopped chan struct{} } const ( @@ -41,6 +46,7 @@ func NewFollower( sharedDB payloadDB, syncBatchSize uint64, bb blockBuilder, + healthAddr string, ) (*Follower, error) { if sharedDB == nil { return nil, errors.New("payload repository not provided") @@ -49,11 +55,15 @@ func NewFollower( return nil, errors.New("sync batch size must be greater than 0") } return &Follower{ - logger: logger, - sharedDB: sharedDB, - syncBatchSize: syncBatchSize, - payloadCh: make(chan types.PayloadInfo), - bb: bb, + logger: logger, + sharedDB: sharedDB, + syncBatchSize: syncBatchSize, + payloadCh: make(chan types.PayloadInfo), + bb: bb, + healthAddr: healthAddr, + healthStopped: make(chan struct{}), + syncStopped: make(chan struct{}), + handlePayloadsStopped: make(chan struct{}), }, nil } @@ -61,11 +71,34 @@ func (f *Follower) Start(ctx context.Context) <-chan struct{} { done := make(chan struct{}) eg, egCtx := errgroup.WithContext(ctx) + + eg.Go(func() error { + defer close(f.healthStopped) + mux := http.NewServeMux() + mux.HandleFunc("/health", f.healthHandler) + server := &http.Server{Addr: f.healthAddr, Handler: mux} + f.logger.Info("Health endpoint listening", "address", f.healthAddr) + + go func() { + <-egCtx.Done() + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + _ = server.Shutdown(ctx) + }() + + if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + return err + } + return nil + }) + eg.Go(func() error { + defer close(f.handlePayloadsStopped) return f.handlePayloads(egCtx) }) eg.Go(func() error { + defer close(f.syncStopped) f.logger.Info("Starting sync from shared DB") return f.syncFromSharedDB(egCtx) }) @@ -80,6 +113,25 @@ func (f *Follower) Start(ctx context.Context) <-chan struct{} { return done } +func (f *Follower) healthHandler(w http.ResponseWriter, r *http.Request) { + + select { + case <-f.healthStopped: + http.Error(w, "health loop has stopped", http.StatusServiceUnavailable) + return + case <-f.syncStopped: + http.Error(w, "sync from shared DB has stopped", http.StatusServiceUnavailable) + return + case <-f.handlePayloadsStopped: + http.Error(w, "handle payloads loop has stopped", http.StatusServiceUnavailable) + return + default: + } + + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("OK")) +} + func (f *Follower) syncFromSharedDB(ctx context.Context) error { if f.getExecutionHead() == nil { if err := f.setExecutionHeadFromRPC(ctx); err != nil {