Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions apis/placement/v1beta1/policysnapshot_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ const (

// PolicySnapshotNameFmt is clusterPolicySnapshot name format: {CRPName}-{PolicySnapshotIndex}.
PolicySnapshotNameFmt = "%s-%d"

// NumberOfClustersAnnotation is the annotation that indicates how many clusters should be selected for selectN placement type.
NumberOfClustersAnnotation = fleetPrefix + "numberOfClusters"
)

// +genclient
Expand Down
92 changes: 76 additions & 16 deletions pkg/controllers/clusterresourceplacement/placement_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,9 @@ func (r *Reconciler) updatePlacementAppliedCondition(placement *fleetv1alpha1.Cl
// If the error type is ErrUnexpectedBehavior, the controller will skip the reconciling.
func (r *Reconciler) handleUpdate(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement) (ctrl.Result, error) {
crpKObj := klog.KObj(crp)
policyHash, err := generatePolicyHash(crp.Spec.Policy)
schedulingPolicy := *crp.Spec.Policy // will exclude the numberOfClusters
schedulingPolicy.NumberOfClusters = nil
policyHash, err := generatePolicyHash(&schedulingPolicy)
if err != nil {
klog.ErrorS(err, "Failed to generate policy hash of crp", "clusterResourcePlacement", crpKObj)
return ctrl.Result{}, controller.NewUnexpectedBehaviorError(err)
Expand All @@ -369,7 +371,11 @@ func (r *Reconciler) handleUpdate(ctx context.Context, crp *fleetv1beta1.Cluster
return ctrl.Result{}, controller.NewAPIServerError(err)
}
}
if latestPolicySnapshot == nil || string(latestPolicySnapshot.Spec.PolicyHash) != policyHash {
if latestPolicySnapshot != nil && string(latestPolicySnapshot.Spec.PolicyHash) == policyHash {
if err := r.ensureLatestPolicySnapshot(ctx, crp, latestPolicySnapshot); err != nil {
return ctrl.Result{}, err
}
} else {
// create a new policy snapshot
latestPolicySnapshotIndex++
latestPolicySnapshot = &fleetv1beta1.ClusterPolicySnapshot{
Expand All @@ -382,36 +388,73 @@ func (r *Reconciler) handleUpdate(ctx context.Context, crp *fleetv1beta1.Cluster
},
},
Spec: fleetv1beta1.PolicySnapshotSpec{
Policy: crp.Spec.Policy,
Policy: &schedulingPolicy,
PolicyHash: []byte(policyHash),
},
}
policySnapshotKObj := klog.KObj(latestPolicySnapshot)
if err := controllerutil.SetControllerReference(crp, latestPolicySnapshot, r.Scheme); err != nil {
klog.ErrorS(err, "Failed to create set owner reference", "clusterPolicySnapshot", klog.KObj(latestPolicySnapshot))
klog.ErrorS(err, "Failed to set owner reference", "clusterPolicySnapshot", policySnapshotKObj)
// should never happen
return ctrl.Result{}, controller.NewUnexpectedBehaviorError(err)
}
// make sure each policySnapshot should always have the annotation if CRP is selectN type
if crp.Spec.Policy != nil &&
crp.Spec.Policy.PlacementType == fleetv1beta1.PickNPlacementType &&
crp.Spec.Policy.NumberOfClusters != nil {
latestPolicySnapshot.Annotations = map[string]string{
fleetv1beta1.NumberOfClustersAnnotation: strconv.Itoa(int(*crp.Spec.Policy.NumberOfClusters)),
}
}

if err := r.Client.Create(ctx, latestPolicySnapshot); err != nil {
klog.ErrorS(err, "Failed to create new clusterPolicySnapshot", "clusterPolicySnapshot", klog.KObj(latestPolicySnapshot))
klog.ErrorS(err, "Failed to create new clusterPolicySnapshot", "clusterPolicySnapshot", policySnapshotKObj)
return ctrl.Result{}, controller.NewAPIServerError(err)
}
} else if latestPolicySnapshot.Labels[fleetv1beta1.IsLatestSnapshotLabel] != strconv.FormatBool(true) {
}

// create clusterResourceSnapshot
// update the status based on the latestPolicySnapshot status
// update the status based on the work
return ctrl.Result{}, nil
}

// ensureLatestPolicySnapshot ensures the latest policySnapshot has the isLatest label and the numberOfClusters are updated.
func (r *Reconciler) ensureLatestPolicySnapshot(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement, latest *fleetv1beta1.ClusterPolicySnapshot) error {
needUpdate := false
if latest.Labels[fleetv1beta1.IsLatestSnapshotLabel] != strconv.FormatBool(true) {
// When latestPolicySnapshot.Spec.PolicyHash == policyHash,
// It could happen when the controller just sets the latest label to false for the old snapshot, and fails to
// create a new policy snapshot.
// And then the customers revert back their policy to the old one again.
// In this case, the "latest" snapshot without isLatest label has the same policy hash as the current policy.

latestPolicySnapshot.Labels[fleetv1beta1.IsLatestSnapshotLabel] = strconv.FormatBool(true)
if err := r.Client.Update(ctx, latestPolicySnapshot); err != nil {
klog.ErrorS(err, "Failed to set the isLatestSnapshot label to true", "clusterPolicySnapshot", klog.KObj(latestPolicySnapshot))
return ctrl.Result{}, controller.NewAPIServerError(err)
latest.Labels[fleetv1beta1.IsLatestSnapshotLabel] = strconv.FormatBool(true)
needUpdate = true
}

if crp.Spec.Policy != nil &&
crp.Spec.Policy.PlacementType == fleetv1beta1.PickNPlacementType &&
crp.Spec.Policy.NumberOfClusters != nil {
oldCount, err := parseNumberOfClustersFromAnnotation(latest)
if err != nil {
klog.ErrorS(err, "Failed to parse the numberOfClusterAnnotation", "clusterPolicySnapshot", klog.KObj(latest))
return controller.NewUnexpectedBehaviorError(err)
}
newCount := int(*crp.Spec.Policy.NumberOfClusters)
if oldCount != newCount {
latest.Annotations[fleetv1beta1.NumberOfClustersAnnotation] = strconv.Itoa(newCount)
needUpdate = true
}
}
// create clusterResourceSnapshot
// update the status based on the latestPolicySnapshot status
// update the status based on the work
return ctrl.Result{}, nil
if !needUpdate {
return nil
}
if err := r.Client.Update(ctx, latest); err != nil {
klog.ErrorS(err, "Failed to update the clusterPolicySnapshot", "clusterPolicySnapshot", klog.KObj(latest))
return controller.NewAPIServerError(err)
}
return nil
}

// lookupLatestClusterPolicySnapshot finds the latest snapshots and its policy index.
Expand All @@ -436,6 +479,7 @@ func (r *Reconciler) lookupLatestClusterPolicySnapshot(ctx context.Context, crp
if len(snapshotList.Items) == 1 {
policyIndex, err := parsePolicyIndexFromLabel(&snapshotList.Items[0])
if err != nil {
klog.ErrorS(err, "Failed to parse the policy index label", "clusterPolicySnapshot", klog.KObj(&snapshotList.Items[0]))
return nil, -1, controller.NewUnexpectedBehaviorError(err)
}
return &snapshotList.Items[0], policyIndex, nil
Expand All @@ -459,6 +503,7 @@ func (r *Reconciler) lookupLatestClusterPolicySnapshot(ctx context.Context, crp
for i := range snapshotList.Items {
policyIndex, err := parsePolicyIndexFromLabel(&snapshotList.Items[i])
if err != nil {
klog.ErrorS(err, "Failed to parse the policy index label", "clusterPolicySnapshot", klog.KObj(&snapshotList.Items[i]))
return nil, -1, controller.NewUnexpectedBehaviorError(err)
}
if lastPolicyIndex < policyIndex {
Expand All @@ -469,14 +514,29 @@ func (r *Reconciler) lookupLatestClusterPolicySnapshot(ctx context.Context, crp
return &snapshotList.Items[index], lastPolicyIndex, nil
}

// parsePolicyIndexFromLabel reads the policy index label of the given snapshot and converts it to
// a non-negative integer.
//
// It returns -1 and an error when the label value is not a valid integer or is negative; neither
// should happen in production. The error is not logged here: callers log it together with the
// snapshot reference, and the offending label value is carried in the returned error instead.
func parsePolicyIndexFromLabel(s *fleetv1beta1.ClusterPolicySnapshot) (int, error) {
	indexLabel := s.Labels[fleetv1beta1.PolicyIndexLabel]
	v, err := strconv.Atoi(indexLabel)
	if err != nil {
		// Wrap rather than log, so the label value is kept without logging the same failure twice.
		return -1, fmt.Errorf("invalid policy index label %q: %w", indexLabel, err)
	}
	if v < 0 {
		return -1, fmt.Errorf("policy index should not be negative: %d", v)
	}
	return v, nil
}

// parseNumberOfClustersFromAnnotation reads the numberOfClusters annotation of the given snapshot
// and converts it to a non-negative integer.
//
// It returns -1 and an error when the annotation value is not a valid integer or is negative;
// neither should happen in production.
func parseNumberOfClustersFromAnnotation(s *fleetv1beta1.ClusterPolicySnapshot) (int, error) {
	count, err := strconv.Atoi(s.Annotations[fleetv1beta1.NumberOfClustersAnnotation])
	switch {
	case err != nil:
		return -1, err
	case count < 0:
		return -1, fmt.Errorf("numberOfCluster should not be negative: %d", count)
	default:
		return count, nil
	}
}

Expand Down
Loading