From 471b93eb522c5ef96d58a6c6fc09985942eb935d Mon Sep 17 00:00:00 2001 From: Andrea Luzzardi Date: Mon, 4 Apr 2016 19:13:02 -0700 Subject: [PATCH 1/4] scheduler Signed-off-by: Andrea Luzzardi --- manager/scheduler/scheduler.go | 35 ++++++---------------------------- 1 file changed, 6 insertions(+), 29 deletions(-) diff --git a/manager/scheduler/scheduler.go b/manager/scheduler/scheduler.go index 96558b8d5b..47e5247a89 100644 --- a/manager/scheduler/scheduler.go +++ b/manager/scheduler/scheduler.go @@ -102,9 +102,7 @@ func (s *Scheduler) Run() error { case state.EventUpdateNode: pendingChanges += s.updateNode(v.Node) case state.EventDeleteNode: - if schedulableNode(v.Node) { - s.nodeHeap.remove(v.Node.ID) - } + s.nodeHeap.remove(v.Node.ID) case state.EventCommit: if pendingChanges > 0 { s.tick() @@ -191,24 +189,13 @@ func (s *Scheduler) deleteTask(t *api.Task) { } func (s *Scheduler) createNode(n *api.Node) int { - if schedulableNode(n) { - s.nodeHeap.addOrUpdateNode(n, len(s.tasksByNode[n.ID])) - return 1 - } - return 0 + s.nodeHeap.addOrUpdateNode(n, len(s.tasksByNode[n.ID])) + return 1 } func (s *Scheduler) updateNode(n *api.Node) int { - pendingChanges := 0 - - if !schedulableNode(n) { - s.nodeHeap.remove(n.ID) - } else if schedulableNode(n) { - s.nodeHeap.addOrUpdateNode(n, len(s.tasksByNode[n.ID])) - pendingChanges = 1 - } - - return pendingChanges + s.nodeHeap.addOrUpdateNode(n, len(s.tasksByNode[n.ID])) + return 1 } // tick attempts to schedule the queue. @@ -288,13 +275,7 @@ func (s *Scheduler) rollbackLocalState(decisions map[string]schedulingDecision) // scheduleTask schedules a single task. func (s *Scheduler) scheduleTask(t *api.Task) *api.Task { - meetsConstraints := func(n *api.Node) bool { - // TODO(aaronl): This is where we should check that a node - // satisfies any necessary constraints. - return true - } - - n, numTasks := s.nodeHeap.findMin(meetsConstraints, s.scanAllNodes) + n, numTasks := s.nodeHeap.findMin(s.schedulableNode, s.scanAllNodes) if n == nil { log.WithField("task.id", t.ID).Debug("No nodes available to assign tasks to") return nil @@ -323,10 +304,6 @@ func (s *Scheduler) buildNodeHeap(tx state.ReadTx) error { i := 0 for _, n := range nodes { - if !schedulableNode(n) { - continue - } - s.nodeHeap.heap = append(s.nodeHeap.heap, nodeHeapItem{node: n, numTasks: len(s.tasksByNode[n.ID])}) s.nodeHeap.index[n.ID] = i i++ From 06b0ba5f83f65e012bb617253275e322f8f2483f Mon Sep 17 00:00:00 2001 From: Andrea Luzzardi Date: Sat, 9 Apr 2016 17:41:44 -0700 Subject: [PATCH 2/4] scheduler: Add basic filter pipelining. Signed-off-by: Andrea Luzzardi --- manager/scheduler/filter/filter.go | 54 ++++++++++++++++++++++++++++ manager/scheduler/filter/pipeline.go | 43 ++++++++++++++++++++++ 2 files changed, 97 insertions(+) create mode 100644 manager/scheduler/filter/filter.go create mode 100644 manager/scheduler/filter/pipeline.go diff --git a/manager/scheduler/filter/filter.go b/manager/scheduler/filter/filter.go new file mode 100644 index 0000000000..7a33d3c613 --- /dev/null +++ b/manager/scheduler/filter/filter.go @@ -0,0 +1,54 @@ +package filter + +import "github.com/docker/swarm-v2/api" + +type Filter interface { + // Enabled returns true when the filter is enabled for a given task. + // For instance, a constraints filter would return `false` if the task doesn't contain any constraints. + Enabled(*api.Task) bool + + // Check returns true if the task can be scheduled into the given node. + // This function should not be called if the the filter is not Enabled. 
+ Check(*api.Task, *api.Node) bool +} + +type ReadyFilter struct { +} + +func (f *ReadyFilter) Enabled(t *api.Task) bool { + return true +} + +func (f *ReadyFilter) Check(t *api.Task, n *api.Node) bool { + return n.Status.State == api.NodeStatus_READY && + (n.Spec == nil || n.Spec.Availability == api.NodeAvailabilityActive) +} + +type ResourceFilter struct { +} + +func (f *ResourceFilter) Enabled(t *api.Task) bool { + c := t.Spec.GetContainer() + if c == nil { + return false + } + if r := c.Resources; r == nil || (r.Reservations.NanoCPUs == 0 && r.Reservations.MemoryBytes == 0) { + return false + } + return true +} + +func (f *ResourceFilter) Check(t *api.Task, n *api.Node) bool { + res := t.Spec.GetContainer().Resources.Reservations + + // TODO(aluzzardi): Actually check adding this task to the node won't put it above its resource capacity. + if res.NanoCPUs > n.Description.Resources.NanoCPUs { + return false + } + + if res.MemoryBytes > n.Description.Resources.MemoryBytes { + return false + } + + return true +} diff --git a/manager/scheduler/filter/pipeline.go b/manager/scheduler/filter/pipeline.go new file mode 100644 index 0000000000..c2a3c2a582 --- /dev/null +++ b/manager/scheduler/filter/pipeline.go @@ -0,0 +1,43 @@ +package filter + +import "github.com/docker/swarm-v2/api" + +var ( + defaultFilters = []Filter{ + // Always check for readiness first. + &ReadyFilter{}, + &ResourceFilter{}, + } +) + +type Pipeline struct { + task *api.Task + checklist []Filter +} + +func NewPipeline(t *api.Task) *Pipeline { + p := &Pipeline{ + task: t, + checklist: []Filter{}, + } + + for _, f := range defaultFilters { + if f.Enabled(t) { + p.checklist = append(p.checklist, f) + } + } + + return p +} + +// Process a node through the filter pipeline. +// Returns true if all filters pass, false otherwise. +func (p *Pipeline) Process(n *api.Node) bool { + for _, f := range p.checklist { + if !f.Check(p.task, n) { + // Immediately stop on first failure. + return false + } + } + return true +} From d5201c6ff19242fa191a12ee80848e639fd8a87e Mon Sep 17 00:00:00 2001 From: Aaron Lehmann Date: Mon, 11 Apr 2016 10:14:30 -0700 Subject: [PATCH 3/4] Rename nodeHeapItem to exported type NodeInfo This is in preparation for adding additional metadata to this type and using it in the filtering pipeline. Signed-off-by: Aaron Lehmann --- manager/scheduler/indexed_node_heap.go | 57 +++++++++++---------- manager/scheduler/indexed_node_heap_test.go | 16 +++--- manager/scheduler/scheduler.go | 6 +-- 3 files changed, 42 insertions(+), 37 deletions(-) diff --git a/manager/scheduler/indexed_node_heap.go b/manager/scheduler/indexed_node_heap.go index f4d13f4f65..5763f76cc4 100644 --- a/manager/scheduler/indexed_node_heap.go +++ b/manager/scheduler/indexed_node_heap.go @@ -6,15 +6,20 @@ import ( "github.com/docker/swarm-v2/api" ) -type nodeHeapItem struct { - node *api.Node - numTasks int +// NodeInfo contains a node and some additional metadata. +type NodeInfo struct { + *api.Node + NumTasks int +} + +func newNodeInfo(n *api.Node, numTasks int) NodeInfo { + return NodeInfo{Node: n, NumTasks: numTasks} } // A nodeHeap implements heap.Interface for nodes. It also includes an index // by node id. 
type nodeHeap struct { - heap []nodeHeapItem + heap []NodeInfo index map[string]int // map from node id to heap index } @@ -23,19 +28,19 @@ func (nh nodeHeap) Len() int { } func (nh nodeHeap) Less(i, j int) bool { - return nh.heap[i].numTasks < nh.heap[j].numTasks + return nh.heap[i].NumTasks < nh.heap[j].NumTasks } func (nh nodeHeap) Swap(i, j int) { nh.heap[i], nh.heap[j] = nh.heap[j], nh.heap[i] - nh.index[nh.heap[i].node.ID] = i - nh.index[nh.heap[j].node.ID] = j + nh.index[nh.heap[i].ID] = i + nh.index[nh.heap[j].ID] = j } func (nh *nodeHeap) Push(x interface{}) { n := len(nh.heap) - item := x.(nodeHeapItem) - nh.index[item.node.ID] = n + item := x.(NodeInfo) + nh.index[item.ID] = n nh.heap = append(nh.heap, item) } @@ -43,17 +48,17 @@ func (nh *nodeHeap) Pop() interface{} { old := nh.heap n := len(old) item := old[n-1] - delete(nh.index, item.node.ID) + delete(nh.index, item.ID) nh.heap = old[0 : n-1] return item } func (nh *nodeHeap) alloc(n int) { - nh.heap = make([]nodeHeapItem, 0, n) + nh.heap = make([]NodeInfo, 0, n) nh.index = make(map[string]int, n) } -func (nh *nodeHeap) peek() *nodeHeapItem { +func (nh *nodeHeap) peek() *NodeInfo { if len(nh.heap) == 0 { return nil } @@ -65,11 +70,11 @@ func (nh *nodeHeap) peek() *nodeHeapItem { func (nh *nodeHeap) addOrUpdateNode(n *api.Node, numTasks int) { index, ok := nh.index[n.ID] if ok { - nh.heap[index].node = n - nh.heap[index].numTasks = numTasks + nh.heap[index].Node = n + nh.heap[index].NumTasks = numTasks heap.Fix(nh, index) } else { - heap.Push(nh, nodeHeapItem{node: n, numTasks: numTasks}) + heap.Push(nh, newNodeInfo(n, numTasks)) } } @@ -78,7 +83,7 @@ func (nh *nodeHeap) addOrUpdateNode(n *api.Node, numTasks int) { func (nh *nodeHeap) updateNode(nodeID string, numTasks int) { index, ok := nh.index[nodeID] if ok { - nh.heap[index].numTasks = numTasks + nh.heap[index].NumTasks = numTasks heap.Fix(nh, index) } } @@ -86,13 +91,13 @@ func (nh *nodeHeap) updateNode(nodeID string, numTasks int) { func (nh *nodeHeap) remove(nodeID string) { index, ok := nh.index[nodeID] if ok { - nh.heap[index].numTasks = -1 + nh.heap[index].NumTasks = -1 heap.Fix(nh, index) heap.Pop(nh) } } -func (nh *nodeHeap) findMin(meetsConstraints func(*api.Node) bool, scanAllNodes bool) (*api.Node, int) { +func (nh *nodeHeap) findMin(meetsConstraints func(NodeInfo) bool, scanAllNodes bool) (*api.Node, int) { var bestNode *api.Node minTasks := int(^uint(0) >> 1) // max int nextStoppingPoint := 0 @@ -101,9 +106,9 @@ func (nh *nodeHeap) findMin(meetsConstraints func(*api.Node) bool, scanAllNodes for i := 0; i < len(nh.heap); i++ { heapEntry := nh.heap[i] - if meetsConstraints(heapEntry.node) && heapEntry.numTasks < minTasks { - bestNode = heapEntry.node - minTasks = heapEntry.numTasks + if meetsConstraints(heapEntry) && heapEntry.NumTasks < minTasks { + bestNode = heapEntry.Node + minTasks = heapEntry.NumTasks } if !scanAllNodes { if i == nextStoppingPoint && bestNode != nil { @@ -113,7 +118,7 @@ func (nh *nodeHeap) findMin(meetsConstraints func(*api.Node) bool, scanAllNodes // recursively. 
for j := i - levelSize + 1; j <= i; j++ { heapEntry = nh.heap[i] - if heapEntry.numTasks < minTasks { + if heapEntry.NumTasks < minTasks { newBestNode, newMinTasks := nh.findBestChildBelowThreshold(meetsConstraints, i, minTasks) if newBestNode != nil { bestNode, minTasks = newBestNode, newMinTasks @@ -131,7 +136,7 @@ func (nh *nodeHeap) findMin(meetsConstraints func(*api.Node) bool, scanAllNodes return bestNode, minTasks } -func (nh *nodeHeap) findBestChildBelowThreshold(meetsConstraints func(*api.Node) bool, index int, threshold int) (*api.Node, int) { +func (nh *nodeHeap) findBestChildBelowThreshold(meetsConstraints func(NodeInfo) bool, index int, threshold int) (*api.Node, int) { var bestNode *api.Node for i := index*2 + 1; i <= index*2+2; i++ { @@ -139,9 +144,9 @@ func (nh *nodeHeap) findBestChildBelowThreshold(meetsConstraints func(*api.Node) break } heapEntry := nh.heap[i] - if heapEntry.numTasks < threshold { - if meetsConstraints(heapEntry.node) { - bestNode, threshold = heapEntry.node, heapEntry.numTasks + if heapEntry.NumTasks < threshold { + if meetsConstraints(heapEntry) { + bestNode, threshold = heapEntry.Node, heapEntry.NumTasks } else { newBestNode, newMinTasks := nh.findBestChildBelowThreshold(meetsConstraints, i, threshold) if newBestNode != nil { diff --git a/manager/scheduler/indexed_node_heap_test.go b/manager/scheduler/indexed_node_heap_test.go index 0b5a904a49..a8154da92a 100644 --- a/manager/scheduler/indexed_node_heap_test.go +++ b/manager/scheduler/indexed_node_heap_test.go @@ -33,13 +33,13 @@ func TestFindMin(t *testing.T) { if i%100 == 0 { n.Spec.Meta.Labels["special"] = "true" } - nh.heap = append(nh.heap, nodeHeapItem{node: n, numTasks: int(rand.Int())}) + nh.heap = append(nh.heap, NodeInfo{Node: n, NumTasks: int(rand.Int())}) nh.index[n.ID] = i } heap.Init(&nh) - isSpecial := func(n *api.Node) bool { + isSpecial := func(n NodeInfo) bool { return n.Spec.Meta.Labels["special"] == "true" } @@ -50,16 +50,16 @@ func TestFindMin(t *testing.T) { var manualBestNode *api.Node manualBestTasks := uint64(math.MaxUint64) for i := 0; i < nh.Len(); i++ { - if !isSpecial(nh.heap[i].node) { + if !isSpecial(nh.heap[i]) { continue } - if uint64(nh.heap[i].numTasks) < manualBestTasks { - manualBestNode = nh.heap[i].node - manualBestTasks = uint64(nh.heap[i].numTasks) - } else if uint64(nh.heap[i].numTasks) == manualBestTasks && nh.heap[i].node == bestNode { + if uint64(nh.heap[i].NumTasks) < manualBestTasks { + manualBestNode = nh.heap[i].Node + manualBestTasks = uint64(nh.heap[i].NumTasks) + } else if uint64(nh.heap[i].NumTasks) == manualBestTasks && nh.heap[i].Node == bestNode { // prefer the node that findMin chose when // there are multiple best choices - manualBestNode = nh.heap[i].node + manualBestNode = nh.heap[i].Node } } diff --git a/manager/scheduler/scheduler.go b/manager/scheduler/scheduler.go index 47e5247a89..94bda2855f 100644 --- a/manager/scheduler/scheduler.go +++ b/manager/scheduler/scheduler.go @@ -127,7 +127,7 @@ func (s *Scheduler) enqueue(t *api.Task) { s.unassignedTasks.PushBack(t) } -func schedulableNode(n *api.Node) bool { +func schedulableNode(n NodeInfo) bool { return n.Status.State == api.NodeStatus_READY && (n.Spec == nil || n.Spec.Availability == api.NodeAvailabilityActive) } @@ -275,7 +275,7 @@ func (s *Scheduler) rollbackLocalState(decisions map[string]schedulingDecision) // scheduleTask schedules a single task. 
func (s *Scheduler) scheduleTask(t *api.Task) *api.Task { - n, numTasks := s.nodeHeap.findMin(s.schedulableNode, s.scanAllNodes) + n, numTasks := s.nodeHeap.findMin(schedulableNode, s.scanAllNodes) if n == nil { log.WithField("task.id", t.ID).Debug("No nodes available to assign tasks to") return nil @@ -304,7 +304,7 @@ func (s *Scheduler) buildNodeHeap(tx state.ReadTx) error { i := 0 for _, n := range nodes { - s.nodeHeap.heap = append(s.nodeHeap.heap, nodeHeapItem{node: n, numTasks: len(s.tasksByNode[n.ID])}) + s.nodeHeap.heap = append(s.nodeHeap.heap, newNodeInfo(n, len(s.tasksByNode[n.ID]))) s.nodeHeap.index[n.ID] = i i++ } From c41b567e3774e38c1761d1549f896e7d2ebded8c Mon Sep 17 00:00:00 2001 From: Aaron Lehmann Date: Mon, 11 Apr 2016 11:28:54 -0700 Subject: [PATCH 4/4] Make scheduler keep track of node resources Fix filter pipeline to consider available resources. Hook up filter pipeline to scheduler. Signed-off-by: Aaron Lehmann --- manager/scheduler/filter.go | 69 ++++++++++++ manager/scheduler/filter/filter.go | 54 ---------- manager/scheduler/indexed_node_heap.go | 56 +++++----- manager/scheduler/indexed_node_heap_test.go | 16 +-- manager/scheduler/nodeinfo.go | 48 +++++++++ manager/scheduler/{filter => }/pipeline.go | 10 +- manager/scheduler/scheduler.go | 114 ++++++++------------ manager/scheduler/scheduler_test.go | 89 +++++++++++++++ 8 files changed, 293 insertions(+), 163 deletions(-) create mode 100644 manager/scheduler/filter.go delete mode 100644 manager/scheduler/filter/filter.go create mode 100644 manager/scheduler/nodeinfo.go rename manager/scheduler/{filter => }/pipeline.go (62%) diff --git a/manager/scheduler/filter.go b/manager/scheduler/filter.go new file mode 100644 index 0000000000..87f4aec6f8 --- /dev/null +++ b/manager/scheduler/filter.go @@ -0,0 +1,69 @@ +package scheduler + +import "github.com/docker/swarm-v2/api" + +// Filter checks whether the given task can run on the given node. +type Filter interface { + // Enabled returns true when the filter is enabled for a given task. + // For instance, a constraints filter would return `false` if the task doesn't contain any constraints. + Enabled(*api.Task) bool + + // Check returns true if the task can be scheduled into the given node. + // This function should not be called if the the filter is not Enabled. + Check(*api.Task, *NodeInfo) bool +} + +// ReadyFilter checks that the node is ready to schedule tasks. +type ReadyFilter struct { +} + +// Enabled returns true when the filter is enabled for a given task. +func (f *ReadyFilter) Enabled(t *api.Task) bool { + return true +} + +// Check returns true if the task can be scheduled into the given node. +func (f *ReadyFilter) Check(t *api.Task, n *NodeInfo) bool { + return n.Status.State == api.NodeStatus_READY && + (n.Spec == nil || n.Spec.Availability == api.NodeAvailabilityActive) +} + +// ResourceFilter checks that the node has enough resources available to run +// the task. +type ResourceFilter struct { +} + +// Enabled returns true when the filter is enabled for a given task. +func (f *ResourceFilter) Enabled(t *api.Task) bool { + c := t.Spec.GetContainer() + if c == nil { + return false + } + if r := c.Resources; r == nil || (r.Reservations.NanoCPUs == 0 && r.Reservations.MemoryBytes == 0) { + return false + } + return true +} + +// Check returns true if the task can be scheduled into the given node. 
+func (f *ResourceFilter) Check(t *api.Task, n *NodeInfo) bool { + if t.Spec == nil { + return true + } + container := t.Spec.GetContainer() + if container == nil || container.Resources == nil || container.Resources.Reservations == nil { + return true + } + + res := container.Resources.Reservations + + if res.NanoCPUs > n.AvailableResources.NanoCPUs { + return false + } + + if res.MemoryBytes > n.AvailableResources.MemoryBytes { + return false + } + + return true +} diff --git a/manager/scheduler/filter/filter.go b/manager/scheduler/filter/filter.go deleted file mode 100644 index 7a33d3c613..0000000000 --- a/manager/scheduler/filter/filter.go +++ /dev/null @@ -1,54 +0,0 @@ -package filter - -import "github.com/docker/swarm-v2/api" - -type Filter interface { - // Enabled returns true when the filter is enabled for a given task. - // For instance, a constraints filter would return `false` if the task doesn't contain any constraints. - Enabled(*api.Task) bool - - // Check returns true if the task can be scheduled into the given node. - // This function should not be called if the the filter is not Enabled. - Check(*api.Task, *api.Node) bool -} - -type ReadyFilter struct { -} - -func (f *ReadyFilter) Enabled(t *api.Task) bool { - return true -} - -func (f *ReadyFilter) Check(t *api.Task, n *api.Node) bool { - return n.Status.State == api.NodeStatus_READY && - (n.Spec == nil || n.Spec.Availability == api.NodeAvailabilityActive) -} - -type ResourceFilter struct { -} - -func (f *ResourceFilter) Enabled(t *api.Task) bool { - c := t.Spec.GetContainer() - if c == nil { - return false - } - if r := c.Resources; r == nil || (r.Reservations.NanoCPUs == 0 && r.Reservations.MemoryBytes == 0) { - return false - } - return true -} - -func (f *ResourceFilter) Check(t *api.Task, n *api.Node) bool { - res := t.Spec.GetContainer().Resources.Reservations - - // TODO(aluzzardi): Actually check adding this task to the node won't put it above its resource capacity. - if res.NanoCPUs > n.Description.Resources.NanoCPUs { - return false - } - - if res.MemoryBytes > n.Description.Resources.MemoryBytes { - return false - } - - return true -} diff --git a/manager/scheduler/indexed_node_heap.go b/manager/scheduler/indexed_node_heap.go index 5763f76cc4..4b825349c0 100644 --- a/manager/scheduler/indexed_node_heap.go +++ b/manager/scheduler/indexed_node_heap.go @@ -6,16 +6,6 @@ import ( "github.com/docker/swarm-v2/api" ) -// NodeInfo contains a node and some additional metadata. -type NodeInfo struct { - *api.Node - NumTasks int -} - -func newNodeInfo(n *api.Node, numTasks int) NodeInfo { - return NodeInfo{Node: n, NumTasks: numTasks} -} - // A nodeHeap implements heap.Interface for nodes. It also includes an index // by node id. type nodeHeap struct { @@ -28,7 +18,7 @@ func (nh nodeHeap) Len() int { } func (nh nodeHeap) Less(i, j int) bool { - return nh.heap[i].NumTasks < nh.heap[j].NumTasks + return len(nh.heap[i].Tasks) < len(nh.heap[j].Tasks) } func (nh nodeHeap) Swap(i, j int) { @@ -65,25 +55,33 @@ func (nh *nodeHeap) peek() *NodeInfo { return &nh.heap[0] } +// nodeInfo returns the NodeInfo struct for a given node identified by its ID. +func (nh *nodeHeap) nodeInfo(nodeID string) NodeInfo { + index, ok := nh.index[nodeID] + if ok { + return nh.heap[index] + } + return NodeInfo{} +} + // addOrUpdateNode sets the number of tasks for a given node. It adds the node // to the heap if it wasn't already tracked. 
-func (nh *nodeHeap) addOrUpdateNode(n *api.Node, numTasks int) { +func (nh *nodeHeap) addOrUpdateNode(n NodeInfo) { index, ok := nh.index[n.ID] if ok { - nh.heap[index].Node = n - nh.heap[index].NumTasks = numTasks + nh.heap[index] = n heap.Fix(nh, index) } else { - heap.Push(nh, newNodeInfo(n, numTasks)) + heap.Push(nh, n) } } // updateNode sets the number of tasks for a given node. It ignores the update // if the node isn't already tracked in the heap. -func (nh *nodeHeap) updateNode(nodeID string, numTasks int) { - index, ok := nh.index[nodeID] +func (nh *nodeHeap) updateNode(n NodeInfo) { + index, ok := nh.index[n.ID] if ok { - nh.heap[index].NumTasks = numTasks + nh.heap[index] = n heap.Fix(nh, index) } } @@ -91,24 +89,24 @@ func (nh *nodeHeap) updateNode(nodeID string, numTasks int) { func (nh *nodeHeap) remove(nodeID string) { index, ok := nh.index[nodeID] if ok { - nh.heap[index].NumTasks = -1 + nh.heap[index].Tasks = nil heap.Fix(nh, index) heap.Pop(nh) } } -func (nh *nodeHeap) findMin(meetsConstraints func(NodeInfo) bool, scanAllNodes bool) (*api.Node, int) { +func (nh *nodeHeap) findMin(meetsConstraints func(*NodeInfo) bool, scanAllNodes bool) (*api.Node, int) { var bestNode *api.Node minTasks := int(^uint(0) >> 1) // max int nextStoppingPoint := 0 levelSize := 1 for i := 0; i < len(nh.heap); i++ { - heapEntry := nh.heap[i] + heapEntry := &nh.heap[i] - if meetsConstraints(heapEntry) && heapEntry.NumTasks < minTasks { + if meetsConstraints(heapEntry) && len(heapEntry.Tasks) < minTasks { bestNode = heapEntry.Node - minTasks = heapEntry.NumTasks + minTasks = len(heapEntry.Tasks) } if !scanAllNodes { if i == nextStoppingPoint && bestNode != nil { @@ -117,8 +115,8 @@ func (nh *nodeHeap) findMin(meetsConstraints func(NodeInfo) bool, scanAllNodes b // constraints, check their children // recursively. 
for j := i - levelSize + 1; j <= i; j++ { - heapEntry = nh.heap[i] - if heapEntry.NumTasks < minTasks { + heapEntry = &nh.heap[i] + if len(heapEntry.Tasks) < minTasks { newBestNode, newMinTasks := nh.findBestChildBelowThreshold(meetsConstraints, i, minTasks) if newBestNode != nil { bestNode, minTasks = newBestNode, newMinTasks @@ -136,17 +134,17 @@ func (nh *nodeHeap) findMin(meetsConstraints func(NodeInfo) bool, scanAllNodes b return bestNode, minTasks } -func (nh *nodeHeap) findBestChildBelowThreshold(meetsConstraints func(NodeInfo) bool, index int, threshold int) (*api.Node, int) { +func (nh *nodeHeap) findBestChildBelowThreshold(meetsConstraints func(*NodeInfo) bool, index int, threshold int) (*api.Node, int) { var bestNode *api.Node for i := index*2 + 1; i <= index*2+2; i++ { if i <= len(nh.heap) { break } - heapEntry := nh.heap[i] - if heapEntry.NumTasks < threshold { + heapEntry := &nh.heap[i] + if len(heapEntry.Tasks) < threshold { if meetsConstraints(heapEntry) { - bestNode, threshold = heapEntry.Node, heapEntry.NumTasks + bestNode, threshold = heapEntry.Node, len(heapEntry.Tasks) } else { newBestNode, newMinTasks := nh.findBestChildBelowThreshold(meetsConstraints, i, threshold) if newBestNode != nil { diff --git a/manager/scheduler/indexed_node_heap_test.go b/manager/scheduler/indexed_node_heap_test.go index a8154da92a..b01c9f3296 100644 --- a/manager/scheduler/indexed_node_heap_test.go +++ b/manager/scheduler/indexed_node_heap_test.go @@ -33,13 +33,17 @@ func TestFindMin(t *testing.T) { if i%100 == 0 { n.Spec.Meta.Labels["special"] = "true" } - nh.heap = append(nh.heap, NodeInfo{Node: n, NumTasks: int(rand.Int())}) + tasks := make(map[string]*api.Task) + for i := rand.Intn(25); i > 0; i-- { + tasks[strconv.Itoa(i)] = &api.Task{ID: strconv.Itoa(i)} + } + nh.heap = append(nh.heap, NodeInfo{Node: n, Tasks: tasks}) nh.index[n.ID] = i } heap.Init(&nh) - isSpecial := func(n NodeInfo) bool { + isSpecial := func(n *NodeInfo) bool { return n.Spec.Meta.Labels["special"] == "true" } @@ -50,13 +54,13 @@ func TestFindMin(t *testing.T) { var manualBestNode *api.Node manualBestTasks := uint64(math.MaxUint64) for i := 0; i < nh.Len(); i++ { - if !isSpecial(nh.heap[i]) { + if !isSpecial(&nh.heap[i]) { continue } - if uint64(nh.heap[i].NumTasks) < manualBestTasks { + if uint64(len(nh.heap[i].Tasks)) < manualBestTasks { manualBestNode = nh.heap[i].Node - manualBestTasks = uint64(nh.heap[i].NumTasks) - } else if uint64(nh.heap[i].NumTasks) == manualBestTasks && nh.heap[i].Node == bestNode { + manualBestTasks = uint64(len(nh.heap[i].Tasks)) + } else if uint64(len(nh.heap[i].Tasks)) == manualBestTasks && nh.heap[i].Node == bestNode { // prefer the node that findMin chose when // there are multiple best choices manualBestNode = nh.heap[i].Node diff --git a/manager/scheduler/nodeinfo.go b/manager/scheduler/nodeinfo.go new file mode 100644 index 0000000000..02c17b275e --- /dev/null +++ b/manager/scheduler/nodeinfo.go @@ -0,0 +1,48 @@ +package scheduler + +import "github.com/docker/swarm-v2/api" + +// NodeInfo contains a node and some additional metadata. 
+type NodeInfo struct { + *api.Node + Tasks map[string]*api.Task + AvailableResources api.Resources +} + +func newNodeInfo(n *api.Node, tasks map[string]*api.Task, availableResources api.Resources) NodeInfo { + return NodeInfo{ + Node: n, + Tasks: tasks, + AvailableResources: availableResources, + } +} + +func (nodeInfo *NodeInfo) removeTask(t *api.Task) { + reservations := taskReservations(t) + if nodeInfo.Tasks != nil { + delete(nodeInfo.Tasks, t.ID) + } + nodeInfo.AvailableResources.MemoryBytes += reservations.MemoryBytes + nodeInfo.AvailableResources.NanoCPUs += reservations.NanoCPUs +} + +func (nodeInfo *NodeInfo) addTask(t *api.Task) { + reservations := taskReservations(t) + if nodeInfo.Tasks == nil { + nodeInfo.Tasks = make(map[string]*api.Task) + } + nodeInfo.Tasks[t.ID] = t + nodeInfo.AvailableResources.MemoryBytes -= reservations.MemoryBytes + nodeInfo.AvailableResources.NanoCPUs -= reservations.NanoCPUs +} + +func taskReservations(t *api.Task) (reservations api.Resources) { + if t.Spec == nil { + return + } + container := t.Spec.GetContainer() + if container != nil && container.Resources != nil && container.Resources.Reservations != nil { + reservations = *container.Resources.Reservations + } + return +} diff --git a/manager/scheduler/filter/pipeline.go b/manager/scheduler/pipeline.go similarity index 62% rename from manager/scheduler/filter/pipeline.go rename to manager/scheduler/pipeline.go index c2a3c2a582..ac5aa7a7e9 100644 --- a/manager/scheduler/filter/pipeline.go +++ b/manager/scheduler/pipeline.go @@ -1,4 +1,4 @@ -package filter +package scheduler import "github.com/docker/swarm-v2/api" @@ -10,17 +10,23 @@ var ( } ) +// Pipeline runs a set of filters against nodes. type Pipeline struct { task *api.Task checklist []Filter } +// NewPipeline returns a pipeline with the default set of filters. func NewPipeline(t *api.Task) *Pipeline { p := &Pipeline{ task: t, checklist: []Filter{}, } + // FIXME(aaronl): This is quite alloc-heavy. It may be better to + // redesign Pipeline so it doesn't require allocating a separate slice + // for each task. Maybe it could use a bitfield to specify which + // filters are enabled. for _, f := range defaultFilters { if f.Enabled(t) { p.checklist = append(p.checklist, f) @@ -32,7 +38,7 @@ func NewPipeline(t *api.Task) *Pipeline { // Process a node through the filter pipeline. // Returns true if all filters pass, false otherwise. -func (p *Pipeline) Process(n *api.Node) bool { +func (p *Pipeline) Process(n *NodeInfo) bool { for _, f := range p.checklist { if !f.Check(p.task, n) { // Immediately stop on first failure. 
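Before the scheduler.go diff that follows, a quick sketch of how these pieces are meant to fit together: the scheduler builds one Pipeline per task, Enabled() decides which filters land on that task's checklist, and Pipeline.Process is handed to the node heap's findMin as the constraint callback. The helper name pickNode below is hypothetical and not part of this series; the rest uses only types and fields introduced in these patches.

    // Illustrative only, not part of the patch: a hypothetical helper showing
    // how the filter pipeline plugs into the node heap, mirroring the
    // scheduler.go change below.
    func pickNode(s *Scheduler, t *api.Task) *api.Node {
        pipeline := NewPipeline(t) // Enabled() keeps only the filters relevant to t
        // Process() returns false as soon as any enabled filter rejects a node.
        node, _ := s.nodeHeap.findMin(pipeline.Process, s.scanAllNodes)
        return node // nil when no node passes every enabled filter
    }
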
diff --git a/manager/scheduler/scheduler.go b/manager/scheduler/scheduler.go index 94bda2855f..ac3134f248 100644 --- a/manager/scheduler/scheduler.go +++ b/manager/scheduler/scheduler.go @@ -21,7 +21,6 @@ type Scheduler struct { unassignedTasks *list.List nodeHeap nodeHeap allTasks map[string]*api.Task - tasksByNode map[string]map[string]*api.Task // stopChan signals to the state machine to stop running stopChan chan struct{} @@ -40,7 +39,6 @@ func New(store state.WatchableStore) *Scheduler { store: store, unassignedTasks: list.New(), allTasks: make(map[string]*api.Task), - tasksByNode: make(map[string]map[string]*api.Task), stopChan: make(chan struct{}), doneChan: make(chan struct{}), } @@ -51,19 +49,21 @@ func (s *Scheduler) setupTasksList(tx state.ReadTx) error { if err != nil { return err } + + tasksByNode := make(map[string]map[string]*api.Task) for _, t := range tasks { s.allTasks[t.ID] = t if t.NodeID == "" { s.enqueue(t) } else { - if s.tasksByNode[t.NodeID] == nil { - s.tasksByNode[t.NodeID] = make(map[string]*api.Task) + if tasksByNode[t.NodeID] == nil { + tasksByNode[t.NodeID] = make(map[string]*api.Task) } - s.tasksByNode[t.NodeID][t.ID] = t + tasksByNode[t.NodeID][t.ID] = t } } - if err := s.buildNodeHeap(tx); err != nil { + if err := s.buildNodeHeap(tx, tasksByNode); err != nil { return err } @@ -98,9 +98,11 @@ func (s *Scheduler) Run() error { case state.EventDeleteTask: s.deleteTask(v.Task) case state.EventCreateNode: - pendingChanges += s.createNode(v.Node) + s.createOrUpdateNode(v.Node) + pendingChanges++ case state.EventUpdateNode: - pendingChanges += s.updateNode(v.Node) + s.createOrUpdateNode(v.Node) + pendingChanges++ case state.EventDeleteNode: s.nodeHeap.remove(v.Node.ID) case state.EventCommit: @@ -127,11 +129,6 @@ func (s *Scheduler) enqueue(t *api.Task) { s.unassignedTasks.PushBack(t) } -func schedulableNode(n NodeInfo) bool { - return n.Status.State == api.NodeStatus_READY && - (n.Spec == nil || n.Spec.Availability == api.NodeAvailabilityActive) -} - func (s *Scheduler) createTask(t *api.Task) int { s.allTasks[t.ID] = t if t.NodeID == "" { @@ -139,63 +136,35 @@ func (s *Scheduler) createTask(t *api.Task) int { s.enqueue(t) return 1 } - if s.tasksByNode[t.NodeID] == nil { - s.tasksByNode[t.NodeID] = make(map[string]*api.Task) - } - s.tasksByNode[t.NodeID][t.ID] = t - s.nodeHeap.updateNode(t.NodeID, len(s.tasksByNode[t.NodeID])) + nodeInfo := s.nodeHeap.nodeInfo(t.NodeID) + nodeInfo.addTask(t) + s.nodeHeap.updateNode(nodeInfo) return 0 } func (s *Scheduler) updateTask(t *api.Task) int { oldTask := s.allTasks[t.ID] - if oldTask != nil && t.NodeID != oldTask.NodeID { - if s.tasksByNode[oldTask.NodeID] != nil { - delete(s.tasksByNode[oldTask.NodeID], oldTask.ID) - if len(s.tasksByNode[oldTask.NodeID]) == 0 { - delete(s.tasksByNode, oldTask.NodeID) - } - s.nodeHeap.updateNode(oldTask.NodeID, len(s.tasksByNode[oldTask.NodeID])) - } - if t.NodeID != "" { - if s.tasksByNode[t.NodeID] == nil { - s.tasksByNode[t.NodeID] = make(map[string]*api.Task) - } - s.tasksByNode[t.NodeID][t.ID] = t - s.nodeHeap.updateNode(t.NodeID, len(s.tasksByNode[t.NodeID])) - } + if oldTask != nil { + s.deleteTask(oldTask) } - s.allTasks[t.ID] = t - - if t.NodeID == "" { - // unassigned task - s.enqueue(t) - return 1 - } - return 0 + return s.createTask(t) } func (s *Scheduler) deleteTask(t *api.Task) { delete(s.allTasks, t.ID) - if s.tasksByNode[t.NodeID] != nil { - delete(s.tasksByNode[t.NodeID], t.ID) - if len(s.tasksByNode[t.NodeID]) == 0 { - delete(s.tasksByNode, t.NodeID) - } - 
s.nodeHeap.updateNode(t.NodeID, len(s.tasksByNode[t.NodeID])) - } + nodeInfo := s.nodeHeap.nodeInfo(t.NodeID) + nodeInfo.removeTask(t) + s.nodeHeap.updateNode(nodeInfo) } -func (s *Scheduler) createNode(n *api.Node) int { - s.nodeHeap.addOrUpdateNode(n, len(s.tasksByNode[n.ID])) - return 1 -} - -func (s *Scheduler) updateNode(n *api.Node) int { - s.nodeHeap.addOrUpdateNode(n, len(s.tasksByNode[n.ID])) - return 1 +func (s *Scheduler) createOrUpdateNode(n *api.Node) { + var resources api.Resources + if n.Description != nil && n.Description.Resources != nil { + resources = *n.Description.Resources + } + s.nodeHeap.addOrUpdateNode(newNodeInfo(n, map[string]*api.Task{}, resources)) } // tick attempts to schedule the queue. @@ -259,15 +228,12 @@ func (s *Scheduler) tick() { } func (s *Scheduler) rollbackLocalState(decisions map[string]schedulingDecision) { - for taskID, decision := range decisions { - assignedNodeID := decision.new.NodeID - + for _, decision := range decisions { s.allTasks[decision.old.ID] = decision.old - delete(s.tasksByNode[assignedNodeID], taskID) - if len(s.tasksByNode[assignedNodeID]) == 0 { - delete(s.tasksByNode, assignedNodeID) - } - s.nodeHeap.updateNode(assignedNodeID, len(s.tasksByNode[assignedNodeID])) + + nodeInfo := s.nodeHeap.nodeInfo(decision.new.NodeID) + nodeInfo.removeTask(decision.new) + s.nodeHeap.updateNode(nodeInfo) s.enqueue(decision.old) } @@ -275,7 +241,8 @@ func (s *Scheduler) rollbackLocalState(decisions map[string]schedulingDecision) // scheduleTask schedules a single task. func (s *Scheduler) scheduleTask(t *api.Task) *api.Task { - n, numTasks := s.nodeHeap.findMin(schedulableNode, s.scanAllNodes) + pipeline := NewPipeline(t) + n, _ := s.nodeHeap.findMin(pipeline.Process, s.scanAllNodes) if n == nil { log.WithField("task.id", t.ID).Debug("No nodes available to assign tasks to") return nil @@ -286,15 +253,14 @@ func (s *Scheduler) scheduleTask(t *api.Task) *api.Task { newT.NodeID = n.ID newT.Status = &api.TaskStatus{State: api.TaskStateAssigned} s.allTasks[t.ID] = &newT - if s.tasksByNode[t.NodeID] == nil { - s.tasksByNode[t.NodeID] = make(map[string]*api.Task) - } - s.tasksByNode[t.NodeID][t.ID] = &newT - s.nodeHeap.updateNode(n.ID, numTasks+1) + + nodeInfo := s.nodeHeap.nodeInfo(n.ID) + nodeInfo.addTask(&newT) + s.nodeHeap.updateNode(nodeInfo) return &newT } -func (s *Scheduler) buildNodeHeap(tx state.ReadTx) error { +func (s *Scheduler) buildNodeHeap(tx state.ReadTx, tasksByNode map[string]map[string]*api.Task) error { nodes, err := tx.Nodes().Find(state.All) if err != nil { return err @@ -304,7 +270,11 @@ func (s *Scheduler) buildNodeHeap(tx state.ReadTx) error { i := 0 for _, n := range nodes { - s.nodeHeap.heap = append(s.nodeHeap.heap, newNodeInfo(n, len(s.tasksByNode[n.ID]))) + var resources api.Resources + if n.Description != nil && n.Description.Resources != nil { + resources = *n.Description.Resources + } + s.nodeHeap.heap = append(s.nodeHeap.heap, newNodeInfo(n, tasksByNode[n.ID], resources)) s.nodeHeap.index[n.ID] = i i++ } diff --git a/manager/scheduler/scheduler_test.go b/manager/scheduler/scheduler_test.go index 0c40b2a45c..61658d4a32 100644 --- a/manager/scheduler/scheduler_test.go +++ b/manager/scheduler/scheduler_test.go @@ -374,6 +374,95 @@ func TestSchedulerNoReadyNodes(t *testing.T) { scheduler.Stop() } +func TestSchedulerResourceConstraint(t *testing.T) { + // Create a ready node without enough memory to run the task. 
+ underprovisionedNode := &api.Node{ + ID: "underprovisioned", + Spec: &api.NodeSpec{ + Meta: api.Meta{ + Name: "underprovisioned", + }, + }, + Status: api.NodeStatus{ + State: api.NodeStatus_READY, + }, + Description: &api.NodeDescription{ + Resources: &api.Resources{ + NanoCPUs: 1e9, + MemoryBytes: 1e9, + }, + }, + } + + initialTask := &api.Task{ + ID: "id1", + Spec: &api.TaskSpec{ + Runtime: &api.TaskSpec_Container{ + Container: &api.ContainerSpec{ + Resources: &api.ResourceRequirements{ + Reservations: &api.Resources{ + MemoryBytes: 2e9, + }, + }, + }, + }, + }, + Meta: api.Meta{ + Name: "name1", + }, + } + + store := state.NewMemoryStore(nil) + assert.NotNil(t, store) + + err := store.Update(func(tx state.Tx) error { + // Add initial node and task + assert.NoError(t, tx.Tasks().Create(initialTask)) + assert.NoError(t, tx.Nodes().Create(underprovisionedNode)) + return nil + }) + assert.NoError(t, err) + + scheduler := New(store) + + watch, cancel := state.Watch(store.WatchQueue(), state.EventUpdateTask{}) + defer cancel() + + go func() { + assert.NoError(t, scheduler.Run()) + }() + + err = store.Update(func(tx state.Tx) error { + // Create a node with enough memory. The task should get + // assigned to this node. + node := &api.Node{ + ID: "bignode", + Spec: &api.NodeSpec{ + Meta: api.Meta{ + Name: "bignode", + }, + }, + Description: &api.NodeDescription{ + Resources: &api.Resources{ + NanoCPUs: 4e9, + MemoryBytes: 8e9, + }, + }, + Status: api.NodeStatus{ + State: api.NodeStatus_READY, + }, + } + assert.NoError(t, tx.Nodes().Create(node)) + return nil + }) + assert.NoError(t, err) + + assignment := watchAssignment(t, watch) + assert.Equal(t, "bignode", assignment.NodeID) + + scheduler.Stop() +} + func watchAssignment(t *testing.T, watch chan events.Event) *api.Task { for { select {