diff --git a/manager/orchestrator/replicated/task_reaper_test.go b/manager/orchestrator/replicated/task_reaper_test.go index f92831a582..6df4655e9e 100644 --- a/manager/orchestrator/replicated/task_reaper_test.go +++ b/manager/orchestrator/replicated/task_reaper_test.go @@ -493,11 +493,9 @@ func TestServiceRemoveDeadTasks(t *testing.T) { // Set both task states to RUNNING. updatedTask1 := observedTask1.Copy() - updatedTask1.DesiredState = api.TaskStateRunning updatedTask1.Status.State = api.TaskStateRunning updatedTask1.ServiceAnnotations = api.Annotations{Name: "original"} updatedTask2 := observedTask2.Copy() - updatedTask2.DesiredState = api.TaskStateRunning updatedTask2.Status.State = api.TaskStateRunning updatedTask2.ServiceAnnotations = api.Annotations{Name: "original"} err = s.Update(func(tx store.Tx) error { @@ -569,3 +567,125 @@ func TestServiceRemoveDeadTasks(t *testing.T) { assert.NoError(t, err) assert.Len(t, foundTasks, 0) } + +// TestServiceRemoveUnassignedTasks tests removal of +// tasks in state < TaskStateAssigned. +func TestServiceRemoveUnassignedTasks(t *testing.T) { + ctx := context.Background() + s := store.NewMemoryStore(nil) + assert.NotNil(t, s) + defer s.Close() + + assert.NoError(t, s.Update(func(tx store.Tx) error { + store.CreateCluster(tx, &api.Cluster{ + ID: identity.NewID(), + Spec: api.ClusterSpec{ + Annotations: api.Annotations{ + Name: store.DefaultClusterName, + }, + Orchestration: api.OrchestrationConfig{ + // set TaskHistoryRetentionLimit to 1 so that only a single + // historic task is retained per task slot. 
+ TaskHistoryRetentionLimit: 1, + }, + }, + }) + return nil + })) + + taskReaper := taskreaper.New(s) + defer taskReaper.Stop() + orchestrator := NewReplicatedOrchestrator(s) + defer orchestrator.Stop() + + watch, cancel := state.Watch(s.WatchQueue() /*api.EventCreateTask{}, api.EventUpdateTask{}*/) + defer cancel() + + service1 := &api.Service{ + ID: "id1", + Spec: api.ServiceSpec{ + Annotations: api.Annotations{ + Name: "name1", + }, + Mode: &api.ServiceSpec_Replicated{ + Replicated: &api.ReplicatedService{ + Replicas: 1, + }, + }, + Task: api.TaskSpec{ + Restart: &api.RestartPolicy{ + // Turn off restart to get an accurate count on tasks. + Condition: api.RestartOnNone, + Delay: gogotypes.DurationProto(0), + }, + }, + }, + } + + // Create a service with one replica specified before the orchestrator is + // started. This should result in one task when the orchestrator + // starts up. + err := s.Update(func(tx store.Tx) error { + assert.NoError(t, store.CreateService(tx, service1)) + return nil + }) + assert.NoError(t, err) + + // Start the orchestrator. + go func() { + assert.NoError(t, orchestrator.Run(ctx)) + }() + go taskReaper.Run(ctx) + + observedTask1 := testutils.WatchTaskCreate(t, watch) + assert.Equal(t, api.TaskStateNew, observedTask1.Status.State) + assert.Equal(t, observedTask1.ServiceAnnotations.Name, "name1") + + // Set the task state to PENDING to simulate allocation. + updatedTask1 := observedTask1.Copy() + updatedTask1.Status.State = api.TaskStatePending + updatedTask1.ServiceAnnotations = api.Annotations{Name: "original"} + err = s.Update(func(tx store.Tx) error { + assert.NoError(t, store.UpdateTask(tx, updatedTask1)) + return nil + }) + require.NoError(t, err) + + testutils.Expect(t, watch, state.EventCommit{}) + testutils.Expect(t, watch, api.EventUpdateTask{}) + testutils.Expect(t, watch, state.EventCommit{}) + + service1.Spec.Task.ForceUpdate++ + // This should shutdown the previous task and create a new one. 
+ err = s.Update(func(tx store.Tx) error { + assert.NoError(t, store.UpdateService(tx, service1)) + return nil + }) + testutils.Expect(t, watch, api.EventUpdateService{}) + testutils.Expect(t, watch, state.EventCommit{}) + + // New task should be created and old task marked for SHUTDOWN. + observedTask1 = testutils.WatchTaskCreate(t, watch) + assert.Equal(t, api.TaskStateNew, observedTask1.Status.State) + assert.Equal(t, observedTask1.ServiceAnnotations.Name, "name1") + + observedTask3 := testutils.WatchTaskUpdate(t, watch) + assert.Equal(t, api.TaskStateShutdown, observedTask3.DesiredState) + assert.Equal(t, "original", observedTask3.ServiceAnnotations.Name) + + testutils.Expect(t, watch, state.EventCommit{}) + + // Task reaper should delete the task previously marked for SHUTDOWN. + deletedTask1 := testutils.WatchTaskDelete(t, watch) + assert.Equal(t, api.TaskStatePending, deletedTask1.Status.State) + assert.Equal(t, "original", deletedTask1.ServiceAnnotations.Name) + + testutils.Expect(t, watch, state.EventCommit{}) + + var foundTasks []*api.Task + s.View(func(tx store.ReadTx) { + foundTasks, err = store.FindTasks(tx, store.All) + }) + assert.NoError(t, err) + assert.Len(t, foundTasks, 1) +} diff --git a/manager/orchestrator/taskreaper/task_reaper.go b/manager/orchestrator/taskreaper/task_reaper.go index 88e353b383..0c4e818475 100644 --- a/manager/orchestrator/taskreaper/task_reaper.go +++ b/manager/orchestrator/taskreaper/task_reaper.go @@ -251,7 +251,12 @@ func (tr *TaskReaper) tick() { runningTasks := 0 for _, t := range historicTasks { - if t.DesiredState <= api.TaskStateRunning || t.Status.State <= api.TaskStateRunning { + // Skip tasks which are desired to be running but the current state + // is less than or equal to running. 
+ // This check is important to ignore tasks which are running or need to be running, + // but to delete tasks which are either past running, + // or have not reached running but need to be shutdown (because of a service update, for example). + if t.DesiredState == api.TaskStateRunning && t.Status.State <= api.TaskStateRunning { // Don't delete running tasks runningTasks++ continue diff --git a/manager/scheduler/scheduler.go b/manager/scheduler/scheduler.go index 6d5b4e551b..9ee0b9e5c9 100644 --- a/manager/scheduler/scheduler.go +++ b/manager/scheduler/scheduler.go @@ -74,6 +74,12 @@ func (s *Scheduler) setupTasksList(tx store.ReadTx) error { continue } + // Also ignore tasks that have not yet been assigned but desired state is beyond TaskStateRunning + // This can happen if you update, delete or scale down a service before its tasks were assigned. + if t.Status.State == api.TaskStatePending && t.DesiredState > api.TaskStateRunning { + continue + } + s.allTasks[t.ID] = t if t.NodeID == "" { s.enqueue(t) diff --git a/manager/scheduler/scheduler_test.go b/manager/scheduler/scheduler_test.go index 5705e1dcdb..2126e70397 100644 --- a/manager/scheduler/scheduler_test.go +++ b/manager/scheduler/scheduler_test.go @@ -2277,6 +2277,94 @@ func TestPreassignedTasks(t *testing.T) { assert.Equal(t, assignment3.NodeID, "node2") } +func TestIgnoreTasks(t *testing.T) { + ctx := context.Background() + initialNodeSet := []*api.Node{ + { + ID: "node1", + Spec: api.NodeSpec{ + Annotations: api.Annotations{ + Name: "name1", + }, + }, + Status: api.NodeStatus{ + State: api.NodeStatus_READY, + }, + }, + } + + // Tasks with desired state running, shutdown, remove. 
+ initialTaskSet := []*api.Task{ + { + ID: "task1", + DesiredState: api.TaskStateRunning, + ServiceAnnotations: api.Annotations{ + Name: "name1", + }, + + Status: api.TaskStatus{ + State: api.TaskStatePending, + }, + }, + { + ID: "task2", + DesiredState: api.TaskStateShutdown, + ServiceAnnotations: api.Annotations{ + Name: "name2", + }, + Status: api.TaskStatus{ + State: api.TaskStatePending, + }, + NodeID: initialNodeSet[0].ID, + }, + { + ID: "task3", + DesiredState: api.TaskStateRemove, + ServiceAnnotations: api.Annotations{ + Name: "name2", + }, + Status: api.TaskStatus{ + State: api.TaskStatePending, + }, + NodeID: initialNodeSet[0].ID, + }, + } + + s := store.NewMemoryStore(nil) + assert.NotNil(t, s) + defer s.Close() + + err := s.Update(func(tx store.Tx) error { + // Prepopulate nodes + for _, n := range initialNodeSet { + assert.NoError(t, store.CreateNode(tx, n)) + } + + // Prepopulate tasks + for _, task := range initialTaskSet { + assert.NoError(t, store.CreateTask(tx, task)) + } + return nil + }) + assert.NoError(t, err) + + scheduler := New(s) + + watch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateTask{}) + defer cancel() + + go func() { + assert.NoError(t, scheduler.Run(ctx)) + }() + + // task1 is the only task that gets assigned since the other two tasks + // are ignored by the scheduler. + // Normally task2/task3 should get assigned first since they are preassigned tasks. + assignment3 := watchAssignment(t, watch) + assert.Equal(t, assignment3.ID, "task1") + assert.Equal(t, assignment3.NodeID, "node1") +} + func watchAssignmentFailure(t *testing.T, watch chan events.Event) *api.Task { for { select {