From 1a43a3b612d8c775db8a44c8399844e1f7e4aed2 Mon Sep 17 00:00:00 2001
From: Anshul Pundir
Date: Fri, 22 Jun 2018 14:49:33 -0700
Subject: [PATCH] [manager/orchestrator/reaper] Clean out the task reaper dirty set at the end of tick()

Signed-off-by: Anshul Pundir
---
 .../orchestrator/taskreaper/task_reaper.go    |  23 +-
 .../taskreaper/task_reaper_test.go            | 207 +++++++++++++++++-
 2 files changed, 227 insertions(+), 3 deletions(-)

diff --git a/manager/orchestrator/taskreaper/task_reaper.go b/manager/orchestrator/taskreaper/task_reaper.go
index 3bdd8b4220..9dbb0851e6 100644
--- a/manager/orchestrator/taskreaper/task_reaper.go
+++ b/manager/orchestrator/taskreaper/task_reaper.go
@@ -184,12 +184,23 @@ func (tr *TaskReaper) tick() {
 	}
 
 	// Check history of dirty tasks for cleanup.
+	// Note: Clean out the dirty set at the end of this tick iteration
+	// in all but one scenario (documented below).
+	// When tick() finishes, each slot in the dirty set was either cleaned
+	// up, or it was skipped because it didn't meet the criteria for
+	// cleaning. Either way, the slot can be removed from the dirty set,
+	// because future events on that slot will cause it to be re-added
+	// at that point.
+	//
+	// The only case in which we keep the slot dirty is when there is more
+	// than one running task present for a given slot.
+	// In that case, we need to keep the slot dirty to allow it to be
+	// cleaned when tick() is called next and one or more of the tasks
+	// in that slot have stopped running.
 	tr.store.View(func(tx store.ReadTx) {
 		for dirty := range tr.dirty {
 			service := store.GetService(tx, dirty.ServiceID)
 			if service == nil {
-				// If the service can't be found, assume that it was deleted
-				// and remove the slot from the dirty list.
 				delete(tr.dirty, dirty)
 				continue
 			}
@@ -214,6 +225,7 @@
 
 			// Negative value for TaskHistoryRetentionLimit is an indication to never clean up task history.
 			if taskHistory < 0 {
+				delete(tr.dirty, dirty)
 				continue
 			}
 
@@ -243,6 +255,7 @@
 			}
 
 			if int64(len(historicTasks)) <= taskHistory {
+				delete(tr.dirty, dirty)
 				continue
 			}
 
@@ -273,6 +286,12 @@
 				}
 			}
 
+			// The only case in which we keep the slot dirty at the end of
+			// tick() is when there is more than one running task present
+			// for a given slot.
+			// In that case, we keep the slot dirty to allow it to be
+			// cleaned when tick() is called next and one or more of
+			// the tasks in that slot have stopped running.
 			if runningTasks <= 1 {
 				delete(tr.dirty, dirty)
 			}
diff --git a/manager/orchestrator/taskreaper/task_reaper_test.go b/manager/orchestrator/taskreaper/task_reaper_test.go
index d4bad21d1d..f1fe7c69ef 100644
--- a/manager/orchestrator/taskreaper/task_reaper_test.go
+++ b/manager/orchestrator/taskreaper/task_reaper_test.go
@@ -2,6 +2,7 @@ package taskreaper
 
 import (
 	"context"
+	"github.com/docker/swarmkit/manager/orchestrator"
 	"testing"
@@ -805,7 +806,7 @@ func TestServiceRemoveUnassignedTasks(t *testing.T) {
 	}
 
 	// Create a service with one replica specified before the orchestrator is
-	// started. This should result in two tasks when the orchestrator
+	// started. This should result in one task when the orchestrator
 	// starts up.
 	err := s.Update(func(tx store.Tx) error {
 		assert.NoError(t, store.CreateService(tx, service1))
@@ -871,3 +872,207 @@ func TestServiceRemoveUnassignedTasks(t *testing.T) {
 	assert.NoError(t, err)
 	assert.Len(t, foundTasks, 1)
 }
+
+func setupTaskReaperDirty(tr *TaskReaper) {
+	tr.dirty[orchestrator.SlotTuple{
+		Slot:      1,
+		ServiceID: "id1",
+		NodeID:    "node1",
+	}] = struct{}{}
+	tr.dirty[orchestrator.SlotTuple{
+		Slot:      1,
+		ServiceID: "id2",
+		NodeID:    "node1",
+	}] = struct{}{}
+}
+
+// TestTick unit-tests the task reaper tick function.
+// 1. Test that the dirty set is cleaned up when the service can't be found.
+// 2. Test that the dirty set is cleaned up when the total number of tasks
+//    is smaller than the retention limit.
+// 3. Test that the dirty set and excess tasks in the store are cleaned up
+//    when the total number of tasks is greater than the retention limit.
+func TestTick(t *testing.T) {
+	s := store.NewMemoryStore(nil)
+	assert.NotNil(t, s)
+	defer s.Close()
+
+	assert.NoError(t, s.Update(func(tx store.Tx) error {
+		store.CreateCluster(tx, &api.Cluster{
+			ID: identity.NewID(),
+			Spec: api.ClusterSpec{
+				Annotations: api.Annotations{
+					Name: store.DefaultClusterName,
+				},
+				Orchestration: api.OrchestrationConfig{
+					// Set TaskHistoryRetentionLimit to 1 so that only one
+					// historic task per slot is retained.
+					TaskHistoryRetentionLimit: 1,
+				},
+			},
+		})
+		return nil
+	}))
+
+	// Create the task reaper.
+	taskReaper := New(s)
+
+	// Test # 1
+	// Set up the dirty set with entries to
+	// verify that the dirty set is cleaned up
+	// when the service is not found.
+	setupTaskReaperDirty(taskReaper)
+	// Call tick directly and verify that the dirty set was cleaned up.
+	taskReaper.tick()
+	assert.Zero(t, len(taskReaper.dirty))
+
+	// Test # 2
+	// Verify that the dirty set is cleaned up
+	// when the history limit is set to zero.
+
+	// Create a service in the store for the following test cases.
+	service1 := &api.Service{
+		ID: "id1",
+		Spec: api.ServiceSpec{
+			Annotations: api.Annotations{
+				Name: "name1",
+			},
+			Mode: &api.ServiceSpec_Replicated{
+				Replicated: &api.ReplicatedService{
+					Replicas: 1,
+				},
+			},
+			Task: api.TaskSpec{
+				Restart: &api.RestartPolicy{
+					// Turn off restart to get an accurate count on tasks.
+					Condition: api.RestartOnNone,
+					Delay:     gogotypes.DurationProto(0),
+				},
+			},
+		},
+	}
+
+	// Create another service in the store for the following test cases.
+	service2 := &api.Service{
+		ID: "id2",
+		Spec: api.ServiceSpec{
+			Annotations: api.Annotations{
+				Name: "name2",
+			},
+			Mode: &api.ServiceSpec_Replicated{
+				Replicated: &api.ReplicatedService{
+					Replicas: 1,
+				},
+			},
+			Task: api.TaskSpec{
+				Restart: &api.RestartPolicy{
+					// Turn off restart to get an accurate count on tasks.
+					Condition: api.RestartOnNone,
+					Delay:     gogotypes.DurationProto(0),
+				},
+			},
+		},
+	}
+
+	// Create the services.
+	err := s.Update(func(tx store.Tx) error {
+		assert.NoError(t, store.CreateService(tx, service1))
+		assert.NoError(t, store.CreateService(tx, service2))
+		return nil
+	})
+	assert.NoError(t, err)
+
+	// Set up the dirty set with entries to
+	// verify that the dirty set is cleaned up
+	// when the history limit is set to zero.
+	setupTaskReaperDirty(taskReaper)
+	taskReaper.taskHistory = 0
+	// Call tick directly and verify that the dirty set was cleaned up.
+	taskReaper.tick()
+	assert.Zero(t, len(taskReaper.dirty))
+
+	// Test # 3
+	// Test that the tasks are cleaned up when the total number of tasks
+	// is greater than the retention limit.
+
+	// Create tasks for both services in the store.
+	task1 := &api.Task{
+		ID:           "id1task1",
+		Slot:         1,
+		DesiredState: api.TaskStateShutdown,
+		Status: api.TaskStatus{
+			State: api.TaskStateShutdown,
+		},
+		ServiceID: "id1",
+		ServiceAnnotations: api.Annotations{
+			Name: "name1",
+		},
+	}
+
+	task2 := &api.Task{
+		ID:           "id2task1",
+		Slot:         1,
+		DesiredState: api.TaskStateShutdown,
+		Status: api.TaskStatus{
+			State: api.TaskStateShutdown,
+		},
+		ServiceID: "id2",
+		ServiceAnnotations: api.Annotations{
+			Name: "name2",
+		},
+	}
+
+	// Create the tasks.
+	err = s.Update(func(tx store.Tx) error {
+		assert.NoError(t, store.CreateTask(tx, task1))
+		assert.NoError(t, store.CreateTask(tx, task2))
+		return nil
+	})
+	assert.NoError(t, err)
+
+	// Set history to 1 to ensure that the tasks are not cleaned up yet.
+	// At the same time, we should be able to test that the dirty set was
+	// cleaned up at the end of tick().
+	taskReaper.taskHistory = 1
+	setupTaskReaperDirty(taskReaper)
+	// Call tick directly and verify that the dirty set was cleaned up.
+	taskReaper.tick()
+	assert.Zero(t, len(taskReaper.dirty))
+
+	// Now test that the tick() function cleans up the old tasks from the store.
+
+	// Create new tasks in the store for the same slots to simulate a service update.
+	task1.Status.State = api.TaskStateNew
+	task1.DesiredState = api.TaskStateRunning
+	task1.ID = "id1task2"
+	task2.Status.State = api.TaskStateNew
+	task2.DesiredState = api.TaskStateRunning
+	task2.ID = "id2task2"
+	err = s.Update(func(tx store.Tx) error {
+		assert.NoError(t, store.CreateTask(tx, task1))
+		assert.NoError(t, store.CreateTask(tx, task2))
+		return nil
+	})
+	assert.NoError(t, err)
+
+	watch, cancel := state.Watch(s.WatchQueue() /*api.EventCreateTask{}, api.EventUpdateTask{}*/)
+	defer cancel()
+
+	// Set up the task reaper dirty set.
+	setupTaskReaperDirty(taskReaper)
+	// Call tick directly and verify that the dirty set was cleaned up.
+	taskReaper.tick()
+	assert.Zero(t, len(taskReaper.dirty))
+	// The task reaper should delete the tasks previously marked for SHUTDOWN.
+	deletedTask1 := testutils.WatchTaskDelete(t, watch)
+	assert.Equal(t, api.TaskStateShutdown, deletedTask1.Status.State)
+	assert.Equal(t, api.TaskStateShutdown, deletedTask1.DesiredState)
+	assert.True(t, deletedTask1.ServiceAnnotations.Name == "name1" ||
+		deletedTask1.ServiceAnnotations.Name == "name2")
+
+	deletedTask2 := testutils.WatchTaskDelete(t, watch)
+	assert.Equal(t, api.TaskStateShutdown, deletedTask2.Status.State)
+	assert.Equal(t, api.TaskStateShutdown, deletedTask2.DesiredState)
+	assert.True(t, deletedTask2.ServiceAnnotations.Name == "name1" ||
+		deletedTask2.ServiceAnnotations.Name == "name2")
+}
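
Note: the comments added to tick() describe the dirty-set retention rule in prose. The short standalone program below is only an illustrative sketch of that rule, not swarmkit code: the slotTuple and task types and the sweepDirty helper are hypothetical stand-ins. It shows the decision made at the end of a tick: a slot is dropped from the dirty set unless it still has more than one running task, in which case it stays dirty so a later tick can clean it once the extra tasks stop.

// Minimal sketch of the dirty-set sweep performed at the end of tick().
// The types below are simplified stand-ins, not the swarmkit API.
package main

import "fmt"

type slotTuple struct {
	Slot      uint64
	ServiceID string
	NodeID    string
}

type task struct {
	ID      string
	Running bool
}

// sweepDirty drops every slot from the dirty set unless more than one of
// its tasks is still running; such slots stay dirty so the next sweep can
// clean them once the extra tasks have stopped.
func sweepDirty(dirty map[slotTuple]struct{}, tasksBySlot map[slotTuple][]task) {
	for slot := range dirty {
		running := 0
		for _, t := range tasksBySlot[slot] {
			if t.Running {
				running++
			}
		}
		if running <= 1 {
			delete(dirty, slot)
		}
	}
}

func main() {
	slotA := slotTuple{Slot: 1, ServiceID: "id1", NodeID: "node1"}
	slotB := slotTuple{Slot: 1, ServiceID: "id2", NodeID: "node1"}

	dirty := map[slotTuple]struct{}{slotA: {}, slotB: {}}
	tasksBySlot := map[slotTuple][]task{
		// slotA has a single running task: it is removed from the dirty set.
		slotA: {{ID: "a1", Running: true}},
		// slotB still has two running tasks: it stays dirty for the next sweep.
		slotB: {{ID: "b1", Running: true}, {ID: "b2", Running: true}},
	}

	sweepDirty(dirty, tasksBySlot)
	fmt.Println(len(dirty)) // prints 1: only slotB remains dirty
}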