70 changes: 55 additions & 15 deletions manager/scheduler/nodeinfo.go
@@ -15,6 +15,15 @@ type hostPortSpec struct {
publishedPort uint32
}

// versionedService defines a tuple that contains a service ID and a spec
// version, so that failures can be tracked per spec version. Note that if the
// task predates spec versioning, specVersion will contain the zero value, and
// this will still work correctly.
type versionedService struct {
serviceID string
specVersion api.Version
}

// NodeInfo contains a node and some additional metadata.
type NodeInfo struct {
*api.Node
@@ -24,12 +33,14 @@ type NodeInfo struct {
AvailableResources *api.Resources
usedHostPorts map[hostPortSpec]struct{}

// recentFailures is a map from service ID to the timestamps of the
// most recent failures the node has experienced from replicas of that
// service.
// TODO(aaronl): When spec versioning is supported, this should track
// the version of the spec that failed.
recentFailures map[string][]time.Time
// recentFailures is a map from service ID/version to the timestamps of
// the most recent failures the node has experienced from replicas of
// that service.
recentFailures map[versionedService][]time.Time

// lastCleanup is the last time recentFailures was cleaned up. This is
// done periodically to avoid recentFailures growing without any limit.
lastCleanup time.Time
}

func newNodeInfo(n *api.Node, tasks map[string]*api.Task, availableResources api.Resources) NodeInfo {
@@ -39,7 +50,8 @@ func newNodeInfo(n *api.Node, tasks map[string]*api.Task, availableResources api
ActiveTasksCountByService: make(map[string]int),
AvailableResources: availableResources.Copy(),
usedHostPorts: make(map[hostPortSpec]struct{}),
recentFailures: make(map[string][]time.Time),
recentFailures: make(map[versionedService][]time.Time),
lastCleanup: time.Now(),
}

for _, t := range tasks {
@@ -148,30 +160,58 @@ func taskReservations(spec api.TaskSpec) (reservations api.Resources) {
return
}

func (nodeInfo *NodeInfo) cleanupFailures(now time.Time) {
entriesLoop:
for key, failuresEntry := range nodeInfo.recentFailures {
for _, timestamp := range failuresEntry {
if now.Sub(timestamp) < monitorFailures {
continue entriesLoop
}
}
delete(nodeInfo.recentFailures, key)
}
nodeInfo.lastCleanup = now
}

// taskFailed records a failure of the given task's service on this node.
func (nodeInfo *NodeInfo) taskFailed(ctx context.Context, serviceID string) {
func (nodeInfo *NodeInfo) taskFailed(ctx context.Context, t *api.Task) {
expired := 0
now := time.Now()
for _, timestamp := range nodeInfo.recentFailures[serviceID] {

if now.Sub(nodeInfo.lastCleanup) >= monitorFailures {
nodeInfo.cleanupFailures(now)
}

versionedService := versionedService{serviceID: t.ServiceID}
if t.SpecVersion != nil {
versionedService.specVersion = *t.SpecVersion
}

for _, timestamp := range nodeInfo.recentFailures[versionedService] {
if now.Sub(timestamp) < monitorFailures {
break
}
expired++
}

if len(nodeInfo.recentFailures[serviceID])-expired == maxFailures-1 {
log.G(ctx).Warnf("underweighting node %s for service %s because it experienced %d failures or rejections within %s", nodeInfo.ID, serviceID, maxFailures, monitorFailures.String())
if len(nodeInfo.recentFailures[versionedService])-expired == maxFailures-1 {
log.G(ctx).Warnf("underweighting node %s for service %s because it experienced %d failures or rejections within %s", nodeInfo.ID, t.ServiceID, maxFailures, monitorFailures.String())
}

nodeInfo.recentFailures[serviceID] = append(nodeInfo.recentFailures[serviceID][expired:], now)
nodeInfo.recentFailures[versionedService] = append(nodeInfo.recentFailures[versionedService][expired:], now)
}

// countRecentFailures returns the number of times the service has failed on
// this node within the lookback window monitorFailures.
func (nodeInfo *NodeInfo) countRecentFailures(now time.Time, serviceID string) int {
recentFailureCount := len(nodeInfo.recentFailures[serviceID])
func (nodeInfo *NodeInfo) countRecentFailures(now time.Time, t *api.Task) int {
versionedService := versionedService{serviceID: t.ServiceID}
if t.SpecVersion != nil {
versionedService.specVersion = *t.SpecVersion
}

recentFailureCount := len(nodeInfo.recentFailures[versionedService])
for i := recentFailureCount - 1; i >= 0; i-- {
if now.Sub(nodeInfo.recentFailures[serviceID][i]) > monitorFailures {
if now.Sub(nodeInfo.recentFailures[versionedService][i]) > monitorFailures {
recentFailureCount -= i + 1
break
}
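The nodeinfo.go changes above key recentFailures by (service ID, spec version) and age entries out lazily: taskFailed drops timestamps older than monitorFailures before appending a new one, cleanupFailures periodically deletes keys whose entries have all expired, and countRecentFailures counts only the failures still inside the window. Below is a minimal, self-contained sketch of that bookkeeping under assumed stand-in names (versionedKey, failureTracker, window); it illustrates the technique and is not the package code.

package main

import (
	"fmt"
	"time"
)

// versionedKey stands in for the versionedService tuple in the diff; a zero
// specVersion covers tasks that predate spec versioning.
type versionedKey struct {
	serviceID   string
	specVersion uint64
}

// window stands in for the scheduler's monitorFailures constant.
const window = 5 * time.Minute

// failureTracker keeps the most recent failure timestamps per key.
type failureTracker struct {
	recent map[versionedKey][]time.Time
}

func newFailureTracker() *failureTracker {
	return &failureTracker{recent: make(map[versionedKey][]time.Time)}
}

// record drops timestamps that have aged out of the window, then appends the
// new failure, mirroring taskFailed above.
func (f *failureTracker) record(k versionedKey, now time.Time) {
	expired := 0
	for _, ts := range f.recent[k] {
		if now.Sub(ts) < window {
			break
		}
		expired++
	}
	f.recent[k] = append(f.recent[k][expired:], now)
}

// count reports how many recorded failures for the key are still inside the
// window, mirroring countRecentFailures above.
func (f *failureTracker) count(k versionedKey, now time.Time) int {
	n := len(f.recent[k])
	for i := n - 1; i >= 0; i-- {
		if now.Sub(f.recent[k][i]) > window {
			return n - (i + 1)
		}
	}
	return n
}

func main() {
	ft := newFailureTracker()
	k := versionedKey{serviceID: "service1", specVersion: 1}
	now := time.Now()
	ft.record(k, now.Add(-10*time.Minute)) // old enough to expire
	ft.record(k, now)                      // drops the expired entry first
	fmt.Println(ft.count(k, now))          // prints 1
}

Keying by spec version means a service update effectively starts a fresh failure history for that service on each node, which is what TestSchedulerFaultyNodeSpecVersion below exercises.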
11 changes: 0 additions & 11 deletions manager/scheduler/nodeset.go
@@ -4,7 +4,6 @@ import (
"container/heap"
"errors"
"strings"
"time"

"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/manager/constraint"
@@ -32,16 +31,6 @@ func (ns *nodeSet) nodeInfo(nodeID string) (NodeInfo, error) {
// addOrUpdateNode sets the number of tasks for a given node. It adds the node
// to the set if it wasn't already tracked.
func (ns *nodeSet) addOrUpdateNode(n NodeInfo) {
if n.Tasks == nil {
n.Tasks = make(map[string]*api.Task)
}
if n.ActiveTasksCountByService == nil {
n.ActiveTasksCountByService = make(map[string]int)
}
if n.recentFailures == nil {
n.recentFailures = make(map[string][]time.Time)
}

ns.nodes[n.ID] = n
}

31 changes: 19 additions & 12 deletions manager/scheduler/scheduler.go
@@ -261,7 +261,7 @@ func (s *Scheduler) updateTask(ctx context.Context, t *api.Task) bool {
if _, wasPreassigned := s.preassignedTasks[t.ID]; !wasPreassigned {
nodeInfo, err := s.nodeSet.nodeInfo(t.NodeID)
if err == nil {
nodeInfo.taskFailed(ctx, t.ServiceID)
nodeInfo.taskFailed(ctx, t)
s.nodeSet.updateNode(nodeInfo)
}
}
@@ -315,25 +315,32 @@ func (s *Scheduler) deleteTask(t *api.Task) bool {
}

func (s *Scheduler) createOrUpdateNode(n *api.Node) {
nodeInfo, _ := s.nodeSet.nodeInfo(n.ID)
nodeInfo, nodeInfoErr := s.nodeSet.nodeInfo(n.ID)
var resources *api.Resources
if n.Description != nil && n.Description.Resources != nil {
resources = n.Description.Resources.Copy()
// reconcile resources by looping over all tasks in this node
for _, task := range nodeInfo.Tasks {
reservations := taskReservations(task.Spec)
if nodeInfoErr == nil {
for _, task := range nodeInfo.Tasks {
reservations := taskReservations(task.Spec)

resources.MemoryBytes -= reservations.MemoryBytes
resources.NanoCPUs -= reservations.NanoCPUs
resources.MemoryBytes -= reservations.MemoryBytes
resources.NanoCPUs -= reservations.NanoCPUs

genericresource.ConsumeNodeResources(&resources.Generic,
task.AssignedGenericResources)
genericresource.ConsumeNodeResources(&resources.Generic,
task.AssignedGenericResources)
}
}
} else {
resources = &api.Resources{}
}
nodeInfo.Node = n
nodeInfo.AvailableResources = resources

if nodeInfoErr != nil {
nodeInfo = newNodeInfo(n, nil, *resources)
} else {
nodeInfo.Node = n
nodeInfo.AvailableResources = resources
}
s.nodeSet.addOrUpdateNode(nodeInfo)
}

@@ -536,8 +543,8 @@ func (s *Scheduler) scheduleTaskGroup(ctx context.Context, taskGroup map[string]
nodeLess := func(a *NodeInfo, b *NodeInfo) bool {
// If either node has at least maxFailures recent failures,
// that's the deciding factor.
recentFailuresA := a.countRecentFailures(now, t.ServiceID)
recentFailuresB := b.countRecentFailures(now, t.ServiceID)
recentFailuresA := a.countRecentFailures(now, t)
recentFailuresB := b.countRecentFailures(now, t)

if recentFailuresA >= maxFailures || recentFailuresB >= maxFailures {
if recentFailuresA > recentFailuresB {
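In scheduler.go above, scheduleTaskGroup now passes the whole task to countRecentFailures so failures are counted per spec version, and recent failures become the deciding factor in node ordering once either candidate reaches maxFailures. The standalone sketch below mirrors the shape of that comparison with assumed names (nodeStats, less); the real nodeLess continues with further tie-breakers that fall outside this hunk.

package main

import "fmt"

// maxFailures stands in for the scheduler's constant of the same name.
const maxFailures = 5

// nodeStats is an assumed stand-in for the per-node data nodeLess consults.
type nodeStats struct {
	recentFailures int // what countRecentFailures would return for this task
	activeTasks    int // tasks already running on the node
}

// less reports whether node a should be preferred over node b: once either
// node reaches maxFailures recent failures, the failure counts decide.
func less(a, b nodeStats) bool {
	if a.recentFailures >= maxFailures || b.recentFailures >= maxFailures {
		if a.recentFailures > b.recentFailures {
			return false
		}
		if b.recentFailures > a.recentFailures {
			return true
		}
	}
	// Otherwise fall back to a simple task-count comparison; this is an
	// assumed stand-in for the scheduler's remaining tie-breakers.
	return a.activeTasks < b.activeTasks
}

func main() {
	healthy := nodeStats{recentFailures: 0, activeTasks: 7}
	faulty := nodeStats{recentFailures: 5, activeTasks: 1}
	fmt.Println(less(healthy, faulty)) // true: the faulty node is underweighted
}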
151 changes: 150 additions & 1 deletion manager/scheduler/scheduler_test.go
@@ -1278,6 +1278,23 @@ func TestSchedulerFaultyNode(t *testing.T) {
assert.Equal(t, "id1", assignment.NodeID)
}

node2Info, err := scheduler.nodeSet.nodeInfo("id2")
assert.NoError(t, err)
expectedNode2Failures := i
if i > 5 {
expectedNode2Failures = 5
}
assert.Len(t, node2Info.recentFailures[versionedService{serviceID: "service1"}], expectedNode2Failures)

node1Info, err := scheduler.nodeSet.nodeInfo("id1")
assert.NoError(t, err)

expectedNode1Failures := i - 5
if i < 5 {
expectedNode1Failures = 0
}
assert.Len(t, node1Info.recentFailures[versionedService{serviceID: "service1"}], expectedNode1Failures)

newPreassignedTask := preassignedTaskTemplate.Copy()
newPreassignedTask.ID = identity.NewID()

@@ -1297,7 +1314,7 @@ func TestSchedulerFaultyNode(t *testing.T) {
// marked as
nodeInfo, err := scheduler.nodeSet.nodeInfo("id1")
assert.NoError(t, err)
assert.Len(t, nodeInfo.recentFailures["service2"], 0)
assert.Len(t, nodeInfo.recentFailures[versionedService{serviceID: "service2"}], 0)

err = s.Update(func(tx store.Tx) error {
newReplicatedTask := store.GetTask(tx, newReplicatedTask.ID)
@@ -1316,6 +1333,138 @@ func TestSchedulerFaultyNode(t *testing.T) {
}
}

func TestSchedulerFaultyNodeSpecVersion(t *testing.T) {
ctx := context.Background()

taskTemplate := &api.Task{
ServiceID: "service1",
SpecVersion: &api.Version{Index: 1},
DesiredState: api.TaskStateRunning,
ServiceAnnotations: api.Annotations{
Name: "name1",
},
Status: api.TaskStatus{
State: api.TaskStatePending,
},
}

node1 := &api.Node{
ID: "id1",
Spec: api.NodeSpec{
Annotations: api.Annotations{
Name: "id1",
},
},
Status: api.NodeStatus{
State: api.NodeStatus_READY,
},
}

node2 := &api.Node{
ID: "id2",
Spec: api.NodeSpec{
Annotations: api.Annotations{
Name: "id2",
},
},
Status: api.NodeStatus{
State: api.NodeStatus_READY,
},
}

s := store.NewMemoryStore(nil)
assert.NotNil(t, s)
defer s.Close()

err := s.Update(func(tx store.Tx) error {
// Add initial nodes, and one task assigned to node id1
assert.NoError(t, store.CreateNode(tx, node1))
assert.NoError(t, store.CreateNode(tx, node2))

task1 := taskTemplate.Copy()
task1.ID = "id1"
task1.NodeID = "id1"
task1.Status.State = api.TaskStateRunning
assert.NoError(t, store.CreateTask(tx, task1))
return nil
})
assert.NoError(t, err)

scheduler := New(s)

watch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateTask{})
defer cancel()

go func() {
assert.NoError(t, scheduler.Run(ctx))
}()
defer scheduler.Stop()

for i := 0; i != 15; i++ {
// Simulate a task failure cycle
newTask := taskTemplate.Copy()
newTask.ID = identity.NewID()

// After the condition for node faultiness has been reached,
// bump the spec version to simulate a service update.
if i > 5 {
newTask.SpecVersion.Index++
}

err = s.Update(func(tx store.Tx) error {
assert.NoError(t, store.CreateTask(tx, newTask))
return nil
})
assert.NoError(t, err)

assignment := watchAssignment(t, watch)
assert.Equal(t, newTask.ID, assignment.ID)

if i < 5 || (i > 5 && i < 11) {
// The first 5 attempts should be assigned to node id2 because
// it has no replicas of the service.
// Same with i=6 to i=10 inclusive, which is repeating the
// same behavior with a different SpecVersion.
assert.Equal(t, "id2", assignment.NodeID)
} else {
// The next ones should be assigned to id1, since we'll
// flag id2 as potentially faulty.
assert.Equal(t, "id1", assignment.NodeID)
}

node1Info, err := scheduler.nodeSet.nodeInfo("id1")
assert.NoError(t, err)
node2Info, err := scheduler.nodeSet.nodeInfo("id2")
assert.NoError(t, err)
expectedNode1Spec1Failures := 0
expectedNode1Spec2Failures := 0
expectedNode2Spec1Failures := i
expectedNode2Spec2Failures := 0
if i > 5 {
expectedNode1Spec1Failures = 1
expectedNode2Spec1Failures = 5
expectedNode2Spec2Failures = i - 6
}
if i > 11 {
expectedNode1Spec2Failures = i - 11
expectedNode2Spec2Failures = 5
}
assert.Len(t, node1Info.recentFailures[versionedService{serviceID: "service1", specVersion: api.Version{Index: 1}}], expectedNode1Spec1Failures)
assert.Len(t, node1Info.recentFailures[versionedService{serviceID: "service1", specVersion: api.Version{Index: 2}}], expectedNode1Spec2Failures)
assert.Len(t, node2Info.recentFailures[versionedService{serviceID: "service1", specVersion: api.Version{Index: 1}}], expectedNode2Spec1Failures)
assert.Len(t, node2Info.recentFailures[versionedService{serviceID: "service1", specVersion: api.Version{Index: 2}}], expectedNode2Spec2Failures)

err = s.Update(func(tx store.Tx) error {
newTask := store.GetTask(tx, newTask.ID)
require.NotNil(t, newTask)
newTask.Status.State = api.TaskStateFailed
assert.NoError(t, store.UpdateTask(tx, newTask))
return nil
})
assert.NoError(t, err)
}
}

func TestSchedulerResourceConstraint(t *testing.T) {
ctx := context.Background()
// Create a ready node without enough memory to run the task.