From 9173d08d28bd1b5e2915fd7da5e368ae6b57298a Mon Sep 17 00:00:00 2001 From: hageshtrem Date: Thu, 7 Dec 2023 18:37:09 +0300 Subject: [PATCH] Add a dead pool reaper hook --- dead_pool_reaper.go | 130 +++++++++++++++++++++++++++++---------- dead_pool_reaper_test.go | 88 +++++++++++++++++++++----- go.mod | 2 - go.sum | 6 -- redis.go | 4 ++ worker_pool.go | 17 ++++- 6 files changed, 189 insertions(+), 58 deletions(-) diff --git a/dead_pool_reaper.go b/dead_pool_reaper.go index a2deca76..95bc275b 100644 --- a/dead_pool_reaper.go +++ b/dead_pool_reaper.go @@ -4,13 +4,13 @@ import ( crand "crypto/rand" "encoding/base64" "encoding/json" + "errors" "fmt" "math/rand" "strings" "time" "github.com/gomodule/redigo/redis" - "go.uber.org/multierr" ) const ( @@ -20,6 +20,24 @@ const ( requeueKeysPerJob = 4 ) +// ReapResult is a set of data that reaper works with. +type ReapResult struct { + // Err is any errors during the reaper cycle. + Err error + // NoPoolHeartBeatJobs is a collection of job names that have been adjusted + // due to outdated worker pool heartbeats. + NoPoolHeartBeatJobs []string + // UnknownPoolJobs is a set of job names that have been adjusted because the + // worker pools working on them are not part of the overall set of worker pools. + UnknownPoolJobs []string + // DanglingLockJobs is a set of job names that have been adjusted due to + // inconsistency in their "lock" and "lock_info" keys. + DanglingLockJobs []string +} + +// ReaperHook can be used to monitor the reaper's actions. +type ReaperHook func() (afterHook func(ReapResult)) + type deadPoolReaper struct { namespace string pool Pool @@ -29,9 +47,17 @@ type deadPoolReaper struct { stopChan chan struct{} doneStoppingChan chan struct{} + + hook ReaperHook } -func newDeadPoolReaper(namespace string, pool Pool, curJobTypes []string, reapPeriod time.Duration) *deadPoolReaper { +func newDeadPoolReaper( + namespace string, + pool Pool, + curJobTypes []string, + reapPeriod time.Duration, + hook ReaperHook, +) *deadPoolReaper { if reapPeriod == 0 { reapPeriod = defaultReapPeriod } @@ -44,6 +70,7 @@ func newDeadPoolReaper(namespace string, pool Pool, curJobTypes []string, reapPe curJobTypes: curJobTypes, stopChan: make(chan struct{}), doneStoppingChan: make(chan struct{}), + hook: hook, } } @@ -105,82 +132,106 @@ func (r *deadPoolReaper) reap() (err error) { err = r.releaseLock(lockValue) }() - rErr := r.reapDeadPools() - cErr := r.clearUnknownPools() + reapResult := ReapResult{} + if r.hook != nil { + finish := r.hook() + + if finish != nil { + defer func() { finish(reapResult) }() + } + } + + deadPools, rErr := r.reapDeadPools() + if jobs := deadPools.getAllJobs(); len(jobs) != 0 { + Logger.Printf("Reaper: dead pools: %v", deadPools) + + reapResult.NoPoolHeartBeatJobs = jobs + } + + unknownPools, cErr := r.clearUnknownPools() + if jobs := unknownPools.getAllJobs(); len(jobs) != 0 { + Logger.Printf("Reaper: unknown pools: %v", unknownPools) + + reapResult.UnknownPoolJobs = jobs + } // TODO: consider refactoring requeueInProgressJobs and cleanStaleLockInfo // and removing removeDanglingLocks. There was a block where lock is 1 and // lock_info is 0. - dErr := r.removeDanglingLocks() + jobs, dErr := r.removeDanglingLocks() + if len(jobs) != 0 { + Logger.Printf("Reaper: dangling locks: %v", jobs) + + reapResult.DanglingLockJobs = jobs + } - return multierr.Combine(err, rErr, cErr, dErr) + reapResult.Err = errors.Join(err, rErr, cErr, dErr) + + return reapResult.Err } // reapDeadPools collects the IDs of expired heartbeat pools and releases the // associated resources. -func (r *deadPoolReaper) reapDeadPools() error { - deadPoolIDs, err := r.findDeadPools() +func (r *deadPoolReaper) reapDeadPools() (poolsJobs, error) { + deadPools, err := r.findDeadPools() if err != nil { - return err + return nil, err } - Logger.Printf("Reaper: dead pools: %v", deadPoolIDs) - conn := r.pool.Get() defer conn.Close() // Cleanup all dead pools - for deadPoolID, jobTypes := range deadPoolIDs { + for deadPoolID, jobTypes := range deadPools { lockJobTypes := jobTypes // if we found jobs from the heartbeat, requeue them and remove the heartbeat if len(jobTypes) > 0 { if err = r.requeueInProgressJobs(deadPoolID, jobTypes); err != nil { - return err + return deadPools, err } if _, err = conn.Do("DEL", redisKeyHeartbeat(r.namespace, deadPoolID)); err != nil { - return err + return deadPools, err } } else { // try to clean up locks for the current set of jobs if heartbeat was not found lockJobTypes = r.curJobTypes + deadPools[deadPoolID] = r.curJobTypes } // Cleanup any stale lock info if err = r.cleanStaleLockInfo(deadPoolID, lockJobTypes); err != nil { - return err + return deadPools, err } // Remove dead pool from worker pools set if _, err = conn.Do("SREM", redisKeyWorkerPools(r.namespace), deadPoolID); err != nil { - return err + return deadPools, err } } - return nil + return deadPools, nil } // clearUnknownPools enumerates the lock_info keys, collects pool IDs that are // not in the worker_pools set, and releases associated locks. -func (r *deadPoolReaper) clearUnknownPools() error { +func (r *deadPoolReaper) clearUnknownPools() (poolsJobs, error) { unknownPools, err := r.getUnknownPools() if err != nil { - return err + return nil, err } - Logger.Printf("Reaper: unknown pools: %v", unknownPools) - for poolID, jobTypes := range unknownPools { if err = r.requeueInProgressJobs(poolID, jobTypes); err != nil { - return err + return unknownPools, err } if err = r.cleanStaleLockInfo(poolID, jobTypes); err != nil { - return err + return unknownPools, err } } - return nil + return unknownPools, nil } func (r *deadPoolReaper) cleanStaleLockInfo(poolID string, jobTypes []string) error { @@ -240,7 +291,7 @@ func (r *deadPoolReaper) requeueInProgressJobs(poolID string, jobTypes []string) } // findDeadPools returns staled pools IDs and associated jobs. -func (r *deadPoolReaper) findDeadPools() (map[string][]string, error) { +func (r *deadPoolReaper) findDeadPools() (poolsJobs, error) { conn := r.pool.Get() defer conn.Close() @@ -250,7 +301,7 @@ func (r *deadPoolReaper) findDeadPools() (map[string][]string, error) { return nil, err } - deadPools := make(map[string][]string, len(workerPoolIDs)) + deadPools := make(poolsJobs, len(workerPoolIDs)) for _, workerPoolID := range workerPoolIDs { heartbeatKey := redisKeyHeartbeat(r.namespace, workerPoolID) heartbeatAt, err := redis.Int64(conn.Do("HGET", heartbeatKey, "heartbeat_at")) @@ -284,7 +335,7 @@ func (r *deadPoolReaper) findDeadPools() (map[string][]string, error) { // getUnknownPools returns the IDs of the unknown pools and associated job types // found in the lock_info keys. -func (r *deadPoolReaper) getUnknownPools() (map[string][]string, error) { +func (r *deadPoolReaper) getUnknownPools() (poolsJobs, error) { scriptArgs := make([]interface{}, 0, len(r.curJobTypes)+2) // +2 for keys count and pools key scriptArgs = append(scriptArgs, len(r.curJobTypes)+1) // +1 for pools key scriptArgs = append(scriptArgs, redisKeyWorkerPools(r.namespace)) @@ -301,7 +352,7 @@ func (r *deadPoolReaper) getUnknownPools() (map[string][]string, error) { return nil, err } - var pools map[string][]string + var pools poolsJobs if err := json.Unmarshal(data, &pools); err != nil { return nil, err @@ -323,7 +374,7 @@ func (r *deadPoolReaper) getUnknownPools() (map[string][]string, error) { // removeDanglingLocks adjusts the lock keys according to the lock_info numbers. // TODO: it's better to find where the inconsistency comes from. -func (r *deadPoolReaper) removeDanglingLocks() error { +func (r *deadPoolReaper) removeDanglingLocks() ([]string, error) { keysCount := len(r.curJobTypes) * 2 // lock and lock_info keys scriptArgs := make([]interface{}, 0, keysCount+1) // +1 for keys count arg scriptArgs = append(scriptArgs, keysCount) @@ -338,12 +389,15 @@ func (r *deadPoolReaper) removeDanglingLocks() error { keys, err := redis.Strings(redisRemoveDanglingLocksScript.Do(conn, scriptArgs...)) if err != nil { - return err + return nil, err } - Logger.Printf("Reaper: dangling locks: %v", keys) + // convert lock keys to job types + for i, k := range keys { + keys[i] = redisJobNameFromLockKey(r.namespace, k) + } - return nil + return keys, nil } // acquireLock acquires lock with a value and an expiration time for reap period. @@ -380,3 +434,15 @@ func genValue() (string, error) { return base64.StdEncoding.EncodeToString(b), nil } + +type poolsJobs map[string][]string + +func (p poolsJobs) getAllJobs() []string { + r := make([]string, 0, len(p)) + + for _, jobs := range p { + r = append(r, jobs...) + } + + return r +} diff --git a/dead_pool_reaper_test.go b/dead_pool_reaper_test.go index 2fb0404f..86a7736b 100644 --- a/dead_pool_reaper_test.go +++ b/dead_pool_reaper_test.go @@ -6,6 +6,7 @@ import ( "github.com/gomodule/redigo/redis" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestDeadPoolReaper(t *testing.T) { @@ -48,10 +49,10 @@ func TestDeadPoolReaper(t *testing.T) { assert.NoError(t, err) // Test getting dead pool - reaper := newDeadPoolReaper(ns, pool, []string{}, 0) + reaper := newDeadPoolReaper(ns, pool, []string{}, 0, nil) deadPools, err := reaper.findDeadPools() assert.NoError(t, err) - assert.Equal(t, map[string][]string{"2": {"type1", "type2"}, "3": {"type1", "type2"}}, deadPools) + assert.Equal(t, poolsJobs{"2": {"type1", "type2"}, "3": {"type1", "type2"}}, deadPools) // Test requeueing jobs _, err = conn.Do("lpush", redisKeyJobsInProgress(ns, "2", "type1"), "foo") @@ -127,10 +128,10 @@ func TestDeadPoolReaperNoHeartbeat(t *testing.T) { assert.EqualValues(t, 3, numPools) // Test getting dead pool ids - reaper := newDeadPoolReaper(ns, pool, []string{"type1"}, 0) + reaper := newDeadPoolReaper(ns, pool, []string{"type1"}, 0, nil) deadPools, err := reaper.findDeadPools() assert.NoError(t, err) - assert.Equal(t, map[string][]string{"1": nil, "2": nil, "3": nil}, deadPools) + assert.Equal(t, poolsJobs{"1": nil, "2": nil, "3": nil}, deadPools) // Test requeueing jobs _, err = conn.Do("lpush", redisKeyJobsInProgress(ns, "2", "type1"), "foo") @@ -210,10 +211,10 @@ func TestDeadPoolReaperNoJobTypes(t *testing.T) { assert.NoError(t, err) // Test getting dead pool - reaper := newDeadPoolReaper(ns, pool, []string{}, 0) + reaper := newDeadPoolReaper(ns, pool, []string{}, 0, nil) deadPools, err := reaper.findDeadPools() assert.NoError(t, err) - assert.Equal(t, map[string][]string{"2": {"type1", "type2"}}, deadPools) + assert.Equal(t, poolsJobs{"2": {"type1", "type2"}}, deadPools) // Test requeueing jobs _, err = conn.Do("lpush", redisKeyJobsInProgress(ns, "1", "type1"), "foo") @@ -283,7 +284,7 @@ func TestDeadPoolReaperWithWorkerPools(t *testing.T) { // setup a worker pool and start the reaper, which should restart the stale job above wp := setupTestWorkerPool(pool, ns, job1, 1, JobOptions{Priority: 1}) - wp.deadPoolReaper = newDeadPoolReaper(wp.namespace, wp.pool, []string{"job1"}, 0) + wp.deadPoolReaper = newDeadPoolReaper(wp.namespace, wp.pool, []string{"job1"}, 0, nil) wp.deadPoolReaper.deadTime = expectedDeadTime wp.deadPoolReaper.start() @@ -327,7 +328,7 @@ func TestDeadPoolReaperCleanStaleLocks(t *testing.T) { err = conn.Flush() assert.NoError(t, err) - reaper := newDeadPoolReaper(ns, pool, jobNames, 0) + reaper := newDeadPoolReaper(ns, pool, jobNames, 0, nil) // clean lock info for workerPoolID1 err = reaper.cleanStaleLockInfo(workerPoolID1, jobNames) assert.NoError(t, err) @@ -386,10 +387,10 @@ func TestDeadPoolReaperTakeDeadPools(t *testing.T) { assert.NoError(t, err) // Test getting dead pools - reaper := newDeadPoolReaper(ns, pool, []string{}, 0) + reaper := newDeadPoolReaper(ns, pool, []string{}, 0, nil) deadPools, err := reaper.findDeadPools() assert.NoError(t, err) - assert.Equal(t, map[string][]string{"2": {"type1", "type2"}, "3": nil}, deadPools) + assert.Equal(t, poolsJobs{"2": {"type1", "type2"}, "3": nil}, deadPools) } func TestReaperLock(t *testing.T) { @@ -397,7 +398,7 @@ func TestReaperLock(t *testing.T) { ns := "work" cleanKeyspace(ns, pool) - reaper := newDeadPoolReaper(ns, pool, []string{}, 0) + reaper := newDeadPoolReaper(ns, pool, []string{}, 0, nil) value, err := genValue() assert.NoError(t, err) @@ -474,10 +475,10 @@ func TestDeadPoolReaperGetUnknownPools(t *testing.T) { assert.NoError(t, conn.Flush()) // Run test - reaper := newDeadPoolReaper(ns, pool, jobNames, 0) + reaper := newDeadPoolReaper(ns, pool, jobNames, 0, nil) unknownPools, err := reaper.getUnknownPools() assert.NoError(t, err) - assert.Equal(t, map[string][]string{"2": {"type1", "type2"}, "3": {"type1", "type2"}}, unknownPools) + assert.Equal(t, poolsJobs{"2": {"type1", "type2"}, "3": {"type1", "type2"}}, unknownPools) } func TestDeadPoolReaperClearUnknownPool(t *testing.T) { @@ -533,8 +534,8 @@ func TestDeadPoolReaperClearUnknownPool(t *testing.T) { assert.NoError(t, conn.Flush()) // Run test - reaper := newDeadPoolReaper(ns, pool, jobNames, 0) - err = reaper.clearUnknownPools() + reaper := newDeadPoolReaper(ns, pool, jobNames, 0, nil) + _, err = reaper.clearUnknownPools() assert.NoError(t, err) nLock1, err := redis.Int(conn.Do("GET", lock1)) @@ -595,8 +596,8 @@ func TestDeadPoolReaperRemoveDanglingLocks(t *testing.T) { assert.NoError(t, conn.Flush()) - reaper := newDeadPoolReaper(ns, pool, jobNames, 0) - err = reaper.removeDanglingLocks() + reaper := newDeadPoolReaper(ns, pool, jobNames, 0, nil) + _, err = reaper.removeDanglingLocks() assert.NoError(t, err) // Checks @@ -612,3 +613,56 @@ func TestDeadPoolReaperRemoveDanglingLocks(t *testing.T) { assert.NoError(t, err) assert.Equal(t, 0, nLock3) } + +func TestDeadPoolReaperHook(t *testing.T) { + pool := newTestPool(":6379") + ns := "work" + cleanKeyspace(ns, pool) + + workerPoolID1, workerPoolID2 := "1", "2" + job1, job2 := "type1", "type2" + jobNames := []string{job1, job2} + lock2 := redisKeyJobsLock(ns, job2) + lockInfo2 := redisKeyJobsLockInfo(ns, job2) + + conn := pool.Get() + defer conn.Close() + + workerPoolsKey := redisKeyWorkerPools(ns) + + // Stale heartbeat + var err error + err = conn.Send("SADD", workerPoolsKey, workerPoolID1) + assert.NoError(t, err) + + err = conn.Send("HMSET", redisKeyHeartbeat(ns, "1"), + "heartbeat_at", time.Now().Add(-1*time.Hour).Unix(), + "job_names", job1, + ) + assert.NoError(t, err) + + // Unknown pool and dangling lock + err = conn.Send("SET", lock2, 2) + assert.NoError(t, err) + + err = conn.Send("HMSET", lockInfo2, + workerPoolID2, 1, // unknown workerPoolID2 holds 1 locks on job1 + ) + assert.NoError(t, err) + + assert.NoError(t, conn.Flush()) + + noPoolHeartBeatJobs := []string{job1} + unknownPoolJobs := []string{job2} + danglingLockJobs := []string{job2} + + reaper := newDeadPoolReaper(ns, pool, jobNames, 0, func() func(ReapResult) { + return func(rr ReapResult) { + assert.NoError(t, rr.Err) + assert.Equal(t, noPoolHeartBeatJobs, rr.NoPoolHeartBeatJobs) + assert.Equal(t, unknownPoolJobs, rr.UnknownPoolJobs) + assert.Equal(t, danglingLockJobs, rr.DanglingLockJobs) + } + }) + require.NoError(t, reaper.reap()) +} diff --git a/go.mod b/go.mod index 54203cf8..4596d805 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,6 @@ require ( go.opentelemetry.io/otel v1.11.2 go.opentelemetry.io/otel/sdk v1.11.2 go.opentelemetry.io/otel/trace v1.11.2 - go.uber.org/multierr v1.8.0 ) require ( @@ -21,7 +20,6 @@ require ( github.com/kr/text v0.2.0 // indirect github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - go.uber.org/atomic v1.7.0 // indirect golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8 // indirect gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index c291d9c6..1b0dbee2 100644 --- a/go.sum +++ b/go.sum @@ -28,7 +28,6 @@ github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzG github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= -github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= @@ -40,16 +39,11 @@ go.opentelemetry.io/otel/sdk v1.11.2 h1:GF4JoaEx7iihdMFu30sOyRx52HDHOkl9xQ8SMqNX go.opentelemetry.io/otel/sdk v1.11.2/go.mod h1:wZ1WxImwpq+lVRo4vsmSOxdd+xwoUJ6rqyLc3SyX9aU= go.opentelemetry.io/otel/trace v1.11.2 h1:Xf7hWSF2Glv0DE3MH7fBHvtpSBsjcBUe5MYAmZM/+y0= go.opentelemetry.io/otel/trace v1.11.2/go.mod h1:4N+yC7QEz7TTsG9BSRLNAa63eg5E06ObSbKPmxQ/pKA= -go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= -go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= -go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8= -go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8 h1:h+EGohizhe9XlX18rfpa8k8RAc5XyaeamM+0VHRd4lc= golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/redis.go b/redis.go index 3d323c0b..4e702eb0 100644 --- a/redis.go +++ b/redis.go @@ -71,6 +71,10 @@ func redisKeyJobsLock(namespace, jobName string) string { return redisKeyJobs(namespace, jobName) + ":lock" } +func redisJobNameFromLockKey(namespace, key string) string { + return redisJobNameFromKey(namespace, strings.TrimSuffix(key, ":lock")) +} + func redisKeyJobsLockInfo(namespace, jobName string) string { return redisKeyJobs(namespace, jobName) + ":lock_info" } diff --git a/worker_pool.go b/worker_pool.go index 26016d73..88875e23 100644 --- a/worker_pool.go +++ b/worker_pool.go @@ -33,6 +33,8 @@ type WorkerPool struct { reapPeriod time.Duration deadPoolReaper *deadPoolReaper periodicEnqueuer *periodicEnqueuer + + reaperHook ReaperHook } type jobType struct { @@ -280,7 +282,13 @@ func (wp *WorkerPool) startRequeuers() { wp.retrier = newRequeuer(wp.namespace, wp.pool, redisKeyRetry(wp.namespace), jobNames) wp.scheduler = newRequeuer(wp.namespace, wp.pool, redisKeyScheduled(wp.namespace), jobNames) - wp.deadPoolReaper = newDeadPoolReaper(wp.namespace, wp.pool, jobNames, wp.reapPeriod) + wp.deadPoolReaper = newDeadPoolReaper( + wp.namespace, + wp.pool, + jobNames, + wp.reapPeriod, + wp.reaperHook, + ) wp.retrier.start() wp.scheduler.start() wp.deadPoolReaper.start() @@ -550,3 +558,10 @@ func WithReapPeriod(p time.Duration) WorkerPoolOption { wp.reapPeriod = p } } + +// WithReaperHook registers a hook to monitor the reaper's actions. +func WithReaperHook(h ReaperHook) WorkerPoolOption { + return func(wp *WorkerPool) { + wp.reaperHook = h + } +}