diff --git a/manager/allocator/allocator_test.go b/manager/allocator/allocator_test.go
index 51136f6f72..55911f513e 100644
--- a/manager/allocator/allocator_test.go
+++ b/manager/allocator/allocator_test.go
@@ -563,9 +563,18 @@ func TestNoDuplicateIPs(t *testing.T) {
 				},
 				Ingress: true,
 			},
+			IPAM: &api.IPAMOptions{
+				Driver: &api.Driver{},
+				Configs: []*api.IPAMConfig{
+					{
+						Subnet:  "10.0.0.0/24",
+						Gateway: "10.0.0.1",
+					},
+				},
+			},
+			DriverState: &api.Driver{},
 		}
 		assert.NoError(t, store.CreateNetwork(tx, in))
-
 		n1 := &api.Network{
 			ID: "testID1",
 			Spec: api.NetworkSpec{
@@ -573,6 +582,16 @@ func TestNoDuplicateIPs(t *testing.T) {
 					Name: "test1",
 				},
 			},
+			IPAM: &api.IPAMOptions{
+				Driver: &api.Driver{},
+				Configs: []*api.IPAMConfig{
+					{
+						Subnet:  "10.1.0.0/24",
+						Gateway: "10.1.0.1",
+					},
+				},
+			},
+			DriverState: &api.Driver{},
 		}
 		assert.NoError(t, store.CreateNetwork(tx, n1))
 
@@ -649,7 +668,6 @@ func TestNoDuplicateIPs(t *testing.T) {
 		return nil
 	}))
-
 	a, err := New(s, nil)
 	assert.NoError(t, err)
 	assert.NotNil(t, a)
 
@@ -661,7 +679,6 @@ func TestNoDuplicateIPs(t *testing.T) {
 
 		// Confirm task gets a unique IP
 		watchTask(t, s, taskWatch, false, hasUniqueIP)
-
 		a.Stop()
 	}
 }
@@ -682,6 +699,15 @@ func TestAllocatorRestoreForDuplicateIPs(t *testing.T) {
 				},
 				Ingress: true,
 			},
+			IPAM: &api.IPAMOptions{
+				Driver: &api.Driver{},
+				Configs: []*api.IPAMConfig{
+					{
+						Subnet:  "10.0.0.0/24",
+						Gateway: "10.0.0.1",
+					},
+				},
+			},
 		}
 		assert.NoError(t, store.CreateNetwork(tx, in))
 
@@ -815,6 +841,16 @@ func TestAllocatorRestartNoEndpointSpec(t *testing.T) {
 					Name: "net1",
 				},
 			},
+			IPAM: &api.IPAMOptions{
+				Driver: &api.Driver{},
+				Configs: []*api.IPAMConfig{
+					{
+						Subnet:  "10.0.0.0/24",
+						Gateway: "10.0.0.1",
+					},
+				},
+			},
+			DriverState: &api.Driver{},
 		}
 		assert.NoError(t, store.CreateNetwork(tx, in))
 
@@ -887,7 +923,6 @@ func TestAllocatorRestartNoEndpointSpec(t *testing.T) {
 	hasNoIPOverlapServices := func(fakeT assert.TestingT, service *api.Service) bool {
 		assert.NotEqual(fakeT, len(service.Endpoint.VirtualIPs), 0)
 		assert.NotEqual(fakeT, len(service.Endpoint.VirtualIPs[0].Addr), 0)
-
 		assignedVIP := service.Endpoint.VirtualIPs[0].Addr
 		if assignedIPs[assignedVIP] {
 			t.Fatalf("service %s assigned duplicate IP %s", service.ID, assignedVIP)
@@ -903,7 +938,6 @@ func TestAllocatorRestartNoEndpointSpec(t *testing.T) {
 	hasNoIPOverlapTasks := func(fakeT assert.TestingT, s *store.MemoryStore, task *api.Task) bool {
 		assert.NotEqual(fakeT, len(task.Networks), 0)
 		assert.NotEqual(fakeT, len(task.Networks[0].Addresses), 0)
-
 		assignedIP := task.Networks[0].Addresses[0]
 		if assignedIPs[assignedIP] {
 			t.Fatalf("task %s assigned duplicate IP %s", task.ID, assignedIP)
@@ -939,6 +973,213 @@ func TestAllocatorRestartNoEndpointSpec(t *testing.T) {
 	assert.Len(t, expectedIPs, 0)
 }
 
+// TestAllocatorRestoreForUnallocatedNetwork tests allocator restart
+// scenarios with a combination of allocated and unallocated networks.
+// It verifies that on restore the networks, services, and tasks that
+// were allocated before the restart are restored first, and that the
+// networks left unallocated prior to the restart are allocated only
+// after that.
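+// Here the ingress network and "testID1" are created pre-allocated (their
+// IPAM configs and DriverState are populated), while "testID0" is created
+// unallocated. Restoring the allocated networks first ensures "testID0" is
+// not handed an IP, subnet pool, or VXLAN ID that is already in use.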
+func TestAllocatorRestoreForUnallocatedNetwork(t *testing.T) {
+	s := store.NewMemoryStore(nil)
+	assert.NotNil(t, s)
+	defer s.Close()
+	// Create 3 services with 1 task each
+	numsvcstsks := 3
+	var n1 *api.Network
+	var n2 *api.Network
+	assert.NoError(t, s.Update(func(tx store.Tx) error {
+		// populate ingress network
+		in := &api.Network{
+			ID: "ingress-nw-id",
+			Spec: api.NetworkSpec{
+				Annotations: api.Annotations{
+					Name: "default-ingress",
+				},
+				Ingress: true,
+			},
+			IPAM: &api.IPAMOptions{
+				Driver: &api.Driver{},
+				Configs: []*api.IPAMConfig{
+					{
+						Subnet:  "10.0.0.0/24",
+						Gateway: "10.0.0.1",
+					},
+				},
+			},
+		}
+		assert.NoError(t, store.CreateNetwork(tx, in))
+
+		n1 = &api.Network{
+			ID: "testID1",
+			Spec: api.NetworkSpec{
+				Annotations: api.Annotations{
+					Name: "test1",
+				},
+			},
+			IPAM: &api.IPAMOptions{
+				Driver: &api.Driver{},
+				Configs: []*api.IPAMConfig{
+					{
+						Subnet:  "10.1.0.0/24",
+						Gateway: "10.1.0.1",
+					},
+				},
+			},
+			DriverState: &api.Driver{},
+		}
+		assert.NoError(t, store.CreateNetwork(tx, n1))
+
+		n2 = &api.Network{
+			// Intentionally named testID0 so that during restore this
+			// network is processed first
+			ID: "testID0",
+			Spec: api.NetworkSpec{
+				Annotations: api.Annotations{
+					Name: "test2",
+				},
+			},
+		}
+		assert.NoError(t, store.CreateNetwork(tx, n2))
+
+		for i := 0; i != numsvcstsks; i++ {
+			svc := &api.Service{
+				ID: "testServiceID" + strconv.Itoa(i),
+				Spec: api.ServiceSpec{
+					Annotations: api.Annotations{
+						Name: "service" + strconv.Itoa(i),
+					},
+					Task: api.TaskSpec{
+						Networks: []*api.NetworkAttachmentConfig{
+							{
+								Target: "testID1",
+							},
+						},
+					},
+					Endpoint: &api.EndpointSpec{
+						Mode: api.ResolutionModeVirtualIP,
+						Ports: []*api.PortConfig{
+							{
+								Name:          "",
+								Protocol:      api.ProtocolTCP,
+								TargetPort:    8000,
+								PublishedPort: uint32(8001 + i),
+							},
+						},
+					},
+				},
+				Endpoint: &api.Endpoint{
+					Ports: []*api.PortConfig{
+						{
+							Name:          "",
+							Protocol:      api.ProtocolTCP,
+							TargetPort:    8000,
+							PublishedPort: uint32(8001 + i),
+						},
+					},
+					VirtualIPs: []*api.Endpoint_VirtualIP{
+						{
+							NetworkID: "ingress-nw-id",
+							Addr:      "10.0.0." + strconv.Itoa(2+i) + "/24",
+						},
+						{
+							NetworkID: "testID1",
+							Addr:      "10.1.0." + strconv.Itoa(2+i) + "/24",
+						},
+					},
+				},
+			}
+			assert.NoError(t, store.CreateService(tx, svc))
+		}
+		return nil
+	}))
+
+	for i := 0; i != numsvcstsks; i++ {
+		assert.NoError(t, s.Update(func(tx store.Tx) error {
+			tsk := &api.Task{
+				ID: "testTaskID" + strconv.Itoa(i),
+				Status: api.TaskStatus{
+					State: api.TaskStateNew,
+				},
+				Spec: api.TaskSpec{
+					Networks: []*api.NetworkAttachmentConfig{
+						{
+							Target: "testID1",
+						},
+					},
+				},
+				ServiceID:    "testServiceID" + strconv.Itoa(i),
+				DesiredState: api.TaskStateRunning,
+			}
+			assert.NoError(t, store.CreateTask(tx, tsk))
+			return nil
+		}))
+	}
+
+	assignedIPs := make(map[string]bool)
+	expectedIPs := map[string]string{
+		"testServiceID0": "10.1.0.2/24",
+		"testServiceID1": "10.1.0.3/24",
+		"testServiceID2": "10.1.0.4/24",
+		"testTaskID0":    "10.1.0.5/24",
+		"testTaskID1":    "10.1.0.6/24",
+		"testTaskID2":    "10.1.0.7/24",
+	}
+	hasNoIPOverlapServices := func(fakeT assert.TestingT, service *api.Service) bool {
+		assert.NotEqual(fakeT, len(service.Endpoint.VirtualIPs), 0)
+		assert.NotEqual(fakeT, len(service.Endpoint.VirtualIPs[0].Addr), 0)
+		assignedVIP := service.Endpoint.VirtualIPs[1].Addr
+		if assignedIPs[assignedVIP] {
+			t.Fatalf("service %s assigned duplicate IP %s", service.ID, assignedVIP)
+		}
+		assignedIPs[assignedVIP] = true
+		ip, ok := expectedIPs[service.ID]
+		assert.True(t, ok)
+		assert.Equal(t, ip, assignedVIP)
+		delete(expectedIPs, service.ID)
+		return true
+	}
+
+	hasNoIPOverlapTasks := func(fakeT assert.TestingT, s *store.MemoryStore, task *api.Task) bool {
+		assert.NotEqual(fakeT, len(task.Networks), 0)
+		assert.NotEqual(fakeT, len(task.Networks[0].Addresses), 0)
+		assignedIP := task.Networks[1].Addresses[0]
+		if assignedIPs[assignedIP] {
+			t.Fatalf("task %s assigned duplicate IP %s", task.ID, assignedIP)
+		}
+		assignedIPs[assignedIP] = true
+		ip, ok := expectedIPs[task.ID]
+		assert.True(t, ok)
+		assert.Equal(t, ip, assignedIP)
+		delete(expectedIPs, task.ID)
+		return true
+	}
+
+	a, err := New(s, nil)
+	assert.NoError(t, err)
+	assert.NotNil(t, a)
+	// Start allocator
+	go func() {
+		assert.NoError(t, a.Run(context.Background()))
+	}()
+	defer a.Stop()
+
+	taskWatch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateTask{}, api.EventDeleteTask{})
+	defer cancel()
+
+	serviceWatch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateService{}, api.EventDeleteService{})
+	defer cancel()
+
+	// Confirm tasks have no IPs that overlap with the services' VIPs on restart
+	for i := 0; i != numsvcstsks; i++ {
+		watchTask(t, s, taskWatch, false, hasNoIPOverlapTasks)
+		watchService(t, serviceWatch, false, hasNoIPOverlapServices)
+	}
+}
+
 func TestNodeAllocator(t *testing.T) {
 	s := store.NewMemoryStore(nil)
 	assert.NotNil(t, s)
diff --git a/manager/allocator/network.go b/manager/allocator/network.go
index eb212be051..4cdcb0bacc 100644
--- a/manager/allocator/network.go
+++ b/manager/allocator/network.go
@@ -111,38 +111,13 @@ func (a *Allocator) doNetworkInit(ctx context.Context) (err error) {
 		return errors.Wrap(err, "failure while looking for ingress network during init")
 	}
 
-	// Allocate networks in the store so far before we started
-	// watching.
-	var networks []*api.Network
-	a.store.View(func(tx store.ReadTx) {
-		networks, err = store.FindNetworks(tx, store.All)
-	})
-	if err != nil {
-		return errors.Wrap(err, "error listing all networks in store while trying to allocate during init")
-	}
-
-	var allocatedNetworks []*api.Network
-	for _, n := range networks {
-		if na.IsAllocated(n) {
-			continue
-		}
-
-		if err := a.allocateNetwork(ctx, n); err != nil {
-			log.G(ctx).WithError(err).Errorf("failed allocating network %s during init", n.ID)
-			continue
-		}
-		allocatedNetworks = append(allocatedNetworks, n)
-	}
-
-	if err := a.store.Batch(func(batch *store.Batch) error {
-		for _, n := range allocatedNetworks {
-			if err := a.commitAllocatedNetwork(ctx, batch, n); err != nil {
-				log.G(ctx).WithError(err).Errorf("failed committing allocation of network %s during init", n.ID)
-			}
-		}
-		return nil
-	}); err != nil {
-		log.G(ctx).WithError(err).Error("failed committing allocation of networks during init")
+	// First, allocate (that is, restore) objects such as networks, nodes,
+	// services, and tasks that were already allocated. Only then allocate
+	// the objects that are in raft but were not yet allocated. During
+	// restore we must populate the allocated state of the objects in raft
+	// into our in-memory state before handing out any new allocations.
+	if err := a.allocateNetworks(ctx, true); err != nil {
+		return err
 	}
 
 	// First, allocate objects that already have addresses associated with
@@ -158,6 +133,11 @@ func (a *Allocator) doNetworkInit(ctx context.Context) (err error) {
 	if err := a.allocateTasks(ctx, true); err != nil {
 		return err
 	}
+	// Now allocate objects that were not previously allocated
+	// but were present in the raft.
+	if err := a.allocateNetworks(ctx, false); err != nil {
+		return err
+	}
 
 	// Now allocate objects that don't have addresses yet.
 	if nc.ingressNetwork != nil {
@@ -180,7 +160,6 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
 		if nc.nwkAllocator.IsAllocated(n) {
 			break
 		}
-
 		if IsIngressNetwork(n) && nc.ingressNetwork != nil {
 			log.G(ctx).Errorf("Cannot allocate ingress network %s (%s) because another ingress network is already present: %s (%s)",
 				n.ID, n.Spec.Annotations.Name, nc.ingressNetwork.ID, nc.ingressNetwork.Spec.Annotations)
@@ -455,6 +434,63 @@ func (a *Allocator) deallocateNodes(ctx context.Context) error {
 	return nil
 }
 
+// allocateNetworks allocates (restores) the networks that were in the store
+// before we started processing watched events. If existingOnly is true, only
+// the networks that were already allocated are restored.
+func (a *Allocator) allocateNetworks(ctx context.Context, existingOnly bool) error {
+	var (
+		nc       = a.netCtx
+		networks []*api.Network
+		err      error
+	)
+	a.store.View(func(tx store.ReadTx) {
+		networks, err = store.FindNetworks(tx, store.All)
+	})
+	if err != nil {
+		return errors.Wrap(err, "error listing all networks in store while trying to allocate during init")
+	}
+
+	var allocatedNetworks []*api.Network
+	for _, n := range networks {
+		if nc.nwkAllocator.IsAllocated(n) {
+			continue
+		}
+		// A network is considered allocated only if both its DriverState and
+		// IPAM are non-nil. During the initial restore pass (existingOnly set
+		// to true), check the network state in the raft store: if it is
+		// allocated, restore the same into the in-memory allocator state; if
+		// not, skip allocating the network at this step. This avoids handing
+		// an in-use network IP, subnet pool, or VXLAN ID to another network.
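+		// On the second pass (existingOnly set to false), the in-memory state
+		// already reflects every restored allocation, so addresses and VXLAN
+		// IDs handed out here cannot collide with the restored ones.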
+		if existingOnly &&
+			(n.DriverState == nil ||
+				n.IPAM == nil) {
+			continue
+		}
+
+		if err := a.allocateNetwork(ctx, n); err != nil {
+			log.G(ctx).WithField("existingOnly", existingOnly).WithError(err).Errorf("failed allocating network %s during init", n.ID)
+			continue
+		}
+		allocatedNetworks = append(allocatedNetworks, n)
+	}
+
+	if err := a.store.Batch(func(batch *store.Batch) error {
+		for _, n := range allocatedNetworks {
+			if err := a.commitAllocatedNetwork(ctx, batch, n); err != nil {
+				log.G(ctx).WithError(err).Errorf("failed committing allocation of network %s during init", n.ID)
+			}
+		}
+		return nil
+	}); err != nil {
+		log.G(ctx).WithError(err).Error("failed committing allocation of networks during init")
+	}
+
+	return nil
+}
+
 // allocateServices allocates services in the store so far before we process
 // watched events.
 func (a *Allocator) allocateServices(ctx context.Context, existingAddressesOnly bool) error {
@@ -475,7 +508,6 @@ func (a *Allocator) allocateServices(ctx context.Context, existingAddressesOnly
 		if !nc.nwkAllocator.ServiceNeedsAllocation(s, networkallocator.OnInit) {
 			continue
 		}
-
 		if existingAddressesOnly &&
 			(s.Endpoint == nil || len(s.Endpoint.VirtualIPs) == 0) {