From 7f9db677bf1d6413bf6f9a41e4e7349e0dea3d65 Mon Sep 17 00:00:00 2001 From: Abhinandan Date: Fri, 23 Mar 2018 13:09:22 -0700 Subject: [PATCH] Adding logic to restore networks in order This commits adds a fix for restore case where there might a mix of allocated and unallocated network in raft. During restore the allocator was going over the networks lexicographically which would mean that there might be a chance for an unallocated network say net1 o be allocated the same vxlan id or subnet pool that was allocated to another networki net2. Because of this during restoring, when allocator tries to allocate the reallocate network net2, it would fail because it allocated network resources to net1 during restore. This would mean services,tasks and network itself would be in a messed up state. Signed-off-by: Abhinandan --- manager/allocator/allocator_test.go | 247 +++++++++++++++++++++++++++- manager/allocator/network.go | 102 ++++++++---- 2 files changed, 308 insertions(+), 41 deletions(-) diff --git a/manager/allocator/allocator_test.go b/manager/allocator/allocator_test.go index 6a509b9016..1cb398d09a 100644 --- a/manager/allocator/allocator_test.go +++ b/manager/allocator/allocator_test.go @@ -563,9 +563,18 @@ func TestNoDuplicateIPs(t *testing.T) { }, Ingress: true, }, + IPAM: &api.IPAMOptions{ + Driver: &api.Driver{}, + Configs: []*api.IPAMConfig{ + { + Subnet: "10.0.0.0/24", + Gateway: "10.0.0.1", + }, + }, + }, + DriverState: &api.Driver{}, } assert.NoError(t, store.CreateNetwork(tx, in)) - n1 := &api.Network{ ID: "testID1", Spec: api.NetworkSpec{ @@ -573,6 +582,16 @@ func TestNoDuplicateIPs(t *testing.T) { Name: "test1", }, }, + IPAM: &api.IPAMOptions{ + Driver: &api.Driver{}, + Configs: []*api.IPAMConfig{ + { + Subnet: "10.1.0.0/24", + Gateway: "10.1.0.1", + }, + }, + }, + DriverState: &api.Driver{}, } assert.NoError(t, store.CreateNetwork(tx, n1)) @@ -649,7 +668,6 @@ func TestNoDuplicateIPs(t *testing.T) { return nil })) - a, err := New(s, nil) assert.NoError(t, err) assert.NotNil(t, a) @@ -661,7 +679,6 @@ func TestNoDuplicateIPs(t *testing.T) { // Confirm task gets a unique IP watchTask(t, s, taskWatch, false, hasUniqueIP) - a.Stop() } } @@ -682,6 +699,15 @@ func TestAllocatorRestoreForDuplicateIPs(t *testing.T) { }, Ingress: true, }, + IPAM: &api.IPAMOptions{ + Driver: &api.Driver{}, + Configs: []*api.IPAMConfig{ + { + Subnet: "10.0.0.0/24", + Gateway: "10.0.0.1", + }, + }, + }, } assert.NoError(t, store.CreateNetwork(tx, in)) @@ -815,6 +841,16 @@ func TestAllocatorRestartNoEndpointSpec(t *testing.T) { Name: "net1", }, }, + IPAM: &api.IPAMOptions{ + Driver: &api.Driver{}, + Configs: []*api.IPAMConfig{ + { + Subnet: "10.0.0.0/24", + Gateway: "10.0.0.1", + }, + }, + }, + DriverState: &api.Driver{}, } assert.NoError(t, store.CreateNetwork(tx, in)) @@ -887,7 +923,6 @@ func TestAllocatorRestartNoEndpointSpec(t *testing.T) { hasNoIPOverlapServices := func(fakeT assert.TestingT, service *api.Service) bool { assert.NotEqual(fakeT, len(service.Endpoint.VirtualIPs), 0) assert.NotEqual(fakeT, len(service.Endpoint.VirtualIPs[0].Addr), 0) - assignedVIP := service.Endpoint.VirtualIPs[0].Addr if assignedIPs[assignedVIP] { t.Fatalf("service %s assigned duplicate IP %s", service.ID, assignedVIP) @@ -903,7 +938,6 @@ func TestAllocatorRestartNoEndpointSpec(t *testing.T) { hasNoIPOverlapTasks := func(fakeT assert.TestingT, s *store.MemoryStore, task *api.Task) bool { assert.NotEqual(fakeT, len(task.Networks), 0) assert.NotEqual(fakeT, len(task.Networks[0].Addresses), 0) - assignedIP := task.Networks[0].Addresses[0] if assignedIPs[assignedIP] { t.Fatalf("task %s assigned duplicate IP %s", task.ID, assignedIP) @@ -939,6 +973,209 @@ func TestAllocatorRestartNoEndpointSpec(t *testing.T) { assert.Len(t, expectedIPs, 0) } +// TestAllocatorRestoreForUnallocatedNetwork tests allocator restart +// scenarios where there is a combination of allocated and unallocated +// networks and tests whether the restore logic ensures the networks +// services and tasks that were preallocated are allocated correctly +// followed by the allocation of unallocated networks prior to the +// restart. +func TestAllocatorRestoreForUnallocatedNetwork(t *testing.T) { + s := store.NewMemoryStore(nil) + assert.NotNil(t, s) + defer s.Close() + // Create 3 services with 1 task each + numsvcstsks := 3 + var n1 *api.Network + var n2 *api.Network + assert.NoError(t, s.Update(func(tx store.Tx) error { + // populate ingress network + in := &api.Network{ + ID: "ingress-nw-id", + Spec: api.NetworkSpec{ + Annotations: api.Annotations{ + Name: "default-ingress", + }, + Ingress: true, + }, + IPAM: &api.IPAMOptions{ + Driver: &api.Driver{}, + Configs: []*api.IPAMConfig{ + { + Subnet: "10.0.0.0/24", + Gateway: "10.0.0.1", + }, + }, + }, + } + assert.NoError(t, store.CreateNetwork(tx, in)) + + n1 = &api.Network{ + ID: "testID1", + Spec: api.NetworkSpec{ + Annotations: api.Annotations{ + Name: "test1", + }, + }, + IPAM: &api.IPAMOptions{ + Driver: &api.Driver{}, + Configs: []*api.IPAMConfig{ + { + Subnet: "10.1.0.0/24", + Gateway: "10.1.0.1", + }, + }, + }, + DriverState: &api.Driver{}, + } + assert.NoError(t, store.CreateNetwork(tx, n1)) + + n2 = &api.Network{ + // Intentionally named testID0 so that in restore this network + // is looked into first + ID: "testID0", + Spec: api.NetworkSpec{ + Annotations: api.Annotations{ + Name: "test2", + }, + }, + } + assert.NoError(t, store.CreateNetwork(tx, n2)) + + for i := 0; i != numsvcstsks; i++ { + svc := &api.Service{ + ID: "testServiceID" + strconv.Itoa(i), + Spec: api.ServiceSpec{ + Annotations: api.Annotations{ + Name: "service" + strconv.Itoa(i), + }, + Task: api.TaskSpec{ + Networks: []*api.NetworkAttachmentConfig{ + { + Target: "testID1", + }, + }, + }, + Endpoint: &api.EndpointSpec{ + Mode: api.ResolutionModeVirtualIP, + Ports: []*api.PortConfig{ + { + Name: "", + Protocol: api.ProtocolTCP, + TargetPort: 8000, + PublishedPort: uint32(8001 + i), + }, + }, + }, + }, + Endpoint: &api.Endpoint{ + Ports: []*api.PortConfig{ + { + Name: "", + Protocol: api.ProtocolTCP, + TargetPort: 8000, + PublishedPort: uint32(8001 + i), + }, + }, + VirtualIPs: []*api.Endpoint_VirtualIP{ + { + NetworkID: "ingress-nw-id", + Addr: "10.0.0." + strconv.Itoa(2+i) + "/24", + }, + { + NetworkID: "testID1", + Addr: "10.1.0." + strconv.Itoa(2+i) + "/24", + }, + }, + }, + } + assert.NoError(t, store.CreateService(tx, svc)) + } + return nil + })) + + for i := 0; i != numsvcstsks; i++ { + assert.NoError(t, s.Update(func(tx store.Tx) error { + tsk := &api.Task{ + ID: "testTaskID" + strconv.Itoa(i), + Status: api.TaskStatus{ + State: api.TaskStateNew, + }, + Spec: api.TaskSpec{ + Networks: []*api.NetworkAttachmentConfig{ + { + Target: "testID1", + }, + }, + }, + ServiceID: "testServiceID" + strconv.Itoa(i), + DesiredState: api.TaskStateRunning, + } + assert.NoError(t, store.CreateTask(tx, tsk)) + return nil + })) + } + + assignedIPs := make(map[string]bool) + expectedIPs := map[string]string{ + "testServiceID0": "10.1.0.2/24", + "testServiceID1": "10.1.0.3/24", + "testServiceID2": "10.1.0.4/24", + "testTaskID0": "10.1.0.5/24", + "testTaskID1": "10.1.0.6/24", + "testTaskID2": "10.1.0.7/24", + } + hasNoIPOverlapServices := func(fakeT assert.TestingT, service *api.Service) bool { + assert.NotEqual(fakeT, len(service.Endpoint.VirtualIPs), 0) + assert.NotEqual(fakeT, len(service.Endpoint.VirtualIPs[0].Addr), 0) + assignedVIP := service.Endpoint.VirtualIPs[1].Addr + if assignedIPs[assignedVIP] { + t.Fatalf("service %s assigned duplicate IP %s", service.ID, assignedVIP) + } + assignedIPs[assignedVIP] = true + ip, ok := expectedIPs[service.ID] + assert.True(t, ok) + assert.Equal(t, ip, assignedVIP) + delete(expectedIPs, service.ID) + return true + } + + hasNoIPOverlapTasks := func(fakeT assert.TestingT, s *store.MemoryStore, task *api.Task) bool { + assert.NotEqual(fakeT, len(task.Networks), 0) + assert.NotEqual(fakeT, len(task.Networks[0].Addresses), 0) + assignedIP := task.Networks[1].Addresses[0] + if assignedIPs[assignedIP] { + t.Fatalf("task %s assigned duplicate IP %s", task.ID, assignedIP) + } + assignedIPs[assignedIP] = true + ip, ok := expectedIPs[task.ID] + assert.True(t, ok) + assert.Equal(t, ip, assignedIP) + delete(expectedIPs, task.ID) + return true + } + + a, err := New(s, nil) + assert.NoError(t, err) + assert.NotNil(t, a) + // Start allocator + go func() { + assert.NoError(t, a.Run(context.Background())) + }() + defer a.Stop() + + taskWatch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateTask{}, api.EventDeleteTask{}) + defer cancel() + + serviceWatch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateService{}, api.EventDeleteService{}) + defer cancel() + + // Confirm tasks have no IPs that overlap with the services VIPs on restart + for i := 0; i != numsvcstsks; i++ { + watchTask(t, s, taskWatch, false, hasNoIPOverlapTasks) + watchService(t, serviceWatch, false, hasNoIPOverlapServices) + } +} + func TestNodeAllocator(t *testing.T) { s := store.NewMemoryStore(nil) assert.NotNil(t, s) diff --git a/manager/allocator/network.go b/manager/allocator/network.go index aee03870b1..857692fe2b 100644 --- a/manager/allocator/network.go +++ b/manager/allocator/network.go @@ -118,42 +118,15 @@ func (a *Allocator) doNetworkInit(ctx context.Context) (err error) { return errors.Wrap(err, "failure while looking for ingress network during init") } - // Allocate networks in the store so far before we started - // watching. - var networks []*api.Network - a.store.View(func(tx store.ReadTx) { - networks, err = store.FindNetworks(tx, store.All) - }) - if err != nil { - return errors.Wrap(err, "error listing all networks in store while trying to allocate during init") - } - - var allocatedNetworks []*api.Network - for _, n := range networks { - if na.IsAllocated(n) { - continue - } - - if err := a.allocateNetwork(ctx, n); err != nil { - log.G(ctx).WithError(err).Errorf("failed allocating network %s during init", n.ID) - continue - } - allocatedNetworks = append(allocatedNetworks, n) - } - - if err := a.store.Batch(func(batch *store.Batch) error { - for _, n := range allocatedNetworks { - if err := a.commitAllocatedNetwork(ctx, batch, n); err != nil { - log.G(ctx).WithError(err).Errorf("failed committing allocation of network %s during init", n.ID) - } - } - return nil - }); err != nil { - log.G(ctx).WithError(err).Error("failed committing allocation of networks during init") + // First, allocate (read it as restore) objects likes network,nodes,serives + // and tasks that were already allocated. Then go on the allocate objects + // that are in raft and were previously not allocated. The reason being, during + // restore, we make sure that we populate the allocated states of + // the objects in the raft onto our in memory state. + if err := a.allocateNetworks(ctx, true); err != nil { + return err } - // First, allocate objects that already have addresses associated with - // them, to reserve these IP addresses in internal state. if err := a.allocateNodes(ctx, true); err != nil { return err } @@ -164,6 +137,11 @@ func (a *Allocator) doNetworkInit(ctx context.Context) (err error) { if err := a.allocateTasks(ctx, true); err != nil { return err } + // Now allocate objects that were not previously allocated + // but were present in the raft. + if err := a.allocateNetworks(ctx, false); err != nil { + return err + } if err := a.allocateNodes(ctx, false); err != nil { return err @@ -184,7 +162,6 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) { if nc.nwkAllocator.IsAllocated(n) { break } - if IsIngressNetwork(n) && nc.ingressNetwork != nil { log.G(ctx).Errorf("Cannot allocate ingress network %s (%s) because another ingress network is already present: %s (%s)", n.ID, n.Spec.Annotations.Name, nc.ingressNetwork.ID, nc.ingressNetwork.Spec.Annotations) @@ -560,6 +537,60 @@ func (a *Allocator) deallocateNode(node *api.Node) error { return nil } +// allocateNetworks allocates (restores) networks in the store so far before we process +// watched events. existingOnly flags is set to true to specify if only allocated +// networks need to be restored. +func (a *Allocator) allocateNetworks(ctx context.Context, existingOnly bool) error { + var ( + nc = a.netCtx + networks []*api.Network + err error + ) + a.store.View(func(tx store.ReadTx) { + networks, err = store.FindNetworks(tx, store.All) + }) + if err != nil { + return errors.Wrap(err, "error listing all networks in store while trying to allocate during init") + } + + var allocatedNetworks []*api.Network + for _, n := range networks { + if nc.nwkAllocator.IsAllocated(n) { + continue + } + // Network is considered allocated only if the DriverState and IPAM are NOT nil. + // During initial restore (existingOnly being true), check the network state in + // raft store. If it is allocated, then restore the same in the in memory allocator + // state. If it is not allocated, then skip allocating the network at this step. + // This is to avoid allocating an in-use network IP, subnet pool or vxlan id to + // another network. + if existingOnly && + (n.DriverState == nil || + n.IPAM == nil) { + continue + } + + if err := a.allocateNetwork(ctx, n); err != nil { + log.G(ctx).WithField("existingOnly", existingOnly).WithError(err).Errorf("failed allocating network %s during init", n.ID) + continue + } + allocatedNetworks = append(allocatedNetworks, n) + } + + if err := a.store.Batch(func(batch *store.Batch) error { + for _, n := range allocatedNetworks { + if err := a.commitAllocatedNetwork(ctx, batch, n); err != nil { + log.G(ctx).WithError(err).Errorf("failed committing allocation of network %s during init", n.ID) + } + } + return nil + }); err != nil { + log.G(ctx).WithError(err).Error("failed committing allocation of networks during init") + } + + return nil +} + // allocateServices allocates services in the store so far before we process // watched events. func (a *Allocator) allocateServices(ctx context.Context, existingAddressesOnly bool) error { @@ -580,7 +611,6 @@ func (a *Allocator) allocateServices(ctx context.Context, existingAddressesOnly if nc.nwkAllocator.IsServiceAllocated(s, networkallocator.OnInit) { continue } - if existingAddressesOnly && (s.Endpoint == nil || len(s.Endpoint.VirtualIPs) == 0) {