Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 30 additions & 7 deletions cl/cmd/singlenode/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,15 @@ var (
Category: categoryDatabase,
})

redisURLFlag = altsrc.NewStringFlag(&cli.StringFlag{
Name: "redis-url",
Usage: "Redis URL for storing payloads. If empty, saving to Redis is disabled. (e.g., 'redis://localhost:6379/2')",

EnvVars: []string{"LEADER_REDIS_URL"},
Value: "",
Category: categoryDatabase,
})

apiAddrFlag = altsrc.NewStringFlag(&cli.StringFlag{
Name: "api-addr",
Usage: "Address for member node API endpoint (e.g., ':9090'). If empty, API is disabled.",
Expand Down Expand Up @@ -237,6 +246,7 @@ func main() {
priorityFeeReceiptFlag,
healthAddrPortFlag,
postgresDSNFlag,
redisURLFlag,
apiAddrFlag,
nonAuthRpcUrlFlag,
txPoolPollingIntervalFlag,
Expand All @@ -252,6 +262,7 @@ func main() {
logTagsFlag,
healthAddrPortFlag,
postgresDSNFlag,
redisURLFlag,
syncBatchSizeFlag,
}

Expand Down Expand Up @@ -338,6 +349,7 @@ func startLeaderNode(c *cli.Context) error {
PriorityFeeReceipt: c.String(priorityFeeReceiptFlag.Name),
HealthAddr: c.String(healthAddrPortFlag.Name),
PostgresDSN: c.String(postgresDSNFlag.Name),
RedisURL: c.String(redisURLFlag.Name),
APIAddr: c.String(apiAddrFlag.Name),
NonAuthRpcURL: c.String(nonAuthRpcUrlFlag.Name),
TxPoolPollingInterval: c.Duration(txPoolPollingIntervalFlag.Name),
Expand Down Expand Up @@ -384,13 +396,24 @@ func startFollowerNode(c *cli.Context) error {
defer rootCancel()

postgresDSN := c.String(postgresDSNFlag.Name)
if postgresDSN == "" {
return fmt.Errorf("postgresDSN is required")
}
repo, err := payloadstore.NewPostgresFollower(rootCtx, postgresDSN, logger)
if err != nil {
return fmt.Errorf("failed to initialize payload repository: %w", err)
redisURL := c.String(redisURLFlag.Name)
var sharedDB follower.PayloadDB
if postgresDSN != "" {
pgRepo, err := payloadstore.NewPostgresFollower(rootCtx, postgresDSN, logger)
if err != nil {
return fmt.Errorf("failed to create postgres repository: %w", err)
}
sharedDB = pgRepo
} else if redisURL != "" {
redisRepo, err := payloadstore.NewRedisRepositoryFromURL(rootCtx, redisURL, logger)
if err != nil {
return fmt.Errorf("failed to create redis repository: %w", err)
}
sharedDB = redisRepo
} else {
return fmt.Errorf("postgresDSN or redisURL must be provided")
}

syncBatchSize := c.Uint64(syncBatchSizeFlag.Name)
if syncBatchSize == 0 {
return fmt.Errorf("sync-batch-size is required")
Expand Down Expand Up @@ -420,7 +443,7 @@ func startFollowerNode(c *cli.Context) error {

followerNode, err := follower.NewFollower(
logger,
repo,
sharedDB,
syncBatchSize,
bb,
healthAddr,
Expand Down
2 changes: 2 additions & 0 deletions cl/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.23.0
toolchain go1.24.0

require (
github.com/alicebob/miniredis/v2 v2.35.0
github.com/go-redis/redismock/v9 v9.2.0
github.com/golang-jwt/jwt/v5 v5.2.1
github.com/golang/mock v1.6.0
Expand Down Expand Up @@ -56,6 +57,7 @@ require (
github.com/tklauser/numcpus v0.7.0 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect
github.com/yuin/gopher-lua v1.1.1 // indirect
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
golang.org/x/mod v0.22.0 // indirect
rsc.io/tmplfunc v0.0.3 // indirect
Expand Down
4 changes: 4 additions & 0 deletions cl/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ github.com/StackExchange/wmi v1.2.1 h1:VIkavFPXSjcnS+O8yTq7NI32k0R5Aj+v39y29VYDO
github.com/StackExchange/wmi v1.2.1/go.mod h1:rcmrprowKIVzvc+NUiLncP2uuArMWLCbu9SBzvHz7e8=
github.com/VictoriaMetrics/fastcache v1.12.2 h1:N0y9ASrJ0F6h0QaC3o6uJb3NIZ9VKLjCM7NQbSmF7WI=
github.com/VictoriaMetrics/fastcache v1.12.2/go.mod h1:AmC+Nzz1+3G2eCPapF6UcsnkThDcMsQicp4xDukwJYI=
github.com/alicebob/miniredis/v2 v2.35.0 h1:QwLphYqCEAo1eu1TqPRN2jgVMPBweeQcR21jeqDCONI=
github.com/alicebob/miniredis/v2 v2.35.0/go.mod h1:TcL7YfarKPGDAthEtl5NBeHZfeUQj6OXMm/+iu5cLMM=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bits-and-blooms/bitset v1.20.0 h1:2F+rfL86jE2d/bmw7OhqUg2Sj/1rURkBn3MdfoPyRVU=
Expand Down Expand Up @@ -205,6 +207,8 @@ github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 h1:gEOO8jv9F4OT7lGC
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M=
github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
Expand Down
9 changes: 3 additions & 6 deletions cl/singlenode/follower/follower.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

type Follower struct {
logger *slog.Logger
sharedDB payloadDB
sharedDB PayloadDB
syncBatchSize uint64
payloadCh chan types.PayloadInfo
bbMutex sync.RWMutex
Expand All @@ -30,7 +30,7 @@ const (
defaultBackoff = 200 * time.Millisecond
)

type payloadDB interface {
type PayloadDB interface {
GetPayloadsSince(ctx context.Context, sinceHeight uint64, limit int) ([]types.PayloadInfo, error)
GetLatestHeight(ctx context.Context) (uint64, error)
}
Expand All @@ -43,14 +43,11 @@ type blockBuilder interface {

func NewFollower(
logger *slog.Logger,
sharedDB payloadDB,
sharedDB PayloadDB,
syncBatchSize uint64,
bb blockBuilder,
healthAddr string,
) (*Follower, error) {
if sharedDB == nil {
return nil, errors.New("payload repository not provided")
}
if syncBatchSize == 0 {
return nil, errors.New("sync batch size must be greater than 0")
}
Expand Down
178 changes: 178 additions & 0 deletions cl/singlenode/payloadstore/redis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package payloadstore

import (
"context"
"encoding/json"
"fmt"
"log/slog"
"strconv"
"time"

"github.com/primev/mev-commit/cl/types"
"github.com/redis/go-redis/v9"
)

type RedisRepository struct {
redisClient *redis.Client
logger *slog.Logger
}

func NewRedisRepository(redisClient *redis.Client, logger *slog.Logger) *RedisRepository {
return &RedisRepository{
redisClient: redisClient,
logger: logger.With("component", "RedisRepository"),
}
}

func NewRedisRepositoryFromURL(ctx context.Context, redisURL string, logger *slog.Logger) (*RedisRepository, error) {
opts, err := redis.ParseURL(redisURL)
if err != nil {
return nil, fmt.Errorf("invalid redis url: %w", err)
}
rdb := redis.NewClient(opts)
if err := rdb.Ping(ctx).Err(); err != nil {
_ = rdb.Close()
return nil, fmt.Errorf("redis ping failed: %w", err)
}
return NewRedisRepository(rdb, logger), nil
}

const zKeyPayloads = "execution_payloads:z"

func (r *RedisRepository) SavePayload(ctx context.Context, info *types.PayloadInfo) error {
if info.InsertedAt.IsZero() {
info.InsertedAt = time.Now().UTC()
}
data, err := json.Marshal(info)
if err != nil {
return fmt.Errorf("marshal payload: %w", err)
}

score := float64(info.BlockHeight)

pipe := r.redisClient.TxPipeline()
pipe.ZRemRangeByScore(ctx, // Remove existing payload at this height with min=max=height
zKeyPayloads,
strconv.FormatFloat(score, 'f', -1, 64), // min
strconv.FormatFloat(score, 'f', -1, 64), // max
)
pipe.ZAdd(ctx, zKeyPayloads, redis.Z{
Score: score,
Member: string(data),
})
if _, err := pipe.Exec(ctx); err != nil {
r.logger.Error("Failed to save payload to Redis",
"payload_id", info.PayloadID,
"block_height", info.BlockHeight,
"error", err,
)
return fmt.Errorf("save payload: %w", err)
}

r.logger.Debug("Payload saved to Redis",
"payload_id", info.PayloadID,
"block_height", info.BlockHeight,
)
return nil
}

func (r *RedisRepository) GetPayloadsSince(ctx context.Context, sinceHeight uint64, limit int) ([]types.PayloadInfo, error) {
if limit <= 0 {
return nil, fmt.Errorf("limit must be greater than 0")
}

rangeBy := &redis.ZRangeBy{
Min: strconv.FormatUint(sinceHeight, 10),
Max: "+inf",
Offset: 0,
Count: int64(limit),
}
members, err := r.redisClient.ZRangeByScore(ctx, zKeyPayloads, rangeBy).Result()
if err != nil {
return nil, fmt.Errorf("ZRangeByScore: %w", err)
}

result := make([]types.PayloadInfo, 0, len(members))
for _, m := range members {
var pi types.PayloadInfo
if err := json.Unmarshal([]byte(m), &pi); err != nil {
return nil, fmt.Errorf("unmarshal payload: %w", err)
}
result = append(result, pi)
}

r.logger.Debug("Retrieved payloads since height",
"since_height", sinceHeight,
"count", len(result),
"limit", limit,
)
return result, nil
}

func (r *RedisRepository) GetPayloadByHeight(ctx context.Context, height uint64) (*types.PayloadInfo, error) {
hStr := strconv.FormatUint(height, 10)
members, err := r.redisClient.ZRangeByScore(ctx, zKeyPayloads, &redis.ZRangeBy{
Min: hStr,
Max: hStr,
Offset: 0,
Count: 1,
}).Result()
if err != nil {
return nil, fmt.Errorf("ZRangeByScore: %w", err)
}
if len(members) == 0 {
return nil, fmt.Errorf("payload not found")
}
var pi types.PayloadInfo
if err := json.Unmarshal([]byte(members[0]), &pi); err != nil {
return nil, fmt.Errorf("unmarshal payload: %w", err)
}
return &pi, nil
}

func (r *RedisRepository) GetLatestPayload(ctx context.Context) (*types.PayloadInfo, error) {
items, err := r.redisClient.ZRevRangeWithScores(ctx, zKeyPayloads, 0, 0).Result()
if err != nil {
return nil, fmt.Errorf("ZRevRangeWithScores: %w", err)
}
if len(items) == 0 {
return nil, nil
}
top := items[0]
if top.Score < 0 {
return nil, fmt.Errorf("negative height score: %v", top.Score)
}
str, ok := top.Member.(string)
if !ok {
return nil, fmt.Errorf("unexpected member type %T, expected string", top.Member)
}
var pi types.PayloadInfo
if err := json.Unmarshal([]byte(str), &pi); err != nil {
return nil, fmt.Errorf("unmarshal payload: %w", err)
}
if pi.BlockHeight == 0 {
pi.BlockHeight = uint64(top.Score)
}
return &pi, nil
}

func (r *RedisRepository) GetLatestHeight(ctx context.Context) (uint64, error) {
startIdx := int64(0)
stopIdx := int64(0)
items, err := r.redisClient.ZRevRangeWithScores(ctx, zKeyPayloads, startIdx, stopIdx).Result()
if err != nil {
return 0, fmt.Errorf("ZRevRangeWithScores: %w", err)
}
if len(items) == 0 {
return 0, nil
}
if items[0].Score < 0 {
return 0, fmt.Errorf("negative height score: %v", items[0].Score)
}
return uint64(items[0].Score), nil
}

func (r *RedisRepository) Close() error {
r.logger.Info("Closing Redis client")
return r.redisClient.Close()
}
Loading
Loading