diff --git a/pkg/scheduler/framework/cyclestate.go b/pkg/scheduler/framework/cyclestate.go
new file mode 100644
index 000000000..07a10a421
--- /dev/null
+++ b/pkg/scheduler/framework/cyclestate.go
@@ -0,0 +1,58 @@
+/*
+Copyright (c) Microsoft Corporation.
+Licensed under the MIT license.
+*/
+
+package framework
+
+import (
+	"fmt"
+	"sync"
+)
+
+// StateKey is the key for a state value stored in a CycleState.
+type StateKey string
+
+// StateValue is the value stored in a CycleState under a specific key.
+type StateValue interface{}
+
+// CycleStatePluginReadWriter is an interface through which plugins can store and retrieve data.
+type CycleStatePluginReadWriter interface {
+	Read(key StateKey) (StateValue, error)
+	Write(key StateKey, val StateValue)
+	Delete(key StateKey)
+}
+
+// CycleState, similar to its namesake in kube-scheduler, provides a way for plugins to
+// store and retrieve arbitrary data during a scheduling cycle. The scheduler also uses
+// this struct to keep some global state during a scheduling cycle; note that such
+// state is only accessible to the scheduler itself, not to plugins.
+//
+// It uses a sync.Map for concurrency-safe storage.
+type CycleState struct {
+	// store is a concurrency-safe store (a map).
+	store sync.Map
+}
+
+// Read retrieves a value from CycleState by a key.
+func (c *CycleState) Read(key StateKey) (StateValue, error) {
+	if v, ok := c.store.Load(key); ok {
+		return v, nil
+	}
+	return nil, fmt.Errorf("key %s is not found", key)
+}
+
+// Write stores a value in CycleState under a key.
+func (c *CycleState) Write(key StateKey, val StateValue) {
+	c.store.Store(key, val)
+}
+
+// Delete deletes a key from CycleState.
+func (c *CycleState) Delete(key StateKey) {
+	c.store.Delete(key)
+}
+
+// NewCycleState creates a CycleState.
+func NewCycleState() *CycleState {
+	return &CycleState{}
+}
diff --git a/pkg/scheduler/framework/cyclestate_test.go b/pkg/scheduler/framework/cyclestate_test.go
new file mode 100644
index 000000000..b39e659ed
--- /dev/null
+++ b/pkg/scheduler/framework/cyclestate_test.go
@@ -0,0 +1,23 @@
+/*
+Copyright (c) Microsoft Corporation.
+Licensed under the MIT license.
+*/
+
+package framework
+
+import "testing"
+
+// TestCycleStateBasicOps tests the basic ops (Read, Write, and Delete) of a CycleState.
+func TestCycleStateBasicOps(t *testing.T) {
+	cs := NewCycleState()
+
+	k, v := "key", "value"
+	cs.Write(StateKey(k), StateValue(v))
+	if out, err := cs.Read("key"); out != "value" || err != nil {
+		t.Fatalf("Read(%v) = %v, %v, want %v, nil", k, out, err, v)
+	}
+	cs.Delete(StateKey(k))
+	if out, err := cs.Read("key"); out != nil || err == nil {
+		t.Fatalf("Read(%v) = %v, %v, want nil, not found error", k, out, err)
+	}
+}
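To make the intended plugin-facing flow concrete, here is a minimal sketch (not part of the diff) of how a plugin might stash data in one stage and read it back in a later one; the key name, the topologyState payload, and both helper functions are hypothetical.

```go
package framework

import "fmt"

// topologyStateKey and topologyState are illustrative only; a real plugin
// would define its own key and payload type.
const topologyStateKey StateKey = "dummyPlugin/topologyState"

type topologyState struct {
	skewByDomain map[string]int
}

// prepareTopologyState mimics an early stage caching an expensive computation.
func prepareTopologyState(state CycleStatePluginReadWriter) {
	state.Write(topologyStateKey, &topologyState{skewByDomain: map[string]int{"east": 1}})
}

// readTopologyState mimics a later stage retrieving and type-asserting the cached value.
func readTopologyState(state CycleStatePluginReadWriter) (*topologyState, error) {
	val, err := state.Read(topologyStateKey)
	if err != nil {
		// The key was never written in this scheduling cycle.
		return nil, err
	}
	ts, ok := val.(*topologyState)
	if !ok {
		return nil, fmt.Errorf("unexpected state type %T", val)
	}
	return ts, nil
}
```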
diff --git a/pkg/scheduler/framework/score.go b/pkg/scheduler/framework/score.go
new file mode 100644
index 000000000..7c4a902d5
--- /dev/null
+++ b/pkg/scheduler/framework/score.go
@@ -0,0 +1,65 @@
+/*
+Copyright (c) Microsoft Corporation.
+Licensed under the MIT license.
+*/
+
+package framework
+
+import (
+	fleetv1 "go.goms.io/fleet/apis/v1"
+)
+
+// ClusterScore is the set of scores the scheduler assigns to a cluster.
+type ClusterScore struct {
+	// TopologySpreadScore determines how much a binding would satisfy the topology spread
+	// constraints specified by the user.
+	TopologySpreadScore int
+	// AffinityScore determines how much a binding would satisfy the affinity terms
+	// specified by the user.
+	AffinityScore int
+	// PriorityScore determines how much a binding would satisfy the priority terms
+	// specified by the user.
+	PriorityScore int
+}
+
+// Add adds a ClusterScore to another ClusterScore.
+func (s1 *ClusterScore) Add(s2 ClusterScore) {
+	s1.TopologySpreadScore += s2.TopologySpreadScore
+	s1.AffinityScore += s2.AffinityScore
+	s1.PriorityScore += s2.PriorityScore
+}
+
+// Less returns true if a ClusterScore is less than another.
+func (s1 *ClusterScore) Less(s2 *ClusterScore) bool {
+	if s1.TopologySpreadScore != s2.TopologySpreadScore {
+		return s1.TopologySpreadScore < s2.TopologySpreadScore
+	}
+
+	if s1.AffinityScore != s2.AffinityScore {
+		return s1.AffinityScore < s2.AffinityScore
+	}
+
+	return s1.PriorityScore < s2.PriorityScore
+}
+
+// ScoredCluster is a cluster with a score.
+type ScoredCluster struct {
+	Cluster *fleetv1.MemberCluster
+	Score   *ClusterScore
+}
+
+// ScoredClusters is a list of ScoredClusters; this type implements sort.Interface.
+type ScoredClusters []*ScoredCluster
+
+// Len returns the length of a ScoredClusters; it implements sort.Interface.Len().
+func (sc ScoredClusters) Len() int { return len(sc) }
+
+// Less returns true if a ScoredCluster is of a lower score than another; it implements sort.Interface.Less().
+func (sc ScoredClusters) Less(i, j int) bool {
+	return sc[i].Score.Less(sc[j].Score)
+}
+
+// Swap swaps two ScoredClusters in the list; it implements sort.Interface.Swap().
+func (sc ScoredClusters) Swap(i, j int) {
+	sc[i], sc[j] = sc[j], sc[i]
+}
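ScoredClusters sorts in ascending order, so callers that want the best clusters first are expected to wrap it in sort.Reverse (as the test below also does). A small sketch, with a hypothetical pickTopN helper that is not part of this change:

```go
package framework

import "sort"

// pickTopN is a hypothetical helper showing how ScoredClusters is meant to be
// consumed: sort in descending score order and keep the best n clusters.
func pickTopN(scored ScoredClusters, n int) ScoredClusters {
	// ScoredClusters implements sort.Interface in ascending order, so wrap it
	// with sort.Reverse to place the highest scores first.
	sort.Sort(sort.Reverse(scored))
	if n > len(scored) {
		n = len(scored)
	}
	return scored[:n]
}
```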
diff --git a/pkg/scheduler/framework/score_test.go b/pkg/scheduler/framework/score_test.go
new file mode 100644
index 000000000..218c8b258
--- /dev/null
+++ b/pkg/scheduler/framework/score_test.go
@@ -0,0 +1,357 @@
+/*
+Copyright (c) Microsoft Corporation.
+Licensed under the MIT license.
+*/
+
+package framework
+
+import (
+	"sort"
+	"testing"
+
+	"github.com/google/go-cmp/cmp"
+
+	fleetv1 "go.goms.io/fleet/apis/v1"
+)
+
+func TestClusterScoreAdd(t *testing.T) {
+	s1 := &ClusterScore{
+		TopologySpreadScore: 0,
+		AffinityScore: 0,
+		PriorityScore: 0,
+	}
+
+	s2 := &ClusterScore{
+		TopologySpreadScore: 1,
+		AffinityScore: 5,
+		PriorityScore: 80,
+	}
+
+	s1.Add(*s2)
+	want := &ClusterScore{
+		TopologySpreadScore: 1,
+		AffinityScore: 5,
+		PriorityScore: 80,
+	}
+	if !cmp.Equal(s1, want) {
+		t.Fatalf("Add() = %v, want %v", s1, want)
+	}
+}
+
+func TestClusterScoreLess(t *testing.T) {
+	testCases := []struct {
+		name string
+		s1   *ClusterScore
+		s2   *ClusterScore
+		want bool
+	}{
+		{
+			name: "s1 is less than s2 in topology spread score",
+			s1: &ClusterScore{
+				TopologySpreadScore: 0,
+				AffinityScore: 10,
+				PriorityScore: 20,
+			},
+			s2: &ClusterScore{
+				TopologySpreadScore: 1,
+				AffinityScore: 20,
+				PriorityScore: 30,
+			},
+			want: true,
+		},
+		{
+			name: "s1 is less than s2 in affinity score",
+			s1: &ClusterScore{
+				TopologySpreadScore: 1,
+				AffinityScore: 10,
+				PriorityScore: 20,
+			},
+			s2: &ClusterScore{
+				TopologySpreadScore: 1,
+				AffinityScore: 20,
+				PriorityScore: 30,
+			},
+			want: true,
+		},
+		{
+			name: "s1 is less than s2 in priority score",
+			s1: &ClusterScore{
+				TopologySpreadScore: 1,
+				AffinityScore: 10,
+				PriorityScore: 20,
+			},
+			s2: &ClusterScore{
+				TopologySpreadScore: 1,
+				AffinityScore: 10,
+				PriorityScore: 30,
+			},
+			want: true,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			if tc.s1.Less(tc.s2) != tc.want {
+				t.Fatalf("Less(%v, %v) = %t, want %t", tc.s1, tc.s2, !tc.want, tc.want)
+			}
+
+			if tc.s2.Less(tc.s1) != !tc.want {
+				t.Fatalf("Less(%v, %v) = %t, want %t", tc.s2, tc.s1, tc.want, !tc.want)
+			}
+		})
+	}
+}
+
+func TestClusterScoreEqual(t *testing.T) {
+	s1 := &ClusterScore{
+		TopologySpreadScore: 0,
+		AffinityScore: 0,
+		PriorityScore: 0,
+	}
+
+	s2 := &ClusterScore{
+		TopologySpreadScore: 0,
+		AffinityScore: 0,
+		PriorityScore: 0,
+	}
+
+	if s1.Less(s2) || s2.Less(s1) {
+		t.Fatalf("Less(%v, %v) = %v, Less(%v, %v) = %v, want both to be false", s1, s2, s1.Less(s2), s2, s1, s2.Less(s1))
+	}
+}
+
+func TestScoredClustersSort(t *testing.T) {
+	clusterA := &fleetv1.MemberCluster{}
+	clusterB := &fleetv1.MemberCluster{}
+	clusterC := &fleetv1.MemberCluster{}
+	clusterD := &fleetv1.MemberCluster{}
+
+	testCases := []struct {
+		name string
+		scs  ScoredClusters
+		want ScoredClusters
+	}{
+		{
+			name: "sort asc values",
+			scs: ScoredClusters{
+				{
+					Cluster: clusterA,
+					Score: &ClusterScore{
+						TopologySpreadScore: 0,
+						AffinityScore: 10,
+						PriorityScore: 20,
+					},
+				},
+				{
+					Cluster: clusterB,
+					Score: &ClusterScore{
+						TopologySpreadScore: 1,
+						AffinityScore: 10,
+						PriorityScore: 20,
+					},
+				},
+				{
+					Cluster: clusterC,
+					Score: &ClusterScore{
+						TopologySpreadScore: 1,
+						AffinityScore: 20,
+						PriorityScore: 20,
+					},
+				},
+				{
+					Cluster: clusterD,
+					Score: &ClusterScore{
+						TopologySpreadScore: 2,
+						AffinityScore: 30,
+						PriorityScore: 40,
+					},
+				},
+			},
+			want: ScoredClusters{
+				{
+					Cluster: clusterD,
+					Score: &ClusterScore{
+						TopologySpreadScore: 2,
+						AffinityScore: 30,
+						PriorityScore: 40,
+					},
+				},
+				{
+					Cluster: clusterC,
+					Score: &ClusterScore{
+						TopologySpreadScore: 1,
+						AffinityScore: 20,
+						PriorityScore: 20,
+					},
+				},
+				{
+					Cluster: clusterB,
+					Score: &ClusterScore{
+						TopologySpreadScore: 1,
+						AffinityScore: 10,
+						PriorityScore: 20,
+					},
+				},
+				{
+					Cluster: clusterA,
+					Score: &ClusterScore{
+						TopologySpreadScore: 0,
+						AffinityScore: 10,
+						PriorityScore: 20,
+					},
+				},
+			},
+		},
+		{
+			name: "sort desc values",
+			scs: ScoredClusters{
+				{
+					Cluster: clusterD,
+					Score: &ClusterScore{
+						TopologySpreadScore: 2,
+						AffinityScore: 30,
+						PriorityScore: 40,
+					},
+				},
+				{
+					Cluster: clusterC,
+					Score: &ClusterScore{
+						TopologySpreadScore: 1,
+						AffinityScore: 20,
+						PriorityScore: 20,
+					},
+				},
+				{
+					Cluster: clusterB,
+					Score: &ClusterScore{
+						TopologySpreadScore: 1,
+						AffinityScore: 10,
+						PriorityScore: 20,
+					},
+				},
+				{
+					Cluster: clusterA,
+					Score: &ClusterScore{
+						TopologySpreadScore: 0,
+						AffinityScore: 10,
+						PriorityScore: 20,
+					},
+				},
+			},
+			want: ScoredClusters{
+				{
+					Cluster: clusterD,
+					Score: &ClusterScore{
+						TopologySpreadScore: 2,
+						AffinityScore: 30,
+						PriorityScore: 40,
+					},
+				},
+				{
+					Cluster: clusterC,
+					Score: &ClusterScore{
+						TopologySpreadScore: 1,
+						AffinityScore: 20,
+						PriorityScore: 20,
+					},
+				},
+				{
+					Cluster: clusterB,
+					Score: &ClusterScore{
+						TopologySpreadScore: 1,
+						AffinityScore: 10,
+						PriorityScore: 20,
+					},
+				},
+				{
+					Cluster: clusterA,
+					Score: &ClusterScore{
+						TopologySpreadScore: 0,
+						AffinityScore: 10,
+						PriorityScore: 20,
+					},
+				},
+			},
+		},
+		{
+			name: "sort values in random",
+			scs: ScoredClusters{
+				{
+					Cluster: clusterC,
+					Score: &ClusterScore{
+						TopologySpreadScore: 1,
+						AffinityScore: 20,
+						PriorityScore: 20,
+					},
+				},
+				{
+					Cluster: clusterD,
+					Score: &ClusterScore{
+						TopologySpreadScore: 2,
+						AffinityScore: 30,
+						PriorityScore: 40,
+					},
+				},
+				{
+					Cluster: clusterA,
+					Score: &ClusterScore{
+						TopologySpreadScore: 0,
+						AffinityScore: 10,
+						PriorityScore: 20,
+					},
+				},
+				{
+					Cluster: clusterB,
+					Score: &ClusterScore{
+						TopologySpreadScore: 1,
+						AffinityScore: 10,
+						PriorityScore: 20,
+					},
+				},
+			},
+			want: ScoredClusters{
+				{
+					Cluster: clusterD,
+					Score: &ClusterScore{
+						TopologySpreadScore: 2,
+						AffinityScore: 30,
+						PriorityScore: 40,
+					},
+				},
+				{
+					Cluster: clusterC,
+					Score: &ClusterScore{
+						TopologySpreadScore: 1,
+						AffinityScore: 20,
+						PriorityScore: 20,
+					},
+				},
+				{
+					Cluster: clusterB,
+					Score: &ClusterScore{
+						TopologySpreadScore: 1,
+						AffinityScore: 10,
+						PriorityScore: 20,
+					},
+				},
+				{
+					Cluster: clusterA,
+					Score: &ClusterScore{
+						TopologySpreadScore: 0,
+						AffinityScore: 10,
+						PriorityScore: 20,
+					},
+				},
+			},
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			sort.Sort(sort.Reverse(tc.scs))
+			if !cmp.Equal(tc.scs, tc.want) {
+				t.Fatalf("Sort() = %v, want %v", tc.scs, tc.want)
+			}
+		})
+	}
+}
diff --git a/pkg/scheduler/framework/status.go b/pkg/scheduler/framework/status.go
new file mode 100644
index 000000000..cdce0ac20
--- /dev/null
+++ b/pkg/scheduler/framework/status.go
@@ -0,0 +1,134 @@
+/*
+Copyright (c) Microsoft Corporation.
+Licensed under the MIT license.
+*/
+
+package framework
+
+import "strings"
+
+// StatusCode is the status code of a Status, returned by a plugin.
+type StatusCode int
+
+// Pre-defined status codes.
+const (
+	// Success signals that a plugin has completed its run successfully.
+	// Note that a nil *Status is also considered as a Success Status.
+	Success StatusCode = iota
+	// internalError signals that a plugin has encountered an internal error.
+	// Note that this status code is NOT exported; to return an internalError status, use the
+	// FromError() call.
+	internalError
+	// ClusterUnschedulable signals that a plugin has found that a placement should not be bound
+	// to a specific cluster.
+	ClusterUnschedulable
+	// PreSkip signals that a plugin should be skipped in the following stage. This is returned
+	// by plugins at PreFilter or PreScore stages only, to save some overhead.
+	PreSkip
+)
+
+var statusCodeNames = []string{"Success", "InternalError", "ClusterUnschedulable", "PreSkip"}
+
+// Name returns the name of a status code.
+func (sc StatusCode) Name() string {
+	return statusCodeNames[sc]
+}
+
+// Status is the result yielded by a plugin.
+type Status struct {
+	// statusCode is the status code of a Status.
+	statusCode StatusCode
+	// The reasons behind a Status; this should be empty if the Status is of the status code
+	// Success.
+	reasons []string
+	// The error associated with a Status; this is only set when the Status is of the status code
+	// internalError.
+	err error
+	// The name of the plugin which returns the Status.
+	sourcePlugin string
+}
+
+// code returns the status code of a Status.
+func (s *Status) code() StatusCode {
+	if s == nil {
+		return Success
+	}
+	return s.statusCode
+}
+
+// IsSuccess returns true if a Status is of the status code Success.
+func (s *Status) IsSuccess() bool {
+	return s.code() == Success
+}
+
+// IsInternalError returns true if a Status is of the status code internalError.
+func (s *Status) IsInternalError() bool {
+	return s.code() == internalError
+}
+
+// IsPreSkip returns true if a Status is of the status code PreSkip.
+func (s *Status) IsPreSkip() bool {
+	return s.code() == PreSkip
+}
+
+// IsClusterUnschedulable returns true if a Status is of the status code ClusterUnschedulable.
+func (s *Status) IsClusterUnschedulable() bool {
+	return s.code() == ClusterUnschedulable
+}
+
+// Reasons returns the reasons of a Status.
+func (s *Status) Reasons() []string {
+	if s == nil {
+		return []string{}
+	}
+	return s.reasons
+}
+
+// SourcePlugin returns the source plugin associated with a Status.
+func (s *Status) SourcePlugin() string {
+	if s == nil {
+		return ""
+	}
+	return s.sourcePlugin
+}
+
+// InternalError returns the error associated with a Status.
+func (s *Status) InternalError() error {
+	if s == nil {
+		return nil
+	}
+	return s.err
+}
+
+// String returns the description of a Status.
+func (s *Status) String() string {
+	if s == nil {
+		return s.code().Name()
+	}
+	desc := []string{s.code().Name()}
+	if s.err != nil {
+		desc = append(desc, s.err.Error())
+	}
+	desc = append(desc, s.reasons...)
+	return strings.Join(desc, ", ")
+}
+
+// NewNonErrorStatus returns a Status with a non-error status code.
+// To return a Status of the internalError status code, use FromError() instead.
+func NewNonErrorStatus(code StatusCode, sourcePlugin string, reasons ...string) *Status {
+	return &Status{
+		statusCode: code,
+		reasons: reasons,
+		sourcePlugin: sourcePlugin,
+	}
+}
+
+// FromError returns a Status from an error.
+func FromError(err error, sourcePlugin string, reasons ...string) *Status {
+	return &Status{
+		statusCode: internalError,
+		reasons: reasons,
+		err: err,
+		sourcePlugin: sourcePlugin,
+	}
+}
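How a plugin is expected to pick between these constructors: FromError for unexpected failures, NewNonErrorStatus for expected outcomes such as ClusterUnschedulable, and a nil *Status for success. The dummyFilter function below is a hypothetical sketch, not part of the diff:

```go
package framework

import "fmt"

// dummyFilter is a hypothetical Filter-style check showing the intended use of
// NewNonErrorStatus and FromError; its inputs stand in for real plugin logic.
func dummyFilter(clusterHealthy bool, listErr error) *Status {
	if listErr != nil {
		// Unexpected failures become internalError statuses via FromError.
		return FromError(fmt.Errorf("listing bindings: %w", listErr), "dummyPlugin")
	}
	if !clusterHealthy {
		// Expected "do not place here" outcomes use ClusterUnschedulable.
		return NewNonErrorStatus(ClusterUnschedulable, "dummyPlugin", "cluster is not healthy")
	}
	// A nil *Status is treated as Success.
	return nil
}
```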
diff --git a/pkg/scheduler/framework/status_test.go b/pkg/scheduler/framework/status_test.go
new file mode 100644
index 000000000..6875f6c7f
--- /dev/null
+++ b/pkg/scheduler/framework/status_test.go
@@ -0,0 +1,141 @@
+/*
+Copyright (c) Microsoft Corporation.
+Licensed under the MIT license.
+*/
+
+package framework
+
+import (
+	"fmt"
+	"strings"
+	"testing"
+
+	"github.com/google/go-cmp/cmp"
+	"github.com/google/go-cmp/cmp/cmpopts"
+)
+
+const (
+	dummyPlugin = "dummyPlugin"
+)
+
+var (
+	dummyReasons = []string{"reason1", "reason2"}
+)
+
+func TestNonNilStatusMethods(t *testing.T) {
+	testCases := []struct {
+		name         string
+		statusCode   StatusCode
+		reasons      []string
+		err          error
+		sourcePlugin string
+		desc         string
+	}{
+		{
+			name: "status success",
+			statusCode: Success,
+			reasons: []string{},
+			sourcePlugin: dummyPlugin,
+		},
+		{
+			name: "status error",
+			statusCode: internalError,
+			err: fmt.Errorf("an unexpected error has occurred"),
+			reasons: dummyReasons,
+			sourcePlugin: dummyPlugin,
+		},
+		{
+			name: "status unschedulable",
+			statusCode: ClusterUnschedulable,
+			reasons: dummyReasons,
+			sourcePlugin: dummyPlugin,
+		},
+		{
+			name: "status preskip",
+			statusCode: PreSkip,
+			reasons: dummyReasons,
+			sourcePlugin: dummyPlugin,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			var status *Status
+			if tc.err != nil {
+				status = FromError(tc.err, tc.sourcePlugin, tc.reasons...)
+			} else {
+				status = NewNonErrorStatus(tc.statusCode, tc.sourcePlugin, tc.reasons...)
+			}
+
+			wantCheckOutputs := make([]bool, len(statusCodeNames))
+			wantCheckOutputs[tc.statusCode] = true
+			checkFuncs := []func() bool{
+				status.IsSuccess,
+				status.IsInternalError,
+				status.IsClusterUnschedulable,
+				status.IsPreSkip,
+			}
+			for idx, checkFunc := range checkFuncs {
+				if wantCheckOutputs[idx] != checkFunc() {
+					t.Fatalf("check function for %s = %t, want %t", statusCodeNames[idx], checkFunc(), wantCheckOutputs[idx])
+				}
+			}
+
+			if !cmp.Equal(status.Reasons(), tc.reasons) {
+				t.Fatalf("Reasons() = %v, want %v", status.Reasons(), tc.reasons)
+			}
+
+			if !cmp.Equal(status.SourcePlugin(), tc.sourcePlugin) {
+				t.Fatalf("SourcePlugin() = %s, want %s", status.SourcePlugin(), tc.sourcePlugin)
+			}
+
+			if !cmp.Equal(status.InternalError(), tc.err, cmpopts.EquateErrors()) {
+				t.Fatalf("InternalError() = %v, want %v", status.InternalError(), tc.err)
+			}
+
+			descElems := []string{statusCodeNames[tc.statusCode]}
+			if tc.err != nil {
+				descElems = append(descElems, tc.err.Error())
+			}
+			descElems = append(descElems, tc.reasons...)
+			wantDesc := strings.Join(descElems, ", ")
+			if !cmp.Equal(status.String(), wantDesc) {
+				t.Fatalf("String() = %s, want %s", status.String(), wantDesc)
+			}
+		})
+	}
+}
+
+func TestNilStatusMethods(t *testing.T) {
+	var status *Status
+	wantCheckOutputs := make([]bool, len(statusCodeNames))
+	wantCheckOutputs[Success] = true
+	checkFuncs := []func() bool{
+		status.IsSuccess,
+		status.IsInternalError,
+		status.IsClusterUnschedulable,
+		status.IsPreSkip,
+	}
+	for idx, checkFunc := range checkFuncs {
+		if wantCheckOutputs[idx] != checkFunc() {
+			t.Fatalf("check function for %s = %t, want %t", statusCodeNames[idx], checkFunc(), wantCheckOutputs[idx])
+		}
+	}
+
+	if !cmp.Equal(status.Reasons(), []string{}) {
+		t.Fatalf("Reasons() = %v, want %v", status.Reasons(), []string{})
+	}
+
+	if !cmp.Equal(status.SourcePlugin(), "") {
+		t.Fatalf("SourcePlugin() = %s, want %s", status.SourcePlugin(), "")
+	}
+
+	if !cmp.Equal(status.InternalError(), nil, cmpopts.EquateErrors()) {
+		t.Fatalf("InternalError() = %v, want %v", status.InternalError(), nil)
+	}
+
+	wantDesc := statusCodeNames[Success]
+	if !cmp.Equal(status.String(), wantDesc) {
+		t.Fatalf("String() = %s, want %s", status.String(), wantDesc)
+	}
+}
diff --git a/pkg/scheduler/queue/queue.go b/pkg/scheduler/queue/queue.go
new file mode 100644
index 000000000..935283478
--- /dev/null
+++ b/pkg/scheduler/queue/queue.go
@@ -0,0 +1,129 @@
+/*
+Copyright (c) Microsoft Corporation.
+Licensed under the MIT license.
+*/
+
+// Package schedulingqueue features a scheduling queue, which keeps track of all placements for the scheduler
+// to schedule.
+package schedulingqueue
+
+import (
+	"k8s.io/client-go/util/workqueue"
+)
+
+// PolicySnapshotKey is the unique identifier for a PolicySnapshot stored in a scheduling queue.
+type PolicySnapshotKey string
+
+// PolicySnapshotKeySchedulingQueueWriter is an interface which allows sources, such as controllers, to add
+// PolicySnapshots to the scheduling queue.
+type PolicySnapshotKeySchedulingQueueWriter interface {
+	Add(cpsKey PolicySnapshotKey)
+}
+
+// PolicySnapshotKeySchedulingQueue is an interface which queues PolicySnapshot keys for the scheduler to schedule.
+type PolicySnapshotKeySchedulingQueue interface {
+	PolicySnapshotKeySchedulingQueueWriter
+
+	// Run starts the scheduling queue.
+	Run()
+	// Close closes the scheduling queue immediately.
+	Close()
+	// CloseWithDrain closes the scheduling queue after all items in the queue are processed.
+	CloseWithDrain()
+	// NextClusterPolicySnapshotKey returns the next-in-line PolicySnapshot key for the scheduler to schedule.
+	NextClusterPolicySnapshotKey() PolicySnapshotKey
+	// Done marks a PolicySnapshot key as done.
+	Done(cpsKey PolicySnapshotKey)
+}
+
+// simplePolicySnapshotKeySchedulingQueue is a simple implementation of PolicySnapshotKeySchedulingQueue.
+//
+// At this moment, one single workqueue would suffice, as sources such as the cluster watcher,
+// the binding watcher, etc., can catch all changes that need the scheduler's attention.
+// In the future, when more features, e.g., inter-placement affinity/anti-affinity, are added,
+// more queues, such as a backoff queue, might become necessary.
+type simplePolicySnapshotKeySchedulingQueue struct {
+	policySnapshotWorkQueue workqueue.RateLimitingInterface
+}
+
+// Verify that simplePolicySnapshotKeySchedulingQueue implements PolicySnapshotKeySchedulingQueue
+// at compile time.
+var _ PolicySnapshotKeySchedulingQueue = &simplePolicySnapshotKeySchedulingQueue{}
+
+// simplePolicySnapshotKeySchedulingQueueOptions are the options for the simplePolicySnapshotKeySchedulingQueue.
+type simplePolicySnapshotKeySchedulingQueueOptions struct {
+	workqueueRateLimiter workqueue.RateLimiter
+	workqueueName        string
+}
+
+// Option is the function that configures the simplePolicySnapshotKeySchedulingQueue.
+type Option func(*simplePolicySnapshotKeySchedulingQueueOptions)
+
+var defaultSimplePolicySnapshotKeySchedulingQueueOptions = simplePolicySnapshotKeySchedulingQueueOptions{
+	workqueueRateLimiter: workqueue.DefaultControllerRateLimiter(),
+	workqueueName: "policySnapshotKeySchedulingQueue",
+}
+
+// WithWorkqueueRateLimiter sets a rate limiter for the workqueue.
+func WithWorkqueueRateLimiter(rateLimiter workqueue.RateLimiter) Option {
+	return func(o *simplePolicySnapshotKeySchedulingQueueOptions) {
+		o.workqueueRateLimiter = rateLimiter
+	}
+}
+
+// WithWorkqueueName sets a name for the workqueue.
+func WithWorkqueueName(name string) Option {
+	return func(o *simplePolicySnapshotKeySchedulingQueueOptions) {
+		o.workqueueName = name
+	}
+}
+
+// Run starts the scheduling queue.
+//
+// At this moment, Run is a no-op, as there is only one queue present; in the future,
+// when more queues are added, Run would start goroutines that move items between queues as
+// appropriate.
+func (sq *simplePolicySnapshotKeySchedulingQueue) Run() {}
+
+// Close shuts down the scheduling queue immediately.
+func (sq *simplePolicySnapshotKeySchedulingQueue) Close() {
+	sq.policySnapshotWorkQueue.ShutDown()
+}
+
+// CloseWithDrain shuts down the scheduling queue and returns only after all items in the queue are processed.
+func (sq *simplePolicySnapshotKeySchedulingQueue) CloseWithDrain() {
+	sq.policySnapshotWorkQueue.ShutDownWithDrain()
+}
+
+// NextClusterPolicySnapshotKey returns the next PolicySnapshot key in the work queue for
+// the scheduler to process.
+func (sq *simplePolicySnapshotKeySchedulingQueue) NextClusterPolicySnapshotKey() PolicySnapshotKey {
+	// This will block on a condition variable if the queue is empty.
+	cpsKey, shutdown := sq.policySnapshotWorkQueue.Get()
+	if shutdown {
+		return ""
+	}
+	return cpsKey.(PolicySnapshotKey)
+}
+
+// Done marks a PolicySnapshot key as done.
+func (sq *simplePolicySnapshotKeySchedulingQueue) Done(cpsKey PolicySnapshotKey) {
+	sq.policySnapshotWorkQueue.Done(cpsKey)
+}
+
+// Add adds a PolicySnapshot key to the work queue.
+func (sq *simplePolicySnapshotKeySchedulingQueue) Add(cpsKey PolicySnapshotKey) {
+	sq.policySnapshotWorkQueue.Add(cpsKey)
+}
+
+// NewSimplePolicySnapshotKeySchedulingQueue returns a simplePolicySnapshotKeySchedulingQueue.
+func NewSimplePolicySnapshotKeySchedulingQueue(opts ...Option) PolicySnapshotKeySchedulingQueue {
+	options := defaultSimplePolicySnapshotKeySchedulingQueueOptions
+	for _, opt := range opts {
+		opt(&options)
+	}
+
+	return &simplePolicySnapshotKeySchedulingQueue{
+		policySnapshotWorkQueue: workqueue.NewNamedRateLimitingQueue(options.workqueueRateLimiter, options.workqueueName),
+	}
+}
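A sketch of the consumer side this interface implies: the scheduler blocks on NextClusterPolicySnapshotKey, processes the key, and calls Done so the underlying workqueue can release it. The loop and the scheduleOnce callback are hypothetical; error handling and requeueing are out of scope for this change:

```go
package schedulingqueue

// runSchedulingLoop is a hypothetical consumer loop: pull the next key,
// schedule it, and mark it done so the workqueue can release it.
func runSchedulingLoop(sq PolicySnapshotKeySchedulingQueue, scheduleOnce func(PolicySnapshotKey) error) {
	sq.Run()
	for {
		key := sq.NextClusterPolicySnapshotKey()
		if key == "" {
			// An empty key is returned only after the queue has been shut down.
			return
		}
		// A real scheduler would inspect the error and decide whether to requeue;
		// that mechanism is not part of this change, so the error is ignored here.
		_ = scheduleOnce(key)
		sq.Done(key)
	}
}
```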
diff --git a/pkg/scheduler/queue/queue_test.go b/pkg/scheduler/queue/queue_test.go
new file mode 100644
index 000000000..8749aa5d1
--- /dev/null
+++ b/pkg/scheduler/queue/queue_test.go
@@ -0,0 +1,37 @@
+/*
+Copyright (c) Microsoft Corporation.
+Licensed under the MIT license.
+*/
+
+package schedulingqueue
+
+import (
+	"testing"
+
+	"github.com/google/go-cmp/cmp"
+)
+
+// TestSimplePolicySnapshotKeySchedulingQueueBasicOps tests the basic ops
+// (Add, NextClusterPolicySnapshotKey, Done) of a simplePolicySnapshotKeySchedulingQueue.
+func TestSimplePolicySnapshotKeySchedulingQueueBasicOps(t *testing.T) {
+	sq := NewSimplePolicySnapshotKeySchedulingQueue()
+	sq.Run()
+
+	keysToAdd := []PolicySnapshotKey{"A", "B", "C", "D", "E"}
+	for _, key := range keysToAdd {
+		sq.Add(key)
+	}
+
+	keysRecved := []PolicySnapshotKey{}
+	for i := 0; i < len(keysToAdd); i++ {
+		key := sq.NextClusterPolicySnapshotKey()
+		keysRecved = append(keysRecved, key)
+		sq.Done(key)
+	}
+
+	if !cmp.Equal(keysToAdd, keysRecved) {
+		t.Fatalf("Received keys %v, want %v", keysRecved, keysToAdd)
+	}
+
+	sq.Close()
+}
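For completeness, the writer side might look like the sketch below: event sources only need the narrow PolicySnapshotKeySchedulingQueueWriter interface. The handler and the namespace/name key format are assumptions for illustration only:

```go
package schedulingqueue

// enqueuePolicySnapshot is a hypothetical event-handler hook showing the
// writer-side contract: sources depend only on the Writer interface.
func enqueuePolicySnapshot(w PolicySnapshotKeySchedulingQueueWriter, namespace, name string) {
	// The namespace/name key format is assumed here for illustration; the
	// actual key derivation is not part of this change.
	w.Add(PolicySnapshotKey(namespace + "/" + name))
}
```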