diff --git a/cmd/memberagent/main.go b/cmd/memberagent/main.go index 39d79875a..7101a5d8d 100644 --- a/cmd/memberagent/main.go +++ b/cmd/memberagent/main.go @@ -26,10 +26,10 @@ import ( "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/metrics" workv1alpha1 "sigs.k8s.io/work-api/pkg/apis/v1alpha1" - workcontrollers "sigs.k8s.io/work-api/pkg/controllers" fleetv1alpha1 "go.goms.io/fleet/apis/v1alpha1" "go.goms.io/fleet/pkg/controllers/internalmembercluster" + workapi "go.goms.io/fleet/pkg/controllers/work" fleetmetrics "go.goms.io/fleet/pkg/metrics" "go.goms.io/fleet/pkg/utils" //+kubebuilder:scaffold:imports @@ -200,7 +200,7 @@ func Start(ctx context.Context, hubCfg, memberConfig *rest.Config, hubOpts, memb } // create the work controller, so we can pass it to the internal member cluster reconciler - workController := workcontrollers.NewApplyWorkReconciler( + workController := workapi.NewApplyWorkReconciler( hubMgr.GetClient(), spokeDynamicClient, memberMgr.GetClient(), diff --git a/go.mod b/go.mod index 6710be27e..6237a3721 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/azcore v0.21.0 github.com/Azure/azure-sdk-for-go/sdk/azidentity v0.13.0 github.com/crossplane/crossplane-runtime v0.16.0 + github.com/go-logr/logr v1.2.0 github.com/google/go-cmp v0.5.8 github.com/onsi/ginkgo/v2 v2.1.4 github.com/onsi/gomega v1.19.0 @@ -40,7 +41,6 @@ require ( github.com/emicklei/go-restful v2.9.5+incompatible // indirect github.com/evanphx/json-patch v4.12.0+incompatible // indirect github.com/fsnotify/fsnotify v1.5.1 // indirect - github.com/go-logr/logr v1.2.0 // indirect github.com/go-logr/zapr v1.2.0 // indirect github.com/go-openapi/jsonpointer v0.19.5 // indirect github.com/go-openapi/jsonreference v0.19.6 // indirect diff --git a/pkg/controllers/clusterresourceplacement/work_propagation.go b/pkg/controllers/clusterresourceplacement/work_propagation.go index 15c994446..ad4fc4a09 100644 --- a/pkg/controllers/clusterresourceplacement/work_propagation.go +++ b/pkg/controllers/clusterresourceplacement/work_propagation.go @@ -24,9 +24,9 @@ import ( "k8s.io/utils/pointer" "sigs.k8s.io/controller-runtime/pkg/client" workv1alpha1 "sigs.k8s.io/work-api/pkg/apis/v1alpha1" - workController "sigs.k8s.io/work-api/pkg/controllers" fleetv1alpha1 "go.goms.io/fleet/apis/v1alpha1" + workapi "go.goms.io/fleet/pkg/controllers/work" "go.goms.io/fleet/pkg/utils" ) @@ -174,7 +174,7 @@ func (r *Reconciler) collectAllManifestsStatus(placement *fleetv1alpha1.ClusterR return false, errors.Wrap(err, fmt.Sprintf("failed to get the work obj %s from namespace %s", workName, memberClusterNsName)) } // check the overall condition - appliedCond := meta.FindStatusCondition(work.Status.Conditions, workController.ConditionTypeApplied) + appliedCond := meta.FindStatusCondition(work.Status.Conditions, workapi.ConditionTypeApplied) if appliedCond == nil { hasPending = true klog.V(4).InfoS("the work is never picked up by the member cluster", @@ -201,7 +201,7 @@ func (r *Reconciler) collectAllManifestsStatus(placement *fleetv1alpha1.ClusterR Name: manifestCondition.Identifier.Name, Namespace: manifestCondition.Identifier.Namespace, } - appliedCond = meta.FindStatusCondition(manifestCondition.Conditions, workController.ConditionTypeApplied) + appliedCond = meta.FindStatusCondition(manifestCondition.Conditions, workapi.ConditionTypeApplied) // collect if there is an explicit fail if appliedCond != nil && appliedCond.Status != metav1.ConditionTrue { 
klog.V(3).InfoS("find a failed to apply manifest", "member cluster namespace", memberClusterNsName, diff --git a/pkg/controllers/internalmembercluster/member_controller.go b/pkg/controllers/internalmembercluster/member_controller.go index 87d8a0d91..4c8f8db19 100644 --- a/pkg/controllers/internalmembercluster/member_controller.go +++ b/pkg/controllers/internalmembercluster/member_controller.go @@ -21,10 +21,10 @@ import ( "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/predicate" - workcontrollers "sigs.k8s.io/work-api/pkg/controllers" "go.goms.io/fleet/apis" fleetv1alpha1 "go.goms.io/fleet/apis/v1alpha1" + workapi "go.goms.io/fleet/pkg/controllers/work" "go.goms.io/fleet/pkg/metrics" ) @@ -36,7 +36,7 @@ type Reconciler struct { // the join/leave agent maintains the list of controllers in the member cluster // so that it can make sure that all the agents on the member cluster have joined/left // before updating the internal member cluster CR status - workController *workcontrollers.ApplyWorkReconciler + workController *workapi.ApplyWorkReconciler recorder record.EventRecorder } @@ -51,7 +51,7 @@ const ( ) // NewReconciler creates a new reconciler for the internalMemberCluster CR -func NewReconciler(hubClient client.Client, memberClient client.Client, workController *workcontrollers.ApplyWorkReconciler) *Reconciler { +func NewReconciler(hubClient client.Client, memberClient client.Client, workController *workapi.ApplyWorkReconciler) *Reconciler { return &Reconciler{ hubClient: hubClient, memberClient: memberClient, diff --git a/pkg/controllers/internalmembercluster/member_controller_integration_test.go b/pkg/controllers/internalmembercluster/member_controller_integration_test.go index 766a9b755..afd2ee03c 100644 --- a/pkg/controllers/internalmembercluster/member_controller_integration_test.go +++ b/pkg/controllers/internalmembercluster/member_controller_integration_test.go @@ -15,9 +15,9 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" - workcontrollers "sigs.k8s.io/work-api/pkg/controllers" "go.goms.io/fleet/apis/v1alpha1" + workapi "go.goms.io/fleet/pkg/controllers/work" "go.goms.io/fleet/pkg/utils" ) @@ -58,7 +58,7 @@ var _ = Describe("Test Internal Member Cluster Controller", func() { } By("create the internalMemberCluster reconciler") - workController := workcontrollers.NewApplyWorkReconciler( + workController := workapi.NewApplyWorkReconciler( k8sClient, nil, k8sClient, nil, nil, 5, memberClusterNamespace) r = NewReconciler(k8sClient, k8sClient, workController) err := r.SetupWithManager(mgr) diff --git a/pkg/controllers/work/applied_work_syncer.go b/pkg/controllers/work/applied_work_syncer.go new file mode 100644 index 000000000..416afae0b --- /dev/null +++ b/pkg/controllers/work/applied_work_syncer.go @@ -0,0 +1,165 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package controllers + +import ( + "context" + "fmt" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/klog/v2" + + workapi "sigs.k8s.io/work-api/pkg/apis/v1alpha1" +) + +// generateDiff check the difference between what is supposed to be applied (tracked by the work CR status) +// and what was applied in the member cluster (tracked by the appliedWork CR). +// What is in the `appliedWork` but not in the `work` should be deleted from the member cluster +// What is in the `work` but not in the `appliedWork` should be added to the appliedWork status +func (r *ApplyWorkReconciler) generateDiff(ctx context.Context, work *workapi.Work, appliedWork *workapi.AppliedWork) ([]workapi.AppliedResourceMeta, []workapi.AppliedResourceMeta, error) { + var staleRes, newRes []workapi.AppliedResourceMeta + // for every resource applied in cluster, check if it's still in the work's manifest condition + // we keep the applied resource in the appliedWork status even if it is not applied successfully + // to make sure that it is safe to delete the resource from the member cluster. + for _, resourceMeta := range appliedWork.Status.AppliedResources { + resStillExist := false + for _, manifestCond := range work.Status.ManifestConditions { + if isSameResourceIdentifier(resourceMeta.ResourceIdentifier, manifestCond.Identifier) { + resStillExist = true + break + } + } + if !resStillExist { + klog.V(2).InfoS("find an orphaned resource in the member cluster", + "parent resource", work.GetName(), "orphaned resource", resourceMeta.ResourceIdentifier) + staleRes = append(staleRes, resourceMeta) + } + } + // add every resource in the work's manifest condition that is applied successfully back to the appliedWork status + for _, manifestCond := range work.Status.ManifestConditions { + ac := meta.FindStatusCondition(manifestCond.Conditions, ConditionTypeApplied) + if ac == nil { + // should not happen + klog.ErrorS(fmt.Errorf("resource is missing applied condition"), "applied condition missing", "resource", manifestCond.Identifier) + continue + } + // we only add the applied one to the appliedWork status + if ac.Status == metav1.ConditionTrue { + resRecorded := false + // we update the identifier + // TODO: this UID may not be the current one if the resource is deleted and recreated + for _, resourceMeta := range appliedWork.Status.AppliedResources { + if isSameResourceIdentifier(resourceMeta.ResourceIdentifier, manifestCond.Identifier) { + resRecorded = true + newRes = append(newRes, workapi.AppliedResourceMeta{ + ResourceIdentifier: manifestCond.Identifier, + UID: resourceMeta.UID, + }) + break + } + } + if !resRecorded { + klog.V(2).InfoS("discovered a new manifest resource", + "parent Work", work.GetName(), "manifest", manifestCond.Identifier) + obj, err := r.spokeDynamicClient.Resource(schema.GroupVersionResource{ + Group: manifestCond.Identifier.Group, + Version: manifestCond.Identifier.Version, + Resource: manifestCond.Identifier.Resource, + }).Namespace(manifestCond.Identifier.Namespace).Get(ctx, manifestCond.Identifier.Name, metav1.GetOptions{}) + switch { + case apierrors.IsNotFound(err): + klog.V(2).InfoS("the new manifest resource is already deleted", "parent Work", work.GetName(), "manifest", manifestCond.Identifier) + continue + case err != nil: + klog.ErrorS(err, "failed to retrieve the manifest", "parent Work", 
work.GetName(), "manifest", manifestCond.Identifier) + return nil, nil, err + } + newRes = append(newRes, workapi.AppliedResourceMeta{ + ResourceIdentifier: manifestCond.Identifier, + UID: obj.GetUID(), + }) + } + } + } + return newRes, staleRes, nil +} + +func (r *ApplyWorkReconciler) deleteStaleManifest(ctx context.Context, staleManifests []workapi.AppliedResourceMeta, owner metav1.OwnerReference) error { + var errs []error + + for _, staleManifest := range staleManifests { + gvr := schema.GroupVersionResource{ + Group: staleManifest.Group, + Version: staleManifest.Version, + Resource: staleManifest.Resource, + } + uObj, err := r.spokeDynamicClient.Resource(gvr).Namespace(staleManifest.Namespace). + Get(ctx, staleManifest.Name, metav1.GetOptions{}) + if err != nil { + // It is possible that the staled manifest was already deleted but the status wasn't updated to reflect that yet. + if apierrors.IsNotFound(err) { + klog.V(2).InfoS("the staled manifest already deleted", "manifest", staleManifest, "owner", owner) + continue + } + klog.ErrorS(err, "failed to get the staled manifest", "manifest", staleManifest, "owner", owner) + errs = append(errs, err) + continue + } + existingOwners := uObj.GetOwnerReferences() + newOwners := make([]metav1.OwnerReference, 0) + found := false + for index, r := range existingOwners { + if isReferSameObject(r, owner) { + found = true + newOwners = append(newOwners, existingOwners[:index]...) + newOwners = append(newOwners, existingOwners[index+1:]...) + } + } + if !found { + klog.V(2).InfoS("the stale manifest is not owned by this work, skip", "manifest", staleManifest, "owner", owner) + continue + } + if len(newOwners) == 0 { + klog.V(2).InfoS("delete the staled manifest", "manifest", staleManifest, "owner", owner) + err = r.spokeDynamicClient.Resource(gvr).Namespace(staleManifest.Namespace). + Delete(ctx, staleManifest.Name, metav1.DeleteOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + klog.ErrorS(err, "failed to delete the staled manifest", "manifest", staleManifest, "owner", owner) + errs = append(errs, err) + } + } else { + klog.V(2).InfoS("remove the owner reference from the staled manifest", "manifest", staleManifest, "owner", owner) + uObj.SetOwnerReferences(newOwners) + _, err = r.spokeDynamicClient.Resource(gvr).Namespace(staleManifest.Namespace).Update(ctx, uObj, metav1.UpdateOptions{FieldManager: workFieldManagerName}) + if err != nil { + klog.ErrorS(err, "failed to remove the owner reference from manifest", "manifest", staleManifest, "owner", owner) + errs = append(errs, err) + } + } + } + return utilerrors.NewAggregate(errs) +} + +// isSameResourceIdentifier returns true if a and b identifies the same object. +func isSameResourceIdentifier(a, b workapi.ResourceIdentifier) bool { + // compare GVKNN but ignore the Ordinal and Resource + return a.Group == b.Group && a.Version == b.Version && a.Kind == b.Kind && a.Namespace == b.Namespace && a.Name == b.Name +} diff --git a/pkg/controllers/work/applied_work_syncer_integration_test.go b/pkg/controllers/work/applied_work_syncer_integration_test.go new file mode 100644 index 000000000..fa108273c --- /dev/null +++ b/pkg/controllers/work/applied_work_syncer_integration_test.go @@ -0,0 +1,322 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllers + +import ( + "context" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + kruisev1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + utilrand "k8s.io/apimachinery/pkg/util/rand" + + workv1alpha1 "sigs.k8s.io/work-api/pkg/apis/v1alpha1" +) + +var _ = Describe("Work Status Reconciler", func() { + var resourceNamespace string + var workNamespace string + var work *workv1alpha1.Work + var cm, cm2 *corev1.ConfigMap + + var wns corev1.Namespace + var rns corev1.Namespace + + BeforeEach(func() { + workNamespace = "cluster-" + utilrand.String(5) + resourceNamespace = utilrand.String(5) + + wns = corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: workNamespace, + }, + } + err := k8sClient.Create(context.Background(), &wns) + Expect(err).ToNot(HaveOccurred()) + + rns = corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: resourceNamespace, + }, + } + err = k8sClient.Create(context.Background(), &rns) + Expect(err).ToNot(HaveOccurred()) + + // Create the Work object with some type of Manifest resource. + cm = &corev1.ConfigMap{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "ConfigMap", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "configmap-" + utilrand.String(5), + Namespace: resourceNamespace, + }, + Data: map[string]string{ + "test": "test", + }, + } + cm2 = &corev1.ConfigMap{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "ConfigMap", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "configmap2-" + utilrand.String(5), + Namespace: resourceNamespace, + }, + Data: map[string]string{ + "test": "test", + }, + } + + By("Create work that contains two configMaps") + work = &workv1alpha1.Work{ + ObjectMeta: metav1.ObjectMeta{ + Name: "work-" + utilrand.String(5), + Namespace: workNamespace, + }, + Spec: workv1alpha1.WorkSpec{ + Workload: workv1alpha1.WorkloadTemplate{ + Manifests: []workv1alpha1.Manifest{ + { + RawExtension: runtime.RawExtension{Object: cm}, + }, + { + RawExtension: runtime.RawExtension{Object: cm2}, + }, + }, + }, + }, + } + }) + + AfterEach(func() { + // TODO: Ensure that all resources are being deleted. 
+ Expect(k8sClient.Delete(context.Background(), work)).Should(Succeed()) + Expect(k8sClient.Delete(context.Background(), &wns)).Should(Succeed()) + Expect(k8sClient.Delete(context.Background(), &rns)).Should(Succeed()) + }) + + It("Should delete the manifest from the member cluster after it is removed from work", func() { + By("Apply the work") + Expect(k8sClient.Create(context.Background(), work)).ToNot(HaveOccurred()) + + By("Make sure that the work is applied") + currentWork := waitForWorkToApply(work.Name, workNamespace) + var appliedWork workv1alpha1.AppliedWork + Expect(k8sClient.Get(context.Background(), types.NamespacedName{Name: work.Name}, &appliedWork)).Should(Succeed()) + Expect(len(appliedWork.Status.AppliedResources)).Should(Equal(2)) + + By("Remove configMap 2 from the work") + currentWork.Spec.Workload.Manifests = []workv1alpha1.Manifest{ + { + RawExtension: runtime.RawExtension{Object: cm}, + }, + } + Expect(k8sClient.Update(context.Background(), currentWork)).Should(Succeed()) + + By("Verify that the resource is removed from the cluster") + Eventually(func() bool { + var configMap corev1.ConfigMap + return apierrors.IsNotFound(k8sClient.Get(context.Background(), types.NamespacedName{Name: cm2.Name, Namespace: resourceNamespace}, &configMap)) + }, timeout, interval).Should(BeTrue()) + + By("Verify that the appliedWork status is correct") + Eventually(func() bool { + Expect(k8sClient.Get(context.Background(), types.NamespacedName{Name: work.Name}, &appliedWork)).Should(Succeed()) + return len(appliedWork.Status.AppliedResources) == 1 + }, timeout, interval).Should(BeTrue()) + Expect(appliedWork.Status.AppliedResources[0].Name).Should(Equal(cm.GetName())) + Expect(appliedWork.Status.AppliedResources[0].Namespace).Should(Equal(cm.GetNamespace())) + Expect(appliedWork.Status.AppliedResources[0].Version).Should(Equal(cm.GetObjectKind().GroupVersionKind().Version)) + Expect(appliedWork.Status.AppliedResources[0].Group).Should(Equal(cm.GetObjectKind().GroupVersionKind().Group)) + Expect(appliedWork.Status.AppliedResources[0].Kind).Should(Equal(cm.GetObjectKind().GroupVersionKind().Kind)) + }) + + It("Should delete the shared manifest from the member cluster after it is removed from all works", func() { + By("Create another work that contains configMap 2") + work2 := work.DeepCopy() + work2.Name = "work-" + utilrand.String(5) + Expect(k8sClient.Create(context.Background(), work)).ToNot(HaveOccurred()) + Expect(k8sClient.Create(context.Background(), work2)).ToNot(HaveOccurred()) + + By("Make sure that the appliedWork is updated") + var appliedWork, appliedWork2 workv1alpha1.AppliedWork + Eventually(func() bool { + err := k8sClient.Get(context.Background(), types.NamespacedName{Name: work.Name}, &appliedWork) + if err != nil { + return false + } + return len(appliedWork.Status.AppliedResources) == 2 + }, timeout, interval).Should(BeTrue()) + + By("Make sure that the appliedWork2 is updated") + Eventually(func() bool { + err := k8sClient.Get(context.Background(), types.NamespacedName{Name: work2.Name}, &appliedWork2) + if err != nil { + return false + } + return len(appliedWork2.Status.AppliedResources) == 2 + }, timeout, interval).Should(BeTrue()) + + By("Remove configMap 2 from the work") + currentWork := waitForWorkToApply(work.Name, workNamespace) + currentWork.Spec.Workload.Manifests = []workv1alpha1.Manifest{ + { + RawExtension: runtime.RawExtension{Object: cm}, + }, + } + Expect(k8sClient.Update(context.Background(), currentWork)).Should(Succeed()) + currentWork = 
waitForWorkToApply(work.Name, workNamespace) + Expect(len(currentWork.Status.ManifestConditions)).Should(Equal(1)) + + By("Verify that configMap 2 is removed from the appliedWork") + Eventually(func() bool { + Expect(k8sClient.Get(context.Background(), types.NamespacedName{Name: work.Name}, &appliedWork)).Should(Succeed()) + return len(appliedWork.Status.AppliedResources) == 1 + }, timeout, interval).Should(BeTrue()) + + By("Verify that configMap 2 is not removed from the cluster") + Consistently(func() bool { + var configMap corev1.ConfigMap + return k8sClient.Get(context.Background(), types.NamespacedName{Name: cm2.Name, Namespace: resourceNamespace}, &configMap) == nil + }, timeout, interval).Should(BeTrue()) + + By("Remove configMap 2 from the work2") + currentWork = waitForWorkToApply(work2.Name, workNamespace) + currentWork.Spec.Workload.Manifests = []workv1alpha1.Manifest{ + { + RawExtension: runtime.RawExtension{Object: cm}, + }, + } + Expect(k8sClient.Update(context.Background(), currentWork)).Should(Succeed()) + currentWork = waitForWorkToApply(work.Name, workNamespace) + Expect(len(currentWork.Status.ManifestConditions)).Should(Equal(1)) + + By("Verify that the resource is removed from the appliedWork") + Eventually(func() bool { + Expect(k8sClient.Get(context.Background(), types.NamespacedName{Name: work2.Name}, &appliedWork2)).Should(Succeed()) + return len(appliedWork2.Status.AppliedResources) == 1 + }, timeout, interval).Should(BeTrue()) + + By("Verify that the cm2 is removed from the cluster") + Eventually(func() bool { + var configMap corev1.ConfigMap + return apierrors.IsNotFound(k8sClient.Get(context.Background(), types.NamespacedName{Name: cm2.Name, Namespace: resourceNamespace}, &configMap)) + }, timeout, interval).Should(BeTrue()) + }) + + It("Should delete the manifest from the member cluster even if there is apply failure", func() { + By("Apply the work") + Expect(k8sClient.Create(context.Background(), work)).ToNot(HaveOccurred()) + + By("Make sure that the work is applied") + currentWork := waitForWorkToApply(work.Name, workNamespace) + var appliedWork workv1alpha1.AppliedWork + Expect(k8sClient.Get(context.Background(), types.NamespacedName{Name: work.Name}, &appliedWork)).Should(Succeed()) + Expect(len(appliedWork.Status.AppliedResources)).Should(Equal(2)) + + By("replace configMap with a bad object from the work") + broadcastJob := &kruisev1alpha1.BroadcastJob{ + TypeMeta: metav1.TypeMeta{ + APIVersion: kruisev1alpha1.SchemeGroupVersion.String(), + Kind: "BroadcastJob", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "broadcastjob-" + utilrand.String(5), + Namespace: workNamespace, + }, + Spec: kruisev1alpha1.BroadcastJobSpec{ + Paused: true, + }, + } + currentWork.Spec.Workload.Manifests = []workv1alpha1.Manifest{ + { + RawExtension: runtime.RawExtension{Object: broadcastJob}, + }, + } + Expect(k8sClient.Update(context.Background(), currentWork)).Should(Succeed()) + + By("Verify that the configMaps are removed from the cluster even if the new resource didn't apply") + Eventually(func() bool { + var configMap corev1.ConfigMap + return apierrors.IsNotFound(k8sClient.Get(context.Background(), types.NamespacedName{Name: cm.Name, Namespace: resourceNamespace}, &configMap)) + }, timeout, interval).Should(BeTrue()) + + Eventually(func() bool { + var configMap corev1.ConfigMap + return apierrors.IsNotFound(k8sClient.Get(context.Background(), types.NamespacedName{Name: cm2.Name, Namespace: resourceNamespace}, &configMap)) + }, timeout, interval).Should(BeTrue()) + + 
By("Verify that the appliedWork status is correct") + Eventually(func() bool { + Expect(k8sClient.Get(context.Background(), types.NamespacedName{Name: work.Name}, &appliedWork)).Should(Succeed()) + return len(appliedWork.Status.AppliedResources) == 0 + }, timeout, interval).Should(BeTrue()) + }) + + It("Test the order of the manifest in the work alone does not trigger any operation in the member cluster", func() { + By("Apply the work") + Expect(k8sClient.Create(context.Background(), work)).ToNot(HaveOccurred()) + + By("Make sure that the work is applied") + currentWork := waitForWorkToApply(work.Name, workNamespace) + var appliedWork workv1alpha1.AppliedWork + Expect(k8sClient.Get(context.Background(), types.NamespacedName{Name: work.Name}, &appliedWork)).Should(Succeed()) + Expect(len(appliedWork.Status.AppliedResources)).Should(Equal(2)) + + By("Make sure that the manifests exist on the member cluster") + Eventually(func() bool { + var configMap corev1.ConfigMap + return k8sClient.Get(context.Background(), types.NamespacedName{Name: cm2.Name, Namespace: resourceNamespace}, &configMap) == nil && + k8sClient.Get(context.Background(), types.NamespacedName{Name: cm.Name, Namespace: resourceNamespace}, &configMap) == nil + }, timeout, interval).Should(BeTrue()) + + By("Change the order of the two configs in the work") + currentWork.Spec.Workload.Manifests = []workv1alpha1.Manifest{ + { + RawExtension: runtime.RawExtension{Object: cm2}, + }, + { + RawExtension: runtime.RawExtension{Object: cm}, + }, + } + Expect(k8sClient.Update(context.Background(), currentWork)).Should(Succeed()) + + By("Verify that nothing is removed from the cluster") + Consistently(func() bool { + var configMap corev1.ConfigMap + return k8sClient.Get(context.Background(), types.NamespacedName{Name: cm2.Name, Namespace: resourceNamespace}, &configMap) == nil && + k8sClient.Get(context.Background(), types.NamespacedName{Name: cm.Name, Namespace: resourceNamespace}, &configMap) == nil + }, timeout, time.Millisecond*25).Should(BeTrue()) + + By("Verify that the appliedWork status is correct") + Eventually(func() bool { + Expect(k8sClient.Get(context.Background(), types.NamespacedName{Name: work.Name}, &appliedWork)).Should(Succeed()) + return len(appliedWork.Status.AppliedResources) == 2 + }, timeout, interval).Should(BeTrue()) + Expect(appliedWork.Status.AppliedResources[0].Name).Should(Equal(cm2.GetName())) + Expect(appliedWork.Status.AppliedResources[1].Name).Should(Equal(cm.GetName())) + }) +}) diff --git a/pkg/controllers/work/applied_work_syncer_test.go b/pkg/controllers/work/applied_work_syncer_test.go new file mode 100644 index 000000000..33207d3ce --- /dev/null +++ b/pkg/controllers/work/applied_work_syncer_test.go @@ -0,0 +1,360 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package controllers + +import ( + "context" + "fmt" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/rand" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/fake" + testingclient "k8s.io/client-go/testing" + + "sigs.k8s.io/work-api/pkg/apis/v1alpha1" +) + +// TestCalculateNewAppliedWork validates the calculation logic between the Work & AppliedWork resources. +// The result of the tests pass back a collection of resources that should either +// be applied to the member cluster or removed. +func TestCalculateNewAppliedWork(t *testing.T) { + workIdentifier := generateResourceIdentifier() + diffOrdinalIdentifier := workIdentifier + diffOrdinalIdentifier.Ordinal = rand.Int() + tests := map[string]struct { + spokeDynamicClient dynamic.Interface + inputWork v1alpha1.Work + inputAppliedWork v1alpha1.AppliedWork + expectedNewRes []v1alpha1.AppliedResourceMeta + expectedStaleRes []v1alpha1.AppliedResourceMeta + hasErr bool + }{ + "Test work and appliedWork in sync with no manifest applied": { + spokeDynamicClient: nil, + inputWork: generateWorkObj(nil), + inputAppliedWork: generateAppliedWorkObj(nil), + expectedNewRes: []v1alpha1.AppliedResourceMeta(nil), + expectedStaleRes: []v1alpha1.AppliedResourceMeta(nil), + hasErr: false, + }, + "Test work and appliedWork in sync with one manifest applied": { + spokeDynamicClient: nil, + inputWork: generateWorkObj(&workIdentifier), + inputAppliedWork: generateAppliedWorkObj(&workIdentifier), + expectedNewRes: []v1alpha1.AppliedResourceMeta{ + { + ResourceIdentifier: workIdentifier, + }, + }, + expectedStaleRes: []v1alpha1.AppliedResourceMeta(nil), + hasErr: false, + }, + "Test work and appliedWork has the same resource but with different ordinal": { + spokeDynamicClient: nil, + inputWork: generateWorkObj(&workIdentifier), + inputAppliedWork: generateAppliedWorkObj(&diffOrdinalIdentifier), + expectedNewRes: []v1alpha1.AppliedResourceMeta{ + { + ResourceIdentifier: workIdentifier, + }, + }, + expectedStaleRes: []v1alpha1.AppliedResourceMeta(nil), + hasErr: false, + }, + "Test work is missing one manifest": { + spokeDynamicClient: nil, + inputWork: generateWorkObj(nil), + inputAppliedWork: generateAppliedWorkObj(&workIdentifier), + expectedNewRes: []v1alpha1.AppliedResourceMeta(nil), + expectedStaleRes: []v1alpha1.AppliedResourceMeta{ + { + ResourceIdentifier: workIdentifier, + }, + }, + hasErr: false, + }, + "Test work has more manifest but not applied": { + spokeDynamicClient: nil, + inputWork: func() v1alpha1.Work { + return v1alpha1.Work{ + Status: v1alpha1.WorkStatus{ + ManifestConditions: []v1alpha1.ManifestCondition{ + { + Identifier: workIdentifier, + Conditions: []metav1.Condition{ + { + Type: ConditionTypeApplied, + Status: metav1.ConditionFalse, + }, + }, + }, + }, + }, + } + }(), + inputAppliedWork: generateAppliedWorkObj(nil), + expectedNewRes: []v1alpha1.AppliedResourceMeta(nil), + expectedStaleRes: []v1alpha1.AppliedResourceMeta(nil), + hasErr: false, + }, + "Test work is adding one manifest, happy case": { + spokeDynamicClient: func() *fake.FakeDynamicClient { + uObj := unstructured.Unstructured{} + uObj.SetUID(types.UID(rand.String(10))) + dynamicClient := 
fake.NewSimpleDynamicClient(runtime.NewScheme()) + dynamicClient.PrependReactor("get", "*", func(action testingclient.Action) (handled bool, ret runtime.Object, err error) { + return true, uObj.DeepCopy(), nil + }) + return dynamicClient + }(), + inputWork: generateWorkObj(&workIdentifier), + inputAppliedWork: generateAppliedWorkObj(nil), + expectedNewRes: []v1alpha1.AppliedResourceMeta{ + { + ResourceIdentifier: workIdentifier, + }, + }, + expectedStaleRes: []v1alpha1.AppliedResourceMeta(nil), + hasErr: false, + }, + "Test work is adding one manifest but not found on the member cluster": { + spokeDynamicClient: func() *fake.FakeDynamicClient { + dynamicClient := fake.NewSimpleDynamicClient(runtime.NewScheme()) + dynamicClient.PrependReactor("get", "*", func(action testingclient.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, &apierrors.StatusError{ + ErrStatus: metav1.Status{ + Status: metav1.StatusFailure, + Reason: metav1.StatusReasonNotFound, + }} + }) + return dynamicClient + }(), + inputWork: generateWorkObj(&workIdentifier), + inputAppliedWork: generateAppliedWorkObj(nil), + expectedNewRes: []v1alpha1.AppliedResourceMeta(nil), + expectedStaleRes: []v1alpha1.AppliedResourceMeta(nil), + hasErr: false, + }, + "Test work is adding one manifest but failed to get it on the member cluster": { + spokeDynamicClient: func() *fake.FakeDynamicClient { + dynamicClient := fake.NewSimpleDynamicClient(runtime.NewScheme()) + dynamicClient.PrependReactor("get", "*", func(action testingclient.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, fmt.Errorf("get failed") + }) + return dynamicClient + }(), + inputWork: generateWorkObj(&workIdentifier), + inputAppliedWork: generateAppliedWorkObj(nil), + expectedNewRes: nil, + expectedStaleRes: nil, + hasErr: true, + }, + } + for testName, tt := range tests { + t.Run(testName, func(t *testing.T) { + r := &ApplyWorkReconciler{ + spokeDynamicClient: tt.spokeDynamicClient, + } + newRes, staleRes, err := r.generateDiff(context.Background(), &tt.inputWork, &tt.inputAppliedWork) + if len(tt.expectedNewRes) != len(newRes) { + t.Errorf("Testcase %s: get newRes contains different number of elements than the expected newRes.", testName) + } + for i := 0; i < len(newRes); i++ { + diff := cmp.Diff(tt.expectedNewRes[i].ResourceIdentifier, newRes[i].ResourceIdentifier) + if len(diff) != 0 { + t.Errorf("Testcase %s: get newRes is different from the expected newRes, diff = %s", testName, diff) + } + } + if len(tt.expectedStaleRes) != len(staleRes) { + t.Errorf("Testcase %s: get staleRes contains different number of elements than the expected staleRes.", testName) + } + for i := 0; i < len(staleRes); i++ { + diff := cmp.Diff(tt.expectedStaleRes[i].ResourceIdentifier, staleRes[i].ResourceIdentifier) + if len(diff) != 0 { + t.Errorf("Testcase %s: get staleRes is different from the expected staleRes, diff = %s", testName, diff) + } + } + if tt.hasErr { + assert.Truef(t, err != nil, "Testcase %s: Should get an err.", testName) + } + }) + } +} + +func TestDeleteStaleManifest(t *testing.T) { + tests := map[string]struct { + spokeDynamicClient dynamic.Interface + staleManifests []v1alpha1.AppliedResourceMeta + owner metav1.OwnerReference + wantErr error + }{ + "test staled manifests already deleted": { + spokeDynamicClient: func() *fake.FakeDynamicClient { + dynamicClient := fake.NewSimpleDynamicClient(runtime.NewScheme()) + dynamicClient.PrependReactor("get", "*", func(action testingclient.Action) (handled bool, ret 
runtime.Object, err error) { + return true, nil, &apierrors.StatusError{ + ErrStatus: metav1.Status{ + Status: metav1.StatusFailure, + Reason: metav1.StatusReasonNotFound, + }} + }) + return dynamicClient + }(), + staleManifests: []v1alpha1.AppliedResourceMeta{ + { + ResourceIdentifier: v1alpha1.ResourceIdentifier{ + Name: "does not matter 1", + }, + }, + { + ResourceIdentifier: v1alpha1.ResourceIdentifier{ + Name: "does not matter 2", + }, + }, + }, + owner: metav1.OwnerReference{ + APIVersion: "does not matter", + }, + wantErr: nil, + }, + "test failed to get staled manifest": { + spokeDynamicClient: func() *fake.FakeDynamicClient { + dynamicClient := fake.NewSimpleDynamicClient(runtime.NewScheme()) + dynamicClient.PrependReactor("get", "*", func(action testingclient.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, fmt.Errorf("get failed") + }) + return dynamicClient + }(), + staleManifests: []v1alpha1.AppliedResourceMeta{ + { + ResourceIdentifier: v1alpha1.ResourceIdentifier{ + Name: "does not matter", + }, + }, + }, + owner: metav1.OwnerReference{ + APIVersion: "does not matter", + }, + wantErr: utilerrors.NewAggregate([]error{fmt.Errorf("get failed")}), + }, + "test not remove a staled manifest that work does not own": { + spokeDynamicClient: func() *fake.FakeDynamicClient { + uObj := unstructured.Unstructured{} + uObj.SetOwnerReferences([]metav1.OwnerReference{ + { + APIVersion: "not owned by work", + }, + }) + dynamicClient := fake.NewSimpleDynamicClient(runtime.NewScheme()) + dynamicClient.PrependReactor("get", "*", func(action testingclient.Action) (handled bool, ret runtime.Object, err error) { + return true, uObj.DeepCopy(), nil + }) + dynamicClient.PrependReactor("delete", "*", func(action testingclient.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, fmt.Errorf("should not call") + }) + return dynamicClient + }(), + staleManifests: []v1alpha1.AppliedResourceMeta{ + { + ResourceIdentifier: v1alpha1.ResourceIdentifier{ + Name: "does not matter", + }, + }, + }, + owner: metav1.OwnerReference{ + APIVersion: "does not match", + }, + wantErr: nil, + }, + } + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + r := &ApplyWorkReconciler{ + spokeDynamicClient: tt.spokeDynamicClient, + } + gotErr := r.deleteStaleManifest(context.Background(), tt.staleManifests, tt.owner) + if tt.wantErr == nil { + if gotErr != nil { + t.Errorf("test case `%s` didn't return the exepected error, want no error, got error = %+v ", name, gotErr) + } + } else if gotErr == nil || gotErr.Error() != tt.wantErr.Error() { + t.Errorf("test case `%s` didn't return the exepected error, want error = %+v, got error = %+v", name, tt.wantErr, gotErr) + } + }) + } +} + +func generateWorkObj(identifier *v1alpha1.ResourceIdentifier) v1alpha1.Work { + if identifier != nil { + return v1alpha1.Work{ + Status: v1alpha1.WorkStatus{ + ManifestConditions: []v1alpha1.ManifestCondition{ + { + Identifier: *identifier, + Conditions: []metav1.Condition{ + { + Type: ConditionTypeApplied, + Status: metav1.ConditionTrue, + }, + }, + }, + }, + }, + } + } + return v1alpha1.Work{} +} + +func generateAppliedWorkObj(identifier *v1alpha1.ResourceIdentifier) v1alpha1.AppliedWork { + if identifier != nil { + return v1alpha1.AppliedWork{ + TypeMeta: metav1.TypeMeta{}, + ObjectMeta: metav1.ObjectMeta{}, + Spec: v1alpha1.AppliedWorkSpec{}, + Status: v1alpha1.AppliedtWorkStatus{ + AppliedResources: []v1alpha1.AppliedResourceMeta{ + { + ResourceIdentifier: *identifier, + UID: 
types.UID(rand.String(20)), + }, + }, + }, + } + } + return v1alpha1.AppliedWork{} +} + +func generateResourceIdentifier() v1alpha1.ResourceIdentifier { + return v1alpha1.ResourceIdentifier{ + Ordinal: rand.Int(), + Group: rand.String(10), + Version: rand.String(10), + Kind: rand.String(10), + Resource: rand.String(10), + Namespace: rand.String(10), + Name: rand.String(10), + } +} diff --git a/pkg/controllers/work/apply_controller.go b/pkg/controllers/work/apply_controller.go new file mode 100644 index 000000000..8af9f4625 --- /dev/null +++ b/pkg/controllers/work/apply_controller.go @@ -0,0 +1,636 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllers + +import ( + "context" + "crypto/sha256" + "encoding/json" + "fmt" + "time" + + "go.uber.org/atomic" + v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" + "k8s.io/utils/pointer" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/predicate" + + workv1alpha1 "sigs.k8s.io/work-api/pkg/apis/v1alpha1" +) + +const ( + workFieldManagerName = "work-api-agent" +) + +// ApplyWorkReconciler reconciles a Work object +type ApplyWorkReconciler struct { + client client.Client + spokeDynamicClient dynamic.Interface + spokeClient client.Client + restMapper meta.RESTMapper + recorder record.EventRecorder + concurrency int + workNameSpace string + joined *atomic.Bool +} + +func NewApplyWorkReconciler(hubClient client.Client, spokeDynamicClient dynamic.Interface, spokeClient client.Client, + restMapper meta.RESTMapper, recorder record.EventRecorder, concurrency int, workNameSpace string) *ApplyWorkReconciler { + return &ApplyWorkReconciler{ + client: hubClient, + spokeDynamicClient: spokeDynamicClient, + spokeClient: spokeClient, + restMapper: restMapper, + recorder: recorder, + concurrency: concurrency, + workNameSpace: workNameSpace, + joined: atomic.NewBool(false), + } +} + +// applyResult contains the result of a manifest being applied. +type applyResult struct { + identifier workv1alpha1.ResourceIdentifier + generation int64 + updated bool + err error +} + +// Reconcile implement the control loop logic for Work object. 
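For orientation, the member agent in cmd/memberagent/main.go (above) constructs this reconciler and hands it to the internal member cluster controller. The sketch below shows one plausible wiring under stated assumptions: setupWorkController and its parameter names are illustrative; only NewApplyWorkReconciler, SetupWithManager, and Join come from this patch, and which manager the controller is registered with (the one whose client reads Work objects on the hub) is an assumption.

package main // illustrative sketch, not part of the patch

import (
	"context"

	"k8s.io/apimachinery/pkg/api/meta"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/tools/record"
	ctrl "sigs.k8s.io/controller-runtime"

	workapi "go.goms.io/fleet/pkg/controllers/work"
)

// setupWorkController mirrors the constructor calls seen in cmd/memberagent/main.go and the
// integration test: hub client, spoke dynamic client, spoke client, REST mapper, event
// recorder, concurrency, and the per-member-cluster namespace on the hub.
func setupWorkController(ctx context.Context, hubMgr, spokeMgr ctrl.Manager, spokeDynamicClient dynamic.Interface,
	restMapper meta.RESTMapper, recorder record.EventRecorder, clusterNamespace string) (*workapi.ApplyWorkReconciler, error) {
	workController := workapi.NewApplyWorkReconciler(
		hubMgr.GetClient(), spokeDynamicClient, spokeMgr.GetClient(),
		restMapper, recorder, 5 /* concurrency */, clusterNamespace)
	if err := workController.SetupWithManager(hubMgr); err != nil {
		return nil, err
	}
	// Until Join is called, Reconcile only requeues every 5 seconds and applies nothing.
	return workController, workController.Join(ctx)
}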
+func (r *ApplyWorkReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + if !r.joined.Load() { + klog.V(2).InfoS("work controller is not started yet, requeue the request", "work", req.NamespacedName) + return ctrl.Result{RequeueAfter: time.Second * 5}, nil + } + klog.InfoS("work apply controller reconcile loop triggered.", "work", req.NamespacedName) + + // Fetch the work resource + work := &workv1alpha1.Work{} + err := r.client.Get(ctx, req.NamespacedName, work) + switch { + case apierrors.IsNotFound(err): + klog.V(2).InfoS("the work resource is deleted", "work", req.NamespacedName) + return ctrl.Result{}, nil + case err != nil: + klog.ErrorS(err, "failed to retrieve the work", "work", req.NamespacedName) + return ctrl.Result{}, err + } + logObjRef := klog.KObj(work) + + // Handle deleting work, garbage collect the resources + if !work.DeletionTimestamp.IsZero() { + klog.V(2).InfoS("resource is in the process of being deleted", work.Kind, logObjRef) + return r.garbageCollectAppliedWork(ctx, work) + } + + // ensure that the appliedWork and the finalizer exist + appliedWork, err := r.ensureAppliedWork(ctx, work) + if err != nil { + return ctrl.Result{}, err + } + owner := metav1.OwnerReference{ + APIVersion: workv1alpha1.GroupVersion.String(), + Kind: workv1alpha1.AppliedWorkKind, + Name: appliedWork.GetName(), + UID: appliedWork.GetUID(), + BlockOwnerDeletion: pointer.Bool(false), + } + + // Apply the manifests to the member cluster + results := r.applyManifests(ctx, work.Spec.Workload.Manifests, owner) + + // generate the work condition based on the manifest apply result + errs := r.generateWorkCondition(results, work) + + // update the work status + if err = r.client.Status().Update(ctx, work, &client.UpdateOptions{}); err != nil { + klog.ErrorS(err, "failed to update work status", "work", logObjRef) + return ctrl.Result{}, err + } + if len(errs) == 0 { + klog.InfoS("successfully applied the work to the cluster", "work", logObjRef) + r.recorder.Event(work, v1.EventTypeNormal, "ApplyWorkSucceed", "apply the work successfully") + } + + // now we sync the status from work to appliedWork no matter if apply succeeds or not + newRes, staleRes, genErr := r.generateDiff(ctx, work, appliedWork) + if genErr != nil { + klog.ErrorS(err, "failed to generate the diff between work status and appliedWork status", work.Kind, logObjRef) + return ctrl.Result{}, err + } + // delete all the manifests that should not be in the cluster. 
+ if err = r.deleteStaleManifest(ctx, staleRes, owner); err != nil { + klog.ErrorS(err, "resource garbage-collection incomplete; some Work owned resources could not be deleted", work.Kind, logObjRef) + // we can't proceed to update the applied + return ctrl.Result{}, err + } else if len(staleRes) > 0 { + klog.V(2).InfoS("successfully garbage-collected all stale manifests", work.Kind, logObjRef, "number of GCed res", len(staleRes)) + for _, res := range staleRes { + klog.V(5).InfoS("successfully garbage-collected a stale manifest", work.Kind, logObjRef, "res", res) + } + } + + // update the appliedWork with the new work after the stales are deleted + appliedWork.Status.AppliedResources = newRes + if err = r.spokeClient.Status().Update(ctx, appliedWork, &client.UpdateOptions{}); err != nil { + klog.ErrorS(err, "failed to update appliedWork status", appliedWork.Kind, appliedWork.GetName()) + return ctrl.Result{}, err + } + err = utilerrors.NewAggregate(errs) + if err != nil { + klog.ErrorS(err, "manifest apply incomplete; the message is queued again for reconciliation", + "work", logObjRef) + } + + // we periodically reconcile the work to make sure the member cluster state is in sync with the work + // even if the reconciling succeeds in case the resources on the member cluster is removed/changed. + return ctrl.Result{RequeueAfter: time.Minute * 5}, err +} + +// garbageCollectAppliedWork deletes the appliedWork and all the manifests associated with it from the cluster. +func (r *ApplyWorkReconciler) garbageCollectAppliedWork(ctx context.Context, work *workv1alpha1.Work) (ctrl.Result, error) { + deletePolicy := metav1.DeletePropagationBackground + if !controllerutil.ContainsFinalizer(work, workFinalizer) { + return ctrl.Result{}, nil + } + // delete the appliedWork which will remove all the manifests associated with it + // TODO: allow orphaned manifest + appliedWork := workv1alpha1.AppliedWork{ + ObjectMeta: metav1.ObjectMeta{Name: work.Name}, + } + err := r.spokeClient.Delete(ctx, &appliedWork, &client.DeleteOptions{PropagationPolicy: &deletePolicy}) + switch { + case apierrors.IsNotFound(err): + klog.V(2).InfoS("the appliedWork is already deleted", "appliedWork", work.Name) + case err != nil: + klog.ErrorS(err, "failed to delete the appliedWork", "appliedWork", work.Name) + return ctrl.Result{}, err + default: + klog.InfoS("successfully deleted the appliedWork", "appliedWork", work.Name) + } + controllerutil.RemoveFinalizer(work, workFinalizer) + return ctrl.Result{}, r.client.Update(ctx, work, &client.UpdateOptions{}) +} + +// ensureAppliedWork makes sure that an associated appliedWork and a finalizer on the work resource exsits on the cluster. 
+func (r *ApplyWorkReconciler) ensureAppliedWork(ctx context.Context, work *workv1alpha1.Work) (*workv1alpha1.AppliedWork, error) { + workRef := klog.KObj(work) + appliedWork := &workv1alpha1.AppliedWork{} + hasFinalizer := false + if controllerutil.ContainsFinalizer(work, workFinalizer) { + hasFinalizer = true + err := r.spokeClient.Get(ctx, types.NamespacedName{Name: work.Name}, appliedWork) + switch { + case apierrors.IsNotFound(err): + klog.ErrorS(err, "appliedWork finalizer resource does not exist even with the finalizer, it will be recreated", "appliedWork", workRef.Name) + case err != nil: + klog.ErrorS(err, "failed to retrieve the appliedWork ", "appliedWork", workRef.Name) + return nil, err + default: + return appliedWork, nil + } + } + + // we create the appliedWork before setting the finalizer, so it should always exist unless it's deleted behind our back + appliedWork = &workv1alpha1.AppliedWork{ + ObjectMeta: metav1.ObjectMeta{ + Name: work.Name, + }, + Spec: workv1alpha1.AppliedWorkSpec{ + WorkName: work.Name, + WorkNamespace: work.Namespace, + }, + } + if err := r.spokeClient.Create(ctx, appliedWork); err != nil && !apierrors.IsAlreadyExists(err) { + klog.ErrorS(err, "appliedWork create failed", "appliedWork", workRef.Name) + return nil, err + } + if !hasFinalizer { + klog.InfoS("add the finalizer to the work", "work", workRef) + work.Finalizers = append(work.Finalizers, workFinalizer) + return appliedWork, r.client.Update(ctx, work, &client.UpdateOptions{}) + } + klog.InfoS("recreated the appliedWork resource", "appliedWork", workRef.Name) + return appliedWork, nil +} + +// applyManifests processes a given set of Manifests by: setting ownership, validating the manifest, and passing it on for application to the cluster. +func (r *ApplyWorkReconciler) applyManifests(ctx context.Context, manifests []workv1alpha1.Manifest, owner metav1.OwnerReference) []applyResult { + var appliedObj *unstructured.Unstructured + + results := make([]applyResult, len(manifests)) + for index, manifest := range manifests { + var result applyResult + gvr, rawObj, err := r.decodeManifest(manifest) + switch { + case err != nil: + result.err = err + result.identifier = workv1alpha1.ResourceIdentifier{ + Ordinal: index, + } + if rawObj != nil { + result.identifier.Group = rawObj.GroupVersionKind().Group + result.identifier.Version = rawObj.GroupVersionKind().Version + result.identifier.Kind = rawObj.GroupVersionKind().Kind + result.identifier.Namespace = rawObj.GetNamespace() + result.identifier.Name = rawObj.GetName() + } + + default: + addOwnerRef(owner, rawObj) + appliedObj, result.updated, result.err = r.applyUnstructured(ctx, gvr, rawObj) + result.identifier = buildResourceIdentifier(index, rawObj, gvr) + logObjRef := klog.ObjectRef{ + Name: result.identifier.Name, + Namespace: result.identifier.Namespace, + } + if result.err == nil { + result.generation = appliedObj.GetGeneration() + if result.updated { + klog.V(2).InfoS("manifest upsert succeeded", "gvr", gvr, "manifest", logObjRef, "new ObservedGeneration", result.generation) + } else { + klog.V(2).InfoS("manifest upsert unwarranted", "gvr", gvr, "manifest", logObjRef) + } + } else { + klog.ErrorS(result.err, "manifest upsert failed", "gvr", gvr, "manifest", logObjRef) + } + } + results[index] = result + } + return results +} + +// Decodes the manifest into usable structs. 
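decodeManifest below is the consuming side of the Manifest type; the producing side, exercised by the integration tests in this patch, embeds the desired object as a runtime.RawExtension. A minimal authoring sketch follows (newConfigMapWork is an illustrative name; the field layout is copied from the tests above):

package main // illustrative sketch, not part of the patch

import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"

	workv1alpha1 "sigs.k8s.io/work-api/pkg/apis/v1alpha1"
)

// newConfigMapWork wraps a ConfigMap in a Work placed in the member cluster's namespace on
// the hub. The ConfigMap must carry TypeMeta (APIVersion/Kind) so the embedded object
// serializes with its GVK; decodeManifest later recovers it via UnmarshalJSON on manifest.Raw.
func newConfigMapWork(workName, clusterNamespace string, cm *corev1.ConfigMap) *workv1alpha1.Work {
	return &workv1alpha1.Work{
		ObjectMeta: metav1.ObjectMeta{
			Name:      workName,
			Namespace: clusterNamespace,
		},
		Spec: workv1alpha1.WorkSpec{
			Workload: workv1alpha1.WorkloadTemplate{
				Manifests: []workv1alpha1.Manifest{
					{RawExtension: runtime.RawExtension{Object: cm}},
				},
			},
		},
	}
}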
+func (r *ApplyWorkReconciler) decodeManifest(manifest workv1alpha1.Manifest) (schema.GroupVersionResource, *unstructured.Unstructured, error) { + unstructuredObj := &unstructured.Unstructured{} + err := unstructuredObj.UnmarshalJSON(manifest.Raw) + if err != nil { + return schema.GroupVersionResource{}, nil, fmt.Errorf("failed to decode object: %w", err) + } + + mapping, err := r.restMapper.RESTMapping(unstructuredObj.GroupVersionKind().GroupKind(), unstructuredObj.GroupVersionKind().Version) + if err != nil { + return schema.GroupVersionResource{}, unstructuredObj, fmt.Errorf("failed to find group/version/resource from restmapping: %w", err) + } + + return mapping.Resource, unstructuredObj, nil +} + +// Determines if an unstructured manifest object can & should be applied. If so, it applies (creates) the resource on the cluster. +func (r *ApplyWorkReconciler) applyUnstructured(ctx context.Context, gvr schema.GroupVersionResource, manifestObj *unstructured.Unstructured) (*unstructured.Unstructured, bool, error) { + manifestRef := klog.ObjectRef{ + Name: manifestObj.GetName(), + Namespace: manifestObj.GetNamespace(), + } + // compute the hash without taking into consider the last applied annotation + if err := setManifestHashAnnotation(manifestObj); err != nil { + return nil, false, err + } + + // extract the common create procedure to reuse + var createFunc = func() (*unstructured.Unstructured, bool, error) { + // record the raw manifest with the hash annotation in the manifest + if err := setModifiedConfigurationAnnotation(manifestObj); err != nil { + return nil, false, err + } + actual, err := r.spokeDynamicClient.Resource(gvr).Namespace(manifestObj.GetNamespace()).Create( + ctx, manifestObj, metav1.CreateOptions{FieldManager: workFieldManagerName}) + if err == nil { + klog.V(2).InfoS("successfully created the manifest", "gvr", gvr, "manifest", manifestRef) + return actual, true, nil + } + return nil, false, err + } + + // support resources with generated name + if manifestObj.GetName() == "" && manifestObj.GetGenerateName() != "" { + klog.InfoS("create the resource with generated name regardless", "gvr", gvr, "manifest", manifestRef) + return createFunc() + } + + // get the current object and create one if not found + curObj, err := r.spokeDynamicClient.Resource(gvr).Namespace(manifestObj.GetNamespace()).Get(ctx, manifestObj.GetName(), metav1.GetOptions{}) + switch { + case apierrors.IsNotFound(err): + return createFunc() + case err != nil: + return nil, false, err + } + + // check if the existing manifest is managed by the work + if !isManifestManagedByWork(curObj.GetOwnerReferences()) { + err = fmt.Errorf("resource is not managed by the work controller") + klog.ErrorS(err, "skip applying a not managed manifest", "gvr", gvr, "obj", manifestRef) + return nil, false, err + } + + // We only try to update the object if its spec hash value has changed. + if manifestObj.GetAnnotations()[manifestHashAnnotation] != curObj.GetAnnotations()[manifestHashAnnotation] { + return r.patchCurrentResource(ctx, gvr, manifestObj, curObj) + } + + return curObj, false, nil +} + +// patchCurrentResource uses three way merge to patch the current resource with the new manifest we get from the work. 
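The project helper threeWayMergePatch that patchCurrentResource calls is not part of this excerpt. As a sketch of the general technique the comment above names, built on the upstream jsonmergepatch package (the function name and the choice of a JSON merge patch for unstructured objects are assumptions; the real helper may use a strategic merge patch for built-in types):

package main // illustrative sketch, not part of the patch

import (
	"encoding/json"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/util/jsonmergepatch"
)

// threeWayJSONMergePatchSketch computes a patch from three inputs: "original" is the
// last-applied configuration recorded in an annotation, "modified" is the new manifest
// from the Work, and "current" is the live object on the member cluster. Fields removed
// between original and modified become explicit deletions; fields set only on the live
// object are left alone.
func threeWayJSONMergePatchSketch(original []byte, modified, current *unstructured.Unstructured) ([]byte, error) {
	modifiedJSON, err := json.Marshal(modified)
	if err != nil {
		return nil, err
	}
	currentJSON, err := json.Marshal(current)
	if err != nil {
		return nil, err
	}
	return jsonmergepatch.CreateThreeWayJSONMergePatch(original, modifiedJSON, currentJSON)
}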
+func (r *ApplyWorkReconciler) patchCurrentResource(ctx context.Context, gvr schema.GroupVersionResource, + manifestObj, curObj *unstructured.Unstructured) (*unstructured.Unstructured, bool, error) { + manifestRef := klog.ObjectRef{ + Name: manifestObj.GetName(), + Namespace: manifestObj.GetNamespace(), + } + klog.V(5).InfoS("manifest is modified", "gvr", gvr, "manifest", manifestRef, + "new hash", manifestObj.GetAnnotations()[manifestHashAnnotation], + "existing hash", curObj.GetAnnotations()[manifestHashAnnotation]) + + // we need to merge the owner reference between the current and the manifest since we support one manifest + // belong to multiple work so it contains the union of all the appliedWork + manifestObj.SetOwnerReferences(mergeOwnerReference(curObj.GetOwnerReferences(), manifestObj.GetOwnerReferences())) + // record the raw manifest with the hash annotation in the manifest + if err := setModifiedConfigurationAnnotation(manifestObj); err != nil { + return nil, false, err + } + // create the three-way merge patch between the current, original and manifest similar to how kubectl apply does + patch, err := threeWayMergePatch(curObj, manifestObj) + if err != nil { + klog.ErrorS(err, "failed to generate the three way patch", "gvr", gvr, "manifest", manifestRef) + return nil, false, err + } + data, err := patch.Data(manifestObj) + if err != nil { + klog.ErrorS(err, "failed to generate the three way patch", "gvr", gvr, "manifest", manifestRef) + return nil, false, err + } + // Use client side apply the patch to the member cluster + manifestObj, patchErr := r.spokeDynamicClient.Resource(gvr).Namespace(manifestObj.GetNamespace()). + Patch(ctx, manifestObj.GetName(), patch.Type(), data, metav1.PatchOptions{FieldManager: workFieldManagerName}) + if patchErr != nil { + klog.ErrorS(patchErr, "failed to patch the manifest", "gvr", gvr, "manifest", manifestRef) + return nil, false, patchErr + } + klog.V(2).InfoS("manifest patch succeeded", "gvr", gvr, "manifest", manifestRef) + return manifestObj, true, nil +} + +// generateWorkCondition constructs the work condition based on the apply result +func (r *ApplyWorkReconciler) generateWorkCondition(results []applyResult, work *workv1alpha1.Work) []error { + var errs []error + // Update manifestCondition based on the results. 
+ manifestConditions := make([]workv1alpha1.ManifestCondition, len(results)) + for index, result := range results { + if result.err != nil { + errs = append(errs, result.err) + } + appliedCondition := buildManifestAppliedCondition(result.err, result.updated, result.generation) + manifestCondition := workv1alpha1.ManifestCondition{ + Identifier: result.identifier, + Conditions: []metav1.Condition{appliedCondition}, + } + foundmanifestCondition := findManifestConditionByIdentifier(result.identifier, work.Status.ManifestConditions) + if foundmanifestCondition != nil { + manifestCondition.Conditions = foundmanifestCondition.Conditions + meta.SetStatusCondition(&manifestCondition.Conditions, appliedCondition) + } + manifestConditions[index] = manifestCondition + } + + work.Status.ManifestConditions = manifestConditions + workCond := generateWorkAppliedCondition(manifestConditions, work.Generation) + work.Status.Conditions = []metav1.Condition{workCond} + return errs +} + +// Join starts to reconcile +func (r *ApplyWorkReconciler) Join(ctx context.Context) error { + if !r.joined.Load() { + klog.InfoS("mark the apply work reconciler joined") + } + r.joined.Store(true) + return nil +} + +// Leave start +func (r *ApplyWorkReconciler) Leave(ctx context.Context) error { + var works workv1alpha1.WorkList + if r.joined.Load() { + klog.InfoS("mark the apply work reconciler left") + } + r.joined.Store(false) + // list all the work object we created in the member cluster namespace + listOpts := []client.ListOption{ + client.InNamespace(r.workNameSpace), + } + if err := r.client.List(ctx, &works, listOpts...); err != nil { + klog.ErrorS(err, "failed to list all the work object", "clusterNS", r.workNameSpace) + return client.IgnoreNotFound(err) + } + // we leave the resources on the member cluster for now + for _, work := range works.Items { + staleWork := work.DeepCopy() + if controllerutil.ContainsFinalizer(staleWork, workFinalizer) { + controllerutil.RemoveFinalizer(staleWork, workFinalizer) + if updateErr := r.client.Update(ctx, staleWork, &client.UpdateOptions{}); updateErr != nil { + klog.ErrorS(updateErr, "failed to remove the work finalizer from the work", + "clusterNS", r.workNameSpace, "work", klog.KObj(staleWork)) + return updateErr + } + } + } + klog.V(2).InfoS("successfully removed all the work finalizers in the cluster namespace", + "clusterNS", r.workNameSpace, "number of work", len(works.Items)) + return nil +} + +// SetupWithManager wires up the controller. +func (r *ApplyWorkReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + WithOptions(controller.Options{ + MaxConcurrentReconciles: r.concurrency, + }). + For(&workv1alpha1.Work{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})). + Complete(r) +} + +// Generates a hash of the spec annotation from an unstructured object after we remove all the fields +// we have modified. 
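A distilled view of how this hash drives drift detection in applyUnstructured above (needsPatch is an illustrative name and would sit in the same package; only computeManifestHash and the manifestHashAnnotation comparison come from this patch):

package controllers // alongside apply_controller.go, illustrative only

import "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

// needsPatch reports whether the desired manifest differs from what was last applied:
// the hash of the cleaned desired object is compared against the hash annotation stamped
// on the live object, so a patch is only issued when the Work's desired state changed.
func needsPatch(desired, live *unstructured.Unstructured) (bool, error) {
	desiredHash, err := computeManifestHash(desired)
	if err != nil {
		return false, err
	}
	return desiredHash != live.GetAnnotations()[manifestHashAnnotation], nil
}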
+func computeManifestHash(obj *unstructured.Unstructured) (string, error) {
+	manifest := obj.DeepCopy()
+	// remove the hash and last applied annotations we set to avoid unlimited recursion
+	annotation := manifest.GetAnnotations()
+	if annotation != nil {
+		delete(annotation, manifestHashAnnotation)
+		delete(annotation, lastAppliedConfigAnnotation)
+		if len(annotation) == 0 {
+			manifest.SetAnnotations(nil)
+		} else {
+			manifest.SetAnnotations(annotation)
+		}
+	}
+	// strip the live object related fields just in case
+	manifest.SetResourceVersion("")
+	manifest.SetGeneration(0)
+	manifest.SetUID("")
+	manifest.SetSelfLink("")
+	manifest.SetDeletionTimestamp(nil)
+	manifest.SetManagedFields(nil)
+	unstructured.RemoveNestedField(manifest.Object, "metadata", "creationTimestamp")
+	unstructured.RemoveNestedField(manifest.Object, "status")
+	// compute the sha256 hash of the remaining data
+	jsonBytes, err := json.Marshal(manifest.Object)
+	if err != nil {
+		return "", err
+	}
+	return fmt.Sprintf("%x", sha256.Sum256(jsonBytes)), nil
+}
+
+// isManifestManagedByWork determines if an object is managed by the work controller.
+func isManifestManagedByWork(ownerRefs []metav1.OwnerReference) bool {
+	// an object is managed by the work controller if any of its owner references is of type AppliedWork
+	for _, ownerRef := range ownerRefs {
+		if ownerRef.APIVersion == workv1alpha1.GroupVersion.String() && ownerRef.Kind == workv1alpha1.AppliedWorkKind {
+			return true
+		}
+	}
+	return false
+}
+
+// findManifestConditionByIdentifier returns a ManifestCondition by its identifier:
+// 1. Find the manifest condition with the whole identifier.
+// 2. If the identifier only has the ordinal and a match cannot be found, return nil.
+// 3. Otherwise, try to match the properties of the identifier other than the ordinal.
+func findManifestConditionByIdentifier(identifier workv1alpha1.ResourceIdentifier, manifestConditions []workv1alpha1.ManifestCondition) *workv1alpha1.ManifestCondition {
+	for _, manifestCondition := range manifestConditions {
+		if identifier == manifestCondition.Identifier {
+			return &manifestCondition
+		}
+	}
+
+	if identifier == (workv1alpha1.ResourceIdentifier{Ordinal: identifier.Ordinal}) {
+		return nil
+	}
+
+	identifierCopy := identifier.DeepCopy()
+	for _, manifestCondition := range manifestConditions {
+		identifierCopy.Ordinal = manifestCondition.Identifier.Ordinal
+		if *identifierCopy == manifestCondition.Identifier {
+			return &manifestCondition
+		}
+	}
+	return nil
+}
+
+// setManifestHashAnnotation computes the hash of the provided manifest and sets an annotation of the
+// hash on the provided unstructured object.
+func setManifestHashAnnotation(manifestObj *unstructured.Unstructured) error {
+	manifestHash, err := computeManifestHash(manifestObj)
+	if err != nil {
+		return err
+	}
+
+	annotation := manifestObj.GetAnnotations()
+	if annotation == nil {
+		annotation = map[string]string{}
+	}
+	annotation[manifestHashAnnotation] = manifestHash
+	manifestObj.SetAnnotations(annotation)
+	return nil
+}
+
+// buildResourceIdentifier builds a resource identifier for a given unstructured.Unstructured object.
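+// The identifier records the ordinal of the manifest within the work plus its group/version/kind/resource
+// coordinates; for example, a ConfigMap manifest at index 0 maps to
+// {Ordinal: 0, Version: "v1", Kind: "ConfigMap", Resource: "configmaps", Namespace: ..., Name: ...}.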
+func buildResourceIdentifier(index int, object *unstructured.Unstructured, gvr schema.GroupVersionResource) workv1alpha1.ResourceIdentifier {
+	return workv1alpha1.ResourceIdentifier{
+		Ordinal: index,
+		Group: object.GroupVersionKind().Group,
+		Version: object.GroupVersionKind().Version,
+		Kind: object.GroupVersionKind().Kind,
+		Namespace: object.GetNamespace(),
+		Name: object.GetName(),
+		Resource: gvr.Resource,
+	}
+}
+
+func buildManifestAppliedCondition(err error, updated bool, observedGeneration int64) metav1.Condition {
+	if err != nil {
+		return metav1.Condition{
+			Type: ConditionTypeApplied,
+			Status: metav1.ConditionFalse,
+			ObservedGeneration: observedGeneration,
+			LastTransitionTime: metav1.Now(),
+			Reason: "appliedManifestFailed",
+			Message: fmt.Sprintf("Failed to apply manifest: %v", err),
+		}
+	}
+
+	if updated {
+		return metav1.Condition{
+			Type: ConditionTypeApplied,
+			Status: metav1.ConditionTrue,
+			LastTransitionTime: metav1.Now(),
+			ObservedGeneration: observedGeneration,
+			Reason: "appliedManifestUpdated",
+			Message: "appliedManifest updated",
+		}
+	}
+	return metav1.Condition{
+		Type: ConditionTypeApplied,
+		Status: metav1.ConditionTrue,
+		LastTransitionTime: metav1.Now(),
+		ObservedGeneration: observedGeneration,
+		Reason: "appliedManifestComplete",
+		Message: "Apply manifest complete",
+	}
+}
+
+// generateWorkAppliedCondition generates the applied status condition for a work.
+// If any of the manifests failed to apply on the spoke, the applied status condition of the work is false.
+func generateWorkAppliedCondition(manifestConditions []workv1alpha1.ManifestCondition, observedGeneration int64) metav1.Condition {
+	for _, manifestCond := range manifestConditions {
+		if meta.IsStatusConditionFalse(manifestCond.Conditions, ConditionTypeApplied) {
+			return metav1.Condition{
+				Type: ConditionTypeApplied,
+				Status: metav1.ConditionFalse,
+				LastTransitionTime: metav1.Now(),
+				Reason: "appliedWorkFailed",
+				Message: "Failed to apply work",
+				ObservedGeneration: observedGeneration,
+			}
+		}
+	}
+
+	return metav1.Condition{
+		Type: ConditionTypeApplied,
+		Status: metav1.ConditionTrue,
+		LastTransitionTime: metav1.Now(),
+		Reason: "appliedWorkComplete",
+		Message: "Apply work complete",
+		ObservedGeneration: observedGeneration,
+	}
+}
diff --git a/pkg/controllers/work/apply_controller_helper_test.go b/pkg/controllers/work/apply_controller_helper_test.go
new file mode 100644
index 000000000..e4aaf29e1
--- /dev/null
+++ b/pkg/controllers/work/apply_controller_helper_test.go
@@ -0,0 +1,95 @@
+package controllers
+
+import (
+	"context"
+
+	"github.com/google/go-cmp/cmp"
+	. "github.com/onsi/ginkgo/v2"
+	.
"github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + utilrand "k8s.io/apimachinery/pkg/util/rand" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + + workv1alpha1 "sigs.k8s.io/work-api/pkg/apis/v1alpha1" +) + +// createWorkWithManifest creates a work given a manifest +func createWorkWithManifest(workNamespace string, manifest runtime.Object) *workv1alpha1.Work { + manifestCopy := manifest.DeepCopyObject() + newWork := workv1alpha1.Work{ + ObjectMeta: metav1.ObjectMeta{ + Name: "work-" + utilrand.String(5), + Namespace: workNamespace, + }, + Spec: workv1alpha1.WorkSpec{ + Workload: workv1alpha1.WorkloadTemplate{ + Manifests: []workv1alpha1.Manifest{ + { + RawExtension: runtime.RawExtension{Object: manifestCopy}, + }, + }, + }, + }, + } + return &newWork +} + +// verifyAppliedConfigMap verifies that the applied CM is the same as the CM we want to apply +func verifyAppliedConfigMap(cm *corev1.ConfigMap) *corev1.ConfigMap { + var appliedCM corev1.ConfigMap + Expect(k8sClient.Get(context.Background(), types.NamespacedName{Name: cm.GetName(), Namespace: cm.GetNamespace()}, &appliedCM)).Should(Succeed()) + + By("Check the config map label") + Expect(cmp.Diff(appliedCM.Labels, cm.Labels)).Should(BeEmpty()) + + By("Check the config map annotation value") + Expect(len(appliedCM.Annotations)).Should(Equal(len(cm.Annotations) + 2)) // we added 2 more annotations + for key := range cm.Annotations { + Expect(appliedCM.Annotations[key]).Should(Equal(cm.Annotations[key])) + } + Expect(appliedCM.Annotations[manifestHashAnnotation]).ShouldNot(BeNil()) + Expect(appliedCM.Annotations[lastAppliedConfigAnnotation]).ShouldNot(BeNil()) + + By("Check the config map data") + Expect(cmp.Diff(appliedCM.Data, cm.Data)).Should(BeEmpty()) + return &appliedCM +} + +// waitForWorkToApply waits for a work to be applied +func waitForWorkToApply(workName, workNS string) *workv1alpha1.Work { + var resultWork workv1alpha1.Work + Eventually(func() bool { + err := k8sClient.Get(context.Background(), types.NamespacedName{Name: workName, Namespace: workNS}, &resultWork) + if err != nil { + return false + } + applyCond := meta.FindStatusCondition(resultWork.Status.Conditions, ConditionTypeApplied) + if applyCond == nil || applyCond.Status != metav1.ConditionTrue || applyCond.ObservedGeneration != resultWork.Generation { + return false + } + for _, manifestCondition := range resultWork.Status.ManifestConditions { + if !meta.IsStatusConditionTrue(manifestCondition.Conditions, ConditionTypeApplied) { + return false + } + } + return true + }, timeout, interval).Should(BeTrue()) + return &resultWork +} + +// waitForWorkToBeHandled waits for a work to have a finalizer +func waitForWorkToBeHandled(workName, workNS string) *workv1alpha1.Work { + var resultWork workv1alpha1.Work + Eventually(func() bool { + err := k8sClient.Get(context.Background(), types.NamespacedName{Name: workName, Namespace: workNS}, &resultWork) + if err != nil { + return false + } + return controllerutil.ContainsFinalizer(&resultWork, workFinalizer) + }, timeout, interval).Should(BeTrue()) + return &resultWork +} diff --git a/pkg/controllers/work/apply_controller_integration_test.go b/pkg/controllers/work/apply_controller_integration_test.go new file mode 100644 index 000000000..09ce74f79 --- /dev/null +++ b/pkg/controllers/work/apply_controller_integration_test.go @@ -0,0 +1,655 @@ +/* 
+Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllers + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/google/go-cmp/cmp" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + kruisev1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + utilrand "k8s.io/apimachinery/pkg/util/rand" + "k8s.io/utils/pointer" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + + workv1alpha1 "sigs.k8s.io/work-api/pkg/apis/v1alpha1" +) + +const timeout = time.Second * 10 +const interval = time.Millisecond * 250 + +var _ = Describe("Work Controller", func() { + var workNamespace string + var ns corev1.Namespace + var cm *corev1.ConfigMap + var work *workv1alpha1.Work + const defaultNS = "default" + + BeforeEach(func() { + workNamespace = "work-" + utilrand.String(5) + // Create namespace + ns = corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: workNamespace, + }, + } + err := k8sClient.Create(context.Background(), &ns) + Expect(err).ToNot(HaveOccurred()) + }) + + AfterEach(func() { + // Add any teardown steps that needs to be executed after each test + err := k8sClient.Delete(context.Background(), &ns) + Expect(err).ToNot(HaveOccurred()) + }) + + Context("Test single work propagation", func() { + It("Should have a configmap deployed correctly", func() { + cmName := "testcm" + cmNamespace := defaultNS + cm = &corev1.ConfigMap{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "ConfigMap", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: cmName, + Namespace: cmNamespace, + }, + Data: map[string]string{ + "test": "test", + }, + } + + By("create the work") + work = createWorkWithManifest(workNamespace, cm) + err := k8sClient.Create(context.Background(), work) + Expect(err).ToNot(HaveOccurred()) + + resultWork := waitForWorkToApply(work.GetName(), work.GetNamespace()) + Expect(len(resultWork.Status.ManifestConditions)).Should(Equal(1)) + Expect(meta.IsStatusConditionTrue(resultWork.Status.Conditions, ConditionTypeApplied)).Should(BeTrue()) + Expect(meta.IsStatusConditionTrue(resultWork.Status.ManifestConditions[0].Conditions, ConditionTypeApplied)).Should(BeTrue()) + expectedResourceID := workv1alpha1.ResourceIdentifier{ + Ordinal: 0, + Group: "", + Version: "v1", + Kind: "ConfigMap", + Resource: "configmaps", + Namespace: cmNamespace, + Name: cm.Name, + } + Expect(cmp.Diff(resultWork.Status.ManifestConditions[0].Identifier, expectedResourceID)).Should(BeEmpty()) + + By("Check applied config map") + var configMap corev1.ConfigMap + Expect(k8sClient.Get(context.Background(), types.NamespacedName{Name: cmName, Namespace: cmNamespace}, &configMap)).Should(Succeed()) + Expect(cmp.Diff(configMap.Labels, cm.Labels)).Should(BeEmpty()) + Expect(cmp.Diff(configMap.Data, 
cm.Data)).Should(BeEmpty()) + + }) + + It("Should apply the same manifest in two work properly", func() { + cmName := "test-multiple-owner" + cmNamespace := defaultNS + cm := &corev1.ConfigMap{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "ConfigMap", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: cmName, + Namespace: cmNamespace, + }, + Data: map[string]string{ + "data1": "test1", + }, + } + + work1 := createWorkWithManifest(workNamespace, cm) + work2 := work1.DeepCopy() + work2.Name = "test-work-2" + + By("create the first work") + err := k8sClient.Create(context.Background(), work1) + Expect(err).ToNot(HaveOccurred()) + + By("create the second work") + err = k8sClient.Create(context.Background(), work2) + Expect(err).ToNot(HaveOccurred()) + + waitForWorkToApply(work1.GetName(), workNamespace) + waitForWorkToApply(work2.GetName(), workNamespace) + + By("Check applied config map") + var configMap corev1.ConfigMap + Expect(k8sClient.Get(context.Background(), types.NamespacedName{Name: cmName, Namespace: cmNamespace}, &configMap)).Should(Succeed()) + Expect(len(configMap.Data)).Should(Equal(1)) + Expect(configMap.Data["data1"]).Should(Equal(cm.Data["data1"])) + Expect(len(configMap.OwnerReferences)).Should(Equal(2)) + Expect(configMap.OwnerReferences[0].APIVersion).Should(Equal(workv1alpha1.GroupVersion.String())) + Expect(configMap.OwnerReferences[0].Kind).Should(Equal(workv1alpha1.AppliedWorkKind)) + Expect(configMap.OwnerReferences[1].APIVersion).Should(Equal(workv1alpha1.GroupVersion.String())) + Expect(configMap.OwnerReferences[1].Kind).Should(Equal(workv1alpha1.AppliedWorkKind)) + // GC does not work in the testEnv + By("delete the second work") + Expect(k8sClient.Delete(context.Background(), work2)).Should(Succeed()) + By("check that the applied work2 is deleted") + var appliedWork workv1alpha1.AppliedWork + Eventually(func() bool { + err := k8sClient.Get(context.Background(), types.NamespacedName{Name: work2.Name}, &appliedWork) + return apierrors.IsNotFound(err) + }, timeout, interval).Should(BeTrue()) + + By("delete the first work") + Expect(k8sClient.Delete(context.Background(), work1)).Should(Succeed()) + By("check that the applied work1 and config map is deleted") + Eventually(func() bool { + err := k8sClient.Get(context.Background(), types.NamespacedName{Name: work2.Name}, &appliedWork) + return apierrors.IsNotFound(err) + }, timeout, interval).Should(BeTrue()) + }) + + It("Should pick up the built-in manifest change correctly", func() { + cmName := "testconfig" + cmNamespace := defaultNS + cm = &corev1.ConfigMap{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "ConfigMap", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: cmName, + Namespace: cmNamespace, + Labels: map[string]string{ + "labelKey1": "value1", + "labelKey2": "value2", + }, + Annotations: map[string]string{ + "annotationKey1": "annotation1", + "annotationKey2": "annotation2", + }, + }, + Data: map[string]string{ + "data1": "test1", + }, + } + + By("create the work") + work = createWorkWithManifest(workNamespace, cm) + Expect(k8sClient.Create(context.Background(), work)).ToNot(HaveOccurred()) + + By("wait for the work to be applied") + waitForWorkToApply(work.GetName(), work.GetNamespace()) + + By("Check applied config map") + verifyAppliedConfigMap(cm) + + By("Modify the configMap manifest") + // add new data + cm.Data["data2"] = "test2" + // modify one data + cm.Data["data1"] = "newValue" + // modify label key1 + cm.Labels["labelKey1"] = "newValue" + // remove label key2 + delete(cm.Labels, 
"labelKey2") + // add annotations key3 + cm.Annotations["annotationKey3"] = "annotation3" + // remove annotations key1 + delete(cm.Annotations, "annotationKey1") + + By("update the work") + resultWork := waitForWorkToApply(work.GetName(), work.GetNamespace()) + rawCM, err := json.Marshal(cm) + Expect(err).Should(Succeed()) + resultWork.Spec.Workload.Manifests[0].Raw = rawCM + Expect(k8sClient.Update(ctx, resultWork)).Should(Succeed()) + + By("wait for the change of the work to be applied") + waitForWorkToApply(work.GetName(), work.GetNamespace()) + + By("verify that applied configMap took all the changes") + verifyAppliedConfigMap(cm) + }) + + It("Should merge the third party change correctly", func() { + cmName := "test-merge" + cmNamespace := defaultNS + cm = &corev1.ConfigMap{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "ConfigMap", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: cmName, + Namespace: cmNamespace, + Labels: map[string]string{ + "labelKey1": "value1", + "labelKey2": "value2", + "labelKey3": "value3", + }, + }, + Data: map[string]string{ + "data1": "test1", + }, + } + + By("create the work") + work = createWorkWithManifest(workNamespace, cm) + err := k8sClient.Create(context.Background(), work) + Expect(err).ToNot(HaveOccurred()) + + By("wait for the work to be applied") + waitForWorkToApply(work.GetName(), work.GetNamespace()) + + By("Check applied configMap") + appliedCM := verifyAppliedConfigMap(cm) + + By("Modify and update the applied configMap") + // add a new data + appliedCM.Data["data2"] = "another value" + // add a new data + appliedCM.Data["data3"] = "added data by third party" + // modify label key1 + appliedCM.Labels["labelKey1"] = "third-party-label" + // remove label key2 and key3 + delete(cm.Labels, "labelKey2") + delete(cm.Labels, "labelKey3") + Expect(k8sClient.Update(context.Background(), appliedCM)).Should(Succeed()) + + By("Get the last applied config map and verify it's updated") + var modifiedCM corev1.ConfigMap + Expect(k8sClient.Get(context.Background(), types.NamespacedName{Name: cm.GetName(), Namespace: cm.GetNamespace()}, &modifiedCM)).Should(Succeed()) + Expect(cmp.Diff(appliedCM.Labels, modifiedCM.Labels)).Should(BeEmpty()) + Expect(cmp.Diff(appliedCM.Data, modifiedCM.Data)).Should(BeEmpty()) + + By("Modify the manifest") + // modify one data + cm.Data["data1"] = "modifiedValue" + // add a conflict data + cm.Data["data2"] = "added by manifest" + // change label key3 with a new value + cm.Labels["labelKey3"] = "added-back-by-manifest" + + By("update the work") + resultWork := waitForWorkToApply(work.GetName(), work.GetNamespace()) + rawCM, err := json.Marshal(cm) + Expect(err).Should(Succeed()) + resultWork.Spec.Workload.Manifests[0].Raw = rawCM + Expect(k8sClient.Update(context.Background(), resultWork)).Should(Succeed()) + + By("wait for the change of the work to be applied") + waitForWorkToApply(work.GetName(), work.GetNamespace()) + + By("Get the last applied config map") + Expect(k8sClient.Get(context.Background(), types.NamespacedName{Name: cmName, Namespace: cmNamespace}, appliedCM)).Should(Succeed()) + + By("Check the config map data") + // data1's value picks up our change + // data2 is value is overridden by our change + // data3 is added by the third party + expectedData := map[string]string{ + "data1": "modifiedValue", + "data2": "added by manifest", + "data3": "added data by third party", + } + Expect(cmp.Diff(appliedCM.Data, expectedData)).Should(BeEmpty()) + + By("Check the config map label") + // key1's value is 
override back even if we didn't change it + // key2 is deleted by third party since we didn't change it + // key3's value added back after we change the value + expectedLabel := map[string]string{ + "labelKey1": "value1", + "labelKey3": "added-back-by-manifest", + } + Expect(cmp.Diff(appliedCM.Labels, expectedLabel)).Should(BeEmpty()) + }) + + It("Should pick up the crd change correctly", func() { + cloneName := "testcloneset" + cloneNamespace := defaultNS + cloneSet := &kruisev1alpha1.CloneSet{ + TypeMeta: metav1.TypeMeta{ + APIVersion: kruisev1alpha1.SchemeGroupVersion.String(), + Kind: "CloneSet", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: cloneName, + Namespace: cloneNamespace, + }, + Spec: kruisev1alpha1.CloneSetSpec{ + Selector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "region", + Operator: metav1.LabelSelectorOpNotIn, + Values: []string{"us", "eu"}, + }, + { + Key: "prod", + Operator: metav1.LabelSelectorOpDoesNotExist, + }, + }, + }, + }, + } + + By("create the work") + work = createWorkWithManifest(workNamespace, cloneSet) + err := k8sClient.Create(context.Background(), work) + Expect(err).ToNot(HaveOccurred()) + + By("wait for the work to be applied") + waitForWorkToApply(work.GetName(), work.GetNamespace()) + + By("Check applied CloneSet") + var appliedCloneSet kruisev1alpha1.CloneSet + Expect(k8sClient.Get(context.Background(), types.NamespacedName{Name: cloneName, Namespace: cloneNamespace}, &appliedCloneSet)).Should(Succeed()) + + By("verify the CloneSet spec") + Expect(cmp.Diff(appliedCloneSet.Spec, cloneSet.Spec)).Should(BeEmpty()) + + By("Modify and update the applied CloneSet") + // add/modify/remove a match + appliedCloneSet.Spec.Selector.MatchExpressions = []metav1.LabelSelectorRequirement{ + { + Key: "region", + Operator: metav1.LabelSelectorOpNotIn, + Values: []string{"asia"}, + }, + { + Key: "extra", + Operator: metav1.LabelSelectorOpExists, + }, + } + appliedCloneSet.Spec.ScaleStrategy.PodsToDelete = []string{"a", "b"} + appliedCloneSet.Spec.MinReadySeconds = 10 + Expect(k8sClient.Update(context.Background(), &appliedCloneSet)).Should(Succeed()) + + By("Verify applied CloneSet modified") + var modifiedCloneSet kruisev1alpha1.CloneSet + Expect(k8sClient.Get(context.Background(), types.NamespacedName{Name: cloneName, Namespace: cloneNamespace}, &modifiedCloneSet)).Should(Succeed()) + Expect(cmp.Diff(appliedCloneSet.Spec, modifiedCloneSet.Spec)).Should(BeEmpty()) + + By("Modify the cloneset") + cloneSet.Spec.Selector.MatchExpressions = []metav1.LabelSelectorRequirement{ + { + Key: "region", + Operator: metav1.LabelSelectorOpNotIn, + Values: []string{"us", "asia", "eu"}, + }, + } + cloneSet.Spec.Replicas = pointer.Int32Ptr(10) + cloneSet.Spec.MinReadySeconds = 1 + maxuavail := intstr.FromInt(10) + cloneSet.Spec.ScaleStrategy.MaxUnavailable = &maxuavail + By("update the work") + resultWork := waitForWorkToApply(work.GetName(), work.GetNamespace()) + rawCM, err := json.Marshal(cloneSet) + Expect(err).Should(Succeed()) + resultWork.Spec.Workload.Manifests[0].Raw = rawCM + Expect(k8sClient.Update(context.Background(), resultWork)).Should(Succeed()) + waitForWorkToApply(work.GetName(), work.GetNamespace()) + + By("Get the last applied cloneset") + Expect(k8sClient.Get(context.Background(), types.NamespacedName{Name: cloneName, Namespace: cloneNamespace}, &appliedCloneSet)).Should(Succeed()) + + By("Check the cloneset spec, its an overide for arrays") + expectStrategy := kruisev1alpha1.CloneSetScaleStrategy{ + PodsToDelete: 
[]string{"a", "b"}, + MaxUnavailable: &maxuavail, + } + Expect(cmp.Diff(appliedCloneSet.Spec.ScaleStrategy, expectStrategy)).Should(BeEmpty()) + Expect(cmp.Diff(appliedCloneSet.Spec.Selector, cloneSet.Spec.Selector)).Should(BeEmpty()) + Expect(cmp.Diff(appliedCloneSet.Spec.Replicas, pointer.Int32Ptr(10))).Should(BeEmpty()) + Expect(cmp.Diff(appliedCloneSet.Spec.MinReadySeconds, int32(1))).Should(BeEmpty()) + }) + + It("Check that owner references is merged instead of override", func() { + cmName := "test-ownerreference-merge" + cmNamespace := defaultNS + cm = &corev1.ConfigMap{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "ConfigMap", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: cmName, + Namespace: cmNamespace, + }, + Data: map[string]string{ + "test": "test", + }, + } + + By("create the work") + work = createWorkWithManifest(workNamespace, cm) + Expect(k8sClient.Create(context.Background(), work)).ToNot(HaveOccurred()) + + By("create another work that includes the configMap") + work2 := createWorkWithManifest(workNamespace, cm) + Expect(k8sClient.Create(context.Background(), work2)).ToNot(HaveOccurred()) + + By("wait for the change of the work1 to be applied") + waitForWorkToApply(work.GetName(), work.GetNamespace()) + + By("wait for the change of the work2 to be applied") + waitForWorkToApply(work2.GetName(), work2.GetNamespace()) + + By("verify the owner reference is merged") + var appliedCM corev1.ConfigMap + Expect(k8sClient.Get(context.Background(), types.NamespacedName{Name: cm.GetName(), Namespace: cm.GetNamespace()}, &appliedCM)).Should(Succeed()) + + By("Check the config map label") + Expect(len(appliedCM.OwnerReferences)).Should(Equal(2)) + Expect(appliedCM.OwnerReferences[0].APIVersion).Should(Equal(workv1alpha1.GroupVersion.String())) + Expect(appliedCM.OwnerReferences[0].Name).Should(SatisfyAny(Equal(work.GetName()), Equal(work2.GetName()))) + Expect(appliedCM.OwnerReferences[1].APIVersion).Should(Equal(workv1alpha1.GroupVersion.String())) + Expect(appliedCM.OwnerReferences[1].Name).Should(SatisfyAny(Equal(work.GetName()), Equal(work2.GetName()))) + }) + + It("Check that failed to apply manifest has the proper identification", func() { + broadcastName := "testfail" + namespace := defaultNS + broadcastJob := &kruisev1alpha1.BroadcastJob{ + TypeMeta: metav1.TypeMeta{ + APIVersion: kruisev1alpha1.SchemeGroupVersion.String(), + Kind: "BroadcastJob", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: broadcastName, + Namespace: namespace, + }, + Spec: kruisev1alpha1.BroadcastJobSpec{ + Paused: true, + }, + } + work = createWorkWithManifest(workNamespace, broadcastJob) + err := k8sClient.Create(context.Background(), work) + Expect(err).ToNot(HaveOccurred()) + + By("wait for the work to be applied") + var resultWork workv1alpha1.Work + Eventually(func() bool { + err := k8sClient.Get(context.Background(), types.NamespacedName{Name: work.Name, Namespace: work.GetNamespace()}, &resultWork) + if err != nil { + return false + } + applyCond := meta.FindStatusCondition(resultWork.Status.Conditions, ConditionTypeApplied) + if applyCond == nil || applyCond.Status != metav1.ConditionFalse || applyCond.ObservedGeneration != resultWork.Generation { + return false + } + if !meta.IsStatusConditionFalse(resultWork.Status.ManifestConditions[0].Conditions, ConditionTypeApplied) { + return false + } + return true + }, timeout, interval).Should(BeTrue()) + expectedResourceID := workv1alpha1.ResourceIdentifier{ + Ordinal: 0, + Group: "apps.kruise.io", + Version: "v1alpha1", + Kind: 
"BroadcastJob", + Namespace: broadcastJob.GetNamespace(), + Name: broadcastJob.GetName(), + } + Expect(cmp.Diff(resultWork.Status.ManifestConditions[0].Identifier, expectedResourceID)).Should(BeEmpty()) + }) + }) + + Context("Test multiple work propagation", func() { + var works []*workv1alpha1.Work + + AfterEach(func() { + for _, staleWork := range works { + err := k8sClient.Delete(context.Background(), staleWork) + Expect(err).ToNot(HaveOccurred()) + } + }) + + It("Test join and leave work correctly", func() { + By("create the works") + var configMap corev1.ConfigMap + cmNamespace := defaultNS + var cmNames []string + numWork := 10 + data := map[string]string{ + "test-key-1": "test-value-1", + "test-key-2": "test-value-2", + "test-key-3": "test-value-3", + } + + for i := 0; i < numWork; i++ { + cmName := "testcm-" + utilrand.String(10) + cmNames = append(cmNames, cmName) + cm = &corev1.ConfigMap{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "ConfigMap", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: cmName, + Namespace: cmNamespace, + }, + Data: data, + } + // make sure we can call join as many as possible + Expect(workController.Join(ctx)).Should(Succeed()) + work = createWorkWithManifest(workNamespace, cm) + err := k8sClient.Create(ctx, work) + Expect(err).ToNot(HaveOccurred()) + By(fmt.Sprintf("created the work = %s", work.GetName())) + works = append(works, work) + } + + By("make sure the works are handled") + for i := 0; i < numWork; i++ { + waitForWorkToBeHandled(works[i].GetName(), works[i].GetNamespace()) + } + + By("mark the work controller as leave") + Eventually(func() error { + return workController.Leave(ctx) + }, timeout, interval).Should(Succeed()) + + By("make sure the manifests have no finalizer and its status match the member cluster") + newData := map[string]string{ + "test-key-1": "test-value-1", + "test-key-2": "test-value-2", + "test-key-3": "test-value-3", + "new-test-key-1": "test-value-4", + "new-test-key-2": "test-value-5", + } + for i := 0; i < numWork; i++ { + var resultWork workv1alpha1.Work + Expect(k8sClient.Get(ctx, types.NamespacedName{Name: works[i].GetName(), Namespace: workNamespace}, &resultWork)).Should(Succeed()) + Expect(controllerutil.ContainsFinalizer(work, workFinalizer)).Should(BeFalse()) + applyCond := meta.FindStatusCondition(resultWork.Status.Conditions, ConditionTypeApplied) + if applyCond != nil && applyCond.Status == metav1.ConditionTrue && applyCond.ObservedGeneration == resultWork.Generation { + By("the work is applied, check if the applied config map is still there") + Expect(k8sClient.Get(ctx, types.NamespacedName{Name: cmNames[i], Namespace: cmNamespace}, &configMap)).Should(Succeed()) + Expect(cmp.Diff(configMap.Data, data)).Should(BeEmpty()) + } else { + By("the work is not applied, verify that the applied config map is not there") + err := k8sClient.Get(ctx, types.NamespacedName{Name: cmNames[i], Namespace: cmNamespace}, &configMap) + Expect(apierrors.IsNotFound(err)).Should(BeTrue()) + } + // make sure that leave can be called as many times as possible + Expect(workController.Leave(ctx)).Should(Succeed()) + By(fmt.Sprintf("change the work = %s", work.GetName())) + cm = &corev1.ConfigMap{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "ConfigMap", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: cmNames[i], + Namespace: cmNamespace, + }, + Data: newData, + } + rawCM, err := json.Marshal(cm) + Expect(err).Should(Succeed()) + resultWork.Spec.Workload.Manifests[0].Raw = rawCM + Expect(k8sClient.Update(ctx, 
&resultWork)).Should(Succeed()) + } + + By("make sure the update in the work is not picked up") + Consistently(func() bool { + for i := 0; i < numWork; i++ { + By(fmt.Sprintf("updated the work = %s", works[i].GetName())) + var resultWork workv1alpha1.Work + err := k8sClient.Get(context.Background(), types.NamespacedName{Name: works[i].GetName(), Namespace: workNamespace}, &resultWork) + Expect(err).Should(Succeed()) + Expect(controllerutil.ContainsFinalizer(&resultWork, workFinalizer)).Should(BeFalse()) + applyCond := meta.FindStatusCondition(resultWork.Status.Conditions, ConditionTypeApplied) + if applyCond != nil && applyCond.Status == metav1.ConditionTrue && applyCond.ObservedGeneration == resultWork.Generation { + return false + } + By("check if the config map is not changed") + Expect(k8sClient.Get(ctx, types.NamespacedName{Name: cmNames[i], Namespace: cmNamespace}, &configMap)).Should(Succeed()) + Expect(cmp.Diff(configMap.Data, data)).Should(BeEmpty()) + } + return true + }, timeout, interval).Should(BeTrue()) + + By("enable the work controller again") + Expect(workController.Join(ctx)).Should(Succeed()) + + By("make sure the work change get picked up") + for i := 0; i < numWork; i++ { + resultWork := waitForWorkToApply(works[i].GetName(), works[i].GetNamespace()) + Expect(len(resultWork.Status.ManifestConditions)).Should(Equal(1)) + Expect(meta.IsStatusConditionTrue(resultWork.Status.ManifestConditions[0].Conditions, ConditionTypeApplied)).Should(BeTrue()) + By("the work is applied, check if the applied config map is updated") + Expect(k8sClient.Get(ctx, types.NamespacedName{Name: cmNames[i], Namespace: cmNamespace}, &configMap)).Should(Succeed()) + Expect(cmp.Diff(configMap.Data, newData)).Should(BeEmpty()) + } + }) + }) +}) diff --git a/pkg/controllers/work/apply_controller_test.go b/pkg/controllers/work/apply_controller_test.go new file mode 100644 index 000000000..e2192f18f --- /dev/null +++ b/pkg/controllers/work/apply_controller_test.go @@ -0,0 +1,908 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package controllers + +import ( + "context" + "encoding/json" + "fmt" + "math" + "reflect" + "testing" + "time" + + "github.com/crossplane/crossplane-runtime/pkg/test" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "go.uber.org/atomic" + appsv1 "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + utilrand "k8s.io/apimachinery/pkg/util/rand" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/fake" + testingclient "k8s.io/client-go/testing" + "k8s.io/utils/pointer" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + workv1alpha1 "sigs.k8s.io/work-api/pkg/apis/v1alpha1" + + "go.goms.io/fleet/pkg/utils" +) + +var ( + fakeDynamicClient = fake.NewSimpleDynamicClient(runtime.NewScheme()) + ownerRef = metav1.OwnerReference{ + APIVersion: workv1alpha1.GroupVersion.String(), + Kind: "AppliedWork", + } + testGvr = schema.GroupVersionResource{ + Group: "apps", + Version: "v1", + Resource: "Deployment", + } + testDeployment = appsv1.Deployment{ + TypeMeta: metav1.TypeMeta{ + Kind: "Deployment", + APIVersion: "apps/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "Deployment", + OwnerReferences: []metav1.OwnerReference{ + ownerRef, + }, + }, + Spec: appsv1.DeploymentSpec{ + MinReadySeconds: 5, + }, + } + rawTestDeployment, _ = json.Marshal(testDeployment) + testManifest = workv1alpha1.Manifest{RawExtension: runtime.RawExtension{ + Raw: rawTestDeployment, + }} +) + +// This interface is needed for testMapper abstract class. 
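+// It embeds meta.RESTMapper and only overrides RESTMapping: Deployment kinds resolve to the fixed testGvr
+// while any other kind returns an error, so the unit tests can exercise both the mapped and unmapped paths.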
+type testMapper struct { + meta.RESTMapper +} + +func (m testMapper) RESTMapping(gk schema.GroupKind, versions ...string) (*meta.RESTMapping, error) { + if gk.Kind == "Deployment" { + return &meta.RESTMapping{ + Resource: testGvr, + GroupVersionKind: testDeployment.GroupVersionKind(), + Scope: nil, + }, nil + } + return nil, errors.New("test error: mapping does not exist") +} + +func TestSetManifestHashAnnotation(t *testing.T) { + // basic setup + manifestObj := appsv1.Deployment{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "apps/v1", + Kind: "Deployment", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "Deployment", + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: utilrand.String(10), + Kind: utilrand.String(10), + Name: utilrand.String(10), + UID: types.UID(utilrand.String(10)), + }, + }, + Annotations: map[string]string{utilrand.String(10): utilrand.String(10)}, + }, + Spec: appsv1.DeploymentSpec{ + Paused: true, + Strategy: appsv1.DeploymentStrategy{ + Type: appsv1.RecreateDeploymentStrategyType, + }, + }, + Status: appsv1.DeploymentStatus{ + ReadyReplicas: 1, + }, + } + // pre-compute the hash + preObj := manifestObj.DeepCopy() + var uPreObj unstructured.Unstructured + uPreObj.Object, _ = runtime.DefaultUnstructuredConverter.ToUnstructured(preObj) + preHash, _ := computeManifestHash(&uPreObj) + + tests := map[string]struct { + manifestObj interface{} + isSame bool + }{ + "manifest same, same": { + manifestObj: func() *appsv1.Deployment { + extraObj := manifestObj.DeepCopy() + return extraObj + }(), + isSame: true, + }, + "manifest status changed, same": { + manifestObj: func() *appsv1.Deployment { + extraObj := manifestObj.DeepCopy() + extraObj.Status.ReadyReplicas = 10 + return extraObj + }(), + isSame: true, + }, + "manifest's has hashAnnotation, same": { + manifestObj: func() *appsv1.Deployment { + alterObj := manifestObj.DeepCopy() + alterObj.Annotations[manifestHashAnnotation] = utilrand.String(10) + return alterObj + }(), + isSame: true, + }, + "manifest has extra metadata, same": { + manifestObj: func() *appsv1.Deployment { + noObj := manifestObj.DeepCopy() + noObj.SetSelfLink(utilrand.String(2)) + noObj.SetResourceVersion(utilrand.String(4)) + noObj.SetGeneration(3) + noObj.SetUID(types.UID(utilrand.String(3))) + noObj.SetCreationTimestamp(metav1.Now()) + return noObj + }(), + isSame: true, + }, + "manifest has a new appliedWork ownership, need update": { + manifestObj: func() *appsv1.Deployment { + alterObj := manifestObj.DeepCopy() + alterObj.OwnerReferences[0].APIVersion = workv1alpha1.GroupVersion.String() + alterObj.OwnerReferences[0].Kind = workv1alpha1.AppliedWorkKind + return alterObj + }(), + isSame: false, + }, + "manifest is has changed ownership, need update": { + manifestObj: func() *appsv1.Deployment { + alterObj := manifestObj.DeepCopy() + alterObj.OwnerReferences[0].APIVersion = utilrand.String(10) + return alterObj + }(), + isSame: false, + }, + "manifest has a different label, need update": { + manifestObj: func() *appsv1.Deployment { + alterObj := manifestObj.DeepCopy() + alterObj.SetLabels(map[string]string{utilrand.String(5): utilrand.String(10)}) + return alterObj + }(), + isSame: false, + }, + "manifest has a different annotation, need update": { + manifestObj: func() *appsv1.Deployment { + alterObj := manifestObj.DeepCopy() + alterObj.SetAnnotations(map[string]string{utilrand.String(5): utilrand.String(10)}) + return alterObj + }(), + isSame: false, + }, + "manifest has a different spec, need update": { + manifestObj: func() 
*appsv1.Deployment { + alterObj := manifestObj.DeepCopy() + alterObj.Spec.Replicas = pointer.Int32Ptr(100) + return alterObj + }(), + isSame: false, + }, + } + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + var uManifestObj unstructured.Unstructured + uManifestObj.Object, _ = runtime.DefaultUnstructuredConverter.ToUnstructured(tt.manifestObj) + err := setManifestHashAnnotation(&uManifestObj) + if err != nil { + t.Error("failed to marshall the manifest", err.Error()) + } + manifestHash := uManifestObj.GetAnnotations()[manifestHashAnnotation] + if tt.isSame != (manifestHash == preHash) { + t.Errorf("testcase %s failed: manifestObj = (%+v)", name, tt.manifestObj) + } + }) + } +} + +func TestIsManifestManagedByWork(t *testing.T) { + tests := map[string]struct { + ownerRefs []metav1.OwnerReference + isManaged bool + }{ + "empty owner list": { + ownerRefs: nil, + isManaged: false, + }, + "no appliedWork": { + ownerRefs: []metav1.OwnerReference{ + { + APIVersion: workv1alpha1.GroupVersion.String(), + Kind: workv1alpha1.WorkKind, + }, + }, + isManaged: false, + }, + "one appliedWork": { + ownerRefs: []metav1.OwnerReference{ + { + APIVersion: workv1alpha1.GroupVersion.String(), + Kind: workv1alpha1.AppliedWorkKind, + Name: utilrand.String(10), + UID: types.UID(utilrand.String(10)), + }, + }, + isManaged: true, + }, + "multiple appliedWork": { + ownerRefs: []metav1.OwnerReference{ + { + APIVersion: workv1alpha1.GroupVersion.String(), + Kind: workv1alpha1.AppliedWorkKind, + Name: utilrand.String(10), + UID: types.UID(utilrand.String(10)), + }, + { + APIVersion: workv1alpha1.GroupVersion.String(), + Kind: workv1alpha1.AppliedWorkKind, + UID: types.UID(utilrand.String(10)), + }, + }, + isManaged: true, + }, + } + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + assert.Equalf(t, tt.isManaged, isManifestManagedByWork(tt.ownerRefs), "isManifestManagedByWork(%v)", tt.ownerRefs) + }) + } +} + +func TestApplyUnstructured(t *testing.T) { + correctObj, correctDynamicClient, correctSpecHash := createObjAndDynamicClient(testManifest.Raw) + + testDeploymentGenerated := testDeployment.DeepCopy() + testDeploymentGenerated.Name = "" + testDeploymentGenerated.GenerateName = utilrand.String(10) + rawGenerated, _ := json.Marshal(testDeploymentGenerated) + generatedSpecObj, generatedSpecDynamicClient, generatedSpecHash := createObjAndDynamicClient(rawGenerated) + + testDeploymentDiffSpec := testDeployment.DeepCopy() + testDeploymentDiffSpec.Spec.MinReadySeconds = 0 + rawDiffSpec, _ := json.Marshal(testDeploymentDiffSpec) + diffSpecObj, diffSpecDynamicClient, diffSpecHash := createObjAndDynamicClient(rawDiffSpec) + + patchFailClient := fake.NewSimpleDynamicClient(runtime.NewScheme()) + patchFailClient.PrependReactor("patch", "*", func(action testingclient.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, errors.New("patch failed") + }) + patchFailClient.PrependReactor("get", "*", func(action testingclient.Action) (handled bool, ret runtime.Object, err error) { + return true, diffSpecObj.DeepCopy(), nil + }) + + dynamicClientNotFound := fake.NewSimpleDynamicClient(runtime.NewScheme()) + dynamicClientNotFound.PrependReactor("get", "*", func(action testingclient.Action) (handled bool, ret runtime.Object, err error) { + return false, + nil, + &apierrors.StatusError{ + ErrStatus: metav1.Status{ + Status: metav1.StatusFailure, + Reason: metav1.StatusReasonNotFound, + }} + }) + + dynamicClientError := fake.NewSimpleDynamicClient(runtime.NewScheme()) + 
dynamicClientError.PrependReactor("get", "*", func(action testingclient.Action) (handled bool, ret runtime.Object, err error) { + return true, + nil, + errors.New("client error") + }) + + testDeploymentWithDifferentOwner := appsv1.Deployment{ + TypeMeta: metav1.TypeMeta{ + Kind: "Deployment", + APIVersion: "apps/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "Deployment", + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: utilrand.String(10), + Kind: utilrand.String(10), + Name: utilrand.String(10), + UID: types.UID(utilrand.String(10)), + }, + }, + }, + } + rawTestDeploymentWithDifferentOwner, _ := json.Marshal(testDeploymentWithDifferentOwner) + _, diffOwnerDynamicClient, _ := createObjAndDynamicClient(rawTestDeploymentWithDifferentOwner) + + specHashFailObj := correctObj.DeepCopy() + specHashFailObj.Object["test"] = math.Inf(1) + + testCases := map[string]struct { + reconciler ApplyWorkReconciler + workObj *unstructured.Unstructured + resultSpecHash string + resultBool bool + resultErr error + }{ + "test creation succeeds when the object does not exist": { + reconciler: ApplyWorkReconciler{ + client: &test.MockClient{}, + spokeDynamicClient: dynamicClientNotFound, + spokeClient: &test.MockClient{}, + restMapper: testMapper{}, + recorder: utils.NewFakeRecorder(1), + }, + workObj: correctObj.DeepCopy(), + resultSpecHash: correctSpecHash, + resultBool: true, + resultErr: nil, + }, + "test creation succeeds when the object has a generated name": { + reconciler: ApplyWorkReconciler{ + client: &test.MockClient{}, + spokeDynamicClient: generatedSpecDynamicClient, + spokeClient: &test.MockClient{}, + restMapper: testMapper{}, + recorder: utils.NewFakeRecorder(1), + }, + workObj: generatedSpecObj.DeepCopy(), + resultSpecHash: generatedSpecHash, + resultBool: true, + resultErr: nil, + }, + "client error looking for object / fail": { + reconciler: ApplyWorkReconciler{ + client: &test.MockClient{}, + spokeDynamicClient: dynamicClientError, + spokeClient: &test.MockClient{}, + restMapper: testMapper{}, + recorder: utils.NewFakeRecorder(1), + }, + workObj: correctObj.DeepCopy(), + resultBool: false, + resultErr: errors.New("client error"), + }, + "owner reference comparison failure / fail": { + reconciler: ApplyWorkReconciler{ + client: &test.MockClient{}, + spokeDynamicClient: diffOwnerDynamicClient, + spokeClient: &test.MockClient{}, + restMapper: testMapper{}, + recorder: utils.NewFakeRecorder(1), + }, + workObj: correctObj.DeepCopy(), + resultBool: false, + resultErr: errors.New("resource is not managed by the work controller"), + }, + "equal spec hash of current vs work object / succeed without updates": { + reconciler: ApplyWorkReconciler{ + spokeDynamicClient: correctDynamicClient, + recorder: utils.NewFakeRecorder(1), + }, + workObj: correctObj.DeepCopy(), + resultSpecHash: correctSpecHash, + resultBool: false, + resultErr: nil, + }, + "unequal spec hash of current vs work object / client patch fail": { + reconciler: ApplyWorkReconciler{ + spokeDynamicClient: patchFailClient, + recorder: utils.NewFakeRecorder(1), + }, + workObj: correctObj.DeepCopy(), + resultBool: false, + resultErr: errors.New("patch failed"), + }, + "happy path - with updates": { + reconciler: ApplyWorkReconciler{ + spokeDynamicClient: diffSpecDynamicClient, + restMapper: testMapper{}, + recorder: utils.NewFakeRecorder(1), + }, + workObj: correctObj, + resultSpecHash: diffSpecHash, + resultBool: true, + resultErr: nil, + }, + } + + for testName, testCase := range testCases { + t.Run(testName, func(t 
*testing.T) { + applyResult, applyResultBool, err := testCase.reconciler.applyUnstructured(context.Background(), testGvr, testCase.workObj) + assert.Equalf(t, testCase.resultBool, applyResultBool, "updated boolean not matching for Testcase %s", testName) + if testCase.resultErr != nil { + assert.Containsf(t, err.Error(), testCase.resultErr.Error(), "error not matching for Testcase %s", testName) + } else { + assert.Truef(t, err == nil, "err is not nil for Testcase %s", testName) + assert.Truef(t, applyResult != nil, "applyResult is not nil for Testcase %s", testName) + assert.Equalf(t, testCase.resultSpecHash, applyResult.GetAnnotations()[manifestHashAnnotation], + "specHash not matching for Testcase %s", testName) + assert.Equalf(t, ownerRef, applyResult.GetOwnerReferences()[0], "ownerRef not matching for Testcase %s", testName) + } + }) + } +} + +func TestApplyManifest(t *testing.T) { + failMsg := "manifest apply failed" + // Manifests + rawInvalidResource, _ := json.Marshal([]byte(utilrand.String(10))) + rawMissingResource, _ := json.Marshal( + v1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "core/v1", + }, + }) + InvalidManifest := workv1alpha1.Manifest{RawExtension: runtime.RawExtension{ + Raw: rawInvalidResource, + }} + MissingManifest := workv1alpha1.Manifest{RawExtension: runtime.RawExtension{ + Raw: rawMissingResource, + }} + + // GVRs + expectedGvr := schema.GroupVersionResource{ + Group: "apps", + Version: "v1", + Resource: "Deployment", + } + emptyGvr := schema.GroupVersionResource{} + + // DynamicClients + clientFailDynamicClient := fake.NewSimpleDynamicClient(runtime.NewScheme()) + clientFailDynamicClient.PrependReactor("get", "*", func(action testingclient.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, errors.New(failMsg) + }) + + testCases := map[string]struct { + reconciler ApplyWorkReconciler + manifestList []workv1alpha1.Manifest + generation int64 + updated bool + wantGvr schema.GroupVersionResource + wantErr error + }{ + "manifest is in proper format/ happy path": { + reconciler: ApplyWorkReconciler{ + client: &test.MockClient{}, + spokeDynamicClient: fakeDynamicClient, + spokeClient: &test.MockClient{}, + restMapper: testMapper{}, + recorder: utils.NewFakeRecorder(1), + joined: atomic.NewBool(true), + }, + manifestList: append([]workv1alpha1.Manifest{}, testManifest), + generation: 0, + updated: true, + wantGvr: expectedGvr, + wantErr: nil, + }, + "manifest has incorrect syntax/ decode fail": { + reconciler: ApplyWorkReconciler{ + client: &test.MockClient{}, + spokeDynamicClient: fakeDynamicClient, + spokeClient: &test.MockClient{}, + restMapper: testMapper{}, + recorder: utils.NewFakeRecorder(1), + joined: atomic.NewBool(true), + }, + manifestList: append([]workv1alpha1.Manifest{}, InvalidManifest), + generation: 0, + updated: false, + wantGvr: emptyGvr, + wantErr: &json.UnmarshalTypeError{ + Value: "string", + Type: reflect.TypeOf(map[string]interface{}{}), + }, + }, + "manifest is correct / object not mapped in restmapper / decode fail": { + reconciler: ApplyWorkReconciler{ + client: &test.MockClient{}, + spokeDynamicClient: fakeDynamicClient, + spokeClient: &test.MockClient{}, + restMapper: testMapper{}, + recorder: utils.NewFakeRecorder(1), + joined: atomic.NewBool(true), + }, + manifestList: append([]workv1alpha1.Manifest{}, MissingManifest), + generation: 0, + updated: false, + wantGvr: emptyGvr, + wantErr: errors.New("failed to find group/version/resource from restmapping: test error: mapping does not exist"), + 
}, + "manifest is in proper format/ should fail applyUnstructured": { + reconciler: ApplyWorkReconciler{ + client: &test.MockClient{}, + spokeDynamicClient: clientFailDynamicClient, + spokeClient: &test.MockClient{}, + restMapper: testMapper{}, + recorder: utils.NewFakeRecorder(1), + joined: atomic.NewBool(true), + }, + manifestList: append([]workv1alpha1.Manifest{}, testManifest), + generation: 0, + updated: false, + wantGvr: expectedGvr, + wantErr: errors.New(failMsg), + }, + } + + for testName, testCase := range testCases { + t.Run(testName, func(t *testing.T) { + resultList := testCase.reconciler.applyManifests(context.Background(), testCase.manifestList, ownerRef) + for _, result := range resultList { + if testCase.wantErr != nil { + assert.Containsf(t, result.err.Error(), testCase.wantErr.Error(), "Incorrect error for Testcase %s", testName) + } + assert.Equalf(t, testCase.generation, result.generation, "Testcase %s: generation incorrect", testName) + assert.Equalf(t, testCase.updated, result.updated, "Testcase %s: Updated boolean incorrect", testName) + } + }) + } +} + +func TestReconcile(t *testing.T) { + failMsg := "manifest apply failed" + workNamespace := utilrand.String(10) + workName := utilrand.String(10) + appliedWorkName := utilrand.String(10) + req := ctrl.Request{ + NamespacedName: types.NamespacedName{ + Namespace: workNamespace, + Name: workName, + }, + } + wrongReq := ctrl.Request{ + NamespacedName: types.NamespacedName{ + Namespace: utilrand.String(10), + Name: utilrand.String(10), + }, + } + invalidReq := ctrl.Request{ + NamespacedName: types.NamespacedName{ + Namespace: "", + Name: "", + }, + } + + getMock := func(ctx context.Context, key client.ObjectKey, obj client.Object) error { + if key.Namespace != workNamespace { + return &apierrors.StatusError{ + ErrStatus: metav1.Status{ + Status: metav1.StatusFailure, + Reason: metav1.StatusReasonNotFound, + }} + } + o, _ := obj.(*workv1alpha1.Work) + *o = workv1alpha1.Work{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: workNamespace, + Name: workName, + Finalizers: []string{workFinalizer}, + }, + Spec: workv1alpha1.WorkSpec{Workload: workv1alpha1.WorkloadTemplate{Manifests: []workv1alpha1.Manifest{testManifest}}}, + } + return nil + } + + happyDeployment := appsv1.Deployment{ + TypeMeta: metav1.TypeMeta{ + Kind: "Deployment", + APIVersion: "apps/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "Deployment", + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: workv1alpha1.GroupVersion.String(), + Kind: "AppliedWork", + Name: appliedWorkName, + }, + }, + }, + Spec: appsv1.DeploymentSpec{ + MinReadySeconds: 5, + }, + } + rawHappyDeployment, _ := json.Marshal(happyDeployment) + happyManifest := workv1alpha1.Manifest{RawExtension: runtime.RawExtension{ + Raw: rawHappyDeployment, + }} + _, happyDynamicClient, _ := createObjAndDynamicClient(happyManifest.Raw) + + getMockAppliedWork := func(ctx context.Context, key client.ObjectKey, obj client.Object) error { + if key.Name != workName { + return &apierrors.StatusError{ + ErrStatus: metav1.Status{ + Status: metav1.StatusFailure, + Reason: metav1.StatusReasonNotFound, + }} + } + o, _ := obj.(*workv1alpha1.AppliedWork) + *o = workv1alpha1.AppliedWork{ + ObjectMeta: metav1.ObjectMeta{ + Name: appliedWorkName, + }, + Spec: workv1alpha1.AppliedWorkSpec{ + WorkName: workNamespace, + WorkNamespace: workName, + }, + } + return nil + } + + clientFailDynamicClient := fake.NewSimpleDynamicClient(runtime.NewScheme()) + clientFailDynamicClient.PrependReactor("get", "*", func(action 
testingclient.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, errors.New(failMsg) + }) + + testCases := map[string]struct { + reconciler ApplyWorkReconciler + req ctrl.Request + wantErr error + requeue bool + }{ + "controller is being stopped": { + reconciler: ApplyWorkReconciler{ + client: &test.MockClient{}, + spokeDynamicClient: happyDynamicClient, + spokeClient: &test.MockClient{}, + restMapper: testMapper{}, + recorder: utils.NewFakeRecorder(1), + joined: atomic.NewBool(false), + }, + req: req, + wantErr: nil, + requeue: true, + }, + "work cannot be retrieved, client failed due to client error": { + reconciler: ApplyWorkReconciler{ + client: &test.MockClient{ + MockGet: func(ctx context.Context, key client.ObjectKey, obj client.Object) error { + return fmt.Errorf("client failing") + }, + }, + spokeDynamicClient: fakeDynamicClient, + spokeClient: &test.MockClient{}, + restMapper: testMapper{}, + recorder: utils.NewFakeRecorder(1), + joined: atomic.NewBool(true), + }, + req: invalidReq, + wantErr: errors.New("client failing"), + }, + "work cannot be retrieved, client failed due to not found error": { + reconciler: ApplyWorkReconciler{ + client: &test.MockClient{ + MockGet: getMock, + }, + spokeDynamicClient: fakeDynamicClient, + spokeClient: &test.MockClient{}, + restMapper: testMapper{}, + recorder: utils.NewFakeRecorder(1), + joined: atomic.NewBool(true), + }, + req: wrongReq, + wantErr: nil, + }, + "work without finalizer / no error": { + reconciler: ApplyWorkReconciler{ + client: &test.MockClient{ + MockGet: func(ctx context.Context, key client.ObjectKey, obj client.Object) error { + o, _ := obj.(*workv1alpha1.Work) + *o = workv1alpha1.Work{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: workNamespace, + Name: workName, + }, + } + return nil + }, + MockUpdate: func(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error { + return nil + }, + MockStatusUpdate: func(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error { + return nil + }, + }, + spokeDynamicClient: fakeDynamicClient, + spokeClient: &test.MockClient{ + MockCreate: func(ctx context.Context, obj client.Object, opts ...client.CreateOption) error { + return nil + }, + MockStatusUpdate: func(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error { + return nil + }, + }, + restMapper: testMapper{}, + recorder: utils.NewFakeRecorder(1), + joined: atomic.NewBool(true), + }, + req: req, + wantErr: nil, + }, + "work with non-zero deletion-timestamp / succeed": { + reconciler: ApplyWorkReconciler{ + client: &test.MockClient{ + MockGet: func(ctx context.Context, key client.ObjectKey, obj client.Object) error { + o, _ := obj.(*workv1alpha1.Work) + *o = workv1alpha1.Work{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: workNamespace, + Name: workName, + Finalizers: []string{"multicluster.x-k8s.io/work-cleanup"}, + DeletionTimestamp: &metav1.Time{Time: time.Now()}, + }, + } + return nil + }, + }, + spokeDynamicClient: fakeDynamicClient, + spokeClient: &test.MockClient{}, + restMapper: testMapper{}, + recorder: utils.NewFakeRecorder(1), + joined: atomic.NewBool(true), + }, + req: req, + wantErr: nil, + }, + "Retrieving appliedwork fails, will create": { + reconciler: ApplyWorkReconciler{ + client: &test.MockClient{ + MockGet: getMock, + MockStatusUpdate: func(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error { + return nil + }, + }, + spokeDynamicClient: fakeDynamicClient, + spokeClient: &test.MockClient{ + MockGet: func(ctx 
context.Context, key client.ObjectKey, obj client.Object) error { + return &apierrors.StatusError{ + ErrStatus: metav1.Status{ + Status: metav1.StatusFailure, + Reason: metav1.StatusReasonNotFound, + }} + }, + MockCreate: func(ctx context.Context, obj client.Object, opts ...client.CreateOption) error { + return nil + }, + MockStatusUpdate: func(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error { + return nil + }, + }, + restMapper: testMapper{}, + recorder: utils.NewFakeRecorder(1), + joined: atomic.NewBool(true), + }, + req: req, + wantErr: nil, + }, + "ApplyManifest fails": { + reconciler: ApplyWorkReconciler{ + client: &test.MockClient{ + MockGet: getMock, + MockStatusUpdate: func(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error { + return nil + }, + }, + spokeDynamicClient: clientFailDynamicClient, + spokeClient: &test.MockClient{ + MockGet: getMockAppliedWork, + MockStatusUpdate: func(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error { + return nil + }, + }, + restMapper: testMapper{}, + recorder: utils.NewFakeRecorder(2), + joined: atomic.NewBool(true), + }, + req: req, + wantErr: errors.New(failMsg), + }, + "client update fails": { + reconciler: ApplyWorkReconciler{ + client: &test.MockClient{ + MockGet: getMock, + MockStatusUpdate: func(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error { + return errors.New("failed") + }, + }, + spokeDynamicClient: clientFailDynamicClient, + spokeClient: &test.MockClient{ + MockGet: getMockAppliedWork, + }, + restMapper: testMapper{}, + recorder: utils.NewFakeRecorder(2), + joined: atomic.NewBool(true), + }, + req: req, + wantErr: errors.New("failed"), + }, + "Happy Path": { + reconciler: ApplyWorkReconciler{ + client: &test.MockClient{ + MockGet: getMock, + MockStatusUpdate: func(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error { + return nil + }, + }, + spokeDynamicClient: happyDynamicClient, + spokeClient: &test.MockClient{ + MockGet: getMockAppliedWork, + MockStatusUpdate: func(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error { + return nil + }, + }, + restMapper: testMapper{}, + recorder: utils.NewFakeRecorder(1), + joined: atomic.NewBool(true), + }, + req: req, + wantErr: nil, + requeue: true, + }, + } + for testName, testCase := range testCases { + t.Run(testName, func(t *testing.T) { + ctrlResult, err := testCase.reconciler.Reconcile(context.Background(), testCase.req) + if testCase.wantErr != nil { + assert.Containsf(t, err.Error(), testCase.wantErr.Error(), "incorrect error for Testcase %s", testName) + } else { + if testCase.requeue { + if testCase.reconciler.joined.Load() { + assert.Equal(t, ctrl.Result{RequeueAfter: time.Minute * 5}, ctrlResult, "incorrect ctrlResult for Testcase %s", testName) + } else { + assert.Equal(t, ctrl.Result{RequeueAfter: time.Second * 5}, ctrlResult, "incorrect ctrlResult for Testcase %s", testName) + } + } + assert.Equalf(t, false, ctrlResult.Requeue, "incorrect ctrlResult for Testcase %s", testName) + } + }) + } +} + +func createObjAndDynamicClient(rawManifest []byte) (*unstructured.Unstructured, dynamic.Interface, string) { + uObj := unstructured.Unstructured{} + _ = uObj.UnmarshalJSON(rawManifest) + validSpecHash, _ := computeManifestHash(&uObj) + uObj.SetAnnotations(map[string]string{manifestHashAnnotation: validSpecHash}) + _ = setModifiedConfigurationAnnotation(&uObj) + dynamicClient := fake.NewSimpleDynamicClient(runtime.NewScheme()) + 
+	dynamicClient.PrependReactor("get", "*", func(action testingclient.Action) (handled bool, ret runtime.Object, err error) {
+		return true, uObj.DeepCopy(), nil
+	})
+	dynamicClient.PrependReactor("patch", "*", func(action testingclient.Action) (handled bool, ret runtime.Object, err error) {
+		return true, uObj.DeepCopy(), nil
+	})
+	return &uObj, dynamicClient, validSpecHash
+}
diff --git a/pkg/controllers/work/manager.go b/pkg/controllers/work/manager.go
new file mode 100644
index 000000000..067471bc7
--- /dev/null
+++ b/pkg/controllers/work/manager.go
@@ -0,0 +1,95 @@
+/*
+Copyright 2021 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package controllers
+
+import (
+	"context"
+	"os"
+
+	"github.com/go-logr/logr"
+	"k8s.io/client-go/dynamic"
+	"k8s.io/client-go/rest"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
+	"sigs.k8s.io/controller-runtime/pkg/manager"
+)
+
+const (
+	workFinalizer = "fleet.azure.com/work-cleanup"
+
+	manifestHashAnnotation = "fleet.azure.com/spec-hash"
+
+	lastAppliedConfigAnnotation = "fleet.azure.com/last-applied-configuration"
+
+	ConditionTypeApplied = "Applied"
+	ConditionTypeAvailable = "Available"
+
+	// number of concurrent reconcile loops for work
+	maxWorkConcurrency = 5
+)
+
+// CreateControllers creates the controllers with the supplied config
+func CreateControllers(ctx context.Context, hubCfg, spokeCfg *rest.Config, setupLog logr.Logger, opts ctrl.Options) (manager.Manager, *ApplyWorkReconciler, error) {
+	hubMgr, err := ctrl.NewManager(hubCfg, opts)
+	if err != nil {
+		setupLog.Error(err, "unable to create hub manager")
+		os.Exit(1)
+	}
+
+	spokeDynamicClient, err := dynamic.NewForConfig(spokeCfg)
+	if err != nil {
+		setupLog.Error(err, "unable to create spoke dynamic client")
+		os.Exit(1)
+	}
+
+	restMapper, err := apiutil.NewDynamicRESTMapper(spokeCfg, apiutil.WithLazyDiscovery)
+	if err != nil {
+		setupLog.Error(err, "unable to create spoke rest mapper")
+		os.Exit(1)
+	}
+
+	spokeClient, err := client.New(spokeCfg, client.Options{
+		Scheme: opts.Scheme, Mapper: restMapper,
+	})
+	if err != nil {
+		setupLog.Error(err, "unable to create spoke client")
+		os.Exit(1)
+	}
+
+	workController := NewApplyWorkReconciler(
+		hubMgr.GetClient(),
+		spokeDynamicClient,
+		spokeClient,
+		restMapper,
+		hubMgr.GetEventRecorderFor("work_controller"),
+		maxWorkConcurrency,
+		opts.Namespace,
+	)
+
+	if err = workController.SetupWithManager(hubMgr); err != nil {
+		setupLog.Error(err, "unable to create the controller", "controller", "Work")
+		return nil, nil, err
+	}
+
+	if err = workController.Join(ctx); err != nil {
+		setupLog.Error(err, "unable to mark the controller joined", "controller", "Work")
+		return nil, nil, err
+	}
+
+	return hubMgr, workController, nil
+}
diff --git a/pkg/controllers/work/owner_reference_util.go b/pkg/controllers/work/owner_reference_util.go
new file mode 100644
index 000000000..58fb719dc
--- /dev/null
+++ b/pkg/controllers/work/owner_reference_util.go
@@ -0,0 +1,70 @@
+/*
+Copyright 2021 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package controllers
+
+import (
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+)
+
+// addOwnerRef appends the owner reference to the object, or updates the matching entry if it already exists
+func addOwnerRef(ref metav1.OwnerReference, object metav1.Object) {
+	owners := object.GetOwnerReferences()
+	if idx := indexOwnerRef(owners, ref); idx == -1 {
+		owners = append(owners, ref)
+	} else {
+		owners[idx] = ref
+	}
+	object.SetOwnerReferences(owners)
+}
+
+// mergeOwnerReference merges two owner reference arrays.
+func mergeOwnerReference(owners, newOwners []metav1.OwnerReference) []metav1.OwnerReference {
+	for _, newOwner := range newOwners {
+		if idx := indexOwnerRef(owners, newOwner); idx == -1 {
+			owners = append(owners, newOwner)
+		} else {
+			owners[idx] = newOwner
+		}
+	}
+	return owners
+}
+
+// indexOwnerRef returns the index of the owner reference in the slice if found, or -1.
+func indexOwnerRef(ownerReferences []metav1.OwnerReference, ref metav1.OwnerReference) int {
+	for index, r := range ownerReferences {
+		if isReferSameObject(r, ref) {
+			return index
+		}
+	}
+	return -1
+}
+
+// isReferSameObject returns true if a and b point to the same object.
+func isReferSameObject(a, b metav1.OwnerReference) bool {
+	aGV, err := schema.ParseGroupVersion(a.APIVersion)
+	if err != nil {
+		return false
+	}
+
+	bGV, err := schema.ParseGroupVersion(b.APIVersion)
+	if err != nil {
+		return false
+	}
+
+	return aGV.Group == bGV.Group && aGV.Version == bGV.Version && a.Kind == b.Kind && a.Name == b.Name
+}
diff --git a/pkg/controllers/work/patch_util.go b/pkg/controllers/work/patch_util.go
new file mode 100644
index 000000000..1258952d5
--- /dev/null
+++ b/pkg/controllers/work/patch_util.go
@@ -0,0 +1,126 @@
+package controllers
+
+import (
+	"encoding/json"
+
+	"github.com/pkg/errors"
+	"k8s.io/apimachinery/pkg/api/meta"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/jsonmergepatch"
+	"k8s.io/apimachinery/pkg/util/mergepatch"
+	"k8s.io/apimachinery/pkg/util/strategicpatch"
+	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+var builtinScheme = runtime.NewScheme()
+var metadataAccessor = meta.NewAccessor()
+
+func init() {
+	// we use this trick to check if a resource is k8s built-in
+	_ = clientgoscheme.AddToScheme(builtinScheme)
+}
+
+// threeWayMergePatch creates a patch by computing a three-way diff based on
+// an object's current state, modified state, and last-applied-state recorded in its annotation.
+func threeWayMergePatch(currentObj, manifestObj client.Object) (client.Patch, error) {
+	//TODO: see if we should use something like json.ConfigCompatibleWithStandardLibrary.Marshal to make sure that
+	// the json we created is compatible with the format that json merge patch requires.
+	current, err := json.Marshal(currentObj)
+	if err != nil {
+		return nil, err
+	}
+	original, err := getOriginalConfiguration(currentObj)
+	if err != nil {
+		return nil, err
+	}
+	manifest, err := json.Marshal(manifestObj)
+	if err != nil {
+		return nil, err
+	}
+
+	var patchType types.PatchType
+	var patchData []byte
+	var lookupPatchMeta strategicpatch.LookupPatchMeta
+
+	versionedObject, err := builtinScheme.New(currentObj.GetObjectKind().GroupVersionKind())
+	switch {
+	case runtime.IsNotRegisteredError(err):
+		// use JSONMergePatch for custom resources
+		// because StrategicMergePatch doesn't support custom resources
+		patchType = types.MergePatchType
+		preconditions := []mergepatch.PreconditionFunc{
+			mergepatch.RequireKeyUnchanged("apiVersion"),
+			mergepatch.RequireKeyUnchanged("kind"),
+			mergepatch.RequireMetadataKeyUnchanged("name")}
+		patchData, err = jsonmergepatch.CreateThreeWayJSONMergePatch(original, manifest, current, preconditions...)
+		if err != nil {
+			return nil, err
+		}
+	case err != nil:
+		return nil, err
+	default:
+		// use StrategicMergePatch for K8s built-in resources
+		patchType = types.StrategicMergePatchType
+		lookupPatchMeta, err = strategicpatch.NewPatchMetaFromStruct(versionedObject)
+		if err != nil {
+			return nil, err
+		}
+		patchData, err = strategicpatch.CreateThreeWayMergePatch(original, manifest, current, lookupPatchMeta, true)
+		if err != nil {
+			return nil, err
+		}
+	}
+	return client.RawPatch(patchType, patchData), nil
+}
+
+// setModifiedConfigurationAnnotation serializes the object into a byte stream and
+// embeds the result in the object's last-applied-configuration annotation so that
+// a three-way merge patch can be computed against it later.
+func setModifiedConfigurationAnnotation(obj runtime.Object) error {
+	var modified []byte
+	annotations, err := metadataAccessor.Annotations(obj)
+	if err != nil {
+		return errors.Wrap(err, "cannot access metadata.annotations")
+	}
+	if annotations == nil {
+		annotations = make(map[string]string)
+	}
+
+	// remove the annotation to avoid recursion
+	delete(annotations, lastAppliedConfigAnnotation)
+	// do not include an empty map
+	if len(annotations) == 0 {
+		_ = metadataAccessor.SetAnnotations(obj, nil)
+	} else {
+		_ = metadataAccessor.SetAnnotations(obj, annotations)
+	}
+
+	//TODO: see if we should use something like json.ConfigCompatibleWithStandardLibrary.Marshal to make sure that
+	// the produced json format is more three way merge friendly
+	modified, err = json.Marshal(obj)
+	if err != nil {
+		return err
+	}
+	// set the last applied annotation back
+	annotations[lastAppliedConfigAnnotation] = string(modified)
+	return metadataAccessor.SetAnnotations(obj, annotations)
+}
+
+// getOriginalConfiguration gets the original configuration of the object
+// from the annotation, or returns an error if the annotation is not found.
+func getOriginalConfiguration(obj runtime.Object) ([]byte, error) {
+	annots, err := metadataAccessor.Annotations(obj)
+	if err != nil {
+		return nil, errors.Wrap(err, "cannot access metadata.annotations")
+	}
+	if annots == nil {
+		return nil, errors.New("object does not have lastAppliedConfigAnnotation")
+	}
+	original, ok := annots[lastAppliedConfigAnnotation]
+	if !ok {
+		return nil, errors.New("object does not have lastAppliedConfigAnnotation")
+	}
+	return []byte(original), nil
+}
diff --git a/pkg/controllers/work/suite_test.go b/pkg/controllers/work/suite_test.go
new file mode 100644
index 000000000..5e77968b5
--- /dev/null
+++ b/pkg/controllers/work/suite_test.go
@@ -0,0 +1,110 @@
+/*
+Copyright 2021 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package controllers
+
+import (
+	"context"
+	"flag"
+	"os"
+	"path/filepath"
+	"testing"
+
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+	kruisev1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
+	"k8s.io/client-go/kubernetes/scheme"
+	"k8s.io/client-go/rest"
+	"k8s.io/klog/v2"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/envtest"
+	"sigs.k8s.io/controller-runtime/pkg/manager"
+
+	workv1alpha1 "sigs.k8s.io/work-api/pkg/apis/v1alpha1"
+)
+
+// These tests use Ginkgo (BDD-style Go testing framework). Refer to
+// http://onsi.github.io/ginkgo/ to learn more about Ginkgo.
+var (
+	cfg *rest.Config
+	// TODO: Separate k8sClient into hub and spoke
+	k8sClient client.Client
+	testEnv *envtest.Environment
+	workController *ApplyWorkReconciler
+	setupLog = ctrl.Log.WithName("test")
+	ctx context.Context
+	cancel context.CancelFunc
+)
+
+func TestAPIs(t *testing.T) {
+	RegisterFailHandler(Fail)
+	RunSpecs(t, "Work-API Controller Suite")
+}
+
+var _ = BeforeSuite(func() {
+	By("Setup klog")
+	fs := flag.NewFlagSet("klog", flag.ContinueOnError)
+	klog.InitFlags(fs)
+	Expect(fs.Parse([]string{"--v", "5", "-add_dir_header", "true"})).Should(Succeed())
+
+	By("bootstrapping test environment")
+	testEnv = &envtest.Environment{
+		CRDDirectoryPaths: []string{
+			filepath.Join("../../../", "config", "crd", "bases"),
+			filepath.Join("../../../", "test", "integration", "manifests", "resources"),
+		},
+	}
+
+	var err error
+	cfg, err = testEnv.Start()
+	Expect(err).ToNot(HaveOccurred())
+	Expect(cfg).ToNot(BeNil())
+
+	err = workv1alpha1.AddToScheme(scheme.Scheme)
+	Expect(err).NotTo(HaveOccurred())
+	err = kruisev1alpha1.AddToScheme(scheme.Scheme)
+	Expect(err).NotTo(HaveOccurred())
+	opts := ctrl.Options{
+		Scheme: scheme.Scheme,
+	}
+	k8sClient, err = client.New(cfg, client.Options{
+		Scheme: scheme.Scheme,
+	})
+	Expect(err).NotTo(HaveOccurred())
+
+	By("start controllers")
+	ctx, cancel = context.WithCancel(context.Background())
+	var hubMgr manager.Manager
+	if hubMgr, workController, err = CreateControllers(ctx, cfg, cfg, setupLog, opts); err != nil {
+		setupLog.Error(err, "problem creating controllers")
+		os.Exit(1)
+	}
+	go func() {
+		if err = hubMgr.Start(ctx); err != nil {
+			setupLog.Error(err, "problem running controllers")
+			os.Exit(1)
+		}
+		Expect(err).ToNot(HaveOccurred())
+	}()
})
+
+var _ = AfterSuite(func() {
+	cancel()
+	By("tearing down the test environment")
+	err := testEnv.Stop()
+	Expect(err).ToNot(HaveOccurred())
+})
diff --git a/test/integration/suite_test.go b/test/integration/suite_test.go
index 63b494264..3eafb1b42 100644
--- a/test/integration/suite_test.go
+++ b/test/integration/suite_test.go
@@ -11,8 +11,6 @@ import (
 	"path/filepath"
 	"testing"
 
-	"go.goms.io/fleet/cmd/hubagent/options"
-
 	. "github.com/onsi/ginkgo/v2"
 	. "github.com/onsi/gomega"
 	kruisev1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
@@ -28,6 +26,7 @@ import (
 	// +kubebuilder:scaffold:imports
 
 	fleetv1alpha1 "go.goms.io/fleet/apis/v1alpha1"
+	"go.goms.io/fleet/cmd/hubagent/options"
 	"go.goms.io/fleet/cmd/hubagent/workload"
 	"go.goms.io/fleet/pkg/controllers/membercluster"
 )