diff --git a/periodic_enqueuer.go b/periodic_enqueuer.go
index a9ef4656..5d5259b1 100644
--- a/periodic_enqueuer.go
+++ b/periodic_enqueuer.go
@@ -2,6 +2,7 @@ package work
 
 import (
 	"fmt"
+	"log/slog"
 	"math/rand"
 	"time"
 
@@ -120,6 +121,12 @@ func (pe *periodicEnqueuer) enqueue() error {
 			StartingDeadline: pj.schedule.Next(t).Unix(),
 		}
 
+		pe.logger.Debug("periodic_enqueuer.enqueue",
+			slog.Time("job_scheduled_time", t),
+			slog.String("job_name", pj.jobName),
+			slog.String("job_id", id),
+		)
+
 		rawJSON, err := job.serialize()
 		if err != nil {
 			return err
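Both the enqueuer above and the watchdog below lean on schedule.Next(t) from github.com/robfig/cron/v3 to compute the next activation strictly after a given instant. A minimal standalone sketch of that schedule math, assuming a seconds-first parser like this repo's cronFormat (the parser options here are an illustration; the repo's cronFormat constant is the source of truth):

package main

import (
	"fmt"
	"time"

	"github.com/robfig/cron/v3"
)

func main() {
	// Seconds-first parser, mirroring the "first value can be seconds" note
	// in PeriodicallyEnqueue; assumed, not copied from the repo.
	parser := cron.NewParser(
		cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow,
	)
	schedule, err := parser.Parse("*/15 * * * * *") // every 15 seconds
	if err != nil {
		panic(err)
	}

	t := time.Date(2024, 1, 1, 0, 0, 7, 0, time.UTC)
	fmt.Println(schedule.Next(t)) // 2024-01-01 00:00:15 +0000 UTC
}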
diff --git a/watchdog.go b/watchdog.go
new file mode 100644
index 00000000..d91c0a56
--- /dev/null
+++ b/watchdog.go
@@ -0,0 +1,274 @@
+package work
+
+import (
+	"container/heap"
+	"log/slog"
+	"sync"
+	"sync/atomic"
+	"time"
+)
+
+// WatchdogFailCheckingTimeout is the default checking timeout after which a
+// scheduled run is marked as failed.
+const WatchdogFailCheckingTimeout = 60 * time.Second
+
+const processedJobsBuffer = 256
+
+// WatchdogStat holds statistics for a single periodic job: its name and the
+// counts of processed and skipped runs.
+type WatchdogStat struct {
+	Name      string
+	Processed int64
+	Skipped   int64
+}
+
+// watchdog verifies that periodic jobs actually run. It compares the planned
+// schedule of each job against the jobs that workers actually process.
+type watchdog struct {
+	periodicJobs        []*periodicJob
+	jobs                map[string]*watchdogJob
+	processedJobs       chan *Job
+	failCheckingTimeout time.Duration
+	stopChan            chan struct{}
+	logger              StructuredLogger
+}
+
+type watchdogOption func(w *watchdog)
+
+func watchdogWithLogger(logger StructuredLogger) watchdogOption {
+	return func(w *watchdog) {
+		w.logger = logger
+	}
+}
+
+func watchdogWithFailCheckingTimeout(t time.Duration) watchdogOption {
+	return func(w *watchdog) {
+		w.failCheckingTimeout = t
+	}
+}
+
+func newWatchdog(opts ...watchdogOption) *watchdog {
+	w := &watchdog{
+		jobs:          make(map[string]*watchdogJob),
+		processedJobs: make(chan *Job, processedJobsBuffer),
+		stopChan:      make(chan struct{}),
+	}
+
+	for _, opt := range opts {
+		opt(w)
+	}
+
+	if w.logger == nil {
+		w.logger = noopLogger
+	}
+
+	if w.failCheckingTimeout == 0 {
+		w.failCheckingTimeout = WatchdogFailCheckingTimeout
+	}
+
+	return w
+}
+
+func (w *watchdog) addPeriodicJobs(jobs ...*periodicJob) {
+	w.periodicJobs = append(w.periodicJobs, jobs...)
+	// Only register the newly added jobs; re-registering existing ones would
+	// reset their counters.
+	for _, j := range jobs {
+		w.jobs[j.jobName] = &watchdogJob{
+			checkTimes: newCheckTimesHeap(),
+		}
+	}
+}
+
+func (w *watchdog) start() {
+	const checkTimeout = time.Second
+
+	go func() {
+		timer := time.NewTicker(checkTimeout)
+		defer timer.Stop()
+
+		for {
+			select {
+			case t := <-timer.C:
+				w.planning(t)
+				w.checking(t)
+			case j := <-w.processedJobs:
+				w.processed(j)
+			case <-w.stopChan:
+				return
+			}
+		}
+	}()
+}
+
+func (w *watchdog) stop() {
+	w.stopChan <- struct{}{}
+}
+
+// planning schedules the next expected run of every periodic job. It iterates
+// over the periodic jobs, computes each job's next scheduled time after `t`,
+// and pushes it onto that job's check-time heap.
+func (w *watchdog) planning(t time.Time) {
+	for _, j := range w.periodicJobs {
+		n := j.schedule.Next(t)
+		h := w.jobs[j.jobName].checkTimes
+		if h.Push(n) {
+			w.logger.Debug("Watchdog: planning job",
+				slog.String("job_name", j.jobName),
+				slog.Time("job_next_time", n),
+				slog.Int("jobs_total_planned", h.Len()),
+			)
+		}
+	}
+}
+
+// checking looks for skipped jobs as of time `t`. It iterates over the
+// scheduled times of each job and compares them against the current time plus
+// the fail checking timeout. If a scheduled time is older than the fail
+// checking timeout, that run is considered skipped: it is removed from the
+// check list and the job's skipped counter is incremented.
+func (w *watchdog) checking(t time.Time) {
+	for name, job := range w.jobs {
+		job.each(func(h *checkTimesHeap) bool {
+			n, _ := h.Peek()
+			if n.Add(w.failCheckingTimeout).Before(t) {
+				h.Pop()
+				job.skipped.Add(1)
+
+				w.logger.Error("Watchdog: skipped job",
+					slog.String("job_name", name),
+					slog.Time("job_next_time", n),
+					slog.Int64("jobs_skipped", job.skipped.Load()),
+				)
+
+				return false
+			}
+
+			return true
+		})
+	}
+}
+
+// processed handles a job that a worker has picked up. It iterates over the
+// job's scheduled times and marks every run whose scheduled time precedes the
+// job's enqueue time as successfully processed.
+func (w *watchdog) processed(j *Job) {
+	job, ok := w.jobs[j.Name]
+	if !ok {
+		return
+	}
+
+	job.each(func(h *checkTimesHeap) bool {
+		n, _ := h.Peek()
+		if n.Before(time.Unix(j.EnqueuedAt, 0)) {
+			h.Pop()
+			job.processed.Add(1)
+
+			w.logger.Debug("Watchdog: successfully processed job",
+				slog.String("job_name", j.Name),
+				slog.Time("job_next_time", n),
+				slog.Int64("jobs_processed", job.processed.Load()),
+			)
+			return false
+		}
+		return true
+	})
+}
+
+func (w *watchdog) stats() []WatchdogStat {
+	res := make([]WatchdogStat, 0, len(w.jobs))
+
+	for k, v := range w.jobs {
+		res = append(res, WatchdogStat{
+			Name:      k,
+			Processed: v.processed.Load(),
+			Skipped:   v.skipped.Load(),
+		})
+	}
+
+	return res
+}
+
+type watchdogJob struct {
+	checkTimes *checkTimesHeap
+	processed  atomic.Int64
+	skipped    atomic.Int64
+}
+
+// each invokes cb while the heap is non-empty. The callback returns true to
+// stop iterating; returning false signals that it popped the head, so
+// iteration continues with the next element.
+func (w *watchdogJob) each(cb func(h *checkTimesHeap) bool) {
+	h := w.checkTimes
+	for h.Len() > 0 {
+		if cb(h) {
+			return
+		}
+	}
+}
+
+type checkTimes []time.Time
+
+func (h checkTimes) Len() int           { return len(h) }
+func (h checkTimes) Less(i, j int) bool { return h[i].Before(h[j]) }
+func (h checkTimes) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
+
+func (h *checkTimes) Push(x any) {
+	*h = append(*h, x.(time.Time))
+}
+
+func (h *checkTimes) Pop() any {
+	old := *h
+	n := len(old)
+	x := old[n-1]
+	*h = old[0 : n-1]
+	return x
+}
+
+// checkTimesHeap is a concurrency-safe min-heap of unique check times.
+type checkTimesHeap struct {
+	mu         sync.RWMutex
+	uniqTimes  map[time.Time]struct{}
+	checkTimes *checkTimes
+}
+
+func newCheckTimesHeap() *checkTimesHeap {
+	h := &checkTimes{}
+	heap.Init(h)
+
+	return &checkTimesHeap{
+		uniqTimes:  make(map[time.Time]struct{}),
+		checkTimes: h,
+	}
+}
+
+func (h *checkTimesHeap) Len() int {
+	h.mu.RLock()
+	defer h.mu.RUnlock()
+
+	return h.checkTimes.Len()
+}
+
+func (h *checkTimesHeap) Peek() (time.Time, bool) {
+	h.mu.RLock()
+	defer h.mu.RUnlock()
+
+	if h.checkTimes.Len() > 0 {
+		return (*h.checkTimes)[0], true
+	}
+	return time.Time{}, false
+}
+
+// Push adds t to the heap and reports whether it was added; duplicate times
+// are ignored and return false.
+func (h *checkTimesHeap) Push(t time.Time) bool {
+	h.mu.Lock()
+	defer h.mu.Unlock()
+
+	if _, exists := h.uniqTimes[t]; exists {
+		return false
+	}
+
+	h.uniqTimes[t] = struct{}{}
+	heap.Push(h.checkTimes, t)
+
+	return true
+}
+
+func (h *checkTimesHeap) Pop() time.Time {
+	h.mu.Lock()
+	defer h.mu.Unlock()
+
+	t := heap.Pop(h.checkTimes).(time.Time)
+	delete(h.uniqTimes, t)
+	return t
+}
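The watchdog's bookkeeping reads like a simple ledger: planning pushes each job's next scheduled run onto a deduplicated min-heap, processed pops runs that a worker picked up in time, and checking pops anything older than failCheckingTimeout as skipped. A minimal synchronous sketch, assuming the package-internal API above and a hypothetical "sync" job name; it mirrors TestWatchdog below but drives the methods directly instead of going through the ticker goroutine:

package work

import (
	"fmt"
	"time"
)

// illustrateWatchdog is a hypothetical sketch (not part of the change) showing
// how planned check times become processed or skipped counters.
func illustrateWatchdog() {
	w := newWatchdog(watchdogWithFailCheckingTimeout(2 * time.Second))

	j, err := newPeriodicJob("* * * * * *", "sync") // hypothetical job, runs every second
	if err != nil {
		panic(err)
	}
	w.addPeriodicJobs(j)

	now := time.Now()
	w.planning(now) // pushes the job's next scheduled run onto its check-time heap

	// A job fetched after its planned run marks that run as processed.
	w.processed(&Job{Name: "sync", EnqueuedAt: now.Add(2 * time.Second).Unix()})

	// Runs still planned once failCheckingTimeout has elapsed count as skipped.
	w.planning(now.Add(time.Second))
	w.checking(now.Add(time.Minute))

	fmt.Printf("%+v\n", w.stats()) // e.g. [{Name:sync Processed:1 Skipped:1}]
}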
"github.com/stretchr/testify/require" +) + +func TestCheckTimesHeap(t *testing.T) { + t1, _ := time.Parse(time.RFC3339, "2019-01-01T00:00:00Z") + t2, _ := time.Parse(time.RFC3339, "2020-01-01T00:00:00Z") + t3, _ := time.Parse(time.RFC3339, "2021-01-01T00:00:00Z") + + tests := []struct { + data []time.Time + expected []time.Time + }{ + { + []time.Time{}, + []time.Time{}, + }, + { + []time.Time{t1}, + []time.Time{t1}, + }, + { + []time.Time{t1, t1}, + []time.Time{t1}, + }, + { + []time.Time{t1, t2, t3}, + []time.Time{t1, t2, t3}, + }, + { + []time.Time{t3, t3, t3}, + []time.Time{t3}, + }, + { + []time.Time{t2, t1}, + []time.Time{t1, t2}, + }, + { + []time.Time{t3, t2, t1}, + []time.Time{t1, t2, t3}, + }, + { + []time.Time{t3, t3, t3, t2, t2, t1}, + []time.Time{t1, t2, t3}, + }, + } + + for i, tt := range tests { + t.Run(strconv.Itoa(i), func(t *testing.T) { + h := newCheckTimesHeap() + for _, v := range tt.data { + h.Push(v) + } + require.Equal(t, len(tt.expected), h.Len()) + + for i := 0; h.Len() > 0; i++ { + tm := h.Pop() + require.Equal(t, tt.expected[i], tm) + } + }) + } +} + +func TestWatchdog(t *testing.T) { + t.Parallel() + + require := require.New(t) + + const jobName = "test" + j, err := newPeriodicJob("* * * * * *", jobName) + require.NoError(err) + + w := newWatchdog( + watchdogWithFailCheckingTimeout(time.Millisecond * 2000), + ) + defer w.stop() + + w.addPeriodicJobs(j) + w.start() + w.planning(time.Now()) + + time.Sleep(time.Millisecond * 2000) + w.processedJobs <- &Job{ + Name: jobName, + EnqueuedAt: time.Now().Unix(), + } + time.Sleep(time.Millisecond * 500) + + require.Equal(WatchdogStat{Name: "test", Processed: 1, Skipped: 0}, w.stats()[0]) + + time.Sleep(time.Millisecond * 1600) + require.Equal(WatchdogStat{Name: "test", Processed: 1, Skipped: 1}, w.stats()[0]) +} diff --git a/worker.go b/worker.go index 7382332b..33ca77cc 100644 --- a/worker.go +++ b/worker.go @@ -20,13 +20,14 @@ var sleepBackoffs = []time.Duration{ } type worker struct { - workerID string - poolID string - namespace string - pool Pool - jobTypes map[string]*jobType - middleware []*middlewareHandler - contextType reflect.Type + workerID string + poolID string + namespace string + pool Pool + jobTypes map[string]*jobType + middleware []*middlewareHandler + contextType reflect.Type + processedJobs chan<- *Job redisFetchScript *redis.Script sampler prioritySampler @@ -54,16 +55,18 @@ func newWorker( middleware []*middlewareHandler, jobTypes map[string]*jobType, logger StructuredLogger, + processedJobs chan<- *Job, ) *worker { workerID := makeIdentifier() ob := newObserver(namespace, pool, workerID, logger) w := &worker{ - workerID: workerID, - poolID: poolID, - namespace: namespace, - pool: pool, - contextType: contextType, + workerID: workerID, + poolID: poolID, + namespace: namespace, + pool: pool, + contextType: contextType, + processedJobs: processedJobs, observer: ob, @@ -139,6 +142,9 @@ func (w *worker) loop() { w.logger.Error("worker.fetch", errAttr(err)) timer.Reset(10 * time.Millisecond) } else if job != nil { + if w.processedJobs != nil { + w.processedJobs <- job + } w.processJob(job) consequtiveNoJobs = 0 timer.Reset(0) diff --git a/worker_pool.go b/worker_pool.go index 23823522..9f05bd67 100644 --- a/worker_pool.go +++ b/worker_pool.go @@ -2,6 +2,7 @@ package work import ( "context" + "log/slog" "reflect" "sort" "strings" @@ -20,11 +21,13 @@ type WorkerPool struct { namespace string // eg, "myapp-work" pool Pool - contextType reflect.Type - jobTypes map[string]*jobType - middleware 
diff --git a/worker_pool.go b/worker_pool.go
index 23823522..9f05bd67 100644
--- a/worker_pool.go
+++ b/worker_pool.go
@@ -2,6 +2,7 @@ package work
 
 import (
 	"context"
+	"log/slog"
 	"reflect"
 	"sort"
 	"strings"
@@ -20,11 +21,13 @@ type WorkerPool struct {
 	namespace string // eg, "myapp-work"
 	pool      Pool
 
-	contextType  reflect.Type
-	jobTypes     map[string]*jobType
-	middleware   []*middlewareHandler
-	started      bool
-	periodicJobs []*periodicJob
+	contextType                 reflect.Type
+	jobTypes                    map[string]*jobType
+	middleware                  []*middlewareHandler
+	started                     bool
+	periodicJobs                []*periodicJob
+	watchdog                    *watchdog
+	watchdogFailCheckingTimeout time.Duration
 
 	workers          []*worker
 	heartbeater      *workerPoolHeartbeater
@@ -121,6 +124,11 @@ func NewWorkerPool(ctx interface{}, concurrency uint, namespace string, pool Poo
 		opt(wp)
 	}
 
+	wp.watchdog = newWatchdog(
+		watchdogWithLogger(wp.logger),
+		watchdogWithFailCheckingTimeout(wp.watchdogFailCheckingTimeout),
+	)
+
 	for i := uint(0); i < wp.concurrency; i++ {
 		w := newWorker(
 			wp.namespace,
@@ -130,6 +138,7 @@ func NewWorkerPool(ctx interface{}, concurrency uint, namespace string, pool Poo
 			nil,
 			wp.jobTypes,
 			wp.logger,
+			wp.watchdog.processedJobs,
 		)
 		wp.workers = append(wp.workers, w)
 	}
@@ -211,17 +220,26 @@ func (wp *WorkerPool) JobWithOptions(name string, jobOpts JobOptions, fn interfa
 	return wp
 }
 
+func newPeriodicJob(spec string, jobName string) (*periodicJob, error) {
+	schedule, err := cron.NewParser(cronFormat).Parse(spec)
+	if err != nil {
+		return nil, err
+	}
+
+	return &periodicJob{jobName: jobName, spec: spec, schedule: schedule}, nil
+}
+
 // PeriodicallyEnqueue will periodically enqueue jobName according to the cron-based spec.
 // The spec format is based on github.com/robfig/cron/v3, which is a relatively standard cron format.
 // Note that the first value can be seconds!
 // If you have multiple worker pools on different machines, they'll all coordinate and only enqueue your job once.
 func (wp *WorkerPool) PeriodicallyEnqueue(spec string, jobName string) *WorkerPool {
-	schedule, err := cron.NewParser(cronFormat).Parse(spec)
+	j, err := newPeriodicJob(spec, jobName)
 	if err != nil {
 		panic(err)
 	}
 
-	wp.periodicJobs = append(wp.periodicJobs, &periodicJob{jobName: jobName, spec: spec, schedule: schedule})
+	wp.periodicJobs = append(wp.periodicJobs, j)
 
 	return wp
 }
@@ -259,6 +277,13 @@ func (wp *WorkerPool) Start() {
 		wp.logger,
 	)
 	wp.periodicEnqueuer.start()
+
+	wp.watchdog.addPeriodicJobs(wp.periodicJobs...)
+	wp.watchdog.start()
+}
+
+// WatchdogStats returns the per-job processed and skipped counters collected by the watchdog.
+func (wp *WorkerPool) WatchdogStats() []WatchdogStat {
+	return wp.watchdog.stats()
 }
 
 // Stop stops the workers and associated processes.
@@ -282,6 +307,7 @@ func (wp *WorkerPool) Stop() {
 	wp.scheduler.stop()
 	wp.deadPoolReaper.stop()
 	wp.periodicEnqueuer.stop()
+	wp.watchdog.stop()
 }
 
 // Drain drains all jobs in the queue before returning. Note that if jobs are added faster than we can process them, this function wouldn't return.
@@ -341,6 +367,7 @@ func (wp *WorkerPool) writeKnownJobsToRedis() {
 		jobNames = append(jobNames, k)
 	}
 
+	wp.logger.Debug("write_known_jobs", slog.Any("job_names", jobNames))
 	if _, err := conn.Do("SADD", jobNames...); err != nil {
 		wp.logger.Error("write_known_jobs", errAttr(err))
 	}
@@ -596,3 +623,11 @@ func WithLogger(l StructuredLogger) WorkerPoolOption {
 		wp.logger = l
 	}
 }
+
+// WithWatchdogFailCheckingTimeout sets the watchdog checking timeout after
+// which a scheduled run is marked as failed (default WatchdogFailCheckingTimeout).
+func WithWatchdogFailCheckingTimeout(p time.Duration) WorkerPoolOption {
+	return func(wp *WorkerPool) {
+		wp.watchdogFailCheckingTimeout = p
+	}
+}
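Putting the public surface together: WithWatchdogFailCheckingTimeout tunes the skip threshold and WatchdogStats exposes the counters. A usage sketch, assuming the upstream gocraft/work import path, a hypothetical "sync" job, and a redigo pool that satisfies this fork's Pool interface (adjust all three to your setup):

package main

import (
	"fmt"
	"time"

	"github.com/gocraft/work"
	"github.com/gomodule/redigo/redis"
)

type Context struct{}

func main() {
	// Hypothetical redis pool; assumes *redis.Pool satisfies the Pool interface.
	redisPool := &redis.Pool{
		Dial: func() (redis.Conn, error) { return redis.Dial("tcp", ":6379") },
	}

	pool := work.NewWorkerPool(Context{}, 10, "myapp-work", redisPool,
		work.WithWatchdogFailCheckingTimeout(90*time.Second), // override the 60s default
	)
	pool.Job("sync", func(c *Context, j *work.Job) error { return nil })
	pool.PeriodicallyEnqueue("0 */5 * * * *", "sync") // first field is seconds
	pool.Start()
	defer pool.Stop()

	// Inspect the watchdog's per-job counters, e.g. from a metrics endpoint.
	for _, s := range pool.WatchdogStats() {
		fmt.Printf("%s processed=%d skipped=%d\n", s.Name, s.Processed, s.Skipped)
	}
}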
diff --git a/worker_test.go b/worker_test.go
index f63201cb..eb9ee370 100644
--- a/worker_test.go
+++ b/worker_test.go
@@ -63,7 +63,7 @@ func TestWorkerBasics(t *testing.T) {
 	_, err = enqueuer.Enqueue(job3, Q{"a": 3})
 	assert.Nil(t, err)
 
-	w := newWorker(ns, "1", pool, tstCtxType, nil, jobTypes, noopLogger)
+	w := newWorker(ns, "1", pool, tstCtxType, nil, jobTypes, noopLogger, nil)
 	w.start()
 	w.drain()
 	w.stop()
@@ -113,7 +113,7 @@ func TestWorkerInProgress(t *testing.T) {
 	_, err := enqueuer.Enqueue(job1, Q{"a": 1})
 	assert.Nil(t, err)
 
-	w := newWorker(ns, "1", pool, tstCtxType, nil, jobTypes, noopLogger)
+	w := newWorker(ns, "1", pool, tstCtxType, nil, jobTypes, noopLogger, nil)
 	w.start()
 
 	// instead of w.forceIter(), we'll wait for 10 milliseconds to let the job start
@@ -164,7 +164,7 @@ func TestWorkerRetry(t *testing.T) {
 	enqueuer := NewEnqueuer(ns, pool)
 	_, err := enqueuer.Enqueue(job1, Q{"a": 1})
 	assert.Nil(t, err)
-	w := newWorker(ns, "1", pool, tstCtxType, nil, jobTypes, noopLogger)
+	w := newWorker(ns, "1", pool, tstCtxType, nil, jobTypes, noopLogger, nil)
 	w.start()
 	w.drain()
 	w.stop()
@@ -216,7 +216,7 @@ func TestWorkerRetryWithCustomBackoff(t *testing.T) {
 	enqueuer := NewEnqueuer(ns, pool)
 	_, err := enqueuer.Enqueue(job1, Q{"a": 1})
 	assert.Nil(t, err)
-	w := newWorker(ns, "1", pool, tstCtxType, nil, jobTypes, noopLogger)
+	w := newWorker(ns, "1", pool, tstCtxType, nil, jobTypes, noopLogger, nil)
 	w.start()
 	w.drain()
 	w.stop()
@@ -273,7 +273,7 @@ func TestWorkerDead(t *testing.T) {
 	assert.Nil(t, err)
 	_, err = enqueuer.Enqueue(job2, nil)
 	assert.Nil(t, err)
-	w := newWorker(ns, "1", pool, tstCtxType, nil, jobTypes, noopLogger)
+	w := newWorker(ns, "1", pool, tstCtxType, nil, jobTypes, noopLogger, nil)
 	w.start()
 	w.drain()
 	w.stop()
@@ -326,7 +326,7 @@ func TestWorkersPaused(t *testing.T) {
 	_, err := enqueuer.Enqueue(job1, Q{"a": 1})
 	assert.Nil(t, err)
 
-	w := newWorker(ns, "1", pool, tstCtxType, nil, jobTypes, noopLogger)
+	w := newWorker(ns, "1", pool, tstCtxType, nil, jobTypes, noopLogger, nil)
 	// pause the jobs prior to starting
 	err = pauseJobs(ns, job1, pool)
 	assert.Nil(t, err)
@@ -668,7 +668,7 @@ func TestWorkerRetryRemoveFromInProgress(t *testing.T) {
 	_, err := enqueuer.Enqueue(job1, Q{"a": 1})
 	assert.Nil(t, err)
 
-	w := newWorker(ns, "1", pool, tstCtxType, nil, jobTypes, noopLogger)
+	w := newWorker(ns, "1", pool, tstCtxType, nil, jobTypes, noopLogger, nil)
 	w.start()
 	defer w.stop()