diff --git a/manager/orchestrator/global/global.go b/manager/orchestrator/global/global.go index b89a105fec..ee4c2baea3 100644 --- a/manager/orchestrator/global/global.go +++ b/manager/orchestrator/global/global.go @@ -169,12 +169,6 @@ func (g *Orchestrator) Run(ctx context.Context) error { delete(g.nodes, v.Node.ID) case api.EventUpdateTask: g.handleTaskChange(ctx, v.Task) - case api.EventDeleteTask: - // CLI allows deleting task - if _, exists := g.globalServices[v.Task.ServiceID]; !exists { - continue - } - g.reconcileServicesOneNode(ctx, []string{v.Task.ServiceID}, v.Task.NodeID) } case <-g.stopChan: return nil @@ -216,7 +210,7 @@ func (g *Orchestrator) handleTaskChange(ctx context.Context, t *api.Task) { if _, exists := g.globalServices[t.ServiceID]; !exists { return } - // if a task's DesiredState has past running, which + // if a task's DesiredState has passed running, it // means the task has been processed if t.DesiredState > api.TaskStateRunning { return @@ -264,7 +258,6 @@ func (g *Orchestrator) foreachTaskFromNode(ctx context.Context, node *api.Node, } func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []string) { - nodeCompleted := make(map[string]map[string]struct{}) nodeTasks := make(map[string]map[string][]*api.Task) g.store.View(func(tx store.ReadTx) { @@ -275,8 +268,6 @@ func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []strin continue } - // a node may have completed this service - nodeCompleted[serviceID] = make(map[string]struct{}) // nodeID -> task list nodeTasks[serviceID] = make(map[string][]*api.Task) @@ -284,11 +275,6 @@ func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []strin if t.DesiredState <= api.TaskStateRunning { // Collect all running instances of this service nodeTasks[serviceID][t.NodeID] = append(nodeTasks[serviceID][t.NodeID], t) - } else { - // for finished tasks, check restartPolicy - if isTaskCompleted(t, orchestrator.RestartCondition(t)) { - nodeCompleted[serviceID][t.NodeID] = struct{}{} - } } } } @@ -311,9 +297,7 @@ func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []strin ntasks := nodeTasks[serviceID][nodeID] delete(nodeTasks[serviceID], nodeID) - // if restart policy considers this node has finished its task - // it should remove all running tasks - if _, exists := nodeCompleted[serviceID][nodeID]; exists || !meetsConstraints { + if !meetsConstraints { g.shutdownTasks(ctx, batch, ntasks) continue } @@ -400,8 +384,6 @@ func (g *Orchestrator) reconcileServicesOneNode(ctx context.Context, serviceIDs return } - // whether each service has completed on the node - completed := make(map[string]bool) // tasks by service tasks := make(map[string][]*api.Task) @@ -425,10 +407,6 @@ func (g *Orchestrator) reconcileServicesOneNode(ctx context.Context, serviceIDs } if t.DesiredState <= api.TaskStateRunning { tasks[serviceID] = append(tasks[serviceID], t) - } else { - if isTaskCompleted(t, orchestrator.RestartCondition(t)) { - completed[serviceID] = true - } } } } @@ -444,13 +422,6 @@ func (g *Orchestrator) reconcileServicesOneNode(ctx context.Context, serviceIDs continue } - // if restart policy considers this node has finished its task - // it should remove all running tasks - if completed[serviceID] { - g.shutdownTasks(ctx, batch, tasks[serviceID]) - continue - } - if node.Spec.Availability == api.NodeAvailabilityPause { // the node is paused, so we won't add or update tasks continue diff --git a/manager/orchestrator/global/global_test.go b/manager/orchestrator/global/global_test.go index 886ed18157..8d1fe0aa15 100644 --- a/manager/orchestrator/global/global_test.go +++ b/manager/orchestrator/global/global_test.go @@ -89,9 +89,29 @@ var ( }, }, } + + serviceNoRestart = &api.Service{ + ID: "serviceid3", + Spec: api.ServiceSpec{ + Annotations: api.Annotations{ + Name: "norestart", + }, + Task: api.TaskSpec{ + Runtime: &api.TaskSpec_Container{ + Container: &api.ContainerSpec{}, + }, + Restart: &api.RestartPolicy{ + Condition: api.RestartOnNone, + }, + }, + Mode: &api.ServiceSpec_Global{ + Global: &api.GlobalService{}, + }, + }, + } ) -func SetupCluster(t *testing.T, store *store.MemoryStore, watch chan events.Event) *api.Task { +func setup(t *testing.T, store *store.MemoryStore, watch chan events.Event) *Orchestrator { ctx := context.Background() // Start the global orchestrator. global := NewGlobalOrchestrator(store) @@ -107,8 +127,7 @@ func SetupCluster(t *testing.T, store *store.MemoryStore, watch chan events.Even testutils.Expect(t, watch, api.EventCreateNode{}) testutils.Expect(t, watch, state.EventCommit{}) - // return task creation from orchestrator - return testutils.WatchTaskCreate(t, watch) + return global } func TestSetup(t *testing.T) { @@ -119,7 +138,10 @@ func TestSetup(t *testing.T) { watch, cancel := state.Watch(store.WatchQueue() /*state.EventCreateTask{}, state.EventUpdateTask{}*/) defer cancel() - observedTask1 := SetupCluster(t, store, watch) + orchestrator := setup(t, store, watch) + defer orchestrator.Stop() + + observedTask1 := testutils.WatchTaskCreate(t, watch) assert.Equal(t, observedTask1.Status.State, api.TaskStateNew) assert.Equal(t, observedTask1.ServiceAnnotations.Name, "name1") @@ -134,7 +156,10 @@ func TestAddNode(t *testing.T) { watch, cancel := state.Watch(store.WatchQueue()) defer cancel() - SetupCluster(t, store, watch) + orchestrator := setup(t, store, watch) + defer orchestrator.Stop() + + testutils.WatchTaskCreate(t, watch) addNode(t, store, node2) observedTask2 := testutils.WatchTaskCreate(t, watch) @@ -151,7 +176,10 @@ func TestDeleteNode(t *testing.T) { watch, cancel := state.Watch(store.WatchQueue()) defer cancel() - SetupCluster(t, store, watch) + orchestrator := setup(t, store, watch) + defer orchestrator.Stop() + + testutils.WatchTaskCreate(t, watch) deleteNode(t, store, node1) // task should be set to dead @@ -168,7 +196,10 @@ func TestNodeAvailability(t *testing.T) { watch, cancel := state.Watch(store.WatchQueue()) defer cancel() - SetupCluster(t, store, watch) + orchestrator := setup(t, store, watch) + defer orchestrator.Stop() + + testutils.WatchTaskCreate(t, watch) // set node1 to drain updateNodeAvailability(t, store, node1, api.NodeAvailabilityDrain) @@ -195,7 +226,10 @@ func TestAddService(t *testing.T) { watch, cancel := state.Watch(store.WatchQueue()) defer cancel() - SetupCluster(t, store, watch) + orchestrator := setup(t, store, watch) + defer orchestrator.Stop() + + testutils.WatchTaskCreate(t, watch) addService(t, store, service2) observedTask := testutils.WatchTaskCreate(t, watch) @@ -212,7 +246,10 @@ func TestDeleteService(t *testing.T) { watch, cancel := state.Watch(store.WatchQueue()) defer cancel() - SetupCluster(t, store, watch) + orchestrator := setup(t, store, watch) + defer orchestrator.Stop() + + testutils.WatchTaskCreate(t, watch) deleteService(t, store, service1) // task should be deleted @@ -222,6 +259,8 @@ func TestDeleteService(t *testing.T) { } func TestRemoveTask(t *testing.T) { + t.Parallel() + store := store.NewMemoryStore(nil) assert.NotNil(t, store) defer store.Close() @@ -229,25 +268,111 @@ func TestRemoveTask(t *testing.T) { watch, cancel := state.Watch(store.WatchQueue() /*api.EventCreateTask{}, api.EventUpdateTask{}*/) defer cancel() - observedTask1 := SetupCluster(t, store, watch) + orchestrator := setup(t, store, watch) + defer orchestrator.Stop() + + observedTask1 := testutils.WatchTaskCreate(t, watch) + testutils.Expect(t, watch, state.EventCommit{}) assert.Equal(t, observedTask1.Status.State, api.TaskStateNew) assert.Equal(t, observedTask1.ServiceAnnotations.Name, "name1") assert.Equal(t, observedTask1.NodeID, "nodeid1") - // delete the task deleteTask(t, store, observedTask1) + testutils.Expect(t, watch, api.EventDeleteTask{}) + testutils.Expect(t, watch, state.EventCommit{}) + + // the task should not be recreated + select { + case event := <-watch: + t.Fatalf("got unexpected event %T: %+v", event, event) + case <-time.After(100 * time.Millisecond): + } +} + +func TestTaskFailure(t *testing.T) { + t.Parallel() + + store := store.NewMemoryStore(nil) + assert.NotNil(t, store) + defer store.Close() + + watch, cancel := state.Watch(store.WatchQueue() /*api.EventCreateTask{}, api.EventUpdateTask{}*/) + defer cancel() + + // first, try a "restart on any" policy + orchestrator := setup(t, store, watch) + defer orchestrator.Stop() + + observedTask1 := testutils.WatchTaskCreate(t, watch) + + assert.Equal(t, observedTask1.Status.State, api.TaskStateNew) + assert.Equal(t, observedTask1.ServiceAnnotations.Name, "name1") + assert.Equal(t, observedTask1.NodeID, "nodeid1") + + failTask(t, store, observedTask1) + + testutils.WatchShutdownTask(t, watch) // the task should be recreated observedTask2 := testutils.WatchTaskCreate(t, watch) assert.Equal(t, observedTask2.Status.State, api.TaskStateNew) assert.Equal(t, observedTask2.ServiceAnnotations.Name, "name1") assert.Equal(t, observedTask2.NodeID, "nodeid1") + testutils.Expect(t, watch, state.EventCommit{}) + testutils.Expect(t, watch, api.EventUpdateTask{}) // ready->running + testutils.Expect(t, watch, state.EventCommit{}) + + // repeat with service set up not to restart + addService(t, store, serviceNoRestart) + testutils.Expect(t, watch, api.EventCreateService{}) + testutils.Expect(t, watch, state.EventCommit{}) + + observedTask3 := testutils.WatchTaskCreate(t, watch) + assert.Equal(t, observedTask3.Status.State, api.TaskStateNew) + assert.Equal(t, observedTask3.ServiceAnnotations.Name, "norestart") + assert.Equal(t, observedTask3.NodeID, "nodeid1") + testutils.Expect(t, watch, state.EventCommit{}) + + failTask(t, store, observedTask3) + testutils.Expect(t, watch, api.EventUpdateTask{}) + testutils.Expect(t, watch, state.EventCommit{}) + observedTask4 := testutils.WatchTaskUpdate(t, watch) + assert.Equal(t, observedTask4.DesiredState, api.TaskStateShutdown) + testutils.Expect(t, watch, state.EventCommit{}) + + // the task should not be recreated + select { + case event := <-watch: + t.Fatalf("got unexpected event %T: %+v", event, event) + case <-time.After(100 * time.Millisecond): + } + + // update the service. now the task should be recreated. + updateService(t, store, serviceNoRestart) + testutils.Expect(t, watch, api.EventUpdateService{}) + testutils.Expect(t, watch, state.EventCommit{}) + + observedTask5 := testutils.WatchTaskCreate(t, watch) + assert.Equal(t, observedTask5.Status.State, api.TaskStateNew) + assert.Equal(t, observedTask5.ServiceAnnotations.Name, "norestart") + assert.Equal(t, observedTask5.NodeID, "nodeid1") + testutils.Expect(t, watch, state.EventCommit{}) } func addService(t *testing.T, s *store.MemoryStore, service *api.Service) { s.Update(func(tx store.Tx) error { - assert.NoError(t, store.CreateService(tx, service)) + assert.NoError(t, store.CreateService(tx, service.Copy())) + return nil + }) +} + +func updateService(t *testing.T, s *store.MemoryStore, service *api.Service) { + s.Update(func(tx store.Tx) error { + service := store.GetService(tx, service.ID) + require.NotNil(t, service) + service.Spec.Task.ForceUpdate++ + assert.NoError(t, store.UpdateService(tx, service)) return nil }) } @@ -261,7 +386,7 @@ func deleteService(t *testing.T, s *store.MemoryStore, service *api.Service) { func addNode(t *testing.T, s *store.MemoryStore, node *api.Node) { s.Update(func(tx store.Tx) error { - assert.NoError(t, store.CreateNode(tx, node)) + assert.NoError(t, store.CreateNode(tx, node.Copy())) return nil }) } @@ -295,6 +420,16 @@ func deleteTask(t *testing.T, s *store.MemoryStore, task *api.Task) { }) } +func failTask(t *testing.T, s *store.MemoryStore, task *api.Task) { + s.Update(func(tx store.Tx) error { + task := store.GetTask(tx, task.ID) + require.NotNil(t, task) + task.Status.State = api.TaskStateFailed + assert.NoError(t, store.UpdateTask(tx, task)) + return nil + }) +} + func TestInitializationRejectedTasks(t *testing.T) { ctx := context.Background() s := store.NewMemoryStore(nil) diff --git a/manager/orchestrator/testutils/testutils.go b/manager/orchestrator/testutils/testutils.go index f9bb91c0f3..09e9e54551 100644 --- a/manager/orchestrator/testutils/testutils.go +++ b/manager/orchestrator/testutils/testutils.go @@ -88,7 +88,7 @@ func Expect(t *testing.T, watch chan events.Event, specifiers ...api.Event) { } return case <-time.After(time.Second): - assert.FailNow(t, "no commit event") + assert.FailNow(t, "no matching event") } } }