diff --git a/manager/scheduler/nodeinfo.go b/manager/scheduler/nodeinfo.go
index 5f45fd6581..d222f557c8 100644
--- a/manager/scheduler/nodeinfo.go
+++ b/manager/scheduler/nodeinfo.go
@@ -1,6 +1,12 @@
 package scheduler
 
-import "github.com/docker/swarmkit/api"
+import (
+	"time"
+
+	"github.com/docker/swarmkit/api"
+	"github.com/docker/swarmkit/log"
+	"golang.org/x/net/context"
+)
 
 // NodeInfo contains a node and some additional metadata.
 type NodeInfo struct {
@@ -9,6 +15,13 @@ type NodeInfo struct {
 	DesiredRunningTasksCount          int
 	DesiredRunningTasksCountByService map[string]int
 	AvailableResources                api.Resources
+
+	// recentFailures is a map from service ID to the timestamps of the
+	// most recent failures the node has experienced from replicas of that
+	// service.
+	// TODO(aaronl): When spec versioning is supported, this should track
+	// the version of the spec that failed.
+	recentFailures map[string][]time.Time
 }
 
 func newNodeInfo(n *api.Node, tasks map[string]*api.Task, availableResources api.Resources) NodeInfo {
@@ -17,6 +30,7 @@ func newNodeInfo(n *api.Node, tasks map[string]*api.Task, availableResources api
 		Tasks:                             make(map[string]*api.Task),
 		DesiredRunningTasksCountByService: make(map[string]int),
 		AvailableResources:                availableResources,
+		recentFailures:                    make(map[string][]time.Time),
 	}
 
 	for _, t := range tasks {
@@ -94,3 +108,53 @@ func taskReservations(spec api.TaskSpec) (reservations api.Resources) {
 	}
 	return
 }
+
+// taskFailed records a task failure from a given service. Timestamps that
+// have aged out of the monitorFailures window are dropped before the new
+// failure is appended, and a warning is logged when this failure brings the
+// node to maxFailures recent failures for the service.
+func (nodeInfo *NodeInfo) taskFailed(ctx context.Context, serviceID string) {
+	// A NodeInfo that was not built by newNodeInfo has a nil map, and
+	// writing to a nil map panics, so allocate it on first use.
+	if nodeInfo.recentFailures == nil {
+		nodeInfo.recentFailures = make(map[string][]time.Time)
+	}
+
+	now := time.Now()
+	failures := nodeInfo.recentFailures[serviceID]
+
+	// Timestamps are appended in chronological order, so the expired
+	// entries form a prefix of the slice.
+	expired := 0
+	for _, timestamp := range failures {
+		if now.Sub(timestamp) < monitorFailures {
+			break
+		}
+		expired++
+	}
+	failures = failures[expired:]
+
+	if len(failures) == maxFailures-1 {
+		log.G(ctx).Warnf("underweighting node %s for service %s because it experienced %d failures or rejections within %s", nodeInfo.ID, serviceID, maxFailures, monitorFailures.String())
+	}
+
+	nodeInfo.recentFailures[serviceID] = append(failures, now)
+}
+
+// countRecentFailures returns the number of times the service has failed on
+// this node within the lookback window monitorFailures. A failure that is
+// exactly monitorFailures old counts as expired, the same cutoff taskFailed
+// uses when it prunes the list.
+func (nodeInfo *NodeInfo) countRecentFailures(now time.Time, serviceID string) int {
+	failures := nodeInfo.recentFailures[serviceID]
+
+	// Scan from the newest entry backwards: once an expired timestamp is
+	// found, it and everything before it fall outside the window.
+	for i := len(failures) - 1; i >= 0; i-- {
+		if now.Sub(failures[i]) >= monitorFailures {
+			return len(failures) - (i + 1)
+		}
+	}
+
+	return len(failures)
+}
diff --git a/manager/scheduler/scheduler.go b/manager/scheduler/scheduler.go
index c18a278d95..c8fc8602fa 100644
--- a/manager/scheduler/scheduler.go
+++ b/manager/scheduler/scheduler.go
@@ -12,6 +12,16 @@ import (
 	"golang.org/x/net/context"
 )
 
+const (
+	// monitorFailures is the lookback period for counting failures of
+	// a task to determine if a node is faulty for a particular service.
+	monitorFailures = 5 * time.Minute
+
+	// maxFailures is the number of failures within monitorFailures that
+	// triggers downweighting of a node in the sorting function.
+	maxFailures = 5
+)
+
 type schedulingDecision struct {
 	old *api.Task
 	new *api.Task
@@ -224,8 +234,17 @@ func (s *Scheduler) updateTask(ctx context.Context, t *api.Task) int {
 	// Ignore all tasks that have not reached ALLOCATED
 	// state, and tasks that no longer consume resources.
 	if t.Status.State > api.TaskStateRunning {
-		if oldTask != nil {
-			s.deleteTask(ctx, oldTask)
+		if oldTask == nil {
+			return 1
+		}
+		s.deleteTask(ctx, oldTask)
+		if t.Status.State != oldTask.Status.State &&
+			(t.Status.State == api.TaskStateFailed || t.Status.State == api.TaskStateRejected) {
+			nodeInfo, err := s.nodeSet.nodeInfo(t.NodeID)
+			if err == nil {
+				nodeInfo.taskFailed(ctx, t.ServiceID)
+				s.nodeSet.updateNode(nodeInfo)
+			}
 		}
 		return 1
 	}
@@ -481,7 +500,23 @@ func (s *Scheduler) scheduleTaskGroup(ctx context.Context, taskGroup map[string]
 
 	s.pipeline.SetTask(t)
 
+	now := time.Now()
+
 	nodeLess := func(a *NodeInfo, b *NodeInfo) bool {
+		// If either node has at least maxFailures recent failures,
+		// that's the deciding factor.
+		recentFailuresA := a.countRecentFailures(now, t.ServiceID)
+		recentFailuresB := b.countRecentFailures(now, t.ServiceID)
+
+		if recentFailuresA >= maxFailures || recentFailuresB >= maxFailures {
+			if recentFailuresA > recentFailuresB {
+				return false
+			}
+			if recentFailuresB > recentFailuresA {
+				return true
+			}
+		}
+
 		tasksByServiceA := a.DesiredRunningTasksCountByService[t.ServiceID]
 		tasksByServiceB := b.DesiredRunningTasksCountByService[t.ServiceID]
 
diff --git a/manager/scheduler/scheduler_test.go b/manager/scheduler/scheduler_test.go
index 5e99f54857..7507a170e2 100644
--- a/manager/scheduler/scheduler_test.go
+++ b/manager/scheduler/scheduler_test.go
@@ -14,6 +14,7 @@ import (
 	"github.com/docker/swarmkit/manager/state"
 	"github.com/docker/swarmkit/manager/state/store"
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 	"golang.org/x/net/context"
 )
 
@@ -700,6 +701,107 @@ func TestSchedulerNoReadyNodes(t *testing.T) {
 	assert.Equal(t, "newnode", assignment.NodeID)
 }
 
+func TestSchedulerFaultyNode(t *testing.T) {
+	ctx := context.Background()
+
+	taskTemplate := &api.Task{
+		ServiceID:    "service1",
+		DesiredState: api.TaskStateRunning,
+		ServiceAnnotations: api.Annotations{
+			Name: "name1",
+		},
+		Status: api.TaskStatus{
+			State: api.TaskStateAllocated,
+		},
+	}
+
+	node1 := &api.Node{
+		ID: "id1",
+		Spec: api.NodeSpec{
+			Annotations: api.Annotations{
+				Name: "id1",
+			},
+		},
+		Status: api.NodeStatus{
+			State: api.NodeStatus_READY,
+		},
+	}
+
+	node2 := &api.Node{
+		ID: "id2",
+		Spec: api.NodeSpec{
+			Annotations: api.Annotations{
+				Name: "id2",
+			},
+		},
+		Status: api.NodeStatus{
+			State: api.NodeStatus_READY,
+		},
+	}
+
+	s := store.NewMemoryStore(nil)
+	assert.NotNil(t, s)
+	defer s.Close()
+
+	err := s.Update(func(tx store.Tx) error {
+		// Add initial nodes, and one task assigned to node id1
+		assert.NoError(t, store.CreateNode(tx, node1))
+		assert.NoError(t, store.CreateNode(tx, node2))
+
+		task1 := taskTemplate.Copy()
+		task1.ID = "id1"
+		task1.NodeID = "id1"
+		task1.Status.State = api.TaskStateRunning
+		assert.NoError(t, store.CreateTask(tx, task1))
+		return nil
+	})
+	assert.NoError(t, err)
+
+	scheduler := New(s)
+
+	watch, cancel := state.Watch(s.WatchQueue(), state.EventUpdateTask{})
+	defer cancel()
+
+	go func() {
+		assert.NoError(t, scheduler.Run(ctx))
+	}()
+	defer scheduler.Stop()
+
+	for i := 0; i != 8; i++ {
+		// Simulate a task failure cycle
+		newTask := taskTemplate.Copy()
+		newTask.ID = identity.NewID()
+
+		err = s.Update(func(tx store.Tx) error {
+			assert.NoError(t, store.CreateTask(tx, newTask))
+			return nil
+		})
+		assert.NoError(t, err)
+
+		assignment := watchAssignment(t, watch)
+		assert.Equal(t, newTask.ID, assignment.ID)
+
+		if i < 5 {
+			// The first 5 attempts should be assigned to node id2 because
+			// it has no replicas of the service.
+			assert.Equal(t, "id2", assignment.NodeID)
+		} else {
+			// The next ones should be assigned to id1, since we'll
+			// flag id2 as potentially faulty.
+			assert.Equal(t, "id1", assignment.NodeID)
+		}
+
+		err = s.Update(func(tx store.Tx) error {
+			failedTask := store.GetTask(tx, newTask.ID)
+			require.NotNil(t, failedTask)
+			failedTask.Status.State = api.TaskStateFailed
+			assert.NoError(t, store.UpdateTask(tx, failedTask))
+			return nil
+		})
+		assert.NoError(t, err)
+	}
+}
+
 func TestSchedulerResourceConstraint(t *testing.T) {
 	ctx := context.Background()
 	// Create a ready node without enough memory to run the task.