diff --git a/lib/resourcebuilder/batch.go b/lib/resourcebuilder/batch.go
index be7e80dd9b..18f3b4befa 100644
--- a/lib/resourcebuilder/batch.go
+++ b/lib/resourcebuilder/batch.go
@@ -6,33 +6,9 @@ import (
 
 	batchv1 "k8s.io/api/batch/v1"
 	corev1 "k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/util/wait"
-	batchclientv1 "k8s.io/client-go/kubernetes/typed/batch/v1"
 	"k8s.io/klog/v2"
 )
 
-// WaitForJobCompletion waits for job to complete.
-func WaitForJobCompletion(ctx context.Context, client batchclientv1.JobsGetter, job *batchv1.Job) error {
-	return wait.PollUntilContextCancel(ctx, defaultObjectPollInterval, true, func(localCtx context.Context) (bool, error) {
-		j, err := client.Jobs(job.Namespace).Get(localCtx, job.Name, metav1.GetOptions{})
-		if err != nil {
-			return false, fmt.Errorf("error getting Job %s: %v", job.Name, err)
-		}
-
-		if done, err := checkJobHealth(localCtx, j); err != nil && done {
-			return false, err
-		} else if err != nil {
-			klog.Error(err)
-			return false, nil
-		} else if !done {
-			klog.V(2).Infof("Job %s in namespace %s is not ready, continuing to wait.", job.ObjectMeta.Name, job.ObjectMeta.Namespace)
-			return false, nil
-		}
-		return true, nil
-	})
-}
-
 func (b *builder) checkJobHealth(ctx context.Context, job *batchv1.Job) error {
 	if b.mode == InitializingMode {
 		return nil
diff --git a/lib/resourcebuilder/interface.go b/lib/resourcebuilder/interface.go
index e56d59c9ee..e2bbd283f5 100644
--- a/lib/resourcebuilder/interface.go
+++ b/lib/resourcebuilder/interface.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"fmt"
 	"sync"
-	"time"
 
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime/schema"
@@ -88,7 +87,3 @@ func New(mapper *ResourceMapper, rest *rest.Config, m manifest.Manifest) (Interf
 	}
 	return f(rest, m), nil
 }
-
-// defaultObjectPollInterval is the default interval to poll the API to determine whether an object
-// is ready. Use this when a more specific interval is not necessary.
-const defaultObjectPollInterval = 3 * time.Second
diff --git a/pkg/cvo/updatepayload.go b/pkg/cvo/updatepayload.go
index d9eba2f922..c68cb42cd8 100644
--- a/pkg/cvo/updatepayload.go
+++ b/pkg/cvo/updatepayload.go
@@ -7,24 +7,26 @@ import (
 	"fmt"
 	"os"
 	"path/filepath"
-	"sort"
 	"strings"
 	"time"
 
 	"github.com/pkg/errors"
-	batchv1 "k8s.io/api/batch/v1"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
+	"k8s.io/apimachinery/pkg/runtime"
 	utilerrors "k8s.io/apimachinery/pkg/util/errors"
 	randutil "k8s.io/apimachinery/pkg/util/rand"
+	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/cache"
+	toolswatch "k8s.io/client-go/tools/watch"
 	"k8s.io/klog/v2"
 	"k8s.io/utils/ptr"
 
 	configv1 "github.com/openshift/api/config/v1"
-	"github.com/openshift/cluster-version-operator/lib/resourcebuilder"
 	"github.com/openshift/cluster-version-operator/pkg/payload"
 	"github.com/openshift/library-go/pkg/verify"
 )
 
@@ -162,9 +164,10 @@ func (r *payloadRetriever) targetUpdatePayloadDir(ctx context.Context, update co
 	payloadHash := base64.RawURLEncoding.EncodeToString(hash.Sum(nil))
 	tdir := filepath.Join(r.workingDir, payloadHash)
 
-	// Prune older jobs and directories while gracefully handling errors.
-	if err := r.pruneJobs(ctx, 0); err != nil {
-		klog.Warningf("failed to prune jobs: %v", err)
+	// Prune older pods and directories.
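+	// Unlike the job-based pruning this replaces, which only logged a warning,
+	// a failed prune now aborts the payload retrieval with an error.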
+	if err := r.prunePods(ctx); err != nil {
+		klog.Errorf("failed to prune pods: %v", err)
+		return "", fmt.Errorf("failed to prune pods: %w", err)
 	}
 
 	if err := payload.ValidateDirectory(tdir); os.IsNotExist(err) {
@@ -217,123 +220,205 @@ func (r *payloadRetriever) fetchUpdatePayloadToDir(ctx context.Context, dir stri
 		return container
 	}
 
-	job := &batchv1.Job{
+	pod := &corev1.Pod{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      name,
 			Namespace: namespace,
+			Labels: map[string]string{
+				"k8s-app": "retrieve-openshift-release",
+			},
 		},
-		Spec: batchv1.JobSpec{
+		Spec: corev1.PodSpec{
 			ActiveDeadlineSeconds: deadline,
-			Template: corev1.PodTemplateSpec{
-				Spec: corev1.PodSpec{
-					InitContainers: []corev1.Container{
-						setContainerDefaults(corev1.Container{
-							Name:       "cleanup",
-							Command:    []string{"sh", "-c", "rm -fR ./*"},
-							WorkingDir: baseDir,
-						}),
-						setContainerDefaults(corev1.Container{
-							Name:    "make-temporary-directory",
-							Command: []string{"mkdir", tmpDir},
-						}),
-						setContainerDefaults(corev1.Container{
-							Name: "move-operator-manifests-to-temporary-directory",
-							Command: []string{
-								"mv",
-								filepath.Join(payload.DefaultPayloadDir, payload.CVOManifestDir),
-								filepath.Join(tmpDir, payload.CVOManifestDir),
-							},
-						}),
-						setContainerDefaults(corev1.Container{
-							Name: "move-release-manifests-to-temporary-directory",
-							Command: []string{
-								"mv",
-								filepath.Join(payload.DefaultPayloadDir, payload.ReleaseManifestDir),
-								filepath.Join(tmpDir, payload.ReleaseManifestDir),
-							},
-						}),
+			InitContainers: []corev1.Container{
+				setContainerDefaults(corev1.Container{
+					Name:       "cleanup",
+					Command:    []string{"sh", "-c", "rm -fR ./*"},
+					WorkingDir: baseDir,
+				}),
+				setContainerDefaults(corev1.Container{
+					Name:    "make-temporary-directory",
+					Command: []string{"mkdir", tmpDir},
+				}),
+				setContainerDefaults(corev1.Container{
+					Name: "move-operator-manifests-to-temporary-directory",
+					Command: []string{
+						"mv",
+						filepath.Join(payload.DefaultPayloadDir, payload.CVOManifestDir),
+						filepath.Join(tmpDir, payload.CVOManifestDir),
 					},
-					Containers: []corev1.Container{
-						setContainerDefaults(corev1.Container{
-							Name:    "rename-to-final-location",
-							Command: []string{"mv", tmpDir, dir},
-						}),
+				}),
+				setContainerDefaults(corev1.Container{
+					Name: "move-release-manifests-to-temporary-directory",
+					Command: []string{
+						"mv",
+						filepath.Join(payload.DefaultPayloadDir, payload.ReleaseManifestDir),
+						filepath.Join(tmpDir, payload.ReleaseManifestDir),
 					},
-					Volumes: []corev1.Volume{{
-						Name: "payloads",
-						VolumeSource: corev1.VolumeSource{
-							HostPath: &corev1.HostPathVolumeSource{
-								Path: targetUpdatePayloadsDir,
-							},
-						},
-					}},
-					NodeName: nodename,
-					NodeSelector: map[string]string{
-						nodeSelectorKey: "",
+				}),
+			},
+			Containers: []corev1.Container{
+				setContainerDefaults(corev1.Container{
+					Name:    "rename-to-final-location",
+					Command: []string{"mv", tmpDir, dir},
+				}),
+			},
+			Volumes: []corev1.Volume{{
+				Name: "payloads",
+				VolumeSource: corev1.VolumeSource{
+					HostPath: &corev1.HostPathVolumeSource{
+						Path: targetUpdatePayloadsDir,
 					},
-					PriorityClassName: "openshift-user-critical",
-					Tolerations: []corev1.Toleration{{
-						Key: nodeSelectorKey,
-					}},
-					RestartPolicy: corev1.RestartPolicyOnFailure,
 				},
+			}},
+			NodeName: nodename,
+			NodeSelector: map[string]string{
+				nodeSelectorKey: "",
 			},
+			PriorityClassName: "openshift-user-critical",
+			Tolerations: []corev1.Toleration{{
+				Key: nodeSelectorKey,
+			}},
+			RestartPolicy: corev1.RestartPolicyOnFailure,
 		},
 	}
-	if _, err := r.kubeClient.BatchV1().Jobs(job.Namespace).Create(ctx, job, metav1.CreateOptions{}); err != nil {
+	klog.Infof("Spawning Pod %s ...", name)
+	if _, err := r.kubeClient.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
 		return err
 	}
-	return resourcebuilder.WaitForJobCompletion(ctx, r.kubeClient.BatchV1(), job)
+
+	return waitForPodCompletion(ctx, r.kubeClient.CoreV1().Pods(pod.Namespace), pod.Name)
 }
 
-// pruneJobs deletes the older, finished jobs in the namespace.
-// retain - the number of newest jobs to keep.
-func (r *payloadRetriever) pruneJobs(ctx context.Context, retain int) error {
-	jobs, err := r.kubeClient.BatchV1().Jobs(r.namespace).List(ctx, metav1.ListOptions{})
-	if err != nil {
-		return err
+type PodListerWatcher interface {
+	List(ctx context.Context, opts metav1.ListOptions) (*corev1.PodList, error)
+	Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
+}
+
+func collectStatuses(status corev1.PodStatus, waitingConditionFunc func(reason, message string) bool) []string {
+	var statuses []string
+	for _, cs := range status.ContainerStatuses {
+		if cs.State.Waiting != nil && waitingConditionFunc(cs.State.Waiting.Reason, cs.State.Waiting.Message) {
+			statuses = append(statuses, fmt.Sprintf("container %s is waiting with reason %q and message %q", cs.Name, cs.State.Waiting.Reason, cs.State.Waiting.Message))
+		}
+		if cs.State.Terminated != nil && cs.State.Terminated.Message != "" {
+			statuses = append(statuses, fmt.Sprintf("container %s is terminated with reason %q and message %q", cs.Name, cs.State.Terminated.Reason, cs.State.Terminated.Message))
+		}
 	}
-	if len(jobs.Items) <= retain {
-		return nil
+	for _, ics := range status.InitContainerStatuses {
+		if ics.State.Waiting != nil && waitingConditionFunc(ics.State.Waiting.Reason, ics.State.Waiting.Message) {
+			statuses = append(statuses, fmt.Sprintf("initcontainer %s is waiting with reason %q and message %q", ics.Name, ics.State.Waiting.Reason, ics.State.Waiting.Message))
+		}
+		if ics.State.Terminated != nil && ics.State.Terminated.Message != "" {
+			statuses = append(statuses, fmt.Sprintf("initcontainer %s is terminated with reason %q and message %q", ics.Name, ics.State.Terminated.Reason, ics.State.Terminated.Message))
+		}
 	}
+	return statuses
+}
 
-	// Select jobs to be deleted
-	var deleteJobs []batchv1.Job
-	for _, job := range jobs.Items {
-		switch {
-		// Ignore jobs not beginning with operatorName
-		case !strings.HasPrefix(job.Name, r.operatorName+"-"):
-			break
+func podCompletionCheckFn(name string) toolswatch.ConditionFunc {
+	return func(event watch.Event) (bool, error) {
+		p, ok := event.Object.(*corev1.Pod)
+		if !ok {
+			klog.Errorf("expecting Pod but received event with kind: %s", event.Object.GetObjectKind())
+			return false, fmt.Errorf("expecting Pod but received event with kind: %s", event.Object.GetObjectKind())
+		}
+		switch phase := p.Status.Phase; phase {
+		case corev1.PodPending:
+			klog.V(4).Infof("Pod %s is pending", name)
+			// There are currently two cases where we want to surface the waiting message right away,
+			// because its details would be lost if we waited until the pod failed.
+			// Case 1: "reason: SignatureValidationFailed".
+			// The message looks like 'image pull failed for quay.io/openshift-release-dev/ocp-release@sha256:digest because the signature validation failed: Source image rejected: A signature was required, but no signature exists'
+			// Case 1 becomes unnecessary if https://github.com/kubernetes/kubernetes/pull/127918 lands in OCP.
+			// Case 2: "reason: ErrImagePull".
+			// The message looks like '...: reading manifest sha256:... in quay.io/openshift-release-dev/ocp-release: manifest unknown'
+			// If a future Kubernetes implementation changes those keywords, we will have to follow up accordingly;
+			// otherwise we will lose these details in the waiting message, but it does no other harm.
+			if statuses := collectStatuses(p.Status, func(reason, message string) bool {
+				return reason == "SignatureValidationFailed" ||
+					(reason == "ErrImagePull" && strings.Contains(message, "manifest unknown"))
+			}); len(statuses) > 0 {
+				klog.Errorf("Pod %s failed while pending with reason %q and message %q and status %s", name, p.Status.Reason, p.Status.Message, strings.Join(statuses, ","))
+				return false, fmt.Errorf("pod %s failed while pending with reason %q and message %q and status %s", name, p.Status.Reason, p.Status.Message, strings.Join(statuses, ","))
+			}
+			return false, nil
+		case corev1.PodRunning:
+			klog.V(4).Infof("Pod %s is running, waiting for its completion ...", name)
+			return false, nil
+		case corev1.PodSucceeded:
+			klog.Infof("Pod %s succeeded", name)
+			return true, nil
+		case corev1.PodFailed:
+			statuses := collectStatuses(p.Status, func(reason, message string) bool { return message != "" })
+			klog.Errorf("Pod %s failed with reason %q and message %q and status %s", name, p.Status.Reason, p.Status.Message, strings.Join(statuses, ","))
+			return false, fmt.Errorf("pod %s failed with reason %q and message %q and status %s", name, p.Status.Reason, p.Status.Message, strings.Join(statuses, ","))
 		default:
-			deleteJobs = append(deleteJobs, job)
+			klog.Errorf("Pod %s is in unexpected phase %s", name, phase)
+			return false, fmt.Errorf("pod %s is in unexpected phase %s", name, phase)
 		}
 	}
-	if len(deleteJobs) <= retain {
-		return nil
+}
+
+func waitForPodCompletion(ctx context.Context, podListerWatcher PodListerWatcher, name string) error {
+	fieldSelector := fields.OneTermEqualSelector("metadata.name", name).String()
+	_, err := toolswatch.UntilWithSync(
+		ctx,
+		&cache.ListWatch{
+			ListFunc: func(options metav1.ListOptions) (object runtime.Object, e error) {
+				return podListerWatcher.List(ctx, metav1.ListOptions{FieldSelector: fieldSelector})
+			},
+			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+				return podListerWatcher.Watch(ctx, metav1.ListOptions{FieldSelector: fieldSelector})
+			},
+		},
+		&corev1.Pod{},
+		nil,
+		podCompletionCheckFn(name),
+	)
+	return err
+}
+
+// prunePods deletes the older, finished pods in the namespace.
+func (r *payloadRetriever) prunePods(ctx context.Context) error {
+	var errs []error
+
+	// begin transitional job pruning, in case any dangled from earlier versions
+	jobs, err := r.kubeClient.BatchV1().Jobs(r.namespace).List(ctx, metav1.ListOptions{})
+	if err != nil {
+		errs = append(errs, err)
 	}
-	// Sort jobs by StartTime to determine the newest. nil StartTime is assumed newest.
-	sort.Slice(deleteJobs, func(i, j int) bool {
-		if deleteJobs[i].Status.StartTime == nil {
-			return false
+	for _, job := range jobs.Items {
+		if !strings.HasPrefix(job.Name, r.operatorName+"-") {
+			// Ignore jobs not beginning with operatorName
+			continue
 		}
-		if deleteJobs[j].Status.StartTime == nil {
-			return true
+		err := r.kubeClient.BatchV1().Jobs(r.namespace).Delete(ctx, job.Name, metav1.DeleteOptions{})
+		if err != nil {
+			errs = append(errs, errors.Wrapf(err, "failed to delete job %v", job.Name))
 		}
-		return deleteJobs[i].Status.StartTime.Before(deleteJobs[j].Status.StartTime)
+	}
+	// end transitional job pruning
+
+	pods, err := r.kubeClient.CoreV1().Pods(r.namespace).List(ctx, metav1.ListOptions{
+		LabelSelector: "k8s-app=retrieve-openshift-release",
 	})
+	if err != nil {
+		errs = append(errs, err)
 	}
 
-	var errs []error
-	for _, job := range deleteJobs[:len(deleteJobs)-retain] {
-		err := r.kubeClient.BatchV1().Jobs(r.namespace).Delete(ctx, job.Name, metav1.DeleteOptions{})
+	for _, pod := range pods.Items {
+		err := r.kubeClient.CoreV1().Pods(r.namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
 		if err != nil {
-			errs = append(errs, errors.Wrapf(err, "failed to delete job %v", job.Name))
+			errs = append(errs, errors.Wrapf(err, "failed to delete pod %v", pod.Name))
 		}
 	}
+
 	agg := utilerrors.NewAggregate(errs)
 	if agg != nil {
-		return fmt.Errorf("error deleting jobs: %v", agg.Error())
+		return fmt.Errorf("error deleting pods: %v", agg.Error())
 	}
 	return nil
 }
diff --git a/vendor/k8s.io/client-go/tools/watch/informerwatcher.go b/vendor/k8s.io/client-go/tools/watch/informerwatcher.go
new file mode 100644
index 0000000000..5e6aad5cf1
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/watch/informerwatcher.go
@@ -0,0 +1,150 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package watch
+
+import (
+	"sync"
+
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/client-go/tools/cache"
+)
+
+func newEventProcessor(out chan<- watch.Event) *eventProcessor {
+	return &eventProcessor{
+		out:  out,
+		cond: sync.NewCond(&sync.Mutex{}),
+		done: make(chan struct{}),
+	}
+}
+
+// eventProcessor buffers events and writes them to an out chan when a reader
+// is waiting. Because of the requirement to buffer events, it synchronizes
+// input with a condition, and synchronizes output with a channels. It needs to
+// be able to yield while both waiting on an input condition and while blocked
+// on writing to the output channel.
+type eventProcessor struct {
+	out chan<- watch.Event
+
+	cond *sync.Cond
+	buff []watch.Event
+
+	done chan struct{}
+}
+
+func (e *eventProcessor) run() {
+	for {
+		batch := e.takeBatch()
+
+		e.writeBatch(batch)
+
+		if e.stopped() {
+			return
+		}
+	}
+}
+
+func (e *eventProcessor) takeBatch() []watch.Event {
+	e.cond.L.Lock()
+	defer e.cond.L.Unlock()
+
+	for len(e.buff) == 0 && !e.stopped() {
+		e.cond.Wait()
+	}
+
+	batch := e.buff
+	e.buff = nil
+	return batch
+}
+
+func (e *eventProcessor) writeBatch(events []watch.Event) {
+	for _, event := range events {
+		select {
+		case e.out <- event:
+		case <-e.done:
+			return
+		}
+	}
+}
+
+func (e *eventProcessor) push(event watch.Event) {
+	e.cond.L.Lock()
+	defer e.cond.L.Unlock()
+	defer e.cond.Signal()
+	e.buff = append(e.buff, event)
+}
+
+func (e *eventProcessor) stopped() bool {
+	select {
+	case <-e.done:
+		return true
+	default:
+		return false
+	}
+}
+
+func (e *eventProcessor) stop() {
+	close(e.done)
+	e.cond.Signal()
+}
+
+// NewIndexerInformerWatcher will create an IndexerInformer and wrap it into watch.Interface
+// so you can use it anywhere where you'd have used a regular Watcher returned from Watch method.
+// it also returns a channel you can use to wait for the informers to fully shutdown.
+func NewIndexerInformerWatcher(lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface, <-chan struct{}) {
+	ch := make(chan watch.Event)
+	w := watch.NewProxyWatcher(ch)
+	e := newEventProcessor(ch)
+
+	indexer, informer := cache.NewIndexerInformer(lw, objType, 0, cache.ResourceEventHandlerFuncs{
+		AddFunc: func(obj interface{}) {
+			e.push(watch.Event{
+				Type:   watch.Added,
+				Object: obj.(runtime.Object),
+			})
+		},
+		UpdateFunc: func(old, new interface{}) {
+			e.push(watch.Event{
+				Type:   watch.Modified,
+				Object: new.(runtime.Object),
+			})
+		},
+		DeleteFunc: func(obj interface{}) {
+			staleObj, stale := obj.(cache.DeletedFinalStateUnknown)
+			if stale {
+				// We have no means of passing the additional information down using
+				// watch API based on watch.Event but the caller can filter such
+				// objects by checking if metadata.deletionTimestamp is set
+				obj = staleObj.Obj
+			}
+
+			e.push(watch.Event{
+				Type:   watch.Deleted,
+				Object: obj.(runtime.Object),
+			})
+		},
+	}, cache.Indexers{})
+
+	go e.run()
+
+	doneCh := make(chan struct{})
+	go func() {
+		defer close(doneCh)
+		defer e.stop()
+		informer.Run(w.StopChan())
+	}()
+
+	return indexer, informer, w, doneCh
+}
diff --git a/vendor/k8s.io/client-go/tools/watch/retrywatcher.go b/vendor/k8s.io/client-go/tools/watch/retrywatcher.go
new file mode 100644
index 0000000000..d81dc43570
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/watch/retrywatcher.go
@@ -0,0 +1,295 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package watch
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"net/http"
+	"time"
+
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/dump"
+	"k8s.io/apimachinery/pkg/util/net"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/klog/v2"
+)
+
+// resourceVersionGetter is an interface used to get resource version from events.
+// We can't reuse an interface from meta otherwise it would be a cyclic dependency and we need just this one method
+type resourceVersionGetter interface {
+	GetResourceVersion() string
+}
+
+// RetryWatcher will make sure that in case the underlying watcher is closed (e.g. due to API timeout or etcd timeout)
+// it will get restarted from the last point without the consumer even knowing about it.
+// RetryWatcher does that by inspecting events and keeping track of resourceVersion.
+// Especially useful when using watch.UntilWithoutRetry where premature termination is causing issues and flakes.
+// Please note that this is not resilient to etcd cache not having the resource version anymore - you would need to
+// use Informers for that.
+type RetryWatcher struct {
+	lastResourceVersion string
+	watcherClient       cache.Watcher
+	resultChan          chan watch.Event
+	stopChan            chan struct{}
+	doneChan            chan struct{}
+	minRestartDelay     time.Duration
+}
+
+// NewRetryWatcher creates a new RetryWatcher.
+// It will make sure that watches gets restarted in case of recoverable errors.
+// The initialResourceVersion will be given to watch method when first called.
+func NewRetryWatcher(initialResourceVersion string, watcherClient cache.Watcher) (*RetryWatcher, error) {
+	return newRetryWatcher(initialResourceVersion, watcherClient, 1*time.Second)
+}
+
+func newRetryWatcher(initialResourceVersion string, watcherClient cache.Watcher, minRestartDelay time.Duration) (*RetryWatcher, error) {
+	switch initialResourceVersion {
+	case "", "0":
+		// TODO: revisit this if we ever get WATCH v2 where it means start "now"
+		//       without doing the synthetic list of objects at the beginning (see #74022)
+		return nil, fmt.Errorf("initial RV %q is not supported due to issues with underlying WATCH", initialResourceVersion)
+	default:
+		break
+	}
+
+	rw := &RetryWatcher{
+		lastResourceVersion: initialResourceVersion,
+		watcherClient:       watcherClient,
+		stopChan:            make(chan struct{}),
+		doneChan:            make(chan struct{}),
+		resultChan:          make(chan watch.Event, 0),
+		minRestartDelay:     minRestartDelay,
+	}
+
+	go rw.receive()
+	return rw, nil
+}
+
+func (rw *RetryWatcher) send(event watch.Event) bool {
+	// Writing to an unbuffered channel is blocking operation
+	// and we need to check if stop wasn't requested while doing so.
+	select {
+	case rw.resultChan <- event:
+		return true
+	case <-rw.stopChan:
+		return false
+	}
+}
+
+// doReceive returns true when it is done, false otherwise.
+// If it is not done the second return value holds the time to wait before calling it again.
+func (rw *RetryWatcher) doReceive() (bool, time.Duration) {
+	watcher, err := rw.watcherClient.Watch(metav1.ListOptions{
+		ResourceVersion:     rw.lastResourceVersion,
+		AllowWatchBookmarks: true,
+	})
+	// We are very unlikely to hit EOF here since we are just establishing the call,
+	// but it may happen that the apiserver is just shutting down (e.g. being restarted)
+	// This is consistent with how it is handled for informers
+	switch err {
+	case nil:
+		break
+
+	case io.EOF:
+		// watch closed normally
+		return false, 0
+
+	case io.ErrUnexpectedEOF:
+		klog.V(1).InfoS("Watch closed with unexpected EOF", "err", err)
+		return false, 0
+
+	default:
+		msg := "Watch failed"
+		if net.IsProbableEOF(err) || net.IsTimeout(err) {
+			klog.V(5).InfoS(msg, "err", err)
+			// Retry
+			return false, 0
+		}
+
+		klog.ErrorS(err, msg)
+		// Retry
+		return false, 0
+	}
+
+	if watcher == nil {
+		klog.ErrorS(nil, "Watch returned nil watcher")
+		// Retry
+		return false, 0
+	}
+
+	ch := watcher.ResultChan()
+	defer watcher.Stop()
+
+	for {
+		select {
+		case <-rw.stopChan:
+			klog.V(4).InfoS("Stopping RetryWatcher.")
+			return true, 0
+		case event, ok := <-ch:
+			if !ok {
+				klog.V(4).InfoS("Failed to get event! Re-creating the watcher.", "resourceVersion", rw.lastResourceVersion)
+				return false, 0
+			}
+
+			// We need to inspect the event and get ResourceVersion out of it
+			switch event.Type {
+			case watch.Added, watch.Modified, watch.Deleted, watch.Bookmark:
+				metaObject, ok := event.Object.(resourceVersionGetter)
+				if !ok {
+					_ = rw.send(watch.Event{
+						Type:   watch.Error,
+						Object: &apierrors.NewInternalError(errors.New("retryWatcher: doesn't support resourceVersion")).ErrStatus,
+					})
+					// We have to abort here because this might cause lastResourceVersion inconsistency by skipping a potential RV with valid data!
+					return true, 0
+				}
+
+				resourceVersion := metaObject.GetResourceVersion()
+				if resourceVersion == "" {
+					_ = rw.send(watch.Event{
+						Type:   watch.Error,
+						Object: &apierrors.NewInternalError(fmt.Errorf("retryWatcher: object %#v doesn't support resourceVersion", event.Object)).ErrStatus,
+					})
+					// We have to abort here because this might cause lastResourceVersion inconsistency by skipping a potential RV with valid data!
+					return true, 0
+				}
+
+				// All is fine; send the non-bookmark events and update resource version.
+				if event.Type != watch.Bookmark {
+					ok = rw.send(event)
+					if !ok {
+						return true, 0
+					}
+				}
+				rw.lastResourceVersion = resourceVersion
+
+				continue
+
+			case watch.Error:
+				// This round trip allows us to handle unstructured status
+				errObject := apierrors.FromObject(event.Object)
+				statusErr, ok := errObject.(*apierrors.StatusError)
+				if !ok {
+					klog.Error(fmt.Sprintf("Received an error which is not *metav1.Status but %s", dump.Pretty(event.Object)))
+					// Retry unknown errors
+					return false, 0
+				}
+
+				status := statusErr.ErrStatus
+
+				statusDelay := time.Duration(0)
+				if status.Details != nil {
+					statusDelay = time.Duration(status.Details.RetryAfterSeconds) * time.Second
+				}
+
+				switch status.Code {
+				case http.StatusGone:
+					// Never retry RV too old errors
+					_ = rw.send(event)
+					return true, 0
+
+				case http.StatusGatewayTimeout, http.StatusInternalServerError:
+					// Retry
+					return false, statusDelay
+
+				default:
+					// We retry by default. RetryWatcher is meant to proceed unless it is certain
+					// that it can't. If we are not certain, we proceed with retry and leave it
+					// up to the user to timeout if needed.
+
+					// Log here so we have a record of hitting the unexpected error
+					// and we can whitelist some error codes if we missed any that are expected.
+					klog.V(5).Info(fmt.Sprintf("Retrying after unexpected error: %s", dump.Pretty(event.Object)))
+
+					// Retry
+					return false, statusDelay
+				}
+
+			default:
+				klog.Errorf("Failed to recognize Event type %q", event.Type)
+				_ = rw.send(watch.Event{
+					Type:   watch.Error,
+					Object: &apierrors.NewInternalError(fmt.Errorf("retryWatcher failed to recognize Event type %q", event.Type)).ErrStatus,
+				})
+				// We are unable to restart the watch and have to stop the loop or this might cause lastResourceVersion inconsistency by skipping a potential RV with valid data!
+				return true, 0
+			}
+		}
+	}
+}
+
+// receive reads the result from a watcher, restarting it if necessary.
+func (rw *RetryWatcher) receive() {
+	defer close(rw.doneChan)
+	defer close(rw.resultChan)
+
+	klog.V(4).Info("Starting RetryWatcher.")
+	defer klog.V(4).Info("Stopping RetryWatcher.")
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	go func() {
+		select {
+		case <-rw.stopChan:
+			cancel()
+			return
+		case <-ctx.Done():
+			return
+		}
+	}()
+
+	// We use non sliding until so we don't introduce delays on happy path when WATCH call
+	// timeouts or gets closed and we need to reestablish it while also avoiding hot loops.
+	wait.NonSlidingUntilWithContext(ctx, func(ctx context.Context) {
+		done, retryAfter := rw.doReceive()
+		if done {
+			cancel()
+			return
+		}
+
+		timer := time.NewTimer(retryAfter)
+		select {
+		case <-ctx.Done():
+			timer.Stop()
+			return
+		case <-timer.C:
+		}
+
+		klog.V(4).Infof("Restarting RetryWatcher at RV=%q", rw.lastResourceVersion)
+	}, rw.minRestartDelay)
+}
+
+// ResultChan implements Interface.
+func (rw *RetryWatcher) ResultChan() <-chan watch.Event {
+	return rw.resultChan
+}
+
+// Stop implements Interface.
+func (rw *RetryWatcher) Stop() {
+	close(rw.stopChan)
+}
+
+// Done allows the caller to be notified when Retry watcher stops.
+func (rw *RetryWatcher) Done() <-chan struct{} {
+	return rw.doneChan
+}
diff --git a/vendor/k8s.io/client-go/tools/watch/until.go b/vendor/k8s.io/client-go/tools/watch/until.go
new file mode 100644
index 0000000000..a2474556b0
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/watch/until.go
@@ -0,0 +1,168 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package watch
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"time"
+
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/klog/v2"
+)
+
+// PreconditionFunc returns true if the condition has been reached, false if it has not been reached yet,
+// or an error if the condition failed or detected an error state.
+type PreconditionFunc func(store cache.Store) (bool, error)
+
+// ConditionFunc returns true if the condition has been reached, false if it has not been reached yet,
+// or an error if the condition cannot be checked and should terminate. In general, it is better to define
+// level driven conditions over edge driven conditions (pod has ready=true, vs pod modified and ready changed
+// from false to true).
+type ConditionFunc func(event watch.Event) (bool, error)
+
+// ErrWatchClosed is returned when the watch channel is closed before timeout in UntilWithoutRetry.
+var ErrWatchClosed = errors.New("watch closed before UntilWithoutRetry timeout")
+
+// UntilWithoutRetry reads items from the watch until each provided condition succeeds, and then returns the last watch
+// encountered. The first condition that returns an error terminates the watch (and the event is also returned).
+// If no event has been received, the returned event will be nil.
+// Conditions are satisfied sequentially so as to provide a useful primitive for higher level composition.
+// Waits until context deadline or until context is canceled.
+//
+// Warning: Unless you have a very specific use case (probably a special Watcher) don't use this function!!!
+// Warning: This will fail e.g. on API timeouts and/or 'too old resource version' error.
+// Warning: You are most probably looking for a function *Until* or *UntilWithSync* below,
+// Warning: solving such issues.
+// TODO: Consider making this function private to prevent misuse when the other occurrences in our codebase are gone.
+func UntilWithoutRetry(ctx context.Context, watcher watch.Interface, conditions ...ConditionFunc) (*watch.Event, error) {
+	ch := watcher.ResultChan()
+	defer watcher.Stop()
+	var lastEvent *watch.Event
+	for _, condition := range conditions {
+		// check the next condition against the previous event and short circuit waiting for the next watch
+		if lastEvent != nil {
+			done, err := condition(*lastEvent)
+			if err != nil {
+				return lastEvent, err
+			}
+			if done {
+				continue
+			}
+		}
+	ConditionSucceeded:
+		for {
+			select {
+			case event, ok := <-ch:
+				if !ok {
+					return lastEvent, ErrWatchClosed
+				}
+				lastEvent = &event
+
+				done, err := condition(event)
+				if err != nil {
+					return lastEvent, err
+				}
+				if done {
+					break ConditionSucceeded
+				}
+
+			case <-ctx.Done():
+				return lastEvent, wait.ErrWaitTimeout
+			}
+		}
+	}
+	return lastEvent, nil
+}
+
+// Until wraps the watcherClient's watch function with RetryWatcher making sure that watcher gets restarted in case of errors.
+// The initialResourceVersion will be given to watch method when first called. It shall not be "" or "0"
+// given the underlying WATCH call issues (#74022).
+// Remaining behaviour is identical to function UntilWithoutRetry. (See above.)
+// Until can deal with API timeouts and lost connections.
+// It guarantees you to see all events and in the order they happened.
+// Due to this guarantee there is no way it can deal with 'Resource version too old error'. It will fail in this case.
+// (See `UntilWithSync` if you'd prefer to recover from all the errors including RV too old by re-listing
+// those items. In normal code you should care about being level driven so you'd not care about not seeing all the edges.)
+//
+// The most frequent usage for Until would be a test where you want to verify exact order of events ("edges").
+func Until(ctx context.Context, initialResourceVersion string, watcherClient cache.Watcher, conditions ...ConditionFunc) (*watch.Event, error) {
+	w, err := NewRetryWatcher(initialResourceVersion, watcherClient)
+	if err != nil {
+		return nil, err
+	}
+
+	return UntilWithoutRetry(ctx, w, conditions...)
+}
+
+// UntilWithSync creates an informer from lw, optionally checks precondition when the store is synced,
+// and watches the output until each provided condition succeeds, in a way that is identical
+// to function UntilWithoutRetry. (See above.)
+// UntilWithSync can deal with all errors like API timeout, lost connections and 'Resource version too old'.
+// It is the only function that can recover from 'Resource version too old', Until and UntilWithoutRetry will
+// just fail in that case. On the other hand it can't provide you with guarantees as strong as using simple
+// Watch method with Until. It can skip some intermediate events in case of watch function failing but it will
+// re-list to recover and you always get an event, if there has been a change, after recovery.
+// Also with the current implementation based on DeltaFIFO, order of the events you receive is guaranteed only for
+// particular object, not between more of them even it's the same resource.
+// The most frequent usage would be a command that needs to watch the "state of the world" and shouldn't fail, like:
+// waiting for object reaching a state, "small" controllers, ...
+func UntilWithSync(ctx context.Context, lw cache.ListerWatcher, objType runtime.Object, precondition PreconditionFunc, conditions ...ConditionFunc) (*watch.Event, error) {
+	indexer, informer, watcher, done := NewIndexerInformerWatcher(lw, objType)
+	// We need to wait for the internal informers to fully stop so it's easier to reason about
+	// and it works with non-thread safe clients.
+	defer func() { <-done }()
+	// Proxy watcher can be stopped multiple times so it's fine to use defer here to cover alternative branches and
+	// let UntilWithoutRetry to stop it
+	defer watcher.Stop()
+
+	if precondition != nil {
+		if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
+			return nil, fmt.Errorf("UntilWithSync: unable to sync caches: %w", ctx.Err())
+		}
+
+		done, err := precondition(indexer)
+		if err != nil {
+			return nil, err
+		}
+
+		if done {
+			return nil, nil
+		}
+	}
+
+	return UntilWithoutRetry(ctx, watcher, conditions...)
+}
+
+// ContextWithOptionalTimeout wraps context.WithTimeout and handles infinite timeouts expressed as 0 duration.
+func ContextWithOptionalTimeout(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) {
+	if timeout < 0 {
+		// This should be handled in validation
+		klog.Errorf("Timeout for context shall not be negative!")
+		timeout = 0
+	}
+
+	if timeout == 0 {
+		return context.WithCancel(parent)
+	}
+
+	return context.WithTimeout(parent, timeout)
+}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index b7cabc168b..8cb6209ef7 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -728,6 +728,7 @@ k8s.io/client-go/tools/pager
 k8s.io/client-go/tools/record
 k8s.io/client-go/tools/record/util
 k8s.io/client-go/tools/reference
+k8s.io/client-go/tools/watch
 k8s.io/client-go/transport
 k8s.io/client-go/util/cert
 k8s.io/client-go/util/connrotation
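
A minimal illustrative sketch (not part of the patch): exercising the new waitForPodCompletion path against client-go's fake clientset. The typed pod client returned by CoreV1().Pods(namespace) satisfies the PodListerWatcher interface directly, so a unit test can drive the wait without a real cluster. The test name, pod name, and namespace below are assumptions made up for this example.

package cvo

import (
	"context"
	"testing"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)

// Seeding the fake tracker with a pod already in the Succeeded phase means the
// informer's initial list event satisfies podCompletionCheckFn immediately.
func TestWaitForPodCompletionSucceeded(t *testing.T) {
	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "version--abc12", // hypothetical retrieval pod name
			Namespace: "openshift-cluster-version",
		},
		Status: corev1.PodStatus{Phase: corev1.PodSucceeded},
	}
	client := fake.NewSimpleClientset(pod)

	if err := waitForPodCompletion(context.Background(), client.CoreV1().Pods(pod.Namespace), pod.Name); err != nil {
		t.Fatalf("expected successful wait, got: %v", err)
	}
}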