From 7dca03e4ed417ed0dc3cc58fe016cae2739bea8c Mon Sep 17 00:00:00 2001 From: Zhiying Lin Date: Fri, 2 Jun 2023 10:20:04 +0800 Subject: [PATCH 1/8] create policySnapshot --- ...pshot_types.go => policysnapshot_types.go} | 5 +- ...hot_types.go => resourcesnapshot_types.go} | 0 .../placement_controller.go | 118 +++++ .../placement_controller_test.go | 453 ++++++++++++++++++ 4 files changed, 575 insertions(+), 1 deletion(-) rename apis/v1/{policySnapshot_types.go => policysnapshot_types.go} (97%) rename apis/v1/{resourceSnapshot_types.go => resourcesnapshot_types.go} (100%) create mode 100644 pkg/controllers/clusterresourceplacement/placement_controller_test.go diff --git a/apis/v1/policySnapshot_types.go b/apis/v1/policysnapshot_types.go similarity index 97% rename from apis/v1/policySnapshot_types.go rename to apis/v1/policysnapshot_types.go index 4c4e636a2..0cc13ef61 100644 --- a/apis/v1/policySnapshot_types.go +++ b/apis/v1/policysnapshot_types.go @@ -13,6 +13,9 @@ import ( const ( // PolicyIndexLabel is the label that indicate the policy snapshot index of a cluster policy. PolicyIndexLabel = fleetPrefix + "policyIndex" + + // PolicySnapshotNameFmt is clusterPolicySnapshot name format: {CRPName}-{PolicySnapshotIndex}. + PolicySnapshotNameFmt = "%s-%d" ) // +genclient @@ -26,7 +29,7 @@ const ( // ClusterPolicySnapshot is used to store a snapshot of cluster placement policy. // Its spec is immutable. -// The name convention of a ClusterPolicySnapshot is {CRPName}-{PolicySnapshotIndex} +// The name convention of a ClusterPolicySnapshot is {CRPName}-{PolicySnapshotIndex}. // Each snapshot must have the following labels: // - `CRPTrackingLabel` which points to its owner CRP. // - `PolicyIndexLabel` which is the index of the policy snapshot. diff --git a/apis/v1/resourceSnapshot_types.go b/apis/v1/resourcesnapshot_types.go similarity index 100% rename from apis/v1/resourceSnapshot_types.go rename to apis/v1/resourcesnapshot_types.go diff --git a/pkg/controllers/clusterresourceplacement/placement_controller.go b/pkg/controllers/clusterresourceplacement/placement_controller.go index 5026cef92..bcab15ed0 100644 --- a/pkg/controllers/clusterresourceplacement/placement_controller.go +++ b/pkg/controllers/clusterresourceplacement/placement_controller.go @@ -7,8 +7,12 @@ package clusterresourceplacement import ( "context" + "crypto/sha256" "errors" "fmt" + "k8s.io/apimachinery/pkg/util/json" + "k8s.io/utils/pointer" + "strconv" "time" corev1 "k8s.io/api/core/v1" @@ -21,6 +25,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + fleetv1 "go.goms.io/fleet/apis/v1" fleetv1alpha1 "go.goms.io/fleet/apis/v1alpha1" "go.goms.io/fleet/pkg/metrics" "go.goms.io/fleet/pkg/utils" @@ -333,3 +338,116 @@ func (r *Reconciler) updatePlacementAppliedCondition(placement *fleetv1alpha1.Cl } } } + +// handleUpdate handles the create/update clusterResourcePlacement event. +// It creates corresponding clusterPolicySnapshot and clusterResourceSnapshot if needed and updates the status based on +// clusterPolicySnapshot status and work status. +func (r *Reconciler) handleUpdate(ctx context.Context, crp *fleetv1.ClusterResourcePlacement) (ctrl.Result, error) { + crpKObj := klog.KObj(crp) + policyHash, err := generatePolicyHash(crp.Spec.Policy) + if err != nil { + klog.ErrorS(err, "Failed to generate policy hash of crp", "clusterResourcePlacement", crpKObj) + return ctrl.Result{}, err + } + + clusterPolicySnapshotList := &fleetv1.ClusterPolicySnapShotList{} + if err := r.Client.List(ctx, clusterPolicySnapshotList, client.MatchingLabels{fleetv1.CRPTrackingLabel: crp.Name}); err != nil { + klog.ErrorS(err, "Failed to list clusterPolicySnapshots", "clusterResourcePlacement", crpKObj) + return ctrl.Result{}, err + } + latestPolicySnapshot, latestPolicySnapshotIndex, err := lookupLatestClusterPolicySnapshot(clusterPolicySnapshotList) + if err != nil { + return ctrl.Result{}, err + } + if latestPolicySnapshot != nil && string(latestPolicySnapshot.Spec.PolicyHash) != policyHash { + // set the latest label to false first to make sure there is only one or none active policy snapshot + latestPolicySnapshot.Labels[fleetv1.IsLatestSnapshotLabel] = strconv.FormatBool(false) + if err := r.Client.Update(ctx, latestPolicySnapshot); err != nil { + klog.ErrorS(err, "Failed to set the isLatestSnapshot label to false", "clusterPolicySnapshot", klog.KObj(latestPolicySnapshot)) + return ctrl.Result{}, err + } + } + if latestPolicySnapshot == nil || string(latestPolicySnapshot.Spec.PolicyHash) != policyHash { + // create a new policy snapshot + latestPolicySnapshotIndex += 1 + latestPolicySnapshot = &fleetv1.ClusterPolicySnapshot{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1.PolicySnapshotNameFmt, crp.Name, latestPolicySnapshotIndex), + Labels: map[string]string{ + fleetv1.CRPTrackingLabel: crp.Name, + fleetv1.IsLatestSnapshotLabel: strconv.FormatBool(true), + fleetv1.PolicyIndexLabel: strconv.Itoa(latestPolicySnapshotIndex), + }, + OwnerReferences: []metav1.OwnerReference{ownerReference(crp)}, + }, + Spec: fleetv1.PolicySnapShotSpec{ + Policy: crp.Spec.Policy, + PolicyHash: []byte(policyHash), + }, + } + if err := r.Client.Create(ctx, latestPolicySnapshot); err != nil { + klog.ErrorS(err, "Failed to create new clusterPolicySnapshot", "clusterPolicySnapshot", klog.KObj(latestPolicySnapshot)) + return ctrl.Result{}, err + } + } + // create clusterResourceSnapshot + // update the status based on the latestPolicySnapshot status + // update the status based on the work + return ctrl.Result{}, nil +} + +func ownerReference(crp *fleetv1.ClusterResourcePlacement) metav1.OwnerReference { + return metav1.OwnerReference{ + APIVersion: crp.GroupVersionKind().GroupVersion().String(), + Kind: crp.GroupVersionKind().Kind, + Name: crp.GetName(), + UID: crp.GetUID(), + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + } +} + +// lookupLatestClusterPolicySnapshot finds the index of the latest cluster policy snapshot in the array and its policyIndex. +// There will be only one active policy snapshot. +// If there are no active policy snapshots, find the one whose policyIndex label is the largest. +// Return error when the policyIndex label is invalid. +func lookupLatestClusterPolicySnapshot(list *fleetv1.ClusterPolicySnapShotList) (*fleetv1.ClusterPolicySnapshot, int, error) { + if len(list.Items) == 0 { + return nil, -1, nil + } + index := -1 // the index of the cluster policy snapshot array + lastPolicyIndex := -1 // the assigned policy index of the cluster policy snapshot + for i, snapshot := range list.Items { + policyIndex, err := parsePolicyIndexFromLabel(&snapshot) + if err != nil { + return nil, -1, err + } + if v := snapshot.Labels[fleetv1.IsLatestSnapshotLabel]; v == strconv.FormatBool(true) { + return &list.Items[i], policyIndex, nil + } + if lastPolicyIndex < policyIndex { + index = i + lastPolicyIndex = policyIndex + } + } + return &list.Items[index], lastPolicyIndex, nil +} + +func parsePolicyIndexFromLabel(s *fleetv1.ClusterPolicySnapshot) (int, error) { + indexLabel := s.Labels[fleetv1.PolicyIndexLabel] + v, err := strconv.Atoi(indexLabel) + if err != nil { + klog.ErrorS(errors.New("invalid clusterPolicySnapshot policyIndex label"), "Failed to parse the policy index label", "clusterPolicySnapshot", klog.KObj(s), "policyIndexLabel", indexLabel) + // should never happen + return -1, err + } + return v, nil +} + +func generatePolicyHash(policy *fleetv1.PlacementPolicy) (string, error) { + jsonBytes, err := json.Marshal(policy) + if err != nil { + return "", err + } + return fmt.Sprintf("%x", sha256.Sum256(jsonBytes)), nil +} diff --git a/pkg/controllers/clusterresourceplacement/placement_controller_test.go b/pkg/controllers/clusterresourceplacement/placement_controller_test.go new file mode 100644 index 000000000..0d9355fdb --- /dev/null +++ b/pkg/controllers/clusterresourceplacement/placement_controller_test.go @@ -0,0 +1,453 @@ +package clusterresourceplacement + +import ( + "context" + "crypto/sha256" + "encoding/json" + "fmt" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + fleetv1 "go.goms.io/fleet/apis/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/utils/pointer" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "testing" +) + +const ( + testName = "my-crp" +) + +func serviceScheme(t *testing.T) *runtime.Scheme { + scheme := runtime.NewScheme() + if err := fleetv1.AddToScheme(scheme); err != nil { + t.Fatalf("failed to add scheme: %v", err) + } + return scheme +} + +func placementPolicyForTest() *fleetv1.PlacementPolicy { + return &fleetv1.PlacementPolicy{ + PlacementType: fleetv1.PickNPlacementType, + NumberOfClusters: pointer.Int32(3), + Affinity: &fleetv1.Affinity{ + ClusterAffinity: &fleetv1.ClusterAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &fleetv1.ClusterSelector{ + ClusterSelectorTerms: []fleetv1.ClusterSelectorTerm{ + { + LabelSelector: metav1.LabelSelector{ + MatchLabels: map[string]string{ + "key1": "value1", + }, + }, + }, + }, + }, + }, + }, + } +} + +func clusterResourcePlacementForTest() *fleetv1.ClusterResourcePlacement { + return &fleetv1.ClusterResourcePlacement{ + ObjectMeta: metav1.ObjectMeta{ + Name: testName, + }, + Spec: fleetv1.ClusterResourcePlacementSpec{ + Policy: placementPolicyForTest(), + }, + } +} + +func TestHandleUpdate(t *testing.T) { + jsonBytes, err := json.Marshal(placementPolicyForTest()) + if err != nil { + t.Fatalf("failed to create the policy hash: %v", err) + } + policyHash := []byte(fmt.Sprintf("%x", sha256.Sum256(jsonBytes))) + jsonBytes, err = json.Marshal(nil) + if err != nil { + t.Fatalf("failed to create the policy hash: %v", err) + } + unspecifiedPolicyHash := []byte(fmt.Sprintf("%x", sha256.Sum256(jsonBytes))) + tests := []struct { + name string + policySnapshots []fleetv1.ClusterPolicySnapshot + wantPolicySnapshots []fleetv1.ClusterPolicySnapshot + }{ + { + name: "new clusterResourcePolicy and no existing policy snapshots owned by my-crp", + policySnapshots: []fleetv1.ClusterPolicySnapshot{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "another-crp-1", + Labels: map[string]string{ + fleetv1.PolicyIndexLabel: "1", + fleetv1.IsLatestSnapshotLabel: "true", + fleetv1.CRPTrackingLabel: "another-crp", + }, + }, + }, + }, + wantPolicySnapshots: []fleetv1.ClusterPolicySnapshot{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "another-crp-1", + Labels: map[string]string{ + fleetv1.PolicyIndexLabel: "1", + fleetv1.IsLatestSnapshotLabel: "true", + fleetv1.CRPTrackingLabel: "another-crp", + }, + }, + }, + // new policy snapshot owned by the my-crp + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1.PolicySnapshotNameFmt, testName, 0), + Labels: map[string]string{ + fleetv1.PolicyIndexLabel: "0", + fleetv1.IsLatestSnapshotLabel: "true", + fleetv1.CRPTrackingLabel: testName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + }, + }, + }, + Spec: fleetv1.PolicySnapShotSpec{ + Policy: placementPolicyForTest(), + PolicyHash: policyHash, + }, + }, + }, + }, + { + name: "crp policy has no change", + policySnapshots: []fleetv1.ClusterPolicySnapshot{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1.PolicySnapshotNameFmt, testName, 0), + Labels: map[string]string{ + fleetv1.PolicyIndexLabel: "0", + fleetv1.IsLatestSnapshotLabel: "true", + fleetv1.CRPTrackingLabel: testName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + }, + }, + }, + Spec: fleetv1.PolicySnapShotSpec{ + Policy: placementPolicyForTest(), + PolicyHash: policyHash, + }, + }, + }, + wantPolicySnapshots: []fleetv1.ClusterPolicySnapshot{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1.PolicySnapshotNameFmt, testName, 0), + Labels: map[string]string{ + fleetv1.PolicyIndexLabel: "0", + fleetv1.IsLatestSnapshotLabel: "true", + fleetv1.CRPTrackingLabel: testName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + }, + }, + }, + Spec: fleetv1.PolicySnapShotSpec{ + Policy: placementPolicyForTest(), + PolicyHash: policyHash, + }, + }, + }, + }, + { + name: "crp policy has changed and there is no active snapshot", + // It happens when last reconcile loop fails after setting the latest label to false and + // before creating a new policy snapshot. + policySnapshots: []fleetv1.ClusterPolicySnapshot{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1.PolicySnapshotNameFmt, testName, 0), + Labels: map[string]string{ + fleetv1.PolicyIndexLabel: "0", + fleetv1.IsLatestSnapshotLabel: "false", + fleetv1.CRPTrackingLabel: testName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + }, + }, + }, + Spec: fleetv1.PolicySnapShotSpec{ + // Policy is not specified. + PolicyHash: unspecifiedPolicyHash, + }, + }, + }, + wantPolicySnapshots: []fleetv1.ClusterPolicySnapshot{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1.PolicySnapshotNameFmt, testName, 0), + Labels: map[string]string{ + fleetv1.PolicyIndexLabel: "0", + fleetv1.IsLatestSnapshotLabel: "false", + fleetv1.CRPTrackingLabel: testName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + }, + }, + }, + Spec: fleetv1.PolicySnapShotSpec{ + // Policy is not specified. + PolicyHash: unspecifiedPolicyHash, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1.PolicySnapshotNameFmt, testName, 1), + Labels: map[string]string{ + fleetv1.PolicyIndexLabel: "1", + fleetv1.IsLatestSnapshotLabel: "true", + fleetv1.CRPTrackingLabel: testName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + }, + }, + }, + Spec: fleetv1.PolicySnapShotSpec{ + Policy: placementPolicyForTest(), + PolicyHash: policyHash, + }, + }, + }, + }, + { + name: "crp policy has changed and there is an active snapshot", + policySnapshots: []fleetv1.ClusterPolicySnapshot{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1.PolicySnapshotNameFmt, testName, 0), + Labels: map[string]string{ + fleetv1.PolicyIndexLabel: "0", + fleetv1.IsLatestSnapshotLabel: "true", + fleetv1.CRPTrackingLabel: testName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + }, + }, + }, + Spec: fleetv1.PolicySnapShotSpec{ + // Policy is not specified. + PolicyHash: unspecifiedPolicyHash, + }, + }, + }, + wantPolicySnapshots: []fleetv1.ClusterPolicySnapshot{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1.PolicySnapshotNameFmt, testName, 0), + Labels: map[string]string{ + fleetv1.PolicyIndexLabel: "0", + fleetv1.IsLatestSnapshotLabel: "false", + fleetv1.CRPTrackingLabel: testName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + }, + }, + }, + Spec: fleetv1.PolicySnapShotSpec{ + // Policy is not specified. + PolicyHash: unspecifiedPolicyHash, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1.PolicySnapshotNameFmt, testName, 1), + Labels: map[string]string{ + fleetv1.PolicyIndexLabel: "1", + fleetv1.IsLatestSnapshotLabel: "true", + fleetv1.CRPTrackingLabel: testName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + }, + }, + }, + Spec: fleetv1.PolicySnapShotSpec{ + Policy: placementPolicyForTest(), + PolicyHash: policyHash, + }, + }, + }, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + crp := clusterResourcePlacementForTest() + objects := []client.Object{crp} + for i := range tc.policySnapshots { + objects = append(objects, &tc.policySnapshots[i]) + } + fakeClient := fake.NewClientBuilder(). + WithScheme(serviceScheme(t)). + WithObjects(objects...). + Build() + r := Reconciler{Client: fakeClient} + got, err := r.handleUpdate(ctx, crp) + if err != nil { + t.Fatalf("failed to handle update: %v", err) + } + want := ctrl.Result{} + if !cmp.Equal(got, want) { + t.Errorf("handleUpdate() = %+v, want %+v", got, want) + } + clusterPolicySnapshotList := &fleetv1.ClusterPolicySnapShotList{} + if err := fakeClient.List(ctx, clusterPolicySnapshotList); err != nil { + t.Fatalf("clusterPolicySnapShot List() got error %v, want no error", err) + } + options := []cmp.Option{ + cmpopts.IgnoreFields(metav1.ObjectMeta{}, "ResourceVersion"), + } + if diff := cmp.Diff(tc.wantPolicySnapshots, clusterPolicySnapshotList.Items, options...); diff != "" { + t.Errorf("clusterPolicySnapShot List() mismatch (-want, +got):\n%s", diff) + } + }) + } +} + +func TestHandleUpdate_failure(t *testing.T) { + tests := []struct { + name string + policySnapshots []fleetv1.ClusterPolicySnapshot + }{ + { + // Should never hit this case unless there is a bug in the controller or customers manually modify the clusterPolicySnapshot. + name: "existing active policy snapshot does not policyIndex label", + policySnapshots: []fleetv1.ClusterPolicySnapshot{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1.PolicySnapshotNameFmt, testName, 0), + Labels: map[string]string{ + fleetv1.IsLatestSnapshotLabel: "true", + fleetv1.CRPTrackingLabel: testName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + }, + }, + }, + }, + }, + }, + { + // Should never hit this case unless there is a bug in the controller or customers manually modify the clusterPolicySnapshot. + name: "existing active policy snapshot has an invalid policyIndex label", + policySnapshots: []fleetv1.ClusterPolicySnapshot{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1.PolicySnapshotNameFmt, testName, 0), + Labels: map[string]string{ + fleetv1.PolicyIndexLabel: "0bc", + fleetv1.IsLatestSnapshotLabel: "true", + fleetv1.CRPTrackingLabel: testName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + }, + }, + }, + }, + }, + }, + { + // Should never hit this case unless there is a bug in the controller or customers manually modify the clusterPolicySnapshot. + name: "no active policy snapshot exists and policySnapshot with invalid policyIndex label", + policySnapshots: []fleetv1.ClusterPolicySnapshot{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1.PolicySnapshotNameFmt, testName, 0), + Labels: map[string]string{ + fleetv1.PolicyIndexLabel: "abc", + fleetv1.CRPTrackingLabel: testName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + }, + }, + }, + }, + }, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + crp := clusterResourcePlacementForTest() + objects := []client.Object{crp} + for i := range tc.policySnapshots { + objects = append(objects, &tc.policySnapshots[i]) + } + fakeClient := fake.NewClientBuilder(). + WithScheme(serviceScheme(t)). + WithObjects(objects...). + Build() + r := Reconciler{Client: fakeClient} + got, err := r.handleUpdate(ctx, crp) + if err == nil { // if error is nil + t.Fatal("handleUpdate = nil, want err") + } + want := ctrl.Result{} + if !cmp.Equal(got, want) { + t.Errorf("handleUpdate() = %+v, want %+v", got, want) + } + }) + } +} From 3fc2cf93f3ecddfbf90a497d75e11b19394e66e4 Mon Sep 17 00:00:00 2001 From: Zhiying Lin Date: Fri, 2 Jun 2023 16:47:57 +0800 Subject: [PATCH 2/8] fix lint error --- .../placement_controller.go | 9 ++-- .../placement_controller_test.go | 52 +++++++++++++++++-- 2 files changed, 53 insertions(+), 8 deletions(-) diff --git a/pkg/controllers/clusterresourceplacement/placement_controller.go b/pkg/controllers/clusterresourceplacement/placement_controller.go index bcab15ed0..d28005cca 100644 --- a/pkg/controllers/clusterresourceplacement/placement_controller.go +++ b/pkg/controllers/clusterresourceplacement/placement_controller.go @@ -10,11 +10,12 @@ import ( "crypto/sha256" "errors" "fmt" - "k8s.io/apimachinery/pkg/util/json" - "k8s.io/utils/pointer" "strconv" "time" + "k8s.io/apimachinery/pkg/util/json" + "k8s.io/utils/pointer" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -369,7 +370,7 @@ func (r *Reconciler) handleUpdate(ctx context.Context, crp *fleetv1.ClusterResou } if latestPolicySnapshot == nil || string(latestPolicySnapshot.Spec.PolicyHash) != policyHash { // create a new policy snapshot - latestPolicySnapshotIndex += 1 + latestPolicySnapshotIndex++ latestPolicySnapshot = &fleetv1.ClusterPolicySnapshot{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf(fleetv1.PolicySnapshotNameFmt, crp.Name, latestPolicySnapshotIndex), @@ -418,7 +419,7 @@ func lookupLatestClusterPolicySnapshot(list *fleetv1.ClusterPolicySnapShotList) index := -1 // the index of the cluster policy snapshot array lastPolicyIndex := -1 // the assigned policy index of the cluster policy snapshot for i, snapshot := range list.Items { - policyIndex, err := parsePolicyIndexFromLabel(&snapshot) + policyIndex, err := parsePolicyIndexFromLabel(&list.Items[i]) if err != nil { return nil, -1, err } diff --git a/pkg/controllers/clusterresourceplacement/placement_controller_test.go b/pkg/controllers/clusterresourceplacement/placement_controller_test.go index 0d9355fdb..179f1da1b 100644 --- a/pkg/controllers/clusterresourceplacement/placement_controller_test.go +++ b/pkg/controllers/clusterresourceplacement/placement_controller_test.go @@ -5,16 +5,18 @@ import ( "crypto/sha256" "encoding/json" "fmt" + "testing" + "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" - fleetv1 "go.goms.io/fleet/apis/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/utils/pointer" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" - "testing" + + fleetv1 "go.goms.io/fleet/apis/v1" ) const ( @@ -202,6 +204,27 @@ func TestHandleUpdate(t *testing.T) { PolicyHash: unspecifiedPolicyHash, }, }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1.PolicySnapshotNameFmt, testName, 3), + Labels: map[string]string{ + fleetv1.PolicyIndexLabel: "3", + fleetv1.IsLatestSnapshotLabel: "false", + fleetv1.CRPTrackingLabel: testName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + }, + }, + }, + Spec: fleetv1.PolicySnapShotSpec{ + // Policy is not specified. + PolicyHash: unspecifiedPolicyHash, + }, + }, }, wantPolicySnapshots: []fleetv1.ClusterPolicySnapshot{ { @@ -227,9 +250,30 @@ func TestHandleUpdate(t *testing.T) { }, { ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf(fleetv1.PolicySnapshotNameFmt, testName, 1), + Name: fmt.Sprintf(fleetv1.PolicySnapshotNameFmt, testName, 3), Labels: map[string]string{ - fleetv1.PolicyIndexLabel: "1", + fleetv1.PolicyIndexLabel: "3", + fleetv1.IsLatestSnapshotLabel: "false", + fleetv1.CRPTrackingLabel: testName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + }, + }, + }, + Spec: fleetv1.PolicySnapShotSpec{ + // Policy is not specified. + PolicyHash: unspecifiedPolicyHash, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1.PolicySnapshotNameFmt, testName, 4), + Labels: map[string]string{ + fleetv1.PolicyIndexLabel: "4", fleetv1.IsLatestSnapshotLabel: "true", fleetv1.CRPTrackingLabel: testName, }, From 39add7b137c98f7a773da3b7269dc61e45a32f73 Mon Sep 17 00:00:00 2001 From: Zhiying Lin Date: Mon, 5 Jun 2023 15:37:55 +0800 Subject: [PATCH 3/8] address comments --- apis/v1/policysnapshot_types.go | 2 +- .../placement_controller.go | 76 +++++++--- .../placement_controller_test.go | 132 +++++++++++++++++- 3 files changed, 189 insertions(+), 21 deletions(-) diff --git a/apis/v1/policysnapshot_types.go b/apis/v1/policysnapshot_types.go index 0cc13ef61..cc882479e 100644 --- a/apis/v1/policysnapshot_types.go +++ b/apis/v1/policysnapshot_types.go @@ -29,7 +29,7 @@ const ( // ClusterPolicySnapshot is used to store a snapshot of cluster placement policy. // Its spec is immutable. -// The name convention of a ClusterPolicySnapshot is {CRPName}-{PolicySnapshotIndex}. +// The naming convention of a ClusterPolicySnapshot is {CRPName}-{PolicySnapshotIndex}. // Each snapshot must have the following labels: // - `CRPTrackingLabel` which points to its owner CRP. // - `PolicyIndexLabel` which is the index of the policy snapshot. diff --git a/pkg/controllers/clusterresourceplacement/placement_controller.go b/pkg/controllers/clusterresourceplacement/placement_controller.go index d28005cca..019a36075 100644 --- a/pkg/controllers/clusterresourceplacement/placement_controller.go +++ b/pkg/controllers/clusterresourceplacement/placement_controller.go @@ -351,16 +351,15 @@ func (r *Reconciler) handleUpdate(ctx context.Context, crp *fleetv1.ClusterResou return ctrl.Result{}, err } - clusterPolicySnapshotList := &fleetv1.ClusterPolicySnapShotList{} - if err := r.Client.List(ctx, clusterPolicySnapshotList, client.MatchingLabels{fleetv1.CRPTrackingLabel: crp.Name}); err != nil { - klog.ErrorS(err, "Failed to list clusterPolicySnapshots", "clusterResourcePlacement", crpKObj) - return ctrl.Result{}, err - } - latestPolicySnapshot, latestPolicySnapshotIndex, err := lookupLatestClusterPolicySnapshot(clusterPolicySnapshotList) + latestPolicySnapshot, latestPolicySnapshotIndex, err := r.lookupLatestClusterPolicySnapshot(ctx, crp) if err != nil { return ctrl.Result{}, err } - if latestPolicySnapshot != nil && string(latestPolicySnapshot.Spec.PolicyHash) != policyHash { + + if latestPolicySnapshot != nil && + string(latestPolicySnapshot.Spec.PolicyHash) != policyHash && + latestPolicySnapshot.Labels[fleetv1.IsLatestSnapshotLabel] == strconv.FormatBool(true) { + // set the latest label to false first to make sure there is only one or none active policy snapshot latestPolicySnapshot.Labels[fleetv1.IsLatestSnapshotLabel] = strconv.FormatBool(false) if err := r.Client.Update(ctx, latestPolicySnapshot); err != nil { @@ -390,6 +389,18 @@ func (r *Reconciler) handleUpdate(ctx context.Context, crp *fleetv1.ClusterResou klog.ErrorS(err, "Failed to create new clusterPolicySnapshot", "clusterPolicySnapshot", klog.KObj(latestPolicySnapshot)) return ctrl.Result{}, err } + } else if latestPolicySnapshot.Labels[fleetv1.IsLatestSnapshotLabel] != strconv.FormatBool(true) { + // When latestPolicySnapshot.Spec.PolicyHash == policyHash, + // It could happen when the controller just sets the latest label to false for the old snapshot, and fails to + // create a new policy snapshot. + // And then the customers revert back their policy to the old one again. + // In this case, the "latest" snapshot without isLatest label has the same policy hash as the current policy. + + latestPolicySnapshot.Labels[fleetv1.IsLatestSnapshotLabel] = strconv.FormatBool(true) + if err := r.Client.Update(ctx, latestPolicySnapshot); err != nil { + klog.ErrorS(err, "Failed to set the isLatestSnapshot label to true", "clusterPolicySnapshot", klog.KObj(latestPolicySnapshot)) + return ctrl.Result{}, err + } } // create clusterResourceSnapshot // update the status based on the latestPolicySnapshot status @@ -408,30 +419,57 @@ func ownerReference(crp *fleetv1.ClusterResourcePlacement) metav1.OwnerReference } } -// lookupLatestClusterPolicySnapshot finds the index of the latest cluster policy snapshot in the array and its policyIndex. -// There will be only one active policy snapshot. -// If there are no active policy snapshots, find the one whose policyIndex label is the largest. -// Return error when the policyIndex label is invalid. -func lookupLatestClusterPolicySnapshot(list *fleetv1.ClusterPolicySnapShotList) (*fleetv1.ClusterPolicySnapshot, int, error) { - if len(list.Items) == 0 { +// lookupLatestClusterPolicySnapshot finds the latest snapshots and its policy index. +// There will be only one active policy snapshot if exists. +// It first checks whether there is an active policy snapshot. +// If not, it finds the one whose policyIndex label is the largest. +// Return error when 1) cannot list the snapshots 2) there are more than one active policy snapshots 3) snapshot has the +// invalid label value. +// 2 & 3 should never happen. +func (r *Reconciler) lookupLatestClusterPolicySnapshot(ctx context.Context, crp *fleetv1.ClusterResourcePlacement) (*fleetv1.ClusterPolicySnapshot, int, error) { + snapshotList := &fleetv1.ClusterPolicySnapShotList{} + latestSnapshotLabelMatcher := client.MatchingLabels{ + fleetv1.CRPTrackingLabel: crp.Name, + fleetv1.IsLatestSnapshotLabel: strconv.FormatBool(true), + } + crpKObj := klog.KObj(crp) + if err := r.Client.List(ctx, snapshotList, latestSnapshotLabelMatcher); err != nil { + klog.ErrorS(err, "Failed to list active clusterPolicySnapshots", "clusterResourcePlacement", crpKObj) + return nil, -1, err + } + if len(snapshotList.Items) == 1 { + policyIndex, err := parsePolicyIndexFromLabel(&snapshotList.Items[0]) + if err != nil { + return nil, -1, err + } + return &snapshotList.Items[0], policyIndex, nil + } else if len(snapshotList.Items) > 1 { + // It means there are multiple active snapshots and should never happen. + err := fmt.Errorf("there are %d active clusterPolicySnapshots owned by clusterResourcePlacement %v", len(snapshotList.Items), crp.Name) + klog.ErrorS(err, "It should never happen", "clusterResourcePlacement", crpKObj) + return nil, -1, err + } + // When there are no active snapshots, find the one who has the largest policy index. + if err := r.Client.List(ctx, snapshotList, client.MatchingLabels{fleetv1.CRPTrackingLabel: crp.Name}); err != nil { + klog.ErrorS(err, "Failed to list all clusterPolicySnapshots", "clusterResourcePlacement", crpKObj) + return nil, -1, err + } + if len(snapshotList.Items) == 0 { return nil, -1, nil } index := -1 // the index of the cluster policy snapshot array lastPolicyIndex := -1 // the assigned policy index of the cluster policy snapshot - for i, snapshot := range list.Items { - policyIndex, err := parsePolicyIndexFromLabel(&list.Items[i]) + for i := range snapshotList.Items { + policyIndex, err := parsePolicyIndexFromLabel(&snapshotList.Items[i]) if err != nil { return nil, -1, err } - if v := snapshot.Labels[fleetv1.IsLatestSnapshotLabel]; v == strconv.FormatBool(true) { - return &list.Items[i], policyIndex, nil - } if lastPolicyIndex < policyIndex { index = i lastPolicyIndex = policyIndex } } - return &list.Items[index], lastPolicyIndex, nil + return &snapshotList.Items[index], lastPolicyIndex, nil } func parsePolicyIndexFromLabel(s *fleetv1.ClusterPolicySnapshot) (int, error) { diff --git a/pkg/controllers/clusterresourceplacement/placement_controller_test.go b/pkg/controllers/clusterresourceplacement/placement_controller_test.go index 179f1da1b..2db4d9c53 100644 --- a/pkg/controllers/clusterresourceplacement/placement_controller_test.go +++ b/pkg/controllers/clusterresourceplacement/placement_controller_test.go @@ -362,6 +362,96 @@ func TestHandleUpdate(t *testing.T) { }, }, }, + { + name: "crp policy has been changed and reverted back and there is no active snapshot", + policySnapshots: []fleetv1.ClusterPolicySnapshot{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1.PolicySnapshotNameFmt, testName, 0), + Labels: map[string]string{ + fleetv1.PolicyIndexLabel: "0", + fleetv1.IsLatestSnapshotLabel: "false", + fleetv1.CRPTrackingLabel: testName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + }, + }, + }, + Spec: fleetv1.PolicySnapShotSpec{ + // Policy is not specified. + PolicyHash: unspecifiedPolicyHash, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1.PolicySnapshotNameFmt, testName, 1), + Labels: map[string]string{ + fleetv1.PolicyIndexLabel: "1", + fleetv1.CRPTrackingLabel: testName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + }, + }, + }, + Spec: fleetv1.PolicySnapShotSpec{ + Policy: placementPolicyForTest(), + PolicyHash: policyHash, + }, + }, + }, + wantPolicySnapshots: []fleetv1.ClusterPolicySnapshot{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1.PolicySnapshotNameFmt, testName, 0), + Labels: map[string]string{ + fleetv1.PolicyIndexLabel: "0", + fleetv1.IsLatestSnapshotLabel: "false", + fleetv1.CRPTrackingLabel: testName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + }, + }, + }, + Spec: fleetv1.PolicySnapShotSpec{ + // Policy is not specified. + PolicyHash: unspecifiedPolicyHash, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1.PolicySnapshotNameFmt, testName, 1), + Labels: map[string]string{ + fleetv1.PolicyIndexLabel: "1", + fleetv1.IsLatestSnapshotLabel: "true", + fleetv1.CRPTrackingLabel: testName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + }, + }, + }, + Spec: fleetv1.PolicySnapShotSpec{ + Policy: placementPolicyForTest(), + PolicyHash: policyHash, + }, + }, + }, + }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { @@ -405,7 +495,7 @@ func TestHandleUpdate_failure(t *testing.T) { }{ { // Should never hit this case unless there is a bug in the controller or customers manually modify the clusterPolicySnapshot. - name: "existing active policy snapshot does not policyIndex label", + name: "existing active policy snapshot does not have policyIndex label", policySnapshots: []fleetv1.ClusterPolicySnapshot{ { ObjectMeta: metav1.ObjectMeta{ @@ -470,6 +560,46 @@ func TestHandleUpdate_failure(t *testing.T) { }, }, }, + { + // Should never hit this case unless there is a bug in the controller or customers manually modify the clusterPolicySnapshot. + name: "multiple active policy snapshot exist", + policySnapshots: []fleetv1.ClusterPolicySnapshot{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1.PolicySnapshotNameFmt, testName, 0), + Labels: map[string]string{ + fleetv1.PolicyIndexLabel: "0", + fleetv1.CRPTrackingLabel: testName, + fleetv1.IsLatestSnapshotLabel: "true", + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(fleetv1.PolicySnapshotNameFmt, testName, 1), + Labels: map[string]string{ + fleetv1.PolicyIndexLabel: "1", + fleetv1.CRPTrackingLabel: testName, + fleetv1.IsLatestSnapshotLabel: "true", + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: testName, + BlockOwnerDeletion: pointer.Bool(true), + Controller: pointer.Bool(true), + }, + }, + }, + }, + }, + }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { From c94187d2dff3df83afa9c564709be57a17fa302d Mon Sep 17 00:00:00 2001 From: Zhiying Lin Date: Mon, 5 Jun 2023 16:02:03 +0800 Subject: [PATCH 4/8] fix lint --- pkg/controllers/clusterresourceplacement/placement_controller.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/controllers/clusterresourceplacement/placement_controller.go b/pkg/controllers/clusterresourceplacement/placement_controller.go index 019a36075..83e58beae 100644 --- a/pkg/controllers/clusterresourceplacement/placement_controller.go +++ b/pkg/controllers/clusterresourceplacement/placement_controller.go @@ -359,7 +359,6 @@ func (r *Reconciler) handleUpdate(ctx context.Context, crp *fleetv1.ClusterResou if latestPolicySnapshot != nil && string(latestPolicySnapshot.Spec.PolicyHash) != policyHash && latestPolicySnapshot.Labels[fleetv1.IsLatestSnapshotLabel] == strconv.FormatBool(true) { - // set the latest label to false first to make sure there is only one or none active policy snapshot latestPolicySnapshot.Labels[fleetv1.IsLatestSnapshotLabel] = strconv.FormatBool(false) if err := r.Client.Update(ctx, latestPolicySnapshot); err != nil { From 899672837ab9200daffe00c5b32541d8a4483411 Mon Sep 17 00:00:00 2001 From: Zhiying Lin Date: Tue, 6 Jun 2023 12:01:04 +0800 Subject: [PATCH 5/8] address comment --- cmd/hubagent/workload/setup.go | 1 + .../placement_controller.go | 32 +++++++------- .../placement_controller_test.go | 43 ++++++++++++++++--- 3 files changed, 55 insertions(+), 21 deletions(-) diff --git a/cmd/hubagent/workload/setup.go b/cmd/hubagent/workload/setup.go index 759b96c1c..fe449663f 100644 --- a/cmd/hubagent/workload/setup.go +++ b/cmd/hubagent/workload/setup.go @@ -88,6 +88,7 @@ func SetupControllers(ctx context.Context, mgr ctrl.Manager, config *rest.Config InformerManager: dynamicInformerManager, DisabledResourceConfig: disabledResourceConfig, SkippedNamespaces: skippedNamespaces, + Scheme: mgr.GetScheme(), } ratelimiter := options.DefaultControllerRateLimiter(opts.RateLimiterOpts) diff --git a/pkg/controllers/clusterresourceplacement/placement_controller.go b/pkg/controllers/clusterresourceplacement/placement_controller.go index 83e58beae..59ae16c80 100644 --- a/pkg/controllers/clusterresourceplacement/placement_controller.go +++ b/pkg/controllers/clusterresourceplacement/placement_controller.go @@ -13,18 +13,17 @@ import ( "strconv" "time" - "k8s.io/apimachinery/pkg/util/json" - "k8s.io/utils/pointer" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/json" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" fleetv1 "go.goms.io/fleet/apis/v1" fleetv1alpha1 "go.goms.io/fleet/apis/v1alpha1" @@ -63,6 +62,8 @@ type Reconciler struct { SkippedNamespaces map[string]bool Recorder record.EventRecorder + + Scheme *runtime.Scheme } func (r *Reconciler) Reconcile(ctx context.Context, key controller.QueueKey) (ctrl.Result, error) { @@ -356,6 +357,7 @@ func (r *Reconciler) handleUpdate(ctx context.Context, crp *fleetv1.ClusterResou return ctrl.Result{}, err } + // mark the last policy snapshot as inactive if it is different from what we have now if latestPolicySnapshot != nil && string(latestPolicySnapshot.Spec.PolicyHash) != policyHash && latestPolicySnapshot.Labels[fleetv1.IsLatestSnapshotLabel] == strconv.FormatBool(true) { @@ -377,13 +379,18 @@ func (r *Reconciler) handleUpdate(ctx context.Context, crp *fleetv1.ClusterResou fleetv1.IsLatestSnapshotLabel: strconv.FormatBool(true), fleetv1.PolicyIndexLabel: strconv.Itoa(latestPolicySnapshotIndex), }, - OwnerReferences: []metav1.OwnerReference{ownerReference(crp)}, }, Spec: fleetv1.PolicySnapShotSpec{ Policy: crp.Spec.Policy, PolicyHash: []byte(policyHash), }, } + if err := controllerutil.SetControllerReference(crp, latestPolicySnapshot, r.Scheme); err != nil { + klog.ErrorS(err, "Failed to create set owner reference", "clusterPolicySnapshot", klog.KObj(latestPolicySnapshot)) + // should never happen + // TODO(zhiying) emit error metrics or well defined logs + return ctrl.Result{}, err + } if err := r.Client.Create(ctx, latestPolicySnapshot); err != nil { klog.ErrorS(err, "Failed to create new clusterPolicySnapshot", "clusterPolicySnapshot", klog.KObj(latestPolicySnapshot)) return ctrl.Result{}, err @@ -407,21 +414,11 @@ func (r *Reconciler) handleUpdate(ctx context.Context, crp *fleetv1.ClusterResou return ctrl.Result{}, nil } -func ownerReference(crp *fleetv1.ClusterResourcePlacement) metav1.OwnerReference { - return metav1.OwnerReference{ - APIVersion: crp.GroupVersionKind().GroupVersion().String(), - Kind: crp.GroupVersionKind().Kind, - Name: crp.GetName(), - UID: crp.GetUID(), - BlockOwnerDeletion: pointer.Bool(true), - Controller: pointer.Bool(true), - } -} - // lookupLatestClusterPolicySnapshot finds the latest snapshots and its policy index. // There will be only one active policy snapshot if exists. // It first checks whether there is an active policy snapshot. // If not, it finds the one whose policyIndex label is the largest. +// The policy index will always start from 0. // Return error when 1) cannot list the snapshots 2) there are more than one active policy snapshots 3) snapshot has the // invalid label value. // 2 & 3 should never happen. @@ -446,6 +443,7 @@ func (r *Reconciler) lookupLatestClusterPolicySnapshot(ctx context.Context, crp // It means there are multiple active snapshots and should never happen. err := fmt.Errorf("there are %d active clusterPolicySnapshots owned by clusterResourcePlacement %v", len(snapshotList.Items), crp.Name) klog.ErrorS(err, "It should never happen", "clusterResourcePlacement", crpKObj) + // TODO(zhiying) emit error metrics or well defined logs return nil, -1, err } // When there are no active snapshots, find the one who has the largest policy index. @@ -454,6 +452,7 @@ func (r *Reconciler) lookupLatestClusterPolicySnapshot(ctx context.Context, crp return nil, -1, err } if len(snapshotList.Items) == 0 { + // The policy index of the first snapshot will start from 0. return nil, -1, nil } index := -1 // the index of the cluster policy snapshot array @@ -475,8 +474,9 @@ func parsePolicyIndexFromLabel(s *fleetv1.ClusterPolicySnapshot) (int, error) { indexLabel := s.Labels[fleetv1.PolicyIndexLabel] v, err := strconv.Atoi(indexLabel) if err != nil { - klog.ErrorS(errors.New("invalid clusterPolicySnapshot policyIndex label"), "Failed to parse the policy index label", "clusterPolicySnapshot", klog.KObj(s), "policyIndexLabel", indexLabel) + klog.ErrorS(err, "Failed to parse the policy index label", "clusterPolicySnapshot", klog.KObj(s), "policyIndexLabel", indexLabel) // should never happen + // TODO(zhiying) emit error metrics or well defined logs return -1, err } return v, nil diff --git a/pkg/controllers/clusterresourceplacement/placement_controller_test.go b/pkg/controllers/clusterresourceplacement/placement_controller_test.go index 2db4d9c53..0ca52921b 100644 --- a/pkg/controllers/clusterresourceplacement/placement_controller_test.go +++ b/pkg/controllers/clusterresourceplacement/placement_controller_test.go @@ -20,7 +20,8 @@ import ( ) const ( - testName = "my-crp" + testName = "my-crp" + fleetAPIVersion = "fleet.azure.com/v1" ) func serviceScheme(t *testing.T) *runtime.Scheme { @@ -119,6 +120,8 @@ func TestHandleUpdate(t *testing.T) { Name: testName, BlockOwnerDeletion: pointer.Bool(true), Controller: pointer.Bool(true), + APIVersion: fleetAPIVersion, + Kind: "ClusterResourcePlacement", }, }, }, @@ -145,6 +148,8 @@ func TestHandleUpdate(t *testing.T) { Name: testName, BlockOwnerDeletion: pointer.Bool(true), Controller: pointer.Bool(true), + APIVersion: fleetAPIVersion, + Kind: "ClusterResourcePlacement", }, }, }, @@ -168,6 +173,8 @@ func TestHandleUpdate(t *testing.T) { Name: testName, BlockOwnerDeletion: pointer.Bool(true), Controller: pointer.Bool(true), + APIVersion: fleetAPIVersion, + Kind: "ClusterResourcePlacement", }, }, }, @@ -196,6 +203,8 @@ func TestHandleUpdate(t *testing.T) { Name: testName, BlockOwnerDeletion: pointer.Bool(true), Controller: pointer.Bool(true), + APIVersion: fleetAPIVersion, + Kind: "ClusterResourcePlacement", }, }, }, @@ -217,6 +226,8 @@ func TestHandleUpdate(t *testing.T) { Name: testName, BlockOwnerDeletion: pointer.Bool(true), Controller: pointer.Bool(true), + APIVersion: fleetAPIVersion, + Kind: "ClusterResourcePlacement", }, }, }, @@ -240,6 +251,8 @@ func TestHandleUpdate(t *testing.T) { Name: testName, BlockOwnerDeletion: pointer.Bool(true), Controller: pointer.Bool(true), + APIVersion: fleetAPIVersion, + Kind: "ClusterResourcePlacement", }, }, }, @@ -261,6 +274,8 @@ func TestHandleUpdate(t *testing.T) { Name: testName, BlockOwnerDeletion: pointer.Bool(true), Controller: pointer.Bool(true), + APIVersion: fleetAPIVersion, + Kind: "ClusterResourcePlacement", }, }, }, @@ -282,6 +297,8 @@ func TestHandleUpdate(t *testing.T) { Name: testName, BlockOwnerDeletion: pointer.Bool(true), Controller: pointer.Bool(true), + APIVersion: fleetAPIVersion, + Kind: "ClusterResourcePlacement", }, }, }, @@ -308,6 +325,8 @@ func TestHandleUpdate(t *testing.T) { Name: testName, BlockOwnerDeletion: pointer.Bool(true), Controller: pointer.Bool(true), + APIVersion: fleetAPIVersion, + Kind: "ClusterResourcePlacement", }, }, }, @@ -331,6 +350,8 @@ func TestHandleUpdate(t *testing.T) { Name: testName, BlockOwnerDeletion: pointer.Bool(true), Controller: pointer.Bool(true), + APIVersion: fleetAPIVersion, + Kind: "ClusterResourcePlacement", }, }, }, @@ -352,6 +373,8 @@ func TestHandleUpdate(t *testing.T) { Name: testName, BlockOwnerDeletion: pointer.Bool(true), Controller: pointer.Bool(true), + APIVersion: fleetAPIVersion, + Kind: "ClusterResourcePlacement", }, }, }, @@ -378,6 +401,8 @@ func TestHandleUpdate(t *testing.T) { Name: testName, BlockOwnerDeletion: pointer.Bool(true), Controller: pointer.Bool(true), + APIVersion: fleetAPIVersion, + Kind: "ClusterResourcePlacement", }, }, }, @@ -398,6 +423,8 @@ func TestHandleUpdate(t *testing.T) { Name: testName, BlockOwnerDeletion: pointer.Bool(true), Controller: pointer.Bool(true), + APIVersion: fleetAPIVersion, + Kind: "ClusterResourcePlacement", }, }, }, @@ -421,6 +448,8 @@ func TestHandleUpdate(t *testing.T) { Name: testName, BlockOwnerDeletion: pointer.Bool(true), Controller: pointer.Bool(true), + APIVersion: fleetAPIVersion, + Kind: "ClusterResourcePlacement", }, }, }, @@ -442,6 +471,8 @@ func TestHandleUpdate(t *testing.T) { Name: testName, BlockOwnerDeletion: pointer.Bool(true), Controller: pointer.Bool(true), + APIVersion: fleetAPIVersion, + Kind: "ClusterResourcePlacement", }, }, }, @@ -461,11 +492,12 @@ func TestHandleUpdate(t *testing.T) { for i := range tc.policySnapshots { objects = append(objects, &tc.policySnapshots[i]) } + scheme := serviceScheme(t) fakeClient := fake.NewClientBuilder(). - WithScheme(serviceScheme(t)). + WithScheme(scheme). WithObjects(objects...). Build() - r := Reconciler{Client: fakeClient} + r := Reconciler{Client: fakeClient, Scheme: scheme} got, err := r.handleUpdate(ctx, crp) if err != nil { t.Fatalf("failed to handle update: %v", err) @@ -609,11 +641,12 @@ func TestHandleUpdate_failure(t *testing.T) { for i := range tc.policySnapshots { objects = append(objects, &tc.policySnapshots[i]) } + scheme := serviceScheme(t) fakeClient := fake.NewClientBuilder(). - WithScheme(serviceScheme(t)). + WithScheme(scheme). WithObjects(objects...). Build() - r := Reconciler{Client: fakeClient} + r := Reconciler{Client: fakeClient, Scheme: scheme} got, err := r.handleUpdate(ctx, crp) if err == nil { // if error is nil t.Fatal("handleUpdate = nil, want err") From 16cdde0e86cd8659556574c679fde086b4006767 Mon Sep 17 00:00:00 2001 From: Zhiying Lin Date: Tue, 6 Jun 2023 14:59:16 +0800 Subject: [PATCH 6/8] add unexpected behavior error type --- apis/v1/policysnapshot_types.go | 1 + .../placement_controller.go | 7 ++++--- .../placement_controller_test.go | 5 +++++ pkg/utils/controller/controller.go | 15 +++++++++++++++ 4 files changed, 25 insertions(+), 3 deletions(-) diff --git a/apis/v1/policysnapshot_types.go b/apis/v1/policysnapshot_types.go index cc882479e..05182559b 100644 --- a/apis/v1/policysnapshot_types.go +++ b/apis/v1/policysnapshot_types.go @@ -30,6 +30,7 @@ const ( // ClusterPolicySnapshot is used to store a snapshot of cluster placement policy. // Its spec is immutable. // The naming convention of a ClusterPolicySnapshot is {CRPName}-{PolicySnapshotIndex}. +// PolicySnapshotIndex will begin with 0. // Each snapshot must have the following labels: // - `CRPTrackingLabel` which points to its owner CRP. // - `PolicyIndexLabel` which is the index of the policy snapshot. diff --git a/pkg/controllers/clusterresourceplacement/placement_controller.go b/pkg/controllers/clusterresourceplacement/placement_controller.go index 59ae16c80..4c00556d8 100644 --- a/pkg/controllers/clusterresourceplacement/placement_controller.go +++ b/pkg/controllers/clusterresourceplacement/placement_controller.go @@ -344,6 +344,7 @@ func (r *Reconciler) updatePlacementAppliedCondition(placement *fleetv1alpha1.Cl // handleUpdate handles the create/update clusterResourcePlacement event. // It creates corresponding clusterPolicySnapshot and clusterResourceSnapshot if needed and updates the status based on // clusterPolicySnapshot status and work status. +// If the error type is ErrUnexpectedBehavior, the controller will skip the reconciling. func (r *Reconciler) handleUpdate(ctx context.Context, crp *fleetv1.ClusterResourcePlacement) (ctrl.Result, error) { crpKObj := klog.KObj(crp) policyHash, err := generatePolicyHash(crp.Spec.Policy) @@ -389,7 +390,7 @@ func (r *Reconciler) handleUpdate(ctx context.Context, crp *fleetv1.ClusterResou klog.ErrorS(err, "Failed to create set owner reference", "clusterPolicySnapshot", klog.KObj(latestPolicySnapshot)) // should never happen // TODO(zhiying) emit error metrics or well defined logs - return ctrl.Result{}, err + return ctrl.Result{}, controller.NewUnexpectedBehaviorError(err) } if err := r.Client.Create(ctx, latestPolicySnapshot); err != nil { klog.ErrorS(err, "Failed to create new clusterPolicySnapshot", "clusterPolicySnapshot", klog.KObj(latestPolicySnapshot)) @@ -444,7 +445,7 @@ func (r *Reconciler) lookupLatestClusterPolicySnapshot(ctx context.Context, crp err := fmt.Errorf("there are %d active clusterPolicySnapshots owned by clusterResourcePlacement %v", len(snapshotList.Items), crp.Name) klog.ErrorS(err, "It should never happen", "clusterResourcePlacement", crpKObj) // TODO(zhiying) emit error metrics or well defined logs - return nil, -1, err + return nil, -1, controller.NewUnexpectedBehaviorError(err) } // When there are no active snapshots, find the one who has the largest policy index. if err := r.Client.List(ctx, snapshotList, client.MatchingLabels{fleetv1.CRPTrackingLabel: crp.Name}); err != nil { @@ -477,7 +478,7 @@ func parsePolicyIndexFromLabel(s *fleetv1.ClusterPolicySnapshot) (int, error) { klog.ErrorS(err, "Failed to parse the policy index label", "clusterPolicySnapshot", klog.KObj(s), "policyIndexLabel", indexLabel) // should never happen // TODO(zhiying) emit error metrics or well defined logs - return -1, err + return -1, controller.NewUnexpectedBehaviorError(err) } return v, nil } diff --git a/pkg/controllers/clusterresourceplacement/placement_controller_test.go b/pkg/controllers/clusterresourceplacement/placement_controller_test.go index 0ca52921b..0a437e352 100644 --- a/pkg/controllers/clusterresourceplacement/placement_controller_test.go +++ b/pkg/controllers/clusterresourceplacement/placement_controller_test.go @@ -4,7 +4,9 @@ import ( "context" "crypto/sha256" "encoding/json" + "errors" "fmt" + "go.goms.io/fleet/pkg/utils/controller" "testing" "github.com/google/go-cmp/cmp" @@ -651,6 +653,9 @@ func TestHandleUpdate_failure(t *testing.T) { if err == nil { // if error is nil t.Fatal("handleUpdate = nil, want err") } + if !errors.Is(err, controller.ErrUnexpectedBehavior) { + t.Errorf("handleUpdate() got %v, want %v type", err, controller.ErrUnexpectedBehavior) + } want := ctrl.Result{} if !cmp.Equal(got, want) { t.Errorf("handleUpdate() = %+v, want %+v", got, want) diff --git a/pkg/utils/controller/controller.go b/pkg/utils/controller/controller.go index 49cef22b5..4be5c53fb 100644 --- a/pkg/utils/controller/controller.go +++ b/pkg/utils/controller/controller.go @@ -7,6 +7,7 @@ package controller import ( "context" + "errors" "fmt" "sync" "time" @@ -27,6 +28,20 @@ const ( labelSuccess = "success" ) +var ( + // ErrUnexpectedBehavior indicates the current situation is not expected. + // There should be something wrong with the system and cannot be recovered by itself. + ErrUnexpectedBehavior = errors.New("unexpected behavior which cannot be handled by the controller") +) + +// NewUnexpectedBehaviorError returns ErrUnexpectedBehavior type error. +func NewUnexpectedBehaviorError(err error) error { + if err != nil { + return fmt.Errorf("%w: %v", ErrUnexpectedBehavior, err) + } + return ErrUnexpectedBehavior +} + // Controller maintains a rate limiting queue and the items in the queue will be reconciled by a "ReconcileFunc". // The item will be re-queued if "ReconcileFunc" returns an error, maximum re-queue times defined by "maxRetries" above, // after that the item will be discarded from the queue. From 2fbc19f5db01b9b4a29c7d92d9d6016079ce0290 Mon Sep 17 00:00:00 2001 From: Zhiying Lin Date: Tue, 6 Jun 2023 15:12:47 +0800 Subject: [PATCH 7/8] fix lint --- .../clusterresourceplacement/placement_controller_test.go | 2 +- pkg/utils/controller/controller.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/controllers/clusterresourceplacement/placement_controller_test.go b/pkg/controllers/clusterresourceplacement/placement_controller_test.go index 0a437e352..a1dc6cd28 100644 --- a/pkg/controllers/clusterresourceplacement/placement_controller_test.go +++ b/pkg/controllers/clusterresourceplacement/placement_controller_test.go @@ -6,7 +6,6 @@ import ( "encoding/json" "errors" "fmt" - "go.goms.io/fleet/pkg/utils/controller" "testing" "github.com/google/go-cmp/cmp" @@ -19,6 +18,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client/fake" fleetv1 "go.goms.io/fleet/apis/v1" + "go.goms.io/fleet/pkg/utils/controller" ) const ( diff --git a/pkg/utils/controller/controller.go b/pkg/utils/controller/controller.go index 4be5c53fb..e4d30e793 100644 --- a/pkg/utils/controller/controller.go +++ b/pkg/utils/controller/controller.go @@ -37,7 +37,7 @@ var ( // NewUnexpectedBehaviorError returns ErrUnexpectedBehavior type error. func NewUnexpectedBehaviorError(err error) error { if err != nil { - return fmt.Errorf("%w: %v", ErrUnexpectedBehavior, err) + return fmt.Errorf("%w: %v", ErrUnexpectedBehavior, err.Error()) } return ErrUnexpectedBehavior } From 6c02b7aacab7010863422235a6f5b5e04e4addbd Mon Sep 17 00:00:00 2001 From: Zhiying Lin Date: Wed, 7 Jun 2023 17:08:11 +0800 Subject: [PATCH 8/8] add api server error type --- .../placement_controller.go | 21 ++++++++----------- pkg/utils/controller/controller.go | 12 +++++++++++ 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/pkg/controllers/clusterresourceplacement/placement_controller.go b/pkg/controllers/clusterresourceplacement/placement_controller.go index 4c00556d8..77ff01922 100644 --- a/pkg/controllers/clusterresourceplacement/placement_controller.go +++ b/pkg/controllers/clusterresourceplacement/placement_controller.go @@ -350,7 +350,7 @@ func (r *Reconciler) handleUpdate(ctx context.Context, crp *fleetv1.ClusterResou policyHash, err := generatePolicyHash(crp.Spec.Policy) if err != nil { klog.ErrorS(err, "Failed to generate policy hash of crp", "clusterResourcePlacement", crpKObj) - return ctrl.Result{}, err + return ctrl.Result{}, controller.NewUnexpectedBehaviorError(err) } latestPolicySnapshot, latestPolicySnapshotIndex, err := r.lookupLatestClusterPolicySnapshot(ctx, crp) @@ -366,7 +366,7 @@ func (r *Reconciler) handleUpdate(ctx context.Context, crp *fleetv1.ClusterResou latestPolicySnapshot.Labels[fleetv1.IsLatestSnapshotLabel] = strconv.FormatBool(false) if err := r.Client.Update(ctx, latestPolicySnapshot); err != nil { klog.ErrorS(err, "Failed to set the isLatestSnapshot label to false", "clusterPolicySnapshot", klog.KObj(latestPolicySnapshot)) - return ctrl.Result{}, err + return ctrl.Result{}, controller.NewAPIServerError(err) } } if latestPolicySnapshot == nil || string(latestPolicySnapshot.Spec.PolicyHash) != policyHash { @@ -389,12 +389,11 @@ func (r *Reconciler) handleUpdate(ctx context.Context, crp *fleetv1.ClusterResou if err := controllerutil.SetControllerReference(crp, latestPolicySnapshot, r.Scheme); err != nil { klog.ErrorS(err, "Failed to create set owner reference", "clusterPolicySnapshot", klog.KObj(latestPolicySnapshot)) // should never happen - // TODO(zhiying) emit error metrics or well defined logs return ctrl.Result{}, controller.NewUnexpectedBehaviorError(err) } if err := r.Client.Create(ctx, latestPolicySnapshot); err != nil { klog.ErrorS(err, "Failed to create new clusterPolicySnapshot", "clusterPolicySnapshot", klog.KObj(latestPolicySnapshot)) - return ctrl.Result{}, err + return ctrl.Result{}, controller.NewAPIServerError(err) } } else if latestPolicySnapshot.Labels[fleetv1.IsLatestSnapshotLabel] != strconv.FormatBool(true) { // When latestPolicySnapshot.Spec.PolicyHash == policyHash, @@ -406,7 +405,7 @@ func (r *Reconciler) handleUpdate(ctx context.Context, crp *fleetv1.ClusterResou latestPolicySnapshot.Labels[fleetv1.IsLatestSnapshotLabel] = strconv.FormatBool(true) if err := r.Client.Update(ctx, latestPolicySnapshot); err != nil { klog.ErrorS(err, "Failed to set the isLatestSnapshot label to true", "clusterPolicySnapshot", klog.KObj(latestPolicySnapshot)) - return ctrl.Result{}, err + return ctrl.Result{}, controller.NewAPIServerError(err) } } // create clusterResourceSnapshot @@ -432,25 +431,24 @@ func (r *Reconciler) lookupLatestClusterPolicySnapshot(ctx context.Context, crp crpKObj := klog.KObj(crp) if err := r.Client.List(ctx, snapshotList, latestSnapshotLabelMatcher); err != nil { klog.ErrorS(err, "Failed to list active clusterPolicySnapshots", "clusterResourcePlacement", crpKObj) - return nil, -1, err + return nil, -1, controller.NewAPIServerError(err) } if len(snapshotList.Items) == 1 { policyIndex, err := parsePolicyIndexFromLabel(&snapshotList.Items[0]) if err != nil { - return nil, -1, err + return nil, -1, controller.NewUnexpectedBehaviorError(err) } return &snapshotList.Items[0], policyIndex, nil } else if len(snapshotList.Items) > 1 { // It means there are multiple active snapshots and should never happen. err := fmt.Errorf("there are %d active clusterPolicySnapshots owned by clusterResourcePlacement %v", len(snapshotList.Items), crp.Name) klog.ErrorS(err, "It should never happen", "clusterResourcePlacement", crpKObj) - // TODO(zhiying) emit error metrics or well defined logs return nil, -1, controller.NewUnexpectedBehaviorError(err) } // When there are no active snapshots, find the one who has the largest policy index. if err := r.Client.List(ctx, snapshotList, client.MatchingLabels{fleetv1.CRPTrackingLabel: crp.Name}); err != nil { klog.ErrorS(err, "Failed to list all clusterPolicySnapshots", "clusterResourcePlacement", crpKObj) - return nil, -1, err + return nil, -1, controller.NewAPIServerError(err) } if len(snapshotList.Items) == 0 { // The policy index of the first snapshot will start from 0. @@ -461,7 +459,7 @@ func (r *Reconciler) lookupLatestClusterPolicySnapshot(ctx context.Context, crp for i := range snapshotList.Items { policyIndex, err := parsePolicyIndexFromLabel(&snapshotList.Items[i]) if err != nil { - return nil, -1, err + return nil, -1, controller.NewUnexpectedBehaviorError(err) } if lastPolicyIndex < policyIndex { index = i @@ -477,8 +475,7 @@ func parsePolicyIndexFromLabel(s *fleetv1.ClusterPolicySnapshot) (int, error) { if err != nil { klog.ErrorS(err, "Failed to parse the policy index label", "clusterPolicySnapshot", klog.KObj(s), "policyIndexLabel", indexLabel) // should never happen - // TODO(zhiying) emit error metrics or well defined logs - return -1, controller.NewUnexpectedBehaviorError(err) + return -1, err } return v, nil } diff --git a/pkg/utils/controller/controller.go b/pkg/utils/controller/controller.go index e4d30e793..7338e317f 100644 --- a/pkg/utils/controller/controller.go +++ b/pkg/utils/controller/controller.go @@ -32,16 +32,28 @@ var ( // ErrUnexpectedBehavior indicates the current situation is not expected. // There should be something wrong with the system and cannot be recovered by itself. ErrUnexpectedBehavior = errors.New("unexpected behavior which cannot be handled by the controller") + + // ErrAPIServerError indicates the error is returned by the API server. + ErrAPIServerError = errors.New("error returned by the API server") ) // NewUnexpectedBehaviorError returns ErrUnexpectedBehavior type error. func NewUnexpectedBehaviorError(err error) error { + // TODO(zhiying) emit error metrics or well defined logs if err != nil { return fmt.Errorf("%w: %v", ErrUnexpectedBehavior, err.Error()) } return ErrUnexpectedBehavior } +// NewAPIServerError returns ErrAPIServerError type error. +func NewAPIServerError(err error) error { + if err != nil { + return fmt.Errorf("%w: %v", ErrAPIServerError, err.Error()) + } + return ErrAPIServerError +} + // Controller maintains a rate limiting queue and the items in the queue will be reconciled by a "ReconcileFunc". // The item will be re-queued if "ReconcileFunc" returns an error, maximum re-queue times defined by "maxRetries" above, // after that the item will be discarded from the queue.