From 9730baea7033dd6cb063de411b75db02ddf3a8d1 Mon Sep 17 00:00:00 2001 From: michaelawyu Date: Thu, 8 Jun 2023 17:18:33 +0800 Subject: [PATCH 1/2] Added interfaces + scheduler profile --- pkg/scheduler/framework/interface.go | 90 ++++++++++++++++++++ pkg/scheduler/framework/profile.go | 70 +++++++++++++++ pkg/scheduler/framework/profile_test.go | 108 ++++++++++++++++++++++++ 3 files changed, 268 insertions(+) create mode 100644 pkg/scheduler/framework/interface.go create mode 100644 pkg/scheduler/framework/profile.go create mode 100644 pkg/scheduler/framework/profile_test.go diff --git a/pkg/scheduler/framework/interface.go b/pkg/scheduler/framework/interface.go new file mode 100644 index 000000000..fc6255661 --- /dev/null +++ b/pkg/scheduler/framework/interface.go @@ -0,0 +1,90 @@ +/* +Copyright (c) Microsoft Corporation. +Licensed under the MIT license. +*/ + +package framework + +import ( + "context" + + fleetv1 "go.goms.io/fleet/apis/v1" +) + +// Plugin is the interface which all scheduler plugins should implement. +type Plugin interface { + Name() string + // TO-DO (chenyu1): add a method to help plugin set up the framework as needed. +} + +// PostBatchPlugin is the interface which all plugins that would like to run at the PostBatch +// extension point should implement. +type PostBatchPlugin interface { + Plugin + + // PostBatch runs after the scheduler has determined the number of bindings to create; + // a plugin which registers at this extension point must return one of the follows: + // * A Success status with a new batch size; or + // * A Skip status, if no changes in batch size is needed; or + // * An InternalError status, if an expected error has occurred + PostBatch(ctx context.Context, state CycleStatePluginReadWriter, policy *fleetv1.ClusterPolicySnapshot) (size int, status *Status) +} + +// PreFilterPlugin is the interface which all plugins that would like to run at the PreFilter +// extension point should implement. +type PreFilterPlugin interface { + Plugin + + // PreFilter runs before the scheduler enters the Filter stage; a plugin may perform + // some setup at this extension point, such as caching the results that will be used in + // following Filter calls, and/or run some checks to determine if it should be skipped in + // the Filter stage. + // + // A plugin which registers at this extension point must return one of the follows: + // * A Success status, if the plugin should run at the Filter stage; or + // * A Skip status, if the plugin should be skipped at the Filter stage; or + // * An InternalError status, if an expected error has occurred + PreFilter(ctx context.Context, state CycleStatePluginReadWriter, policy *fleetv1.ClusterPolicySnapshot) (status *Status) +} + +// FilterPlugin is the interface which all plugins that would like to run at the Filter +// extension point should implement. +type FilterPlugin interface { + Plugin + + // Filter runs at the Filter stage, to check if a placement can be bound to a specific cluster. + // A plugin which registers at this extension point must return one of the follows: + // * A Success status, if the placement can be bound to the cluster; or + // * A ClusterUnschedulable status, if the placement cannot be bound to the cluster; or + // * An InternalError status, if an expected error has occurred + Filter(ctx context.Context, state CycleStatePluginReadWriter, policy *fleetv1.ClusterPolicySnapshot, cluster *fleetv1.MemberCluster) (status *Status) +} + +// PreScorePlugin is the interface which all plugins that would like to run at the PreScore +// extension point should implement. +type PreScorePlugin interface { + Plugin + + // PreScore runs before the scheduler enters the Score stage; a plugin may perform + // some setup at this extension point, such as caching the results that will be used in + // following Score calls, and/or run some checks to determine if it should be skipped in + // the Filter stage. + // + // A plugin which registers at this extension point must return one of the follows: + // * A Success status, if the plugin should run at the Score stage; or + // * A Skip status, if the plugin should be skipped at the Score stage; or + // * An InternalError status, if an expected error has occurred + PreScore(ctx context.Context, state CycleStatePluginReadWriter, policy *fleetv1.ClusterPolicySnapshot) (status *Status) +} + +// ScorePlugin is the interface which all plugins that would like to run at the Score +// extension point should implement. +type ScorePlugin interface { + Plugin + + // Score runs at the Score stage, to score a cluster for a specific placement. + // A plugin which registers at this extension point must return one of the follows: + // * A Success status, with the score for the cluster; or + // * An InternalError status, if an expected error has occurred + Score(ctx context.Context, state CycleStatePluginReadWriter, policy *fleetv1.ClusterPolicySnapshot, cluster *fleetv1.MemberCluster) (score *ClusterScore, status *Status) +} diff --git a/pkg/scheduler/framework/profile.go b/pkg/scheduler/framework/profile.go new file mode 100644 index 000000000..6283e32df --- /dev/null +++ b/pkg/scheduler/framework/profile.go @@ -0,0 +1,70 @@ +/* +Copyright (c) Microsoft Corporation. +Licensed under the MIT license. +*/ + +package framework + +// Profile specifies the scheduling profile a framework uses; it includes the plugins in use +// by the framework at each extension point in order. +// +// At this moment, since Fleet does not support runtime profiles, all plugins are registered +// directly to one universal profile, in their instantiated forms, rather than decoupled using +// a factory registry and instantiated along with the profile's associated framework. +type Profile struct { + name string + + postBatchPlugins []PostBatchPlugin + preFilterPlugins []PreFilterPlugin + filterPlugins []FilterPlugin + preScorePlugins []PreScorePlugin + scorePlugins []ScorePlugin + + // RegisteredPlugins is a map of all plugins registered to the profile, keyed by their names. + // This helps to avoid setting up same plugin multiple times with the framework if the plugin + // registers at multiple extension points. + registeredPlugins map[string]Plugin +} + +// WithPostBatchPlugin registers a PostBatchPlugin to the profile. +func (profile *Profile) WithPostBatchPlugin(plugin PostBatchPlugin) *Profile { + profile.postBatchPlugins = append(profile.postBatchPlugins, plugin) + profile.registeredPlugins[plugin.Name()] = plugin + return profile +} + +// WithPreFilterPlugin registers a PreFilterPlugin to the profile. +func (profile *Profile) WithPreFilterPlugin(plugin PreFilterPlugin) *Profile { + profile.preFilterPlugins = append(profile.preFilterPlugins, plugin) + profile.registeredPlugins[plugin.Name()] = plugin + return profile +} + +// WithFilterPlugin registers a FilterPlugin to the profile. +func (profile *Profile) WithFilterPlugin(plugin FilterPlugin) *Profile { + profile.filterPlugins = append(profile.filterPlugins, plugin) + profile.registeredPlugins[plugin.Name()] = plugin + return profile +} + +// WithPreScorePlugin registers a PreScorePlugin to the profile. +func (profile *Profile) WithPreScorePlugin(plugin PreScorePlugin) *Profile { + profile.preScorePlugins = append(profile.preScorePlugins, plugin) + profile.registeredPlugins[plugin.Name()] = plugin + return profile +} + +// WithScorePlugin registers a ScorePlugin to the profile. +func (profile *Profile) WithScorePlugin(plugin ScorePlugin) *Profile { + profile.scorePlugins = append(profile.scorePlugins, plugin) + profile.registeredPlugins[plugin.Name()] = plugin + return profile +} + +// NewProfile creates scheduling profile. +func NewProfile(name string) *Profile { + return &Profile{ + name: name, + registeredPlugins: map[string]Plugin{}, + } +} diff --git a/pkg/scheduler/framework/profile_test.go b/pkg/scheduler/framework/profile_test.go new file mode 100644 index 000000000..c50a7d6c4 --- /dev/null +++ b/pkg/scheduler/framework/profile_test.go @@ -0,0 +1,108 @@ +/* +Copyright (c) Microsoft Corporation. +Licensed under the MIT license. +*/ + +package framework + +import ( + "context" + "testing" + + fleetv1 "go.goms.io/fleet/apis/v1" +) + +const ( + dummyPluginName = "dummyAllPurposePlugin" +) + +// A no-op, dummy plugin which connects to all extension points. +type DummyAllPurposePlugin struct{} + +// Name returns the name of the dummy plugin. +func (p *DummyAllPurposePlugin) Name() string { + return dummyPluginName +} + +// PostBatch implements the PostBatch interface for the dummy plugin. +func (p *DummyAllPurposePlugin) PostBatch(ctx context.Context, state CycleStatePluginReadWriter, policy *fleetv1.ClusterPolicySnapshot) (size int, status *Status) { //nolint:revive + return 1, nil +} + +// PreFilter implements the PreFilter interface for the dummy plugin. +func (p *DummyAllPurposePlugin) PreFilter(ctx context.Context, state CycleStatePluginReadWriter, policy *fleetv1.ClusterPolicySnapshot) (status *Status) { //nolint:revive + return nil +} + +// Filter implements the Filter interface for the dummy plugin. +func (p *DummyAllPurposePlugin) Filter(ctx context.Context, state CycleStatePluginReadWriter, policy *fleetv1.ClusterPolicySnapshot, cluster *fleetv1.MemberCluster) (status *Status) { //nolint:revive + return nil +} + +// PreScore implements the PreScore interface for the dummy plugin. +func (p *DummyAllPurposePlugin) PreScore(ctx context.Context, state CycleStatePluginReadWriter, policy *fleetv1.ClusterPolicySnapshot) (status *Status) { //nolint:revive + return nil +} + +// Score implements the Score interface for the dummy plugin. +func (p *DummyAllPurposePlugin) Score(ctx context.Context, state CycleStatePluginReadWriter, policy *fleetv1.ClusterPolicySnapshot, cluster *fleetv1.MemberCluster) (score *ClusterScore, status *Status) { //nolint:revive + return &ClusterScore{}, nil +} + +// TestProfile tests the basic ops of a Profile. +func TestProfile(t *testing.T) { + profileName := "testProfile" + profile := NewProfile(profileName) + + dummyAllPurposePlugin := &DummyAllPurposePlugin{} + dummyPlugin := Plugin(dummyAllPurposePlugin) + + profile.WithPostBatchPlugin(dummyAllPurposePlugin) + profile.WithPreFilterPlugin(dummyAllPurposePlugin) + profile.WithFilterPlugin(dummyAllPurposePlugin) + profile.WithPreScorePlugin(dummyAllPurposePlugin) + profile.WithScorePlugin(dummyAllPurposePlugin) + + // Same plugin should be registered only once, even if it connects to multiple extension points. + if len(profile.registeredPlugins) != 1 { + t.Fatalf("registerPlugins len() = %d, want %d", len(profile.registeredPlugins), 1) + } + if pl, ok := profile.registeredPlugins[dummyPluginName]; !ok || pl != dummyPlugin { + t.Fatalf("registeredPlugins[%s] = %v, %t, want %v, %t", dummyPluginName, pl, ok, dummyPlugin, true) + } + + if len(profile.postBatchPlugins) != 1 { + t.Fatalf("postBatchPlugins len() = %d, want %d", len(profile.postBatchPlugins), 1) + } + if pl := profile.postBatchPlugins[0]; pl != dummyPlugin { + t.Fatalf("postBatchPlugins[0] = %v, want %v", pl, dummyPlugin) + } + + if len(profile.preFilterPlugins) != 1 { + t.Fatalf("preFilterPlugins len() = %d, want %d", len(profile.preFilterPlugins), 1) + } + if pl := profile.preFilterPlugins[0]; pl != dummyPlugin { + t.Fatalf("preFilterPlugins[0] = %v, want %v", pl, dummyPlugin) + } + + if len(profile.filterPlugins) != 1 { + t.Fatalf("filterPlugins len() = %d, want %d", len(profile.filterPlugins), 1) + } + if pl := profile.filterPlugins[0]; pl != dummyPlugin { + t.Fatalf("filterPlugins[0] = %v, want %v", pl, dummyPlugin) + } + + if len(profile.preScorePlugins) != 1 { + t.Fatalf("preScorePlugins len() = %d, want %d", len(profile.preScorePlugins), 1) + } + if pl := profile.preScorePlugins[0]; pl != dummyPlugin { + t.Fatalf("preScorePlugins[0] = %v, want %v", pl, dummyPlugin) + } + + if len(profile.scorePlugins) != 1 { + t.Fatalf("scorePlugins len() = %d, want %d", len(profile.scorePlugins), 1) + } + if pl := profile.scorePlugins[0]; pl != dummyPlugin { + t.Fatalf("scorePlugins[0] = %v, want %v", pl, dummyPlugin) + } +} From aa286b4baf6b9e0213fe2b0f9f5194bd51ff05d8 Mon Sep 17 00:00:00 2001 From: michaelawyu Date: Mon, 12 Jun 2023 20:13:03 +0800 Subject: [PATCH 2/2] Minor fixes --- pkg/scheduler/framework/profile_test.go | 58 +++++++------------------ 1 file changed, 16 insertions(+), 42 deletions(-) diff --git a/pkg/scheduler/framework/profile_test.go b/pkg/scheduler/framework/profile_test.go index c50a7d6c4..614e47c50 100644 --- a/pkg/scheduler/framework/profile_test.go +++ b/pkg/scheduler/framework/profile_test.go @@ -9,11 +9,13 @@ import ( "context" "testing" + "github.com/google/go-cmp/cmp" fleetv1 "go.goms.io/fleet/apis/v1" ) const ( - dummyPluginName = "dummyAllPurposePlugin" + dummyProfileName = "dummyProfile" + dummyPluginName = "dummyAllPurposePlugin" ) // A no-op, dummy plugin which connects to all extension points. @@ -51,8 +53,7 @@ func (p *DummyAllPurposePlugin) Score(ctx context.Context, state CycleStatePlugi // TestProfile tests the basic ops of a Profile. func TestProfile(t *testing.T) { - profileName := "testProfile" - profile := NewProfile(profileName) + profile := NewProfile(dummyProfileName) dummyAllPurposePlugin := &DummyAllPurposePlugin{} dummyPlugin := Plugin(dummyAllPurposePlugin) @@ -63,46 +64,19 @@ func TestProfile(t *testing.T) { profile.WithPreScorePlugin(dummyAllPurposePlugin) profile.WithScorePlugin(dummyAllPurposePlugin) - // Same plugin should be registered only once, even if it connects to multiple extension points. - if len(profile.registeredPlugins) != 1 { - t.Fatalf("registerPlugins len() = %d, want %d", len(profile.registeredPlugins), 1) - } - if pl, ok := profile.registeredPlugins[dummyPluginName]; !ok || pl != dummyPlugin { - t.Fatalf("registeredPlugins[%s] = %v, %t, want %v, %t", dummyPluginName, pl, ok, dummyPlugin, true) - } - - if len(profile.postBatchPlugins) != 1 { - t.Fatalf("postBatchPlugins len() = %d, want %d", len(profile.postBatchPlugins), 1) - } - if pl := profile.postBatchPlugins[0]; pl != dummyPlugin { - t.Fatalf("postBatchPlugins[0] = %v, want %v", pl, dummyPlugin) - } - - if len(profile.preFilterPlugins) != 1 { - t.Fatalf("preFilterPlugins len() = %d, want %d", len(profile.preFilterPlugins), 1) - } - if pl := profile.preFilterPlugins[0]; pl != dummyPlugin { - t.Fatalf("preFilterPlugins[0] = %v, want %v", pl, dummyPlugin) + wantProfile := &Profile{ + name: dummyProfileName, + postBatchPlugins: []PostBatchPlugin{dummyAllPurposePlugin}, + preFilterPlugins: []PreFilterPlugin{dummyAllPurposePlugin}, + filterPlugins: []FilterPlugin{dummyAllPurposePlugin}, + preScorePlugins: []PreScorePlugin{dummyAllPurposePlugin}, + scorePlugins: []ScorePlugin{dummyAllPurposePlugin}, + registeredPlugins: map[string]Plugin{ + dummyPluginName: dummyPlugin, + }, } - if len(profile.filterPlugins) != 1 { - t.Fatalf("filterPlugins len() = %d, want %d", len(profile.filterPlugins), 1) - } - if pl := profile.filterPlugins[0]; pl != dummyPlugin { - t.Fatalf("filterPlugins[0] = %v, want %v", pl, dummyPlugin) - } - - if len(profile.preScorePlugins) != 1 { - t.Fatalf("preScorePlugins len() = %d, want %d", len(profile.preScorePlugins), 1) - } - if pl := profile.preScorePlugins[0]; pl != dummyPlugin { - t.Fatalf("preScorePlugins[0] = %v, want %v", pl, dummyPlugin) - } - - if len(profile.scorePlugins) != 1 { - t.Fatalf("scorePlugins len() = %d, want %d", len(profile.scorePlugins), 1) - } - if pl := profile.scorePlugins[0]; pl != dummyPlugin { - t.Fatalf("scorePlugins[0] = %v, want %v", pl, dummyPlugin) + if !cmp.Equal(profile, wantProfile, cmp.AllowUnexported(Profile{})) { + t.Fatalf("NewProfile() = %v, want %v", profile, wantProfile) } }