Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions apis/placement/v1beta1/resourcesnapshot_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ const (

// NumberOfResourceSnapshotsAnnotation is the annotation that contains the total number of resource snapshots.
NumberOfResourceSnapshotsAnnotation = fleetPrefix + "numberOfResourceSnapshots"

// ResourceSnapshotNameFmt is the clusterResourceSnapshot name format: {CRPName}-{resourceIndex}.
ResourceSnapshotNameFmt = "%s-%d"
)

// +genclient
Expand All @@ -38,6 +41,7 @@ const (
// We assign an ever-increasing index for each such group of resourceSnapshots.
// The naming convention of a clusterResourceSnapshot is {CRPName}-{resourceIndex}(-{subindex})*
// where the name of the first snapshot of a group has no subindex part so its name is {CRPName}-{resourceIndex}.
// resourceIndex will begin with 0.
// Each snapshot MUST have the following labels:
// - `CRPTrackingLabel` which points to its owner CRP.
// - `ResourceIndexLabel` which is the index of the snapshot group.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,62 @@ import (
"go.goms.io/fleet/pkg/utils/controller"
)

func (r *Reconciler) Reconcile(_ context.Context, _ controller.QueueKey) (ctrl.Result, error) {
return ctrl.Result{}, nil
func (r *Reconciler) Reconcile(ctx context.Context, _ controller.QueueKey) (ctrl.Result, error) {
// TODO workaround to bypass lint check
return r.handleUpdate(ctx, nil)
}

// handleUpdate processes a create/update event of a clusterResourcePlacement.
// It makes sure the matching clusterPolicySnapshot and clusterResourceSnapshot exist, creating them when needed.
// Any error returned by one of the steps is passed back to the caller unchanged together with an empty result.
// If the error type is ErrUnexpectedBehavior, the controller will skip the reconciling.
func (r *Reconciler) handleUpdate(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement) (ctrl.Result, error) {
	// Step 1: resolve the policy snapshot the resource snapshot will refer to.
	policySnapshot, err := r.getOrCreateClusterPolicySnapshot(ctx, crp)
	if err != nil {
		return ctrl.Result{}, err
	}

	// Step 2: collect the resources picked by the placement.
	resources, err := r.selectResourcesForPlacement(crp)
	if err != nil {
		return ctrl.Result{}, err
	}

	// Step 3: record the selected resources, tied to the policy snapshot by name.
	spec := fleetv1beta1.ResourceSnapshotSpec{
		SelectedResources:  resources,
		PolicySnapshotName: policySnapshot.Name,
	}
	if _, err = r.getOrCreateClusterResourceSnapshot(ctx, crp, &spec); err != nil {
		return ctrl.Result{}, err
	}

	// Not implemented yet:
	// update the status based on the latestPolicySnapshot status
	// update the status based on the work
	return ctrl.Result{}, nil
}

func (r *Reconciler) getOrCreateClusterPolicySnapshot(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement) (*fleetv1beta1.ClusterSchedulingPolicySnapshot, error) {
crpKObj := klog.KObj(crp)
schedulingPolicy := *crp.Spec.Policy // will exclude the numberOfClusters
schedulingPolicy.NumberOfClusters = nil
policyHash, err := generatePolicyHash(&schedulingPolicy)
if err != nil {
klog.ErrorS(err, "Failed to generate policy hash of crp", "clusterResourcePlacement", crpKObj)
return ctrl.Result{}, controller.NewUnexpectedBehaviorError(err)
return nil, controller.NewUnexpectedBehaviorError(err)
}

// latestPolicySnapshotIndex should be -1 when there is no snapshot.
latestPolicySnapshot, latestPolicySnapshotIndex, err := r.lookupLatestClusterPolicySnapshot(ctx, crp)
if err != nil {
return ctrl.Result{}, err
return nil, err
}

if latestPolicySnapshot != nil && string(latestPolicySnapshot.Spec.PolicyHash) == policyHash {
if err := r.ensureLatestPolicySnapshot(ctx, crp, latestPolicySnapshot); err != nil {
return nil, err
}
return latestPolicySnapshot, nil
}

// A new snapshot is needed when 1) there are no snapshots or 2) the latest snapshot hash differs from the current one.
// mark the last policy snapshot as inactive if it is different from what we have now
if latestPolicySnapshot != nil &&
string(latestPolicySnapshot.Spec.PolicyHash) != policyHash &&
Expand All @@ -48,59 +81,119 @@ func (r *Reconciler) handleUpdate(ctx context.Context, crp *fleetv1beta1.Cluster
latestPolicySnapshot.Labels[fleetv1beta1.IsLatestSnapshotLabel] = strconv.FormatBool(false)
if err := r.Client.Update(ctx, latestPolicySnapshot); err != nil {
klog.ErrorS(err, "Failed to set the isLatestSnapshot label to false", "clusterPolicySnapshot", klog.KObj(latestPolicySnapshot))
return ctrl.Result{}, controller.NewAPIServerError(false, err)
return nil, controller.NewAPIServerError(false, err)
}
}
if latestPolicySnapshot != nil && string(latestPolicySnapshot.Spec.PolicyHash) == policyHash {
if err := r.ensureLatestPolicySnapshot(ctx, crp, latestPolicySnapshot); err != nil {
return ctrl.Result{}, err
}
} else {
// create a new policy snapshot
latestPolicySnapshotIndex++
latestPolicySnapshot = &fleetv1beta1.ClusterSchedulingPolicySnapshot{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf(fleetv1beta1.PolicySnapshotNameFmt, crp.Name, latestPolicySnapshotIndex),
Labels: map[string]string{
fleetv1beta1.CRPTrackingLabel: crp.Name,
fleetv1beta1.IsLatestSnapshotLabel: strconv.FormatBool(true),
fleetv1beta1.PolicyIndexLabel: strconv.Itoa(latestPolicySnapshotIndex),
},
},
Spec: fleetv1beta1.SchedulingPolicySnapshotSpec{
Policy: &schedulingPolicy,
PolicyHash: []byte(policyHash),

// create a new policy snapshot
latestPolicySnapshotIndex++
latestPolicySnapshot = &fleetv1beta1.ClusterSchedulingPolicySnapshot{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf(fleetv1beta1.PolicySnapshotNameFmt, crp.Name, latestPolicySnapshotIndex),
Labels: map[string]string{
fleetv1beta1.CRPTrackingLabel: crp.Name,
fleetv1beta1.IsLatestSnapshotLabel: strconv.FormatBool(true),
fleetv1beta1.PolicyIndexLabel: strconv.Itoa(latestPolicySnapshotIndex),
},
},
Spec: fleetv1beta1.SchedulingPolicySnapshotSpec{
Policy: &schedulingPolicy,
PolicyHash: []byte(policyHash),
},
}
policySnapshotKObj := klog.KObj(latestPolicySnapshot)
if err := controllerutil.SetControllerReference(crp, latestPolicySnapshot, r.Scheme); err != nil {
klog.ErrorS(err, "Failed to set owner reference", "clusterPolicySnapshot", policySnapshotKObj)
// should never happen
return nil, controller.NewUnexpectedBehaviorError(err)
}
// make sure each policySnapshot should always have the annotation if CRP is selectN type
if crp.Spec.Policy != nil &&
crp.Spec.Policy.PlacementType == fleetv1beta1.PickNPlacementType &&
crp.Spec.Policy.NumberOfClusters != nil {
latestPolicySnapshot.Annotations = map[string]string{
fleetv1beta1.NumberOfClustersAnnotation: strconv.Itoa(int(*crp.Spec.Policy.NumberOfClusters)),
}
policySnapshotKObj := klog.KObj(latestPolicySnapshot)
if err := controllerutil.SetControllerReference(crp, latestPolicySnapshot, r.Scheme); err != nil {
klog.ErrorS(err, "Failed to set owner reference", "clusterPolicySnapshot", policySnapshotKObj)
// should never happen
return ctrl.Result{}, controller.NewUnexpectedBehaviorError(err)
}

if err := r.Client.Create(ctx, latestPolicySnapshot); err != nil {
klog.ErrorS(err, "Failed to create new clusterPolicySnapshot", "clusterPolicySnapshot", policySnapshotKObj)
return nil, controller.NewAPIServerError(false, err)
}
return latestPolicySnapshot, nil
}

// TODO: handle the case where the resources selected by the placement exceed the 1MB size limit of k8s objects.
func (r *Reconciler) getOrCreateClusterResourceSnapshot(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement, resourceSnapshotSpec *fleetv1beta1.ResourceSnapshotSpec) (*fleetv1beta1.ClusterResourceSnapshot, error) {
resourceHash, err := generateResourceHash(resourceSnapshotSpec)
if err != nil {
klog.ErrorS(err, "Failed to generate resource hash of crp", "clusterResourcePlacement", klog.KObj(crp))
return nil, controller.NewUnexpectedBehaviorError(err)
}

// latestResourceSnapshotIndex should be -1 when there is no snapshot.
latestResourceSnapshot, latestResourceSnapshotIndex, err := r.lookupLatestResourceSnapshot(ctx, crp)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the resourceSnapshot and schedulingPolicySnapshot are generated following the same rule. Do we need to create two functions to look up the last? Will generic help unite them?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the implementation will diverge when we handle the 1MB limit for the resourceSnapshot.

So prefer not to unify them, otherwise there will be too many "if else" conditions.

if err != nil {
return nil, err
}

latestResourceSnapshotHash := ""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you are already retrieving latest resource snapshot and its index in the lookup function, why not retrieve hashes there? It would simplify the logic a little bit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lookup func is little complicated for now, so prefer to get the hash outside.

Refactored the code. PTAL to see if the readability is better.

if latestResourceSnapshot != nil {
latestResourceSnapshotHash, err = parseResourceGroupHashFromAnnotation(latestResourceSnapshot)
if err != nil {
klog.ErrorS(err, "Failed to get the ResourceGroupHashAnnotation", "clusterResourceSnapshot", klog.KObj(latestResourceSnapshot))
return nil, controller.NewUnexpectedBehaviorError(err)
}
// make sure each policySnapshot should always have the annotation if CRP is selectN type
if crp.Spec.Policy != nil &&
crp.Spec.Policy.PlacementType == fleetv1beta1.PickNPlacementType &&
crp.Spec.Policy.NumberOfClusters != nil {
latestPolicySnapshot.Annotations = map[string]string{
fleetv1beta1.NumberOfClustersAnnotation: strconv.Itoa(int(*crp.Spec.Policy.NumberOfClusters)),
}
}

if latestResourceSnapshot != nil && latestResourceSnapshotHash == resourceHash {
if err := r.ensureLatestResourceSnapshot(ctx, latestResourceSnapshot); err != nil {
return nil, err
}
return latestResourceSnapshot, nil
}

if err := r.Client.Create(ctx, latestPolicySnapshot); err != nil {
klog.ErrorS(err, "Failed to create new clusterPolicySnapshot", "clusterPolicySnapshot", policySnapshotKObj)
return ctrl.Result{}, controller.NewAPIServerError(false, err)
// A new snapshot is needed when 1) there are no snapshots or 2) the latest snapshot hash differs from the current one.
// mark the last resource snapshot as inactive if it is different from what we have now
if latestResourceSnapshot != nil &&
latestResourceSnapshotHash != resourceHash &&
latestResourceSnapshot.Labels[fleetv1beta1.IsLatestSnapshotLabel] == strconv.FormatBool(true) {
// set the latest label to false first to make sure there is only one or none active resource snapshot
latestResourceSnapshot.Labels[fleetv1beta1.IsLatestSnapshotLabel] = strconv.FormatBool(false)
if err := r.Client.Update(ctx, latestResourceSnapshot); err != nil {
klog.ErrorS(err, "Failed to set the isLatestSnapshot label to false", "clusterResourceSnapshot", klog.KObj(latestResourceSnapshot))
return nil, controller.NewAPIServerError(false, err)
}
}

// create clusterResourceSnapshot
// TODO
if _, err := r.selectResourcesForPlacement(crp); err != nil {
return ctrl.Result{}, err
// create a new resource snapshot
latestResourceSnapshotIndex++
latestResourceSnapshot = &fleetv1beta1.ClusterResourceSnapshot{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf(fleetv1beta1.ResourceSnapshotNameFmt, crp.Name, latestResourceSnapshotIndex),
Labels: map[string]string{
fleetv1beta1.CRPTrackingLabel: crp.Name,
fleetv1beta1.IsLatestSnapshotLabel: strconv.FormatBool(true),
fleetv1beta1.ResourceIndexLabel: strconv.Itoa(latestResourceSnapshotIndex),
},
Annotations: map[string]string{
fleetv1beta1.ResourceGroupHashAnnotation: resourceHash,
},
},
Spec: *resourceSnapshotSpec,
}
// update the status based on the latestPolicySnapshot status
// update the status based on the work
return ctrl.Result{}, nil
resourceSnapshotKObj := klog.KObj(latestResourceSnapshot)
if err := controllerutil.SetControllerReference(crp, latestResourceSnapshot, r.Scheme); err != nil {
klog.ErrorS(err, "Failed to set owner reference", "clusterResourceSnapshot", resourceSnapshotKObj)
// should never happen
return nil, controller.NewUnexpectedBehaviorError(err)
}

if err := r.Client.Create(ctx, latestResourceSnapshot); err != nil {
klog.ErrorS(err, "Failed to create new clusterResourceSnapshot", "clusterResourceSnapshot", resourceSnapshotKObj)
return nil, controller.NewAPIServerError(false, err)
}
return latestResourceSnapshot, nil
}

// ensureLatestPolicySnapshot ensures the latest policySnapshot has the isLatest label and the numberOfClusters are updated.
Expand Down Expand Up @@ -141,6 +234,23 @@ func (r *Reconciler) ensureLatestPolicySnapshot(ctx context.Context, crp *fleetv
return nil
}

// ensureLatestResourceSnapshot ensures the latest resourceSnapshot has the isLatest label set to true.
// It does nothing when the label is already true. Otherwise it sets the label on the passed-in object and
// updates it through the client; a failed update is returned wrapped by controller.NewAPIServerError.
func (r *Reconciler) ensureLatestResourceSnapshot(ctx context.Context, latest *fleetv1beta1.ClusterResourceSnapshot) error {
	if latest.Labels[fleetv1beta1.IsLatestSnapshotLabel] == strconv.FormatBool(true) {
		return nil
	}
	// It could happen when the controller just sets the latest label to false for the old snapshot, and fails to
	// create a new resource snapshot.
	// And then the customers revert back their resource to the old one again.
	// In this case, the "latest" snapshot without isLatest label has the same resource hash as the current one.
	latest.Labels[fleetv1beta1.IsLatestSnapshotLabel] = strconv.FormatBool(true)
	if err := r.Client.Update(ctx, latest); err != nil {
		// Use the same lowerCamelCase log key as the other clusterResourceSnapshot log calls in this file.
		klog.ErrorS(err, "Failed to update the clusterResourceSnapshot", "clusterResourceSnapshot", klog.KObj(latest))
		return controller.NewAPIServerError(false, err)
	}
	return nil
}

// lookupLatestClusterPolicySnapshot finds the latest snapshot and its policy index.
// There will be only one active policy snapshot if exists.
// It first checks whether there is an active policy snapshot.
Expand Down Expand Up @@ -170,7 +280,7 @@ func (r *Reconciler) lookupLatestClusterPolicySnapshot(ctx context.Context, crp
} else if len(snapshotList.Items) > 1 {
// It means there are multiple active snapshots and should never happen.
err := fmt.Errorf("there are %d active clusterPolicySnapshots owned by clusterResourcePlacement %v", len(snapshotList.Items), crp.Name)
klog.ErrorS(err, "It should never happen", "clusterResourcePlacement", crpKObj)
klog.ErrorS(err, "Invalid clusterPolicySnapshots", "clusterResourcePlacement", crpKObj)
return nil, -1, controller.NewUnexpectedBehaviorError(err)
}
// When there are no active snapshots, find the one who has the largest policy index.
Expand Down Expand Up @@ -198,15 +308,79 @@ func (r *Reconciler) lookupLatestClusterPolicySnapshot(ctx context.Context, crp
return &snapshotList.Items[index], lastPolicyIndex, nil
}

// lookupLatestResourceSnapshot finds the latest clusterResourceSnapshot of the given CRP and its resource index.
// There will be only one active resource snapshot (isLatest label set to true) if one exists.
// It first checks whether there is an active resource snapshot.
// If not, it finds the one whose resourceIndex label is the largest.
// The resource index will always start from 0; when the CRP has no snapshot at all it returns (nil, -1, nil).
// It returns an error when 1) the snapshots cannot be listed, 2) there is more than one active resource snapshot, or
// 3) a snapshot has an invalid resource index label value.
// 2 & 3 should never happen.
func (r *Reconciler) lookupLatestResourceSnapshot(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement) (*fleetv1beta1.ClusterResourceSnapshot, int, error) {
snapshotList := &fleetv1beta1.ClusterResourceSnapshotList{}
// First pass: only the snapshots tracked by this CRP that carry the isLatest label set to true.
latestSnapshotLabelMatcher := client.MatchingLabels{
fleetv1beta1.CRPTrackingLabel: crp.Name,
fleetv1beta1.IsLatestSnapshotLabel: strconv.FormatBool(true),
}
crpKObj := klog.KObj(crp)
if err := r.Client.List(ctx, snapshotList, latestSnapshotLabelMatcher); err != nil {
klog.ErrorS(err, "Failed to list active clusterResourceSnapshots", "clusterResourcePlacement", crpKObj)
return nil, -1, controller.NewAPIServerError(false, err)
}
// Exactly one active snapshot: it is the latest one, return it with its parsed index.
if len(snapshotList.Items) == 1 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we handle the multiple resourceSnapshots objects per index?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not now, will handle it in the second phase.

resourceIndex, err := parseResourceIndexFromLabel(&snapshotList.Items[0])
if err != nil {
klog.ErrorS(err, "Failed to parse the resource index label", "clusterResourceSnapshot", klog.KObj(&snapshotList.Items[0]))
return nil, -1, controller.NewUnexpectedBehaviorError(err)
}
return &snapshotList.Items[0], resourceIndex, nil
} else if len(snapshotList.Items) > 1 {
// More than one active snapshot violates the invariant and should never happen.
err := fmt.Errorf("there are %d active clusterResourceSnapshots owned by clusterResourcePlacement %v", len(snapshotList.Items), crp.Name)
klog.ErrorS(err, "Invalid clusterResourceSnapshots", "clusterResourcePlacement", crpKObj)
return nil, -1, controller.NewUnexpectedBehaviorError(err)
}
// When there are no active snapshots, list every snapshot tracked by this CRP and find the one that has the
// largest resource index.
if err := r.Client.List(ctx, snapshotList, client.MatchingLabels{fleetv1beta1.CRPTrackingLabel: crp.Name}); err != nil {
klog.ErrorS(err, "Failed to list all clusterResourceSnapshots", "clusterResourcePlacement", crpKObj)
return nil, -1, controller.NewAPIServerError(false, err)
}
if len(snapshotList.Items) == 0 {
// No snapshot exists yet; -1 is returned as the index because the resource index of the first snapshot will
// start from 0.
return nil, -1, nil
}
index := -1 // the position of the chosen snapshot inside snapshotList.Items
lastResourceIndex := -1 // the largest resource index label value seen so far
for i := range snapshotList.Items {
resourceIndex, err := parseResourceIndexFromLabel(&snapshotList.Items[i])
if err != nil {
klog.ErrorS(err, "Failed to parse the resource index label", "clusterResourceSnapshot", klog.KObj(&snapshotList.Items[i]))
return nil, -1, controller.NewUnexpectedBehaviorError(err)
}
// Strict comparison: on equal indexes the earlier list entry is kept.
if lastResourceIndex < resourceIndex {
index = i
lastResourceIndex = resourceIndex
}
}
return &snapshotList.Items[index], lastResourceIndex, nil
}

// parsePolicyIndexFromLabel parses the policy index label; it returns an error when the label value is invalid, which should never happen in production.
func parsePolicyIndexFromLabel(s *fleetv1beta1.ClusterSchedulingPolicySnapshot) (int, error) {
indexLabel := s.Labels[fleetv1beta1.PolicyIndexLabel]
v, err := strconv.Atoi(indexLabel)
if err != nil {
return -1, err
if err != nil || v < 0 {
return -1, fmt.Errorf("invalid policy index %q, error: %w", indexLabel, err)
}
if v < 0 {
return -1, fmt.Errorf("policy index should not be negative: %d", v)
return v, nil
}

// parseResourceIndexFromLabel reads the ResourceIndexLabel of the snapshot and converts it to an int.
// It returns -1 and an error when the label value is not an integer or is negative, which should never
// happen in production.
func parseResourceIndexFromLabel(s *fleetv1beta1.ClusterResourceSnapshot) (int, error) {
	indexLabel := s.Labels[fleetv1beta1.ResourceIndexLabel]
	v, err := strconv.Atoi(indexLabel)
	if err != nil {
		return -1, fmt.Errorf("invalid resource index %q, error: %w", indexLabel, err)
	}
	// Handled separately from the parse failure: err is nil here, and wrapping a nil error with %w would
	// render as "%!w(<nil>)" in the message.
	if v < 0 {
		return -1, fmt.Errorf("invalid resource index %q, error: resource index should not be negative", indexLabel)
	}
	return v, nil
}
Expand All @@ -215,11 +389,8 @@ func parsePolicyIndexFromLabel(s *fleetv1beta1.ClusterSchedulingPolicySnapshot)
func parseNumberOfClustersFromAnnotation(s *fleetv1beta1.ClusterSchedulingPolicySnapshot) (int, error) {
n := s.Annotations[fleetv1beta1.NumberOfClustersAnnotation]
v, err := strconv.Atoi(n)
if err != nil {
return -1, err
}
if v < 0 {
return -1, fmt.Errorf("numberOfCluster should not be negative: %d", v)
if err != nil || v < 0 {
return -1, fmt.Errorf("invalid numberOfCluster %q, error: %w", n, err)
}
return v, nil
}
Expand All @@ -231,3 +402,20 @@ func generatePolicyHash(policy *fleetv1beta1.PlacementPolicy) (string, error) {
}
return fmt.Sprintf("%x", sha256.Sum256(jsonBytes)), nil
}

// generateResourceHash returns the hex-encoded SHA-256 digest of the JSON encoding of the resource snapshot spec.
// It returns an empty string and the marshalling error when the spec cannot be encoded.
func generateResourceHash(rs *fleetv1beta1.ResourceSnapshotSpec) (string, error) {
	encoded, err := json.Marshal(rs)
	if err != nil {
		return "", err
	}
	digest := sha256.Sum256(encoded)
	return fmt.Sprintf("%x", digest), nil
}

// parseResourceGroupHashFromAnnotation returns the value of the ResourceGroupHashAnnotation of the snapshot.
// It returns an error when the annotation is absent, which should never happen in production.
func parseResourceGroupHashFromAnnotation(s *fleetv1beta1.ClusterResourceSnapshot) (string, error) {
	if hash, found := s.Annotations[fleetv1beta1.ResourceGroupHashAnnotation]; found {
		return hash, nil
	}
	return "", fmt.Errorf("ResourceGroupHashAnnotation is not set")
}
Loading