Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 41 additions & 41 deletions cl/singlenode/payloadstore/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,32 @@ func NewRedisRepositoryFromURL(ctx context.Context, redisURL string, logger *slo
return NewRedisRepository(rdb, logger), nil
}

const zKeyPayloads = "execution_payloads:z"
const (
zKeyPayloads = "execution_payloads:z"
keyLatestHeight = "execution_payloads:latest_height"
)

var savePayloadScript = redis.NewScript(`
-- KEYS[1] = zset key
-- KEYS[2] = latest height string key
-- ARGV[1] = score (height)
-- ARGV[2] = member (JSON string)

local zkey = KEYS[1]
local skey = KEYS[2]
local score = tonumber(ARGV[1])
local member = ARGV[2]

redis.call('ZREMRANGEBYSCORE', zkey, score, score)
redis.call('ZADD', zkey, score, member)

local cur = redis.call('GET', skey)
if not cur or score > tonumber(cur) then
redis.call('SET', skey, tostring(score))
end

return 1
`)

func (r *RedisRepository) SavePayload(ctx context.Context, info *types.PayloadInfo) error {
if info.InsertedAt.IsZero() {
Expand All @@ -50,17 +75,8 @@ func (r *RedisRepository) SavePayload(ctx context.Context, info *types.PayloadIn

score := float64(info.BlockHeight)

pipe := r.redisClient.TxPipeline()
pipe.ZRemRangeByScore(ctx, // Remove existing payload at this height with min=max=height
zKeyPayloads,
strconv.FormatFloat(score, 'f', -1, 64), // min
strconv.FormatFloat(score, 'f', -1, 64), // max
)
pipe.ZAdd(ctx, zKeyPayloads, redis.Z{
Score: score,
Member: string(data),
})
if _, err := pipe.Exec(ctx); err != nil {
res := savePayloadScript.Run(ctx, r.redisClient, []string{zKeyPayloads, keyLatestHeight}, score, string(data))
if err := res.Err(); err != nil {
r.logger.Error("Failed to save payload to Redis",
"payload_id", info.PayloadID,
"block_height", info.BlockHeight,
Expand Down Expand Up @@ -131,45 +147,29 @@ func (r *RedisRepository) GetPayloadByHeight(ctx context.Context, height uint64)
}

func (r *RedisRepository) GetLatestPayload(ctx context.Context) (*types.PayloadInfo, error) {
items, err := r.redisClient.ZRevRangeWithScores(ctx, zKeyPayloads, 0, 0).Result()
h, err := r.GetLatestHeight(ctx)
if err != nil {
return nil, fmt.Errorf("ZRevRangeWithScores: %w", err)
return nil, err
}
if len(items) == 0 {
if h == 0 {
return nil, nil
}
top := items[0]
if top.Score < 0 {
return nil, fmt.Errorf("negative height score: %v", top.Score)
}
str, ok := top.Member.(string)
if !ok {
return nil, fmt.Errorf("unexpected member type %T, expected string", top.Member)
}
var pi types.PayloadInfo
if err := json.Unmarshal([]byte(str), &pi); err != nil {
return nil, fmt.Errorf("unmarshal payload: %w", err)
}
if pi.BlockHeight == 0 {
pi.BlockHeight = uint64(top.Score)
}
return &pi, nil
return r.GetPayloadByHeight(ctx, h)
}

func (r *RedisRepository) GetLatestHeight(ctx context.Context) (uint64, error) {
startIdx := int64(0)
stopIdx := int64(0)
items, err := r.redisClient.ZRevRangeWithScores(ctx, zKeyPayloads, startIdx, stopIdx).Result()
if err != nil {
return 0, fmt.Errorf("ZRevRangeWithScores: %w", err)
}
if len(items) == 0 {
s, err := r.redisClient.Get(ctx, keyLatestHeight).Result()
if err == redis.Nil || s == "" {
return 0, nil
}
if items[0].Score < 0 {
return 0, fmt.Errorf("negative height score: %v", items[0].Score)
if err != nil {
return 0, fmt.Errorf("GET latest_height: %w", err)
}
h, perr := strconv.ParseUint(s, 10, 64)
if perr != nil {
return 0, fmt.Errorf("parse uint: %w", perr)
}
return uint64(items[0].Score), nil
return h, nil
}

func (r *RedisRepository) Close() error {
Expand Down
Loading