From 5cc1eebfedae7536ade1e9d2bc7a283a460ffae5 Mon Sep 17 00:00:00 2001 From: Aaron Lehmann Date: Tue, 13 Jun 2017 16:29:38 -0700 Subject: [PATCH 1/4] scheduler: Version-aware failure tracking This addresses a longstanding TODO which I forgot about. Currently, the scheduler tracks potential failure loops by service ID. However, if the service is updated, it won't distinguish the new version of the service from the old one. This means even if the operator fixes the underlying problem, the new tasks from the updated service will still avoid the node where the problem was detected. Change the failure tracking to be sensitive to SpecVersion as well. If SpecVersion changes, we'll treat the task as a different kind of task for failure tracking purposes. Signed-off-by: Aaron Lehmann --- manager/scheduler/nodeinfo.go | 48 ++++++++---- manager/scheduler/nodeset.go | 2 +- manager/scheduler/scheduler.go | 6 +- manager/scheduler/scheduler_test.go | 112 +++++++++++++++++++++++++++- 4 files changed, 148 insertions(+), 20 deletions(-) diff --git a/manager/scheduler/nodeinfo.go b/manager/scheduler/nodeinfo.go index 51322a054d..575090e2ea 100644 --- a/manager/scheduler/nodeinfo.go +++ b/manager/scheduler/nodeinfo.go @@ -15,6 +15,15 @@ type hostPortSpec struct { publishedPort uint32 } +// versionedService defines a tuple that contains a service ID and a spec +// version, so that failures can be tracked per spec version. Note that if the +// task predates spec versioning, specVersion will contain the zero value, and +// this will still work correctly. +type versionedService struct { + serviceID string + specVersion api.Version +} + // NodeInfo contains a node and some additional metadata. type NodeInfo struct { *api.Node @@ -24,12 +33,10 @@ type NodeInfo struct { AvailableResources *api.Resources usedHostPorts map[hostPortSpec]struct{} - // recentFailures is a map from service ID to the timestamps of the - // most recent failures the node has experienced from replicas of that - // service. - // TODO(aaronl): When spec versioning is supported, this should track - // the version of the spec that failed. - recentFailures map[string][]time.Time + // recentFailures is a map from service ID/version to the timestamps of + // the most recent failures the node has experienced from replicas of + // that service. + recentFailures map[versionedService][]time.Time } func newNodeInfo(n *api.Node, tasks map[string]*api.Task, availableResources api.Resources) NodeInfo { @@ -39,7 +46,7 @@ func newNodeInfo(n *api.Node, tasks map[string]*api.Task, availableResources api ActiveTasksCountByService: make(map[string]int), AvailableResources: availableResources.Copy(), usedHostPorts: make(map[hostPortSpec]struct{}), - recentFailures: make(map[string][]time.Time), + recentFailures: make(map[versionedService][]time.Time), } for _, t := range tasks { @@ -149,29 +156,40 @@ func taskReservations(spec api.TaskSpec) (reservations api.Resources) { } // taskFailed records a task failure from a given service. 
-func (nodeInfo *NodeInfo) taskFailed(ctx context.Context, serviceID string) { +func (nodeInfo *NodeInfo) taskFailed(ctx context.Context, t *api.Task) { expired := 0 now := time.Now() - for _, timestamp := range nodeInfo.recentFailures[serviceID] { + + versionedService := versionedService{serviceID: t.ServiceID} + if t.SpecVersion != nil { + versionedService.specVersion = *t.SpecVersion + } + + for _, timestamp := range nodeInfo.recentFailures[versionedService] { if now.Sub(timestamp) < monitorFailures { break } expired++ } - if len(nodeInfo.recentFailures[serviceID])-expired == maxFailures-1 { - log.G(ctx).Warnf("underweighting node %s for service %s because it experienced %d failures or rejections within %s", nodeInfo.ID, serviceID, maxFailures, monitorFailures.String()) + if len(nodeInfo.recentFailures[versionedService])-expired == maxFailures-1 { + log.G(ctx).Warnf("underweighting node %s for service %s because it experienced %d failures or rejections within %s", nodeInfo.ID, t.ServiceID, maxFailures, monitorFailures.String()) } - nodeInfo.recentFailures[serviceID] = append(nodeInfo.recentFailures[serviceID][expired:], now) + nodeInfo.recentFailures[versionedService] = append(nodeInfo.recentFailures[versionedService][expired:], now) } // countRecentFailures returns the number of times the service has failed on // this node within the lookback window monitorFailures. -func (nodeInfo *NodeInfo) countRecentFailures(now time.Time, serviceID string) int { - recentFailureCount := len(nodeInfo.recentFailures[serviceID]) +func (nodeInfo *NodeInfo) countRecentFailures(now time.Time, t *api.Task) int { + versionedService := versionedService{serviceID: t.ServiceID} + if t.SpecVersion != nil { + versionedService.specVersion = *t.SpecVersion + } + + recentFailureCount := len(nodeInfo.recentFailures[versionedService]) for i := recentFailureCount - 1; i >= 0; i-- { - if now.Sub(nodeInfo.recentFailures[serviceID][i]) > monitorFailures { + if now.Sub(nodeInfo.recentFailures[versionedService][i]) > monitorFailures { recentFailureCount -= i + 1 break } diff --git a/manager/scheduler/nodeset.go b/manager/scheduler/nodeset.go index 7f899d8b26..b0391736ce 100644 --- a/manager/scheduler/nodeset.go +++ b/manager/scheduler/nodeset.go @@ -39,7 +39,7 @@ func (ns *nodeSet) addOrUpdateNode(n NodeInfo) { n.ActiveTasksCountByService = make(map[string]int) } if n.recentFailures == nil { - n.recentFailures = make(map[string][]time.Time) + n.recentFailures = make(map[versionedService][]time.Time) } ns.nodes[n.ID] = n diff --git a/manager/scheduler/scheduler.go b/manager/scheduler/scheduler.go index 9ff921fd5b..fc3bb9027b 100644 --- a/manager/scheduler/scheduler.go +++ b/manager/scheduler/scheduler.go @@ -261,7 +261,7 @@ func (s *Scheduler) updateTask(ctx context.Context, t *api.Task) bool { if _, wasPreassigned := s.preassignedTasks[t.ID]; !wasPreassigned { nodeInfo, err := s.nodeSet.nodeInfo(t.NodeID) if err == nil { - nodeInfo.taskFailed(ctx, t.ServiceID) + nodeInfo.taskFailed(ctx, t) s.nodeSet.updateNode(nodeInfo) } } @@ -536,8 +536,8 @@ func (s *Scheduler) scheduleTaskGroup(ctx context.Context, taskGroup map[string] nodeLess := func(a *NodeInfo, b *NodeInfo) bool { // If either node has at least maxFailures recent failures, // that's the deciding factor. 
- recentFailuresA := a.countRecentFailures(now, t.ServiceID) - recentFailuresB := b.countRecentFailures(now, t.ServiceID) + recentFailuresA := a.countRecentFailures(now, t) + recentFailuresB := b.countRecentFailures(now, t) if recentFailuresA >= maxFailures || recentFailuresB >= maxFailures { if recentFailuresA > recentFailuresB { diff --git a/manager/scheduler/scheduler_test.go b/manager/scheduler/scheduler_test.go index 32825e1144..ef50e60209 100644 --- a/manager/scheduler/scheduler_test.go +++ b/manager/scheduler/scheduler_test.go @@ -1297,7 +1297,7 @@ func TestSchedulerFaultyNode(t *testing.T) { // marked as nodeInfo, err := scheduler.nodeSet.nodeInfo("id1") assert.NoError(t, err) - assert.Len(t, nodeInfo.recentFailures["service2"], 0) + assert.Len(t, nodeInfo.recentFailures[versionedService{serviceID: "service2"}], 0) err = s.Update(func(tx store.Tx) error { newReplicatedTask := store.GetTask(tx, newReplicatedTask.ID) @@ -1316,6 +1316,116 @@ func TestSchedulerFaultyNode(t *testing.T) { } } +func TestSchedulerFaultyNodeSpecVersion(t *testing.T) { + ctx := context.Background() + + taskTemplate := &api.Task{ + ServiceID: "service1", + SpecVersion: &api.Version{Index: 1}, + DesiredState: api.TaskStateRunning, + ServiceAnnotations: api.Annotations{ + Name: "name1", + }, + Status: api.TaskStatus{ + State: api.TaskStatePending, + }, + } + + node1 := &api.Node{ + ID: "id1", + Spec: api.NodeSpec{ + Annotations: api.Annotations{ + Name: "id1", + }, + }, + Status: api.NodeStatus{ + State: api.NodeStatus_READY, + }, + } + + node2 := &api.Node{ + ID: "id2", + Spec: api.NodeSpec{ + Annotations: api.Annotations{ + Name: "id2", + }, + }, + Status: api.NodeStatus{ + State: api.NodeStatus_READY, + }, + } + + s := store.NewMemoryStore(nil) + assert.NotNil(t, s) + defer s.Close() + + err := s.Update(func(tx store.Tx) error { + // Add initial nodes, and one task assigned to node id1 + assert.NoError(t, store.CreateNode(tx, node1)) + assert.NoError(t, store.CreateNode(tx, node2)) + + task1 := taskTemplate.Copy() + task1.ID = "id1" + task1.NodeID = "id1" + task1.Status.State = api.TaskStateRunning + assert.NoError(t, store.CreateTask(tx, task1)) + return nil + }) + assert.NoError(t, err) + + scheduler := New(s) + + watch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateTask{}) + defer cancel() + + go func() { + assert.NoError(t, scheduler.Run(ctx)) + }() + defer scheduler.Stop() + + for i := 0; i != 15; i++ { + // Simulate a task failure cycle + newTask := taskTemplate.Copy() + newTask.ID = identity.NewID() + + // After the condition for node faultiness has been reached, + // bump the spec version to simulate a service update. + if i > 5 { + newTask.SpecVersion.Index++ + } + + err = s.Update(func(tx store.Tx) error { + assert.NoError(t, store.CreateTask(tx, newTask)) + return nil + }) + assert.NoError(t, err) + + assignment := watchAssignment(t, watch) + assert.Equal(t, newTask.ID, assignment.ID) + + if i < 5 || (i > 5 && i < 11) { + // The first 5 attempts should be assigned to node id2 because + // it has no replicas of the service. + // Same with i=6 to i=10 inclusive, which is repeating the + // same behavior with a different SpecVersion. + assert.Equal(t, "id2", assignment.NodeID) + } else { + // The next ones should be assigned to id1, since we'll + // flag id2 as potentially faulty. 
+ assert.Equal(t, "id1", assignment.NodeID) + } + + err = s.Update(func(tx store.Tx) error { + newTask := store.GetTask(tx, newTask.ID) + require.NotNil(t, newTask) + newTask.Status.State = api.TaskStateFailed + assert.NoError(t, store.UpdateTask(tx, newTask)) + return nil + }) + assert.NoError(t, err) + } +} + func TestSchedulerResourceConstraint(t *testing.T) { ctx := context.Background() // Create a ready node without enough memory to run the task. From 33d6564a444648d3f17ba45ef8eb3fb5ad9f0e6a Mon Sep 17 00:00:00 2001 From: Aaron Lehmann Date: Tue, 13 Jun 2017 17:10:15 -0700 Subject: [PATCH 2/4] scheduler: Clean up recentFailures periodically This map could technically grow without bound. When adding a new failure record, if the cleanup hasn't happened recently, loop over all the recentFailures entries and remove any that only consist of expired records. Signed-off-by: Aaron Lehmann --- manager/scheduler/nodeinfo.go | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/manager/scheduler/nodeinfo.go b/manager/scheduler/nodeinfo.go index 575090e2ea..141f32dbc4 100644 --- a/manager/scheduler/nodeinfo.go +++ b/manager/scheduler/nodeinfo.go @@ -37,6 +37,10 @@ type NodeInfo struct { // the most recent failures the node has experienced from replicas of // that service. recentFailures map[versionedService][]time.Time + + // lastCleanup is the last time recentFailures was cleaned up. This is + // done periodically to avoid recentFailures growing without any limit. + lastCleanup time.Time } func newNodeInfo(n *api.Node, tasks map[string]*api.Task, availableResources api.Resources) NodeInfo { @@ -47,6 +51,7 @@ func newNodeInfo(n *api.Node, tasks map[string]*api.Task, availableResources api AvailableResources: availableResources.Copy(), usedHostPorts: make(map[hostPortSpec]struct{}), recentFailures: make(map[versionedService][]time.Time), + lastCleanup: time.Now(), } for _, t := range tasks { @@ -155,11 +160,28 @@ func taskReservations(spec api.TaskSpec) (reservations api.Resources) { return } +func (nodeInfo *NodeInfo) cleanupFailures(now time.Time) { +entriesLoop: + for key, failuresEntry := range nodeInfo.recentFailures { + for _, timestamp := range failuresEntry { + if now.Sub(timestamp) < monitorFailures { + continue entriesLoop + } + } + delete(nodeInfo.recentFailures, key) + } + nodeInfo.lastCleanup = now +} + // taskFailed records a task failure from a given service. func (nodeInfo *NodeInfo) taskFailed(ctx context.Context, t *api.Task) { expired := 0 now := time.Now() + if now.Sub(nodeInfo.lastCleanup) >= monitorFailures { + nodeInfo.cleanupFailures(now) + } + versionedService := versionedService{serviceID: t.ServiceID} if t.SpecVersion != nil { versionedService.specVersion = *t.SpecVersion From d5bfa241b660858e1bc440d9cfdbb5f40aa0c787 Mon Sep 17 00:00:00 2001 From: Aaron Lehmann Date: Tue, 13 Jun 2017 17:11:34 -0700 Subject: [PATCH 3/4] scheduler: Clean up addOrUpdateNode This function previously could take an uninitialized NodeInfo structure and fill in whatever was missing. This is very error-prone, so remove this logic and change the only caller that relies on it to always pass in a properly initialized NodeInfo. 
Signed-off-by: Aaron Lehmann --- manager/scheduler/nodeset.go | 11 ----------- manager/scheduler/scheduler.go | 25 ++++++++++++++++--------- 2 files changed, 16 insertions(+), 20 deletions(-) diff --git a/manager/scheduler/nodeset.go b/manager/scheduler/nodeset.go index b0391736ce..b83704a18d 100644 --- a/manager/scheduler/nodeset.go +++ b/manager/scheduler/nodeset.go @@ -4,7 +4,6 @@ import ( "container/heap" "errors" "strings" - "time" "github.com/docker/swarmkit/api" "github.com/docker/swarmkit/manager/constraint" @@ -32,16 +31,6 @@ func (ns *nodeSet) nodeInfo(nodeID string) (NodeInfo, error) { // addOrUpdateNode sets the number of tasks for a given node. It adds the node // to the set if it wasn't already tracked. func (ns *nodeSet) addOrUpdateNode(n NodeInfo) { - if n.Tasks == nil { - n.Tasks = make(map[string]*api.Task) - } - if n.ActiveTasksCountByService == nil { - n.ActiveTasksCountByService = make(map[string]int) - } - if n.recentFailures == nil { - n.recentFailures = make(map[versionedService][]time.Time) - } - ns.nodes[n.ID] = n } diff --git a/manager/scheduler/scheduler.go b/manager/scheduler/scheduler.go index fc3bb9027b..73349e391f 100644 --- a/manager/scheduler/scheduler.go +++ b/manager/scheduler/scheduler.go @@ -315,25 +315,32 @@ func (s *Scheduler) deleteTask(t *api.Task) bool { } func (s *Scheduler) createOrUpdateNode(n *api.Node) { - nodeInfo, _ := s.nodeSet.nodeInfo(n.ID) + nodeInfo, nodeInfoErr := s.nodeSet.nodeInfo(n.ID) var resources *api.Resources if n.Description != nil && n.Description.Resources != nil { resources = n.Description.Resources.Copy() // reconcile resources by looping over all tasks in this node - for _, task := range nodeInfo.Tasks { - reservations := taskReservations(task.Spec) + if nodeInfoErr == nil { + for _, task := range nodeInfo.Tasks { + reservations := taskReservations(task.Spec) - resources.MemoryBytes -= reservations.MemoryBytes - resources.NanoCPUs -= reservations.NanoCPUs + resources.MemoryBytes -= reservations.MemoryBytes + resources.NanoCPUs -= reservations.NanoCPUs - genericresource.ConsumeNodeResources(&resources.Generic, - task.AssignedGenericResources) + genericresource.ConsumeNodeResources(&resources.Generic, + task.AssignedGenericResources) + } } } else { resources = &api.Resources{} } - nodeInfo.Node = n - nodeInfo.AvailableResources = resources + + if nodeInfoErr != nil { + nodeInfo = newNodeInfo(n, nil, *resources) + } else { + nodeInfo.Node = n + nodeInfo.AvailableResources = resources + } s.nodeSet.addOrUpdateNode(nodeInfo) } From cf4dc9148fd63873ae735f3e9385b65e7c6a606e Mon Sep 17 00:00:00 2001 From: Aaron Lehmann Date: Thu, 15 Jun 2017 16:34:00 -0700 Subject: [PATCH 4/4] scheduler: Check monotonicity of recentFailures during tests Signed-off-by: Aaron Lehmann --- manager/scheduler/scheduler_test.go | 39 +++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/manager/scheduler/scheduler_test.go b/manager/scheduler/scheduler_test.go index ef50e60209..3bcbef3bc0 100644 --- a/manager/scheduler/scheduler_test.go +++ b/manager/scheduler/scheduler_test.go @@ -1278,6 +1278,23 @@ func TestSchedulerFaultyNode(t *testing.T) { assert.Equal(t, "id1", assignment.NodeID) } + node2Info, err := scheduler.nodeSet.nodeInfo("id2") + assert.NoError(t, err) + expectedNode2Failures := i + if i > 5 { + expectedNode2Failures = 5 + } + assert.Len(t, node2Info.recentFailures[versionedService{serviceID: "service1"}], expectedNode2Failures) + + node1Info, err := scheduler.nodeSet.nodeInfo("id1") + assert.NoError(t, err) + + 
+	expectedNode1Failures := i - 5
+	if i < 5 {
+		expectedNode1Failures = 0
+	}
+	assert.Len(t, node1Info.recentFailures[versionedService{serviceID: "service1"}], expectedNode1Failures)
+
 	newPreassignedTask := preassignedTaskTemplate.Copy()
 	newPreassignedTask.ID = identity.NewID()
 
@@ -1415,6 +1432,28 @@ func TestSchedulerFaultyNodeSpecVersion(t *testing.T) {
 			assert.Equal(t, "id1", assignment.NodeID)
 		}
 
+		node1Info, err := scheduler.nodeSet.nodeInfo("id1")
+		assert.NoError(t, err)
+		node2Info, err := scheduler.nodeSet.nodeInfo("id2")
+		assert.NoError(t, err)
+		expectedNode1Spec1Failures := 0
+		expectedNode1Spec2Failures := 0
+		expectedNode2Spec1Failures := i
+		expectedNode2Spec2Failures := 0
+		if i > 5 {
+			expectedNode1Spec1Failures = 1
+			expectedNode2Spec1Failures = 5
+			expectedNode2Spec2Failures = i - 6
+		}
+		if i > 11 {
+			expectedNode1Spec2Failures = i - 11
+			expectedNode2Spec2Failures = 5
+		}
+		assert.Len(t, node1Info.recentFailures[versionedService{serviceID: "service1", specVersion: api.Version{Index: 1}}], expectedNode1Spec1Failures)
+		assert.Len(t, node1Info.recentFailures[versionedService{serviceID: "service1", specVersion: api.Version{Index: 2}}], expectedNode1Spec2Failures)
+		assert.Len(t, node2Info.recentFailures[versionedService{serviceID: "service1", specVersion: api.Version{Index: 1}}], expectedNode2Spec1Failures)
+		assert.Len(t, node2Info.recentFailures[versionedService{serviceID: "service1", specVersion: api.Version{Index: 2}}], expectedNode2Spec2Failures)
+
 		err = s.Update(func(tx store.Tx) error {
 			newTask := store.GetTask(tx, newTask.ID)
 			require.NotNil(t, newTask)
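
Taken together, the four patches implement one mechanism: per-node failure history keyed by (service ID, spec version), with expired timestamps dropped on insert and the whole map swept periodically. The condensed Go program below restates that mechanism outside of swarmkit as a reading aid; it is not the swarmkit code itself. maxFailures = 5 matches the tests above (a node starts being avoided after five failures), but the monitorFailures window value is an assumption, since these hunks do not show the real constants, and a plain uint64 stands in for api.Version.

package main

import (
	"fmt"
	"time"
)

// Tuning knobs referenced by the patches. maxFailures = 5 is implied by the
// tests above; the window duration below is an assumed value.
const (
	maxFailures     = 5
	monitorFailures = 5 * time.Minute
)

// versionedService mirrors the key type from PATCH 1/4: failure history is
// kept per (service ID, spec version), so updating a service starts it with
// a clean slate on every node. A uint64 stands in for api.Version here; a
// task that predates spec versioning simply gets the zero value.
type versionedService struct {
	serviceID   string
	specVersion uint64
}

// nodeInfo keeps only the fields relevant to failure tracking.
type nodeInfo struct {
	recentFailures map[versionedService][]time.Time
	lastCleanup    time.Time
}

// taskFailed records one failure, first dropping timestamps that have fallen
// out of the monitoring window, as the patched NodeInfo.taskFailed does.
func (n *nodeInfo) taskFailed(key versionedService, now time.Time) {
	if now.Sub(n.lastCleanup) >= monitorFailures {
		n.cleanupFailures(now)
	}
	expired := 0
	for _, ts := range n.recentFailures[key] {
		if now.Sub(ts) < monitorFailures {
			break
		}
		expired++
	}
	n.recentFailures[key] = append(n.recentFailures[key][expired:], now)
}

// cleanupFailures is the periodic sweep from PATCH 2/4: any entry whose
// records have all expired is deleted, bounding the size of the map.
func (n *nodeInfo) cleanupFailures(now time.Time) {
entriesLoop:
	for key, failures := range n.recentFailures {
		for _, ts := range failures {
			if now.Sub(ts) < monitorFailures {
				continue entriesLoop
			}
		}
		delete(n.recentFailures, key)
	}
	n.lastCleanup = now
}

func main() {
	n := &nodeInfo{
		recentFailures: make(map[versionedService][]time.Time),
		lastCleanup:    time.Now(),
	}
	v1 := versionedService{serviceID: "service1", specVersion: 1}
	for i := 0; i < maxFailures; i++ {
		n.taskFailed(v1, time.Now())
	}
	fmt.Println(len(n.recentFailures[v1])) // 5: this node now looks faulty for v1

	// A service update bumps the spec version, which is a brand-new key:
	v2 := versionedService{serviceID: "service1", specVersion: 2}
	fmt.Println(len(n.recentFailures[v2])) // 0: the fixed service gets a fresh start
}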
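
On the scheduling side, the scheduler.go hunks show only the two call sites being switched from a service ID to a task, not the surrounding comparison. Appended to the sketch above (same assumed types and constants), countRecentFailures and the failure-sensitive part of nodeLess can be approximated as follows; the tie-breaking that runs when neither node has hit maxFailures is outside these hunks and is stubbed out here.

// countRecentFailures mirrors the patched helper: timestamps are stored
// oldest-first, so it walks backward and discards the expired prefix.
func (n *nodeInfo) countRecentFailures(key versionedService, now time.Time) int {
	count := len(n.recentFailures[key])
	for i := count - 1; i >= 0; i-- {
		if now.Sub(n.recentFailures[key][i]) > monitorFailures {
			count -= i + 1
			break
		}
	}
	return count
}

// nodeLess sketches the comparison visible in the scheduler.go hunk: once
// either candidate has at least maxFailures recent failures for this
// (service, version) key, the failure counts become the deciding factor.
func nodeLess(a, b *nodeInfo, key versionedService, now time.Time) bool {
	failuresA := a.countRecentFailures(key, now)
	failuresB := b.countRecentFailures(key, now)
	if failuresA >= maxFailures || failuresB >= maxFailures {
		if failuresA != failuresB {
			return failuresA < failuresB // prefer the less faulty node
		}
	}
	// Tie-breaking (active task counts, etc.) is not shown in these hunks.
	return false
}

With these pieces, the behavior asserted in TestSchedulerFaultyNodeSpecVersion falls out: after five failed version-1 replicas, node id2 sorts behind id1 for version-1 tasks, while the version-2 tasks produced by a service update start from a clean failure history on both nodes.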