From a5c808a60f499468300888ded9584dfa9c6daea4 Mon Sep 17 00:00:00 2001 From: Eric Millin Date: Mon, 15 Mar 2021 09:07:10 -0400 Subject: [PATCH 1/3] Fix timer leak on closed watch channel --- pkg/wait/poll_watcher.go | 4 +++- pkg/wait/wait_for_ready.go | 22 +++++++++++----------- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/pkg/wait/poll_watcher.go b/pkg/wait/poll_watcher.go index cdfb524afa..dd23ffee8a 100644 --- a/pkg/wait/poll_watcher.go +++ b/pkg/wait/poll_watcher.go @@ -27,6 +27,8 @@ import ( "k8s.io/client-go/rest" ) +const pollInterval = time.Second + // PollInterval determines when you should poll. Useful to mock out, or for // replacing with exponential backoff later. type PollInterval interface { @@ -76,7 +78,7 @@ func NewWatcher(watchFunc watchF, c rest.Interface, ns string, resource string, } polling := &pollingWatcher{ c, ns, resource, name, timeout, make(chan bool), make(chan watch.Event), &sync.WaitGroup{}, - newTickerPollInterval(time.Second), nativePoll(c, ns, resource, name)} + newTickerPollInterval(pollInterval), nativePoll(c, ns, resource, name)} polling.start() return polling, nil } diff --git a/pkg/wait/wait_for_ready.go b/pkg/wait/wait_for_ready.go index 00b1b3d3eb..b135857298 100644 --- a/pkg/wait/wait_for_ready.go +++ b/pkg/wait/wait_for_ready.go @@ -111,20 +111,20 @@ func NoopMessageCallback() MessageCallback { func (w *waitForReadyConfig) Wait(watcher watch.Interface, name string, options Options, msgCallback MessageCallback) (error, time.Duration) { timeout := options.timeoutWithDefault() - floatingTimeout := timeout + timeoutTimer := time.NewTimer(timeout) for { start := time.Now() - retry, timeoutReached, err := w.waitForReadyCondition(watcher, start, floatingTimeout, options.errorWindowWithDefault(), msgCallback) + retry, timeoutReached, err := w.waitForReadyCondition(watcher, start, timeoutTimer, options.errorWindowWithDefault(), msgCallback) if err != nil { return err, time.Since(start) } - floatingTimeout = floatingTimeout - time.Since(start) - if timeoutReached || floatingTimeout < 0 { + if timeoutReached { return fmt.Errorf("timeout: %s '%s' not ready after %d seconds", w.kind, name, int(timeout/time.Second)), time.Since(start) } if retry { - // restart loop + // sleep to prevent CPU pegging and restart the loop + time.Sleep(pollInterval) continue } return nil, time.Since(start) @@ -137,7 +137,9 @@ func (w *waitForReadyConfig) Wait(watcher watch.Interface, name string, options // An errorWindow can be specified which takes into account of intermediate "false" ready conditions. So before returning // an error, this methods waits for the errorWindow duration and if an "True" or "Unknown" event arrives in the meantime // for the "Ready" condition, then the method continues to wait. -func (w *waitForReadyConfig) waitForReadyCondition(watcher watch.Interface, start time.Time, timeout time.Duration, errorWindow time.Duration, msgCallback MessageCallback) (retry bool, timeoutReached bool, err error) { +func (w *waitForReadyConfig) waitForReadyCondition( + watcher watch.Interface, start time.Time, timeoutTimer *time.Timer, errorWindow time.Duration, msgCallback MessageCallback, +) (retry bool, timeoutReached bool, err error) { // channel used to transport the error that has been received errChan := make(chan error) @@ -154,7 +156,7 @@ func (w *waitForReadyConfig) waitForReadyCondition(watcher watch.Interface, star for { select { - case <-time.After(timeout): + case <-timeoutTimer.C: // We reached a timeout without receiving a "Ready" == "True" event return false, true, nil case err = <-errChan: @@ -163,7 +165,8 @@ func (w *waitForReadyConfig) waitForReadyCondition(watcher watch.Interface, star return false, false, err case event, ok := <-watcher.ResultChan(): if !ok || event.Object == nil { - return true, false, nil + // retry only if the channel is still open + return ok, false, nil } // Check whether resource is in sync already (meta.generation == status.observedGeneration) @@ -236,15 +239,12 @@ func (w *waitForEvent) Wait(watcher watch.Interface, name string, options Option timeout := options.timeoutWithDefault() start := time.Now() // channel used to transport the error - errChan := make(chan error) timer := time.NewTimer(timeout) defer timer.Stop() for { select { case <-timer.C: return fmt.Errorf("timeout: %s '%s' not ready after %d seconds", w.kind, name, int(timeout/time.Second)), time.Since(start) - case err := <-errChan: - return err, time.Since(start) case event := <-watcher.ResultChan(): if w.eventDone(&event) { return nil, time.Since(start) From fb43c1ffc34a1fa650c3e0f4a6730bb99023e6a1 Mon Sep 17 00:00:00 2001 From: Eric Millin Date: Mon, 15 Mar 2021 10:23:14 -0400 Subject: [PATCH 2/3] Update changelog --- CHANGELOG.adoc | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.adoc b/CHANGELOG.adoc index 2c54c45737..3443cede28 100644 --- a/CHANGELOG.adoc +++ b/CHANGELOG.adoc @@ -17,6 +17,10 @@ |=== | | Description | PR +| 🐛 +| Fix memory leak on closed watch channel +| https://github.com/knative/client/pull/1263[#1263] + | 🎁 | Add `--prune` & `--prune-all` options to delete the unreferenced revisions | https://github.com/knative/client/pull/1217[#1217] From 9fc6dc655262fe40c67255d1ca50eb4eb341ba28 Mon Sep 17 00:00:00 2001 From: Eric Millin Date: Mon, 15 Mar 2021 11:29:21 -0400 Subject: [PATCH 3/3] Stop watch timer on Wait exit --- pkg/wait/wait_for_ready.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/wait/wait_for_ready.go b/pkg/wait/wait_for_ready.go index b135857298..1cc29f47c2 100644 --- a/pkg/wait/wait_for_ready.go +++ b/pkg/wait/wait_for_ready.go @@ -112,6 +112,7 @@ func (w *waitForReadyConfig) Wait(watcher watch.Interface, name string, options timeout := options.timeoutWithDefault() timeoutTimer := time.NewTimer(timeout) + defer timeoutTimer.Stop() for { start := time.Now() retry, timeoutReached, err := w.waitForReadyCondition(watcher, start, timeoutTimer, options.errorWindowWithDefault(), msgCallback)