Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions manager/scheduler/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package scheduler

import "github.com/docker/swarm-v2/api"

// Filter checks whether the given task can run on the given node.
// A filter is consulted in two steps: Enabled first, then Check.
type Filter interface {
	// Enabled returns true when the filter is enabled for a given task.
	// For instance, a constraints filter would return `false` if the task doesn't contain any constraints.
	Enabled(*api.Task) bool

	// Check returns true if the task can be scheduled into the given node.
	// This function should not be called if the filter is not Enabled.
	Check(*api.Task, *NodeInfo) bool
}

// ReadyFilter checks that the node is ready to schedule tasks.
// It is stateless; the zero value is ready to use.
type ReadyFilter struct {
}

// Enabled returns true when the filter is enabled for a given task.
// Readiness is relevant to every task, so this always reports true.
func (f *ReadyFilter) Enabled(_ *api.Task) bool {
	return true
}

// Check returns true if the task can be scheduled into the given node.
// The node must report READY status, and, when a spec is present, its
// availability must be active.
func (f *ReadyFilter) Check(t *api.Task, n *NodeInfo) bool {
	if n.Status.State != api.NodeStatus_READY {
		return false
	}
	if n.Spec == nil {
		return true
	}
	return n.Spec.Availability == api.NodeAvailabilityActive
}

// ResourceFilter checks that the node has enough resources available to run
// the task.
// It is stateless; the zero value is ready to use.
type ResourceFilter struct {
}

// Enabled returns true when the filter is enabled for a given task.
// The filter only applies to container tasks that actually reserve a
// non-zero amount of CPU or memory.
func (f *ResourceFilter) Enabled(t *api.Task) bool {
	c := t.Spec.GetContainer()
	if c == nil {
		return false
	}
	r := c.Resources
	// Reservations is a pointer and may be nil even when Resources is
	// set (Check already guards for this); the previous code
	// dereferenced it unconditionally and would panic in that case.
	if r == nil || r.Reservations == nil {
		return false
	}
	return r.Reservations.NanoCPUs != 0 || r.Reservations.MemoryBytes != 0
}

// Check returns true if the task can be scheduled into the given node.
// A task with no spec, no container, or no reservations fits anywhere;
// otherwise both the CPU and memory reservations must fit within the
// node's remaining available resources.
func (f *ResourceFilter) Check(t *api.Task, n *NodeInfo) bool {
	spec := t.Spec
	if spec == nil {
		return true
	}
	c := spec.GetContainer()
	if c == nil || c.Resources == nil || c.Resources.Reservations == nil {
		return true
	}

	r := c.Resources.Reservations
	return r.NanoCPUs <= n.AvailableResources.NanoCPUs &&
		r.MemoryBytes <= n.AvailableResources.MemoryBytes
}
71 changes: 37 additions & 34 deletions manager/scheduler/indexed_node_heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,10 @@ import (
"github.com/docker/swarm-v2/api"
)

type nodeHeapItem struct {
node *api.Node
numTasks int
}

// A nodeHeap implements heap.Interface for nodes. It also includes an index
// by node id.
type nodeHeap struct {
heap []nodeHeapItem
heap []NodeInfo
index map[string]int // map from node id to heap index
}

Expand All @@ -23,87 +18,95 @@ func (nh nodeHeap) Len() int {
}

func (nh nodeHeap) Less(i, j int) bool {
return nh.heap[i].numTasks < nh.heap[j].numTasks
return len(nh.heap[i].Tasks) < len(nh.heap[j].Tasks)
}

func (nh nodeHeap) Swap(i, j int) {
nh.heap[i], nh.heap[j] = nh.heap[j], nh.heap[i]
nh.index[nh.heap[i].node.ID] = i
nh.index[nh.heap[j].node.ID] = j
nh.index[nh.heap[i].ID] = i
nh.index[nh.heap[j].ID] = j
}

func (nh *nodeHeap) Push(x interface{}) {
n := len(nh.heap)
item := x.(nodeHeapItem)
nh.index[item.node.ID] = n
item := x.(NodeInfo)
nh.index[item.ID] = n
nh.heap = append(nh.heap, item)
}

func (nh *nodeHeap) Pop() interface{} {
old := nh.heap
n := len(old)
item := old[n-1]
delete(nh.index, item.node.ID)
delete(nh.index, item.ID)
nh.heap = old[0 : n-1]
return item
}

func (nh *nodeHeap) alloc(n int) {
nh.heap = make([]nodeHeapItem, 0, n)
nh.heap = make([]NodeInfo, 0, n)
nh.index = make(map[string]int, n)
}

func (nh *nodeHeap) peek() *nodeHeapItem {
func (nh *nodeHeap) peek() *NodeInfo {
if len(nh.heap) == 0 {
return nil
}
return &nh.heap[0]
}

// nodeInfo returns the NodeInfo struct for a given node identified by its ID.
func (nh *nodeHeap) nodeInfo(nodeID string) NodeInfo {
index, ok := nh.index[nodeID]
if ok {
return nh.heap[index]
}
return NodeInfo{}
}

// addOrUpdateNode sets the number of tasks for a given node. It adds the node
// to the heap if it wasn't already tracked.
func (nh *nodeHeap) addOrUpdateNode(n *api.Node, numTasks int) {
func (nh *nodeHeap) addOrUpdateNode(n NodeInfo) {
index, ok := nh.index[n.ID]
if ok {
nh.heap[index].node = n
nh.heap[index].numTasks = numTasks
nh.heap[index] = n
heap.Fix(nh, index)
} else {
heap.Push(nh, nodeHeapItem{node: n, numTasks: numTasks})
heap.Push(nh, n)
}
}

// updateNode sets the number of tasks for a given node. It ignores the update
// if the node isn't already tracked in the heap.
func (nh *nodeHeap) updateNode(nodeID string, numTasks int) {
index, ok := nh.index[nodeID]
func (nh *nodeHeap) updateNode(n NodeInfo) {
index, ok := nh.index[n.ID]
if ok {
nh.heap[index].numTasks = numTasks
nh.heap[index] = n
heap.Fix(nh, index)
}
}

func (nh *nodeHeap) remove(nodeID string) {
index, ok := nh.index[nodeID]
if ok {
nh.heap[index].numTasks = -1
nh.heap[index].Tasks = nil
heap.Fix(nh, index)
heap.Pop(nh)
}
}

func (nh *nodeHeap) findMin(meetsConstraints func(*api.Node) bool, scanAllNodes bool) (*api.Node, int) {
func (nh *nodeHeap) findMin(meetsConstraints func(*NodeInfo) bool, scanAllNodes bool) (*api.Node, int) {
var bestNode *api.Node
minTasks := int(^uint(0) >> 1) // max int
nextStoppingPoint := 0
levelSize := 1

for i := 0; i < len(nh.heap); i++ {
heapEntry := nh.heap[i]
heapEntry := &nh.heap[i]

if meetsConstraints(heapEntry.node) && heapEntry.numTasks < minTasks {
bestNode = heapEntry.node
minTasks = heapEntry.numTasks
if meetsConstraints(heapEntry) && len(heapEntry.Tasks) < minTasks {
bestNode = heapEntry.Node
minTasks = len(heapEntry.Tasks)
}
if !scanAllNodes {
if i == nextStoppingPoint && bestNode != nil {
Expand All @@ -112,8 +115,8 @@ func (nh *nodeHeap) findMin(meetsConstraints func(*api.Node) bool, scanAllNodes
// constraints, check their children
// recursively.
for j := i - levelSize + 1; j <= i; j++ {
heapEntry = nh.heap[i]
if heapEntry.numTasks < minTasks {
heapEntry = &nh.heap[i]
if len(heapEntry.Tasks) < minTasks {
newBestNode, newMinTasks := nh.findBestChildBelowThreshold(meetsConstraints, i, minTasks)
if newBestNode != nil {
bestNode, minTasks = newBestNode, newMinTasks
Expand All @@ -131,17 +134,17 @@ func (nh *nodeHeap) findMin(meetsConstraints func(*api.Node) bool, scanAllNodes
return bestNode, minTasks
}

func (nh *nodeHeap) findBestChildBelowThreshold(meetsConstraints func(*api.Node) bool, index int, threshold int) (*api.Node, int) {
func (nh *nodeHeap) findBestChildBelowThreshold(meetsConstraints func(*NodeInfo) bool, index int, threshold int) (*api.Node, int) {
var bestNode *api.Node

for i := index*2 + 1; i <= index*2+2; i++ {
if i <= len(nh.heap) {
break
}
heapEntry := nh.heap[i]
if heapEntry.numTasks < threshold {
if meetsConstraints(heapEntry.node) {
bestNode, threshold = heapEntry.node, heapEntry.numTasks
heapEntry := &nh.heap[i]
if len(heapEntry.Tasks) < threshold {
if meetsConstraints(heapEntry) {
bestNode, threshold = heapEntry.Node, len(heapEntry.Tasks)
} else {
newBestNode, newMinTasks := nh.findBestChildBelowThreshold(meetsConstraints, i, threshold)
if newBestNode != nil {
Expand Down
20 changes: 12 additions & 8 deletions manager/scheduler/indexed_node_heap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,17 @@ func TestFindMin(t *testing.T) {
if i%100 == 0 {
n.Spec.Meta.Labels["special"] = "true"
}
nh.heap = append(nh.heap, nodeHeapItem{node: n, numTasks: int(rand.Int())})
tasks := make(map[string]*api.Task)
for i := rand.Intn(25); i > 0; i-- {
tasks[strconv.Itoa(i)] = &api.Task{ID: strconv.Itoa(i)}
}
nh.heap = append(nh.heap, NodeInfo{Node: n, Tasks: tasks})
nh.index[n.ID] = i
}

heap.Init(&nh)

isSpecial := func(n *api.Node) bool {
isSpecial := func(n *NodeInfo) bool {
return n.Spec.Meta.Labels["special"] == "true"
}

Expand All @@ -50,16 +54,16 @@ func TestFindMin(t *testing.T) {
var manualBestNode *api.Node
manualBestTasks := uint64(math.MaxUint64)
for i := 0; i < nh.Len(); i++ {
if !isSpecial(nh.heap[i].node) {
if !isSpecial(&nh.heap[i]) {
continue
}
if uint64(nh.heap[i].numTasks) < manualBestTasks {
manualBestNode = nh.heap[i].node
manualBestTasks = uint64(nh.heap[i].numTasks)
} else if uint64(nh.heap[i].numTasks) == manualBestTasks && nh.heap[i].node == bestNode {
if uint64(len(nh.heap[i].Tasks)) < manualBestTasks {
manualBestNode = nh.heap[i].Node
manualBestTasks = uint64(len(nh.heap[i].Tasks))
} else if uint64(len(nh.heap[i].Tasks)) == manualBestTasks && nh.heap[i].Node == bestNode {
// prefer the node that findMin chose when
// there are multiple best choices
manualBestNode = nh.heap[i].node
manualBestNode = nh.heap[i].Node
}
}

Expand Down
48 changes: 48 additions & 0 deletions manager/scheduler/nodeinfo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package scheduler

import "github.com/docker/swarm-v2/api"

// NodeInfo contains a node and some additional metadata.
type NodeInfo struct {
	*api.Node
	// Tasks maps task ID to the tasks assigned to this node.
	Tasks map[string]*api.Task
	// AvailableResources tracks what remains on the node after the
	// reservations of the tasks above have been subtracted
	// (maintained by addTask/removeTask).
	AvailableResources api.Resources
}

// newNodeInfo builds a NodeInfo wrapping the given node, its assigned
// tasks, and the resources still available on it.
func newNodeInfo(n *api.Node, tasks map[string]*api.Task, availableResources api.Resources) NodeInfo {
	var info NodeInfo
	info.Node = n
	info.Tasks = tasks
	info.AvailableResources = availableResources
	return info
}

// removeTask stops tracking the task on this node and returns its
// resource reservations to AvailableResources.
func (nodeInfo *NodeInfo) removeTask(t *api.Task) {
	if nodeInfo.Tasks == nil {
		return
	}
	// Only credit the reservations back when the task was actually
	// tracked here; the previous code credited unconditionally, so a
	// stray or repeated removal would inflate AvailableResources.
	if _, ok := nodeInfo.Tasks[t.ID]; !ok {
		return
	}
	delete(nodeInfo.Tasks, t.ID)

	reservations := taskReservations(t)
	nodeInfo.AvailableResources.MemoryBytes += reservations.MemoryBytes
	nodeInfo.AvailableResources.NanoCPUs += reservations.NanoCPUs
}

// addTask starts tracking the task on this node and subtracts its
// resource reservations from AvailableResources.
func (nodeInfo *NodeInfo) addTask(t *api.Task) {
	if nodeInfo.Tasks == nil {
		nodeInfo.Tasks = make(map[string]*api.Task)
	}
	// Adding an already-tracked task must not debit its reservations a
	// second time; the previous code subtracted unconditionally while
	// the map deduplicated, leaving the bookkeeping skewed.
	if _, ok := nodeInfo.Tasks[t.ID]; ok {
		return
	}
	nodeInfo.Tasks[t.ID] = t

	reservations := taskReservations(t)
	nodeInfo.AvailableResources.MemoryBytes -= reservations.MemoryBytes
	nodeInfo.AvailableResources.NanoCPUs -= reservations.NanoCPUs
}

// taskReservations returns the resource reservations declared by the
// task's container spec, or the zero value when none are specified.
func taskReservations(t *api.Task) api.Resources {
	var res api.Resources
	if t.Spec == nil {
		return res
	}
	if c := t.Spec.GetContainer(); c != nil && c.Resources != nil && c.Resources.Reservations != nil {
		res = *c.Resources.Reservations
	}
	return res
}
49 changes: 49 additions & 0 deletions manager/scheduler/pipeline.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package scheduler

import "github.com/docker/swarm-v2/api"

var (
	// defaultFilters is the checklist every new Pipeline is built
	// from; order matters because Process stops at the first filter
	// that rejects the node.
	defaultFilters = []Filter{
		// Always check for readiness first.
		&ReadyFilter{},
		&ResourceFilter{},
	}
)

// Pipeline runs a set of filters against nodes.
type Pipeline struct {
	// task is the task being scheduled; every filter is checked
	// against it.
	task *api.Task
	// checklist holds only the filters that reported Enabled for task.
	checklist []Filter
}

// NewPipeline returns a pipeline with the default set of filters,
// keeping only those that are Enabled for the given task.
func NewPipeline(t *api.Task) *Pipeline {
	p := &Pipeline{
		task: t,
		// Pre-size to the maximum possible length so the append loop
		// below never reallocates (the empty literal grew on demand).
		checklist: make([]Filter, 0, len(defaultFilters)),
	}

	// FIXME(aaronl): This is quite alloc-heavy. It may be better to
	// redesign Pipeline so it doesn't require allocating a separate slice
	// for each task. Maybe it could use a bitfield to specify which
	// filters are enabled.
	for _, f := range defaultFilters {
		if f.Enabled(t) {
			p.checklist = append(p.checklist, f)
		}
	}

	return p
}

// Process a node through the filter pipeline.
// Returns true if all filters pass, false otherwise; evaluation stops
// at the first filter that rejects the node.
func (p *Pipeline) Process(n *NodeInfo) bool {
	passed := true
	for _, f := range p.checklist {
		if passed = f.Check(p.task, n); !passed {
			break
		}
	}
	return passed
}
Loading