10 changes: 6 additions & 4 deletions manager/orchestrator/taskreaper/task_reaper.go

@@ -96,10 +96,10 @@ func (tr *TaskReaper) Run(ctx context.Context) {
// Serviceless tasks can be cleaned up right away since they are not attached to a service.
tr.cleanup = append(tr.cleanup, t.ID)
}
-// tasks with desired state REMOVE that have progressed beyond SHUTDOWN can be cleaned up
+// tasks with desired state REMOVE that have progressed beyond COMPLETE can be cleaned up
// right away
for _, t := range removeTasks {
-if t.Status.State >= api.TaskStateShutdown {
+if t.Status.State >= api.TaskStateCompleted {
Member

nit: Why is this not > api.TaskStateRunning?

That's pretty much the norm throughout the codebase for checking whether a Task is running or not:

$ ack '> api.TaskStateRunning'
cmd/swarmctl/task/print.go:48:		if !all && t.DesiredState > api.TaskStateRunning {
cmd/swarmctl/task/inspect.go:40:	if t.Status.State > api.TaskStateRunning {
manager/dispatcher/dispatcher_test.go:589:		return s > api.TaskStateRunning
manager/dispatcher/assignments.go:229:			if t.Status.State > api.TaskStateRunning {
manager/scheduler/nodeinfo.go:111:		if t.DesiredState <= api.TaskStateRunning && oldTask.DesiredState > api.TaskStateRunning {
manager/scheduler/nodeinfo.go:116:		} else if t.DesiredState > api.TaskStateRunning && oldTask.DesiredState <= api.TaskStateRunning {
manager/scheduler/scheduler.go:73:		if t.Status.State < api.TaskStatePending || t.Status.State > api.TaskStateRunning {
manager/scheduler/scheduler.go:208:	if t.Status.State < api.TaskStatePending || t.Status.State > api.TaskStateRunning {
manager/scheduler/scheduler.go:245:	if t.Status.State > api.TaskStateRunning {
manager/allocator/network.go:627:		if t.Status.State > api.TaskStateRunning {
manager/allocator/network.go:794:	if t.Status.State > api.TaskStateRunning || isDelete {
manager/orchestrator/replicated/tasks.go:54:						if t.DesiredState > api.TaskStateRunning {
manager/orchestrator/replicated/tasks.go:95:			if t.DesiredState > api.TaskStateRunning {
manager/orchestrator/replicated/tasks.go:121:	if t.DesiredState > api.TaskStateRunning {
manager/orchestrator/replicated/tasks.go:142:	if t.Status.State > api.TaskStateRunning ||
manager/orchestrator/replicated/tasks.go:153:	if t.DesiredState > api.TaskStateRunning {
manager/orchestrator/replicated/tasks.go:175:	if t.Status.State > api.TaskStateRunning ||
manager/orchestrator/replicated/update_test.go:78:				} else if task.DesiredState > api.TaskStateRunning {
manager/orchestrator/constraintenforcer/constraint_enforcer.go:104:		if t.DesiredState < api.TaskStateAssigned || t.DesiredState > api.TaskStateRunning {
manager/orchestrator/constraintenforcer/constraint_enforcer.go:150:					if t == nil || t.DesiredState > api.TaskStateRunning {
manager/orchestrator/taskinit/init.go:62:			if t.DesiredState != api.TaskStateReady || t.Status.State > api.TaskStateRunning {
manager/orchestrator/update/updater.go:496:		if original.DesiredState > api.TaskStateRunning {
manager/orchestrator/update/updater.go:504:			if t.DesiredState > api.TaskStateRunning {
manager/orchestrator/update/updater_test.go:386:			} else if task.DesiredState > api.TaskStateRunning {
manager/orchestrator/restart/restart.go:86:		if t.DesiredState > api.TaskStateRunning {
manager/orchestrator/restart/restart.go:124:	if t.DesiredState > api.TaskStateRunning {
manager/orchestrator/restart/restart.go:173:	if (n != nil && n.Status.State == api.NodeStatus_DOWN) || t.Status.State > api.TaskStateRunning {
manager/orchestrator/global/global.go:181:	if t.DesiredState > api.TaskStateRunning {
manager/orchestrator/global/global.go:196:	if t.Status.State > api.TaskStateRunning {
manager/orchestrator/global/global.go:208:	if t.DesiredState > api.TaskStateRunning {
manager/orchestrator/global/global.go:213:	if t.Status.State > api.TaskStateRunning {
manager/orchestrator/global/global.go:496:				if t == nil || t.DesiredState > api.TaskStateRunning {
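
(For context: in swarmkit's task-state ordering, COMPLETE is the first state after RUNNING, so t.Status.State >= api.TaskStateCompleted accepts exactly the same states as t.Status.State > api.TaskStateRunning. A minimal runnable sketch of that equivalence, using stand-in constants rather than the real proto values:)

package main

import "fmt"

// TaskState mirrors only the ordering of swarmkit's enum; the real
// constants live in github.com/docker/swarmkit/api and have different
// numeric values.
type TaskState int

const (
	TaskStateRunning TaskState = iota // NEW..STARTING precede this in the real enum
	TaskStateCompleted                // first state after RUNNING
	TaskStateShutdown
	TaskStateFailed
	TaskStateRejected
	TaskStateRemove
	TaskStateOrphaned
)

func main() {
	// The two comparisons agree for every state at or beyond RUNNING.
	for s := TaskStateRunning; s <= TaskStateOrphaned; s++ {
		fmt.Printf("state=%d  >=Completed=%t  >Running=%t\n",
			s, s >= TaskStateCompleted, s > TaskStateRunning)
	}
}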

tr.cleanup = append(tr.cleanup, t.ID)
}
}
@@ -138,10 +138,10 @@ func (tr *TaskReaper) Run(ctx context.Context) {
if t.Status.State >= api.TaskStateOrphaned && t.ServiceID == "" {
tr.cleanup = append(tr.cleanup, t.ID)
}
-// add tasks that have progressed beyond SHUTDOWN and have desired state REMOVE. These
+// add tasks that have progressed beyond COMPLETE and have desired state REMOVE. These
// tasks are associated with slots that were removed as part of a service scale down
// or service removal.
-if t.DesiredState == api.TaskStateRemove && t.Status.State >= api.TaskStateShutdown {
+if t.DesiredState == api.TaskStateRemove && t.Status.State >= api.TaskStateCompleted {
Contributor


What about the tasks that reached the completed state before we removed/downsized the service?

Collaborator Author


They'll be handled by the regular task retention logic, won't they?

Contributor (@anshulpundir, Dec 19, 2017)


I realized that the reaper will still see update events for those tasks when the desired state is updated to REMOVE.
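
(For context: the retention logic mentioned above keeps the newest TaskHistoryRetentionLimit tasks per slot. A simplified sketch of the idea, not the actual taskreaper implementation; the helper name and shapes here are illustrative:)

// retainHistory assumes slotTasks is sorted newest-first and returns the
// IDs of tasks that fall outside the retention window and are no longer
// meant to run. State constants come from github.com/docker/swarmkit/api.
func retainHistory(slotTasks []*api.Task, taskHistory int) (cleanup []string) {
	for i, t := range slotTasks {
		if i < taskHistory {
			continue // within the retention window, keep it
		}
		if t.DesiredState <= api.TaskStateRunning {
			continue // still meant to run; retention never deletes these
		}
		cleanup = append(cleanup, t.ID)
	}
	return cleanup
}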

tr.cleanup = append(tr.cleanup, t.ID)
}
case api.EventUpdateCluster:
@@ -282,6 +282,8 @@ func (tr *TaskReaper) tick() {

// Stop stops the TaskReaper and waits for the main loop to exit.
func (tr *TaskReaper) Stop() {
+// TODO(dperny) calling stop on the task reaper twice will cause a panic
Contributor


This might be an easy fix:

tr.stopOnce.Do(func() {
	close(tr.stopChan)
})
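
(Filled out, the suggestion amounts to something like the sketch below. The stopOnce field is hypothetical; it does not exist in the current TaskReaper struct and would need to be added alongside the existing channels.)

package taskreaper

import "sync"

type TaskReaper struct {
	stopChan chan struct{}
	doneChan chan struct{}
	stopOnce sync.Once // hypothetical field, not in the current struct
	// other fields elided
}

// Stop becomes safe to call any number of times: stopChan is closed
// exactly once, and later calls only wait for the main loop to exit.
func (tr *TaskReaper) Stop() {
	tr.stopOnce.Do(func() {
		close(tr.stopChan)
	})
	<-tr.doneChan
}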

Collaborator Author


Yeah, I know the easy fix; I just didn't want to "fix" it in an unrelated PR.

+// because we try to close a channel that will already have been closed.
close(tr.stopChan)
<-tr.doneChan
}
165 changes: 165 additions & 0 deletions manager/orchestrator/taskreaper/task_reaper_test.go

@@ -0,0 +1,165 @@
package taskreaper

import (
"context"

"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/manager/state/store"
)

// TestTaskReaperInit tests that the task reaper correctly cleans up tasks when
Contributor

This bug was for the service removal case. We should update that test to catch the bug.


@anshulpundir does #2476 address that? I mean, that PR should only go green when this PR is merged?

Contributor


I replied to @dperny in a separate message. This fix is correct, and I realized that as I was adding the unit test in #2476. @marcusmartins

// it is initialized. This will happen every time cluster leadership changes.
func TestTaskReaperInit(t *testing.T) {
Contributor

Actually, can you please update the other two tests (TestTaskStateRemoveOnServiceRemoval, TestTaskStateRemoveOnScaledown) to test for TaskStateCompleted? @dperny

// start up the memory store
ctx := context.Background()
s := store.NewMemoryStore(nil)
require.NotNil(t, s)
defer s.Close()

// Create the basic cluster with precooked tasks we need for the taskreaper
cluster := &api.Cluster{
Spec: api.ClusterSpec{
Annotations: api.Annotations{
Name: store.DefaultClusterName,
},
Orchestration: api.OrchestrationConfig{
TaskHistoryRetentionLimit: 2,
},
},
}

// this service is alive and active, has no tasks to clean up
service := &api.Service{
ID: "cleanservice",
Spec: api.ServiceSpec{
Annotations: api.Annotations{
Name: "cleanservice",
},
Task: api.TaskSpec{
// the runtime spec isn't looked at and doesn't really need to
// be filled in
Runtime: &api.TaskSpec_Container{
Container: &api.ContainerSpec{},
},
},
Mode: &api.ServiceSpec_Replicated{
Replicated: &api.ReplicatedService{
Replicas: 2,
},
},
},
}

// Two clean tasks, these should not be removed
cleantask1 := &api.Task{
ID: "cleantask1",
Slot: 1,
DesiredState: api.TaskStateRunning,
Status: api.TaskStatus{
State: api.TaskStateRunning,
},
ServiceID: "cleanservice",
}

cleantask2 := &api.Task{
ID: "cleantask2",
Slot: 2,
DesiredState: api.TaskStateRunning,
Status: api.TaskStatus{
State: api.TaskStateRunning,
},
ServiceID: "cleanservice",
}

// this is an old task from when an earlier task failed. It should not be
// removed because it's retained history
retainedtask := &api.Task{
ID: "retainedtask",
Slot: 1,
DesiredState: api.TaskStateShutdown,
Status: api.TaskStatus{
State: api.TaskStateFailed,
},
ServiceID: "cleanservice",
}

// This is a removed task after cleanservice was scaled down
removedtask := &api.Task{
ID: "removedtask",
Slot: 3,
DesiredState: api.TaskStateRemove,
Status: api.TaskStatus{
State: api.TaskStateShutdown,
},
ServiceID: "cleanservice",
}

// two tasks belonging to a service that does not exist.
// this first one is still running and should not be cleaned up
terminaltask1 := &api.Task{
ID: "terminaltask1",
Slot: 1,
DesiredState: api.TaskStateRemove,
Status: api.TaskStatus{
State: api.TaskStateRunning,
},
ServiceID: "goneservice",
}

// this second task is shutdown, and can be cleaned up
terminaltask2 := &api.Task{
ID: "terminaltask2",
Slot: 2,
DesiredState: api.TaskStateRemove,
Status: api.TaskStatus{
// use COMPLETE because it's the earliest terminal state
State: api.TaskStateCompleted,
},
ServiceID: "goneservice",
}

err := s.Update(func(tx store.Tx) error {
require.NoError(t, store.CreateCluster(tx, cluster))
require.NoError(t, store.CreateService(tx, service))
require.NoError(t, store.CreateTask(tx, cleantask1))
require.NoError(t, store.CreateTask(tx, cleantask2))
require.NoError(t, store.CreateTask(tx, retainedtask))
require.NoError(t, store.CreateTask(tx, removedtask))
require.NoError(t, store.CreateTask(tx, terminaltask1))
require.NoError(t, store.CreateTask(tx, terminaltask2))
return nil
})
require.NoError(t, err, "Error setting up test fixtures")

// set up the task reaper we'll use for this test
reaper := New(s)

// Now, start the reaper
go reaper.Run(ctx)

// And then stop the reaper. This will cause the reaper to run through its
// whole init phase and then immediately enter the loop body, get the stop
// signal, and exit. Plus, it will block until that loop body has been
// reached and the reaper is stopped.
reaper.Stop()

// Now check that all of the tasks are in the state we expect
s.View(func(tx store.ReadTx) {
// the first two clean tasks should exist
assert.NotNil(t, store.GetTask(tx, "cleantask1"))
assert.NotNil(t, store.GetTask(tx, "cleantask2"))
// the retained task should still exist
assert.NotNil(t, store.GetTask(tx, "retainedtask"))
// the removed task should be gone
assert.Nil(t, store.GetTask(tx, "removedtask"))
// the first terminal task, which has not yet shut down, should exist
assert.NotNil(t, store.GetTask(tx, "terminaltask1"))
// and the second terminal task should have been removed
assert.Nil(t, store.GetTask(tx, "terminaltask2"))
})
}
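
(Assuming a standard Go toolchain, the new test can be run on its own from the repo root with something like:)

go test -run TestTaskReaperInit ./manager/orchestrator/taskreaper/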