From e301728b39c023641876be87d929e8f708bb039d Mon Sep 17 00:00:00 2001 From: hageshtrem Date: Fri, 7 Jul 2023 21:11:19 +0300 Subject: [PATCH] Remove stale periodic job from queue on requeue cycle --- redis.go | 14 +++++++++++++- requeuer_test.go | 37 +++++++++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+), 1 deletion(-) diff --git a/redis.go b/redis.go index b63cc8de..1660b232 100644 --- a/redis.go +++ b/redis.go @@ -299,7 +299,8 @@ if #res > 0 then redis.call('zrem', KEYS[1], res[1]) queue = ARGV[1] .. j['name'] - for _,v in pairs(KEYS) do + for i=3,#KEYS do + local v = KEYS[i] if v == queue then -- If for some reason (e.g., the service was offline) the periodic job was -- not executed, skip the execution. @@ -307,6 +308,17 @@ if #res > 0 then return 'ok' end + -- If the next task in the queue has expired, discard it. + local nextTask = redis.call('rpop', queue) + if nextTask then + local decNextTask = cjson.decode(nextTask) + local deadline = decNextTask['d'] + -- Return to the queue if the deadline is not set or has not expired. + if deadline == nil or nowTs < deadline then + redis.call('rpush', queue, nextTask) + end + end + j['t'] = nowTs redis.call('lpush', queue, cjson.encode(j)) diff --git a/requeuer_test.go b/requeuer_test.go index 055bf1d8..e01008ad 100644 --- a/requeuer_test.go +++ b/requeuer_test.go @@ -1,7 +1,9 @@ package work import ( + "context" "testing" + "time" "github.com/robfig/cron/v3" "github.com/stretchr/testify/assert" @@ -112,3 +114,38 @@ func TestRequeuePeriodic(t *testing.T) { llen := listSize(pool, redisKeyJobs(ns, jobName)) assert.Equal(t, int64(0), llen) } + +func TestRequeueSlowJob(t *testing.T) { + pool := newTestPool(":6379") + ns := "work" + cleanKeyspace(ns, pool) + + jobName := "test_job" + jobSpec := "*/1 * * * * *" + + wp := NewWorkerPool(struct{}{}, 1, ns, pool) + defer wp.Stop() + + wp.PeriodicallyEnqueue(jobSpec, jobName) + + block := make(chan struct{}) + runned := make(chan struct{}) + + wp.JobWithOptions(jobName, JobOptions{MaxConcurrency: 1}, func(context.Context, *Job) error { + close(runned) + <-block + return nil + }) + + wp.Start() + defer func() { + close(block) + wp.Stop() + }() + <-runned + + time.Sleep(time.Second * 2) + + llen := listSize(pool, redisKeyJobs(ns, jobName)) + assert.LessOrEqual(t, llen, int64(1)) +}