diff --git a/cl/README.md b/cl/README.md index b17871058..3b0c65042 100644 --- a/cl/README.md +++ b/cl/README.md @@ -230,7 +230,36 @@ Run the client with the configuration file: ## Running the Single Node Application (snode) -The single node application provides a simplified MEV-commit setup that doesn't require Redis. +The single node application provides a simplified MEV-commit setup that doesn't require Redis, but using Postgres to save payloads, so member nodes could request that payload later on. + +## Architecture Overview + +The application supports two operational modes: + +1. **Leader Node**: Produces blocks and serves payloads to member nodes via API +2. **Member Node**: Follows a leader node by polling for and processing payloads sequentially + +## Running Postgres + +We will use Docker Compose to run Postgres + +### Postgres Docker Compose Configuration + +Postgres is configured in `postgres` folder within `docker-compose.yml` + +### Start Postgres + +Stop any existing containers and remove volumes: + +```bash +docker compose down -v +``` + +Start Postgres in detached mode: + +```bash +docker compose up -d +``` ### Build the Single Node Application @@ -239,84 +268,187 @@ go mod tidy go build -o snode main.go ``` -### Configuration +### SNode Configuration The snode application can be configured via command-line flags, environment variables, or a YAML configuration file. -#### Command-Line Flags +### Common Configuration Flags -- `--instance-id`: **(Required)** Unique instance ID for this node. -- `--eth-client-url`: Ethereum Execution client Engine API URL (default: `http://localhost:8551`). -- `--jwt-secret`: Hex-encoded JWT secret for Ethereum Execution client Engine API (default: `13373d9a0257983ad150392d7ddb2f9172c9396b4c450e26af469d123c7aaa5c`). -- `--priority-fee-recipient`: **(Required)** Ethereum address for receiving priority fees (block proposer fee). -- `--evm-build-delay`: Delay after initiating payload construction before calling getPayload (default: `100ms`). -- `--evm-build-delay-empty-block`: Minimum time since last block to build an empty block (default: `2s`, 0 to disable skipping). -- `--health-addr`: Address for health check endpoint (default: `:8080`). -- `--config`: Path to a YAML configuration file. -- `--log-fmt`: Log format ('text' or 'json') (default: `text`). -- `--log-level`: Log level ('debug', 'info', 'warn', 'error') (default: `info`). -- `--log-tags`: Comma-separated log tags (e.g., `env:prod,service:snode`). +- `--instance-id`: **(Required)** Unique instance ID for this node +- `--eth-client-url`: Ethereum Execution client Engine API URL (default: `http://localhost:8551`) +- `--jwt-secret`: Hex-encoded JWT secret for Ethereum Execution client Engine API (default: `13373d9a0257983ad150392d7ddb2f9172c9396b4c450e26af469d123c7aaa5c`) +- `--health-addr`: Address for health check endpoint (default: `:8080`) +- `--config`: Path to a YAML configuration file +- `--log-fmt`: Log format ('text' or 'json') (default: `text`) +- `--log-level`: Log level ('debug', 'info', 'warn', 'error') (default: `info`) +- `--log-tags`: Comma-separated log tags (e.g., `env:prod,service:snode`) -#### Environment Variables +### Leader Node Specific Flags + +- `--priority-fee-recipient`: **(Required)** Ethereum address for receiving priority fees (block proposer fee) +- `--evm-build-delay`: Delay after initiating payload construction before calling getPayload (default: `100ms`) +- `--evm-build-delay-empty-block`: Minimum time since last block to build an empty block (default: `2s`, 0 to disable skipping) +- `--postgres-dsn`: PostgreSQL DSN for storing payloads (optional, e.g., `postgres://user:pass@host:port/dbname?sslmode=disable`) +- `--api-addr`: Address for member node API endpoint (default: `:9090`, empty to disable) + +### Member Node Specific Flags + +- `--leader-api-url`: **(Required)** Leader node API URL for member nodes (e.g., `http://leader:9090`) +- `--poll-interval`: Interval for polling leader node for new payloads (default: `1s`) + +### SNode Environment Variables + +**Common:** -- `SNODE_INSTANCE_ID` -- `SNODE_ETH_CLIENT_URL` -- `SNODE_JWT_SECRET` -- `SNODE_PRIORITY_FEE_RECIPIENT` -- `SNODE_EVM_BUILD_DELAY` -- `SNODE_EVM_BUILD_DELAY_EMPTY_BLOCK` -- `SNODE_HEALTH_ADDR` -- `SNODE_CONFIG` +- `LEADER_INSTANCE_ID` +- `LEADER_ETH_CLIENT_URL` +- `LEADER_JWT_SECRET` +- `LEADER_HEALTH_ADDR` +- `LEADER_CONFIG` - `MEV_COMMIT_LOG_FMT` - `MEV_COMMIT_LOG_LEVEL` - `MEV_COMMIT_LOG_TAGS` -### Run the Single Node Application +**Leader Node:** -Run the application using command-line flags: +- `LEADER_PRIORITY_FEE_RECIPIENT` +- `LEADER_EVM_BUILD_DELAY` +- `LEADER_EVM_BUILD_DELAY_EMPTY_BLOCK` +- `LEADER_POSTGRES_DSN` +- `LEADER_API_ADDR` + +**Member Node:** + +- `MEMBER_LEADER_API_URL` +- `MEMBER_POLL_INTERVAL` + +## Running the Application + +### Leader Node + +Run as a leader node (produces blocks and serves API for member nodes): ```bash -./snode start \ - --instance-id "snode1" \ +./snode leader \ + --instance-id "leader1" \ --eth-client-url "http://localhost:8551" \ --jwt-secret "13373d9a0257983ad150392d7ddb2f9172c9396b4c450e26af469d123c7aaa5c" \ --priority-fee-recipient "0xYourEthereumAddress" \ --evm-build-delay "100ms" \ --evm-build-delay-empty-block "2s" \ + --api-addr ":9090" \ --log-level "info" ``` +### Member Node + +Run as a member node (follows leader by polling for payloads): + +```bash +./snode member \ + --instance-id "member1" \ + --eth-client-url "http://localhost:8552" \ + --jwt-secret "13373d9a0257983ad150392d7ddb2f9172c9396b4c450e26af469d123c7aaa5c" \ + --leader-api-url "http://localhost:9090" \ + --poll-interval "1s" \ + --log-level "info" +``` + +### Backward Compatibility + +The legacy `start` command is still supported and runs as a leader node: + +```bash +./snode start \ + --instance-id "snode1" \ + --priority-fee-recipient "0xYourEthereumAddress" \ + # ... other flags +``` + **Note**: - Replace `"0xYourEthereumAddress"` with a valid Ethereum address for receiving priority fees. - The JWT secret should be a 64-character hex string (32 bytes). -### Using a Configuration File for snode +## Configuration Files -Create a `snode-config.yaml` file: +### Leader Node Configuration + +Create a `leader-config.yaml` file: ```yaml -instance-id: "snode1" +instance-id: "leader1" eth-client-url: "http://localhost:8551" jwt-secret: "13373d9a0257983ad150392d7ddb2f9172c9396b4c450e26af469d123c7aaa5c" priority-fee-recipient: "0xYourEthereumAddress" evm-build-delay: "100ms" evm-build-delay-empty-block: "2s" +api-addr: ":9090" +postgres-dsn: "postgres://user:pass@localhost:5432/mevcommit?sslmode=disable" +health-addr: ":8080" log-fmt: "text" log-level: "info" -log-tags: "env:dev,service:snode" +log-tags: "env:dev,service:leader" ``` -Run the application with the configuration file: +Run with configuration file: ```bash -./snode start --config snode-config.yaml +./snode leader --config leader-config.yaml ``` +### Member Node Configuration + +Create a `member-config.yaml` file: + +```yaml +instance-id: "member1" +eth-client-url: "http://localhost:8552" +jwt-secret: "13373d9a0257983ad150392d7ddb2f9172c9396b4c450e26af469d123c7aaa5c" +leader-api-url: "http://localhost:9090" +poll-interval: "1s" +health-addr: ":8081" +log-fmt: "text" +log-level: "info" +log-tags: "env:dev,service:member" +``` + +Run with configuration file: + +```bash +./snode member --config member-config.yaml +``` + +### Health Endpoints + +Both node types provide health check endpoints: + +- **Leader**: Returns 200 OK when operational, 503 when Ethereum client unavailable +- **Member**: Returns 200 OK when operational and leader available, 503 otherwise + +Access health endpoints at: `http://localhost:8080/health` (or configured port) + +## Multi-Node Setup Example + +For a complete leader-follower setup: + +1. **Start Leader Node**: + + ```bash + ./snode leader --instance-id "leader" --priority-fee-recipient "0xYourAddress" --api-addr ":9090" + ``` + +2. **Start Member Node(s)**: + + ```bash + ./snode member --instance-id "member1" --leader-api-url "http://localhost:9090" --eth-client-url "http://localhost:8552" --health-addr ":8081" + ``` + +Each member node should connect to its own Geth instance and configure unique health endpoints to avoid port conflicts. + ## Additional Notes - **Graceful Shutdown**: Both applications support graceful shutdown via SIGTERM or Ctrl+C. -- **Health Endpoint**: The snode application provides a health check endpoint at `/health` that returns a 200 OK response when the application is running normally, or a 503 Service Unavailable if there are connection issues with the Ethereum client. ## Conclusion diff --git a/cl/blockbuilder/blockbuilder.go b/cl/blockbuilder/blockbuilder.go index cf2a55d6e..370d154c9 100644 --- a/cl/blockbuilder/blockbuilder.go +++ b/cl/blockbuilder/blockbuilder.go @@ -531,3 +531,7 @@ func (bb *BlockBuilder) loadExecutionHead(ctx context.Context) (*types.Execution return bb.executionHead, nil } + +func (bb *BlockBuilder) GetExecutionHead() *types.ExecutionHead { + return bb.executionHead +} diff --git a/cl/cmd/singlenode/main.go b/cl/cmd/singlenode/main.go index 3ccd84215..04c2991a8 100644 --- a/cl/cmd/singlenode/main.go +++ b/cl/cmd/singlenode/main.go @@ -8,18 +8,22 @@ import ( "os" "os/signal" "slices" + "strconv" "strings" "syscall" "time" "github.com/primev/mev-commit/cl/singlenode" + "github.com/primev/mev-commit/cl/singlenode/membernode" "github.com/primev/mev-commit/x/util" "github.com/urfave/cli/v2" "github.com/urfave/cli/v2/altsrc" ) const ( - categoryDebug = "Debug" + categoryDebug = "Debug" + categoryDatabase = "Database" + categoryMember = "Member Node" ) var ( @@ -37,13 +41,13 @@ var ( configFlag = &cli.StringFlag{ Name: "config", Usage: "Path to YAML config file", - EnvVars: []string{"SNODE_CONFIG"}, + EnvVars: []string{"LEADER_CONFIG"}, } instanceIDFlag = altsrc.NewStringFlag(&cli.StringFlag{ Name: "instance-id", Usage: "Unique instance ID for this node (for logging/identification)", - EnvVars: []string{"SNODE_INSTANCE_ID"}, + EnvVars: []string{"LEADER_INSTANCE_ID"}, Required: true, Action: func(_ *cli.Context, s string) error { if s == "" { @@ -56,7 +60,7 @@ var ( ethClientURLFlag = altsrc.NewStringFlag(&cli.StringFlag{ Name: "eth-client-url", Usage: "Ethereum Execution client Engine API URL (e.g., http://localhost:8551)", - EnvVars: []string{"SNODE_ETH_CLIENT_URL"}, + EnvVars: []string{"LEADER_ETH_CLIENT_URL"}, Value: "http://localhost:8551", Action: func(_ *cli.Context, s string) error { if _, err := url.Parse(s); err != nil { @@ -69,7 +73,7 @@ var ( jwtSecretFlag = altsrc.NewStringFlag(&cli.StringFlag{ Name: "jwt-secret", Usage: "Hex-encoded JWT secret for Ethereum Execution client Engine API", - EnvVars: []string{"SNODE_JWT_SECRET"}, + EnvVars: []string{"LEADER_JWT_SECRET"}, Value: "13373d9a0257983ad150392d7ddb2f9172c9396b4c450e26af469d123c7aaa5c", Action: func(_ *cli.Context, s string) error { if len(s) != 64 { @@ -121,21 +125,21 @@ var ( evmBuildDelayFlag = altsrc.NewDurationFlag(&cli.DurationFlag{ Name: "evm-build-delay", Usage: "Delay after initiating payload construction before calling getPayload (e.g., '200ms')", - EnvVars: []string{"SNODE_EVM_BUILD_DELAY"}, + EnvVars: []string{"LEADER_EVM_BUILD_DELAY"}, Value: 100 * time.Millisecond, }) evmBuildDelayEmptyBlockFlag = altsrc.NewDurationFlag(&cli.DurationFlag{ Name: "evm-build-delay-empty-block", Usage: "Minimum time since last block to build an empty block (0 to disable skipping, e.g., '2s')", - EnvVars: []string{"SNODE_EVM_BUILD_DELAY_EMPTY_BLOCK"}, + EnvVars: []string{"LEADER_EVM_BUILD_DELAY_EMPTY_BLOCK"}, Value: 2 * time.Second, }) priorityFeeReceiptFlag = altsrc.NewStringFlag(&cli.StringFlag{ Name: "priority-fee-recipient", Usage: "Ethereum address for receiving priority fees (block proposer fee)", - EnvVars: []string{"SNODE_PRIORITY_FEE_RECIPIENT"}, + EnvVars: []string{"LEADER_PRIORITY_FEE_RECIPIENT"}, Required: true, Action: func(c *cli.Context, s string) error { if !strings.HasPrefix(s, "0x") || len(s) != 42 { @@ -152,22 +156,74 @@ var ( healthAddrPortFlag = altsrc.NewStringFlag(&cli.StringFlag{ Name: "health-addr", Usage: "Address for health check endpoint (e.g., ':8080')", - EnvVars: []string{"SNODE_HEALTH_ADDR"}, + EnvVars: []string{"LEADER_HEALTH_ADDR"}, Value: ":8080", Action: func(_ *cli.Context, s string) error { if !strings.HasPrefix(s, ":") { - return fmt.Errorf("health-addr must start with ':'") + return fmt.Errorf("health-addr must start with ':' (e.g., ':8080')") + } + // Validate port number + portStr := s[1:] // Remove the ':' + if port, err := strconv.Atoi(portStr); err != nil || port < 1 || port > 65535 { + return fmt.Errorf("health-addr must be a valid port number (e.g., ':8080')") + } + return nil + }, + }) + + postgresDSNFlag = altsrc.NewStringFlag(&cli.StringFlag{ + Name: "postgres-dsn", + Usage: "PostgreSQL DSN for storing payloads. If empty, saving to DB is disabled. " + + "(e.g., 'postgres://user:pass@host:port/dbname?sslmode=disable')", + EnvVars: []string{"LEADER_POSTGRES_DSN"}, + Value: "", // Default to empty, making it optional + Category: categoryDatabase, + }) + + apiAddrFlag = altsrc.NewStringFlag(&cli.StringFlag{ + Name: "api-addr", + Usage: "Address for member node API endpoint (e.g., ':9090'). If empty, API is disabled.", + EnvVars: []string{"LEADER_API_ADDR"}, + Value: ":9090", + Action: func(_ *cli.Context, s string) error { + if s == "" { + return nil // Optional flag + } + if !strings.HasPrefix(s, ":") { + return fmt.Errorf("api-addr must start with ':'") + } + return nil + }, + }) + + // Member node specific flags + leaderAPIURLFlag = altsrc.NewStringFlag(&cli.StringFlag{ + Name: "leader-api-url", + Usage: "Leader node API URL for member nodes (e.g., 'http://leader:9090')", + EnvVars: []string{"MEMBER_LEADER_API_URL"}, + Category: categoryMember, + Action: func(_ *cli.Context, s string) error { + if s == "" { + return nil // Will be validated in member command } if _, err := url.Parse(s); err != nil { - return fmt.Errorf("invalid health-addr: %v", err) + return fmt.Errorf("invalid leader-api-url: %v", err) } return nil }, }) + + pollIntervalFlag = altsrc.NewDurationFlag(&cli.DurationFlag{ + Name: "poll-interval", + Usage: "Interval for polling leader node for new payloads (e.g., '1s')", + EnvVars: []string{"MEMBER_POLL_INTERVAL"}, + Value: 1 * time.Second, + Category: categoryMember, + }) ) func main() { - flags := []cli.Flag{ + leaderFlags := []cli.Flag{ configFlag, instanceIDFlag, ethClientURLFlag, @@ -179,17 +235,65 @@ func main() { evmBuildDelayEmptyBlockFlag, priorityFeeReceiptFlag, healthAddrPortFlag, + postgresDSNFlag, + apiAddrFlag, + } + + memberFlags := []cli.Flag{ + configFlag, + instanceIDFlag, + ethClientURLFlag, + jwtSecretFlag, + logFmtFlag, + logLevelFlag, + logTagsFlag, + healthAddrPortFlag, + leaderAPIURLFlag, + pollIntervalFlag, } app := &cli.App{ Name: "snode", Usage: "Single-node MEV-commit application", Commands: []*cli.Command{ + { + Name: "leader", + Usage: "Start as leader node (produces blocks)", + Flags: leaderFlags, + Before: altsrc.InitInputSourceWithContext(leaderFlags, + func(c *cli.Context) (altsrc.InputSourceContext, error) { + configFile := c.String(configFlag.Name) + if configFile != "" { + return altsrc.NewYamlSourceFromFile(configFile) + } + return &altsrc.MapInputSource{}, nil + }), + Action: func(c *cli.Context) error { + return startLeaderNode(c) + }, + }, + { + Name: "member", + Usage: "Start as member node (follows leader)", + Flags: memberFlags, + Before: altsrc.InitInputSourceWithContext(memberFlags, + func(c *cli.Context) (altsrc.InputSourceContext, error) { + configFile := c.String(configFlag.Name) + if configFile != "" { + return altsrc.NewYamlSourceFromFile(configFile) + } + return &altsrc.MapInputSource{}, nil + }), + Action: func(c *cli.Context) error { + return startMemberNode(c) + }, + }, + // Keep the old "start" command for backward compatibility { Name: "start", - Usage: "Start the snode node", - Flags: flags, - Before: altsrc.InitInputSourceWithContext(flags, + Usage: "Start as leader node (deprecated, use 'leader' instead)", + Flags: leaderFlags, + Before: altsrc.InitInputSourceWithContext(leaderFlags, func(c *cli.Context) (altsrc.InputSourceContext, error) { configFile := c.String(configFlag.Name) if configFile != "" { @@ -198,7 +302,7 @@ func main() { return &altsrc.MapInputSource{}, nil }), Action: func(c *cli.Context) error { - return startSingleNodeApplication(c) + return startLeaderNode(c) }, }, }, @@ -210,7 +314,7 @@ func main() { } } -func startSingleNodeApplication(c *cli.Context) error { +func startLeaderNode(c *cli.Context) error { logger, err := util.NewLogger( c.String(logLevelFlag.Name), c.String(logFmtFlag.Name), @@ -220,7 +324,7 @@ func startSingleNodeApplication(c *cli.Context) error { if err != nil { return fmt.Errorf("failed to create logger: %w", err) } - logger = logger.With("app", "snode") + logger = logger.With("app", "snode", "role", "leader") cfg := singlenode.Config{ InstanceID: c.String(instanceIDFlag.Name), @@ -230,9 +334,11 @@ func startSingleNodeApplication(c *cli.Context) error { EVMBuildDelayEmptyBlocks: c.Duration(evmBuildDelayEmptyBlockFlag.Name), PriorityFeeReceipt: c.String(priorityFeeReceiptFlag.Name), HealthAddr: c.String(healthAddrPortFlag.Name), + PostgresDSN: c.String(postgresDSNFlag.Name), + APIAddr: c.String(apiAddrFlag.Name), } - logger.Info("Starting snode with configuration", "config", cfg) // Be careful logging sensitive parts of config + logger.Info("Starting leader node with configuration", "config", cfg) // Create a root context that can be cancelled for graceful shutdown rootCtx, rootCancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) @@ -248,9 +354,58 @@ func startSingleNodeApplication(c *cli.Context) error { <-rootCtx.Done() - logger.Info("Shutdown signal received, stopping snode...") + logger.Info("Shutdown signal received, stopping leader node...") snode.Stop() - logger.Info("SRApp shutdown completed.") + logger.Info("Leader node shutdown completed.") + return nil +} + +func startMemberNode(c *cli.Context) error { + leaderURL := c.String(leaderAPIURLFlag.Name) + if leaderURL == "" { + return fmt.Errorf("leader-api-url is required for member nodes") + } + + logger, err := util.NewLogger( + c.String(logLevelFlag.Name), + c.String(logFmtFlag.Name), + c.String(logTagsFlag.Name), + c.App.Writer, + ) + if err != nil { + return fmt.Errorf("failed to create logger: %w", err) + } + logger = logger.With("app", "snode", "role", "member") + + cfg := membernode.Config{ + InstanceID: c.String(instanceIDFlag.Name), + LeaderAPIURL: leaderURL, + EthClientURL: c.String(ethClientURLFlag.Name), + JWTSecret: c.String(jwtSecretFlag.Name), + HealthAddr: c.String(healthAddrPortFlag.Name), + PollInterval: c.Duration(pollIntervalFlag.Name), + } + + logger.Info("Starting member node", "config", cfg) + + // Create a root context that can be cancelled for graceful shutdown + rootCtx, rootCancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer rootCancel() + + memberNode, err := membernode.NewMemberNodeApp(rootCtx, cfg, logger) + if err != nil { + logger.Error("Failed to initialize MemberNodeApp", "error", err) + return err + } + + memberNode.Start() + + <-rootCtx.Done() + + logger.Info("Shutdown signal received, stopping member node...") + memberNode.Stop() + + logger.Info("Member node shutdown completed.") return nil } diff --git a/cl/go.mod b/cl/go.mod index 95f097407..1f8c77bed 100644 --- a/cl/go.mod +++ b/cl/go.mod @@ -8,6 +8,7 @@ require ( github.com/go-redis/redismock/v9 v9.2.0 github.com/golang-jwt/jwt/v5 v5.2.1 github.com/golang/mock v1.6.0 + github.com/lib/pq v1.10.9 github.com/redis/go-redis/v9 v9.6.1 github.com/urfave/cli/v2 v2.27.5 golang.org/x/tools v0.29.0 diff --git a/cl/go.sum b/cl/go.sum index 77dbe8528..d406a3e90 100644 --- a/cl/go.sum +++ b/cl/go.sum @@ -113,6 +113,8 @@ github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0 github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/leanovate/gopter v0.2.11 h1:vRjThO1EKPb/1NsDXuDrzldR28RLkBflWYcU9CvzWu4= github.com/leanovate/gopter v0.2.11/go.mod h1:aK3tzZP/C+p1m3SPRE4SYZFGP7jjkuSI4f7Xvpt0S9c= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= diff --git a/cl/postgres/docker-compose.yml b/cl/postgres/docker-compose.yml new file mode 100644 index 000000000..77fad4d4d --- /dev/null +++ b/cl/postgres/docker-compose.yml @@ -0,0 +1,40 @@ +version: '3.8' + +services: + postgres: + image: postgres:17 + container_name: mev-commit-postgres + environment: + POSTGRES_DB: mevcommit + POSTGRES_USER: mevcommit + POSTGRES_PASSWORD: password123 + POSTGRES_INITDB_ARGS: "--encoding=UTF-8 --lc-collate=C --lc-ctype=C" + ports: + - "5432:5432" + volumes: + - postgres_data:/var/lib/postgresql/data + - ./init-db.sql:/docker-entrypoint-initdb.d/init-db.sql:ro + networks: + - mev-commit-network + healthcheck: + test: ["CMD-SHELL", "pg_isready -U mevcommit -d mevcommit"] + interval: 10s + timeout: 5s + retries: 5 + start_period: 30s + restart: unless-stopped + command: > + postgres + +volumes: + postgres_data: + driver: local + pgadmin_data: + driver: local + +networks: + mev-commit-network: + driver: bridge + ipam: + config: + - subnet: 172.20.0.0/16 diff --git a/cl/postgres/init-db.sql b/cl/postgres/init-db.sql new file mode 100644 index 000000000..8420f1349 --- /dev/null +++ b/cl/postgres/init-db.sql @@ -0,0 +1,21 @@ +-- init-db.sql +-- This script will be automatically executed when PostgreSQL starts for the first time + +-- Create the execution_payloads table with proper indexing +CREATE TABLE IF NOT EXISTS execution_payloads ( + id SERIAL PRIMARY KEY, + payload_id VARCHAR(66) UNIQUE NOT NULL, -- e.g., 0x... (32 bytes hex + 0x prefix) + raw_execution_payload TEXT NOT NULL, + block_height BIGINT NOT NULL UNIQUE, + inserted_at TIMESTAMPTZ DEFAULT NOW() +); + +-- Create indexes for efficient querying +CREATE INDEX IF NOT EXISTS idx_execution_payloads_block_height ON execution_payloads(block_height); +CREATE INDEX IF NOT EXISTS idx_execution_payloads_inserted_at ON execution_payloads(inserted_at); +CREATE INDEX IF NOT EXISTS idx_execution_payloads_payload_id ON execution_payloads(payload_id); + +-- Create a partial index for recent payloads (optimization for common queries) +CREATE INDEX IF NOT EXISTS idx_execution_payloads_recent +ON execution_payloads(block_height DESC) +WHERE inserted_at > NOW() - INTERVAL '24 hours'; diff --git a/cl/singlenode/api/api_client.go b/cl/singlenode/api/api_client.go new file mode 100644 index 000000000..7fbaac52d --- /dev/null +++ b/cl/singlenode/api/api_client.go @@ -0,0 +1,204 @@ +package api + +import ( + "context" + "encoding/json" + "fmt" + "io" + "log/slog" + "net/http" + "time" +) + +// PayloadResponse represents the API response for payload requests +type PayloadResponse struct { + PayloadID string `json:"payload_id"` + ExecutionPayload string `json:"execution_payload"` + BlockHeight uint64 `json:"block_height"` + Timestamp int64 `json:"timestamp"` +} + +// PayloadListResponse represents the response for multiple payloads +type PayloadListResponse struct { + Payloads []PayloadResponse `json:"payloads"` + HasMore bool `json:"has_more"` + NextHeight uint64 `json:"next_height,omitempty"` + TotalCount int `json:"total_count"` +} + +// ErrorResponse represents an error response +type ErrorResponse struct { + Error string `json:"error"` + Code int `json:"code"` + Message string `json:"message"` +} + +// PayloadClient handles communication with the leader node's API +type PayloadClient struct { + baseURL string + httpClient *http.Client + logger *slog.Logger +} + +// NewPayloadClient creates a new payload API client +func NewPayloadClient(baseURL string, logger *slog.Logger) *PayloadClient { + return &PayloadClient{ + baseURL: baseURL, + httpClient: &http.Client{ + Timeout: 30 * time.Second, + }, + logger: logger.With("component", "PayloadClient"), + } +} + +// GetLatestPayload fetches the latest payload from the leader node +func (pc *PayloadClient) GetLatestPayload(ctx context.Context) (*PayloadResponse, error) { + url := fmt.Sprintf("%s/api/v1/payload/latest", pc.baseURL) + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + resp, err := pc.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to execute request: %w", err) + } + //nolint:errcheck + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("failed to read response body: %w", err) + } + + if resp.StatusCode != http.StatusOK { + var errorResp ErrorResponse + if err := json.Unmarshal(body, &errorResp); err != nil { + return nil, fmt.Errorf("API error (status %d): %s", resp.StatusCode, string(body)) + } + return nil, fmt.Errorf("API error: %s", errorResp.Message) + } + + var payloadResp PayloadResponse + if err := json.Unmarshal(body, &payloadResp); err != nil { + return nil, fmt.Errorf("failed to unmarshal response: %w", err) + } + + pc.logger.Debug( + "Retrieved payload from leader", + "payload_id", payloadResp.PayloadID, + "block_height", payloadResp.BlockHeight, + ) + return &payloadResp, nil +} + +// GetPayloadsSince fetches payloads with block height >= sinceHeight from the leader node +func (pc *PayloadClient) GetPayloadsSince(ctx context.Context, sinceHeight uint64, limit int) (*PayloadListResponse, error) { + url := fmt.Sprintf("%s/api/v1/payload/since/%d?limit=%d", pc.baseURL, sinceHeight, limit) + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + resp, err := pc.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to execute request: %w", err) + } + //nolint:errcheck + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("failed to read response body: %w", err) + } + + if resp.StatusCode != http.StatusOK { + var errorResp ErrorResponse + if err := json.Unmarshal(body, &errorResp); err != nil { + return nil, fmt.Errorf("API error (status %d): %s", resp.StatusCode, string(body)) + } + return nil, fmt.Errorf("API error: %s", errorResp.Message) + } + + var payloadListResp PayloadListResponse + if err := json.Unmarshal(body, &payloadListResp); err != nil { + return nil, fmt.Errorf("failed to unmarshal response: %w", err) + } + + pc.logger.Debug( + "Retrieved payloads since height from leader", + "since_height", sinceHeight, + "count", len(payloadListResp.Payloads), + "has_more", payloadListResp.HasMore, + "next_height", payloadListResp.NextHeight, + ) + + return &payloadListResp, nil +} + +// GetPayloadByHeight fetches a specific payload by block height from the leader node +func (pc *PayloadClient) GetPayloadByHeight(ctx context.Context, height uint64) (*PayloadResponse, error) { + url := fmt.Sprintf("%s/api/v1/payload/height/%d", pc.baseURL, height) + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + resp, err := pc.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to execute request: %w", err) + } + //nolint:errcheck + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("failed to read response body: %w", err) + } + + if resp.StatusCode != http.StatusOK { + var errorResp ErrorResponse + if err := json.Unmarshal(body, &errorResp); err != nil { + return nil, fmt.Errorf("API error (status %d): %s", resp.StatusCode, string(body)) + } + return nil, fmt.Errorf("API error: %s", errorResp.Message) + } + + var payloadResp PayloadResponse + if err := json.Unmarshal(body, &payloadResp); err != nil { + return nil, fmt.Errorf("failed to unmarshal response: %w", err) + } + + pc.logger.Debug( + "Retrieved payload by height from leader", + "height", height, + "payload_id", payloadResp.PayloadID, + ) + return &payloadResp, nil +} + +// CheckHealth checks if the leader node API is healthy +func (pc *PayloadClient) CheckHealth(ctx context.Context) error { + url := fmt.Sprintf("%s/api/v1/health", pc.baseURL) + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return fmt.Errorf("failed to create health check request: %w", err) + } + + resp, err := pc.httpClient.Do(req) + if err != nil { + return fmt.Errorf("failed to execute health check: %w", err) + } + //nolint:errcheck + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("leader node unhealthy (status %d)", resp.StatusCode) + } + + return nil +} diff --git a/cl/singlenode/api/api_client_test.go b/cl/singlenode/api/api_client_test.go new file mode 100644 index 000000000..e6b2d22a9 --- /dev/null +++ b/cl/singlenode/api/api_client_test.go @@ -0,0 +1,618 @@ +package api + +import ( + "context" + "encoding/json" + "log/slog" + "net/http" + "net/http/httptest" + "os" + "testing" + "time" +) + +func TestNewPayloadClient(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) + baseURL := "http://localhost:8080" + + client := NewPayloadClient(baseURL, logger) + + if client.baseURL != baseURL { + t.Errorf("Expected baseURL %s, got %s", baseURL, client.baseURL) + } + + if client.httpClient == nil { + t.Error("Expected httpClient to be initialized") + } + + if client.httpClient.Timeout != 30*time.Second { + t.Errorf("Expected timeout to be 30s, got %v", client.httpClient.Timeout) + } + + if client.logger == nil { + t.Error("Expected logger to be initialized") + } +} + +func TestPayloadClient_GetLatestPayload_Success(t *testing.T) { + // Create test payload response + expectedPayload := PayloadResponse{ + PayloadID: "payload_123", + ExecutionPayload: "0x1234567890abcdef", + BlockHeight: 100, + Timestamp: time.Now().Unix(), + } + + // Create test server + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/api/v1/payload/latest" { + t.Errorf("Expected path /api/v1/payload/latest, got %s", r.URL.Path) + } + if r.Method != http.MethodGet { + t.Errorf("Expected GET method, got %s", r.Method) + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + err := json.NewEncoder(w).Encode(expectedPayload) + if err != nil { + t.Fatalf("Failed to encode response: %v", err) + } + })) + defer server.Close() + + // Create client + logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) + client := NewPayloadClient(server.URL, logger) + + // Test the method + ctx := context.Background() + result, err := client.GetLatestPayload(ctx) + + if err != nil { + t.Fatalf("Expected no error, got %v", err) + } + + if result.PayloadID != expectedPayload.PayloadID { + t.Errorf("Expected PayloadID %s, got %s", expectedPayload.PayloadID, result.PayloadID) + } + + if result.ExecutionPayload != expectedPayload.ExecutionPayload { + t.Errorf("Expected ExecutionPayload %s, got %s", expectedPayload.ExecutionPayload, result.ExecutionPayload) + } + + if result.BlockHeight != expectedPayload.BlockHeight { + t.Errorf("Expected BlockHeight %d, got %d", expectedPayload.BlockHeight, result.BlockHeight) + } +} + +func TestPayloadClient_GetLatestPayload_ErrorResponse(t *testing.T) { + // Create test server that returns error + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + errorResp := ErrorResponse{ + Error: "internal_error", + Code: 500, + Message: "Internal server error occurred", + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusInternalServerError) + err := json.NewEncoder(w).Encode(errorResp) + if err != nil { + t.Fatalf("Failed to encode error response: %v", err) + } + })) + defer server.Close() + + logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) + client := NewPayloadClient(server.URL, logger) + + ctx := context.Background() + result, err := client.GetLatestPayload(ctx) + + if err == nil { + t.Fatal("Expected error, got nil") + } + + if result != nil { + t.Error("Expected nil result on error") + } + + expectedError := "API error: Internal server error occurred" + if err.Error() != expectedError { + t.Errorf("Expected error %s, got %s", expectedError, err.Error()) + } +} + +func TestPayloadClient_GetLatestPayload_InvalidJSON(t *testing.T) { + // Create test server that returns invalid JSON + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _, err := w.Write([]byte("invalid json")) + if err != nil { + t.Fatalf("Failed to write response: %v", err) + } + })) + defer server.Close() + + logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) + client := NewPayloadClient(server.URL, logger) + + ctx := context.Background() + result, err := client.GetLatestPayload(ctx) + + if err == nil { + t.Fatal("Expected error, got nil") + } + + if result != nil { + t.Error("Expected nil result on error") + } +} + +func TestPayloadClient_GetLatestPayload_ContextCanceled(t *testing.T) { + // Create test server with delay + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + time.Sleep(100 * time.Millisecond) + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) + client := NewPayloadClient(server.URL, logger) + + // Create context that will be canceled + ctx, cancel := context.WithCancel(context.Background()) + cancel() // Cancel immediately + + result, err := client.GetLatestPayload(ctx) + + if err == nil { + t.Fatal("Expected error, got nil") + } + + if result != nil { + t.Error("Expected nil result on error") + } +} + +func TestPayloadClient_GetPayloadsSince_Success(t *testing.T) { + // Create test payload list response + expectedResponse := PayloadListResponse{ + Payloads: []PayloadResponse{ + { + PayloadID: "payload_100", + ExecutionPayload: "0x100", + BlockHeight: 100, + Timestamp: time.Now().Unix(), + }, + { + PayloadID: "payload_101", + ExecutionPayload: "0x101", + BlockHeight: 101, + Timestamp: time.Now().Unix(), + }, + }, + HasMore: true, + NextHeight: 102, + TotalCount: 50, + } + + // Create test server + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + expectedPath := "/api/v1/payload/since/100" + if r.URL.Path != expectedPath { + t.Errorf("Expected path %s, got %s", expectedPath, r.URL.Path) + } + + expectedQuery := "limit=10" + if r.URL.RawQuery != expectedQuery { + t.Errorf("Expected query %s, got %s", expectedQuery, r.URL.RawQuery) + } + + if r.Method != http.MethodGet { + t.Errorf("Expected GET method, got %s", r.Method) + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + err := json.NewEncoder(w).Encode(expectedResponse) + if err != nil { + t.Fatalf("Failed to encode response: %v", err) + } + })) + defer server.Close() + + logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) + client := NewPayloadClient(server.URL, logger) + + ctx := context.Background() + result, err := client.GetPayloadsSince(ctx, 100, 10) + + if err != nil { + t.Fatalf("Expected no error, got %v", err) + } + + if len(result.Payloads) != 2 { + t.Errorf("Expected 2 payloads, got %d", len(result.Payloads)) + } + + if result.HasMore != true { + t.Error("Expected HasMore to be true") + } + + if result.NextHeight != 102 { + t.Errorf("Expected NextHeight 102, got %d", result.NextHeight) + } + + if result.TotalCount != 50 { + t.Errorf("Expected TotalCount 50, got %d", result.TotalCount) + } +} + +func TestPayloadClient_GetPayloadsSince_ErrorResponse(t *testing.T) { + // Create test server that returns error + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + errorResp := ErrorResponse{ + Error: "not_found", + Code: 404, + Message: "No payloads found for the given height", + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusNotFound) + err := json.NewEncoder(w).Encode(errorResp) + if err != nil { + t.Fatalf("Failed to encode error response: %v", err) + } + })) + defer server.Close() + + logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) + client := NewPayloadClient(server.URL, logger) + + ctx := context.Background() + result, err := client.GetPayloadsSince(ctx, 999, 10) + + if err == nil { + t.Fatal("Expected error, got nil") + } + + if result != nil { + t.Error("Expected nil result on error") + } + + expectedError := "API error: No payloads found for the given height" + if err.Error() != expectedError { + t.Errorf("Expected error %s, got %s", expectedError, err.Error()) + } +} + +func TestPayloadClient_GetPayloadByHeight_Success(t *testing.T) { + // Create test payload response + expectedPayload := PayloadResponse{ + PayloadID: "payload_150", + ExecutionPayload: "0x150", + BlockHeight: 150, + Timestamp: time.Now().Unix(), + } + + // Create test server + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + expectedPath := "/api/v1/payload/height/150" + if r.URL.Path != expectedPath { + t.Errorf("Expected path %s, got %s", expectedPath, r.URL.Path) + } + + if r.Method != http.MethodGet { + t.Errorf("Expected GET method, got %s", r.Method) + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + err := json.NewEncoder(w).Encode(expectedPayload) + if err != nil { + t.Fatalf("Failed to encode response: %v", err) + } + })) + defer server.Close() + + logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) + client := NewPayloadClient(server.URL, logger) + + ctx := context.Background() + result, err := client.GetPayloadByHeight(ctx, 150) + + if err != nil { + t.Fatalf("Expected no error, got %v", err) + } + + if result.PayloadID != expectedPayload.PayloadID { + t.Errorf("Expected PayloadID %s, got %s", expectedPayload.PayloadID, result.PayloadID) + } + + if result.BlockHeight != expectedPayload.BlockHeight { + t.Errorf("Expected BlockHeight %d, got %d", expectedPayload.BlockHeight, result.BlockHeight) + } +} + +func TestPayloadClient_GetPayloadByHeight_NotFound(t *testing.T) { + // Create test server that returns 404 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + errorResp := ErrorResponse{ + Error: "not_found", + Code: 404, + Message: "Payload not found for height 999", + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusNotFound) + err := json.NewEncoder(w).Encode(errorResp) + if err != nil { + t.Fatalf("Failed to encode error response: %v", err) + } + })) + defer server.Close() + + logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) + client := NewPayloadClient(server.URL, logger) + + ctx := context.Background() + result, err := client.GetPayloadByHeight(ctx, 999) + + if err == nil { + t.Fatal("Expected error, got nil") + } + + if result != nil { + t.Error("Expected nil result on error") + } + + expectedError := "API error: Payload not found for height 999" + if err.Error() != expectedError { + t.Errorf("Expected error %s, got %s", expectedError, err.Error()) + } +} + +func TestPayloadClient_CheckHealth_Success(t *testing.T) { + // Create test server + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/api/v1/health" { + t.Errorf("Expected path /api/v1/health, got %s", r.URL.Path) + } + + if r.Method != http.MethodGet { + t.Errorf("Expected GET method, got %s", r.Method) + } + + w.WriteHeader(http.StatusOK) + _, err := w.Write([]byte("OK")) + if err != nil { + t.Fatalf("Failed to write response: %v", err) + } + })) + defer server.Close() + + logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) + client := NewPayloadClient(server.URL, logger) + + ctx := context.Background() + err := client.CheckHealth(ctx) + + if err != nil { + t.Fatalf("Expected no error, got %v", err) + } +} + +func TestPayloadClient_CheckHealth_Unhealthy(t *testing.T) { + // Create test server that returns unhealthy status + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + _, err := w.Write([]byte("Internal Server Error")) + if err != nil { + t.Fatalf("Failed to write response: %v", err) + } + })) + defer server.Close() + + logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) + client := NewPayloadClient(server.URL, logger) + + ctx := context.Background() + err := client.CheckHealth(ctx) + + if err == nil { + t.Fatal("Expected error, got nil") + } + + expectedError := "leader node unhealthy (status 500)" + if err.Error() != expectedError { + t.Errorf("Expected error %s, got %s", expectedError, err.Error()) + } +} + +func TestPayloadClient_CheckHealth_NetworkError(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) + client := NewPayloadClient("http://nonexistent.example.com", logger) + + ctx := context.Background() + err := client.CheckHealth(ctx) + + if err == nil { + t.Fatal("Expected error, got nil") + } + + // Should contain "failed to execute health check" + if !contains(err.Error(), "failed to execute health check") { + t.Errorf("Expected error to contain 'failed to execute health check', got %s", err.Error()) + } +} + +func TestPayloadClient_ErrorResponse_InvalidJSON(t *testing.T) { + // Create test server that returns non-JSON error response + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusBadRequest) + _, err := w.Write([]byte("Bad Request - Not JSON")) + if err != nil { + t.Fatalf("Failed to write response: %v", err) + } + })) + defer server.Close() + + logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) + client := NewPayloadClient(server.URL, logger) + + ctx := context.Background() + result, err := client.GetLatestPayload(ctx) + + if err == nil { + t.Fatal("Expected error, got nil") + } + + if result != nil { + t.Error("Expected nil result on error") + } + + expectedError := "API error (status 400): Bad Request - Not JSON" + if err.Error() != expectedError { + t.Errorf("Expected error %s, got %s", expectedError, err.Error()) + } +} + +// Benchmark tests +func BenchmarkPayloadClient_GetLatestPayload(b *testing.B) { + payload := PayloadResponse{ + PayloadID: "payload_bench", + ExecutionPayload: "0xbenchmark", + BlockHeight: 1000, + Timestamp: time.Now().Unix(), + } + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + err := json.NewEncoder(w).Encode(payload) + if err != nil { + b.Fatalf("Failed to encode response: %v", err) + } + })) + defer server.Close() + + logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) + client := NewPayloadClient(server.URL, logger) + ctx := context.Background() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := client.GetLatestPayload(ctx) + if err != nil { + b.Fatalf("Unexpected error: %v", err) + } + } +} + +// Helper function to check if a string contains a substring +func contains(s, substr string) bool { + return len(s) >= len(substr) && (s == substr || + len(s) > len(substr) && (s[:len(substr)] == substr || + s[len(s)-len(substr):] == substr || + containsHelper(s, substr))) +} + +func containsHelper(s, substr string) bool { + for i := 0; i <= len(s)-len(substr); i++ { + if s[i:i+len(substr)] == substr { + return true + } + } + return false +} + +// Table-driven test for multiple scenarios +func TestPayloadClient_GetLatestPayload_TableDriven(t *testing.T) { + tests := []struct { + name string + serverResponse func(w http.ResponseWriter, r *http.Request) + expectedError bool + errorContains string + }{ + { + name: "successful response", + serverResponse: func(w http.ResponseWriter, r *http.Request) { + payload := PayloadResponse{ + PayloadID: "test_payload", + BlockHeight: 42, + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + err := json.NewEncoder(w).Encode(payload) + if err != nil { + t.Fatalf("Failed to encode response: %v", err) + } + }, + expectedError: false, + }, + { + name: "server error", + serverResponse: func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + _, err := w.Write([]byte("Internal Server Error")) + if err != nil { + t.Fatalf("Failed to write response: %v", err) + } + }, + expectedError: true, + errorContains: "API error (status 500)", + }, + { + name: "delayed response", + serverResponse: func(w http.ResponseWriter, r *http.Request) { + time.Sleep(50 * time.Millisecond) // Short delay to test but not cause timeout + payload := PayloadResponse{ + PayloadID: "delayed_payload", + BlockHeight: 123, + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + err := json.NewEncoder(w).Encode(payload) + if err != nil { + t.Fatalf("Failed to encode response: %v", err) + } + }, + expectedError: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(tt.serverResponse)) + defer server.Close() + + logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) + client := NewPayloadClient(server.URL, logger) + + ctx := context.Background() + result, err := client.GetLatestPayload(ctx) + + if tt.expectedError { + if err == nil { + t.Fatal("Expected error, got nil") + } + if tt.errorContains != "" && !contains(err.Error(), tt.errorContains) { + t.Errorf("Expected error to contain %s, got %s", tt.errorContains, err.Error()) + } + if result != nil { + t.Error("Expected nil result on error") + } + } else { + if err != nil { + t.Fatalf("Expected no error, got %v", err) + } + if result == nil { + t.Error("Expected non-nil result on success") + } + } + }) + } +} diff --git a/cl/singlenode/api/api_server.go b/cl/singlenode/api/api_server.go new file mode 100644 index 000000000..672a42b56 --- /dev/null +++ b/cl/singlenode/api/api_server.go @@ -0,0 +1,315 @@ +package api + +import ( + "context" + "database/sql" + "encoding/json" + "log/slog" + "net/http" + "strconv" + "strings" + "time" + + "github.com/primev/mev-commit/cl/types" +) + +// StateManager interface for accessing block build state +type StateManager interface { + GetBlockBuildState(ctx context.Context) types.BlockBuildState +} + +// PayloadServer provides HTTP API for member nodes to fetch payloads +type PayloadServer struct { + logger *slog.Logger + stateManager StateManager + payloadRepo types.PayloadRepository + server *http.Server +} + +// NewPayloadServer creates a new payload API server +func NewPayloadServer( + addr string, + stateManager StateManager, + payloadRepo types.PayloadRepository, + logger *slog.Logger, +) *PayloadServer { + mux := http.NewServeMux() + + ps := &PayloadServer{ + logger: logger.With("component", "PayloadServer"), + stateManager: stateManager, + payloadRepo: payloadRepo, + server: &http.Server{ + Addr: addr, + Handler: mux, + ReadTimeout: 30 * time.Second, + WriteTimeout: 30 * time.Second, + IdleTimeout: 60 * time.Second, + }, + } + + // Register endpoints + mux.HandleFunc("/api/v1/payload/latest", ps.handleGetLatestPayload) + mux.HandleFunc("/api/v1/payload/since/", ps.handleGetPayloadsSince) + mux.HandleFunc("/api/v1/payload/height/", ps.handleGetPayloadByHeight) + mux.HandleFunc("/api/v1/health", ps.handleHealth) + + return ps +} + +// Start starts the HTTP server +func (ps *PayloadServer) Start(ctx context.Context) error { + ps.logger.Info("Starting payload API server", "addr", ps.server.Addr) + + // Start server in goroutine + go func() { + if err := ps.server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + ps.logger.Error("Payload API server error", "error", err) + } + }() + + // Wait for context cancellation to shutdown + <-ctx.Done() + return ps.Stop() +} + +// Stop gracefully stops the HTTP server +func (ps *PayloadServer) Stop() error { + ps.logger.Info("Stopping payload API server") + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + return ps.server.Shutdown(ctx) +} + +// convertToResponse converts types.PayloadInfo to PayloadResponse +func convertToResponse(payload *types.PayloadInfo) PayloadResponse { + return PayloadResponse{ + PayloadID: payload.PayloadID, + ExecutionPayload: payload.ExecutionPayload, + BlockHeight: payload.BlockHeight, + Timestamp: payload.InsertedAt.Unix(), + } +} + +// handleGetLatestPayload returns the latest payload from the current block build state +func (ps *PayloadServer) handleGetLatestPayload(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + ps.writeError(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) + defer cancel() + + // Try to get from repository first if available + if ps.payloadRepo != nil { + if payload, err := ps.payloadRepo.GetLatestPayload(ctx); err == nil && payload != nil { + response := convertToResponse(payload) + ps.writeJSON(w, response, http.StatusOK) + ps.logger.Debug( + "Served latest payload from repository", + "payload_id", payload.PayloadID, + "height", payload.BlockHeight, + ) + return + } + } + + // Fallback to state manager + state := ps.stateManager.GetBlockBuildState(ctx) + + if state.PayloadID == "" || state.ExecutionPayload == "" { + ps.writeError(w, "No payload available", http.StatusNotFound) + return + } + + response := PayloadResponse{ + PayloadID: state.PayloadID, + ExecutionPayload: state.ExecutionPayload, + BlockHeight: 0, // We don't have height from state manager + Timestamp: time.Now().Unix(), + } + + ps.writeJSON(w, response, http.StatusOK) + ps.logger.Debug("Served latest payload from state", "payload_id", state.PayloadID) +} + +// handleGetPayloadsSince returns payloads since a given block height +func (ps *PayloadServer) handleGetPayloadsSince(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + ps.writeError(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + if ps.payloadRepo == nil { + ps.writeError(w, "Payload repository not available", http.StatusServiceUnavailable) + return + } + + // Extract height from URL path: /api/v1/payload/since/{height} + pathParts := strings.Split(r.URL.Path, "/") + if len(pathParts) < 6 { + ps.writeError(w, "Block height required", http.StatusBadRequest) + return + } + + heightStr := pathParts[5] + height, err := strconv.ParseUint(heightStr, 10, 64) + if err != nil { + ps.writeError(w, "Invalid block height", http.StatusBadRequest) + return + } + + // Parse query parameters + limitStr := r.URL.Query().Get("limit") + limit := 50 // Default limit + if limitStr != "" { + if parsedLimit, err := strconv.Atoi(limitStr); err == nil && parsedLimit > 0 && parsedLimit <= 1000 { + limit = parsedLimit + } + } + + ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second) + defer cancel() + + payloads, err := ps.payloadRepo.GetPayloadsSince(ctx, height, limit+1) // +1 to check if there are more + if err != nil { + ps.logger.Error( + "Failed to get payloads since height", + "height", height, + "error", err, + ) + ps.writeError(w, "Failed to retrieve payloads", http.StatusInternalServerError) + return + } + + // Check if there are more payloads + hasMore := len(payloads) > limit + if hasMore { + payloads = payloads[:limit] // Remove the extra payload + } + + // Convert to response format + responsePayloads := make([]PayloadResponse, len(payloads)) + var nextHeight uint64 + for i, payload := range payloads { + responsePayloads[i] = convertToResponse(&payload) + if i == len(payloads)-1 { + nextHeight = payload.BlockHeight + 1 + } + } + + response := PayloadListResponse{ + Payloads: responsePayloads, + HasMore: hasMore, + NextHeight: nextHeight, + TotalCount: len(responsePayloads), + } + + ps.writeJSON(w, response, http.StatusOK) + ps.logger.Debug( + "Served payloads since height", + "since_height", height, + "count", len(responsePayloads), + "has_more", hasMore, + "next_height", nextHeight, + ) +} + +// handleGetPayloadByHeight returns a specific payload by block height +func (ps *PayloadServer) handleGetPayloadByHeight(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + ps.writeError(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + if ps.payloadRepo == nil { + ps.writeError(w, "Payload repository not available", http.StatusServiceUnavailable) + return + } + + // Extract height from URL path: /api/v1/payload/height/{height} + pathParts := strings.Split(r.URL.Path, "/") + if len(pathParts) < 6 { + ps.writeError(w, "Block height required", http.StatusBadRequest) + return + } + + heightStr := pathParts[5] + height, err := strconv.ParseUint(heightStr, 10, 64) + if err != nil { + ps.writeError(w, "Invalid block height", http.StatusBadRequest) + return + } + + ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) + defer cancel() + + payload, err := ps.payloadRepo.GetPayloadByHeight(ctx, height) + if err != nil { + if err == sql.ErrNoRows { + ps.writeError(w, "Payload not found", http.StatusNotFound) + } else { + ps.logger.Error( + "Failed to get payload by height", + "height", height, + "error", err, + ) + ps.writeError(w, "Failed to retrieve payload", http.StatusInternalServerError) + } + return + } + + response := convertToResponse(payload) + ps.writeJSON(w, response, http.StatusOK) + ps.logger.Debug( + "Served payload by height", + "height", height, + "payload_id", payload.PayloadID, + ) +} + +// handleHealth returns server health status +func (ps *PayloadServer) handleHealth(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + ps.writeError(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + w.WriteHeader(http.StatusOK) + _, err := w.Write([]byte("OK")) + if err != nil { + ps.logger.Error( + "Failed to write health response", + "error", err, + ) + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } +} + +// writeJSON writes a JSON response +func (ps *PayloadServer) writeJSON(w http.ResponseWriter, data interface{}, statusCode int) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(statusCode) + + if err := json.NewEncoder(w).Encode(data); err != nil { + ps.logger.Error( + "Failed to encode JSON response", + "error", err, + ) + } +} + +// writeError writes an error response +func (ps *PayloadServer) writeError(w http.ResponseWriter, message string, statusCode int) { + response := ErrorResponse{ + Error: message, + Code: statusCode, + Message: message, + } + + ps.writeJSON(w, response, statusCode) +} diff --git a/cl/singlenode/api/api_server_test.go b/cl/singlenode/api/api_server_test.go new file mode 100644 index 000000000..8e6107b91 --- /dev/null +++ b/cl/singlenode/api/api_server_test.go @@ -0,0 +1,519 @@ +package api + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "log/slog" + + "github.com/primev/mev-commit/cl/types" +) + +// Mock implementations for testing + +type mockStateManager struct { + state types.BlockBuildState +} + +func (m *mockStateManager) GetBlockBuildState(ctx context.Context) types.BlockBuildState { + return m.state +} + +type mockPayloadRepository struct { + latestPayload *types.PayloadInfo + latestError error + payloadsByHeight map[uint64]*types.PayloadInfo + payloadsSince []types.PayloadInfo + payloadsSinceErr error + getByHeightErr error +} + +func (m *mockPayloadRepository) Close() error { + return nil +} + +func (m *mockPayloadRepository) GetLatestPayload(ctx context.Context) (*types.PayloadInfo, error) { + return m.latestPayload, m.latestError +} + +func (m *mockPayloadRepository) GetPayloadsSince(ctx context.Context, height uint64, limit int) ([]types.PayloadInfo, error) { + if m.payloadsSinceErr != nil { + return nil, m.payloadsSinceErr + } + return m.payloadsSince, nil +} + +func (m *mockPayloadRepository) GetPayloadByHeight(ctx context.Context, height uint64) (*types.PayloadInfo, error) { + if m.getByHeightErr != nil { + return nil, m.getByHeightErr + } + if payload, exists := m.payloadsByHeight[height]; exists { + return payload, nil + } + return nil, sql.ErrNoRows +} +func (m *mockPayloadRepository) SavePayload(ctx context.Context, info *types.PayloadInfo) error { + if m.latestPayload != nil && m.latestPayload.PayloadID == info.PayloadID { + return fmt.Errorf("payload already exists") + } + + // Simulate saving the payload + m.latestPayload = info + + if info.BlockHeight == 0 { + return fmt.Errorf("invalid block height") + } + + m.payloadsByHeight[info.BlockHeight] = info + return nil +} + +// Helper function to create a test logger +func createTestLogger() *slog.Logger { + return slog.New(slog.NewTextHandler(io.Discard, &slog.HandlerOptions{ + Level: slog.LevelError, // Only log errors during tests + })) +} + +// Helper function to create test payload info +func createTestPayloadInfo(id string, height uint64) *types.PayloadInfo { + return &types.PayloadInfo{ + PayloadID: id, + ExecutionPayload: fmt.Sprintf("payload-data-%s", id), + BlockHeight: height, + InsertedAt: time.Now(), + } +} + +func TestNewPayloadServer(t *testing.T) { + logger := createTestLogger() + stateManager := &mockStateManager{} + payloadRepo := &mockPayloadRepository{} + + server := NewPayloadServer("localhost:8080", stateManager, payloadRepo, logger) + + if server == nil { + t.Fatal("Expected non-nil PayloadServer") + } + + if server.server.Addr != "localhost:8080" { + t.Errorf("Expected addr localhost:8080, got %s", server.server.Addr) + } + + if server.server.ReadTimeout != 30*time.Second { + t.Errorf("Expected ReadTimeout 30s, got %v", server.server.ReadTimeout) + } + + if server.server.WriteTimeout != 30*time.Second { + t.Errorf("Expected WriteTimeout 30s, got %v", server.server.WriteTimeout) + } + + if server.server.IdleTimeout != 60*time.Second { + t.Errorf("Expected IdleTimeout 60s, got %v", server.server.IdleTimeout) + } +} + +func TestHandleGetLatestPayload_FromRepository(t *testing.T) { + testPayload := createTestPayloadInfo("test-payload-1", 100) + + stateManager := &mockStateManager{} + payloadRepo := &mockPayloadRepository{ + latestPayload: testPayload, + } + + server := NewPayloadServer("localhost:8080", stateManager, payloadRepo, createTestLogger()) + + req := httptest.NewRequest(http.MethodGet, "/api/v1/payload/latest", nil) + w := httptest.NewRecorder() + + server.handleGetLatestPayload(w, req) + + if w.Code != http.StatusOK { + t.Errorf("Expected status %d, got %d", http.StatusOK, w.Code) + } + + var response PayloadResponse + if err := json.Unmarshal(w.Body.Bytes(), &response); err != nil { + t.Fatalf("Failed to unmarshal response: %v", err) + } + + if response.PayloadID != testPayload.PayloadID { + t.Errorf("Expected PayloadID %s, got %s", testPayload.PayloadID, response.PayloadID) + } + + if response.ExecutionPayload != testPayload.ExecutionPayload { + t.Errorf("Expected ExecutionPayload %s, got %s", testPayload.ExecutionPayload, response.ExecutionPayload) + } + + if response.BlockHeight != testPayload.BlockHeight { + t.Errorf("Expected BlockHeight %d, got %d", testPayload.BlockHeight, response.BlockHeight) + } +} + +func TestHandleGetLatestPayload_FromStateManager(t *testing.T) { + state := types.BlockBuildState{ + PayloadID: "state-payload-1", + ExecutionPayload: "state-execution-data", + } + + stateManager := &mockStateManager{state: state} + payloadRepo := &mockPayloadRepository{ + latestPayload: nil, + latestError: fmt.Errorf("repository error"), + } + + server := NewPayloadServer("localhost:8080", stateManager, payloadRepo, createTestLogger()) + + req := httptest.NewRequest(http.MethodGet, "/api/v1/payload/latest", nil) + w := httptest.NewRecorder() + + server.handleGetLatestPayload(w, req) + + if w.Code != http.StatusOK { + t.Errorf("Expected status %d, got %d", http.StatusOK, w.Code) + } + + var response PayloadResponse + if err := json.Unmarshal(w.Body.Bytes(), &response); err != nil { + t.Fatalf("Failed to unmarshal response: %v", err) + } + + if response.PayloadID != state.PayloadID { + t.Errorf("Expected PayloadID %s, got %s", state.PayloadID, response.PayloadID) + } + + if response.ExecutionPayload != state.ExecutionPayload { + t.Errorf("Expected ExecutionPayload %s, got %s", state.ExecutionPayload, response.ExecutionPayload) + } + + if response.BlockHeight != 0 { + t.Errorf("Expected BlockHeight 0, got %d", response.BlockHeight) + } +} + +func TestHandleGetLatestPayload_NoPayloadAvailable(t *testing.T) { + stateManager := &mockStateManager{ + state: types.BlockBuildState{}, + } + payloadRepo := &mockPayloadRepository{ + latestPayload: nil, + latestError: fmt.Errorf("no payload"), + } + + server := NewPayloadServer("localhost:8080", stateManager, payloadRepo, createTestLogger()) + + req := httptest.NewRequest(http.MethodGet, "/api/v1/payload/latest", nil) + w := httptest.NewRecorder() + + server.handleGetLatestPayload(w, req) + + if w.Code != http.StatusNotFound { + t.Errorf("Expected status %d, got %d", http.StatusNotFound, w.Code) + } + + var response ErrorResponse + if err := json.Unmarshal(w.Body.Bytes(), &response); err != nil { + t.Fatalf("Failed to unmarshal error response: %v", err) + } + + if response.Error != "No payload available" { + t.Errorf("Expected error 'No payload available', got %s", response.Error) + } +} + +func TestHandleGetLatestPayload_MethodNotAllowed(t *testing.T) { + server := NewPayloadServer("localhost:8080", &mockStateManager{}, &mockPayloadRepository{}, createTestLogger()) + + req := httptest.NewRequest(http.MethodPost, "/api/v1/payload/latest", nil) + w := httptest.NewRecorder() + + server.handleGetLatestPayload(w, req) + + if w.Code != http.StatusMethodNotAllowed { + t.Errorf("Expected status %d, got %d", http.StatusMethodNotAllowed, w.Code) + } +} + +func TestHandleGetPayloadsSince_Success(t *testing.T) { + payloads := []types.PayloadInfo{ + *createTestPayloadInfo("payload-1", 101), + *createTestPayloadInfo("payload-2", 102), + *createTestPayloadInfo("payload-3", 103), + } + + payloadRepo := &mockPayloadRepository{ + payloadsSince: payloads, + } + + server := NewPayloadServer("localhost:8080", &mockStateManager{}, payloadRepo, createTestLogger()) + + req := httptest.NewRequest(http.MethodGet, "/api/v1/payload/since/100", nil) + w := httptest.NewRecorder() + + server.handleGetPayloadsSince(w, req) + + if w.Code != http.StatusOK { + t.Errorf("Expected status %d, got %d", http.StatusOK, w.Code) + } + + var response PayloadListResponse + if err := json.Unmarshal(w.Body.Bytes(), &response); err != nil { + t.Fatalf("Failed to unmarshal response: %v", err) + } + + if len(response.Payloads) != 3 { + t.Errorf("Expected 3 payloads, got %d", len(response.Payloads)) + } + + if response.HasMore != false { + t.Errorf("Expected HasMore false, got %v", response.HasMore) + } + + if response.NextHeight != 104 { + t.Errorf("Expected NextHeight 104, got %d", response.NextHeight) + } + + if response.TotalCount != 3 { + t.Errorf("Expected TotalCount 3, got %d", response.TotalCount) + } +} + +func TestHandleGetPayloadsSince_WithLimit(t *testing.T) { + // Create 6 payloads but set limit to 2 + payloads := make([]types.PayloadInfo, 3) // Return 3 to test hasMore logic + for i := 0; i < 3; i++ { + payloads[i] = *createTestPayloadInfo(fmt.Sprintf("payload-%d", i+1), uint64(101+i)) + } + + payloadRepo := &mockPayloadRepository{ + payloadsSince: payloads, + } + + server := NewPayloadServer("localhost:8080", &mockStateManager{}, payloadRepo, createTestLogger()) + + req := httptest.NewRequest(http.MethodGet, "/api/v1/payload/since/100?limit=2", nil) + w := httptest.NewRecorder() + + server.handleGetPayloadsSince(w, req) + + if w.Code != http.StatusOK { + t.Errorf("Expected status %d, got %d", http.StatusOK, w.Code) + } + + var response PayloadListResponse + if err := json.Unmarshal(w.Body.Bytes(), &response); err != nil { + t.Fatalf("Failed to unmarshal response: %v", err) + } + + if len(response.Payloads) != 2 { + t.Errorf("Expected 2 payloads, got %d", len(response.Payloads)) + } + + if response.HasMore != true { + t.Errorf("Expected HasMore true, got %v", response.HasMore) + } +} + +func TestHandleGetPayloadsSince_NoRepository(t *testing.T) { + server := NewPayloadServer("localhost:8080", &mockStateManager{}, nil, createTestLogger()) + + req := httptest.NewRequest(http.MethodGet, "/api/v1/payload/since/100", nil) + w := httptest.NewRecorder() + + server.handleGetPayloadsSince(w, req) + + if w.Code != http.StatusServiceUnavailable { + t.Errorf("Expected status %d, got %d", http.StatusServiceUnavailable, w.Code) + } +} + +func TestHandleGetPayloadsSince_InvalidHeight(t *testing.T) { + server := NewPayloadServer("localhost:8080", &mockStateManager{}, &mockPayloadRepository{}, createTestLogger()) + + req := httptest.NewRequest(http.MethodGet, "/api/v1/payload/since/invalid", nil) + w := httptest.NewRecorder() + + server.handleGetPayloadsSince(w, req) + + if w.Code != http.StatusBadRequest { + t.Errorf("Expected status %d, got %d", http.StatusBadRequest, w.Code) + } +} + +func TestHandleGetPayloadsSince_RepositoryError(t *testing.T) { + payloadRepo := &mockPayloadRepository{ + payloadsSinceErr: fmt.Errorf("database error"), + } + + server := NewPayloadServer("localhost:8080", &mockStateManager{}, payloadRepo, createTestLogger()) + + req := httptest.NewRequest(http.MethodGet, "/api/v1/payload/since/100", nil) + w := httptest.NewRecorder() + + server.handleGetPayloadsSince(w, req) + + if w.Code != http.StatusInternalServerError { + t.Errorf("Expected status %d, got %d", http.StatusInternalServerError, w.Code) + } +} + +func TestHandleGetPayloadByHeight_Success(t *testing.T) { + testPayload := createTestPayloadInfo("height-payload", 150) + + payloadRepo := &mockPayloadRepository{ + payloadsByHeight: map[uint64]*types.PayloadInfo{ + 150: testPayload, + }, + } + + server := NewPayloadServer("localhost:8080", &mockStateManager{}, payloadRepo, createTestLogger()) + + req := httptest.NewRequest(http.MethodGet, "/api/v1/payload/height/150", nil) + w := httptest.NewRecorder() + + server.handleGetPayloadByHeight(w, req) + + if w.Code != http.StatusOK { + t.Errorf("Expected status %d, got %d", http.StatusOK, w.Code) + } + + var response PayloadResponse + if err := json.Unmarshal(w.Body.Bytes(), &response); err != nil { + t.Fatalf("Failed to unmarshal response: %v", err) + } + + if response.PayloadID != testPayload.PayloadID { + t.Errorf("Expected PayloadID %s, got %s", testPayload.PayloadID, response.PayloadID) + } + + if response.BlockHeight != testPayload.BlockHeight { + t.Errorf("Expected BlockHeight %d, got %d", testPayload.BlockHeight, response.BlockHeight) + } +} + +func TestHandleGetPayloadByHeight_NotFound(t *testing.T) { + payloadRepo := &mockPayloadRepository{ + payloadsByHeight: map[uint64]*types.PayloadInfo{}, + } + + server := NewPayloadServer("localhost:8080", &mockStateManager{}, payloadRepo, createTestLogger()) + + req := httptest.NewRequest(http.MethodGet, "/api/v1/payload/height/999", nil) + w := httptest.NewRecorder() + + server.handleGetPayloadByHeight(w, req) + + if w.Code != http.StatusNotFound { + t.Errorf("Expected status %d, got %d", http.StatusNotFound, w.Code) + } +} + +func TestHandleGetPayloadByHeight_InvalidHeight(t *testing.T) { + server := NewPayloadServer("localhost:8080", &mockStateManager{}, &mockPayloadRepository{}, createTestLogger()) + + req := httptest.NewRequest(http.MethodGet, "/api/v1/payload/height/invalid", nil) + w := httptest.NewRecorder() + + server.handleGetPayloadByHeight(w, req) + + if w.Code != http.StatusBadRequest { + t.Errorf("Expected status %d, got %d", http.StatusBadRequest, w.Code) + } +} + +func TestHandleGetPayloadByHeight_NoRepository(t *testing.T) { + server := NewPayloadServer("localhost:8080", &mockStateManager{}, nil, createTestLogger()) + + req := httptest.NewRequest(http.MethodGet, "/api/v1/payload/height/100", nil) + w := httptest.NewRecorder() + + server.handleGetPayloadByHeight(w, req) + + if w.Code != http.StatusServiceUnavailable { + t.Errorf("Expected status %d, got %d", http.StatusServiceUnavailable, w.Code) + } +} + +func TestHandleGetPayloadByHeight_RepositoryError(t *testing.T) { + payloadRepo := &mockPayloadRepository{ + getByHeightErr: fmt.Errorf("database connection error"), + } + + server := NewPayloadServer("localhost:8080", &mockStateManager{}, payloadRepo, createTestLogger()) + + req := httptest.NewRequest(http.MethodGet, "/api/v1/payload/height/100", nil) + w := httptest.NewRecorder() + + server.handleGetPayloadByHeight(w, req) + + if w.Code != http.StatusInternalServerError { + t.Errorf("Expected status %d, got %d", http.StatusInternalServerError, w.Code) + } +} + +func TestHandleHealth_Success(t *testing.T) { + server := NewPayloadServer("localhost:8080", &mockStateManager{}, &mockPayloadRepository{}, createTestLogger()) + + req := httptest.NewRequest(http.MethodGet, "/api/v1/health", nil) + w := httptest.NewRecorder() + + server.handleHealth(w, req) + + if w.Code != http.StatusOK { + t.Errorf("Expected status %d, got %d", http.StatusOK, w.Code) + } + + body := strings.TrimSpace(w.Body.String()) + if body != "OK" { + t.Errorf("Expected body 'OK', got '%s'", body) + } +} + +func TestHandleHealth_MethodNotAllowed(t *testing.T) { + server := NewPayloadServer("localhost:8080", &mockStateManager{}, &mockPayloadRepository{}, createTestLogger()) + + req := httptest.NewRequest(http.MethodPost, "/api/v1/health", nil) + w := httptest.NewRecorder() + + server.handleHealth(w, req) + + if w.Code != http.StatusMethodNotAllowed { + t.Errorf("Expected status %d, got %d", http.StatusMethodNotAllowed, w.Code) + } +} + +func TestConvertToResponse(t *testing.T) { + timestamp := time.Now() + payload := &types.PayloadInfo{ + PayloadID: "test-id", + ExecutionPayload: "test-execution-data", + BlockHeight: 42, + InsertedAt: timestamp, + } + + response := convertToResponse(payload) + + if response.PayloadID != payload.PayloadID { + t.Errorf("Expected PayloadID %s, got %s", payload.PayloadID, response.PayloadID) + } + + if response.ExecutionPayload != payload.ExecutionPayload { + t.Errorf("Expected ExecutionPayload %s, got %s", payload.ExecutionPayload, response.ExecutionPayload) + } + + if response.BlockHeight != payload.BlockHeight { + t.Errorf("Expected BlockHeight %d, got %d", payload.BlockHeight, response.BlockHeight) + } + + if response.Timestamp != timestamp.Unix() { + t.Errorf("Expected Timestamp %d, got %d", timestamp.Unix(), response.Timestamp) + } +} diff --git a/cl/singlenode/membernode/membernode.go b/cl/singlenode/membernode/membernode.go new file mode 100644 index 000000000..cb280af7c --- /dev/null +++ b/cl/singlenode/membernode/membernode.go @@ -0,0 +1,617 @@ +package membernode + +import ( + "context" + "encoding/hex" + "errors" + "fmt" + "log/slog" + "net/http" + "sync" + "time" + + "github.com/primev/mev-commit/cl/blockbuilder" + "github.com/primev/mev-commit/cl/ethclient" + "github.com/primev/mev-commit/cl/singlenode/api" +) + +const ( + shutdownTimeout = 5 * time.Second + maxConsecutiveErrors = 5 + batchSize = 10 + maxCatchupPayloads = 100 + // Timeout for API calls to leader and Geth + apiCallTimeout = 30 * time.Second + // Interval for retrying initialization steps + initRetryInterval = 2 * time.Second + // Threshold to exit catch-up mode + catchUpExitThreshold = batchSize / 2 +) + +// Config holds the configuration for the MemberNodeApp +type Config struct { + InstanceID string + LeaderAPIURL string + EthClientURL string + JWTSecret string + HealthAddr string + PollInterval time.Duration +} + +// MemberNodeApp represents a member node that follows the leader sequentially +type MemberNodeApp struct { + logger *slog.Logger + cfg Config + blockBuilder *blockbuilder.BlockBuilder + payloadClient *api.PayloadClient + engineClient blockbuilder.EngineClient + appCtx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + connectionStatus sync.Mutex + leaderAvailable bool + initializedCh chan struct{} + + // Sequential processing state + processingMutex sync.RWMutex + lastProcessedHeight uint64 + isCatchingUp bool + isInitialized bool +} + +// NewMemberNodeApp creates and initializes a new MemberNodeApp +func NewMemberNodeApp( + parentCtx context.Context, + cfg Config, + logger *slog.Logger, +) (*MemberNodeApp, error) { + ctx, cancel := context.WithCancel(parentCtx) + + // Decode JWT secret + jwtBytes, err := hex.DecodeString(cfg.JWTSecret) + if err != nil { + cancel() + return nil, fmt.Errorf("failed to decode JWT secret: %w", err) + } + + // Create Ethereum engine client + engineClient, err := ethclient.NewAuthClient(ctx, cfg.EthClientURL, jwtBytes) + if err != nil { + cancel() + return nil, fmt.Errorf("failed to create Ethereum engine client: %w", err) + } + + // Create block builder for member node + bb := blockbuilder.NewMemberBlockBuilder(engineClient, logger.With("component", "BlockBuilder")) + + // Create payload client + payloadClient := api.NewPayloadClient(cfg.LeaderAPIURL, logger) + + return &MemberNodeApp{ + logger: logger, + cfg: cfg, + blockBuilder: bb, + payloadClient: payloadClient, + engineClient: engineClient, + appCtx: ctx, + cancel: cancel, + initializedCh: make(chan struct{}), + leaderAvailable: false, + lastProcessedHeight: 0, + isCatchingUp: false, + isInitialized: false, + }, nil +} + +// getLocalGethHeight gets the current block height from local geth +func (app *MemberNodeApp) getLocalGethHeight(ctx context.Context) (uint64, error) { + header, err := app.engineClient.HeaderByNumber(ctx, nil) // nil = latest + if err != nil { + return 0, fmt.Errorf("failed to get latest header from local geth: %w", err) + } + + height := header.Number.Uint64() + app.logger.Debug("Retrieved local geth height", "height", height) + return height, nil +} + +// initializeStartingHeight determines the starting height from local geth +func (app *MemberNodeApp) initializeStartingHeight() { + defer close(app.initializedCh) // Signal completion regardless of outcome (or handle errors preventing it) + + app.logger.Info("Detecting starting height from local geth...") + + for { + select { + case <-app.appCtx.Done(): + app.logger.Info("Initialization cancelled.") + return + default: + ctx, cancelTimeout := context.WithTimeout(app.appCtx, apiCallTimeout) + + // Check leader availability first + if err := app.payloadClient.CheckHealth(ctx); err != nil { + cancelTimeout() + app.logger.Warn("Leader not available during initialization, retrying...", "error", err) + select { + case <-app.appCtx.Done(): + return + case <-time.After(initRetryInterval): + continue + } + } + + // Get local geth's current height + localHeight, err := app.getLocalGethHeight(ctx) + cancelTimeout() // Release timeout context + + if err != nil { + app.logger.Warn("Failed to get local geth height, retrying...", "error", err) + select { + case <-app.appCtx.Done(): + return + case <-time.After(initRetryInterval): + continue + } + } + + // Set lastProcessedHeight to current local height + // The processing loop will request localHeight + 1 + app.processingMutex.Lock() + app.lastProcessedHeight = localHeight + app.isInitialized = true + app.processingMutex.Unlock() + + app.logger.Info( + "Successfully detected starting height from local geth", + "local_height", localHeight, + "will_start_from", localHeight+1, + ) + return // Initialization successful + } + } +} + +// Start begins the member node operation +func (app *MemberNodeApp) Start() { + app.logger.Info("Starting MemberNodeApp...") + + // Launch health server + app.wg.Add(1) + go func() { + defer app.wg.Done() + mux := http.NewServeMux() + mux.HandleFunc("/health", app.healthHandler) + addr := app.cfg.HealthAddr + server := &http.Server{Addr: addr, Handler: mux} + app.logger.Info("Health endpoint listening", "address", addr) + + go func() { + <-app.appCtx.Done() + ctx, cancelShutdown := context.WithTimeout(context.Background(), shutdownTimeout) + defer cancelShutdown() + if err := server.Shutdown(ctx); err != nil { + // ErrServerClosed is expected on graceful shutdown, + // context.DeadlineExceeded if shutdownTimeout is reached. + if !errors.Is(err, http.ErrServerClosed) && !errors.Is(err, context.DeadlineExceeded) { + app.logger.Warn("Health server shutdown error", "error", err) + } + } + }() + + if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + app.logger.Error("Health server error", "error", err) + app.cancel() // Trigger app shutdown if health server fails critically + } + }() + + // Initialize starting height from local geth + app.wg.Add(1) + go func() { + defer app.wg.Done() + app.initializeStartingHeight() + }() + + // Start sequential payload processing loop + app.wg.Add(1) + go func() { + defer app.wg.Done() + defer app.logger.Info("MemberNodeApp run loop finished.") + app.runSequentialLoop() + }() +} + +// healthHandler responds on /health +func (app *MemberNodeApp) healthHandler(w http.ResponseWriter, r *http.Request) { + if err := app.appCtx.Err(); err != nil { + http.Error(w, "unavailable (shutting down)", http.StatusServiceUnavailable) + return + } + + app.connectionStatus.Lock() + leaderAvailable := app.leaderAvailable + app.connectionStatus.Unlock() + + if !leaderAvailable { + app.logger.Warn("Health check failed: leader node is not available") + http.Error(w, "leader node is not available", http.StatusServiceUnavailable) + return + } + + // Optionally, check if initialized + app.processingMutex.RLock() + initialized := app.isInitialized + app.processingMutex.RUnlock() + if !initialized { + http.Error(w, "initializing", http.StatusServiceUnavailable) + return + } + + w.WriteHeader(http.StatusOK) + _, err := w.Write([]byte("OK")) + if err != nil { + app.logger.Error("Failed to write health response", "error", err) + // No return here, header already sent + } +} + +// runSequentialLoop continuously processes payloads in sequential order +func (app *MemberNodeApp) runSequentialLoop() { + // Wait for initialization + select { + case <-app.appCtx.Done(): + app.logger.Info("Run loop stopping before initialization due to context cancellation.") + return + case <-app.initializedCh: + app.logger.Info("Initialization complete, starting main processing.") + } + + // Check if initialization actually completed successfully (isInitialized flag) + // This is a safeguard in case initializedCh was closed due to appCtx.Done() during init. + app.processingMutex.RLock() + if !app.isInitialized { + app.processingMutex.RUnlock() + app.logger.Error("Initialization failed or was cancelled, run loop cannot start.") + return + } + startingHeight := app.lastProcessedHeight + 1 + app.processingMutex.RUnlock() + + app.logger.Info( + "MemberNodeApp sequential run loop started", + "instanceID", app.cfg.InstanceID, + "starting_height", startingHeight, + ) + + consecutiveErrors := 0 + ticker := time.NewTicker(app.cfg.PollInterval) + defer ticker.Stop() + + for { + select { + case <-app.appCtx.Done(): + app.logger.Info("MemberNodeApp run loop stopping due to context cancellation.") + return + case <-ticker.C: + err := app.processSequentialPayloads() + + if err != nil { + consecutiveErrors++ + app.setLeaderAvailability(false) + + if consecutiveErrors >= maxConsecutiveErrors { + app.logger.Error( + "Too many consecutive errors, member node may be unstable. Check leader connection and local Geth.", + "error", err, + "consecutive_errors", consecutiveErrors, + ) + } else { + app.logger.Warn( + "Failed to process sequential payloads", + "error", err, + "consecutive_errors", consecutiveErrors, + ) + } + } else { + if consecutiveErrors > 0 { + app.logger.Info( + "Recovered from errors", + "previous_consecutive_errors", consecutiveErrors, + ) + } + consecutiveErrors = 0 + app.setLeaderAvailability(true) // Assuming success implies leader is available + } + } + } +} + +// processSequentialPayloads fetches and processes payloads in sequential order +func (app *MemberNodeApp) processSequentialPayloads() error { + ctx, cancel := context.WithTimeout(app.appCtx, apiCallTimeout) // Overall timeout for this processing cycle + defer cancel() + + // Check leader health first + if err := app.payloadClient.CheckHealth(ctx); err != nil { + return fmt.Errorf("leader health check failed: %w", err) + } + app.setLeaderAvailability(true) // Leader is reachable + + app.processingMutex.RLock() + lastProcessedHeight := app.lastProcessedHeight + isCatchingUp := app.isCatchingUp + app.processingMutex.RUnlock() + + // Determine how many payloads to request + var requestLimit int + if isCatchingUp { + requestLimit = maxCatchupPayloads // e.g., 100 + app.logger.Debug("In catch-up mode, requesting more payloads", "limit", requestLimit) + } else { + requestLimit = batchSize // e.g., 10 + app.logger.Debug("In normal mode, requesting standard batch of payloads", "limit", requestLimit) + } + + // Get payloads since our last processed height + nextHeightToRequest := lastProcessedHeight + 1 + payloadsResponse, err := app.payloadClient.GetPayloadsSince(ctx, nextHeightToRequest, requestLimit) + if err != nil { + return fmt.Errorf("failed to get payloads since height %d: %w", nextHeightToRequest, err) + } + + if len(payloadsResponse.Payloads) == 0 { + app.logger.Debug("No new payloads available", "waiting_for_height", nextHeightToRequest) + return nil + } + + // Update catch-up mode status + currentlyCatchingUp := isCatchingUp + if !currentlyCatchingUp && len(payloadsResponse.Payloads) >= batchSize { + app.processingMutex.Lock() + app.isCatchingUp = true + app.processingMutex.Unlock() + app.logger.Info( + "Entering catch-up mode", + "current_height", lastProcessedHeight, + "available_payloads", len(payloadsResponse.Payloads), + ) + } else if currentlyCatchingUp && len(payloadsResponse.Payloads) < catchUpExitThreshold { + app.processingMutex.Lock() + app.isCatchingUp = false + app.processingMutex.Unlock() + app.logger.Info( + "Exiting catch-up mode", + "current_height", lastProcessedHeight, + "available_payloads", len(payloadsResponse.Payloads), + ) + } + + // Process payloads sequentially + processedCount := 0 + for _, payload := range payloadsResponse.Payloads { + select { + case <-app.appCtx.Done(): // Check for shutdown signal before processing each payload + return nil + default: + } + + // Get the most up-to-date lastProcessedHeight for sequence check + app.processingMutex.RLock() + currentSystemHeight := app.lastProcessedHeight + app.processingMutex.RUnlock() + + expectedHeightForThisPayload := currentSystemHeight + 1 + + // Case 1: Gap detected (payload is for a future height) + if payload.BlockHeight > expectedHeightForThisPayload { + app.logger.Warn( + "Gap detected in payload sequence, attempting to fill", + "expected_height", expectedHeightForThisPayload, + "received_payload_height", payload.BlockHeight, + "gap_size", payload.BlockHeight-expectedHeightForThisPayload, + ) + // Try to fill the gap from expectedHeightForThisPayload up to payload.BlockHeight - 1 + if err := app.fillPayloadGap(ctx, expectedHeightForThisPayload, payload.BlockHeight-1); err != nil { + return fmt.Errorf("failed to fill payload gap from %d to %d: %w", + expectedHeightForThisPayload, payload.BlockHeight-1, err) + } + // After gap fill, lastProcessedHeight should be (payload.BlockHeight - 1) + // Re-fetch currentSystemHeight to ensure the next check is correct + app.processingMutex.RLock() + currentSystemHeight = app.lastProcessedHeight + app.processingMutex.RUnlock() + expectedHeightForThisPayload = currentSystemHeight + 1 + } + + // Case 2: Payload is for an already processed or an older, unexpected height + if payload.BlockHeight < expectedHeightForThisPayload { + app.logger.Debug( + "Skipping already processed or out-of-order (older) payload", + "payload_height", payload.BlockHeight, + "expected_at_least", expectedHeightForThisPayload, + "current_system_height", currentSystemHeight, + ) + continue + } + + // Case 3: Payload is for the expected next height (critical check) + if payload.BlockHeight != expectedHeightForThisPayload { + // This should ideally not be reached if gap filling and previous checks are correct + return fmt.Errorf("critical sequence error: payload height %d does not match expected next height %d after potential gap fill. Current system height: %d", + payload.BlockHeight, expectedHeightForThisPayload, currentSystemHeight) + } + + // Process the payload + if err := app.processPayload(ctx, &payload); err != nil { + return fmt.Errorf("failed to process payload at height %d: %w", payload.BlockHeight, err) + } + + // Update processed height (this is critical) + app.processingMutex.Lock() + app.lastProcessedHeight = payload.BlockHeight + app.processingMutex.Unlock() + + processedCount++ + + // In catch-up mode, limit processing per cycle to avoid holding locks for too long + // or starving other operations, and to allow context cancellation checks. + app.processingMutex.RLock() + stillCatchingUp := app.isCatchingUp + currentHeightAfterProcess := app.lastProcessedHeight + app.processingMutex.RUnlock() + + if stillCatchingUp && processedCount >= maxCatchupPayloads { + app.logger.Info( + "Processed maximum catch-up payloads in this cycle, will continue in next cycle", + "processed_count", processedCount, + "current_height", currentHeightAfterProcess, + ) + break // Exit the loop for this batch, will fetch new batch in next ticker + } + } + + if processedCount > 0 { + app.processingMutex.RLock() + finalHeight := app.lastProcessedHeight + catchUpMode := app.isCatchingUp + app.processingMutex.RUnlock() + + app.logger.Info( + "Successfully processed sequential payloads batch", + "processed_count", processedCount, + "final_height", finalHeight, + "catch_up_mode", catchUpMode, + ) + } + return nil +} + +// fillPayloadGap attempts to fetch and process missing payloads in a range +func (app *MemberNodeApp) fillPayloadGap(ctx context.Context, startHeight, endHeight uint64) error { + if startHeight > endHeight { + app.logger.Info( + "No gap to fill or invalid range", + "start", startHeight, "end", endHeight, + ) + return nil + } + app.logger.Info( + "Filling payload gap", + "start_height", startHeight, + "end_height", endHeight, + "gap_size", endHeight-startHeight+1, + ) + + for height := startHeight; height <= endHeight; height++ { + select { + case <-ctx.Done(): + return fmt.Errorf("context cancelled during gap fill at height %d: %w", height, ctx.Err()) + case <-app.appCtx.Done(): + return fmt.Errorf("application shutting down during gap fill at height %d: %w", height, app.appCtx.Err()) + default: + } + + // Get specific payload by height + payload, err := app.payloadClient.GetPayloadByHeight(ctx, height) + if err != nil { + return fmt.Errorf("failed to get payload for gap at height %d: %w", height, err) + } + + // Process the payload + if err := app.processPayload(ctx, payload); err != nil { + return fmt.Errorf("failed to process gap payload at height %d: %w", height, err) + } + + app.processingMutex.Lock() + if app.lastProcessedHeight != height-1 { + app.processingMutex.Unlock() + // This indicates a severe internal inconsistency or a concurrent modification problem. + // It means another part of the code or a previous iteration did not leave lastProcessedHeight as expected. + return fmt.Errorf("critical sequence error during gap fill: expected lastProcessedHeight %d before processing %d, but got %d", + height-1, height, app.lastProcessedHeight) + } + app.lastProcessedHeight = height + app.processingMutex.Unlock() + + app.logger.Debug("Filled gap payload", "height", height) + } + + app.logger.Info( + "Successfully filled payload gap", + "start_height", startHeight, + "end_height", endHeight, + "final_processed_height_after_gap_fill", endHeight, + ) + return nil +} + +// processPayload applies a single payload to the local geth client +func (app *MemberNodeApp) processPayload(ctx context.Context, payload *api.PayloadResponse) error { + app.logger.Info( + "Processing payload", + "payload_id", payload.PayloadID, + "block_height", payload.BlockHeight, + ) + + // Apply payload to local geth client + err := app.blockBuilder.FinalizeBlock(ctx, payload.PayloadID, payload.ExecutionPayload, "") + if err != nil { + app.logger.Error( + "Failed to finalize block", + "payload_id", payload.PayloadID, + "block_height", payload.BlockHeight, + "error", err, + ) + return fmt.Errorf("blockBuilder.FinalizeBlock failed for height %d: %w", payload.BlockHeight, err) + } + + app.logger.Info( + "Successfully applied payload", + "payload_id", payload.PayloadID, + "block_height", payload.BlockHeight, + ) + return nil +} + +// setLeaderAvailability updates the leader availability status +func (app *MemberNodeApp) setLeaderAvailability(available bool) { + app.connectionStatus.Lock() + defer app.connectionStatus.Unlock() + + if app.leaderAvailable != available { + app.leaderAvailable = available + app.logger.Info("Leader availability changed", "available", available) + } +} + +// GetLastProcessedHeight returns the last successfully processed block height +func (app *MemberNodeApp) GetLastProcessedHeight() uint64 { + app.processingMutex.RLock() + defer app.processingMutex.RUnlock() + return app.lastProcessedHeight +} + +// Stop gracefully stops the member node +func (app *MemberNodeApp) Stop() { + app.logger.Info("Stopping MemberNodeApp...") + app.cancel() // Signal all goroutines to stop + + waitCh := make(chan struct{}) + go func() { + app.wg.Wait() // Wait for all primary goroutines to finish + close(waitCh) + }() + + select { + case <-waitCh: + app.logger.Info("MemberNodeApp goroutines shut down gracefully.") + case <-time.After(shutdownTimeout + 1*time.Second): + app.logger.Warn("MemberNodeApp shutdown timed out waiting for goroutines.") + } + + app.processingMutex.RLock() + finalHeight := app.lastProcessedHeight + app.processingMutex.RUnlock() + + app.logger.Info("MemberNodeApp stopped.", "final_processed_height", finalHeight) +} diff --git a/cl/singlenode/payloadstore/postgres.go b/cl/singlenode/payloadstore/postgres.go new file mode 100644 index 000000000..c0389cf3d --- /dev/null +++ b/cl/singlenode/payloadstore/postgres.go @@ -0,0 +1,266 @@ +package payloadstore + +import ( + "context" + "database/sql" + "fmt" + "log/slog" + "time" + + _ "github.com/lib/pq" + "github.com/primev/mev-commit/cl/types" // Import shared types +) + +// PostgresRepository implements the types.PayloadRepository interface using PostgreSQL. +type PostgresRepository struct { + db *sql.DB + logger *slog.Logger +} + +// NewPostgresRepository creates a new PostgresRepository. +// It also attempts to create the necessary table if it doesn't exist. +func NewPostgresRepository(ctx context.Context, dsn string, logger *slog.Logger) (*PostgresRepository, error) { + l := logger.With("component", "PostgresRepository") + + db, err := sql.Open("postgres", dsn) + if err != nil { + return nil, fmt.Errorf("failed to open postgres connection: %w", err) + } + + db.SetMaxOpenConns(25) + db.SetMaxIdleConns(25) + db.SetConnMaxLifetime(5 * time.Minute) + + pingCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + if err := db.PingContext(pingCtx); err != nil { + err := db.Close() + if err != nil { + l.Error("Failed to close database connection after error", "error", err) + } + return nil, fmt.Errorf("failed to ping postgres: %w", err) + } + + // Create table with enhanced schema for sequential access + schemaCreationQuery := ` + CREATE TABLE IF NOT EXISTS execution_payloads ( + id SERIAL PRIMARY KEY, + payload_id VARCHAR(66) UNIQUE NOT NULL, -- e.g., 0x... (32 bytes hex + 0x prefix) + raw_execution_payload TEXT NOT NULL, + block_height BIGINT NOT NULL, + inserted_at TIMESTAMPTZ DEFAULT NOW(), + + -- Indexes for efficient querying + UNIQUE(block_height) + ); + + -- Create indexes if they don't exist + CREATE INDEX IF NOT EXISTS idx_block_height ON execution_payloads(block_height); + CREATE INDEX IF NOT EXISTS idx_inserted_at ON execution_payloads(inserted_at); + ` + execCtx, execCancel := context.WithTimeout(ctx, 10*time.Second) + defer execCancel() + if _, err := db.ExecContext(execCtx, schemaCreationQuery); err != nil { + err := db.Close() + if err != nil { + l.Error("Failed to close database connection after error", "error", err) + } + return nil, fmt.Errorf("failed to create execution_payloads table: %w", err) + } + l.Info("Successfully connected to PostgreSQL and ensured table exists.") + return &PostgresRepository{db: db, logger: l}, nil +} + +// SavePayload saves the payload information to the database. +func (r *PostgresRepository) SavePayload(ctx context.Context, info *types.PayloadInfo) error { + query := ` + INSERT INTO execution_payloads (payload_id, raw_execution_payload, block_height) + VALUES ($1, $2, $3) + ON CONFLICT (payload_id) DO NOTHING; + ` // ON CONFLICT DO NOTHING will silently ignore duplicates by payload_id + + insertCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + result, err := r.db.ExecContext(insertCtx, query, info.PayloadID, info.ExecutionPayload, info.BlockHeight) + if err != nil { + r.logger.Error( + "Failed to insert payload into postgres", + "payload_id", info.PayloadID, + "block_height", info.BlockHeight, + "error", err, + ) + return fmt.Errorf("failed to insert payload into postgres: %w", err) + } + + rowsAffected, err := result.RowsAffected() + if err == nil && rowsAffected > 0 { + r.logger.Debug( + "Payload saved to database", + "payload_id", info.PayloadID, + "block_height", info.BlockHeight, + ) + } else if err == nil && rowsAffected == 0 { + r.logger.Debug( + "Payload already exists in database or no rows affected", + "payload_id", info.PayloadID, + "block_height", info.BlockHeight, + ) + } + + return nil +} + +// GetPayloadsSince retrieves payloads with block height >= sinceHeight, ordered by block height +func (r *PostgresRepository) GetPayloadsSince(ctx context.Context, sinceHeight uint64, limit int) ([]types.PayloadInfo, error) { + query := ` + SELECT payload_id, raw_execution_payload, block_height, inserted_at + FROM execution_payloads + WHERE block_height >= $1 + ORDER BY block_height ASC + LIMIT $2; + ` + + queryCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + rows, err := r.db.QueryContext(queryCtx, query, sinceHeight, limit) + if err != nil { + r.logger.Error( + "Failed to query payloads since height", + "since_height", sinceHeight, + "limit", limit, + "error", err, + ) + return nil, fmt.Errorf("failed to query payloads since height %d: %w", sinceHeight, err) + } + //nolint:errcheck + defer rows.Close() + + var payloads []types.PayloadInfo + for rows.Next() { + var payload types.PayloadInfo + err := rows.Scan( + &payload.PayloadID, + &payload.ExecutionPayload, + &payload.BlockHeight, + &payload.InsertedAt, + ) + if err != nil { + r.logger.Error( + "Failed to scan payload row", + "error", err, + ) + return nil, fmt.Errorf("failed to scan payload row: %w", err) + } + payloads = append(payloads, payload) + } + + if err := rows.Err(); err != nil { + r.logger.Error( + "Error iterating payload rows", + "error", err, + ) + return nil, fmt.Errorf("error iterating payload rows: %w", err) + } + + r.logger.Debug( + "Retrieved payloads since height", + "since_height", sinceHeight, + "count", len(payloads), + "limit", limit, + ) + + return payloads, nil +} + +// GetPayloadByHeight retrieves a specific payload by block height +func (r *PostgresRepository) GetPayloadByHeight(ctx context.Context, height uint64) (*types.PayloadInfo, error) { + query := ` + SELECT payload_id, raw_execution_payload, block_height, inserted_at + FROM execution_payloads + WHERE block_height = $1; + ` + + queryCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + var payload types.PayloadInfo + err := r.db.QueryRowContext(queryCtx, query, height).Scan( + &payload.PayloadID, + &payload.ExecutionPayload, + &payload.BlockHeight, + &payload.InsertedAt, + ) + + if err != nil { + if err == sql.ErrNoRows { + r.logger.Debug("Payload not found for height", "height", height) + return nil, sql.ErrNoRows + } + r.logger.Error( + "Failed to query payload by height", + "height", height, + "error", err, + ) + return nil, fmt.Errorf("failed to query payload by height %d: %w", height, err) + } + + r.logger.Debug( + "Retrieved payload by height", + "height", height, + "payload_id", payload.PayloadID, + ) + + return &payload, nil +} + +// GetLatestPayload retrieves the most recent payload +func (r *PostgresRepository) GetLatestPayload(ctx context.Context) (*types.PayloadInfo, error) { + query := ` + SELECT payload_id, raw_execution_payload, block_height, inserted_at + FROM execution_payloads + ORDER BY block_height DESC + LIMIT 1; + ` + + queryCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + var payload types.PayloadInfo + err := r.db.QueryRowContext(queryCtx, query).Scan( + &payload.PayloadID, + &payload.ExecutionPayload, + &payload.BlockHeight, + &payload.InsertedAt, + ) + + if err != nil { + if err == sql.ErrNoRows { + r.logger.Debug("No payloads found in database") + return nil, sql.ErrNoRows + } + r.logger.Error( + "Failed to query latest payload", + "error", err, + ) + return nil, fmt.Errorf("failed to query latest payload: %w", err) + } + + r.logger.Debug( + "Retrieved latest payload", + "payload_id", payload.PayloadID, + "block_height", payload.BlockHeight, + ) + + return &payload, nil +} + +// Close closes the database connection. +func (r *PostgresRepository) Close() error { + if r.db != nil { + r.logger.Info("Closing PostgreSQL connection") + return r.db.Close() + } + return nil +} diff --git a/cl/singlenode/singlenode.go b/cl/singlenode/singlenode.go index 9bd7c11c9..db121e210 100644 --- a/cl/singlenode/singlenode.go +++ b/cl/singlenode/singlenode.go @@ -13,7 +13,10 @@ import ( "github.com/primev/mev-commit/cl/blockbuilder" "github.com/primev/mev-commit/cl/ethclient" + "github.com/primev/mev-commit/cl/singlenode/api" + "github.com/primev/mev-commit/cl/singlenode/payloadstore" localstate "github.com/primev/mev-commit/cl/singlenode/state" + "github.com/primev/mev-commit/cl/types" ) const ( @@ -30,11 +33,14 @@ type Config struct { EVMBuildDelayEmptyBlocks time.Duration PriorityFeeReceipt string HealthAddr string + PostgresDSN string + APIAddr string } type BlockBuilder interface { GetPayload(ctx context.Context) error FinalizeBlock(ctx context.Context, payloadID string, executionPayload string, extraData string) error + GetExecutionHead() *types.ExecutionHead } // SingleNodeApp orchestrates block production for a single node. @@ -45,6 +51,8 @@ type SingleNodeApp struct { // stateManager is a local state manager for block production // it's not anticipated to use DB as all the state already in geth client stateManager *localstate.LocalStateManager + payloadRepo types.PayloadRepository + payloadServer *api.PayloadServer appCtx context.Context cancel context.CancelFunc wg sync.WaitGroup @@ -90,11 +98,43 @@ func NewSingleNodeApp( cfg.PriorityFeeReceipt, ) + var pRepo types.PayloadRepository + if cfg.PostgresDSN != "" { + repo, err := payloadstore.NewPostgresRepository(ctx, cfg.PostgresDSN, logger) + if err != nil { + cancel() + logger.Error( + "failed to create payload repository", + "error", err, + ) + return nil, fmt.Errorf("failed to initialize payload repository: %w", err) + } + pRepo = repo + logger.Info("Payload repository initialized, payloads will be saved to PostgreSQL.") + } else { + logger.Info("PostgresDSN not provided, payload saving to DB is disabled.") + } + + var payloadServer *api.PayloadServer + if cfg.APIAddr != "" { + payloadServer = api.NewPayloadServer( + cfg.APIAddr, + stateMgr, + pRepo, + logger.With("component", "APIServer"), + ) + logger.Info("API server initialized for member nodes", "addr", cfg.APIAddr) + } else { + logger.Info("API address not provided, member node API is disabled.") + } + return &SingleNodeApp{ logger: logger, cfg: cfg, blockBuilder: bb, stateManager: stateMgr, + payloadRepo: pRepo, + payloadServer: payloadServer, appCtx: ctx, cancel: cancel, connectionRefused: false, @@ -176,6 +216,16 @@ func (app *SingleNodeApp) Start() { } }() + if app.payloadServer != nil { + app.wg.Add(1) + go func() { + defer app.wg.Done() + if err := app.payloadServer.Start(app.appCtx); err != nil { + app.logger.Error("API server error", "error", err) + } + }() + } + // Start block production loop app.wg.Add(1) go func() { @@ -252,8 +302,43 @@ func (app *SingleNodeApp) produceBlock() error { return errors.New("payload ID is empty after GetPayload call") } + // Get current block height from the execution head + executionHead := app.blockBuilder.GetExecutionHead() + var blockHeight uint64 + if executionHead != nil { + blockHeight = executionHead.BlockHeight + 1 // Next block height + } else { + app.logger.Warn("No execution head available, using height 0") + blockHeight = 0 + } + + if app.payloadRepo != nil { + payloadInfo := &types.PayloadInfo{ + PayloadID: currentState.PayloadID, + ExecutionPayload: currentState.ExecutionPayload, + BlockHeight: blockHeight, + } + saveCtx, saveCancel := context.WithTimeout(app.appCtx, 200*time.Millisecond) + defer saveCancel() + + if err := app.payloadRepo.SavePayload(saveCtx, payloadInfo); err != nil { + app.logger.Error( + "Failed to save payload to database", + "payload_id", currentState.PayloadID, + "error", err, + ) + return fmt.Errorf("failed to save payload to database: %w", err) + } else { + app.logger.Info("Payload details submitted to database for saving", "payload_id", currentState.PayloadID) + } + } + // Step 2: Finalize the block - app.logger.Info("finalizing block", "payload_id", currentState.PayloadID) + app.logger.Info( + "finalizing block", + "payload_id", currentState.PayloadID, + "block_height", blockHeight, + ) if err := app.blockBuilder.FinalizeBlock(app.appCtx, currentState.PayloadID, currentState.ExecutionPayload, ""); err != nil { return fmt.Errorf("failed to finalize block: %w", err) } @@ -278,5 +363,14 @@ func (app *SingleNodeApp) Stop() { case <-time.After(shutdownTimeout): app.logger.Warn("SingleNodeApp shutdown timed out waiting for run loop.") } + + if app.payloadRepo != nil { + if err := app.payloadRepo.Close(); err != nil { + app.logger.Error("Error closing payload repository", "error", err) + } else { + app.logger.Info("Payload repository closed.") + } + } + app.logger.Info("SingleNodeApp stopped.") } diff --git a/cl/singlenode/singlenode_test.go b/cl/singlenode/singlenode_test.go index 838a0d9b7..e11a7bec6 100644 --- a/cl/singlenode/singlenode_test.go +++ b/cl/singlenode/singlenode_test.go @@ -34,6 +34,15 @@ func (m *MockBlockBuilder) FinalizeBlock(ctx context.Context, payloadID string, return args.Error(0) } +// Add missing method to satisfy BlockBuilder interface +func (m *MockBlockBuilder) GetExecutionHead() *types.ExecutionHead { + args := m.Called() + if head, ok := args.Get(0).(*types.ExecutionHead); ok { + return head + } + return nil +} + // MockConnectionRefused provides a safe implementation for testing type MockConnectionRefused struct{} @@ -166,6 +175,7 @@ func TestProduceBlock(t *testing.T) { }) require.NoError(t, err) + mockBuilder.On("GetExecutionHead").Return((*types.ExecutionHead)(nil)) mockBuilder.On("GetPayload", mock.Anything).Return(nil) mockBuilder.On("FinalizeBlock", mock.Anything, "test-payload-id", "test-execution-payload", "").Return(nil) @@ -210,6 +220,7 @@ func TestProduceBlock(t *testing.T) { }) require.NoError(t, err) + mockBuilder.On("GetExecutionHead").Return((*types.ExecutionHead)(nil)) mockBuilder.On("GetPayload", mock.Anything).Return(nil) mockBuilder.On("FinalizeBlock", mock.Anything, "test-payload-id", "test-execution-payload", "").Return(assert.AnError) diff --git a/cl/types/types.go b/cl/types/types.go index bca82812f..51a5a2a91 100644 --- a/cl/types/types.go +++ b/cl/types/types.go @@ -1,5 +1,10 @@ package types +import ( + "context" + "time" +) + type ExecutionHead struct { BlockHeight uint64 BlockHash []byte @@ -36,3 +41,18 @@ const ( RedisMsgTypePending RedisMsgType = "0" RedisMsgTypeNew RedisMsgType = ">" ) + +type PayloadInfo struct { + PayloadID string + ExecutionPayload string + BlockHeight uint64 + InsertedAt time.Time +} + +type PayloadRepository interface { + SavePayload(ctx context.Context, info *PayloadInfo) error + GetPayloadsSince(ctx context.Context, sinceHeight uint64, limit int) ([]PayloadInfo, error) + GetPayloadByHeight(ctx context.Context, height uint64) (*PayloadInfo, error) + GetLatestPayload(ctx context.Context) (*PayloadInfo, error) + Close() error +}