From 9a1de8898761ed953ef680c0ac441ea2b7c4960b Mon Sep 17 00:00:00 2001 From: michaelawyu Date: Fri, 16 Jun 2023 19:47:18 +0800 Subject: [PATCH 1/4] Added more scheduler logic --- pkg/scheduler/framework/framework.go | 128 +++++++- pkg/scheduler/framework/framework_test.go | 358 ++++++++++++++++++++++ pkg/scheduler/framework/frameworkutils.go | 74 +++++ pkg/utils/common.go | 16 + 4 files changed, 575 insertions(+), 1 deletion(-) create mode 100644 pkg/scheduler/framework/framework_test.go create mode 100644 pkg/scheduler/framework/frameworkutils.go diff --git a/pkg/scheduler/framework/framework.go b/pkg/scheduler/framework/framework.go index eddc671c5..9257dd773 100644 --- a/pkg/scheduler/framework/framework.go +++ b/pkg/scheduler/framework/framework.go @@ -10,13 +10,19 @@ package framework import ( "context" "fmt" + "time" + "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1" "go.goms.io/fleet/pkg/scheduler/framework/parallelizer" + "go.goms.io/fleet/pkg/utils" + "go.goms.io/fleet/pkg/utils/controller" ) const ( @@ -145,6 +151,126 @@ func (f *framework) EventRecorder() record.EventRecorder { // RunSchedulingCycleFor performs scheduling for a policy snapshot. func (f *framework) RunSchedulingCycleFor(ctx context.Context, policy *fleetv1beta1.ClusterPolicySnapshot, resources *fleetv1beta1.ClusterResourceSnapshot) (result ctrl.Result, err error) { //nolint:revive - // Not yet implemented. + startTime := time.Now() + clusterPolicySnapshotRef := klog.KObj(policy) + klog.V(2).InfoS("Scheduling cycle starts", "clusterPolicySnapshot", clusterPolicySnapshotRef) + defer func() { + latency := time.Since(startTime).Milliseconds() + klog.V(2).InfoS("Scheduling cycle ends", "clusterPolicySnapshot", clusterPolicySnapshotRef, "latency", latency) + }() + + errorMessage := "failed to run scheduling cycle" + + klog.V(2).InfoS("Retrieving clusters and bindings", "clusterPolicySnapshot", clusterPolicySnapshotRef) + + // Retrieve the desired number of clusters from the policy. + // + // TO-DO (chenyu1): assign variable(s) when more logic is added. + _, err = extractNumOfClustersFromPolicySnapshot(policy) + if err != nil { + klog.ErrorS(err, errorMessage, "clusterPolicySnapshot", clusterPolicySnapshotRef) + return ctrl.Result{}, err + } + + // Collect all clusters. + // + // Note that clusters here are listed from the cached client for improved performance. This is + // safe in consistency as it is guaranteed that the scheduler will receive all events for cluster + // changes eventually. + // + // TO-DO (chenyu1): assign variable(s) when more logic is added. + _, err = f.collectClusters(ctx) + if err != nil { + klog.ErrorS(err, errorMessage, klog.KObj(policy)) + return ctrl.Result{}, err + } + + // Collect all bindings. + // + // Note that for consistency reasons, bindings are listed directly from the API server; this helps + // avoid a classic read-after-write consistency issue, which, though should only happen when there + // are connectivity issues and/or API server is overloaded, can lead to over-scheduling in adverse + // scenarios. 
It is true that even when bindings are over-scheduled, the scheduler can still correct + // the situation in the next cycle; however, considering that placing resources to clusters, unlike + // pods to nodes, is more expensive, it is better to avoid over-scheduling in the first place. + // + // This, of course, has additional performance overhead (and may further exacerbate API server + // overloading). In the long run we might still want to resort to a cached situtation. + // + // TO-DO (chenyu1): explore the possbilities of using a mutation cache for better performance. + bindings, err := f.collectBindings(ctx, policy) + if err != nil { + klog.ErrorS(err, errorMessage, klog.KObj(policy)) + return ctrl.Result{}, err + } + + // Parse the bindings, find out + // * active bindings, i.e., bindings that are not marked for deletion; and + // * bindings that are already marked for deletion, but still have the dispatcher finalizer + // present; + // * bindings that are already marked for deletion, and no longer have the dispatcher finalizer + // + // Note that the scheduler only considers a binding to be deleted if it is marked for deletion + // and it no longer has the dispatcher finalizer. This helps avoid a rare racing condition + // where the scheduler binds a cluster to a resource placement, even though the dispatcher has + // not started, or is still in the process of, removing the same set of resources from + // the cluster, triggered by a recently deleted binding. + // + // TO-DO (chenyu1): assign variable(s) when more logic is added. + klog.V(2).InfoS("Classifying bindings", "clusterPolicySnapshot", clusterPolicySnapshotRef) + _, _, deletedWithoutDispatcherFinalizer := classifyBindings(bindings) + + // If a binding has been marked for deletion and no longer has the dispatcher finalizer, the scheduler + // removes its own finalizer from it, to clear it for eventual deletion. + if err := f.removeSchedulerFinalizerFromBindings(ctx, deletedWithoutDispatcherFinalizer); err != nil { + klog.ErrorS(err, errorMessage, klog.KObj(policy)) + return ctrl.Result{}, err + } + + // Not yet fully implemented. return ctrl.Result{}, nil } + +// collectClusters lists all clusters in the cache. +func (f *framework) collectClusters(ctx context.Context) ([]fleetv1beta1.MemberCluster, error) { + errorFormat := "failed to collect clusters: %w" + + clusterList := &fleetv1beta1.MemberClusterList{} + if err := f.client.List(ctx, clusterList, &client.ListOptions{}); err != nil { + return nil, controller.NewAPIServerError(fmt.Errorf(errorFormat, err)) + } + return clusterList.Items, nil +} + +// collectBindings lists all bindings associated with a CRP **using the uncached client**. +func (f *framework) collectBindings(ctx context.Context, policy *fleetv1beta1.ClusterPolicySnapshot) ([]fleetv1beta1.ClusterResourceBinding, error) { + errorFormat := "failed to collect bindings: %w" + + bindingOwner, err := extractOwnerCRPNameFromPolicySnapshot(policy) + if err != nil { + // This branch should never run in most cases, as the a policy snapshot is expected to be + // owned by a CRP. + return nil, controller.NewUnexpectedBehaviorError(fmt.Errorf(errorFormat, err)) + } + + bindingList := &fleetv1beta1.ClusterResourceBindingList{} + labelSelector := labels.SelectorFromSet(labels.Set{fleetv1beta1.CRPTrackingLabel: bindingOwner}) + // List bindings directly from the API server. 
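+ // Only bindings that carry the owner CRP's tracking label (fleetv1beta1.CRPTrackingLabel) are relevant
+ // to this scheduling cycle; the selector above filters out all other bindings.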
+ if err := f.uncachedReader.List(ctx, bindingList, &client.ListOptions{LabelSelector: labelSelector}); err != nil { + return nil, controller.NewAPIServerError(fmt.Errorf(errorFormat, err)) + } + return bindingList.Items, nil +} + +// removeSchedulerFinalizerFromBindings removes the scheduler finalizer from a list of bindings. +func (f *framework) removeSchedulerFinalizerFromBindings(ctx context.Context, bindings []*fleetv1beta1.ClusterResourceBinding) error { + errorFormat := "failed to remove scheduler finalizer from binding %s: %w" + + for _, binding := range bindings { + controllerutil.RemoveFinalizer(binding, utils.SchedulerFinalizer) + if err := f.client.Update(ctx, binding, &client.UpdateOptions{}); err != nil { + return controller.NewAPIServerError(fmt.Errorf(errorFormat, binding.Name, err)) + } + } + return nil +} diff --git a/pkg/scheduler/framework/framework_test.go b/pkg/scheduler/framework/framework_test.go new file mode 100644 index 000000000..941ec383f --- /dev/null +++ b/pkg/scheduler/framework/framework_test.go @@ -0,0 +1,358 @@ +/* +Copyright (c) Microsoft Corporation. +Licensed under the MIT license. +*/ + +package framework + +import ( + "context" + "log" + "os" + "testing" + + "github.com/google/go-cmp/cmp" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes/scheme" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + + fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1" + "go.goms.io/fleet/pkg/utils" +) + +const ( + CRPName = "test-placement" + policyName = "test-policy" + bindingName = "test-binding" +) + +// TO-DO (chenyu1): expand the test cases as development stablizes. + +// TestMain sets up the test environment. +func TestMain(m *testing.M) { + // Add custom APIs to the runtime scheme. + if err := fleetv1beta1.AddToScheme(scheme.Scheme); err != nil { + log.Fatalf("failed to add custom APIs to the runtime scheme: %v", err) + } + + os.Exit(m.Run()) +} + +// TestExtractNumOfClustersFromPolicySnapshot tests the extractNumOfClustersFromPolicySnapshot function. 
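+// The helper expects the NumberOfClustersAnnotation value to be a string-encoded, non-negative integer;
+// for illustration only, a snapshot targeting three clusters would carry the annotation value "3".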
+func TestExtractNumOfClustersFromPolicySnapshot(t *testing.T) { + testCases := []struct { + name string + policy *fleetv1beta1.ClusterPolicySnapshot + wantNumOfClusters int + expectedToFail bool + }{ + { + name: "valid annotation", + policy: &fleetv1beta1.ClusterPolicySnapshot{ + ObjectMeta: metav1.ObjectMeta{ + Name: policyName, + Annotations: map[string]string{ + fleetv1beta1.NumberOfClustersAnnotation: "1", + }, + }, + }, + wantNumOfClusters: 1, + }, + { + name: "no annotation", + policy: &fleetv1beta1.ClusterPolicySnapshot{ + ObjectMeta: metav1.ObjectMeta{ + Name: policyName, + }, + }, + expectedToFail: true, + }, + { + name: "invalid annotation: not an integer", + policy: &fleetv1beta1.ClusterPolicySnapshot{ + ObjectMeta: metav1.ObjectMeta{ + Name: policyName, + Annotations: map[string]string{ + fleetv1beta1.NumberOfClustersAnnotation: "abc", + }, + }, + }, + expectedToFail: true, + }, + { + name: "invalid annotation: negative integer", + policy: &fleetv1beta1.ClusterPolicySnapshot{ + ObjectMeta: metav1.ObjectMeta{ + Name: policyName, + Annotations: map[string]string{ + fleetv1beta1.NumberOfClustersAnnotation: "-1", + }, + }, + }, + expectedToFail: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + numOfClusters, err := extractNumOfClustersFromPolicySnapshot(tc.policy) + if tc.expectedToFail { + if err == nil { + t.Fatalf("extractNumOfClustersFromPolicySnapshot() = %v, %v, want error", numOfClusters, err) + } + return + } + + if numOfClusters != tc.wantNumOfClusters { + t.Fatalf("extractNumOfClustersFromPolicySnapshot() = %v, %v, want %v, nil", numOfClusters, err, tc.wantNumOfClusters) + } + }) + } +} + +// TestCollectClusters tests the collectClusters method. +func TestCollectClusters(t *testing.T) { + cluster := fleetv1beta1.MemberCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cluster-1", + }, + } + + fakeClient := fake.NewClientBuilder(). + WithScheme(scheme.Scheme). + WithObjects(&cluster). + Build() + // Construct framework manually instead of using NewFramework() to avoid mocking the controller manager. + f := &framework{ + client: fakeClient, + } + + ctx := context.Background() + clusters, err := f.collectClusters(ctx) + if err != nil { + t.Fatalf("collectClusters() = %v, %v, want no error", clusters, err) + } + + wantClusters := []fleetv1beta1.MemberCluster{cluster} + if !cmp.Equal(clusters, wantClusters) { + t.Fatalf("collectClusters() = %v, %v, want %v, nil", clusters, err, wantClusters) + } +} + +// TestExtractOwnerCRPNameFromPolicySnapshot tests the extractOwnerCRPNameFromPolicySnapshot method. 
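+// The helper scans metadata.ownerReferences for the first entry whose Kind matches the ClusterResourcePlacement
+// kind and returns its Name; for illustration only, an owner reference such as
+// {Kind: "ClusterResourcePlacement", Name: "test-placement"} resolves to "test-placement".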
+func TestExtractOwnerCRPNameFromPolicySnapshot(t *testing.T) { + testCases := []struct { + name string + policy *fleetv1beta1.ClusterPolicySnapshot + expectToFail bool + }{ + { + name: "policy with CRP owner reference", + policy: &fleetv1beta1.ClusterPolicySnapshot{ + ObjectMeta: metav1.ObjectMeta{ + Name: policyName, + OwnerReferences: []metav1.OwnerReference{ + { + Kind: utils.CRPV1Beta1GVK.Kind, + Name: CRPName, + }, + }, + }, + }, + }, + { + name: "policy without CRP owner reference", + policy: &fleetv1beta1.ClusterPolicySnapshot{ + ObjectMeta: metav1.ObjectMeta{ + Name: policyName, + OwnerReferences: []metav1.OwnerReference{}, + }, + }, + expectToFail: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + owner, err := extractOwnerCRPNameFromPolicySnapshot(tc.policy) + if tc.expectToFail { + if err == nil { + t.Fatalf("extractOwnerCRPNameFromPolicySnapshot() = %v, %v, want cannot find owner ref error", owner, err) + } + return + } + + if err != nil || owner != CRPName { + t.Fatalf("extractOwnerCRPNameFromPolicySnapshot() = %v, %v, want %v, no error", owner, err, CRPName) + } + }) + } +} + +// TestCollectBindings tests the collectBindings method. +func TestCollectBindings(t *testing.T) { + binding := &fleetv1beta1.ClusterResourceBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: bindingName, + Labels: map[string]string{ + fleetv1beta1.CRPTrackingLabel: CRPName, + }, + }, + } + altCRPName := "another-test-placement" + + testCases := []struct { + name string + binding *fleetv1beta1.ClusterResourceBinding + policy *fleetv1beta1.ClusterPolicySnapshot + expectToFail bool + expectToFindNoBindings bool + }{ + { + name: "found matching bindings", + binding: binding, + policy: &fleetv1beta1.ClusterPolicySnapshot{ + ObjectMeta: metav1.ObjectMeta{ + Name: policyName, + OwnerReferences: []metav1.OwnerReference{ + { + Kind: utils.CRPV1Beta1GVK.Kind, + Name: CRPName, + }, + }, + }, + }, + }, + { + name: "no owner reference in policy", + policy: &fleetv1beta1.ClusterPolicySnapshot{ + ObjectMeta: metav1.ObjectMeta{ + Name: policyName, + }, + }, + expectToFail: true, + }, + { + name: "no matching bindings", + binding: binding, + policy: &fleetv1beta1.ClusterPolicySnapshot{ + ObjectMeta: metav1.ObjectMeta{ + Name: policyName, + OwnerReferences: []metav1.OwnerReference{ + { + Kind: utils.CRPV1Beta1GVK.Kind, + Name: altCRPName, + }, + }, + }, + }, + expectToFindNoBindings: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + fakeClientBuilder := fake.NewClientBuilder().WithScheme(scheme.Scheme) + if tc.binding != nil { + fakeClientBuilder.WithObjects(tc.binding) + } + fakeClient := fakeClientBuilder.Build() + // Construct framework manually instead of using NewFramework() to avoid mocking the controller manager. 
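+ // Only the uncached reader is populated, since collectBindings lists bindings through it rather than
+ // through the cached client.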
+ f := &framework{ + uncachedReader: fakeClient, + } + + ctx := context.Background() + bindings, err := f.collectBindings(ctx, tc.policy) + if tc.expectToFail { + if err == nil { + t.Fatalf("collectBindings() = %v, %v, want failed to collect bindings error", bindings, err) + } + return + } + + if err != nil { + t.Fatalf("collectBindings() = %v, %v, want %v, no error", bindings, err, binding) + } + wantBindings := []fleetv1beta1.ClusterResourceBinding{} + if !tc.expectToFindNoBindings { + wantBindings = append(wantBindings, *binding) + } + if !cmp.Equal(bindings, wantBindings) { + t.Fatalf("collectBindings() = %v, %v, want %v, no error", bindings, err, wantBindings) + } + }) + } +} + +// TestClassifyBindings tests the classifyBindings function. +func TestClassifyBindings(t *testing.T) { + timestamp := metav1.Now() + activeBinding := &fleetv1beta1.ClusterResourceBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "active-binding", + }, + } + deletedBindingWithDispatcherFinalizer := &fleetv1beta1.ClusterResourceBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "deleted-binding-with-dispatcher-finalizer", + DeletionTimestamp: ×tamp, + Finalizers: []string{ + utils.DispatcherFinalizer, + }, + }, + } + deletedBindingWithoutDispatcherFinalizer := &fleetv1beta1.ClusterResourceBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "deleted-binding-without-dispatcher-finalizer", + DeletionTimestamp: ×tamp, + }, + } + + active, deletedWith, deletedWithout := classifyBindings([]fleetv1beta1.ClusterResourceBinding{*activeBinding, *deletedBindingWithDispatcherFinalizer, *deletedBindingWithoutDispatcherFinalizer}) + if diff := cmp.Diff(active, []*fleetv1beta1.ClusterResourceBinding{activeBinding}); diff != "" { + t.Errorf("classifyBindings() active = %v, want %v", active, []*fleetv1beta1.ClusterResourceBinding{activeBinding}) + } + if !cmp.Equal(deletedWith, []*fleetv1beta1.ClusterResourceBinding{deletedBindingWithDispatcherFinalizer}) { + t.Errorf("classifyBindings() deletedWithDispatcherFinalizer = %v, want %v", deletedWith, []*fleetv1beta1.ClusterResourceBinding{deletedBindingWithDispatcherFinalizer}) + } + if !cmp.Equal(deletedWithout, []*fleetv1beta1.ClusterResourceBinding{deletedBindingWithoutDispatcherFinalizer}) { + t.Errorf("classifyBindings() deletedWithoutDispatcherFinalizer = %v, want %v", deletedWithout, []*fleetv1beta1.ClusterResourceBinding{deletedBindingWithoutDispatcherFinalizer}) + } +} + +// TestRemoveSchedulerFinalizerFromBindings tests the removeSchedulerFinalizerFromBindings method. +func TestRemoveSchedulerFinalizerFromBindings(t *testing.T) { + binding := &fleetv1beta1.ClusterResourceBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: bindingName, + Finalizers: []string{utils.SchedulerFinalizer}, + }, + } + + fakeClient := fake.NewClientBuilder(). + WithScheme(scheme.Scheme). + WithObjects(binding). + Build() + // Construct framework manually instead of using NewFramework() to avoid mocking the controller manager. + f := &framework{ + client: fakeClient, + } + + ctx := context.Background() + if err := f.removeSchedulerFinalizerFromBindings(ctx, []*fleetv1beta1.ClusterResourceBinding{binding}); err != nil { + t.Fatalf("removeSchedulerFinalizerFromBindings() = %v, want no error", err) + } + + // Verify that the finalizer has been removed. 
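+ // Re-fetch the binding; the object itself should still exist, with only the scheduler finalizer
+ // stripped from its finalizer list.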
+ updatedBinding := &fleetv1beta1.ClusterResourceBinding{} + if err := fakeClient.Get(ctx, types.NamespacedName{Name: bindingName}, updatedBinding); err != nil { + t.Fatalf("Binding Get(%v) = %v, want no error", bindingName, err) + } + + if controllerutil.ContainsFinalizer(updatedBinding, utils.SchedulerFinalizer) { + t.Fatalf("Binding %s finalizers = %v, want no scheduler finalizer", bindingName, updatedBinding.Finalizers) + } +} diff --git a/pkg/scheduler/framework/frameworkutils.go b/pkg/scheduler/framework/frameworkutils.go new file mode 100644 index 000000000..5ae7c4eed --- /dev/null +++ b/pkg/scheduler/framework/frameworkutils.go @@ -0,0 +1,74 @@ +/* +Copyright (c) Microsoft Corporation. +Licensed under the MIT license. +*/ + +package framework + +import ( + "fmt" + "strconv" + + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + + fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1" + "go.goms.io/fleet/pkg/utils" + "go.goms.io/fleet/pkg/utils/controller" +) + +// extractNumOfClustersFromPolicySnapshot extracts the numOfClusters from the policy snapshot. +func extractNumOfClustersFromPolicySnapshot(policy *fleetv1beta1.ClusterPolicySnapshot) (int, error) { + numOfClustersStr, ok := policy.Annotations[fleetv1beta1.NumberOfClustersAnnotation] + if !ok { + return 0, controller.NewUnexpectedBehaviorError(fmt.Errorf("cannot find annotation %s", fleetv1beta1.NumberOfClustersAnnotation)) + } + + // Cast the annotation to an integer; throw an error if the cast cannot be completed or the value is negative. + numOfClusters, err := strconv.Atoi(numOfClustersStr) + if err != nil || numOfClusters < 0 { + return 0, controller.NewUnexpectedBehaviorError(fmt.Errorf("invalid annotation %s: %s is not a valid count: %w", fleetv1beta1.NumberOfClustersAnnotation, numOfClustersStr, err)) + } + + return numOfClusters, nil +} + +// extractOwnerCRPNameFromPolicySnapshot extracts the name of the owner CRP from the policy snapshot. +func extractOwnerCRPNameFromPolicySnapshot(policy *fleetv1beta1.ClusterPolicySnapshot) (string, error) { + var owner string + for _, ownerRef := range policy.OwnerReferences { + if ownerRef.Kind == utils.CRPV1Beta1GVK.Kind { + owner = ownerRef.Name + break + } + } + if len(owner) == 0 { + return "", fmt.Errorf("cannot find owner reference for policy snapshot %v", policy.Name) + } + return owner, nil +} + +// classifyBindings categorizes bindings into three groups: +// * active: active bindings, that is, bindings that are not marked for deletion; and +// * deletedWithDispatcherFinalizer: bindings that are marked for deletion, but still has the dispatcher finalizer present; and +// * deletedWithoutDispatcherFinalizer: bindings that are marked for deletion, and the dispatcher finalizer is already removed. +func classifyBindings(bindings []fleetv1beta1.ClusterResourceBinding) (active, deletedWithDispatcherFinalizer, deletedWithoutDispatcherFinalizer []*fleetv1beta1.ClusterResourceBinding) { + // Pre-allocate arrays. 
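+ // Each binding lands in exactly one of the three buckets, so len(bindings) is a safe capacity
+ // upper bound for every slice.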
+ active = make([]*fleetv1beta1.ClusterResourceBinding, 0, len(bindings)) + deletedWithDispatcherFinalizer = make([]*fleetv1beta1.ClusterResourceBinding, 0, len(bindings)) + deletedWithoutDispatcherFinalizer = make([]*fleetv1beta1.ClusterResourceBinding, 0, len(bindings)) + + for idx := range bindings { + binding := bindings[idx] + if binding.DeletionTimestamp != nil { + if controllerutil.ContainsFinalizer(&binding, utils.DispatcherFinalizer) { + deletedWithDispatcherFinalizer = append(deletedWithDispatcherFinalizer, &binding) + } else { + deletedWithoutDispatcherFinalizer = append(deletedWithoutDispatcherFinalizer, &binding) + } + } else { + active = append(active, &binding) + } + } + + return active, deletedWithDispatcherFinalizer, deletedWithoutDispatcherFinalizer +} diff --git a/pkg/utils/common.go b/pkg/utils/common.go index 2a8d915c5..95f09d33b 100644 --- a/pkg/utils/common.go +++ b/pkg/utils/common.go @@ -24,6 +24,7 @@ import ( "k8s.io/klog/v2" workv1alpha1 "sigs.k8s.io/work-api/pkg/apis/v1alpha1" + fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1" fleetv1alpha1 "go.goms.io/fleet/apis/v1alpha1" "go.goms.io/fleet/pkg/utils/informer" ) @@ -59,6 +60,14 @@ const ( // MemberClusterFinalizer is used to make sure that we handle gc of all the member cluster resources on the hub cluster. MemberClusterFinalizer = "work.fleet.azure.com/membercluster-finalizer" + // DispatcherFinalizer is added by the dispatcher to make sure that a binding can only be deleted if the dispatcher + // has removed all selected resources from the bound cluster. + DispatcherFinalizer = "fleet.io/dispatcher-cleanup" + + // SchedulerFinalizer is added by the scheduler to make sure that a binding can only be deleted if the scheduler + // has relieved it from scheduling consideration. + SchedulerFinalizer = "fleet.io/scheduler-cleanup" + // LastWorkUpdateTimeAnnotationKey is used to mark the last update time on a work object. LastWorkUpdateTimeAnnotationKey = "work.fleet.azure.com/last-update-time" ) @@ -141,6 +150,13 @@ var ( Version: corev1.SchemeGroupVersion.Version, Resource: "services", } + + // TO-DO (chenyu1): Add more common GVRs/GVKs here. + CRPV1Beta1GVK = schema.GroupVersionKind{ + Group: fleetv1beta1.GroupVersion.Group, + Version: fleetv1beta1.GroupVersion.Version, + Kind: "ClusterResourcePlacement", + } ) // RandSecureInt returns a uniform random value in [1, max] or panic. From aa1332450d4575162a5856405aab73e55fe02bfe Mon Sep 17 00:00:00 2001 From: michaelawyu Date: Mon, 19 Jun 2023 21:28:30 +0800 Subject: [PATCH 2/4] Minor fixes --- apis/placement/v1beta1/binding_types.go | 10 +++ .../placement_controller.go | 15 +--- pkg/scheduler/framework/framework.go | 12 +-- pkg/scheduler/framework/framework_test.go | 78 +--------------- pkg/scheduler/framework/frameworkutils.go | 20 +---- pkg/utils/common.go | 18 ++++ pkg/utils/common_test.go | 88 +++++++++++++++++++ 7 files changed, 127 insertions(+), 114 deletions(-) create mode 100644 pkg/utils/common_test.go diff --git a/apis/placement/v1beta1/binding_types.go b/apis/placement/v1beta1/binding_types.go index cdefbb892..e173122f8 100644 --- a/apis/placement/v1beta1/binding_types.go +++ b/apis/placement/v1beta1/binding_types.go @@ -10,6 +10,16 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +const ( + // DispatcherFinalizer is added by the dispatcher to make sure that a binding can only be deleted if the dispatcher + // has removed all selected resources from the bound cluster. 
+ DispatcherFinalizer = fleetPrefix + "dispatcher-cleanup" + + // SchedulerFinalizer is added by the scheduler to make sure that a binding can only be deleted if the scheduler + // has relieved it from scheduling consideration. + SchedulerFinalizer = fleetPrefix + "scheduler-cleanup" +) + // +kubebuilder:object:root=true // +kubebuilder:resource:scope=Cluster,categories={fleet},shortName=rb // +kubebuilder:subresource:status diff --git a/pkg/controllers/clusterresourceplacement/placement_controller.go b/pkg/controllers/clusterresourceplacement/placement_controller.go index 088d09187..fd5b88b70 100644 --- a/pkg/controllers/clusterresourceplacement/placement_controller.go +++ b/pkg/controllers/clusterresourceplacement/placement_controller.go @@ -436,7 +436,7 @@ func (r *Reconciler) ensureLatestPolicySnapshot(ctx context.Context, crp *fleetv if crp.Spec.Policy != nil && crp.Spec.Policy.PlacementType == fleetv1beta1.PickNPlacementType && crp.Spec.Policy.NumberOfClusters != nil { - oldCount, err := parseNumberOfClustersFromAnnotation(latest) + oldCount, err := utils.ExtractNumOfClustersFromPolicySnapshot(latest) if err != nil { klog.ErrorS(err, "Failed to parse the numberOfClusterAnnotation", "clusterPolicySnapshot", klog.KObj(latest)) return controller.NewUnexpectedBehaviorError(err) @@ -527,19 +527,6 @@ func parsePolicyIndexFromLabel(s *fleetv1beta1.ClusterPolicySnapshot) (int, erro return v, nil } -// parseNumberOfClustersFromAnnotation returns error when parsing the annotation which should never return error in production. -func parseNumberOfClustersFromAnnotation(s *fleetv1beta1.ClusterPolicySnapshot) (int, error) { - n := s.Annotations[fleetv1beta1.NumberOfClustersAnnotation] - v, err := strconv.Atoi(n) - if err != nil { - return -1, err - } - if v < 0 { - return -1, fmt.Errorf("numberOfCluster should not be negative: %d", v) - } - return v, nil -} - func generatePolicyHash(policy *fleetv1beta1.PlacementPolicy) (string, error) { jsonBytes, err := json.Marshal(policy) if err != nil { diff --git a/pkg/scheduler/framework/framework.go b/pkg/scheduler/framework/framework.go index 9257dd773..47679a620 100644 --- a/pkg/scheduler/framework/framework.go +++ b/pkg/scheduler/framework/framework.go @@ -159,14 +159,14 @@ func (f *framework) RunSchedulingCycleFor(ctx context.Context, policy *fleetv1be klog.V(2).InfoS("Scheduling cycle ends", "clusterPolicySnapshot", clusterPolicySnapshotRef, "latency", latency) }() - errorMessage := "failed to run scheduling cycle" + errorMessage := "Failed to run scheduling cycle" klog.V(2).InfoS("Retrieving clusters and bindings", "clusterPolicySnapshot", clusterPolicySnapshotRef) // Retrieve the desired number of clusters from the policy. // // TO-DO (chenyu1): assign variable(s) when more logic is added. - _, err = extractNumOfClustersFromPolicySnapshot(policy) + _, err = utils.ExtractNumOfClustersFromPolicySnapshot(policy) if err != nil { klog.ErrorS(err, errorMessage, "clusterPolicySnapshot", clusterPolicySnapshotRef) return ctrl.Result{}, err @@ -181,7 +181,7 @@ func (f *framework) RunSchedulingCycleFor(ctx context.Context, policy *fleetv1be // TO-DO (chenyu1): assign variable(s) when more logic is added. 
_, err = f.collectClusters(ctx) if err != nil { - klog.ErrorS(err, errorMessage, klog.KObj(policy)) + klog.ErrorS(err, errorMessage, clusterPolicySnapshotRef) return ctrl.Result{}, err } @@ -200,7 +200,7 @@ func (f *framework) RunSchedulingCycleFor(ctx context.Context, policy *fleetv1be // TO-DO (chenyu1): explore the possbilities of using a mutation cache for better performance. bindings, err := f.collectBindings(ctx, policy) if err != nil { - klog.ErrorS(err, errorMessage, klog.KObj(policy)) + klog.ErrorS(err, errorMessage, clusterPolicySnapshotRef) return ctrl.Result{}, err } @@ -223,7 +223,7 @@ func (f *framework) RunSchedulingCycleFor(ctx context.Context, policy *fleetv1be // If a binding has been marked for deletion and no longer has the dispatcher finalizer, the scheduler // removes its own finalizer from it, to clear it for eventual deletion. if err := f.removeSchedulerFinalizerFromBindings(ctx, deletedWithoutDispatcherFinalizer); err != nil { - klog.ErrorS(err, errorMessage, klog.KObj(policy)) + klog.ErrorS(err, errorMessage, clusterPolicySnapshotRef) return ctrl.Result{}, err } @@ -267,7 +267,7 @@ func (f *framework) removeSchedulerFinalizerFromBindings(ctx context.Context, bi errorFormat := "failed to remove scheduler finalizer from binding %s: %w" for _, binding := range bindings { - controllerutil.RemoveFinalizer(binding, utils.SchedulerFinalizer) + controllerutil.RemoveFinalizer(binding, fleetv1beta1.SchedulerFinalizer) if err := f.client.Update(ctx, binding, &client.UpdateOptions{}); err != nil { return controller.NewAPIServerError(fmt.Errorf(errorFormat, binding.Name, err)) } diff --git a/pkg/scheduler/framework/framework_test.go b/pkg/scheduler/framework/framework_test.go index 941ec383f..7fde76b58 100644 --- a/pkg/scheduler/framework/framework_test.go +++ b/pkg/scheduler/framework/framework_test.go @@ -40,78 +40,6 @@ func TestMain(m *testing.M) { os.Exit(m.Run()) } -// TestExtractNumOfClustersFromPolicySnapshot tests the extractNumOfClustersFromPolicySnapshot function. 
-func TestExtractNumOfClustersFromPolicySnapshot(t *testing.T) { - testCases := []struct { - name string - policy *fleetv1beta1.ClusterPolicySnapshot - wantNumOfClusters int - expectedToFail bool - }{ - { - name: "valid annotation", - policy: &fleetv1beta1.ClusterPolicySnapshot{ - ObjectMeta: metav1.ObjectMeta{ - Name: policyName, - Annotations: map[string]string{ - fleetv1beta1.NumberOfClustersAnnotation: "1", - }, - }, - }, - wantNumOfClusters: 1, - }, - { - name: "no annotation", - policy: &fleetv1beta1.ClusterPolicySnapshot{ - ObjectMeta: metav1.ObjectMeta{ - Name: policyName, - }, - }, - expectedToFail: true, - }, - { - name: "invalid annotation: not an integer", - policy: &fleetv1beta1.ClusterPolicySnapshot{ - ObjectMeta: metav1.ObjectMeta{ - Name: policyName, - Annotations: map[string]string{ - fleetv1beta1.NumberOfClustersAnnotation: "abc", - }, - }, - }, - expectedToFail: true, - }, - { - name: "invalid annotation: negative integer", - policy: &fleetv1beta1.ClusterPolicySnapshot{ - ObjectMeta: metav1.ObjectMeta{ - Name: policyName, - Annotations: map[string]string{ - fleetv1beta1.NumberOfClustersAnnotation: "-1", - }, - }, - }, - expectedToFail: true, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - numOfClusters, err := extractNumOfClustersFromPolicySnapshot(tc.policy) - if tc.expectedToFail { - if err == nil { - t.Fatalf("extractNumOfClustersFromPolicySnapshot() = %v, %v, want error", numOfClusters, err) - } - return - } - - if numOfClusters != tc.wantNumOfClusters { - t.Fatalf("extractNumOfClustersFromPolicySnapshot() = %v, %v, want %v, nil", numOfClusters, err, tc.wantNumOfClusters) - } - }) - } -} - // TestCollectClusters tests the collectClusters method. func TestCollectClusters(t *testing.T) { cluster := fleetv1beta1.MemberCluster{ @@ -300,7 +228,7 @@ func TestClassifyBindings(t *testing.T) { Name: "deleted-binding-with-dispatcher-finalizer", DeletionTimestamp: ×tamp, Finalizers: []string{ - utils.DispatcherFinalizer, + fleetv1beta1.DispatcherFinalizer, }, }, } @@ -328,7 +256,7 @@ func TestRemoveSchedulerFinalizerFromBindings(t *testing.T) { binding := &fleetv1beta1.ClusterResourceBinding{ ObjectMeta: metav1.ObjectMeta{ Name: bindingName, - Finalizers: []string{utils.SchedulerFinalizer}, + Finalizers: []string{fleetv1beta1.SchedulerFinalizer}, }, } @@ -352,7 +280,7 @@ func TestRemoveSchedulerFinalizerFromBindings(t *testing.T) { t.Fatalf("Binding Get(%v) = %v, want no error", bindingName, err) } - if controllerutil.ContainsFinalizer(updatedBinding, utils.SchedulerFinalizer) { + if controllerutil.ContainsFinalizer(updatedBinding, fleetv1beta1.SchedulerFinalizer) { t.Fatalf("Binding %s finalizers = %v, want no scheduler finalizer", bindingName, updatedBinding.Finalizers) } } diff --git a/pkg/scheduler/framework/frameworkutils.go b/pkg/scheduler/framework/frameworkutils.go index 5ae7c4eed..3cb5416d1 100644 --- a/pkg/scheduler/framework/frameworkutils.go +++ b/pkg/scheduler/framework/frameworkutils.go @@ -7,31 +7,13 @@ package framework import ( "fmt" - "strconv" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1" "go.goms.io/fleet/pkg/utils" - "go.goms.io/fleet/pkg/utils/controller" ) -// extractNumOfClustersFromPolicySnapshot extracts the numOfClusters from the policy snapshot. 
-func extractNumOfClustersFromPolicySnapshot(policy *fleetv1beta1.ClusterPolicySnapshot) (int, error) { - numOfClustersStr, ok := policy.Annotations[fleetv1beta1.NumberOfClustersAnnotation] - if !ok { - return 0, controller.NewUnexpectedBehaviorError(fmt.Errorf("cannot find annotation %s", fleetv1beta1.NumberOfClustersAnnotation)) - } - - // Cast the annotation to an integer; throw an error if the cast cannot be completed or the value is negative. - numOfClusters, err := strconv.Atoi(numOfClustersStr) - if err != nil || numOfClusters < 0 { - return 0, controller.NewUnexpectedBehaviorError(fmt.Errorf("invalid annotation %s: %s is not a valid count: %w", fleetv1beta1.NumberOfClustersAnnotation, numOfClustersStr, err)) - } - - return numOfClusters, nil -} - // extractOwnerCRPNameFromPolicySnapshot extracts the name of the owner CRP from the policy snapshot. func extractOwnerCRPNameFromPolicySnapshot(policy *fleetv1beta1.ClusterPolicySnapshot) (string, error) { var owner string @@ -60,7 +42,7 @@ func classifyBindings(bindings []fleetv1beta1.ClusterResourceBinding) (active, d for idx := range bindings { binding := bindings[idx] if binding.DeletionTimestamp != nil { - if controllerutil.ContainsFinalizer(&binding, utils.DispatcherFinalizer) { + if controllerutil.ContainsFinalizer(&binding, fleetv1beta1.DispatcherFinalizer) { deletedWithDispatcherFinalizer = append(deletedWithDispatcherFinalizer, &binding) } else { deletedWithoutDispatcherFinalizer = append(deletedWithoutDispatcherFinalizer, &binding) diff --git a/pkg/utils/common.go b/pkg/utils/common.go index 95f09d33b..705e715cc 100644 --- a/pkg/utils/common.go +++ b/pkg/utils/common.go @@ -9,6 +9,7 @@ import ( "crypto/rand" "fmt" "math/big" + "strconv" "strings" "time" @@ -274,3 +275,20 @@ func ShouldPropagateNamespace(namespace string, skippedNamespaces map[string]boo } return true } + +// ExtractNumOfClustersFromPolicySnapshot extracts the numOfClusters value from the annotations +// on a policy snapshot. +func ExtractNumOfClustersFromPolicySnapshot(policy *fleetv1beta1.ClusterPolicySnapshot) (int, error) { + numOfClustersStr, ok := policy.Annotations[fleetv1beta1.NumberOfClustersAnnotation] + if !ok { + return 0, fmt.Errorf("cannot find annotation %s", fleetv1beta1.NumberOfClustersAnnotation) + } + + // Cast the annotation to an integer; throw an error if the cast cannot be completed or the value is negative. + numOfClusters, err := strconv.Atoi(numOfClustersStr) + if err != nil || numOfClusters < 0 { + return 0, fmt.Errorf("invalid annotation %s: %s is not a valid count: %w", fleetv1beta1.NumberOfClustersAnnotation, numOfClustersStr, err) + } + + return numOfClusters, nil +} diff --git a/pkg/utils/common_test.go b/pkg/utils/common_test.go new file mode 100644 index 000000000..3c5389564 --- /dev/null +++ b/pkg/utils/common_test.go @@ -0,0 +1,88 @@ +/* +Copyright (c) Microsoft Corporation. +Licensed under the MIT license. +*/ + +package utils + +import ( + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1" +) + +// TestExtractNumOfClustersFromPolicySnapshot tests the extractNumOfClustersFromPolicySnapshot function. 
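+// The helper under test is the exported ExtractNumOfClustersFromPolicySnapshot in the utils package,
+// moved here out of the scheduler framework.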
+func TestExtractNumOfClustersFromPolicySnapshot(t *testing.T) { + policyName := "test-policy" + + testCases := []struct { + name string + policy *fleetv1beta1.ClusterPolicySnapshot + wantNumOfClusters int + expectedToFail bool + }{ + { + name: "valid annotation", + policy: &fleetv1beta1.ClusterPolicySnapshot{ + ObjectMeta: metav1.ObjectMeta{ + Name: policyName, + Annotations: map[string]string{ + fleetv1beta1.NumberOfClustersAnnotation: "1", + }, + }, + }, + wantNumOfClusters: 1, + }, + { + name: "no annotation", + policy: &fleetv1beta1.ClusterPolicySnapshot{ + ObjectMeta: metav1.ObjectMeta{ + Name: policyName, + }, + }, + expectedToFail: true, + }, + { + name: "invalid annotation: not an integer", + policy: &fleetv1beta1.ClusterPolicySnapshot{ + ObjectMeta: metav1.ObjectMeta{ + Name: policyName, + Annotations: map[string]string{ + fleetv1beta1.NumberOfClustersAnnotation: "abc", + }, + }, + }, + expectedToFail: true, + }, + { + name: "invalid annotation: negative integer", + policy: &fleetv1beta1.ClusterPolicySnapshot{ + ObjectMeta: metav1.ObjectMeta{ + Name: policyName, + Annotations: map[string]string{ + fleetv1beta1.NumberOfClustersAnnotation: "-1", + }, + }, + }, + expectedToFail: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + numOfClusters, err := ExtractNumOfClustersFromPolicySnapshot(tc.policy) + if tc.expectedToFail { + if err == nil { + t.Fatalf("extractNumOfClustersFromPolicySnapshot() = %v, %v, want error", numOfClusters, err) + } + return + } + + if numOfClusters != tc.wantNumOfClusters { + t.Fatalf("extractNumOfClustersFromPolicySnapshot() = %v, %v, want %v, nil", numOfClusters, err, tc.wantNumOfClusters) + } + }) + } +} From 0c7d0bfbcb193a60276c8af10219ebfcfadd4d31 Mon Sep 17 00:00:00 2001 From: michaelawyu Date: Wed, 21 Jun 2023 17:34:25 +0800 Subject: [PATCH 3/4] Reflect changes in design refresh --- apis/placement/v1beta1/binding_types.go | 33 +++++ ...t_types.go => schedulingpolicysnapshot.go} | 22 ++-- .../v1beta1/zz_generated.deepcopy.go | 118 +++++++++--------- ...karavel.io_schedulingpolicysnapshots.yaml} | 14 +-- .../placement_controller.go | 38 +++--- .../placement_controller_test.go | 68 +++++----- pkg/scheduler/framework/framework.go | 55 ++++---- pkg/scheduler/framework/framework_test.go | 67 +++++++--- pkg/scheduler/framework/frameworkutils.go | 56 ++++++--- pkg/scheduler/framework/interface.go | 10 +- pkg/scheduler/framework/profile_test.go | 10 +- pkg/scheduler/queue/queue.go | 86 ++++++------- pkg/scheduler/queue/queue_test.go | 14 +-- pkg/utils/common.go | 10 +- pkg/utils/common_test.go | 10 +- 15 files changed, 342 insertions(+), 269 deletions(-) rename apis/placement/v1beta1/{policysnapshot_types.go => schedulingpolicysnapshot.go} (86%) rename config/crd/bases/{placement.karavel.io_clusterpolicysnapshots.yaml => placement.karavel.io_schedulingpolicysnapshots.yaml} (98%) diff --git a/apis/placement/v1beta1/binding_types.go b/apis/placement/v1beta1/binding_types.go index e173122f8..e131249e6 100644 --- a/apis/placement/v1beta1/binding_types.go +++ b/apis/placement/v1beta1/binding_types.go @@ -18,6 +18,39 @@ const ( // SchedulerFinalizer is added by the scheduler to make sure that a binding can only be deleted if the scheduler // has relieved it from scheduling consideration. SchedulerFinalizer = fleetPrefix + "scheduler-cleanup" + + // ActiveBindingLabel is added by the update controller to mark that a binding is active, i.e., the dispatcher + // should place resources to it. 
+ // + // Note that an active binding may not be associated with the latest scheduling policy snapshot or the latest + // resource snapshot. It may be up to another controller, e.g., the rolling update controller, to modify the + // association (if applicable). In certain cases (e.g., not enough fitting clusters), the binding may not even + // has a target cluster. + // + // Note also that it is not the scheduler's responsibility to add this label, even though it does + // reads this label to inform the scheduling cycle.. + ActiveBindingLabel = fleetPrefix + "is-active-binding" + + // CreatingBindingLabel is added by the scheduler to mark that a binding is being created. Any binding in + // this state should not be picked up by the dispatcher. + // + // Note that the scheduler **always** produces enough number of bindings, as user specified, after a scheduling run, + // even if there might not be enough number of fitting clusters. + // + // Note also that it is up to another controller, e.g., the rolling update controller, to mark a creating + // binding as active. + CreatingBindingLabel = fleetPrefix + "is-creating-binding" + + // ObsoleteBindingLabel is added by the scheduler to mark that a binding is no longer needed, i.e., its + // associated cluster (if any) no longer fits the current (as seen by the scheduler) scheduling policy. + // + // Note that it is up to another controller, e.g, the rolling update controller, to actually delete the + // binding. + ObsoleteBindingLabel = fleetPrefix + "is-obsolete-binding" + + // NoTargetClusterBindingLabel is added by the scheduler to mark that a binding does not have a target cluster. + // This usually happens when there is not enough number of fitting clusters in the system. + NoTargetClusterBindingLabel = fleetPrefix + "no-target-cluster-binding" ) // +kubebuilder:object:root=true diff --git a/apis/placement/v1beta1/policysnapshot_types.go b/apis/placement/v1beta1/schedulingpolicysnapshot.go similarity index 86% rename from apis/placement/v1beta1/policysnapshot_types.go rename to apis/placement/v1beta1/schedulingpolicysnapshot.go index 7ff78402e..0c3d86e02 100644 --- a/apis/placement/v1beta1/policysnapshot_types.go +++ b/apis/placement/v1beta1/schedulingpolicysnapshot.go @@ -14,7 +14,7 @@ const ( // PolicyIndexLabel is the label that indicate the policy snapshot index of a cluster policy. PolicyIndexLabel = fleetPrefix + "policyIndex" - // PolicySnapshotNameFmt is clusterPolicySnapshot name format: {CRPName}-{PolicySnapshotIndex}. + // PolicySnapshotNameFmt is schedulingPolicySnapshot name format: {CRPName}-{PolicySnapshotIndex}. PolicySnapshotNameFmt = "%s-%d" // NumberOfClustersAnnotation is the annotation that indicates how many clusters should be selected for selectN placement type. @@ -30,15 +30,15 @@ const ( // +kubebuilder:printcolumn:JSONPath=`.metadata.creationTimestamp`,name="Age",type=date // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object -// ClusterPolicySnapshot is used to store a snapshot of cluster placement policy. +// SchedulingPolicySnapshot is used to store a snapshot of cluster placement policy. // Its spec is immutable. -// The naming convention of a ClusterPolicySnapshot is {CRPName}-{PolicySnapshotIndex}. +// The naming convention of a SchedulingPolicySnapshot is {CRPName}-{PolicySnapshotIndex}. // PolicySnapshotIndex will begin with 0. // Each snapshot must have the following labels: // - `CRPTrackingLabel` which points to its owner CRP. 
// - `PolicyIndexLabel` which is the index of the policy snapshot. // - `IsLatestSnapshotLabel` which indicates whether the snapshot is the latest one. -type ClusterPolicySnapshot struct { +type SchedulingPolicySnapshot struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` @@ -131,27 +131,27 @@ type ClusterScore struct { TopologySpreadScore *int32 `json:"priorityScore,omitempty"` } -// ClusterPolicySnapshotList contains a list of ClusterPolicySnapshot. +// SchedulingPolicySnapshotList contains a list of SchedulingPolicySnapshot. // +kubebuilder:resource:scope="Cluster" // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object -type ClusterPolicySnapshotList struct { +type SchedulingPolicySnapshotList struct { metav1.TypeMeta `json:",inline"` metav1.ListMeta `json:"metadata,omitempty"` - Items []ClusterPolicySnapshot `json:"items"` + Items []SchedulingPolicySnapshot `json:"items"` } -// SetConditions sets the given conditions on the ClusterPolicySnapshot. -func (m *ClusterPolicySnapshot) SetConditions(conditions ...metav1.Condition) { +// SetConditions sets the given conditions on the SchedulingPolicySnapshot. +func (m *SchedulingPolicySnapshot) SetConditions(conditions ...metav1.Condition) { for _, c := range conditions { meta.SetStatusCondition(&m.Status.Conditions, c) } } // GetCondition returns the condition of the given type if exists. -func (m *ClusterPolicySnapshot) GetCondition(conditionType string) *metav1.Condition { +func (m *SchedulingPolicySnapshot) GetCondition(conditionType string) *metav1.Condition { return meta.FindStatusCondition(m.Status.Conditions, conditionType) } func init() { - SchemeBuilder.Register(&ClusterPolicySnapshot{}, &ClusterPolicySnapshotList{}) + SchemeBuilder.Register(&SchedulingPolicySnapshot{}, &SchedulingPolicySnapshotList{}) } diff --git a/apis/placement/v1beta1/zz_generated.deepcopy.go b/apis/placement/v1beta1/zz_generated.deepcopy.go index d78c05e29..ef5f796cf 100644 --- a/apis/placement/v1beta1/zz_generated.deepcopy.go +++ b/apis/placement/v1beta1/zz_generated.deepcopy.go @@ -106,65 +106,6 @@ func (in *ClusterDecision) DeepCopy() *ClusterDecision { return out } -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *ClusterPolicySnapshot) DeepCopyInto(out *ClusterPolicySnapshot) { - *out = *in - out.TypeMeta = in.TypeMeta - in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) - in.Spec.DeepCopyInto(&out.Spec) - in.Status.DeepCopyInto(&out.Status) -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterPolicySnapshot. -func (in *ClusterPolicySnapshot) DeepCopy() *ClusterPolicySnapshot { - if in == nil { - return nil - } - out := new(ClusterPolicySnapshot) - in.DeepCopyInto(out) - return out -} - -// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. -func (in *ClusterPolicySnapshot) DeepCopyObject() runtime.Object { - if c := in.DeepCopy(); c != nil { - return c - } - return nil -} - -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
-func (in *ClusterPolicySnapshotList) DeepCopyInto(out *ClusterPolicySnapshotList) { - *out = *in - out.TypeMeta = in.TypeMeta - in.ListMeta.DeepCopyInto(&out.ListMeta) - if in.Items != nil { - in, out := &in.Items, &out.Items - *out = make([]ClusterPolicySnapshot, len(*in)) - for i := range *in { - (*in)[i].DeepCopyInto(&(*out)[i]) - } - } -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterPolicySnapshotList. -func (in *ClusterPolicySnapshotList) DeepCopy() *ClusterPolicySnapshotList { - if in == nil { - return nil - } - out := new(ClusterPolicySnapshotList) - in.DeepCopyInto(out) - return out -} - -// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. -func (in *ClusterPolicySnapshotList) DeepCopyObject() runtime.Object { - if c := in.DeepCopy(); c != nil { - return c - } - return nil -} - // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ClusterResourceBinding) DeepCopyInto(out *ClusterResourceBinding) { *out = *in @@ -959,6 +900,65 @@ func (in *ResourceUsage) DeepCopy() *ResourceUsage { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SchedulingPolicySnapshot) DeepCopyInto(out *SchedulingPolicySnapshot) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + in.Status.DeepCopyInto(&out.Status) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SchedulingPolicySnapshot. +func (in *SchedulingPolicySnapshot) DeepCopy() *SchedulingPolicySnapshot { + if in == nil { + return nil + } + out := new(SchedulingPolicySnapshot) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *SchedulingPolicySnapshot) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SchedulingPolicySnapshotList) DeepCopyInto(out *SchedulingPolicySnapshotList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]SchedulingPolicySnapshot, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SchedulingPolicySnapshotList. +func (in *SchedulingPolicySnapshotList) DeepCopy() *SchedulingPolicySnapshotList { + if in == nil { + return nil + } + out := new(SchedulingPolicySnapshotList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *SchedulingPolicySnapshotList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *TopologySpreadConstraint) DeepCopyInto(out *TopologySpreadConstraint) { *out = *in diff --git a/config/crd/bases/placement.karavel.io_clusterpolicysnapshots.yaml b/config/crd/bases/placement.karavel.io_schedulingpolicysnapshots.yaml similarity index 98% rename from config/crd/bases/placement.karavel.io_clusterpolicysnapshots.yaml rename to config/crd/bases/placement.karavel.io_schedulingpolicysnapshots.yaml index 017b0be72..243b48c75 100644 --- a/config/crd/bases/placement.karavel.io_clusterpolicysnapshots.yaml +++ b/config/crd/bases/placement.karavel.io_schedulingpolicysnapshots.yaml @@ -4,18 +4,18 @@ kind: CustomResourceDefinition metadata: annotations: controller-gen.kubebuilder.io/version: v0.11.4 - name: clusterpolicysnapshots.placement.karavel.io + name: schedulingpolicysnapshots.placement.karavel.io spec: group: placement.karavel.io names: categories: - fleet-workload - kind: ClusterPolicySnapshot - listKind: ClusterPolicySnapshotList - plural: clusterpolicysnapshots + kind: SchedulingPolicySnapshot + listKind: SchedulingPolicySnapshotList + plural: schedulingpolicysnapshots shortNames: - pss - singular: clusterpolicysnapshot + singular: schedulingpolicysnapshot scope: Cluster versions: - additionalPrinterColumns: @@ -28,8 +28,8 @@ spec: name: v1beta1 schema: openAPIV3Schema: - description: 'ClusterPolicySnapshot is used to store a snapshot of cluster - placement policy. Its spec is immutable. The naming convention of a ClusterPolicySnapshot + description: 'SchedulingPolicySnapshot is used to store a snapshot of cluster + placement policy. Its spec is immutable. The naming convention of a SchedulingPolicySnapshot is {CRPName}-{PolicySnapshotIndex}. PolicySnapshotIndex will begin with 0. Each snapshot must have the following labels: - `CRPTrackingLabel` which points to its owner CRP. - `PolicyIndexLabel` which is the index of the diff --git a/pkg/controllers/clusterresourceplacement/placement_controller.go b/pkg/controllers/clusterresourceplacement/placement_controller.go index fd5b88b70..2001026fa 100644 --- a/pkg/controllers/clusterresourceplacement/placement_controller.go +++ b/pkg/controllers/clusterresourceplacement/placement_controller.go @@ -342,8 +342,8 @@ func (r *Reconciler) updatePlacementAppliedCondition(placement *fleetv1alpha1.Cl } // handleUpdate handles the create/update clusterResourcePlacement event. -// It creates corresponding clusterPolicySnapshot and clusterResourceSnapshot if needed and updates the status based on -// clusterPolicySnapshot status and work status. +// It creates corresponding schedulingPolicySnapshot and clusterResourceSnapshot if needed and updates the status based on +// schedulingPolicySnapshot status and work status. // If the error type is ErrUnexpectedBehavior, the controller will skip the reconciling. 
func (r *Reconciler) handleUpdate(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement) (ctrl.Result, error) { crpKObj := klog.KObj(crp) @@ -355,7 +355,7 @@ func (r *Reconciler) handleUpdate(ctx context.Context, crp *fleetv1beta1.Cluster return ctrl.Result{}, controller.NewUnexpectedBehaviorError(err) } - latestPolicySnapshot, latestPolicySnapshotIndex, err := r.lookupLatestClusterPolicySnapshot(ctx, crp) + latestPolicySnapshot, latestPolicySnapshotIndex, err := r.lookupLatestSchedulingPolicySnapshot(ctx, crp) if err != nil { return ctrl.Result{}, err } @@ -367,7 +367,7 @@ func (r *Reconciler) handleUpdate(ctx context.Context, crp *fleetv1beta1.Cluster // set the latest label to false first to make sure there is only one or none active policy snapshot latestPolicySnapshot.Labels[fleetv1beta1.IsLatestSnapshotLabel] = strconv.FormatBool(false) if err := r.Client.Update(ctx, latestPolicySnapshot); err != nil { - klog.ErrorS(err, "Failed to set the isLatestSnapshot label to false", "clusterPolicySnapshot", klog.KObj(latestPolicySnapshot)) + klog.ErrorS(err, "Failed to set the isLatestSnapshot label to false", "schedulingPolicySnapshot", klog.KObj(latestPolicySnapshot)) return ctrl.Result{}, controller.NewAPIServerError(err) } } @@ -378,7 +378,7 @@ func (r *Reconciler) handleUpdate(ctx context.Context, crp *fleetv1beta1.Cluster } else { // create a new policy snapshot latestPolicySnapshotIndex++ - latestPolicySnapshot = &fleetv1beta1.ClusterPolicySnapshot{ + latestPolicySnapshot = &fleetv1beta1.SchedulingPolicySnapshot{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf(fleetv1beta1.PolicySnapshotNameFmt, crp.Name, latestPolicySnapshotIndex), Labels: map[string]string{ @@ -394,7 +394,7 @@ func (r *Reconciler) handleUpdate(ctx context.Context, crp *fleetv1beta1.Cluster } policySnapshotKObj := klog.KObj(latestPolicySnapshot) if err := controllerutil.SetControllerReference(crp, latestPolicySnapshot, r.Scheme); err != nil { - klog.ErrorS(err, "Failed to set owner reference", "clusterPolicySnapshot", policySnapshotKObj) + klog.ErrorS(err, "Failed to set owner reference", "schedulingPolicySnapshot", policySnapshotKObj) // should never happen return ctrl.Result{}, controller.NewUnexpectedBehaviorError(err) } @@ -408,7 +408,7 @@ func (r *Reconciler) handleUpdate(ctx context.Context, crp *fleetv1beta1.Cluster } if err := r.Client.Create(ctx, latestPolicySnapshot); err != nil { - klog.ErrorS(err, "Failed to create new clusterPolicySnapshot", "clusterPolicySnapshot", policySnapshotKObj) + klog.ErrorS(err, "Failed to create new schedulingPolicySnapshot", "schedulingPolicySnapshot", policySnapshotKObj) return ctrl.Result{}, controller.NewAPIServerError(err) } } @@ -420,7 +420,7 @@ func (r *Reconciler) handleUpdate(ctx context.Context, crp *fleetv1beta1.Cluster } // ensureLatestPolicySnapshot ensures the latest policySnapshot has the isLatest label and the numberOfClusters are updated. 
-func (r *Reconciler) ensureLatestPolicySnapshot(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement, latest *fleetv1beta1.ClusterPolicySnapshot) error { +func (r *Reconciler) ensureLatestPolicySnapshot(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement, latest *fleetv1beta1.SchedulingPolicySnapshot) error { needUpdate := false if latest.Labels[fleetv1beta1.IsLatestSnapshotLabel] != strconv.FormatBool(true) { // When latestPolicySnapshot.Spec.PolicyHash == policyHash, @@ -438,7 +438,7 @@ func (r *Reconciler) ensureLatestPolicySnapshot(ctx context.Context, crp *fleetv crp.Spec.Policy.NumberOfClusters != nil { oldCount, err := utils.ExtractNumOfClustersFromPolicySnapshot(latest) if err != nil { - klog.ErrorS(err, "Failed to parse the numberOfClusterAnnotation", "clusterPolicySnapshot", klog.KObj(latest)) + klog.ErrorS(err, "Failed to parse the numberOfClusterAnnotation", "schedulingPolicySnapshot", klog.KObj(latest)) return controller.NewUnexpectedBehaviorError(err) } newCount := int(*crp.Spec.Policy.NumberOfClusters) @@ -451,13 +451,13 @@ func (r *Reconciler) ensureLatestPolicySnapshot(ctx context.Context, crp *fleetv return nil } if err := r.Client.Update(ctx, latest); err != nil { - klog.ErrorS(err, "Failed to update the clusterPolicySnapshot", "clusterPolicySnapshot", klog.KObj(latest)) + klog.ErrorS(err, "Failed to update the schedulingPolicySnapshot", "schedulingPolicySnapshot", klog.KObj(latest)) return controller.NewAPIServerError(err) } return nil } -// lookupLatestClusterPolicySnapshot finds the latest snapshots and its policy index. +// lookupLatestSchedulingPolicySnapshot finds the latest snapshots and its policy index. // There will be only one active policy snapshot if exists. // It first checks whether there is an active policy snapshot. // If not, it finds the one whose policyIndex label is the largest. @@ -465,33 +465,33 @@ func (r *Reconciler) ensureLatestPolicySnapshot(ctx context.Context, crp *fleetv // Return error when 1) cannot list the snapshots 2) there are more than one active policy snapshots 3) snapshot has the // invalid label value. // 2 & 3 should never happen. 
-func (r *Reconciler) lookupLatestClusterPolicySnapshot(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement) (*fleetv1beta1.ClusterPolicySnapshot, int, error) { - snapshotList := &fleetv1beta1.ClusterPolicySnapshotList{} +func (r *Reconciler) lookupLatestSchedulingPolicySnapshot(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement) (*fleetv1beta1.SchedulingPolicySnapshot, int, error) { + snapshotList := &fleetv1beta1.SchedulingPolicySnapshotList{} latestSnapshotLabelMatcher := client.MatchingLabels{ fleetv1beta1.CRPTrackingLabel: crp.Name, fleetv1beta1.IsLatestSnapshotLabel: strconv.FormatBool(true), } crpKObj := klog.KObj(crp) if err := r.Client.List(ctx, snapshotList, latestSnapshotLabelMatcher); err != nil { - klog.ErrorS(err, "Failed to list active clusterPolicySnapshots", "clusterResourcePlacement", crpKObj) + klog.ErrorS(err, "Failed to list active schedulingPolicySnapshots", "clusterResourcePlacement", crpKObj) return nil, -1, controller.NewAPIServerError(err) } if len(snapshotList.Items) == 1 { policyIndex, err := parsePolicyIndexFromLabel(&snapshotList.Items[0]) if err != nil { - klog.ErrorS(err, "Failed to parse the policy index label", "clusterPolicySnapshot", klog.KObj(&snapshotList.Items[0])) + klog.ErrorS(err, "Failed to parse the policy index label", "schedulingPolicySnapshot", klog.KObj(&snapshotList.Items[0])) return nil, -1, controller.NewUnexpectedBehaviorError(err) } return &snapshotList.Items[0], policyIndex, nil } else if len(snapshotList.Items) > 1 { // It means there are multiple active snapshots and should never happen. - err := fmt.Errorf("there are %d active clusterPolicySnapshots owned by clusterResourcePlacement %v", len(snapshotList.Items), crp.Name) + err := fmt.Errorf("there are %d active schedulingPolicySnapshots owned by clusterResourcePlacement %v", len(snapshotList.Items), crp.Name) klog.ErrorS(err, "It should never happen", "clusterResourcePlacement", crpKObj) return nil, -1, controller.NewUnexpectedBehaviorError(err) } // When there are no active snapshots, find the one who has the largest policy index. if err := r.Client.List(ctx, snapshotList, client.MatchingLabels{fleetv1beta1.CRPTrackingLabel: crp.Name}); err != nil { - klog.ErrorS(err, "Failed to list all clusterPolicySnapshots", "clusterResourcePlacement", crpKObj) + klog.ErrorS(err, "Failed to list all schedulingPolicySnapshots", "clusterResourcePlacement", crpKObj) return nil, -1, controller.NewAPIServerError(err) } if len(snapshotList.Items) == 0 { @@ -503,7 +503,7 @@ func (r *Reconciler) lookupLatestClusterPolicySnapshot(ctx context.Context, crp for i := range snapshotList.Items { policyIndex, err := parsePolicyIndexFromLabel(&snapshotList.Items[i]) if err != nil { - klog.ErrorS(err, "Failed to parse the policy index label", "clusterPolicySnapshot", klog.KObj(&snapshotList.Items[i])) + klog.ErrorS(err, "Failed to parse the policy index label", "schedulingPolicySnapshot", klog.KObj(&snapshotList.Items[i])) return nil, -1, controller.NewUnexpectedBehaviorError(err) } if lastPolicyIndex < policyIndex { @@ -515,7 +515,7 @@ func (r *Reconciler) lookupLatestClusterPolicySnapshot(ctx context.Context, crp } // parsePolicyIndexFromLabel returns error when parsing the label which should never return error in production. 
-func parsePolicyIndexFromLabel(s *fleetv1beta1.ClusterPolicySnapshot) (int, error) { +func parsePolicyIndexFromLabel(s *fleetv1beta1.SchedulingPolicySnapshot) (int, error) { indexLabel := s.Labels[fleetv1beta1.PolicyIndexLabel] v, err := strconv.Atoi(indexLabel) if err != nil { diff --git a/pkg/controllers/clusterresourceplacement/placement_controller_test.go b/pkg/controllers/clusterresourceplacement/placement_controller_test.go index fe9eb2474..b0112bb43 100644 --- a/pkg/controllers/clusterresourceplacement/placement_controller_test.go +++ b/pkg/controllers/clusterresourceplacement/placement_controller_test.go @@ -86,12 +86,12 @@ func TestHandleUpdate(t *testing.T) { unspecifiedPolicyHash := []byte(fmt.Sprintf("%x", sha256.Sum256(jsonBytes))) tests := []struct { name string - policySnapshots []fleetv1beta1.ClusterPolicySnapshot - wantPolicySnapshots []fleetv1beta1.ClusterPolicySnapshot + policySnapshots []fleetv1beta1.SchedulingPolicySnapshot + wantPolicySnapshots []fleetv1beta1.SchedulingPolicySnapshot }{ { name: "new clusterResourcePolicy and no existing policy snapshots owned by my-crp", - policySnapshots: []fleetv1beta1.ClusterPolicySnapshot{ + policySnapshots: []fleetv1beta1.SchedulingPolicySnapshot{ { ObjectMeta: metav1.ObjectMeta{ Name: "another-crp-1", @@ -103,7 +103,7 @@ func TestHandleUpdate(t *testing.T) { }, }, }, - wantPolicySnapshots: []fleetv1beta1.ClusterPolicySnapshot{ + wantPolicySnapshots: []fleetv1beta1.SchedulingPolicySnapshot{ { ObjectMeta: metav1.ObjectMeta{ Name: "another-crp-1", @@ -145,7 +145,7 @@ func TestHandleUpdate(t *testing.T) { }, { name: "crp policy has no change", - policySnapshots: []fleetv1beta1.ClusterPolicySnapshot{ + policySnapshots: []fleetv1beta1.SchedulingPolicySnapshot{ { ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf(fleetv1beta1.PolicySnapshotNameFmt, testName, 0), @@ -173,7 +173,7 @@ func TestHandleUpdate(t *testing.T) { }, }, }, - wantPolicySnapshots: []fleetv1beta1.ClusterPolicySnapshot{ + wantPolicySnapshots: []fleetv1beta1.SchedulingPolicySnapshot{ { ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf(fleetv1beta1.PolicySnapshotNameFmt, testName, 0), @@ -206,7 +206,7 @@ func TestHandleUpdate(t *testing.T) { name: "crp policy has changed and there is no active snapshot", // It happens when last reconcile loop fails after setting the latest label to false and // before creating a new policy snapshot. 
- policySnapshots: []fleetv1beta1.ClusterPolicySnapshot{ + policySnapshots: []fleetv1beta1.SchedulingPolicySnapshot{ { ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf(fleetv1beta1.PolicySnapshotNameFmt, testName, 0), @@ -254,7 +254,7 @@ func TestHandleUpdate(t *testing.T) { }, }, }, - wantPolicySnapshots: []fleetv1beta1.ClusterPolicySnapshot{ + wantPolicySnapshots: []fleetv1beta1.SchedulingPolicySnapshot{ { ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf(fleetv1beta1.PolicySnapshotNameFmt, testName, 0), @@ -331,7 +331,7 @@ func TestHandleUpdate(t *testing.T) { }, { name: "crp policy has changed and there is an active snapshot", - policySnapshots: []fleetv1beta1.ClusterPolicySnapshot{ + policySnapshots: []fleetv1beta1.SchedulingPolicySnapshot{ { ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf(fleetv1beta1.PolicySnapshotNameFmt, testName, 0), @@ -356,7 +356,7 @@ func TestHandleUpdate(t *testing.T) { }, }, }, - wantPolicySnapshots: []fleetv1beta1.ClusterPolicySnapshot{ + wantPolicySnapshots: []fleetv1beta1.SchedulingPolicySnapshot{ { ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf(fleetv1beta1.PolicySnapshotNameFmt, testName, 0), @@ -410,7 +410,7 @@ func TestHandleUpdate(t *testing.T) { }, { name: "crp policy has been changed and reverted back and there is no active snapshot", - policySnapshots: []fleetv1beta1.ClusterPolicySnapshot{ + policySnapshots: []fleetv1beta1.SchedulingPolicySnapshot{ { ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf(fleetv1beta1.PolicySnapshotNameFmt, testName, 0), @@ -460,7 +460,7 @@ func TestHandleUpdate(t *testing.T) { }, }, }, - wantPolicySnapshots: []fleetv1beta1.ClusterPolicySnapshot{ + wantPolicySnapshots: []fleetv1beta1.SchedulingPolicySnapshot{ { ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf(fleetv1beta1.PolicySnapshotNameFmt, testName, 0), @@ -514,7 +514,7 @@ func TestHandleUpdate(t *testing.T) { }, { name: "crp policy has not been changed and only the numberOfCluster is changed", - policySnapshots: []fleetv1beta1.ClusterPolicySnapshot{ + policySnapshots: []fleetv1beta1.SchedulingPolicySnapshot{ { ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf(fleetv1beta1.PolicySnapshotNameFmt, testName, 0), @@ -565,7 +565,7 @@ func TestHandleUpdate(t *testing.T) { }, }, }, - wantPolicySnapshots: []fleetv1beta1.ClusterPolicySnapshot{ + wantPolicySnapshots: []fleetv1beta1.SchedulingPolicySnapshot{ { ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf(fleetv1beta1.PolicySnapshotNameFmt, testName, 0), @@ -640,15 +640,15 @@ func TestHandleUpdate(t *testing.T) { if !cmp.Equal(got, want) { t.Errorf("handleUpdate() = %+v, want %+v", got, want) } - clusterPolicySnapshotList := &fleetv1beta1.ClusterPolicySnapshotList{} - if err := fakeClient.List(ctx, clusterPolicySnapshotList); err != nil { - t.Fatalf("clusterPolicySnapshot List() got error %v, want no error", err) + schedulingPolicySnapshotList := &fleetv1beta1.SchedulingPolicySnapshotList{} + if err := fakeClient.List(ctx, schedulingPolicySnapshotList); err != nil { + t.Fatalf("schedulingPolicySnapshot List() got error %v, want no error", err) } options := []cmp.Option{ cmpopts.IgnoreFields(metav1.ObjectMeta{}, "ResourceVersion"), } - if diff := cmp.Diff(tc.wantPolicySnapshots, clusterPolicySnapshotList.Items, options...); diff != "" { - t.Errorf("clusterPolicysnapShot List() mismatch (-want, +got):\n%s", diff) + if diff := cmp.Diff(tc.wantPolicySnapshots, schedulingPolicySnapshotList.Items, options...); diff != "" { + t.Errorf("schedulingPolicySnapshot List() mismatch (-want, +got):\n%s", diff) } }) } @@ -664,12 +664,12 
@@ func TestHandleUpdate_failure(t *testing.T) { policyHash := []byte(fmt.Sprintf("%x", sha256.Sum256(jsonBytes))) tests := []struct { name string - policySnapshots []fleetv1beta1.ClusterPolicySnapshot + policySnapshots []fleetv1beta1.SchedulingPolicySnapshot }{ { - // Should never hit this case unless there is a bug in the controller or customers manually modify the clusterPolicySnapshot. + // Should never hit this case unless there is a bug in the controller or customers manually modify the schedulingPolicySnapshot. name: "existing active policy snapshot does not have policyIndex label", - policySnapshots: []fleetv1beta1.ClusterPolicySnapshot{ + policySnapshots: []fleetv1beta1.SchedulingPolicySnapshot{ { ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf(fleetv1beta1.PolicySnapshotNameFmt, testName, 0), @@ -689,9 +689,9 @@ func TestHandleUpdate_failure(t *testing.T) { }, }, { - // Should never hit this case unless there is a bug in the controller or customers manually modify the clusterPolicySnapshot. + // Should never hit this case unless there is a bug in the controller or customers manually modify the schedulingPolicySnapshot. name: "existing active policy snapshot has an invalid policyIndex label", - policySnapshots: []fleetv1beta1.ClusterPolicySnapshot{ + policySnapshots: []fleetv1beta1.SchedulingPolicySnapshot{ { ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf(fleetv1beta1.PolicySnapshotNameFmt, testName, 0), @@ -712,9 +712,9 @@ func TestHandleUpdate_failure(t *testing.T) { }, }, { - // Should never hit this case unless there is a bug in the controller or customers manually modify the clusterPolicySnapshot. + // Should never hit this case unless there is a bug in the controller or customers manually modify the schedulingPolicySnapshot. name: "no active policy snapshot exists and policySnapshot with invalid policyIndex label", - policySnapshots: []fleetv1beta1.ClusterPolicySnapshot{ + policySnapshots: []fleetv1beta1.SchedulingPolicySnapshot{ { ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf(fleetv1beta1.PolicySnapshotNameFmt, testName, 0), @@ -734,9 +734,9 @@ func TestHandleUpdate_failure(t *testing.T) { }, }, { - // Should never hit this case unless there is a bug in the controller or customers manually modify the clusterPolicySnapshot. + // Should never hit this case unless there is a bug in the controller or customers manually modify the schedulingPolicySnapshot. name: "multiple active policy snapshot exist", - policySnapshots: []fleetv1beta1.ClusterPolicySnapshot{ + policySnapshots: []fleetv1beta1.SchedulingPolicySnapshot{ { ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf(fleetv1beta1.PolicySnapshotNameFmt, testName, 0), @@ -774,9 +774,9 @@ func TestHandleUpdate_failure(t *testing.T) { }, }, { - // Should never hit this case unless there is a bug in the controller or customers manually modify the clusterPolicySnapshot. + // Should never hit this case unless there is a bug in the controller or customers manually modify the schedulingPolicySnapshot. name: "no active policy snapshot exists and policySnapshot with invalid policyIndex label (negative value)", - policySnapshots: []fleetv1beta1.ClusterPolicySnapshot{ + policySnapshots: []fleetv1beta1.SchedulingPolicySnapshot{ { ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf(fleetv1beta1.PolicySnapshotNameFmt, testName, 0), @@ -796,9 +796,9 @@ func TestHandleUpdate_failure(t *testing.T) { }, }, { - // Should never hit this case unless there is a bug in the controller or customers manually modify the clusterPolicySnapshot. 
+ // Should never hit this case unless there is a bug in the controller or customers manually modify the schedulingPolicySnapshot. name: "active policy snapshot exists and policySnapshot with invalid numberOfClusters annotation", - policySnapshots: []fleetv1beta1.ClusterPolicySnapshot{ + policySnapshots: []fleetv1beta1.SchedulingPolicySnapshot{ { ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf(fleetv1beta1.PolicySnapshotNameFmt, testName, 1), @@ -828,9 +828,9 @@ func TestHandleUpdate_failure(t *testing.T) { }, }, { - // Should never hit this case unless there is a bug in the controller or customers manually modify the clusterPolicySnapshot. + // Should never hit this case unless there is a bug in the controller or customers manually modify the schedulingPolicySnapshot. name: "no active policy snapshot exists and policySnapshot with invalid numberOfClusters annotation (negative)", - policySnapshots: []fleetv1beta1.ClusterPolicySnapshot{ + policySnapshots: []fleetv1beta1.SchedulingPolicySnapshot{ { ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf(fleetv1beta1.PolicySnapshotNameFmt, testName, 1), diff --git a/pkg/scheduler/framework/framework.go b/pkg/scheduler/framework/framework.go index 47679a620..6c52e0621 100644 --- a/pkg/scheduler/framework/framework.go +++ b/pkg/scheduler/framework/framework.go @@ -49,7 +49,7 @@ type Framework interface { Handle // RunSchedulerCycleFor performs scheduling for a policy snapshot. - RunSchedulingCycleFor(ctx context.Context, policy *fleetv1beta1.ClusterPolicySnapshot, resources *fleetv1beta1.ClusterResourceSnapshot) (result ctrl.Result, err error) + RunSchedulingCycleFor(ctx context.Context, crpName string, policy *fleetv1beta1.SchedulingPolicySnapshot, resources *fleetv1beta1.ClusterResourceSnapshot) (result ctrl.Result, err error) } // framework implements the Framework interface. @@ -150,25 +150,25 @@ func (f *framework) EventRecorder() record.EventRecorder { } // RunSchedulingCycleFor performs scheduling for a policy snapshot. -func (f *framework) RunSchedulingCycleFor(ctx context.Context, policy *fleetv1beta1.ClusterPolicySnapshot, resources *fleetv1beta1.ClusterResourceSnapshot) (result ctrl.Result, err error) { //nolint:revive +func (f *framework) RunSchedulingCycleFor(ctx context.Context, crpName string, policy *fleetv1beta1.SchedulingPolicySnapshot, resources *fleetv1beta1.ClusterResourceSnapshot) (result ctrl.Result, err error) { //nolint:revive startTime := time.Now() - clusterPolicySnapshotRef := klog.KObj(policy) - klog.V(2).InfoS("Scheduling cycle starts", "clusterPolicySnapshot", clusterPolicySnapshotRef) + schedulingPolicySnapshotRef := klog.KObj(policy) + klog.V(2).InfoS("Scheduling cycle starts", "schedulingPolicySnapshot", schedulingPolicySnapshotRef) defer func() { latency := time.Since(startTime).Milliseconds() - klog.V(2).InfoS("Scheduling cycle ends", "clusterPolicySnapshot", clusterPolicySnapshotRef, "latency", latency) + klog.V(2).InfoS("Scheduling cycle ends", "schedulingPolicySnapshot", schedulingPolicySnapshotRef, "latency", latency) }() errorMessage := "Failed to run scheduling cycle" - klog.V(2).InfoS("Retrieving clusters and bindings", "clusterPolicySnapshot", clusterPolicySnapshotRef) + klog.V(2).InfoS("Retrieving clusters and bindings", "schedulingPolicySnapshot", schedulingPolicySnapshotRef) // Retrieve the desired number of clusters from the policy. // // TO-DO (chenyu1): assign variable(s) when more logic is added. 
_, err = utils.ExtractNumOfClustersFromPolicySnapshot(policy) if err != nil { - klog.ErrorS(err, errorMessage, "clusterPolicySnapshot", clusterPolicySnapshotRef) + klog.ErrorS(err, errorMessage, "schedulingPolicySnapshot", schedulingPolicySnapshotRef) return ctrl.Result{}, err } @@ -181,7 +181,7 @@ func (f *framework) RunSchedulingCycleFor(ctx context.Context, policy *fleetv1be // TO-DO (chenyu1): assign variable(s) when more logic is added. _, err = f.collectClusters(ctx) if err != nil { - klog.ErrorS(err, errorMessage, clusterPolicySnapshotRef) + klog.ErrorS(err, errorMessage, schedulingPolicySnapshotRef) return ctrl.Result{}, err } @@ -200,30 +200,29 @@ func (f *framework) RunSchedulingCycleFor(ctx context.Context, policy *fleetv1be // TO-DO (chenyu1): explore the possbilities of using a mutation cache for better performance. bindings, err := f.collectBindings(ctx, policy) if err != nil { - klog.ErrorS(err, errorMessage, clusterPolicySnapshotRef) + klog.ErrorS(err, errorMessage, schedulingPolicySnapshotRef) return ctrl.Result{}, err } // Parse the bindings, find out - // * active bindings, i.e., bindings that are not marked for deletion; and - // * bindings that are already marked for deletion, but still have the dispatcher finalizer - // present; - // * bindings that are already marked for deletion, and no longer have the dispatcher finalizer + // * active bindings, i.e., bindings that should be picked up by the dispatcher; and + // * creating bindings with associated target clusters; and + // * creating bindings without associated target clusters; and + // * bindings that are already marked for deletion // - // Note that the scheduler only considers a binding to be deleted if it is marked for deletion - // and it no longer has the dispatcher finalizer. This helps avoid a rare racing condition - // where the scheduler binds a cluster to a resource placement, even though the dispatcher has - // not started, or is still in the process of, removing the same set of resources from - // the cluster, triggered by a recently deleted binding. + // Note that the scheduler does not perform deletions at all, and all the bindings it creates + // are initially not ready for fulfillment, i.e., the dispatcher will not place resources + // to the target cluster. As a result it is safe for the scheduler to disregard deleted clusters + // completely in a scheduling cycle (aside for finalizer cleanup). // - // TO-DO (chenyu1): assign variable(s) when more logic is added. - klog.V(2).InfoS("Classifying bindings", "clusterPolicySnapshot", clusterPolicySnapshotRef) - _, _, deletedWithoutDispatcherFinalizer := classifyBindings(bindings) + // A similar patterns applies to obsolete bindings as well. + klog.V(2).InfoS("Classifying bindings", "schedulingPolicySnapshot", schedulingPolicySnapshotRef) + _, _, _, deleted, err := classifyBindings(bindings) // If a binding has been marked for deletion and no longer has the dispatcher finalizer, the scheduler // removes its own finalizer from it, to clear it for eventual deletion. 
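The classification described above is driven by the bindings' deletion timestamps plus a set of state labels, as the reworked classifyBindings and its test further down show. As a reference for what each group looks like on the wire, here is a sketch that builds one binding per state using the label and finalizer constants from this patch; the names and the "true" values are illustrative, and the state labels are assumed to be mutually exclusive, per the note in classifyBindings.

package example

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
)

// exampleBindings returns one illustrative binding per state that classifyBindings
// distinguishes; only the presence of the label keys (and the deletion timestamp)
// matters to the classification.
func exampleBindings() []fleetv1beta1.ClusterResourceBinding {
	now := metav1.Now()
	return []fleetv1beta1.ClusterResourceBinding{
		{
			// Active: should be picked up by the dispatcher.
			ObjectMeta: metav1.ObjectMeta{
				Name:   "binding-active",
				Labels: map[string]string{fleetv1beta1.ActiveBindingLabel: "true"},
			},
		},
		{
			// Creating with a target cluster: not ready for the dispatcher yet.
			ObjectMeta: metav1.ObjectMeta{
				Name:   "binding-creating",
				Labels: map[string]string{fleetv1beta1.CreatingBindingLabel: "true"},
			},
		},
		{
			// Creating without a target cluster assigned yet.
			ObjectMeta: metav1.ObjectMeta{
				Name: "binding-creating-no-target",
				Labels: map[string]string{
					fleetv1beta1.CreatingBindingLabel:        "true",
					fleetv1beta1.NoTargetClusterBindingLabel: "true",
				},
			},
		},
		{
			// Deleted: the scheduler's only job here is to clear its own finalizer.
			ObjectMeta: metav1.ObjectMeta{
				Name:              "binding-deleted",
				DeletionTimestamp: &now,
				Finalizers:        []string{fleetv1beta1.SchedulerFinalizer},
				Labels:            map[string]string{fleetv1beta1.ObsoleteBindingLabel: "true"},
			},
		},
	}
}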
- if err := f.removeSchedulerFinalizerFromBindings(ctx, deletedWithoutDispatcherFinalizer); err != nil { - klog.ErrorS(err, errorMessage, clusterPolicySnapshotRef) + if err := f.removeSchedulerFinalizerFromBindings(ctx, deleted); err != nil { + klog.ErrorS(err, errorMessage, schedulingPolicySnapshotRef) return ctrl.Result{}, err } @@ -243,7 +242,7 @@ func (f *framework) collectClusters(ctx context.Context) ([]fleetv1beta1.MemberC } // collectBindings lists all bindings associated with a CRP **using the uncached client**. -func (f *framework) collectBindings(ctx context.Context, policy *fleetv1beta1.ClusterPolicySnapshot) ([]fleetv1beta1.ClusterResourceBinding, error) { +func (f *framework) collectBindings(ctx context.Context, policy *fleetv1beta1.SchedulingPolicySnapshot) ([]fleetv1beta1.ClusterResourceBinding, error) { errorFormat := "failed to collect bindings: %w" bindingOwner, err := extractOwnerCRPNameFromPolicySnapshot(policy) @@ -267,9 +266,11 @@ func (f *framework) removeSchedulerFinalizerFromBindings(ctx context.Context, bi errorFormat := "failed to remove scheduler finalizer from binding %s: %w" for _, binding := range bindings { - controllerutil.RemoveFinalizer(binding, fleetv1beta1.SchedulerFinalizer) - if err := f.client.Update(ctx, binding, &client.UpdateOptions{}); err != nil { - return controller.NewAPIServerError(fmt.Errorf(errorFormat, binding.Name, err)) + if controllerutil.ContainsFinalizer(binding, fleetv1beta1.SchedulerFinalizer) { + controllerutil.RemoveFinalizer(binding, fleetv1beta1.SchedulerFinalizer) + if err := f.client.Update(ctx, binding, &client.UpdateOptions{}); err != nil { + return controller.NewAPIServerError(fmt.Errorf(errorFormat, binding.Name, err)) + } } } return nil diff --git a/pkg/scheduler/framework/framework_test.go b/pkg/scheduler/framework/framework_test.go index 7fde76b58..ea08add9e 100644 --- a/pkg/scheduler/framework/framework_test.go +++ b/pkg/scheduler/framework/framework_test.go @@ -73,12 +73,12 @@ func TestCollectClusters(t *testing.T) { func TestExtractOwnerCRPNameFromPolicySnapshot(t *testing.T) { testCases := []struct { name string - policy *fleetv1beta1.ClusterPolicySnapshot + policy *fleetv1beta1.SchedulingPolicySnapshot expectToFail bool }{ { name: "policy with CRP owner reference", - policy: &fleetv1beta1.ClusterPolicySnapshot{ + policy: &fleetv1beta1.SchedulingPolicySnapshot{ ObjectMeta: metav1.ObjectMeta{ Name: policyName, OwnerReferences: []metav1.OwnerReference{ @@ -92,7 +92,7 @@ func TestExtractOwnerCRPNameFromPolicySnapshot(t *testing.T) { }, { name: "policy without CRP owner reference", - policy: &fleetv1beta1.ClusterPolicySnapshot{ + policy: &fleetv1beta1.SchedulingPolicySnapshot{ ObjectMeta: metav1.ObjectMeta{ Name: policyName, OwnerReferences: []metav1.OwnerReference{}, @@ -134,14 +134,14 @@ func TestCollectBindings(t *testing.T) { testCases := []struct { name string binding *fleetv1beta1.ClusterResourceBinding - policy *fleetv1beta1.ClusterPolicySnapshot + policy *fleetv1beta1.SchedulingPolicySnapshot expectToFail bool expectToFindNoBindings bool }{ { name: "found matching bindings", binding: binding, - policy: &fleetv1beta1.ClusterPolicySnapshot{ + policy: &fleetv1beta1.SchedulingPolicySnapshot{ ObjectMeta: metav1.ObjectMeta{ Name: policyName, OwnerReferences: []metav1.OwnerReference{ @@ -155,7 +155,7 @@ func TestCollectBindings(t *testing.T) { }, { name: "no owner reference in policy", - policy: &fleetv1beta1.ClusterPolicySnapshot{ + policy: &fleetv1beta1.SchedulingPolicySnapshot{ ObjectMeta: metav1.ObjectMeta{ Name: 
policyName, }, @@ -165,7 +165,7 @@ func TestCollectBindings(t *testing.T) { { name: "no matching bindings", binding: binding, - policy: &fleetv1beta1.ClusterPolicySnapshot{ + policy: &fleetv1beta1.SchedulingPolicySnapshot{ ObjectMeta: metav1.ObjectMeta{ Name: policyName, OwnerReferences: []metav1.OwnerReference{ @@ -218,36 +218,63 @@ func TestCollectBindings(t *testing.T) { // TestClassifyBindings tests the classifyBindings function. func TestClassifyBindings(t *testing.T) { timestamp := metav1.Now() + isPresent := "true" activeBinding := &fleetv1beta1.ClusterResourceBinding{ ObjectMeta: metav1.ObjectMeta{ Name: "active-binding", + Labels: map[string]string{ + fleetv1beta1.ActiveBindingLabel: isPresent, + }, }, } - deletedBindingWithDispatcherFinalizer := &fleetv1beta1.ClusterResourceBinding{ + creatingBinding := &fleetv1beta1.ClusterResourceBinding{ ObjectMeta: metav1.ObjectMeta{ - Name: "deleted-binding-with-dispatcher-finalizer", - DeletionTimestamp: ×tamp, - Finalizers: []string{ - fleetv1beta1.DispatcherFinalizer, + Name: "creating-binding", + Labels: map[string]string{ + fleetv1beta1.CreatingBindingLabel: isPresent, }, }, } - deletedBindingWithoutDispatcherFinalizer := &fleetv1beta1.ClusterResourceBinding{ + creatingWithoutTargetClusterBinding := &fleetv1beta1.ClusterResourceBinding{ ObjectMeta: metav1.ObjectMeta{ - Name: "deleted-binding-without-dispatcher-finalizer", + Name: "creating-binding", + Labels: map[string]string{ + fleetv1beta1.CreatingBindingLabel: isPresent, + fleetv1beta1.NoTargetClusterBindingLabel: isPresent, + }, + }, + } + deletedBinding := &fleetv1beta1.ClusterResourceBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "deleted-binding", DeletionTimestamp: ×tamp, + Finalizers: []string{ + fleetv1beta1.DispatcherFinalizer, + fleetv1beta1.SchedulerFinalizer, + }, + Labels: map[string]string{ + fleetv1beta1.ObsoleteBindingLabel: isPresent, + }, }, } - active, deletedWith, deletedWithout := classifyBindings([]fleetv1beta1.ClusterResourceBinding{*activeBinding, *deletedBindingWithDispatcherFinalizer, *deletedBindingWithoutDispatcherFinalizer}) - if diff := cmp.Diff(active, []*fleetv1beta1.ClusterResourceBinding{activeBinding}); diff != "" { + active, creating, creatingWithoutTargetCluster, deleted, err := classifyBindings([]fleetv1beta1.ClusterResourceBinding{ + *activeBinding, *deletedBinding, *creatingBinding, *creatingWithoutTargetClusterBinding, + }) + if err != nil { + t.Fatalf("classifyBindings(), got %v, want no error", err) + } + if !cmp.Equal(active, []*fleetv1beta1.ClusterResourceBinding{activeBinding}) { t.Errorf("classifyBindings() active = %v, want %v", active, []*fleetv1beta1.ClusterResourceBinding{activeBinding}) } - if !cmp.Equal(deletedWith, []*fleetv1beta1.ClusterResourceBinding{deletedBindingWithDispatcherFinalizer}) { - t.Errorf("classifyBindings() deletedWithDispatcherFinalizer = %v, want %v", deletedWith, []*fleetv1beta1.ClusterResourceBinding{deletedBindingWithDispatcherFinalizer}) + if !cmp.Equal(deleted, []*fleetv1beta1.ClusterResourceBinding{deletedBinding}) { + t.Errorf("classifyBindings() deleted = %v, want %v", deleted, []*fleetv1beta1.ClusterResourceBinding{deletedBinding}) + } + if !cmp.Equal(creating, []*fleetv1beta1.ClusterResourceBinding{creatingBinding}) { + t.Errorf("classifyBindings() creating = %v, want %v", creating, []*fleetv1beta1.ClusterResourceBinding{creatingBinding}) } - if !cmp.Equal(deletedWithout, []*fleetv1beta1.ClusterResourceBinding{deletedBindingWithoutDispatcherFinalizer}) { - t.Errorf("classifyBindings() 
deletedWithoutDispatcherFinalizer = %v, want %v", deletedWithout, []*fleetv1beta1.ClusterResourceBinding{deletedBindingWithoutDispatcherFinalizer}) + if !cmp.Equal(creatingWithoutTargetCluster, []*fleetv1beta1.ClusterResourceBinding{creatingWithoutTargetClusterBinding}) { + t.Errorf("classifyBindings() creatingWithoutTargetCluster = %v, want %v", creatingWithoutTargetCluster, []*fleetv1beta1.ClusterResourceBinding{creatingWithoutTargetClusterBinding}) } } diff --git a/pkg/scheduler/framework/frameworkutils.go b/pkg/scheduler/framework/frameworkutils.go index 3cb5416d1..43f8ded91 100644 --- a/pkg/scheduler/framework/frameworkutils.go +++ b/pkg/scheduler/framework/frameworkutils.go @@ -8,14 +8,13 @@ package framework import ( "fmt" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" - fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1" "go.goms.io/fleet/pkg/utils" + "go.goms.io/fleet/pkg/utils/controller" ) // extractOwnerCRPNameFromPolicySnapshot extracts the name of the owner CRP from the policy snapshot. -func extractOwnerCRPNameFromPolicySnapshot(policy *fleetv1beta1.ClusterPolicySnapshot) (string, error) { +func extractOwnerCRPNameFromPolicySnapshot(policy *fleetv1beta1.SchedulingPolicySnapshot) (string, error) { var owner string for _, ownerRef := range policy.OwnerReferences { if ownerRef.Kind == utils.CRPV1Beta1GVK.Kind { @@ -29,28 +28,49 @@ func extractOwnerCRPNameFromPolicySnapshot(policy *fleetv1beta1.ClusterPolicySna return owner, nil } -// classifyBindings categorizes bindings into three groups: -// * active: active bindings, that is, bindings that are not marked for deletion; and -// * deletedWithDispatcherFinalizer: bindings that are marked for deletion, but still has the dispatcher finalizer present; and -// * deletedWithoutDispatcherFinalizer: bindings that are marked for deletion, and the dispatcher finalizer is already removed. -func classifyBindings(bindings []fleetv1beta1.ClusterResourceBinding) (active, deletedWithDispatcherFinalizer, deletedWithoutDispatcherFinalizer []*fleetv1beta1.ClusterResourceBinding) { +// classifyBindings categorizes bindings into the following groups: +// - active: active bindings, that is, bindings that should be (though may not necessarily have been) picked up by the dispatcher; +// - creating: bindings that are being created, that is, bindings that should not be picked up by the dispatcher yet, +// but have a target cluster associated; +// - creatingWithoutTargetCluster: bindings that are being created, that is bindings that should not picked up by the dispatcher yet, +// and do not have a target cluster assigned. +// - deleted: bindings that are marked for deletion. +func classifyBindings(bindings []fleetv1beta1.ClusterResourceBinding) (active, creating, creatingWithoutTargetCluster, deleted []*fleetv1beta1.ClusterResourceBinding, err error) { // Pre-allocate arrays. 
active = make([]*fleetv1beta1.ClusterResourceBinding, 0, len(bindings)) - deletedWithDispatcherFinalizer = make([]*fleetv1beta1.ClusterResourceBinding, 0, len(bindings)) - deletedWithoutDispatcherFinalizer = make([]*fleetv1beta1.ClusterResourceBinding, 0, len(bindings)) + creating = make([]*fleetv1beta1.ClusterResourceBinding, 0, len(bindings)) + creatingWithoutTargetCluster = make([]*fleetv1beta1.ClusterResourceBinding, 0, len(bindings)) + deleted = make([]*fleetv1beta1.ClusterResourceBinding, 0, len(bindings)) for idx := range bindings { binding := bindings[idx] - if binding.DeletionTimestamp != nil { - if controllerutil.ContainsFinalizer(&binding, fleetv1beta1.DispatcherFinalizer) { - deletedWithDispatcherFinalizer = append(deletedWithDispatcherFinalizer, &binding) - } else { - deletedWithoutDispatcherFinalizer = append(deletedWithoutDispatcherFinalizer, &binding) - } - } else { + + _, activeLabelExists := binding.Labels[fleetv1beta1.ActiveBindingLabel] + _, creatingLabelExists := binding.Labels[fleetv1beta1.CreatingBindingLabel] + _, obsoleteLabelExists := binding.Labels[fleetv1beta1.ObsoleteBindingLabel] + _, noTargetClusterLabelExists := binding.Labels[fleetv1beta1.NoTargetClusterBindingLabel] + + // Note that this utility assumes that label mutual-exclusivity is always enforced. + switch { + case binding.DeletionTimestamp != nil: + deleted = append(deleted, &binding) + case activeLabelExists: active = append(active, &binding) + case creatingLabelExists && !noTargetClusterLabelExists: + creating = append(creating, &binding) + case creatingLabelExists && noTargetClusterLabelExists: + creatingWithoutTargetCluster = append(creatingWithoutTargetCluster, &binding) + case obsoleteLabelExists: + // Do nothing. + // + // The scheduler cares not for obsolete bindings. 
+ default: + // Normally this branch is never run, as a binding that has not been marked for deletion is + // always of one of the following states: + // active, creating, and obsolete + return nil, nil, nil, nil, controller.NewUnexpectedBehaviorError(fmt.Errorf("failed to classify bindings: an binding %s has no desired labels", binding.Name)) } } - return active, deletedWithDispatcherFinalizer, deletedWithoutDispatcherFinalizer + return active, creating, creatingWithoutTargetCluster, deleted, nil } diff --git a/pkg/scheduler/framework/interface.go b/pkg/scheduler/framework/interface.go index 12173f463..eec7269b2 100644 --- a/pkg/scheduler/framework/interface.go +++ b/pkg/scheduler/framework/interface.go @@ -30,7 +30,7 @@ type PostBatchPlugin interface { // * A Success status with a new batch size; or // * A Skip status, if no changes in batch size is needed; or // * An InternalError status, if an expected error has occurred - PostBatch(ctx context.Context, state CycleStatePluginReadWriter, policy *fleetv1beta1.ClusterPolicySnapshot) (size int, status *Status) + PostBatch(ctx context.Context, state CycleStatePluginReadWriter, policy *fleetv1beta1.SchedulingPolicySnapshot) (size int, status *Status) } // PreFilterPlugin is the interface which all plugins that would like to run at the PreFilter @@ -47,7 +47,7 @@ type PreFilterPlugin interface { // * A Success status, if the plugin should run at the Filter stage; or // * A Skip status, if the plugin should be skipped at the Filter stage; or // * An InternalError status, if an expected error has occurred - PreFilter(ctx context.Context, state CycleStatePluginReadWriter, policy *fleetv1beta1.ClusterPolicySnapshot) (status *Status) + PreFilter(ctx context.Context, state CycleStatePluginReadWriter, policy *fleetv1beta1.SchedulingPolicySnapshot) (status *Status) } // FilterPlugin is the interface which all plugins that would like to run at the Filter @@ -60,7 +60,7 @@ type FilterPlugin interface { // * A Success status, if the placement can be bound to the cluster; or // * A ClusterUnschedulable status, if the placement cannot be bound to the cluster; or // * An InternalError status, if an expected error has occurred - Filter(ctx context.Context, state CycleStatePluginReadWriter, policy *fleetv1beta1.ClusterPolicySnapshot, cluster *fleetv1beta1.MemberCluster) (status *Status) + Filter(ctx context.Context, state CycleStatePluginReadWriter, policy *fleetv1beta1.SchedulingPolicySnapshot, cluster *fleetv1beta1.MemberCluster) (status *Status) } // PreScorePlugin is the interface which all plugins that would like to run at the PreScore @@ -77,7 +77,7 @@ type PreScorePlugin interface { // * A Success status, if the plugin should run at the Score stage; or // * A Skip status, if the plugin should be skipped at the Score stage; or // * An InternalError status, if an expected error has occurred - PreScore(ctx context.Context, state CycleStatePluginReadWriter, policy *fleetv1beta1.ClusterPolicySnapshot) (status *Status) + PreScore(ctx context.Context, state CycleStatePluginReadWriter, policy *fleetv1beta1.SchedulingPolicySnapshot) (status *Status) } // ScorePlugin is the interface which all plugins that would like to run at the Score @@ -89,5 +89,5 @@ type ScorePlugin interface { // A plugin which registers at this extension point must return one of the follows: // * A Success status, with the score for the cluster; or // * An InternalError status, if an expected error has occurred - Score(ctx context.Context, state CycleStatePluginReadWriter, policy 
*fleetv1beta1.ClusterPolicySnapshot, cluster *fleetv1beta1.MemberCluster) (score *ClusterScore, status *Status) + Score(ctx context.Context, state CycleStatePluginReadWriter, policy *fleetv1beta1.SchedulingPolicySnapshot, cluster *fleetv1beta1.MemberCluster) (score *ClusterScore, status *Status) } diff --git a/pkg/scheduler/framework/profile_test.go b/pkg/scheduler/framework/profile_test.go index 3d82b681e..046a7ead9 100644 --- a/pkg/scheduler/framework/profile_test.go +++ b/pkg/scheduler/framework/profile_test.go @@ -28,27 +28,27 @@ func (p *DummyAllPurposePlugin) Name() string { } // PostBatch implements the PostBatch interface for the dummy plugin. -func (p *DummyAllPurposePlugin) PostBatch(ctx context.Context, state CycleStatePluginReadWriter, policy *fleetv1beta1.ClusterPolicySnapshot) (size int, status *Status) { //nolint:revive +func (p *DummyAllPurposePlugin) PostBatch(ctx context.Context, state CycleStatePluginReadWriter, policy *fleetv1beta1.SchedulingPolicySnapshot) (size int, status *Status) { //nolint:revive return 1, nil } // PreFilter implements the PreFilter interface for the dummy plugin. -func (p *DummyAllPurposePlugin) PreFilter(ctx context.Context, state CycleStatePluginReadWriter, policy *fleetv1beta1.ClusterPolicySnapshot) (status *Status) { //nolint:revive +func (p *DummyAllPurposePlugin) PreFilter(ctx context.Context, state CycleStatePluginReadWriter, policy *fleetv1beta1.SchedulingPolicySnapshot) (status *Status) { //nolint:revive return nil } // Filter implements the Filter interface for the dummy plugin. -func (p *DummyAllPurposePlugin) Filter(ctx context.Context, state CycleStatePluginReadWriter, policy *fleetv1beta1.ClusterPolicySnapshot, cluster *fleetv1beta1.MemberCluster) (status *Status) { //nolint:revive +func (p *DummyAllPurposePlugin) Filter(ctx context.Context, state CycleStatePluginReadWriter, policy *fleetv1beta1.SchedulingPolicySnapshot, cluster *fleetv1beta1.MemberCluster) (status *Status) { //nolint:revive return nil } // PreScore implements the PreScore interface for the dummy plugin. -func (p *DummyAllPurposePlugin) PreScore(ctx context.Context, state CycleStatePluginReadWriter, policy *fleetv1beta1.ClusterPolicySnapshot) (status *Status) { //nolint:revive +func (p *DummyAllPurposePlugin) PreScore(ctx context.Context, state CycleStatePluginReadWriter, policy *fleetv1beta1.SchedulingPolicySnapshot) (status *Status) { //nolint:revive return nil } // Score implements the Score interface for the dummy plugin. -func (p *DummyAllPurposePlugin) Score(ctx context.Context, state CycleStatePluginReadWriter, policy *fleetv1beta1.ClusterPolicySnapshot, cluster *fleetv1beta1.MemberCluster) (score *ClusterScore, status *Status) { //nolint:revive +func (p *DummyAllPurposePlugin) Score(ctx context.Context, state CycleStatePluginReadWriter, policy *fleetv1beta1.SchedulingPolicySnapshot, cluster *fleetv1beta1.MemberCluster) (score *ClusterScore, status *Status) { //nolint:revive return &ClusterScore{}, nil } diff --git a/pkg/scheduler/queue/queue.go b/pkg/scheduler/queue/queue.go index f06dfaad4..56874f8eb 100644 --- a/pkg/scheduler/queue/queue.go +++ b/pkg/scheduler/queue/queue.go @@ -11,18 +11,18 @@ import ( "k8s.io/client-go/util/workqueue" ) -// ClusterPolicySnapshotKey is the unique identifier (its name) for a ClusterPolicySnapshot stored in a scheduling queue. -type ClusterPolicySnapshotKey string +// SchedulingPolicySnapshotKey is the unique identifier (its name) for a SchedulingPolicySnapshot stored in a scheduling queue. 
+type SchedulingPolicySnapshotKey string -// ClusterPolicySnapshotKeySchedulingQueueWriter is an interface which allows sources, such as controllers, to add -// ClusterPolicySnapshots to the scheduling queue. -type ClusterPolicySnapshotKeySchedulingQueueWriter interface { - Add(cpsKey ClusterPolicySnapshotKey) +// SchedulingPolicySnapshotKeySchedulingQueueWriter is an interface which allows sources, such as controllers, to add +// SchedulingPolicySnapshots to the scheduling queue. +type SchedulingPolicySnapshotKeySchedulingQueueWriter interface { + Add(cpsKey SchedulingPolicySnapshotKey) } -// ClusterPolicySnapshotSchedulingQueue is an interface which queues ClusterPolicySnapshots for the scheduler to schedule. -type ClusterPolicySnapshotKeySchedulingQueue interface { - ClusterPolicySnapshotKeySchedulingQueueWriter +// SchedulingPolicySnapshotSchedulingQueue is an interface which queues SchedulingPolicySnapshots for the scheduler to schedule. +type SchedulingPolicySnapshotKeySchedulingQueue interface { + SchedulingPolicySnapshotKeySchedulingQueueWriter // Run starts the scheduling queue. Run() @@ -30,52 +30,52 @@ type ClusterPolicySnapshotKeySchedulingQueue interface { Close() // CloseWithDrain closes the scheduling queue after all items in the queue are processed. CloseWithDrain() - // NextClusterPolicySnapshotKey returns the next-in-line ClusterPolicySnapshot key for the scheduler to schedule. - NextClusterPolicySnapshotKey() (key ClusterPolicySnapshotKey, closed bool) - // Done marks a ClusterPolicySnapshot key as done. - Done(cpsKey ClusterPolicySnapshotKey) + // NextSchedulingPolicySnapshotKey returns the next-in-line SchedulingPolicySnapshot key for the scheduler to schedule. + NextSchedulingPolicySnapshotKey() (key SchedulingPolicySnapshotKey, closed bool) + // Done marks a SchedulingPolicySnapshot key as done. + Done(cpsKey SchedulingPolicySnapshotKey) } -// simpleClusterPolicySnapshotKeySchedulingQueue is a simple implementation of -// ClusterPolicySnapshotKeySchedulingQueue. +// simpleSchedulingPolicySnapshotKeySchedulingQueue is a simple implementation of +// SchedulingPolicySnapshotKeySchedulingQueue. // // At this moment, one single workqueue would suffice, as sources such as the cluster watcher, // the binding watcher, etc., can catch all changes that need the scheduler's attention. // In the future, when more features, e.g., inter-placement affinity/anti-affinity, are added, // more queues, such as a backoff queue, might become necessary. -type simpleClusterPolicySnapshotKeySchedulingQueue struct { +type simpleSchedulingPolicySnapshotKeySchedulingQueue struct { clusterPolicySanpshotWorkQueue workqueue.RateLimitingInterface } -// Verify that simpleClusterPolicySnapshotKeySchedulingQueue implements -// ClusterPolicySnapshotKeySchedulingQueue at compile time. -var _ ClusterPolicySnapshotKeySchedulingQueue = &simpleClusterPolicySnapshotKeySchedulingQueue{} +// Verify that simpleSchedulingPolicySnapshotKeySchedulingQueue implements +// SchedulingPolicySnapshotKeySchedulingQueue at compile time. +var _ SchedulingPolicySnapshotKeySchedulingQueue = &simpleSchedulingPolicySnapshotKeySchedulingQueue{} -// simpleClusterPolicySnapshotKeySchedulingQueueOptions are the options for the -// simpleClusterPolicySnapshotKeySchedulingQueue. -type simpleClusterPolicySnapshotKeySchedulingQueueOptions struct { +// simpleSchedulingPolicySnapshotKeySchedulingQueueOptions are the options for the +// simpleSchedulingPolicySnapshotKeySchedulingQueue. 
+type simpleSchedulingPolicySnapshotKeySchedulingQueueOptions struct { workqueueRateLimiter workqueue.RateLimiter workqueueName string } -// Option is the function that configures the simpleClusterPolicySnapshotKeySchedulingQueue. -type Option func(*simpleClusterPolicySnapshotKeySchedulingQueueOptions) +// Option is the function that configures the simpleSchedulingPolicySnapshotKeySchedulingQueue. +type Option func(*simpleSchedulingPolicySnapshotKeySchedulingQueueOptions) -var defaultSimpleClusterPolicySnapshotKeySchedulingQueueOptions = simpleClusterPolicySnapshotKeySchedulingQueueOptions{ +var defaultSimpleSchedulingPolicySnapshotKeySchedulingQueueOptions = simpleSchedulingPolicySnapshotKeySchedulingQueueOptions{ workqueueRateLimiter: workqueue.DefaultControllerRateLimiter(), - workqueueName: "clusterPolicySnapshotKeySchedulingQueue", + workqueueName: "schedulingPolicySnapshotKeySchedulingQueue", } // WithWorkqueueRateLimiter sets a rate limiter for the workqueue. func WithWorkqueueRateLimiter(rateLimiter workqueue.RateLimiter) Option { - return func(o *simpleClusterPolicySnapshotKeySchedulingQueueOptions) { + return func(o *simpleSchedulingPolicySnapshotKeySchedulingQueueOptions) { o.workqueueRateLimiter = rateLimiter } } // WithWorkqueueName sets a name for the workqueue. func WithWorkqueueName(name string) Option { - return func(o *simpleClusterPolicySnapshotKeySchedulingQueueOptions) { + return func(o *simpleSchedulingPolicySnapshotKeySchedulingQueueOptions) { o.workqueueName = name } } @@ -85,52 +85,52 @@ func WithWorkqueueName(name string) Option { // At this moment, Run is an no-op as there is only one queue present; in the future, // when more queues are added, Run would start goroutines that move items between queues as // appropriate. -func (sq *simpleClusterPolicySnapshotKeySchedulingQueue) Run() {} +func (sq *simpleSchedulingPolicySnapshotKeySchedulingQueue) Run() {} // Close shuts down the scheduling queue immediately. -func (sq *simpleClusterPolicySnapshotKeySchedulingQueue) Close() { +func (sq *simpleSchedulingPolicySnapshotKeySchedulingQueue) Close() { sq.clusterPolicySanpshotWorkQueue.ShutDown() } // CloseWithDrain shuts down the scheduling queue and returns until all items are processed. -func (sq *simpleClusterPolicySnapshotKeySchedulingQueue) CloseWithDrain() { +func (sq *simpleSchedulingPolicySnapshotKeySchedulingQueue) CloseWithDrain() { sq.clusterPolicySanpshotWorkQueue.ShutDownWithDrain() } -// NextClusterPolicySnapshotKey returns the next ClusterPolicySnapshot key in the work queue for +// NextSchedulingPolicySnapshotKey returns the next SchedulingPolicySnapshot key in the work queue for // the scheduler to process. // // Note that for now the queue simply wraps a work queue, and consider its state (whether it // is shut down or not) as its own closedness. In the future, when more queues are added, the // queue implementation must manage its own state. -func (sq *simpleClusterPolicySnapshotKeySchedulingQueue) NextClusterPolicySnapshotKey() (key ClusterPolicySnapshotKey, closed bool) { +func (sq *simpleSchedulingPolicySnapshotKeySchedulingQueue) NextSchedulingPolicySnapshotKey() (key SchedulingPolicySnapshotKey, closed bool) { // This will block on a condition variable if the queue is empty. cpsKey, shutdown := sq.clusterPolicySanpshotWorkQueue.Get() if shutdown { return "", true } - return cpsKey.(ClusterPolicySnapshotKey), false + return cpsKey.(SchedulingPolicySnapshotKey), false } -// Done marks a ClusterPolicySnapshot key as done. 
-func (sq *simpleClusterPolicySnapshotKeySchedulingQueue) Done(cpsKey ClusterPolicySnapshotKey) { +// Done marks a SchedulingPolicySnapshot key as done. +func (sq *simpleSchedulingPolicySnapshotKeySchedulingQueue) Done(cpsKey SchedulingPolicySnapshotKey) { sq.clusterPolicySanpshotWorkQueue.Done(cpsKey) } -// Add adds a ClusterPolicySnapshot key to the work queue. -func (sq *simpleClusterPolicySnapshotKeySchedulingQueue) Add(cpsKey ClusterPolicySnapshotKey) { +// Add adds a SchedulingPolicySnapshot key to the work queue. +func (sq *simpleSchedulingPolicySnapshotKeySchedulingQueue) Add(cpsKey SchedulingPolicySnapshotKey) { sq.clusterPolicySanpshotWorkQueue.Add(cpsKey) } -// NewSimpleClusterPolicySnapshotKeySchedulingQueue returns a -// simpleClusterPolicySnapshotKeySchedulingQueue. -func NewSimpleClusterPolicySnapshotKeySchedulingQueue(opts ...Option) ClusterPolicySnapshotKeySchedulingQueue { - options := defaultSimpleClusterPolicySnapshotKeySchedulingQueueOptions +// NewSimpleSchedulingPolicySnapshotKeySchedulingQueue returns a +// simpleSchedulingPolicySnapshotKeySchedulingQueue. +func NewSimpleSchedulingPolicySnapshotKeySchedulingQueue(opts ...Option) SchedulingPolicySnapshotKeySchedulingQueue { + options := defaultSimpleSchedulingPolicySnapshotKeySchedulingQueueOptions for _, opt := range opts { opt(&options) } - return &simpleClusterPolicySnapshotKeySchedulingQueue{ + return &simpleSchedulingPolicySnapshotKeySchedulingQueue{ clusterPolicySanpshotWorkQueue: workqueue.NewNamedRateLimitingQueue(options.workqueueRateLimiter, options.workqueueName), } } diff --git a/pkg/scheduler/queue/queue_test.go b/pkg/scheduler/queue/queue_test.go index e087b8fdc..ba854f938 100644 --- a/pkg/scheduler/queue/queue_test.go +++ b/pkg/scheduler/queue/queue_test.go @@ -11,20 +11,20 @@ import ( "github.com/google/go-cmp/cmp" ) -// TestSimpleClusterPolicySnapshotKeySchedulingQueueBasicOps tests the basic ops -// (Add, NextClusterPolicySnapshotKey, Done) of a simplePolicySnapshotKeySchedulingQueue. -func TestSimpleClusterPolicySnapshotKeySchedulingQueueBasicOps(t *testing.T) { - sq := NewSimpleClusterPolicySnapshotKeySchedulingQueue() +// TestSimpleSchedulingPolicySnapshotKeySchedulingQueueBasicOps tests the basic ops +// (Add, NextSchedulingPolicySnapshotKey, Done) of a simplePolicySnapshotKeySchedulingQueue. +func TestSimpleSchedulingPolicySnapshotKeySchedulingQueueBasicOps(t *testing.T) { + sq := NewSimpleSchedulingPolicySnapshotKeySchedulingQueue() sq.Run() - keysToAdd := []ClusterPolicySnapshotKey{"A", "B", "C", "D", "E"} + keysToAdd := []SchedulingPolicySnapshotKey{"A", "B", "C", "D", "E"} for _, key := range keysToAdd { sq.Add(key) } - keysRecved := []ClusterPolicySnapshotKey{} + keysRecved := []SchedulingPolicySnapshotKey{} for i := 0; i < len(keysToAdd); i++ { - key, closed := sq.NextClusterPolicySnapshotKey() + key, closed := sq.NextSchedulingPolicySnapshotKey() if closed { t.Fatalf("Queue closed unexpected") } diff --git a/pkg/utils/common.go b/pkg/utils/common.go index 705e715cc..81b039bea 100644 --- a/pkg/utils/common.go +++ b/pkg/utils/common.go @@ -61,14 +61,6 @@ const ( // MemberClusterFinalizer is used to make sure that we handle gc of all the member cluster resources on the hub cluster. MemberClusterFinalizer = "work.fleet.azure.com/membercluster-finalizer" - // DispatcherFinalizer is added by the dispatcher to make sure that a binding can only be deleted if the dispatcher - // has removed all selected resources from the bound cluster. 
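Putting the queue pieces together, the sketch below shows the intended life cycle: construct with options, Run, producers Add snapshot keys, a consumer drains them with NextSchedulingPolicySnapshotKey and marks them Done, and a close call shuts the loop down. It uses only the constructor, options, and methods introduced above; the queue name, key value, and goroutine layout are illustrative.

package queue

// runExampleConsumer is a sketch of the queue's intended life cycle; it is not part
// of the patch and the key value is a placeholder.
func runExampleConsumer() {
	sq := NewSimpleSchedulingPolicySnapshotKeySchedulingQueue(WithWorkqueueName("example-queue"))
	sq.Run()

	// Producer side: typically an event handler enqueues the snapshot's name.
	sq.Add(SchedulingPolicySnapshotKey("example-crp-1"))

	done := make(chan struct{})
	go func() {
		defer close(done)
		for {
			// Blocks until a key is available or the queue is shut down.
			key, closed := sq.NextSchedulingPolicySnapshotKey()
			if closed {
				return
			}
			// A real consumer would run a scheduling cycle for the snapshot named by key.
			_ = key
			// Done must be called so the underlying workqueue can hand the key out again
			// if it is re-added later.
			sq.Done(key)
		}
	}()

	// CloseWithDrain waits for in-flight items; Close would shut down immediately.
	sq.CloseWithDrain()
	<-done
}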
- DispatcherFinalizer = "fleet.io/dispatcher-cleanup" - - // SchedulerFinalizer is added by the scheduler to make sure that a binding can only be deleted if the scheduler - // has relieved it from scheduling consideration. - SchedulerFinalizer = "fleet.io/scheduler-cleanup" - // LastWorkUpdateTimeAnnotationKey is used to mark the last update time on a work object. LastWorkUpdateTimeAnnotationKey = "work.fleet.azure.com/last-update-time" ) @@ -278,7 +270,7 @@ func ShouldPropagateNamespace(namespace string, skippedNamespaces map[string]boo // ExtractNumOfClustersFromPolicySnapshot extracts the numOfClusters value from the annotations // on a policy snapshot. -func ExtractNumOfClustersFromPolicySnapshot(policy *fleetv1beta1.ClusterPolicySnapshot) (int, error) { +func ExtractNumOfClustersFromPolicySnapshot(policy *fleetv1beta1.SchedulingPolicySnapshot) (int, error) { numOfClustersStr, ok := policy.Annotations[fleetv1beta1.NumberOfClustersAnnotation] if !ok { return 0, fmt.Errorf("cannot find annotation %s", fleetv1beta1.NumberOfClustersAnnotation) diff --git a/pkg/utils/common_test.go b/pkg/utils/common_test.go index 3c5389564..cbdca11a1 100644 --- a/pkg/utils/common_test.go +++ b/pkg/utils/common_test.go @@ -19,13 +19,13 @@ func TestExtractNumOfClustersFromPolicySnapshot(t *testing.T) { testCases := []struct { name string - policy *fleetv1beta1.ClusterPolicySnapshot + policy *fleetv1beta1.SchedulingPolicySnapshot wantNumOfClusters int expectedToFail bool }{ { name: "valid annotation", - policy: &fleetv1beta1.ClusterPolicySnapshot{ + policy: &fleetv1beta1.SchedulingPolicySnapshot{ ObjectMeta: metav1.ObjectMeta{ Name: policyName, Annotations: map[string]string{ @@ -37,7 +37,7 @@ func TestExtractNumOfClustersFromPolicySnapshot(t *testing.T) { }, { name: "no annotation", - policy: &fleetv1beta1.ClusterPolicySnapshot{ + policy: &fleetv1beta1.SchedulingPolicySnapshot{ ObjectMeta: metav1.ObjectMeta{ Name: policyName, }, @@ -46,7 +46,7 @@ func TestExtractNumOfClustersFromPolicySnapshot(t *testing.T) { }, { name: "invalid annotation: not an integer", - policy: &fleetv1beta1.ClusterPolicySnapshot{ + policy: &fleetv1beta1.SchedulingPolicySnapshot{ ObjectMeta: metav1.ObjectMeta{ Name: policyName, Annotations: map[string]string{ @@ -58,7 +58,7 @@ func TestExtractNumOfClustersFromPolicySnapshot(t *testing.T) { }, { name: "invalid annotation: negative integer", - policy: &fleetv1beta1.ClusterPolicySnapshot{ + policy: &fleetv1beta1.SchedulingPolicySnapshot{ ObjectMeta: metav1.ObjectMeta{ Name: policyName, Annotations: map[string]string{ From 90d2ec1ff7668f60f41b7d3de3a5801dff228e98 Mon Sep 17 00:00:00 2001 From: michaelawyu Date: Wed, 21 Jun 2023 17:47:52 +0800 Subject: [PATCH 4/4] Minor fixes --- pkg/scheduler/framework/framework.go | 13 +--- pkg/scheduler/framework/framework_test.go | 93 ++--------------------- pkg/scheduler/framework/frameworkutils.go | 16 ---- 3 files changed, 9 insertions(+), 113 deletions(-) diff --git a/pkg/scheduler/framework/framework.go b/pkg/scheduler/framework/framework.go index 6c52e0621..a1a6336fc 100644 --- a/pkg/scheduler/framework/framework.go +++ b/pkg/scheduler/framework/framework.go @@ -198,7 +198,7 @@ func (f *framework) RunSchedulingCycleFor(ctx context.Context, crpName string, p // overloading). In the long run we might still want to resort to a cached situtation. // // TO-DO (chenyu1): explore the possbilities of using a mutation cache for better performance. 
- bindings, err := f.collectBindings(ctx, policy) + bindings, err := f.collectBindings(ctx, crpName) if err != nil { klog.ErrorS(err, errorMessage, schedulingPolicySnapshotRef) return ctrl.Result{}, err @@ -242,18 +242,11 @@ func (f *framework) collectClusters(ctx context.Context) ([]fleetv1beta1.MemberC } // collectBindings lists all bindings associated with a CRP **using the uncached client**. -func (f *framework) collectBindings(ctx context.Context, policy *fleetv1beta1.SchedulingPolicySnapshot) ([]fleetv1beta1.ClusterResourceBinding, error) { +func (f *framework) collectBindings(ctx context.Context, crpName string) ([]fleetv1beta1.ClusterResourceBinding, error) { errorFormat := "failed to collect bindings: %w" - bindingOwner, err := extractOwnerCRPNameFromPolicySnapshot(policy) - if err != nil { - // This branch should never run in most cases, as the a policy snapshot is expected to be - // owned by a CRP. - return nil, controller.NewUnexpectedBehaviorError(fmt.Errorf(errorFormat, err)) - } - bindingList := &fleetv1beta1.ClusterResourceBindingList{} - labelSelector := labels.SelectorFromSet(labels.Set{fleetv1beta1.CRPTrackingLabel: bindingOwner}) + labelSelector := labels.SelectorFromSet(labels.Set{fleetv1beta1.CRPTrackingLabel: crpName}) // List bindings directly from the API server. if err := f.uncachedReader.List(ctx, bindingList, &client.ListOptions{LabelSelector: labelSelector}); err != nil { return nil, controller.NewAPIServerError(fmt.Errorf(errorFormat, err)) diff --git a/pkg/scheduler/framework/framework_test.go b/pkg/scheduler/framework/framework_test.go index ea08add9e..ea27d668c 100644 --- a/pkg/scheduler/framework/framework_test.go +++ b/pkg/scheduler/framework/framework_test.go @@ -19,12 +19,10 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1" - "go.goms.io/fleet/pkg/utils" ) const ( CRPName = "test-placement" - policyName = "test-policy" bindingName = "test-binding" ) @@ -69,56 +67,6 @@ func TestCollectClusters(t *testing.T) { } } -// TestExtractOwnerCRPNameFromPolicySnapshot tests the extractOwnerCRPNameFromPolicySnapshot method. -func TestExtractOwnerCRPNameFromPolicySnapshot(t *testing.T) { - testCases := []struct { - name string - policy *fleetv1beta1.SchedulingPolicySnapshot - expectToFail bool - }{ - { - name: "policy with CRP owner reference", - policy: &fleetv1beta1.SchedulingPolicySnapshot{ - ObjectMeta: metav1.ObjectMeta{ - Name: policyName, - OwnerReferences: []metav1.OwnerReference{ - { - Kind: utils.CRPV1Beta1GVK.Kind, - Name: CRPName, - }, - }, - }, - }, - }, - { - name: "policy without CRP owner reference", - policy: &fleetv1beta1.SchedulingPolicySnapshot{ - ObjectMeta: metav1.ObjectMeta{ - Name: policyName, - OwnerReferences: []metav1.OwnerReference{}, - }, - }, - expectToFail: true, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - owner, err := extractOwnerCRPNameFromPolicySnapshot(tc.policy) - if tc.expectToFail { - if err == nil { - t.Fatalf("extractOwnerCRPNameFromPolicySnapshot() = %v, %v, want cannot find owner ref error", owner, err) - } - return - } - - if err != nil || owner != CRPName { - t.Fatalf("extractOwnerCRPNameFromPolicySnapshot() = %v, %v, want %v, no error", owner, err, CRPName) - } - }) - } -} - // TestCollectBindings tests the collectBindings method. 
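With extractOwnerCRPNameFromPolicySnapshot gone, the caller of RunSchedulingCycleFor is now responsible for supplying the CRP name. The patch does not show that caller; the sketch below is one plausible shape for it, an assumption that reads the same CRPTrackingLabel the snapshots and bindings already carry (the function name scheduleOnce and the import paths inferred from the repository layout are likewise assumptions).

package example

import (
	"context"
	"fmt"

	ctrl "sigs.k8s.io/controller-runtime"

	fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
	"go.goms.io/fleet/pkg/scheduler/framework"
	"go.goms.io/fleet/pkg/utils/controller"
)

// scheduleOnce is a hypothetical call site: it derives the owner CRP's name from the
// snapshot's CRP tracking label and hands it to the framework together with the snapshots.
func scheduleOnce(ctx context.Context, f framework.Framework, policy *fleetv1beta1.SchedulingPolicySnapshot, resources *fleetv1beta1.ClusterResourceSnapshot) (ctrl.Result, error) {
	crpName, ok := policy.Labels[fleetv1beta1.CRPTrackingLabel]
	if !ok || crpName == "" {
		// Mirrors the patch's convention of wrapping states that should never occur.
		return ctrl.Result{}, controller.NewUnexpectedBehaviorError(fmt.Errorf("policy snapshot %s carries no CRP tracking label", policy.Name))
	}
	return f.RunSchedulingCycleFor(ctx, crpName, policy, resources)
}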
func TestCollectBindings(t *testing.T) { binding := &fleetv1beta1.ClusterResourceBinding{ @@ -134,48 +82,19 @@ func TestCollectBindings(t *testing.T) { testCases := []struct { name string binding *fleetv1beta1.ClusterResourceBinding - policy *fleetv1beta1.SchedulingPolicySnapshot + crpName string expectToFail bool expectToFindNoBindings bool }{ { name: "found matching bindings", binding: binding, - policy: &fleetv1beta1.SchedulingPolicySnapshot{ - ObjectMeta: metav1.ObjectMeta{ - Name: policyName, - OwnerReferences: []metav1.OwnerReference{ - { - Kind: utils.CRPV1Beta1GVK.Kind, - Name: CRPName, - }, - }, - }, - }, + crpName: CRPName, }, { - name: "no owner reference in policy", - policy: &fleetv1beta1.SchedulingPolicySnapshot{ - ObjectMeta: metav1.ObjectMeta{ - Name: policyName, - }, - }, - expectToFail: true, - }, - { - name: "no matching bindings", - binding: binding, - policy: &fleetv1beta1.SchedulingPolicySnapshot{ - ObjectMeta: metav1.ObjectMeta{ - Name: policyName, - OwnerReferences: []metav1.OwnerReference{ - { - Kind: utils.CRPV1Beta1GVK.Kind, - Name: altCRPName, - }, - }, - }, - }, + name: "no matching bindings", + binding: binding, + crpName: altCRPName, expectToFindNoBindings: true, }, } @@ -193,7 +112,7 @@ func TestCollectBindings(t *testing.T) { } ctx := context.Background() - bindings, err := f.collectBindings(ctx, tc.policy) + bindings, err := f.collectBindings(ctx, tc.crpName) if tc.expectToFail { if err == nil { t.Fatalf("collectBindings() = %v, %v, want failed to collect bindings error", bindings, err) diff --git a/pkg/scheduler/framework/frameworkutils.go b/pkg/scheduler/framework/frameworkutils.go index 43f8ded91..39c32a3fa 100644 --- a/pkg/scheduler/framework/frameworkutils.go +++ b/pkg/scheduler/framework/frameworkutils.go @@ -9,25 +9,9 @@ import ( "fmt" fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1" - "go.goms.io/fleet/pkg/utils" "go.goms.io/fleet/pkg/utils/controller" ) -// extractOwnerCRPNameFromPolicySnapshot extracts the name of the owner CRP from the policy snapshot. -func extractOwnerCRPNameFromPolicySnapshot(policy *fleetv1beta1.SchedulingPolicySnapshot) (string, error) { - var owner string - for _, ownerRef := range policy.OwnerReferences { - if ownerRef.Kind == utils.CRPV1Beta1GVK.Kind { - owner = ownerRef.Name - break - } - } - if len(owner) == 0 { - return "", fmt.Errorf("cannot find owner reference for policy snapshot %v", policy.Name) - } - return owner, nil -} - // classifyBindings categorizes bindings into the following groups: // - active: active bindings, that is, bindings that should be (though may not necessarily have been) picked up by the dispatcher; // - creating: bindings that are being created, that is, bindings that should not be picked up by the dispatcher yet,