diff --git a/manager/allocator/allocator_test.go b/manager/allocator/allocator_test.go
index 1cb398d09a..5a3bf9633c 100644
--- a/manager/allocator/allocator_test.go
+++ b/manager/allocator/allocator_test.go
@@ -1214,7 +1214,19 @@ func TestNodeAllocator(t *testing.T) {
         }
         assert.NoError(t, store.CreateNetwork(tx, n1))
 
+        // this network will never be used for any task
+        nUnused := &api.Network{
+            ID: "overlayIDUnused",
+            Spec: api.NetworkSpec{
+                Annotations: api.Annotations{
+                    Name: "overlayIDUnused",
+                },
+            },
+        }
+        assert.NoError(t, store.CreateNetwork(tx, nUnused))
+
         assert.NoError(t, store.CreateNode(tx, node1))
+
         return nil
     }))
 
@@ -1222,6 +1234,8 @@ func TestNodeAllocator(t *testing.T) {
     defer cancel()
     netWatch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateNetwork{}, api.EventDeleteNetwork{})
     defer cancel()
+    taskWatch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateTask{})
+    defer cancel()
 
     // Start allocator
     go func() {
@@ -1229,12 +1243,36 @@ func TestNodeAllocator(t *testing.T) {
     }()
     defer a.Stop()
 
+    assert.NoError(t, s.Update(func(tx store.Tx) error {
+        // create a task assigned to this node that has a network attachment on
+        // n1
+        t1 := &api.Task{
+            ID:           "task1",
+            NodeID:       node1.ID,
+            DesiredState: api.TaskStateRunning,
+            Spec: api.TaskSpec{
+                Networks: []*api.NetworkAttachmentConfig{
+                    {
+                        Target: "overlayID1",
+                    },
+                },
+            },
+        }
+
+        return store.CreateTask(tx, t1)
+    }))
+
+    // validate that the task is created
+    watchTask(t, s, taskWatch, false, isValidTask)
+
     // Validate node has 2 LB IP address (1 for each network).
     watchNetwork(t, netWatch, false, isValidNetwork)                                      // ingress
     watchNetwork(t, netWatch, false, isValidNetwork)                                      // overlayID1
+    watchNetwork(t, netWatch, false, isValidNetwork)                                      // overlayIDUnused
     watchNode(t, nodeWatch, false, isValidNode, node1, []string{"ingress", "overlayID1"}) // node1
 
-    // Add a node and validate it gets a LB ip on each network.
+    // Add a node and validate it gets an LB IP only on ingress, as it has no
+    // tasks assigned.
     node2 := &api.Node{
         ID: "nodeID2",
     }
@@ -1242,9 +1280,9 @@ func TestNodeAllocator(t *testing.T) {
         assert.NoError(t, store.CreateNode(tx, node2))
         return nil
     }))
-    watchNode(t, nodeWatch, false, isValidNode, node2, []string{"ingress", "overlayID1"}) // node2
+    watchNode(t, nodeWatch, false, isValidNode, node2, []string{"ingress"}) // node2
 
-    // Add a network and validate each node has 3 LB IP addresses
+    // Add a network and validate that nothing has changed in the nodes
     n2 := &api.Network{
         ID: "overlayID2",
         Spec: api.NetworkSpec{
@@ -1257,18 +1295,86 @@ func TestNodeAllocator(t *testing.T) {
         assert.NoError(t, store.CreateNetwork(tx, n2))
         return nil
     }))
-    watchNetwork(t, netWatch, false, isValidNetwork)                                                    // overlayID2
-    watchNode(t, nodeWatch, false, isValidNode, node1, []string{"ingress", "overlayID1", "overlayID3"}) // node1
-    watchNode(t, nodeWatch, false, isValidNode, node2, []string{"ingress", "overlayID1", "overlayID3"}) // node2
+    watchNetwork(t, netWatch, false, isValidNetwork) // overlayID2
+    // nothing should change, no updates
+    watchNode(t, nodeWatch, true, isValidNode, node1, []string{"ingress", "overlayID1"}) // node1
+    watchNode(t, nodeWatch, true, isValidNode, node2, []string{"ingress"})               // node2
 
-    // Remove a network and validate each node has 2 LB IP addresses
+    // add a task and validate that the node gets the network for the task
     assert.NoError(t, s.Update(func(tx store.Tx) error {
-        assert.NoError(t, store.DeleteNetwork(tx, n2.ID))
+        // create a task assigned to this node that has a network attachment on
+        // n2
+        t2 := &api.Task{
+            ID:           "task2",
+            NodeID:       node2.ID,
+            DesiredState: api.TaskStateRunning,
+            Spec: api.TaskSpec{
+                Networks: []*api.NetworkAttachmentConfig{
+                    {
+                        Target: "overlayID2",
+                    },
+                },
+            },
+        }
+
+        return store.CreateTask(tx, t2)
+    }))
+    // validate that the task is created
+    watchTask(t, s, taskWatch, false, isValidTask)
+
+    // validate that node2 gets a new attachment and node1 stays the same
+    watchNode(t, nodeWatch, false, isValidNode, node2, []string{"ingress", "overlayID2"}) // node2
+    watchNode(t, nodeWatch, true, isValidNode, node1, []string{"ingress", "overlayID1"})  // node1
+
+    // add another task with the same network to a node and validate that it
+    // still only has 1 attachment for that network
+    assert.NoError(t, s.Update(func(tx store.Tx) error {
+        // create a task assigned to this node that has a network attachment on
+        // n1
+        t3 := &api.Task{
+            ID:           "task3",
+            NodeID:       node1.ID,
+            DesiredState: api.TaskStateRunning,
+            Spec: api.TaskSpec{
+                Networks: []*api.NetworkAttachmentConfig{
+                    {
+                        Target: "overlayID1",
+                    },
+                },
+            },
+        }
+
+        return store.CreateTask(tx, t3)
+    }))
+
+    // validate that the task is created
+    watchTask(t, s, taskWatch, false, isValidTask)
+
+    // validate that nothing changes
+    watchNode(t, nodeWatch, true, isValidNode, node1, []string{"ingress", "overlayID1"}) // node1
+    watchNode(t, nodeWatch, true, isValidNode, node2, []string{"ingress", "overlayID2"}) // node2
+
+    // now remove one of those tasks, and validate that the node still has an
+    // attachment for the other task
+    assert.NoError(t, s.Update(func(tx store.Tx) error {
+        assert.NoError(t, store.DeleteTask(tx, "task1"))
+        return nil
     }))
-    watchNetwork(t, netWatch, false, isValidNetwork)                                      // overlayID2
-    watchNode(t, nodeWatch, false, isValidNode, node1, []string{"ingress", "overlayID1"}) // node1
-    watchNode(t, nodeWatch, false, isValidNode, node2, []string{"ingress", "overlayID1"}) // node2
+
+    // validate that nothing changes
+    watchNode(t, nodeWatch, true, isValidNode, node1, []string{"ingress", "overlayID1"}) // node1
+    watchNode(t, nodeWatch, true, isValidNode, node2, []string{"ingress", "overlayID2"}) // node2
+
+    // now remove another task. this time the attachment on the node should be
+    // removed as well
+    assert.NoError(t, s.Update(func(tx store.Tx) error {
+        assert.NoError(t, store.DeleteTask(tx, "task2"))
+        return nil
+    }))
+
+    watchNode(t, nodeWatch, false, isValidNode, node2, []string{"ingress"})              // node2
+    watchNode(t, nodeWatch, true, isValidNode, node1, []string{"ingress", "overlayID1"}) // node1
 
     // Remove a node and validate remaining node has 2 LB IP addresses
     assert.NoError(t, s.Update(func(tx store.Tx) error {
@@ -1418,13 +1524,13 @@ func watchNode(t *testing.T, watch chan events.Event, expectTimeout bool,
             }
         }
 
-    case <-time.After(1 * time.Millisecond):
+    case <-time.After(getWatchTimeout(expectTimeout)):
         if !expectTimeout {
             if node != nil && fn != nil {
                 fn(t, originalNode, node, networks)
             }
 
-            t.Fatal("timed out before watchNode found expected node state")
+            t.Fatal("timed out before watchNode found expected node state", string(debug.Stack()))
         }
 
         return
@@ -1457,7 +1563,7 @@ func watchNetwork(t *testing.T, watch chan events.Event, expectTimeout bool, fn
                 fn(t, network)
             }
 
-            t.Fatal("timed out before watchNetwork found expected network state")
+            t.Fatal("timed out before watchNetwork found expected network state", string(debug.Stack()))
         }
 
         return
diff --git a/manager/allocator/network.go b/manager/allocator/network.go
index 3083db8e3b..c87b7757bf 100644
--- a/manager/allocator/network.go
+++ b/manager/allocator/network.go
@@ -181,10 +181,6 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
         if IsIngressNetwork(n) {
             nc.ingressNetwork = n
         }
-        err := a.allocateNodes(ctx, false)
-        if err != nil {
-            log.G(ctx).WithError(err).Error(err)
-        }
 
     case api.EventDeleteNetwork:
         n := v.Network.Copy()
@@ -337,19 +333,12 @@ func (a *Allocator) doNodeAlloc(ctx context.Context, ev events.Event) {
             nc.somethingWasDeallocated = true
         }
     } else {
-        allocatedNetworks, err := a.getAllocatedNetworks()
-        if err != nil {
-            log.G(ctx).WithError(err).Errorf("Error listing allocated networks in network %s", node.ID)
-        }
-
-        isAllocated := a.allocateNode(ctx, node, false, allocatedNetworks)
-
-        if isAllocated {
-            if err := a.store.Batch(func(batch *store.Batch) error {
-                return a.commitAllocatedNode(ctx, batch, node)
-            }); err != nil {
-                log.G(ctx).WithError(err).Errorf("Failed to commit allocation of network resources for node %s", node.ID)
-            }
+        // if this isn't a delete, we should try reallocating the node. if this
+        // is a creation, then the node will be allocated only for ingress.
+        if err := a.reallocateNode(ctx, node.ID); err != nil {
+            log.G(ctx).WithError(err).Errorf(
+                "error reallocating network resources for node %v", node.ID,
+            )
         }
     }
 }
@@ -394,6 +383,69 @@ func (a *Allocator) getAllocatedNetworks() ([]*api.Network, error) {
     return allocatedNetworks, nil
 }
 
+// getNodeNetworks returns all networks that should be allocated for a node
+func (a *Allocator) getNodeNetworks(nodeID string) ([]*api.Network, error) {
+    var (
+        // no need to initialize networks. we only append to it, and appending
+        // to a nil slice is valid. this has the added bonus of making this nil
+        // if we return an error
+        networks []*api.Network
+        err      error
+    )
+    a.store.View(func(tx store.ReadTx) {
+        // get all tasks currently assigned to this node. it's no big deal if
+        // the tasks change in the meantime; there's no race to clean up
+        // unneeded network attachments on a node.
+        var tasks []*api.Task
+        tasks, err = store.FindTasks(tx, store.ByNodeID(nodeID))
+        if err != nil {
+            return
+        }
+        // we need to keep track of network IDs that we've already added to the
+        // list of networks we're going to return. we could do
+        // map[string]*api.Network and then convert to []*api.Network and
+        // return that, but it seems cleaner to have a separate set and list.
+        networkIDs := map[string]struct{}{}
+        for _, task := range tasks {
+            // we don't need to check if a task is before the Assigned state.
+            // the only way we have a task with a NodeID that isn't yet in
+            // Assigned is if it's a global service task. this check is not
+            // necessary:
+            //   if task.Status.State < api.TaskStateAssigned {
+            //       continue
+            //   }
+            if task.Status.State > api.TaskStateRunning {
+                // we don't need to have network attachments for a task that's
+                // already in a terminal state
+                continue
+            }
+
+            // now go through the task's network attachments and find all of
+            // the networks
+            for _, attachment := range task.Networks {
+                // if the network is an overlay network, and the network ID is
+                // not yet in the set of network IDs, then add it to the set
+                // and add the network to the list of networks we'll be
+                // returning
+                if _, ok := networkIDs[attachment.Network.ID]; isOverlayNetwork(attachment.Network) && !ok {
+                    networkIDs[attachment.Network.ID] = struct{}{}
+                    // we don't need to worry about retrieving the network from
+                    // the store, because the network in the attachment is an
+                    // identical copy of the network in the store.
+                    networks = append(networks, attachment.Network)
+                }
+            }
+        }
+    })
+
+    // finally, we need the ingress network if one exists.
+    if a.netCtx != nil && a.netCtx.ingressNetwork != nil {
+        networks = append(networks, a.netCtx.ingressNetwork)
+    }
+
+    return networks, err
+}
+
 func (a *Allocator) allocateNodes(ctx context.Context, existingAddressesOnly bool) error {
     // Allocate nodes in the store so far before we process watched events.
     var (
@@ -409,13 +461,12 @@ func (a *Allocator) allocateNodes(ctx context.Context, existingAddressesOnly boo
         return errors.Wrap(err, "error listing all nodes in store while trying to allocate network resources")
     }
 
-    allocatedNetworks, err := a.getAllocatedNetworks()
-    if err != nil {
-        return errors.Wrap(err, "error listing all nodes in store while trying to allocate network resources")
-    }
-
     for _, node := range nodes {
-        isAllocated := a.allocateNode(ctx, node, existingAddressesOnly, allocatedNetworks)
+        networks, err := a.getNodeNetworks(node.ID)
+        if err != nil {
+            return errors.Wrap(err, "error getting all networks needed by node")
+        }
+        isAllocated := a.allocateNode(ctx, node, existingAddressesOnly, networks)
         if isAllocated {
             allocatedNodes = append(allocatedNodes, node)
         }
@@ -842,6 +893,14 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) {
             }
         }
 
+        // if we're deallocating the task, we also might need to deallocate the
+        // node's network attachment, if this is the last task on the node that
+        // needs it. we can do that by doing the same dance to reallocate a node
+        if err := a.reallocateNode(ctx, t.NodeID); err != nil {
+            logger.WithError(err).Errorf("error reallocating node %v", t.NodeID)
+        }
+
         // Cleanup any task references that might exist
         delete(nc.pendingTasks, t.ID)
         delete(nc.unallocatedTasks, t.ID)
@@ -849,6 +908,16 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) {
         return
     }
 
+    // if the task has a node ID, we should allocate an attachment for the
+    // node. this happens if the task is in any non-terminal state.
+    if t.NodeID != "" && t.Status.State <= api.TaskStateRunning {
+        if err := a.reallocateNode(ctx, t.NodeID); err != nil {
+            // TODO(dperny): not entirely sure what the error handling flow
+            // here should be... for now, just log and keep going
+            logger.WithError(err).Errorf("error reallocating node %v", t.NodeID)
+        }
+    }
+
     // If we are already in allocated state, there is
     // absolutely nothing else to do.
     if t.Status.State >= api.TaskStatePending {
@@ -887,13 +956,25 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) {
     log.G(ctx).Debugf("task %v was marked pending allocation", t.ID)
 }
 
+// allocateNode takes a context, a node, whether or not new allocations should
+// be made, and the networks to allocate. it then makes sure an attachment is
+// allocated for every network in the provided networks, allocating new
+// attachments if existingAddressesOnly is false. it returns true if something
+// new was allocated or something was removed, or false otherwise.
+//
+// additionally, allocateNode will remove and free any attachments for networks
+// not in the set of networks passed in.
 func (a *Allocator) allocateNode(ctx context.Context, node *api.Node, existingAddressesOnly bool, networks []*api.Network) bool {
     var allocated bool
 
     nc := a.netCtx
 
+    // go through all of the networks we've passed in
     for _, network := range networks {
+        // for each one, create space for an attachment. then, search through
+        // all of the attachments already on the node. if the attachment
+        // exists, then copy it to the node. if not, we'll allocate it below.
         var lbAttachment *api.NetworkAttachment
         for _, na := range node.Attachments {
             if na.Network != nil && na.Network.ID == network.ID {
@@ -927,8 +1008,83 @@ func (a *Allocator) allocateNode(ctx context.Context, node *api.Node, existingAd
 
         allocated = true
     }
+
+    // if we're only initializing existing addresses, we should stop here and
+    // not deallocate anything
+    if existingAddressesOnly {
+        return allocated
+    }
+
+    // now that we've allocated everything new, we have to remove things that
+    // do not belong. we have to do this last because we can easily roll back
+    // attachments we've allocated if something goes wrong by freeing them, but
+    // we can't roll back deallocating attachments by reacquiring them.
+
+    // we're using a trick to filter without allocating. see the official go
+    // wiki on github:
+    // https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating
+    attachments := node.Attachments[:0]
+    for _, attach := range node.Attachments {
+        // for every attachment, go through every network. if the attachment
+        // belongs to one of the networks, then go to the next attachment. if
+        // no network matches, then the attachment should be removed.
+        attachmentBelongs := false
+        for _, network := range networks {
+            if network.ID == attach.Network.ID {
+                attachmentBelongs = true
+                break
+            }
+        }
+        if attachmentBelongs {
+            attachments = append(attachments, attach)
+        } else {
+            // free the attachment and remove it from the node's attachments by
+            // re-slicing
+            if err := a.netCtx.nwkAllocator.DeallocateAttachment(node, attach); err != nil {
+                // if deallocation fails, there's nothing we can do besides log
+                // an error and keep going
+                log.G(ctx).WithError(err).Errorf(
+                    "error deallocating attachment for network %v on node %v",
+                    attach.Network.ID, node.ID,
+                )
+            }
+            // strictly speaking, nothing was allocated, but something was
+            // deallocated and that counts.
+            allocated = true
+            // also, set the somethingWasDeallocated flag so the allocator
+            // knows that it can now try again.
+            a.netCtx.somethingWasDeallocated = true
+        }
+    }
+    node.Attachments = attachments
+
     return allocated
 }
 
+func (a *Allocator) reallocateNode(ctx context.Context, nodeID string) error {
+    var (
+        node *api.Node
+    )
+    a.store.View(func(tx store.ReadTx) {
+        node = store.GetNode(tx, nodeID)
+    })
+    if node == nil {
+        return errors.Errorf("node %v cannot be found", nodeID)
+    }
+    networks, err := a.getNodeNetworks(node.ID)
+    if err != nil {
+        return errors.Wrapf(err, "error getting networks for node %v", nodeID)
+    }
+    if a.allocateNode(ctx, node, false, networks) {
+        // if something was allocated, commit the node
+        if err := a.store.Batch(func(batch *store.Batch) error {
+            return a.commitAllocatedNode(ctx, batch, node)
+        }); err != nil {
+            return errors.Wrapf(err, "error committing allocation for node %v", nodeID)
+        }
+    }
+    return nil
+}
 
 func (a *Allocator) commitAllocatedNode(ctx context.Context, batch *store.Batch, node *api.Node) error {
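
A note on the deallocation pass in allocateNode: it relies on the "filtering
without allocating" idiom from the Go wiki's SliceTricks page, linked in the
code comment above. The following is a minimal, standalone sketch of that
idiom; the slice contents and names are illustrative only and not part of
this patch:

    package main

    import "fmt"

    func main() {
        // attachments the node currently holds, and the networks still wanted
        attachments := []string{"ingress", "overlayID1", "overlayIDUnused"}
        wanted := map[string]struct{}{"ingress": {}, "overlayID1": {}}

        // kept shares attachments' backing array, so the filter itself never
        // allocates: len(kept) can't exceed the number of elements already
        // visited, so append never has to grow the slice.
        kept := attachments[:0]
        for _, a := range attachments {
            if _, ok := wanted[a]; ok {
                kept = append(kept, a)
            } else {
                // a real implementation would release the dropped element
                // here, the way allocateNode calls DeallocateAttachment
                fmt.Println("dropping", a)
            }
        }
        fmt.Println(kept) // prints: [ingress overlayID1]
    }

One caveat of the idiom: entries past len(kept) still sit in the shared
backing array, so if they hold pointers you may want to nil them out to let
the referenced values be garbage collected. For the small attachment slices
handled here that is not a concern.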