diff --git a/manager/orchestrator/taskreaper/task_reaper.go b/manager/orchestrator/taskreaper/task_reaper.go
index 9dbb0851e6..e3c2b8265d 100644
--- a/manager/orchestrator/taskreaper/task_reaper.go
+++ b/manager/orchestrator/taskreaper/task_reaper.go
@@ -40,6 +40,12 @@ type TaskReaper struct {
 	cleanup  []string
 	stopChan chan struct{}
 	doneChan chan struct{}
+
+	// tickSignal is a channel that, if non-nil and available, will be written
+	// to in order to signal that a tick has occurred. its sole purpose is for
+	// testing code, to verify that task cleanup attempts are happening when
+	// they should be.
+	tickSignal chan struct{}
 }
 
 // New creates a new TaskReaper.
@@ -115,7 +121,34 @@ func (tr *TaskReaper) Run(ctx context.Context) {
 
 	// Clean up when we hit TaskHistoryRetentionLimit or when the timer expires,
 	// whichever happens first.
+	//
+	// Specifically, the way this should work:
+	// - Create a timer and immediately stop it. We don't want to fire the
+	//   cleanup routine yet, because we just did a cleanup as part of the
+	//   initialization above.
+	// - Launch into an event loop
+	// - When we receive an event, handle the event as needed
+	// - After receiving the event:
+	//   - If the batch size threshold (maxDirty) is exceeded by dirty +
+	//     cleanup, then immediately launch into the cleanup routine
+	//   - Otherwise, if the timer is stopped, start it (reset).
+	// - If the timer expires and the timer channel is signaled, then Stop the
+	//   timer (so that it will be ready to be started again as needed), and
+	//   execute the cleanup routine (tick)
 	timer := time.NewTimer(reaperBatchingInterval)
+	timer.Stop()
+
+	// If Stop is somehow called AFTER the timer has expired, there will be a
+	// value in the timer.C channel. If there is such a value, we should drain
+	// it out. This select statement allows us to drain that value if it's
+	// present, or continue straight through otherwise.
+	select {
+	case <-timer.C:
+	default:
+	}
+
+	// keep track with a boolean of whether the timer is currently stopped
+	isTimerStopped := true
 
 	// Watch for:
 	// 1. EventCreateTask for cleaning slots, which is the best time to cleanup that node/slot.
@@ -153,16 +186,35 @@ func (tr *TaskReaper) Run(ctx context.Context) {
 			}
 
 			if len(tr.dirty)+len(tr.cleanup) > maxDirty {
+				// stop the timer, so we don't fire it. if we get another event
+				// after we do this cleaning, we will reset the timer then
 				timer.Stop()
+				// if the timer had fired, drain out the value.
+				select {
+				case <-timer.C:
+				default:
+				}
+				isTimerStopped = true
 				tr.tick()
 			} else {
-				timer.Reset(reaperBatchingInterval)
+				if isTimerStopped {
+					timer.Reset(reaperBatchingInterval)
+					isTimerStopped = false
+				}
 			}
 		case <-timer.C:
-			timer.Stop()
+			// we can safely skip draining the timer channel here, because
+			// receiving from it is what woke us: the timer has expired.
+			isTimerStopped = true
 			tr.tick()
 		case <-tr.stopChan:
+			// even though this doesn't really matter in this context, it's
+			// good hygiene to drain the value.
			timer.Stop()
+			select {
+			case <-timer.C:
+			default:
+			}
 			return
 		}
 	}
@@ -170,6 +222,16 @@ func (tr *TaskReaper) Run(ctx context.Context) {
 
 // tick performs task history cleanup.
 func (tr *TaskReaper) tick() {
+	// this signals that a tick has occurred. it exists solely for testing.
+	if tr.tickSignal != nil {
+		// try writing to this channel, but if it's full, fall straight
+		// through and ignore it.
+		select {
+		case tr.tickSignal <- struct{}{}:
+		default:
+		}
+	}
+
 	if len(tr.dirty) == 0 && len(tr.cleanup) == 0 {
 		return
 	}
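As an aside, the stop/drain/reset discipline the loop above relies on can be shown in isolation: never Reset a timer that might still be running, and drain timer.C whenever a stale value could be buffered. The following is a minimal, hypothetical sketch of that same pattern, not code from this patch; the events channel, the interval, and the loop bound are stand-ins for illustration.

package main

import (
	"fmt"
	"time"
)

func main() {
	const batchingInterval = 250 * time.Millisecond

	// a stand-in for the reaper's watch channel
	events := make(chan string, 16)
	go func() {
		events <- "task updated"
		events <- "task updated"
	}()

	// create the timer and immediately stop it: nothing is batched yet, so
	// there is nothing to flush when it would fire.
	timer := time.NewTimer(batchingInterval)
	timer.Stop()
	// if the timer somehow fired before Stop returned, a value is buffered
	// in timer.C; drain it so a stale tick is never observed later.
	select {
	case <-timer.C:
	default:
	}
	// time.Timer exposes no "is it running?" query, so track it manually.
	isTimerStopped := true

	for i := 0; i < 3; i++ {
		select {
		case ev := <-events:
			fmt.Println("buffered:", ev)
			// only start the timer if it isn't already running; resetting a
			// live timer would push the flush deadline out on every event.
			if isTimerStopped {
				timer.Reset(batchingInterval)
				isTimerStopped = false
			}
		case <-timer.C:
			// receiving from timer.C consumed the value, so no drain is
			// needed before the next Reset.
			isTimerStopped = true
			fmt.Println("tick: flush the batch")
		}
	}
}

The manual isTimerStopped flag is what lets the loop honor the documented contract of Timer.Reset, which is only safe on a timer that is known to be stopped and drained.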
diff --git a/manager/orchestrator/taskreaper/task_reaper_test.go b/manager/orchestrator/taskreaper/task_reaper_test.go
index f1fe7c69ef..db59248747 100644
--- a/manager/orchestrator/taskreaper/task_reaper_test.go
+++ b/manager/orchestrator/taskreaper/task_reaper_test.go
@@ -2,6 +2,9 @@ package taskreaper
 
 import (
 	"context"
+	"fmt"
+	"time"
+
 	"github.com/docker/swarmkit/manager/orchestrator"
 
 	"testing"
@@ -751,6 +754,198 @@ func TestServiceRemoveDeadTasks(t *testing.T) {
 	assert.Len(t, foundTasks, 0)
 }
 
+// TestTaskReaperBatching tests that the batching logic for the task reaper
+// runs correctly.
+func TestTaskReaperBatching(t *testing.T) {
+	// create a canned context and store to use with this task reaper
+	ctx := context.Background()
+	s := store.NewMemoryStore(nil)
+	assert.NotNil(t, s)
+	defer s.Close()
+
+	var (
+		task1, task2, task3 *api.Task
+		tasks               []*api.Task
+	)
+
+	// set up all of the test fixtures
+	assert.NoError(t, s.Update(func(tx store.Tx) error {
+		// we need a cluster object, because we need to set the retention
+		// limit to a low value
+		assert.NoError(t, store.CreateCluster(tx, &api.Cluster{
+			ID: identity.NewID(),
+			Spec: api.ClusterSpec{
+				Annotations: api.Annotations{
+					Name: store.DefaultClusterName,
+				},
+				Orchestration: api.OrchestrationConfig{
+					TaskHistoryRetentionLimit: 1,
+				},
+			},
+		}))
+
+		task1 = &api.Task{
+			ID:           "foo",
+			ServiceID:    "bar",
+			Slot:         0,
+			DesiredState: api.TaskStateShutdown,
+			Status: api.TaskStatus{
+				State: api.TaskStateShutdown,
+			},
+		}
+		// we need to create all of the tasks used in this test, because we'll
+		// be using task update events to trigger reaper behavior.
+		assert.NoError(t, store.CreateTask(tx, task1))
+
+		task2 = &api.Task{
+			ID:           "foo2",
+			ServiceID:    "bar",
+			Slot:         1,
+			DesiredState: api.TaskStateShutdown,
+			Status: api.TaskStatus{
+				State: api.TaskStateShutdown,
+			},
+		}
+		assert.NoError(t, store.CreateTask(tx, task2))
+
+		tasks = make([]*api.Task, maxDirty+1)
+		for i := 0; i < maxDirty+1; i++ {
+			tasks[i] = &api.Task{
+				ID:        fmt.Sprintf("baz%v", i),
+				ServiceID: "bar",
+				// every task in a different slot, so they don't get cleaned
+				// up based on exceeding the retention limit
+				Slot:         uint64(i),
+				DesiredState: api.TaskStateShutdown,
+				Status: api.TaskStatus{
+					State: api.TaskStateShutdown,
+				},
+			}
+			if err := store.CreateTask(tx, tasks[i]); err != nil {
+				return err
+			}
+		}
+
+		task3 = &api.Task{
+			ID:           "foo3",
+			ServiceID:    "bar",
+			Slot:         2,
+			DesiredState: api.TaskStateShutdown,
+			Status: api.TaskStatus{
+				State: api.TaskStateShutdown,
+			},
+		}
+		assert.NoError(t, store.CreateTask(tx, task3))
+		return nil
+	}))
+
+	// now create the task reaper
+	taskReaper := New(s)
+	taskReaper.tickSignal = make(chan struct{}, 1)
+	defer taskReaper.Stop()
+	go taskReaper.Run(ctx)
+
+	// None of the tasks we've created are eligible for deletion. We should
+	// see no task delete events. Wait for a tick signal, or for twice the
+	// batching interval to pass, to verify that no tick will occur.
+	select {
+	case <-taskReaper.tickSignal:
+		t.Fatalf("the taskreaper ticked when it should not have")
+	case <-time.After(reaperBatchingInterval * 2):
+		// ok, looks good, moving on
+	}
+
+	// update task1 to die
+	assert.NoError(t, s.Update(func(tx store.Tx) error {
+		task1.DesiredState = api.TaskStateRemove
+		return store.UpdateTask(tx, task1)
+	}))
+
+	// the task should be added to the cleanup map and a tick should occur
+	// shortly. give it an extra 50ms for overhead
+	select {
+	case <-taskReaper.tickSignal:
+	case <-time.After(reaperBatchingInterval + (50 * time.Millisecond)):
+		t.Fatalf("the taskreaper should have ticked but did not")
+	}
+
+	// now wait and make sure the task reaper does not tick again
+	select {
+	case <-taskReaper.tickSignal:
+		t.Fatalf("the taskreaper should not have ticked but did")
+	case <-time.After(reaperBatchingInterval * 2):
+	}
+
+	// now make sure we'll tick again if we update another task to die
+	assert.NoError(t, s.Update(func(tx store.Tx) error {
+		task2.DesiredState = api.TaskStateRemove
+		return store.UpdateTask(tx, task2)
+	}))
+
+	select {
+	case <-taskReaper.tickSignal:
+	case <-time.After(reaperBatchingInterval + (50 * time.Millisecond)):
+		t.Fatalf("the taskreaper should have ticked by now but did not")
+	}
+
+	// again, now wait and make sure the task reaper does not tick again
+	select {
+	case <-taskReaper.tickSignal:
+		t.Fatalf("the taskreaper should not have ticked but did")
+	case <-time.After(reaperBatchingInterval * 2):
+	}
+
+	// now create a large batch of tasks. this should trigger a tick
+	// immediately, with no waiting: we should easily be able to process all
+	// of these events within the batching interval, and should expect
+	// exactly 1 tick immediately after, and no more
+	assert.NoError(t, s.Update(func(tx store.Tx) error {
+		for _, task := range tasks {
+			task.DesiredState = api.TaskStateRemove
+			assert.NoError(t, store.UpdateTask(tx, task))
+		}
+		return nil
+	}))
+
+	select {
+	case <-taskReaper.tickSignal:
+	case <-time.After(reaperBatchingInterval):
+		// tight bound on how long it should take to tick. we should tick
+		// before the reaper batching interval. this should only POSSIBLY fail
+		// on a really slow system, where processing the 1000+ incoming events
+		// takes longer than the reaperBatchingInterval. if this test flakes
+		// here, that's probably why.
+		t.Fatalf("we should have immediately ticked already, but did not")
+	}
+
+	// again again, wait and make sure the task reaper does not tick again
+	select {
+	case <-taskReaper.tickSignal:
+		t.Fatalf("the taskreaper should not have ticked but did")
+	case <-time.After(reaperBatchingInterval * 2):
+	}
+
+	// now, before we wrap up, make sure the task reaper still works off the
+	// timer
+	assert.NoError(t, s.Update(func(tx store.Tx) error {
+		task3.DesiredState = api.TaskStateRemove
+		return store.UpdateTask(tx, task3)
+	}))
+
+	select {
+	case <-taskReaper.tickSignal:
+	case <-time.After(reaperBatchingInterval + (50 * time.Millisecond)):
+		t.Fatalf("the taskreaper should have ticked by now but did not")
+	}
+
+	// again, now wait and make sure the task reaper does not tick again
+	select {
+	case <-taskReaper.tickSignal:
+		t.Fatalf("the taskreaper should not have ticked but did")
+	case <-time.After(reaperBatchingInterval * 2):
+	}
+}
+
 // TestServiceRemoveDeadTasks tests removal of
 // tasks in state < TaskStateAssigned.
 func TestServiceRemoveUnassignedTasks(t *testing.T) {
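A closing note on the test hook itself: the capacity-1 tickSignal channel plus the non-blocking send in tick() means the reaper never blocks when no test is listening, while a listening test observes at most one pending signal at a time. Below is a minimal, self-contained sketch of both halves of that handshake; signalTick, the goroutine, and the durations are illustrative stand-ins, not code from this patch.

package main

import (
	"fmt"
	"time"
)

func main() {
	// capacity 1: the producer can record "a tick happened" without
	// blocking, and back-to-back ticks collapse into one pending signal.
	tickSignal := make(chan struct{}, 1)

	// producer side, mirroring tick(): try to send, but if the buffer is
	// full (or nobody ever reads), fall straight through.
	signalTick := func() {
		select {
		case tickSignal <- struct{}{}:
		default:
		}
	}

	go func() {
		time.Sleep(50 * time.Millisecond)
		signalTick()
		signalTick() // a second signal before any read collapses into the first
	}()

	// consumer side, mirroring the test: either a signal arrives before the
	// deadline, or the timeout case fires and the test would t.Fatalf.
	select {
	case <-tickSignal:
		fmt.Println("tick observed")
	case <-time.After(500 * time.Millisecond):
		fmt.Println("no tick before the deadline")
	}

	// and the inverse assertion: verify that no further tick arrives.
	select {
	case <-tickSignal:
		fmt.Println("unexpected extra tick")
	case <-time.After(100 * time.Millisecond):
		fmt.Println("no extra tick, as expected")
	}
}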