Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 47 additions & 12 deletions cl/blockbuilder/blockbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,14 @@ type stateManager interface {
ResetBlockState(ctx context.Context) error
}

type rpcClient interface {
CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error
}

type BlockBuilder struct {
stateManager stateManager
engineCl EngineClient
rpcClient rpcClient
logger *slog.Logger
buildDelay time.Duration
buildEmptyBlocksDelay time.Duration
Expand All @@ -62,6 +67,7 @@ func NewBlockBuilder(
buildDelay,
buildDelayEmptyBlocks time.Duration,
feeReceipt string,
rpcClient rpcClient,
) *BlockBuilder {
return &BlockBuilder{
stateManager: stateManager,
Expand All @@ -72,6 +78,7 @@ func NewBlockBuilder(
buildEmptyBlocksDelay: buildDelayEmptyBlocks,
feeRecipient: common.HexToAddress(feeReceipt),
lastBlockTime: time.Now().Add(-buildDelayEmptyBlocks),
rpcClient: rpcClient,
}
}

Expand Down Expand Up @@ -121,6 +128,31 @@ func (bb *BlockBuilder) GetPayload(ctx context.Context) error {
)
currentCallTime := time.Now()

mempoolStatus, err := bb.GetMempoolStatus(ctx)
if err != nil {
return fmt.Errorf("failed to get pending transaction count: %w", err)
}
bb.logger.Debug("GetMempoolStatus rpc duration", "duration", time.Since(currentCallTime))

if mempoolStatus.Pending == 0 {
timeSinceLastBlock := currentCallTime.Sub(bb.lastBlockTime)
if timeSinceLastBlock < bb.buildEmptyBlocksDelay {
bb.logger.Debug(
"Leader: Skipping empty block",
"timeSinceLastBlock", timeSinceLastBlock,
"pendingTxes", mempoolStatus.Pending,
"queuedTxes", mempoolStatus.Queued,
)
return ErrEmptyBlock
}
bb.logger.Info(
"Leader: Empty block will be created",
"timeSinceLastBlock", timeSinceLastBlock,
"pendingTxes", mempoolStatus.Pending,
"queuedTxes", mempoolStatus.Queued,
)
}

// Load execution head to get previous block timestamp
err = util.RetryWithBackoff(ctx, maxAttempts, bb.logger, func() error {
head, err = bb.loadExecutionHead(ctx)
Expand Down Expand Up @@ -221,17 +253,6 @@ func (bb *BlockBuilder) GetPayload(ctx context.Context) error {
return fmt.Errorf("failed to get payload: %w", err)
}

hasTransactions := len(payloadResp.ExecutionPayload.Transactions) > 0
now := time.Now()
timeSinceLastBlock := now.Sub(bb.lastBlockTime)
if !hasTransactions && timeSinceLastBlock < bb.buildEmptyBlocksDelay {
bb.logger.Info(
"Leader: Skipping empty block",
"timeSinceLastBlock", timeSinceLastBlock,
)
return ErrEmptyBlock
}

payloadData, err := msgpack.Marshal(payloadResp.ExecutionPayload)
if err != nil {
return fmt.Errorf("failed to marshal payload: %w", err)
Expand All @@ -255,7 +276,7 @@ func (bb *BlockBuilder) GetPayload(ctx context.Context) error {
"PayloadID", payloadIDStr,
)

bb.lastBlockTime = now
bb.lastBlockTime = time.Now()
return nil
}

Expand Down Expand Up @@ -535,3 +556,17 @@ func (bb *BlockBuilder) loadExecutionHead(ctx context.Context) (*types.Execution
func (bb *BlockBuilder) GetExecutionHead() *types.ExecutionHead {
return bb.executionHead
}

type MempoolStatus struct {
Pending hexutil.Uint64 `json:"pending"`
Queued hexutil.Uint64 `json:"queued"`
}

func (bb *BlockBuilder) GetMempoolStatus(ctx context.Context) (*MempoolStatus, error) {
var result MempoolStatus
err := bb.rpcClient.CallContext(ctx, &result, "txpool_status")
if err != nil {
return nil, err
}
return &result, nil
}
26 changes: 26 additions & 0 deletions cl/blockbuilder/blockbuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,23 @@ func (m *MockEngineClient) HeaderByNumber(ctx context.Context, number *big.Int)
return args.Get(0).(*etypes.Header), args.Error(1)
}

func createMockRPCClient() rpcClient {
mockRPC := &MockRPCClient{}
return mockRPC
}

type MockRPCClient struct{}

func (m *MockRPCClient) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error {
if method == "txpool_status" && result != nil {
if mempoolStatus, ok := result.(*MempoolStatus); ok {
mempoolStatus.Pending = 1
mempoolStatus.Queued = 0
}
}
return nil
}

func TestBlockBuilder_startBuild(t *testing.T) {
ctx := context.Background()

Expand All @@ -78,6 +95,7 @@ func TestBlockBuilder_startBuild(t *testing.T) {
blockBuilder := &BlockBuilder{
stateManager: stateManager,
engineCl: mockEngineClient,
rpcClient: createMockRPCClient(),
buildDelay: buildDelay,
buildDelayMs: uint64(buildDelay.Milliseconds()),
logger: stLog,
Expand Down Expand Up @@ -142,6 +160,7 @@ func TestBlockBuilder_getPayload(t *testing.T) {
blockBuilder := &BlockBuilder{
stateManager: stateManager,
engineCl: mockEngineClient,
rpcClient: createMockRPCClient(),
buildDelay: buildDelay,
buildDelayMs: uint64(buildDelay.Milliseconds()),
logger: stLog,
Expand Down Expand Up @@ -208,6 +227,7 @@ func TestBlockBuilder_FinalizeBlock(t *testing.T) {
blockBuilder := &BlockBuilder{
stateManager: stateManager,
engineCl: mockEngineClient,
rpcClient: createMockRPCClient(),
buildDelay: buildDelay,
buildDelayMs: uint64(buildDelay.Milliseconds()),
logger: stLog,
Expand Down Expand Up @@ -290,6 +310,7 @@ func TestBlockBuilder_startBuild_ForkchoiceUpdatedError(t *testing.T) {
blockBuilder := &BlockBuilder{
stateManager: stateManager,
engineCl: mockEngineClient,
rpcClient: createMockRPCClient(),
buildDelay: buildDelay,
buildDelayMs: uint64(buildDelay.Milliseconds()),
logger: stLog,
Expand Down Expand Up @@ -334,6 +355,7 @@ func TestBlockBuilder_startBuild_InvalidPayloadStatus(t *testing.T) {
blockBuilder := &BlockBuilder{
stateManager: stateManager,
engineCl: mockEngineClient,
rpcClient: createMockRPCClient(),
buildDelay: buildDelay,
buildDelayMs: uint64(buildDelay.Milliseconds()),
logger: stLog,
Expand Down Expand Up @@ -383,6 +405,7 @@ func TestBlockBuilder_getPayload_GetPayloadUnknownPayload(t *testing.T) {
blockBuilder := &BlockBuilder{
stateManager: stateManager,
engineCl: mockEngineClient,
rpcClient: createMockRPCClient(),
buildDelay: time.Duration(1 * time.Second),
logger: stLog,
}
Expand Down Expand Up @@ -434,6 +457,7 @@ func TestBlockBuilder_FinalizeBlock_InvalidBlockHeight(t *testing.T) {
blockBuilder := &BlockBuilder{
stateManager: stateManager,
engineCl: mockEngineClient,
rpcClient: createMockRPCClient(),
buildDelay: buildDelay,
buildDelayMs: uint64(buildDelay.Milliseconds()),
logger: stLog,
Expand Down Expand Up @@ -489,6 +513,7 @@ func TestBlockBuilder_FinalizeBlock_NewPayloadInvalidStatus(t *testing.T) {
blockBuilder := &BlockBuilder{
stateManager: stateManager,
engineCl: mockEngineClient,
rpcClient: createMockRPCClient(),
buildDelay: buildDelay,
buildDelayMs: uint64(buildDelay.Milliseconds()),
logger: stLog,
Expand Down Expand Up @@ -549,6 +574,7 @@ func TestBlockBuilder_FinalizeBlock_ForkchoiceUpdatedInvalidStatus(t *testing.T)
blockBuilder := &BlockBuilder{
stateManager: stateManager,
engineCl: mockEngineClient,
rpcClient: createMockRPCClient(),
buildDelay: buildDelay,
buildDelayMs: uint64(buildDelay.Milliseconds()),
logger: stLog,
Expand Down
18 changes: 18 additions & 0 deletions cl/cmd/singlenode/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ var (
},
})

nonAuthRpcUrlFlag = altsrc.NewStringFlag(&cli.StringFlag{
Name: "non-auth-rpc-url",
Usage: "Non-authenticated Ethereum RPC URL (e.g., http://localhost:8545)",
EnvVars: []string{"LEADER_NON_AUTH_RPC_URL"},
Value: "http://localhost:8545",
})

ethClientURLFlag = altsrc.NewStringFlag(&cli.StringFlag{
Name: "eth-client-url",
Usage: "Ethereum Execution client Engine API URL (e.g., http://localhost:8551)",
Expand Down Expand Up @@ -197,6 +204,13 @@ var (
},
})

txPoolPollingIntervalFlag = altsrc.NewDurationFlag(&cli.DurationFlag{
Name: "tx-pool-polling-interval",
Usage: "Wait interval for polling the tx pool while there are no pending transactions (e.g., '5ms')",
EnvVars: []string{"LEADER_TX_POOL_POLLING_INTERVAL"},
Value: 5 * time.Millisecond,
})

// Member node specific flags
leaderAPIURLFlag = altsrc.NewStringFlag(&cli.StringFlag{
Name: "leader-api-url",
Expand Down Expand Up @@ -238,6 +252,8 @@ func main() {
healthAddrPortFlag,
postgresDSNFlag,
apiAddrFlag,
nonAuthRpcUrlFlag,
txPoolPollingIntervalFlag,
}

memberFlags := []cli.Flag{
Expand Down Expand Up @@ -337,6 +353,8 @@ func startLeaderNode(c *cli.Context) error {
HealthAddr: c.String(healthAddrPortFlag.Name),
PostgresDSN: c.String(postgresDSNFlag.Name),
APIAddr: c.String(apiAddrFlag.Name),
NonAuthRpcURL: c.String(nonAuthRpcUrlFlag.Name),
TxPoolPollingInterval: c.Duration(txPoolPollingIntervalFlag.Name),
}

logger.Info("Starting leader node with configuration", "config", cfg)
Expand Down
2 changes: 1 addition & 1 deletion cl/redisapp/rapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func NewMevCommitChain(
)
return nil, err
}
blockBuilder := blockbuilder.NewBlockBuilder(coordinator, engineCL, logger, buildDelay, buildDelayEmptyBlocks, feeReceipt)
blockBuilder := blockbuilder.NewBlockBuilder(coordinator, engineCL, logger, buildDelay, buildDelayEmptyBlocks, feeReceipt, nil)

lfm, err := leaderfollower.NewLeaderFollowerManager(
instanceID,
Expand Down
24 changes: 23 additions & 1 deletion cl/singlenode/singlenode.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"sync"
"time"

"github.com/ethereum/go-ethereum/rpc"
"github.com/primev/mev-commit/cl/blockbuilder"
"github.com/primev/mev-commit/cl/ethclient"
"github.com/primev/mev-commit/cl/singlenode/api"
Expand All @@ -35,6 +36,8 @@ type Config struct {
HealthAddr string
PostgresDSN string
APIAddr string
NonAuthRpcURL string
TxPoolPollingInterval time.Duration
}

type BlockBuilder interface {
Expand All @@ -58,6 +61,7 @@ type SingleNodeApp struct {
wg sync.WaitGroup
connectionStatus sync.Mutex
connectionRefused bool
rpcClient *rpc.Client
}

// NewSingleNodeApp creates and initializes a new SingleNodeApp.
Expand Down Expand Up @@ -88,6 +92,16 @@ func NewSingleNodeApp(
return nil, err
}

rpcClient, err := rpc.DialContext(ctx, cfg.NonAuthRpcURL)
if err != nil {
cancel()
logger.Error(
"failed to create non-authenticated Ethereum RPC client",
"error", err,
)
return nil, err
}

stateMgr := localstate.NewLocalStateManager(logger.With("component", "LocalStateManager"))
bb := blockbuilder.NewBlockBuilder(
stateMgr,
Expand All @@ -96,6 +110,7 @@ func NewSingleNodeApp(
cfg.EVMBuildDelay,
cfg.EVMBuildDelayEmptyBlocks,
cfg.PriorityFeeReceipt,
rpcClient,
)

var pRepo types.PayloadRepository
Expand Down Expand Up @@ -138,6 +153,7 @@ func NewSingleNodeApp(
appCtx: ctx,
cancel: cancel,
connectionRefused: false,
rpcClient: rpcClient,
}, nil
}

Expand Down Expand Up @@ -270,7 +286,8 @@ func (app *SingleNodeApp) runLoop() {

if err != nil {
if errors.Is(err, blockbuilder.ErrEmptyBlock) {
app.logger.Info("empty block produced, waiting for new payload")
app.logger.Debug("no pending transactions, will try again after timeout", "timeout", app.cfg.TxPoolPollingInterval)
time.Sleep(app.cfg.TxPoolPollingInterval)
continue
} else if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
app.logger.Info("context canceled or deadline exceeded, stopping block production")
Expand Down Expand Up @@ -371,5 +388,10 @@ func (app *SingleNodeApp) Stop() {
}
}

if app.rpcClient != nil {
app.rpcClient.Close()
app.logger.Info("Ethereum client closed.")
}

app.logger.Info("SingleNodeApp stopped.")
}
10 changes: 10 additions & 0 deletions cl/singlenode/singlenode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,24 @@ func TestNewSingleNodeApp(t *testing.T) {
ctx := context.Background()
logger := setupTestLogger()

mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, err := w.Write([]byte(`{"jsonrpc":"2.0","id":1,"result":{"pending":"0x1","queued":"0x0"}}`))
require.NoError(t, err)
}))
defer mockServer.Close()

validCfg := Config{
InstanceID: "test-instance",
EthClientURL: "http://localhost:8545",
NonAuthRpcURL: mockServer.URL,
JWTSecret: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef",
EVMBuildDelay: time.Second,
EVMBuildDelayEmptyBlocks: time.Second * 2,
PriorityFeeReceipt: "0x1234567890abcdef1234567890abcdef12345678",
HealthAddr: ":8080",
TxPoolPollingInterval: time.Second,
}

app, err := NewSingleNodeApp(ctx, validCfg, logger)
Expand Down
Loading