Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 0 additions & 24 deletions lib/resourcebuilder/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,33 +6,9 @@ import (

batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
batchclientv1 "k8s.io/client-go/kubernetes/typed/batch/v1"
"k8s.io/klog/v2"
)

// WaitForJobCompletion blocks until the given Job finishes, polling its
// status at defaultObjectPollInterval, or until ctx is cancelled.
func WaitForJobCompletion(ctx context.Context, client batchclientv1.JobsGetter, job *batchv1.Job) error {
	return wait.PollUntilContextCancel(ctx, defaultObjectPollInterval, true, func(localCtx context.Context) (bool, error) {
		latest, getErr := client.Jobs(job.Namespace).Get(localCtx, job.Name, metav1.GetOptions{})
		if getErr != nil {
			return false, fmt.Errorf("error getting Job %s: %v", job.Name, getErr)
		}

		done, healthErr := checkJobHealth(localCtx, latest)
		switch {
		case healthErr != nil && done:
			// Terminal failure: stop polling and surface the error.
			return false, healthErr
		case healthErr != nil:
			// Transient problem: log it and keep polling.
			klog.Error(healthErr)
			return false, nil
		case !done:
			klog.V(2).Infof("Job %s in namespace %s is not ready, continuing to wait.", job.ObjectMeta.Name, job.ObjectMeta.Namespace)
			return false, nil
		}
		return true, nil
	})
}

func (b *builder) checkJobHealth(ctx context.Context, job *batchv1.Job) error {
if b.mode == InitializingMode {
return nil
Expand Down
5 changes: 0 additions & 5 deletions lib/resourcebuilder/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"sync"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -88,7 +87,3 @@ func New(mapper *ResourceMapper, rest *rest.Config, m manifest.Manifest) (Interf
}
return f(rest, m), nil
}

// defaultObjectPollInterval is the default interval to poll the API to determine whether an object
// is ready. Use this when a more specific interval is not necessary.
const defaultObjectPollInterval = 3 * time.Second
263 changes: 174 additions & 89 deletions pkg/cvo/updatepayload.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,26 @@ import (
"fmt"
"os"
"path/filepath"
"sort"
"strings"
"time"

"github.com/pkg/errors"

batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
randutil "k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
toolswatch "k8s.io/client-go/tools/watch"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"

configv1 "github.com/openshift/api/config/v1"
"github.com/openshift/cluster-version-operator/lib/resourcebuilder"
"github.com/openshift/cluster-version-operator/pkg/payload"
"github.com/openshift/library-go/pkg/verify"
)
Expand Down Expand Up @@ -162,9 +164,10 @@ func (r *payloadRetriever) targetUpdatePayloadDir(ctx context.Context, update co
payloadHash := base64.RawURLEncoding.EncodeToString(hash.Sum(nil))
tdir := filepath.Join(r.workingDir, payloadHash)

// Prune older jobs and directories while gracefully handling errors.
if err := r.pruneJobs(ctx, 0); err != nil {
klog.Warningf("failed to prune jobs: %v", err)
// Prune older pods and directories.
if err := r.prunePods(ctx); err != nil {
klog.Errorf("failed to prune pods: %v", err)
return "", fmt.Errorf("failed to prune pods: %w", err)
}

if err := payload.ValidateDirectory(tdir); os.IsNotExist(err) {
Expand Down Expand Up @@ -217,123 +220,205 @@ func (r *payloadRetriever) fetchUpdatePayloadToDir(ctx context.Context, dir stri
return container
}

job := &batchv1.Job{
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
Labels: map[string]string{
"k8s-app": "retrieve-openshift-release",
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a workload closely coupled to CVO, I'd put them under a single k8s-app

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re-used the label here:

labels:
k8s-app: cluster-version-operator

Copy link
Copy Markdown
Member Author

@hongkailiu hongkailiu Nov 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to recover the label because it is used to list the version-- pod and prune.
CVO has the same label, then is killed too.

},
},
Spec: batchv1.JobSpec{
Spec: corev1.PodSpec{
ActiveDeadlineSeconds: deadline,
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
InitContainers: []corev1.Container{
setContainerDefaults(corev1.Container{
Name: "cleanup",
Command: []string{"sh", "-c", "rm -fR ./*"},
WorkingDir: baseDir,
}),
setContainerDefaults(corev1.Container{
Name: "make-temporary-directory",
Command: []string{"mkdir", tmpDir},
}),
setContainerDefaults(corev1.Container{
Name: "move-operator-manifests-to-temporary-directory",
Command: []string{
"mv",
filepath.Join(payload.DefaultPayloadDir, payload.CVOManifestDir),
filepath.Join(tmpDir, payload.CVOManifestDir),
},
}),
setContainerDefaults(corev1.Container{
Name: "move-release-manifests-to-temporary-directory",
Command: []string{
"mv",
filepath.Join(payload.DefaultPayloadDir, payload.ReleaseManifestDir),
filepath.Join(tmpDir, payload.ReleaseManifestDir),
},
}),
InitContainers: []corev1.Container{
setContainerDefaults(corev1.Container{
Name: "cleanup",
Command: []string{"sh", "-c", "rm -fR ./*"},
WorkingDir: baseDir,
}),
setContainerDefaults(corev1.Container{
Name: "make-temporary-directory",
Command: []string{"mkdir", tmpDir},
}),
setContainerDefaults(corev1.Container{
Name: "move-operator-manifests-to-temporary-directory",
Command: []string{
"mv",
filepath.Join(payload.DefaultPayloadDir, payload.CVOManifestDir),
filepath.Join(tmpDir, payload.CVOManifestDir),
},
Containers: []corev1.Container{
setContainerDefaults(corev1.Container{
Name: "rename-to-final-location",
Command: []string{"mv", tmpDir, dir},
}),
}),
setContainerDefaults(corev1.Container{
Name: "move-release-manifests-to-temporary-directory",
Command: []string{
"mv",
filepath.Join(payload.DefaultPayloadDir, payload.ReleaseManifestDir),
filepath.Join(tmpDir, payload.ReleaseManifestDir),
},
Volumes: []corev1.Volume{{
Name: "payloads",
VolumeSource: corev1.VolumeSource{
HostPath: &corev1.HostPathVolumeSource{
Path: targetUpdatePayloadsDir,
},
},
}},
NodeName: nodename,
NodeSelector: map[string]string{
nodeSelectorKey: "",
}),
},
Containers: []corev1.Container{
setContainerDefaults(corev1.Container{
Name: "rename-to-final-location",
Command: []string{"mv", tmpDir, dir},
}),
},
Volumes: []corev1.Volume{{
Name: "payloads",
VolumeSource: corev1.VolumeSource{
HostPath: &corev1.HostPathVolumeSource{
Path: targetUpdatePayloadsDir,
},
PriorityClassName: "openshift-user-critical",
Tolerations: []corev1.Toleration{{
Key: nodeSelectorKey,
}},
RestartPolicy: corev1.RestartPolicyOnFailure,
},
}},
NodeName: nodename,
NodeSelector: map[string]string{
nodeSelectorKey: "",
},
PriorityClassName: "openshift-user-critical",
Tolerations: []corev1.Toleration{{
Key: nodeSelectorKey,
}},
RestartPolicy: corev1.RestartPolicyOnFailure,
},
}

if _, err := r.kubeClient.BatchV1().Jobs(job.Namespace).Create(ctx, job, metav1.CreateOptions{}); err != nil {
klog.Infof("Spawning Pod %s ...", name)
if _, err := r.kubeClient.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
return err
}
return resourcebuilder.WaitForJobCompletion(ctx, r.kubeClient.BatchV1(), job)

return waitForPodCompletion(ctx, r.kubeClient.CoreV1().Pods(pod.Namespace), pod.Name)
}

// pruneJobs deletes the older, finished jobs in the namespace.
// retain - the number of newest jobs to keep.
func (r *payloadRetriever) pruneJobs(ctx context.Context, retain int) error {
jobs, err := r.kubeClient.BatchV1().Jobs(r.namespace).List(ctx, metav1.ListOptions{})
if err != nil {
return err
// PodListerWatcher is the subset of the Pods client needed by
// waitForPodCompletion: listing pods and watching them for changes.
// It is satisfied by the client-go Pods client (see the call site that
// passes r.kubeClient.CoreV1().Pods(...)).
type PodListerWatcher interface {
	// List returns the pods matching opts.
	List(ctx context.Context, opts metav1.ListOptions) (*corev1.PodList, error)
	// Watch opens a watch stream for pods matching opts.
	Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
}

func collectStatuses(status corev1.PodStatus, waitingConditionFunc func(reason, message string) bool) []string {
var statuses []string
for _, cs := range status.ContainerStatuses {
if cs.State.Waiting != nil && waitingConditionFunc(cs.State.Waiting.Reason, cs.State.Waiting.Message) {
statuses = append(statuses, fmt.Sprintf("container %s is waiting with reason %q and message %q", cs.Name, cs.State.Waiting.Reason, cs.State.Waiting.Message))
}
if cs.State.Terminated != nil && cs.State.Terminated.Message != "" {
statuses = append(statuses, fmt.Sprintf("container %s is terminated with reason %q and message %q", cs.Name, cs.State.Terminated.Reason, cs.State.Terminated.Message))
}
}
if len(jobs.Items) <= retain {
return nil
for _, ics := range status.InitContainerStatuses {
if ics.State.Waiting != nil && waitingConditionFunc(ics.State.Waiting.Reason, ics.State.Waiting.Message) {
statuses = append(statuses, fmt.Sprintf("initcontainer %s is waiting with reason %q and message %q", ics.Name, ics.State.Waiting.Reason, ics.State.Waiting.Message))
}
if ics.State.Terminated != nil && ics.State.Terminated.Message != "" {
statuses = append(statuses, fmt.Sprintf("initcontainer %s is terminated with reason %q and message %q", ics.Name, ics.State.Terminated.Reason, ics.State.Terminated.Message))
}
}
return statuses
}

// Select jobs to be deleted
var deleteJobs []batchv1.Job
for _, job := range jobs.Items {
switch {
// Ignore jobs not beginning with operatorName
case !strings.HasPrefix(job.Name, r.operatorName+"-"):
break
func podCompletionCheckFn(name string) toolswatch.ConditionFunc {
return func(event watch.Event) (bool, error) {
p, ok := event.Object.(*corev1.Pod)
if !ok {
klog.Errorf("expecting Pod but received event with kind: %s", event.Object.GetObjectKind())
return false, fmt.Errorf("expecting Pod but received event with kind: %s", event.Object.GetObjectKind())
}
switch phase := p.Status.Phase; phase {
case corev1.PodPending:
klog.V(4).Infof("Pod %s is pending", name)
// There are two cases at the moment we want to bottle up the waiting message where
// the details would be lost if we waited until the pod failed.
// Case 1: "reason: SignatureValidationFailed".
// The message looks like 'image pull failed for quay.io/openshift-release-dev/ocp-release@sha256:digest because the signature validation failed: Source image rejected: A signature was required, but no signature exists'
// We do not need Case 1 if https://github.com/kubernetes/kubernetes/pull/127918 lands into OCP.
// Case 2: "reason: ErrImagePull".
// The message looks like '...: reading manifest sha256:... in quay.io/openshift-release-dev/ocp-release: manifest unknown'
// In case those keywords are changed in the future Kubernetes implementation, we will have to follow up accordingly.
// Otherwise, we will lose these details in the waiting message. It brings no other harms.
if statuses := collectStatuses(p.Status, func(reason, message string) bool {
return reason == "SignatureValidationFailed" ||
(reason == "ErrImagePull" && strings.Contains(message, "manifest unknown"))
}); len(statuses) > 0 {
klog.Errorf("Pod %s failed at pending with reason %q and message %q and status %s", name, p.Status.Reason, p.Status.Message, strings.Join(statuses, ","))
return false, fmt.Errorf("pod %s failed at pending with reason %q and message %q and status %s", name, p.Status.Reason, p.Status.Message, strings.Join(statuses, ","))
}
return false, nil
case corev1.PodRunning:
klog.V(4).Infof("Pod %s is running, waiting for its completion ...", name)
return false, nil
case corev1.PodSucceeded:
klog.Infof("Pod %s succeeded", name)
return true, nil
case corev1.PodFailed:
statuses := collectStatuses(p.Status, func(reason, message string) bool { return message != "" })
klog.Errorf("Pod %s failed with reason %q and message %q and status %s", name, p.Status.Reason, p.Status.Message, strings.Join(statuses, ","))
return false, fmt.Errorf("pod %s failed with reason %q and message %q and status %s", name, p.Status.Reason, p.Status.Message, strings.Join(statuses, ","))
default:
deleteJobs = append(deleteJobs, job)
klog.Errorf("Pod %s is with unexpected phase %s", name, phase)
return false, fmt.Errorf("pod %s is with unexpected phase %s", name, phase)
}
}
if len(deleteJobs) <= retain {
return nil
}

// waitForPodCompletion blocks until the named pod completes successfully,
// watching it through a field selector on metadata.name. It returns an
// error when the pod fails, the event stream yields an unexpected object,
// or ctx is cancelled.
func waitForPodCompletion(ctx context.Context, podListerWatcher PodListerWatcher, name string) error {
	fieldSelector := fields.OneTermEqualSelector("metadata.name", name).String()
	lw := &cache.ListWatch{
		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
			// Preserve the caller-supplied options and restrict the list
			// to the single pod we care about.
			options.FieldSelector = fieldSelector
			return podListerWatcher.List(ctx, options)
		},
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
			// Propagating options keeps the ResourceVersion set by the
			// watch machinery, so the watch resumes from the preceding
			// list instead of discarding it (the previous code dropped
			// options entirely).
			options.FieldSelector = fieldSelector
			return podListerWatcher.Watch(ctx, options)
		},
	}
	_, err := toolswatch.UntilWithSync(ctx, lw, &corev1.Pod{}, nil, podCompletionCheckFn(name))
	return err
}

// prunePods deletes the older, finished pods in the namespace.
func (r *payloadRetriever) prunePods(ctx context.Context) error {
Comment thread
hongkailiu marked this conversation as resolved.
var errs []error

// begin transitional job pruning, in case any dangled from earlier versions
jobs, err := r.kubeClient.BatchV1().Jobs(r.namespace).List(ctx, metav1.ListOptions{})
if err != nil {
errs = append(errs, err)
}

// Sort jobs by StartTime to determine the newest. nil StartTime is assumed newest.
sort.Slice(deleteJobs, func(i, j int) bool {
if deleteJobs[i].Status.StartTime == nil {
return false
for _, job := range jobs.Items {
if !strings.HasPrefix(job.Name, r.operatorName+"-") {
// Ignore jobs not beginning with operatorName
continue
}
if deleteJobs[j].Status.StartTime == nil {
return true
err := r.kubeClient.BatchV1().Jobs(r.namespace).Delete(ctx, job.Name, metav1.DeleteOptions{})
if err != nil {
errs = append(errs, errors.Wrapf(err, "failed to delete job %v", job.Name))
}
return deleteJobs[i].Status.StartTime.Before(deleteJobs[j].Status.StartTime)
}
// end transitional job pruning

pods, err := r.kubeClient.CoreV1().Pods(r.namespace).List(ctx, metav1.ListOptions{
LabelSelector: "k8s-app=retrieve-openshift-release",
})
if err != nil {
errs = append(errs, err)
}

var errs []error
for _, job := range deleteJobs[:len(deleteJobs)-retain] {
err := r.kubeClient.BatchV1().Jobs(r.namespace).Delete(ctx, job.Name, metav1.DeleteOptions{})
for _, pod := range pods.Items {
err := r.kubeClient.CoreV1().Pods(r.namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
if err != nil {
errs = append(errs, errors.Wrapf(err, "failed to delete job %v", job.Name))
errs = append(errs, errors.Wrapf(err, "failed to delete pod %v", pod.Name))
}
}

agg := utilerrors.NewAggregate(errs)
if agg != nil {
return fmt.Errorf("error deleting jobs: %v", agg.Error())
return fmt.Errorf("error deleting pods: %v", agg.Error())
}
return nil
}
Expand Down
Loading