From a871b8172ac15893379acd74c14af28047e957c3 Mon Sep 17 00:00:00 2001
From: hageshtrem
Date: Wed, 22 Mar 2023 15:18:34 +0300
Subject: [PATCH 1/2] Add retries for committing completed jobs

---
 dead_pool_reaper.go      |  19 ++++++-
 dead_pool_reaper_test.go |  18 +++----
 worker.go                |  87 +++++++++++++++++++++++---------
 worker_pool.go           |  20 +++++++-
 worker_pool_test.go      |   3 +-
 worker_test.go           | 104 ++++++++++++++++++++++++++++++++++++++-
 6 files changed, 213 insertions(+), 38 deletions(-)

diff --git a/dead_pool_reaper.go b/dead_pool_reaper.go
index bdb2e407..4af71622 100644
--- a/dead_pool_reaper.go
+++ b/dead_pool_reaper.go
@@ -15,7 +15,7 @@ import (
 
 const (
 	deadTime          = 10 * time.Second // 2 x heartbeat
-	reapPeriod        = 10 * time.Minute
+	defaultReapPeriod = 1 * time.Minute
 	reapJitterSecs    = 30
 	requeueKeysPerJob = 4
 )
@@ -31,7 +31,11 @@ type deadPoolReaper struct {
 	doneStoppingChan chan struct{}
 }
 
-func newDeadPoolReaper(namespace string, pool Pool, curJobTypes []string) *deadPoolReaper {
+func newDeadPoolReaper(namespace string, pool Pool, curJobTypes []string, reapPeriod time.Duration) *deadPoolReaper {
+	if reapPeriod == 0 {
+		reapPeriod = defaultReapPeriod
+	}
+
 	return &deadPoolReaper{
 		namespace:   namespace,
 		pool:        pool,
@@ -53,6 +57,8 @@ func (r *deadPoolReaper) stop() {
 }
 
 func (r *deadPoolReaper) loop() {
+	Logger.Printf("Reaper: started with a period of %v", r.reapPeriod)
+
 	// Reap immediately after we provide some time for initialization
 	timer := time.NewTimer(r.deadTime)
 	defer timer.Stop()
@@ -80,6 +86,8 @@ func (r *deadPoolReaper) reap() (err error) {
 		return err
 	}
 
+	Logger.Printf("Reaper: trying to acquire lock...")
+
 	acquired, err := r.acquireLock(lockValue)
 	if err != nil {
 		return err
@@ -87,9 +95,12 @@ func (r *deadPoolReaper) reap() (err error) {
 
 	// Another reaper is already running
 	if !acquired {
+		Logger.Printf("Reaper: locked by another process")
 		return nil
 	}
 
+	Logger.Printf("Reaper: lock is acquired")
+
 	defer func() {
 		err = r.releaseLock(lockValue)
 	}()
@@ -108,6 +119,8 @@ func (r *deadPoolReaper) reapDeadPools() error {
 		return err
 	}
 
+	Logger.Printf("Reaper: dead pools: %v", deadPoolIDs)
+
 	conn := r.pool.Get()
 	defer conn.Close()
 
@@ -150,6 +163,8 @@ func (r *deadPoolReaper) clearUnknownPools() error {
 		return err
 	}
 
+	Logger.Printf("Reaper: unknown pools: %v", unknownPools)
+
 	for poolID, jobTypes := range unknownPools {
 		if err = r.requeueInProgressJobs(poolID, jobTypes); err != nil {
 			return err
diff --git a/dead_pool_reaper_test.go b/dead_pool_reaper_test.go
index 90cbc040..279d1a62 100644
--- a/dead_pool_reaper_test.go
+++ b/dead_pool_reaper_test.go
@@ -48,7 +48,7 @@ func TestDeadPoolReaper(t *testing.T) {
 	assert.NoError(t, err)
 
 	// Test getting dead pool
-	reaper := newDeadPoolReaper(ns, pool, []string{})
+	reaper := newDeadPoolReaper(ns, pool, []string{}, 0)
 	deadPools, err := reaper.findDeadPools()
 	assert.NoError(t, err)
 	assert.Equal(t, map[string][]string{"2": {"type1", "type2"}, "3": {"type1", "type2"}}, deadPools)
@@ -127,7 +127,7 @@ func TestDeadPoolReaperNoHeartbeat(t *testing.T) {
 	assert.EqualValues(t, 3, numPools)
 
 	// Test getting dead pool ids
-	reaper := newDeadPoolReaper(ns, pool, []string{"type1"})
+	reaper := newDeadPoolReaper(ns, pool, []string{"type1"}, 0)
 	deadPools, err := reaper.findDeadPools()
 	assert.NoError(t, err)
 	assert.Equal(t, map[string][]string{"1": nil, "2": nil, "3": nil}, deadPools)
@@ -210,7 +210,7 @@ func TestDeadPoolReaperNoJobTypes(t *testing.T) {
 	assert.NoError(t, err)
 
 	// Test getting dead pool
-	reaper := newDeadPoolReaper(ns, pool, []string{})
+	reaper := newDeadPoolReaper(ns, pool, []string{}, 0)
 	deadPools, err := reaper.findDeadPools()
 	assert.NoError(t, err)
 	assert.Equal(t, map[string][]string{"2": {"type1", "type2"}}, deadPools)
@@ -283,7 +283,7 @@ func TestDeadPoolReaperWithWorkerPools(t *testing.T) {
 
 	// setup a worker pool and start the reaper, which should restart the stale job above
 	wp := setupTestWorkerPool(pool, ns, job1, 1, JobOptions{Priority: 1})
-	wp.deadPoolReaper = newDeadPoolReaper(wp.namespace, wp.pool, []string{"job1"})
+	wp.deadPoolReaper = newDeadPoolReaper(wp.namespace, wp.pool, []string{"job1"}, 0)
 	wp.deadPoolReaper.deadTime = expectedDeadTime
 	wp.deadPoolReaper.start()
 
@@ -327,7 +327,7 @@ func TestDeadPoolReaperCleanStaleLocks(t *testing.T) {
 	err = conn.Flush()
 	assert.NoError(t, err)
 
-	reaper := newDeadPoolReaper(ns, pool, jobNames)
+	reaper := newDeadPoolReaper(ns, pool, jobNames, 0)
 	// clean lock info for workerPoolID1
 	err = reaper.cleanStaleLockInfo(workerPoolID1, jobNames)
 	assert.NoError(t, err)
@@ -384,7 +384,7 @@ func TestDeadPoolReaperTakeDeadPools(t *testing.T) {
 	assert.NoError(t, err)
 
 	// Test getting dead pools
-	reaper := newDeadPoolReaper(ns, pool, []string{})
+	reaper := newDeadPoolReaper(ns, pool, []string{}, 0)
 	deadPools, err := reaper.findDeadPools()
 	assert.NoError(t, err)
 	assert.Equal(t, map[string][]string{"2": {"type1", "type2"}, "3": nil}, deadPools)
@@ -395,7 +395,7 @@ func TestReaperLock(t *testing.T) {
 	ns := "work"
 	cleanKeyspace(ns, pool)
 
-	reaper := newDeadPoolReaper(ns, pool, []string{})
+	reaper := newDeadPoolReaper(ns, pool, []string{}, 0)
 	value, err := genValue()
 	assert.NoError(t, err)
 
@@ -472,7 +472,7 @@ func TestDeadPoolReaperGetUnknownPools(t *testing.T) {
 	assert.NoError(t, conn.Flush())
 
 	// Run test
-	reaper := newDeadPoolReaper(ns, pool, jobNames)
+	reaper := newDeadPoolReaper(ns, pool, jobNames, 0)
 	unknownPools, err := reaper.getUnknownPools()
 	assert.NoError(t, err)
 	assert.Equal(t, map[string][]string{"2": {"type1", "type2"}, "3": {"type1", "type2"}}, unknownPools)
@@ -531,7 +531,7 @@ func TestDeadPoolReaperClearUnknownPool(t *testing.T) {
 	assert.NoError(t, conn.Flush())
 
 	// Run test
-	reaper := newDeadPoolReaper(ns, pool, jobNames)
+	reaper := newDeadPoolReaper(ns, pool, jobNames, 0)
 	err = reaper.clearUnknownPools()
 	assert.NoError(t, err)
 
diff --git a/worker.go b/worker.go
index d66a0ba4..e9131074 100644
--- a/worker.go
+++ b/worker.go
@@ -11,6 +11,14 @@ import (
 
 const fetchKeysPerJobType = 6
 
+var sleepBackoffs = []time.Duration{
+	time.Millisecond * 0,
+	time.Millisecond * 10,
+	time.Millisecond * 100,
+	time.Millisecond * 1000,
+	time.Millisecond * 5000,
+}
+
 type worker struct {
 	workerID  string
 	poolID    string
@@ -97,8 +105,6 @@ func (w *worker) drain() {
 	w.observer.drain()
 }
 
-var sleepBackoffsInMilliseconds = []int64{0, 10, 100, 1000, 5000}
-
 func (w *worker) loop() {
 	var drained bool
 	var consequtiveNoJobs int64
@@ -131,10 +137,10 @@ func (w *worker) loop() {
 			}
 			consequtiveNoJobs++
 			idx := consequtiveNoJobs
-			if idx >= int64(len(sleepBackoffsInMilliseconds)) {
-				idx = int64(len(sleepBackoffsInMilliseconds)) - 1
+			if idx >= int64(len(sleepBackoffs)) {
+				idx = int64(len(sleepBackoffs)) - 1
 			}
-			timer.Reset(time.Duration(sleepBackoffsInMilliseconds[idx]) * time.Millisecond)
+			timer.Reset(sleepBackoffs[idx])
 		}
 	}
 }
@@ -192,6 +198,7 @@ func (w *worker) processJob(job *Job) {
 	if job.Unique {
 		w.deleteUniqueJob(job)
 	}
+
 	var runErr error
 	jt := w.jobTypes[job.Name]
 	if jt == nil {
@@ -209,14 +216,26 @@ func (w *worker) processJob(job *Job) {
 		job.failed(runErr)
 		fate = w.jobFate(jt, job)
 	}
-	w.removeJobFromInProgress(job, fate)
+
+	// Since we've taken the job and completed it, we must keep retrying the
+	// commit until it succeeds; otherwise we'd end up with a blocked job.
+	retryErr(sleepBackoffs, func() error {
+		err := w.removeJobFromInProgress(job, fate)
+		if err != nil {
+			logError("worker.remove_job_from_in_progress.lrem", err)
+		}
+
+		return err
+	})
 }
 
 func (w *worker) deleteUniqueJob(job *Job) {
 	uniqueKey, err := redisKeyUniqueJob(w.namespace, job.Name, job.Args)
 	if err != nil {
 		logError("worker.delete_unique_job.key", err)
+		return
 	}
+
 	conn := w.pool.Get()
 	defer conn.Close()
 
@@ -226,7 +245,7 @@ func (w *worker) deleteUniqueJob(job *Job) {
 	}
 }
 
-func (w *worker) removeJobFromInProgress(job *Job, fate terminateOp) {
+func (w *worker) removeJobFromInProgress(job *Job, fate terminateOp) error {
 	conn := w.pool.Get()
 	defer conn.Close()
 
@@ -235,14 +254,29 @@ func (w *worker) removeJobFromInProgress(job *Job, fate terminateOp) {
 	conn.Send("DECR", redisKeyJobsLock(w.namespace, job.Name))
 	conn.Send("HINCRBY", redisKeyJobsLockInfo(w.namespace, job.Name), w.poolID, -1)
 	fate(conn)
-	if _, err := conn.Do("EXEC"); err != nil {
-		logError("worker.remove_job_from_in_progress.lrem", err)
+
+	_, err := conn.Do("EXEC")
+
+	return err
+}
+
+func (w *worker) jobFate(jt *jobType, job *Job) terminateOp {
+	if jt != nil {
+		failsRemaining := int64(jt.MaxFails) - job.Fails
+		if failsRemaining > 0 {
+			return terminateAndRetry(w, jt, job)
+		}
+		if jt.SkipDead {
+			return terminateOnly
+		}
 	}
+	return terminateAndDead(w, job)
 }
 
 type terminateOp func(conn redis.Conn)
 
-func terminateOnly(_ redis.Conn) { return }
+func terminateOnly(_ redis.Conn) {}
+
 func terminateAndRetry(w *worker, jt *jobType, job *Job) terminateOp {
 	rawJSON, err := job.serialize()
 	if err != nil {
@@ -253,6 +287,7 @@ func terminateAndRetry(w *worker, jt *jobType, job *Job) terminateOp {
 		conn.Send("ZADD", redisKeyRetry(w.namespace), nowEpochSeconds()+jt.calcBackoff(job), rawJSON)
 	}
 }
+
 func terminateAndDead(w *worker, job *Job) terminateOp {
 	rawJSON, err := job.serialize()
 	if err != nil {
@@ -269,21 +304,27 @@ func terminateAndDead(w *worker, job *Job) terminateOp {
 	}
 }
 
-func (w *worker) jobFate(jt *jobType, job *Job) terminateOp {
-	if jt != nil {
-		failsRemaining := int64(jt.MaxFails) - job.Fails
-		if failsRemaining > 0 {
-			return terminateAndRetry(w, jt, job)
-		}
-		if jt.SkipDead {
-			return terminateOnly
-		}
-	}
-	return terminateAndDead(w, job)
-}
-
 // Default algorithm returns an fastly increasing backoff counter which grows in an unbounded fashion
 func defaultBackoffCalculator(job *Job) int64 {
 	fails := job.Fails
 	return (fails * fails * fails * fails) + 15 + (rand.Int63n(30) * (fails + 1))
 }
+
+// retryErr calls fn until it succeeds, sleeping per the backoffs schedule between attempts.
+func retryErr(backoffs []time.Duration, fn func() error) {
+	for attempt := 0; ; attempt++ {
+		err := fn()
+		if err == nil {
+			break
+		}
+
+		if len(backoffs) != 0 {
+			idx := attempt
+			if idx >= len(backoffs) {
+				idx = len(backoffs) - 1
+			}
+
+			time.Sleep(backoffs[idx])
+		}
+	}
+}
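
To make the new commit-retry behavior concrete, here is a minimal sketch of how retryErr paces its attempts. It is illustrative only: it assumes it lives inside this package (retryErr and sleepBackoffs are unexported, and "fmt"/"io" would need importing), and io.EOF merely stands in for a dropped Redis connection.

func ExampleRetryErr() {
	attempts := 0
	retryErr(sleepBackoffs, func() error {
		attempts++
		if attempts < 3 {
			return io.EOF // stand-in for a lost Redis connection
		}
		return nil // the commit finally succeeded, so retryErr stops
	})
	// After the n-th consecutive failure, retryErr sleeps
	// backoffs[min(n-1, len(backoffs)-1)], so a persistent failure settles
	// into a steady 5s retry loop rather than growing without bound.
	fmt.Println(attempts)
	// Output: 3
}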
diff --git a/worker_pool.go b/worker_pool.go
index 32e22b93..27e5a33b 100644
--- a/worker_pool.go
+++ b/worker_pool.go
@@ -6,6 +6,7 @@ import (
 	"sort"
 	"strings"
 	"sync"
+	"time"
 
 	"github.com/robfig/cron/v3"
 )
@@ -29,6 +30,7 @@ type WorkerPool struct {
 	heartbeater      *workerPoolHeartbeater
 	retrier          *requeuer
 	scheduler        *requeuer
+	reapPeriod       time.Duration
 	deadPoolReaper   *deadPoolReaper
 	periodicEnqueuer *periodicEnqueuer
 }
@@ -95,7 +97,7 @@ type (
 
 // NewWorkerPool creates a new worker pool. ctx should be a struct literal whose type will be used for middleware and handlers.
 // concurrency specifies how many workers to spin up - each worker can process jobs concurrently.
-func NewWorkerPool(ctx interface{}, concurrency uint, namespace string, pool Pool) *WorkerPool {
+func NewWorkerPool(ctx interface{}, concurrency uint, namespace string, pool Pool, opts ...WorkerPoolOption) *WorkerPool {
 	if pool == nil {
 		panic("NewWorkerPool needs a non-nil Pool")
 	}
@@ -111,6 +113,10 @@ func NewWorkerPool(ctx interface{}, concurrency uint, namespace string, pool Poo
 		jobTypes:    make(map[string]*jobType),
 	}
 
+	for _, opt := range opts {
+		opt(wp)
+	}
+
 	for i := uint(0); i < wp.concurrency; i++ {
 		w := newWorker(wp.namespace, wp.workerPoolID, wp.pool, wp.contextType, nil, wp.jobTypes)
 		wp.workers = append(wp.workers, w)
@@ -273,7 +279,7 @@ func (wp *WorkerPool) startRequeuers() {
 	}
 	wp.retrier = newRequeuer(wp.namespace, wp.pool, redisKeyRetry(wp.namespace), jobNames)
 	wp.scheduler = newRequeuer(wp.namespace, wp.pool, redisKeyScheduled(wp.namespace), jobNames)
-	wp.deadPoolReaper = newDeadPoolReaper(wp.namespace, wp.pool, jobNames)
+	wp.deadPoolReaper = newDeadPoolReaper(wp.namespace, wp.pool, jobNames, wp.reapPeriod)
 	wp.retrier.start()
 	wp.scheduler.start()
 	wp.deadPoolReaper.start()
@@ -533,3 +539,13 @@ func applyDefaultsAndValidate(jobOpts JobOptions) JobOptions {
 
 	return jobOpts
 }
+
+// WorkerPoolOption customizes the behavior of a WorkerPool.
+type WorkerPoolOption func(wp *WorkerPool)
+
+// WithReapPeriod sets how often the dead pool reaper runs.
+func WithReapPeriod(p time.Duration) WorkerPoolOption {
+	return func(wp *WorkerPool) {
+		wp.reapPeriod = p
+	}
+}
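
From the caller's side, the new functional option reads as follows. This is a sketch assuming the upstream gocraft/work and redigo import paths ("github.com/gocraft/work", "github.com/gomodule/redigo/redis"); this fork's module path may differ.

package main

import (
	"time"

	"github.com/gocraft/work"
	"github.com/gomodule/redigo/redis"
)

type Context struct{}

func main() {
	redisPool := &redis.Pool{
		MaxActive: 5,
		MaxIdle:   5,
		Dial: func() (redis.Conn, error) {
			return redis.Dial("tcp", ":6379")
		},
	}

	// Run the dead pool reaper every 30s; omitting the option keeps
	// defaultReapPeriod.
	pool := work.NewWorkerPool(Context{}, 10, "my_app", redisPool,
		work.WithReapPeriod(30*time.Second))
	pool.Start()

	// ... wait for a shutdown signal ...

	pool.Stop()
}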
diff --git a/worker_pool_test.go b/worker_pool_test.go
index 343079ca..039bee43 100644
--- a/worker_pool_test.go
+++ b/worker_pool_test.go
@@ -268,6 +268,7 @@ func setupTestWorkerPool(pool *redis.Pool, namespace, jobName string, concurrenc
 	wp := NewWorkerPool(TestContext{}, uint(concurrency), namespace, pool)
 	wp.JobWithOptions(jobName, jobOpts, (*TestContext).SleepyJob)
 	// reset the backoff times to help with testing
-	sleepBackoffsInMilliseconds = []int64{10, 10, 10, 10, 10}
+	sleepBackoffs = []time.Duration{time.Millisecond * 10}
+
 	return wp
 }
diff --git a/worker_test.go b/worker_test.go
index 121cb213..7b1affa0 100644
--- a/worker_test.go
+++ b/worker_test.go
@@ -2,7 +2,10 @@ package work
 
 import (
 	"fmt"
+	"io"
+	"log"
 	"strconv"
+	"sync"
 	"sync/atomic"
 	"testing"
 	"time"
@@ -329,7 +332,7 @@ func TestWorkersPaused(t *testing.T) {
 	err = pauseJobs(ns, job1, pool)
 	assert.Nil(t, err)
 	// reset the backoff times to help with testing
-	sleepBackoffsInMilliseconds = []int64{10, 10, 10, 10, 10}
+	sleepBackoffs = []time.Duration{time.Millisecond * 10}
 
 	w.start()
 	// make sure the jobs stay in the still in the run queue and not moved to in progress
@@ -631,3 +634,102 @@ func TestWorkerPoolStop(t *testing.T) {
 		t.Errorf("Expected that jobs queue was not completely emptied.")
 	}
 }
+
+func TestWorkerRetryRemoveFromInProgress(t *testing.T) {
+	Logger = log.Default()
+
+	originPool := newTestPool(":6379")
+	pool := newSwitchablePool(originPool)
+	ns := "work"
+	job1 := "job1"
+
+	cleanKeyspace(ns, originPool)
+
+	// reset the backoff times to help with testing
+	sleepBackoffs = []time.Duration{time.Millisecond * 10}
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+
+	jobTypes := map[string]*jobType{
+		job1: {
+			Name:       job1,
+			JobOptions: JobOptions{Priority: 1},
+			isGeneric:  true,
+			genericHandler: func(job *Job) error {
+				defer wg.Done()
+
+				// Emulate a connection loss.
+				pool.Off()
+
+				return nil
+			},
+		},
+	}
+
+	enqueuer := NewEnqueuer(ns, pool)
+	_, err := enqueuer.Enqueue(job1, Q{"a": 1})
+	assert.Nil(t, err)
+
+	w := newWorker(ns, "1", pool, tstCtxType, nil, jobTypes)
+	w.start()
+	defer w.stop()
+
+	wg.Wait()
+
+	// One job still in progress.
+	assert.EqualValues(t, 1, listSize(originPool, redisKeyJobsInProgress(ns, "1", job1)))
+
+	pool.On()
+	time.Sleep(time.Millisecond * 10)
+
+	// Nothing in retries or dead.
+	assert.EqualValues(t, 0, zsetSize(originPool, redisKeyRetry(ns)))
+	assert.EqualValues(t, 0, zsetSize(originPool, redisKeyDead(ns)))
+
+	// Nothing in the queues or in-progress queues.
+	assert.EqualValues(t, 0, listSize(originPool, redisKeyJobs(ns, job1)))
+	assert.EqualValues(t, 0, listSize(originPool, redisKeyJobsInProgress(ns, "1", job1)))
+}
+
+type switchablePool struct {
+	pool Pool
+	off  atomic.Bool
+}
+
+func newSwitchablePool(pool Pool) *switchablePool {
+	return &switchablePool{pool: pool}
+}
+
+func (p *switchablePool) Get() redis.Conn {
+	return &switchableConn{p.pool.Get(), &p.off}
+}
+
+func (p *switchablePool) On() {
+	p.off.Store(false)
+}
+
+func (p *switchablePool) Off() {
+	p.off.Store(true)
+}
+
+type switchableConn struct {
+	redis.Conn
+	off *atomic.Bool
+}
+
+func (c *switchableConn) Do(commandName string, args ...interface{}) (reply interface{}, err error) {
+	if !c.off.Load() {
+		return c.Conn.Do(commandName, args...)
+	}
+
+	return nil, io.EOF
+}
+
+func (c *switchableConn) Send(commandName string, args ...interface{}) error {
+	if !c.off.Load() {
+		return c.Conn.Send(commandName, args...)
+	}
+
+	return io.EOF
+}
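
The second patch below repairs a counting invariant: for every job type, the scalar lock key must equal the sum of the per-pool counters in its lock_info hash. A sketch of the drift it corrects — lockDrift is a hypothetical helper shown only for illustration, built on the package's real redisKeyJobsLock/redisKeyJobsLockInfo helpers and redigo:

// lockDrift returns lock - sum(lock_info); a non-zero result means the job
// type has dangling locks of the kind the reaper below adjusts away.
func lockDrift(conn redis.Conn, ns, jobName string) (int, error) {
	locks, err := redis.Int(conn.Do("GET", redisKeyJobsLock(ns, jobName)))
	if err != nil && err != redis.ErrNil {
		return 0, err
	}
	perPool, err := redis.Ints(conn.Do("HVALS", redisKeyJobsLockInfo(ns, jobName)))
	if err != nil {
		return 0, err
	}
	total := 0
	for _, n := range perPool {
		total += n
	}
	return locks - total, nil
}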
From d45e6b40bb20d0d6ecadafecd4ec1491a3d37c78 Mon Sep 17 00:00:00 2001
From: hageshtrem
Date: Fri, 24 Mar 2023 17:33:47 +0300
Subject: [PATCH 2/2] Adjust the lock keys according to the lock_info numbers

---
 dead_pool_reaper.go      | 36 ++++++++++++++++++++++--
 dead_pool_reaper_test.go | 59 ++++++++++++++++++++++++++++++++++++++++
 redis.go                 | 34 +++++++++++++++++++++++
 3 files changed, 126 insertions(+), 3 deletions(-)

diff --git a/dead_pool_reaper.go b/dead_pool_reaper.go
index 4af71622..f3d5cdf6 100644
--- a/dead_pool_reaper.go
+++ b/dead_pool_reaper.go
@@ -15,7 +15,7 @@ import (
 
 const (
 	deadTime          = 10 * time.Second // 2 x heartbeat
-	defaultReapPeriod = 1 * time.Minute
+	defaultReapPeriod = 5 * time.Minute
 	reapJitterSecs    = 30
 	requeueKeysPerJob = 4
 )
@@ -72,7 +72,6 @@ func (r *deadPoolReaper) loop() {
 			// Schedule next occurrence periodically with jitter
 			timer.Reset(r.reapPeriod + time.Duration(rand.Intn(reapJitterSecs))*time.Second)
 
-			// Reap
 			if err := r.reap(); err != nil {
 				logError("dead_pool_reaper.reap", err)
 			}
@@ -90,6 +89,7 @@ func (r *deadPoolReaper) reap() (err error) {
 
 	acquired, err := r.acquireLock(lockValue)
 	if err != nil {
+		Logger.Printf("Reaper: failed to acquire lock: %v", err)
 		return err
 	}
 
@@ -108,7 +108,12 @@ func (r *deadPoolReaper) reap() (err error) {
 	rErr := r.reapDeadPools()
 	cErr := r.clearUnknownPools()
 
-	return multierr.Combine(rErr, cErr)
+	// TODO: consider refactoring requeueInProgressJobs and cleanStaleLockInfo
+	// and removing removeDanglingLocks. We have seen a case where the lock
+	// count was 1 while the lock_info total was 0.
+	dErr := r.removeDanglingLocks()
+
+	return multierr.Combine(err, rErr, cErr, dErr)
 }
 
 // reapDeadPools collects the IDs of expired heartbeat pools and releases the
@@ -308,6 +313,31 @@ func (r *deadPoolReaper) getUnknownPools() (map[string][]string, error) {
 	return pools, nil
 }
 
+// removeDanglingLocks adjusts the lock keys according to the lock_info numbers.
+// TODO: it would be better to find where the inconsistency comes from.
+func (r *deadPoolReaper) removeDanglingLocks() error {
+	keysCount := len(r.curJobTypes) * 2               // lock and lock_info keys
+	scriptArgs := make([]interface{}, 0, keysCount+1) // +1 for keys count arg
+	scriptArgs = append(scriptArgs, keysCount)
+
+	for _, j := range r.curJobTypes {
+		scriptArgs = append(scriptArgs, redisKeyJobsLock(r.namespace, j))
+		scriptArgs = append(scriptArgs, redisKeyJobsLockInfo(r.namespace, j))
+	}
+
+	conn := r.pool.Get()
+	defer conn.Close()
+
+	keys, err := redis.Strings(redisRemoveDanglingLocksScript.Do(conn, scriptArgs...))
+	if err != nil {
+		return err
+	}
+
+	Logger.Printf("Reaper: dangling locks: %v", keys)
+
+	return nil
+}
+
 // acquireLock acquires lock with a value and an expiration time for reap period.
 func (r *deadPoolReaper) acquireLock(value string) (bool, error) {
 	conn := r.pool.Get()
 	defer conn.Close()
diff --git a/dead_pool_reaper_test.go b/dead_pool_reaper_test.go
index 279d1a62..fb576b6a 100644
--- a/dead_pool_reaper_test.go
+++ b/dead_pool_reaper_test.go
@@ -551,3 +551,62 @@ func TestDeadPoolReaperClearUnknownPool(t *testing.T) {
 	assert.NoError(t, err)
 	assert.Equal(t, map[string]string{workerPoolID1: "0"}, nLockInfo2)
 }
+
+func TestDeadPoolReaperRemoveDanglingLocks(t *testing.T) {
+	pool := newTestPool(":6379")
+	ns := "work"
+	cleanKeyspace(ns, pool)
+
+	workerPoolID1, workerPoolID2 := "1", "2"
+
+	job1, job2, job3, job4 := "type1", "type2", "type3", "type4"
+	jobNames := []string{job1, job2, job3, job4}
+	lock1, lock2, lock3 := redisKeyJobsLock(ns, job1), redisKeyJobsLock(ns, job2), redisKeyJobsLock(ns, job3)
+	lockInfo1, lockInfo2 := redisKeyJobsLockInfo(ns, job1), redisKeyJobsLockInfo(ns, job2)
+
+	conn := pool.Get()
+	defer conn.Close()
+
+	// Create redis data
+	var err error
+
+	err = conn.Send("SET", lock1, 4) // One dangling lock
+	assert.NoError(t, err)
+
+	err = conn.Send("HMSET", lockInfo1,
+		workerPoolID1, 2,
+		workerPoolID2, 1,
+	)
+	assert.NoError(t, err)
+
+	err = conn.Send("SET", lock2, 1) // No dangling locks
+	assert.NoError(t, err)
+
+	err = conn.Send("HMSET", lockInfo2,
+		workerPoolID1, 0,
+		workerPoolID2, 1,
+	)
+	assert.NoError(t, err)
+
+	err = conn.Send("SET", lock3, 1) // One dangling lock
+	assert.NoError(t, err)
+
+	assert.NoError(t, conn.Flush())
+
+	reaper := newDeadPoolReaper(ns, pool, jobNames, 0)
+	err = reaper.removeDanglingLocks()
+	assert.NoError(t, err)
+
+	// Checks
+	nLock1, err := redis.Int(conn.Do("GET", lock1))
+	assert.NoError(t, err)
+	assert.Equal(t, 3, nLock1)
+
+	nLock2, err := redis.Int(conn.Do("GET", lock2))
+	assert.NoError(t, err)
+	assert.Equal(t, 1, nLock2)
+
+	nLock3, err := redis.Int(conn.Do("GET", lock3))
+	assert.NoError(t, err)
+	assert.Equal(t, 0, nLock3)
+}
diff --git a/redis.go b/redis.go
index 24e84d22..71dfabd2 100644
--- a/redis.go
+++ b/redis.go
@@ -440,3 +440,37 @@ end
 
 return cjson.encode(unknownPools)
 `)
+
+// Used by the reaper to adjust lock counts that have drifted away from the
+// lock_info totals. Returns the dangling lock keys that were fixed.
+//
+// KEYS alternate pairwise: KEYS[1] = first job's lock key,
+// KEYS[2] = first job's lock_info key, KEYS[3] = second job's lock key, ...
+// Returns: ["ns:jobs:job1:lock", "ns:jobs:job3:lock"]
+var redisRemoveDanglingLocksScript = redis.NewScript(-1, `
+local danglingLocks = {}
+
+for i=1,#KEYS,2 do
+	local lockKey = KEYS[i]
+	local lockInfoKey = KEYS[i+1]
+
+	local rlocks = redis.call('get', lockKey)
+	if rlocks ~= false then
+		local locks = tonumber(rlocks)
+		local lockInfo = redis.call('hvals', lockInfoKey)
+
+		local totalLocks = 0
+		for j=1,#lockInfo do
+			totalLocks = totalLocks + tonumber(lockInfo[j])
+		end
+
+		local diff = locks - totalLocks
+		if diff ~= 0 then
+			table.insert(danglingLocks, lockKey)
+			redis.call('decrby', lockKey, diff)
+		end
+	end
+end
+
+return danglingLocks
+`)
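
For reference, the script can also be exercised directly against a local Redis, mirroring TestDeadPoolReaperRemoveDanglingLocks. A sketch in the style of the existing tests — the key names follow the ns:jobs:<name>:lock convention from the comment above, and the test name is hypothetical:

func TestRemoveDanglingLocksScriptDirectly(t *testing.T) {
	pool := newTestPool(":6379")
	conn := pool.Get()
	defer conn.Close()

	conn.Do("SET", "work:jobs:type1:lock", 4)
	conn.Do("HMSET", "work:jobs:type1:lock_info", "1", 2, "2", 1)

	// The script is registered with keyCount -1, so the first argument to
	// Do is the number of keys that follow.
	fixed, err := redis.Strings(redisRemoveDanglingLocksScript.Do(conn,
		2, "work:jobs:type1:lock", "work:jobs:type1:lock_info"))
	assert.NoError(t, err)
	assert.Equal(t, []string{"work:jobs:type1:lock"}, fixed)

	// The lock was DECRBY'd from 4 down to the lock_info total of 3.
	n, err := redis.Int(conn.Do("GET", "work:jobs:type1:lock"))
	assert.NoError(t, err)
	assert.Equal(t, 3, n)
}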