Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,14 +299,26 @@ if #res > 0 then
redis.call('zrem', KEYS[1], res[1])
queue = ARGV[1] .. j['name']

for _,v in pairs(KEYS) do
for i=3,#KEYS do
local v = KEYS[i]
if v == queue then
-- If for some reason (e.g., the service was offline) the periodic job was
-- not executed, skip the execution.
if j['d'] ~= nil and nowTs > j['d'] then
return 'ok'
end

-- If the next task in the queue has expired, discard it.
local nextTask = redis.call('rpop', queue)
if nextTask then
local decNextTask = cjson.decode(nextTask)
local deadline = decNextTask['d']
-- Return to the queue if the deadline is not set or has not expired.
if deadline == nil or nowTs < deadline then
redis.call('rpush', queue, nextTask)
end
end

j['t'] = nowTs
redis.call('lpush', queue, cjson.encode(j))

Expand Down
37 changes: 37 additions & 0 deletions requeuer_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package work

import (
"context"
"testing"
"time"

"github.com/robfig/cron/v3"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -112,3 +114,38 @@ func TestRequeuePeriodic(t *testing.T) {
llen := listSize(pool, redisKeyJobs(ns, jobName))
assert.Equal(t, int64(0), llen)
}

func TestRequeueSlowJob(t *testing.T) {
pool := newTestPool(":6379")
ns := "work"
cleanKeyspace(ns, pool)

jobName := "test_job"
jobSpec := "*/1 * * * * *"

wp := NewWorkerPool(struct{}{}, 1, ns, pool)
defer wp.Stop()

wp.PeriodicallyEnqueue(jobSpec, jobName)

block := make(chan struct{})
runned := make(chan struct{})

wp.JobWithOptions(jobName, JobOptions{MaxConcurrency: 1}, func(context.Context, *Job) error {
close(runned)
<-block
return nil
})

wp.Start()
defer func() {
close(block)
wp.Stop()
}()
<-runned

time.Sleep(time.Second * 2)

llen := listSize(pool, redisKeyJobs(ns, jobName))
assert.LessOrEqual(t, llen, int64(1))
}