Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 18 additions & 13 deletions controllers/reconciler_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type ReconcilerBase[T commonapi.ObjectStruct, PT commonapi.PCopyableObjectStruct
reconciliationSeqNo uint32

// Debouncer used to schedule reconciliations.
debouncer *reconcilerDebouncer[struct{}]
debouncer *reconcilerDebouncer

// Channel used to trigger programmatic reconciliations
notifyRunChanged *concurrency.UnboundedChan[ctrl_event.GenericEvent]
Expand All @@ -66,16 +66,19 @@ func NewReconcilerBase[T commonapi.ObjectStruct, PT commonapi.PCopyableObjectStr
log logr.Logger,
lifetimeCtx context.Context,
) *ReconcilerBase[T, PT] {
return &ReconcilerBase[T, PT]{
rb := &ReconcilerBase[T, PT]{
Client: client,
NoCacheClient: noCacheClient,
Log: log,
debouncer: newReconcilerDebouncer[struct{}](),
notifyRunChanged: concurrency.NewUnboundedChan[ctrl_event.GenericEvent](lifetimeCtx),
LifetimeCtx: lifetimeCtx,
conflictBackoff: &syncmap.Map[types.NamespacedName, *backoff.ExponentialBackOff]{},
kind: reflect.TypeFor[T]().Name(),
}
rb.debouncer = newReconcilerDebouncer(func(name types.NamespacedName) {
rb.doScheduleReconciliation(name)
})
return rb
}

// Marks the startup of another reconciliation.
Expand All @@ -100,16 +103,18 @@ func (rb *ReconcilerBase[T, PT]) StartReconciliation(req ctrl.Request) (ctrl_cli

// Schedules reconciliation for specific object identified by namespaced name.
func (rb *ReconcilerBase[T, PT]) ScheduleReconciliation(nn types.NamespacedName) {
rb.debouncer.ReconciliationNeeded(rb.LifetimeCtx, nn, struct{}{}, func(rti reconcileTriggerInput[struct{}]) {
var obj PT = new(T)
om := obj.GetObjectMeta()
om.Name = rti.target.Name
om.Namespace = rti.target.Namespace
event := ctrl_event.GenericEvent{
Object: obj,
}
rb.notifyRunChanged.In <- event
})
rb.debouncer.ReconciliationNeeded(rb.LifetimeCtx, nn)
}

func (rb *ReconcilerBase[T, PT]) doScheduleReconciliation(target types.NamespacedName) {
var obj PT = new(T)
om := obj.GetObjectMeta()
om.Name = target.Name
om.Namespace = target.Namespace
event := ctrl_event.GenericEvent{
Object: obj,
}
rb.notifyRunChanged.In <- event
}

// Schedules reconciliation for specific object identified by namespaced name, delaying it by specified duration.
Expand Down
52 changes: 24 additions & 28 deletions controllers/reconciler_debouncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,55 +18,51 @@ import (
// ReconcilerDebouncer helps debounce calls that trigger reconciliation. Useful for processing external events
// that require reconciliation for an object, and that might come in "batches" of quick succession.

type reconcileTriggerInput[ReconcileInput any] struct {
target types.NamespacedName
input ReconcileInput
}

type reconcileTrigger[ReconcileInput any] func(reconcileTriggerInput[ReconcileInput])
type reconcileTrigger func(types.NamespacedName)

type objectDebounce[ReconcileInput any] resiliency.DebounceLastAction[reconcileTriggerInput[ReconcileInput]]
type objectDebounce struct {
debounce *resiliency.DebounceLastAction
}

func (od *objectDebounce[ReconcileInput]) run(ctx context.Context, input reconcileTriggerInput[ReconcileInput]) {
debounceRaw := resiliency.DebounceLastAction[reconcileTriggerInput[ReconcileInput]](*od)
debounceRaw.Run(ctx, input)
func (od *objectDebounce) run(ctx context.Context) {
od.debounce.Run(ctx)
}

type reconcilerDebouncer[ReconcileInput any] struct {
debounceMap *syncmap.Map[types.NamespacedName, *objectDebounce[ReconcileInput]]
type reconcilerDebouncer struct {
debounceMap *syncmap.Map[types.NamespacedName, *objectDebounce]
debounceDelay time.Duration
maxDelay time.Duration
trigger reconcileTrigger
}

func newReconcilerDebouncer[ReconcileInput any]() *reconcilerDebouncer[ReconcileInput] {
return &reconcilerDebouncer[ReconcileInput]{
debounceMap: &syncmap.Map[types.NamespacedName, *objectDebounce[ReconcileInput]]{},
func newReconcilerDebouncer(trigger reconcileTrigger) *reconcilerDebouncer {
return &reconcilerDebouncer{
debounceMap: &syncmap.Map[types.NamespacedName, *objectDebounce]{},
debounceDelay: reconciliationDebounceDelay,
maxDelay: reconciliationMaxDelay,
trigger: trigger,
}
}

func objectDebounceFactory[ReconcileInput any](trigger reconcileTrigger[ReconcileInput], debounceDelay, maxDelay time.Duration) *objectDebounce[ReconcileInput] {
debounce := resiliency.NewDebounceLastAction(trigger, debounceDelay, maxDelay)
return (*objectDebounce[ReconcileInput])(debounce)
func objectDebounceFactory(trigger reconcileTrigger, name types.NamespacedName, debounceDelay, maxDelay time.Duration) *objectDebounce {
return &objectDebounce{
debounce: resiliency.NewDebounceLastAction(func() {
trigger(name)
}, debounceDelay, maxDelay),
}
}

// Call OnReconcile when an object is being reconciled. This prevents the inner debouncer map from growing indefinitely.
// This method is safe to call regardless whether the name was ever seen.
func (rd *reconcilerDebouncer[ReconcileInput]) OnReconcile(name types.NamespacedName) {
func (rd *reconcilerDebouncer) OnReconcile(name types.NamespacedName) {
rd.debounceMap.Delete(name)
}

// Tries to trigger reconciliation for an object identified by name, after appropriate debouncing.
func (rd *reconcilerDebouncer[ReconcileInput]) ReconciliationNeeded(ctx context.Context, name types.NamespacedName, ri ReconcileInput, trigger reconcileTrigger[ReconcileInput]) {
debounce, _ := rd.debounceMap.LoadOrStoreNew(name, func() *objectDebounce[ReconcileInput] {
return objectDebounceFactory(trigger, rd.debounceDelay, rd.maxDelay)
func (rd *reconcilerDebouncer) ReconciliationNeeded(ctx context.Context, name types.NamespacedName) {
debounce, _ := rd.debounceMap.LoadOrStoreNew(name, func() *objectDebounce {
return objectDebounceFactory(rd.trigger, name, rd.debounceDelay, rd.maxDelay)
})

input := reconcileTriggerInput[ReconcileInput]{
target: name,
input: ri,
}

debounce.run(ctx, input)
debounce.run(ctx)
}
10 changes: 10 additions & 0 deletions internal/dcp/bootstrap/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io/fs"
"os"
Expand Down Expand Up @@ -65,6 +66,15 @@ func GetExtensions(ctx context.Context, log logr.Logger) ([]DcpExtension, error)
extensions := []DcpExtension{}

for _, extDir := range extDirs {
if _, statErr := os.Stat(extDir); statErr != nil {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated to debouncer changes--just a small fix for unnecessary error I saw in the logs.

if errors.Is(statErr, os.ErrNotExist) {
log.V(1).Info("Extensions directory does not exist, extensions will not be loaded from this directory..", "Directory", extDir)
continue
} else {
return nil, fmt.Errorf("could not access extensions directory '%s': %w", extDir, statErr)
}
}

// Evaluate symlinks for the directory
realExtDir, evalErr := filepath.EvalSymlinks(extDir)
if evalErr != nil {
Expand Down
109 changes: 53 additions & 56 deletions pkg/resiliency/debounce_last.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,94 +11,91 @@ import (
"time"
)

type Runner[T any, R any] interface {
~func(T) (R, error)
}

type ResultWithError[R any] struct {
V R
Err error
}

type runState[R any] struct {
timer *time.Timer
threshold time.Time
doneC chan struct{}
res ResultWithError[R]
}

// DebounceLast calls the runner function after the specified delay, but only if no new calls have arrived in the meantime.
// If new calls arrive, the runner will be delayed further, but no more than maxDelay.
// After the runner function completes, the callers of Run() will all receive the same result (and error, if any).
type DebounceLast[T any, R any, RF Runner[T, R]] struct {
delay time.Duration
maxDelay time.Duration
threshold time.Time
timer *time.Timer
runC chan struct{}
m *sync.Mutex
runner RF
res *ResultWithError[R]
type DebounceLast[R any] struct {
delay time.Duration
maxDelay time.Duration
run *runState[R]
m *sync.Mutex
runner func() (R, error)
}

func NewDebounceLast[T any, R any, RF Runner[T, R]](runner RF, delay, maxDelay time.Duration) *DebounceLast[T, R, RF] {
return &DebounceLast[T, R, RF]{
func NewDebounceLast[R any](runner func() (R, error), delay, maxDelay time.Duration) *DebounceLast[R] {
return &DebounceLast[R]{
delay: delay,
maxDelay: maxDelay,
runner: runner,
m: &sync.Mutex{},
}
}

func (dl *DebounceLast[T, R, RF]) Run(ctx context.Context, arg T) (R, error) {
func (dl *DebounceLast[R]) Run(ctx context.Context) (R, error) {
dl.m.Lock()

var runC chan struct{}
var res *ResultWithError[R]
var run *runState[R]

if dl.runC == nil {
if dl.run == nil {
// New run
dl.timer = time.NewTimer(dl.delay)
dl.runC = make(chan struct{}, 1)
dl.threshold = time.Now().Add(dl.maxDelay)
dl.res = &ResultWithError[R]{}
runC = dl.runC
res = dl.res

go dl.execRunnerIfThresholdExceeded(ctx, arg)
run = &runState[R]{
timer: time.NewTimer(dl.delay),
threshold: time.Now().Add(dl.maxDelay),
doneC: make(chan struct{}),
}
dl.run = run

go dl.execRunnerIfThresholdExceeded(ctx, run)
} else {
// Run in progress
runC = dl.runC
res = dl.res
if time.Now().Add(dl.delay).Before(dl.threshold) {
dl.timer.Reset(dl.delay)
run = dl.run
if time.Now().Add(dl.delay).Before(run.threshold) {
run.timer.Reset(dl.delay)
}
}
dl.m.Unlock()

<-runC
return res.V, res.Err
<-run.doneC
return run.res.V, run.res.Err
}

// The helper goroutine that will be woken up periodically and run the runner if the threshold is exceeded.
func (dl *DebounceLast[T, R, RF]) execRunnerIfThresholdExceeded(ctx context.Context, arg T) {
defer func() {
dl.timer.Stop()
close(dl.runC)
dl.runC = nil
dl.threshold = time.Time{}
dl.m.Unlock()
}()
func (dl *DebounceLast[R]) execRunnerIfThresholdExceeded(ctx context.Context, run *runState[R]) {
var val R
var err error

select {

case <-dl.timer.C:
func() {
var val R
var err error
func() {
defer dl.m.Lock()
val, err = dl.runner(arg)
}()
dl.res.V = val
dl.res.Err = err
}()
case <-run.timer.C:
dl.stopCurrentRun(run)
val, err = dl.runner()

case <-ctx.Done():
dl.m.Lock()
dl.res.V, dl.res.Err = *new(R), ctx.Err()
dl.stopCurrentRun(run)
val, err = *new(R), ctx.Err()
}

run.timer.Stop()
run.res.V = val
run.res.Err = err
close(run.doneC)
}

func (dl *DebounceLast[R]) stopCurrentRun(run *runState[R]) {
dl.m.Lock()
defer dl.m.Unlock()

if dl.run == run {
dl.run = nil
}
}
Loading
Loading