Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 153 additions & 0 deletions manager/orchestrator/replicated/task_reaper_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package replicated

import (
"github.com/stretchr/testify/require"
"testing"

"github.com/docker/swarmkit/api"
Expand Down Expand Up @@ -412,3 +413,155 @@ func TestTaskStateRemoveOnServiceRemoval(t *testing.T) {
assert.NoError(t, err)
assert.Len(t, foundTasks, 0)
}

// TestServiceRemoveDeadTasks tests removal of dead tasks
// (old shutdown tasks) on service remove.
func TestServiceRemoveDeadTasks(t *testing.T) {
ctx := context.Background()
s := store.NewMemoryStore(nil)
assert.NotNil(t, s)
defer s.Close()

assert.NoError(t, s.Update(func(tx store.Tx) error {
store.CreateCluster(tx, &api.Cluster{
ID: identity.NewID(),
Spec: api.ClusterSpec{
Annotations: api.Annotations{
Name: store.DefaultClusterName,
},
Orchestration: api.OrchestrationConfig{
// set TaskHistoryRetentionLimit to a negative value, so
// that it is not considered in this test
TaskHistoryRetentionLimit: -1,
},
},
})
return nil
}))

taskReaper := taskreaper.New(s)
defer taskReaper.Stop()
orchestrator := NewReplicatedOrchestrator(s)
defer orchestrator.Stop()

watch, cancel := state.Watch(s.WatchQueue() /*api.EventCreateTask{}, api.EventUpdateTask{}*/)
defer cancel()

service1 := &api.Service{
ID: "id1",
Spec: api.ServiceSpec{
Annotations: api.Annotations{
Name: "name1",
},
Mode: &api.ServiceSpec_Replicated{
Replicated: &api.ReplicatedService{
Replicas: 2,
},
},
Task: api.TaskSpec{
Restart: &api.RestartPolicy{
// Turn off restart to get an accurate count on tasks.
Condition: api.RestartOnNone,
Delay: gogotypes.DurationProto(0),
},
},
},
}

// Create a service with two instances specified before the orchestrator is
// started. This should result in two tasks when the orchestrator
// starts up.
err := s.Update(func(tx store.Tx) error {
assert.NoError(t, store.CreateService(tx, service1))
return nil
})
assert.NoError(t, err)

// Start the orchestrator and the reaper.
go func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
go taskReaper.Run(ctx)

observedTask1 := testutils.WatchTaskCreate(t, watch)
assert.Equal(t, api.TaskStateNew, observedTask1.Status.State)
assert.Equal(t, observedTask1.ServiceAnnotations.Name, "name1")

observedTask2 := testutils.WatchTaskCreate(t, watch)
assert.Equal(t, api.TaskStateNew, observedTask2.Status.State)
assert.Equal(t, observedTask2.ServiceAnnotations.Name, "name1")

// Set both task states to RUNNING.
updatedTask1 := observedTask1.Copy()
updatedTask1.Status.State = api.TaskStateRunning
updatedTask1.ServiceAnnotations = api.Annotations{Name: "original"}
updatedTask2 := observedTask2.Copy()
updatedTask2.Status.State = api.TaskStateRunning
updatedTask2.ServiceAnnotations = api.Annotations{Name: "original"}
err = s.Update(func(tx store.Tx) error {
assert.NoError(t, store.UpdateTask(tx, updatedTask1))
assert.NoError(t, store.UpdateTask(tx, updatedTask2))
return nil
})
require.NoError(t, err)

testutils.Expect(t, watch, state.EventCommit{})
testutils.Expect(t, watch, api.EventUpdateTask{})
testutils.Expect(t, watch, api.EventUpdateTask{})
testutils.Expect(t, watch, state.EventCommit{})

// Set both tasks to COMPLETED.
updatedTask3 := observedTask1.Copy()
updatedTask3.Status.State = api.TaskStateCompleted
updatedTask3.ServiceAnnotations = api.Annotations{Name: "original"}
updatedTask4 := observedTask2.Copy()
updatedTask4.Status.State = api.TaskStateCompleted
updatedTask4.ServiceAnnotations = api.Annotations{Name: "original"}
err = s.Update(func(tx store.Tx) error {
assert.NoError(t, store.UpdateTask(tx, updatedTask3))
assert.NoError(t, store.UpdateTask(tx, updatedTask4))
return nil
})
require.NoError(t, err)

// Verify state is set to COMPLETED
observedTask3 := testutils.WatchTaskUpdate(t, watch)
assert.Equal(t, api.TaskStateCompleted, observedTask3.Status.State)
assert.Equal(t, "original", observedTask3.ServiceAnnotations.Name)
observedTask4 := testutils.WatchTaskUpdate(t, watch)
assert.Equal(t, api.TaskStateCompleted, observedTask4.Status.State)
assert.Equal(t, "original", observedTask4.ServiceAnnotations.Name)

// Delete the service.
err = s.Update(func(tx store.Tx) error {
assert.NoError(t, store.DeleteService(tx, service1.ID))
return nil
})

// Service delete should trigger both the task desired statuses
// to be set to REMOVE.
observedTask3 = testutils.WatchTaskUpdate(t, watch)
assert.Equal(t, api.TaskStateRemove, observedTask3.DesiredState)
assert.Equal(t, "original", observedTask3.ServiceAnnotations.Name)
observedTask4 = testutils.WatchTaskUpdate(t, watch)
assert.Equal(t, api.TaskStateRemove, observedTask4.DesiredState)
assert.Equal(t, "original", observedTask4.ServiceAnnotations.Name)

testutils.Expect(t, watch, state.EventCommit{})

// Task reaper should see the event updates for desired state update
// to REMOVE and should deleted by the reaper.
deletedTask1 := testutils.WatchTaskDelete(t, watch)
assert.Equal(t, api.TaskStateCompleted, deletedTask1.Status.State)
assert.Equal(t, "original", deletedTask1.ServiceAnnotations.Name)
deletedTask2 := testutils.WatchTaskDelete(t, watch)
assert.Equal(t, api.TaskStateCompleted, deletedTask2.Status.State)
assert.Equal(t, "original", deletedTask2.ServiceAnnotations.Name)

var foundTasks []*api.Task
s.View(func(tx store.ReadTx) {
foundTasks, err = store.FindTasks(tx, store.All)
})
assert.NoError(t, err)
assert.Len(t, foundTasks, 0)
}