diff --git a/manager/scheduler/nodeinfo.go b/manager/scheduler/nodeinfo.go
index 51322a054d..141f32dbc4 100644
--- a/manager/scheduler/nodeinfo.go
+++ b/manager/scheduler/nodeinfo.go
@@ -15,6 +15,15 @@ type hostPortSpec struct {
 	publishedPort uint32
 }
 
+// versionedService defines a tuple that contains a service ID and a spec
+// version, so that failures can be tracked per spec version. Note that if the
+// task predates spec versioning, specVersion will contain the zero value, and
+// this will still work correctly.
+type versionedService struct {
+	serviceID   string
+	specVersion api.Version
+}
+
 // NodeInfo contains a node and some additional metadata.
 type NodeInfo struct {
 	*api.Node
@@ -24,12 +33,14 @@ type NodeInfo struct {
 	AvailableResources        *api.Resources
 	usedHostPorts             map[hostPortSpec]struct{}
 
-	// recentFailures is a map from service ID to the timestamps of the
-	// most recent failures the node has experienced from replicas of that
-	// service.
-	// TODO(aaronl): When spec versioning is supported, this should track
-	// the version of the spec that failed.
-	recentFailures map[string][]time.Time
+	// recentFailures is a map from service ID/version to the timestamps of
+	// the most recent failures the node has experienced from replicas of
+	// that service.
+	recentFailures map[versionedService][]time.Time
+
+	// lastCleanup is the last time recentFailures was cleaned up. This is
+	// done periodically to avoid recentFailures growing without any limit.
+	lastCleanup time.Time
 }
 
 func newNodeInfo(n *api.Node, tasks map[string]*api.Task, availableResources api.Resources) NodeInfo {
@@ -39,7 +50,8 @@ func newNodeInfo(n *api.Node, tasks map[string]*api.Task, availableResources api
 		ActiveTasksCountByService: make(map[string]int),
 		AvailableResources:        availableResources.Copy(),
 		usedHostPorts:             make(map[hostPortSpec]struct{}),
-		recentFailures:            make(map[string][]time.Time),
+		recentFailures:            make(map[versionedService][]time.Time),
+		lastCleanup:               time.Now(),
 	}
 
 	for _, t := range tasks {
@@ -148,30 +160,58 @@ func taskReservations(spec api.TaskSpec) (reservations api.Resources) {
 	return
 }
 
+func (nodeInfo *NodeInfo) cleanupFailures(now time.Time) {
+entriesLoop:
+	for key, failuresEntry := range nodeInfo.recentFailures {
+		for _, timestamp := range failuresEntry {
+			if now.Sub(timestamp) < monitorFailures {
+				continue entriesLoop
+			}
+		}
+		delete(nodeInfo.recentFailures, key)
+	}
+	nodeInfo.lastCleanup = now
+}
+
 // taskFailed records a task failure from a given service.
-func (nodeInfo *NodeInfo) taskFailed(ctx context.Context, serviceID string) {
+func (nodeInfo *NodeInfo) taskFailed(ctx context.Context, t *api.Task) {
 	expired := 0
 	now := time.Now()
-	for _, timestamp := range nodeInfo.recentFailures[serviceID] {
+
+	if now.Sub(nodeInfo.lastCleanup) >= monitorFailures {
+		nodeInfo.cleanupFailures(now)
+	}
+
+	versionedService := versionedService{serviceID: t.ServiceID}
+	if t.SpecVersion != nil {
+		versionedService.specVersion = *t.SpecVersion
+	}
+
+	for _, timestamp := range nodeInfo.recentFailures[versionedService] {
 		if now.Sub(timestamp) < monitorFailures {
 			break
 		}
 		expired++
 	}
 
-	if len(nodeInfo.recentFailures[serviceID])-expired == maxFailures-1 {
-		log.G(ctx).Warnf("underweighting node %s for service %s because it experienced %d failures or rejections within %s", nodeInfo.ID, serviceID, maxFailures, monitorFailures.String())
+	if len(nodeInfo.recentFailures[versionedService])-expired == maxFailures-1 {
+		log.G(ctx).Warnf("underweighting node %s for service %s because it experienced %d failures or rejections within %s", nodeInfo.ID, t.ServiceID, maxFailures, monitorFailures.String())
 	}
 
-	nodeInfo.recentFailures[serviceID] = append(nodeInfo.recentFailures[serviceID][expired:], now)
+	nodeInfo.recentFailures[versionedService] = append(nodeInfo.recentFailures[versionedService][expired:], now)
 }
 
 // countRecentFailures returns the number of times the service has failed on
 // this node within the lookback window monitorFailures.
-func (nodeInfo *NodeInfo) countRecentFailures(now time.Time, serviceID string) int {
-	recentFailureCount := len(nodeInfo.recentFailures[serviceID])
+func (nodeInfo *NodeInfo) countRecentFailures(now time.Time, t *api.Task) int {
+	versionedService := versionedService{serviceID: t.ServiceID}
+	if t.SpecVersion != nil {
+		versionedService.specVersion = *t.SpecVersion
+	}
+
+	recentFailureCount := len(nodeInfo.recentFailures[versionedService])
 	for i := recentFailureCount - 1; i >= 0; i-- {
-		if now.Sub(nodeInfo.recentFailures[serviceID][i]) > monitorFailures {
+		if now.Sub(nodeInfo.recentFailures[versionedService][i]) > monitorFailures {
 			recentFailureCount -= i + 1
 			break
 		}
diff --git a/manager/scheduler/nodeset.go b/manager/scheduler/nodeset.go
index 7f899d8b26..b83704a18d 100644
--- a/manager/scheduler/nodeset.go
+++ b/manager/scheduler/nodeset.go
@@ -4,7 +4,6 @@ import (
 	"container/heap"
 	"errors"
 	"strings"
-	"time"
 
 	"github.com/docker/swarmkit/api"
 	"github.com/docker/swarmkit/manager/constraint"
@@ -32,16 +31,6 @@ func (ns *nodeSet) nodeInfo(nodeID string) (NodeInfo, error) {
 
 // addOrUpdateNode sets the number of tasks for a given node. It adds the node
 // to the set if it wasn't already tracked.
 func (ns *nodeSet) addOrUpdateNode(n NodeInfo) {
-	if n.Tasks == nil {
-		n.Tasks = make(map[string]*api.Task)
-	}
-	if n.ActiveTasksCountByService == nil {
-		n.ActiveTasksCountByService = make(map[string]int)
-	}
-	if n.recentFailures == nil {
-		n.recentFailures = make(map[string][]time.Time)
-	}
-
 	ns.nodes[n.ID] = n
 }
diff --git a/manager/scheduler/scheduler.go b/manager/scheduler/scheduler.go
index 9ff921fd5b..73349e391f 100644
--- a/manager/scheduler/scheduler.go
+++ b/manager/scheduler/scheduler.go
@@ -261,7 +261,7 @@ func (s *Scheduler) updateTask(ctx context.Context, t *api.Task) bool {
 		if _, wasPreassigned := s.preassignedTasks[t.ID]; !wasPreassigned {
 			nodeInfo, err := s.nodeSet.nodeInfo(t.NodeID)
 			if err == nil {
-				nodeInfo.taskFailed(ctx, t.ServiceID)
+				nodeInfo.taskFailed(ctx, t)
 				s.nodeSet.updateNode(nodeInfo)
 			}
 		}
@@ -315,25 +315,32 @@ func (s *Scheduler) deleteTask(t *api.Task) bool {
 }
 
 func (s *Scheduler) createOrUpdateNode(n *api.Node) {
-	nodeInfo, _ := s.nodeSet.nodeInfo(n.ID)
+	nodeInfo, nodeInfoErr := s.nodeSet.nodeInfo(n.ID)
 	var resources *api.Resources
 	if n.Description != nil && n.Description.Resources != nil {
 		resources = n.Description.Resources.Copy()
 		// reconcile resources by looping over all tasks in this node
-		for _, task := range nodeInfo.Tasks {
-			reservations := taskReservations(task.Spec)
+		if nodeInfoErr == nil {
+			for _, task := range nodeInfo.Tasks {
+				reservations := taskReservations(task.Spec)
 
-			resources.MemoryBytes -= reservations.MemoryBytes
-			resources.NanoCPUs -= reservations.NanoCPUs
+				resources.MemoryBytes -= reservations.MemoryBytes
+				resources.NanoCPUs -= reservations.NanoCPUs
 
-			genericresource.ConsumeNodeResources(&resources.Generic,
-				task.AssignedGenericResources)
+				genericresource.ConsumeNodeResources(&resources.Generic,
+					task.AssignedGenericResources)
+			}
 		}
 	} else {
 		resources = &api.Resources{}
 	}
-	nodeInfo.Node = n
-	nodeInfo.AvailableResources = resources
+
+	if nodeInfoErr != nil {
+		nodeInfo = newNodeInfo(n, nil, *resources)
+	} else {
+		nodeInfo.Node = n
+		nodeInfo.AvailableResources = resources
+	}
 	s.nodeSet.addOrUpdateNode(nodeInfo)
 }
 
@@ -536,8 +543,8 @@ func (s *Scheduler) scheduleTaskGroup(ctx context.Context, taskGroup map[string]
 	nodeLess := func(a *NodeInfo, b *NodeInfo) bool {
 		// If either node has at least maxFailures recent failures,
 		// that's the deciding factor.
-		recentFailuresA := a.countRecentFailures(now, t.ServiceID)
-		recentFailuresB := b.countRecentFailures(now, t.ServiceID)
+		recentFailuresA := a.countRecentFailures(now, t)
+		recentFailuresB := b.countRecentFailures(now, t)
 
 		if recentFailuresA >= maxFailures || recentFailuresB >= maxFailures {
 			if recentFailuresA > recentFailuresB {
diff --git a/manager/scheduler/scheduler_test.go b/manager/scheduler/scheduler_test.go
index 32825e1144..3bcbef3bc0 100644
--- a/manager/scheduler/scheduler_test.go
+++ b/manager/scheduler/scheduler_test.go
@@ -1278,6 +1278,23 @@ func TestSchedulerFaultyNode(t *testing.T) {
 			assert.Equal(t, "id1", assignment.NodeID)
 		}
 
+		node2Info, err := scheduler.nodeSet.nodeInfo("id2")
+		assert.NoError(t, err)
+		expectedNode2Failures := i
+		if i > 5 {
+			expectedNode2Failures = 5
+		}
+		assert.Len(t, node2Info.recentFailures[versionedService{serviceID: "service1"}], expectedNode2Failures)
+
+		node1Info, err := scheduler.nodeSet.nodeInfo("id1")
+		assert.NoError(t, err)
+
+		expectedNode1Failures := i - 5
+		if i < 5 {
+			expectedNode1Failures = 0
+		}
+		assert.Len(t, node1Info.recentFailures[versionedService{serviceID: "service1"}], expectedNode1Failures)
+
 		newPreassignedTask := preassignedTaskTemplate.Copy()
 		newPreassignedTask.ID = identity.NewID()
 
@@ -1297,7 +1314,7 @@ func TestSchedulerFaultyNode(t *testing.T) {
 	// marked as
 	nodeInfo, err := scheduler.nodeSet.nodeInfo("id1")
 	assert.NoError(t, err)
-	assert.Len(t, nodeInfo.recentFailures["service2"], 0)
+	assert.Len(t, nodeInfo.recentFailures[versionedService{serviceID: "service2"}], 0)
 
 	err = s.Update(func(tx store.Tx) error {
 		newReplicatedTask := store.GetTask(tx, newReplicatedTask.ID)
@@ -1316,6 +1333,138 @@ func TestSchedulerFaultyNode(t *testing.T) {
 	}
 }
 
+func TestSchedulerFaultyNodeSpecVersion(t *testing.T) {
+	ctx := context.Background()
+
+	taskTemplate := &api.Task{
+		ServiceID:    "service1",
+		SpecVersion:  &api.Version{Index: 1},
+		DesiredState: api.TaskStateRunning,
+		ServiceAnnotations: api.Annotations{
+			Name: "name1",
+		},
+		Status: api.TaskStatus{
+			State: api.TaskStatePending,
+		},
+	}
+
+	node1 := &api.Node{
+		ID: "id1",
+		Spec: api.NodeSpec{
+			Annotations: api.Annotations{
+				Name: "id1",
+			},
+		},
+		Status: api.NodeStatus{
+			State: api.NodeStatus_READY,
+		},
+	}
+
+	node2 := &api.Node{
+		ID: "id2",
+		Spec: api.NodeSpec{
+			Annotations: api.Annotations{
+				Name: "id2",
+			},
+		},
+		Status: api.NodeStatus{
+			State: api.NodeStatus_READY,
+		},
+	}
+
+	s := store.NewMemoryStore(nil)
+	assert.NotNil(t, s)
+	defer s.Close()
+
+	err := s.Update(func(tx store.Tx) error {
+		// Add initial nodes, and one task assigned to node id1
+		assert.NoError(t, store.CreateNode(tx, node1))
+		assert.NoError(t, store.CreateNode(tx, node2))
+
+		task1 := taskTemplate.Copy()
+		task1.ID = "id1"
+		task1.NodeID = "id1"
+		task1.Status.State = api.TaskStateRunning
+		assert.NoError(t, store.CreateTask(tx, task1))
+		return nil
+	})
+	assert.NoError(t, err)
+
+	scheduler := New(s)
+
+	watch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateTask{})
+	defer cancel()
+
+	go func() {
+		assert.NoError(t, scheduler.Run(ctx))
+	}()
+	defer scheduler.Stop()
+
+	for i := 0; i != 15; i++ {
+		// Simulate a task failure cycle
+		newTask := taskTemplate.Copy()
+		newTask.ID = identity.NewID()
+
+		// After the condition for node faultiness has been reached,
+		// bump the spec version to simulate a service update.
+		if i > 5 {
+			newTask.SpecVersion.Index++
+		}
+
+		err = s.Update(func(tx store.Tx) error {
+			assert.NoError(t, store.CreateTask(tx, newTask))
+			return nil
+		})
+		assert.NoError(t, err)
+
+		assignment := watchAssignment(t, watch)
+		assert.Equal(t, newTask.ID, assignment.ID)
+
+		if i < 5 || (i > 5 && i < 11) {
+			// Attempts i=0 to i=4 should be assigned to node id2
+			// because it has no replicas of the service. The same
+			// goes for i=6 to i=10 inclusive, which repeat that
+			// behavior with a new SpecVersion.
+			assert.Equal(t, "id2", assignment.NodeID)
+		} else {
+			// Attempt i=5, and attempts from i=11 onward, should be
+			// assigned to id1, since by then id2 is flagged as
+			// potentially faulty for the spec version in question.
+			assert.Equal(t, "id1", assignment.NodeID)
+		}
+
+		node1Info, err := scheduler.nodeSet.nodeInfo("id1")
+		assert.NoError(t, err)
+		node2Info, err := scheduler.nodeSet.nodeInfo("id2")
+		assert.NoError(t, err)
+		expectedNode1Spec1Failures := 0
+		expectedNode1Spec2Failures := 0
+		expectedNode2Spec1Failures := i
+		expectedNode2Spec2Failures := 0
+		if i > 5 {
+			expectedNode1Spec1Failures = 1
+			expectedNode2Spec1Failures = 5
+			expectedNode2Spec2Failures = i - 6
+		}
+		if i > 11 {
+			expectedNode1Spec2Failures = i - 11
+			expectedNode2Spec2Failures = 5
+		}
+		assert.Len(t, node1Info.recentFailures[versionedService{serviceID: "service1", specVersion: api.Version{Index: 1}}], expectedNode1Spec1Failures)
+		assert.Len(t, node1Info.recentFailures[versionedService{serviceID: "service1", specVersion: api.Version{Index: 2}}], expectedNode1Spec2Failures)
+		assert.Len(t, node2Info.recentFailures[versionedService{serviceID: "service1", specVersion: api.Version{Index: 1}}], expectedNode2Spec1Failures)
+		assert.Len(t, node2Info.recentFailures[versionedService{serviceID: "service1", specVersion: api.Version{Index: 2}}], expectedNode2Spec2Failures)
+
+		err = s.Update(func(tx store.Tx) error {
+			newTask := store.GetTask(tx, newTask.ID)
+			require.NotNil(t, newTask)
+			newTask.Status.State = api.TaskStateFailed
+			assert.NoError(t, store.UpdateTask(tx, newTask))
+			return nil
+		})
+		assert.NoError(t, err)
+	}
+}
+
 func TestSchedulerResourceConstraint(t *testing.T) {
 	ctx := context.Background()
 	// Create a ready node without enough memory to run the task.
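---

Editor's note: for readers who want the failure-tracking scheme from the nodeinfo.go hunks in isolation, the following is a minimal, self-contained sketch, not code from this diff. The names (`failureTracker`, `record`, `faulty`) and the simplified `version` type standing in for swarmkit's `api.Version` are hypothetical; the constants mirror the `maxFailures`/`monitorFailures` thresholds the diff references but does not define here.

```go
// Standalone sketch of per-spec-version failure tracking with periodic
// cleanup, assuming simplified stand-ins for swarmkit's types.
package main

import (
	"fmt"
	"time"
)

const (
	maxFailures     = 5               // failures within the window that mark a node faulty
	monitorFailures = 5 * time.Minute // lookback window for counting failures
)

// version stands in for api.Version; only the index matters for the key.
type version struct{ Index uint64 }

// versionedService mirrors the map key used in the diff. A task that
// predates spec versioning contributes the zero-valued version.
type versionedService struct {
	serviceID   string
	specVersion version
}

type failureTracker struct {
	recentFailures map[versionedService][]time.Time
	lastCleanup    time.Time
}

func newFailureTracker() *failureTracker {
	return &failureTracker{
		recentFailures: make(map[versionedService][]time.Time),
		lastCleanup:    time.Now(),
	}
}

// record notes one failure, first sweeping fully-expired entries at most
// once per monitorFailures interval so the map cannot grow without bound.
func (ft *failureTracker) record(key versionedService, now time.Time) {
	if now.Sub(ft.lastCleanup) >= monitorFailures {
		for k, stamps := range ft.recentFailures {
			expired := true
			for _, ts := range stamps {
				if now.Sub(ts) < monitorFailures {
					expired = false
					break
				}
			}
			if expired {
				delete(ft.recentFailures, k)
			}
		}
		ft.lastCleanup = now
	}
	ft.recentFailures[key] = append(ft.recentFailures[key], now)
}

// faulty reports whether the service/spec-version pair has hit
// maxFailures within the lookback window.
func (ft *failureTracker) faulty(key versionedService, now time.Time) bool {
	count := 0
	for _, ts := range ft.recentFailures[key] {
		if now.Sub(ts) < monitorFailures {
			count++
		}
	}
	return count >= maxFailures
}

func main() {
	ft := newFailureTracker()
	v1 := versionedService{serviceID: "service1", specVersion: version{Index: 1}}
	v2 := versionedService{serviceID: "service1", specVersion: version{Index: 2}}

	now := time.Now()
	for i := 0; i < maxFailures; i++ {
		ft.record(v1, now)
	}
	fmt.Println(ft.faulty(v1, now)) // true: five failures within the window
	fmt.Println(ft.faulty(v2, now)) // false: a new spec version starts clean
}
```

The design point the diff relies on is that the map key is the (service ID, spec version) pair: a service update produces a new key, so a node flagged as faulty for the old spec version starts with a clean slate for the new one, while tasks that predate spec versioning simply fall under the zero-valued version. The once-per-window sweep corresponds to the `lastCleanup` logic that keeps `recentFailures` bounded, which is what made the nil-map re-initialization in `addOrUpdateNode` and the old `TODO(aaronl)` comment removable.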