diff --git a/apis/placement/v1beta1/policysnapshot_types.go b/apis/placement/v1beta1/policysnapshot_types.go
index 07eb2e597..7ff78402e 100644
--- a/apis/placement/v1beta1/policysnapshot_types.go
+++ b/apis/placement/v1beta1/policysnapshot_types.go
@@ -16,6 +16,9 @@ const (
 
 	// PolicySnapshotNameFmt is clusterPolicySnapshot name format: {CRPName}-{PolicySnapshotIndex}.
 	PolicySnapshotNameFmt = "%s-%d"
+
+	// NumberOfClustersAnnotation is the annotation that indicates how many clusters should be selected for selectN placement type.
+	NumberOfClustersAnnotation = fleetPrefix + "numberOfClusters"
 )
 
 // +genclient
diff --git a/pkg/controllers/clusterresourceplacement/placement_controller.go b/pkg/controllers/clusterresourceplacement/placement_controller.go
index a77f4058d..088d09187 100644
--- a/pkg/controllers/clusterresourceplacement/placement_controller.go
+++ b/pkg/controllers/clusterresourceplacement/placement_controller.go
@@ -347,7 +347,15 @@ func (r *Reconciler) updatePlacementAppliedCondition(placement *fleetv1alpha1.Cl
 // If the error type is ErrUnexpectedBehavior, the controller will skip the reconciling.
 func (r *Reconciler) handleUpdate(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement) (ctrl.Result, error) {
 	crpKObj := klog.KObj(crp)
-	policyHash, err := generatePolicyHash(crp.Spec.Policy)
+	// crp.Spec.Policy is a pointer and may be nil, so it is only copied when it is set.
+	// The copy is what gets hashed and stored in the snapshot; it excludes the numberOfClusters.
+	schedulingPolicy := crp.Spec.Policy
+	if schedulingPolicy != nil {
+		policyCopy := *schedulingPolicy
+		policyCopy.NumberOfClusters = nil
+		schedulingPolicy = &policyCopy
+	}
+	policyHash, err := generatePolicyHash(schedulingPolicy)
 	if err != nil {
 		klog.ErrorS(err, "Failed to generate policy hash of crp", "clusterResourcePlacement", crpKObj)
 		return ctrl.Result{}, controller.NewUnexpectedBehaviorError(err)
@@ -369,7 +377,11 @@ func (r *Reconciler) handleUpdate(ctx context.Context, crp *fleetv1beta1.Cluster
 			return ctrl.Result{}, controller.NewAPIServerError(err)
 		}
 	}
-	if latestPolicySnapshot == nil || string(latestPolicySnapshot.Spec.PolicyHash) != policyHash {
+	if latestPolicySnapshot != nil && string(latestPolicySnapshot.Spec.PolicyHash) == policyHash {
+		if err := r.ensureLatestPolicySnapshot(ctx, crp, latestPolicySnapshot); err != nil {
+			return ctrl.Result{}, err
+		}
+	} else {
 		// create a new policy snapshot
 		latestPolicySnapshotIndex++
 		latestPolicySnapshot = &fleetv1beta1.ClusterPolicySnapshot{
@@ -382,36 +394,75 @@ func (r *Reconciler) handleUpdate(ctx context.Context, crp *fleetv1beta1.Cluster
 				},
 			},
 			Spec: fleetv1beta1.PolicySnapshotSpec{
-				Policy:     crp.Spec.Policy,
+				Policy:     schedulingPolicy,
 				PolicyHash: []byte(policyHash),
 			},
 		}
+		policySnapshotKObj := klog.KObj(latestPolicySnapshot)
 		if err := controllerutil.SetControllerReference(crp, latestPolicySnapshot, r.Scheme); err != nil {
-			klog.ErrorS(err, "Failed to create set owner reference", "clusterPolicySnapshot", klog.KObj(latestPolicySnapshot))
+			klog.ErrorS(err, "Failed to set owner reference", "clusterPolicySnapshot", policySnapshotKObj)
 			// should never happen
 			return ctrl.Result{}, controller.NewUnexpectedBehaviorError(err)
 		}
+		// make sure each policySnapshot should always have the annotation if CRP is selectN type
+		if crp.Spec.Policy != nil &&
+			crp.Spec.Policy.PlacementType == fleetv1beta1.PickNPlacementType &&
+			crp.Spec.Policy.NumberOfClusters != nil {
+			latestPolicySnapshot.Annotations = map[string]string{
+				fleetv1beta1.NumberOfClustersAnnotation: strconv.Itoa(int(*crp.Spec.Policy.NumberOfClusters)),
+			}
+		}
+
 		if err := r.Client.Create(ctx, latestPolicySnapshot); err != nil {
-			klog.ErrorS(err, "Failed to create new clusterPolicySnapshot", "clusterPolicySnapshot", klog.KObj(latestPolicySnapshot))
+			klog.ErrorS(err, "Failed to create new clusterPolicySnapshot", "clusterPolicySnapshot", policySnapshotKObj)
 			return ctrl.Result{}, controller.NewAPIServerError(err)
 		}
-	} else if latestPolicySnapshot.Labels[fleetv1beta1.IsLatestSnapshotLabel] != strconv.FormatBool(true) {
+	}
+
+	// create clusterResourceSnapshot
+	// update the status based on the latestPolicySnapshot status
+	// update the status based on the work
+	return ctrl.Result{}, nil
+}
+
+// ensureLatestPolicySnapshot ensures the latest policySnapshot has the isLatest label and the numberOfClusters are updated.
+// For a PickN placement with numberOfClusters set, the numberOfClusters annotation is synced to the CRP value.
+// The snapshot is written back to the API server only when the label or the annotation actually changed.
+func (r *Reconciler) ensureLatestPolicySnapshot(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement, latest *fleetv1beta1.ClusterPolicySnapshot) error {
+	needUpdate := false
+	if latest.Labels[fleetv1beta1.IsLatestSnapshotLabel] != strconv.FormatBool(true) {
 		// When latestPolicySnapshot.Spec.PolicyHash == policyHash,
 		// It could happen when the controller just sets the latest label to false for the old snapshot, and fails to
 		// create a new policy snapshot.
 		// And then the customers revert back their policy to the old one again.
 		// In this case, the "latest" snapshot without isLatest label has the same policy hash as the current policy.
-		latestPolicySnapshot.Labels[fleetv1beta1.IsLatestSnapshotLabel] = strconv.FormatBool(true)
-		if err := r.Client.Update(ctx, latestPolicySnapshot); err != nil {
-			klog.ErrorS(err, "Failed to set the isLatestSnapshot label to true", "clusterPolicySnapshot", klog.KObj(latestPolicySnapshot))
-			return ctrl.Result{}, controller.NewAPIServerError(err)
+		latest.Labels[fleetv1beta1.IsLatestSnapshotLabel] = strconv.FormatBool(true)
+		needUpdate = true
+	}
+
+	if crp.Spec.Policy != nil &&
+		crp.Spec.Policy.PlacementType == fleetv1beta1.PickNPlacementType &&
+		crp.Spec.Policy.NumberOfClusters != nil {
+		oldCount, err := parseNumberOfClustersFromAnnotation(latest)
+		if err != nil {
+			klog.ErrorS(err, "Failed to parse the numberOfClusterAnnotation", "clusterPolicySnapshot", klog.KObj(latest))
+			return controller.NewUnexpectedBehaviorError(err)
+		}
+		newCount := int(*crp.Spec.Policy.NumberOfClusters)
+		if oldCount != newCount {
+			latest.Annotations[fleetv1beta1.NumberOfClustersAnnotation] = strconv.Itoa(newCount)
+			needUpdate = true
 		}
 	}
 
-	// create clusterResourceSnapshot
-	// update the status based on the latestPolicySnapshot status
-	// update the status based on the work
-	return ctrl.Result{}, nil
+	if !needUpdate {
+		return nil
+	}
+	if err := r.Client.Update(ctx, latest); err != nil {
+		klog.ErrorS(err, "Failed to update the clusterPolicySnapshot", "clusterPolicySnapshot", klog.KObj(latest))
+		return controller.NewAPIServerError(err)
+	}
+	return nil
 }
 
 // lookupLatestClusterPolicySnapshot finds the latest snapshots and its policy index.
@@ -436,6 +479,7 @@ func (r *Reconciler) lookupLatestClusterPolicySnapshot(ctx context.Context, crp if len(snapshotList.Items) == 1 { policyIndex, err := parsePolicyIndexFromLabel(&snapshotList.Items[0]) if err != nil { + klog.ErrorS(err, "Failed to parse the policy index label", "clusterPolicySnapshot", klog.KObj(&snapshotList.Items[0])) return nil, -1, controller.NewUnexpectedBehaviorError(err) } return &snapshotList.Items[0], policyIndex, nil @@ -459,6 +503,7 @@ func (r *Reconciler) lookupLatestClusterPolicySnapshot(ctx context.Context, crp for i := range snapshotList.Items { policyIndex, err := parsePolicyIndexFromLabel(&snapshotList.Items[i]) if err != nil { + klog.ErrorS(err, "Failed to parse the policy index label", "clusterPolicySnapshot", klog.KObj(&snapshotList.Items[i])) return nil, -1, controller.NewUnexpectedBehaviorError(err) } if lastPolicyIndex < policyIndex { @@ -469,14 +514,29 @@ func (r *Reconciler) lookupLatestClusterPolicySnapshot(ctx context.Context, crp return &snapshotList.Items[index], lastPolicyIndex, nil } +// parsePolicyIndexFromLabel returns error when parsing the label which should never return error in production. func parsePolicyIndexFromLabel(s *fleetv1beta1.ClusterPolicySnapshot) (int, error) { indexLabel := s.Labels[fleetv1beta1.PolicyIndexLabel] v, err := strconv.Atoi(indexLabel) if err != nil { - klog.ErrorS(err, "Failed to parse the policy index label", "clusterPolicySnapshot", klog.KObj(s), "policyIndexLabel", indexLabel) - // should never happen return -1, err } + if v < 0 { + return -1, fmt.Errorf("policy index should not be negative: %d", v) + } + return v, nil +} + +// parseNumberOfClustersFromAnnotation returns error when parsing the annotation which should never return error in production. 
+func parseNumberOfClustersFromAnnotation(s *fleetv1beta1.ClusterPolicySnapshot) (int, error) { + n := s.Annotations[fleetv1beta1.NumberOfClustersAnnotation] + v, err := strconv.Atoi(n) + if err != nil { + return -1, err + } + if v < 0 { + return -1, fmt.Errorf("numberOfCluster should not be negative: %d", v) + } return v, nil } diff --git a/pkg/controllers/clusterresourceplacement/placement_controller_test.go b/pkg/controllers/clusterresourceplacement/placement_controller_test.go index 554efcc40..fe9eb2474 100644 --- a/pkg/controllers/clusterresourceplacement/placement_controller_test.go +++ b/pkg/controllers/clusterresourceplacement/placement_controller_test.go @@ -6,6 +6,7 @@ import ( "encoding/json" "errors" "fmt" + "strconv" "testing" "github.com/google/go-cmp/cmp" @@ -71,7 +72,9 @@ func clusterResourcePlacementForTest() *fleetv1beta1.ClusterResourcePlacement { } func TestHandleUpdate(t *testing.T) { - jsonBytes, err := json.Marshal(placementPolicyForTest()) + wantPolicy := placementPolicyForTest() + wantPolicy.NumberOfClusters = nil + jsonBytes, err := json.Marshal(wantPolicy) if err != nil { t.Fatalf("failed to create the policy hash: %v", err) } @@ -129,9 +132,12 @@ func TestHandleUpdate(t *testing.T) { Kind: "ClusterResourcePlacement", }, }, + Annotations: map[string]string{ + fleetv1beta1.NumberOfClustersAnnotation: strconv.Itoa(3), + }, }, Spec: fleetv1beta1.PolicySnapshotSpec{ - Policy: placementPolicyForTest(), + Policy: wantPolicy, PolicyHash: policyHash, }, }, @@ -157,9 +163,12 @@ func TestHandleUpdate(t *testing.T) { Kind: "ClusterResourcePlacement", }, }, + Annotations: map[string]string{ + fleetv1beta1.NumberOfClustersAnnotation: strconv.Itoa(3), + }, }, Spec: fleetv1beta1.PolicySnapshotSpec{ - Policy: placementPolicyForTest(), + Policy: wantPolicy, PolicyHash: policyHash, }, }, @@ -182,9 +191,12 @@ func TestHandleUpdate(t *testing.T) { Kind: "ClusterResourcePlacement", }, }, + Annotations: map[string]string{ + 
fleetv1beta1.NumberOfClustersAnnotation: strconv.Itoa(3), + }, }, Spec: fleetv1beta1.PolicySnapshotSpec{ - Policy: placementPolicyForTest(), + Policy: wantPolicy, PolicyHash: policyHash, }, }, @@ -306,9 +318,12 @@ func TestHandleUpdate(t *testing.T) { Kind: "ClusterResourcePlacement", }, }, + Annotations: map[string]string{ + fleetv1beta1.NumberOfClustersAnnotation: strconv.Itoa(3), + }, }, Spec: fleetv1beta1.PolicySnapshotSpec{ - Policy: placementPolicyForTest(), + Policy: wantPolicy, PolicyHash: policyHash, }, }, @@ -382,9 +397,12 @@ func TestHandleUpdate(t *testing.T) { Kind: "ClusterResourcePlacement", }, }, + Annotations: map[string]string{ + fleetv1beta1.NumberOfClustersAnnotation: strconv.Itoa(3), + }, }, Spec: fleetv1beta1.PolicySnapshotSpec{ - Policy: placementPolicyForTest(), + Policy: wantPolicy, PolicyHash: policyHash, }, }, @@ -432,9 +450,117 @@ func TestHandleUpdate(t *testing.T) { Kind: "ClusterResourcePlacement", }, }, + Annotations: map[string]string{ + fleetv1beta1.NumberOfClustersAnnotation: strconv.Itoa(3), + }, + }, + Spec: fleetv1beta1.PolicySnapshotSpec{ + Policy: wantPolicy, + PolicyHash: policyHash, + }, + }, + }, + wantPolicySnapshots: []fleetv1beta1.ClusterPolicySnapshot{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1beta1.PolicySnapshotNameFmt, testName, 0), + Labels: map[string]string{ + fleetv1beta1.PolicyIndexLabel: "0", + fleetv1beta1.IsLatestSnapshotLabel: "false", + fleetv1beta1.CRPTrackingLabel: testName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + APIVersion: fleetAPIVersion, + Kind: "ClusterResourcePlacement", + }, + }, + }, + Spec: fleetv1beta1.PolicySnapshotSpec{ + // Policy is not specified. 
+ PolicyHash: unspecifiedPolicyHash, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1beta1.PolicySnapshotNameFmt, testName, 1), + Labels: map[string]string{ + fleetv1beta1.PolicyIndexLabel: "1", + fleetv1beta1.IsLatestSnapshotLabel: "true", + fleetv1beta1.CRPTrackingLabel: testName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + APIVersion: fleetAPIVersion, + Kind: "ClusterResourcePlacement", + }, + }, + Annotations: map[string]string{ + fleetv1beta1.NumberOfClustersAnnotation: strconv.Itoa(3), + }, + }, + Spec: fleetv1beta1.PolicySnapshotSpec{ + Policy: wantPolicy, + PolicyHash: policyHash, + }, + }, + }, + }, + { + name: "crp policy has not been changed and only the numberOfCluster is changed", + policySnapshots: []fleetv1beta1.ClusterPolicySnapshot{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1beta1.PolicySnapshotNameFmt, testName, 0), + Labels: map[string]string{ + fleetv1beta1.PolicyIndexLabel: "0", + fleetv1beta1.IsLatestSnapshotLabel: "false", + fleetv1beta1.CRPTrackingLabel: testName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + APIVersion: fleetAPIVersion, + Kind: "ClusterResourcePlacement", + }, + }, + }, + Spec: fleetv1beta1.PolicySnapshotSpec{ + // Policy is not specified. 
+ PolicyHash: unspecifiedPolicyHash, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1beta1.PolicySnapshotNameFmt, testName, 1), + Labels: map[string]string{ + fleetv1beta1.PolicyIndexLabel: "1", + fleetv1beta1.CRPTrackingLabel: testName, + fleetv1beta1.IsLatestSnapshotLabel: "true", + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + APIVersion: fleetAPIVersion, + Kind: "ClusterResourcePlacement", + }, + }, + Annotations: map[string]string{ + fleetv1beta1.NumberOfClustersAnnotation: strconv.Itoa(1), + }, }, Spec: fleetv1beta1.PolicySnapshotSpec{ - Policy: placementPolicyForTest(), + Policy: wantPolicy, PolicyHash: policyHash, }, }, @@ -480,9 +606,12 @@ func TestHandleUpdate(t *testing.T) { Kind: "ClusterResourcePlacement", }, }, + Annotations: map[string]string{ + fleetv1beta1.NumberOfClustersAnnotation: strconv.Itoa(3), + }, }, Spec: fleetv1beta1.PolicySnapshotSpec{ - Policy: placementPolicyForTest(), + Policy: wantPolicy, PolicyHash: policyHash, }, }, @@ -526,6 +655,13 @@ func TestHandleUpdate(t *testing.T) { } func TestHandleUpdate_failure(t *testing.T) { + wantPolicy := placementPolicyForTest() + wantPolicy.NumberOfClusters = nil + jsonBytes, err := json.Marshal(wantPolicy) + if err != nil { + t.Fatalf("failed to create the policy hash: %v", err) + } + policyHash := []byte(fmt.Sprintf("%x", sha256.Sum256(jsonBytes))) tests := []struct { name string policySnapshots []fleetv1beta1.ClusterPolicySnapshot @@ -637,6 +773,91 @@ func TestHandleUpdate_failure(t *testing.T) { }, }, }, + { + // Should never hit this case unless there is a bug in the controller or customers manually modify the clusterPolicySnapshot. 
+ name: "no active policy snapshot exists and policySnapshot with invalid policyIndex label (negative value)", + policySnapshots: []fleetv1beta1.ClusterPolicySnapshot{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1beta1.PolicySnapshotNameFmt, testName, 0), + Labels: map[string]string{ + fleetv1beta1.PolicyIndexLabel: "-1", + fleetv1beta1.CRPTrackingLabel: testName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + }, + }, + }, + }, + }, + }, + { + // Should never hit this case unless there is a bug in the controller or customers manually modify the clusterPolicySnapshot. + name: "active policy snapshot exists and policySnapshot with invalid numberOfClusters annotation", + policySnapshots: []fleetv1beta1.ClusterPolicySnapshot{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1beta1.PolicySnapshotNameFmt, testName, 1), + Labels: map[string]string{ + fleetv1beta1.PolicyIndexLabel: "1", + fleetv1beta1.IsLatestSnapshotLabel: "true", + fleetv1beta1.CRPTrackingLabel: testName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + APIVersion: fleetAPIVersion, + Kind: "ClusterResourcePlacement", + }, + }, + Annotations: map[string]string{ + fleetv1beta1.NumberOfClustersAnnotation: "invalid", + }, + }, + Spec: fleetv1beta1.PolicySnapshotSpec{ + Policy: wantPolicy, + PolicyHash: policyHash, + }, + }, + }, + }, + { + // Should never hit this case unless there is a bug in the controller or customers manually modify the clusterPolicySnapshot. 
+ name: "no active policy snapshot exists and policySnapshot with invalid numberOfClusters annotation (negative)", + policySnapshots: []fleetv1beta1.ClusterPolicySnapshot{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1beta1.PolicySnapshotNameFmt, testName, 1), + Labels: map[string]string{ + fleetv1beta1.PolicyIndexLabel: "1", + fleetv1beta1.CRPTrackingLabel: testName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + APIVersion: fleetAPIVersion, + Kind: "ClusterResourcePlacement", + }, + }, + Annotations: map[string]string{ + fleetv1beta1.NumberOfClustersAnnotation: "-123", + }, + }, + Spec: fleetv1beta1.PolicySnapshotSpec{ + Policy: wantPolicy, + PolicyHash: policyHash, + }, + }, + }, + }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) {