Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 49 additions & 4 deletions dead_pool_reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

const (
// deadTime is, per the inline note, twice the heartbeat interval.
deadTime = 10 * time.Second // 2 x heartbeat
// NOTE(review): reapPeriod and defaultReapPeriod both appear in this hunk;
// this looks like the pre-change line that defaultReapPeriod replaces —
// confirm against the actual file.
reapPeriod = 10 * time.Minute
// defaultReapPeriod is the reap interval newDeadPoolReaper falls back to when
// it is called with a zero reapPeriod.
defaultReapPeriod = 5 * time.Minute
// reapJitterSecs is the exclusive upper bound, in seconds, of the random
// jitter that loop adds to the reap period when rescheduling its timer.
reapJitterSecs = 30
// NOTE(review): requeueKeysPerJob is not used in the visible code; presumably
// the number of Redis keys passed per job type to the requeue script — confirm.
requeueKeysPerJob = 4
)
Expand All @@ -31,7 +31,11 @@ type deadPoolReaper struct {
doneStoppingChan chan struct{}
}

func newDeadPoolReaper(namespace string, pool Pool, curJobTypes []string) *deadPoolReaper {
func newDeadPoolReaper(namespace string, pool Pool, curJobTypes []string, reapPeriod time.Duration) *deadPoolReaper {
if reapPeriod == 0 {
reapPeriod = defaultReapPeriod
}

return &deadPoolReaper{
namespace: namespace,
pool: pool,
Expand All @@ -53,6 +57,8 @@ func (r *deadPoolReaper) stop() {
}

func (r *deadPoolReaper) loop() {
Logger.Printf("Reaper: started with a period of %v", r.reapPeriod)

// Reap immediately after we provide some time for initialization
timer := time.NewTimer(r.deadTime)
defer timer.Stop()
Expand All @@ -66,7 +72,6 @@ func (r *deadPoolReaper) loop() {
// Schedule next occurrence periodically with jitter
timer.Reset(r.reapPeriod + time.Duration(rand.Intn(reapJitterSecs))*time.Second)

// Reap
if err := r.reap(); err != nil {
logError("dead_pool_reaper.reap", err)
}
Expand All @@ -80,24 +85,35 @@ func (r *deadPoolReaper) reap() (err error) {
return err
}

Logger.Printf("Reaper: trying to acquire lock...")

acquired, err := r.acquireLock(lockValue)
if err != nil {
Logger.Printf("Reaper: acquiring lock: %v", err)
return err
}

// Another reaper is already running
if !acquired {
Logger.Printf("Reaper: locked by another process")
return nil
}

Logger.Printf("Reaper: lock is acquired")

defer func() {
err = r.releaseLock(lockValue)
}()

rErr := r.reapDeadPools()
cErr := r.clearUnknownPools()

return multierr.Combine(rErr, cErr)
// TODO: consider refactoring requeueInProgressJobs and cleanStaleLockInfo
// and removing removeDanglingLocks. There was a block where lock is 1 and
// lock_info is 0.
dErr := r.removeDanglingLocks()

return multierr.Combine(err, rErr, cErr, dErr)
}

// reapDeadPools collects the IDs of expired heartbeat pools and releases the
Expand All @@ -108,6 +124,8 @@ func (r *deadPoolReaper) reapDeadPools() error {
return err
}

Logger.Printf("Reaper: dead pools: %v", deadPoolIDs)

conn := r.pool.Get()
defer conn.Close()

Expand Down Expand Up @@ -150,6 +168,8 @@ func (r *deadPoolReaper) clearUnknownPools() error {
return err
}

Logger.Printf("Reaper: unknown pools: %v", unknownPools)

for poolID, jobTypes := range unknownPools {
if err = r.requeueInProgressJobs(poolID, jobTypes); err != nil {
return err
Expand Down Expand Up @@ -293,6 +313,31 @@ func (r *deadPoolReaper) getUnknownPools() (map[string][]string, error) {
return pools, nil
}

// removeDanglingLocks runs redisRemoveDanglingLocksScript over the lock and
// lock_info keys of every current job type so that each lock counter is
// brought in line with the numbers recorded in its lock_info. The keys the
// script reports as fixed are logged. It returns any error from running the
// script or decoding its reply.
// TODO: it's better to find where the inconsistency comes from.
func (r *deadPoolReaper) removeDanglingLocks() error {
	// Every job type contributes two keys: its lock and its lock_info.
	numKeys := 2 * len(r.curJobTypes)

	// The key count goes first, followed by the keys as (lock, lock_info) pairs.
	args := make([]interface{}, 1, numKeys+1)
	args[0] = numKeys
	for _, jobType := range r.curJobTypes {
		args = append(args,
			redisKeyJobsLock(r.namespace, jobType),
			redisKeyJobsLockInfo(r.namespace, jobType),
		)
	}

	conn := r.pool.Get()
	defer conn.Close()

	fixed, err := redis.Strings(redisRemoveDanglingLocksScript.Do(conn, args...))
	if err == nil {
		Logger.Printf("Reaper: dangling locks: %v", fixed)
	}

	return err
}

// acquireLock acquires lock with a value and an expiration time for reap period.
func (r *deadPoolReaper) acquireLock(value string) (bool, error) {
conn := r.pool.Get()
Expand Down
77 changes: 68 additions & 9 deletions dead_pool_reaper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestDeadPoolReaper(t *testing.T) {
assert.NoError(t, err)

// Test getting dead pool
reaper := newDeadPoolReaper(ns, pool, []string{})
reaper := newDeadPoolReaper(ns, pool, []string{}, 0)
deadPools, err := reaper.findDeadPools()
assert.NoError(t, err)
assert.Equal(t, map[string][]string{"2": {"type1", "type2"}, "3": {"type1", "type2"}}, deadPools)
Expand Down Expand Up @@ -127,7 +127,7 @@ func TestDeadPoolReaperNoHeartbeat(t *testing.T) {
assert.EqualValues(t, 3, numPools)

// Test getting dead pool ids
reaper := newDeadPoolReaper(ns, pool, []string{"type1"})
reaper := newDeadPoolReaper(ns, pool, []string{"type1"}, 0)
deadPools, err := reaper.findDeadPools()
assert.NoError(t, err)
assert.Equal(t, map[string][]string{"1": nil, "2": nil, "3": nil}, deadPools)
Expand Down Expand Up @@ -210,7 +210,7 @@ func TestDeadPoolReaperNoJobTypes(t *testing.T) {
assert.NoError(t, err)

// Test getting dead pool
reaper := newDeadPoolReaper(ns, pool, []string{})
reaper := newDeadPoolReaper(ns, pool, []string{}, 0)
deadPools, err := reaper.findDeadPools()
assert.NoError(t, err)
assert.Equal(t, map[string][]string{"2": {"type1", "type2"}}, deadPools)
Expand Down Expand Up @@ -283,7 +283,7 @@ func TestDeadPoolReaperWithWorkerPools(t *testing.T) {

// setup a worker pool and start the reaper, which should restart the stale job above
wp := setupTestWorkerPool(pool, ns, job1, 1, JobOptions{Priority: 1})
wp.deadPoolReaper = newDeadPoolReaper(wp.namespace, wp.pool, []string{"job1"})
wp.deadPoolReaper = newDeadPoolReaper(wp.namespace, wp.pool, []string{"job1"}, 0)
wp.deadPoolReaper.deadTime = expectedDeadTime
wp.deadPoolReaper.start()

Expand Down Expand Up @@ -327,7 +327,7 @@ func TestDeadPoolReaperCleanStaleLocks(t *testing.T) {
err = conn.Flush()
assert.NoError(t, err)

reaper := newDeadPoolReaper(ns, pool, jobNames)
reaper := newDeadPoolReaper(ns, pool, jobNames, 0)
// clean lock info for workerPoolID1
err = reaper.cleanStaleLockInfo(workerPoolID1, jobNames)
assert.NoError(t, err)
Expand Down Expand Up @@ -384,7 +384,7 @@ func TestDeadPoolReaperTakeDeadPools(t *testing.T) {
assert.NoError(t, err)

// Test getting dead pools
reaper := newDeadPoolReaper(ns, pool, []string{})
reaper := newDeadPoolReaper(ns, pool, []string{}, 0)
deadPools, err := reaper.findDeadPools()
assert.NoError(t, err)
assert.Equal(t, map[string][]string{"2": {"type1", "type2"}, "3": nil}, deadPools)
Expand All @@ -395,7 +395,7 @@ func TestReaperLock(t *testing.T) {
ns := "work"
cleanKeyspace(ns, pool)

reaper := newDeadPoolReaper(ns, pool, []string{})
reaper := newDeadPoolReaper(ns, pool, []string{}, 0)

value, err := genValue()
assert.NoError(t, err)
Expand Down Expand Up @@ -472,7 +472,7 @@ func TestDeadPoolReaperGetUnknownPools(t *testing.T) {
assert.NoError(t, conn.Flush())

// Run test
reaper := newDeadPoolReaper(ns, pool, jobNames)
reaper := newDeadPoolReaper(ns, pool, jobNames, 0)
unknownPools, err := reaper.getUnknownPools()
assert.NoError(t, err)
assert.Equal(t, map[string][]string{"2": {"type1", "type2"}, "3": {"type1", "type2"}}, unknownPools)
Expand Down Expand Up @@ -531,7 +531,7 @@ func TestDeadPoolReaperClearUnknownPool(t *testing.T) {
assert.NoError(t, conn.Flush())

// Run test
reaper := newDeadPoolReaper(ns, pool, jobNames)
reaper := newDeadPoolReaper(ns, pool, jobNames, 0)
err = reaper.clearUnknownPools()
assert.NoError(t, err)

Expand All @@ -551,3 +551,62 @@ func TestDeadPoolReaperClearUnknownPool(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, map[string]string{workerPoolID1: "0"}, nLockInfo2)
}

// TestDeadPoolReaperRemoveDanglingLocks seeds lock counters and lock_info
// hashes for several job types, runs removeDanglingLocks, and verifies that
// each lock counter ends up equal to the sum of its lock_info values.
func TestDeadPoolReaperRemoveDanglingLocks(t *testing.T) {
	pool := newTestPool(":6379")
	ns := "work"
	cleanKeyspace(ns, pool)

	poolA, poolB := "1", "2"

	// type4 deliberately has neither a lock nor a lock_info key.
	jobNames := []string{"type1", "type2", "type3", "type4"}
	lock1 := redisKeyJobsLock(ns, jobNames[0])
	lock2 := redisKeyJobsLock(ns, jobNames[1])
	lock3 := redisKeyJobsLock(ns, jobNames[2])
	lockInfo1 := redisKeyJobsLockInfo(ns, jobNames[0])
	lockInfo2 := redisKeyJobsLockInfo(ns, jobNames[1])

	conn := pool.Get()
	defer conn.Close()

	// Create redis data
	seed := []struct {
		cmd  string
		args []interface{}
	}{
		{"SET", []interface{}{lock1, 4}}, // One dangling lock
		{"HMSET", []interface{}{lockInfo1, poolA, 2, poolB, 1}},
		{"SET", []interface{}{lock2, 1}}, // No dangling locks
		{"HMSET", []interface{}{lockInfo2, poolA, 0, poolB, 1}},
		{"SET", []interface{}{lock3, 1}}, // One dangling lock
	}
	for _, s := range seed {
		assert.NoError(t, conn.Send(s.cmd, s.args...))
	}

	assert.NoError(t, conn.Flush())

	reaper := newDeadPoolReaper(ns, pool, jobNames, 0)
	assert.NoError(t, reaper.removeDanglingLocks())

	// Checks
	expectations := []struct {
		key  string
		want int
	}{
		{lock1, 3},
		{lock2, 1},
		{lock3, 0},
	}
	for _, e := range expectations {
		got, err := redis.Int(conn.Do("GET", e.key))
		assert.NoError(t, err)
		assert.Equal(t, e.want, got)
	}
}
34 changes: 34 additions & 0 deletions redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,3 +440,37 @@ end

return cjson.encode(unknownPools)
`)

// redisRemoveDanglingLocksScript is used by the reaper to reconcile each job's
// lock counter with its lock_info hash. KEYS is read as consecutive
// (lock, lock_info) pairs. For every pair whose lock key exists, the script
// sums all values of the lock_info hash and, when the lock value differs from
// that sum, applies DECRBY with the difference so the lock equals the sum.
// Pairs whose lock key does not exist are skipped.
//
// The script is created with a key count of -1; the caller passes the number
// of keys as the first argument.
//
// KEYS[2k-1] = k-th job's lock key
// KEYS[2k]   = k-th job's lock info key
// Returns the lock keys that have been adjusted, e.g.
// ["ns:jobs:job1:lock", "ns:jobs:job3:lock"]
var redisRemoveDanglingLocksScript = redis.NewScript(-1, `
local danglingLocks = {}

for i=1,#KEYS,2 do
local lockKey = KEYS[i]
local lockInfoKey = KEYS[i+1]

local rlocks = redis.call('get', lockKey)
if rlocks ~= false then
local locks = tonumber(rlocks)
local lockInfo = redis.call('hvals', lockInfoKey)

local totalLocks = 0
for j=1,#lockInfo do
totalLocks = totalLocks + tonumber(lockInfo[j])
end

local diff = locks - totalLocks
if diff ~= 0 then
table.insert(danglingLocks, lockKey)
redis.call('decrby', lockKey, diff)
end
end
end

return danglingLocks
`)
Loading