diff --git a/dead_pool_reaper.go b/dead_pool_reaper.go
index f3d5cdf6..a2deca76 100644
--- a/dead_pool_reaper.go
+++ b/dead_pool_reaper.go
@@ -199,6 +199,14 @@ func (r *deadPoolReaper) cleanStaleLockInfo(poolID string, jobTypes []string) error {
 		return err
 	}
 
+	negativeLocks, err := redis.Strings(redisReapLocksScript.Do(conn, scriptArgs...))
+	if err != nil {
+		return err
+	}
+	if len(negativeLocks) > 0 {
+		Logger.Printf("Reaper: negative locks: %v", negativeLocks)
+	}
+
 	return nil
 }
 
diff --git a/dead_pool_reaper_test.go b/dead_pool_reaper_test.go
index fb576b6a..2fb0404f 100644
--- a/dead_pool_reaper_test.go
+++ b/dead_pool_reaper_test.go
@@ -344,8 +344,10 @@ func TestDeadPoolReaperCleanStaleLocks(t *testing.T) {
 	assert.EqualValues(t, 0, getInt64(pool, lock2))
 
 	// worker pool ID 2 removed from both lock info hashes
 	v, err = conn.Do("HGET", lockInfo1, workerPoolID2)
+	assert.NoError(t, err)
 	assert.Nil(t, v)
 	v, err = conn.Do("HGET", lockInfo2, workerPoolID2)
+	assert.NoError(t, err)
 	assert.Nil(t, v)
 }
diff --git a/periodic_enqueuer.go b/periodic_enqueuer.go
index 68e19ad5..74c8b9d6 100644
--- a/periodic_enqueuer.go
+++ b/periodic_enqueuer.go
@@ -100,7 +100,12 @@ func (pe *periodicEnqueuer) enqueue() error {
 			Name: pj.jobName,
 			ID:   id,
 
-			// This is technically wrong, but this lets the bytes be identical for the same periodic job instance. If we don't do this, we'd need to use a different approach -- probably giving each periodic job its own history of the past 100 periodic jobs, and only scheduling a job if it's not in the history.
+			// This is technically wrong, but this lets the bytes be
+			// identical for the same periodic job instance. If we don't do
+			// this, we'd need to use a different approach -- probably
+			// giving each periodic job its own history of the past 100
+			// periodic jobs, and only scheduling a job if it's not in the
+			// history.
 			EnqueuedAt: epoch,
 			Args:       nil,
 
diff --git a/redis.go b/redis.go
index 71dfabd2..b63cc8de 100644
--- a/redis.go
+++ b/redis.go
@@ -168,15 +168,61 @@ for i=1,keylen,%d do
 end
 return nil`, fetchKeysPerJobType)
 
+// Used to remove a job from the in-progress queue.
+//
+// KEYS[1] = in-progress job queue
+// KEYS[2] = job's lock key
+// KEYS[3] = job's lock info key
+// KEYS[4] = forward queue
+// ARGV[1] = worker pool id
+// ARGV[2] = job value
+// ARGV[3] = should the failed job be redirected to another queue? ("1" or "0")
+// ARGV[4] = failed job score
+// ARGV[5] = failed job value
+var redisRemoveJobFromInProgress = redis.NewScript(4, `
+local function releaseLock(lockKey, lockInfoKey, workerPoolID)
+	redis.call('decr', lockKey)
+	redis.call('hincrby', lockInfoKey, workerPoolID, -1)
+end
+
+local inProgQueue = KEYS[1]
+local lockKey = KEYS[2]
+local lockInfoKey = KEYS[3]
+local workerPoolID = ARGV[1]
+local job = ARGV[2]
+local forward = ARGV[3]
+local result = tonumber(redis.call('lrem', inProgQueue, 1, job))
+
+if result ~= 0 then
+	releaseLock(lockKey, lockInfoKey, workerPoolID)
+
+	if forward == "1" then
+		local queue = KEYS[4]
+		local score = ARGV[4]
+		local failedJob = ARGV[5]
+
+		redis.call('zadd', queue, score, failedJob)
+	end
+end
+
+return nil
+`)
+
 // Used by the reaper to re-enqueue jobs that were in progress
 //
 // KEYS[1] = the 1st job's in progress queue
 // KEYS[2] = the 1st job's job queue
-// KEYS[3] = the 2nd job's in progress queue
-// KEYS[4] = the 2nd job's job queue
+// KEYS[3] = the 1st job's lock key
+// KEYS[4] = the 1st job's lock info key
+// KEYS[5] = the 2nd job's in progress queue
+// KEYS[6] = the 2nd job's job queue
+// KEYS[7] = the 2nd job's lock key
+// KEYS[8] = the 2nd job's lock info key
 // ...
 // KEYS[N] = the last job's in progress queue
 // KEYS[N+1] = the last job's job queue
+// KEYS[N+2] = the last job's lock key
+// KEYS[N+3] = the last job's lock info key
 // ARGV[1] = workerPoolID for job queue
 var redisLuaReenqueueJob = fmt.Sprintf(`
 local function releaseLock(lockKey, lockInfoKey, workerPoolID)
@@ -211,10 +257,12 @@ return nil`, requeueKeysPerJob)
 // KEYS[N] = the last job's lock
 // KEYS[N+1] = the last job's lock info hash
 // ARGV[1] = the dead worker pool id
+// Returns: list of negative locks (which were set to zero)
 var redisLuaReapStaleLocks = `
 local keylen = #KEYS
 local lock, lockInfo, deadLockCount
 local deadPoolID = ARGV[1]
+local negativeLocks = {}
 
 for i=1,keylen,2 do
   lock = KEYS[i]
@@ -226,11 +274,13 @@
     redis.call('hdel', lockInfo, deadPoolID)
 
     if tonumber(redis.call('get', lock)) < 0 then
+      table.insert(negativeLocks, lock)
       redis.call('set', lock, 0)
     end
   end
 end
-return nil
+
+return negativeLocks
 `
 
 // KEYS[1] = zset of jobs (retry or scheduled), eg work:retry
diff --git a/worker.go b/worker.go
index e9131074..0f6c762f 100644
--- a/worker.go
+++ b/worker.go
@@ -211,16 +211,14 @@ func (w *worker) processJob(job *Job) {
 		w.observeDone(job.Name, job.ID, runErr)
 	}
 
-	fate := terminateOnly
 	if runErr != nil {
 		job.failed(runErr)
-		fate = w.jobFate(jt, job)
 	}
 
 	// Since we've taken the task and completed it, we must keep retrying commits
 	// until we succeed, otherwise we'll end up with a blocked job.
 	retryErr(sleepBackoffs, func() error {
-		err := w.removeJobFromInProgress(job, fate)
+		err := w.removeJobFromInProgress(job, jt, runErr)
 		if err != nil {
 			logError("worker.remove_job_from_in_progress.lrem", err)
 		}
@@ -245,63 +243,58 @@ func (w *worker) deleteUniqueJob(job *Job) {
 	}
 }
 
-func (w *worker) removeJobFromInProgress(job *Job, fate terminateOp) error {
-	conn := w.pool.Get()
-	defer conn.Close()
-
-	conn.Send("MULTI")
-	conn.Send("LREM", job.inProgQueue, 1, job.rawJSON)
-	conn.Send("DECR", redisKeyJobsLock(w.namespace, job.Name))
-	conn.Send("HINCRBY", redisKeyJobsLockInfo(w.namespace, job.Name), w.poolID, -1)
-	fate(conn)
-
-	_, err := conn.Do("EXEC")
-
-	return err
-}
+func (w *worker) removeJobFromInProgress(job *Job, jt *jobType, runErr error) error {
+	var (
+		forward          bool
+		queue            string
+		score            int64
+		failedJobRawJSON []byte
+	)
 
-func (w *worker) jobFate(jt *jobType, job *Job) terminateOp {
-	if jt != nil {
-		failsRemaining := int64(jt.MaxFails) - job.Fails
-		if failsRemaining > 0 {
-			return terminateAndRetry(w, jt, job)
+	if runErr != nil {
+		switch {
+		case jt != nil && jt.SkipDead:
+			forward = false
+		case jt != nil && int64(jt.MaxFails)-job.Fails > 0:
+			forward = true
+			queue = redisKeyRetry(w.namespace)
+			score = nowEpochSeconds() + jt.calcBackoff(job)
+		default:
+			// NOTE: sidekiq limits the # of jobs: only keep jobs for 6 months, and only keep a max # of jobs
+			// The max # of jobs seems really horrible. Seems like operations should be on top of it.
+			// conn.Send("ZREMRANGEBYSCORE", redisKeyDead(w.namespace), "-inf", now - keepInterval)
+			// conn.Send("ZREMRANGEBYRANK", redisKeyDead(w.namespace), 0, -maxJobs)
+			forward = true
+			queue = redisKeyDead(w.namespace)
+			score = nowEpochSeconds()
 		}
-		if jt.SkipDead {
-			return terminateOnly
+
+		if forward {
+			var err error
+			failedJobRawJSON, err = job.serialize()
+			if err != nil {
+				logError("worker.removeJobFromInProgress.serialize", err)
+				forward = false
+			}
 		}
 	}
-	return terminateAndDead(w, job)
-}
 
-type terminateOp func(conn redis.Conn)
-
-func terminateOnly(_ redis.Conn) {}
-
-func terminateAndRetry(w *worker, jt *jobType, job *Job) terminateOp {
-	rawJSON, err := job.serialize()
-	if err != nil {
-		logError("worker.terminate_and_retry.serialize", err)
-		return terminateOnly
-	}
-	return func(conn redis.Conn) {
-		conn.Send("ZADD", redisKeyRetry(w.namespace), nowEpochSeconds()+jt.calcBackoff(job), rawJSON)
-	}
-}
+	conn := w.pool.Get()
+	defer conn.Close()
 
-func terminateAndDead(w *worker, job *Job) terminateOp {
-	rawJSON, err := job.serialize()
-	if err != nil {
-		logError("worker.terminate_and_dead.serialize", err)
-		return terminateOnly
-	}
-	return func(conn redis.Conn) {
-		// NOTE: sidekiq limits the # of jobs: only keep jobs for 6 months, and only keep a max # of jobs
-		// The max # of jobs seems really horrible. Seems like operations should be on top of it.
- // conn.Send("ZREMRANGEBYSCORE", redisKeyDead(w.namespace), "-inf", now - keepInterval) - // conn.Send("ZREMRANGEBYRANK", redisKeyDead(w.namespace), 0, -maxJobs) + _, err := redisRemoveJobFromInProgress.Do(conn, + job.inProgQueue, + redisKeyJobsLock(w.namespace, job.Name), + redisKeyJobsLockInfo(w.namespace, job.Name), + queue, + w.poolID, + job.rawJSON, + forward, + score, + failedJobRawJSON, + ) - conn.Send("ZADD", redisKeyDead(w.namespace), nowEpochSeconds(), rawJSON) - } + return err } // Default algorithm returns an fastly increasing backoff counter which grows in an unbounded fashion diff --git a/worker_pool.go b/worker_pool.go index 27e5a33b..26016d73 100644 --- a/worker_pool.go +++ b/worker_pool.go @@ -274,9 +274,10 @@ func (wp *WorkerPool) Drain() { func (wp *WorkerPool) startRequeuers() { jobNames := make([]string, 0, len(wp.jobTypes)) - for k := range wp.jobTypes { - jobNames = append(jobNames, k) + for name := range wp.jobTypes { + jobNames = append(jobNames, name) } + wp.retrier = newRequeuer(wp.namespace, wp.pool, redisKeyRetry(wp.namespace), jobNames) wp.scheduler = newRequeuer(wp.namespace, wp.pool, redisKeyScheduled(wp.namespace), jobNames) wp.deadPoolReaper = newDeadPoolReaper(wp.namespace, wp.pool, jobNames, wp.reapPeriod)