66 changes: 64 additions & 2 deletions manager/orchestrator/taskreaper/task_reaper.go
@@ -40,6 +40,12 @@ type TaskReaper struct {
cleanup []string
stopChan chan struct{}
doneChan chan struct{}

// tickSignal is a channel that, if non-nil and available, will be written
// to in order to signal that a tick has occurred. Its sole purpose is for
// testing code, to verify that task cleanup attempts are happening when
// they should be.
tickSignal chan struct{}
Contributor:
Can we instead use the side effects caused by tick() to test?

Collaborator Author (@dperny), Jun 20, 2018:

Not cleanly, as far as I could tell, unless you have a better idea?

Additionally, it would make the test fragile to changes in the implementation of tick. Right now, the test depends only on the implementation of the code under test.

Contributor:

Maybe. Can we just check to see if the tasks were actually cleaned up, to verify that tick() was called?

Alternatively, we could just add a counter to count the number of times tick() was executed. The benefit of this is that we could additionally use that counter to expose stats.

Collaborator Author (@dperny):

Bringing in a conversation @anshulpundir and I had in meatspace:

I've chosen to add this code for the purpose of determining, affirmatively or negatively, whether tick has or has not been called. Relying on side effects can tell us that tick has been called, but cannot prove definitively that it has not been called.
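As an editorial aside, a minimal sketch of the counter alternative proposed above, with hypothetical names (reaper and tickCount are illustrative; this is not what the PR merged, which is the tickSignal channel):

package main

import (
    "fmt"
    "sync/atomic"
)

// reaper is a stand-in for TaskReaper (illustrative only, not the real type).
type reaper struct {
    // tickCount records how many times tick has run; a test could assert on
    // it, and it could double as an exposed stat.
    tickCount uint64
}

func (r *reaper) tick() {
    atomic.AddUint64(&r.tickCount, 1)
    // ... cleanup work would go here ...
}

func main() {
    r := &reaper{}
    r.tick()
    r.tick()
    fmt.Println(atomic.LoadUint64(&r.tickCount)) // prints 2
}

A test could then assert that the counter is unchanged after a timeout to argue that tick did not run, which is the same kind of negative check the channel approach supports.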

}

// New creates a new TaskReaper.
@@ -115,7 +121,34 @@ func (tr *TaskReaper) Run(ctx context.Context) {

// Clean up when we hit TaskHistoryRetentionLimit or when the timer expires,
// whichever happens first.
//
// Specifically, the way this should work:
// - Create a timer and immediately stop it. We don't want to fire the
// cleanup routine yet, because we just did a cleanup as part of the
// initialization above.
// - Launch into an event loop
// - When we receive an event, handle the event as needed
// - After receiving the event:
// - If minimum batch size (maxDirty) is exceeded with dirty + cleanup,
// then immediately launch into the cleanup routine
// - Otherwise, if the timer is stopped, start it (reset).
// - If the timer expires and the timer channel is signaled, then Stop the
// timer (so that it will be ready to be started again as needed), and
// execute the cleanup routine (tick)
timer := time.NewTimer(reaperBatchingInterval)
timer.Stop()

// If stop is somehow called AFTER the timer has expired, there will be a
// value in the timer.C channel. If there is such a value, we should drain
// it out. This select statement allows us to drain that value if it's
// present, or continue straight through otherwise.
select {
case <-timer.C:
default:
}

// keep track of whether the timer is currently stopped using a boolean
isTimerStopped := true

// Watch for:
// 1. EventCreateTask for cleaning slots, which is the best time to cleanup that node/slot.
@@ -153,23 +186,52 @@ func (tr *TaskReaper) Run(ctx context.Context) {
}

if len(tr.dirty)+len(tr.cleanup) > maxDirty {
// stop the timer, so we don't fire it. if we get another event
// after we do this cleaning, we will reset the timer then
timer.Stop()
// if the timer had fired, drain out the value.
select {
Contributor:

This is used in a few places. Can we move this to a function called drainTimer() ?

Collaborator Author (@dperny):

I don't think it's worthwhile to move this to a function. It's essentially just an if statement for a channel read, sort of like:

if HasValue(channel) {
    <-channel
}

But you can't write such an if statement for a channel, so you do this select with an empty default.
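For concreteness, a sketch of the drainTimer helper the reviewer is asking about (hypothetical name, not part of the PR):

package main

import "time"

// drainTimer removes a pending value from the timer's channel, if any,
// without blocking. Calling it after Stop() leaves the timer clean to Reset.
func drainTimer(t *time.Timer) {
    select {
    case <-t.C:
    default:
    }
}

func main() {
    t := time.NewTimer(time.Millisecond)
    time.Sleep(5 * time.Millisecond) // let the timer fire
    t.Stop()
    drainTimer(t) // consume the value left in t.C
    t.Reset(time.Second)
}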

Collaborator Author (@dperny):

Alternatively, you can do:

if !timer.Stop() {
    <-timer.C
}

which is how the docs drain a timer, but for some reason doing it that way was deadlocking, so I did it the easy way instead of trying to figure out what was weird.
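An editorial note on a plausible cause of that deadlock (an assumption; the thread never pins it down): Stop() returns false whenever the timer has already fired, even if the value in timer.C was already consumed elsewhere, such as by the loop's own case <-timer.C branch, so the documented idiom then blocks on an empty channel:

package main

import "time"

func main() {
    t := time.NewTimer(time.Millisecond)
    <-t.C // the timer fired and its value was consumed, e.g. by a select case

    // Stop returns false because the timer already fired, but t.C is now
    // empty, so the documented drain blocks forever.
    if !t.Stop() {
        <-t.C // deadlock: no value will ever arrive
    }
}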

Contributor (@anshulpundir), Jun 25, 2018:

I see your point. The second option is more readable than the first one.

My point was that it's 4 lines of code that's repeated at least 3x, which could be replaced by a single function call. But I'll let you decide.

case <-timer.C:
default:
}
isTimerStopped = true
tr.tick()
} else {
// removed by this diff: timer.Reset(reaperBatchingInterval)
if isTimerStopped {
Contributor:

Does it matter here if you always call Reset() without checking whether the timer was stopped?

Collaborator Author (@dperny):

You've got the added and removed lines confused.

Contributor:

No :). I'm asking why not always call Reset() without checking isTimerStopped. At the same time, I'm also questioning whether we need the bool isTimerStopped at all, and whether we can do without it. @dperny

Contributor (@cyli), Jun 26, 2018:

Non-blocking: this code's logic seems fine, but looking at some of our vendored code, like gRPC (see clientconn.go), it seems like the pattern is to create a timer at the beginning of the loop. If the timer fires, set the timer to nil. If the timer is not nil at the end of the loop, stop it and set it to nil; then there's no need to drain timer.C. This does increase GC load, but it is easier to read and would leave slightly less repeated timer-draining code.

I am fine with this as is, though.
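A rough sketch of the pattern described here, with illustrative names (events, handle, and tick are placeholders, not the actual reaper API):

package reaper

import "time"

const batchingInterval = 250 * time.Millisecond

func run(events <-chan int, stop <-chan struct{}, handle func(int), tick func()) {
    for {
        // a fresh timer each iteration: more garbage, but nothing to drain
        timer := time.NewTimer(batchingInterval)
        select {
        case ev := <-events:
            handle(ev)
        case <-timer.C:
            timer = nil // fired: nothing left to stop or drain
            tick()
        case <-stop:
            if timer != nil {
                timer.Stop()
            }
            return
        }
        if timer != nil {
            timer.Stop() // any stray value stays in the abandoned channel and is GC'd
            timer = nil
        }
    }
}

Worth noting: as the next comment observes for an unconditional Reset, re-creating the timer on every event also restarts the batching window, so a steady event stream could postpone tick indefinitely.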

Contributor:

@anshulpundir If I'm understanding the intent correctly, we want to clean up whenever maxDirty is reached or the timer ticks, whichever is sooner. I think if we always called Reset without checking isTimerStopped, the timer may never tick and we will just wait for maxDirty all the time (since calling Reset would mean that the timer would be reset to start waiting another reaperBatchingInterval every time an event comes in without hitting maxDirty).

timer.Reset(reaperBatchingInterval)
isTimerStopped = false
}
}
case <-timer.C:
timer.Stop()
// we can safely skip draining the timer channel, because we already
// know that the timer has expired.
isTimerStopped = true
tr.tick()
case <-tr.stopChan:
// even though this doesn't really matter in this context, it's
// good hygiene to drain the value.
timer.Stop()
select {
case <-timer.C:
Contributor:

Move this to a function.

default:
}
return
}
}
}

// tick performs task history cleanup.
func (tr *TaskReaper) tick() {
// this signals that a tick has occurred. it exists solely for testing.
if tr.tickSignal != nil {
// try writing to this channel, but if it's full, fall straight through
// and ignore it.
select {
case tr.tickSignal <- struct{}{}:
default:
}
}

if len(tr.dirty) == 0 && len(tr.cleanup) == 0 {
return
}
195 changes: 195 additions & 0 deletions manager/orchestrator/taskreaper/task_reaper_test.go
@@ -2,6 +2,9 @@ package taskreaper

import (
"context"
"fmt"
"time"

"github.com/docker/swarmkit/manager/orchestrator"

"testing"
@@ -751,6 +754,198 @@ func TestServiceRemoveDeadTasks(t *testing.T) {
assert.Len(t, foundTasks, 0)
}

// TestTaskReaperBatching tests that the batching logic for the task reaper
// runs correctly.
func TestTaskReaperBatching(t *testing.T) {
// create a canned context and store to use with this task reaper
ctx := context.Background()
s := store.NewMemoryStore(nil)
assert.NotNil(t, s)
defer s.Close()

var (
task1, task2, task3 *api.Task
tasks []*api.Task
)

// set up all of the test fixtures
assert.NoError(t, s.Update(func(tx store.Tx) error {
// we need a cluster object, because we need to set the retention limit
// to a low value
assert.NoError(t, store.CreateCluster(tx, &api.Cluster{
ID: identity.NewID(),
Spec: api.ClusterSpec{
Annotations: api.Annotations{
Name: store.DefaultClusterName,
},
Orchestration: api.OrchestrationConfig{
TaskHistoryRetentionLimit: 1,
},
},
}))

task1 = &api.Task{
ID: "foo",
ServiceID: "bar",
Slot: 0,
DesiredState: api.TaskStateShutdown,
Status: api.TaskStatus{
State: api.TaskStateShutdown,
},
}
// we need to create all of the tasks used in this test, because we'll
// be using task update events to trigger reaper behavior.
assert.NoError(t, store.CreateTask(tx, task1))

task2 = &api.Task{
ID: "foo2",
ServiceID: "bar",
Slot: 1,
DesiredState: api.TaskStateShutdown,
Status: api.TaskStatus{
State: api.TaskStateShutdown,
},
}
assert.NoError(t, store.CreateTask(tx, task2))

tasks = make([]*api.Task, maxDirty+1)
for i := 0; i < maxDirty+1; i++ {
tasks[i] = &api.Task{
ID: fmt.Sprintf("baz%v", i),
ServiceID: "bar",
// every task in a different slot, so they don't get cleaned up
// based on exceeding the retention limit
Slot: uint64(i),
DesiredState: api.TaskStateShutdown,
Status: api.TaskStatus{
State: api.TaskStateShutdown,
},
}
if err := store.CreateTask(tx, tasks[i]); err != nil {
return err
}
}

task3 = &api.Task{
ID: "foo3",
ServiceID: "bar",
Slot: 2,
DesiredState: api.TaskStateShutdown,
Status: api.TaskStatus{
State: api.TaskStateShutdown,
},
}
assert.NoError(t, store.CreateTask(tx, task3))
return nil
}))

// now create the task reaper
taskReaper := New(s)
taskReaper.tickSignal = make(chan struct{}, 1)
defer taskReaper.Stop()
go taskReaper.Run(ctx)

// None of the tasks we've created are eligible for deletion. We should
// see no task delete events. Wait for a tick signal, or 500ms to pass, to
// verify that no tick will occur.
select {
case <-taskReaper.tickSignal:
t.Fatalf("the taskreaper ticked when it should not have")
case <-time.After(reaperBatchingInterval * 2):
// ok, looks good, moving on
}

// update task1 to die
assert.NoError(t, s.Update(func(tx store.Tx) error {
task1.DesiredState = api.TaskStateRemove
return store.UpdateTask(tx, task1)
}))

// the task should be added to the cleanup map and a tick should occur
// shortly. give it an extra 50ms for overhead
select {
case <-taskReaper.tickSignal:
case <-time.After(reaperBatchingInterval + (50 * time.Millisecond)):
t.Fatalf("the taskreaper should have ticked but did not")
}

// now wait and make sure the task reaper does not tick again
select {
case <-taskReaper.tickSignal:
t.Fatalf("the taskreaper should not have ticked but did")
case <-time.After(reaperBatchingInterval * 2):
}

// now make sure we'll tick again if we update another task to die
assert.NoError(t, s.Update(func(tx store.Tx) error {
task2.DesiredState = api.TaskStateRemove
return store.UpdateTask(tx, task2)
}))

select {
case <-taskReaper.tickSignal:
case <-time.After(reaperBatchingInterval + (50 * time.Millisecond)):
t.Fatalf("the taskreaper should have ticked by now but did not")
}

// again, now wait and make sure the task reaper does not tick again
select {
case <-taskReaper.tickSignal:
t.Fatalf("the taskreaper should not have ticked but did")
case <-time.After(reaperBatchingInterval * 2):
}

// now create a shitload of tasks. this should tick immediately after, no
// waiting. we should easily be able to process all of these events within
// the batching interval, and should expect 1 tick immediately after and
// no more
assert.NoError(t, s.Update(func(tx store.Tx) error {
for _, task := range tasks {
task.DesiredState = api.TaskStateRemove
assert.NoError(t, store.UpdateTask(tx, task))
}
return nil
}))

select {
case <-taskReaper.tickSignal:
case <-time.After(reaperBatchingInterval):
// tight bound on how long it should take to tick. we should tick
// before the reaper batching interval. this should only POSSIBLY fail
// on a really slow system, where processing the 1000+ incoming events
// takes longer than the reaperBatchingInterval. if this test flakes
// here, that's probably why.
t.Fatalf("we should have immediately ticked already, but did not")
}

// again again, wait and make sure the task reaper does not tick again
select {
case <-taskReaper.tickSignal:
t.Fatalf("the taskreaper should not have ticked but did")
case <-time.After(reaperBatchingInterval * 2):
}

// now before we wrap up, make sure the task reaper still works off the
// timer
assert.NoError(t, s.Update(func(tx store.Tx) error {
task3.DesiredState = api.TaskStateRemove
return store.UpdateTask(tx, task3)
}))

select {
case <-taskReaper.tickSignal:
case <-time.After(reaperBatchingInterval + (50 * time.Millisecond)):
t.Fatalf("the taskreaper should have ticked by now but did not")
}

// again, now wait and make sure the task reaper does not tick again
select {
case <-taskReaper.tickSignal:
t.Fatalf("the taskreaper should not have ticked but did")
case <-time.After(reaperBatchingInterval * 2):
}
}

// TestServiceRemoveUnassignedTasks tests removal of
// tasks in state < TaskStateAssigned.
func TestServiceRemoveUnassignedTasks(t *testing.T) {