From 9111c3357c762616c14b58e2e8114282c946e00e Mon Sep 17 00:00:00 2001 From: wking Date: Thu, 3 Oct 2024 18:31:37 -0700 Subject: [PATCH 1/2] WIP: pkg/cvo/updatepayload: Drop the Job controller for release-manifests downloads Previously, the CVO launched a Job and waited for it to complete to get manifests for an incoming release payload. But the Job controller doesn't bubble up details about why the pod has trouble (e.g. Init:SignatureValidationFailed), so to get those details, we need direct access to the Pod. The Job controller doesn't seem like it's adding much value here, so we're dropping it and monitoring the Pod ourselves. --- lib/resourcebuilder/batch.go | 22 ---- pkg/cvo/updatepayload.go | 189 ++++++++++++++++++----------------- 2 files changed, 99 insertions(+), 112 deletions(-) diff --git a/lib/resourcebuilder/batch.go b/lib/resourcebuilder/batch.go index be7e80dd9b..bcbd2f3c06 100644 --- a/lib/resourcebuilder/batch.go +++ b/lib/resourcebuilder/batch.go @@ -7,32 +7,10 @@ import ( batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/wait" batchclientv1 "k8s.io/client-go/kubernetes/typed/batch/v1" "k8s.io/klog/v2" ) -// WaitForJobCompletion waits for job to complete. 
-func WaitForJobCompletion(ctx context.Context, client batchclientv1.JobsGetter, job *batchv1.Job) error { - return wait.PollUntilContextCancel(ctx, defaultObjectPollInterval, true, func(localCtx context.Context) (bool, error) { - j, err := client.Jobs(job.Namespace).Get(localCtx, job.Name, metav1.GetOptions{}) - if err != nil { - return false, fmt.Errorf("error getting Job %s: %v", job.Name, err) - } - - if done, err := checkJobHealth(localCtx, j); err != nil && done { - return false, err - } else if err != nil { - klog.Error(err) - return false, nil - } else if !done { - klog.V(2).Infof("Job %s in namespace %s is not ready, continuing to wait.", job.ObjectMeta.Name, job.ObjectMeta.Namespace) - return false, nil - } - return true, nil - }) -} - func (b *builder) checkJobHealth(ctx context.Context, job *batchv1.Job) error { if b.mode == InitializingMode { return nil diff --git a/pkg/cvo/updatepayload.go b/pkg/cvo/updatepayload.go index d9eba2f922..ce043377fe 100644 --- a/pkg/cvo/updatepayload.go +++ b/pkg/cvo/updatepayload.go @@ -13,12 +13,12 @@ import ( "github.com/pkg/errors" - batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilerrors "k8s.io/apimachinery/pkg/util/errors" randutil "k8s.io/apimachinery/pkg/util/rand" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" "k8s.io/utils/ptr" @@ -162,9 +162,9 @@ func (r *payloadRetriever) targetUpdatePayloadDir(ctx context.Context, update co payloadHash := base64.RawURLEncoding.EncodeToString(hash.Sum(nil)) tdir := filepath.Join(r.workingDir, payloadHash) - // Prune older jobs and directories while gracefully handling errors. - if err := r.pruneJobs(ctx, 0); err != nil { - klog.Warningf("failed to prune jobs: %v", err) + // Prune older pods and directories while gracefully handling errors. 
+ if err := r.prunePods(ctx); err != nil { + klog.Warningf("failed to prune pods: %v", err) } if err := payload.ValidateDirectory(tdir); os.IsNotExist(err) { @@ -217,123 +217,132 @@ func (r *payloadRetriever) fetchUpdatePayloadToDir(ctx context.Context, dir stri return container } - job := &batchv1.Job{ + pod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: namespace, + Labels: map[string]string{ + "k8s-app": "retrieve-openshift-release", + }, }, - Spec: batchv1.JobSpec{ + Spec: corev1.PodSpec{ ActiveDeadlineSeconds: deadline, - Template: corev1.PodTemplateSpec{ - Spec: corev1.PodSpec{ - InitContainers: []corev1.Container{ - setContainerDefaults(corev1.Container{ - Name: "cleanup", - Command: []string{"sh", "-c", "rm -fR ./*"}, - WorkingDir: baseDir, - }), - setContainerDefaults(corev1.Container{ - Name: "make-temporary-directory", - Command: []string{"mkdir", tmpDir}, - }), - setContainerDefaults(corev1.Container{ - Name: "move-operator-manifests-to-temporary-directory", - Command: []string{ - "mv", - filepath.Join(payload.DefaultPayloadDir, payload.CVOManifestDir), - filepath.Join(tmpDir, payload.CVOManifestDir), - }, - }), - setContainerDefaults(corev1.Container{ - Name: "move-release-manifests-to-temporary-directory", - Command: []string{ - "mv", - filepath.Join(payload.DefaultPayloadDir, payload.ReleaseManifestDir), - filepath.Join(tmpDir, payload.ReleaseManifestDir), - }, - }), + InitContainers: []corev1.Container{ + setContainerDefaults(corev1.Container{ + Name: "cleanup", + Command: []string{"sh", "-c", "rm -fR ./*"}, + WorkingDir: baseDir, + }), + setContainerDefaults(corev1.Container{ + Name: "make-temporary-directory", + Command: []string{"mkdir", tmpDir}, + }), + setContainerDefaults(corev1.Container{ + Name: "move-operator-manifests-to-temporary-directory", + Command: []string{ + "mv", + filepath.Join(payload.DefaultPayloadDir, payload.CVOManifestDir), + filepath.Join(tmpDir, payload.CVOManifestDir), }, - Containers: 
[]corev1.Container{ - setContainerDefaults(corev1.Container{ - Name: "rename-to-final-location", - Command: []string{"mv", tmpDir, dir}, - }), + }), + setContainerDefaults(corev1.Container{ + Name: "move-release-manifests-to-temporary-directory", + Command: []string{ + "mv", + filepath.Join(payload.DefaultPayloadDir, payload.ReleaseManifestDir), + filepath.Join(tmpDir, payload.ReleaseManifestDir), }, - Volumes: []corev1.Volume{{ - Name: "payloads", - VolumeSource: corev1.VolumeSource{ - HostPath: &corev1.HostPathVolumeSource{ - Path: targetUpdatePayloadsDir, - }, - }, - }}, - NodeName: nodename, - NodeSelector: map[string]string{ - nodeSelectorKey: "", + }), + }, + Containers: []corev1.Container{ + setContainerDefaults(corev1.Container{ + Name: "rename-to-final-location", + Command: []string{"mv", tmpDir, dir}, + }), + }, + Volumes: []corev1.Volume{{ + Name: "payloads", + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: targetUpdatePayloadsDir, }, - PriorityClassName: "openshift-user-critical", - Tolerations: []corev1.Toleration{{ - Key: nodeSelectorKey, - }}, - RestartPolicy: corev1.RestartPolicyOnFailure, }, + }}, + NodeName: nodename, + NodeSelector: map[string]string{ + nodeSelectorKey: "", }, + PriorityClassName: "openshift-user-critical", + Tolerations: []corev1.Toleration{{ + Key: nodeSelectorKey, + }}, + RestartPolicy: corev1.RestartPolicyOnFailure, }, - } + }, - if _, err := r.kubeClient.BatchV1().Jobs(job.Namespace).Create(ctx, job, metav1.CreateOptions{}); err != nil { + if _, err := r.kubeClient.V1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{}); err != nil { return err } - return resourcebuilder.WaitForJobCompletion(ctx, r.kubeClient.BatchV1(), job) + + return wait.PollUntilContextCancel(ctx, 3 * time.Second, true, func(localCtx context.Context) (bool, error) { + p, err := r.kubeClient.V1().Pods(pod.Namespace).Get(localCtx, pod.Name, metav1.GetOptions{}) + if err != nil { + klog.Warningf("unable to get 
OpenShift release retrieval pod: %v", err) + return false, nil + } + + if p.Status.Phase != "", { + return false, err + } else if err != nil { + klog.Error(err) + return false, nil + } else if !done { + klog.V(2).Infof("Job %s in namespace %s is not ready, continuing to wait.", job.ObjectMeta.Name, job.ObjectMeta.Namespace) + return false, nil + } + return true, nil + }) } -// pruneJobs deletes the older, finished jobs in the namespace. -// retain - the number of newest jobs to keep. -func (r *payloadRetriever) pruneJobs(ctx context.Context, retain int) error { +// prunePods deletes the older, finished pods in the namespace. +func (r *payloadRetriever) prunePods(ctx context.Context) error { + var errs []error + + // begin transitional job pruning, in case any dangled from earlier versions jobs, err := r.kubeClient.BatchV1().Jobs(r.namespace).List(ctx, metav1.ListOptions{}) if err != nil { - return err - } - if len(jobs.Items) <= retain { - return nil + errs = append(errs, err) } - // Select jobs to be deleted - var deleteJobs []batchv1.Job for _, job := range jobs.Items { - switch { - // Ignore jobs not beginning with operatorName - case !strings.HasPrefix(job.Name, r.operatorName+"-"): - break - default: - deleteJobs = append(deleteJobs, job) + if !strings.HasPrefix(job.Name, r.operatorName+"-") { + // Ignore jobs not beginning with operatorName + continue + } + err := r.kubeClient.BatchV1().Jobs(r.namespace).Delete(ctx, job.Name, metav1.DeleteOptions{}) + if err != nil { + errs = append(errs, errors.Wrapf(err, "failed to delete job %v", job.Name)) } } - if len(deleteJobs) <= retain { - return nil - } + // end transitional job pruning - // Sort jobs by StartTime to determine the newest. nil StartTime is assumed newest. 
- sort.Slice(deleteJobs, func(i, j int) bool { - if deleteJobs[i].Status.StartTime == nil { - return false - } - if deleteJobs[j].Status.StartTime == nil { - return true - } - return deleteJobs[i].Status.StartTime.Before(deleteJobs[j].Status.StartTime) + pods, err := r.kubeClient.BatchV1().Pods(r.namespace).List(ctx, metav1.ListOptions{ + LabelSelector: "k8s-app=retrieve-openshift-release", }) + if err != nil { + errs = append(errs, err) + } - var errs []error - for _, job := range deleteJobs[:len(deleteJobs)-retain] { - err := r.kubeClient.BatchV1().Jobs(r.namespace).Delete(ctx, job.Name, metav1.DeleteOptions{}) + for _, pod := pods.Items { + err := r.kubeClient.V1().Pods(r.namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}) if err != nil { - errs = append(errs, errors.Wrapf(err, "failed to delete job %v", job.Name)) + errs = append(errs, errors.Wrapf(err, "failed to delete pod %v", pod.Name)) } } + agg := utilerrors.NewAggregate(errs) if agg != nil { - return fmt.Errorf("error deleting jobs: %v", agg.Error()) + return fmt.Errorf("error deleting pods: %v", agg.Error()) } return nil } From 4386a7bba9451d9e38edf820097ce05fac17ce94 Mon Sep 17 00:00:00 2001 From: Hongkai Liu Date: Wed, 6 Nov 2024 17:35:52 -0500 Subject: [PATCH 2/2] Use Watch API in fetchUpdatePayloadToDir --- lib/resourcebuilder/batch.go | 2 - lib/resourcebuilder/interface.go | 5 - pkg/cvo/updatepayload.go | 126 ++++++-- .../client-go/tools/watch/informerwatcher.go | 150 +++++++++ .../client-go/tools/watch/retrywatcher.go | 295 ++++++++++++++++++ vendor/k8s.io/client-go/tools/watch/until.go | 168 ++++++++++ vendor/modules.txt | 1 + 7 files changed, 715 insertions(+), 32 deletions(-) create mode 100644 vendor/k8s.io/client-go/tools/watch/informerwatcher.go create mode 100644 vendor/k8s.io/client-go/tools/watch/retrywatcher.go create mode 100644 vendor/k8s.io/client-go/tools/watch/until.go diff --git a/lib/resourcebuilder/batch.go b/lib/resourcebuilder/batch.go index bcbd2f3c06..18f3b4befa 100644 
--- a/lib/resourcebuilder/batch.go +++ b/lib/resourcebuilder/batch.go @@ -6,8 +6,6 @@ import ( batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - batchclientv1 "k8s.io/client-go/kubernetes/typed/batch/v1" "k8s.io/klog/v2" ) diff --git a/lib/resourcebuilder/interface.go b/lib/resourcebuilder/interface.go index e56d59c9ee..e2bbd283f5 100644 --- a/lib/resourcebuilder/interface.go +++ b/lib/resourcebuilder/interface.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "sync" - "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" @@ -88,7 +87,3 @@ func New(mapper *ResourceMapper, rest *rest.Config, m manifest.Manifest) (Interf } return f(rest, m), nil } - -// defaultObjectPollInterval is the default interval to poll the API to determine whether an object -// is ready. Use this when a more specific interval is not necessary. -const defaultObjectPollInterval = 3 * time.Second diff --git a/pkg/cvo/updatepayload.go b/pkg/cvo/updatepayload.go index ce043377fe..c68cb42cd8 100644 --- a/pkg/cvo/updatepayload.go +++ b/pkg/cvo/updatepayload.go @@ -7,7 +7,6 @@ import ( "fmt" "os" "path/filepath" - "sort" "strings" "time" @@ -16,15 +15,18 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime" utilerrors "k8s.io/apimachinery/pkg/util/errors" randutil "k8s.io/apimachinery/pkg/util/rand" - "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + toolswatch "k8s.io/client-go/tools/watch" "k8s.io/klog/v2" "k8s.io/utils/ptr" configv1 "github.com/openshift/api/config/v1" - "github.com/openshift/cluster-version-operator/lib/resourcebuilder" "github.com/openshift/cluster-version-operator/pkg/payload" "github.com/openshift/library-go/pkg/verify" ) @@ -162,9 +164,10 @@ func (r 
*payloadRetriever) targetUpdatePayloadDir(ctx context.Context, update co payloadHash := base64.RawURLEncoding.EncodeToString(hash.Sum(nil)) tdir := filepath.Join(r.workingDir, payloadHash) - // Prune older pods and directories while gracefully handling errors. + // Prune older pods and directories. if err := r.prunePods(ctx); err != nil { - klog.Warningf("failed to prune pods: %v", err) + klog.Errorf("failed to prune pods: %v", err) + return "", fmt.Errorf("failed to prune pods: %w", err) } if err := payload.ValidateDirectory(tdir); os.IsNotExist(err) { @@ -217,7 +220,7 @@ func (r *payloadRetriever) fetchUpdatePayloadToDir(ctx context.Context, dir stri return container } - pod := &v1.Pod{ + pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: namespace, @@ -278,30 +281,103 @@ func (r *payloadRetriever) fetchUpdatePayloadToDir(ctx context.Context, dir stri }}, RestartPolicy: corev1.RestartPolicyOnFailure, }, - }, + } - if _, err := r.kubeClient.V1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{}); err != nil { + klog.Infof("Spawning Pod %s ...", name) + if _, err := r.kubeClient.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{}); err != nil { return err } - return wait.PollUntilContextCancel(ctx, 3 * time.Second, true, func(localCtx context.Context) (bool, error) { - p, err := r.kubeClient.V1().Pods(pod.Namespace).Get(localCtx, pod.Name, metav1.GetOptions{}) - if err != nil { - klog.Warningf("unable to get OpenShift release retrieval pod: %v", err) - return false, nil + return waitForPodCompletion(ctx, r.kubeClient.CoreV1().Pods(pod.Namespace), pod.Name) +} + +type PodListerWatcher interface { + List(ctx context.Context, opts metav1.ListOptions) (*corev1.PodList, error) + Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) +} + +func collectStatuses(status corev1.PodStatus, waitingConditionFunc func(reason, message string) bool) []string { + var statuses []string + for _, cs := range 
status.ContainerStatuses { + if cs.State.Waiting != nil && waitingConditionFunc(cs.State.Waiting.Reason, cs.State.Waiting.Message) { + statuses = append(statuses, fmt.Sprintf("container %s is waiting with reason %q and message %q", cs.Name, cs.State.Waiting.Reason, cs.State.Waiting.Message)) + } + if cs.State.Terminated != nil && cs.State.Terminated.Message != "" { + statuses = append(statuses, fmt.Sprintf("container %s is terminated with reason %q and message %q", cs.Name, cs.State.Terminated.Reason, cs.State.Terminated.Message)) + } + } + for _, ics := range status.InitContainerStatuses { + if ics.State.Waiting != nil && waitingConditionFunc(ics.State.Waiting.Reason, ics.State.Waiting.Message) { + statuses = append(statuses, fmt.Sprintf("initcontainer %s is waiting with reason %q and message %q", ics.Name, ics.State.Waiting.Reason, ics.State.Waiting.Message)) + } + if ics.State.Terminated != nil && ics.State.Terminated.Message != "" { + statuses = append(statuses, fmt.Sprintf("initcontainer %s is terminated with reason %q and message %q", ics.Name, ics.State.Terminated.Reason, ics.State.Terminated.Message)) } - - if p.Status.Phase != "", { - return false, err - } else if err != nil { - klog.Error(err) + } + return statuses +} + +func podCompletionCheckFn(name string) toolswatch.ConditionFunc { + return func(event watch.Event) (bool, error) { + p, ok := event.Object.(*corev1.Pod) + if !ok { + klog.Errorf("expecting Pod but received event with kind: %s", event.Object.GetObjectKind()) + return false, fmt.Errorf("expecting Pod but received event with kind: %s", event.Object.GetObjectKind()) + } + switch phase := p.Status.Phase; phase { + case corev1.PodPending: + klog.V(4).Infof("Pod %s is pending", name) + // There are two cases at the moment we want to bottle up the waiting message where + // the details would be lost if we waited until the pod failed. + // Case 1: "reason: SignatureValidationFailed". 
+ // The message looks like 'image pull failed for quay.io/openshift-release-dev/ocp-release@sha256:digest because the signature validation failed: Source image rejected: A signature was required, but no signature exists' + // We do not need Case 1 if https://github.com/kubernetes/kubernetes/pull/127918 lands into OCP. + // Case 2: "reason: ErrImagePull". + // The message looks like '...: reading manifest sha256:... in quay.io/openshift-release-dev/ocp-release: manifest unknown' + // In case those keywords are changed in the future Kubernetes implementation, we will have to follow up accordingly. + // Otherwise, we will lose these details in the waiting message. It brings no other harms. + if statuses := collectStatuses(p.Status, func(reason, message string) bool { + return reason == "SignatureValidationFailed" || + (reason == "ErrImagePull" && strings.Contains(message, "manifest unknown")) + }); len(statuses) > 0 { + klog.Errorf("Pod %s failed at pending with reason %q and message %q and status %s", name, p.Status.Reason, p.Status.Message, strings.Join(statuses, ",")) + return false, fmt.Errorf("pod %s failed at pending with reason %q and message %q and status %s", name, p.Status.Reason, p.Status.Message, strings.Join(statuses, ",")) + } return false, nil - } else if !done { - klog.V(2).Infof("Job %s in namespace %s is not ready, continuing to wait.", job.ObjectMeta.Name, job.ObjectMeta.Namespace) + case corev1.PodRunning: + klog.V(4).Infof("Pod %s is running, waiting for its completion ...", name) return false, nil + case corev1.PodSucceeded: + klog.Infof("Pod %s succeeded", name) + return true, nil + case corev1.PodFailed: + statuses := collectStatuses(p.Status, func(reason, message string) bool { return message != "" }) + klog.Errorf("Pod %s failed with reason %q and message %q and status %s", name, p.Status.Reason, p.Status.Message, strings.Join(statuses, ",")) + return false, fmt.Errorf("pod %s failed with reason %q and message %q and status %s", name, 
p.Status.Reason, p.Status.Message, strings.Join(statuses, ",")) + default: + klog.Errorf("Pod %s is with unexpected phase %s", name, phase) + return false, fmt.Errorf("pod %s is with unexpected phase %s", name, phase) } - return true, nil - }) + } +} + +func waitForPodCompletion(ctx context.Context, podListerWatcher PodListerWatcher, name string) error { + fieldSelector := fields.OneTermEqualSelector("metadata.name", name).String() + _, err := toolswatch.UntilWithSync( + ctx, + &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (object runtime.Object, e error) { + return podListerWatcher.List(ctx, metav1.ListOptions{FieldSelector: fieldSelector}) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return podListerWatcher.Watch(ctx, metav1.ListOptions{FieldSelector: fieldSelector}) + }, + }, + &corev1.Pod{}, + nil, + podCompletionCheckFn(name), + ) + return err } // prunePods deletes the older, finished pods in the namespace. @@ -326,15 +402,15 @@ func (r *payloadRetriever) prunePods(ctx context.Context) error { } // end transitional job pruning - pods, err := r.kubeClient.BatchV1().Pods(r.namespace).List(ctx, metav1.ListOptions{ + pods, err := r.kubeClient.CoreV1().Pods(r.namespace).List(ctx, metav1.ListOptions{ LabelSelector: "k8s-app=retrieve-openshift-release", }) if err != nil { errs = append(errs, err) } - for _, pod := pods.Items { - err := r.kubeClient.V1().Pods(r.namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}) + for _, pod := range pods.Items { + err := r.kubeClient.CoreV1().Pods(r.namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}) if err != nil { errs = append(errs, errors.Wrapf(err, "failed to delete pod %v", pod.Name)) } diff --git a/vendor/k8s.io/client-go/tools/watch/informerwatcher.go b/vendor/k8s.io/client-go/tools/watch/informerwatcher.go new file mode 100644 index 0000000000..5e6aad5cf1 --- /dev/null +++ b/vendor/k8s.io/client-go/tools/watch/informerwatcher.go @@ -0,0 +1,150 @@ +/* +Copyright 
2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watch + +import ( + "sync" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" +) + +func newEventProcessor(out chan<- watch.Event) *eventProcessor { + return &eventProcessor{ + out: out, + cond: sync.NewCond(&sync.Mutex{}), + done: make(chan struct{}), + } +} + +// eventProcessor buffers events and writes them to an out chan when a reader +// is waiting. Because of the requirement to buffer events, it synchronizes +// input with a condition, and synchronizes output with a channels. It needs to +// be able to yield while both waiting on an input condition and while blocked +// on writing to the output channel. 
+type eventProcessor struct { + out chan<- watch.Event + + cond *sync.Cond + buff []watch.Event + + done chan struct{} +} + +func (e *eventProcessor) run() { + for { + batch := e.takeBatch() + e.writeBatch(batch) + if e.stopped() { + return + } + } +} + +func (e *eventProcessor) takeBatch() []watch.Event { + e.cond.L.Lock() + defer e.cond.L.Unlock() + + for len(e.buff) == 0 && !e.stopped() { + e.cond.Wait() + } + + batch := e.buff + e.buff = nil + return batch +} + +func (e *eventProcessor) writeBatch(events []watch.Event) { + for _, event := range events { + select { + case e.out <- event: + case <-e.done: + return + } + } +} + +func (e *eventProcessor) push(event watch.Event) { + e.cond.L.Lock() + defer e.cond.L.Unlock() + defer e.cond.Signal() + e.buff = append(e.buff, event) +} + +func (e *eventProcessor) stopped() bool { + select { + case <-e.done: + return true + default: + return false + } +} + +func (e *eventProcessor) stop() { + close(e.done) + e.cond.Signal() +} + +// NewIndexerInformerWatcher will create an IndexerInformer and wrap it into watch.Interface +// so you can use it anywhere where you'd have used a regular Watcher returned from Watch method. +// it also returns a channel you can use to wait for the informers to fully shutdown. 
+func NewIndexerInformerWatcher(lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface, <-chan struct{}) { + ch := make(chan watch.Event) + w := watch.NewProxyWatcher(ch) + e := newEventProcessor(ch) + + indexer, informer := cache.NewIndexerInformer(lw, objType, 0, cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + e.push(watch.Event{ + Type: watch.Added, + Object: obj.(runtime.Object), + }) + }, + UpdateFunc: func(old, new interface{}) { + e.push(watch.Event{ + Type: watch.Modified, + Object: new.(runtime.Object), + }) + }, + DeleteFunc: func(obj interface{}) { + staleObj, stale := obj.(cache.DeletedFinalStateUnknown) + if stale { + // We have no means of passing the additional information down using + // watch API based on watch.Event but the caller can filter such + // objects by checking if metadata.deletionTimestamp is set + obj = staleObj.Obj + } + + e.push(watch.Event{ + Type: watch.Deleted, + Object: obj.(runtime.Object), + }) + }, + }, cache.Indexers{}) + + go e.run() + + doneCh := make(chan struct{}) + go func() { + defer close(doneCh) + defer e.stop() + informer.Run(w.StopChan()) + }() + + return indexer, informer, w, doneCh +} diff --git a/vendor/k8s.io/client-go/tools/watch/retrywatcher.go b/vendor/k8s.io/client-go/tools/watch/retrywatcher.go new file mode 100644 index 0000000000..d81dc43570 --- /dev/null +++ b/vendor/k8s.io/client-go/tools/watch/retrywatcher.go @@ -0,0 +1,295 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watch + +import ( + "context" + "errors" + "fmt" + "io" + "net/http" + "time" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/dump" + "k8s.io/apimachinery/pkg/util/net" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" +) + +// resourceVersionGetter is an interface used to get resource version from events. +// We can't reuse an interface from meta otherwise it would be a cyclic dependency and we need just this one method +type resourceVersionGetter interface { + GetResourceVersion() string +} + +// RetryWatcher will make sure that in case the underlying watcher is closed (e.g. due to API timeout or etcd timeout) +// it will get restarted from the last point without the consumer even knowing about it. +// RetryWatcher does that by inspecting events and keeping track of resourceVersion. +// Especially useful when using watch.UntilWithoutRetry where premature termination is causing issues and flakes. +// Please note that this is not resilient to etcd cache not having the resource version anymore - you would need to +// use Informers for that. +type RetryWatcher struct { + lastResourceVersion string + watcherClient cache.Watcher + resultChan chan watch.Event + stopChan chan struct{} + doneChan chan struct{} + minRestartDelay time.Duration +} + +// NewRetryWatcher creates a new RetryWatcher. +// It will make sure that watches gets restarted in case of recoverable errors. +// The initialResourceVersion will be given to watch method when first called. 
+func NewRetryWatcher(initialResourceVersion string, watcherClient cache.Watcher) (*RetryWatcher, error) { + return newRetryWatcher(initialResourceVersion, watcherClient, 1*time.Second) +} + +func newRetryWatcher(initialResourceVersion string, watcherClient cache.Watcher, minRestartDelay time.Duration) (*RetryWatcher, error) { + switch initialResourceVersion { + case "", "0": + // TODO: revisit this if we ever get WATCH v2 where it means start "now" + // without doing the synthetic list of objects at the beginning (see #74022) + return nil, fmt.Errorf("initial RV %q is not supported due to issues with underlying WATCH", initialResourceVersion) + default: + break + } + + rw := &RetryWatcher{ + lastResourceVersion: initialResourceVersion, + watcherClient: watcherClient, + stopChan: make(chan struct{}), + doneChan: make(chan struct{}), + resultChan: make(chan watch.Event, 0), + minRestartDelay: minRestartDelay, + } + + go rw.receive() + return rw, nil +} + +func (rw *RetryWatcher) send(event watch.Event) bool { + // Writing to an unbuffered channel is blocking operation + // and we need to check if stop wasn't requested while doing so. + select { + case rw.resultChan <- event: + return true + case <-rw.stopChan: + return false + } +} + +// doReceive returns true when it is done, false otherwise. +// If it is not done the second return value holds the time to wait before calling it again. +func (rw *RetryWatcher) doReceive() (bool, time.Duration) { + watcher, err := rw.watcherClient.Watch(metav1.ListOptions{ + ResourceVersion: rw.lastResourceVersion, + AllowWatchBookmarks: true, + }) + // We are very unlikely to hit EOF here since we are just establishing the call, + // but it may happen that the apiserver is just shutting down (e.g. 
being restarted) + // This is consistent with how it is handled for informers + switch err { + case nil: + break + + case io.EOF: + // watch closed normally + return false, 0 + + case io.ErrUnexpectedEOF: + klog.V(1).InfoS("Watch closed with unexpected EOF", "err", err) + return false, 0 + + default: + msg := "Watch failed" + if net.IsProbableEOF(err) || net.IsTimeout(err) { + klog.V(5).InfoS(msg, "err", err) + // Retry + return false, 0 + } + + klog.ErrorS(err, msg) + // Retry + return false, 0 + } + + if watcher == nil { + klog.ErrorS(nil, "Watch returned nil watcher") + // Retry + return false, 0 + } + + ch := watcher.ResultChan() + defer watcher.Stop() + + for { + select { + case <-rw.stopChan: + klog.V(4).InfoS("Stopping RetryWatcher.") + return true, 0 + case event, ok := <-ch: + if !ok { + klog.V(4).InfoS("Failed to get event! Re-creating the watcher.", "resourceVersion", rw.lastResourceVersion) + return false, 0 + } + + // We need to inspect the event and get ResourceVersion out of it + switch event.Type { + case watch.Added, watch.Modified, watch.Deleted, watch.Bookmark: + metaObject, ok := event.Object.(resourceVersionGetter) + if !ok { + _ = rw.send(watch.Event{ + Type: watch.Error, + Object: &apierrors.NewInternalError(errors.New("retryWatcher: doesn't support resourceVersion")).ErrStatus, + }) + // We have to abort here because this might cause lastResourceVersion inconsistency by skipping a potential RV with valid data! + return true, 0 + } + + resourceVersion := metaObject.GetResourceVersion() + if resourceVersion == "" { + _ = rw.send(watch.Event{ + Type: watch.Error, + Object: &apierrors.NewInternalError(fmt.Errorf("retryWatcher: object %#v doesn't support resourceVersion", event.Object)).ErrStatus, + }) + // We have to abort here because this might cause lastResourceVersion inconsistency by skipping a potential RV with valid data! + return true, 0 + } + + // All is fine; send the non-bookmark events and update resource version. 
+ if event.Type != watch.Bookmark { + ok = rw.send(event) + if !ok { + return true, 0 + } + } + rw.lastResourceVersion = resourceVersion + + continue + + case watch.Error: + // This round trip allows us to handle unstructured status + errObject := apierrors.FromObject(event.Object) + statusErr, ok := errObject.(*apierrors.StatusError) + if !ok { + klog.Error(fmt.Sprintf("Received an error which is not *metav1.Status but %s", dump.Pretty(event.Object))) + // Retry unknown errors + return false, 0 + } + + status := statusErr.ErrStatus + + statusDelay := time.Duration(0) + if status.Details != nil { + statusDelay = time.Duration(status.Details.RetryAfterSeconds) * time.Second + } + + switch status.Code { + case http.StatusGone: + // Never retry RV too old errors + _ = rw.send(event) + return true, 0 + + case http.StatusGatewayTimeout, http.StatusInternalServerError: + // Retry + return false, statusDelay + + default: + // We retry by default. RetryWatcher is meant to proceed unless it is certain + // that it can't. If we are not certain, we proceed with retry and leave it + // up to the user to timeout if needed. + + // Log here so we have a record of hitting the unexpected error + // and we can whitelist some error codes if we missed any that are expected. + klog.V(5).Info(fmt.Sprintf("Retrying after unexpected error: %s", dump.Pretty(event.Object))) + + // Retry + return false, statusDelay + } + + default: + klog.Errorf("Failed to recognize Event type %q", event.Type) + _ = rw.send(watch.Event{ + Type: watch.Error, + Object: &apierrors.NewInternalError(fmt.Errorf("retryWatcher failed to recognize Event type %q", event.Type)).ErrStatus, + }) + // We are unable to restart the watch and have to stop the loop or this might cause lastResourceVersion inconsistency by skipping a potential RV with valid data! + return true, 0 + } + } + } +} + +// receive reads the result from a watcher, restarting it if necessary. 
+func (rw *RetryWatcher) receive() {
+	defer close(rw.doneChan) // deferred first, so it runs last: Done() fires only after resultChan is closed
+	defer close(rw.resultChan)
+
+	klog.V(4).Info("Starting RetryWatcher.")
+	defer klog.V(4).Info("Stopping RetryWatcher.")
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	go func() { // bridge: turns a close of stopChan into cancellation of ctx
+		select {
+		case <-rw.stopChan:
+			cancel()
+			return
+		case <-ctx.Done(): // ctx was cancelled elsewhere in receive; let this goroutine exit too
+			return
+		}
+	}()
+
+	// We use a non-sliding until so we don't introduce delays on the happy path when the WATCH call
+	// times out or gets closed and we need to reestablish it, while also avoiding hot loops.
+	wait.NonSlidingUntilWithContext(ctx, func(ctx context.Context) {
+		done, retryAfter := rw.doReceive()
+		if done {
+			cancel() // terminal result: cancel the ctx the until loop is running under
+			return
+		}
+
+		timer := time.NewTimer(retryAfter) // honour the delay requested by doReceive, unless ctx ends first
+		select {
+		case <-ctx.Done():
+			timer.Stop()
+			return
+		case <-timer.C:
+		}
+
+		klog.V(4).Infof("Restarting RetryWatcher at RV=%q", rw.lastResourceVersion)
+	}, rw.minRestartDelay)
+}
+
+// ResultChan implements Interface; the returned channel is closed when receive exits.
+func (rw *RetryWatcher) ResultChan() <-chan watch.Event {
+	return rw.resultChan
+}
+
+// Stop implements Interface by closing stopChan; call it at most once, closing an already-closed channel panics.
+func (rw *RetryWatcher) Stop() {
+	close(rw.stopChan)
+}
+
+// Done returns a channel that is closed once the RetryWatcher's receive loop has exited.
+func (rw *RetryWatcher) Done() <-chan struct{} {
+	return rw.doneChan
+}
diff --git a/vendor/k8s.io/client-go/tools/watch/until.go b/vendor/k8s.io/client-go/tools/watch/until.go
new file mode 100644
index 0000000000..a2474556b0
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/watch/until.go
@@ -0,0 +1,168 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package watch
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"time"
+
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/klog/v2"
+)
+
+// PreconditionFunc returns true if the condition has been reached, false if it has not been reached yet,
+// or an error if the condition failed or detected an error state. UntilWithSync calls it once, after the cache has synced.
+type PreconditionFunc func(store cache.Store) (bool, error)
+
+// ConditionFunc returns true if the condition has been reached, false if it has not been reached yet,
+// or an error if the condition cannot be checked and should terminate. In general, it is better to define
+// level driven conditions over edge driven conditions (pod has ready=true, vs pod modified and ready changed
+// from false to true).
+type ConditionFunc func(event watch.Event) (bool, error)
+
+// ErrWatchClosed is returned when the watch channel is closed before timeout in UntilWithoutRetry.
+var ErrWatchClosed = errors.New("watch closed before UntilWithoutRetry timeout")
+
+// UntilWithoutRetry reads items from the watch until each provided condition succeeds, and then returns the last watch event
+// encountered. The first condition that returns an error terminates the watch (and the event is also returned).
+// If no event has been received, the returned event will be nil.
+// Conditions are satisfied sequentially so as to provide a useful primitive for higher level composition.
+// Waits until context deadline or until context is canceled; in both cases the error is wait.ErrWaitTimeout.
+//
+// Warning: Unless you have a very specific use case (probably a special Watcher) don't use this function!!!
+// Warning: This will fail e.g. on API timeouts and/or 'too old resource version' error.
+// Warning: You are most probably looking for a function *Until* or *UntilWithSync* below,
+// Warning: solving such issues.
+// TODO: Consider making this function private to prevent misuse when the other occurrences in our codebase are gone.
+func UntilWithoutRetry(ctx context.Context, watcher watch.Interface, conditions ...ConditionFunc) (*watch.Event, error) {
+	ch := watcher.ResultChan()
+	defer watcher.Stop() // the watcher is stopped on every return path
+	var lastEvent *watch.Event
+	for _, condition := range conditions {
+		// Check the next condition against the previous event; if it already holds, skip waiting for another event.
+		if lastEvent != nil {
+			done, err := condition(*lastEvent)
+			if err != nil {
+				return lastEvent, err
+			}
+			if done {
+				continue
+			}
+		}
+	ConditionSucceeded:
+		for {
+			select {
+			case event, ok := <-ch:
+				if !ok { // result channel was closed before the condition was met
+					return lastEvent, ErrWatchClosed
+				}
+				lastEvent = &event
+
+				done, err := condition(event)
+				if err != nil {
+					return lastEvent, err
+				}
+				if done {
+					break ConditionSucceeded // leave the wait loop and move on to the next condition
+				}
+
+			case <-ctx.Done(): // deadline and cancellation are both reported as ErrWaitTimeout
+				return lastEvent, wait.ErrWaitTimeout
+			}
+		}
+	}
+	return lastEvent, nil
+}
+
+// Until wraps the watcherClient's watch function with RetryWatcher making sure that watcher gets restarted in case of errors.
+// The initialResourceVersion will be given to watch method when first called. It shall not be "" or "0"
+// given the underlying WATCH call issues (#74022).
+// Remaining behaviour is identical to function UntilWithoutRetry. (See above.)
+// Until can deal with API timeouts and lost connections.
+// It guarantees that you see all events, in the order they happened.
+// Due to this guarantee there is no way it can deal with a 'Resource version too old' error. It will fail in this case.
+// (See `UntilWithSync` if you'd prefer to recover from all the errors including RV too old by re-listing
+// those items. In normal code you should care about being level driven so you'd not care about not seeing all the edges.)
+//
+// The most frequent usage for Until would be a test where you want to verify exact order of events ("edges").
+func Until(ctx context.Context, initialResourceVersion string, watcherClient cache.Watcher, conditions ...ConditionFunc) (*watch.Event, error) {
+	w, err := NewRetryWatcher(initialResourceVersion, watcherClient)
+	if err != nil {
+		return nil, err
+	}
+
+	return UntilWithoutRetry(ctx, w, conditions...) // UntilWithoutRetry stops w before returning
+}
+
+// UntilWithSync creates an informer from lw, optionally checks precondition when the store is synced,
+// and watches the output until each provided condition succeeds, in a way that is identical
+// to function UntilWithoutRetry. (See above.)
+// UntilWithSync can deal with all errors like API timeout, lost connections and 'Resource version too old'.
+// It is the only function that can recover from 'Resource version too old', Until and UntilWithoutRetry will
+// just fail in that case. On the other hand it can't provide you with guarantees as strong as using simple
+// Watch method with Until. It can skip some intermediate events in case of watch function failing but it will
+// re-list to recover and you always get an event, if there has been a change, after recovery.
+// Also with the current implementation based on DeltaFIFO, order of the events you receive is guaranteed only for
+// a particular object, not across several of them, even if they are of the same resource.
+// The most frequent usage would be a command that needs to watch the "state of the world" and shouldn't fail, like:
+// waiting for object reaching a state, "small" controllers, ...
+func UntilWithSync(ctx context.Context, lw cache.ListerWatcher, objType runtime.Object, precondition PreconditionFunc, conditions ...ConditionFunc) (*watch.Event, error) {
+	indexer, informer, watcher, done := NewIndexerInformerWatcher(lw, objType)
+	// We need to wait for the internal informers to fully stop so it's easier to reason about
+	// and it works with non-thread safe clients.
+	defer func() { <-done }() // registered before Stop below, so it runs after it
+	// Proxy watcher can be stopped multiple times so it's fine to use defer here to cover alternative branches and
+	// let UntilWithoutRetry stop it as well.
+	defer watcher.Stop()
+
+	if precondition != nil {
+		if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
+			return nil, fmt.Errorf("UntilWithSync: unable to sync caches: %w", ctx.Err())
+		}
+
+		done, err := precondition(indexer) // this done shadows the channel above only inside this block
+		if err != nil {
+			return nil, err
+		}
+
+		if done {
+			return nil, nil // precondition already holds: no event to report and no conditions are evaluated
+		}
+	}
+
+	return UntilWithoutRetry(ctx, watcher, conditions...)
+}
+
+// ContextWithOptionalTimeout wraps context.WithTimeout and handles infinite timeouts expressed as 0 duration.
+func ContextWithOptionalTimeout(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) {
+	if timeout < 0 {
+		// This should be handled in validation
+		klog.Errorf("Timeout for context shall not be negative!")
+		timeout = 0 // treated like 0, i.e. no timeout
+	}
+
+	if timeout == 0 {
+		return context.WithCancel(parent) // no deadline; the context ends only via cancel or the parent
+	}
+
+	return context.WithTimeout(parent, timeout)
+}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index b7cabc168b..8cb6209ef7 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -728,6 +728,7 @@ k8s.io/client-go/tools/pager
 k8s.io/client-go/tools/record
 k8s.io/client-go/tools/record/util
 k8s.io/client-go/tools/reference
+k8s.io/client-go/tools/watch
 k8s.io/client-go/transport
 k8s.io/client-go/util/cert
 k8s.io/client-go/util/connrotation