Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 48 additions & 30 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,22 @@ var ErrNotRetried = fmt.Errorf("nothing retried")
type Client struct {
// namespace is the redis namespace given to NewClient; methods pass it to the redisKey* helpers to build keys.
namespace string
// pool hands out the redis connections that Client methods obtain via pool.Get().
pool Pool
// logger receives error reports from Client methods; NewClient sets it to noopLogger unless an option replaces it.
logger StructuredLogger
}

// NewClient creates a new Client with the specified redis namespace and connection pool.
// The logger starts out as noopLogger; every ClientOption in opts is then applied to the
// new Client in the order given, so an option may replace that default.
func NewClient(namespace string, pool Pool) *Client {
return &Client{
func NewClient(namespace string, pool Pool, opts ...ClientOption) *Client {
c := &Client{
namespace: namespace,
pool: pool,
logger: noopLogger,
}

// Options run after the defaults above are in place, so they can override them.
for _, o := range opts {
o(c)
}

return c
}

// WorkerPoolHeartbeat represents the heartbeat from a worker pool. WorkerPools write a heartbeat every 5 seconds so we know they're alive; the heartbeat also includes config information.
Expand Down Expand Up @@ -62,7 +70,7 @@ func (c *Client) WorkerPoolHeartbeats() ([]*WorkerPoolHeartbeat, error) {
}

if err := conn.Flush(); err != nil {
logError("worker_pool_statuses.flush", err)
c.logger.Error("worker_pool_statuses.flush", errAttr(err))
return nil, err
}

Expand All @@ -71,7 +79,7 @@ func (c *Client) WorkerPoolHeartbeats() ([]*WorkerPoolHeartbeat, error) {
for _, wpid := range workerPoolIDs {
vals, err := redis.Strings(conn.Receive())
if err != nil {
logError("worker_pool_statuses.receive", err)
c.logger.Error("worker_pool_statuses.receive", errAttr(err))
return nil, err
}

Expand Down Expand Up @@ -106,7 +114,7 @@ func (c *Client) WorkerPoolHeartbeats() ([]*WorkerPoolHeartbeat, error) {
sort.Strings(heartbeat.WorkerIDs)
}
if err != nil {
logError("worker_pool_statuses.parse", err)
c.logger.Error("worker_pool_statuses.parse", errAttr(err))
return nil, err
}
}
Expand Down Expand Up @@ -138,7 +146,7 @@ func (c *Client) WorkerObservations() ([]*WorkerObservation, error) {

hbs, err := c.WorkerPoolHeartbeats()
if err != nil {
logError("worker_observations.worker_pool_heartbeats", err)
c.logger.Error("worker_observations.worker_pool_heartbeats", errAttr(err))
return nil, err
}

Expand All @@ -153,7 +161,7 @@ func (c *Client) WorkerObservations() ([]*WorkerObservation, error) {
}

if err := conn.Flush(); err != nil {
logError("worker_observations.flush", err)
c.logger.Error("worker_observations.flush", errAttr(err))
return nil, err
}

Expand All @@ -162,7 +170,7 @@ func (c *Client) WorkerObservations() ([]*WorkerObservation, error) {
for _, wid := range workerIDs {
vals, err := redis.Strings(conn.Receive())
if err != nil {
logError("worker_observations.receive", err)
c.logger.Error("worker_observations.receive", errAttr(err))
return nil, err
}

Expand Down Expand Up @@ -191,7 +199,7 @@ func (c *Client) WorkerObservations() ([]*WorkerObservation, error) {
ob.CheckinAt, err = strconv.ParseInt(value, 10, 64)
}
if err != nil {
logError("worker_observations.parse", err)
c.logger.Error("worker_observations.parse", errAttr(err))
return nil, err
}
}
Expand Down Expand Up @@ -226,7 +234,7 @@ func (c *Client) Queues() ([]*Queue, error) {
}

if err := conn.Flush(); err != nil {
logError("client.queues.flush", err)
c.logger.Error("client.queues.flush", errAttr(err))
return nil, err
}

Expand All @@ -235,7 +243,7 @@ func (c *Client) Queues() ([]*Queue, error) {
for _, jobName := range jobNames {
count, err := redis.Int64(conn.Receive())
if err != nil {
logError("client.queues.receive", err)
c.logger.Error("client.queues.receive", errAttr(err))
return nil, err
}

Expand All @@ -254,7 +262,7 @@ func (c *Client) Queues() ([]*Queue, error) {
}

if err := conn.Flush(); err != nil {
logError("client.queues.flush2", err)
c.logger.Error("client.queues.flush2", errAttr(err))
return nil, err
}

Expand All @@ -264,13 +272,13 @@ func (c *Client) Queues() ([]*Queue, error) {
if s.Count > 0 {
b, err := redis.Bytes(conn.Receive())
if err != nil {
logError("client.queues.receive2", err)
c.logger.Error("client.queues.receive2", errAttr(err))
return nil, err
}

job, err := newJob(b, nil, nil)
if err != nil {
logError("client.queues.new_job", err)
c.logger.Error("client.queues.new_job", errAttr(err))
}
s.Latency = now - job.EnqueuedAt
}
Expand Down Expand Up @@ -302,7 +310,7 @@ func (c *Client) ScheduledJobs(page uint) ([]*ScheduledJob, int64, error) {
key := redisKeyScheduled(c.namespace)
jobsWithScores, count, err := c.getZsetPage(key, page)
if err != nil {
logError("client.scheduled_jobs.get_zset_page", err)
c.logger.Error("client.scheduled_jobs.get_zset_page", errAttr(err))
return nil, 0, err
}

Expand All @@ -320,7 +328,7 @@ func (c *Client) RetryJobs(page uint) ([]*RetryJob, int64, error) {
key := redisKeyRetry(c.namespace)
jobsWithScores, count, err := c.getZsetPage(key, page)
if err != nil {
logError("client.retry_jobs.get_zset_page", err)
c.logger.Error("client.retry_jobs.get_zset_page", errAttr(err))
return nil, 0, err
}

Expand All @@ -338,7 +346,7 @@ func (c *Client) DeadJobs(page uint) ([]*DeadJob, int64, error) {
key := redisKeyDead(c.namespace)
jobsWithScores, count, err := c.getZsetPage(key, page)
if err != nil {
logError("client.dead_jobs.get_zset_page", err)
c.logger.Error("client.dead_jobs.get_zset_page", errAttr(err))
return nil, 0, err
}

Expand Down Expand Up @@ -368,7 +376,7 @@ func (c *Client) RetryDeadJob(diedAt int64, jobID string) error {
// Get queues for job names
queues, err := c.Queues()
if err != nil {
logError("client.retry_all_dead_jobs.queues", err)
c.logger.Error("client.retry_all_dead_jobs.queues", errAttr(err))
return err
}

Expand All @@ -395,7 +403,7 @@ func (c *Client) RetryDeadJob(diedAt int64, jobID string) error {

cnt, err := redis.Int64(script.Do(conn, args...))
if err != nil {
logError("client.retry_dead_job.do", err)
c.logger.Error("client.retry_dead_job.do", errAttr(err))
return err
}

Expand All @@ -411,7 +419,7 @@ func (c *Client) RetryAllDeadJobs() error {
// Get queues for job names
queues, err := c.Queues()
if err != nil {
logError("client.retry_all_dead_jobs.queues", err)
c.logger.Error("client.retry_all_dead_jobs.queues", errAttr(err))
return err
}

Expand Down Expand Up @@ -440,7 +448,7 @@ func (c *Client) RetryAllDeadJobs() error {
for i := 0; i < 1000; i++ {
res, err := redis.Int64(script.Do(conn, args...))
if err != nil {
logError("client.retry_all_dead_jobs.do", err)
c.logger.Error("client.retry_all_dead_jobs.do", errAttr(err))
return err
}

Expand All @@ -458,7 +466,7 @@ func (c *Client) DeleteAllDeadJobs() error {
defer conn.Close()
_, err := conn.Do("DEL", redisKeyDead(c.namespace))
if err != nil {
logError("client.delete_all_dead_jobs", err)
c.logger.Error("client.delete_all_dead_jobs", errAttr(err))
return err
}

Expand All @@ -476,22 +484,22 @@ func (c *Client) DeleteScheduledJob(scheduledFor int64, jobID string) error {
if len(jobBytes) > 0 {
job, err := newJob(jobBytes, nil, nil)
if err != nil {
logError("client.delete_scheduled_job.new_job", err)
c.logger.Error("client.delete_scheduled_job.new_job", errAttr(err))
return err
}

if job.Unique {
uniqueKey, err := redisKeyUniqueJob(c.namespace, job.Name, job.Args)
if err != nil {
logError("client.delete_scheduled_job.redis_key_unique_job", err)
c.logger.Error("client.delete_scheduled_job.redis_key_unique_job", errAttr(err))
return err
}
conn := c.pool.Get()
defer conn.Close()

_, err = conn.Do("DEL", uniqueKey)
if err != nil {
logError("worker.delete_unique_job.del", err)
c.logger.Error("worker.delete_unique_job.del", errAttr(err))
return err
}
}
Expand Down Expand Up @@ -534,7 +542,7 @@ func (c *Client) deleteZsetJob(zsetKey string, zscore int64, jobID string) (bool
cnt, err := redis.Int64(values[0], err)
jobBytes, err := redis.Bytes(values[1], err)
if err != nil {
logError("client.delete_zset_job.do", err)
c.logger.Error("client.delete_zset_job.do", errAttr(err))
return false, nil, err
}

Expand All @@ -557,21 +565,21 @@ func (c *Client) getZsetPage(key string, page uint) ([]jobScore, int64, error) {

values, err := redis.Values(conn.Do("ZRANGEBYSCORE", key, "-inf", "+inf", "WITHSCORES", "LIMIT", (page-1)*20, 20))
if err != nil {
logError("client.get_zset_page.values", err)
c.logger.Error("client.get_zset_page.values", errAttr(err))
return nil, 0, err
}

var jobsWithScores []jobScore

if err := redis.ScanSlice(values, &jobsWithScores); err != nil {
logError("client.get_zset_page.scan_slice", err)
c.logger.Error("client.get_zset_page.scan_slice", errAttr(err))
return nil, 0, err
}

for i, jws := range jobsWithScores {
job, err := newJob(jws.JobBytes, nil, nil)
if err != nil {
logError("client.get_zset_page.new_job", err)
c.logger.Error("client.get_zset_page.new_job", errAttr(err))
return nil, 0, err
}

Expand All @@ -580,9 +588,19 @@ func (c *Client) getZsetPage(key string, page uint) ([]jobScore, int64, error) {

count, err := redis.Int64(conn.Do("ZCARD", key))
if err != nil {
logError("client.get_zset_page.int64", err)
c.logger.Error("client.get_zset_page.int64", errAttr(err))
return nil, 0, err
}

return jobsWithScores, count, nil
}

// ClientOption configures a Client. Options are passed to NewClient, which calls
// each one with the Client being constructed.
type ClientOption func(*Client)

// WithClientLogger returns a ClientOption that makes the Client log through l.
//
// A nil l is ignored, so the Client keeps the logger it already has (NewClient
// installs noopLogger before applying options). Storing a nil logger would make
// the first c.logger.Error call panic on a nil interface.
func WithClientLogger(l StructuredLogger) ClientOption {
	return func(c *Client) {
		if l == nil {
			return
		}
		c.logger = l
	}
}
26 changes: 15 additions & 11 deletions dead_pool_reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"errors"
"fmt"
"log/slog"
"math/rand"
"strings"
"time"
Expand Down Expand Up @@ -48,7 +49,8 @@ type deadPoolReaper struct {
stopChan chan struct{}
doneStoppingChan chan struct{}

hook ReaperHook
hook ReaperHook
logger StructuredLogger
}

func newDeadPoolReaper(
Expand All @@ -57,6 +59,7 @@ func newDeadPoolReaper(
curJobTypes []string,
reapPeriod time.Duration,
hook ReaperHook,
logger StructuredLogger,
) *deadPoolReaper {
if reapPeriod == 0 {
reapPeriod = defaultReapPeriod
Expand All @@ -71,6 +74,7 @@ func newDeadPoolReaper(
stopChan: make(chan struct{}),
doneStoppingChan: make(chan struct{}),
hook: hook,
logger: logger,
}
}

Expand All @@ -84,7 +88,7 @@ func (r *deadPoolReaper) stop() {
}

func (r *deadPoolReaper) loop() {
Logger.Printf("Reaper: started with a period of %v", r.reapPeriod)
r.logger.Info("Reaper started", slog.Duration("period", r.reapPeriod))

// Reap immediately after we provide some time for initialization
timer := time.NewTimer(r.deadTime)
Expand All @@ -100,7 +104,8 @@ func (r *deadPoolReaper) loop() {
timer.Reset(r.reapPeriod + time.Duration(rand.Intn(reapJitterSecs))*time.Second)

if err := r.reap(); err != nil {
logError("dead_pool_reaper.reap", err)
r.logger.Error("dead_pool_reaper.reap", errAttr(err))
// logError("dead_pool_reaper.reap", err)
}
}
}
Expand All @@ -112,21 +117,20 @@ func (r *deadPoolReaper) reap() (err error) {
return err
}

Logger.Printf("Reaper: trying to acquire lock...")
r.logger.Info("Reaper: trying to acquire lock...")

acquired, err := r.acquireLock(lockValue)
if err != nil {
Logger.Printf("Reaper: acquiring lock: %v", err)
return err
return fmt.Errorf("acquiring lock: %w", err)
}

// Another reaper is already running
if !acquired {
Logger.Printf("Reaper: locked by another process")
r.logger.Info("Reaper: locked by another process")
return nil
}

Logger.Printf("Reaper: lock is acquired")
r.logger.Info("Reaper: lock is acquired")

defer func() {
err = r.releaseLock(lockValue)
Expand All @@ -143,14 +147,14 @@ func (r *deadPoolReaper) reap() (err error) {

deadPools, rErr := r.reapDeadPools()
if jobs := deadPools.getAllJobs(); len(jobs) != 0 {
Logger.Printf("Reaper: dead pools: %v", deadPools)
r.logger.Info("Reaper: dead pools", slog.Any("dead", deadPools))

reapResult.NoPoolHeartBeatJobs = jobs
}

unknownPools, cErr := r.clearUnknownPools()
if jobs := unknownPools.getAllJobs(); len(jobs) != 0 {
Logger.Printf("Reaper: unknown pools: %v", unknownPools)
r.logger.Info("Reaper: unknown pools", slog.Any("unknown", unknownPools))

reapResult.UnknownPoolJobs = jobs
}
Expand All @@ -160,7 +164,7 @@ func (r *deadPoolReaper) reap() (err error) {
// lock_info is 0.
jobs, dErr := r.removeDanglingLocks()
if len(jobs) != 0 {
Logger.Printf("Reaper: dangling locks: %v", jobs)
r.logger.Info("Reaper: dangling locks", slog.Any("dangling", jobs))

reapResult.DanglingLockJobs = jobs
}
Expand Down
Loading