Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions periodic_enqueuer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package work

import (
"fmt"
"log/slog"
"math/rand"
"time"

Expand Down Expand Up @@ -120,6 +121,12 @@ func (pe *periodicEnqueuer) enqueue() error {
StartingDeadline: pj.schedule.Next(t).Unix(),
}

pe.logger.Debug("periodic_enqueuer.enqueue",
slog.Time("job_scheduled_time", t),
slog.String("job_name", pj.jobName),
slog.String("job_id", id),
)

rawJSON, err := job.serialize()
if err != nil {
return err
Expand Down
274 changes: 274 additions & 0 deletions watchdog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,274 @@
package work

import (
"container/heap"
"log/slog"
"sync"
"sync/atomic"
"time"
)

// WatchdogFailCheckingTimeout a default checking timeout that marks task as failed.
const WatchdogFailCheckingTimeout = 60 * time.Second

const processedJobsBuffer = 256

// The WatchdogStat struct represents statistics for a periodic jobs, including the name, counter,
type WatchdogStat struct {
Name string
Processed int64
Skipped int64
}

// watchdog a struct that checks that periodic tasks are running.
// It is based on data about planned tasks and how they are actually processed.
type watchdog struct {
periodicJobs []*periodicJob
jobs map[string]*watchdogJob
processedJobs chan *Job
failCheckingTimeout time.Duration
stopChan chan struct{}
logger StructuredLogger
}

type watchdogOption func(w *watchdog)

func watchdogWithLogger(logger StructuredLogger) watchdogOption {
return func(w *watchdog) {
w.logger = logger
}
}

func watchdogWithFailCheckingTimeout(t time.Duration) watchdogOption {
return func(w *watchdog) {
w.failCheckingTimeout = t
}
}

func newWatchdog(opts ...watchdogOption) *watchdog {
w := &watchdog{
jobs: make(map[string]*watchdogJob),
processedJobs: make(chan *Job, processedJobsBuffer),
stopChan: make(chan struct{}),
}

for _, opt := range opts {
opt(w)
}

if w.logger == nil {
w.logger = noopLogger
}

if w.failCheckingTimeout == 0 {
w.failCheckingTimeout = WatchdogFailCheckingTimeout
}

return w
}

func (w *watchdog) addPeriodicJobs(jobs ...*periodicJob) {
w.periodicJobs = append(w.periodicJobs, jobs...)
for _, j := range w.periodicJobs {
w.jobs[j.jobName] = &watchdogJob{
checkTimes: newCheckTimesHeap(),
}
}
}

func (w *watchdog) start() {
const checkTimeout = time.Second

go func() {
Comment thread
lzakharov marked this conversation as resolved.
timer := time.NewTicker(checkTimeout)
defer timer.Stop()

for {
select {
case t := <-timer.C:
w.planning(t)
w.checking(t)
case j := <-w.processedJobs:
w.processed(j)
case <-w.stopChan:
return
}
}
}()
}

func (w *watchdog) stop() {
w.stopChan <- struct{}{}
}

// planning method is responsible for planning the execution of periodic jobs.
// It iterates over the list of periodic jobs, calculates the next scheduled time for each job
// based on the current time `t`, and updates the check list for each job with the new scheduled time.
func (w *watchdog) planning(t time.Time) {
for _, j := range w.periodicJobs {
n := j.schedule.Next(t)
h := w.jobs[j.jobName].checkTimes
if h.Push(n) {
w.logger.Debug("Watchdog: planning job",
slog.String("job_name", j.jobName),
slog.Time("job_next_time", n),
slog.Int("jobs_total_planned", h.Len()),
)
}
}
}

// checking checks for skipped jobs based on the time `t`.
// It iterates over the scheduled times for each job and compares them with the
// current time plus the fail checking timeout. If a job's scheduled time has passed the fail checking
// timeout, it is considered as skipped, removed from the check list, and the `skip` method is called
// to increment the skipped count for that job.
func (w *watchdog) checking(t time.Time) {
for name, job := range w.jobs {
job.each(func(h *checkTimesHeap) bool {
n, _ := h.Peek()
if n.Add(w.failCheckingTimeout).Before(t) {
h.Pop()
job.skipped.Add(1)

w.logger.Error("Watchdog: skipped job",
slog.String("job_name", name),
slog.Time("job_next_time", n),
slog.Int64("jobs_skipped", job.skipped.Load()),
)

return false
}

return true
})
}
}

// processed method is responsible for handling a processed job in the watchdog system.
// It iterates over the scheduled times for each job and check if job was successfully processed.
func (w *watchdog) processed(j *Job) {
job, ok := w.jobs[j.Name]
if !ok {
return
}

job.each(func(h *checkTimesHeap) bool {
n, _ := h.Peek()
if n.Before(time.Unix(j.EnqueuedAt, 0)) {
h.Pop()
job.processed.Add(1)

w.logger.Debug("Watchdog: successfully processed job",
slog.String("job_name", j.Name),
slog.Time("job_next_time", n),
slog.Int64("jobs_processed", job.processed.Load()),
)
return false
}
return true
})
}

func (w *watchdog) stats() []WatchdogStat {
res := make([]WatchdogStat, 0, len(w.jobs))

for k, v := range w.jobs {
res = append(res, WatchdogStat{
Name: k,
Processed: v.processed.Load(),
Skipped: v.skipped.Load(),
})
}

return res
}

type watchdogJob struct {
checkTimes *checkTimesHeap
processed atomic.Int64
skipped atomic.Int64
}

func (w *watchdogJob) each(cb func(h *checkTimesHeap) bool) {
h := w.checkTimes
for h.Len() > 0 {
if cb(h) {
return
}
}
}

type checkTimes []time.Time

func (h checkTimes) Len() int { return len(h) }
func (h checkTimes) Less(i, j int) bool { return h[i].Before(h[j]) }
func (h checkTimes) Swap(i, j int) { h[i], h[j] = h[j], h[i] }

func (h *checkTimes) Push(x any) {
*h = append(*h, x.(time.Time))
}

func (h *checkTimes) Pop() any {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}

type checkTimesHeap struct {
mu sync.RWMutex
uniqTimes map[time.Time]struct{}
checkTimes *checkTimes
}

func newCheckTimesHeap() *checkTimesHeap {
h := &checkTimes{}
heap.Init(h)

return &checkTimesHeap{
uniqTimes: make(map[time.Time]struct{}),
checkTimes: h,
}
}

func (h *checkTimesHeap) Len() int {
h.mu.RLock()
defer h.mu.RUnlock()

return h.checkTimes.Len()
}

func (h *checkTimesHeap) Peek() (time.Time, bool) {
h.mu.RLock()
defer h.mu.RUnlock()

if h.checkTimes.Len() > 0 {
return (*h.checkTimes)[0], true
}
return time.Time{}, false
}

func (h *checkTimesHeap) Push(t time.Time) bool {
h.mu.Lock()
defer h.mu.Unlock()

if _, exists := h.uniqTimes[t]; exists {
return false
}

h.uniqTimes[t] = struct{}{}
heap.Push(h.checkTimes, t)

return true
}

func (h *checkTimesHeap) Pop() time.Time {
h.mu.Lock()
defer h.mu.Unlock()

t := heap.Pop(h.checkTimes).(time.Time)
delete(h.uniqTimes, t)
return t
}
99 changes: 99 additions & 0 deletions watchdog_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package work

import (
"strconv"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestCheckTimesHeap(t *testing.T) {
t1, _ := time.Parse(time.RFC3339, "2019-01-01T00:00:00Z")
t2, _ := time.Parse(time.RFC3339, "2020-01-01T00:00:00Z")
t3, _ := time.Parse(time.RFC3339, "2021-01-01T00:00:00Z")

tests := []struct {
data []time.Time
expected []time.Time
}{
{
[]time.Time{},
[]time.Time{},
},
{
[]time.Time{t1},
[]time.Time{t1},
},
{
[]time.Time{t1, t1},
[]time.Time{t1},
},
{
[]time.Time{t1, t2, t3},
[]time.Time{t1, t2, t3},
},
{
[]time.Time{t3, t3, t3},
[]time.Time{t3},
},
{
[]time.Time{t2, t1},
[]time.Time{t1, t2},
},
{
[]time.Time{t3, t2, t1},
[]time.Time{t1, t2, t3},
},
{
[]time.Time{t3, t3, t3, t2, t2, t1},
[]time.Time{t1, t2, t3},
},
}

for i, tt := range tests {
t.Run(strconv.Itoa(i), func(t *testing.T) {
h := newCheckTimesHeap()
for _, v := range tt.data {
h.Push(v)
}
require.Equal(t, len(tt.expected), h.Len())

for i := 0; h.Len() > 0; i++ {
tm := h.Pop()
require.Equal(t, tt.expected[i], tm)
}
})
}
}

func TestWatchdog(t *testing.T) {
t.Parallel()

require := require.New(t)

const jobName = "test"
j, err := newPeriodicJob("* * * * * *", jobName)
require.NoError(err)

w := newWatchdog(
watchdogWithFailCheckingTimeout(time.Millisecond * 2000),
)
defer w.stop()

w.addPeriodicJobs(j)
w.start()
w.planning(time.Now())

time.Sleep(time.Millisecond * 2000)
w.processedJobs <- &Job{
Name: jobName,
EnqueuedAt: time.Now().Unix(),
}
time.Sleep(time.Millisecond * 500)

require.Equal(WatchdogStat{Name: "test", Processed: 1, Skipped: 0}, w.stats()[0])

time.Sleep(time.Millisecond * 1600)
require.Equal(WatchdogStat{Name: "test", Processed: 1, Skipped: 1}, w.stats()[0])
}
Loading