diff --git a/client.go b/client.go index 99781daa..5a881a42 100644 --- a/client.go +++ b/client.go @@ -21,14 +21,22 @@ var ErrNotRetried = fmt.Errorf("nothing retried") type Client struct { namespace string pool Pool + logger StructuredLogger } // NewClient creates a new Client with the specified redis namespace and connection pool. -func NewClient(namespace string, pool Pool) *Client { - return &Client{ +func NewClient(namespace string, pool Pool, opts ...ClientOption) *Client { + c := &Client{ namespace: namespace, pool: pool, + logger: noopLogger, } + + for _, o := range opts { + o(c) + } + + return c } // WorkerPoolHeartbeat represents the heartbeat from a worker pool. WorkerPool's write a heartbeat every 5 seconds so we know they're alive and includes config information. @@ -62,7 +70,7 @@ func (c *Client) WorkerPoolHeartbeats() ([]*WorkerPoolHeartbeat, error) { } if err := conn.Flush(); err != nil { - logError("worker_pool_statuses.flush", err) + c.logger.Error("worker_pool_statuses.flush", errAttr(err)) return nil, err } @@ -71,7 +79,7 @@ func (c *Client) WorkerPoolHeartbeats() ([]*WorkerPoolHeartbeat, error) { for _, wpid := range workerPoolIDs { vals, err := redis.Strings(conn.Receive()) if err != nil { - logError("worker_pool_statuses.receive", err) + c.logger.Error("worker_pool_statuses.receive", errAttr(err)) return nil, err } @@ -106,7 +114,7 @@ func (c *Client) WorkerPoolHeartbeats() ([]*WorkerPoolHeartbeat, error) { sort.Strings(heartbeat.WorkerIDs) } if err != nil { - logError("worker_pool_statuses.parse", err) + c.logger.Error("worker_pool_statuses.parse", errAttr(err)) return nil, err } } @@ -138,7 +146,7 @@ func (c *Client) WorkerObservations() ([]*WorkerObservation, error) { hbs, err := c.WorkerPoolHeartbeats() if err != nil { - logError("worker_observations.worker_pool_heartbeats", err) + c.logger.Error("worker_observations.worker_pool_heartbeats", errAttr(err)) return nil, err } @@ -153,7 +161,7 @@ func (c *Client) WorkerObservations() 
([]*WorkerObservation, error) { } if err := conn.Flush(); err != nil { - logError("worker_observations.flush", err) + c.logger.Error("worker_observations.flush", errAttr(err)) return nil, err } @@ -162,7 +170,7 @@ func (c *Client) WorkerObservations() ([]*WorkerObservation, error) { for _, wid := range workerIDs { vals, err := redis.Strings(conn.Receive()) if err != nil { - logError("worker_observations.receive", err) + c.logger.Error("worker_observations.receive", errAttr(err)) return nil, err } @@ -191,7 +199,7 @@ func (c *Client) WorkerObservations() ([]*WorkerObservation, error) { ob.CheckinAt, err = strconv.ParseInt(value, 10, 64) } if err != nil { - logError("worker_observations.parse", err) + c.logger.Error("worker_observations.parse", errAttr(err)) return nil, err } } @@ -226,7 +234,7 @@ func (c *Client) Queues() ([]*Queue, error) { } if err := conn.Flush(); err != nil { - logError("client.queues.flush", err) + c.logger.Error("client.queues.flush", errAttr(err)) return nil, err } @@ -235,7 +243,7 @@ func (c *Client) Queues() ([]*Queue, error) { for _, jobName := range jobNames { count, err := redis.Int64(conn.Receive()) if err != nil { - logError("client.queues.receive", err) + c.logger.Error("client.queues.receive", errAttr(err)) return nil, err } @@ -254,7 +262,7 @@ func (c *Client) Queues() ([]*Queue, error) { } if err := conn.Flush(); err != nil { - logError("client.queues.flush2", err) + c.logger.Error("client.queues.flush2", errAttr(err)) return nil, err } @@ -264,13 +272,13 @@ func (c *Client) Queues() ([]*Queue, error) { if s.Count > 0 { b, err := redis.Bytes(conn.Receive()) if err != nil { - logError("client.queues.receive2", err) + c.logger.Error("client.queues.receive2", errAttr(err)) return nil, err } job, err := newJob(b, nil, nil) if err != nil { - logError("client.queues.new_job", err) + c.logger.Error("client.queues.new_job", errAttr(err)) } s.Latency = now - job.EnqueuedAt } @@ -302,7 +310,7 @@ func (c *Client) ScheduledJobs(page uint) 
([]*ScheduledJob, int64, error) { key := redisKeyScheduled(c.namespace) jobsWithScores, count, err := c.getZsetPage(key, page) if err != nil { - logError("client.scheduled_jobs.get_zset_page", err) + c.logger.Error("client.scheduled_jobs.get_zset_page", errAttr(err)) return nil, 0, err } @@ -320,7 +328,7 @@ func (c *Client) RetryJobs(page uint) ([]*RetryJob, int64, error) { key := redisKeyRetry(c.namespace) jobsWithScores, count, err := c.getZsetPage(key, page) if err != nil { - logError("client.retry_jobs.get_zset_page", err) + c.logger.Error("client.retry_jobs.get_zset_page", errAttr(err)) return nil, 0, err } @@ -338,7 +346,7 @@ func (c *Client) DeadJobs(page uint) ([]*DeadJob, int64, error) { key := redisKeyDead(c.namespace) jobsWithScores, count, err := c.getZsetPage(key, page) if err != nil { - logError("client.dead_jobs.get_zset_page", err) + c.logger.Error("client.dead_jobs.get_zset_page", errAttr(err)) return nil, 0, err } @@ -368,7 +376,7 @@ func (c *Client) RetryDeadJob(diedAt int64, jobID string) error { // Get queues for job names queues, err := c.Queues() if err != nil { - logError("client.retry_all_dead_jobs.queues", err) + c.logger.Error("client.retry_all_dead_jobs.queues", errAttr(err)) return err } @@ -395,7 +403,7 @@ func (c *Client) RetryDeadJob(diedAt int64, jobID string) error { cnt, err := redis.Int64(script.Do(conn, args...)) if err != nil { - logError("client.retry_dead_job.do", err) + c.logger.Error("client.retry_dead_job.do", errAttr(err)) return err } @@ -411,7 +419,7 @@ func (c *Client) RetryAllDeadJobs() error { // Get queues for job names queues, err := c.Queues() if err != nil { - logError("client.retry_all_dead_jobs.queues", err) + c.logger.Error("client.retry_all_dead_jobs.queues", errAttr(err)) return err } @@ -440,7 +448,7 @@ func (c *Client) RetryAllDeadJobs() error { for i := 0; i < 1000; i++ { res, err := redis.Int64(script.Do(conn, args...)) if err != nil { - logError("client.retry_all_dead_jobs.do", err) + 
c.logger.Error("client.retry_all_dead_jobs.do", errAttr(err)) return err } @@ -458,7 +466,7 @@ func (c *Client) DeleteAllDeadJobs() error { defer conn.Close() _, err := conn.Do("DEL", redisKeyDead(c.namespace)) if err != nil { - logError("client.delete_all_dead_jobs", err) + c.logger.Error("client.delete_all_dead_jobs", errAttr(err)) return err } @@ -476,14 +484,14 @@ func (c *Client) DeleteScheduledJob(scheduledFor int64, jobID string) error { if len(jobBytes) > 0 { job, err := newJob(jobBytes, nil, nil) if err != nil { - logError("client.delete_scheduled_job.new_job", err) + c.logger.Error("client.delete_scheduled_job.new_job", errAttr(err)) return err } if job.Unique { uniqueKey, err := redisKeyUniqueJob(c.namespace, job.Name, job.Args) if err != nil { - logError("client.delete_scheduled_job.redis_key_unique_job", err) + c.logger.Error("client.delete_scheduled_job.redis_key_unique_job", errAttr(err)) return err } conn := c.pool.Get() @@ -491,7 +499,7 @@ func (c *Client) DeleteScheduledJob(scheduledFor int64, jobID string) error { _, err = conn.Do("DEL", uniqueKey) if err != nil { - logError("worker.delete_unique_job.del", err) + c.logger.Error("worker.delete_unique_job.del", errAttr(err)) return err } } @@ -534,7 +542,7 @@ func (c *Client) deleteZsetJob(zsetKey string, zscore int64, jobID string) (bool cnt, err := redis.Int64(values[0], err) jobBytes, err := redis.Bytes(values[1], err) if err != nil { - logError("client.delete_zset_job.do", err) + c.logger.Error("client.delete_zset_job.do", errAttr(err)) return false, nil, err } @@ -557,21 +565,21 @@ func (c *Client) getZsetPage(key string, page uint) ([]jobScore, int64, error) { values, err := redis.Values(conn.Do("ZRANGEBYSCORE", key, "-inf", "+inf", "WITHSCORES", "LIMIT", (page-1)*20, 20)) if err != nil { - logError("client.get_zset_page.values", err) + c.logger.Error("client.get_zset_page.values", errAttr(err)) return nil, 0, err } var jobsWithScores []jobScore if err := redis.ScanSlice(values, 
&jobsWithScores); err != nil { - logError("client.get_zset_page.scan_slice", err) + c.logger.Error("client.get_zset_page.scan_slice", errAttr(err)) return nil, 0, err } for i, jws := range jobsWithScores { job, err := newJob(jws.JobBytes, nil, nil) if err != nil { - logError("client.get_zset_page.new_job", err) + c.logger.Error("client.get_zset_page.new_job", errAttr(err)) return nil, 0, err } @@ -580,9 +588,19 @@ func (c *Client) getZsetPage(key string, page uint) ([]jobScore, int64, error) { count, err := redis.Int64(conn.Do("ZCARD", key)) if err != nil { - logError("client.get_zset_page.int64", err) + c.logger.Error("client.get_zset_page.int64", errAttr(err)) return nil, 0, err } return jobsWithScores, count, nil } + +// ClientOption is an optional option for Client. +type ClientOption func(*Client) + +// WithClientLogger registers logger. +func WithClientLogger(l StructuredLogger) ClientOption { + return func(c *Client) { + c.logger = l + } +} diff --git a/dead_pool_reaper.go b/dead_pool_reaper.go index 95bc275b..60a0ab61 100644 --- a/dead_pool_reaper.go +++ b/dead_pool_reaper.go @@ -6,6 +6,7 @@ import ( "encoding/json" "errors" "fmt" + "log/slog" "math/rand" "strings" "time" @@ -48,7 +49,8 @@ type deadPoolReaper struct { stopChan chan struct{} doneStoppingChan chan struct{} - hook ReaperHook + hook ReaperHook + logger StructuredLogger } func newDeadPoolReaper( @@ -57,6 +59,7 @@ func newDeadPoolReaper( curJobTypes []string, reapPeriod time.Duration, hook ReaperHook, + logger StructuredLogger, ) *deadPoolReaper { if reapPeriod == 0 { reapPeriod = defaultReapPeriod @@ -71,6 +74,7 @@ func newDeadPoolReaper( stopChan: make(chan struct{}), doneStoppingChan: make(chan struct{}), hook: hook, + logger: logger, } } @@ -84,7 +88,7 @@ func (r *deadPoolReaper) stop() { } func (r *deadPoolReaper) loop() { - Logger.Printf("Reaper: started with a period of %v", r.reapPeriod) + r.logger.Info("Reaper started", slog.Duration("period", r.reapPeriod)) // Reap immediately 
after we provide some time for initialization timer := time.NewTimer(r.deadTime) @@ -100,7 +104,7 @@ func (r *deadPoolReaper) loop() { timer.Reset(r.reapPeriod + time.Duration(rand.Intn(reapJitterSecs))*time.Second) if err := r.reap(); err != nil { - logError("dead_pool_reaper.reap", err) + r.logger.Error("dead_pool_reaper.reap", errAttr(err)) } } } @@ -112,21 +117,20 @@ func (r *deadPoolReaper) reap() (err error) { return err } - Logger.Printf("Reaper: trying to acquire lock...") + r.logger.Info("Reaper: trying to acquire lock...") acquired, err := r.acquireLock(lockValue) if err != nil { - Logger.Printf("Reaper: acquiring lock: %v", err) - return err + return fmt.Errorf("acquiring lock: %w", err) } // Another reaper is already running if !acquired { - Logger.Printf("Reaper: locked by another process") + r.logger.Info("Reaper: locked by another process") return nil } - Logger.Printf("Reaper: lock is acquired") + r.logger.Info("Reaper: lock is acquired") defer func() { err = r.releaseLock(lockValue) @@ -143,14 +147,14 @@ func (r *deadPoolReaper) reap() (err error) { deadPools, rErr := r.reapDeadPools() if jobs := deadPools.getAllJobs(); len(jobs) != 0 { - Logger.Printf("Reaper: dead pools: %v", deadPools) + r.logger.Info("Reaper: dead pools", slog.Any("dead", deadPools)) reapResult.NoPoolHeartBeatJobs = jobs } unknownPools, cErr := r.clearUnknownPools() if jobs := unknownPools.getAllJobs(); len(jobs) != 0 { - Logger.Printf("Reaper: unknown pools: %v", unknownPools) + r.logger.Info("Reaper: unknown pools", slog.Any("unknown", unknownPools)) reapResult.UnknownPoolJobs = jobs } @@ -160,7 +164,7 @@ func (r *deadPoolReaper) reap() (err error) { // lock_info is 0. 
jobs, dErr := r.removeDanglingLocks() if len(jobs) != 0 { - Logger.Printf("Reaper: dangling locks: %v", jobs) + r.logger.Info("Reaper: dangling locks", slog.Any("dangling", jobs)) reapResult.DanglingLockJobs = jobs } diff --git a/dead_pool_reaper_test.go b/dead_pool_reaper_test.go index 86a7736b..73ab9c71 100644 --- a/dead_pool_reaper_test.go +++ b/dead_pool_reaper_test.go @@ -49,7 +49,7 @@ func TestDeadPoolReaper(t *testing.T) { assert.NoError(t, err) // Test getting dead pool - reaper := newDeadPoolReaper(ns, pool, []string{}, 0, nil) + reaper := newDeadPoolReaper(ns, pool, []string{}, 0, nil, noopLogger) deadPools, err := reaper.findDeadPools() assert.NoError(t, err) assert.Equal(t, poolsJobs{"2": {"type1", "type2"}, "3": {"type1", "type2"}}, deadPools) @@ -128,7 +128,7 @@ func TestDeadPoolReaperNoHeartbeat(t *testing.T) { assert.EqualValues(t, 3, numPools) // Test getting dead pool ids - reaper := newDeadPoolReaper(ns, pool, []string{"type1"}, 0, nil) + reaper := newDeadPoolReaper(ns, pool, []string{"type1"}, 0, nil, noopLogger) deadPools, err := reaper.findDeadPools() assert.NoError(t, err) assert.Equal(t, poolsJobs{"1": nil, "2": nil, "3": nil}, deadPools) @@ -211,7 +211,7 @@ func TestDeadPoolReaperNoJobTypes(t *testing.T) { assert.NoError(t, err) // Test getting dead pool - reaper := newDeadPoolReaper(ns, pool, []string{}, 0, nil) + reaper := newDeadPoolReaper(ns, pool, []string{}, 0, nil, noopLogger) deadPools, err := reaper.findDeadPools() assert.NoError(t, err) assert.Equal(t, poolsJobs{"2": {"type1", "type2"}}, deadPools) @@ -272,7 +272,7 @@ func TestDeadPoolReaperWithWorkerPools(t *testing.T) { _, err = conn.Do("LPUSH", redisKeyJobsInProgress(ns, stalePoolID, job1), `{"sleep": 10}`) assert.NoError(t, err) jobTypes := map[string]*jobType{"job1": nil} - staleHeart := newWorkerPoolHeartbeater(ns, pool, stalePoolID, jobTypes, 1, []string{"id1"}) + staleHeart := newWorkerPoolHeartbeater(ns, pool, stalePoolID, jobTypes, 1, []string{"id1"}, noopLogger) 
staleHeart.start() // heartbeat dispatched immediately but reaper waits for deadTime before first run @@ -284,7 +284,7 @@ func TestDeadPoolReaperWithWorkerPools(t *testing.T) { // setup a worker pool and start the reaper, which should restart the stale job above wp := setupTestWorkerPool(pool, ns, job1, 1, JobOptions{Priority: 1}) - wp.deadPoolReaper = newDeadPoolReaper(wp.namespace, wp.pool, []string{"job1"}, 0, nil) + wp.deadPoolReaper = newDeadPoolReaper(wp.namespace, wp.pool, []string{"job1"}, 0, nil, noopLogger) wp.deadPoolReaper.deadTime = expectedDeadTime wp.deadPoolReaper.start() @@ -328,7 +328,7 @@ func TestDeadPoolReaperCleanStaleLocks(t *testing.T) { err = conn.Flush() assert.NoError(t, err) - reaper := newDeadPoolReaper(ns, pool, jobNames, 0, nil) + reaper := newDeadPoolReaper(ns, pool, jobNames, 0, nil, noopLogger) // clean lock info for workerPoolID1 err = reaper.cleanStaleLockInfo(workerPoolID1, jobNames) assert.NoError(t, err) @@ -387,7 +387,7 @@ func TestDeadPoolReaperTakeDeadPools(t *testing.T) { assert.NoError(t, err) // Test getting dead pools - reaper := newDeadPoolReaper(ns, pool, []string{}, 0, nil) + reaper := newDeadPoolReaper(ns, pool, []string{}, 0, nil, noopLogger) deadPools, err := reaper.findDeadPools() assert.NoError(t, err) assert.Equal(t, poolsJobs{"2": {"type1", "type2"}, "3": nil}, deadPools) @@ -398,7 +398,7 @@ func TestReaperLock(t *testing.T) { ns := "work" cleanKeyspace(ns, pool) - reaper := newDeadPoolReaper(ns, pool, []string{}, 0, nil) + reaper := newDeadPoolReaper(ns, pool, []string{}, 0, nil, noopLogger) value, err := genValue() assert.NoError(t, err) @@ -475,7 +475,7 @@ func TestDeadPoolReaperGetUnknownPools(t *testing.T) { assert.NoError(t, conn.Flush()) // Run test - reaper := newDeadPoolReaper(ns, pool, jobNames, 0, nil) + reaper := newDeadPoolReaper(ns, pool, jobNames, 0, nil, noopLogger) unknownPools, err := reaper.getUnknownPools() assert.NoError(t, err) assert.Equal(t, poolsJobs{"2": {"type1", "type2"}, "3": 
{"type1", "type2"}}, unknownPools) @@ -534,7 +534,7 @@ func TestDeadPoolReaperClearUnknownPool(t *testing.T) { assert.NoError(t, conn.Flush()) // Run test - reaper := newDeadPoolReaper(ns, pool, jobNames, 0, nil) + reaper := newDeadPoolReaper(ns, pool, jobNames, 0, nil, noopLogger) _, err = reaper.clearUnknownPools() assert.NoError(t, err) @@ -596,7 +596,7 @@ func TestDeadPoolReaperRemoveDanglingLocks(t *testing.T) { assert.NoError(t, conn.Flush()) - reaper := newDeadPoolReaper(ns, pool, jobNames, 0, nil) + reaper := newDeadPoolReaper(ns, pool, jobNames, 0, nil, noopLogger) _, err = reaper.removeDanglingLocks() assert.NoError(t, err) @@ -663,6 +663,6 @@ func TestDeadPoolReaperHook(t *testing.T) { assert.Equal(t, unknownPoolJobs, rr.UnknownPoolJobs) assert.Equal(t, danglingLockJobs, rr.DanglingLockJobs) } - }) + }, noopLogger) require.NoError(t, reaper.reap()) } diff --git a/heartbeater.go b/heartbeater.go index 0307e95e..4d9d7efe 100644 --- a/heartbeater.go +++ b/heartbeater.go @@ -25,9 +25,19 @@ type workerPoolHeartbeater struct { stopChan chan struct{} doneStoppingChan chan struct{} + + logger StructuredLogger } -func newWorkerPoolHeartbeater(namespace string, pool Pool, workerPoolID string, jobTypes map[string]*jobType, concurrency uint, workerIDs []string) *workerPoolHeartbeater { +func newWorkerPoolHeartbeater( + namespace string, + pool Pool, + workerPoolID string, + jobTypes map[string]*jobType, + concurrency uint, + workerIDs []string, + logger StructuredLogger, +) *workerPoolHeartbeater { h := &workerPoolHeartbeater{ workerPoolID: workerPoolID, namespace: namespace, @@ -36,6 +46,7 @@ func newWorkerPoolHeartbeater(namespace string, pool Pool, workerPoolID string, concurrency: concurrency, stopChan: make(chan struct{}), doneStoppingChan: make(chan struct{}), + logger: logger, } jobNames := make([]string, 0, len(jobTypes)) @@ -51,7 +62,7 @@ func newWorkerPoolHeartbeater(namespace string, pool Pool, workerPoolID string, h.pid = os.Getpid() host, err := 
os.Hostname() if err != nil { - logError("heartbeat.hostname", err) + h.logger.Error("heartbeat.hostname", errAttr(err)) host = "hostname_errored" } h.hostname = host @@ -103,7 +114,7 @@ func (h *workerPoolHeartbeater) heartbeat() { ) if err := conn.Flush(); err != nil { - logError("heartbeat", err) + h.logger.Error("heartbeat", errAttr(err)) } } @@ -118,6 +129,6 @@ func (h *workerPoolHeartbeater) removeHeartbeat() { conn.Send("DEL", heartbeatKey) if err := conn.Flush(); err != nil { - logError("remove_heartbeat", err) + h.logger.Error("remove_heartbeat", errAttr(err)) } } diff --git a/heartbeater_test.go b/heartbeater_test.go index 2af2d2a9..f83e14f8 100644 --- a/heartbeater_test.go +++ b/heartbeater_test.go @@ -21,7 +21,7 @@ func TestHeartbeater(t *testing.T) { "bar": nil, } - heart := newWorkerPoolHeartbeater(ns, pool, "abcd", jobTypes, 10, []string{"ccc", "bbb"}) + heart := newWorkerPoolHeartbeater(ns, pool, "abcd", jobTypes, 10, []string{"ccc", "bbb"}, noopLogger) heart.start() time.Sleep(20 * time.Millisecond) diff --git a/log.go b/log.go index c6342b96..062128d9 100644 --- a/log.go +++ b/log.go @@ -1,10 +1,13 @@ package work import ( + "context" "io" "log" + "log/slog" ) +// Deprecated: use StructuredLogger. // StdLogger is used to log error messages. type StdLogger interface { Print(v ...interface{}) @@ -12,11 +15,25 @@ type StdLogger interface { Println(v ...interface{}) } +// Deprecated: provide a logger using the WithLogger option. // Logger is the instance of a StdLogger interface that Worker writes connection // management events to. By default it is set to discard all log messages via // io.Discard, but you can set it to redirect wherever you want. 
var Logger StdLogger = log.New(io.Discard, "[Work] ", log.LstdFlags) -func logError(key string, err error) { - Logger.Printf("key: %s, err: %s", key, err.Error()) +type StructuredLogger interface { + Error(msg string, args ...any) + ErrorContext(ctx context.Context, msg string, args ...any) + Warn(msg string, args ...any) + WarnContext(ctx context.Context, msg string, args ...any) + Info(msg string, args ...any) + InfoContext(ctx context.Context, msg string, args ...any) + Debug(msg string, args ...any) + DebugContext(ctx context.Context, msg string, args ...any) +} + +var noopLogger = slog.New(slog.NewTextHandler(io.Discard, nil)) + +func errAttr(e error) slog.Attr { + return slog.Any("error", e) } diff --git a/observer.go b/observer.go index cff1fc35..7b7dde19 100644 --- a/observer.go +++ b/observer.go @@ -2,7 +2,7 @@ package work import ( "encoding/json" - "fmt" + "log/slog" "time" ) @@ -29,6 +29,8 @@ type observer struct { drainChan chan struct{} doneDrainingChan chan struct{} + + logger StructuredLogger } type observationKind int @@ -60,7 +62,7 @@ type observation struct { const observerBufferSize = 1024 -func newObserver(namespace string, pool Pool, workerID string) *observer { +func newObserver(namespace string, pool Pool, workerID string, logger StructuredLogger) *observer { return &observer{ namespace: namespace, workerID: workerID, @@ -72,6 +74,8 @@ func newObserver(namespace string, pool Pool, workerID string) *observer { drainChan: make(chan struct{}), doneDrainingChan: make(chan struct{}), + + logger: logger, } } @@ -137,7 +141,7 @@ func (o *observer) loop() { o.process(obv) default: if err := o.writeStatus(o.currentStartedObservation); err != nil { - logError("observer.write", err) + o.logger.Error("observer.write", errAttr(err)) } o.doneDrainingChan <- struct{}{} break DRAIN_LOOP @@ -146,7 +150,7 @@ func (o *observer) loop() { case <-ticker: if o.lastWrittenVersion != o.version { if err := o.writeStatus(o.currentStartedObservation); err != nil { - 
logError("observer.write", err) + o.logger.Error("observer.write", errAttr(err)) } o.lastWrittenVersion = o.version } @@ -166,7 +170,10 @@ func (o *observer) process(obv *observation) { o.currentStartedObservation.checkin = obv.checkin o.currentStartedObservation.checkinAt = obv.checkinAt } else { - logError("observer.checkin_mismatch", fmt.Errorf("got checkin but mismatch on job ID or no job")) + o.logger.Error("observer.checkin_mismatch", slog.String( + "error", + "got checkin but mismatch on job ID or no job", + )) } } o.version++ @@ -174,7 +181,7 @@ func (o *observer) process(obv *observation) { // If this is the version observation we got, just go ahead and write it. if o.version == 1 { if err := o.writeStatus(o.currentStartedObservation); err != nil { - logError("observer.first_write", err) + o.logger.Error("observer.first_write", errAttr(err)) } o.lastWrittenVersion = o.version } diff --git a/observer_test.go b/observer_test.go index d95e729b..75225906 100644 --- a/observer_test.go +++ b/observer_test.go @@ -16,7 +16,7 @@ func TestObserverStarted(t *testing.T) { setNowEpochSecondsMock(tMock) defer resetNowEpochSecondsMock() - observer := newObserver(ns, pool, "abcd") + observer := newObserver(ns, pool, "abcd", noopLogger) observer.start() observer.observeStarted("foo", "bar", Q{"a": 1, "b": "wat"}) //observer.observeDone("foo", "bar", nil) @@ -38,7 +38,7 @@ func TestObserverStartedDone(t *testing.T) { setNowEpochSecondsMock(tMock) defer resetNowEpochSecondsMock() - observer := newObserver(ns, pool, "abcd") + observer := newObserver(ns, pool, "abcd", noopLogger) observer.start() observer.observeStarted("foo", "bar", Q{"a": 1, "b": "wat"}) observer.observeDone("foo", "bar", nil) @@ -53,7 +53,7 @@ func TestObserverCheckin(t *testing.T) { pool := newTestPool(":6379") ns := "work" - observer := newObserver(ns, pool, "abcd") + observer := newObserver(ns, pool, "abcd", noopLogger) observer.start() tMock := int64(1425263401) @@ -80,7 +80,7 @@ func 
TestObserverCheckinFromJob(t *testing.T) { pool := newTestPool(":6379") ns := "work" - observer := newObserver(ns, pool, "abcd") + observer := newObserver(ns, pool, "abcd", noopLogger) observer.start() tMock := int64(1425263401) diff --git a/periodic_enqueuer.go b/periodic_enqueuer.go index 74c8b9d6..a9ef4656 100644 --- a/periodic_enqueuer.go +++ b/periodic_enqueuer.go @@ -21,6 +21,7 @@ type periodicEnqueuer struct { scheduledPeriodicJobs []*scheduledPeriodicJob stopChan chan struct{} doneStoppingChan chan struct{} + logger StructuredLogger } type periodicJob struct { @@ -35,13 +36,19 @@ type scheduledPeriodicJob struct { *periodicJob } -func newPeriodicEnqueuer(namespace string, pool Pool, periodicJobs []*periodicJob) *periodicEnqueuer { +func newPeriodicEnqueuer( + namespace string, + pool Pool, + periodicJobs []*periodicJob, + logger StructuredLogger, +) *periodicEnqueuer { return &periodicEnqueuer{ namespace: namespace, pool: pool, periodicJobs: periodicJobs, stopChan: make(chan struct{}), doneStoppingChan: make(chan struct{}), + logger: logger, } } @@ -62,7 +69,7 @@ func (pe *periodicEnqueuer) loop() { if pe.shouldEnqueue() { err := pe.enqueue() if err != nil { - logError("periodic_enqueuer.loop.enqueue", err) + pe.logger.Error("periodic_enqueuer.loop.enqueue", errAttr(err)) } } @@ -76,7 +83,7 @@ func (pe *periodicEnqueuer) loop() { if pe.shouldEnqueue() { err := pe.enqueue() if err != nil { - logError("periodic_enqueuer.loop.enqueue", err) + pe.logger.Error("periodic_enqueuer.loop.enqueue", errAttr(err)) } } } @@ -138,7 +145,7 @@ func (pe *periodicEnqueuer) shouldEnqueue() bool { if err == redis.ErrNil { return true } else if err != nil { - logError("periodic_enqueuer.should_enqueue", err) + pe.logger.Error("periodic_enqueuer.should_enqueue", errAttr(err)) return true } diff --git a/periodic_enqueuer_test.go b/periodic_enqueuer_test.go index ff80fe23..b2916e43 100644 --- a/periodic_enqueuer_test.go +++ b/periodic_enqueuer_test.go @@ -22,7 +22,7 @@ func 
TestPeriodicEnqueuer(t *testing.T) { setNowEpochSecondsMock(1468359453) defer resetNowEpochSecondsMock() - pe := newPeriodicEnqueuer(ns, pool, pjs) + pe := newPeriodicEnqueuer(ns, pool, pjs, noopLogger) err := pe.enqueue() assert.NoError(t, err) @@ -104,7 +104,7 @@ func TestPeriodicEnqueuerSpawn(t *testing.T) { ns := "work" cleanKeyspace(ns, pool) - pe := newPeriodicEnqueuer(ns, pool, nil) + pe := newPeriodicEnqueuer(ns, pool, nil, noopLogger) pe.start() pe.stop() } diff --git a/requeuer.go b/requeuer.go index 3da30d35..65580658 100644 --- a/requeuer.go +++ b/requeuer.go @@ -1,7 +1,7 @@ package work import ( - "fmt" + "log/slog" "time" "github.com/gomodule/redigo/redis" @@ -19,9 +19,17 @@ type requeuer struct { drainChan chan struct{} doneDrainingChan chan struct{} + + logger StructuredLogger } -func newRequeuer(namespace string, pool Pool, requeueKey string, jobNames []string) *requeuer { +func newRequeuer( + namespace string, + pool Pool, + requeueKey string, + jobNames []string, + logger StructuredLogger, +) *requeuer { args := make([]interface{}, 0, len(jobNames)+2+2) args = append(args, requeueKey) // KEY[1] args = append(args, redisKeyDead(namespace)) // KEY[2] @@ -43,6 +51,8 @@ func newRequeuer(namespace string, pool Pool, requeueKey string, jobNames []stri drainChan: make(chan struct{}), doneDrainingChan: make(chan struct{}), + + logger: logger, } } @@ -94,14 +104,14 @@ func (r *requeuer) process() bool { if err == redis.ErrNil { return false } else if err != nil { - logError("requeuer.process", err) + r.logger.Error("requeuer.process", errAttr(err)) return false } if res == "" { return false } else if res == "dead" { - logError("requeuer.process.dead", fmt.Errorf("no job name")) + r.logger.Error("requeuer.process.dead", slog.String("error", "no job name")) return true } else if res == "ok" { return true diff --git a/requeuer_test.go b/requeuer_test.go index e01008ad..8a1efc76 100644 --- a/requeuer_test.go +++ b/requeuer_test.go @@ -32,7 +32,7 @@ func 
TestRequeue(t *testing.T) { resetNowEpochSecondsMock() - re := newRequeuer(ns, pool, redisKeyScheduled(ns), []string{"wat", "foo", "bar"}) + re := newRequeuer(ns, pool, redisKeyScheduled(ns), []string{"wat", "foo", "bar"}, noopLogger) re.start() re.drain() re.stop() @@ -68,7 +68,7 @@ func TestRequeueUnknown(t *testing.T) { nowish := nowEpochSeconds() setNowEpochSecondsMock(nowish) - re := newRequeuer(ns, pool, redisKeyScheduled(ns), []string{"bar"}) + re := newRequeuer(ns, pool, redisKeyScheduled(ns), []string{"bar"}, noopLogger) re.start() re.drain() re.stop() @@ -97,7 +97,7 @@ func TestRequeuePeriodic(t *testing.T) { {jobName: jobName, spec: jobSpec, schedule: shedule}, } - enq := newPeriodicEnqueuer(ns, pool, jobs) + enq := newPeriodicEnqueuer(ns, pool, jobs, noopLogger) enq.start() enq.stop() @@ -106,7 +106,7 @@ func TestRequeuePeriodic(t *testing.T) { setNowEpochSecondsMock(tMock) defer resetNowEpochSecondsMock() - re := newRequeuer(ns, pool, redisKeyScheduled(ns), []string{jobName}) + re := newRequeuer(ns, pool, redisKeyScheduled(ns), []string{jobName}, noopLogger) re.start() re.drain() re.stop() diff --git a/run.go b/run.go index fefee7f6..0a8043b4 100644 --- a/run.go +++ b/run.go @@ -8,7 +8,13 @@ import ( // runJob returns an error if the job fails, or there's a panic, or we couldn't // reflect correctly. if we return an error, it signals we want the job to be retried. 
-func runJob(job *Job, ctxType reflect.Type, middlewares []*middlewareHandler, jt *jobType) (returnCtx reflect.Value, returnError error) { +func runJob( + job *Job, + ctxType reflect.Type, + middlewares []*middlewareHandler, + jt *jobType, + logger StructuredLogger, +) (returnCtx reflect.Value, returnError error) { returnCtx = reflect.New(ctxType) ctx := job.extractTraceContext(context.Background()) @@ -41,7 +47,7 @@ func runJob(job *Job, ctxType reflect.Type, middlewares []*middlewareHandler, jt // err turns out to be interface{}, of actual type "runtime.errorCString" // Luckily, the err sprints nicely via fmt. errorishError := fmt.Errorf("%v", panicErr) - logError("runJob.panic", errorishError) + logger.Error("runJob.panic", errAttr(errorishError)) returnError = errorishError } }() diff --git a/run_test.go b/run_test.go index e3ccc167..bb0ba65a 100644 --- a/run_test.go +++ b/run_test.go @@ -55,7 +55,7 @@ func TestRunBasicMiddleware(t *testing.T) { Args: map[string]interface{}{"a": "foo"}, } - v, err := runJob(job, tstCtxType, middleware, jt) + v, err := runJob(job, tstCtxType, middleware, jt, noopLogger) assert.NoError(t, err) c := v.Interface().(*tstCtx) assert.Equal(t, "mw1mw2mw3h1foo", c.String()) @@ -84,7 +84,7 @@ func TestRunHandlerError(t *testing.T) { Name: "foo", } - v, err := runJob(job, tstCtxType, middleware, jt) + v, err := runJob(job, tstCtxType, middleware, jt, noopLogger) assert.Error(t, err) assert.Equal(t, "h1_err", err.Error()) @@ -115,7 +115,7 @@ func TestRunMwError(t *testing.T) { Name: "foo", } - _, err := runJob(job, tstCtxType, middleware, jt) + _, err := runJob(job, tstCtxType, middleware, jt, noopLogger) assert.Error(t, err) assert.Equal(t, "mw1_err", err.Error()) } @@ -144,7 +144,7 @@ func TestRunHandlerPanic(t *testing.T) { Name: "foo", } - _, err := runJob(job, tstCtxType, middleware, jt) + _, err := runJob(job, tstCtxType, middleware, jt, noopLogger) assert.Error(t, err) assert.Equal(t, "dayam", err.Error()) } @@ -172,7 +172,7 @@ func 
TestRunMiddlewarePanic(t *testing.T) { Name: "foo", } - _, err := runJob(job, tstCtxType, middleware, jt) + _, err := runJob(job, tstCtxType, middleware, jt, noopLogger) assert.Error(t, err) assert.Equal(t, "dayam", err.Error()) } @@ -202,7 +202,7 @@ func TestRunGenericHandler(t *testing.T) { genericHandler: h.handler, } - _, err := runJob(job, tstCtxType, middleware, jt) + _, err := runJob(job, tstCtxType, middleware, jt, noopLogger) if !h.mustFail { assert.NoErrorf(t, err, "case: %d", i) } else { diff --git a/worker.go b/worker.go index 0f6c762f..7382332b 100644 --- a/worker.go +++ b/worker.go @@ -37,6 +37,8 @@ type worker struct { drainChan chan struct{} doneDrainingChan chan struct{} + + logger StructuredLogger } // Pool represents a pool of connections to a Redis server. @@ -44,9 +46,17 @@ type Pool interface { Get() redis.Conn } -func newWorker(namespace string, poolID string, pool Pool, contextType reflect.Type, middleware []*middlewareHandler, jobTypes map[string]*jobType) *worker { +func newWorker( + namespace string, + poolID string, + pool Pool, + contextType reflect.Type, + middleware []*middlewareHandler, + jobTypes map[string]*jobType, + logger StructuredLogger, +) *worker { workerID := makeIdentifier() - ob := newObserver(namespace, pool, workerID) + ob := newObserver(namespace, pool, workerID, logger) w := &worker{ workerID: workerID, @@ -62,6 +72,8 @@ func newWorker(namespace string, poolID string, pool Pool, contextType reflect.T drainChan: make(chan struct{}), doneDrainingChan: make(chan struct{}), + + logger: logger, } w.updateMiddlewareAndJobTypes(middleware, jobTypes) @@ -124,7 +136,7 @@ func (w *worker) loop() { case <-timer.C: job, err := w.fetchJob() if err != nil { - logError("worker.fetch", err) + w.logger.Error("worker.fetch", errAttr(err)) timer.Reset(10 * time.Millisecond) } else if job != nil { w.processJob(job) @@ -203,11 +215,11 @@ func (w *worker) processJob(job *Job) { jt := w.jobTypes[job.Name] if jt == nil { runErr = 
fmt.Errorf("stray job: no handler") - logError("process_job.stray", runErr) + w.logger.Error("process_job.stray", errAttr(runErr)) } else { w.observeStarted(job.Name, job.ID, job.Args) job.observer = w.observer // for Checkin - _, runErr = runJob(job, w.contextType, w.middleware, jt) + _, runErr = runJob(job, w.contextType, w.middleware, jt, w.logger) w.observeDone(job.Name, job.ID, runErr) } @@ -220,7 +232,7 @@ func (w *worker) processJob(job *Job) { retryErr(sleepBackoffs, func() error { err := w.removeJobFromInProgress(job, jt, runErr) if err != nil { - logError("worker.remove_job_from_in_progress.lrem", err) + w.logger.Warn("worker.remove_job_from_in_progress.lrem", errAttr(err)) } return err @@ -230,7 +242,7 @@ func (w *worker) processJob(job *Job) { func (w *worker) deleteUniqueJob(job *Job) { uniqueKey, err := redisKeyUniqueJob(w.namespace, job.Name, job.Args) if err != nil { - logError("worker.delete_unique_job.key", err) + w.logger.Error("worker.delete_unique_job.key", errAttr(err)) return } @@ -239,7 +251,7 @@ func (w *worker) deleteUniqueJob(job *Job) { _, err = conn.Do("DEL", uniqueKey) if err != nil { - logError("worker.delete_unique_job.del", err) + w.logger.Error("worker.delete_unique_job.del", errAttr(err)) } } @@ -273,7 +285,7 @@ func (w *worker) removeJobFromInProgress(job *Job, jt *jobType, runErr error) er var err error failedJobRawJSON, err = job.serialize() if err != nil { - logError("worker.removeJobFromInProgress.serialize", err) + w.logger.Error("worker.removeJobFromInProgress.serialize", errAttr(err)) forward = false } } diff --git a/worker_pool.go b/worker_pool.go index 88875e23..23823522 100644 --- a/worker_pool.go +++ b/worker_pool.go @@ -35,6 +35,7 @@ type WorkerPool struct { periodicEnqueuer *periodicEnqueuer reaperHook ReaperHook + logger StructuredLogger } type jobType struct { @@ -113,6 +114,7 @@ func NewWorkerPool(ctx interface{}, concurrency uint, namespace string, pool Poo pool: pool, contextType: ctxType, jobTypes: 
make(map[string]*jobType), + logger: noopLogger, } for _, opt := range opts { @@ -120,7 +122,15 @@ func NewWorkerPool(ctx interface{}, concurrency uint, namespace string, pool Poo } for i := uint(0); i < wp.concurrency; i++ { - w := newWorker(wp.namespace, wp.workerPoolID, wp.pool, wp.contextType, nil, wp.jobTypes) + w := newWorker( + wp.namespace, + wp.workerPoolID, + wp.pool, + wp.contextType, + nil, + wp.jobTypes, + wp.logger, + ) wp.workers = append(wp.workers, w) } @@ -231,10 +241,23 @@ func (wp *WorkerPool) Start() { go w.start() } - wp.heartbeater = newWorkerPoolHeartbeater(wp.namespace, wp.pool, wp.workerPoolID, wp.jobTypes, wp.concurrency, wp.workerIDs()) + wp.heartbeater = newWorkerPoolHeartbeater( + wp.namespace, + wp.pool, + wp.workerPoolID, + wp.jobTypes, + wp.concurrency, + wp.workerIDs(), + wp.logger, + ) wp.heartbeater.start() wp.startRequeuers() - wp.periodicEnqueuer = newPeriodicEnqueuer(wp.namespace, wp.pool, wp.periodicJobs) + wp.periodicEnqueuer = newPeriodicEnqueuer( + wp.namespace, + wp.pool, + wp.periodicJobs, + wp.logger, + ) wp.periodicEnqueuer.start() } @@ -280,14 +303,15 @@ func (wp *WorkerPool) startRequeuers() { jobNames = append(jobNames, name) } - wp.retrier = newRequeuer(wp.namespace, wp.pool, redisKeyRetry(wp.namespace), jobNames) - wp.scheduler = newRequeuer(wp.namespace, wp.pool, redisKeyScheduled(wp.namespace), jobNames) + wp.retrier = newRequeuer(wp.namespace, wp.pool, redisKeyRetry(wp.namespace), jobNames, wp.logger) + wp.scheduler = newRequeuer(wp.namespace, wp.pool, redisKeyScheduled(wp.namespace), jobNames, wp.logger) wp.deadPoolReaper = newDeadPoolReaper( wp.namespace, wp.pool, jobNames, wp.reapPeriod, wp.reaperHook, + wp.logger, ) wp.retrier.start() wp.scheduler.start() @@ -318,7 +342,7 @@ func (wp *WorkerPool) writeKnownJobsToRedis() { } if _, err := conn.Do("SADD", jobNames...); err != nil { - logError("write_known_jobs", err) + wp.logger.Error("write_known_jobs", errAttr(err)) } } @@ -331,7 +355,7 @@ func (wp 
*WorkerPool) writeConcurrencyControlsToRedis() { defer conn.Close() for jobName, jobType := range wp.jobTypes { if _, err := conn.Do("SET", redisKeyJobsConcurrency(wp.namespace, jobName), jobType.MaxConcurrency); err != nil { - logError("write_concurrency_controls_max_concurrency", err) + wp.logger.Error("write_concurrency_controls_max_concurrency", errAttr(err)) } } } @@ -565,3 +589,10 @@ func WithReaperHook(h ReaperHook) WorkerPoolOption { wp.reaperHook = h } } + +// WithLogger registers logger. +func WithLogger(l StructuredLogger) WorkerPoolOption { + return func(wp *WorkerPool) { + wp.logger = l + } +} diff --git a/worker_test.go b/worker_test.go index 7b1affa0..edd52310 100644 --- a/worker_test.go +++ b/worker_test.go @@ -64,7 +64,7 @@ func TestWorkerBasics(t *testing.T) { _, err = enqueuer.Enqueue(job3, Q{"a": 3}) assert.Nil(t, err) - w := newWorker(ns, "1", pool, tstCtxType, nil, jobTypes) + w := newWorker(ns, "1", pool, tstCtxType, nil, jobTypes, noopLogger) w.start() w.drain() w.stop() @@ -114,7 +114,7 @@ func TestWorkerInProgress(t *testing.T) { _, err := enqueuer.Enqueue(job1, Q{"a": 1}) assert.Nil(t, err) - w := newWorker(ns, "1", pool, tstCtxType, nil, jobTypes) + w := newWorker(ns, "1", pool, tstCtxType, nil, jobTypes, noopLogger) w.start() // instead of w.forceIter(), we'll wait for 10 milliseconds to let the job start @@ -165,7 +165,7 @@ func TestWorkerRetry(t *testing.T) { enqueuer := NewEnqueuer(ns, pool) _, err := enqueuer.Enqueue(job1, Q{"a": 1}) assert.Nil(t, err) - w := newWorker(ns, "1", pool, tstCtxType, nil, jobTypes) + w := newWorker(ns, "1", pool, tstCtxType, nil, jobTypes, noopLogger) w.start() w.drain() w.stop() @@ -217,7 +217,7 @@ func TestWorkerRetryWithCustomBackoff(t *testing.T) { enqueuer := NewEnqueuer(ns, pool) _, err := enqueuer.Enqueue(job1, Q{"a": 1}) assert.Nil(t, err) - w := newWorker(ns, "1", pool, tstCtxType, nil, jobTypes) + w := newWorker(ns, "1", pool, tstCtxType, nil, jobTypes, noopLogger) w.start() w.drain() w.stop() 
@@ -274,7 +274,7 @@ func TestWorkerDead(t *testing.T) { assert.Nil(t, err) _, err = enqueuer.Enqueue(job2, nil) assert.Nil(t, err) - w := newWorker(ns, "1", pool, tstCtxType, nil, jobTypes) + w := newWorker(ns, "1", pool, tstCtxType, nil, jobTypes, noopLogger) w.start() w.drain() w.stop() @@ -327,7 +327,7 @@ func TestWorkersPaused(t *testing.T) { _, err := enqueuer.Enqueue(job1, Q{"a": 1}) assert.Nil(t, err) - w := newWorker(ns, "1", pool, tstCtxType, nil, jobTypes) + w := newWorker(ns, "1", pool, tstCtxType, nil, jobTypes, noopLogger) // pause the jobs prior to starting err = pauseJobs(ns, job1, pool) assert.Nil(t, err) @@ -671,7 +671,7 @@ func TestWorkerRetryRemoveFromInProgress(t *testing.T) { _, err := enqueuer.Enqueue(job1, Q{"a": 1}) assert.Nil(t, err) - w := newWorker(ns, "1", pool, tstCtxType, nil, jobTypes) + w := newWorker(ns, "1", pool, tstCtxType, nil, jobTypes, noopLogger) w.start() defer w.stop()