diff --git a/cl/blockbuilder/blockbuilder.go b/cl/blockbuilder/blockbuilder.go index dbbe775fd..2d6fcd239 100644 --- a/cl/blockbuilder/blockbuilder.go +++ b/cl/blockbuilder/blockbuilder.go @@ -43,9 +43,14 @@ type stateManager interface { ResetBlockState(ctx context.Context) error } +type rpcClient interface { + CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error +} + type BlockBuilder struct { stateManager stateManager engineCl EngineClient + rpcClient rpcClient logger *slog.Logger buildDelay time.Duration buildEmptyBlocksDelay time.Duration @@ -62,6 +67,7 @@ func NewBlockBuilder( buildDelay, buildDelayEmptyBlocks time.Duration, feeReceipt string, + rpcClient rpcClient, ) *BlockBuilder { return &BlockBuilder{ stateManager: stateManager, @@ -72,6 +78,7 @@ func NewBlockBuilder( buildEmptyBlocksDelay: buildDelayEmptyBlocks, feeRecipient: common.HexToAddress(feeReceipt), lastBlockTime: time.Now().Add(-buildDelayEmptyBlocks), + rpcClient: rpcClient, } } @@ -121,6 +128,31 @@ func (bb *BlockBuilder) GetPayload(ctx context.Context) error { ) currentCallTime := time.Now() + mempoolStatus, err := bb.GetMempoolStatus(ctx) + if err != nil { + return fmt.Errorf("failed to get pending transaction count: %w", err) + } + bb.logger.Debug("GetMempoolStatus rpc duration", "duration", time.Since(currentCallTime)) + + if mempoolStatus.Pending == 0 { + timeSinceLastBlock := currentCallTime.Sub(bb.lastBlockTime) + if timeSinceLastBlock < bb.buildEmptyBlocksDelay { + bb.logger.Debug( + "Leader: Skipping empty block", + "timeSinceLastBlock", timeSinceLastBlock, + "pendingTxes", mempoolStatus.Pending, + "queuedTxes", mempoolStatus.Queued, + ) + return ErrEmptyBlock + } + bb.logger.Info( + "Leader: Empty block will be created", + "timeSinceLastBlock", timeSinceLastBlock, + "pendingTxes", mempoolStatus.Pending, + "queuedTxes", mempoolStatus.Queued, + ) + } + // Load execution head to get previous block timestamp err = util.RetryWithBackoff(ctx, maxAttempts, bb.logger, func() error { head, err = bb.loadExecutionHead(ctx) @@ -221,17 +253,6 @@ func (bb *BlockBuilder) GetPayload(ctx context.Context) error { return fmt.Errorf("failed to get payload: %w", err) } - hasTransactions := len(payloadResp.ExecutionPayload.Transactions) > 0 - now := time.Now() - timeSinceLastBlock := now.Sub(bb.lastBlockTime) - if !hasTransactions && timeSinceLastBlock < bb.buildEmptyBlocksDelay { - bb.logger.Info( - "Leader: Skipping empty block", - "timeSinceLastBlock", timeSinceLastBlock, - ) - return ErrEmptyBlock - } - payloadData, err := msgpack.Marshal(payloadResp.ExecutionPayload) if err != nil { return fmt.Errorf("failed to marshal payload: %w", err) @@ -255,7 +276,7 @@ func (bb *BlockBuilder) GetPayload(ctx context.Context) error { "PayloadID", payloadIDStr, ) - bb.lastBlockTime = now + bb.lastBlockTime = time.Now() return nil } @@ -535,3 +556,17 @@ func (bb *BlockBuilder) loadExecutionHead(ctx context.Context) (*types.Execution func (bb *BlockBuilder) GetExecutionHead() *types.ExecutionHead { return bb.executionHead } + +type MempoolStatus struct { + Pending hexutil.Uint64 `json:"pending"` + Queued hexutil.Uint64 `json:"queued"` +} + +func (bb *BlockBuilder) GetMempoolStatus(ctx context.Context) (*MempoolStatus, error) { + var result MempoolStatus + err := bb.rpcClient.CallContext(ctx, &result, "txpool_status") + if err != nil { + return nil, err + } + return &result, nil +} diff --git a/cl/blockbuilder/blockbuilder_test.go b/cl/blockbuilder/blockbuilder_test.go index 93f69e1bb..45c34950b 100644 --- a/cl/blockbuilder/blockbuilder_test.go +++ b/cl/blockbuilder/blockbuilder_test.go @@ -59,6 +59,23 @@ func (m *MockEngineClient) HeaderByNumber(ctx context.Context, number *big.Int) return args.Get(0).(*etypes.Header), args.Error(1) } +func createMockRPCClient() rpcClient { + mockRPC := &MockRPCClient{} + return mockRPC +} + +type MockRPCClient struct{} + +func (m *MockRPCClient) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error { + if method == "txpool_status" && result != nil { + if mempoolStatus, ok := result.(*MempoolStatus); ok { + mempoolStatus.Pending = 1 + mempoolStatus.Queued = 0 + } + } + return nil +} + func TestBlockBuilder_startBuild(t *testing.T) { ctx := context.Background() @@ -78,6 +95,7 @@ func TestBlockBuilder_startBuild(t *testing.T) { blockBuilder := &BlockBuilder{ stateManager: stateManager, engineCl: mockEngineClient, + rpcClient: createMockRPCClient(), buildDelay: buildDelay, buildDelayMs: uint64(buildDelay.Milliseconds()), logger: stLog, @@ -142,6 +160,7 @@ func TestBlockBuilder_getPayload(t *testing.T) { blockBuilder := &BlockBuilder{ stateManager: stateManager, engineCl: mockEngineClient, + rpcClient: createMockRPCClient(), buildDelay: buildDelay, buildDelayMs: uint64(buildDelay.Milliseconds()), logger: stLog, @@ -208,6 +227,7 @@ func TestBlockBuilder_FinalizeBlock(t *testing.T) { blockBuilder := &BlockBuilder{ stateManager: stateManager, engineCl: mockEngineClient, + rpcClient: createMockRPCClient(), buildDelay: buildDelay, buildDelayMs: uint64(buildDelay.Milliseconds()), logger: stLog, @@ -290,6 +310,7 @@ func TestBlockBuilder_startBuild_ForkchoiceUpdatedError(t *testing.T) { blockBuilder := &BlockBuilder{ stateManager: stateManager, engineCl: mockEngineClient, + rpcClient: createMockRPCClient(), buildDelay: buildDelay, buildDelayMs: uint64(buildDelay.Milliseconds()), logger: stLog, @@ -334,6 +355,7 @@ func TestBlockBuilder_startBuild_InvalidPayloadStatus(t *testing.T) { blockBuilder := &BlockBuilder{ stateManager: stateManager, engineCl: mockEngineClient, + rpcClient: createMockRPCClient(), buildDelay: buildDelay, buildDelayMs: uint64(buildDelay.Milliseconds()), logger: stLog, @@ -383,6 +405,7 @@ func TestBlockBuilder_getPayload_GetPayloadUnknownPayload(t *testing.T) { blockBuilder := &BlockBuilder{ stateManager: stateManager, engineCl: mockEngineClient, + rpcClient: createMockRPCClient(), buildDelay: time.Duration(1 * time.Second), logger: stLog, } @@ -434,6 +457,7 @@ func TestBlockBuilder_FinalizeBlock_InvalidBlockHeight(t *testing.T) { blockBuilder := &BlockBuilder{ stateManager: stateManager, engineCl: mockEngineClient, + rpcClient: createMockRPCClient(), buildDelay: buildDelay, buildDelayMs: uint64(buildDelay.Milliseconds()), logger: stLog, @@ -489,6 +513,7 @@ func TestBlockBuilder_FinalizeBlock_NewPayloadInvalidStatus(t *testing.T) { blockBuilder := &BlockBuilder{ stateManager: stateManager, engineCl: mockEngineClient, + rpcClient: createMockRPCClient(), buildDelay: buildDelay, buildDelayMs: uint64(buildDelay.Milliseconds()), logger: stLog, @@ -549,6 +574,7 @@ func TestBlockBuilder_FinalizeBlock_ForkchoiceUpdatedInvalidStatus(t *testing.T) blockBuilder := &BlockBuilder{ stateManager: stateManager, engineCl: mockEngineClient, + rpcClient: createMockRPCClient(), buildDelay: buildDelay, buildDelayMs: uint64(buildDelay.Milliseconds()), logger: stLog, diff --git a/cl/cmd/singlenode/main.go b/cl/cmd/singlenode/main.go index 3664519e0..3d4e1b306 100644 --- a/cl/cmd/singlenode/main.go +++ b/cl/cmd/singlenode/main.go @@ -57,6 +57,13 @@ var ( }, }) + nonAuthRpcUrlFlag = altsrc.NewStringFlag(&cli.StringFlag{ + Name: "non-auth-rpc-url", + Usage: "Non-authenticated Ethereum RPC URL (e.g., http://localhost:8545)", + EnvVars: []string{"LEADER_NON_AUTH_RPC_URL"}, + Value: "http://localhost:8545", + }) + ethClientURLFlag = altsrc.NewStringFlag(&cli.StringFlag{ Name: "eth-client-url", Usage: "Ethereum Execution client Engine API URL (e.g., http://localhost:8551)", @@ -197,6 +204,13 @@ var ( }, }) + txPoolPollingIntervalFlag = altsrc.NewDurationFlag(&cli.DurationFlag{ + Name: "tx-pool-polling-interval", + Usage: "Wait interval for polling the tx pool while there are no pending transactions (e.g., '5ms')", + EnvVars: []string{"LEADER_TX_POOL_POLLING_INTERVAL"}, + Value: 5 * time.Millisecond, + }) + // Member node specific flags leaderAPIURLFlag = altsrc.NewStringFlag(&cli.StringFlag{ Name: "leader-api-url", @@ -238,6 +252,8 @@ func main() { healthAddrPortFlag, postgresDSNFlag, apiAddrFlag, + nonAuthRpcUrlFlag, + txPoolPollingIntervalFlag, } memberFlags := []cli.Flag{ @@ -337,6 +353,8 @@ func startLeaderNode(c *cli.Context) error { HealthAddr: c.String(healthAddrPortFlag.Name), PostgresDSN: c.String(postgresDSNFlag.Name), APIAddr: c.String(apiAddrFlag.Name), + NonAuthRpcURL: c.String(nonAuthRpcUrlFlag.Name), + TxPoolPollingInterval: c.Duration(txPoolPollingIntervalFlag.Name), } logger.Info("Starting leader node with configuration", "config", cfg) diff --git a/cl/redisapp/rapp.go b/cl/redisapp/rapp.go index dabf3fc25..b2057d690 100644 --- a/cl/redisapp/rapp.go +++ b/cl/redisapp/rapp.go @@ -76,7 +76,7 @@ func NewMevCommitChain( ) return nil, err } - blockBuilder := blockbuilder.NewBlockBuilder(coordinator, engineCL, logger, buildDelay, buildDelayEmptyBlocks, feeReceipt) + blockBuilder := blockbuilder.NewBlockBuilder(coordinator, engineCL, logger, buildDelay, buildDelayEmptyBlocks, feeReceipt, nil) lfm, err := leaderfollower.NewLeaderFollowerManager( instanceID, diff --git a/cl/singlenode/singlenode.go b/cl/singlenode/singlenode.go index bbe13a016..72252e774 100644 --- a/cl/singlenode/singlenode.go +++ b/cl/singlenode/singlenode.go @@ -11,6 +11,7 @@ import ( "sync" "time" + "github.com/ethereum/go-ethereum/rpc" "github.com/primev/mev-commit/cl/blockbuilder" "github.com/primev/mev-commit/cl/ethclient" "github.com/primev/mev-commit/cl/singlenode/api" @@ -35,6 +36,8 @@ type Config struct { HealthAddr string PostgresDSN string APIAddr string + NonAuthRpcURL string + TxPoolPollingInterval time.Duration } type BlockBuilder interface { @@ -58,6 +61,7 @@ type SingleNodeApp struct { wg sync.WaitGroup connectionStatus sync.Mutex connectionRefused bool + rpcClient *rpc.Client } // NewSingleNodeApp creates and initializes a new SingleNodeApp. @@ -88,6 +92,16 @@ func NewSingleNodeApp( return nil, err } + rpcClient, err := rpc.DialContext(ctx, cfg.NonAuthRpcURL) + if err != nil { + cancel() + logger.Error( + "failed to create non-authenticated Ethereum RPC client", + "error", err, + ) + return nil, err + } + stateMgr := localstate.NewLocalStateManager(logger.With("component", "LocalStateManager")) bb := blockbuilder.NewBlockBuilder( stateMgr, @@ -96,6 +110,7 @@ func NewSingleNodeApp( cfg.EVMBuildDelay, cfg.EVMBuildDelayEmptyBlocks, cfg.PriorityFeeReceipt, + rpcClient, ) var pRepo types.PayloadRepository @@ -138,6 +153,7 @@ func NewSingleNodeApp( appCtx: ctx, cancel: cancel, connectionRefused: false, + rpcClient: rpcClient, }, nil } @@ -270,7 +286,8 @@ func (app *SingleNodeApp) runLoop() { if err != nil { if errors.Is(err, blockbuilder.ErrEmptyBlock) { - app.logger.Info("empty block produced, waiting for new payload") + app.logger.Debug("no pending transactions, will try again after timeout", "timeout", app.cfg.TxPoolPollingInterval) + time.Sleep(app.cfg.TxPoolPollingInterval) continue } else if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { app.logger.Info("context canceled or deadline exceeded, stopping block production") @@ -371,5 +388,10 @@ func (app *SingleNodeApp) Stop() { } } + if app.rpcClient != nil { + app.rpcClient.Close() + app.logger.Info("Ethereum client closed.") + } + app.logger.Info("SingleNodeApp stopped.") } diff --git a/cl/singlenode/singlenode_test.go b/cl/singlenode/singlenode_test.go index e11a7bec6..73e6ddd86 100644 --- a/cl/singlenode/singlenode_test.go +++ b/cl/singlenode/singlenode_test.go @@ -65,14 +65,24 @@ func TestNewSingleNodeApp(t *testing.T) { ctx := context.Background() logger := setupTestLogger() + mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _, err := w.Write([]byte(`{"jsonrpc":"2.0","id":1,"result":{"pending":"0x1","queued":"0x0"}}`)) + require.NoError(t, err) + })) + defer mockServer.Close() + validCfg := Config{ InstanceID: "test-instance", EthClientURL: "http://localhost:8545", + NonAuthRpcURL: mockServer.URL, JWTSecret: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef", EVMBuildDelay: time.Second, EVMBuildDelayEmptyBlocks: time.Second * 2, PriorityFeeReceipt: "0x1234567890abcdef1234567890abcdef12345678", HealthAddr: ":8080", + TxPoolPollingInterval: time.Second, } app, err := NewSingleNodeApp(ctx, validCfg, logger)