From ba01fa087708cac15b2bf7fd360f804a83dc2db9 Mon Sep 17 00:00:00 2001 From: Zhiying Lin Date: Mon, 3 Jul 2023 16:46:50 +0800 Subject: [PATCH 1/2] feat: enforce revisionHistoryLimit for scheduling policy snapshots --- .../v1beta1/clusterresourceplacement_types.go | 3 + ...eplacement_controller.go => controller.go} | 109 ++++++++++++++---- ..._controller_test.go => controller_test.go} | 98 +++++++--------- ...ler.go => placement_controllerv1alpha1.go} | 0 4 files changed, 129 insertions(+), 81 deletions(-) rename pkg/controllers/clusterresourceplacement/{clusterresourceplacement_controller.go => controller.go} (82%) rename pkg/controllers/clusterresourceplacement/{clusterresourceplacement_controller_test.go => controller_test.go} (97%) rename pkg/controllers/clusterresourceplacement/{placement_controller.go => placement_controllerv1alpha1.go} (100%) diff --git a/apis/placement/v1beta1/clusterresourceplacement_types.go b/apis/placement/v1beta1/clusterresourceplacement_types.go index dcb2e46d9..59127f589 100644 --- a/apis/placement/v1beta1/clusterresourceplacement_types.go +++ b/apis/placement/v1beta1/clusterresourceplacement_types.go @@ -15,6 +15,9 @@ const ( // ClusterResourcePlacementCleanupFinalizer is a finalizer added by the CRP controller to all CRPs, to make sure // that the CRP controller can react to CRP deletions if necessary. ClusterResourcePlacementCleanupFinalizer = fleetPrefix + "crp-cleanup" + + // RevisionHistoryLimitDefaultValue is the default value of RevisionHistoryLimit. + RevisionHistoryLimitDefaultValue = int32(10) ) // +genclient diff --git a/pkg/controllers/clusterresourceplacement/clusterresourceplacement_controller.go b/pkg/controllers/clusterresourceplacement/controller.go similarity index 82% rename from pkg/controllers/clusterresourceplacement/clusterresourceplacement_controller.go rename to pkg/controllers/clusterresourceplacement/controller.go index aabec1bc3..15a9c31cf 100644 --- a/pkg/controllers/clusterresourceplacement/clusterresourceplacement_controller.go +++ b/pkg/controllers/clusterresourceplacement/controller.go @@ -4,6 +4,7 @@ import ( "context" "crypto/sha256" "fmt" + "sort" "strconv" "k8s.io/apimachinery/pkg/api/errors" @@ -116,7 +117,7 @@ func (r *Reconciler) getOrCreateClusterPolicySnapshot(ctx context.Context, crp * } // latestPolicySnapshotIndex should be -1 when there is no snapshot. - latestPolicySnapshot, latestPolicySnapshotIndex, err := r.lookupLatestClusterPolicySnapshot(ctx, crp) + latestPolicySnapshot, latestPolicySnapshotIndex, sortedList, err := r.lookupLatestClusterPolicySnapshot(ctx, crp) if err != nil { return nil, err } @@ -136,11 +137,17 @@ func (r *Reconciler) getOrCreateClusterPolicySnapshot(ctx context.Context, crp * // set the latest label to false first to make sure there is only one or none active policy snapshot latestPolicySnapshot.Labels[fleetv1beta1.IsLatestSnapshotLabel] = strconv.FormatBool(false) if err := r.Client.Update(ctx, latestPolicySnapshot); err != nil { - klog.ErrorS(err, "Failed to set the isLatestSnapshot label to false", "clusterPolicySnapshot", klog.KObj(latestPolicySnapshot)) + klog.ErrorS(err, "Failed to set the isLatestSnapshot label to false", "clusterResourcePlacement", crpKObj, "clusterSchedulingPolicySnapshot", klog.KObj(latestPolicySnapshot)) return nil, controller.NewAPIServerError(false, err) } } + // delete redundant snapshot revisions before creating a new snapshot to guarantee that the number of snapshots + // won't exceed the limit. 
+ if err := r.deleteRedundantSchedulingPolicySnapshots(ctx, crp, sortedList); err != nil { + return nil, err + } + // create a new policy snapshot latestPolicySnapshotIndex++ latestPolicySnapshot = &fleetv1beta1.ClusterSchedulingPolicySnapshot{ @@ -179,6 +186,37 @@ func (r *Reconciler) getOrCreateClusterPolicySnapshot(ctx context.Context, crp * return latestPolicySnapshot, nil } +func (r *Reconciler) deleteRedundantSchedulingPolicySnapshots(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement, sortedList *fleetv1beta1.ClusterSchedulingPolicySnapshotList) error { + var err error + if sortedList == nil { + sortedList, err = r.listSortedClusterSchedulingPolicySnapshots(ctx, crp) + if err != nil { + return err + } + } + crpKObj := klog.KObj(crp) + // respect the revisionHistoryLimit field + revisionLimit := fleetv1beta1.RevisionHistoryLimitDefaultValue + if crp.Spec.RevisionHistoryLimit != nil { + revisionLimit = *crp.Spec.RevisionHistoryLimit + if revisionLimit <= 0 { + err := fmt.Errorf("invalid clusterResourcePlacement %s: invalid revisionHistoryLimit %d", crpKObj, revisionLimit) + klog.ErrorS(err, "Invalid revisionHistoryLimit value", "clusterResourcePlacement", crpKObj) + return controller.NewUnexpectedBehaviorError(err) // webhook should block this user error + } + } + if len(sortedList.Items) < int(revisionLimit) { + return nil + } + for i := 0; i <= len(sortedList.Items)-int(revisionLimit); i++ { + if err := r.Client.Delete(ctx, &sortedList.Items[i]); err != nil && !errors.IsNotFound(err) { + klog.ErrorS(err, "Failed to delete clusterSchedulingPolicySnapshot", "clusterResourcePlacement", crpKObj, "clusterSchedulingPolicySnapshot", klog.KObj(&sortedList.Items[i])) + return controller.NewAPIServerError(false, err) + } + } + return nil +} + // TODO handle all the resources selected by placement larger than 1MB size limit of k8s objects. func (r *Reconciler) getOrCreateClusterResourceSnapshot(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement, resourceSnapshotSpec *fleetv1beta1.ResourceSnapshotSpec) (*fleetv1beta1.ClusterResourceSnapshot, error) { resourceHash, err := generateResourceHash(resourceSnapshotSpec) @@ -308,6 +346,7 @@ func (r *Reconciler) ensureLatestResourceSnapshot(ctx context.Context, latest *f } // lookupLatestClusterPolicySnapshot finds the latest snapshots and its policy index. +// It will return the sorted policy snapshots if it needs to list all the snapshots. // There will be only one active policy snapshot if exists. // It first checks whether there is an active policy snapshot. // If not, it finds the one whose policyIndex label is the largest. @@ -315,7 +354,7 @@ func (r *Reconciler) ensureLatestResourceSnapshot(ctx context.Context, latest *f // Return error when 1) cannot list the snapshots 2) there are more than one active policy snapshots 3) snapshot has the // invalid label value. // 2 & 3 should never happen. 
-func (r *Reconciler) lookupLatestClusterPolicySnapshot(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement) (*fleetv1beta1.ClusterSchedulingPolicySnapshot, int, error) { +func (r *Reconciler) lookupLatestClusterPolicySnapshot(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement) (*fleetv1beta1.ClusterSchedulingPolicySnapshot, int, *fleetv1beta1.ClusterSchedulingPolicySnapshotList, error) { snapshotList := &fleetv1beta1.ClusterSchedulingPolicySnapshotList{} latestSnapshotLabelMatcher := client.MatchingLabels{ fleetv1beta1.CRPTrackingLabel: crp.Name, @@ -324,47 +363,67 @@ func (r *Reconciler) lookupLatestClusterPolicySnapshot(ctx context.Context, crp crpKObj := klog.KObj(crp) if err := r.Client.List(ctx, snapshotList, latestSnapshotLabelMatcher); err != nil { klog.ErrorS(err, "Failed to list active clusterPolicySnapshots", "clusterResourcePlacement", crpKObj) - return nil, -1, controller.NewAPIServerError(false, err) + // CRP controller needs a scheduling policy snapshot watcher to enqueue the CRP request. + // So the snapshots should be read from cache. + return nil, -1, nil, controller.NewAPIServerError(true, err) } if len(snapshotList.Items) == 1 { policyIndex, err := parsePolicyIndexFromLabel(&snapshotList.Items[0]) if err != nil { - klog.ErrorS(err, "Failed to parse the policy index label", "clusterPolicySnapshot", klog.KObj(&snapshotList.Items[0])) - return nil, -1, controller.NewUnexpectedBehaviorError(err) + klog.ErrorS(err, "Failed to parse the policy index label", "clusterResourcePlacement", crpKObj, "clusterPolicySnapshot", klog.KObj(&snapshotList.Items[0])) + return nil, -1, nil, controller.NewUnexpectedBehaviorError(err) } - return &snapshotList.Items[0], policyIndex, nil + return &snapshotList.Items[0], policyIndex, nil, nil } else if len(snapshotList.Items) > 1 { // It means there are multiple active snapshots and should never happen. err := fmt.Errorf("there are %d active clusterPolicySnapshots owned by clusterResourcePlacement %v", len(snapshotList.Items), crp.Name) klog.ErrorS(err, "Invalid clusterPolicySnapshots", "clusterResourcePlacement", crpKObj) - return nil, -1, controller.NewUnexpectedBehaviorError(err) + return nil, -1, nil, controller.NewUnexpectedBehaviorError(err) } // When there are no active snapshots, find the one who has the largest policy index. - if err := r.Client.List(ctx, snapshotList, client.MatchingLabels{fleetv1beta1.CRPTrackingLabel: crp.Name}); err != nil { - klog.ErrorS(err, "Failed to list all clusterPolicySnapshots", "clusterResourcePlacement", crpKObj) - return nil, -1, controller.NewAPIServerError(false, err) + sortedList, err := r.listSortedClusterSchedulingPolicySnapshots(ctx, crp) + if err != nil { + return nil, -1, nil, err } - if len(snapshotList.Items) == 0 { + + if len(sortedList.Items) == 0 { // The policy index of the first snapshot will start from 0. 
- return nil, -1, nil + return nil, -1, sortedList, nil } - index := -1 // the index of the cluster policy snapshot array - lastPolicyIndex := -1 // the assigned policy index of the cluster policy snapshot - for i := range snapshotList.Items { - policyIndex, err := parsePolicyIndexFromLabel(&snapshotList.Items[i]) + latestSnapshot := &sortedList.Items[len(sortedList.Items)-1] + policyIndex, err := parsePolicyIndexFromLabel(latestSnapshot) + if err != nil { + klog.ErrorS(err, "Failed to parse the policy index label", "clusterResourcePlacement", crpKObj, "clusterPolicySnapshot", klog.KObj(latestSnapshot)) + return nil, -1, nil, controller.NewUnexpectedBehaviorError(err) + } + return latestSnapshot, policyIndex, sortedList, nil +} + +// listSortedClusterSchedulingPolicySnapshots returns the policy snapshots sorted by the policy index. +func (r *Reconciler) listSortedClusterSchedulingPolicySnapshots(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement) (*fleetv1beta1.ClusterSchedulingPolicySnapshotList, error) { + snapshotList := &fleetv1beta1.ClusterSchedulingPolicySnapshotList{} + crpKObj := klog.KObj(crp) + if err := r.Client.List(ctx, snapshotList, client.MatchingLabels{fleetv1beta1.CRPTrackingLabel: crp.Name}); err != nil { + klog.ErrorS(err, "Failed to list all clusterSchedulingPolicySnapshots", "clusterResourcePlacement", crpKObj) + // CRP controller needs a scheduling policy snapshot watcher to enqueue the CRP request. + // So the snapshots should be read from cache. + return nil, controller.NewAPIServerError(true, err) + } + sort.Slice(snapshotList.Items, func(i, j int) bool { + ii, err := parsePolicyIndexFromLabel(&snapshotList.Items[i]) if err != nil { - klog.ErrorS(err, "Failed to parse the policy index label", "clusterPolicySnapshot", klog.KObj(&snapshotList.Items[i])) - return nil, -1, controller.NewUnexpectedBehaviorError(err) + klog.ErrorS(controller.NewUnexpectedBehaviorError(err), "Failed to parse the policy index label", "clusterSchedulingPolicySnapshot", klog.KObj(&snapshotList.Items[i])) } - if lastPolicyIndex < policyIndex { - index = i - lastPolicyIndex = policyIndex + ji, err := parsePolicyIndexFromLabel(&snapshotList.Items[j]) + if err != nil { + klog.ErrorS(controller.NewUnexpectedBehaviorError(err), "Failed to parse the policy index label", "clusterSchedulingPolicySnapshot", klog.KObj(&snapshotList.Items[j])) } - } - return &snapshotList.Items[index], lastPolicyIndex, nil + return ii < ji + }) + return snapshotList, nil } -// lookupLatestResourceSnapshot finds the latest snapshots and its resource index. +// lookupLatestResourceSnapshot finds the latest snapshots and. // There will be only one active resource snapshot if exists. // It first checks whether there is an active resource snapshot. // If not, it finds the one whose resourceIndex label is the largest. 
diff --git a/pkg/controllers/clusterresourceplacement/clusterresourceplacement_controller_test.go b/pkg/controllers/clusterresourceplacement/controller_test.go similarity index 97% rename from pkg/controllers/clusterresourceplacement/clusterresourceplacement_controller_test.go rename to pkg/controllers/clusterresourceplacement/controller_test.go index 7a92fbc50..5cf1723c3 100644 --- a/pkg/controllers/clusterresourceplacement/clusterresourceplacement_controller_test.go +++ b/pkg/controllers/clusterresourceplacement/controller_test.go @@ -87,7 +87,7 @@ func clusterResourcePlacementForTest() *fleetv1beta1.ClusterResourcePlacement { } } -func TestGetOrCreateClusterPolicySnapshot(t *testing.T) { +func TestGetOrCreateClusterSchedulingPolicySnapshot(t *testing.T) { wantPolicy := placementPolicyForTest() wantPolicy.NumberOfClusters = nil jsonBytes, err := json.Marshal(wantPolicy) @@ -100,8 +100,11 @@ func TestGetOrCreateClusterPolicySnapshot(t *testing.T) { t.Fatalf("failed to create the policy hash: %v", err) } unspecifiedPolicyHash := []byte(fmt.Sprintf("%x", sha256.Sum256(jsonBytes))) + singleRevisionLimit := int32(1) + multipleRevisionLimit := int32(2) tests := []struct { name string + revisionHistoryLimit *int32 policySnapshots []fleetv1beta1.ClusterSchedulingPolicySnapshot wantPolicySnapshots []fleetv1beta1.ClusterSchedulingPolicySnapshot wantLatestSnapshotIndex int // index of the wantPolicySnapshots array @@ -162,7 +165,8 @@ func TestGetOrCreateClusterPolicySnapshot(t *testing.T) { wantLatestSnapshotIndex: 1, }, { - name: "crp policy has no change", + name: "crp policy has no change", + revisionHistoryLimit: &singleRevisionLimit, policySnapshots: []fleetv1beta1.ClusterSchedulingPolicySnapshot{ { ObjectMeta: metav1.ObjectMeta{ @@ -225,30 +229,8 @@ func TestGetOrCreateClusterPolicySnapshot(t *testing.T) { name: "crp policy has changed and there is no active snapshot", // It happens when last reconcile loop fails after setting the latest label to false and // before creating a new policy snapshot. + revisionHistoryLimit: &multipleRevisionLimit, policySnapshots: []fleetv1beta1.ClusterSchedulingPolicySnapshot{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf(fleetv1beta1.PolicySnapshotNameFmt, testName, 0), - Labels: map[string]string{ - fleetv1beta1.PolicyIndexLabel: "0", - fleetv1beta1.IsLatestSnapshotLabel: "false", - fleetv1beta1.CRPTrackingLabel: testName, - }, - OwnerReferences: []metav1.OwnerReference{ - { - Name: testName, - BlockOwnerDeletion: pointer.Bool(true), - Controller: pointer.Bool(true), - APIVersion: fleetAPIVersion, - Kind: "ClusterResourcePlacement", - }, - }, - }, - Spec: fleetv1beta1.SchedulingPolicySnapshotSpec{ - // Policy is not specified. 
- PolicyHash: unspecifiedPolicyHash, - }, - }, { ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf(fleetv1beta1.PolicySnapshotNameFmt, testName, 3), @@ -272,8 +254,6 @@ func TestGetOrCreateClusterPolicySnapshot(t *testing.T) { PolicyHash: unspecifiedPolicyHash, }, }, - }, - wantPolicySnapshots: []fleetv1beta1.ClusterSchedulingPolicySnapshot{ { ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf(fleetv1beta1.PolicySnapshotNameFmt, testName, 0), @@ -297,6 +277,8 @@ func TestGetOrCreateClusterPolicySnapshot(t *testing.T) { PolicyHash: unspecifiedPolicyHash, }, }, + }, + wantPolicySnapshots: []fleetv1beta1.ClusterSchedulingPolicySnapshot{ { ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf(fleetv1beta1.PolicySnapshotNameFmt, testName, 3), @@ -347,10 +329,11 @@ func TestGetOrCreateClusterPolicySnapshot(t *testing.T) { }, }, }, - wantLatestSnapshotIndex: 2, + wantLatestSnapshotIndex: 1, }, { - name: "crp policy has changed and there is an active snapshot", + name: "crp policy has changed and there is an active snapshot", + revisionHistoryLimit: &singleRevisionLimit, policySnapshots: []fleetv1beta1.ClusterSchedulingPolicySnapshot{ { ObjectMeta: metav1.ObjectMeta{ @@ -377,29 +360,6 @@ func TestGetOrCreateClusterPolicySnapshot(t *testing.T) { }, }, wantPolicySnapshots: []fleetv1beta1.ClusterSchedulingPolicySnapshot{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf(fleetv1beta1.PolicySnapshotNameFmt, testName, 0), - Labels: map[string]string{ - fleetv1beta1.PolicyIndexLabel: "0", - fleetv1beta1.IsLatestSnapshotLabel: "false", - fleetv1beta1.CRPTrackingLabel: testName, - }, - OwnerReferences: []metav1.OwnerReference{ - { - Name: testName, - BlockOwnerDeletion: pointer.Bool(true), - Controller: pointer.Bool(true), - APIVersion: fleetAPIVersion, - Kind: "ClusterResourcePlacement", - }, - }, - }, - Spec: fleetv1beta1.SchedulingPolicySnapshotSpec{ - // Policy is not specified. - PolicyHash: unspecifiedPolicyHash, - }, - }, { ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf(fleetv1beta1.PolicySnapshotNameFmt, testName, 1), @@ -427,7 +387,7 @@ func TestGetOrCreateClusterPolicySnapshot(t *testing.T) { }, }, }, - wantLatestSnapshotIndex: 1, + wantLatestSnapshotIndex: 0, }, { name: "crp policy has been changed and reverted back and there is no active snapshot", @@ -536,6 +496,8 @@ func TestGetOrCreateClusterPolicySnapshot(t *testing.T) { }, { name: "crp policy has not been changed and only the numberOfCluster is changed", + // cause no new policy snapshot is created, it does not trigger the history limit check. 
+ revisionHistoryLimit: &singleRevisionLimit, policySnapshots: []fleetv1beta1.ClusterSchedulingPolicySnapshot{ { ObjectMeta: metav1.ObjectMeta{ @@ -645,6 +607,7 @@ func TestGetOrCreateClusterPolicySnapshot(t *testing.T) { t.Run(tc.name, func(t *testing.T) { ctx := context.Background() crp := clusterResourcePlacementForTest() + crp.Spec.RevisionHistoryLimit = tc.revisionHistoryLimit objects := []client.Object{crp} for i := range tc.policySnapshots { objects = append(objects, &tc.policySnapshots[i]) @@ -676,7 +639,7 @@ func TestGetOrCreateClusterPolicySnapshot(t *testing.T) { } } -func TestGetOrCreateClusterPolicySnapshot_failure(t *testing.T) { +func TestGetOrCreateClusterSchedulingPolicySnapshot_failure(t *testing.T) { wantPolicy := placementPolicyForTest() wantPolicy.NumberOfClusters = nil jsonBytes, err := json.Marshal(wantPolicy) @@ -684,9 +647,11 @@ func TestGetOrCreateClusterPolicySnapshot_failure(t *testing.T) { t.Fatalf("failed to create the policy hash: %v", err) } policyHash := []byte(fmt.Sprintf("%x", sha256.Sum256(jsonBytes))) + invalidRevisionLimit := int32(0) tests := []struct { - name string - policySnapshots []fleetv1beta1.ClusterSchedulingPolicySnapshot + name string + revisionHistoryLimit *int32 + policySnapshots []fleetv1beta1.ClusterSchedulingPolicySnapshot }{ { // Should never hit this case unless there is a bug in the controller or customers manually modify the clusterPolicySnapshot. @@ -753,6 +718,22 @@ func TestGetOrCreateClusterPolicySnapshot_failure(t *testing.T) { }, }, }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1beta1.PolicySnapshotNameFmt, testName, 1), + Labels: map[string]string{ + fleetv1beta1.PolicyIndexLabel: "abc", + fleetv1beta1.CRPTrackingLabel: testName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + }, + }, + }, + }, }, }, { @@ -880,11 +861,16 @@ func TestGetOrCreateClusterPolicySnapshot_failure(t *testing.T) { }, }, }, + { + name: "invalid revision limit", + revisionHistoryLimit: &invalidRevisionLimit, + }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { ctx := context.Background() crp := clusterResourcePlacementForTest() + crp.Spec.RevisionHistoryLimit = tc.revisionHistoryLimit objects := []client.Object{crp} for i := range tc.policySnapshots { objects = append(objects, &tc.policySnapshots[i]) diff --git a/pkg/controllers/clusterresourceplacement/placement_controller.go b/pkg/controllers/clusterresourceplacement/placement_controllerv1alpha1.go similarity index 100% rename from pkg/controllers/clusterresourceplacement/placement_controller.go rename to pkg/controllers/clusterresourceplacement/placement_controllerv1alpha1.go From f7a5864653a76f4556b0913ab4906fa38c22077c Mon Sep 17 00:00:00 2001 From: Zhiying Lin Date: Wed, 5 Jul 2023 16:50:02 +0800 Subject: [PATCH 2/2] address comment --- .../clusterresourceplacement/controller.go | 54 +++++++++------- .../controller_test.go | 63 +++++++++++++++++-- 2 files changed, 88 insertions(+), 29 deletions(-) diff --git a/pkg/controllers/clusterresourceplacement/controller.go b/pkg/controllers/clusterresourceplacement/controller.go index 15a9c31cf..99421e3dc 100644 --- a/pkg/controllers/clusterresourceplacement/controller.go +++ b/pkg/controllers/clusterresourceplacement/controller.go @@ -9,6 +9,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilerrors "k8s.io/apimachinery/pkg/util/errors" 
"k8s.io/apimachinery/pkg/util/json" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" @@ -117,7 +118,7 @@ func (r *Reconciler) getOrCreateClusterPolicySnapshot(ctx context.Context, crp * } // latestPolicySnapshotIndex should be -1 when there is no snapshot. - latestPolicySnapshot, latestPolicySnapshotIndex, sortedList, err := r.lookupLatestClusterPolicySnapshot(ctx, crp) + latestPolicySnapshot, latestPolicySnapshotIndex, err := r.lookupLatestClusterPolicySnapshot(ctx, crp) if err != nil { return nil, err } @@ -144,7 +145,7 @@ func (r *Reconciler) getOrCreateClusterPolicySnapshot(ctx context.Context, crp * // delete redundant snapshot revisions before creating a new snapshot to guarantee that the number of snapshots // won't exceed the limit. - if err := r.deleteRedundantSchedulingPolicySnapshots(ctx, crp, sortedList); err != nil { + if err := r.deleteRedundantSchedulingPolicySnapshots(ctx, crp); err != nil { return nil, err } @@ -186,14 +187,12 @@ func (r *Reconciler) getOrCreateClusterPolicySnapshot(ctx context.Context, crp * return latestPolicySnapshot, nil } -func (r *Reconciler) deleteRedundantSchedulingPolicySnapshots(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement, sortedList *fleetv1beta1.ClusterSchedulingPolicySnapshotList) error { - var err error - if sortedList == nil { - sortedList, err = r.listSortedClusterSchedulingPolicySnapshots(ctx, crp) - if err != nil { - return err - } +func (r *Reconciler) deleteRedundantSchedulingPolicySnapshots(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement) error { + sortedList, err := r.listSortedClusterSchedulingPolicySnapshots(ctx, crp) + if err != nil { + return err } + crpKObj := klog.KObj(crp) // respect the revisionHistoryLimit field revisionLimit := fleetv1beta1.RevisionHistoryLimitDefaultValue @@ -201,8 +200,9 @@ func (r *Reconciler) deleteRedundantSchedulingPolicySnapshots(ctx context.Contex revisionLimit = *crp.Spec.RevisionHistoryLimit if revisionLimit <= 0 { err := fmt.Errorf("invalid clusterResourcePlacement %s: invalid revisionHistoryLimit %d", crpKObj, revisionLimit) - klog.ErrorS(err, "Invalid revisionHistoryLimit value", "clusterResourcePlacement", crpKObj) - return controller.NewUnexpectedBehaviorError(err) // webhook should block this user error + klog.ErrorS(controller.NewExpectedBehaviorError(err), "Invalid revisionHistoryLimit value and using default value instead", "clusterResourcePlacement", crpKObj) + // use the default value instead + revisionLimit = fleetv1beta1.RevisionHistoryLimitDefaultValue } } if len(sortedList.Items) < int(revisionLimit) { @@ -346,7 +346,6 @@ func (r *Reconciler) ensureLatestResourceSnapshot(ctx context.Context, latest *f } // lookupLatestClusterPolicySnapshot finds the latest snapshots and its policy index. -// It will return the sorted policy snapshots if it needs to list all the snapshots. // There will be only one active policy snapshot if exists. // It first checks whether there is an active policy snapshot. // If not, it finds the one whose policyIndex label is the largest. @@ -354,7 +353,7 @@ func (r *Reconciler) ensureLatestResourceSnapshot(ctx context.Context, latest *f // Return error when 1) cannot list the snapshots 2) there are more than one active policy snapshots 3) snapshot has the // invalid label value. // 2 & 3 should never happen. 
-func (r *Reconciler) lookupLatestClusterPolicySnapshot(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement) (*fleetv1beta1.ClusterSchedulingPolicySnapshot, int, *fleetv1beta1.ClusterSchedulingPolicySnapshotList, error) { +func (r *Reconciler) lookupLatestClusterPolicySnapshot(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement) (*fleetv1beta1.ClusterSchedulingPolicySnapshot, int, error) { snapshotList := &fleetv1beta1.ClusterSchedulingPolicySnapshotList{} latestSnapshotLabelMatcher := client.MatchingLabels{ fleetv1beta1.CRPTrackingLabel: crp.Name, @@ -365,38 +364,39 @@ func (r *Reconciler) lookupLatestClusterPolicySnapshot(ctx context.Context, crp klog.ErrorS(err, "Failed to list active clusterPolicySnapshots", "clusterResourcePlacement", crpKObj) // CRP controller needs a scheduling policy snapshot watcher to enqueue the CRP request. // So the snapshots should be read from cache. - return nil, -1, nil, controller.NewAPIServerError(true, err) + return nil, -1, controller.NewAPIServerError(true, err) } if len(snapshotList.Items) == 1 { policyIndex, err := parsePolicyIndexFromLabel(&snapshotList.Items[0]) if err != nil { klog.ErrorS(err, "Failed to parse the policy index label", "clusterResourcePlacement", crpKObj, "clusterPolicySnapshot", klog.KObj(&snapshotList.Items[0])) - return nil, -1, nil, controller.NewUnexpectedBehaviorError(err) + return nil, -1, controller.NewUnexpectedBehaviorError(err) } - return &snapshotList.Items[0], policyIndex, nil, nil + return &snapshotList.Items[0], policyIndex, nil } else if len(snapshotList.Items) > 1 { // It means there are multiple active snapshots and should never happen. err := fmt.Errorf("there are %d active clusterPolicySnapshots owned by clusterResourcePlacement %v", len(snapshotList.Items), crp.Name) klog.ErrorS(err, "Invalid clusterPolicySnapshots", "clusterResourcePlacement", crpKObj) - return nil, -1, nil, controller.NewUnexpectedBehaviorError(err) + return nil, -1, controller.NewUnexpectedBehaviorError(err) } // When there are no active snapshots, find the one who has the largest policy index. + // It should be rare only when CRP is crashed before creating the new active snapshot. sortedList, err := r.listSortedClusterSchedulingPolicySnapshots(ctx, crp) if err != nil { - return nil, -1, nil, err + return nil, -1, err } if len(sortedList.Items) == 0 { // The policy index of the first snapshot will start from 0. - return nil, -1, sortedList, nil + return nil, -1, nil } latestSnapshot := &sortedList.Items[len(sortedList.Items)-1] policyIndex, err := parsePolicyIndexFromLabel(latestSnapshot) if err != nil { klog.ErrorS(err, "Failed to parse the policy index label", "clusterResourcePlacement", crpKObj, "clusterPolicySnapshot", klog.KObj(latestSnapshot)) - return nil, -1, nil, controller.NewUnexpectedBehaviorError(err) + return nil, -1, controller.NewUnexpectedBehaviorError(err) } - return latestSnapshot, policyIndex, sortedList, nil + return latestSnapshot, policyIndex, nil } // listSortedClusterSchedulingPolicySnapshots returns the policy snapshots sorted by the policy index. @@ -409,17 +409,25 @@ func (r *Reconciler) listSortedClusterSchedulingPolicySnapshots(ctx context.Cont // So the snapshots should be read from cache. 
return nil, controller.NewAPIServerError(true, err) } + var errs []error sort.Slice(snapshotList.Items, func(i, j int) bool { ii, err := parsePolicyIndexFromLabel(&snapshotList.Items[i]) if err != nil { - klog.ErrorS(controller.NewUnexpectedBehaviorError(err), "Failed to parse the policy index label", "clusterSchedulingPolicySnapshot", klog.KObj(&snapshotList.Items[i])) + klog.ErrorS(err, "Failed to parse the policy index label", "clusterResourcePlacement", crpKObj, "clusterSchedulingPolicySnapshot", klog.KObj(&snapshotList.Items[i])) + errs = append(errs, err) } ji, err := parsePolicyIndexFromLabel(&snapshotList.Items[j]) if err != nil { - klog.ErrorS(controller.NewUnexpectedBehaviorError(err), "Failed to parse the policy index label", "clusterSchedulingPolicySnapshot", klog.KObj(&snapshotList.Items[j])) + klog.ErrorS(err, "Failed to parse the policy index label", "clusterResourcePlacement", crpKObj, "clusterSchedulingPolicySnapshot", klog.KObj(&snapshotList.Items[j])) + errs = append(errs, err) } return ii < ji }) + + if len(errs) > 0 { + return nil, controller.NewUnexpectedBehaviorError(utilerrors.NewAggregate(errs)) + } + return snapshotList, nil } diff --git a/pkg/controllers/clusterresourceplacement/controller_test.go b/pkg/controllers/clusterresourceplacement/controller_test.go index 5cf1723c3..0ce90002d 100644 --- a/pkg/controllers/clusterresourceplacement/controller_test.go +++ b/pkg/controllers/clusterresourceplacement/controller_test.go @@ -102,6 +102,7 @@ func TestGetOrCreateClusterSchedulingPolicySnapshot(t *testing.T) { unspecifiedPolicyHash := []byte(fmt.Sprintf("%x", sha256.Sum256(jsonBytes))) singleRevisionLimit := int32(1) multipleRevisionLimit := int32(2) + invalidRevisionLimit := int32(0) tests := []struct { name string revisionHistoryLimit *int32 @@ -164,6 +165,62 @@ func TestGetOrCreateClusterSchedulingPolicySnapshot(t *testing.T) { }, wantLatestSnapshotIndex: 1, }, + { + name: "new clusterResourcePolicy with invalidRevisionLimit and no existing policy snapshots owned by my-crp", + revisionHistoryLimit: &invalidRevisionLimit, + policySnapshots: []fleetv1beta1.ClusterSchedulingPolicySnapshot{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "another-crp-1", + Labels: map[string]string{ + fleetv1beta1.PolicyIndexLabel: "1", + fleetv1beta1.IsLatestSnapshotLabel: "true", + fleetv1beta1.CRPTrackingLabel: "another-crp", + }, + }, + }, + }, + wantPolicySnapshots: []fleetv1beta1.ClusterSchedulingPolicySnapshot{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "another-crp-1", + Labels: map[string]string{ + fleetv1beta1.PolicyIndexLabel: "1", + fleetv1beta1.IsLatestSnapshotLabel: "true", + fleetv1beta1.CRPTrackingLabel: "another-crp", + }, + }, + }, + // new policy snapshot owned by the my-crp + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1beta1.PolicySnapshotNameFmt, testName, 0), + Labels: map[string]string{ + fleetv1beta1.PolicyIndexLabel: "0", + fleetv1beta1.IsLatestSnapshotLabel: "true", + fleetv1beta1.CRPTrackingLabel: testName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + APIVersion: fleetAPIVersion, + Kind: "ClusterResourcePlacement", + }, + }, + Annotations: map[string]string{ + fleetv1beta1.NumberOfClustersAnnotation: strconv.Itoa(3), + }, + }, + Spec: fleetv1beta1.SchedulingPolicySnapshotSpec{ + Policy: wantPolicy, + PolicyHash: policyHash, + }, + }, + }, + wantLatestSnapshotIndex: 1, + }, { name: "crp policy has no change", revisionHistoryLimit: 
&singleRevisionLimit, @@ -647,7 +704,6 @@ func TestGetOrCreateClusterSchedulingPolicySnapshot_failure(t *testing.T) { t.Fatalf("failed to create the policy hash: %v", err) } policyHash := []byte(fmt.Sprintf("%x", sha256.Sum256(jsonBytes))) - invalidRevisionLimit := int32(0) tests := []struct { name string revisionHistoryLimit *int32 @@ -861,16 +917,11 @@ func TestGetOrCreateClusterSchedulingPolicySnapshot_failure(t *testing.T) { }, }, }, - { - name: "invalid revision limit", - revisionHistoryLimit: &invalidRevisionLimit, - }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { ctx := context.Background() crp := clusterResourcePlacementForTest() - crp.Spec.RevisionHistoryLimit = tc.revisionHistoryLimit objects := []client.Object{crp} for i := range tc.policySnapshots { objects = append(objects, &tc.policySnapshots[i])
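
The sketch below is not part of either commit; it is a minimal, standalone illustration of the pruning rule that the new deleteRedundantSchedulingPolicySnapshots helper applies: before a new scheduling policy snapshot is created, enough of the oldest snapshots are deleted so that the total (including the snapshot about to be created) stays within revisionHistoryLimit. The function name and the example values here are hypothetical and exist only to show the arithmetic.

// staleSnapshotCount mirrors the loop bound in deleteRedundantSchedulingPolicySnapshots:
// it returns how many of the oldest snapshots must be removed before creating a new one.
package main

import "fmt"

func staleSnapshotCount(existing int, limit int32) int {
	if limit <= 0 {
		limit = 10 // assumption: fall back to RevisionHistoryLimitDefaultValue for an invalid value
	}
	if existing < int(limit) {
		return 0 // still room for the snapshot about to be created
	}
	// Delete enough of the oldest entries so that existing - deleted + 1 (the new snapshot) <= limit.
	return existing - int(limit) + 1
}

func main() {
	fmt.Println(staleSnapshotCount(10, 10)) // 1: drop the oldest, then create the new snapshot
	fmt.Println(staleSnapshotCount(3, 10))  // 0: under the limit, nothing to prune
	fmt.Println(staleSnapshotCount(5, 1))   // 5: keep only the snapshot that is about to be created
}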