From e30ceb3a531a7d06788b3b833066f69a1c523b3a Mon Sep 17 00:00:00 2001 From: Shawn <44221603+shaspitz@users.noreply.github.com> Date: Mon, 4 Aug 2025 13:48:09 -0700 Subject: [PATCH] feat: simplify single node cl implementation (#755) --- cl/blockbuilder/blockbuilder.go | 150 +++++++------------ cl/blockbuilder/blockbuilder_test.go | 20 ++- cl/redisapp/leaderfollower/leaderfollower.go | 4 - 3 files changed, 65 insertions(+), 109 deletions(-) diff --git a/cl/blockbuilder/blockbuilder.go b/cl/blockbuilder/blockbuilder.go index 2d6fcd239..78378287a 100644 --- a/cl/blockbuilder/blockbuilder.go +++ b/cl/blockbuilder/blockbuilder.go @@ -54,8 +54,6 @@ type BlockBuilder struct { logger *slog.Logger buildDelay time.Duration buildEmptyBlocksDelay time.Duration - buildDelayMs uint64 - lastBlockTime time.Time feeRecipient common.Address executionHead *types.ExecutionHead } @@ -74,10 +72,8 @@ func NewBlockBuilder( engineCl: engineCl, logger: logger, buildDelay: buildDelay, - buildDelayMs: uint64(buildDelay.Milliseconds()), buildEmptyBlocksDelay: buildDelayEmptyBlocks, feeRecipient: common.HexToAddress(feeReceipt), - lastBlockTime: time.Now().Add(-buildDelayEmptyBlocks), rpcClient: rpcClient, } } @@ -89,12 +85,8 @@ func NewMemberBlockBuilder(engineCL EngineClient, logger *slog.Logger) *BlockBui } } -func (bb *BlockBuilder) SetLastCallTimeToZero() { - bb.lastBlockTime = time.Time{} -} - -func (bb *BlockBuilder) startBuild(ctx context.Context, head *types.ExecutionHead, ts uint64) (engine.ForkChoiceResponse, error) { - hash := common.BytesToHash(head.BlockHash) +func (bb *BlockBuilder) startBuild(ctx context.Context, ts uint64) (engine.ForkChoiceResponse, error) { + hash := common.BytesToHash(bb.executionHead.BlockHash) fcs := engine.ForkchoiceStateV1{ HeadBlockHash: hash, @@ -123,19 +115,39 @@ func (bb *BlockBuilder) startBuild(ctx context.Context, head *types.ExecutionHea func (bb *BlockBuilder) GetPayload(ctx context.Context) error { var ( payloadID *engine.PayloadID - head *types.ExecutionHead err error ) - currentCallTime := time.Now() + + if bb.executionHead == nil { + bb.logger.Info("executionHead is nil, it'll be set by RPC. CL is likely being restarted") + err = util.RetryWithBackoff(ctx, maxAttempts, bb.logger, func() error { + innerErr := bb.setExecutionHeadFromRPC(ctx) + if innerErr != nil { + bb.logger.Warn( + "Failed to set execution head from rpc, retrying...", + "error", innerErr, + ) + return innerErr // Will retry + } + return nil // Success + }) + if err != nil { + return fmt.Errorf("failed to set execution head from rpc: %w", err) + } + } else { + bb.logger.Debug("executionHead is not nil, using cached value") + } mempoolStatus, err := bb.GetMempoolStatus(ctx) if err != nil { return fmt.Errorf("failed to get pending transaction count: %w", err) } - bb.logger.Debug("GetMempoolStatus rpc duration", "duration", time.Since(currentCallTime)) + + lastBlockTime := time.UnixMilli(int64(bb.executionHead.BlockTime)) + bb.logger.Debug("lastBlockTime from execution head", "lastBlockTime", lastBlockTime) if mempoolStatus.Pending == 0 { - timeSinceLastBlock := currentCallTime.Sub(bb.lastBlockTime) + timeSinceLastBlock := time.Since(lastBlockTime) if timeSinceLastBlock < bb.buildEmptyBlocksDelay { bb.logger.Debug( "Leader: Skipping empty block", @@ -153,50 +165,24 @@ func (bb *BlockBuilder) GetPayload(ctx context.Context) error { ) } - // Load execution head to get previous block timestamp - err = util.RetryWithBackoff(ctx, maxAttempts, bb.logger, func() error { - head, err = bb.loadExecutionHead(ctx) - if err != nil { - bb.logger.Warn( - "Failed to load execution head, retrying...", - "error", err, - ) - return err // Will retry - } - return nil // Success - }) - if err != nil { - return fmt.Errorf("latest execution block: %w", err) - } - - prevTimestamp := head.BlockTime - - var ts uint64 - - if bb.lastBlockTime.IsZero() { - // First block, initialize LastCallTime and set default timestamp - ts = uint64(time.Now().UnixMilli()) + bb.buildDelayMs - } else { - // Compute diff in milliseconds - diff := currentCallTime.Sub(bb.lastBlockTime) - diffMillis := diff.Milliseconds() - - if uint64(diffMillis) <= bb.buildDelayMs { - ts = prevTimestamp + bb.buildDelayMs - } else { - // For every multiple of buildDelay that diff exceeds, increment the block time by that multiple. - multiples := (uint64(diffMillis) + bb.buildDelayMs - 1) / bb.buildDelayMs // Round up to next multiple of buildDelay - ts = prevTimestamp + multiples*bb.buildDelayMs + ts := uint64(time.Now().UnixMilli()) + if ts <= bb.executionHead.BlockTime { + bb.logger.Warn(`Leader: Current timestamp is less than or equal to + previous block timestamp. Genesis timestamp could have been set incorrectly`, + "current_ts", ts, + "prev_head_block_time", bb.executionHead.BlockTime, + ) + ts = bb.executionHead.BlockTime + 1 + select { + case <-ctx.Done(): + return fmt.Errorf("context cancelled") + case <-time.After(time.Until(time.UnixMilli(int64(ts)))): + bb.logger.Info("Leader: Waited until proper block timestamp", "ts", ts) } } - // Very low chance to happen, only after restart and time.Now is broken - if ts <= head.BlockTime { - ts = head.BlockTime + 1 // Subsequent blocks must have a higher timestamp. - } - err = util.RetryWithBackoff(ctx, maxAttempts, bb.logger, func() error { - response, err := bb.startBuild(ctx, head, ts) + response, err := bb.startBuild(ctx, ts) if err != nil { bb.logger.Warn( "Failed to build new EVM payload, will retry", @@ -275,8 +261,6 @@ func (bb *BlockBuilder) GetPayload(ctx context.Context) error { "Leader: BuildBlock completed and block is distributed", "PayloadID", payloadIDStr, ) - - bb.lastBlockTime = time.Now() return nil } @@ -418,30 +402,13 @@ func (bb *BlockBuilder) FinalizeBlock(ctx context.Context, payloadIDStr, executi return fmt.Errorf("failed to deserialize ExecutionPayload: %w", err) } - var head *types.ExecutionHead - err = util.RetryWithBackoff(ctx, maxAttempts, bb.logger, func() error { - head, err = bb.loadExecutionHead(ctx) - if err != nil { - bb.logger.Warn( - "Failed to load execution head, retrying...", - "error", err, - ) - return err // Will retry - } - return nil // Success - }) - if err != nil { - return fmt.Errorf("failed to load execution head: %w", err) - } - - if err := bb.validateExecutionPayload(executionPayload, head); err != nil { + if err := bb.validateExecutionPayload(executionPayload); err != nil { return fmt.Errorf("failed to validate execution payload: %w", err) } - hash := common.BytesToHash(head.BlockHash) retryFunc := bb.selectRetryFunction(ctx, msgID) - if err := bb.pushNewPayload(ctx, executionPayload, hash, retryFunc); err != nil { + if err := bb.pushNewPayload(ctx, executionPayload, retryFunc); err != nil { return fmt.Errorf("failed to push new payload: %w", err) } @@ -464,20 +431,20 @@ func (bb *BlockBuilder) FinalizeBlock(ctx context.Context, payloadIDStr, executi return nil } -func (bb *BlockBuilder) validateExecutionPayload(executionPayload engine.ExecutableData, head *types.ExecutionHead) error { - if executionPayload.Number != head.BlockHeight+1 { - return fmt.Errorf("invalid block height: %d, expected: %d", executionPayload.Number, head.BlockHeight+1) +func (bb *BlockBuilder) validateExecutionPayload(executionPayload engine.ExecutableData) error { + if executionPayload.Number != bb.executionHead.BlockHeight+1 { + return fmt.Errorf("invalid block height: %d, expected: %d", executionPayload.Number, bb.executionHead.BlockHeight+1) } - if executionPayload.ParentHash != common.Hash(head.BlockHash) { - return fmt.Errorf("invalid parent hash: %s, head: %s", executionPayload.ParentHash, head.BlockHash) + if executionPayload.ParentHash != common.Hash(bb.executionHead.BlockHash) { + return fmt.Errorf("invalid parent hash: %s, head: %s", executionPayload.ParentHash, bb.executionHead.BlockHash) } - minTimestamp := head.BlockTime + 1 + minTimestamp := bb.executionHead.BlockTime + 1 if executionPayload.Timestamp < minTimestamp && executionPayload.Number != 1 { return fmt.Errorf("invalid timestamp: %d, min: %d", executionPayload.Timestamp, minTimestamp) } - hash := common.BytesToHash(head.BlockHash) + hash := common.BytesToHash(bb.executionHead.BlockHash) if executionPayload.Random != hash { - return fmt.Errorf("invalid random: %s, head: %s", executionPayload.Random, head.BlockHash) + return fmt.Errorf("invalid random: %s, head: %s", executionPayload.Random, bb.executionHead.BlockHash) } return nil } @@ -493,10 +460,11 @@ func (bb *BlockBuilder) selectRetryFunction(ctx context.Context, msgID string) f } } -func (bb *BlockBuilder) pushNewPayload(ctx context.Context, executionPayload engine.ExecutableData, hash common.Hash, retryFunc func(f func() error) error) error { +func (bb *BlockBuilder) pushNewPayload(ctx context.Context, executionPayload engine.ExecutableData, retryFunc func(f func() error) error) error { emptyVersionHashes := []common.Hash{} + parentHash := common.BytesToHash(bb.executionHead.BlockHash) return retryFunc(func() error { - status, err := bb.engineCl.NewPayloadV4(ctx, executionPayload, emptyVersionHashes, &hash, []hexutil.Bytes{}) + status, err := bb.engineCl.NewPayloadV4(ctx, executionPayload, emptyVersionHashes, &parentHash, []hexutil.Bytes{}) bb.logger.Debug("newPayload result", "status", status.Status, "validationError", status.ValidationError, @@ -534,23 +502,17 @@ func (bb *BlockBuilder) updateForkChoice(ctx context.Context, fcs engine.Forkcho }) } -func (bb *BlockBuilder) loadExecutionHead(ctx context.Context) (*types.ExecutionHead, error) { - if bb.executionHead != nil { - return bb.executionHead, nil - } - +func (bb *BlockBuilder) setExecutionHeadFromRPC(ctx context.Context) error { header, err := bb.engineCl.HeaderByNumber(ctx, nil) // nil for the latest block if err != nil { - return nil, fmt.Errorf("failed to get the latest block header: %w", err) + return fmt.Errorf("failed to get the latest block header: %w", err) } - bb.executionHead = &types.ExecutionHead{ BlockHeight: header.Number.Uint64(), BlockHash: header.Hash().Bytes(), BlockTime: header.Time, } - - return bb.executionHead, nil + return nil } func (bb *BlockBuilder) GetExecutionHead() *types.ExecutionHead { diff --git a/cl/blockbuilder/blockbuilder_test.go b/cl/blockbuilder/blockbuilder_test.go index 45c34950b..fa2a5028e 100644 --- a/cl/blockbuilder/blockbuilder_test.go +++ b/cl/blockbuilder/blockbuilder_test.go @@ -97,7 +97,6 @@ func TestBlockBuilder_startBuild(t *testing.T) { engineCl: mockEngineClient, rpcClient: createMockRPCClient(), buildDelay: buildDelay, - buildDelayMs: uint64(buildDelay.Milliseconds()), logger: stLog, } timestamp := time.Now() @@ -117,7 +116,9 @@ func TestBlockBuilder_startBuild(t *testing.T) { } mockEngineClient.On("ForkchoiceUpdatedV3", mock.Anything, expectedFCS, mock.MatchedBy(matchPayloadAttributes(hash, executionHead.BlockTime))).Return(forkChoiceResponse, nil) - resp, err := blockBuilder.startBuild(ctx, executionHead, uint64(timestamp.UnixMilli())) + blockBuilder.executionHead = executionHead + + resp, err := blockBuilder.startBuild(ctx, uint64(timestamp.UnixMilli())) require.NoError(t, err) assert.Equal(t, forkChoiceResponse, resp) @@ -162,7 +163,6 @@ func TestBlockBuilder_getPayload(t *testing.T) { engineCl: mockEngineClient, rpcClient: createMockRPCClient(), buildDelay: buildDelay, - buildDelayMs: uint64(buildDelay.Milliseconds()), logger: stLog, } @@ -229,7 +229,6 @@ func TestBlockBuilder_FinalizeBlock(t *testing.T) { engineCl: mockEngineClient, rpcClient: createMockRPCClient(), buildDelay: buildDelay, - buildDelayMs: uint64(buildDelay.Milliseconds()), logger: stLog, } @@ -312,7 +311,6 @@ func TestBlockBuilder_startBuild_ForkchoiceUpdatedError(t *testing.T) { engineCl: mockEngineClient, rpcClient: createMockRPCClient(), buildDelay: buildDelay, - buildDelayMs: uint64(buildDelay.Milliseconds()), logger: stLog, } @@ -327,7 +325,9 @@ func TestBlockBuilder_startBuild_ForkchoiceUpdatedError(t *testing.T) { mockEngineClient.On("ForkchoiceUpdatedV3", mock.Anything, expectedFCS, mock.MatchedBy(matchPayloadAttributes(hash, executionHead.BlockTime))).Return(engine.ForkChoiceResponse{}, errors.New("engine error")) - resp, err := blockBuilder.startBuild(ctx, executionHead, uint64(timestamp.UnixMilli())) + blockBuilder.executionHead = executionHead + + resp, err := blockBuilder.startBuild(ctx, uint64(timestamp.UnixMilli())) require.Error(t, err) assert.Contains(t, err.Error(), "forkchoice update") @@ -357,7 +357,6 @@ func TestBlockBuilder_startBuild_InvalidPayloadStatus(t *testing.T) { engineCl: mockEngineClient, rpcClient: createMockRPCClient(), buildDelay: buildDelay, - buildDelayMs: uint64(buildDelay.Milliseconds()), logger: stLog, } @@ -378,7 +377,9 @@ func TestBlockBuilder_startBuild_InvalidPayloadStatus(t *testing.T) { } mockEngineClient.On("ForkchoiceUpdatedV3", mock.Anything, expectedFCS, mock.MatchedBy(matchPayloadAttributes(hash, executionHead.BlockTime))).Return(forkChoiceResponse, nil) - resp, err := blockBuilder.startBuild(ctx, executionHead, uint64(timestamp.UnixMilli())) + blockBuilder.executionHead = executionHead + + resp, err := blockBuilder.startBuild(ctx, uint64(timestamp.UnixMilli())) require.NoError(t, err) assert.Equal(t, forkChoiceResponse, resp) @@ -459,7 +460,6 @@ func TestBlockBuilder_FinalizeBlock_InvalidBlockHeight(t *testing.T) { engineCl: mockEngineClient, rpcClient: createMockRPCClient(), buildDelay: buildDelay, - buildDelayMs: uint64(buildDelay.Milliseconds()), logger: stLog, } @@ -515,7 +515,6 @@ func TestBlockBuilder_FinalizeBlock_NewPayloadInvalidStatus(t *testing.T) { engineCl: mockEngineClient, rpcClient: createMockRPCClient(), buildDelay: buildDelay, - buildDelayMs: uint64(buildDelay.Milliseconds()), logger: stLog, } @@ -576,7 +575,6 @@ func TestBlockBuilder_FinalizeBlock_ForkchoiceUpdatedInvalidStatus(t *testing.T) engineCl: mockEngineClient, rpcClient: createMockRPCClient(), buildDelay: buildDelay, - buildDelayMs: uint64(buildDelay.Milliseconds()), logger: stLog, } diff --git a/cl/redisapp/leaderfollower/leaderfollower.go b/cl/redisapp/leaderfollower/leaderfollower.go index f7b38d30f..a34b5ac35 100644 --- a/cl/redisapp/leaderfollower/leaderfollower.go +++ b/cl/redisapp/leaderfollower/leaderfollower.go @@ -39,9 +39,6 @@ type blockBuilder interface { // Processes any unfinished payload from a previous session ProcessLastPayload(ctx context.Context) error - - // Sets the last call time to zero - SetLastCallTimeToZero() } // todo: work with block state through block builder, not directly @@ -263,7 +260,6 @@ func (lfm *LeaderFollowerManager) leaderWork(ctx context.Context) error { lfm.logger.Error("Leader: failed to reach geth node after max attempts, exiting") stopElecErr := lfm.leaderProc.Stop() // todo: refactor to generate timestamp outside blockbuilder - lfm.blockBuilder.SetLastCallTimeToZero() if stopElecErr != nil { lfm.logger.Error( "Leader: Failed to stop leader election",