diff --git a/apis/v1/policySnapshot_types.go b/apis/v1/policysnapshot_types.go
similarity index 95%
rename from apis/v1/policySnapshot_types.go
rename to apis/v1/policysnapshot_types.go
index 4c4e636a2..05182559b 100644
--- a/apis/v1/policySnapshot_types.go
+++ b/apis/v1/policysnapshot_types.go
@@ -13,6 +13,9 @@ import (
 const (
 	// PolicyIndexLabel is the label that indicate the policy snapshot index of a cluster policy.
 	PolicyIndexLabel = fleetPrefix + "policyIndex"
+
+	// PolicySnapshotNameFmt is the clusterPolicySnapshot name format: {CRPName}-{PolicySnapshotIndex}.
+	PolicySnapshotNameFmt = "%s-%d"
 )
 
 // +genclient
@@ -26,7 +29,8 @@ const (
 
 // ClusterPolicySnapshot is used to store a snapshot of cluster placement policy.
 // Its spec is immutable.
-// The name convention of a ClusterPolicySnapshot is {CRPName}-{PolicySnapshotIndex}
+// The naming convention of a ClusterPolicySnapshot is {CRPName}-{PolicySnapshotIndex}.
+// PolicySnapshotIndex starts at 0.
 // Each snapshot must have the following labels:
 // - `CRPTrackingLabel` which points to its owner CRP.
 // - `PolicyIndexLabel` which is the index of the policy snapshot.
diff --git a/apis/v1/resourceSnapshot_types.go b/apis/v1/resourcesnapshot_types.go
similarity index 100%
rename from apis/v1/resourceSnapshot_types.go
rename to apis/v1/resourcesnapshot_types.go
diff --git a/cmd/hubagent/workload/setup.go b/cmd/hubagent/workload/setup.go
index 759b96c1c..fe449663f 100644
--- a/cmd/hubagent/workload/setup.go
+++ b/cmd/hubagent/workload/setup.go
@@ -88,6 +88,7 @@ func SetupControllers(ctx context.Context, mgr ctrl.Manager, config *rest.Config
 		InformerManager:        dynamicInformerManager,
 		DisabledResourceConfig: disabledResourceConfig,
 		SkippedNamespaces:      skippedNamespaces,
+		Scheme:                 mgr.GetScheme(),
 	}
 
 	ratelimiter := options.DefaultControllerRateLimiter(opts.RateLimiterOpts)
diff --git a/pkg/controllers/clusterresourceplacement/placement_controller.go b/pkg/controllers/clusterresourceplacement/placement_controller.go
index 5026cef92..77ff01922 100644
--- a/pkg/controllers/clusterresourceplacement/placement_controller.go
+++ b/pkg/controllers/clusterresourceplacement/placement_controller.go
@@ -7,8 +7,10 @@ package clusterresourceplacement
 
 import (
 	"context"
+	"crypto/sha256"
 	"errors"
 	"fmt"
+	"strconv"
 	"time"
 
 	corev1 "k8s.io/api/core/v1"
@@ -16,11 +18,14 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/util/json"
 	"k8s.io/client-go/tools/record"
 	"k8s.io/klog/v2"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
 
+	fleetv1 "go.goms.io/fleet/apis/v1"
 	fleetv1alpha1 "go.goms.io/fleet/apis/v1alpha1"
 	"go.goms.io/fleet/pkg/metrics"
 	"go.goms.io/fleet/pkg/utils"
@@ -57,6 +62,8 @@ type Reconciler struct {
 	SkippedNamespaces map[string]bool
 
 	Recorder record.EventRecorder
+
+	Scheme *runtime.Scheme
 }
 
 func (r *Reconciler) Reconcile(ctx context.Context, key controller.QueueKey) (ctrl.Result, error) {
@@ -333,3 +340,150 @@ func (r *Reconciler) updatePlacementAppliedCondition(placement *fleetv1alpha1.Cl
 		}
 	}
 }
+
+// handleUpdate handles the create/update clusterResourcePlacement event.
+// It creates the corresponding clusterPolicySnapshot and clusterResourceSnapshot if needed, and updates the status based on
+// the clusterPolicySnapshot status and the work status.
+// If the error type is ErrUnexpectedBehavior, the controller will skip reconciling.
+func (r *Reconciler) handleUpdate(ctx context.Context, crp *fleetv1.ClusterResourcePlacement) (ctrl.Result, error) {
+	crpKObj := klog.KObj(crp)
+	policyHash, err := generatePolicyHash(crp.Spec.Policy)
+	if err != nil {
+		klog.ErrorS(err, "Failed to generate policy hash of crp", "clusterResourcePlacement", crpKObj)
+		return ctrl.Result{}, controller.NewUnexpectedBehaviorError(err)
+	}
+
+	latestPolicySnapshot, latestPolicySnapshotIndex, err := r.lookupLatestClusterPolicySnapshot(ctx, crp)
+	if err != nil {
+		return ctrl.Result{}, err
+	}
+
+	// mark the last policy snapshot as inactive if it is different from what we have now
+	if latestPolicySnapshot != nil &&
+		string(latestPolicySnapshot.Spec.PolicyHash) != policyHash &&
+		latestPolicySnapshot.Labels[fleetv1.IsLatestSnapshotLabel] == strconv.FormatBool(true) {
+		// set the latest label to false first to make sure there is at most one active policy snapshot
+		latestPolicySnapshot.Labels[fleetv1.IsLatestSnapshotLabel] = strconv.FormatBool(false)
+		if err := r.Client.Update(ctx, latestPolicySnapshot); err != nil {
+			klog.ErrorS(err, "Failed to set the isLatestSnapshot label to false", "clusterPolicySnapshot", klog.KObj(latestPolicySnapshot))
+			return ctrl.Result{}, controller.NewAPIServerError(err)
+		}
+	}
+	if latestPolicySnapshot == nil || string(latestPolicySnapshot.Spec.PolicyHash) != policyHash {
+		// create a new policy snapshot
+		latestPolicySnapshotIndex++
+		latestPolicySnapshot = &fleetv1.ClusterPolicySnapshot{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: fmt.Sprintf(fleetv1.PolicySnapshotNameFmt, crp.Name, latestPolicySnapshotIndex),
+				Labels: map[string]string{
+					fleetv1.CRPTrackingLabel:      crp.Name,
+					fleetv1.IsLatestSnapshotLabel: strconv.FormatBool(true),
+					fleetv1.PolicyIndexLabel:      strconv.Itoa(latestPolicySnapshotIndex),
+				},
+			},
+			Spec: fleetv1.PolicySnapShotSpec{
+				Policy:     crp.Spec.Policy,
+				PolicyHash: []byte(policyHash),
+			},
+		}
+		if err := controllerutil.SetControllerReference(crp, latestPolicySnapshot, r.Scheme); err != nil {
+			klog.ErrorS(err, "Failed to set owner reference", "clusterPolicySnapshot", klog.KObj(latestPolicySnapshot))
+			// should never happen
+			return ctrl.Result{}, controller.NewUnexpectedBehaviorError(err)
+		}
+		if err := r.Client.Create(ctx, latestPolicySnapshot); err != nil {
+			klog.ErrorS(err, "Failed to create new clusterPolicySnapshot", "clusterPolicySnapshot", klog.KObj(latestPolicySnapshot))
+			return ctrl.Result{}, controller.NewAPIServerError(err)
+		}
+	} else if latestPolicySnapshot.Labels[fleetv1.IsLatestSnapshotLabel] != strconv.FormatBool(true) {
+		// In this branch, latestPolicySnapshot.Spec.PolicyHash == policyHash.
+		// It can happen when the controller had just set the latest label to false for the old snapshot, but failed to
+		// create a new policy snapshot.
+		// The customers then reverted their policy back to the old one again.
+		// In this case, the "latest" snapshot without the isLatest label has the same policy hash as the current policy.
+
+		latestPolicySnapshot.Labels[fleetv1.IsLatestSnapshotLabel] = strconv.FormatBool(true)
+		if err := r.Client.Update(ctx, latestPolicySnapshot); err != nil {
+			klog.ErrorS(err, "Failed to set the isLatestSnapshot label to true", "clusterPolicySnapshot", klog.KObj(latestPolicySnapshot))
+			return ctrl.Result{}, controller.NewAPIServerError(err)
+		}
+	}
+	// create clusterResourceSnapshot
+	// update the status based on the latestPolicySnapshot status
+	// update the status based on the work
+	return ctrl.Result{}, nil
+}
+
+// lookupLatestClusterPolicySnapshot finds the latest snapshot and its policy index.
+// There will be only one active policy snapshot, if one exists.
+// It first checks whether there is an active policy snapshot.
+// If not, it finds the one whose policyIndex label is the largest.
+// The policy index will always start from 0.
+// It returns an error when 1) it cannot list the snapshots, 2) there is more than one active policy snapshot, or 3) a
+// snapshot has an invalid label value.
+// 2 & 3 should never happen.
+func (r *Reconciler) lookupLatestClusterPolicySnapshot(ctx context.Context, crp *fleetv1.ClusterResourcePlacement) (*fleetv1.ClusterPolicySnapshot, int, error) {
+	snapshotList := &fleetv1.ClusterPolicySnapShotList{}
+	latestSnapshotLabelMatcher := client.MatchingLabels{
+		fleetv1.CRPTrackingLabel:      crp.Name,
+		fleetv1.IsLatestSnapshotLabel: strconv.FormatBool(true),
+	}
+	crpKObj := klog.KObj(crp)
+	if err := r.Client.List(ctx, snapshotList, latestSnapshotLabelMatcher); err != nil {
+		klog.ErrorS(err, "Failed to list active clusterPolicySnapshots", "clusterResourcePlacement", crpKObj)
+		return nil, -1, controller.NewAPIServerError(err)
+	}
+	if len(snapshotList.Items) == 1 {
+		policyIndex, err := parsePolicyIndexFromLabel(&snapshotList.Items[0])
+		if err != nil {
+			return nil, -1, controller.NewUnexpectedBehaviorError(err)
+		}
+		return &snapshotList.Items[0], policyIndex, nil
+	} else if len(snapshotList.Items) > 1 {
+		// There are multiple active snapshots, which should never happen.
+		err := fmt.Errorf("there are %d active clusterPolicySnapshots owned by clusterResourcePlacement %v", len(snapshotList.Items), crp.Name)
+		klog.ErrorS(err, "It should never happen", "clusterResourcePlacement", crpKObj)
+		return nil, -1, controller.NewUnexpectedBehaviorError(err)
+	}
+	// When there are no active snapshots, find the one with the largest policy index.
+	if err := r.Client.List(ctx, snapshotList, client.MatchingLabels{fleetv1.CRPTrackingLabel: crp.Name}); err != nil {
+		klog.ErrorS(err, "Failed to list all clusterPolicySnapshots", "clusterResourcePlacement", crpKObj)
+		return nil, -1, controller.NewAPIServerError(err)
+	}
+	if len(snapshotList.Items) == 0 {
+		// The policy index of the first snapshot will start from 0.
+ return nil, -1, nil + } + index := -1 // the index of the cluster policy snapshot array + lastPolicyIndex := -1 // the assigned policy index of the cluster policy snapshot + for i := range snapshotList.Items { + policyIndex, err := parsePolicyIndexFromLabel(&snapshotList.Items[i]) + if err != nil { + return nil, -1, controller.NewUnexpectedBehaviorError(err) + } + if lastPolicyIndex < policyIndex { + index = i + lastPolicyIndex = policyIndex + } + } + return &snapshotList.Items[index], lastPolicyIndex, nil +} + +func parsePolicyIndexFromLabel(s *fleetv1.ClusterPolicySnapshot) (int, error) { + indexLabel := s.Labels[fleetv1.PolicyIndexLabel] + v, err := strconv.Atoi(indexLabel) + if err != nil { + klog.ErrorS(err, "Failed to parse the policy index label", "clusterPolicySnapshot", klog.KObj(s), "policyIndexLabel", indexLabel) + // should never happen + return -1, err + } + return v, nil +} + +func generatePolicyHash(policy *fleetv1.PlacementPolicy) (string, error) { + jsonBytes, err := json.Marshal(policy) + if err != nil { + return "", err + } + return fmt.Sprintf("%x", sha256.Sum256(jsonBytes)), nil +} diff --git a/pkg/controllers/clusterresourceplacement/placement_controller_test.go b/pkg/controllers/clusterresourceplacement/placement_controller_test.go new file mode 100644 index 000000000..a1dc6cd28 --- /dev/null +++ b/pkg/controllers/clusterresourceplacement/placement_controller_test.go @@ -0,0 +1,665 @@ +package clusterresourceplacement + +import ( + "context" + "crypto/sha256" + "encoding/json" + "errors" + "fmt" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/utils/pointer" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + fleetv1 "go.goms.io/fleet/apis/v1" + "go.goms.io/fleet/pkg/utils/controller" +) + +const ( + testName = "my-crp" + fleetAPIVersion = "fleet.azure.com/v1" +) + +func serviceScheme(t *testing.T) *runtime.Scheme { + scheme := runtime.NewScheme() + if err := fleetv1.AddToScheme(scheme); err != nil { + t.Fatalf("failed to add scheme: %v", err) + } + return scheme +} + +func placementPolicyForTest() *fleetv1.PlacementPolicy { + return &fleetv1.PlacementPolicy{ + PlacementType: fleetv1.PickNPlacementType, + NumberOfClusters: pointer.Int32(3), + Affinity: &fleetv1.Affinity{ + ClusterAffinity: &fleetv1.ClusterAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &fleetv1.ClusterSelector{ + ClusterSelectorTerms: []fleetv1.ClusterSelectorTerm{ + { + LabelSelector: metav1.LabelSelector{ + MatchLabels: map[string]string{ + "key1": "value1", + }, + }, + }, + }, + }, + }, + }, + } +} + +func clusterResourcePlacementForTest() *fleetv1.ClusterResourcePlacement { + return &fleetv1.ClusterResourcePlacement{ + ObjectMeta: metav1.ObjectMeta{ + Name: testName, + }, + Spec: fleetv1.ClusterResourcePlacementSpec{ + Policy: placementPolicyForTest(), + }, + } +} + +func TestHandleUpdate(t *testing.T) { + jsonBytes, err := json.Marshal(placementPolicyForTest()) + if err != nil { + t.Fatalf("failed to create the policy hash: %v", err) + } + policyHash := []byte(fmt.Sprintf("%x", sha256.Sum256(jsonBytes))) + jsonBytes, err = json.Marshal(nil) + if err != nil { + t.Fatalf("failed to create the policy hash: %v", err) + } + unspecifiedPolicyHash := []byte(fmt.Sprintf("%x", sha256.Sum256(jsonBytes))) + tests := []struct { + name string + policySnapshots 
[]fleetv1.ClusterPolicySnapshot + wantPolicySnapshots []fleetv1.ClusterPolicySnapshot + }{ + { + name: "new clusterResourcePolicy and no existing policy snapshots owned by my-crp", + policySnapshots: []fleetv1.ClusterPolicySnapshot{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "another-crp-1", + Labels: map[string]string{ + fleetv1.PolicyIndexLabel: "1", + fleetv1.IsLatestSnapshotLabel: "true", + fleetv1.CRPTrackingLabel: "another-crp", + }, + }, + }, + }, + wantPolicySnapshots: []fleetv1.ClusterPolicySnapshot{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "another-crp-1", + Labels: map[string]string{ + fleetv1.PolicyIndexLabel: "1", + fleetv1.IsLatestSnapshotLabel: "true", + fleetv1.CRPTrackingLabel: "another-crp", + }, + }, + }, + // new policy snapshot owned by the my-crp + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1.PolicySnapshotNameFmt, testName, 0), + Labels: map[string]string{ + fleetv1.PolicyIndexLabel: "0", + fleetv1.IsLatestSnapshotLabel: "true", + fleetv1.CRPTrackingLabel: testName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + APIVersion: fleetAPIVersion, + Kind: "ClusterResourcePlacement", + }, + }, + }, + Spec: fleetv1.PolicySnapShotSpec{ + Policy: placementPolicyForTest(), + PolicyHash: policyHash, + }, + }, + }, + }, + { + name: "crp policy has no change", + policySnapshots: []fleetv1.ClusterPolicySnapshot{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1.PolicySnapshotNameFmt, testName, 0), + Labels: map[string]string{ + fleetv1.PolicyIndexLabel: "0", + fleetv1.IsLatestSnapshotLabel: "true", + fleetv1.CRPTrackingLabel: testName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + APIVersion: fleetAPIVersion, + Kind: "ClusterResourcePlacement", + }, + }, + }, + Spec: fleetv1.PolicySnapShotSpec{ + Policy: placementPolicyForTest(), + PolicyHash: policyHash, + }, + }, + }, + wantPolicySnapshots: []fleetv1.ClusterPolicySnapshot{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1.PolicySnapshotNameFmt, testName, 0), + Labels: map[string]string{ + fleetv1.PolicyIndexLabel: "0", + fleetv1.IsLatestSnapshotLabel: "true", + fleetv1.CRPTrackingLabel: testName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + APIVersion: fleetAPIVersion, + Kind: "ClusterResourcePlacement", + }, + }, + }, + Spec: fleetv1.PolicySnapShotSpec{ + Policy: placementPolicyForTest(), + PolicyHash: policyHash, + }, + }, + }, + }, + { + name: "crp policy has changed and there is no active snapshot", + // It happens when last reconcile loop fails after setting the latest label to false and + // before creating a new policy snapshot. + policySnapshots: []fleetv1.ClusterPolicySnapshot{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1.PolicySnapshotNameFmt, testName, 0), + Labels: map[string]string{ + fleetv1.PolicyIndexLabel: "0", + fleetv1.IsLatestSnapshotLabel: "false", + fleetv1.CRPTrackingLabel: testName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + APIVersion: fleetAPIVersion, + Kind: "ClusterResourcePlacement", + }, + }, + }, + Spec: fleetv1.PolicySnapShotSpec{ + // Policy is not specified. 
+ PolicyHash: unspecifiedPolicyHash, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1.PolicySnapshotNameFmt, testName, 3), + Labels: map[string]string{ + fleetv1.PolicyIndexLabel: "3", + fleetv1.IsLatestSnapshotLabel: "false", + fleetv1.CRPTrackingLabel: testName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + APIVersion: fleetAPIVersion, + Kind: "ClusterResourcePlacement", + }, + }, + }, + Spec: fleetv1.PolicySnapShotSpec{ + // Policy is not specified. + PolicyHash: unspecifiedPolicyHash, + }, + }, + }, + wantPolicySnapshots: []fleetv1.ClusterPolicySnapshot{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1.PolicySnapshotNameFmt, testName, 0), + Labels: map[string]string{ + fleetv1.PolicyIndexLabel: "0", + fleetv1.IsLatestSnapshotLabel: "false", + fleetv1.CRPTrackingLabel: testName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + APIVersion: fleetAPIVersion, + Kind: "ClusterResourcePlacement", + }, + }, + }, + Spec: fleetv1.PolicySnapShotSpec{ + // Policy is not specified. + PolicyHash: unspecifiedPolicyHash, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1.PolicySnapshotNameFmt, testName, 3), + Labels: map[string]string{ + fleetv1.PolicyIndexLabel: "3", + fleetv1.IsLatestSnapshotLabel: "false", + fleetv1.CRPTrackingLabel: testName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + APIVersion: fleetAPIVersion, + Kind: "ClusterResourcePlacement", + }, + }, + }, + Spec: fleetv1.PolicySnapShotSpec{ + // Policy is not specified. + PolicyHash: unspecifiedPolicyHash, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1.PolicySnapshotNameFmt, testName, 4), + Labels: map[string]string{ + fleetv1.PolicyIndexLabel: "4", + fleetv1.IsLatestSnapshotLabel: "true", + fleetv1.CRPTrackingLabel: testName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + APIVersion: fleetAPIVersion, + Kind: "ClusterResourcePlacement", + }, + }, + }, + Spec: fleetv1.PolicySnapShotSpec{ + Policy: placementPolicyForTest(), + PolicyHash: policyHash, + }, + }, + }, + }, + { + name: "crp policy has changed and there is an active snapshot", + policySnapshots: []fleetv1.ClusterPolicySnapshot{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1.PolicySnapshotNameFmt, testName, 0), + Labels: map[string]string{ + fleetv1.PolicyIndexLabel: "0", + fleetv1.IsLatestSnapshotLabel: "true", + fleetv1.CRPTrackingLabel: testName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + APIVersion: fleetAPIVersion, + Kind: "ClusterResourcePlacement", + }, + }, + }, + Spec: fleetv1.PolicySnapShotSpec{ + // Policy is not specified. 
+ PolicyHash: unspecifiedPolicyHash, + }, + }, + }, + wantPolicySnapshots: []fleetv1.ClusterPolicySnapshot{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1.PolicySnapshotNameFmt, testName, 0), + Labels: map[string]string{ + fleetv1.PolicyIndexLabel: "0", + fleetv1.IsLatestSnapshotLabel: "false", + fleetv1.CRPTrackingLabel: testName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + APIVersion: fleetAPIVersion, + Kind: "ClusterResourcePlacement", + }, + }, + }, + Spec: fleetv1.PolicySnapShotSpec{ + // Policy is not specified. + PolicyHash: unspecifiedPolicyHash, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1.PolicySnapshotNameFmt, testName, 1), + Labels: map[string]string{ + fleetv1.PolicyIndexLabel: "1", + fleetv1.IsLatestSnapshotLabel: "true", + fleetv1.CRPTrackingLabel: testName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + APIVersion: fleetAPIVersion, + Kind: "ClusterResourcePlacement", + }, + }, + }, + Spec: fleetv1.PolicySnapShotSpec{ + Policy: placementPolicyForTest(), + PolicyHash: policyHash, + }, + }, + }, + }, + { + name: "crp policy has been changed and reverted back and there is no active snapshot", + policySnapshots: []fleetv1.ClusterPolicySnapshot{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1.PolicySnapshotNameFmt, testName, 0), + Labels: map[string]string{ + fleetv1.PolicyIndexLabel: "0", + fleetv1.IsLatestSnapshotLabel: "false", + fleetv1.CRPTrackingLabel: testName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + APIVersion: fleetAPIVersion, + Kind: "ClusterResourcePlacement", + }, + }, + }, + Spec: fleetv1.PolicySnapShotSpec{ + // Policy is not specified. + PolicyHash: unspecifiedPolicyHash, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1.PolicySnapshotNameFmt, testName, 1), + Labels: map[string]string{ + fleetv1.PolicyIndexLabel: "1", + fleetv1.CRPTrackingLabel: testName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + APIVersion: fleetAPIVersion, + Kind: "ClusterResourcePlacement", + }, + }, + }, + Spec: fleetv1.PolicySnapShotSpec{ + Policy: placementPolicyForTest(), + PolicyHash: policyHash, + }, + }, + }, + wantPolicySnapshots: []fleetv1.ClusterPolicySnapshot{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1.PolicySnapshotNameFmt, testName, 0), + Labels: map[string]string{ + fleetv1.PolicyIndexLabel: "0", + fleetv1.IsLatestSnapshotLabel: "false", + fleetv1.CRPTrackingLabel: testName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + APIVersion: fleetAPIVersion, + Kind: "ClusterResourcePlacement", + }, + }, + }, + Spec: fleetv1.PolicySnapShotSpec{ + // Policy is not specified. 
+ PolicyHash: unspecifiedPolicyHash, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1.PolicySnapshotNameFmt, testName, 1), + Labels: map[string]string{ + fleetv1.PolicyIndexLabel: "1", + fleetv1.IsLatestSnapshotLabel: "true", + fleetv1.CRPTrackingLabel: testName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + APIVersion: fleetAPIVersion, + Kind: "ClusterResourcePlacement", + }, + }, + }, + Spec: fleetv1.PolicySnapShotSpec{ + Policy: placementPolicyForTest(), + PolicyHash: policyHash, + }, + }, + }, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + crp := clusterResourcePlacementForTest() + objects := []client.Object{crp} + for i := range tc.policySnapshots { + objects = append(objects, &tc.policySnapshots[i]) + } + scheme := serviceScheme(t) + fakeClient := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(objects...). + Build() + r := Reconciler{Client: fakeClient, Scheme: scheme} + got, err := r.handleUpdate(ctx, crp) + if err != nil { + t.Fatalf("failed to handle update: %v", err) + } + want := ctrl.Result{} + if !cmp.Equal(got, want) { + t.Errorf("handleUpdate() = %+v, want %+v", got, want) + } + clusterPolicySnapshotList := &fleetv1.ClusterPolicySnapShotList{} + if err := fakeClient.List(ctx, clusterPolicySnapshotList); err != nil { + t.Fatalf("clusterPolicySnapShot List() got error %v, want no error", err) + } + options := []cmp.Option{ + cmpopts.IgnoreFields(metav1.ObjectMeta{}, "ResourceVersion"), + } + if diff := cmp.Diff(tc.wantPolicySnapshots, clusterPolicySnapshotList.Items, options...); diff != "" { + t.Errorf("clusterPolicySnapShot List() mismatch (-want, +got):\n%s", diff) + } + }) + } +} + +func TestHandleUpdate_failure(t *testing.T) { + tests := []struct { + name string + policySnapshots []fleetv1.ClusterPolicySnapshot + }{ + { + // Should never hit this case unless there is a bug in the controller or customers manually modify the clusterPolicySnapshot. + name: "existing active policy snapshot does not have policyIndex label", + policySnapshots: []fleetv1.ClusterPolicySnapshot{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1.PolicySnapshotNameFmt, testName, 0), + Labels: map[string]string{ + fleetv1.IsLatestSnapshotLabel: "true", + fleetv1.CRPTrackingLabel: testName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + }, + }, + }, + }, + }, + }, + { + // Should never hit this case unless there is a bug in the controller or customers manually modify the clusterPolicySnapshot. + name: "existing active policy snapshot has an invalid policyIndex label", + policySnapshots: []fleetv1.ClusterPolicySnapshot{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1.PolicySnapshotNameFmt, testName, 0), + Labels: map[string]string{ + fleetv1.PolicyIndexLabel: "0bc", + fleetv1.IsLatestSnapshotLabel: "true", + fleetv1.CRPTrackingLabel: testName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + }, + }, + }, + }, + }, + }, + { + // Should never hit this case unless there is a bug in the controller or customers manually modify the clusterPolicySnapshot. 
+ name: "no active policy snapshot exists and policySnapshot with invalid policyIndex label", + policySnapshots: []fleetv1.ClusterPolicySnapshot{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1.PolicySnapshotNameFmt, testName, 0), + Labels: map[string]string{ + fleetv1.PolicyIndexLabel: "abc", + fleetv1.CRPTrackingLabel: testName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + }, + }, + }, + }, + }, + }, + { + // Should never hit this case unless there is a bug in the controller or customers manually modify the clusterPolicySnapshot. + name: "multiple active policy snapshot exist", + policySnapshots: []fleetv1.ClusterPolicySnapshot{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1.PolicySnapshotNameFmt, testName, 0), + Labels: map[string]string{ + fleetv1.PolicyIndexLabel: "0", + fleetv1.CRPTrackingLabel: testName, + fleetv1.IsLatestSnapshotLabel: "true", + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1.PolicySnapshotNameFmt, testName, 1), + Labels: map[string]string{ + fleetv1.PolicyIndexLabel: "1", + fleetv1.CRPTrackingLabel: testName, + fleetv1.IsLatestSnapshotLabel: "true", + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + }, + }, + }, + }, + }, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + crp := clusterResourcePlacementForTest() + objects := []client.Object{crp} + for i := range tc.policySnapshots { + objects = append(objects, &tc.policySnapshots[i]) + } + scheme := serviceScheme(t) + fakeClient := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(objects...). + Build() + r := Reconciler{Client: fakeClient, Scheme: scheme} + got, err := r.handleUpdate(ctx, crp) + if err == nil { // if error is nil + t.Fatal("handleUpdate = nil, want err") + } + if !errors.Is(err, controller.ErrUnexpectedBehavior) { + t.Errorf("handleUpdate() got %v, want %v type", err, controller.ErrUnexpectedBehavior) + } + want := ctrl.Result{} + if !cmp.Equal(got, want) { + t.Errorf("handleUpdate() = %+v, want %+v", got, want) + } + }) + } +} diff --git a/pkg/utils/controller/controller.go b/pkg/utils/controller/controller.go index 49cef22b5..7338e317f 100644 --- a/pkg/utils/controller/controller.go +++ b/pkg/utils/controller/controller.go @@ -7,6 +7,7 @@ package controller import ( "context" + "errors" "fmt" "sync" "time" @@ -27,6 +28,32 @@ const ( labelSuccess = "success" ) +var ( + // ErrUnexpectedBehavior indicates the current situation is not expected. + // There should be something wrong with the system and cannot be recovered by itself. + ErrUnexpectedBehavior = errors.New("unexpected behavior which cannot be handled by the controller") + + // ErrAPIServerError indicates the error is returned by the API server. + ErrAPIServerError = errors.New("error returned by the API server") +) + +// NewUnexpectedBehaviorError returns ErrUnexpectedBehavior type error. 
+func NewUnexpectedBehaviorError(err error) error {
+	// TODO(zhiying) emit error metrics or well defined logs
+	if err != nil {
+		return fmt.Errorf("%w: %v", ErrUnexpectedBehavior, err.Error())
+	}
+	return ErrUnexpectedBehavior
+}
+
+// NewAPIServerError returns ErrAPIServerError type error.
+func NewAPIServerError(err error) error {
+	if err != nil {
+		return fmt.Errorf("%w: %v", ErrAPIServerError, err.Error())
+	}
+	return ErrAPIServerError
+}
+
 // Controller maintains a rate limiting queue and the items in the queue will be reconciled by a "ReconcileFunc".
 // The item will be re-queued if "ReconcileFunc" returns an error, maximum re-queue times defined by "maxRetries" above,
 // after that the item will be discarded from the queue.
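
Note: the unit test above checks the wrapped errors with errors.Is, which is the intended way for callers to branch on the sentinel errors introduced in pkg/utils/controller. Below is a minimal, self-contained sketch of that pattern; the local variable names, the sample underlying error, and the requeue-vs-skip decision are illustrative assumptions (the skip behavior is inferred from the handleUpdate doc comment), not part of this change.

package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the sentinels defined in pkg/utils/controller; real callers
// would import "go.goms.io/fleet/pkg/utils/controller" and use its exported values.
var (
	errUnexpectedBehavior = errors.New("unexpected behavior which cannot be handled by the controller")
	errAPIServerError     = errors.New("error returned by the API server")
)

// newAPIServerError mirrors the wrapping pattern from the diff: %w keeps the
// sentinel retrievable via errors.Is while appending the underlying error text.
func newAPIServerError(err error) error {
	if err != nil {
		return fmt.Errorf("%w: %v", errAPIServerError, err.Error())
	}
	return errAPIServerError
}

func main() {
	err := newAPIServerError(errors.New("etcdserver: request timed out"))

	switch {
	case errors.Is(err, errUnexpectedBehavior):
		// Unrecoverable by the controller itself: skip retrying (hypothetical caller policy).
		fmt.Println("skip requeue:", err)
	case errors.Is(err, errAPIServerError):
		// Transient API server failure: requeue and retry (hypothetical caller policy).
		fmt.Println("requeue:", err)
	}
}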