Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 67 additions & 60 deletions pkg/cvo/updatepayload.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,19 @@ func (r *payloadRetriever) targetUpdatePayloadDir(ctx context.Context, update co
hash := md5.New()
hash.Write([]byte(update.Image))
payloadHash := base64.RawURLEncoding.EncodeToString(hash.Sum(nil))

tdir := filepath.Join(r.workingDir, payloadHash)
err := payload.ValidateDirectory(tdir)
if os.IsNotExist(err) {
// the dirs don't exist, try fetching the payload to tdir.
err = r.fetchUpdatePayloadToDir(ctx, tdir, update)

// Prune older jobs and directories while gracefully handling errors.
if err := r.pruneJobs(ctx, 0); err != nil {
klog.Warningf("failed to prune jobs: %v", err)
}
if err != nil {

if err := payload.ValidateDirectory(tdir); os.IsNotExist(err) {
// the dirs don't exist, try fetching the payload to tdir.
if err := r.fetchUpdatePayloadToDir(ctx, tdir, update); err != nil {
return "", err
}
} else if err != nil {
return "", err
}

Expand All @@ -156,16 +161,36 @@ func (r *payloadRetriever) targetUpdatePayloadDir(ctx context.Context, update co
func (r *payloadRetriever) fetchUpdatePayloadToDir(ctx context.Context, dir string, update configv1.Update) error {
var (
version = update.Version
payload = update.Image
image = update.Image
name = fmt.Sprintf("%s-%s-%s", r.operatorName, version, randutil.String(5))
namespace = r.namespace
deadline = pointer.Int64Ptr(2 * 60)
nodeSelectorKey = "node-role.kubernetes.io/master"
nodename = r.nodeName
cmd = []string{"/bin/sh"}
args = []string{"-c", copyPayloadCmd(dir)}
)

baseDir, targetName := filepath.Split(dir)
tmpDir := filepath.Join(baseDir, fmt.Sprintf("%s-%s", targetName, randutil.String(5)))

setContainerDefaults := func(container corev1.Container) corev1.Container {
container.Image = image
container.VolumeMounts = []corev1.VolumeMount{{
MountPath: targetUpdatePayloadsDir,
Name: "payloads",
}}
container.SecurityContext = &corev1.SecurityContext{
Privileged: pointer.BoolPtr(true),
}
container.Resources = corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("10m"),
corev1.ResourceMemory: resource.MustParse("50Mi"),
corev1.ResourceEphemeralStorage: resource.MustParse("2Mi"),
},
}
return container
}

job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Expand All @@ -175,26 +200,39 @@ func (r *payloadRetriever) fetchUpdatePayloadToDir(ctx context.Context, dir stri
ActiveDeadlineSeconds: deadline,
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{{
Name: "payload",
Image: payload,
Command: cmd,
Args: args,
VolumeMounts: []corev1.VolumeMount{{
MountPath: targetUpdatePayloadsDir,
Name: "payloads",
}},
SecurityContext: &corev1.SecurityContext{
Privileged: pointer.BoolPtr(true),
},
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("10m"),
corev1.ResourceMemory: resource.MustParse("50Mi"),
corev1.ResourceEphemeralStorage: resource.MustParse("2Mi"),
InitContainers: []corev1.Container{
setContainerDefaults(corev1.Container{
Name: "cleanup",
Command: []string{"sh", "-c", "rm -fR *"},
WorkingDir: baseDir,
}),
setContainerDefaults(corev1.Container{
Name: "make-temporary-directory",
Command: []string{"mkdir", tmpDir},
}),
setContainerDefaults(corev1.Container{
Name: "move-operator-manifests-to-temporary-directory",
Command: []string{
"mv",
filepath.Join(payload.DefaultPayloadDir, payload.CVOManifestDir),
filepath.Join(tmpDir, payload.CVOManifestDir),
},
},
}},
}),
setContainerDefaults(corev1.Container{
Name: "move-release-manifests-to-temporary-directory",
Command: []string{
"mv",
filepath.Join(payload.DefaultPayloadDir, payload.ReleaseManifestDir),
filepath.Join(tmpDir, payload.ReleaseManifestDir),
},
}),
},
Containers: []corev1.Container{
setContainerDefaults(corev1.Container{
Name: "rename-to-final-location",
Command: []string{"mv", tmpDir, dir},
}),
},
Volumes: []corev1.Volume{{
Name: "payloads",
VolumeSource: corev1.VolumeSource{
Expand All @@ -217,14 +255,7 @@ func (r *payloadRetriever) fetchUpdatePayloadToDir(ctx context.Context, dir stri
},
}

// Prune older jobs while gracefully handling errors.
err := r.pruneJobs(ctx, 2)
if err != nil {
klog.Warningf("failed to prune jobs: %v", err)
}

_, err = r.kubeClient.BatchV1().Jobs(job.Namespace).Create(ctx, job, metav1.CreateOptions{})
if err != nil {
if _, err := r.kubeClient.BatchV1().Jobs(job.Namespace).Create(ctx, job, metav1.CreateOptions{}); err != nil {
return err
}
return resourcebuilder.WaitForJobCompletion(ctx, r.kubeClient.BatchV1(), job)
Expand All @@ -248,12 +279,6 @@ func (r *payloadRetriever) pruneJobs(ctx context.Context, retain int) error {
// Ignore jobs not beginning with operatorName
case !strings.HasPrefix(job.Name, r.operatorName+"-"):
break
// Ignore jobs that have not yet started
case job.Status.StartTime == nil:
break
// Ignore jobs that are still active
case job.Status.Active == 1:
break
default:
deleteJobs = append(deleteJobs, job)
}
Expand Down Expand Up @@ -287,24 +312,6 @@ func (r *payloadRetriever) pruneJobs(ctx context.Context, retain int) error {
return nil
}

// copyPayloadCmd returns a shell command that copies CVO and release manifests from the default location
// to the target dir.
// It is made up of 2 commands:
// `mkdir -p <target dir> && mv <default cvo manifest dir> <target cvo manifests dir>`
// `mkdir -p <target dir> && mv <default release manifest dir> <target release manifests dir>`
func copyPayloadCmd(tdir string) string {
var (
fromCVOPath = filepath.Join(payload.DefaultPayloadDir, payload.CVOManifestDir)
toCVOPath = filepath.Join(tdir, payload.CVOManifestDir)
cvoCmd = fmt.Sprintf("mkdir -p %s && mv %s %s", tdir, fromCVOPath, toCVOPath)

fromReleasePath = filepath.Join(payload.DefaultPayloadDir, payload.ReleaseManifestDir)
toReleasePath = filepath.Join(tdir, payload.ReleaseManifestDir)
releaseCmd = fmt.Sprintf("mkdir -p %s && mv %s %s", tdir, fromReleasePath, toReleasePath)
)
return fmt.Sprintf("%s && %s", cvoCmd, releaseCmd)
}

// findUpdateFromConfig identifies a desired update from user input or returns false. It will
// resolve payload if the user specifies a version and a matching available update.
func findUpdateFromConfig(config *configv1.ClusterVersion) (configv1.Update, bool) {
Expand Down