From 710aef85c054a99d1b2dc75d088200f0874f6e54 Mon Sep 17 00:00:00 2001 From: Drew Erny Date: Mon, 6 Aug 2018 13:11:24 -0500 Subject: [PATCH] Only allocate attachments needed by nodes Instead of allocating an attachment for every network on every node, allocate and deallocate attachments on nodes based off whether or not the node needs the attachment because a task using that network is on the node. Signed-off-by: Drew Erny --- manager/allocator/allocator_test.go | 134 ++++++++++++++++-- manager/allocator/network.go | 202 ++++++++++++++++++++++++---- 2 files changed, 299 insertions(+), 37 deletions(-) diff --git a/manager/allocator/allocator_test.go b/manager/allocator/allocator_test.go index 1cb398d09a..5a3bf9633c 100644 --- a/manager/allocator/allocator_test.go +++ b/manager/allocator/allocator_test.go @@ -1214,7 +1214,19 @@ func TestNodeAllocator(t *testing.T) { } assert.NoError(t, store.CreateNetwork(tx, n1)) + // this network will never be used for any task + nUnused := &api.Network{ + ID: "overlayIDUnused", + Spec: api.NetworkSpec{ + Annotations: api.Annotations{ + Name: "overlayIDUnused", + }, + }, + } + assert.NoError(t, store.CreateNetwork(tx, nUnused)) + assert.NoError(t, store.CreateNode(tx, node1)) + return nil })) @@ -1222,6 +1234,8 @@ func TestNodeAllocator(t *testing.T) { defer cancel() netWatch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateNetwork{}, api.EventDeleteNetwork{}) defer cancel() + taskWatch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateTask{}) + defer cancel() // Start allocator go func() { @@ -1229,12 +1243,36 @@ func TestNodeAllocator(t *testing.T) { }() defer a.Stop() + assert.NoError(t, s.Update(func(tx store.Tx) error { + // create a task assigned to this node that has a network attachment on + // n1 + t1 := &api.Task{ + ID: "task1", + NodeID: node1.ID, + DesiredState: api.TaskStateRunning, + Spec: api.TaskSpec{ + Networks: []*api.NetworkAttachmentConfig{ + { + Target: "overlayID1", + }, + }, + }, + } + + return store.CreateTask(tx, t1) + })) + + // validate that the task is created + watchTask(t, s, taskWatch, false, isValidTask) + // Validate node has 2 LB IP address (1 for each network). watchNetwork(t, netWatch, false, isValidNetwork) // ingress watchNetwork(t, netWatch, false, isValidNetwork) // overlayID1 + watchNetwork(t, netWatch, false, isValidNetwork) // overlayIDUnused watchNode(t, nodeWatch, false, isValidNode, node1, []string{"ingress", "overlayID1"}) // node1 - // Add a node and validate it gets a LB ip on each network. + // Add a node and validate it gets a LB ip only on ingress, as it has no + // tasks assigned. node2 := &api.Node{ ID: "nodeID2", } @@ -1242,9 +1280,9 @@ func TestNodeAllocator(t *testing.T) { assert.NoError(t, store.CreateNode(tx, node2)) return nil })) - watchNode(t, nodeWatch, false, isValidNode, node2, []string{"ingress", "overlayID1"}) // node2 + watchNode(t, nodeWatch, false, isValidNode, node2, []string{"ingress"}) // node2 - // Add a network and validate each node has 3 LB IP addresses + // Add a network and validate that nothing has changed in the nodes n2 := &api.Network{ ID: "overlayID2", Spec: api.NetworkSpec{ @@ -1257,18 +1295,86 @@ func TestNodeAllocator(t *testing.T) { assert.NoError(t, store.CreateNetwork(tx, n2)) return nil })) - watchNetwork(t, netWatch, false, isValidNetwork) // overlayID2 - watchNode(t, nodeWatch, false, isValidNode, node1, []string{"ingress", "overlayID1", "overlayID3"}) // node1 - watchNode(t, nodeWatch, false, isValidNode, node2, []string{"ingress", "overlayID1", "overlayID3"}) // node2 + watchNetwork(t, netWatch, false, isValidNetwork) // overlayID2 + // nothing should change, no updates + watchNode(t, nodeWatch, true, isValidNode, node1, []string{"ingress", "overlayID1"}) // node1 + watchNode(t, nodeWatch, true, isValidNode, node2, []string{"ingress"}) // node2 - // Remove a network and validate each node has 2 LB IP addresses + // add a task and validate that the node gets the network for the task assert.NoError(t, s.Update(func(tx store.Tx) error { - assert.NoError(t, store.DeleteNetwork(tx, n2.ID)) + // create a task assigned to this node that has a network attachment on + // n1 + t2 := &api.Task{ + ID: "task2", + NodeID: node2.ID, + DesiredState: api.TaskStateRunning, + Spec: api.TaskSpec{ + Networks: []*api.NetworkAttachmentConfig{ + { + Target: "overlayID2", + }, + }, + }, + } + + return store.CreateTask(tx, t2) + })) + // validate that the task is created + watchTask(t, s, taskWatch, false, isValidTask) + + // validate that node2 gets a new attachment and node1 stays the same + watchNode(t, nodeWatch, false, isValidNode, node2, []string{"ingress", "overlayID2"}) // node2 + watchNode(t, nodeWatch, true, isValidNode, node1, []string{"ingress", "overlayID1"}) // node1 + + // add another task with the same network to a node and validate that it + // still only has 1 attachment for that network + assert.NoError(t, s.Update(func(tx store.Tx) error { + // create a task assigned to this node that has a network attachment on + // n1 + t3 := &api.Task{ + ID: "task3", + NodeID: node1.ID, + DesiredState: api.TaskStateRunning, + Spec: api.TaskSpec{ + Networks: []*api.NetworkAttachmentConfig{ + { + Target: "overlayID1", + }, + }, + }, + } + + return store.CreateTask(tx, t3) + })) + + // validate that the task is created + watchTask(t, s, taskWatch, false, isValidTask) + + // validate that nothing changes + watchNode(t, nodeWatch, true, isValidNode, node1, []string{"ingress", "overlayID1"}) // node1 + watchNode(t, nodeWatch, true, isValidNode, node2, []string{"ingress", "overlayID2"}) // node2 + + // now remove that task we just created, and validate that the node still + // has an attachment for the other task + // Remove a node and validate remaining node has 2 LB IP addresses + assert.NoError(t, s.Update(func(tx store.Tx) error { + assert.NoError(t, store.DeleteTask(tx, "task1")) return nil })) - watchNetwork(t, netWatch, false, isValidNetwork) // overlayID2 - watchNode(t, nodeWatch, false, isValidNode, node1, []string{"ingress", "overlayID1"}) // node1 - watchNode(t, nodeWatch, false, isValidNode, node2, []string{"ingress", "overlayID1"}) // node2 + + // validate that nothing changes + watchNode(t, nodeWatch, true, isValidNode, node1, []string{"ingress", "overlayID1"}) // node1 + watchNode(t, nodeWatch, true, isValidNode, node2, []string{"ingress", "overlayID2"}) // node2 + + // now remove another task. this time the attachment on the node should be + // removed as well + assert.NoError(t, s.Update(func(tx store.Tx) error { + assert.NoError(t, store.DeleteTask(tx, "task2")) + return nil + })) + + watchNode(t, nodeWatch, false, isValidNode, node2, []string{"ingress"}) // node2 + watchNode(t, nodeWatch, true, isValidNode, node1, []string{"ingress", "overlayID1"}) // node1 // Remove a node and validate remaining node has 2 LB IP addresses assert.NoError(t, s.Update(func(tx store.Tx) error { @@ -1418,13 +1524,13 @@ func watchNode(t *testing.T, watch chan events.Event, expectTimeout bool, } } - case <-time.After(1 * time.Millisecond): + case <-time.After(getWatchTimeout(expectTimeout)): if !expectTimeout { if node != nil && fn != nil { fn(t, originalNode, node, networks) } - t.Fatal("timed out before watchNode found expected node state") + t.Fatal("timed out before watchNode found expected node state", string(debug.Stack())) } return @@ -1457,7 +1563,7 @@ func watchNetwork(t *testing.T, watch chan events.Event, expectTimeout bool, fn fn(t, network) } - t.Fatal("timed out before watchNetwork found expected network state") + t.Fatal("timed out before watchNetwork found expected network state", string(debug.Stack())) } return diff --git a/manager/allocator/network.go b/manager/allocator/network.go index 3083db8e3b..c87b7757bf 100644 --- a/manager/allocator/network.go +++ b/manager/allocator/network.go @@ -181,10 +181,6 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) { if IsIngressNetwork(n) { nc.ingressNetwork = n } - err := a.allocateNodes(ctx, false) - if err != nil { - log.G(ctx).WithError(err).Error(err) - } case api.EventDeleteNetwork: n := v.Network.Copy() @@ -337,19 +333,12 @@ func (a *Allocator) doNodeAlloc(ctx context.Context, ev events.Event) { nc.somethingWasDeallocated = true } } else { - allocatedNetworks, err := a.getAllocatedNetworks() - if err != nil { - log.G(ctx).WithError(err).Errorf("Error listing allocated networks in network %s", node.ID) - } - - isAllocated := a.allocateNode(ctx, node, false, allocatedNetworks) - - if isAllocated { - if err := a.store.Batch(func(batch *store.Batch) error { - return a.commitAllocatedNode(ctx, batch, node) - }); err != nil { - log.G(ctx).WithError(err).Errorf("Failed to commit allocation of network resources for node %s", node.ID) - } + // if this isn't a delete, we should try reallocating the node. if this + // is a creation, then the node will be allocated only for ingress. + if err := a.reallocateNode(ctx, node.ID); err != nil { + log.G(ctx).WithError(err).Errorf( + "error reallocating network resources for node %v", node.ID, + ) } } } @@ -394,6 +383,69 @@ func (a *Allocator) getAllocatedNetworks() ([]*api.Network, error) { return allocatedNetworks, nil } +// getNodeNetworks returns all networks that should be allocated for a node +func (a *Allocator) getNodeNetworks(nodeID string) ([]*api.Network, error) { + var ( + // no need to initialize networks. we only append to it, and appending + // to a nil slice is valid. this has the added bonus of making this nil + // if we return an error + networks []*api.Network + err error + ) + a.store.View(func(tx store.ReadTx) { + // get all tasks currently assigned to this node. it's no big deal if + // the tasks change in the meantime, there's no race to clean up + // unneeded network attachments on a node. + var tasks []*api.Task + tasks, err = store.FindTasks(tx, store.ByNodeID(nodeID)) + if err != nil { + return + } + // we need to keep track of network IDs that we've already added to the + // list of networks we're going to return. we could do + // map[string]*api.Network and then convert to []*api.Network and + // return that, but it seems cleaner to have a separate set and list. + networkIDs := map[string]struct{}{} + for _, task := range tasks { + // we don't need to check if a task is before the Assigned state. + // the only way we have a task with a NodeID that isn't yet in + // Assigned is if it's a global service task. this check is not + // necessary: + // if task.Status.State < api.TaskStateAssigned { + // continue + // } + if task.Status.State > api.TaskStateRunning { + // we don't need to have network attachments for a task that's + // already in a terminal state + continue + } + + // now go through the task's network attachments and find all of + // the networks + for _, attachment := range task.Networks { + // if the network is an overlay network, and the network ID is + // not yet in the set of network IDs, then add it to the set + // and add the network to the list of networks we'll be + // returning + if _, ok := networkIDs[attachment.Network.ID]; isOverlayNetwork(attachment.Network) && !ok { + networkIDs[attachment.Network.ID] = struct{}{} + // we don't need to worry about retrieving the network from + // the store, because the network in the attachment is an + // identical copy of the network in the store. + networks = append(networks, attachment.Network) + } + } + } + }) + + // finally, we need the ingress network if one exists. + if a.netCtx != nil && a.netCtx.ingressNetwork != nil { + networks = append(networks, a.netCtx.ingressNetwork) + } + + return networks, err +} + func (a *Allocator) allocateNodes(ctx context.Context, existingAddressesOnly bool) error { // Allocate nodes in the store so far before we process watched events. var ( @@ -409,13 +461,12 @@ func (a *Allocator) allocateNodes(ctx context.Context, existingAddressesOnly boo return errors.Wrap(err, "error listing all nodes in store while trying to allocate network resources") } - allocatedNetworks, err := a.getAllocatedNetworks() - if err != nil { - return errors.Wrap(err, "error listing all nodes in store while trying to allocate network resources") - } - for _, node := range nodes { - isAllocated := a.allocateNode(ctx, node, existingAddressesOnly, allocatedNetworks) + networks, err := a.getNodeNetworks(node.ID) + if err != nil { + return errors.Wrap(err, "error getting all networks needed by node") + } + isAllocated := a.allocateNode(ctx, node, existingAddressesOnly, networks) if isAllocated { allocatedNodes = append(allocatedNodes, node) } @@ -842,6 +893,14 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) { } } + // if we're deallocating the task, we also might need to deallocate the + // node's network attachment, if this is the last task on the node that + // needs it. we can do that by doing the same dance to reallocate a + // node + if err := a.reallocateNode(ctx, t.NodeID); err != nil { + logger.WithError(err).Errorf("error reallocating node %v", t.NodeID) + } + // Cleanup any task references that might exist delete(nc.pendingTasks, t.ID) delete(nc.unallocatedTasks, t.ID) @@ -849,6 +908,16 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) { return } + // if the task has a node ID, we should allocate an attachment for the node + // this happens if the task is in any non-terminal state. + if t.NodeID != "" && t.Status.State <= api.TaskStateRunning { + if err := a.reallocateNode(ctx, t.NodeID); err != nil { + // TODO(dperny): not entire sure what the error handling flow here + // should be... for now, just log and keep going + logger.WithError(err).Errorf("error reallocating node %v", t.NodeID) + } + } + // If we are already in allocated state, there is // absolutely nothing else to do. if t.Status.State >= api.TaskStatePending { @@ -887,13 +956,25 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) { log.G(ctx).Debugf("task %v was marked pending allocation", t.ID) } +// allocateNode takes a context, a node, whether or not new allocations should +// be made, and the networks to allocate. it then makes sure an attachment is +// allocated for every network in the provided networks, allocating new +// attachments if existingAddressesOnly is false. it return true if something +// new was allocated or something was removed, or false otherwise. +// +// additionally, allocateNode will remove and free any attachments for networks +// not in the set of networks passed in. func (a *Allocator) allocateNode(ctx context.Context, node *api.Node, existingAddressesOnly bool, networks []*api.Network) bool { var allocated bool nc := a.netCtx + // go through all of the networks we've passed in for _, network := range networks { + // for each one, create space for an attachment. then, search through + // all of the attachments already on the node. if the attachment + // exists, then copy it to the node. if not, we'll allocate it below. var lbAttachment *api.NetworkAttachment for _, na := range node.Attachments { if na.Network != nil && na.Network.ID == network.ID { @@ -927,8 +1008,83 @@ func (a *Allocator) allocateNode(ctx context.Context, node *api.Node, existingAd allocated = true } + + // if we're only initializing existing addresses, we should stop here and + // not deallocate anything + if existingAddressesOnly { + return allocated + } + + // now that we've allocated everything new, we have to remove things that + // do not belong. we have to do this last because we can easily roll back + // attachments we've allocated if something goes wrong by freeing them, but + // we can't roll back deallocating attachments by reacquiring them. + + // we're using a trick to filter without allocating see the official go + // wiki on github: + // https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating + attachments := node.Attachments[:0] + for _, attach := range node.Attachments { + // for every attachment, go through every network. if the attachment + // belongs to one of the networks, then go to the next attachment. if + // no network matches, then the the attachment should be removed. + attachmentBelongs := false + for _, network := range networks { + if network.ID == attach.Network.ID { + attachmentBelongs = true + break + } + } + if attachmentBelongs { + attachments = append(attachments, attach) + } else { + // free the attachment and remove it from the node's attachments by + // re-slicing + if err := a.netCtx.nwkAllocator.DeallocateAttachment(node, attach); err != nil { + // if deallocation fails, there's nothing we can do besides log + // an error and keep going + log.G(ctx).WithError(err).Errorf( + "error deallocating attachment for network %v on node %v", + attach.Network.ID, node.ID, + ) + } + // strictly speaking, nothing was allocated, but something was + // deallocated and that counts. + allocated = true + // also, set the somethingWasDeallocated flag so the allocator + // knows that it can now try again. + a.netCtx.somethingWasDeallocated = true + } + } + node.Attachments = attachments + return allocated +} + +func (a *Allocator) reallocateNode(ctx context.Context, nodeID string) error { + var ( + node *api.Node + ) + a.store.View(func(tx store.ReadTx) { + node = store.GetNode(tx, nodeID) + }) + if node == nil { + return errors.Errorf("node %v cannot be found", nodeID) + } + networks, err := a.getNodeNetworks(node.ID) + if err != nil { + return errors.Wrapf(err, "error getting networks for node %v", nodeID) + } + if a.allocateNode(ctx, node, false, networks) { + // if something was allocated, commit the node + if err := a.store.Batch(func(batch *store.Batch) error { + return a.commitAllocatedNode(ctx, batch, node) + }); err != nil { + return errors.Wrapf(err, "error committing allocation for node %v", nodeID) + } + } + return nil } func (a *Allocator) commitAllocatedNode(ctx context.Context, batch *store.Batch, node *api.Node) error {