From 6c49578cf94355db9d22d1420fdfec0289449c2b Mon Sep 17 00:00:00 2001 From: "W. Trevor King" Date: Mon, 11 Sep 2023 14:08:45 -0700 Subject: [PATCH 1/5] pkg/cvo/availableupdates: Return a copy in getAvailableUpdates The function had returned the original pointer since it landed in db150e6db4 (cvo: Perform status updates in a single thread, 2018-11-03, #45). But locking the operator structure to return a pointer reference is a bit risky, because after the lock is released you're still holding a pointer into that data, but lack easy access to the lock to guard against simultaneous access. For example, you could have setAvailableUpdates updating the structure, while simultaneously operatorMetrics.Collect, Operator.syncStatus, or Operator.mergeReleaseMetadata is looking at their pointer reference to the old data. There wasn't actually much exposure, because writes all happened to flow through setAvailableUpdates, and setAvailableUpdates's only changes were: * Bumping the u.LastSyncOrConfigChange Time. * Replacing the availableUpdates pointer with a new pointer. and neither of those should significantly disrupt any of the consumers. But switching to a copy doesn't cost much resource wise, and it protects us from a number of possible ways that this could break in the future if setAvailableUpdates does less full-pointer-replacement or one of the consumers starts to care about LastSyncOrConfigChange reliably lining up with the rest of the availableUpdates content. It does mean we need to update the copy logic as we add new properties to the structure, but we'd need to do that even if we used deepcopy-gen or similar to automate the copy generation. --- pkg/cvo/availableupdates.go | 61 +++++++++++++++++++++++++++---------- 1 file changed, 45 insertions(+), 16 deletions(-) diff --git a/pkg/cvo/availableupdates.go b/pkg/cvo/availableupdates.go index ee5cb4a0a8..3391ff4c9a 100644 --- a/pkg/cvo/availableupdates.go +++ b/pkg/cvo/availableupdates.go @@ -48,6 +48,7 @@ func (optr *Operator) syncAvailableUpdates(ctx context.Context, config *configv1 optrAvailableUpdates := optr.getAvailableUpdates() if optrAvailableUpdates == nil { klog.V(2).Info("First attempt to retrieve available updates") + optrAvailableUpdates = &availableUpdates{} } else if !optrAvailableUpdates.RecentlyChanged(optr.minimumUpdateCheckInterval) { klog.V(2).Infof("Retrieving available updates again, because more than %s has elapsed since %s", optr.minimumUpdateCheckInterval, optrAvailableUpdates.LastAttempt.Format(time.RFC3339)) } else if channel != optrAvailableUpdates.Channel { @@ -83,27 +84,25 @@ func (optr *Operator) syncAvailableUpdates(ctx context.Context, config *configv1 } } - if usedDefaultUpstream { - upstream = "" - } - if optr.injectClusterIdIntoPromQL { conditionalUpdates = injectClusterIdIntoConditionalUpdates(clusterId, conditionalUpdates) } - au := &availableUpdates{ - Upstream: upstream, - Channel: config.Spec.Channel, - Architecture: desiredArch, - Current: current, - Updates: updates, - ConditionalUpdates: conditionalUpdates, - ConditionRegistry: optr.conditionRegistry, - Condition: condition, + if usedDefaultUpstream { + upstream = "" } - au.evaluateConditionalUpdates(ctx) - optr.setAvailableUpdates(au) + optrAvailableUpdates.Upstream = upstream + optrAvailableUpdates.Channel = channel + optrAvailableUpdates.Architecture = desiredArch + optrAvailableUpdates.Current = current + optrAvailableUpdates.Updates = updates + optrAvailableUpdates.ConditionalUpdates = conditionalUpdates + optrAvailableUpdates.ConditionRegistry = optr.conditionRegistry + optrAvailableUpdates.Condition = condition + + optrAvailableUpdates.evaluateConditionalUpdates(ctx) + optr.setAvailableUpdates(optrAvailableUpdates) // requeue optr.queue.Add(optr.queueKey()) @@ -206,7 +205,37 @@ func (optr *Operator) setAvailableUpdates(u *availableUpdates) { func (optr *Operator) getAvailableUpdates() *availableUpdates { optr.statusLock.Lock() defer optr.statusLock.Unlock() - return optr.availableUpdates + + if optr.availableUpdates == nil { + return nil + } + + u := &availableUpdates{ + Upstream: optr.availableUpdates.Upstream, + Channel: optr.availableUpdates.Channel, + Architecture: optr.availableUpdates.Architecture, + LastAttempt: optr.availableUpdates.LastAttempt, + LastSyncOrConfigChange: optr.availableUpdates.LastSyncOrConfigChange, + Current: *optr.availableUpdates.Current.DeepCopy(), + ConditionRegistry: optr.availableUpdates.ConditionRegistry, // intentionally not a copy, to preserve cache state + Condition: optr.availableUpdates.Condition, + } + + if optr.availableUpdates.Updates != nil { + u.Updates = make([]configv1.Release, 0, len(optr.availableUpdates.Updates)) + for _, update := range optr.availableUpdates.Updates { + u.Updates = append(u.Updates, *update.DeepCopy()) + } + } + + if optr.availableUpdates.ConditionalUpdates != nil { + u.ConditionalUpdates = make([]configv1.ConditionalUpdate, 0, len(optr.availableUpdates.ConditionalUpdates)) + for _, conditionalUpdate := range optr.availableUpdates.ConditionalUpdates { + u.ConditionalUpdates = append(u.ConditionalUpdates, *conditionalUpdate.DeepCopy()) + } + } + + return u } // getArchitecture returns the currently determined cluster architecture. From 965bfb28440c0c1cd47f74bf160a6d24abc7a540 Mon Sep 17 00:00:00 2001 From: "W. Trevor King" Date: Mon, 18 Sep 2023 15:58:37 -0700 Subject: [PATCH 2/5] pkg/cvo/availableupdates: Requeue risk evaluation on failure Instead of waiting for the next round of evaluation, which might take minutes. For example, in 4.14.0-rc.1 testing [1]: $ curl -s https://gcsweb-ci.apps.ci.l2s4.p1.openshiftapps.com/gcs/origin-ci-test/logs/periodic-ci-openshift-release-master-nightly-4.14-e2e-aws-ovn-serial/1702743868887273472/artifacts/e2e-aws-ovn-serial/gather-extra/artifacts/pods/openshift-cluster-version_cluster-version-operator-78644f4679-q8sdm_cluster-version-operator.log | grep availableupdate I0915 18:21:24.184272 1 availableupdates.go:50] First attempt to retrieve available updates I0915 18:21:24.845512 1 availableupdates.go:58] Available updates were recently retrieved, with less than 2m28.489200644s elapsed since 2023-09-15T18:21:24Z, will try later. I0915 18:21:39.836566 1 availableupdates.go:58] Available updates were recently retrieved, with less than 2m28.489200644s elapsed since 2023-09-15T18:21:24Z, will try later. I0915 18:21:39.843398 1 availableupdates.go:58] Available updates were recently retrieved, with less than 2m28.489200644s elapsed since 2023-09-15T18:21:24Z, will try later. I0915 18:21:54.835464 1 availableupdates.go:58] Available updates were recently retrieved, with less than 2m28.489200644s elapsed since 2023-09-15T18:21:24Z, will try later. I0915 18:23:16.769850 1 availableupdates.go:58] Available updates were recently retrieved, with less than 2m28.489200644s elapsed since 2023-09-15T18:21:24Z, will try later. I0915 18:23:16.784421 1 availableupdates.go:58] Available updates were recently retrieved, with less than 2m28.489200644s elapsed since 2023-09-15T18:21:24Z, will try later. I0915 18:23:39.842269 1 availableupdates.go:58] Available updates were recently retrieved, with less than 2m28.489200644s elapsed since 2023-09-15T18:21:24Z, will try later. I0915 18:23:39.862590 1 availableupdates.go:58] Available updates were recently retrieved, with less than 2m28.489200644s elapsed since 2023-09-15T18:21:24Z, will try later. I0915 18:24:09.837669 1 availableupdates.go:52] Retrieving available updates again, because more than 2m28.489200644s has elapsed since 2023-09-15T18:21:24Z I0915 18:24:24.843569 1 availableupdates.go:58] Available updates were recently retrieved, with less than 2m28.489200644s elapsed since 2023-09-15T18:24:09Z, will try later. I0915 18:25:24.839869 1 availableupdates.go:58] Available updates were recently retrieved, with less than 2m28.489200644s elapsed since 2023-09-15T18:24:09Z, will try later. ... I0915 20:26:07.109093 1 availableupdates.go:52] Retrieving available updates again, because more than 2m28.489200644s has elapsed since 2023-09-15T20:22:23Z I0915 20:29:50.769739 1 availableupdates.go:52] Retrieving available updates again, because more than 2m28.489200644s has elapsed since 2023-09-15T20:26:07Z I0915 20:33:34.432215 1 availableupdates.go:52] Retrieving available updates again, because more than 2m28.489200644s has elapsed since 2023-09-15T20:29:50Z I0915 20:37:18.093261 1 availableupdates.go:52] Retrieving available updates again, because more than 2m28.489200644s has elapsed since 2023-09-15T20:33:34Z I'm not entirely clear on what the triggers were there, with 3m44s between those final entries. Operator.Run sets up: wait.UntilWithContext(runContext, func(runContext context.Context) { optr.worker(runContext, optr.availableUpdatesQueue, optr.availableUpdatesSync) }, time.Second) and [2] docs UntilWithContext: UntilWithContext loops until context is done, running f every period. UntilWithContext is syntactic sugar on top of JitterUntilWithContext with zero jitter factor and with sliding = true (which means the timer for period starts after the f completes). So that should be waking up, draining the queue, sleeping a second, waking back up, draining the queue again, and on forever. Perhaps we are just backing off to the slowest DefaultControllerRateLimiter period [3], but I expect processNextWorkItem's calling handleErr is calling Forget on the queue, because I don't see any of its error-branch logging: $ curl -s https://gcsweb-ci.apps.ci.l2s4.p1.openshiftapps.com/gcs/origin-ci-test/logs/periodic-ci-openshift-release-master-nightly-4.14-e2e-aws-ovn-serial/1702743868887273472/artifacts/e2e-aws-ovn-serial/gather-extra/artifacts/pods/openshift-cluster-version_cluster-version-operator-78644f4679-q8sdm_cluster-version-operator.log | grep 'Error handling\|out of the queue' ...no hits... That suggests nothing is slowing down our queue processing from once-per-second (plus evaluation time). But what's feeding the queue items to process? There only Add calls seem to be in clusterVersionEventHandler, but checking audit logs: $ curl -s https://gcsweb-ci.apps.ci.l2s4.p1.openshiftapps.com/gcs/origin-ci-test/logs/periodic-ci-openshift-release-master-nightly-4.14-e2e-aws-ovn-serial/1702743868887273472/artifacts/e2e-aws-ovn-serial/gather-audit-logs/artifacts/audit-logs.tar | tar -xz --strip-components=2 $ zgrep -h clusterversion kube-apiserver/*audit*.log.gz | jq -r 'select(.verb != "get" and .verb != "list" and .verb != "watch") | .stageTimestamp + " " + (.responseStatus.code | tostring) + " " + .verb + " " + .objectRef.subresource + " " + .user.username' | sort ... 2023-09-15T18:26:24.841812Z 200 update status system:serviceaccount:openshift-cluster-version:default 2023-09-15T18:26:24.858507Z 200 update status system:serviceaccount:openshift-cluster-version:default 2023-09-15T18:29:39.835307Z 200 update status system:serviceaccount:openshift-cluster-version:default 2023-09-15T18:37:39.836698Z 200 update status system:serviceaccount:openshift-cluster-version:default which are all hours before these 20:26 and similar update retrievals. I suspect this due to resyncPeriod(o.ResyncInterval) being passed to NewFilteredSharedInformerFactory when generating the ClusterVersion informer, putting a lower bound on the UpdateFunc event-handler frequency. My goal is to set the stage for faster cache-warming after receiving a batch of new PromQL update risks, as described in 530a5092a6 (pkg/cvo/availableupdates: Prioritize conditional risks for largest target version, 2023-03-06, #909). I still have not adjusted the caching logic, so at the moment, it only gives us faster updates on the "that PromQL is still throttled" loop. The AddAfter avoids hot-looping on: 1. Can I evaluate the risks? 2. No? Requeue and return to 1 right now. and instead gives us: 1. Can I evaluate the risks? 2. No? Requeue and return to 1 around a second from know. The new addUpdate avoids injecting the same Recommended=True target into availableUpdates multiple times while trying to evaluate another conditional update, now that we have the !needFreshFetch case, where we recycle the previous structure data without the fresh Cincinnati fetch to clear earlier additions. Without the addUpdate pivot, we get [5]: status: availableUpdates: - image: registry.ci.openshift.org/ocp/release@sha256:e385a786f122c6c0e8848ecb9901f510676438f17af8a5c4c206807a9bc0bf28 version: 4.15.0-0.nightly-2023-10-19-222222 - image: registry.ci.openshift.org/ocp/release@sha256:e385a786f122c6c0e8848ecb9901f510676438f17af8a5c4c206807a9bc0bf28 version: 4.15.0-0.nightly-2023-10-19-222222 - image: registry.ci.openshift.org/ocp/release@sha256:e385a786f122c6c0e8848ecb9901f510676438f17af8a5c4c206807a9bc0bf28 version: 4.15.0-0.nightly-2023-10-19-222222 ... conditionalUpdates: - conditions: - lastTransitionTime: "2023-09-21T09:29:30Z" message: The update is recommended, because none of the conditional update risks apply to this cluster. reason: AsExpected status: "True" type: Recommended release: image: registry.ci.openshift.org/ocp/release@sha256:e385a786f122c6c0e8848ecb9901f510676438f17af8a5c4c206807a9bc0bf28 version: 4.15.0-0.nightly-2023-10-19-222222 risks: - matchingRules: - promql: promql: |- cluster_infrastructure_provider{type=~"nonexist"} or 0 * cluster_infrastructure_provider type: PromQL message: Clusters on nonexist provider, this imaginary bug can happen. name: SomeInfrastructureThing url: https://bug.example.com/c ... - conditions: - lastTransitionTime: "2023-09-21T09:29:31Z" message: |- On clusters on default invoker user, this imaginary bug can happen. https://bug.example.com/a Could not evaluate exposure to update risk SomeChannelThing (evaluation is throttled until 09:29:32Z) SomeChannelThing description: On clusters with the channel set to 'buggy', this imaginary bug can happen. SomeChannelThing URL: https://bug.example.com/b reason: MultipleReasons status: "False" type: Recommended release: image: registry.ci.openshift.org/ocp/release@sha256:66c753e8b75d172f2a3f7ba13363383a76ecbc7ecdc00f3a423bef4ea8560405 version: 4.15.0-0.nightly-2023-10-17-000000 risks: - matchingRules: - promql: promql: cluster_installer type: PromQL message: On clusters on default invoker user, this imaginary bug can happen. name: SomeInvokerThing url: https://bug.example.com/a - matchingRules: - promql: promql: |- group(cluster_version_available_updates{channel="buggy"}) or 0 * group(cluster_version_available_updates{channel!="buggy"}) type: PromQL message: On clusters with the channel set to 'buggy', this imaginary bug can happen. name: SomeChannelThing url: https://bug.example.com/b lasting until the next Cincinnati fetch cleared out the availableUpdates redundancy. [1]: https://prow.ci.openshift.org/view/gs/origin-ci-test/logs/periodic-ci-openshift-release-master-nightly-4.14-e2e-aws-ovn-serial/1702743868887273472 [2]: https://pkg.go.dev/k8s.io/apimachinery/pkg/util/wait#UntilWithContext [3]: https://pkg.go.dev/k8s.io/client-go/util/workqueue#DefaultControllerRateLimiter [4]: https://github.com/kubernetes/client-go/blob/v0.28.2/util/workqueue/default_rate_limiters.go#L39 [5]: https://github.com/openshift/cluster-version-operator/pull/939#issuecomment-1729229326 --- pkg/cvo/availableupdates.go | 99 ++++++++++++++++++++++++++----------- 1 file changed, 69 insertions(+), 30 deletions(-) diff --git a/pkg/cvo/availableupdates.go b/pkg/cvo/availableupdates.go index 3391ff4c9a..90495c4cdc 100644 --- a/pkg/cvo/availableupdates.go +++ b/pkg/cvo/availableupdates.go @@ -46,6 +46,7 @@ func (optr *Operator) syncAvailableUpdates(ctx context.Context, config *configv1 // updates are only checked at most once per minimumUpdateCheckInterval or if the generation changes optrAvailableUpdates := optr.getAvailableUpdates() + needFreshFetch := true if optrAvailableUpdates == nil { klog.V(2).Info("First attempt to retrieve available updates") optrAvailableUpdates = &availableUpdates{} @@ -56,24 +57,38 @@ func (optr *Operator) syncAvailableUpdates(ctx context.Context, config *configv1 } else if desiredArch != optrAvailableUpdates.Architecture { klog.V(2).Infof("Retrieving available updates again, because the architecture has changed from %q to %q", optrAvailableUpdates.Architecture, desiredArch) } else if upstream == optrAvailableUpdates.Upstream || (upstream == optr.defaultUpstreamServer && optrAvailableUpdates.Upstream == "") { - klog.V(2).Infof("Available updates were recently retrieved, with less than %s elapsed since %s, will try later.", optr.minimumUpdateCheckInterval, optrAvailableUpdates.LastAttempt.Format(time.RFC3339)) - return nil + needsConditionalUpdateEval := false + for _, conditionalUpdate := range optrAvailableUpdates.ConditionalUpdates { + if recommended := meta.FindStatusCondition(conditionalUpdate.Conditions, "Recommended"); recommended == nil { + needsConditionalUpdateEval = true + break + } else if recommended.Status != metav1.ConditionTrue && recommended.Status != metav1.ConditionFalse { + needsConditionalUpdateEval = true + break + } + } + if !needsConditionalUpdateEval { + klog.V(2).Infof("Available updates were recently retrieved, with less than %s elapsed since %s, will try later.", optr.minimumUpdateCheckInterval, optrAvailableUpdates.LastAttempt.Format(time.RFC3339)) + return nil + } + needFreshFetch = false } else { klog.V(2).Infof("Retrieving available updates again, because the upstream has changed from %q to %q", optrAvailableUpdates.Upstream, config.Spec.Upstream) } - transport, err := optr.getTransport() - if err != nil { - return err - } + if needFreshFetch { + transport, err := optr.getTransport() + if err != nil { + return err + } + + userAgent := optr.getUserAgent() + clusterId := string(config.Spec.ClusterID) - userAgent := optr.getUserAgent() - clusterId := string(config.Spec.ClusterID) - current, updates, conditionalUpdates, condition := calculateAvailableUpdatesStatus(ctx, clusterId, - transport, userAgent, upstream, desiredArch, currentArch, channel, optr.release.Version, optr.conditionRegistry) + current, updates, conditionalUpdates, condition := calculateAvailableUpdatesStatus(ctx, clusterId, + transport, userAgent, upstream, desiredArch, currentArch, channel, optr.release.Version, optr.conditionRegistry) - // Populate conditions on conditional updates from operator state - if optrAvailableUpdates != nil { + // Populate conditions on conditional updates from operator state for i := range optrAvailableUpdates.ConditionalUpdates { for j := range conditionalUpdates { if optrAvailableUpdates.ConditionalUpdates[i].Release.Image == conditionalUpdates[j].Release.Image { @@ -82,30 +97,44 @@ func (optr *Operator) syncAvailableUpdates(ctx context.Context, config *configv1 } } } - } - if optr.injectClusterIdIntoPromQL { - conditionalUpdates = injectClusterIdIntoConditionalUpdates(clusterId, conditionalUpdates) - } + if optr.injectClusterIdIntoPromQL { + conditionalUpdates = injectClusterIdIntoConditionalUpdates(clusterId, conditionalUpdates) + } - if usedDefaultUpstream { - upstream = "" - } + if usedDefaultUpstream { + upstream = "" + } - optrAvailableUpdates.Upstream = upstream - optrAvailableUpdates.Channel = channel - optrAvailableUpdates.Architecture = desiredArch - optrAvailableUpdates.Current = current - optrAvailableUpdates.Updates = updates - optrAvailableUpdates.ConditionalUpdates = conditionalUpdates - optrAvailableUpdates.ConditionRegistry = optr.conditionRegistry - optrAvailableUpdates.Condition = condition + optrAvailableUpdates.Upstream = upstream + optrAvailableUpdates.Channel = channel + optrAvailableUpdates.Architecture = desiredArch + optrAvailableUpdates.Current = current + optrAvailableUpdates.Updates = updates + optrAvailableUpdates.ConditionalUpdates = conditionalUpdates + optrAvailableUpdates.ConditionRegistry = optr.conditionRegistry + optrAvailableUpdates.Condition = condition + } optrAvailableUpdates.evaluateConditionalUpdates(ctx) + + queueKey := optr.queueKey() + for _, conditionalUpdate := range optrAvailableUpdates.ConditionalUpdates { + if recommended := meta.FindStatusCondition(conditionalUpdate.Conditions, "Recommended"); recommended == nil { + klog.Warningf("Requeue available-update evaluation, because %q lacks a Recommended condition", conditionalUpdate.Release.Version) + optr.availableUpdatesQueue.AddAfter(queueKey, time.Second) + break + } else if recommended.Status != metav1.ConditionTrue && recommended.Status != metav1.ConditionFalse { + klog.V(2).Infof("Requeue available-update evaluation, because %q is %s=%s: %s: %s", conditionalUpdate.Release.Version, recommended.Type, recommended.Status, recommended.Reason, recommended.Message) + optr.availableUpdatesQueue.AddAfter(queueKey, time.Second) + break + } + } + optr.setAvailableUpdates(optrAvailableUpdates) - // requeue - optr.queue.Add(optr.queueKey()) + // queue optr.sync() to update ClusterVersion status + optr.queue.Add(queueKey) return nil } @@ -366,12 +395,22 @@ func (u *availableUpdates) evaluateConditionalUpdates(ctx context.Context) { Reason: "AsExpected", Message: "The update is recommended, because none of the conditional update risks apply to this cluster.", }) - u.Updates = append(u.Updates, conditionalUpdate.Release) + u.addUpdate(conditionalUpdate.Release) } u.ConditionalUpdates[i].Conditions = conditionalUpdate.Conditions } } +func (u *availableUpdates) addUpdate(release configv1.Release) { + for _, update := range u.Updates { + if update.Image == release.Image { + return + } + } + + u.Updates = append(u.Updates, release) +} + func (u *availableUpdates) removeUpdate(image string) { for i, update := range u.Updates { if update.Image == image { From 518b44612dab3b2ddbb5b26f50d1e43f7e5fcba8 Mon Sep 17 00:00:00 2001 From: "W. Trevor King" Date: Mon, 18 Sep 2023 22:35:23 -0700 Subject: [PATCH 3/5] pkg/clusterconditions/promql: Reduce MinBetweenMatches to 1s 530a5092a6 (pkg/cvo/availableupdates: Prioritize conditional risks for largest target version, 2023-03-06, #909) prioritized the order in which risks were evaluated. But we were still waiting 10 minutes between different PromQL evaluations while evaluating conditional update risks. The original 10m requirement is from the enhancement [1], and was implemented in ca186eda34 (pkg/clusterconditions/cache: Add a cache wrapper for client-side throttling, 2021-11-10, #663). But discussing with Lala, Scott, and Ben, we feel like the addressing the demonstrated user experience need of low-latency risk evaluation [2] is worth reducing the throttling to 1s per expression evaluation. We still have MinForCondition set to an hour, so with this commit, a cluster-version operator evaluating three risks will move from a timeline like: 1. 0s, hear about risks that depend on PromQL A, B, and C. Evaluate A for the first time. 2. 10m, evaluate B for the first time (MinBetweenMatches after 1). 3. 20m, evaluate C for the first time (MinBetweenMatches after 2). 4. 1h, evaluate A again (MinForCondition after 1, also well past MinBetweenMatches after 3). 5. 1h10m, evaluate B again (MinForCondition after 2 and MinBetweenMatches after 4). 6. 1h20m, evaluate C again (MinForCondition after 3 and MinBetweenMatches after 5). 7. 2h, evaluate A again (MinForCondition after 4, also well past MinBetweenMatches after 6). 8. 2h10m, evaluate B again (MinForCondition after 5 and MinBetweenMatches after 7). 9. 2h20m, evaluate C again (MinForCondition after 6 and MinBetweenMatches after 8). to a timeline like: 1. 0s, hear about risks that depend on PromQL A, B, and C. Evaluate A for the first time. 2. 1s, evaluate B for the first time (MinBetweenMatches after 1). 3. 2s, evaluate C for the first time (MinBetweenMatches after 2). 4. 1h, evaluate A again (MinForCondition after 1, also well past MinBetweenMatches after 3). 5. 1h1s, evaluate B again (MinForCondition after 2 and MinBetweenMatches after 4). 6. 1h2s, evaluate C again (MinForCondition after 3 and MinBetweenMatches after 5). 7. 2h, evaluate A again (MinForCondition after 4, also well past MinBetweenMatches after 6). 8. 2h1s, evaluate B again (MinForCondition after 5 and MinBetweenMatches after 7). 9. 2h2s, evaluate C again (MinForCondition after 6 and MinBetweenMatches after 8). We could deliver faster cache warming while preserving spaced out refresh evaluation by splitting MinBetweenMatches into a 1s MinBetweenMatchesInitial and 10m MinBetweenMatchesWhenCached, which would produce timelines like: 1. 0s, hear about risks that depend on PromQL A, B, and C. Evaluate A for the first time. 2. 1s, evaluate B for the first time (MinBetweenMatchesInitial after 1). 3. 2s, evaluate C for the first time (MinBetweenMatchesInitial after 2). 4. 1h, evaluate A again (MinForCondition after 1, also well past MinBetweenMatchesWhenCached after 3). 5. 1h10m, evaluate B again (MinForCondition after 2 and MinBetweenMatchesWhenCached after 4). 6. 1h20m, evaluate C again (MinForCondition after 3 and MinBetweenMatchesWhenCached after 5). 7. 2h, evaluate A again (MinForCondition after 4, also well past MinBetweenMatchesWhenCached after 6). 8. 2h10m, evaluate B again (MinForCondition after 5 and MinBetweenMatchesWhenCached after 7). 9. 2h20m, evaluate C again (MinForCondition after 6 and MinBetweenMatchesWhenCached after 8). but again discussing with Lala, Scott, and Ben, the code complexity to deliver that distinction does not seem to be worth thet protection it delivers to the PromQL engine. And really, PromQL engines concerned about load should harden themselves, including via Retry-After [3] that allow clients to back off gracefully when the service needs that, instead of relying on clients to guess about the load the service could handle and back off without insight into actual server capacity. [1]: https://github.com/openshift/enhancements/blame/158111ce156aac7fa6063a47c00e129c13033aec/enhancements/update/targeted-update-edge-blocking.md#L323-L325 [2]: https://issues.redhat.com/browse/OCPBUGS-19512 [3]: https://www.rfc-editor.org/rfc/rfc9110#name-retry-after --- pkg/clusterconditions/promql/promql.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/clusterconditions/promql/promql.go b/pkg/clusterconditions/promql/promql.go index b39d89f5df..ebf5577ed4 100644 --- a/pkg/clusterconditions/promql/promql.go +++ b/pkg/clusterconditions/promql/promql.go @@ -62,7 +62,7 @@ func NewPromQL(promqlTarget clusterconditions.PromQLTarget) *cache.Cache { }, QueryTimeout: 5 * time.Minute, }, - MinBetweenMatches: 10 * time.Minute, + MinBetweenMatches: 1 * time.Second, MinForCondition: time.Hour, Expiration: 24 * time.Hour, } From 316cfb3b15bc845381e3b0152fd6f1cd705d3319 Mon Sep 17 00:00:00 2001 From: "W. Trevor King" Date: Mon, 18 Sep 2023 22:39:25 -0700 Subject: [PATCH 4/5] pkg/cvo: Debug logging for the availableUpdatesQueue I'm not entirely clear what the call stack is for this queue, so wrap it with logging to make it easier to understand the queue lifecycle. Stack construction is based on [1], but I'm concatenating the function names to try to fit a bit of the stack on that one log line. [1]: https://pkg.go.dev/runtime#example-Frames --- pkg/cvo/cvo.go | 3 +- pkg/internal/debugworkqueue/debugworkqueue.go | 98 +++++++++++++++++++ 2 files changed, 100 insertions(+), 1 deletion(-) create mode 100644 pkg/internal/debugworkqueue/debugworkqueue.go diff --git a/pkg/cvo/cvo.go b/pkg/cvo/cvo.go index b4cee42104..d12d324a3c 100644 --- a/pkg/cvo/cvo.go +++ b/pkg/cvo/cvo.go @@ -44,6 +44,7 @@ import ( cvointernal "github.com/openshift/cluster-version-operator/pkg/cvo/internal" "github.com/openshift/cluster-version-operator/pkg/cvo/internal/dynamicclient" "github.com/openshift/cluster-version-operator/pkg/internal" + "github.com/openshift/cluster-version-operator/pkg/internal/debugworkqueue" "github.com/openshift/cluster-version-operator/pkg/payload" "github.com/openshift/cluster-version-operator/pkg/payload/precondition" preconditioncv "github.com/openshift/cluster-version-operator/pkg/payload/precondition/clusterversion" @@ -213,7 +214,7 @@ func New( eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: namespace}), queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "clusterversion"), - availableUpdatesQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "availableupdates"), + availableUpdatesQueue: debugworkqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "availableupdates"), upgradeableQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "upgradeable"), exclude: exclude, diff --git a/pkg/internal/debugworkqueue/debugworkqueue.go b/pkg/internal/debugworkqueue/debugworkqueue.go new file mode 100644 index 0000000000..32bca9fedd --- /dev/null +++ b/pkg/internal/debugworkqueue/debugworkqueue.go @@ -0,0 +1,98 @@ +// Package debugworkqueue wraps client-go work queues with trace logging. +package debugworkqueue + +import ( + "runtime" + "strings" + "time" + + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" +) + +func NewNamedRateLimitingQueue(rateLimiter workqueue.RateLimiter, name string) workqueue.RateLimitingInterface { + return &rateLimitingInterface{ + name: name, + wrapped: workqueue.NewNamedRateLimitingQueue(rateLimiter, name), + } +} + +type rateLimitingInterface struct { + name string + wrapped workqueue.RateLimitingInterface +} + +func context() string { + context := "unknown stack" + programCounters := make([]uintptr, 5) // ask for the closest 5 entries in the stack + n := runtime.Callers(3, programCounters) + if n > 0 { + programCounters = programCounters[:n] // drop unfilled slots + frames := runtime.CallersFrames(programCounters) + stack := make([]string, 0, n) + for { + frame, more := frames.Next() + stack = append(stack, frame.Function) + if !more { + break + } + } + context = strings.Join(stack, " < ") + } + return context +} + +func (q *rateLimitingInterface) Add(item interface{}) { + klog.V(2).Infof("%q Add %v (%s)\n", q.name, item, context()) + q.wrapped.Add(item) +} + +func (q *rateLimitingInterface) AddAfter(item interface{}, duration time.Duration) { + klog.V(2).Infof("%q AddAfter %v %s (%s)", q.name, item, duration, context()) + q.wrapped.AddAfter(item, duration) +} + +func (q *rateLimitingInterface) AddRateLimited(item interface{}) { + klog.V(2).Infof("%q AddRateLimited %v (%s)", q.name, item, context()) + q.wrapped.AddRateLimited(item) +} + +func (q *rateLimitingInterface) Done(item interface{}) { + klog.V(2).Infof("%q Done %v", q.name, item) + q.wrapped.Done(item) +} + +func (q *rateLimitingInterface) Forget(item interface{}) { + klog.V(2).Infof("%q Forget %v", q.name, item) + q.wrapped.Forget(item) +} + +func (q *rateLimitingInterface) Get() (interface{}, bool) { + klog.V(2).Infof("%q Get...", q.name) + item, shutdown := q.wrapped.Get() + klog.V(2).Infof("%q ...Get %v (shutdown %t)", q.name, item, shutdown) + return item, shutdown +} + +func (q *rateLimitingInterface) Len() int { + return q.wrapped.Len() +} + +func (q *rateLimitingInterface) NumRequeues(item interface{}) int { + return q.wrapped.NumRequeues(item) +} + +func (q *rateLimitingInterface) ShutDown() { + klog.V(2).Infof("%q ShutDown", q.name) + q.wrapped.ShutDown() +} + +func (q *rateLimitingInterface) ShutDownWithDrain() { + klog.V(2).Infof("%q ShutDownWithDrain...", q.name) + q.wrapped.ShutDownWithDrain() + klog.V(2).Infof("%q ...ShutDownWithDrain", q.name) +} + +func (q *rateLimitingInterface) ShuttingDown() bool { + return q.wrapped.ShuttingDown() +} From f92ff4b56c52a02d7180e38bc63cb697aa34c5c5 Mon Sep 17 00:00:00 2001 From: "W. Trevor King" Date: Sat, 23 Sep 2023 00:06:49 -0600 Subject: [PATCH 5/5] Revert "pkg/cvo: Debug logging for the availableUpdatesQueue" This reverts commit 316cfb3b15bc845381e3b0152fd6f1cd705d3319. I could drop the commit entirely, but it was very helpful to me to have this debug queue available, and by landing it in one commit and reverting it in the next, customers won't need to worry about the log verbosity, but developers will have the patch there in version control if they want to pull it back out to help understand a different queue in the future. --- pkg/cvo/cvo.go | 3 +- pkg/internal/debugworkqueue/debugworkqueue.go | 98 ------------------- 2 files changed, 1 insertion(+), 100 deletions(-) delete mode 100644 pkg/internal/debugworkqueue/debugworkqueue.go diff --git a/pkg/cvo/cvo.go b/pkg/cvo/cvo.go index d12d324a3c..b4cee42104 100644 --- a/pkg/cvo/cvo.go +++ b/pkg/cvo/cvo.go @@ -44,7 +44,6 @@ import ( cvointernal "github.com/openshift/cluster-version-operator/pkg/cvo/internal" "github.com/openshift/cluster-version-operator/pkg/cvo/internal/dynamicclient" "github.com/openshift/cluster-version-operator/pkg/internal" - "github.com/openshift/cluster-version-operator/pkg/internal/debugworkqueue" "github.com/openshift/cluster-version-operator/pkg/payload" "github.com/openshift/cluster-version-operator/pkg/payload/precondition" preconditioncv "github.com/openshift/cluster-version-operator/pkg/payload/precondition/clusterversion" @@ -214,7 +213,7 @@ func New( eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: namespace}), queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "clusterversion"), - availableUpdatesQueue: debugworkqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "availableupdates"), + availableUpdatesQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "availableupdates"), upgradeableQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "upgradeable"), exclude: exclude, diff --git a/pkg/internal/debugworkqueue/debugworkqueue.go b/pkg/internal/debugworkqueue/debugworkqueue.go deleted file mode 100644 index 32bca9fedd..0000000000 --- a/pkg/internal/debugworkqueue/debugworkqueue.go +++ /dev/null @@ -1,98 +0,0 @@ -// Package debugworkqueue wraps client-go work queues with trace logging. -package debugworkqueue - -import ( - "runtime" - "strings" - "time" - - "k8s.io/client-go/util/workqueue" - "k8s.io/klog/v2" -) - -func NewNamedRateLimitingQueue(rateLimiter workqueue.RateLimiter, name string) workqueue.RateLimitingInterface { - return &rateLimitingInterface{ - name: name, - wrapped: workqueue.NewNamedRateLimitingQueue(rateLimiter, name), - } -} - -type rateLimitingInterface struct { - name string - wrapped workqueue.RateLimitingInterface -} - -func context() string { - context := "unknown stack" - programCounters := make([]uintptr, 5) // ask for the closest 5 entries in the stack - n := runtime.Callers(3, programCounters) - if n > 0 { - programCounters = programCounters[:n] // drop unfilled slots - frames := runtime.CallersFrames(programCounters) - stack := make([]string, 0, n) - for { - frame, more := frames.Next() - stack = append(stack, frame.Function) - if !more { - break - } - } - context = strings.Join(stack, " < ") - } - return context -} - -func (q *rateLimitingInterface) Add(item interface{}) { - klog.V(2).Infof("%q Add %v (%s)\n", q.name, item, context()) - q.wrapped.Add(item) -} - -func (q *rateLimitingInterface) AddAfter(item interface{}, duration time.Duration) { - klog.V(2).Infof("%q AddAfter %v %s (%s)", q.name, item, duration, context()) - q.wrapped.AddAfter(item, duration) -} - -func (q *rateLimitingInterface) AddRateLimited(item interface{}) { - klog.V(2).Infof("%q AddRateLimited %v (%s)", q.name, item, context()) - q.wrapped.AddRateLimited(item) -} - -func (q *rateLimitingInterface) Done(item interface{}) { - klog.V(2).Infof("%q Done %v", q.name, item) - q.wrapped.Done(item) -} - -func (q *rateLimitingInterface) Forget(item interface{}) { - klog.V(2).Infof("%q Forget %v", q.name, item) - q.wrapped.Forget(item) -} - -func (q *rateLimitingInterface) Get() (interface{}, bool) { - klog.V(2).Infof("%q Get...", q.name) - item, shutdown := q.wrapped.Get() - klog.V(2).Infof("%q ...Get %v (shutdown %t)", q.name, item, shutdown) - return item, shutdown -} - -func (q *rateLimitingInterface) Len() int { - return q.wrapped.Len() -} - -func (q *rateLimitingInterface) NumRequeues(item interface{}) int { - return q.wrapped.NumRequeues(item) -} - -func (q *rateLimitingInterface) ShutDown() { - klog.V(2).Infof("%q ShutDown", q.name) - q.wrapped.ShutDown() -} - -func (q *rateLimitingInterface) ShutDownWithDrain() { - klog.V(2).Infof("%q ShutDownWithDrain...", q.name) - q.wrapped.ShutDownWithDrain() - klog.V(2).Infof("%q ...ShutDownWithDrain", q.name) -} - -func (q *rateLimitingInterface) ShuttingDown() bool { - return q.wrapped.ShuttingDown() -}