diff --git a/apis/placement/v1beta1/policysnapshot_types.go b/apis/placement/v1beta1/policysnapshot_types.go index 07eb2e597..6046555cf 100644 --- a/apis/placement/v1beta1/policysnapshot_types.go +++ b/apis/placement/v1beta1/policysnapshot_types.go @@ -16,6 +16,11 @@ const ( // PolicySnapshotNameFmt is clusterPolicySnapshot name format: {CRPName}-{PolicySnapshotIndex}. PolicySnapshotNameFmt = "%s-%d" + + // NumOfClustersAnnotation is an annotation that indicates the desired number of clusters where + // the selected resources should be placed. It is annotated on policy snapshots and is sync'd + // from the CRP to the currently active policy snapshot. + NumOfClustersAnnotation = fleetPrefix + "numOfClusters" ) // +genclient diff --git a/apis/placement/v1beta1/zz_generated.deepcopy.go b/apis/placement/v1beta1/zz_generated.deepcopy.go index eb12e1242..d78c05e29 100644 --- a/apis/placement/v1beta1/zz_generated.deepcopy.go +++ b/apis/placement/v1beta1/zz_generated.deepcopy.go @@ -12,7 +12,7 @@ package v1beta1 import ( corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" ) diff --git a/pkg/scheduler/framework/framework.go b/pkg/scheduler/framework/framework.go new file mode 100644 index 000000000..0e22a8b9a --- /dev/null +++ b/pkg/scheduler/framework/framework.go @@ -0,0 +1,276 @@ +/* +Copyright (c) Microsoft Corporation. +Licensed under the MIT license. +*/ + +// Package framework features the scheduler framework, which the scheduler runs to schedule +// a placement to most appropriate clusters. +package framework + +import ( + "context" + "fmt" + "time" + + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + + fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1" + "go.goms.io/fleet/pkg/scheduler/framework/parallelizer" + "go.goms.io/fleet/pkg/utils" + "go.goms.io/fleet/pkg/utils/controller" +) + +const ( + // eventRecorderNameTemplate is the template used to format event recorder name for a scheduler framework. + eventRecorderNameTemplate = "scheduler-framework-%s" +) + +// Handle is an interface which allows plugins to access some shared structs (e.g., client, manager) +// and set themselves up with the scheduler framework (e.g., sign up for an informer). +type Handle interface { + // Client returns a cached client. + Client() client.Client + // Manager returns a controller manager; this is mostly used for setting up a new informer + // (indirectly) via a reconciler. + Manager() ctrl.Manager + // UncachedReader returns an uncached read-only client, which allows direct (uncached) access to the API server. + UncachedReader() client.Reader + // EventRecorder returns an event recorder. + EventRecorder() record.EventRecorder +} + +// Framework is an interface which scheduler framework should implement. +type Framework interface { + Handle + + // RunSchedulerCycleFor performs scheduling for a policy snapshot. + RunSchedulingCycleFor(ctx context.Context, policy *fleetv1beta1.ClusterPolicySnapshot, resources *fleetv1beta1.ClusterResourceSnapshot) (result ctrl.Result, err error) +} + +// framework implements the Framework interface. +type framework struct { + // profile is the scheduling profile in use by the scheduler framework; it includes + // the plugins to run at each extension point. 
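+	// Extension points currently include, for example, PostBatch and Score (see interface.go and profile.go).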
+	profile *Profile
+
+	// client is the (cached) client in use by the scheduler framework for accessing Kubernetes API server.
+	client client.Client
+	// uncachedReader is the uncached read-only client in use by the scheduler framework for accessing
+	// Kubernetes API server; in most cases client should be used instead, unless consistency becomes
+	// a serious concern.
+	// TO-DO (chenyu1): explore the possibilities of using a mutation cache for better performance.
+	uncachedReader client.Reader
+	// manager is the controller manager in use by the scheduler framework.
+	manager ctrl.Manager
+	// eventRecorder is the event recorder in use by the scheduler framework.
+	eventRecorder record.EventRecorder
+
+	// parallelizer is a utility which helps run tasks in parallel.
+	parallelizer *parallelizer.Parallerlizer
+}
+
+var (
+	// Verify that framework implements Framework (and consequently, Handle).
+	_ Framework = &framework{}
+)
+
+// frameworkOptions is the options for a scheduler framework.
+type frameworkOptions struct {
+	// numOfWorkers is the number of workers the scheduler framework will use to parallelize tasks,
+	// e.g., calling plugins.
+	numOfWorkers int
+}
+
+// Option is the function for configuring a scheduler framework.
+type Option func(*frameworkOptions)
+
+// defaultFrameworkOptions is the default options for a scheduler framework.
+var defaultFrameworkOptions = frameworkOptions{numOfWorkers: parallelizer.DefaultNumOfWorkers}
+
+// WithNumOfWorkers sets the number of workers to use for a scheduler framework.
+func WithNumOfWorkers(numOfWorkers int) Option {
+	return func(fo *frameworkOptions) {
+		fo.numOfWorkers = numOfWorkers
+	}
+}
+
+// NewFramework returns a new scheduler framework.
+func NewFramework(profile *Profile, manager ctrl.Manager, opts ...Option) Framework {
+	options := defaultFrameworkOptions
+	for _, opt := range opts {
+		opt(&options)
+	}
+
+	// In principle, the scheduler needs to set up informers for resources it is interested in,
+	// primarily clusters, snapshots, and bindings. In our current architecture, however,
+	// some (if not all) of the informers may have already been set up by other controllers
+	// sharing the same controller manager, e.g. the cluster watcher. Therefore, no additional
+	// informers are explicitly set up here.
+	//
+	// Note that setting up an informer is achieved by setting up a no-op (at this moment)
+	// reconciler, as it does not seem possible to directly manipulate the informers (cache) in
+	// use by a controller runtime manager via its public API. In the long run, the reconcilers might
+	// be useful for setting up some common states for the scheduler, e.g., a resource model.
+	//
+	// Also note that an indexer might need to be set up for improved performance.
+
+	return &framework{
+		profile:        profile,
+		client:         manager.GetClient(),
+		uncachedReader: manager.GetAPIReader(),
+		manager:        manager,
+		eventRecorder:  manager.GetEventRecorderFor(fmt.Sprintf(eventRecorderNameTemplate, profile.Name())),
+		parallelizer:   parallelizer.NewParallelizer(options.numOfWorkers),
+	}
+}
+
+// Client returns the (cached) client in use by the scheduler framework.
+func (f *framework) Client() client.Client {
+	return f.client
+}
+
+// Manager returns the controller manager in use by the scheduler framework.
+func (f *framework) Manager() ctrl.Manager {
+	return f.manager
+}
+
+// UncachedReader returns the (uncached) read-only client in use by the scheduler framework.
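+// Most reads should go through Client(); the uncached reader is reserved for cases where
+// read-after-write consistency is required, such as listing bindings during a scheduling cycle.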
+func (f *framework) UncachedReader() client.Reader {
+	return f.uncachedReader
+}
+
+// EventRecorder returns the event recorder in use by the scheduler framework.
+func (f *framework) EventRecorder() record.EventRecorder {
+	return f.eventRecorder
+}
+
+// RunSchedulingCycleFor performs scheduling for a policy snapshot.
+func (f *framework) RunSchedulingCycleFor(ctx context.Context, policy *fleetv1beta1.ClusterPolicySnapshot, resources *fleetv1beta1.ClusterResourceSnapshot) (result ctrl.Result, err error) { //nolint:revive
+	startTime := time.Now()
+	clusterPolicySnapshotRef := klog.KObj(policy)
+	klog.V(2).InfoS("Scheduling cycle starts", "clusterPolicySnapshot", clusterPolicySnapshotRef)
+	defer func() {
+		latency := time.Since(startTime).Milliseconds()
+		klog.V(2).InfoS("Scheduling cycle ends", "clusterPolicySnapshot", clusterPolicySnapshotRef, "latency", latency)
+	}()
+
+	errorMessage := "failed to run scheduling cycle"
+
+	klog.V(2).InfoS("Retrieving clusters and bindings", "clusterPolicySnapshot", clusterPolicySnapshotRef)
+
+	// Retrieve the desired number of clusters from the policy.
+	//
+	// TO-DO (chenyu1): assign variable(s) when more logic is added.
+	_, err = extractNumOfClustersFromPolicySnapshot(policy)
+	if err != nil {
+		klog.ErrorS(err, errorMessage, "clusterPolicySnapshot", clusterPolicySnapshotRef)
+		return ctrl.Result{}, err
+	}
+
+	// Collect all clusters.
+	//
+	// Note that clusters here are listed from the cached client for improved performance. This is
+	// safe consistency-wise, as it is guaranteed that the scheduler will eventually receive all
+	// events for cluster changes.
+	//
+	// TO-DO (chenyu1): assign variable(s) when more logic is added.
+	_, err = f.collectClusters(ctx)
+	if err != nil {
+		klog.ErrorS(err, errorMessage, "clusterPolicySnapshot", clusterPolicySnapshotRef)
+		return ctrl.Result{}, err
+	}
+
+	// Collect all bindings.
+	//
+	// Note that for consistency reasons, bindings are listed directly from the API server; this helps
+	// avoid a classic read-after-write consistency issue, which, though it should only happen when there
+	// are connectivity issues and/or the API server is overloaded, can lead to over-scheduling in adverse
+	// scenarios. It is true that even when bindings are over-scheduled, the scheduler can still correct
+	// the situation in the next cycle; however, considering that placing resources on clusters, unlike
+	// pods on nodes, is more expensive, it is better to avoid over-scheduling in the first place.
+	//
+	// This, of course, has additional performance overhead (and may further exacerbate API server
+	// overloading). In the long run we might still want to resort to cached reads.
+	//
+	// TO-DO (chenyu1): explore the possibilities of using a mutation cache for better performance.
+	bindings, err := f.collectBindings(ctx, policy)
+	if err != nil {
+		klog.ErrorS(err, errorMessage, "clusterPolicySnapshot", clusterPolicySnapshotRef)
+		return ctrl.Result{}, err
+	}
+
+	// Parse the bindings to find out:
+	// * active bindings, i.e., bindings that are not marked for deletion;
+	// * bindings that are already marked for deletion, but still have the dispatcher finalizer
+	//   present; and
+	// * bindings that are already marked for deletion, and no longer have the dispatcher finalizer.
+	//
+	// Note that the scheduler only considers a binding to be deleted if it is marked for deletion
+	// and it no longer has the dispatcher finalizer.
+	// This helps avoid a rare race condition where the scheduler binds a cluster to a resource
+	// placement, even though the dispatcher has not started, or is still in the process of,
+	// removing the same set of resources from the cluster, triggered by a recently deleted binding.
+	//
+	// TO-DO (chenyu1): assign variable(s) when more logic is added.
+	klog.V(2).InfoS("Classifying bindings", "clusterPolicySnapshot", clusterPolicySnapshotRef)
+	_, _, deletedWithoutDispatcherFinalizer := classifyBindings(bindings)
+
+	// If a binding has been marked for deletion and no longer has the dispatcher finalizer, the scheduler
+	// removes its own finalizer from it, to clear it for eventual deletion.
+	if err := f.removeSchedulerFinalizerFromBindings(ctx, deletedWithoutDispatcherFinalizer); err != nil {
+		klog.ErrorS(err, errorMessage, "clusterPolicySnapshot", clusterPolicySnapshotRef)
+		return ctrl.Result{}, err
+	}
+
+	// Not yet fully implemented.
+	return ctrl.Result{}, nil
+}
+
+// collectClusters lists all clusters in the cache.
+func (f *framework) collectClusters(ctx context.Context) ([]fleetv1beta1.MemberCluster, error) {
+	errorFormat := "failed to collect clusters: %w"
+
+	clusterList := &fleetv1beta1.MemberClusterList{}
+	if err := f.client.List(ctx, clusterList, &client.ListOptions{}); err != nil {
+		return nil, controller.NewAPIServerError(fmt.Errorf(errorFormat, err))
+	}
+	return clusterList.Items, nil
+}
+
+// collectBindings lists all bindings associated with a CRP **using the uncached client**.
+func (f *framework) collectBindings(ctx context.Context, policy *fleetv1beta1.ClusterPolicySnapshot) ([]fleetv1beta1.ClusterResourceBinding, error) {
+	errorFormat := "failed to collect bindings: %w"
+
+	bindingOwner, err := extractOwnerCRPNameFromPolicySnapshot(policy)
+	if err != nil {
+		// This branch should never run in most cases, as a policy snapshot is expected to be
+		// owned by a CRP.
+		return nil, controller.NewUnexpectedBehaviorError(fmt.Errorf(errorFormat, err))
+	}
+
+	bindingList := &fleetv1beta1.ClusterResourceBindingList{}
+	labelSelector := labels.SelectorFromSet(labels.Set{fleetv1beta1.CRPTrackingLabel: bindingOwner})
+	// List bindings directly from the API server.
+	if err := f.uncachedReader.List(ctx, bindingList, &client.ListOptions{LabelSelector: labelSelector}); err != nil {
+		return nil, controller.NewAPIServerError(fmt.Errorf(errorFormat, err))
+	}
+	return bindingList.Items, nil
+}
+
+// removeSchedulerFinalizerFromBindings removes the scheduler finalizer from a list of bindings.
+func (f *framework) removeSchedulerFinalizerFromBindings(ctx context.Context, bindings []*fleetv1beta1.ClusterResourceBinding) error {
+	errorFormat := "failed to remove scheduler finalizer from binding %s: %w"
+
+	for _, binding := range bindings {
+		controllerutil.RemoveFinalizer(binding, utils.SchedulerFinalizer)
+		if err := f.client.Update(ctx, binding, &client.UpdateOptions{}); err != nil {
+			return controller.NewAPIServerError(fmt.Errorf(errorFormat, binding.Name, err))
+		}
+	}
+	return nil
+}
diff --git a/pkg/scheduler/framework/framework_test.go b/pkg/scheduler/framework/framework_test.go
new file mode 100644
index 000000000..a1d8e61ea
--- /dev/null
+++ b/pkg/scheduler/framework/framework_test.go
@@ -0,0 +1,358 @@
+/*
+Copyright (c) Microsoft Corporation.
+Licensed under the MIT license.
+*/ + +package framework + +import ( + "context" + "log" + "os" + "testing" + + "github.com/google/go-cmp/cmp" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes/scheme" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + + fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1" + "go.goms.io/fleet/pkg/utils" +) + +const ( + CRPName = "test-placement" + policyName = "test-policy" + bindingName = "test-binding" +) + +// TO-DO (chenyu1): expand the test cases as development stablizes. + +// TestMain sets up the test environment. +func TestMain(m *testing.M) { + // Add custom APIs to the runtime scheme. + if err := fleetv1beta1.AddToScheme(scheme.Scheme); err != nil { + log.Fatalf("failed to add custom APIs to the runtime scheme: %v", err) + } + + os.Exit(m.Run()) +} + +// TestExtractNumOfClustersFromPolicySnapshot tests the extractNumOfClustersFromPolicySnapshot function. +func TestExtractNumOfClustersFromPolicySnapshot(t *testing.T) { + testCases := []struct { + name string + policy *fleetv1beta1.ClusterPolicySnapshot + wantNumOfClusters int + expectedToFail bool + }{ + { + name: "valid annotation", + policy: &fleetv1beta1.ClusterPolicySnapshot{ + ObjectMeta: metav1.ObjectMeta{ + Name: policyName, + Annotations: map[string]string{ + fleetv1beta1.NumOfClustersAnnotation: "1", + }, + }, + }, + wantNumOfClusters: 1, + }, + { + name: "no annotation", + policy: &fleetv1beta1.ClusterPolicySnapshot{ + ObjectMeta: metav1.ObjectMeta{ + Name: policyName, + }, + }, + expectedToFail: true, + }, + { + name: "invalid annotation: not an integer", + policy: &fleetv1beta1.ClusterPolicySnapshot{ + ObjectMeta: metav1.ObjectMeta{ + Name: policyName, + Annotations: map[string]string{ + fleetv1beta1.NumOfClustersAnnotation: "abc", + }, + }, + }, + expectedToFail: true, + }, + { + name: "invalid annotation: negative integer", + policy: &fleetv1beta1.ClusterPolicySnapshot{ + ObjectMeta: metav1.ObjectMeta{ + Name: policyName, + Annotations: map[string]string{ + fleetv1beta1.NumOfClustersAnnotation: "-1", + }, + }, + }, + expectedToFail: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + numOfClusters, err := extractNumOfClustersFromPolicySnapshot(tc.policy) + if tc.expectedToFail { + if err == nil { + t.Fatalf("extractNumOfClustersFromPolicySnapshot() = %v, %v, want error", numOfClusters, err) + } + return + } + + if numOfClusters != tc.wantNumOfClusters { + t.Fatalf("extractNumOfClustersFromPolicySnapshot() = %v, %v, want %v, nil", numOfClusters, err, tc.wantNumOfClusters) + } + }) + } +} + +// TestCollectClusters tests the collectClusters method. +func TestCollectClusters(t *testing.T) { + cluster := fleetv1beta1.MemberCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cluster-1", + }, + } + + fakeClient := fake.NewClientBuilder(). + WithScheme(scheme.Scheme). + WithObjects(&cluster). + Build() + // Construct framework manually instead of using NewFramework() to avoid mocking the controller manager. 
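+	// Only the client field is needed here, as collectClusters reads through the cached client.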
+ f := &framework{ + client: fakeClient, + } + + ctx := context.Background() + clusters, err := f.collectClusters(ctx) + if err != nil { + t.Fatalf("collectClusters() = %v, %v, want no error", clusters, err) + } + + wantClusters := []fleetv1beta1.MemberCluster{cluster} + if !cmp.Equal(clusters, wantClusters) { + t.Fatalf("collectClusters() = %v, %v, want %v, nil", clusters, err, wantClusters) + } +} + +// TestExtractOwnerCRPNameFromPolicySnapshot tests the extractOwnerCRPNameFromPolicySnapshot method. +func TestExtractOwnerCRPNameFromPolicySnapshot(t *testing.T) { + testCases := []struct { + name string + policy *fleetv1beta1.ClusterPolicySnapshot + expectToFail bool + }{ + { + name: "policy with CRP owner reference", + policy: &fleetv1beta1.ClusterPolicySnapshot{ + ObjectMeta: metav1.ObjectMeta{ + Name: policyName, + OwnerReferences: []metav1.OwnerReference{ + { + Kind: utils.CRPV1Beta1GVK.Kind, + Name: CRPName, + }, + }, + }, + }, + }, + { + name: "policy without CRP owner reference", + policy: &fleetv1beta1.ClusterPolicySnapshot{ + ObjectMeta: metav1.ObjectMeta{ + Name: policyName, + OwnerReferences: []metav1.OwnerReference{}, + }, + }, + expectToFail: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + owner, err := extractOwnerCRPNameFromPolicySnapshot(tc.policy) + if tc.expectToFail { + if err == nil { + t.Fatalf("extractOwnerCRPNameFromPolicySnapshot() = %v, %v, want cannot find owner ref error", owner, err) + } + return + } + + if err != nil || owner != CRPName { + t.Fatalf("extractOwnerCRPNameFromPolicySnapshot() = %v, %v, want %v, no error", owner, err, CRPName) + } + }) + } +} + +// TestCollectBindings tests the collectBindings method. +func TestCollectBindings(t *testing.T) { + binding := &fleetv1beta1.ClusterResourceBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: bindingName, + Labels: map[string]string{ + fleetv1beta1.CRPTrackingLabel: CRPName, + }, + }, + } + altCRPName := "another-test-placement" + + testCases := []struct { + name string + binding *fleetv1beta1.ClusterResourceBinding + policy *fleetv1beta1.ClusterPolicySnapshot + expectToFail bool + expectToFindNoBindings bool + }{ + { + name: "found matching bindings", + binding: binding, + policy: &fleetv1beta1.ClusterPolicySnapshot{ + ObjectMeta: metav1.ObjectMeta{ + Name: policyName, + OwnerReferences: []metav1.OwnerReference{ + { + Kind: utils.CRPV1Beta1GVK.Kind, + Name: CRPName, + }, + }, + }, + }, + }, + { + name: "no owner reference in policy", + policy: &fleetv1beta1.ClusterPolicySnapshot{ + ObjectMeta: metav1.ObjectMeta{ + Name: policyName, + }, + }, + expectToFail: true, + }, + { + name: "no matching bindings", + binding: binding, + policy: &fleetv1beta1.ClusterPolicySnapshot{ + ObjectMeta: metav1.ObjectMeta{ + Name: policyName, + OwnerReferences: []metav1.OwnerReference{ + { + Kind: utils.CRPV1Beta1GVK.Kind, + Name: altCRPName, + }, + }, + }, + }, + expectToFindNoBindings: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + fakeClientBuilder := fake.NewClientBuilder().WithScheme(scheme.Scheme) + if tc.binding != nil { + fakeClientBuilder.WithObjects(tc.binding) + } + fakeClient := fakeClientBuilder.Build() + // Construct framework manually instead of using NewFramework() to avoid mocking the controller manager. 
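+			// Only the uncachedReader field is needed here, as collectBindings lists bindings through the uncached reader.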
+ f := &framework{ + uncachedReader: fakeClient, + } + + ctx := context.Background() + bindings, err := f.collectBindings(ctx, tc.policy) + if tc.expectToFail { + if err == nil { + t.Fatalf("collectBindings() = %v, %v, want failed to collect bindings error", bindings, err) + } + return + } + + if err != nil { + t.Fatalf("collectBindings() = %v, %v, want %v, no error", bindings, err, binding) + } + wantBindings := []fleetv1beta1.ClusterResourceBinding{} + if !tc.expectToFindNoBindings { + wantBindings = append(wantBindings, *binding) + } + if !cmp.Equal(bindings, wantBindings) { + t.Fatalf("collectBindings() = %v, %v, want %v, no error", bindings, err, wantBindings) + } + }) + } +} + +// TestClassifyBindings tests the classifyBindings function. +func TestClassifyBindings(t *testing.T) { + timestamp := metav1.Now() + activeBinding := &fleetv1beta1.ClusterResourceBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "active-binding", + }, + } + deletedBindingWithDispatcherFinalizer := &fleetv1beta1.ClusterResourceBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "deleted-binding-with-dispatcher-finalizer", + DeletionTimestamp: ×tamp, + Finalizers: []string{ + utils.DispatcherFinalizer, + }, + }, + } + deletedBindingWithoutDispatcherFinalizer := &fleetv1beta1.ClusterResourceBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "deleted-binding-without-dispatcher-finalizer", + DeletionTimestamp: ×tamp, + }, + } + + active, deletedWith, deletedWithout := classifyBindings([]fleetv1beta1.ClusterResourceBinding{*activeBinding, *deletedBindingWithDispatcherFinalizer, *deletedBindingWithoutDispatcherFinalizer}) + if diff := cmp.Diff(active, []*fleetv1beta1.ClusterResourceBinding{activeBinding}); diff != "" { + t.Errorf("classifyBindings() active = %v, want %v", active, []*fleetv1beta1.ClusterResourceBinding{activeBinding}) + } + if !cmp.Equal(deletedWith, []*fleetv1beta1.ClusterResourceBinding{deletedBindingWithDispatcherFinalizer}) { + t.Errorf("classifyBindings() deletedWithDispatcherFinalizer = %v, want %v", deletedWith, []*fleetv1beta1.ClusterResourceBinding{deletedBindingWithDispatcherFinalizer}) + } + if !cmp.Equal(deletedWithout, []*fleetv1beta1.ClusterResourceBinding{deletedBindingWithoutDispatcherFinalizer}) { + t.Errorf("classifyBindings() deletedWithoutDispatcherFinalizer = %v, want %v", deletedWithout, []*fleetv1beta1.ClusterResourceBinding{deletedBindingWithoutDispatcherFinalizer}) + } +} + +// TestRemoveSchedulerFinalizerFromBindings tests the removeSchedulerFinalizerFromBindings method. +func TestRemoveSchedulerFinalizerFromBindings(t *testing.T) { + binding := &fleetv1beta1.ClusterResourceBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: bindingName, + Finalizers: []string{utils.SchedulerFinalizer}, + }, + } + + fakeClient := fake.NewClientBuilder(). + WithScheme(scheme.Scheme). + WithObjects(binding). + Build() + // Construct framework manually instead of using NewFramework() to avoid mocking the controller manager. + f := &framework{ + client: fakeClient, + } + + ctx := context.Background() + if err := f.removeSchedulerFinalizerFromBindings(ctx, []*fleetv1beta1.ClusterResourceBinding{binding}); err != nil { + t.Fatalf("removeSchedulerFinalizerFromBindings() = %v, want no error", err) + } + + // Verify that the finalizer has been removed. 
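+	// The fake client applies updates synchronously, so the removal is immediately visible to the Get call below.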
+ updatedBinding := &fleetv1beta1.ClusterResourceBinding{} + if err := fakeClient.Get(ctx, types.NamespacedName{Name: bindingName}, updatedBinding); err != nil { + t.Fatalf("Binding Get(%v) = %v, want no error", bindingName, err) + } + + if controllerutil.ContainsFinalizer(updatedBinding, utils.SchedulerFinalizer) { + t.Fatalf("Binding %s finalizers = %v, want no scheduler finalizer", bindingName, updatedBinding.Finalizers) + } +} diff --git a/pkg/scheduler/framework/interface.go b/pkg/scheduler/framework/interface.go index 882906a0c..12173f463 100644 --- a/pkg/scheduler/framework/interface.go +++ b/pkg/scheduler/framework/interface.go @@ -14,7 +14,10 @@ import ( // Plugin is the interface which all scheduler plugins should implement. type Plugin interface { Name() string - // TO-DO (chenyu1): add a method to help plugin set up the framework as needed. + + // SetUpWithFramework helps a plugin to set it up with a scheduler framework, such as + // spinning up an informer. + SetUpWithFramework(handle Handle) } // PostBatchPlugin is the interface which all plugins that would like to run at the PostBatch diff --git a/pkg/scheduler/framework/profile.go b/pkg/scheduler/framework/profile.go index 6283e32df..38ff63a94 100644 --- a/pkg/scheduler/framework/profile.go +++ b/pkg/scheduler/framework/profile.go @@ -61,6 +61,11 @@ func (profile *Profile) WithScorePlugin(plugin ScorePlugin) *Profile { return profile } +// Name returns the name of the profile. +func (profile *Profile) Name() string { + return profile.name +} + // NewProfile creates scheduling profile. func NewProfile(name string) *Profile { return &Profile{ diff --git a/pkg/scheduler/framework/profile_test.go b/pkg/scheduler/framework/profile_test.go index 2e39c69ce..3d82b681e 100644 --- a/pkg/scheduler/framework/profile_test.go +++ b/pkg/scheduler/framework/profile_test.go @@ -10,6 +10,7 @@ import ( "testing" "github.com/google/go-cmp/cmp" + fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1" ) @@ -51,6 +52,9 @@ func (p *DummyAllPurposePlugin) Score(ctx context.Context, state CycleStatePlugi return &ClusterScore{}, nil } +// SetUpWithFramework is a no-op to satisfy the Plugin interface. +func (p *DummyAllPurposePlugin) SetUpWithFramework(handle Handle) {} // nolint:revive + // TestProfile tests the basic ops of a Profile. func TestProfile(t *testing.T) { profile := NewProfile(dummyProfileName) diff --git a/pkg/scheduler/framework/utils.go b/pkg/scheduler/framework/utils.go new file mode 100644 index 000000000..ae26f56fb --- /dev/null +++ b/pkg/scheduler/framework/utils.go @@ -0,0 +1,74 @@ +/* +Copyright (c) Microsoft Corporation. +Licensed under the MIT license. +*/ + +package framework + +import ( + "fmt" + "strconv" + + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + + fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1" + "go.goms.io/fleet/pkg/utils" + "go.goms.io/fleet/pkg/utils/controller" +) + +// extractNumOfClustersFromPolicySnapshot extracts the numOfClusters from the policy snapshot. +func extractNumOfClustersFromPolicySnapshot(policy *fleetv1beta1.ClusterPolicySnapshot) (int, error) { + numOfClustersStr, ok := policy.Annotations[fleetv1beta1.NumOfClustersAnnotation] + if !ok { + return 0, controller.NewUnexpectedBehaviorError(fmt.Errorf("cannot find annotation %s", fleetv1beta1.NumOfClustersAnnotation)) + } + + // Cast the annotation to an integer; throw an error if the cast cannot be completed or the value is negative. 
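+	// strconv.Atoi returns an error for non-numeric input; the explicit check below additionally rejects negative counts.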
+ numOfClusters, err := strconv.Atoi(numOfClustersStr) + if err != nil || numOfClusters < 0 { + return 0, controller.NewUnexpectedBehaviorError(fmt.Errorf("invalid annotation %s: %s is not a valid count: %w", fleetv1beta1.NumOfClustersAnnotation, numOfClustersStr, err)) + } + + return numOfClusters, nil +} + +// extractOwnerCRPNameFromPolicySnapshot extracts the name of the owner CRP from the policy snapshot. +func extractOwnerCRPNameFromPolicySnapshot(policy *fleetv1beta1.ClusterPolicySnapshot) (string, error) { + var owner string + for _, ownerRef := range policy.OwnerReferences { + if ownerRef.Kind == utils.CRPV1Beta1GVK.Kind { + owner = ownerRef.Name + break + } + } + if len(owner) == 0 { + return "", fmt.Errorf("cannot find owner reference for policy snapshot %v", policy.Name) + } + return owner, nil +} + +// classifyBindings categorizes bindings into three groups: +// * active: active bindings, that is, bindings that are not marked for deletion; and +// * deletedWithDispatcherFinalizer: bindings that are marked for deletion, but still has the dispatcher finalizer present; and +// * deletedWithoutDispatcherFinalizer: bindings that are marked for deletion, and the dispatcher finalizer is already removed. +func classifyBindings(bindings []fleetv1beta1.ClusterResourceBinding) (active, deletedWithDispatcherFinalizer, deletedWithoutDispatcherFinalizer []*fleetv1beta1.ClusterResourceBinding) { + // Pre-allocate arrays. + active = make([]*fleetv1beta1.ClusterResourceBinding, 0, len(bindings)) + deletedWithDispatcherFinalizer = make([]*fleetv1beta1.ClusterResourceBinding, 0, len(bindings)) + deletedWithoutDispatcherFinalizer = make([]*fleetv1beta1.ClusterResourceBinding, 0, len(bindings)) + + for idx := range bindings { + binding := bindings[idx] + if binding.DeletionTimestamp != nil { + if controllerutil.ContainsFinalizer(&binding, utils.DispatcherFinalizer) { + deletedWithDispatcherFinalizer = append(deletedWithDispatcherFinalizer, &binding) + } else { + deletedWithoutDispatcherFinalizer = append(deletedWithoutDispatcherFinalizer, &binding) + } + } else { + active = append(active, &binding) + } + } + + return active, deletedWithDispatcherFinalizer, deletedWithoutDispatcherFinalizer +} diff --git a/pkg/utils/common.go b/pkg/utils/common.go index 2a8d915c5..95f09d33b 100644 --- a/pkg/utils/common.go +++ b/pkg/utils/common.go @@ -24,6 +24,7 @@ import ( "k8s.io/klog/v2" workv1alpha1 "sigs.k8s.io/work-api/pkg/apis/v1alpha1" + fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1" fleetv1alpha1 "go.goms.io/fleet/apis/v1alpha1" "go.goms.io/fleet/pkg/utils/informer" ) @@ -59,6 +60,14 @@ const ( // MemberClusterFinalizer is used to make sure that we handle gc of all the member cluster resources on the hub cluster. MemberClusterFinalizer = "work.fleet.azure.com/membercluster-finalizer" + // DispatcherFinalizer is added by the dispatcher to make sure that a binding can only be deleted if the dispatcher + // has removed all selected resources from the bound cluster. + DispatcherFinalizer = "fleet.io/dispatcher-cleanup" + + // SchedulerFinalizer is added by the scheduler to make sure that a binding can only be deleted if the scheduler + // has relieved it from scheduling consideration. + SchedulerFinalizer = "fleet.io/scheduler-cleanup" + // LastWorkUpdateTimeAnnotationKey is used to mark the last update time on a work object. 
LastWorkUpdateTimeAnnotationKey = "work.fleet.azure.com/last-update-time" ) @@ -141,6 +150,13 @@ var ( Version: corev1.SchemeGroupVersion.Version, Resource: "services", } + + // TO-DO (chenyu1): Add more common GVRs/GVKs here. + CRPV1Beta1GVK = schema.GroupVersionKind{ + Group: fleetv1beta1.GroupVersion.Group, + Version: fleetv1beta1.GroupVersion.Version, + Kind: "ClusterResourcePlacement", + } ) // RandSecureInt returns a uniform random value in [1, max] or panic.
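For reference, a minimal sketch of how a caller might wire the new framework into a shared controller manager and run one scheduling cycle. The package name, profile name, worker count, and helper function below are illustrative assumptions, not part of this change:

// Package scheduler is a hypothetical consumer of the framework package.
package scheduler

import (
	"context"

	ctrl "sigs.k8s.io/controller-runtime"

	fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
	"go.goms.io/fleet/pkg/scheduler/framework"
)

// runSchedulingCycle shows the intended call sequence; mgr is the controller manager already
// shared with the other fleet controllers.
func runSchedulingCycle(ctx context.Context, mgr ctrl.Manager, policy *fleetv1beta1.ClusterPolicySnapshot, resources *fleetv1beta1.ClusterResourceSnapshot) (ctrl.Result, error) {
	// Assemble a scheduling profile; plugins would be registered here, e.g. via WithScorePlugin.
	profile := framework.NewProfile("default")

	// Build the framework against the shared controller manager; the worker count is arbitrary.
	f := framework.NewFramework(profile, mgr, framework.WithNumOfWorkers(4))

	// The policy snapshot is expected to carry the NumOfClustersAnnotation (sync'd from the CRP);
	// otherwise the cycle fails fast in extractNumOfClustersFromPolicySnapshot.
	return f.RunSchedulingCycleFor(ctx, policy, resources)
}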