Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions manager/orchestrator/taskreaper/task_reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,12 +184,23 @@ func (tr *TaskReaper) tick() {
}

// Check history of dirty tasks for cleanup.
// Note: Clean out the dirty set at the end of this tick iteration
// in all but one scenario (documented below).
// When tick() finishes, the tasks in the slot were either cleaned up,
// or cleanup was skipped because they didn't meet the criteria for cleaning.
// Either way, we can discard the dirty set because future events on
// that slot will cause the task to be re-added to the dirty set
// at that point.
//
// The only case in which we keep the slot dirty is when more
// than one running task is present for a given slot.
// In that case, we need to keep the slot dirty so that it can be
// cleaned when tick() is called next and one or more of the tasks
// in that slot have stopped running.
tr.store.View(func(tx store.ReadTx) {
for dirty := range tr.dirty {
service := store.GetService(tx, dirty.ServiceID)
if service == nil {
// If the service can't be found, assume that it was deleted
// and remove the slot from the dirty list.
delete(tr.dirty, dirty)
continue
}
Expand All @@ -214,6 +225,7 @@ func (tr *TaskReaper) tick() {

// Negative value for TaskHistoryRetentionLimit is an indication to never clean up task history.
if taskHistory < 0 {
delete(tr.dirty, dirty)
continue
}

Expand Down Expand Up @@ -243,6 +255,7 @@ func (tr *TaskReaper) tick() {
}

if int64(len(historicTasks)) <= taskHistory {
delete(tr.dirty, dirty)
continue
}

Expand Down Expand Up @@ -273,6 +286,12 @@ func (tr *TaskReaper) tick() {
}
}

// The only case in which we keep the slot dirty at the end of tick()
// is when more than one running task is present
// for a given slot.
// In that case, we keep the slot dirty so that it can be
// cleaned when tick() is called next and one or more of
// the tasks in that slot have stopped running.
if runningTasks <= 1 {
delete(tr.dirty, dirty)
}
Expand Down
207 changes: 206 additions & 1 deletion manager/orchestrator/taskreaper/task_reaper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package taskreaper

import (
"context"
"github.com/docker/swarmkit/manager/orchestrator"

"testing"

Expand Down Expand Up @@ -805,7 +806,7 @@ func TestServiceRemoveUnassignedTasks(t *testing.T) {
}

// Create a service with one replica specified before the orchestrator is
// started. This should result in two tasks when the orchestrator
// started. This should result in one task when the orchestrator
// starts up.
err := s.Update(func(tx store.Tx) error {
assert.NoError(t, store.CreateService(tx, service1))
Expand Down Expand Up @@ -871,3 +872,207 @@ func TestServiceRemoveUnassignedTasks(t *testing.T) {
assert.NoError(t, err)
assert.Len(t, foundTasks, 1)
}

// setupTaskReaperDirty seeds the reaper's dirty set with slot 1 of each
// of the two test services ("id1" and "id2"), both placed on "node1".
func setupTaskReaperDirty(tr *TaskReaper) {
	for _, serviceID := range []string{"id1", "id2"} {
		tr.dirty[orchestrator.SlotTuple{
			Slot:      1,
			ServiceID: serviceID,
			NodeID:    "node1",
		}] = struct{}{}
	}
}

// TestTick unit-tests the task reaper tick function.
// 1. Test that the dirty set is cleaned up when the service can't be found.
// 2. Test that the dirty set is cleaned up when the number of total tasks
// is smaller than the retention limit.
// 3. Test that the dirty set and excess tasks in the store are cleaned up
// when the number of total tasks is greater than the retention limit.
func TestTick(t *testing.T) {
	s := store.NewMemoryStore(nil)
	assert.NotNil(t, s)
	defer s.Close()

	assert.NoError(t, s.Update(func(tx store.Tx) error {
		store.CreateCluster(tx, &api.Cluster{
			ID: identity.NewID(),
			Spec: api.ClusterSpec{
				Annotations: api.Annotations{
					Name: store.DefaultClusterName,
				},
				Orchestration: api.OrchestrationConfig{
					// The value stored here is not read by tick() in these
					// tests; each test case below overrides
					// taskReaper.taskHistory directly.
					TaskHistoryRetentionLimit: 1,
				},
			},
		})
		return nil
	}))

	// create the task reaper.
	taskReaper := New(s)

	// Test # 1
	// Setup the dirty set with entries to
	// verify that the dirty set is cleaned up
	// when the service is not found.
	setupTaskReaperDirty(taskReaper)
	// call tick directly and verify dirty set was cleaned up.
	taskReaper.tick()
	assert.Zero(t, len(taskReaper.dirty))

	// Test # 2
	// Verify that the dirty set is cleaned up
	// when the history limit is set to zero.

	// Create a service in the store for the following test cases.
	service1 := &api.Service{
		ID: "id1",
		Spec: api.ServiceSpec{
			Annotations: api.Annotations{
				Name: "name1",
			},
			Mode: &api.ServiceSpec_Replicated{
				Replicated: &api.ReplicatedService{
					Replicas: 1,
				},
			},
			Task: api.TaskSpec{
				Restart: &api.RestartPolicy{
					// Turn off restart to get an accurate count on tasks.
					Condition: api.RestartOnNone,
					Delay:     gogotypes.DurationProto(0),
				},
			},
		},
	}

	// Create another service in the store for the following test cases.
	service2 := &api.Service{
		ID: "id2",
		Spec: api.ServiceSpec{
			Annotations: api.Annotations{
				Name: "name2",
			},
			Mode: &api.ServiceSpec_Replicated{
				Replicated: &api.ReplicatedService{
					Replicas: 1,
				},
			},
			Task: api.TaskSpec{
				Restart: &api.RestartPolicy{
					// Turn off restart to get an accurate count on tasks.
					Condition: api.RestartOnNone,
					Delay:     gogotypes.DurationProto(0),
				},
			},
		},
	}

	// Create the services.
	err := s.Update(func(tx store.Tx) error {
		assert.NoError(t, store.CreateService(tx, service1))
		assert.NoError(t, store.CreateService(tx, service2))
		return nil
	})
	assert.NoError(t, err)

	// Setup the dirty set with entries to
	// verify that the dirty set is cleaned up
	// when the history limit is set to zero.
	setupTaskReaperDirty(taskReaper)
	taskReaper.taskHistory = 0
	// call tick directly and verify dirty set was cleaned up.
	taskReaper.tick()
	assert.Zero(t, len(taskReaper.dirty))

	// Test # 3
	// Test that the tasks are cleaned up when the total number of tasks
	// is greater than the retention limit.

	// Create tasks for both services in the store.
	task1 := &api.Task{
		ID:           "id1task1",
		Slot:         1,
		DesiredState: api.TaskStateShutdown,
		Status: api.TaskStatus{
			State: api.TaskStateShutdown,
		},
		ServiceID: "id1",
		ServiceAnnotations: api.Annotations{
			Name: "name1",
		},
	}

	task2 := &api.Task{
		ID:           "id2task1",
		Slot:         1,
		DesiredState: api.TaskStateShutdown,
		Status: api.TaskStatus{
			State: api.TaskStateShutdown,
		},
		ServiceID: "id2",
		ServiceAnnotations: api.Annotations{
			Name: "name2",
		},
	}

	// Create Tasks.
	err = s.Update(func(tx store.Tx) error {
		assert.NoError(t, store.CreateTask(tx, task1))
		assert.NoError(t, store.CreateTask(tx, task2))
		return nil
	})
	assert.NoError(t, err)

	// Set history to 1 to ensure that the tasks are not cleaned up yet.
	// At the same time, we should be able to test that the dirty set was
	// cleaned up at the end of tick().
	taskReaper.taskHistory = 1
	setupTaskReaperDirty(taskReaper)
	// call tick directly and verify dirty set was cleaned up.
	taskReaper.tick()
	assert.Zero(t, len(taskReaper.dirty))

	// Now test that the tick() function cleans up the old tasks from the store.

	// Create new tasks in the store for the same slots to simulate service update.
	task1.Status.State = api.TaskStateNew
	task1.DesiredState = api.TaskStateRunning
	task1.ID = "id1task2"
	task2.Status.State = api.TaskStateNew
	task2.DesiredState = api.TaskStateRunning
	task2.ID = "id2task2"
	err = s.Update(func(tx store.Tx) error {
		assert.NoError(t, store.CreateTask(tx, task1))
		assert.NoError(t, store.CreateTask(tx, task2))
		return nil
	})
	assert.NoError(t, err)

	watch, cancel := state.Watch(s.WatchQueue() /*api.EventCreateTask{}, api.EventUpdateTask{}*/)
	defer cancel()

	// Setup the task reaper dirty set.
	setupTaskReaperDirty(taskReaper)
	// Call tick directly and verify dirty set was cleaned up.
	taskReaper.tick()
	assert.Zero(t, len(taskReaper.dirty))
	// Task reaper should delete the tasks previously marked for SHUTDOWN.
	deletedTask1 := testutils.WatchTaskDelete(t, watch)
	assert.Equal(t, api.TaskStateShutdown, deletedTask1.Status.State)
	assert.Equal(t, api.TaskStateShutdown, deletedTask1.DesiredState)
	assert.True(t, deletedTask1.ServiceAnnotations.Name == "name1" ||
		deletedTask1.ServiceAnnotations.Name == "name2")

	deletedTask2 := testutils.WatchTaskDelete(t, watch)
	assert.Equal(t, api.TaskStateShutdown, deletedTask2.Status.State)
	assert.Equal(t, api.TaskStateShutdown, deletedTask2.DesiredState)
	// BUGFIX: the original asserted on deletedTask1 again here, leaving
	// deletedTask2 unverified. Check the second deletion, and also require
	// that the two deletions come from different services.
	assert.True(t, deletedTask2.ServiceAnnotations.Name == "name1" ||
		deletedTask2.ServiceAnnotations.Name == "name2")
	assert.NotEqual(t, deletedTask1.ServiceAnnotations.Name,
		deletedTask2.ServiceAnnotations.Name)
}