48 changes: 47 additions & 1 deletion manager/scheduler/nodeinfo.go
@@ -1,6 +1,12 @@
 package scheduler
 
-import "github.com/docker/swarmkit/api"
+import (
+	"time"
+
+	"github.com/docker/swarmkit/api"
+	"github.com/docker/swarmkit/log"
+	"golang.org/x/net/context"
+)
 
 // NodeInfo contains a node and some additional metadata.
 type NodeInfo struct {
@@ -9,6 +15,13 @@ type NodeInfo struct {
 	DesiredRunningTasksCount          int
 	DesiredRunningTasksCountByService map[string]int
 	AvailableResources                api.Resources
+
+	// recentFailures is a map from service ID to the timestamps of the
+	// most recent failures the node has experienced from replicas of that
+	// service.
+	// TODO(aaronl): When spec versioning is supported, this should track
+	// the version of the spec that failed.
+	recentFailures map[string][]time.Time
 }
 
 func newNodeInfo(n *api.Node, tasks map[string]*api.Task, availableResources api.Resources) NodeInfo {
@@ -17,6 +30,7 @@ func newNodeInfo(n *api.Node, tasks map[string]*api.Task, availableResources api.Resources) NodeInfo {
 		Tasks:                             make(map[string]*api.Task),
 		DesiredRunningTasksCountByService: make(map[string]int),
 		AvailableResources:                availableResources,
+		recentFailures:                    make(map[string][]time.Time),
 	}
 
 	for _, t := range tasks {
@@ -94,3 +108,35 @@ func taskReservations(spec api.TaskSpec) (reservations api.Resources) {
 	}
 	return
 }
+
+// taskFailed records a task failure from a given service.
+func (nodeInfo *NodeInfo) taskFailed(ctx context.Context, serviceID string) {
+	expired := 0
+	now := time.Now()
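+	// Timestamps are appended in chronological order, so entries that
+	// have aged out of the monitorFailures window always form a prefix
+	// of the slice; count how many of them to drop.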
+	for _, timestamp := range nodeInfo.recentFailures[serviceID] {
+		if now.Sub(timestamp) < monitorFailures {
+			break
+		}
+		expired++
+	}
+
+	if len(nodeInfo.recentFailures[serviceID])-expired == maxFailures-1 {
+		log.G(ctx).Warnf("underweighting node %s for service %s because it experienced %d failures or rejections within %s", nodeInfo.ID, serviceID, maxFailures, monitorFailures.String())
+	}
+
+	nodeInfo.recentFailures[serviceID] = append(nodeInfo.recentFailures[serviceID][expired:], now)
+}
+
+// countRecentFailures returns the number of times the service has failed on
+// this node within the lookback window monitorFailures.
+func (nodeInfo *NodeInfo) countRecentFailures(now time.Time, serviceID string) int {
+	recentFailureCount := len(nodeInfo.recentFailures[serviceID])
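+	// Scan from newest to oldest; at the first timestamp older than the
+	// window, everything at that index and before it has expired.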
+	for i := recentFailureCount - 1; i >= 0; i-- {
+		if now.Sub(nodeInfo.recentFailures[serviceID][i]) > monitorFailures {
+			recentFailureCount -= i + 1
+			break
+		}
+	}
+
+	return recentFailureCount
+}
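Taken together, taskFailed and countRecentFailures maintain a per-service sliding window of failure timestamps. The standalone sketch below shows the same pruning idea in isolation; it uses plain Go, and the window constant and prune helper are illustrative names of mine, not part of the PR:

package main

import (
	"fmt"
	"time"
)

const window = 5 * time.Minute

// prune drops timestamps that fall outside the lookback window. The
// slice is kept in chronological order, so expired entries are always
// a prefix, exactly as in taskFailed above.
func prune(failures []time.Time, now time.Time) []time.Time {
	expired := 0
	for _, ts := range failures {
		if now.Sub(ts) < window {
			break
		}
		expired++
	}
	return failures[expired:]
}

func main() {
	now := time.Now()
	failures := []time.Time{
		now.Add(-10 * time.Minute), // aged out
		now.Add(-4 * time.Minute),  // still counts
		now.Add(-30 * time.Second), // still counts
	}
	failures = append(prune(failures, now), now)
	fmt.Println(len(failures)) // 3: the expired entry was dropped
}

Keeping the slice in chronological order is what lets both helpers stop scanning as soon as they cross the window boundary.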
39 changes: 37 additions & 2 deletions manager/scheduler/scheduler.go
@@ -12,6 +16,16 @@ import (
"golang.org/x/net/context"
)

const (
// monitorFailures is the lookback period for counting failures of
// a task to determine if a node is faulty for a particular service.
monitorFailures = 5 * time.Minute

// maxFailures is the number of failures within monitorFailures that
// triggers downweighting of a node in the sorting function.
maxFailures = 5
)

type schedulingDecision struct {
old *api.Task
new *api.Task
@@ -224,8 +234,17 @@ func (s *Scheduler) updateTask(ctx context.Context, t *api.Task) int {
 	// Ignore all tasks that have not reached ALLOCATED
 	// state, and tasks that no longer consume resources.
 	if t.Status.State > api.TaskStateRunning {
-		if oldTask != nil {
-			s.deleteTask(ctx, oldTask)
+		if oldTask == nil {
+			return 1
+		}
+		s.deleteTask(ctx, oldTask)
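+		// Record only fresh transitions into FAILED or REJECTED, so the
+		// same failure is not counted again on a later task update.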
+		if t.Status.State != oldTask.Status.State &&
+			(t.Status.State == api.TaskStateFailed || t.Status.State == api.TaskStateRejected) {
+			nodeInfo, err := s.nodeSet.nodeInfo(t.NodeID)
+			if err == nil {
+				nodeInfo.taskFailed(ctx, t.ServiceID)
+				s.nodeSet.updateNode(nodeInfo)
+			}
+		}
 		return 1
 	}
@@ -481,7 +500,23 @@ func (s *Scheduler) scheduleTaskGroup(ctx context.Context, taskGroup map[string]
 
 	s.pipeline.SetTask(t)
 
+	now := time.Now()
+
 	nodeLess := func(a *NodeInfo, b *NodeInfo) bool {
+		// If either node has at least maxFailures recent failures,
+		// that's the deciding factor.
+		recentFailuresA := a.countRecentFailures(now, t.ServiceID)
+		recentFailuresB := b.countRecentFailures(now, t.ServiceID)
+
+		if recentFailuresA >= maxFailures || recentFailuresB >= maxFailures {
+			if recentFailuresA > recentFailuresB {
+				return false
+			}
+			if recentFailuresB > recentFailuresA {
+				return true
+			}
+		}
+
 		tasksByServiceA := a.DesiredRunningTasksCountByService[t.ServiceID]
 		tasksByServiceB := b.DesiredRunningTasksCountByService[t.ServiceID]
 
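The nodeLess comparator decides how candidate nodes are ranked for placement. The standalone sketch below (using sort.Slice and a stand-in node type; illustrative only, not swarmkit's actual node-selection code) shows the ordering it produces: a node that keeps failing sorts last even when it carries the fewest replicas:

package main

import (
	"fmt"
	"sort"
)

const maxFailures = 5

// node is a stand-in for the scheduler's NodeInfo in this sketch.
type node struct {
	id             string
	recentFailures int // failures of the service within the window
	replicas       int // desired running tasks of the service on the node
}

func main() {
	nodes := []node{
		{id: "id1", recentFailures: 5, replicas: 0},
		{id: "id2", recentFailures: 1, replicas: 3},
		{id: "id3", recentFailures: 0, replicas: 4},
	}

	// Mirrors nodeLess: once either side reaches maxFailures, the
	// failure count decides; otherwise spread by replica count.
	sort.Slice(nodes, func(i, j int) bool {
		a, b := nodes[i], nodes[j]
		if a.recentFailures >= maxFailures || b.recentFailures >= maxFailures {
			if a.recentFailures != b.recentFailures {
				return a.recentFailures < b.recentFailures
			}
		}
		return a.replicas < b.replicas
	})

	// id1 sorts last despite being empty, because it keeps failing.
	fmt.Println(nodes) // [{id2 1 3} {id3 0 4} {id1 5 0}]
}

Note the two-tier comparison: failure counts below maxFailures have no effect on placement at all, so occasional failures do not fight the replica-spreading heuristic.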
102 changes: 102 additions & 0 deletions manager/scheduler/scheduler_test.go
@@ -14,6 +14,7 @@ import (
"github.com/docker/swarmkit/manager/state"
"github.com/docker/swarmkit/manager/state/store"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"
)

@@ -700,6 +701,107 @@ func TestSchedulerNoReadyNodes(t *testing.T) {
 	assert.Equal(t, "newnode", assignment.NodeID)
 }
 
+func TestSchedulerFaultyNode(t *testing.T) {
+	ctx := context.Background()
+
+	taskTemplate := &api.Task{
+		ServiceID:    "service1",
+		DesiredState: api.TaskStateRunning,
+		ServiceAnnotations: api.Annotations{
+			Name: "name1",
+		},
+		Status: api.TaskStatus{
+			State: api.TaskStateAllocated,
+		},
+	}
+
+	node1 := &api.Node{
+		ID: "id1",
+		Spec: api.NodeSpec{
+			Annotations: api.Annotations{
+				Name: "id1",
+			},
+		},
+		Status: api.NodeStatus{
+			State: api.NodeStatus_READY,
+		},
+	}
+
+	node2 := &api.Node{
+		ID: "id2",
+		Spec: api.NodeSpec{
+			Annotations: api.Annotations{
+				Name: "id2",
+			},
+		},
+		Status: api.NodeStatus{
+			State: api.NodeStatus_READY,
+		},
+	}
+
+	s := store.NewMemoryStore(nil)
+	assert.NotNil(t, s)
+	defer s.Close()
+
+	err := s.Update(func(tx store.Tx) error {
+		// Add initial nodes, and one task assigned to node id1
+		assert.NoError(t, store.CreateNode(tx, node1))
+		assert.NoError(t, store.CreateNode(tx, node2))
+
+		task1 := taskTemplate.Copy()
+		task1.ID = "id1"
+		task1.NodeID = "id1"
+		task1.Status.State = api.TaskStateRunning
+		assert.NoError(t, store.CreateTask(tx, task1))
+		return nil
+	})
+	assert.NoError(t, err)
+
+	scheduler := New(s)
+
+	watch, cancel := state.Watch(s.WatchQueue(), state.EventUpdateTask{})
+	defer cancel()
+
+	go func() {
+		assert.NoError(t, scheduler.Run(ctx))
+	}()
+	defer scheduler.Stop()
+
+	for i := 0; i != 8; i++ {
+		// Simulate a task failure cycle
+		newTask := taskTemplate.Copy()
+		newTask.ID = identity.NewID()
+
+		err = s.Update(func(tx store.Tx) error {
+			assert.NoError(t, store.CreateTask(tx, newTask))
+			return nil
+		})
+		assert.NoError(t, err)
+
+		assignment := watchAssignment(t, watch)
+		assert.Equal(t, newTask.ID, assignment.ID)
+
+		if i < 5 {
+			// The first 5 attempts should be assigned to node id2 because
+			// it has no replicas of the service.
+			assert.Equal(t, "id2", assignment.NodeID)
+		} else {
+			// The next ones should be assigned to id1, since we'll
+			// flag id2 as potentially faulty.
+			assert.Equal(t, "id1", assignment.NodeID)
+		}
+
+		err = s.Update(func(tx store.Tx) error {
+			newTask := store.GetTask(tx, newTask.ID)
+			require.NotNil(t, newTask)
+			newTask.Status.State = api.TaskStateFailed
+			assert.NoError(t, store.UpdateTask(tx, newTask))
+			return nil
+		})
+		assert.NoError(t, err)
+	}
+}
+
 func TestSchedulerResourceConstraint(t *testing.T) {
 	ctx := context.Background()
 	// Create a ready node without enough memory to run the task.