Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ import (
const (
// PolicyIndexLabel is the label that indicate the policy snapshot index of a cluster policy.
PolicyIndexLabel = fleetPrefix + "policyIndex"

// PolicySnapshotNameFmt is clusterPolicySnapshot name format: {CRPName}-{PolicySnapshotIndex}.
PolicySnapshotNameFmt = "%s-%d"
)

// +genclient
Expand All @@ -26,7 +29,8 @@ const (

// ClusterPolicySnapshot is used to store a snapshot of cluster placement policy.
// Its spec is immutable.
// The name convention of a ClusterPolicySnapshot is {CRPName}-{PolicySnapshotIndex}
// The naming convention of a ClusterPolicySnapshot is {CRPName}-{PolicySnapshotIndex}.
// PolicySnapshotIndex will begin with 0.
// Each snapshot must have the following labels:
// - `CRPTrackingLabel` which points to its owner CRP.
// - `PolicyIndexLabel` which is the index of the policy snapshot.
Expand Down
File renamed without changes.
1 change: 1 addition & 0 deletions cmd/hubagent/workload/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func SetupControllers(ctx context.Context, mgr ctrl.Manager, config *rest.Config
InformerManager: dynamicInformerManager,
DisabledResourceConfig: disabledResourceConfig,
SkippedNamespaces: skippedNamespaces,
Scheme: mgr.GetScheme(),
}

ratelimiter := options.DefaultControllerRateLimiter(opts.RateLimiterOpts)
Expand Down
154 changes: 154 additions & 0 deletions pkg/controllers/clusterresourceplacement/placement_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,25 @@ package clusterresourceplacement

import (
"context"
"crypto/sha256"
"errors"
"fmt"
"strconv"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

fleetv1 "go.goms.io/fleet/apis/v1"
fleetv1alpha1 "go.goms.io/fleet/apis/v1alpha1"
"go.goms.io/fleet/pkg/metrics"
"go.goms.io/fleet/pkg/utils"
Expand Down Expand Up @@ -57,6 +62,8 @@ type Reconciler struct {
SkippedNamespaces map[string]bool

Recorder record.EventRecorder

Scheme *runtime.Scheme
}

func (r *Reconciler) Reconcile(ctx context.Context, key controller.QueueKey) (ctrl.Result, error) {
Expand Down Expand Up @@ -333,3 +340,150 @@ func (r *Reconciler) updatePlacementAppliedCondition(placement *fleetv1alpha1.Cl
}
}
}

// handleUpdate handles the create/update clusterResourcePlacement event.
// It creates corresponding clusterPolicySnapshot and clusterResourceSnapshot if needed and updates the status based on
// clusterPolicySnapshot status and work status.
// If the error type is ErrUnexpectedBehavior, the controller will skip the reconciling.
func (r *Reconciler) handleUpdate(ctx context.Context, crp *fleetv1.ClusterResourcePlacement) (ctrl.Result, error) {
crpKObj := klog.KObj(crp)
policyHash, err := generatePolicyHash(crp.Spec.Policy)
if err != nil {
klog.ErrorS(err, "Failed to generate policy hash of crp", "clusterResourcePlacement", crpKObj)
return ctrl.Result{}, controller.NewUnexpectedBehaviorError(err)
}

latestPolicySnapshot, latestPolicySnapshotIndex, err := r.lookupLatestClusterPolicySnapshot(ctx, crp)
if err != nil {
return ctrl.Result{}, err
}

// mark the last policy snapshot as inactive if it is different from what we have now
if latestPolicySnapshot != nil &&
string(latestPolicySnapshot.Spec.PolicyHash) != policyHash &&
latestPolicySnapshot.Labels[fleetv1.IsLatestSnapshotLabel] == strconv.FormatBool(true) {
// set the latest label to false first to make sure there is only one or none active policy snapshot
latestPolicySnapshot.Labels[fleetv1.IsLatestSnapshotLabel] = strconv.FormatBool(false)
if err := r.Client.Update(ctx, latestPolicySnapshot); err != nil {
klog.ErrorS(err, "Failed to set the isLatestSnapshot label to false", "clusterPolicySnapshot", klog.KObj(latestPolicySnapshot))
return ctrl.Result{}, controller.NewAPIServerError(err)
}
}
if latestPolicySnapshot == nil || string(latestPolicySnapshot.Spec.PolicyHash) != policyHash {
// create a new policy snapshot
latestPolicySnapshotIndex++
latestPolicySnapshot = &fleetv1.ClusterPolicySnapshot{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf(fleetv1.PolicySnapshotNameFmt, crp.Name, latestPolicySnapshotIndex),
Labels: map[string]string{
fleetv1.CRPTrackingLabel: crp.Name,
fleetv1.IsLatestSnapshotLabel: strconv.FormatBool(true),
fleetv1.PolicyIndexLabel: strconv.Itoa(latestPolicySnapshotIndex),
},
},
Spec: fleetv1.PolicySnapShotSpec{
Policy: crp.Spec.Policy,
PolicyHash: []byte(policyHash),
},
}
if err := controllerutil.SetControllerReference(crp, latestPolicySnapshot, r.Scheme); err != nil {
klog.ErrorS(err, "Failed to create set owner reference", "clusterPolicySnapshot", klog.KObj(latestPolicySnapshot))
// should never happen
return ctrl.Result{}, controller.NewUnexpectedBehaviorError(err)
}
if err := r.Client.Create(ctx, latestPolicySnapshot); err != nil {
klog.ErrorS(err, "Failed to create new clusterPolicySnapshot", "clusterPolicySnapshot", klog.KObj(latestPolicySnapshot))
return ctrl.Result{}, controller.NewAPIServerError(err)
}
} else if latestPolicySnapshot.Labels[fleetv1.IsLatestSnapshotLabel] != strconv.FormatBool(true) {
// When latestPolicySnapshot.Spec.PolicyHash == policyHash,
// It could happen when the controller just sets the latest label to false for the old snapshot, and fails to
// create a new policy snapshot.
// And then the customers revert back their policy to the old one again.
// In this case, the "latest" snapshot without isLatest label has the same policy hash as the current policy.

latestPolicySnapshot.Labels[fleetv1.IsLatestSnapshotLabel] = strconv.FormatBool(true)
if err := r.Client.Update(ctx, latestPolicySnapshot); err != nil {
klog.ErrorS(err, "Failed to set the isLatestSnapshot label to true", "clusterPolicySnapshot", klog.KObj(latestPolicySnapshot))
return ctrl.Result{}, controller.NewAPIServerError(err)
}
}
// create clusterResourceSnapshot
// update the status based on the latestPolicySnapshot status
// update the status based on the work
return ctrl.Result{}, nil
}

// lookupLatestClusterPolicySnapshot finds the latest snapshots and its policy index.
// There will be only one active policy snapshot if exists.
// It first checks whether there is an active policy snapshot.
// If not, it finds the one whose policyIndex label is the largest.
// The policy index will always start from 0.
// Return error when 1) cannot list the snapshots 2) there are more than one active policy snapshots 3) snapshot has the
// invalid label value.
// 2 & 3 should never happen.
func (r *Reconciler) lookupLatestClusterPolicySnapshot(ctx context.Context, crp *fleetv1.ClusterResourcePlacement) (*fleetv1.ClusterPolicySnapshot, int, error) {
snapshotList := &fleetv1.ClusterPolicySnapShotList{}
latestSnapshotLabelMatcher := client.MatchingLabels{
fleetv1.CRPTrackingLabel: crp.Name,
fleetv1.IsLatestSnapshotLabel: strconv.FormatBool(true),
}
crpKObj := klog.KObj(crp)
if err := r.Client.List(ctx, snapshotList, latestSnapshotLabelMatcher); err != nil {
klog.ErrorS(err, "Failed to list active clusterPolicySnapshots", "clusterResourcePlacement", crpKObj)
return nil, -1, controller.NewAPIServerError(err)
}
if len(snapshotList.Items) == 1 {
policyIndex, err := parsePolicyIndexFromLabel(&snapshotList.Items[0])
if err != nil {
return nil, -1, controller.NewUnexpectedBehaviorError(err)
}
return &snapshotList.Items[0], policyIndex, nil
} else if len(snapshotList.Items) > 1 {
// It means there are multiple active snapshots and should never happen.
err := fmt.Errorf("there are %d active clusterPolicySnapshots owned by clusterResourcePlacement %v", len(snapshotList.Items), crp.Name)
klog.ErrorS(err, "It should never happen", "clusterResourcePlacement", crpKObj)
return nil, -1, controller.NewUnexpectedBehaviorError(err)
}
// When there are no active snapshots, find the one who has the largest policy index.
if err := r.Client.List(ctx, snapshotList, client.MatchingLabels{fleetv1.CRPTrackingLabel: crp.Name}); err != nil {
klog.ErrorS(err, "Failed to list all clusterPolicySnapshots", "clusterResourcePlacement", crpKObj)
return nil, -1, controller.NewAPIServerError(err)
}
if len(snapshotList.Items) == 0 {
// The policy index of the first snapshot will start from 0.
return nil, -1, nil
}
index := -1 // the index of the cluster policy snapshot array
lastPolicyIndex := -1 // the assigned policy index of the cluster policy snapshot
for i := range snapshotList.Items {
policyIndex, err := parsePolicyIndexFromLabel(&snapshotList.Items[i])
if err != nil {
return nil, -1, controller.NewUnexpectedBehaviorError(err)
}
if lastPolicyIndex < policyIndex {
index = i
lastPolicyIndex = policyIndex
}
}
return &snapshotList.Items[index], lastPolicyIndex, nil
}

func parsePolicyIndexFromLabel(s *fleetv1.ClusterPolicySnapshot) (int, error) {
indexLabel := s.Labels[fleetv1.PolicyIndexLabel]
v, err := strconv.Atoi(indexLabel)
if err != nil {
klog.ErrorS(err, "Failed to parse the policy index label", "clusterPolicySnapshot", klog.KObj(s), "policyIndexLabel", indexLabel)
// should never happen
return -1, err
}
return v, nil
}

func generatePolicyHash(policy *fleetv1.PlacementPolicy) (string, error) {
jsonBytes, err := json.Marshal(policy)
if err != nil {
return "", err
}
return fmt.Sprintf("%x", sha256.Sum256(jsonBytes)), nil
}
Loading