Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cl/cmd/singlenode/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,11 +413,17 @@ func startFollowerNode(c *cli.Context) error {
}
bb := blockbuilder.NewMemberBlockBuilder(engineCL, logger.With("component", "BlockBuilder"))

healthAddr := c.String(healthAddrPortFlag.Name)
if healthAddr == "" {
return fmt.Errorf("health-addr is required")
}

followerNode, err := follower.NewFollower(
logger,
repo,
syncBatchSize,
bb,
healthAddr,
)
if err != nil {
logger.Error("Failed to initialize Follower", "error", err)
Expand Down
74 changes: 63 additions & 11 deletions cl/singlenode/follower/follower.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"log/slog"
"net/http"
"sync"
"time"

Expand All @@ -13,12 +14,16 @@ import (
)

type Follower struct {
logger *slog.Logger
sharedDB payloadDB
syncBatchSize uint64
payloadCh chan types.PayloadInfo
bbMutex sync.RWMutex
bb blockBuilder
logger *slog.Logger
sharedDB payloadDB
syncBatchSize uint64
payloadCh chan types.PayloadInfo
bbMutex sync.RWMutex
bb blockBuilder
healthAddr string
syncStopped chan struct{}
handlePayloadsStopped chan struct{}
healthStopped chan struct{}
}

const (
Expand All @@ -41,6 +46,7 @@ func NewFollower(
sharedDB payloadDB,
syncBatchSize uint64,
bb blockBuilder,
healthAddr string,
) (*Follower, error) {
if sharedDB == nil {
return nil, errors.New("payload repository not provided")
Expand All @@ -49,23 +55,50 @@ func NewFollower(
return nil, errors.New("sync batch size must be greater than 0")
}
return &Follower{
logger: logger,
sharedDB: sharedDB,
syncBatchSize: syncBatchSize,
payloadCh: make(chan types.PayloadInfo),
bb: bb,
logger: logger,
sharedDB: sharedDB,
syncBatchSize: syncBatchSize,
payloadCh: make(chan types.PayloadInfo),
bb: bb,
healthAddr: healthAddr,
healthStopped: make(chan struct{}),
syncStopped: make(chan struct{}),
handlePayloadsStopped: make(chan struct{}),
}, nil
}

func (f *Follower) Start(ctx context.Context) <-chan struct{} {

done := make(chan struct{})
eg, egCtx := errgroup.WithContext(ctx)

eg.Go(func() error {
defer close(f.healthStopped)
mux := http.NewServeMux()
mux.HandleFunc("/health", f.healthHandler)
server := &http.Server{Addr: f.healthAddr, Handler: mux}
f.logger.Info("Health endpoint listening", "address", f.healthAddr)

go func() {
<-egCtx.Done()
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
_ = server.Shutdown(ctx)
}()

if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
return err
}
return nil
})

eg.Go(func() error {
defer close(f.handlePayloadsStopped)
return f.handlePayloads(egCtx)
})

eg.Go(func() error {
defer close(f.syncStopped)
f.logger.Info("Starting sync from shared DB")
return f.syncFromSharedDB(egCtx)
})
Expand All @@ -80,6 +113,25 @@ func (f *Follower) Start(ctx context.Context) <-chan struct{} {
return done
}

func (f *Follower) healthHandler(w http.ResponseWriter, r *http.Request) {

select {
case <-f.healthStopped:
http.Error(w, "health loop has stopped", http.StatusServiceUnavailable)
return
case <-f.syncStopped:
http.Error(w, "sync from shared DB has stopped", http.StatusServiceUnavailable)
return
case <-f.handlePayloadsStopped:
http.Error(w, "handle payloads loop has stopped", http.StatusServiceUnavailable)
return
default:
}

w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("OK"))
}

func (f *Follower) syncFromSharedDB(ctx context.Context) error {
if f.getExecutionHead() == nil {
if err := f.setExecutionHeadFromRPC(ctx); err != nil {
Expand Down
10 changes: 5 additions & 5 deletions cl/singlenode/follower/follower_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func TestFollower_syncFromSharedDB(t *testing.T) {
syncBatchSize := uint64(100)

bb := newMockBlockBuilder()
follower, err := follower.NewFollower(logger, payloadRepo, syncBatchSize, bb)
follower, err := follower.NewFollower(logger, payloadRepo, syncBatchSize, bb, ":8080")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -166,7 +166,7 @@ func TestFollower_syncFromSharedDB_NoRows(t *testing.T) {
syncBatchSize := uint64(100)

bb := newMockBlockBuilder()
follower, err := follower.NewFollower(logger, payloadRepo, syncBatchSize, bb)
follower, err := follower.NewFollower(logger, payloadRepo, syncBatchSize, bb, ":8081")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -280,7 +280,7 @@ func TestFollower_syncFromSharedDB_MultipleIterations(t *testing.T) {
syncBatchSize := uint64(20)

bb := newMockBlockBuilder()
follower, err := follower.NewFollower(logger, payloadRepo, syncBatchSize, bb)
follower, err := follower.NewFollower(logger, payloadRepo, syncBatchSize, bb, ":8082")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -364,7 +364,7 @@ func TestFollower_Start_SimulateNewChain(t *testing.T) {
syncBatchSize := uint64(100)

bb := newMockBlockBuilder()
follower, err := follower.NewFollower(logger, payloadRepo, syncBatchSize, bb)
follower, err := follower.NewFollower(logger, payloadRepo, syncBatchSize, bb, ":8083")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -442,7 +442,7 @@ func TestFollower_Start_SyncExistingChain(t *testing.T) {
syncBatchSize := uint64(20)

bb := newMockBlockBuilder()
follower, err := follower.NewFollower(logger, payloadRepo, syncBatchSize, bb)
follower, err := follower.NewFollower(logger, payloadRepo, syncBatchSize, bb, ":8084")
if err != nil {
t.Fatal(err)
}
Expand Down
37 changes: 23 additions & 14 deletions cl/singlenode/singlenode.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type SingleNodeApp struct {
connectionStatus sync.Mutex
connectionRefused bool
rpcClient *rpc.Client
runLoopStopped chan struct{}
}

// NewSingleNodeApp creates and initializes a new SingleNodeApp.
Expand Down Expand Up @@ -154,6 +155,7 @@ func NewSingleNodeApp(
cancel: cancel,
connectionRefused: false,
rpcClient: rpcClient,
runLoopStopped: make(chan struct{}),
}, nil
}

Expand Down Expand Up @@ -201,6 +203,13 @@ func (app *SingleNodeApp) healthHandler(w http.ResponseWriter, r *http.Request)
return
}

select {
case <-app.runLoopStopped:
http.Error(w, "run loop has stopped", http.StatusServiceUnavailable)
return
default:
}

w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("OK"))
}
Expand Down Expand Up @@ -247,6 +256,7 @@ func (app *SingleNodeApp) Start() {
go func() {
defer app.wg.Done()
defer app.logger.Info("SingleNodeApp run loop finished.")
defer close(app.runLoopStopped)
app.runLoop()
}()
}
Expand Down Expand Up @@ -329,24 +339,22 @@ func (app *SingleNodeApp) produceBlock() error {
}

if app.payloadRepo != nil {
payloadInfo := &types.PayloadInfo{
PayloadID: currentState.PayloadID,
ExecutionPayload: currentState.ExecutionPayload,
BlockHeight: blockHeight,
}
// Save payload to repository
saveCtx, saveCancel := context.WithTimeout(app.appCtx, 200*time.Millisecond)
defer saveCancel()

if err := app.payloadRepo.SavePayload(saveCtx, payloadInfo); err != nil {
app.logger.Error(
"Failed to save payload to database",
"payload_id", currentState.PayloadID,
"error", err,
)
return fmt.Errorf("failed to save payload to database: %w", err)
} else {
app.logger.Info("Payload details submitted to database for saving", "payload_id", currentState.PayloadID)
if err := app.payloadRepo.SavePayload(saveCtx, &types.PayloadInfo{
PayloadID: currentState.PayloadID,
ExecutionPayload: currentState.ExecutionPayload,
BlockHeight: blockHeight,
}); err != nil {
return fmt.Errorf("failed to save payload: %w", err)
}
app.logger.Info(
"payload saved to repository",
"payload_id", currentState.PayloadID,
"block_height", blockHeight,
)
}

// Step 2: Finalize the block
Expand All @@ -358,6 +366,7 @@ func (app *SingleNodeApp) produceBlock() error {
if err := app.blockBuilder.FinalizeBlock(app.appCtx, currentState.PayloadID, currentState.ExecutionPayload, ""); err != nil {
return fmt.Errorf("failed to finalize block: %w", err)
}

return nil
}

Expand Down
13 changes: 7 additions & 6 deletions cl/singlenode/singlenode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,12 +315,13 @@ func TestStartStop(t *testing.T) {

// Create app with minimal configuration for testing
app := &SingleNodeApp{
logger: logger,
cfg: Config{HealthAddr: ":0"},
appCtx: ctx,
cancel: cancel,
blockBuilder: mockBuilder,
stateManager: stateMgr,
logger: logger,
cfg: Config{HealthAddr: ":0"},
appCtx: ctx,
cancel: cancel,
blockBuilder: mockBuilder,
stateManager: stateMgr,
runLoopStopped: make(chan struct{}),
}

app.Start()
Expand Down
Loading