Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions apis/placement/v1beta1/clusterresourceplacement_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ const (
// ClusterResourcePlacementCleanupFinalizer is a finalizer added by the CRP controller to all CRPs, to make sure
// that the CRP controller can react to CRP deletions if necessary.
ClusterResourcePlacementCleanupFinalizer = fleetPrefix + "crp-cleanup"

// RevisionHistoryLimitDefaultValue is the default value of RevisionHistoryLimit.
RevisionHistoryLimitDefaultValue = int32(10)
)

// +genclient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"context"
"crypto/sha256"
"fmt"
"sort"
"strconv"

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -136,11 +138,17 @@ func (r *Reconciler) getOrCreateClusterPolicySnapshot(ctx context.Context, crp *
// set the latest label to false first to make sure there is only one or none active policy snapshot
latestPolicySnapshot.Labels[fleetv1beta1.IsLatestSnapshotLabel] = strconv.FormatBool(false)
if err := r.Client.Update(ctx, latestPolicySnapshot); err != nil {
klog.ErrorS(err, "Failed to set the isLatestSnapshot label to false", "clusterPolicySnapshot", klog.KObj(latestPolicySnapshot))
klog.ErrorS(err, "Failed to set the isLatestSnapshot label to false", "clusterResourcePlacement", crpKObj, "clusterSchedulingPolicySnapshot", klog.KObj(latestPolicySnapshot))
return nil, controller.NewAPIServerError(false, err)
}
}

// delete redundant snapshot revisions before creating a new snapshot to guarantee that the number of snapshots
// won't exceed the limit.
if err := r.deleteRedundantSchedulingPolicySnapshots(ctx, crp); err != nil {
return nil, err
}

// create a new policy snapshot
latestPolicySnapshotIndex++
latestPolicySnapshot = &fleetv1beta1.ClusterSchedulingPolicySnapshot{
Expand Down Expand Up @@ -179,6 +187,36 @@ func (r *Reconciler) getOrCreateClusterPolicySnapshot(ctx context.Context, crp *
return latestPolicySnapshot, nil
}

// deleteRedundantSchedulingPolicySnapshots removes the oldest clusterSchedulingPolicySnapshots of the given CRP so that,
// once the caller creates one more snapshot, the total does not exceed the revision history limit.
//
// The limit comes from crp.Spec.RevisionHistoryLimit; when that field is nil or not positive, the default
// RevisionHistoryLimitDefaultValue is used (a non-positive value is additionally logged as an error).
// Snapshots are taken from listSortedClusterSchedulingPolicySnapshots, and deletion starts at the front of that list.
// A "not found" response from a delete is ignored; any other delete failure aborts the loop and is returned
// wrapped by controller.NewAPIServerError.
func (r *Reconciler) deleteRedundantSchedulingPolicySnapshots(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement) error {
	snapshots, listErr := r.listSortedClusterSchedulingPolicySnapshots(ctx, crp)
	if listErr != nil {
		return listErr
	}

	crpKObj := klog.KObj(crp)

	// Resolve the effective limit: start from the default and only override it with a valid user-provided value.
	limit := fleetv1beta1.RevisionHistoryLimitDefaultValue
	if configured := crp.Spec.RevisionHistoryLimit; configured != nil {
		if *configured > 0 {
			limit = *configured
		} else {
			// Keep the default value and only report the invalid setting.
			invalidErr := fmt.Errorf("invalid clusterResourcePlacement %s: invalid revisionHistoryLimit %d", crpKObj, *configured)
			klog.ErrorS(controller.NewExpectedBehaviorError(invalidErr), "Invalid revisionHistoryLimit value and using default value instead", "clusterResourcePlacement", crpKObj)
		}
	}

	// One extra snapshot is removed on top of the overflow so that there is room for the snapshot that is about
	// to be created. The count is zero or negative when the list is still below the limit, so nothing is deleted.
	redundant := len(snapshots.Items) - int(limit) + 1
	for idx := 0; idx < redundant; idx++ {
		stale := &snapshots.Items[idx]
		delErr := r.Client.Delete(ctx, stale)
		if delErr == nil || errors.IsNotFound(delErr) {
			continue
		}
		klog.ErrorS(delErr, "Failed to delete clusterSchedulingPolicySnapshot", "clusterResourcePlacement", crpKObj, "clusterSchedulingPolicySnapshot", klog.KObj(stale))
		return controller.NewAPIServerError(false, delErr)
	}
	return nil
}

// TODO handle all the resources selected by placement larger than 1MB size limit of k8s objects.
func (r *Reconciler) getOrCreateClusterResourceSnapshot(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement, resourceSnapshotSpec *fleetv1beta1.ResourceSnapshotSpec) (*fleetv1beta1.ClusterResourceSnapshot, error) {
resourceHash, err := generateResourceHash(resourceSnapshotSpec)
Expand Down Expand Up @@ -324,12 +362,14 @@ func (r *Reconciler) lookupLatestClusterPolicySnapshot(ctx context.Context, crp
crpKObj := klog.KObj(crp)
if err := r.Client.List(ctx, snapshotList, latestSnapshotLabelMatcher); err != nil {
klog.ErrorS(err, "Failed to list active clusterPolicySnapshots", "clusterResourcePlacement", crpKObj)
return nil, -1, controller.NewAPIServerError(false, err)
// CRP controller needs a scheduling policy snapshot watcher to enqueue the CRP request.
// So the snapshots should be read from cache.
return nil, -1, controller.NewAPIServerError(true, err)
}
if len(snapshotList.Items) == 1 {
policyIndex, err := parsePolicyIndexFromLabel(&snapshotList.Items[0])
if err != nil {
klog.ErrorS(err, "Failed to parse the policy index label", "clusterPolicySnapshot", klog.KObj(&snapshotList.Items[0]))
klog.ErrorS(err, "Failed to parse the policy index label", "clusterResourcePlacement", crpKObj, "clusterPolicySnapshot", klog.KObj(&snapshotList.Items[0]))
return nil, -1, controller.NewUnexpectedBehaviorError(err)
}
return &snapshotList.Items[0], policyIndex, nil
Expand All @@ -340,31 +380,58 @@ func (r *Reconciler) lookupLatestClusterPolicySnapshot(ctx context.Context, crp
return nil, -1, controller.NewUnexpectedBehaviorError(err)
}
// When there are no active snapshots, find the one who has the largest policy index.
if err := r.Client.List(ctx, snapshotList, client.MatchingLabels{fleetv1beta1.CRPTrackingLabel: crp.Name}); err != nil {
klog.ErrorS(err, "Failed to list all clusterPolicySnapshots", "clusterResourcePlacement", crpKObj)
return nil, -1, controller.NewAPIServerError(false, err)
// This should be rare: it only happens when the CRP controller crashes before creating the new active snapshot.
sortedList, err := r.listSortedClusterSchedulingPolicySnapshots(ctx, crp)
if err != nil {
return nil, -1, err
}
if len(snapshotList.Items) == 0 {

if len(sortedList.Items) == 0 {
// The policy index of the first snapshot will start from 0.
return nil, -1, nil
}
index := -1 // the index of the cluster policy snapshot array
lastPolicyIndex := -1 // the assigned policy index of the cluster policy snapshot
for i := range snapshotList.Items {
policyIndex, err := parsePolicyIndexFromLabel(&snapshotList.Items[i])
latestSnapshot := &sortedList.Items[len(sortedList.Items)-1]
policyIndex, err := parsePolicyIndexFromLabel(latestSnapshot)
if err != nil {
klog.ErrorS(err, "Failed to parse the policy index label", "clusterResourcePlacement", crpKObj, "clusterPolicySnapshot", klog.KObj(latestSnapshot))
return nil, -1, controller.NewUnexpectedBehaviorError(err)
}
return latestSnapshot, policyIndex, nil
}

// listSortedClusterSchedulingPolicySnapshots lists every clusterSchedulingPolicySnapshot whose CRP tracking label
// matches crp.Name and returns the list sorted in ascending order of the policy index parsed from each snapshot's label.
// A list failure is returned wrapped by controller.NewAPIServerError. If a policy index label cannot be parsed
// while sorting, the collected parse errors are aggregated and returned wrapped by controller.NewUnexpectedBehaviorError.
func (r *Reconciler) listSortedClusterSchedulingPolicySnapshots(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement) (*fleetv1beta1.ClusterSchedulingPolicySnapshotList, error) {
snapshotList := &fleetv1beta1.ClusterSchedulingPolicySnapshotList{}
crpKObj := klog.KObj(crp)
if err := r.Client.List(ctx, snapshotList, client.MatchingLabels{fleetv1beta1.CRPTrackingLabel: crp.Name}); err != nil {
klog.ErrorS(err, "Failed to list all clusterSchedulingPolicySnapshots", "clusterResourcePlacement", crpKObj)
// CRP controller needs a scheduling policy snapshot watcher to enqueue the CRP request.
// So the snapshots should be read from cache.
return nil, controller.NewAPIServerError(true, err)
}
// The sort comparator can only return a bool, so label parse failures are collected here and checked after sorting.
var errs []error
sort.Slice(snapshotList.Items, func(i, j int) bool {
ii, err := parsePolicyIndexFromLabel(&snapshotList.Items[i])
if err != nil {
klog.ErrorS(err, "Failed to parse the policy index label", "clusterPolicySnapshot", klog.KObj(&snapshotList.Items[i]))
return nil, -1, controller.NewUnexpectedBehaviorError(err)
klog.ErrorS(err, "Failed to parse the policy index label", "clusterResourcePlacement", crpKObj, "clusterSchedulingPolicySnapshot", klog.KObj(&snapshotList.Items[i]))
errs = append(errs, err)
}
if lastPolicyIndex < policyIndex {
index = i
lastPolicyIndex = policyIndex
ji, err := parsePolicyIndexFromLabel(&snapshotList.Items[j])
if err != nil {
klog.ErrorS(err, "Failed to parse the policy index label", "clusterResourcePlacement", crpKObj, "clusterSchedulingPolicySnapshot", klog.KObj(&snapshotList.Items[j]))
errs = append(errs, err)
}
return ii < ji
})

if len(errs) > 0 {
return nil, controller.NewUnexpectedBehaviorError(utilerrors.NewAggregate(errs))
}
return &snapshotList.Items[index], lastPolicyIndex, nil

return snapshotList, nil
}

// lookupLatestResourceSnapshot finds the latest snapshots and its resource index.
// lookupLatestResourceSnapshot finds the latest resource snapshot and its resource index.
// There will be only one active resource snapshot if exists.
// It first checks whether there is an active resource snapshot.
// If not, it finds the one whose resourceIndex label is the largest.
Expand Down
Loading