Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cl/cmd/singlenode/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,11 +413,17 @@ func startFollowerNode(c *cli.Context) error {
}
bb := blockbuilder.NewMemberBlockBuilder(engineCL, logger.With("component", "BlockBuilder"))

healthAddr := c.String(healthAddrPortFlag.Name)
if healthAddr == "" {
return fmt.Errorf("health-addr is required")
}

followerNode, err := follower.NewFollower(
logger,
repo,
syncBatchSize,
bb,
healthAddr,
)
if err != nil {
logger.Error("Failed to initialize Follower", "error", err)
Expand Down
74 changes: 63 additions & 11 deletions cl/singlenode/follower/follower.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"log/slog"
"net/http"
"sync"
"time"

Expand All @@ -13,12 +14,16 @@ import (
)

type Follower struct {
logger *slog.Logger
sharedDB payloadDB
syncBatchSize uint64
payloadCh chan types.PayloadInfo
bbMutex sync.RWMutex
bb blockBuilder
logger *slog.Logger
sharedDB payloadDB
syncBatchSize uint64
payloadCh chan types.PayloadInfo
bbMutex sync.RWMutex
bb blockBuilder
healthAddr string
syncStopped chan struct{}
handlePayloadsStopped chan struct{}
healthStopped chan struct{}
}

const (
Expand All @@ -41,6 +46,7 @@ func NewFollower(
sharedDB payloadDB,
syncBatchSize uint64,
bb blockBuilder,
healthAddr string,
) (*Follower, error) {
if sharedDB == nil {
return nil, errors.New("payload repository not provided")
Expand All @@ -49,23 +55,50 @@ func NewFollower(
return nil, errors.New("sync batch size must be greater than 0")
}
return &Follower{
logger: logger,
sharedDB: sharedDB,
syncBatchSize: syncBatchSize,
payloadCh: make(chan types.PayloadInfo),
bb: bb,
logger: logger,
sharedDB: sharedDB,
syncBatchSize: syncBatchSize,
payloadCh: make(chan types.PayloadInfo),
bb: bb,
healthAddr: healthAddr,
healthStopped: make(chan struct{}),
syncStopped: make(chan struct{}),
handlePayloadsStopped: make(chan struct{}),
}, nil
}

func (f *Follower) Start(ctx context.Context) <-chan struct{} {

done := make(chan struct{})
eg, egCtx := errgroup.WithContext(ctx)

eg.Go(func() error {
defer close(f.healthStopped)
mux := http.NewServeMux()
mux.HandleFunc("/health", f.healthHandler)
server := &http.Server{Addr: f.healthAddr, Handler: mux}
f.logger.Info("Health endpoint listening", "address", f.healthAddr)

go func() {
<-egCtx.Done()
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
_ = server.Shutdown(ctx)
}()

if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
return err
}
return nil
})

eg.Go(func() error {
defer close(f.handlePayloadsStopped)
return f.handlePayloads(egCtx)
})

eg.Go(func() error {
defer close(f.syncStopped)
f.logger.Info("Starting sync from shared DB")
return f.syncFromSharedDB(egCtx)
})
Expand All @@ -80,6 +113,25 @@ func (f *Follower) Start(ctx context.Context) <-chan struct{} {
return done
}

func (f *Follower) healthHandler(w http.ResponseWriter, r *http.Request) {

select {
case <-f.healthStopped:
http.Error(w, "health loop has stopped", http.StatusServiceUnavailable)
return
case <-f.syncStopped:
http.Error(w, "sync from shared DB has stopped", http.StatusServiceUnavailable)
return
case <-f.handlePayloadsStopped:
http.Error(w, "handle payloads loop has stopped", http.StatusServiceUnavailable)
return
default:
}

w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("OK"))
}

func (f *Follower) syncFromSharedDB(ctx context.Context) error {
if f.getExecutionHead() == nil {
if err := f.setExecutionHeadFromRPC(ctx); err != nil {
Expand Down