From 38185ea8825f2ef8f440de7dbe890891d7f5aecd Mon Sep 17 00:00:00 2001
From: michaelawyu
Date: Sat, 1 Jul 2023 00:23:30 +0800
Subject: [PATCH 1/3] Added more scheduler logic

---
 pkg/scheduler/framework/cyclestate.go       |  17 +-
 pkg/scheduler/framework/dummyplugin_test.go |   4 +
 pkg/scheduler/framework/framework.go        | 178 +++++++++-
 pkg/scheduler/framework/framework_test.go   | 373 ++++++++++++++++++++
 pkg/scheduler/framework/status.go           |  16 +-
 pkg/scheduler/framework/status_test.go      |   4 +-
 6 files changed, 583 insertions(+), 9 deletions(-)

diff --git a/pkg/scheduler/framework/cyclestate.go b/pkg/scheduler/framework/cyclestate.go
index 07a10a421..47d9929c6 100644
--- a/pkg/scheduler/framework/cyclestate.go
+++ b/pkg/scheduler/framework/cyclestate.go
@@ -8,6 +8,8 @@ package framework
 import (
 	"fmt"
 	"sync"
+
+	"k8s.io/apimachinery/pkg/util/sets"
 )
 
 // StateKey is the key for a state value stored in a CycleState.
@@ -17,6 +19,9 @@ type StateKey string
 type StateValue interface{}
 
 // CycleStatePluginReadWriter is an interface through which plugins can store and retrieve data.
+//
+// TO-DO (chenyu1): Add methods which allow plugins to query for bindings of different types being
+// evaluated in the current scheduling cycle.
 type CycleStatePluginReadWriter interface {
 	Read(key StateKey) (StateValue, error)
 	Write(key StateKey, val StateValue)
@@ -32,6 +37,13 @@ type CycleState struct {
 	// store is a concurrency-safe store (a map).
 	store sync.Map
+
+	// skippedFilterPlugins is a set of Filter plugins that should be skipped in the current scheduling cycle.
+	//
+	// TO-DO (chenyu1): the sets package added support for Go generics in Kubernetes 1.26, and
+	// the String set has been deprecated; transition to the generic set when the new version
+	// becomes available.
+	skippedFilterPlugins sets.String
 }
 
 // Read retrieves a value from CycleState by a key.
@@ -54,5 +66,8 @@ func (c *CycleState) Delete(key StateKey) {
 
 // NewCycleState creates a CycleState.
 func NewCycleState() *CycleState {
-	return &CycleState{}
+	return &CycleState{
+		store:                sync.Map{},
+		skippedFilterPlugins: sets.NewString(),
+	}
 }
diff --git a/pkg/scheduler/framework/dummyplugin_test.go b/pkg/scheduler/framework/dummyplugin_test.go
index 26f5a1f57..12e15f7a0 100644
--- a/pkg/scheduler/framework/dummyplugin_test.go
+++ b/pkg/scheduler/framework/dummyplugin_test.go
@@ -11,6 +11,10 @@ import (
 	fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
 )
 
+const (
+	dummyAllPurposePluginNameFormat = "dummyAllPurposePlugin-%d"
+)
+
 // A no-op, dummy plugin which connects to all extension points.
 type DummyAllPurposePlugin struct {
 	name string
diff --git a/pkg/scheduler/framework/framework.go b/pkg/scheduler/framework/framework.go
index 5d7783420..e2d4e22d7 100644
--- a/pkg/scheduler/framework/framework.go
+++ b/pkg/scheduler/framework/framework.go
@@ -10,6 +10,7 @@ package framework
 import (
 	"context"
 	"fmt"
+	"sync/atomic"
 	"time"
 
 	"k8s.io/apimachinery/pkg/labels"
@@ -296,17 +297,186 @@ func (f *framework) markAsUnscheduledFor(ctx context.Context, bindings []*fleetv
 //
 // TO-DO (chenyu1): remove the nolint directives once the function is implemented.
 func (f *framework) runSchedulingCycleForPickAllPlacementType(
-	ctx context.Context, //nolint: revive
-	state *CycleState, //nolint: revive
+	ctx context.Context,
+	state *CycleState,
 	crpName string, //nolint: revive
-	policy *fleetv1beta1.ClusterSchedulingPolicySnapshot, //nolint: revive
-	clusters []fleetv1beta1.MemberCluster, //nolint: revive
+	policy *fleetv1beta1.ClusterSchedulingPolicySnapshot,
+	clusters []fleetv1beta1.MemberCluster,
 	bound, scheduled, obsolete []*fleetv1beta1.ClusterResourceBinding, //nolint: revive
 ) (result ctrl.Result, err error) {
+	policyRef := klog.KObj(policy)
+
+	// The scheduler always needs to take action when processing scheduling policies of the PickAll
+	// placement type; enter the actual scheduling stages right away.
+	klog.V(2).InfoS("Scheduling is always needed for CRPs of the PickAll placement type; entering scheduling stages", "schedulingPolicySnapshot", policyRef)
+
+	// Run all plugins needed.
+	//
+	// TO-DO (chenyu1): assign variables when needed.
+	_, _, err = f.runAllPluginsForPickAllPlacementType(ctx, state, policy, clusters)
+	if err != nil {
+		klog.ErrorS(err, "Failed to run all plugins (pickAll placement type)", "schedulingPolicySnapshot", policyRef)
+		return ctrl.Result{}, err
+	}
+
 	// Not yet implemented.
 	return ctrl.Result{}, nil
 }
 
+// runAllPluginsForPickAllPlacementType runs all plugins in each stage of the scheduling cycle for a
+// scheduling policy of the PickAll placement type.
+//
+// Note that for policies of the PickAll placement type, only the following stages are needed:
+// * PreFilter
+// * Filter
+func (f *framework) runAllPluginsForPickAllPlacementType(
+	ctx context.Context,
+	state *CycleState,
+	policy *fleetv1beta1.ClusterSchedulingPolicySnapshot,
+	clusters []fleetv1beta1.MemberCluster,
+) (scored ScoredClusters, filtered []*filteredClusterWithStatus, err error) {
+	policyRef := klog.KObj(policy)
+
+	// Run pre-filter plugins.
+	//
+	// Each plugin can:
+	// * set up some common state for future calls (on different extension points) in the scheduling cycle; and/or
+	// * check if it needs to run at the Filter stage.
+	// Any plugin that would like to be skipped is listed in the cycle state for future reference.
+	//
+	// Note that any failure would lead to the cancellation of the scheduling cycle.
+	if status := f.runPreFilterPlugins(ctx, state, policy); status.IsInteralError() {
+		klog.ErrorS(status.AsError(), "Failed to run pre filter plugins", "schedulingPolicySnapshot", policyRef)
+		return nil, nil, controller.NewUnexpectedBehaviorError(status.AsError())
+	}
+
+	// Run filter plugins.
+	//
+	// The scheduler checks each cluster candidate by calling the chain of filter plugins; if any plugin suggests
+	// that the cluster should not be bound, the cluster is ignored for the rest of the cycle. Note that clusters
+	// are inspected in parallel.
+	//
+	// Note that any failure would lead to the cancellation of the scheduling cycle.
+	passed, filtered, err := f.runFilterPlugins(ctx, state, policy, clusters)
+	if err != nil {
+		klog.ErrorS(err, "Failed to run filter plugins", "schedulingPolicySnapshot", policyRef)
+		return nil, nil, controller.NewUnexpectedBehaviorError(err)
+	}
+
+	// Wrap all clusters that have passed the Filter stage as scored clusters.
+	scored = make(ScoredClusters, 0, len(passed))
+	for _, cluster := range passed {
+		scored = append(scored, &ScoredCluster{
+			Cluster: cluster,
+			Score:   &ClusterScore{},
+		})
+	}
+	return scored, filtered, nil
+}
+
+// runPreFilterPlugins runs all pre filter plugins sequentially.
+func (f *framework) runPreFilterPlugins(ctx context.Context, state *CycleState, policy *fleetv1beta1.ClusterSchedulingPolicySnapshot) *Status {
+	for _, pl := range f.profile.preFilterPlugins {
+		status := pl.PreFilter(ctx, state, policy)
+		switch {
+		case status.IsSuccess(): // Do nothing.
+		case status.IsInteralError():
+			return status
+		case status.IsSkip():
+			state.skippedFilterPlugins.Insert(pl.Name())
+		default:
+			// Any status that is not Success, InternalError, or Skip is considered an error.
+			return FromError(fmt.Errorf("prefilter plugin returned an unknown status %s", status), pl.Name())
+		}
+	}
+
+	return nil
+}
+
+// runFilterPluginsFor runs filter plugins for a single cluster.
+func (f *framework) runFilterPluginsFor(ctx context.Context, state *CycleState, policy *fleetv1beta1.ClusterSchedulingPolicySnapshot, cluster *fleetv1beta1.MemberCluster) *Status {
+	for _, pl := range f.profile.filterPlugins {
+		// Skip the plugin if it has asked to be skipped at the PreFilter stage.
+		if state.skippedFilterPlugins.Has(pl.Name()) {
+			continue
+		}
+		status := pl.Filter(ctx, state, policy, cluster)
+		switch {
+		case status.IsSuccess(): // Do nothing.
+		case status.IsInteralError():
+			return status
+		case status.IsClusterUnschedulable():
+			return status
+		default:
+			// Any status that is not Success, InternalError, or ClusterUnschedulable is considered an error.
+			return FromError(fmt.Errorf("filter plugin returned an unknown status %s", status), pl.Name())
+		}
+	}
+
+	return nil
+}
+
+// filteredClusterWithStatus is a struct that documents a cluster filtered out at the Filter stage,
+// along with the plugin status that explains why the cluster is filtered out.
+//
+// This struct is used to keep track of the reasons behind the scheduling decisions returned to the user.
+type filteredClusterWithStatus struct {
+	cluster *fleetv1beta1.MemberCluster
+	status  *Status
+}
+
+// runFilterPlugins runs filter plugins on clusters in parallel.
+func (f *framework) runFilterPlugins(ctx context.Context, state *CycleState, policy *fleetv1beta1.ClusterSchedulingPolicySnapshot, clusters []fleetv1beta1.MemberCluster) (passed []*fleetv1beta1.MemberCluster, filtered []*filteredClusterWithStatus, err error) {
+	// Create a child context.
+	childCtx, cancel := context.WithCancel(ctx)
+
+	// Pre-allocate slices to avoid races.
+	passed = make([]*fleetv1beta1.MemberCluster, len(clusters))
+	var passedIdx int32 = -1
+	filtered = make([]*filteredClusterWithStatus, len(clusters))
+	var filteredIdx int32 = -1
+
+	errFlag := parallelizer.NewErrorFlag()
+
+	doWork := func(pieces int) {
+		cluster := clusters[pieces]
+		status := f.runFilterPluginsFor(childCtx, state, policy, &cluster)
+		switch {
+		case status.IsSuccess():
+			// Use atomic add to avoid races with minimum overhead.
+			newPassedIdx := atomic.AddInt32(&passedIdx, 1)
+			passed[newPassedIdx] = &cluster
+		case status.IsClusterUnschedulable():
+			// Use atomic add to avoid races with minimum overhead.
+			newFilteredIdx := atomic.AddInt32(&filteredIdx, 1)
+			filtered[newFilteredIdx] = &filteredClusterWithStatus{
+				cluster: &cluster,
+				status:  status,
+			}
+		default: // An error has occurred.
+			errFlag.Raise(status.AsError())
+			// Cancel the child context, which signals the parallelizer to stop running tasks.
+			cancel()
+		}
+	}
+
+	// Run inspection in parallel.
+	//
+	// Note that the parallel run will be stopped immediately upon encountering the first error.
+	f.parallelizer.ParallelizeUntil(childCtx, len(clusters), doWork, "runFilterPlugins")
+	// Retrieve the first error from the error flag.
+	if err := errFlag.Lower(); err != nil {
+		return nil, nil, err
+	}
+
+	// Trim the slices to the actual size.
+	passed = passed[:passedIdx+1]
+	filtered = filtered[:filteredIdx+1]
+
+	return passed, filtered, nil
+}
+
 // runSchedulingCycleForPickNPlacementType runs the scheduling cycle for a scheduling policy of the PickN
 // placement type.
 //
diff --git a/pkg/scheduler/framework/framework_test.go b/pkg/scheduler/framework/framework_test.go
index 331983fe8..2362677ac 100644
--- a/pkg/scheduler/framework/framework_test.go
+++ b/pkg/scheduler/framework/framework_test.go
@@ -7,6 +7,7 @@ package framework
 import (
 	"context"
+	"fmt"
 	"log"
 	"os"
 	"testing"
@@ -19,6 +20,7 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/client/fake"
 
 	fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
+	"go.goms.io/fleet/pkg/scheduler/framework/parallelizer"
 )
 
 const (
@@ -35,6 +37,7 @@ const (
 var (
 	ignoreObjectMetaResourceVersionField = cmpopts.IgnoreFields(metav1.ObjectMeta{}, "ResourceVersion")
 	ignoreTypeMetaAPIVersionKindFields   = cmpopts.IgnoreFields(metav1.TypeMeta{}, "APIVersion", "Kind")
+	ignoredStatusFields                  = cmpopts.IgnoreFields(Status{}, "reasons", "err")
 )
 
 // TO-DO (chenyu1): expand the test cases as development stablizes.
@@ -304,3 +307,373 @@ func TestMarkAsUnscheduled(t *testing.T) {
 		t.Errorf("binding diff (-got, +want): %s", diff)
 	}
 }
+
+// TestRunPreFilterPlugins tests the runPreFilterPlugins method.
+func TestRunPreFilterPlugins(t *testing.T) {
+	dummyPreFilterPluginNameA := fmt.Sprintf(dummyAllPurposePluginNameFormat, 0)
+	dummyPreFilterPluginNameB := fmt.Sprintf(dummyAllPurposePluginNameFormat, 1)
+
+	testCases := []struct {
+		name                   string
+		preFilterPlugins       []PreFilterPlugin
+		wantSkippedPluginNames []string
+		wantStatus             *Status
+	}{
+		{
+			name: "single plugin, success",
+			preFilterPlugins: []PreFilterPlugin{
+				&DummyAllPurposePlugin{
+					name: dummyPreFilterPluginNameA,
+					preFilterRunner: func(ctx context.Context, state CycleStatePluginReadWriter, policy *fleetv1beta1.ClusterSchedulingPolicySnapshot) *Status {
+						return nil
+					},
+				},
+			},
+		},
+		{
+			name: "multiple plugins, one success, one skip",
+			preFilterPlugins: []PreFilterPlugin{
+				&DummyAllPurposePlugin{
+					name: dummyPreFilterPluginNameA,
+					preFilterRunner: func(ctx context.Context, state CycleStatePluginReadWriter, policy *fleetv1beta1.ClusterSchedulingPolicySnapshot) (status *Status) {
+						return nil
+					},
+				},
+				&DummyAllPurposePlugin{
+					name: dummyPreFilterPluginNameB,
+					preFilterRunner: func(ctx context.Context, state CycleStatePluginReadWriter, policy *fleetv1beta1.ClusterSchedulingPolicySnapshot) (status *Status) {
+						return NewNonErrorStatus(Skip, dummyPreFilterPluginNameB)
+					},
+				},
+			},
+			wantSkippedPluginNames: []string{dummyPreFilterPluginNameB},
+		},
+		{
+			name: "single plugin, internal error",
+			preFilterPlugins: []PreFilterPlugin{
+				&DummyAllPurposePlugin{
+					name: dummyPreFilterPluginNameA,
+					preFilterRunner: func(ctx context.Context, state CycleStatePluginReadWriter, policy *fleetv1beta1.ClusterSchedulingPolicySnapshot) *Status {
+						return FromError(fmt.Errorf("internal error"), dummyPreFilterPluginNameA)
+					},
+				},
+			},
+			wantStatus: FromError(fmt.Errorf("internal error"), dummyPreFilterPluginNameA),
+		},
+		{
+			name: "single plugin, unschedulable",
unschedulable", + preFilterPlugins: []PreFilterPlugin{ + &DummyAllPurposePlugin{ + name: dummyPreFilterPluginNameA, + preFilterRunner: func(ctx context.Context, state CycleStatePluginReadWriter, policy *fleetv1beta1.ClusterSchedulingPolicySnapshot) *Status { + return NewNonErrorStatus(ClusterUnschedulable, dummyPreFilterPluginNameA) + }, + }, + }, + wantStatus: FromError(fmt.Errorf("cluster is unschedulable"), dummyPreFilterPluginNameA), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + profile := NewProfile(dummyProfileName) + for _, p := range tc.preFilterPlugins { + profile.WithPreFilterPlugin(p) + } + // Construct framework manually instead of using NewFramework() to avoid mocking the controller manager. + f := &framework{ + profile: profile, + } + + ctx := context.Background() + state := NewCycleState() + policy := &fleetv1beta1.ClusterSchedulingPolicySnapshot{ + ObjectMeta: metav1.ObjectMeta{ + Name: policyName, + }, + } + + status := f.runPreFilterPlugins(ctx, state, policy) + if diff := cmp.Diff(status, tc.wantStatus, cmp.AllowUnexported(Status{}), ignoredStatusFields); diff != "" { + t.Errorf("runPreFilterPlugins() returned status diff (-got, +want): %s", diff) + } + }) + } +} + +// TestRunFilterPluginsFor tests the runFilterPluginsFor method. +func TestRunFilterPluginsFor(t *testing.T) { + dummyFilterPluginNameA := fmt.Sprintf(dummyAllPurposePluginNameFormat, 0) + dummyFilterPluginNameB := fmt.Sprintf(dummyAllPurposePluginNameFormat, 1) + + testCases := []struct { + name string + filterPlugins []FilterPlugin + skippedPluginNames []string + wantStatus *Status + }{ + { + name: "single plugin, success", + filterPlugins: []FilterPlugin{ + &DummyAllPurposePlugin{ + name: dummyFilterPluginNameA, + filterRunner: func(ctx context.Context, state CycleStatePluginReadWriter, policy *fleetv1beta1.ClusterSchedulingPolicySnapshot, cluster *fleetv1beta1.MemberCluster) (status *Status) { + return nil + }, + }, + }, + }, + { + name: "multiple plugins, one success, one skipped", + filterPlugins: []FilterPlugin{ + &DummyAllPurposePlugin{ + name: dummyFilterPluginNameA, + filterRunner: func(ctx context.Context, state CycleStatePluginReadWriter, policy *fleetv1beta1.ClusterSchedulingPolicySnapshot, cluster *fleetv1beta1.MemberCluster) (status *Status) { + return nil + }, + }, + &DummyAllPurposePlugin{ + name: dummyFilterPluginNameB, + filterRunner: func(ctx context.Context, state CycleStatePluginReadWriter, policy *fleetv1beta1.ClusterSchedulingPolicySnapshot, cluster *fleetv1beta1.MemberCluster) (status *Status) { + return NewNonErrorStatus(ClusterUnschedulable, dummyFilterPluginNameB) + }, + }, + }, + skippedPluginNames: []string{dummyFilterPluginNameB}, + }, + { + name: "single plugin, internal error", + filterPlugins: []FilterPlugin{ + &DummyAllPurposePlugin{ + name: dummyFilterPluginNameA, + filterRunner: func(ctx context.Context, state CycleStatePluginReadWriter, policy *fleetv1beta1.ClusterSchedulingPolicySnapshot, cluster *fleetv1beta1.MemberCluster) (status *Status) { + return FromError(fmt.Errorf("internal error"), dummyFilterPluginNameA) + }, + }, + }, + wantStatus: FromError(fmt.Errorf("internal error"), dummyFilterPluginNameA), + }, + { + name: "multiple plugins, one unschedulable, one success", + filterPlugins: []FilterPlugin{ + &DummyAllPurposePlugin{ + name: dummyFilterPluginNameA, + filterRunner: func(ctx context.Context, state CycleStatePluginReadWriter, policy *fleetv1beta1.ClusterSchedulingPolicySnapshot, cluster *fleetv1beta1.MemberCluster) 
+						return NewNonErrorStatus(ClusterUnschedulable, dummyFilterPluginNameA)
+					},
+				},
+				&DummyAllPurposePlugin{
+					name: dummyFilterPluginNameB,
+					filterRunner: func(ctx context.Context, state CycleStatePluginReadWriter, policy *fleetv1beta1.ClusterSchedulingPolicySnapshot, cluster *fleetv1beta1.MemberCluster) (status *Status) {
+						return nil
+					},
+				},
+			},
+			wantStatus: NewNonErrorStatus(ClusterUnschedulable, dummyFilterPluginNameA),
+		},
+		{
+			name: "single plugin, skip",
+			filterPlugins: []FilterPlugin{
+				&DummyAllPurposePlugin{
+					name: dummyFilterPluginNameA,
+					filterRunner: func(ctx context.Context, state CycleStatePluginReadWriter, policy *fleetv1beta1.ClusterSchedulingPolicySnapshot, cluster *fleetv1beta1.MemberCluster) (status *Status) {
+						return NewNonErrorStatus(Skip, dummyFilterPluginNameA)
+					},
+				},
+			},
+			wantStatus: FromError(fmt.Errorf("internal error"), dummyFilterPluginNameA),
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			profile := NewProfile(dummyProfileName)
+			for _, p := range tc.filterPlugins {
+				profile.WithFilterPlugin(p)
+			}
+			// Construct framework manually instead of using NewFramework() to avoid mocking the controller manager.
+			f := &framework{
+				profile: profile,
+			}
+
+			ctx := context.Background()
+			state := NewCycleState()
+			for _, name := range tc.skippedPluginNames {
+				state.skippedFilterPlugins.Insert(name)
+			}
+			policy := &fleetv1beta1.ClusterSchedulingPolicySnapshot{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: policyName,
+				},
+			}
+			cluster := &fleetv1beta1.MemberCluster{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: clusterName,
+				},
+			}
+
+			status := f.runFilterPluginsFor(ctx, state, policy, cluster)
+			if diff := cmp.Diff(status, tc.wantStatus, cmp.AllowUnexported(Status{}), ignoredStatusFields); diff != "" {
+				t.Errorf("runFilterPluginsFor() returned status diff (-got, +want): %s", diff)
+			}
+		})
+	}
+}
+
+// TestRunFilterPlugins tests the runFilterPlugins method.
+func TestRunFilterPlugins(t *testing.T) {
+	dummyFilterPluginNameA := fmt.Sprintf(dummyAllPurposePluginNameFormat, 0)
+	dummyFilterPluginNameB := fmt.Sprintf(dummyAllPurposePluginNameFormat, 1)
+
+	clusters := []fleetv1beta1.MemberCluster{
+		{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: clusterName,
+			},
+		},
+		{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: altClusterName,
+			},
+		},
+		{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: anotherClusterName,
+			},
+		},
+	}
+
+	testCases := []struct {
+		name                     string
+		filterPlugins            []FilterPlugin
+		wantPassedClusterNames   []string
+		wantFilteredClusterNames []string
+		expectedToFail           bool
+	}{
+		{
+			name: "three clusters, two filter plugins, all passed",
+			filterPlugins: []FilterPlugin{
+				&DummyAllPurposePlugin{
+					name: dummyFilterPluginNameA,
+					filterRunner: func(ctx context.Context, state CycleStatePluginReadWriter, policy *fleetv1beta1.ClusterSchedulingPolicySnapshot, cluster *fleetv1beta1.MemberCluster) (status *Status) {
+						return nil
+					},
+				},
+				&DummyAllPurposePlugin{
+					name: dummyFilterPluginNameB,
+					filterRunner: func(ctx context.Context, state CycleStatePluginReadWriter, policy *fleetv1beta1.ClusterSchedulingPolicySnapshot, cluster *fleetv1beta1.MemberCluster) (status *Status) {
+						return nil
+					},
+				},
+			},
+			wantPassedClusterNames: []string{clusterName, altClusterName, anotherClusterName},
+		},
+		{
+			name: "three clusters, two filter plugins, two filtered",
+			filterPlugins: []FilterPlugin{
+				&DummyAllPurposePlugin{
+					name: dummyFilterPluginNameA,
+					filterRunner: func(ctx context.Context, state CycleStatePluginReadWriter, policy *fleetv1beta1.ClusterSchedulingPolicySnapshot, cluster *fleetv1beta1.MemberCluster) (status *Status) {
+						if cluster.Name == clusterName {
+							return NewNonErrorStatus(ClusterUnschedulable, dummyFilterPluginNameA)
+						}
+						return nil
+					},
+				},
+				&DummyAllPurposePlugin{
+					name: dummyFilterPluginNameB,
+					filterRunner: func(ctx context.Context, state CycleStatePluginReadWriter, policy *fleetv1beta1.ClusterSchedulingPolicySnapshot, cluster *fleetv1beta1.MemberCluster) (status *Status) {
+						if cluster.Name == anotherClusterName {
+							return NewNonErrorStatus(ClusterUnschedulable, dummyFilterPluginNameB)
+						}
+						return nil
+					},
+				},
+			},
+			wantPassedClusterNames:   []string{altClusterName},
+			wantFilteredClusterNames: []string{clusterName, anotherClusterName},
+		},
+		{
+			name: "three clusters, two filter plugins, one success, one internal error on specific cluster",
+			filterPlugins: []FilterPlugin{
+				&DummyAllPurposePlugin{
+					name: dummyFilterPluginNameA,
+					filterRunner: func(ctx context.Context, state CycleStatePluginReadWriter, policy *fleetv1beta1.ClusterSchedulingPolicySnapshot, cluster *fleetv1beta1.MemberCluster) (status *Status) {
+						return nil
+					},
+				},
+				&DummyAllPurposePlugin{
+					name: dummyFilterPluginNameB,
+					filterRunner: func(ctx context.Context, state CycleStatePluginReadWriter, policy *fleetv1beta1.ClusterSchedulingPolicySnapshot, cluster *fleetv1beta1.MemberCluster) (status *Status) {
+						if cluster.Name == anotherClusterName {
+							return FromError(fmt.Errorf("internal error"), dummyFilterPluginNameB)
+						}
+						return nil
+					},
+				},
+			},
+			expectedToFail: true,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			profile := NewProfile(dummyProfileName)
+			for _, p := range tc.filterPlugins {
+				profile.WithFilterPlugin(p)
+			}
+			f := &framework{
+				profile:      profile,
+				parallelizer: parallelizer.NewParallelizer(parallelizer.DefaultNumOfWorkers),
+			}
+
+			ctx := context.Background()
+			state := NewCycleState()
+			policy := &fleetv1beta1.ClusterSchedulingPolicySnapshot{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: policyName,
+				},
+			}
+
+			passed, filtered, err := f.runFilterPlugins(ctx, state, policy, clusters)
+			if tc.expectedToFail {
+				if err == nil {
+					t.Fatalf("runFilterPlugins(%v, %v, %v) = %v %v %v, want error", state, policy, clusters, passed, filtered, err)
+				}
+				return
+			}
+
+			// The method runs in parallel; as a result the order cannot be guaranteed.
+			// Organize the results into maps for easier comparison.
+			passedMap := make(map[string]bool)
+			for _, cluster := range passed {
+				passedMap[cluster.Name] = true
+			}
+			wantPassedMap := make(map[string]bool)
+			for _, name := range tc.wantPassedClusterNames {
+				wantPassedMap[name] = true
+			}
+
+			if diff := cmp.Diff(passedMap, wantPassedMap); diff != "" {
+				t.Errorf("passed clusters diff (-got, +want): %s", diff)
+			}
+
+			filteredMap := make(map[string]bool)
+			for _, item := range filtered {
+				filteredMap[item.cluster.Name] = true
+				// As a sanity check, verify that all statuses are of the ClusterUnschedulable status code.
+				if !item.status.IsClusterUnschedulable() {
+					t.Errorf("filtered cluster %s status, got %v, want status code ClusterUnschedulable", item.cluster.Name, item.status)
+				}
+			}
+			wantFilteredMap := make(map[string]bool)
+			for _, name := range tc.wantFilteredClusterNames {
+				wantFilteredMap[name] = true
+			}
+
+			if diff := cmp.Diff(filteredMap, wantFilteredMap); diff != "" {
+				t.Errorf("filtered clusters diff (-got, +want): %s", diff)
+			}
+		})
+	}
+}
diff --git a/pkg/scheduler/framework/status.go b/pkg/scheduler/framework/status.go
index 5e63d67a9..5076c6434 100644
--- a/pkg/scheduler/framework/status.go
+++ b/pkg/scheduler/framework/status.go
@@ -5,7 +5,10 @@ Licensed under the MIT license.
 
 package framework
 
-import "strings"
+import (
+	"fmt"
+	"strings"
+)
 
 // StatusCode is the status code of a Status, returned by a plugin.
 type StatusCode int
@@ -70,7 +73,7 @@ func (s *Status) IsInteralError() bool {
 }
 
-// IsPreSkip returns if a Status is of the status code Skip.
-func (s *Status) IsPreSkip() bool {
+// IsSkip returns if a Status is of the status code Skip.
+func (s *Status) IsSkip() bool {
 	return s.code() == Skip
 }
 
@@ -116,6 +119,15 @@ func (s *Status) String() string {
 	return strings.Join(desc, ", ")
 }
 
+// AsError returns a status as an error; it returns nil if the status is not of the internalError code.
+func (s *Status) AsError() error {
+	if !s.IsInteralError() {
+		return nil
+	}
+
+	return fmt.Errorf("plugin %s returned an error %s", s.sourcePlugin, s.String())
+}
+
 // NewNonErrorStatus returns a Status with a non-error status code.
 // To return a Status of the internalError status code, use FromError() instead.
 func NewNonErrorStatus(code StatusCode, sourcePlugin string, reasons ...string) *Status {
diff --git a/pkg/scheduler/framework/status_test.go b/pkg/scheduler/framework/status_test.go
index 8e50d6f20..13dfbe638 100644
--- a/pkg/scheduler/framework/status_test.go
+++ b/pkg/scheduler/framework/status_test.go
@@ -73,7 +73,7 @@ func TestNonNilStatusMethods(t *testing.T) {
 		status.IsSuccess,
 		status.IsInteralError,
 		status.IsClusterUnschedulable,
-		status.IsPreSkip,
+		status.IsSkip,
 	}
 	for idx, checkFunc := range checkFuncs {
 		if wantCheckOutputs[idx] != checkFunc() {
@@ -114,7 +114,7 @@ func TestNilStatusMethods(t *testing.T) {
 		status.IsSuccess,
 		status.IsInteralError,
 		status.IsClusterUnschedulable,
-		status.IsPreSkip,
+		status.IsSkip,
 	}
 	for idx, checkFunc := range checkFuncs {
 		if wantCheckOutputs[idx] != checkFunc() {

From 235dd495fe00478be644e4f1a3bc795c14afd6b9 Mon Sep 17 00:00:00 2001
From: michaelawyu
Date: Mon, 3 Jul 2023 16:26:21 +0800
Subject: [PATCH 2/3] Minor fixes

---
 pkg/scheduler/framework/framework_test.go | 77 +++++++++++++++--------
 1 file changed, 52 insertions(+), 25 deletions(-)

diff --git a/pkg/scheduler/framework/framework_test.go b/pkg/scheduler/framework/framework_test.go
index 2362677ac..e7870b1e0 100644
--- a/pkg/scheduler/framework/framework_test.go
+++ b/pkg/scheduler/framework/framework_test.go
@@ -546,6 +546,8 @@ func TestRunFilterPlugins(t *testing.T) {
 	testCases := []struct {
 		name          string
 		filterPlugins []FilterPlugin
+		wantClusters  []*fleetv1beta1.MemberCluster
+		wantFiltered  []*filteredClusterWithStatus
 		wantPassedClusterNames   []string
 		wantFilteredClusterNames []string
 		expectedToFail           bool
 	}{
 		{
 			name: "three clusters, two filter plugins, all passed",
@@ -566,7 +568,24 @@ func TestRunFilterPlugins(t *testing.T) {
 					},
 				},
 			},
-			wantPassedClusterNames: []string{clusterName, altClusterName, anotherClusterName},
+			wantClusters: []*fleetv1beta1.MemberCluster{
+				{
+					ObjectMeta: metav1.ObjectMeta{
+						Name: clusterName,
+					},
+				},
+				{
+					ObjectMeta: metav1.ObjectMeta{
+						Name: altClusterName,
+					},
+				},
+				{
+					ObjectMeta: metav1.ObjectMeta{
+						Name: anotherClusterName,
+					},
+				},
+			},
+			wantFiltered: []*filteredClusterWithStatus{},
 		},
 		{
 			name: "three clusters, two filter plugins, two filtered",
@@ -590,8 +609,31 @@ func TestRunFilterPlugins(t *testing.T) {
 					},
 				},
 			},
-			wantPassedClusterNames:   []string{altClusterName},
-			wantFilteredClusterNames: []string{clusterName, anotherClusterName},
+			wantClusters: []*fleetv1beta1.MemberCluster{
+				{
+					ObjectMeta: metav1.ObjectMeta{
+						Name: altClusterName,
+					},
+				},
+			},
+			wantFiltered: []*filteredClusterWithStatus{
+				{
+					cluster: &fleetv1beta1.MemberCluster{
+						ObjectMeta: metav1.ObjectMeta{
+							Name: clusterName,
+						},
+					},
+					status: NewNonErrorStatus(ClusterUnschedulable, dummyFilterPluginNameA),
+				},
+				{
+					cluster: &fleetv1beta1.MemberCluster{
+						ObjectMeta: metav1.ObjectMeta{
+							Name: anotherClusterName,
+						},
+					},
+					status: NewNonErrorStatus(ClusterUnschedulable, dummyFilterPluginNameB),
+				},
+			},
 		},
 		{
 			name: "three clusters, two filter plugins, one success, one internal error on specific cluster",
@@ -644,34 +686,19 @@ func TestRunFilterPlugins(t *testing.T) {
 			}
 
 			// The method runs in parallel; as a result the order cannot be guaranteed.
-			// Organize the results into maps for easier comparison.
+			// Sort the results by cluster name for comparison.
+			lessFuncCluster := func(cluster1, cluster2 *fleetv1beta1.MemberCluster) bool {
+				return cluster1.Name < cluster2.Name
+			}
-			passedMap := make(map[string]bool)
-			for _, cluster := range passed {
-				passedMap[cluster.Name] = true
-			}
-			wantPassedMap := make(map[string]bool)
-			for _, name := range tc.wantPassedClusterNames {
-				wantPassedMap[name] = true
-			}
+			lessFuncFilteredCluster := func(filtered1, filtered2 *filteredClusterWithStatus) bool {
+				return filtered1.cluster.Name < filtered2.cluster.Name
+			}
 
-			if diff := cmp.Diff(passedMap, wantPassedMap); diff != "" {
+			if diff := cmp.Diff(passed, tc.wantClusters, cmpopts.SortSlices(lessFuncCluster)); diff != "" {
 				t.Errorf("passed clusters diff (-got, +want): %s", diff)
 			}
 
-			filteredMap := make(map[string]bool)
-			for _, item := range filtered {
-				filteredMap[item.cluster.Name] = true
-				// As a sanity check, verify if all status are of the ClusterUnschedulable status code.
-				if !item.status.IsClusterUnschedulable() {
-					t.Errorf("filtered cluster %s status, got %v, want status code ClusterUnschedulable", item.cluster.Name, item.status)
-				}
-			}
-			wantFilteredMap := make(map[string]bool)
-			for _, name := range tc.wantFilteredClusterNames {
-				wantFilteredMap[name] = true
-			}
-
-			if diff := cmp.Diff(filteredMap, wantFilteredMap); diff != "" {
+			if diff := cmp.Diff(filtered, tc.wantFiltered, cmpopts.SortSlices(lessFuncFilteredCluster), cmp.AllowUnexported(filteredClusterWithStatus{}, Status{})); diff != "" {
 				t.Errorf("filtered clusters diff (-got, +want): %s", diff)
 			}
 		})
 	}

From f3c517a69fb47859dd3510481e9c7379b4880833 Mon Sep 17 00:00:00 2001
From: michaelawyu
Date: Mon, 3 Jul 2023 16:47:29 +0800
Subject: [PATCH 3/3] Minor fixes

---
 pkg/scheduler/framework/framework_test.go | 12 +++++-------
 1 file changed, 5 insertions(+), 7 deletions(-)

diff --git a/pkg/scheduler/framework/framework_test.go b/pkg/scheduler/framework/framework_test.go
index e7870b1e0..6107996d7 100644
--- a/pkg/scheduler/framework/framework_test.go
+++ b/pkg/scheduler/framework/framework_test.go
@@ -544,13 +544,11 @@ func TestRunFilterPlugins(t *testing.T) {
 	}
 
 	testCases := []struct {
-		name          string
-		filterPlugins []FilterPlugin
-		wantClusters  []*fleetv1beta1.MemberCluster
-		wantFiltered  []*filteredClusterWithStatus
-		wantPassedClusterNames   []string
-		wantFilteredClusterNames []string
-		expectedToFail           bool
+		name           string
+		filterPlugins  []FilterPlugin
+		wantClusters   []*fleetv1beta1.MemberCluster
+		wantFiltered   []*filteredClusterWithStatus
+		expectedToFail bool
 	}{
 		{
 			name: "three clusters, two filter plugins, all passed",
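
A note on the concurrency pattern used by runFilterPlugins in the first commit: the sketch below is a standalone, editorial illustration (not part of the patches) of the pre-allocate/atomic-index/trim idiom the method relies on to collect results from parallel workers without a mutex. The cluster names and the passesFilters predicate are invented for the example.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	clusters := []string{"cluster-1", "cluster-2", "cluster-3", "cluster-4"}
	// Stand-in for the per-cluster filter chain; invented for illustration.
	passesFilters := func(name string) bool { return name != "cluster-3" }

	// Pre-allocate one slot per input so that concurrent writers never
	// append to (and thus resize) the slice; a shared atomic counter
	// hands out distinct slots instead.
	passed := make([]string, len(clusters))
	var passedIdx int32 = -1

	var wg sync.WaitGroup
	for i := range clusters {
		name := clusters[i]
		wg.Add(1)
		go func() {
			defer wg.Done()
			if passesFilters(name) {
				// AddInt32 returns the post-increment value, so every
				// goroutine receives a unique index without locking.
				idx := atomic.AddInt32(&passedIdx, 1)
				passed[idx] = name
			}
		}()
	}
	wg.Wait()

	// Trim to the slots actually used; the result order is
	// nondeterministic because it depends on goroutine scheduling.
	passed = passed[:passedIdx+1]
	fmt.Println(passed)
}

This nondeterminism is also why TestRunFilterPlugins cannot assert on result order, and why patches 2 and 3 move the test from map-based comparison to cmpopts.SortSlices.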
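Similarly, here is a minimal, hypothetical sketch of the PreFilter-skip contract that CycleState.skippedFilterPlugins enables; the plugin struct and status values are simplified stand-ins for the framework's real Plugin interfaces and Status codes, invented for illustration only.

package main

import "fmt"

// Simplified stand-ins for the framework's plugin contract.
type status int

const (
	success status = iota
	skip
)

type plugin struct {
	name      string
	preFilter func() status
	filter    func(cluster string) status
}

func main() {
	plugins := []plugin{
		{
			name:      "always-run",
			preFilter: func() status { return success },
			filter: func(cluster string) status {
				fmt.Println("always-run filters", cluster)
				return success
			},
		},
		{
			name:      "opt-out",
			preFilter: func() status { return skip },
			filter: func(cluster string) status {
				panic("never called: the plugin opted out at PreFilter")
			},
		},
	}

	// PreFilter stage: record plugins that ask to sit out this cycle,
	// mirroring how runPreFilterPlugins populates skippedFilterPlugins.
	skipped := map[string]bool{}
	for _, p := range plugins {
		if p.preFilter() == skip {
			skipped[p.name] = true
		}
	}

	// Filter stage: consult the skip set before each call, as
	// runFilterPluginsFor does.
	for _, p := range plugins {
		if skipped[p.name] {
			continue
		}
		p.filter("cluster-1")
	}
}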