Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions dead_pool_reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,14 @@ func (r *deadPoolReaper) cleanStaleLockInfo(poolID string, jobTypes []string) er
return err
}

negativeLocks, err := redis.Strings(redisReapLocksScript.Do(conn, scriptArgs...))
if err != nil {
return err
}
if len(negativeLocks) > 0 {
Logger.Printf("Reaper: negative locks: %v", negativeLocks)
}

return nil
}

Expand Down
2 changes: 2 additions & 0 deletions dead_pool_reaper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,10 @@ func TestDeadPoolReaperCleanStaleLocks(t *testing.T) {
assert.EqualValues(t, 0, getInt64(pool, lock2))
// worker pool ID 2 removed from both lock info hashes
v, err = conn.Do("HGET", lockInfo1, workerPoolID2)
assert.NoError(t, err)
assert.Nil(t, v)
v, err = conn.Do("HGET", lockInfo2, workerPoolID2)
assert.NoError(t, err)
assert.Nil(t, v)
}

Expand Down
7 changes: 6 additions & 1 deletion periodic_enqueuer.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,12 @@ func (pe *periodicEnqueuer) enqueue() error {
Name: pj.jobName,
ID: id,

// This is technically wrong, but this lets the bytes be identical for the same periodic job instance. If we don't do this, we'd need to use a different approach -- probably giving each periodic job its own history of the past 100 periodic jobs, and only scheduling a job if it's not in the history.
// This is technically wrong, but this lets the bytes be
// identical for the same periodic job instance. If we don't do
// this, we'd need to use a different approach -- probably
// giving each periodic job its own history of the past 100
// periodic jobs, and only scheduling a job if it's not in the
// history.
EnqueuedAt: epoch,
Args: nil,

Expand Down
56 changes: 53 additions & 3 deletions redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,15 +168,61 @@ for i=1,keylen,%d do
end
return nil`, fetchKeysPerJobType)

// Used to remove a job from the in-progress queue, release its lock, and
// optionally forward the failed job to another queue (retry or dead).
//
// KEYS[1] = in-progress job queue
// KEYS[2] = job's lock key
// KEYS[3] = job's lock info key
// KEYS[4] = forward queue (ignored when ARGV[3] is "0")
// ARGV[1] = worker pool id
// ARGV[2] = job value
// ARGV[3] = should the failed job be redirected to another queue? ("1"/"0")
// ARGV[4] = failed job score
// ARGV[5] = failed job value
var redisRemoveJobFromInProgress = redis.NewScript(4, `
local function releaseLock(lockKey, lockInfoKey, workerPoolID)
  redis.call('decr', lockKey)
  redis.call('hincrby', lockInfoKey, workerPoolID, -1)
end

local inProgQueue = KEYS[1]
local lockKey = KEYS[2]
local lockInfoKey = KEYS[3]
local workerPoolID = ARGV[1]
local job = ARGV[2]
local forward = ARGV[3]
local result = tonumber(redis.call('lrem', inProgQueue, 1, job))

if result ~= 0 then
  releaseLock(lockKey, lockInfoKey, workerPoolID)

  -- NOTE: redigo serializes a Go bool as the string "1" or "0", and in Lua
  -- every non-nil, non-false value (including the string "0") is truthy.
  -- A bare 'if forward then' would therefore ALWAYS forward; compare the
  -- string explicitly instead.
  if forward == '1' then
    local queue = KEYS[4]
    local score = ARGV[4]
    local failedJob = ARGV[5]

    redis.call('zadd', queue, score, failedJob)
  end
end

return nil
`)

// Used by the reaper to re-enqueue jobs that were in progress
//
// KEYS[1] = the 1st job's in progress queue
// KEYS[2] = the 1st job's job queue
// KEYS[3] = the 2nd job's in progress queue
// KEYS[4] = the 2nd job's job queue
// KEYS[3] = the 1st job's lock key
// KEYS[4] = the 1st job's lock info key
// KEYS[5] = the 2nd job's in progress queue
// KEYS[6] = the 2nd job's job queue
// KEYS[7] = the 2nd job's lock key
// KEYS[8] = the 2nd job's lock info key
// ...
// KEYS[N] = the last job's in progress queue
// KEYS[N+1] = the last job's job queue
// KEYS[N+2] = the last job's lock key
// KEYS[N+3] = the last job's lock info key
// ARGV[1] = workerPoolID for job queue
var redisLuaReenqueueJob = fmt.Sprintf(`
local function releaseLock(lockKey, lockInfoKey, workerPoolID)
Expand Down Expand Up @@ -211,10 +257,12 @@ return nil`, requeueKeysPerJob)
// KEYS[N] = the last job's lock
// KEYS[N+1] = the last job's lock info hash
// ARGV[1] = the dead worker pool id
// Returns: list of negative locks (which were set to zero)
var redisLuaReapStaleLocks = `
local keylen = #KEYS
local lock, lockInfo, deadLockCount
local deadPoolID = ARGV[1]
local negativeLocks = {}

for i=1,keylen,2 do
lock = KEYS[i]
Expand All @@ -226,11 +274,13 @@ for i=1,keylen,2 do
redis.call('hdel', lockInfo, deadPoolID)

if tonumber(redis.call('get', lock)) < 0 then
table.insert(negativeLocks, lock)
redis.call('set', lock, 0)
end
end
end
return nil

return negativeLocks
`

// KEYS[1] = zset of jobs (retry or scheduled), eg work:retry
Expand Down
99 changes: 46 additions & 53 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,16 +211,14 @@ func (w *worker) processJob(job *Job) {
w.observeDone(job.Name, job.ID, runErr)
}

fate := terminateOnly
if runErr != nil {
job.failed(runErr)
fate = w.jobFate(jt, job)
}

// Since we've taken the task and completed it, we must keep retrying commits
// until we succeed, otherwise we'll end up with a blocked job.
retryErr(sleepBackoffs, func() error {
err := w.removeJobFromInProgress(job, fate)
err := w.removeJobFromInProgress(job, jt, runErr)
if err != nil {
logError("worker.remove_job_from_in_progress.lrem", err)
}
Expand All @@ -245,63 +243,58 @@ func (w *worker) deleteUniqueJob(job *Job) {
}
}

func (w *worker) removeJobFromInProgress(job *Job, fate terminateOp) error {
conn := w.pool.Get()
defer conn.Close()

conn.Send("MULTI")
conn.Send("LREM", job.inProgQueue, 1, job.rawJSON)
conn.Send("DECR", redisKeyJobsLock(w.namespace, job.Name))
conn.Send("HINCRBY", redisKeyJobsLockInfo(w.namespace, job.Name), w.poolID, -1)
fate(conn)

_, err := conn.Do("EXEC")

return err
}
func (w *worker) removeJobFromInProgress(job *Job, jt *jobType, runErr error) error {
var (
forward bool
queue string
score int64
failedJobRawJSON []byte
)

func (w *worker) jobFate(jt *jobType, job *Job) terminateOp {
if jt != nil {
failsRemaining := int64(jt.MaxFails) - job.Fails
if failsRemaining > 0 {
return terminateAndRetry(w, jt, job)
if runErr != nil {
switch {
case jt != nil && jt.SkipDead:
forward = false
case jt != nil && int64(jt.MaxFails)-job.Fails > 0:
forward = true
queue = redisKeyRetry(w.namespace)
score = nowEpochSeconds() + jt.calcBackoff(job)
default:
// NOTE: sidekiq limits the # of jobs: only keep jobs for 6 months, and only keep a max # of jobs
// The max # of jobs seems really horrible. Seems like operations should be on top of it.
// conn.Send("ZREMRANGEBYSCORE", redisKeyDead(w.namespace), "-inf", now - keepInterval)
// conn.Send("ZREMRANGEBYRANK", redisKeyDead(w.namespace), 0, -maxJobs)
forward = true
queue = redisKeyDead(w.namespace)
score = nowEpochSeconds()
}
if jt.SkipDead {
return terminateOnly

if forward {
var err error
failedJobRawJSON, err = job.serialize()
if err != nil {
logError("worker.removeJobFromInProgress.serialize", err)
forward = false
}
}
}
return terminateAndDead(w, job)
}

type terminateOp func(conn redis.Conn)

func terminateOnly(_ redis.Conn) {}

func terminateAndRetry(w *worker, jt *jobType, job *Job) terminateOp {
rawJSON, err := job.serialize()
if err != nil {
logError("worker.terminate_and_retry.serialize", err)
return terminateOnly
}
return func(conn redis.Conn) {
conn.Send("ZADD", redisKeyRetry(w.namespace), nowEpochSeconds()+jt.calcBackoff(job), rawJSON)
}
}
conn := w.pool.Get()
defer conn.Close()

func terminateAndDead(w *worker, job *Job) terminateOp {
rawJSON, err := job.serialize()
if err != nil {
logError("worker.terminate_and_dead.serialize", err)
return terminateOnly
}
return func(conn redis.Conn) {
// NOTE: sidekiq limits the # of jobs: only keep jobs for 6 months, and only keep a max # of jobs
// The max # of jobs seems really horrible. Seems like operations should be on top of it.
// conn.Send("ZREMRANGEBYSCORE", redisKeyDead(w.namespace), "-inf", now - keepInterval)
// conn.Send("ZREMRANGEBYRANK", redisKeyDead(w.namespace), 0, -maxJobs)
_, err := redisRemoveJobFromInProgress.Do(conn,
job.inProgQueue,
redisKeyJobsLock(w.namespace, job.Name),
redisKeyJobsLockInfo(w.namespace, job.Name),
queue,
w.poolID,
job.rawJSON,
forward,
score,
failedJobRawJSON,
)

conn.Send("ZADD", redisKeyDead(w.namespace), nowEpochSeconds(), rawJSON)
}
return err
}

// Default algorithm returns a rapidly increasing backoff counter which grows in an unbounded fashion
Expand Down
5 changes: 3 additions & 2 deletions worker_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,9 +274,10 @@ func (wp *WorkerPool) Drain() {

func (wp *WorkerPool) startRequeuers() {
jobNames := make([]string, 0, len(wp.jobTypes))
for k := range wp.jobTypes {
jobNames = append(jobNames, k)
for name := range wp.jobTypes {
jobNames = append(jobNames, name)
}

wp.retrier = newRequeuer(wp.namespace, wp.pool, redisKeyRetry(wp.namespace), jobNames)
wp.scheduler = newRequeuer(wp.namespace, wp.pool, redisKeyScheduled(wp.namespace), jobNames)
wp.deadPoolReaper = newDeadPoolReaper(wp.namespace, wp.pool, jobNames, wp.reapPeriod)
Expand Down