diff --git a/apis/placement/v1beta1/clusterresourceplacement_types.go b/apis/placement/v1beta1/clusterresourceplacement_types.go index dcb2e46d9..59127f589 100644 --- a/apis/placement/v1beta1/clusterresourceplacement_types.go +++ b/apis/placement/v1beta1/clusterresourceplacement_types.go @@ -15,6 +15,9 @@ const ( // ClusterResourcePlacementCleanupFinalizer is a finalizer added by the CRP controller to all CRPs, to make sure // that the CRP controller can react to CRP deletions if necessary. ClusterResourcePlacementCleanupFinalizer = fleetPrefix + "crp-cleanup" + + // RevisionHistoryLimitDefaultValue is the default value of RevisionHistoryLimit. + RevisionHistoryLimitDefaultValue = int32(10) ) // +genclient diff --git a/pkg/controllers/clusterresourceplacement/clusterresourceplacement_controller.go b/pkg/controllers/clusterresourceplacement/controller.go similarity index 83% rename from pkg/controllers/clusterresourceplacement/clusterresourceplacement_controller.go rename to pkg/controllers/clusterresourceplacement/controller.go index aabec1bc3..99421e3dc 100644 --- a/pkg/controllers/clusterresourceplacement/clusterresourceplacement_controller.go +++ b/pkg/controllers/clusterresourceplacement/controller.go @@ -4,10 +4,12 @@ import ( "context" "crypto/sha256" "fmt" + "sort" "strconv" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/json" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" @@ -136,11 +138,17 @@ func (r *Reconciler) getOrCreateClusterPolicySnapshot(ctx context.Context, crp * // set the latest label to false first to make sure there is only one or none active policy snapshot latestPolicySnapshot.Labels[fleetv1beta1.IsLatestSnapshotLabel] = strconv.FormatBool(false) if err := r.Client.Update(ctx, latestPolicySnapshot); err != nil { - klog.ErrorS(err, "Failed to set the isLatestSnapshot label to false", "clusterPolicySnapshot", 
klog.KObj(latestPolicySnapshot)) + klog.ErrorS(err, "Failed to set the isLatestSnapshot label to false", "clusterResourcePlacement", crpKObj, "clusterSchedulingPolicySnapshot", klog.KObj(latestPolicySnapshot)) return nil, controller.NewAPIServerError(false, err) } } + // delete redundant snapshot revisions before creating a new snapshot to guarantee that the number of snapshots + // won't exceed the limit. + if err := r.deleteRedundantSchedulingPolicySnapshots(ctx, crp); err != nil { + return nil, err + } + // create a new policy snapshot latestPolicySnapshotIndex++ latestPolicySnapshot = &fleetv1beta1.ClusterSchedulingPolicySnapshot{ @@ -179,6 +187,36 @@ func (r *Reconciler) getOrCreateClusterPolicySnapshot(ctx context.Context, crp * return latestPolicySnapshot, nil } +func (r *Reconciler) deleteRedundantSchedulingPolicySnapshots(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement) error { + sortedList, err := r.listSortedClusterSchedulingPolicySnapshots(ctx, crp) + if err != nil { + return err + } + + crpKObj := klog.KObj(crp) + // respect the revisionHistoryLimit field + revisionLimit := fleetv1beta1.RevisionHistoryLimitDefaultValue + if crp.Spec.RevisionHistoryLimit != nil { + revisionLimit = *crp.Spec.RevisionHistoryLimit + if revisionLimit <= 0 { + err := fmt.Errorf("invalid clusterResourcePlacement %s: invalid revisionHistoryLimit %d", crpKObj, revisionLimit) + klog.ErrorS(controller.NewExpectedBehaviorError(err), "Invalid revisionHistoryLimit value and using default value instead", "clusterResourcePlacement", crpKObj) + // use the default value instead + revisionLimit = fleetv1beta1.RevisionHistoryLimitDefaultValue + } + } + if len(sortedList.Items) < int(revisionLimit) { + return nil + } + for i := 0; i <= len(sortedList.Items)-int(revisionLimit); i++ { + if err := r.Client.Delete(ctx, &sortedList.Items[i]); err != nil && !errors.IsNotFound(err) { + klog.ErrorS(err, "Failed to delete clusterSchedulingPolicySnapshot", "clusterResourcePlacement", 
crpKObj, "clusterSchedulingPolicySnapshot", klog.KObj(&sortedList.Items[i])) + return controller.NewAPIServerError(false, err) + } + } + return nil +} + // TODO handle all the resources selected by placement larger than 1MB size limit of k8s objects. func (r *Reconciler) getOrCreateClusterResourceSnapshot(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement, resourceSnapshotSpec *fleetv1beta1.ResourceSnapshotSpec) (*fleetv1beta1.ClusterResourceSnapshot, error) { resourceHash, err := generateResourceHash(resourceSnapshotSpec) @@ -324,12 +362,14 @@ func (r *Reconciler) lookupLatestClusterPolicySnapshot(ctx context.Context, crp crpKObj := klog.KObj(crp) if err := r.Client.List(ctx, snapshotList, latestSnapshotLabelMatcher); err != nil { klog.ErrorS(err, "Failed to list active clusterPolicySnapshots", "clusterResourcePlacement", crpKObj) - return nil, -1, controller.NewAPIServerError(false, err) + // CRP controller needs a scheduling policy snapshot watcher to enqueue the CRP request. + // So the snapshots should be read from cache. + return nil, -1, controller.NewAPIServerError(true, err) } if len(snapshotList.Items) == 1 { policyIndex, err := parsePolicyIndexFromLabel(&snapshotList.Items[0]) if err != nil { - klog.ErrorS(err, "Failed to parse the policy index label", "clusterPolicySnapshot", klog.KObj(&snapshotList.Items[0])) + klog.ErrorS(err, "Failed to parse the policy index label", "clusterResourcePlacement", crpKObj, "clusterPolicySnapshot", klog.KObj(&snapshotList.Items[0])) return nil, -1, controller.NewUnexpectedBehaviorError(err) } return &snapshotList.Items[0], policyIndex, nil @@ -340,31 +380,58 @@ func (r *Reconciler) lookupLatestClusterPolicySnapshot(ctx context.Context, crp return nil, -1, controller.NewUnexpectedBehaviorError(err) } // When there are no active snapshots, find the one who has the largest policy index. 
- if err := r.Client.List(ctx, snapshotList, client.MatchingLabels{fleetv1beta1.CRPTrackingLabel: crp.Name}); err != nil { - klog.ErrorS(err, "Failed to list all clusterPolicySnapshots", "clusterResourcePlacement", crpKObj) - return nil, -1, controller.NewAPIServerError(false, err) + // It should be rare only when CRP is crashed before creating the new active snapshot. + sortedList, err := r.listSortedClusterSchedulingPolicySnapshots(ctx, crp) + if err != nil { + return nil, -1, err } - if len(snapshotList.Items) == 0 { + + if len(sortedList.Items) == 0 { // The policy index of the first snapshot will start from 0. return nil, -1, nil } - index := -1 // the index of the cluster policy snapshot array - lastPolicyIndex := -1 // the assigned policy index of the cluster policy snapshot - for i := range snapshotList.Items { - policyIndex, err := parsePolicyIndexFromLabel(&snapshotList.Items[i]) + latestSnapshot := &sortedList.Items[len(sortedList.Items)-1] + policyIndex, err := parsePolicyIndexFromLabel(latestSnapshot) + if err != nil { + klog.ErrorS(err, "Failed to parse the policy index label", "clusterResourcePlacement", crpKObj, "clusterPolicySnapshot", klog.KObj(latestSnapshot)) + return nil, -1, controller.NewUnexpectedBehaviorError(err) + } + return latestSnapshot, policyIndex, nil +} + +// listSortedClusterSchedulingPolicySnapshots returns the policy snapshots sorted by the policy index. 
+func (r *Reconciler) listSortedClusterSchedulingPolicySnapshots(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement) (*fleetv1beta1.ClusterSchedulingPolicySnapshotList, error) { + snapshotList := &fleetv1beta1.ClusterSchedulingPolicySnapshotList{} + crpKObj := klog.KObj(crp) + if err := r.Client.List(ctx, snapshotList, client.MatchingLabels{fleetv1beta1.CRPTrackingLabel: crp.Name}); err != nil { + klog.ErrorS(err, "Failed to list all clusterSchedulingPolicySnapshots", "clusterResourcePlacement", crpKObj) + // CRP controller needs a scheduling policy snapshot watcher to enqueue the CRP request. + // So the snapshots should be read from cache. + return nil, controller.NewAPIServerError(true, err) + } + var errs []error + sort.Slice(snapshotList.Items, func(i, j int) bool { + ii, err := parsePolicyIndexFromLabel(&snapshotList.Items[i]) if err != nil { - klog.ErrorS(err, "Failed to parse the policy index label", "clusterPolicySnapshot", klog.KObj(&snapshotList.Items[i])) - return nil, -1, controller.NewUnexpectedBehaviorError(err) + klog.ErrorS(err, "Failed to parse the policy index label", "clusterResourcePlacement", crpKObj, "clusterSchedulingPolicySnapshot", klog.KObj(&snapshotList.Items[i])) + errs = append(errs, err) } - if lastPolicyIndex < policyIndex { - index = i - lastPolicyIndex = policyIndex + ji, err := parsePolicyIndexFromLabel(&snapshotList.Items[j]) + if err != nil { + klog.ErrorS(err, "Failed to parse the policy index label", "clusterResourcePlacement", crpKObj, "clusterSchedulingPolicySnapshot", klog.KObj(&snapshotList.Items[j])) + errs = append(errs, err) } + return ii < ji + }) + + if len(errs) > 0 { + return nil, controller.NewUnexpectedBehaviorError(utilerrors.NewAggregate(errs)) } - return &snapshotList.Items[index], lastPolicyIndex, nil + + return snapshotList, nil } -// lookupLatestResourceSnapshot finds the latest snapshots and its resource index. +// lookupLatestResourceSnapshot finds the latest resource snapshot and its resource index.
// There will be only one active resource snapshot if exists. // It first checks whether there is an active resource snapshot. // If not, it finds the one whose resourceIndex label is the largest. diff --git a/pkg/controllers/clusterresourceplacement/clusterresourceplacement_controller_test.go b/pkg/controllers/clusterresourceplacement/controller_test.go similarity index 96% rename from pkg/controllers/clusterresourceplacement/clusterresourceplacement_controller_test.go rename to pkg/controllers/clusterresourceplacement/controller_test.go index 7a92fbc50..0ce90002d 100644 --- a/pkg/controllers/clusterresourceplacement/clusterresourceplacement_controller_test.go +++ b/pkg/controllers/clusterresourceplacement/controller_test.go @@ -87,7 +87,7 @@ func clusterResourcePlacementForTest() *fleetv1beta1.ClusterResourcePlacement { } } -func TestGetOrCreateClusterPolicySnapshot(t *testing.T) { +func TestGetOrCreateClusterSchedulingPolicySnapshot(t *testing.T) { wantPolicy := placementPolicyForTest() wantPolicy.NumberOfClusters = nil jsonBytes, err := json.Marshal(wantPolicy) @@ -100,8 +100,12 @@ func TestGetOrCreateClusterPolicySnapshot(t *testing.T) { t.Fatalf("failed to create the policy hash: %v", err) } unspecifiedPolicyHash := []byte(fmt.Sprintf("%x", sha256.Sum256(jsonBytes))) + singleRevisionLimit := int32(1) + multipleRevisionLimit := int32(2) + invalidRevisionLimit := int32(0) tests := []struct { name string + revisionHistoryLimit *int32 policySnapshots []fleetv1beta1.ClusterSchedulingPolicySnapshot wantPolicySnapshots []fleetv1beta1.ClusterSchedulingPolicySnapshot wantLatestSnapshotIndex int // index of the wantPolicySnapshots array @@ -162,8 +166,32 @@ func TestGetOrCreateClusterPolicySnapshot(t *testing.T) { wantLatestSnapshotIndex: 1, }, { - name: "crp policy has no change", + name: "new clusterResourcePolicy with invalidRevisionLimit and no existing policy snapshots owned by my-crp", + revisionHistoryLimit: &invalidRevisionLimit, policySnapshots: 
[]fleetv1beta1.ClusterSchedulingPolicySnapshot{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "another-crp-1", + Labels: map[string]string{ + fleetv1beta1.PolicyIndexLabel: "1", + fleetv1beta1.IsLatestSnapshotLabel: "true", + fleetv1beta1.CRPTrackingLabel: "another-crp", + }, + }, + }, + }, + wantPolicySnapshots: []fleetv1beta1.ClusterSchedulingPolicySnapshot{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "another-crp-1", + Labels: map[string]string{ + fleetv1beta1.PolicyIndexLabel: "1", + fleetv1beta1.IsLatestSnapshotLabel: "true", + fleetv1beta1.CRPTrackingLabel: "another-crp", + }, + }, + }, + // new policy snapshot owned by the my-crp { ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf(fleetv1beta1.PolicySnapshotNameFmt, testName, 0), @@ -191,7 +219,12 @@ func TestGetOrCreateClusterPolicySnapshot(t *testing.T) { }, }, }, - wantPolicySnapshots: []fleetv1beta1.ClusterSchedulingPolicySnapshot{ + wantLatestSnapshotIndex: 1, + }, + { + name: "crp policy has no change", + revisionHistoryLimit: &singleRevisionLimit, + policySnapshots: []fleetv1beta1.ClusterSchedulingPolicySnapshot{ { ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf(fleetv1beta1.PolicySnapshotNameFmt, testName, 0), @@ -219,19 +252,13 @@ func TestGetOrCreateClusterPolicySnapshot(t *testing.T) { }, }, }, - wantLatestSnapshotIndex: 0, - }, - { - name: "crp policy has changed and there is no active snapshot", - // It happens when last reconcile loop fails after setting the latest label to false and - // before creating a new policy snapshot. 
- policySnapshots: []fleetv1beta1.ClusterSchedulingPolicySnapshot{ + wantPolicySnapshots: []fleetv1beta1.ClusterSchedulingPolicySnapshot{ { ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf(fleetv1beta1.PolicySnapshotNameFmt, testName, 0), Labels: map[string]string{ fleetv1beta1.PolicyIndexLabel: "0", - fleetv1beta1.IsLatestSnapshotLabel: "false", + fleetv1beta1.IsLatestSnapshotLabel: "true", fleetv1beta1.CRPTrackingLabel: testName, }, OwnerReferences: []metav1.OwnerReference{ @@ -243,12 +270,24 @@ func TestGetOrCreateClusterPolicySnapshot(t *testing.T) { Kind: "ClusterResourcePlacement", }, }, + Annotations: map[string]string{ + fleetv1beta1.NumberOfClustersAnnotation: strconv.Itoa(3), + }, }, Spec: fleetv1beta1.SchedulingPolicySnapshotSpec{ - // Policy is not specified. - PolicyHash: unspecifiedPolicyHash, + Policy: wantPolicy, + PolicyHash: policyHash, }, }, + }, + wantLatestSnapshotIndex: 0, + }, + { + name: "crp policy has changed and there is no active snapshot", + // It happens when last reconcile loop fails after setting the latest label to false and + // before creating a new policy snapshot. 
+ revisionHistoryLimit: &multipleRevisionLimit, + policySnapshots: []fleetv1beta1.ClusterSchedulingPolicySnapshot{ { ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf(fleetv1beta1.PolicySnapshotNameFmt, testName, 3), @@ -272,8 +311,6 @@ func TestGetOrCreateClusterPolicySnapshot(t *testing.T) { PolicyHash: unspecifiedPolicyHash, }, }, - }, - wantPolicySnapshots: []fleetv1beta1.ClusterSchedulingPolicySnapshot{ { ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf(fleetv1beta1.PolicySnapshotNameFmt, testName, 0), @@ -297,6 +334,8 @@ func TestGetOrCreateClusterPolicySnapshot(t *testing.T) { PolicyHash: unspecifiedPolicyHash, }, }, + }, + wantPolicySnapshots: []fleetv1beta1.ClusterSchedulingPolicySnapshot{ { ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf(fleetv1beta1.PolicySnapshotNameFmt, testName, 3), @@ -347,10 +386,11 @@ func TestGetOrCreateClusterPolicySnapshot(t *testing.T) { }, }, }, - wantLatestSnapshotIndex: 2, + wantLatestSnapshotIndex: 1, }, { - name: "crp policy has changed and there is an active snapshot", + name: "crp policy has changed and there is an active snapshot", + revisionHistoryLimit: &singleRevisionLimit, policySnapshots: []fleetv1beta1.ClusterSchedulingPolicySnapshot{ { ObjectMeta: metav1.ObjectMeta{ @@ -377,29 +417,6 @@ func TestGetOrCreateClusterPolicySnapshot(t *testing.T) { }, }, wantPolicySnapshots: []fleetv1beta1.ClusterSchedulingPolicySnapshot{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf(fleetv1beta1.PolicySnapshotNameFmt, testName, 0), - Labels: map[string]string{ - fleetv1beta1.PolicyIndexLabel: "0", - fleetv1beta1.IsLatestSnapshotLabel: "false", - fleetv1beta1.CRPTrackingLabel: testName, - }, - OwnerReferences: []metav1.OwnerReference{ - { - Name: testName, - BlockOwnerDeletion: pointer.Bool(true), - Controller: pointer.Bool(true), - APIVersion: fleetAPIVersion, - Kind: "ClusterResourcePlacement", - }, - }, - }, - Spec: fleetv1beta1.SchedulingPolicySnapshotSpec{ - // Policy is not specified. 
- PolicyHash: unspecifiedPolicyHash, - }, - }, { ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf(fleetv1beta1.PolicySnapshotNameFmt, testName, 1), @@ -427,7 +444,7 @@ func TestGetOrCreateClusterPolicySnapshot(t *testing.T) { }, }, }, - wantLatestSnapshotIndex: 1, + wantLatestSnapshotIndex: 0, }, { name: "crp policy has been changed and reverted back and there is no active snapshot", @@ -536,6 +553,8 @@ func TestGetOrCreateClusterPolicySnapshot(t *testing.T) { }, { name: "crp policy has not been changed and only the numberOfCluster is changed", + // cause no new policy snapshot is created, it does not trigger the history limit check. + revisionHistoryLimit: &singleRevisionLimit, policySnapshots: []fleetv1beta1.ClusterSchedulingPolicySnapshot{ { ObjectMeta: metav1.ObjectMeta{ @@ -645,6 +664,7 @@ func TestGetOrCreateClusterPolicySnapshot(t *testing.T) { t.Run(tc.name, func(t *testing.T) { ctx := context.Background() crp := clusterResourcePlacementForTest() + crp.Spec.RevisionHistoryLimit = tc.revisionHistoryLimit objects := []client.Object{crp} for i := range tc.policySnapshots { objects = append(objects, &tc.policySnapshots[i]) @@ -676,7 +696,7 @@ func TestGetOrCreateClusterPolicySnapshot(t *testing.T) { } } -func TestGetOrCreateClusterPolicySnapshot_failure(t *testing.T) { +func TestGetOrCreateClusterSchedulingPolicySnapshot_failure(t *testing.T) { wantPolicy := placementPolicyForTest() wantPolicy.NumberOfClusters = nil jsonBytes, err := json.Marshal(wantPolicy) @@ -685,8 +705,9 @@ func TestGetOrCreateClusterPolicySnapshot_failure(t *testing.T) { } policyHash := []byte(fmt.Sprintf("%x", sha256.Sum256(jsonBytes))) tests := []struct { - name string - policySnapshots []fleetv1beta1.ClusterSchedulingPolicySnapshot + name string + revisionHistoryLimit *int32 + policySnapshots []fleetv1beta1.ClusterSchedulingPolicySnapshot }{ { // Should never hit this case unless there is a bug in the controller or customers manually modify the clusterPolicySnapshot. 
@@ -753,6 +774,22 @@ func TestGetOrCreateClusterPolicySnapshot_failure(t *testing.T) { }, }, }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1beta1.PolicySnapshotNameFmt, testName, 1), + Labels: map[string]string{ + fleetv1beta1.PolicyIndexLabel: "abc", + fleetv1beta1.CRPTrackingLabel: testName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + }, + }, + }, + }, }, }, { diff --git a/pkg/controllers/clusterresourceplacement/placement_controller.go b/pkg/controllers/clusterresourceplacement/placement_controllerv1alpha1.go similarity index 100% rename from pkg/controllers/clusterresourceplacement/placement_controller.go rename to pkg/controllers/clusterresourceplacement/placement_controllerv1alpha1.go