From 9cf17789fcd0ec893b2284a6db5fe590de081a7d Mon Sep 17 00:00:00 2001 From: Karol Zadora-Przylecki Date: Thu, 19 Mar 2026 11:00:23 -0700 Subject: [PATCH 1/5] Remove arg parameter from DebounceLast Run() method This parameter does not make sense--there is no way to account for different values of the parameter when multiple invocations are "squished" into one by debouncing. --- pkg/resiliency/debounce_last.go | 20 +++++------ pkg/resiliency/debounce_last_test.go | 53 ++++++++++++++-------------- 2 files changed, 35 insertions(+), 38 deletions(-) diff --git a/pkg/resiliency/debounce_last.go b/pkg/resiliency/debounce_last.go index 13868215..998a65bd 100644 --- a/pkg/resiliency/debounce_last.go +++ b/pkg/resiliency/debounce_last.go @@ -11,10 +11,6 @@ import ( "time" ) -type Runner[T any, R any] interface { - ~func(T) (R, error) -} - type ResultWithError[R any] struct { V R Err error @@ -23,19 +19,19 @@ type ResultWithError[R any] struct { // DebounceLast calls the runner function after the specified delay, but only if no new calls have arrived in the meantime. // If new calls arrive, the runner will be delayed further, but no more than maxDelay. // After the runner function completes, the callers of Run() will all receive the same result (and error, if any). -type DebounceLast[T any, R any, RF Runner[T, R]] struct { +type DebounceLast[R any] struct { delay time.Duration maxDelay time.Duration threshold time.Time timer *time.Timer runC chan struct{} m *sync.Mutex - runner RF + runner func() (R, error) res *ResultWithError[R] } -func NewDebounceLast[T any, R any, RF Runner[T, R]](runner RF, delay, maxDelay time.Duration) *DebounceLast[T, R, RF] { - return &DebounceLast[T, R, RF]{ +func NewDebounceLast[R any](runner func() (R, error), delay, maxDelay time.Duration) *DebounceLast[R] { + return &DebounceLast[R]{ delay: delay, maxDelay: maxDelay, runner: runner, @@ -43,7 +39,7 @@ func NewDebounceLast[T any, R any, RF Runner[T, R]](runner RF, delay, maxDelay t } } -func (dl *DebounceLast[T, R, RF]) Run(ctx context.Context, arg T) (R, error) { +func (dl *DebounceLast[R]) Run(ctx context.Context) (R, error) { dl.m.Lock() var runC chan struct{} @@ -58,7 +54,7 @@ func (dl *DebounceLast[T, R, RF]) Run(ctx context.Context, arg T) (R, error) { runC = dl.runC res = dl.res - go dl.execRunnerIfThresholdExceeded(ctx, arg) + go dl.execRunnerIfThresholdExceeded(ctx) } else { // Run in progress runC = dl.runC @@ -74,7 +70,7 @@ func (dl *DebounceLast[T, R, RF]) Run(ctx context.Context, arg T) (R, error) { } // The helper goroutine that will be woken up periodically and run the runner if the threshold is exceeded. -func (dl *DebounceLast[T, R, RF]) execRunnerIfThresholdExceeded(ctx context.Context, arg T) { +func (dl *DebounceLast[R]) execRunnerIfThresholdExceeded(ctx context.Context) { defer func() { dl.timer.Stop() close(dl.runC) @@ -91,7 +87,7 @@ func (dl *DebounceLast[T, R, RF]) execRunnerIfThresholdExceeded(ctx context.Cont var err error func() { defer dl.m.Lock() - val, err = dl.runner(arg) + val, err = dl.runner() }() dl.res.V = val dl.res.Err = err diff --git a/pkg/resiliency/debounce_last_test.go b/pkg/resiliency/debounce_last_test.go index b6988818..1c8f7590 100644 --- a/pkg/resiliency/debounce_last_test.go +++ b/pkg/resiliency/debounce_last_test.go @@ -19,8 +19,8 @@ import ( func TestExecutesRunnerAfterDelay(t *testing.T) { t.Parallel() - runner := func(i int) (int, error) { - return i, nil + runner := func() (int, error) { + return 7, nil } const debounceDelay = time.Millisecond * 100 @@ -31,7 +31,7 @@ func TestExecutesRunnerAfterDelay(t *testing.T) { defer cancel() start := time.Now() - res, err := deb.Run(ctx, 7) + res, err := deb.Run(ctx) finish := time.Now() require.NoError(t, err) @@ -44,7 +44,7 @@ func TestExecutesRunnerAfterDelay(t *testing.T) { func TestReturnsErrorFromRunner(t *testing.T) { t.Parallel() - runner := func(i int) (int, error) { + runner := func() (int, error) { return 0, fmt.Errorf("sorry") } @@ -56,7 +56,7 @@ func TestReturnsErrorFromRunner(t *testing.T) { defer cancel() start := time.Now() - _, err := deb.Run(ctx, 7) + _, err := deb.Run(ctx) finish := time.Now() require.Error(t, err) @@ -67,8 +67,9 @@ func TestReturnsErrorFromRunner(t *testing.T) { func TestDebouncesRapidInvocations(t *testing.T) { t.Parallel() - runner := func(i int) (int, error) { - return i, nil + runnerCalls := atomic.Int32{} + runner := func() (int, error) { + return int(runnerCalls.Add(1)), nil } const debounceDelay = time.Millisecond * 200 @@ -78,7 +79,6 @@ func TestDebouncesRapidInvocations(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), testTimeoutDelay) defer cancel() - const resultShift = 10 const numCalls = 5 results := make([]int, numCalls) @@ -86,14 +86,14 @@ func TestDebouncesRapidInvocations(t *testing.T) { wg.Add(numCalls) start := time.Now() - // Make numCalls calls to runner, in order, as fast as possible + // Make numCalls calls to runner as fast as possible. for i := 0; i < numCalls; i++ { - go func(j int) { - res, err := deb.Run(ctx, j) + go func(index int) { + res, err := deb.Run(ctx) require.NoError(t, err) - results[j-resultShift] = res + results[index] = res wg.Done() - }(i + resultShift) + }(i) } wg.Wait() @@ -103,10 +103,8 @@ func TestDebouncesRapidInvocations(t *testing.T) { require.IsNonIncreasing(t, results) require.IsNonDecreasing(t, results) - // It is not predictable what the result will be, because the order of goroutines executing the call - // is random, but they should be within [resultShift, resultShift + numCalls). - require.GreaterOrEqual(t, results[0], resultShift) - require.Less(t, results[0], resultShift+numCalls) + require.Equal(t, 1, results[0]) + require.Equal(t, int32(1), runnerCalls.Load()) require.WithinRange(t, finish, start.Add(debounceDelay), time.Now().Add(testTimeoutDelay)) } @@ -114,8 +112,10 @@ func TestDebouncesRapidInvocations(t *testing.T) { func TestDebounceIsReusable(t *testing.T) { t.Parallel() - runner := func(i int32) (int32, error) { - return i, nil + runnerCalls := atomic.Int32{} + runner := func() (int32, error) { + runnerCalls.Add(1) + return 1, nil } const debounceDelay = time.Millisecond * 150 @@ -134,7 +134,7 @@ func TestDebounceIsReusable(t *testing.T) { for i := 0; i < numCalls; i++ { go func() { - res, err := deb.Run(ctx, 1) + res, err := deb.Run(ctx) require.NoError(t, err) atomic.AddInt32(&sum, res) wg.Done() @@ -152,24 +152,25 @@ func TestDebounceIsReusable(t *testing.T) { // Verify all calls were made and the results add up to expected value require.WithinRange(t, finish, start.Add(debounceDelay), testTimeout) require.Equal(t, int32(numCalls), sum) + require.Equal(t, int32(1), runnerCalls.Load()) // Now the same debounce should be ready for another round of calls - const secondRunResultShift = 10 - sum = secondRunResultShift + sum = 0 start = time.Now() makeCalls() finish = time.Now() require.WithinRange(t, finish, start.Add(debounceDelay), testTimeout) - require.Equal(t, int32(numCalls+secondRunResultShift), sum) + require.Equal(t, int32(numCalls), sum) + require.Equal(t, int32(2), runnerCalls.Load()) } func TestReturnsErrorIfContextCancelled(t *testing.T) { t.Parallel() - runner := func(i int) (int, error) { - return i, nil + runner := func() (int, error) { + return 7, nil } const debounceDelay = time.Millisecond * 500 @@ -180,7 +181,7 @@ func TestReturnsErrorIfContextCancelled(t *testing.T) { start := time.Now() ctx, cancel := context.WithTimeout(context.Background(), contextTimeoutDelay) defer cancel() - _, err := deb.Run(ctx, 7) + _, err := deb.Run(ctx) finish := time.Now() require.ErrorIs(t, err, context.DeadlineExceeded) From 425a6d5da00feff471a1e43fb7a980d71bd36eb0 Mon Sep 17 00:00:00 2001 From: Karol Zadora-Przylecki Date: Thu, 19 Mar 2026 11:24:45 -0700 Subject: [PATCH 2/5] DebounceLast: start new run just before runner function starts executing This covers the common use case where Run() callers set up some state that the runner function subsequently acts upon. --- pkg/resiliency/debounce_last.go | 97 ++++++++++++++-------------- pkg/resiliency/debounce_last_test.go | 76 ++++++++++++++++++++++ 2 files changed, 125 insertions(+), 48 deletions(-) diff --git a/pkg/resiliency/debounce_last.go b/pkg/resiliency/debounce_last.go index 998a65bd..7da5939f 100644 --- a/pkg/resiliency/debounce_last.go +++ b/pkg/resiliency/debounce_last.go @@ -16,18 +16,22 @@ type ResultWithError[R any] struct { Err error } +type runState[R any] struct { + timer *time.Timer + threshold time.Time + doneC chan struct{} + res ResultWithError[R] +} + // DebounceLast calls the runner function after the specified delay, but only if no new calls have arrived in the meantime. // If new calls arrive, the runner will be delayed further, but no more than maxDelay. // After the runner function completes, the callers of Run() will all receive the same result (and error, if any). type DebounceLast[R any] struct { - delay time.Duration - maxDelay time.Duration - threshold time.Time - timer *time.Timer - runC chan struct{} - m *sync.Mutex - runner func() (R, error) - res *ResultWithError[R] + delay time.Duration + maxDelay time.Duration + run *runState[R] + m *sync.Mutex + runner func() (R, error) } func NewDebounceLast[R any](runner func() (R, error), delay, maxDelay time.Duration) *DebounceLast[R] { @@ -42,59 +46,56 @@ func NewDebounceLast[R any](runner func() (R, error), delay, maxDelay time.Durat func (dl *DebounceLast[R]) Run(ctx context.Context) (R, error) { dl.m.Lock() - var runC chan struct{} - var res *ResultWithError[R] + var run *runState[R] - if dl.runC == nil { + if dl.run == nil { // New run - dl.timer = time.NewTimer(dl.delay) - dl.runC = make(chan struct{}, 1) - dl.threshold = time.Now().Add(dl.maxDelay) - dl.res = &ResultWithError[R]{} - runC = dl.runC - res = dl.res - - go dl.execRunnerIfThresholdExceeded(ctx) + run = &runState[R]{ + timer: time.NewTimer(dl.delay), + threshold: time.Now().Add(dl.maxDelay), + doneC: make(chan struct{}), + } + dl.run = run + + go dl.execRunnerIfThresholdExceeded(ctx, run) } else { // Run in progress - runC = dl.runC - res = dl.res - if time.Now().Add(dl.delay).Before(dl.threshold) { - dl.timer.Reset(dl.delay) + run = dl.run + if time.Now().Add(dl.delay).Before(run.threshold) { + run.timer.Reset(dl.delay) } } dl.m.Unlock() - <-runC - return res.V, res.Err + <-run.doneC + return run.res.V, run.res.Err } -// The helper goroutine that will be woken up periodically and run the runner if the threshold is exceeded. -func (dl *DebounceLast[R]) execRunnerIfThresholdExceeded(ctx context.Context) { - defer func() { - dl.timer.Stop() - close(dl.runC) - dl.runC = nil - dl.threshold = time.Time{} - dl.m.Unlock() - }() +func (dl *DebounceLast[R]) execRunnerIfThresholdExceeded(ctx context.Context, run *runState[R]) { + var val R + var err error select { - - case <-dl.timer.C: - func() { - var val R - var err error - func() { - defer dl.m.Lock() - val, err = dl.runner() - }() - dl.res.V = val - dl.res.Err = err - }() + case <-run.timer.C: + dl.stopCurrentRun(run) + val, err = dl.runner() case <-ctx.Done(): - dl.m.Lock() - dl.res.V, dl.res.Err = *new(R), ctx.Err() + dl.stopCurrentRun(run) + val, err = *new(R), ctx.Err() + } + + run.timer.Stop() + run.res.V = val + run.res.Err = err + close(run.doneC) +} + +func (dl *DebounceLast[R]) stopCurrentRun(run *runState[R]) { + dl.m.Lock() + defer dl.m.Unlock() + + if dl.run == run { + dl.run = nil } } diff --git a/pkg/resiliency/debounce_last_test.go b/pkg/resiliency/debounce_last_test.go index 1c8f7590..f9dd98ab 100644 --- a/pkg/resiliency/debounce_last_test.go +++ b/pkg/resiliency/debounce_last_test.go @@ -166,6 +166,82 @@ func TestDebounceIsReusable(t *testing.T) { require.Equal(t, int32(2), runnerCalls.Load()) } +func TestRunDuringRunnerExecutionStartsNewRun(t *testing.T) { + t.Parallel() + + const debounceDelay = time.Millisecond * 500 + const testTimeoutDelay = time.Second * 5 + + firstStarted := make(chan struct{}) + secondStarted := make(chan struct{}) + releaseFirst := make(chan struct{}) + releaseSecond := make(chan struct{}) + runnerCalls := atomic.Int32{} + + runner := func() (int32, error) { + call := runnerCalls.Add(1) + + switch call { + case 1: + close(firstStarted) + <-releaseFirst + return 1, nil + case 2: + close(secondStarted) + <-releaseSecond + return 2, nil + default: + return 0, fmt.Errorf("unexpected runner call: %d", call) + } + } + + deb := NewDebounceLast(runner, debounceDelay, testTimeoutDelay) + ctx, cancel := context.WithTimeout(context.Background(), testTimeoutDelay) + defer cancel() + + firstResultC := make(chan ResultWithError[int32], 1) + secondResultC := make(chan ResultWithError[int32], 1) + + go func() { + v, err := deb.Run(ctx) + firstResultC <- ResultWithError[int32]{V: v, Err: err} + }() + + select { + case <-firstStarted: + case <-time.After(testTimeoutDelay): + require.FailNow(t, "first runner call did not start in time") + } + + go func() { + v, err := deb.Run(ctx) + secondResultC <- ResultWithError[int32]{V: v, Err: err} + }() + + select { + case <-secondStarted: + case <-time.After(testTimeoutDelay): + require.FailNow(t, "second runner call did not start as a new run") + } + + close(releaseFirst) + firstResult := <-firstResultC + require.NoError(t, firstResult.Err) + require.Equal(t, int32(1), firstResult.V) + + select { + case secondResult := <-secondResultC: + require.FailNowf(t, "second run should still be waiting", "unexpected result: %+v", secondResult) + case <-time.After(debounceDelay): + } + + close(releaseSecond) + secondResult := <-secondResultC + require.NoError(t, secondResult.Err) + require.Equal(t, int32(2), secondResult.V) + require.Equal(t, int32(2), runnerCalls.Load()) +} + func TestReturnsErrorIfContextCancelled(t *testing.T) { t.Parallel() From db79d02ee06321c7ed5aeec4bc91e74ae80956ff Mon Sep 17 00:00:00 2001 From: Karol Zadora-Przylecki Date: Thu, 19 Mar 2026 11:38:15 -0700 Subject: [PATCH 3/5] Remove arg parameter from DebounceLastAction and simplify reconciler debouncer --- controllers/reconciler_base.go | 10 +-- controllers/reconciler_debouncer.go | 50 ++++++-------- pkg/resiliency/debounce_last_action.go | 67 +++++++++++-------- pkg/resiliency/debounce_last_action_test.go | 74 ++++++++++++++++++--- 4 files changed, 130 insertions(+), 71 deletions(-) diff --git a/controllers/reconciler_base.go b/controllers/reconciler_base.go index 3ee50cb2..3be257cc 100644 --- a/controllers/reconciler_base.go +++ b/controllers/reconciler_base.go @@ -44,7 +44,7 @@ type ReconcilerBase[T commonapi.ObjectStruct, PT commonapi.PCopyableObjectStruct reconciliationSeqNo uint32 // Debouncer used to schedule reconciliations. - debouncer *reconcilerDebouncer[struct{}] + debouncer *reconcilerDebouncer // Channel used to trigger programmatic reconciliations notifyRunChanged *concurrency.UnboundedChan[ctrl_event.GenericEvent] @@ -70,7 +70,7 @@ func NewReconcilerBase[T commonapi.ObjectStruct, PT commonapi.PCopyableObjectStr Client: client, NoCacheClient: noCacheClient, Log: log, - debouncer: newReconcilerDebouncer[struct{}](), + debouncer: newReconcilerDebouncer(), notifyRunChanged: concurrency.NewUnboundedChan[ctrl_event.GenericEvent](lifetimeCtx), LifetimeCtx: lifetimeCtx, conflictBackoff: &syncmap.Map[types.NamespacedName, *backoff.ExponentialBackOff]{}, @@ -100,11 +100,11 @@ func (rb *ReconcilerBase[T, PT]) StartReconciliation(req ctrl.Request) (ctrl_cli // Schedules reconciliation for specific object identified by namespaced name. func (rb *ReconcilerBase[T, PT]) ScheduleReconciliation(nn types.NamespacedName) { - rb.debouncer.ReconciliationNeeded(rb.LifetimeCtx, nn, struct{}{}, func(rti reconcileTriggerInput[struct{}]) { + rb.debouncer.ReconciliationNeeded(rb.LifetimeCtx, nn, func(target types.NamespacedName) { var obj PT = new(T) om := obj.GetObjectMeta() - om.Name = rti.target.Name - om.Namespace = rti.target.Namespace + om.Name = target.Name + om.Namespace = target.Namespace event := ctrl_event.GenericEvent{ Object: obj, } diff --git a/controllers/reconciler_debouncer.go b/controllers/reconciler_debouncer.go index 0e04937c..74712615 100644 --- a/controllers/reconciler_debouncer.go +++ b/controllers/reconciler_debouncer.go @@ -18,55 +18,49 @@ import ( // ReconcilerDebouncer helps debounce calls that trigger reconciliation. Useful for processing external events // that require reconciliation for an object, and that might come in "batches" of quick succession. -type reconcileTriggerInput[ReconcileInput any] struct { - target types.NamespacedName - input ReconcileInput -} - -type reconcileTrigger[ReconcileInput any] func(reconcileTriggerInput[ReconcileInput]) +type reconcileTrigger func(types.NamespacedName) -type objectDebounce[ReconcileInput any] resiliency.DebounceLastAction[reconcileTriggerInput[ReconcileInput]] +type objectDebounce struct { + debounce *resiliency.DebounceLastAction +} -func (od *objectDebounce[ReconcileInput]) run(ctx context.Context, input reconcileTriggerInput[ReconcileInput]) { - debounceRaw := resiliency.DebounceLastAction[reconcileTriggerInput[ReconcileInput]](*od) - debounceRaw.Run(ctx, input) +func (od *objectDebounce) run(ctx context.Context) { + od.debounce.Run(ctx) } -type reconcilerDebouncer[ReconcileInput any] struct { - debounceMap *syncmap.Map[types.NamespacedName, *objectDebounce[ReconcileInput]] +type reconcilerDebouncer struct { + debounceMap *syncmap.Map[types.NamespacedName, *objectDebounce] debounceDelay time.Duration maxDelay time.Duration } -func newReconcilerDebouncer[ReconcileInput any]() *reconcilerDebouncer[ReconcileInput] { - return &reconcilerDebouncer[ReconcileInput]{ - debounceMap: &syncmap.Map[types.NamespacedName, *objectDebounce[ReconcileInput]]{}, +func newReconcilerDebouncer() *reconcilerDebouncer { + return &reconcilerDebouncer{ + debounceMap: &syncmap.Map[types.NamespacedName, *objectDebounce]{}, debounceDelay: reconciliationDebounceDelay, maxDelay: reconciliationMaxDelay, } } -func objectDebounceFactory[ReconcileInput any](trigger reconcileTrigger[ReconcileInput], debounceDelay, maxDelay time.Duration) *objectDebounce[ReconcileInput] { - debounce := resiliency.NewDebounceLastAction(trigger, debounceDelay, maxDelay) - return (*objectDebounce[ReconcileInput])(debounce) +func objectDebounceFactory(trigger reconcileTrigger, name types.NamespacedName, debounceDelay, maxDelay time.Duration) *objectDebounce { + return &objectDebounce{ + debounce: resiliency.NewDebounceLastAction(func() { + trigger(name) + }, debounceDelay, maxDelay), + } } // Call OnReconcile when an object is being reconciled. This prevents the inner debouncer map from growing indefinitely. // This method is safe to call regardless whether the name was ever seen. -func (rd *reconcilerDebouncer[ReconcileInput]) OnReconcile(name types.NamespacedName) { +func (rd *reconcilerDebouncer) OnReconcile(name types.NamespacedName) { rd.debounceMap.Delete(name) } // Tries to trigger reconciliation for an object identified by name, after appropriate debouncing. -func (rd *reconcilerDebouncer[ReconcileInput]) ReconciliationNeeded(ctx context.Context, name types.NamespacedName, ri ReconcileInput, trigger reconcileTrigger[ReconcileInput]) { - debounce, _ := rd.debounceMap.LoadOrStoreNew(name, func() *objectDebounce[ReconcileInput] { - return objectDebounceFactory(trigger, rd.debounceDelay, rd.maxDelay) +func (rd *reconcilerDebouncer) ReconciliationNeeded(ctx context.Context, name types.NamespacedName, trigger reconcileTrigger) { + debounce, _ := rd.debounceMap.LoadOrStoreNew(name, func() *objectDebounce { + return objectDebounceFactory(trigger, name, rd.debounceDelay, rd.maxDelay) }) - input := reconcileTriggerInput[ReconcileInput]{ - target: name, - input: ri, - } - - debounce.run(ctx, input) + debounce.run(ctx) } diff --git a/pkg/resiliency/debounce_last_action.go b/pkg/resiliency/debounce_last_action.go index 5d4ee04a..c5bf2daa 100644 --- a/pkg/resiliency/debounce_last_action.go +++ b/pkg/resiliency/debounce_last_action.go @@ -11,27 +11,30 @@ import ( "time" ) +type runStateAction struct { + timer *time.Timer + threshold time.Time +} + // A variant of DebounceLast that calls an "action" (function with no return value). // Because there is no return value, the Run method does not wait for the action to complete // (the action is executed in a separate goroutine, fully asynchronously). // The action will be called after the specified delay, but only if no new calls arrive in the meantime. // If new calls arrive, the action will be delayed further, but no more than maxDelay. -type DebounceLastAction[T any] struct { - delay time.Duration - maxDelay time.Duration - timer *time.Timer - threshold time.Time - runC chan struct{} - m *sync.Mutex - action func(T) +type DebounceLastAction struct { + delay time.Duration + maxDelay time.Duration + run *runStateAction + m *sync.Mutex + action func() } -func NewDebounceLastAction[T any](action func(T), delay, maxDelay time.Duration) *DebounceLastAction[T] { +func NewDebounceLastAction(action func(), delay, maxDelay time.Duration) *DebounceLastAction { if maxDelay < delay { maxDelay = delay } - return &DebounceLastAction[T]{ + return &DebounceLastAction{ delay: delay, maxDelay: maxDelay, action: action, @@ -39,38 +42,44 @@ func NewDebounceLastAction[T any](action func(T), delay, maxDelay time.Duration) } } -func (dl *DebounceLastAction[T]) Run(ctx context.Context, arg T) { +func (dl *DebounceLastAction) Run(ctx context.Context) { dl.m.Lock() - defer dl.m.Unlock() - if dl.runC == nil { + if dl.run == nil { // New run - dl.timer = time.NewTimer(dl.delay) - dl.runC = make(chan struct{}, 1) - dl.threshold = time.Now().Add(dl.maxDelay) - go dl.execRunnerIfThresholdExceeded(ctx, arg) + run := &runStateAction{ + timer: time.NewTimer(dl.delay), + threshold: time.Now().Add(dl.maxDelay), + } + dl.run = run + dl.m.Unlock() + + go dl.execRunnerIfThresholdExceeded(ctx, run) + return } else { - if time.Now().Add(dl.delay).Before(dl.threshold) { - dl.timer.Reset(dl.delay) + if time.Now().Add(dl.delay).Before(dl.run.threshold) { + dl.run.timer.Reset(dl.delay) } } + + dl.m.Unlock() } -func (dl *DebounceLastAction[T]) execRunnerIfThresholdExceeded(ctx context.Context, arg T) { +func (dl *DebounceLastAction) execRunnerIfThresholdExceeded(ctx context.Context, run *runStateAction) { select { - case <-dl.timer.C: - dl.stopCurrentRun() - dl.action(arg) + case <-run.timer.C: + dl.stopCurrentRun(run) + dl.action() case <-ctx.Done(): - dl.stopCurrentRun() + dl.stopCurrentRun(run) } } -func (dl *DebounceLastAction[T]) stopCurrentRun() { +func (dl *DebounceLastAction) stopCurrentRun(run *runStateAction) { dl.m.Lock() defer dl.m.Unlock() - dl.timer.Stop() - close(dl.runC) - dl.runC = nil - dl.threshold = time.Time{} + run.timer.Stop() + if dl.run == run { + dl.run = nil + } } diff --git a/pkg/resiliency/debounce_last_action_test.go b/pkg/resiliency/debounce_last_action_test.go index 06d203c4..6915d031 100644 --- a/pkg/resiliency/debounce_last_action_test.go +++ b/pkg/resiliency/debounce_last_action_test.go @@ -23,12 +23,12 @@ func TestExecutesActionRunnerAfterDelay(t *testing.T) { const testTimeoutDelay = time.Millisecond * 1000 done := make(chan struct{}) - deb := NewDebounceLastAction(func(_ int) { close(done) }, debounceDelay, testTimeoutDelay) + deb := NewDebounceLastAction(func() { close(done) }, debounceDelay, testTimeoutDelay) ctx, cancel := context.WithTimeout(context.Background(), testTimeoutDelay) defer cancel() start := time.Now() - deb.Run(ctx, 0) + deb.Run(ctx) <-done finish := time.Now() @@ -46,8 +46,8 @@ func TestDebounceActionRapidInvocations(t *testing.T) { const debounceDelay = time.Millisecond * 200 const testTimeoutDelay = time.Millisecond * 1000 - deb := NewDebounceLastAction(func(c *atomic.Int32) { - c.Add(1) + deb := NewDebounceLastAction(func() { + counter.Add(1) lastFinished.Store(time.Now()) }, debounceDelay, testTimeoutDelay) ctx, cancel := context.WithTimeout(context.Background(), testTimeoutDelay) @@ -56,7 +56,7 @@ func TestDebounceActionRapidInvocations(t *testing.T) { start := time.Now() // Make numCalls calls to runner, in order, as fast as possible for i := 0; i < numCalls; i++ { - deb.Run(ctx, &counter) + deb.Run(ctx) } time.Sleep(testTimeoutDelay) @@ -73,7 +73,7 @@ func TestDebounceActionIsReusable(t *testing.T) { const numCalls = 3 event := concurrency.NewAutoResetEvent(false) - deb := NewDebounceLastAction(func(e *concurrency.AutoResetEvent) { e.Set() }, debounceDelay, testTimeoutDelay) + deb := NewDebounceLastAction(func() { event.Set() }, debounceDelay, testTimeoutDelay) ctx, cancel := context.WithTimeout(context.Background(), testTimeoutDelay) defer cancel() @@ -81,7 +81,7 @@ func TestDebounceActionIsReusable(t *testing.T) { event.Clear() for i := 0; i < numCalls; i++ { - deb.Run(ctx, event) + deb.Run(ctx) } <-event.Wait() @@ -110,12 +110,12 @@ func TestDebounceActionDoesNotExecuteRunnerIfContextCancelled(t *testing.T) { const contextTimeoutDelay = time.Millisecond * 100 called := atomic.Bool{} - deb := NewDebounceLastAction(func(b *atomic.Bool) { b.Store(true) }, debounceDelay, time.Second) + deb := NewDebounceLastAction(func() { called.Store(true) }, debounceDelay, time.Second) start := time.Now() ctx, cancel := context.WithTimeout(context.Background(), contextTimeoutDelay) defer cancel() - deb.Run(ctx, &called) + deb.Run(ctx) <-ctx.Done() finish := time.Now() @@ -125,3 +125,59 @@ func TestDebounceActionDoesNotExecuteRunnerIfContextCancelled(t *testing.T) { // before debounceDelay. require.WithinRange(t, finish, start.Add(contextTimeoutDelay), start.Add(debounceDelay)) } + +func TestRunDuringActionExecutionStartsNewRun(t *testing.T) { + t.Parallel() + + const debounceDelay = time.Millisecond * 20 + const testTimeoutDelay = time.Second * 2 + + firstStarted := make(chan struct{}) + secondStarted := make(chan struct{}) + firstFinished := make(chan struct{}) + secondFinished := make(chan struct{}) + releaseFirst := make(chan struct{}) + releaseSecond := make(chan struct{}) + actionCalls := atomic.Int32{} + + deb := NewDebounceLastAction(func() { + call := actionCalls.Add(1) + switch call { + case 1: + close(firstStarted) + <-releaseFirst + close(firstFinished) + case 2: + close(secondStarted) + <-releaseSecond + close(secondFinished) + } + }, debounceDelay, testTimeoutDelay) + + ctx, cancel := context.WithTimeout(context.Background(), testTimeoutDelay) + defer cancel() + + deb.Run(ctx) + + select { + case <-firstStarted: + case <-time.After(testTimeoutDelay): + require.FailNow(t, "first action did not start in time") + } + + deb.Run(ctx) + + select { + case <-secondStarted: + case <-time.After(testTimeoutDelay): + require.FailNow(t, "second action did not start as a new run") + } + + close(releaseFirst) + <-firstFinished + + close(releaseSecond) + <-secondFinished + + require.Equal(t, int32(2), actionCalls.Load()) +} From 1471fa57667d1df648646706e5ebb222a46f574b Mon Sep 17 00:00:00 2001 From: Karol Zadora-Przylecki Date: Fri, 20 Mar 2026 09:41:45 -0700 Subject: [PATCH 4/5] Do not log errors if extension directory does not exist --- internal/dcp/bootstrap/extensions.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/internal/dcp/bootstrap/extensions.go b/internal/dcp/bootstrap/extensions.go index c8cddbdd..19ca420d 100644 --- a/internal/dcp/bootstrap/extensions.go +++ b/internal/dcp/bootstrap/extensions.go @@ -9,6 +9,7 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "io/fs" "os" @@ -65,6 +66,15 @@ func GetExtensions(ctx context.Context, log logr.Logger) ([]DcpExtension, error) extensions := []DcpExtension{} for _, extDir := range extDirs { + if _, statErr := os.Stat(extDir); statErr != nil { + if errors.Is(statErr, os.ErrNotExist) { + log.V(1).Info("Extensions directory does not exist, extensions will not be loaded from this directory..", "Directory", extDir) + continue + } else { + return nil, fmt.Errorf("could not access extensions directory '%s': %w", extDir, statErr) + } + } + // Evaluate symlinks for the directory realExtDir, evalErr := filepath.EvalSymlinks(extDir) if evalErr != nil { From 7cc0caea17e49c180450c422c261ede5dd1d0e7e Mon Sep 17 00:00:00 2001 From: Karol Zadora-Przylecki Date: Fri, 20 Mar 2026 14:55:16 -0700 Subject: [PATCH 5/5] Reconcile trigger is not really changing from one reconciliation to the next --- controllers/reconciler_base.go | 29 +++++++++++++++++------------ controllers/reconciler_debouncer.go | 8 +++++--- 2 files changed, 22 insertions(+), 15 deletions(-) diff --git a/controllers/reconciler_base.go b/controllers/reconciler_base.go index 3be257cc..5a3129ae 100644 --- a/controllers/reconciler_base.go +++ b/controllers/reconciler_base.go @@ -66,16 +66,19 @@ func NewReconcilerBase[T commonapi.ObjectStruct, PT commonapi.PCopyableObjectStr log logr.Logger, lifetimeCtx context.Context, ) *ReconcilerBase[T, PT] { - return &ReconcilerBase[T, PT]{ + rb := &ReconcilerBase[T, PT]{ Client: client, NoCacheClient: noCacheClient, Log: log, - debouncer: newReconcilerDebouncer(), notifyRunChanged: concurrency.NewUnboundedChan[ctrl_event.GenericEvent](lifetimeCtx), LifetimeCtx: lifetimeCtx, conflictBackoff: &syncmap.Map[types.NamespacedName, *backoff.ExponentialBackOff]{}, kind: reflect.TypeFor[T]().Name(), } + rb.debouncer = newReconcilerDebouncer(func(name types.NamespacedName) { + rb.doScheduleReconciliation(name) + }) + return rb } // Marks the startup of another reconciliation. @@ -100,16 +103,18 @@ func (rb *ReconcilerBase[T, PT]) StartReconciliation(req ctrl.Request) (ctrl_cli // Schedules reconciliation for specific object identified by namespaced name. func (rb *ReconcilerBase[T, PT]) ScheduleReconciliation(nn types.NamespacedName) { - rb.debouncer.ReconciliationNeeded(rb.LifetimeCtx, nn, func(target types.NamespacedName) { - var obj PT = new(T) - om := obj.GetObjectMeta() - om.Name = target.Name - om.Namespace = target.Namespace - event := ctrl_event.GenericEvent{ - Object: obj, - } - rb.notifyRunChanged.In <- event - }) + rb.debouncer.ReconciliationNeeded(rb.LifetimeCtx, nn) +} + +func (rb *ReconcilerBase[T, PT]) doScheduleReconciliation(target types.NamespacedName) { + var obj PT = new(T) + om := obj.GetObjectMeta() + om.Name = target.Name + om.Namespace = target.Namespace + event := ctrl_event.GenericEvent{ + Object: obj, + } + rb.notifyRunChanged.In <- event } // Schedules reconciliation for specific object identified by namespaced name, delaying it by specified duration. diff --git a/controllers/reconciler_debouncer.go b/controllers/reconciler_debouncer.go index 74712615..7b14070b 100644 --- a/controllers/reconciler_debouncer.go +++ b/controllers/reconciler_debouncer.go @@ -32,13 +32,15 @@ type reconcilerDebouncer struct { debounceMap *syncmap.Map[types.NamespacedName, *objectDebounce] debounceDelay time.Duration maxDelay time.Duration + trigger reconcileTrigger } -func newReconcilerDebouncer() *reconcilerDebouncer { +func newReconcilerDebouncer(trigger reconcileTrigger) *reconcilerDebouncer { return &reconcilerDebouncer{ debounceMap: &syncmap.Map[types.NamespacedName, *objectDebounce]{}, debounceDelay: reconciliationDebounceDelay, maxDelay: reconciliationMaxDelay, + trigger: trigger, } } @@ -57,9 +59,9 @@ func (rd *reconcilerDebouncer) OnReconcile(name types.NamespacedName) { } // Tries to trigger reconciliation for an object identified by name, after appropriate debouncing. -func (rd *reconcilerDebouncer) ReconciliationNeeded(ctx context.Context, name types.NamespacedName, trigger reconcileTrigger) { +func (rd *reconcilerDebouncer) ReconciliationNeeded(ctx context.Context, name types.NamespacedName) { debounce, _ := rd.debounceMap.LoadOrStoreNew(name, func() *objectDebounce { - return objectDebounceFactory(trigger, name, rd.debounceDelay, rd.maxDelay) + return objectDebounceFactory(rd.trigger, name, rd.debounceDelay, rd.maxDelay) }) debounce.run(ctx)