diff --git a/cl/cmd/singlenode/main.go b/cl/cmd/singlenode/main.go index b259e73ab..c5b11dcc4 100644 --- a/cl/cmd/singlenode/main.go +++ b/cl/cmd/singlenode/main.go @@ -191,6 +191,15 @@ var ( Category: categoryDatabase, }) + redisURLFlag = altsrc.NewStringFlag(&cli.StringFlag{ + Name: "redis-url", + Usage: "Redis URL for storing payloads. If empty, saving to Redis is disabled. (e.g., 'redis://localhost:6379/2')", + + EnvVars: []string{"LEADER_REDIS_URL"}, + Value: "", + Category: categoryDatabase, + }) + apiAddrFlag = altsrc.NewStringFlag(&cli.StringFlag{ Name: "api-addr", Usage: "Address for member node API endpoint (e.g., ':9090'). If empty, API is disabled.", @@ -237,6 +246,7 @@ func main() { priorityFeeReceiptFlag, healthAddrPortFlag, postgresDSNFlag, + redisURLFlag, apiAddrFlag, nonAuthRpcUrlFlag, txPoolPollingIntervalFlag, @@ -252,6 +262,7 @@ func main() { logTagsFlag, healthAddrPortFlag, postgresDSNFlag, + redisURLFlag, syncBatchSizeFlag, } @@ -338,6 +349,7 @@ func startLeaderNode(c *cli.Context) error { PriorityFeeReceipt: c.String(priorityFeeReceiptFlag.Name), HealthAddr: c.String(healthAddrPortFlag.Name), PostgresDSN: c.String(postgresDSNFlag.Name), + RedisURL: c.String(redisURLFlag.Name), APIAddr: c.String(apiAddrFlag.Name), NonAuthRpcURL: c.String(nonAuthRpcUrlFlag.Name), TxPoolPollingInterval: c.Duration(txPoolPollingIntervalFlag.Name), @@ -384,13 +396,24 @@ func startFollowerNode(c *cli.Context) error { defer rootCancel() postgresDSN := c.String(postgresDSNFlag.Name) - if postgresDSN == "" { - return fmt.Errorf("postgresDSN is required") - } - repo, err := payloadstore.NewPostgresFollower(rootCtx, postgresDSN, logger) - if err != nil { - return fmt.Errorf("failed to initialize payload repository: %w", err) + redisURL := c.String(redisURLFlag.Name) + var sharedDB follower.PayloadDB + if postgresDSN != "" { + pgRepo, err := payloadstore.NewPostgresFollower(rootCtx, postgresDSN, logger) + if err != nil { + return fmt.Errorf("failed to create postgres repository: %w", err) + } + sharedDB = pgRepo + } else if redisURL != "" { + redisRepo, err := payloadstore.NewRedisRepositoryFromURL(rootCtx, redisURL, logger) + if err != nil { + return fmt.Errorf("failed to create redis repository: %w", err) + } + sharedDB = redisRepo + } else { + return fmt.Errorf("postgresDSN or redisURL must be provided") } + syncBatchSize := c.Uint64(syncBatchSizeFlag.Name) if syncBatchSize == 0 { return fmt.Errorf("sync-batch-size is required") @@ -420,7 +443,7 @@ func startFollowerNode(c *cli.Context) error { followerNode, err := follower.NewFollower( logger, - repo, + sharedDB, syncBatchSize, bb, healthAddr, diff --git a/cl/go.mod b/cl/go.mod index 3325d7dd5..f41eeec4a 100644 --- a/cl/go.mod +++ b/cl/go.mod @@ -5,6 +5,7 @@ go 1.23.0 toolchain go1.24.0 require ( + github.com/alicebob/miniredis/v2 v2.35.0 github.com/go-redis/redismock/v9 v9.2.0 github.com/golang-jwt/jwt/v5 v5.2.1 github.com/golang/mock v1.6.0 @@ -56,6 +57,7 @@ require ( github.com/tklauser/numcpus v0.7.0 // indirect github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect + github.com/yuin/gopher-lua v1.1.1 // indirect golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect golang.org/x/mod v0.22.0 // indirect rsc.io/tmplfunc v0.0.3 // indirect diff --git a/cl/go.sum b/cl/go.sum index f3f5a9ab3..458300df7 100644 --- a/cl/go.sum +++ b/cl/go.sum @@ -8,6 +8,8 @@ github.com/StackExchange/wmi v1.2.1 h1:VIkavFPXSjcnS+O8yTq7NI32k0R5Aj+v39y29VYDO github.com/StackExchange/wmi v1.2.1/go.mod h1:rcmrprowKIVzvc+NUiLncP2uuArMWLCbu9SBzvHz7e8= github.com/VictoriaMetrics/fastcache v1.12.2 h1:N0y9ASrJ0F6h0QaC3o6uJb3NIZ9VKLjCM7NQbSmF7WI= github.com/VictoriaMetrics/fastcache v1.12.2/go.mod h1:AmC+Nzz1+3G2eCPapF6UcsnkThDcMsQicp4xDukwJYI= +github.com/alicebob/miniredis/v2 v2.35.0 h1:QwLphYqCEAo1eu1TqPRN2jgVMPBweeQcR21jeqDCONI= +github.com/alicebob/miniredis/v2 v2.35.0/go.mod h1:TcL7YfarKPGDAthEtl5NBeHZfeUQj6OXMm/+iu5cLMM= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bits-and-blooms/bitset v1.20.0 h1:2F+rfL86jE2d/bmw7OhqUg2Sj/1rURkBn3MdfoPyRVU= @@ -205,6 +207,8 @@ github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 h1:gEOO8jv9F4OT7lGC github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M= +github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= diff --git a/cl/singlenode/follower/follower.go b/cl/singlenode/follower/follower.go index 5abea967e..3c8da67f0 100644 --- a/cl/singlenode/follower/follower.go +++ b/cl/singlenode/follower/follower.go @@ -15,7 +15,7 @@ import ( type Follower struct { logger *slog.Logger - sharedDB payloadDB + sharedDB PayloadDB syncBatchSize uint64 payloadCh chan types.PayloadInfo bbMutex sync.RWMutex @@ -30,7 +30,7 @@ const ( defaultBackoff = 200 * time.Millisecond ) -type payloadDB interface { +type PayloadDB interface { GetPayloadsSince(ctx context.Context, sinceHeight uint64, limit int) ([]types.PayloadInfo, error) GetLatestHeight(ctx context.Context) (uint64, error) } @@ -43,14 +43,11 @@ type blockBuilder interface { func NewFollower( logger *slog.Logger, - sharedDB payloadDB, + sharedDB PayloadDB, syncBatchSize uint64, bb blockBuilder, healthAddr string, ) (*Follower, error) { - if sharedDB == nil { - return nil, errors.New("payload repository not provided") - } if syncBatchSize == 0 { return nil, errors.New("sync batch size must be greater than 0") } diff --git a/cl/singlenode/payloadstore/redis.go b/cl/singlenode/payloadstore/redis.go new file mode 100644 index 000000000..79df04409 --- /dev/null +++ b/cl/singlenode/payloadstore/redis.go @@ -0,0 +1,178 @@ +package payloadstore + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "strconv" + "time" + + "github.com/primev/mev-commit/cl/types" + "github.com/redis/go-redis/v9" +) + +type RedisRepository struct { + redisClient *redis.Client + logger *slog.Logger +} + +func NewRedisRepository(redisClient *redis.Client, logger *slog.Logger) *RedisRepository { + return &RedisRepository{ + redisClient: redisClient, + logger: logger.With("component", "RedisRepository"), + } +} + +func NewRedisRepositoryFromURL(ctx context.Context, redisURL string, logger *slog.Logger) (*RedisRepository, error) { + opts, err := redis.ParseURL(redisURL) + if err != nil { + return nil, fmt.Errorf("invalid redis url: %w", err) + } + rdb := redis.NewClient(opts) + if err := rdb.Ping(ctx).Err(); err != nil { + _ = rdb.Close() + return nil, fmt.Errorf("redis ping failed: %w", err) + } + return NewRedisRepository(rdb, logger), nil +} + +const zKeyPayloads = "execution_payloads:z" + +func (r *RedisRepository) SavePayload(ctx context.Context, info *types.PayloadInfo) error { + if info.InsertedAt.IsZero() { + info.InsertedAt = time.Now().UTC() + } + data, err := json.Marshal(info) + if err != nil { + return fmt.Errorf("marshal payload: %w", err) + } + + score := float64(info.BlockHeight) + + pipe := r.redisClient.TxPipeline() + pipe.ZRemRangeByScore(ctx, // Remove existing payload at this height with min=max=height + zKeyPayloads, + strconv.FormatFloat(score, 'f', -1, 64), // min + strconv.FormatFloat(score, 'f', -1, 64), // max + ) + pipe.ZAdd(ctx, zKeyPayloads, redis.Z{ + Score: score, + Member: string(data), + }) + if _, err := pipe.Exec(ctx); err != nil { + r.logger.Error("Failed to save payload to Redis", + "payload_id", info.PayloadID, + "block_height", info.BlockHeight, + "error", err, + ) + return fmt.Errorf("save payload: %w", err) + } + + r.logger.Debug("Payload saved to Redis", + "payload_id", info.PayloadID, + "block_height", info.BlockHeight, + ) + return nil +} + +func (r *RedisRepository) GetPayloadsSince(ctx context.Context, sinceHeight uint64, limit int) ([]types.PayloadInfo, error) { + if limit <= 0 { + return nil, fmt.Errorf("limit must be greater than 0") + } + + rangeBy := &redis.ZRangeBy{ + Min: strconv.FormatUint(sinceHeight, 10), + Max: "+inf", + Offset: 0, + Count: int64(limit), + } + members, err := r.redisClient.ZRangeByScore(ctx, zKeyPayloads, rangeBy).Result() + if err != nil { + return nil, fmt.Errorf("ZRangeByScore: %w", err) + } + + result := make([]types.PayloadInfo, 0, len(members)) + for _, m := range members { + var pi types.PayloadInfo + if err := json.Unmarshal([]byte(m), &pi); err != nil { + return nil, fmt.Errorf("unmarshal payload: %w", err) + } + result = append(result, pi) + } + + r.logger.Debug("Retrieved payloads since height", + "since_height", sinceHeight, + "count", len(result), + "limit", limit, + ) + return result, nil +} + +func (r *RedisRepository) GetPayloadByHeight(ctx context.Context, height uint64) (*types.PayloadInfo, error) { + hStr := strconv.FormatUint(height, 10) + members, err := r.redisClient.ZRangeByScore(ctx, zKeyPayloads, &redis.ZRangeBy{ + Min: hStr, + Max: hStr, + Offset: 0, + Count: 1, + }).Result() + if err != nil { + return nil, fmt.Errorf("ZRangeByScore: %w", err) + } + if len(members) == 0 { + return nil, fmt.Errorf("payload not found") + } + var pi types.PayloadInfo + if err := json.Unmarshal([]byte(members[0]), &pi); err != nil { + return nil, fmt.Errorf("unmarshal payload: %w", err) + } + return &pi, nil +} + +func (r *RedisRepository) GetLatestPayload(ctx context.Context) (*types.PayloadInfo, error) { + items, err := r.redisClient.ZRevRangeWithScores(ctx, zKeyPayloads, 0, 0).Result() + if err != nil { + return nil, fmt.Errorf("ZRevRangeWithScores: %w", err) + } + if len(items) == 0 { + return nil, nil + } + top := items[0] + if top.Score < 0 { + return nil, fmt.Errorf("negative height score: %v", top.Score) + } + str, ok := top.Member.(string) + if !ok { + return nil, fmt.Errorf("unexpected member type %T, expected string", top.Member) + } + var pi types.PayloadInfo + if err := json.Unmarshal([]byte(str), &pi); err != nil { + return nil, fmt.Errorf("unmarshal payload: %w", err) + } + if pi.BlockHeight == 0 { + pi.BlockHeight = uint64(top.Score) + } + return &pi, nil +} + +func (r *RedisRepository) GetLatestHeight(ctx context.Context) (uint64, error) { + startIdx := int64(0) + stopIdx := int64(0) + items, err := r.redisClient.ZRevRangeWithScores(ctx, zKeyPayloads, startIdx, stopIdx).Result() + if err != nil { + return 0, fmt.Errorf("ZRevRangeWithScores: %w", err) + } + if len(items) == 0 { + return 0, nil + } + if items[0].Score < 0 { + return 0, fmt.Errorf("negative height score: %v", items[0].Score) + } + return uint64(items[0].Score), nil +} + +func (r *RedisRepository) Close() error { + r.logger.Info("Closing Redis client") + return r.redisClient.Close() +} diff --git a/cl/singlenode/payloadstore/redis_test.go b/cl/singlenode/payloadstore/redis_test.go new file mode 100644 index 000000000..545c93066 --- /dev/null +++ b/cl/singlenode/payloadstore/redis_test.go @@ -0,0 +1,277 @@ +package payloadstore + +import ( + "context" + "io" + "log/slog" + "testing" + "time" + + "github.com/alicebob/miniredis/v2" + "github.com/primev/mev-commit/cl/types" + "github.com/redis/go-redis/v9" +) + +func newTestRepo(t *testing.T) (*RedisRepository, *miniredis.Miniredis, func()) { + t.Helper() + + mr, err := miniredis.Run() + if err != nil { + t.Fatalf("miniredis.Run error: %v", err) + } + + rdb := redis.NewClient(&redis.Options{ + Addr: mr.Addr(), + }) + + logger := slog.New(slog.NewTextHandler(io.Discard, &slog.HandlerOptions{Level: slog.LevelDebug})) + + repo := NewRedisRepository(rdb, logger) + + cleanup := func() { + _ = repo.Close() + mr.Close() + } + return repo, mr, cleanup +} + +func TestEmptyRepoLatestHeightIsZero(t *testing.T) { + repo, _, cleanup := newTestRepo(t) + defer cleanup() + + ctx := context.Background() + + h, err := repo.GetLatestHeight(ctx) + if err != nil { + t.Fatalf("GetLatestHeight error: %v", err) + } + if h != 0 { + t.Fatalf("expected latest height 0 for empty repo, got %d", h) + } +} + +func TestSaveAndGetLatest(t *testing.T) { + repo, _, cleanup := newTestRepo(t) + defer cleanup() + + ctx := context.Background() + + now := time.Now().UTC() + p1 := &types.PayloadInfo{PayloadID: "a", ExecutionPayload: "pa", BlockHeight: 10, InsertedAt: now} + p2 := &types.PayloadInfo{PayloadID: "b", ExecutionPayload: "pb", BlockHeight: 12, InsertedAt: now} + p3 := &types.PayloadInfo{PayloadID: "c", ExecutionPayload: "pc", BlockHeight: 15, InsertedAt: now} + + if err := repo.SavePayload(ctx, p1); err != nil { + t.Fatalf("SavePayload p1 error: %v", err) + } + if err := repo.SavePayload(ctx, p2); err != nil { + t.Fatalf("SavePayload p2 error: %v", err) + } + if err := repo.SavePayload(ctx, p3); err != nil { + t.Fatalf("SavePayload p3 error: %v", err) + } + + latest, err := repo.GetLatestHeight(ctx) + if err != nil { + t.Fatalf("GetLatestHeight error: %v", err) + } + if latest != 15 { + t.Fatalf("expected latest height 15, got %d", latest) + } +} + +func TestGetPayloadsSince(t *testing.T) { + repo, _, cleanup := newTestRepo(t) + defer cleanup() + + ctx := context.Background() + now := time.Now().UTC() + + payloads := []*types.PayloadInfo{ + {PayloadID: "h10", ExecutionPayload: "p10", BlockHeight: 10, InsertedAt: now}, + {PayloadID: "h12", ExecutionPayload: "p12", BlockHeight: 12, InsertedAt: now}, + {PayloadID: "h15", ExecutionPayload: "p15", BlockHeight: 15, InsertedAt: now}, + {PayloadID: "h20", ExecutionPayload: "p20", BlockHeight: 20, InsertedAt: now}, + } + for _, p := range payloads { + if err := repo.SavePayload(ctx, p); err != nil { + t.Fatalf("SavePayload error: %v", err) + } + } + + got, err := repo.GetPayloadsSince(ctx, 12, 100) + if err != nil { + t.Fatalf("GetPayloadsSince error: %v", err) + } + if len(got) != 3 { + t.Fatalf("expected 3 payloads, got %d", len(got)) + } + if got[0].BlockHeight != 12 || got[1].BlockHeight != 15 || got[2].BlockHeight != 20 { + t.Fatalf("unexpected order or heights: %#v", got) + } + + got, err = repo.GetPayloadsSince(ctx, 10, 2) + if err != nil { + t.Fatalf("GetPayloadsSince error: %v", err) + } + if len(got) != 2 { + t.Fatalf("expected 2 payloads, got %d", len(got)) + } + if got[0].BlockHeight != 10 || got[1].BlockHeight != 12 { + t.Fatalf("unexpected order or heights with limit=2: %#v", got) + } + if got[0].PayloadID != "h10" || got[1].PayloadID != "h12" { + t.Fatalf("unexpected order or payload IDs: %#v", got) + } + if got[0].ExecutionPayload != "p10" || got[1].ExecutionPayload != "p12" { + t.Fatalf("unexpected order or execution payloads: %#v", got) + } + if got[0].InsertedAt != now || got[1].InsertedAt != now { + t.Fatalf("unexpected order or inserted at times: %#v", got) + } +} + +func TestUpsertByHeight(t *testing.T) { + repo, _, cleanup := newTestRepo(t) + defer cleanup() + + ctx := context.Background() + now := time.Now().UTC() + + orig := &types.PayloadInfo{PayloadID: "orig", ExecutionPayload: "p1", BlockHeight: 12, InsertedAt: now} + if err := repo.SavePayload(ctx, orig); err != nil { + t.Fatalf("SavePayload orig error: %v", err) + } + + updated := &types.PayloadInfo{PayloadID: "updated", ExecutionPayload: "p2", BlockHeight: 12, InsertedAt: now.Add(time.Second)} + if err := repo.SavePayload(ctx, updated); err != nil { + t.Fatalf("SavePayload updated error: %v", err) + } + + got, err := repo.GetPayloadsSince(ctx, 12, 10) + if err != nil { + t.Fatalf("GetPayloadsSince error: %v", err) + } + if len(got) != 1 { + t.Fatalf("expected 1 payload at height 12, got %d", len(got)) + } + if got[0].PayloadID != "updated" || got[0].ExecutionPayload != "p2" { + t.Fatalf("upsert failed, got %#v", got[0]) + } +} + +func TestClose(t *testing.T) { + repo, _, cleanup := newTestRepo(t) + defer cleanup() + if err := repo.Close(); err != nil { + t.Fatalf("Close error: %v", err) + } +} + +func TestGetPayloadByHeight_Found(t *testing.T) { + repo, _, cleanup := newTestRepo(t) + defer cleanup() + + ctx := context.Background() + now := time.Now().UTC() + + p5 := &types.PayloadInfo{PayloadID: "id5", ExecutionPayload: "p5", BlockHeight: 5, InsertedAt: now} + p7 := &types.PayloadInfo{PayloadID: "id7", ExecutionPayload: "p7", BlockHeight: 7, InsertedAt: now} + + if err := repo.SavePayload(ctx, p5); err != nil { + t.Fatalf("SavePayload p5 error: %v", err) + } + if err := repo.SavePayload(ctx, p7); err != nil { + t.Fatalf("SavePayload p7 error: %v", err) + } + + got, err := repo.GetPayloadByHeight(ctx, 7) + if err != nil { + t.Fatalf("GetPayloadByHeight error: %v", err) + } + if got == nil { + t.Fatalf("expected payload, got nil") + } + if got.PayloadID != "id7" || got.ExecutionPayload != "p7" || got.BlockHeight != 7 || got.InsertedAt != now { + t.Fatalf("unexpected payload: %#v", got) + } + + got, err = repo.GetPayloadByHeight(ctx, 5) + if err != nil { + t.Fatalf("GetPayloadByHeight error: %v", err) + } + if got == nil { + t.Fatalf("expected payload, got nil") + } + if got.PayloadID != "id5" || got.ExecutionPayload != "p5" || got.BlockHeight != 5 || got.InsertedAt != now { + t.Fatalf("unexpected payload: %#v", got) + } +} + +func TestGetPayloadByHeight_NotFound(t *testing.T) { + repo, _, cleanup := newTestRepo(t) + defer cleanup() + + ctx := context.Background() + now := time.Now().UTC() + + p5 := &types.PayloadInfo{PayloadID: "id5", ExecutionPayload: "p5", BlockHeight: 5, InsertedAt: now} + if err := repo.SavePayload(ctx, p5); err != nil { + t.Fatalf("SavePayload p5 error: %v", err) + } + + got, err := repo.GetPayloadByHeight(ctx, 6) + if err == nil { + t.Fatalf("expected error for missing height, got nil (payload: %#v)", got) + } + if got != nil { + t.Fatalf("expected nil payload on not found, got: %#v", got) + } +} + +func TestGetLatestPayload_Empty(t *testing.T) { + repo, _, cleanup := newTestRepo(t) + defer cleanup() + + ctx := context.Background() + got, err := repo.GetLatestPayload(ctx) + if err != nil { + t.Fatalf("GetLatestPayload error: %v", err) + } + if got != nil { + t.Fatalf("expected nil payload for empty repo, got %#v", got) + } +} + +func TestGetLatestPayload_ReturnsHighest(t *testing.T) { + repo, _, cleanup := newTestRepo(t) + defer cleanup() + + ctx := context.Background() + now := time.Now().UTC() + + p1 := &types.PayloadInfo{PayloadID: "a", ExecutionPayload: "pa", BlockHeight: 10, InsertedAt: now} + p2 := &types.PayloadInfo{PayloadID: "b", ExecutionPayload: "pb", BlockHeight: 12, InsertedAt: now} + p3 := &types.PayloadInfo{PayloadID: "c", ExecutionPayload: "pc", BlockHeight: 15, InsertedAt: now} + + if err := repo.SavePayload(ctx, p1); err != nil { + t.Fatalf("SavePayload p1 error: %v", err) + } + if err := repo.SavePayload(ctx, p2); err != nil { + t.Fatalf("SavePayload p2 error: %v", err) + } + if err := repo.SavePayload(ctx, p3); err != nil { + t.Fatalf("SavePayload p3 error: %v", err) + } + + got, err := repo.GetLatestPayload(ctx) + if err != nil { + t.Fatalf("GetLatestPayload error: %v", err) + } + if got == nil { + t.Fatalf("expected payload, got nil") + } + if got.BlockHeight != 15 || got.PayloadID != "c" || got.ExecutionPayload != "pc" || got.InsertedAt != now { + t.Fatalf("unexpected latest payload: %#v", got) + } +} diff --git a/cl/singlenode/singlenode.go b/cl/singlenode/singlenode.go index 34434019f..126606b48 100644 --- a/cl/singlenode/singlenode.go +++ b/cl/singlenode/singlenode.go @@ -35,6 +35,7 @@ type Config struct { PriorityFeeReceipt string HealthAddr string PostgresDSN string + RedisURL string APIAddr string NonAuthRpcURL string TxPoolPollingInterval time.Duration @@ -127,8 +128,20 @@ func NewSingleNodeApp( } pRepo = repo logger.Info("Payload repository initialized, payloads will be saved to PostgreSQL.") + } else if cfg.RedisURL != "" { + repo, err := payloadstore.NewRedisRepositoryFromURL(ctx, cfg.RedisURL, logger) + if err != nil { + cancel() + logger.Error( + "failed to create payload repository", + "error", err, + ) + return nil, fmt.Errorf("failed to initialize payload repository: %w", err) + } + pRepo = repo + logger.Info("Payload repository initialized, payloads will be saved to Redis.") } else { - logger.Info("PostgresDSN not provided, payload saving to DB is disabled.") + logger.Info("PostgresDSN or RedisURL not provided, payload saving to DB is disabled.") } var payloadServer *api.PayloadServer diff --git a/cl/types/types.go b/cl/types/types.go index 51a5a2a91..dd4a0557c 100644 --- a/cl/types/types.go +++ b/cl/types/types.go @@ -43,10 +43,10 @@ const ( ) type PayloadInfo struct { - PayloadID string - ExecutionPayload string - BlockHeight uint64 - InsertedAt time.Time + PayloadID string `json:"payload_id"` + ExecutionPayload string `json:"execution_payload"` + BlockHeight uint64 `json:"block_height"` + InsertedAt time.Time `json:"inserted_at"` } type PayloadRepository interface { diff --git a/go.work.sum b/go.work.sum index bcbb7f26f..6fe9a7538 100644 --- a/go.work.sum +++ b/go.work.sum @@ -735,8 +735,6 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 h1:s6gZFSlWYmbqAuRjVTiNNhvNRfY2Wxp9nhfyel4rklc= github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE= -github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8= -github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM= github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY= github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M= @@ -810,7 +808,6 @@ github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/checkpoint-restore/go-criu/v5 v5.3.0 h1:wpFFOoomK3389ue2lAb0Boag6XPht5QYpipxmSNL4d8= github.com/checkpoint-restore/go-criu/v5 v5.3.0/go.mod h1:E/eQpaFtUKGOOSEBZgmKAcn+zUUwWxqcaKZlF54wK8E= github.com/cheekybits/is v0.0.0-20150225183255-68e9c0620927 h1:SKI1/fuSdodxmNNyVBR8d7X/HuLnRpvvFO0AgyQk764= @@ -1553,7 +1550,6 @@ github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS4 github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY= github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= -github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/mmcloughlin/profile v0.1.1 h1:jhDmAqPyebOsVDOCICJoINoLb/AnLBaUw58nFzxWS2w= github.com/moby/locker v1.0.1 h1:fOXqR41zeveg4fFODix+1Ch4mj/gT0NE1XJbp/epuBg= github.com/moby/locker v1.0.1/go.mod h1:S7SDdo5zpBK84bzzVlKr2V0hz+7x9hWbYC/kq7oQppc= @@ -1699,6 +1695,8 @@ github.com/quic-go/qtls-go1-19 v0.2.0 h1:Cvn2WdhyViFUHoOqK52i51k4nDX8EwIh5VJiVM4 github.com/quic-go/qtls-go1-19 v0.2.0/go.mod h1:ySOI96ew8lnoKPtSqx2BlI5wCpUVPT05RMAlajtnyOI= github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 h1:MkV+77GLUNo5oJ0jf870itWm3D0Sjh7+Za9gazKc5LQ= github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4= +github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0 h1:Ppwyp6VYCF1nvBTXL3trRso7mXMlRrw9ooo375wvi2s= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= @@ -2197,7 +2195,6 @@ golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220928140112-f11e5e49a4ec/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=