From 760a432c3207e7546ca011b3be65bb7795b4eacc Mon Sep 17 00:00:00 2001 From: Anshul Pundir Date: Fri, 23 Feb 2018 13:25:48 -0800 Subject: [PATCH 1/5] [manager/orchestrator/task_reaper] Fix task reaper test to also set the desired state on tasks to revent reconciliation races. Signed-off-by: Anshul Pundir (cherry picked from commit a388cad309edddb9880899fe8927afbe4717a18e) Signed-off-by: Sebastiaan van Stijn --- manager/orchestrator/replicated/task_reaper_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/manager/orchestrator/replicated/task_reaper_test.go b/manager/orchestrator/replicated/task_reaper_test.go index beb5edbf98..7c60c9886b 100644 --- a/manager/orchestrator/replicated/task_reaper_test.go +++ b/manager/orchestrator/replicated/task_reaper_test.go @@ -493,9 +493,11 @@ func TestServiceRemoveDeadTasks(t *testing.T) { // Set both task states to RUNNING. updatedTask1 := observedTask1.Copy() + updatedTask1.DesiredState = api.TaskStateRunning updatedTask1.Status.State = api.TaskStateRunning updatedTask1.ServiceAnnotations = api.Annotations{Name: "original"} updatedTask2 := observedTask2.Copy() + updatedTask2.DesiredState = api.TaskStateRunning updatedTask2.Status.State = api.TaskStateRunning updatedTask2.ServiceAnnotations = api.Annotations{Name: "original"} err = s.Update(func(tx store.Tx) error { @@ -512,9 +514,11 @@ func TestServiceRemoveDeadTasks(t *testing.T) { // Set both tasks to COMPLETED. updatedTask3 := observedTask1.Copy() + updatedTask3.DesiredState = api.TaskStateCompleted updatedTask3.Status.State = api.TaskStateCompleted updatedTask3.ServiceAnnotations = api.Annotations{Name: "original"} updatedTask4 := observedTask2.Copy() + updatedTask4.DesiredState = api.TaskStateCompleted updatedTask4.Status.State = api.TaskStateCompleted updatedTask4.ServiceAnnotations = api.Annotations{Name: "original"} err = s.Update(func(tx store.Tx) error { From f47614bf66b83292f6359bdb781beda3e698cc06 Mon Sep 17 00:00:00 2001 From: Anshul Pundir Date: Wed, 4 Apr 2018 16:00:06 -0700 Subject: [PATCH 2/5] [manager/orchestrator/taskreaper] Move task reaper tests to orchestrator/taskreaper Signed-off-by: Anshul Pundir (cherry picked from commit 8cfb337920a6658b302643f27074ca3d669176ec) Signed-off-by: Sebastiaan van Stijn --- .../replicated/task_reaper_test.go | 693 ------------------ .../taskreaper/task_reaper_test.go | 680 +++++++++++++++++ 2 files changed, 680 insertions(+), 693 deletions(-) delete mode 100644 manager/orchestrator/replicated/task_reaper_test.go diff --git a/manager/orchestrator/replicated/task_reaper_test.go b/manager/orchestrator/replicated/task_reaper_test.go deleted file mode 100644 index 7c60c9886b..0000000000 --- a/manager/orchestrator/replicated/task_reaper_test.go +++ /dev/null @@ -1,693 +0,0 @@ -package replicated - -import ( - "github.com/stretchr/testify/require" - "testing" - - "github.com/docker/swarmkit/api" - "github.com/docker/swarmkit/identity" - "github.com/docker/swarmkit/manager/orchestrator/taskreaper" - "github.com/docker/swarmkit/manager/orchestrator/testutils" - "github.com/docker/swarmkit/manager/state" - "github.com/docker/swarmkit/manager/state/store" - gogotypes "github.com/gogo/protobuf/types" - "github.com/stretchr/testify/assert" - "golang.org/x/net/context" -) - -func TestTaskHistory(t *testing.T) { - ctx := context.Background() - s := store.NewMemoryStore(nil) - assert.NotNil(t, s) - defer s.Close() - - assert.NoError(t, s.Update(func(tx store.Tx) error { - store.CreateCluster(tx, &api.Cluster{ - ID: identity.NewID(), - Spec: 
api.ClusterSpec{ - Annotations: api.Annotations{ - Name: store.DefaultClusterName, - }, - Orchestration: api.OrchestrationConfig{ - TaskHistoryRetentionLimit: 2, - }, - }, - }) - return nil - })) - - taskReaper := taskreaper.New(s) - defer taskReaper.Stop() - orchestrator := NewReplicatedOrchestrator(s) - defer orchestrator.Stop() - - watch, cancel := state.Watch(s.WatchQueue() /*api.EventCreateTask{}, api.EventUpdateTask{}*/) - defer cancel() - - // Create a service with two instances specified before the orchestrator is - // started. This should result in two tasks when the orchestrator - // starts up. - err := s.Update(func(tx store.Tx) error { - j1 := &api.Service{ - ID: "id1", - Spec: api.ServiceSpec{ - Annotations: api.Annotations{ - Name: "name1", - }, - Mode: &api.ServiceSpec_Replicated{ - Replicated: &api.ReplicatedService{ - Replicas: 2, - }, - }, - Task: api.TaskSpec{ - Restart: &api.RestartPolicy{ - Condition: api.RestartOnAny, - Delay: gogotypes.DurationProto(0), - }, - }, - }, - } - assert.NoError(t, store.CreateService(tx, j1)) - return nil - }) - assert.NoError(t, err) - - // Start the orchestrator. - go func() { - assert.NoError(t, orchestrator.Run(ctx)) - }() - go taskReaper.Run(ctx) - - observedTask1 := testutils.WatchTaskCreate(t, watch) - assert.Equal(t, observedTask1.Status.State, api.TaskStateNew) - assert.Equal(t, observedTask1.ServiceAnnotations.Name, "name1") - - observedTask2 := testutils.WatchTaskCreate(t, watch) - assert.Equal(t, observedTask2.Status.State, api.TaskStateNew) - assert.Equal(t, observedTask2.ServiceAnnotations.Name, "name1") - - // Fail both tasks. They should both get restarted. - updatedTask1 := observedTask1.Copy() - updatedTask1.Status.State = api.TaskStateFailed - updatedTask1.ServiceAnnotations = api.Annotations{Name: "original"} - updatedTask2 := observedTask2.Copy() - updatedTask2.Status.State = api.TaskStateFailed - updatedTask2.ServiceAnnotations = api.Annotations{Name: "original"} - err = s.Update(func(tx store.Tx) error { - assert.NoError(t, store.UpdateTask(tx, updatedTask1)) - assert.NoError(t, store.UpdateTask(tx, updatedTask2)) - return nil - }) - - testutils.Expect(t, watch, state.EventCommit{}) - testutils.Expect(t, watch, api.EventUpdateTask{}) - testutils.Expect(t, watch, api.EventUpdateTask{}) - testutils.Expect(t, watch, state.EventCommit{}) - - testutils.Expect(t, watch, api.EventUpdateTask{}) - observedTask3 := testutils.WatchTaskCreate(t, watch) - assert.Equal(t, observedTask3.Status.State, api.TaskStateNew) - assert.Equal(t, observedTask3.ServiceAnnotations.Name, "name1") - - testutils.Expect(t, watch, api.EventUpdateTask{}) - observedTask4 := testutils.WatchTaskCreate(t, watch) - assert.Equal(t, observedTask4.Status.State, api.TaskStateNew) - assert.Equal(t, observedTask4.ServiceAnnotations.Name, "name1") - - // Fail these replacement tasks. Since TaskHistory is set to 2, this - // should cause the oldest tasks for each instance to get deleted. 
- updatedTask3 := observedTask3.Copy() - updatedTask3.Status.State = api.TaskStateFailed - updatedTask4 := observedTask4.Copy() - updatedTask4.Status.State = api.TaskStateFailed - err = s.Update(func(tx store.Tx) error { - assert.NoError(t, store.UpdateTask(tx, updatedTask3)) - assert.NoError(t, store.UpdateTask(tx, updatedTask4)) - return nil - }) - - deletedTask1 := testutils.WatchTaskDelete(t, watch) - deletedTask2 := testutils.WatchTaskDelete(t, watch) - - assert.Equal(t, api.TaskStateFailed, deletedTask1.Status.State) - assert.Equal(t, "original", deletedTask1.ServiceAnnotations.Name) - assert.Equal(t, api.TaskStateFailed, deletedTask2.Status.State) - assert.Equal(t, "original", deletedTask2.ServiceAnnotations.Name) - - var foundTasks []*api.Task - s.View(func(tx store.ReadTx) { - foundTasks, err = store.FindTasks(tx, store.All) - }) - assert.NoError(t, err) - assert.Len(t, foundTasks, 4) -} - -// TestTaskStateRemoveOnScaledown tests that on service scale down, task desired -// states are set to REMOVE. Then, when the agent shuts the task down (simulated -// by setting the task state to SHUTDOWN), the task reaper actually deletes -// the tasks from the store. -func TestTaskStateRemoveOnScaledown(t *testing.T) { - ctx := context.Background() - s := store.NewMemoryStore(nil) - assert.NotNil(t, s) - defer s.Close() - - assert.NoError(t, s.Update(func(tx store.Tx) error { - store.CreateCluster(tx, &api.Cluster{ - ID: identity.NewID(), - Spec: api.ClusterSpec{ - Annotations: api.Annotations{ - Name: store.DefaultClusterName, - }, - Orchestration: api.OrchestrationConfig{ - // set TaskHistoryRetentionLimit to a negative value, so - // that it is not considered in this test - TaskHistoryRetentionLimit: -1, - }, - }, - }) - return nil - })) - - taskReaper := taskreaper.New(s) - defer taskReaper.Stop() - orchestrator := NewReplicatedOrchestrator(s) - defer orchestrator.Stop() - - // watch all incoming events - watch, cancel := state.Watch(s.WatchQueue()) - defer cancel() - - service1 := &api.Service{ - ID: "id1", - Spec: api.ServiceSpec{ - Annotations: api.Annotations{ - Name: "name1", - }, - Mode: &api.ServiceSpec_Replicated{ - Replicated: &api.ReplicatedService{ - Replicas: 2, - }, - }, - Task: api.TaskSpec{ - Restart: &api.RestartPolicy{ - Condition: api.RestartOnAny, - Delay: gogotypes.DurationProto(0), - }, - }, - }, - } - - // Create a service with two instances specified before the orchestrator is - // started. This should result in two tasks when the orchestrator - // starts up. - err := s.Update(func(tx store.Tx) error { - assert.NoError(t, store.CreateService(tx, service1)) - return nil - }) - assert.NoError(t, err) - - // Start the orchestrator. 
- go func() { - assert.NoError(t, orchestrator.Run(ctx)) - }() - go taskReaper.Run(ctx) - - observedTask1 := testutils.WatchTaskCreate(t, watch) - assert.Equal(t, observedTask1.Status.State, api.TaskStateNew) - assert.Equal(t, observedTask1.ServiceAnnotations.Name, "name1") - - observedTask2 := testutils.WatchTaskCreate(t, watch) - assert.Equal(t, observedTask2.Status.State, api.TaskStateNew) - assert.Equal(t, observedTask2.ServiceAnnotations.Name, "name1") - - // Set both tasks to RUNNING, so the service is successfully running - updatedTask1 := observedTask1.Copy() - updatedTask1.Status.State = api.TaskStateRunning - updatedTask1.ServiceAnnotations = api.Annotations{Name: "original"} - updatedTask2 := observedTask2.Copy() - updatedTask2.Status.State = api.TaskStateRunning - updatedTask2.ServiceAnnotations = api.Annotations{Name: "original"} - err = s.Update(func(tx store.Tx) error { - assert.NoError(t, store.UpdateTask(tx, updatedTask1)) - assert.NoError(t, store.UpdateTask(tx, updatedTask2)) - return nil - }) - - testutils.Expect(t, watch, state.EventCommit{}) - testutils.Expect(t, watch, api.EventUpdateTask{}) - testutils.Expect(t, watch, api.EventUpdateTask{}) - testutils.Expect(t, watch, state.EventCommit{}) - - // Scale the service down to one instance. This should trigger one of the task - // statuses to be set to REMOVE. - service1.Spec.GetReplicated().Replicas = 1 - err = s.Update(func(tx store.Tx) error { - assert.NoError(t, store.UpdateService(tx, service1)) - return nil - }) - - observedTask3 := testutils.WatchTaskUpdate(t, watch) - assert.Equal(t, observedTask3.DesiredState, api.TaskStateRemove) - assert.Equal(t, observedTask3.ServiceAnnotations.Name, "original") - - testutils.Expect(t, watch, state.EventCommit{}) - - // Now the task for which desired state was set to REMOVE must be deleted by the task reaper. - // Shut this task down first (simulates shut down by agent) - updatedTask3 := observedTask3.Copy() - updatedTask3.Status.State = api.TaskStateShutdown - err = s.Update(func(tx store.Tx) error { - assert.NoError(t, store.UpdateTask(tx, updatedTask3)) - return nil - }) - - deletedTask1 := testutils.WatchTaskDelete(t, watch) - - assert.Equal(t, api.TaskStateShutdown, deletedTask1.Status.State) - assert.Equal(t, "original", deletedTask1.ServiceAnnotations.Name) - - var foundTasks []*api.Task - s.View(func(tx store.ReadTx) { - foundTasks, err = store.FindTasks(tx, store.All) - }) - assert.NoError(t, err) - assert.Len(t, foundTasks, 1) -} - -// TestTaskStateRemoveOnServiceRemoval tests that on service removal, task desired -// states are set to REMOVE. Then, when the agent shuts the task down (simulated -// by setting the task state to SHUTDOWN), the task reaper actually deletes -// the tasks from the store. 
-func TestTaskStateRemoveOnServiceRemoval(t *testing.T) { - ctx := context.Background() - s := store.NewMemoryStore(nil) - assert.NotNil(t, s) - defer s.Close() - - assert.NoError(t, s.Update(func(tx store.Tx) error { - store.CreateCluster(tx, &api.Cluster{ - ID: identity.NewID(), - Spec: api.ClusterSpec{ - Annotations: api.Annotations{ - Name: store.DefaultClusterName, - }, - Orchestration: api.OrchestrationConfig{ - // set TaskHistoryRetentionLimit to a negative value, so - // that it is not considered in this test - TaskHistoryRetentionLimit: -1, - }, - }, - }) - return nil - })) - - taskReaper := taskreaper.New(s) - defer taskReaper.Stop() - orchestrator := NewReplicatedOrchestrator(s) - defer orchestrator.Stop() - - watch, cancel := state.Watch(s.WatchQueue() /*api.EventCreateTask{}, api.EventUpdateTask{}*/) - defer cancel() - - service1 := &api.Service{ - ID: "id1", - Spec: api.ServiceSpec{ - Annotations: api.Annotations{ - Name: "name1", - }, - Mode: &api.ServiceSpec_Replicated{ - Replicated: &api.ReplicatedService{ - Replicas: 2, - }, - }, - Task: api.TaskSpec{ - Restart: &api.RestartPolicy{ - Condition: api.RestartOnAny, - Delay: gogotypes.DurationProto(0), - }, - }, - }, - } - - // Create a service with two instances specified before the orchestrator is - // started. This should result in two tasks when the orchestrator - // starts up. - err := s.Update(func(tx store.Tx) error { - assert.NoError(t, store.CreateService(tx, service1)) - return nil - }) - assert.NoError(t, err) - - // Start the orchestrator. - go func() { - assert.NoError(t, orchestrator.Run(ctx)) - }() - go taskReaper.Run(ctx) - - observedTask1 := testutils.WatchTaskCreate(t, watch) - assert.Equal(t, observedTask1.Status.State, api.TaskStateNew) - assert.Equal(t, observedTask1.ServiceAnnotations.Name, "name1") - - observedTask2 := testutils.WatchTaskCreate(t, watch) - assert.Equal(t, observedTask2.Status.State, api.TaskStateNew) - assert.Equal(t, observedTask2.ServiceAnnotations.Name, "name1") - - // Set both tasks to RUNNING, so the service is successfully running - updatedTask1 := observedTask1.Copy() - updatedTask1.Status.State = api.TaskStateRunning - updatedTask1.ServiceAnnotations = api.Annotations{Name: "original"} - updatedTask2 := observedTask2.Copy() - updatedTask2.Status.State = api.TaskStateRunning - updatedTask2.ServiceAnnotations = api.Annotations{Name: "original"} - err = s.Update(func(tx store.Tx) error { - assert.NoError(t, store.UpdateTask(tx, updatedTask1)) - assert.NoError(t, store.UpdateTask(tx, updatedTask2)) - return nil - }) - - testutils.Expect(t, watch, state.EventCommit{}) - testutils.Expect(t, watch, api.EventUpdateTask{}) - testutils.Expect(t, watch, api.EventUpdateTask{}) - testutils.Expect(t, watch, state.EventCommit{}) - - // Delete the service. This should trigger both the task desired statuses to be set to REMOVE. - err = s.Update(func(tx store.Tx) error { - assert.NoError(t, store.DeleteService(tx, service1.ID)) - return nil - }) - - observedTask3 := testutils.WatchTaskUpdate(t, watch) - assert.Equal(t, observedTask3.DesiredState, api.TaskStateRemove) - assert.Equal(t, observedTask3.ServiceAnnotations.Name, "original") - observedTask4 := testutils.WatchTaskUpdate(t, watch) - assert.Equal(t, observedTask4.DesiredState, api.TaskStateRemove) - assert.Equal(t, observedTask4.ServiceAnnotations.Name, "original") - - testutils.Expect(t, watch, state.EventCommit{}) - - // Now the tasks must be deleted by the task reaper. 
- // Shut them down first (simulates shut down by agent) - updatedTask3 := observedTask3.Copy() - updatedTask3.Status.State = api.TaskStateShutdown - updatedTask4 := observedTask4.Copy() - updatedTask4.Status.State = api.TaskStateShutdown - err = s.Update(func(tx store.Tx) error { - assert.NoError(t, store.UpdateTask(tx, updatedTask3)) - assert.NoError(t, store.UpdateTask(tx, updatedTask4)) - return nil - }) - - deletedTask1 := testutils.WatchTaskDelete(t, watch) - assert.Equal(t, api.TaskStateShutdown, deletedTask1.Status.State) - assert.Equal(t, "original", deletedTask1.ServiceAnnotations.Name) - - deletedTask2 := testutils.WatchTaskDelete(t, watch) - assert.Equal(t, api.TaskStateShutdown, deletedTask2.Status.State) - assert.Equal(t, "original", deletedTask1.ServiceAnnotations.Name) - - var foundTasks []*api.Task - s.View(func(tx store.ReadTx) { - foundTasks, err = store.FindTasks(tx, store.All) - }) - assert.NoError(t, err) - assert.Len(t, foundTasks, 0) -} - -// TestServiceRemoveDeadTasks tests removal of dead tasks -// (old shutdown tasks) on service remove. -func TestServiceRemoveDeadTasks(t *testing.T) { - ctx := context.Background() - s := store.NewMemoryStore(nil) - assert.NotNil(t, s) - defer s.Close() - - assert.NoError(t, s.Update(func(tx store.Tx) error { - store.CreateCluster(tx, &api.Cluster{ - ID: identity.NewID(), - Spec: api.ClusterSpec{ - Annotations: api.Annotations{ - Name: store.DefaultClusterName, - }, - Orchestration: api.OrchestrationConfig{ - // set TaskHistoryRetentionLimit to a negative value, so - // that it is not considered in this test - TaskHistoryRetentionLimit: -1, - }, - }, - }) - return nil - })) - - taskReaper := taskreaper.New(s) - defer taskReaper.Stop() - orchestrator := NewReplicatedOrchestrator(s) - defer orchestrator.Stop() - - watch, cancel := state.Watch(s.WatchQueue() /*api.EventCreateTask{}, api.EventUpdateTask{}*/) - defer cancel() - - service1 := &api.Service{ - ID: "id1", - Spec: api.ServiceSpec{ - Annotations: api.Annotations{ - Name: "name1", - }, - Mode: &api.ServiceSpec_Replicated{ - Replicated: &api.ReplicatedService{ - Replicas: 2, - }, - }, - Task: api.TaskSpec{ - Restart: &api.RestartPolicy{ - // Turn off restart to get an accurate count on tasks. - Condition: api.RestartOnNone, - Delay: gogotypes.DurationProto(0), - }, - }, - }, - } - - // Create a service with two instances specified before the orchestrator is - // started. This should result in two tasks when the orchestrator - // starts up. - err := s.Update(func(tx store.Tx) error { - assert.NoError(t, store.CreateService(tx, service1)) - return nil - }) - assert.NoError(t, err) - - // Start the orchestrator and the reaper. - go func() { - assert.NoError(t, orchestrator.Run(ctx)) - }() - go taskReaper.Run(ctx) - - observedTask1 := testutils.WatchTaskCreate(t, watch) - assert.Equal(t, api.TaskStateNew, observedTask1.Status.State) - assert.Equal(t, observedTask1.ServiceAnnotations.Name, "name1") - - observedTask2 := testutils.WatchTaskCreate(t, watch) - assert.Equal(t, api.TaskStateNew, observedTask2.Status.State) - assert.Equal(t, observedTask2.ServiceAnnotations.Name, "name1") - - // Set both task states to RUNNING. 
- updatedTask1 := observedTask1.Copy() - updatedTask1.DesiredState = api.TaskStateRunning - updatedTask1.Status.State = api.TaskStateRunning - updatedTask1.ServiceAnnotations = api.Annotations{Name: "original"} - updatedTask2 := observedTask2.Copy() - updatedTask2.DesiredState = api.TaskStateRunning - updatedTask2.Status.State = api.TaskStateRunning - updatedTask2.ServiceAnnotations = api.Annotations{Name: "original"} - err = s.Update(func(tx store.Tx) error { - assert.NoError(t, store.UpdateTask(tx, updatedTask1)) - assert.NoError(t, store.UpdateTask(tx, updatedTask2)) - return nil - }) - require.NoError(t, err) - - testutils.Expect(t, watch, state.EventCommit{}) - testutils.Expect(t, watch, api.EventUpdateTask{}) - testutils.Expect(t, watch, api.EventUpdateTask{}) - testutils.Expect(t, watch, state.EventCommit{}) - - // Set both tasks to COMPLETED. - updatedTask3 := observedTask1.Copy() - updatedTask3.DesiredState = api.TaskStateCompleted - updatedTask3.Status.State = api.TaskStateCompleted - updatedTask3.ServiceAnnotations = api.Annotations{Name: "original"} - updatedTask4 := observedTask2.Copy() - updatedTask4.DesiredState = api.TaskStateCompleted - updatedTask4.Status.State = api.TaskStateCompleted - updatedTask4.ServiceAnnotations = api.Annotations{Name: "original"} - err = s.Update(func(tx store.Tx) error { - assert.NoError(t, store.UpdateTask(tx, updatedTask3)) - assert.NoError(t, store.UpdateTask(tx, updatedTask4)) - return nil - }) - require.NoError(t, err) - - // Verify state is set to COMPLETED - observedTask3 := testutils.WatchTaskUpdate(t, watch) - assert.Equal(t, api.TaskStateCompleted, observedTask3.Status.State) - assert.Equal(t, "original", observedTask3.ServiceAnnotations.Name) - observedTask4 := testutils.WatchTaskUpdate(t, watch) - assert.Equal(t, api.TaskStateCompleted, observedTask4.Status.State) - assert.Equal(t, "original", observedTask4.ServiceAnnotations.Name) - - // Delete the service. - err = s.Update(func(tx store.Tx) error { - assert.NoError(t, store.DeleteService(tx, service1.ID)) - return nil - }) - - // Service delete should trigger both the task desired statuses - // to be set to REMOVE. - observedTask3 = testutils.WatchTaskUpdate(t, watch) - assert.Equal(t, api.TaskStateRemove, observedTask3.DesiredState) - assert.Equal(t, "original", observedTask3.ServiceAnnotations.Name) - observedTask4 = testutils.WatchTaskUpdate(t, watch) - assert.Equal(t, api.TaskStateRemove, observedTask4.DesiredState) - assert.Equal(t, "original", observedTask4.ServiceAnnotations.Name) - - testutils.Expect(t, watch, state.EventCommit{}) - - // Task reaper should see the event updates for desired state update - // to REMOVE and should deleted by the reaper. - deletedTask1 := testutils.WatchTaskDelete(t, watch) - assert.Equal(t, api.TaskStateCompleted, deletedTask1.Status.State) - assert.Equal(t, "original", deletedTask1.ServiceAnnotations.Name) - deletedTask2 := testutils.WatchTaskDelete(t, watch) - assert.Equal(t, api.TaskStateCompleted, deletedTask2.Status.State) - assert.Equal(t, "original", deletedTask2.ServiceAnnotations.Name) - - var foundTasks []*api.Task - s.View(func(tx store.ReadTx) { - foundTasks, err = store.FindTasks(tx, store.All) - }) - assert.NoError(t, err) - assert.Len(t, foundTasks, 0) -} - -// TestServiceRemoveDeadTasks tests removal of -// tasks in state < TaskStateAssigned. 
-func TestServiceRemoveUnassignedTasks(t *testing.T) { - ctx := context.Background() - s := store.NewMemoryStore(nil) - assert.NotNil(t, s) - defer s.Close() - - assert.NoError(t, s.Update(func(tx store.Tx) error { - store.CreateCluster(tx, &api.Cluster{ - ID: identity.NewID(), - Spec: api.ClusterSpec{ - Annotations: api.Annotations{ - Name: store.DefaultClusterName, - }, - Orchestration: api.OrchestrationConfig{ - // set TaskHistoryRetentionLimit to a negative value, so - // that tasks are cleaned up right away. - TaskHistoryRetentionLimit: 1, - }, - }, - }) - return nil - })) - - taskReaper := taskreaper.New(s) - defer taskReaper.Stop() - orchestrator := NewReplicatedOrchestrator(s) - defer orchestrator.Stop() - - watch, cancel := state.Watch(s.WatchQueue() /*api.EventCreateTask{}, api.EventUpdateTask{}*/) - defer cancel() - - service1 := &api.Service{ - ID: "id1", - Spec: api.ServiceSpec{ - Annotations: api.Annotations{ - Name: "name1", - }, - Mode: &api.ServiceSpec_Replicated{ - Replicated: &api.ReplicatedService{ - Replicas: 1, - }, - }, - Task: api.TaskSpec{ - Restart: &api.RestartPolicy{ - // Turn off restart to get an accurate count on tasks. - Condition: api.RestartOnNone, - Delay: gogotypes.DurationProto(0), - }, - }, - }, - } - - // Create a service with one replica specified before the orchestrator is - // started. This should result in two tasks when the orchestrator - // starts up. - err := s.Update(func(tx store.Tx) error { - assert.NoError(t, store.CreateService(tx, service1)) - return nil - }) - assert.NoError(t, err) - - // Start the orchestrator. - go func() { - assert.NoError(t, orchestrator.Run(ctx)) - }() - go taskReaper.Run(ctx) - - observedTask1 := testutils.WatchTaskCreate(t, watch) - assert.Equal(t, api.TaskStateNew, observedTask1.Status.State) - assert.Equal(t, observedTask1.ServiceAnnotations.Name, "name1") - - // Set the task state to PENDING to simulate allocation. - updatedTask1 := observedTask1.Copy() - updatedTask1.Status.State = api.TaskStatePending - updatedTask1.ServiceAnnotations = api.Annotations{Name: "original"} - err = s.Update(func(tx store.Tx) error { - assert.NoError(t, store.UpdateTask(tx, updatedTask1)) - return nil - }) - require.NoError(t, err) - - testutils.Expect(t, watch, state.EventCommit{}) - testutils.Expect(t, watch, api.EventUpdateTask{}) - testutils.Expect(t, watch, state.EventCommit{}) - - service1.Spec.Task.ForceUpdate++ - // This should shutdown the previous task and create a new one. - err = s.Update(func(tx store.Tx) error { - assert.NoError(t, store.UpdateService(tx, service1)) - return nil - }) - testutils.Expect(t, watch, api.EventUpdateService{}) - testutils.Expect(t, watch, state.EventCommit{}) - - // New task should be created and old task marked for SHUTDOWN. - observedTask1 = testutils.WatchTaskCreate(t, watch) - assert.Equal(t, api.TaskStateNew, observedTask1.Status.State) - assert.Equal(t, observedTask1.ServiceAnnotations.Name, "name1") - - observedTask3 := testutils.WatchTaskUpdate(t, watch) - assert.Equal(t, api.TaskStateShutdown, observedTask3.DesiredState) - assert.Equal(t, "original", observedTask3.ServiceAnnotations.Name) - - testutils.Expect(t, watch, state.EventCommit{}) - - // Task reaper should delete the task previously marked for SHUTDOWN. 
- deletedTask1 := testutils.WatchTaskDelete(t, watch) - assert.Equal(t, api.TaskStatePending, deletedTask1.Status.State) - assert.Equal(t, "original", deletedTask1.ServiceAnnotations.Name) - - testutils.Expect(t, watch, state.EventCommit{}) - - var foundTasks []*api.Task - s.View(func(tx store.ReadTx) { - foundTasks, err = store.FindTasks(tx, store.All) - }) - assert.NoError(t, err) - assert.Len(t, foundTasks, 1) -} diff --git a/manager/orchestrator/taskreaper/task_reaper_test.go b/manager/orchestrator/taskreaper/task_reaper_test.go index 3a586d7ebe..d4bad21d1d 100644 --- a/manager/orchestrator/taskreaper/task_reaper_test.go +++ b/manager/orchestrator/taskreaper/task_reaper_test.go @@ -9,7 +9,12 @@ import ( "github.com/stretchr/testify/require" "github.com/docker/swarmkit/api" + "github.com/docker/swarmkit/identity" + "github.com/docker/swarmkit/manager/orchestrator/replicated" + "github.com/docker/swarmkit/manager/orchestrator/testutils" + "github.com/docker/swarmkit/manager/state" "github.com/docker/swarmkit/manager/state/store" + gogotypes "github.com/gogo/protobuf/types" ) // TestTaskReaperInit tests that the task reaper correctly cleans up tasks when @@ -191,3 +196,678 @@ func TestTaskReaperInit(t *testing.T) { assert.Nil(t, store.GetTask(tx, "earlytask2")) }) } + +func TestTaskHistory(t *testing.T) { + ctx := context.Background() + s := store.NewMemoryStore(nil) + assert.NotNil(t, s) + defer s.Close() + + assert.NoError(t, s.Update(func(tx store.Tx) error { + store.CreateCluster(tx, &api.Cluster{ + ID: identity.NewID(), + Spec: api.ClusterSpec{ + Annotations: api.Annotations{ + Name: store.DefaultClusterName, + }, + Orchestration: api.OrchestrationConfig{ + TaskHistoryRetentionLimit: 2, + }, + }, + }) + return nil + })) + + taskReaper := New(s) + defer taskReaper.Stop() + orchestrator := replicated.NewReplicatedOrchestrator(s) + defer orchestrator.Stop() + + watch, cancel := state.Watch(s.WatchQueue() /*api.EventCreateTask{}, api.EventUpdateTask{}*/) + defer cancel() + + // Create a service with two instances specified before the orchestrator is + // started. This should result in two tasks when the orchestrator + // starts up. + err := s.Update(func(tx store.Tx) error { + j1 := &api.Service{ + ID: "id1", + Spec: api.ServiceSpec{ + Annotations: api.Annotations{ + Name: "name1", + }, + Mode: &api.ServiceSpec_Replicated{ + Replicated: &api.ReplicatedService{ + Replicas: 2, + }, + }, + Task: api.TaskSpec{ + Restart: &api.RestartPolicy{ + Condition: api.RestartOnAny, + Delay: gogotypes.DurationProto(0), + }, + }, + }, + } + assert.NoError(t, store.CreateService(tx, j1)) + return nil + }) + assert.NoError(t, err) + + // Start the orchestrator. + go func() { + assert.NoError(t, orchestrator.Run(ctx)) + }() + go taskReaper.Run(ctx) + + observedTask1 := testutils.WatchTaskCreate(t, watch) + assert.Equal(t, observedTask1.Status.State, api.TaskStateNew) + assert.Equal(t, observedTask1.ServiceAnnotations.Name, "name1") + + observedTask2 := testutils.WatchTaskCreate(t, watch) + assert.Equal(t, observedTask2.Status.State, api.TaskStateNew) + assert.Equal(t, observedTask2.ServiceAnnotations.Name, "name1") + + // Fail both tasks. They should both get restarted. 
+ updatedTask1 := observedTask1.Copy() + updatedTask1.Status.State = api.TaskStateFailed + updatedTask1.ServiceAnnotations = api.Annotations{Name: "original"} + updatedTask2 := observedTask2.Copy() + updatedTask2.Status.State = api.TaskStateFailed + updatedTask2.ServiceAnnotations = api.Annotations{Name: "original"} + err = s.Update(func(tx store.Tx) error { + assert.NoError(t, store.UpdateTask(tx, updatedTask1)) + assert.NoError(t, store.UpdateTask(tx, updatedTask2)) + return nil + }) + + testutils.Expect(t, watch, state.EventCommit{}) + testutils.Expect(t, watch, api.EventUpdateTask{}) + testutils.Expect(t, watch, api.EventUpdateTask{}) + testutils.Expect(t, watch, state.EventCommit{}) + + testutils.Expect(t, watch, api.EventUpdateTask{}) + observedTask3 := testutils.WatchTaskCreate(t, watch) + assert.Equal(t, observedTask3.Status.State, api.TaskStateNew) + assert.Equal(t, observedTask3.ServiceAnnotations.Name, "name1") + + testutils.Expect(t, watch, api.EventUpdateTask{}) + observedTask4 := testutils.WatchTaskCreate(t, watch) + assert.Equal(t, observedTask4.Status.State, api.TaskStateNew) + assert.Equal(t, observedTask4.ServiceAnnotations.Name, "name1") + + // Fail these replacement tasks. Since TaskHistory is set to 2, this + // should cause the oldest tasks for each instance to get deleted. + updatedTask3 := observedTask3.Copy() + updatedTask3.Status.State = api.TaskStateFailed + updatedTask4 := observedTask4.Copy() + updatedTask4.Status.State = api.TaskStateFailed + err = s.Update(func(tx store.Tx) error { + assert.NoError(t, store.UpdateTask(tx, updatedTask3)) + assert.NoError(t, store.UpdateTask(tx, updatedTask4)) + return nil + }) + + deletedTask1 := testutils.WatchTaskDelete(t, watch) + deletedTask2 := testutils.WatchTaskDelete(t, watch) + + assert.Equal(t, api.TaskStateFailed, deletedTask1.Status.State) + assert.Equal(t, "original", deletedTask1.ServiceAnnotations.Name) + assert.Equal(t, api.TaskStateFailed, deletedTask2.Status.State) + assert.Equal(t, "original", deletedTask2.ServiceAnnotations.Name) + + var foundTasks []*api.Task + s.View(func(tx store.ReadTx) { + foundTasks, err = store.FindTasks(tx, store.All) + }) + assert.NoError(t, err) + assert.Len(t, foundTasks, 4) +} + +// TestTaskStateRemoveOnScaledown tests that on service scale down, task desired +// states are set to REMOVE. Then, when the agent shuts the task down (simulated +// by setting the task state to SHUTDOWN), the task reaper actually deletes +// the tasks from the store. 
+func TestTaskStateRemoveOnScaledown(t *testing.T) { + ctx := context.Background() + s := store.NewMemoryStore(nil) + assert.NotNil(t, s) + defer s.Close() + + assert.NoError(t, s.Update(func(tx store.Tx) error { + store.CreateCluster(tx, &api.Cluster{ + ID: identity.NewID(), + Spec: api.ClusterSpec{ + Annotations: api.Annotations{ + Name: store.DefaultClusterName, + }, + Orchestration: api.OrchestrationConfig{ + // set TaskHistoryRetentionLimit to a negative value, so + // that it is not considered in this test + TaskHistoryRetentionLimit: -1, + }, + }, + }) + return nil + })) + + taskReaper := New(s) + defer taskReaper.Stop() + orchestrator := replicated.NewReplicatedOrchestrator(s) + defer orchestrator.Stop() + + // watch all incoming events + watch, cancel := state.Watch(s.WatchQueue()) + defer cancel() + + service1 := &api.Service{ + ID: "id1", + Spec: api.ServiceSpec{ + Annotations: api.Annotations{ + Name: "name1", + }, + Mode: &api.ServiceSpec_Replicated{ + Replicated: &api.ReplicatedService{ + Replicas: 2, + }, + }, + Task: api.TaskSpec{ + Restart: &api.RestartPolicy{ + Condition: api.RestartOnAny, + Delay: gogotypes.DurationProto(0), + }, + }, + }, + } + + // Create a service with two instances specified before the orchestrator is + // started. This should result in two tasks when the orchestrator + // starts up. + err := s.Update(func(tx store.Tx) error { + assert.NoError(t, store.CreateService(tx, service1)) + return nil + }) + assert.NoError(t, err) + + // Start the orchestrator. + go func() { + assert.NoError(t, orchestrator.Run(ctx)) + }() + go taskReaper.Run(ctx) + + observedTask1 := testutils.WatchTaskCreate(t, watch) + assert.Equal(t, observedTask1.Status.State, api.TaskStateNew) + assert.Equal(t, observedTask1.ServiceAnnotations.Name, "name1") + + observedTask2 := testutils.WatchTaskCreate(t, watch) + assert.Equal(t, observedTask2.Status.State, api.TaskStateNew) + assert.Equal(t, observedTask2.ServiceAnnotations.Name, "name1") + + // Set both tasks to RUNNING, so the service is successfully running + updatedTask1 := observedTask1.Copy() + updatedTask1.Status.State = api.TaskStateRunning + updatedTask1.ServiceAnnotations = api.Annotations{Name: "original"} + updatedTask2 := observedTask2.Copy() + updatedTask2.Status.State = api.TaskStateRunning + updatedTask2.ServiceAnnotations = api.Annotations{Name: "original"} + err = s.Update(func(tx store.Tx) error { + assert.NoError(t, store.UpdateTask(tx, updatedTask1)) + assert.NoError(t, store.UpdateTask(tx, updatedTask2)) + return nil + }) + + testutils.Expect(t, watch, state.EventCommit{}) + testutils.Expect(t, watch, api.EventUpdateTask{}) + testutils.Expect(t, watch, api.EventUpdateTask{}) + testutils.Expect(t, watch, state.EventCommit{}) + + // Scale the service down to one instance. This should trigger one of the task + // statuses to be set to REMOVE. + service1.Spec.GetReplicated().Replicas = 1 + err = s.Update(func(tx store.Tx) error { + assert.NoError(t, store.UpdateService(tx, service1)) + return nil + }) + + observedTask3 := testutils.WatchTaskUpdate(t, watch) + assert.Equal(t, observedTask3.DesiredState, api.TaskStateRemove) + assert.Equal(t, observedTask3.ServiceAnnotations.Name, "original") + + testutils.Expect(t, watch, state.EventCommit{}) + + // Now the task for which desired state was set to REMOVE must be deleted by the task reaper. 
+ // Shut this task down first (simulates shut down by agent) + updatedTask3 := observedTask3.Copy() + updatedTask3.Status.State = api.TaskStateShutdown + err = s.Update(func(tx store.Tx) error { + assert.NoError(t, store.UpdateTask(tx, updatedTask3)) + return nil + }) + + deletedTask1 := testutils.WatchTaskDelete(t, watch) + + assert.Equal(t, api.TaskStateShutdown, deletedTask1.Status.State) + assert.Equal(t, "original", deletedTask1.ServiceAnnotations.Name) + + var foundTasks []*api.Task + s.View(func(tx store.ReadTx) { + foundTasks, err = store.FindTasks(tx, store.All) + }) + assert.NoError(t, err) + assert.Len(t, foundTasks, 1) +} + +// TestTaskStateRemoveOnServiceRemoval tests that on service removal, task desired +// states are set to REMOVE. Then, when the agent shuts the task down (simulated +// by setting the task state to SHUTDOWN), the task reaper actually deletes +// the tasks from the store. +func TestTaskStateRemoveOnServiceRemoval(t *testing.T) { + ctx := context.Background() + s := store.NewMemoryStore(nil) + assert.NotNil(t, s) + defer s.Close() + + assert.NoError(t, s.Update(func(tx store.Tx) error { + store.CreateCluster(tx, &api.Cluster{ + ID: identity.NewID(), + Spec: api.ClusterSpec{ + Annotations: api.Annotations{ + Name: store.DefaultClusterName, + }, + Orchestration: api.OrchestrationConfig{ + // set TaskHistoryRetentionLimit to a negative value, so + // that it is not considered in this test + TaskHistoryRetentionLimit: -1, + }, + }, + }) + return nil + })) + + taskReaper := New(s) + defer taskReaper.Stop() + orchestrator := replicated.NewReplicatedOrchestrator(s) + defer orchestrator.Stop() + + watch, cancel := state.Watch(s.WatchQueue() /*api.EventCreateTask{}, api.EventUpdateTask{}*/) + defer cancel() + + service1 := &api.Service{ + ID: "id1", + Spec: api.ServiceSpec{ + Annotations: api.Annotations{ + Name: "name1", + }, + Mode: &api.ServiceSpec_Replicated{ + Replicated: &api.ReplicatedService{ + Replicas: 2, + }, + }, + Task: api.TaskSpec{ + Restart: &api.RestartPolicy{ + Condition: api.RestartOnAny, + Delay: gogotypes.DurationProto(0), + }, + }, + }, + } + + // Create a service with two instances specified before the orchestrator is + // started. This should result in two tasks when the orchestrator + // starts up. + err := s.Update(func(tx store.Tx) error { + assert.NoError(t, store.CreateService(tx, service1)) + return nil + }) + assert.NoError(t, err) + + // Start the orchestrator. 
+ go func() { + assert.NoError(t, orchestrator.Run(ctx)) + }() + go taskReaper.Run(ctx) + + observedTask1 := testutils.WatchTaskCreate(t, watch) + assert.Equal(t, observedTask1.Status.State, api.TaskStateNew) + assert.Equal(t, observedTask1.ServiceAnnotations.Name, "name1") + + observedTask2 := testutils.WatchTaskCreate(t, watch) + assert.Equal(t, observedTask2.Status.State, api.TaskStateNew) + assert.Equal(t, observedTask2.ServiceAnnotations.Name, "name1") + + // Set both tasks to RUNNING, so the service is successfully running + updatedTask1 := observedTask1.Copy() + updatedTask1.Status.State = api.TaskStateRunning + updatedTask1.ServiceAnnotations = api.Annotations{Name: "original"} + updatedTask2 := observedTask2.Copy() + updatedTask2.Status.State = api.TaskStateRunning + updatedTask2.ServiceAnnotations = api.Annotations{Name: "original"} + err = s.Update(func(tx store.Tx) error { + assert.NoError(t, store.UpdateTask(tx, updatedTask1)) + assert.NoError(t, store.UpdateTask(tx, updatedTask2)) + return nil + }) + + testutils.Expect(t, watch, state.EventCommit{}) + testutils.Expect(t, watch, api.EventUpdateTask{}) + testutils.Expect(t, watch, api.EventUpdateTask{}) + testutils.Expect(t, watch, state.EventCommit{}) + + // Delete the service. This should trigger both the task desired statuses to be set to REMOVE. + err = s.Update(func(tx store.Tx) error { + assert.NoError(t, store.DeleteService(tx, service1.ID)) + return nil + }) + + observedTask3 := testutils.WatchTaskUpdate(t, watch) + assert.Equal(t, observedTask3.DesiredState, api.TaskStateRemove) + assert.Equal(t, observedTask3.ServiceAnnotations.Name, "original") + observedTask4 := testutils.WatchTaskUpdate(t, watch) + assert.Equal(t, observedTask4.DesiredState, api.TaskStateRemove) + assert.Equal(t, observedTask4.ServiceAnnotations.Name, "original") + + testutils.Expect(t, watch, state.EventCommit{}) + + // Now the tasks must be deleted by the task reaper. + // Shut them down first (simulates shut down by agent) + updatedTask3 := observedTask3.Copy() + updatedTask3.Status.State = api.TaskStateShutdown + updatedTask4 := observedTask4.Copy() + updatedTask4.Status.State = api.TaskStateShutdown + err = s.Update(func(tx store.Tx) error { + assert.NoError(t, store.UpdateTask(tx, updatedTask3)) + assert.NoError(t, store.UpdateTask(tx, updatedTask4)) + return nil + }) + + deletedTask1 := testutils.WatchTaskDelete(t, watch) + assert.Equal(t, api.TaskStateShutdown, deletedTask1.Status.State) + assert.Equal(t, "original", deletedTask1.ServiceAnnotations.Name) + + deletedTask2 := testutils.WatchTaskDelete(t, watch) + assert.Equal(t, api.TaskStateShutdown, deletedTask2.Status.State) + assert.Equal(t, "original", deletedTask1.ServiceAnnotations.Name) + + var foundTasks []*api.Task + s.View(func(tx store.ReadTx) { + foundTasks, err = store.FindTasks(tx, store.All) + }) + assert.NoError(t, err) + assert.Len(t, foundTasks, 0) +} + +// TestServiceRemoveDeadTasks tests removal of dead tasks +// (old shutdown tasks) on service remove. 
+func TestServiceRemoveDeadTasks(t *testing.T) { + ctx := context.Background() + s := store.NewMemoryStore(nil) + assert.NotNil(t, s) + defer s.Close() + + assert.NoError(t, s.Update(func(tx store.Tx) error { + store.CreateCluster(tx, &api.Cluster{ + ID: identity.NewID(), + Spec: api.ClusterSpec{ + Annotations: api.Annotations{ + Name: store.DefaultClusterName, + }, + Orchestration: api.OrchestrationConfig{ + // set TaskHistoryRetentionLimit to a negative value, so + // that it is not considered in this test + TaskHistoryRetentionLimit: -1, + }, + }, + }) + return nil + })) + + taskReaper := New(s) + defer taskReaper.Stop() + orchestrator := replicated.NewReplicatedOrchestrator(s) + defer orchestrator.Stop() + + watch, cancel := state.Watch(s.WatchQueue() /*api.EventCreateTask{}, api.EventUpdateTask{}*/) + defer cancel() + + service1 := &api.Service{ + ID: "id1", + Spec: api.ServiceSpec{ + Annotations: api.Annotations{ + Name: "name1", + }, + Mode: &api.ServiceSpec_Replicated{ + Replicated: &api.ReplicatedService{ + Replicas: 2, + }, + }, + Task: api.TaskSpec{ + Restart: &api.RestartPolicy{ + // Turn off restart to get an accurate count on tasks. + Condition: api.RestartOnNone, + Delay: gogotypes.DurationProto(0), + }, + }, + }, + } + + // Create a service with two instances specified before the orchestrator is + // started. This should result in two tasks when the orchestrator + // starts up. + err := s.Update(func(tx store.Tx) error { + assert.NoError(t, store.CreateService(tx, service1)) + return nil + }) + assert.NoError(t, err) + + // Start the orchestrator and the reaper. + go func() { + assert.NoError(t, orchestrator.Run(ctx)) + }() + go taskReaper.Run(ctx) + + observedTask1 := testutils.WatchTaskCreate(t, watch) + assert.Equal(t, api.TaskStateNew, observedTask1.Status.State) + assert.Equal(t, observedTask1.ServiceAnnotations.Name, "name1") + + observedTask2 := testutils.WatchTaskCreate(t, watch) + assert.Equal(t, api.TaskStateNew, observedTask2.Status.State) + assert.Equal(t, observedTask2.ServiceAnnotations.Name, "name1") + + // Set both task states to RUNNING. + updatedTask1 := observedTask1.Copy() + updatedTask1.Status.State = api.TaskStateRunning + updatedTask1.ServiceAnnotations = api.Annotations{Name: "original"} + updatedTask2 := observedTask2.Copy() + updatedTask2.Status.State = api.TaskStateRunning + updatedTask2.ServiceAnnotations = api.Annotations{Name: "original"} + err = s.Update(func(tx store.Tx) error { + assert.NoError(t, store.UpdateTask(tx, updatedTask1)) + assert.NoError(t, store.UpdateTask(tx, updatedTask2)) + return nil + }) + require.NoError(t, err) + + testutils.Expect(t, watch, state.EventCommit{}) + testutils.Expect(t, watch, api.EventUpdateTask{}) + testutils.Expect(t, watch, api.EventUpdateTask{}) + testutils.Expect(t, watch, state.EventCommit{}) + + // Set both tasks to COMPLETED. 
+ updatedTask3 := observedTask1.Copy() + updatedTask3.DesiredState = api.TaskStateCompleted + updatedTask3.Status.State = api.TaskStateCompleted + updatedTask3.ServiceAnnotations = api.Annotations{Name: "original"} + updatedTask4 := observedTask2.Copy() + updatedTask4.DesiredState = api.TaskStateCompleted + updatedTask4.Status.State = api.TaskStateCompleted + updatedTask4.ServiceAnnotations = api.Annotations{Name: "original"} + err = s.Update(func(tx store.Tx) error { + assert.NoError(t, store.UpdateTask(tx, updatedTask3)) + assert.NoError(t, store.UpdateTask(tx, updatedTask4)) + return nil + }) + require.NoError(t, err) + + // Verify state is set to COMPLETED + observedTask3 := testutils.WatchTaskUpdate(t, watch) + assert.Equal(t, api.TaskStateCompleted, observedTask3.Status.State) + assert.Equal(t, "original", observedTask3.ServiceAnnotations.Name) + observedTask4 := testutils.WatchTaskUpdate(t, watch) + assert.Equal(t, api.TaskStateCompleted, observedTask4.Status.State) + assert.Equal(t, "original", observedTask4.ServiceAnnotations.Name) + + // Delete the service. + err = s.Update(func(tx store.Tx) error { + assert.NoError(t, store.DeleteService(tx, service1.ID)) + return nil + }) + + // Service delete should trigger both the task desired statuses + // to be set to REMOVE. + observedTask3 = testutils.WatchTaskUpdate(t, watch) + assert.Equal(t, api.TaskStateRemove, observedTask3.DesiredState) + assert.Equal(t, "original", observedTask3.ServiceAnnotations.Name) + observedTask4 = testutils.WatchTaskUpdate(t, watch) + assert.Equal(t, api.TaskStateRemove, observedTask4.DesiredState) + assert.Equal(t, "original", observedTask4.ServiceAnnotations.Name) + + testutils.Expect(t, watch, state.EventCommit{}) + + // Task reaper should see the event updates for desired state update + // to REMOVE and should deleted by the reaper. + deletedTask1 := testutils.WatchTaskDelete(t, watch) + assert.Equal(t, api.TaskStateCompleted, deletedTask1.Status.State) + assert.Equal(t, "original", deletedTask1.ServiceAnnotations.Name) + deletedTask2 := testutils.WatchTaskDelete(t, watch) + assert.Equal(t, api.TaskStateCompleted, deletedTask2.Status.State) + assert.Equal(t, "original", deletedTask2.ServiceAnnotations.Name) + + var foundTasks []*api.Task + s.View(func(tx store.ReadTx) { + foundTasks, err = store.FindTasks(tx, store.All) + }) + assert.NoError(t, err) + assert.Len(t, foundTasks, 0) +} + +// TestServiceRemoveDeadTasks tests removal of +// tasks in state < TaskStateAssigned. +func TestServiceRemoveUnassignedTasks(t *testing.T) { + ctx := context.Background() + s := store.NewMemoryStore(nil) + assert.NotNil(t, s) + defer s.Close() + + assert.NoError(t, s.Update(func(tx store.Tx) error { + store.CreateCluster(tx, &api.Cluster{ + ID: identity.NewID(), + Spec: api.ClusterSpec{ + Annotations: api.Annotations{ + Name: store.DefaultClusterName, + }, + Orchestration: api.OrchestrationConfig{ + // set TaskHistoryRetentionLimit to a negative value, so + // that tasks are cleaned up right away. 
+ TaskHistoryRetentionLimit: 1, + }, + }, + }) + return nil + })) + + taskReaper := New(s) + defer taskReaper.Stop() + orchestrator := replicated.NewReplicatedOrchestrator(s) + defer orchestrator.Stop() + + watch, cancel := state.Watch(s.WatchQueue() /*api.EventCreateTask{}, api.EventUpdateTask{}*/) + defer cancel() + + service1 := &api.Service{ + ID: "id1", + Spec: api.ServiceSpec{ + Annotations: api.Annotations{ + Name: "name1", + }, + Mode: &api.ServiceSpec_Replicated{ + Replicated: &api.ReplicatedService{ + Replicas: 1, + }, + }, + Task: api.TaskSpec{ + Restart: &api.RestartPolicy{ + // Turn off restart to get an accurate count on tasks. + Condition: api.RestartOnNone, + Delay: gogotypes.DurationProto(0), + }, + }, + }, + } + + // Create a service with one replica specified before the orchestrator is + // started. This should result in two tasks when the orchestrator + // starts up. + err := s.Update(func(tx store.Tx) error { + assert.NoError(t, store.CreateService(tx, service1)) + return nil + }) + assert.NoError(t, err) + + // Start the orchestrator. + go func() { + assert.NoError(t, orchestrator.Run(ctx)) + }() + go taskReaper.Run(ctx) + + observedTask1 := testutils.WatchTaskCreate(t, watch) + assert.Equal(t, api.TaskStateNew, observedTask1.Status.State) + assert.Equal(t, observedTask1.ServiceAnnotations.Name, "name1") + + // Set the task state to PENDING to simulate allocation. + updatedTask1 := observedTask1.Copy() + updatedTask1.Status.State = api.TaskStatePending + updatedTask1.ServiceAnnotations = api.Annotations{Name: "original"} + err = s.Update(func(tx store.Tx) error { + assert.NoError(t, store.UpdateTask(tx, updatedTask1)) + return nil + }) + require.NoError(t, err) + + testutils.Expect(t, watch, state.EventCommit{}) + testutils.Expect(t, watch, api.EventUpdateTask{}) + testutils.Expect(t, watch, state.EventCommit{}) + + service1.Spec.Task.ForceUpdate++ + // This should shutdown the previous task and create a new one. + err = s.Update(func(tx store.Tx) error { + assert.NoError(t, store.UpdateService(tx, service1)) + return nil + }) + testutils.Expect(t, watch, api.EventUpdateService{}) + testutils.Expect(t, watch, state.EventCommit{}) + + // New task should be created and old task marked for SHUTDOWN. + observedTask1 = testutils.WatchTaskCreate(t, watch) + assert.Equal(t, api.TaskStateNew, observedTask1.Status.State) + assert.Equal(t, observedTask1.ServiceAnnotations.Name, "name1") + + observedTask3 := testutils.WatchTaskUpdate(t, watch) + assert.Equal(t, api.TaskStateShutdown, observedTask3.DesiredState) + assert.Equal(t, "original", observedTask3.ServiceAnnotations.Name) + + testutils.Expect(t, watch, state.EventCommit{}) + + // Task reaper should delete the task previously marked for SHUTDOWN. + deletedTask1 := testutils.WatchTaskDelete(t, watch) + assert.Equal(t, api.TaskStatePending, deletedTask1.Status.State) + assert.Equal(t, "original", deletedTask1.ServiceAnnotations.Name) + + testutils.Expect(t, watch, state.EventCommit{}) + + var foundTasks []*api.Task + s.View(func(tx store.ReadTx) { + foundTasks, err = store.FindTasks(tx, store.All) + }) + assert.NoError(t, err) + assert.Len(t, foundTasks, 1) +} From 697b16c0459a3dfa37de4fbda66f0394ca7d22d7 Mon Sep 17 00:00:00 2001 From: Anshul Pundir Date: Tue, 19 Jun 2018 11:25:27 -0700 Subject: [PATCH 3/5] [orchestrator/task reaper] Clean up tasks in dirty list for which the service has been deleted. 
Signed-off-by: Anshul Pundir (cherry picked from commit 592e8eddfa43ec5fbd6e34da5ad6890dfa9313fb) Signed-off-by: Sebastiaan van Stijn --- manager/orchestrator/taskreaper/task_reaper.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/manager/orchestrator/taskreaper/task_reaper.go b/manager/orchestrator/taskreaper/task_reaper.go index 1c93b77fbe..3bdd8b4220 100644 --- a/manager/orchestrator/taskreaper/task_reaper.go +++ b/manager/orchestrator/taskreaper/task_reaper.go @@ -188,6 +188,9 @@ func (tr *TaskReaper) tick() { for dirty := range tr.dirty { service := store.GetService(tx, dirty.ServiceID) if service == nil { + // If the service can't be found, assume that it was deleted + // and remove the slot from the dirty list. + delete(tr.dirty, dirty) continue } From 882c07d076bca9fbd4433a5047e76417ed0833fb Mon Sep 17 00:00:00 2001 From: Anshul Pundir Date: Fri, 22 Jun 2018 14:49:33 -0700 Subject: [PATCH 4/5] [manager/orchestrator/reaper] Clean out the task reaper dirty set at the end of tick() Signed-off-by: Anshul Pundir (cherry picked from commit 1a43a3b612d8c775db8a44c8399844e1f7e4aed2) Signed-off-by: Sebastiaan van Stijn --- .../orchestrator/taskreaper/task_reaper.go | 23 +- .../taskreaper/task_reaper_test.go | 207 +++++++++++++++++- 2 files changed, 227 insertions(+), 3 deletions(-) diff --git a/manager/orchestrator/taskreaper/task_reaper.go b/manager/orchestrator/taskreaper/task_reaper.go index 3bdd8b4220..9dbb0851e6 100644 --- a/manager/orchestrator/taskreaper/task_reaper.go +++ b/manager/orchestrator/taskreaper/task_reaper.go @@ -184,12 +184,23 @@ func (tr *TaskReaper) tick() { } // Check history of dirty tasks for cleanup. + // Note: Clean out the dirty set at the end of this tick iteration + // in all but one scenarios (documented below). + // When tick() finishes, the tasks in the slot were either cleaned up, + // or it was skipped because it didn't meet the criteria for cleaning. + // Either way, we can discard the dirty set because future events on + // that slot will cause the task to be readded to the dirty set + // at that point. + // + // The only case when we keep the slot dirty is when there are more + // than one running tasks present for a given slot. + // In that case, we need to keep the slot dirty to allow it to be + // cleaned when tick() is called next and one or more the tasks + // in that slot have stopped running. tr.store.View(func(tx store.ReadTx) { for dirty := range tr.dirty { service := store.GetService(tx, dirty.ServiceID) if service == nil { - // If the service can't be found, assume that it was deleted - // and remove the slot from the dirty list. delete(tr.dirty, dirty) continue } @@ -214,6 +225,7 @@ func (tr *TaskReaper) tick() { // Negative value for TaskHistoryRetentionLimit is an indication to never clean up task history. if taskHistory < 0 { + delete(tr.dirty, dirty) continue } @@ -243,6 +255,7 @@ func (tr *TaskReaper) tick() { } if int64(len(historicTasks)) <= taskHistory { + delete(tr.dirty, dirty) continue } @@ -273,6 +286,12 @@ func (tr *TaskReaper) tick() { } } + // The only case when we keep the slot dirty at the end of tick() + // is when there are more than one running tasks present + // for a given slot. + // In that case, we keep the slot dirty to allow it to be + // cleaned when tick() is called next and one or more of + // the tasks in that slot have stopped running. 
if runningTasks <= 1 { delete(tr.dirty, dirty) } diff --git a/manager/orchestrator/taskreaper/task_reaper_test.go b/manager/orchestrator/taskreaper/task_reaper_test.go index d4bad21d1d..f1fe7c69ef 100644 --- a/manager/orchestrator/taskreaper/task_reaper_test.go +++ b/manager/orchestrator/taskreaper/task_reaper_test.go @@ -2,6 +2,7 @@ package taskreaper import ( "context" + "github.com/docker/swarmkit/manager/orchestrator" "testing" @@ -805,7 +806,7 @@ func TestServiceRemoveUnassignedTasks(t *testing.T) { } // Create a service with one replica specified before the orchestrator is - // started. This should result in two tasks when the orchestrator + // started. This should result in one tasks when the orchestrator // starts up. err := s.Update(func(tx store.Tx) error { assert.NoError(t, store.CreateService(tx, service1)) @@ -871,3 +872,207 @@ func TestServiceRemoveUnassignedTasks(t *testing.T) { assert.NoError(t, err) assert.Len(t, foundTasks, 1) } + +func setupTaskReaperDirty(tr *TaskReaper) { + tr.dirty[orchestrator.SlotTuple{ + Slot: 1, + ServiceID: "id1", + NodeID: "node1", + }] = struct{}{} + tr.dirty[orchestrator.SlotTuple{ + Slot: 1, + ServiceID: "id2", + NodeID: "node1", + }] = struct{}{} +} + +// TestTick unit-tests the task reaper tick function. +// 1. Test that the dirty set is cleaned up when the service can't be found. +// 2. Test that the dirty set is cleaned up when the number of total tasks +// is smaller than the retention limit. +// 3. Test that the dirty set and excess tasks in the store are cleaned up +// when there the number of total tasks is greater than the retention limit. +func TestTick(t *testing.T) { + s := store.NewMemoryStore(nil) + assert.NotNil(t, s) + defer s.Close() + + assert.NoError(t, s.Update(func(tx store.Tx) error { + store.CreateCluster(tx, &api.Cluster{ + ID: identity.NewID(), + Spec: api.ClusterSpec{ + Annotations: api.Annotations{ + Name: store.DefaultClusterName, + }, + Orchestration: api.OrchestrationConfig{ + // set TaskHistoryRetentionLimit to a negative value, so + // that tasks are cleaned up right away. + TaskHistoryRetentionLimit: 1, + }, + }, + }) + return nil + })) + + // create the task reaper. + taskReaper := New(s) + + // Test # 1 + // Setup the dirty set with entries to + // verify that the dirty set it cleaned up + // when the service is not found. + setupTaskReaperDirty(taskReaper) + // call tick directly and verify dirty set was cleaned up. + taskReaper.tick() + assert.Zero(t, len(taskReaper.dirty)) + + // Test # 2 + // Verify that the dirty set it cleaned up + // when the history limit is set to zero. + + // Create a service in the store for the following test cases. + service1 := &api.Service{ + ID: "id1", + Spec: api.ServiceSpec{ + Annotations: api.Annotations{ + Name: "name1", + }, + Mode: &api.ServiceSpec_Replicated{ + Replicated: &api.ReplicatedService{ + Replicas: 1, + }, + }, + Task: api.TaskSpec{ + Restart: &api.RestartPolicy{ + // Turn off restart to get an accurate count on tasks. + Condition: api.RestartOnNone, + Delay: gogotypes.DurationProto(0), + }, + }, + }, + } + + // Create another service in the store for the following test cases. + service2 := &api.Service{ + ID: "id2", + Spec: api.ServiceSpec{ + Annotations: api.Annotations{ + Name: "name2", + }, + Mode: &api.ServiceSpec_Replicated{ + Replicated: &api.ReplicatedService{ + Replicas: 1, + }, + }, + Task: api.TaskSpec{ + Restart: &api.RestartPolicy{ + // Turn off restart to get an accurate count on tasks. 
+ Condition: api.RestartOnNone, + Delay: gogotypes.DurationProto(0), + }, + }, + }, + } + + // Create a service. + err := s.Update(func(tx store.Tx) error { + assert.NoError(t, store.CreateService(tx, service1)) + assert.NoError(t, store.CreateService(tx, service2)) + return nil + }) + assert.NoError(t, err) + + // Setup the dirty set with entries to + // verify that the dirty set it cleaned up + // when the history limit is set to zero. + setupTaskReaperDirty(taskReaper) + taskReaper.taskHistory = 0 + // call tick directly and verify dirty set was cleaned up. + taskReaper.tick() + assert.Zero(t, len(taskReaper.dirty)) + + // Test # 3 + // Test that the tasks are cleanup when the total number of tasks + // is greater than the retention limit. + + // Create tasks for both services in the store. + task1 := &api.Task{ + ID: "id1task1", + Slot: 1, + DesiredState: api.TaskStateShutdown, + Status: api.TaskStatus{ + State: api.TaskStateShutdown, + }, + ServiceID: "id1", + ServiceAnnotations: api.Annotations{ + Name: "name1", + }, + } + + task2 := &api.Task{ + ID: "id2task1", + Slot: 1, + DesiredState: api.TaskStateShutdown, + Status: api.TaskStatus{ + State: api.TaskStateShutdown, + }, + ServiceID: "id2", + ServiceAnnotations: api.Annotations{ + Name: "name2", + }, + } + + // Create Tasks. + err = s.Update(func(tx store.Tx) error { + assert.NoError(t, store.CreateTask(tx, task1)) + assert.NoError(t, store.CreateTask(tx, task2)) + return nil + }) + assert.NoError(t, err) + + // Set history to 1 to ensure that the tasks are not cleaned up yet. + // At the same time, we should be able to test that the dirty set was + // cleaned up at the end of tick(). + taskReaper.taskHistory = 1 + setupTaskReaperDirty(taskReaper) + // call tick directly and verify dirty set was cleaned up. + taskReaper.tick() + assert.Zero(t, len(taskReaper.dirty)) + + // Now test that tick() function cleans up the old tasks from the store. + + // Create new tasks in the store for the same slots to simulate service update. + task1.Status.State = api.TaskStateNew + task1.DesiredState = api.TaskStateRunning + task1.ID = "id1task2" + task2.Status.State = api.TaskStateNew + task2.DesiredState = api.TaskStateRunning + task2.ID = "id2task2" + err = s.Update(func(tx store.Tx) error { + assert.NoError(t, store.CreateTask(tx, task1)) + assert.NoError(t, store.CreateTask(tx, task2)) + return nil + }) + assert.NoError(t, err) + + watch, cancel := state.Watch(s.WatchQueue() /*api.EventCreateTask{}, api.EventUpdateTask{}*/) + defer cancel() + + // Setup the task reaper dirty set. + setupTaskReaperDirty(taskReaper) + // Call tick directly and verify dirty set was cleaned up. + taskReaper.tick() + assert.Zero(t, len(taskReaper.dirty)) + // Task reaper should delete the task previously marked for SHUTDOWN. 
+	deletedTask1 := testutils.WatchTaskDelete(t, watch)
+	assert.Equal(t, api.TaskStateShutdown, deletedTask1.Status.State)
+	assert.Equal(t, api.TaskStateShutdown, deletedTask1.DesiredState)
+	assert.True(t, deletedTask1.ServiceAnnotations.Name == "name1" ||
+		deletedTask1.ServiceAnnotations.Name == "name2")
+
+	deletedTask2 := testutils.WatchTaskDelete(t, watch)
+	assert.Equal(t, api.TaskStateShutdown, deletedTask2.Status.State)
+	assert.Equal(t, api.TaskStateShutdown, deletedTask2.DesiredState)
+	assert.True(t, deletedTask2.ServiceAnnotations.Name == "name1" ||
+		deletedTask2.ServiceAnnotations.Name == "name2")
+}

From 5568420598c53672dfb189ef3d325564772c08bf Mon Sep 17 00:00:00 2001
From: Drew Erny
Date: Wed, 20 Jun 2018 13:29:15 -0700
Subject: [PATCH 5/5] Fix task reaper batching

The batching logic of the task reaper was previously racy because of its
use of timer.Reset. This fixes the logic to guarantee it is not racy.

Signed-off-by: Drew Erny
(cherry picked from commit 5291c7a7b45773a4fe18720a54485ee2dde0af3d)
Signed-off-by: Sebastiaan van Stijn
---
 .../orchestrator/taskreaper/task_reaper.go |  66 +++++-
 .../taskreaper/task_reaper_test.go         | 195 ++++++++++++++++++
 2 files changed, 259 insertions(+), 2 deletions(-)

diff --git a/manager/orchestrator/taskreaper/task_reaper.go b/manager/orchestrator/taskreaper/task_reaper.go
index 9dbb0851e6..e3c2b8265d 100644
--- a/manager/orchestrator/taskreaper/task_reaper.go
+++ b/manager/orchestrator/taskreaper/task_reaper.go
@@ -40,6 +40,12 @@ type TaskReaper struct {
 	cleanup  []string
 	stopChan chan struct{}
 	doneChan chan struct{}
+
+	// tickSignal is a channel that, if non-nil and available, will be written
+	// to in order to signal that a tick has occurred. its sole purpose is for
+	// testing code, to verify that task cleanup attempts are happening when
+	// they should be.
+	tickSignal chan struct{}
 }
 
 // New creates a new TaskReaper.
@@ -115,7 +121,34 @@ func (tr *TaskReaper) Run(ctx context.Context) {
 	// Clean up when we hit TaskHistoryRetentionLimit or when the timer expires,
 	// whichever happens first.
+	//
+	// Specifically, the way this should work:
+	// - Create a timer and immediately stop it. We don't want to fire the
+	//   cleanup routine yet, because we just did a cleanup as part of the
+	//   initialization above.
+	// - Launch into an event loop
+	// - When we receive an event, handle the event as needed
+	// - After receiving the event:
+	//   - If minimum batch size (maxDirty) is exceeded with dirty + cleanup,
+	//     then immediately launch into the cleanup routine
+	//   - Otherwise, if the timer is stopped, start it (reset).
+	// - If the timer expires and the timer channel is signaled, then Stop the
+	//   timer (so that it will be ready to be started again as needed), and
+	//   execute the cleanup routine (tick)
 	timer := time.NewTimer(reaperBatchingInterval)
+	timer.Stop()
+
+	// If Stop() is somehow called AFTER the timer has expired, there will be a
+	// value in the timer.C channel. If there is such a value, we should drain
+	// it out. This select statement allows us to drain that value if it's
+	// present, or continue straight through otherwise.
+	select {
+	case <-timer.C:
+	default:
+	}
+
+	// keep track with a boolean of whether the timer is currently stopped
+	isTimerStopped := true
 
 	// Watch for:
 	// 1. EventCreateTask for cleaning slots, which is the best time to cleanup that node/slot.
@@ -153,16 +186,35 @@ func (tr *TaskReaper) Run(ctx context.Context) {
 			}
 
 			if len(tr.dirty)+len(tr.cleanup) > maxDirty {
+				// stop the timer, so we don't fire it. if we get another event
+				// after we do this cleaning, we will reset the timer then
 				timer.Stop()
+				// if the timer had fired, drain out the value.
+				select {
+				case <-timer.C:
+				default:
+				}
+				isTimerStopped = true
 				tr.tick()
 			} else {
-				timer.Reset(reaperBatchingInterval)
+				if isTimerStopped {
+					timer.Reset(reaperBatchingInterval)
+					isTimerStopped = false
+				}
 			}
 		case <-timer.C:
-			timer.Stop()
+			// we can safely skip draining the timer channel here, because
+			// we already know that the timer has expired.
+			isTimerStopped = true
 			tr.tick()
 		case <-tr.stopChan:
+			// even though this doesn't really matter in this context, it's
+			// good hygiene to drain the value.
 			timer.Stop()
+			select {
+			case <-timer.C:
+			default:
+			}
 			return
 		}
 	}
@@ -170,6 +222,16 @@
 // tick performs task history cleanup.
 func (tr *TaskReaper) tick() {
+	// this signals that a tick has occurred. it exists solely for testing.
+	if tr.tickSignal != nil {
+		// try writing to this channel, but if it's full, fall straight through
+		// and ignore it.
+		select {
+		case tr.tickSignal <- struct{}{}:
+		default:
+		}
+	}
+
 	if len(tr.dirty) == 0 && len(tr.cleanup) == 0 {
 		return
 	}
diff --git a/manager/orchestrator/taskreaper/task_reaper_test.go b/manager/orchestrator/taskreaper/task_reaper_test.go
index f1fe7c69ef..db59248747 100644
--- a/manager/orchestrator/taskreaper/task_reaper_test.go
+++ b/manager/orchestrator/taskreaper/task_reaper_test.go
@@ -2,6 +2,9 @@ package taskreaper
 import (
 	"context"
+	"fmt"
+	"time"
+
 	"github.com/docker/swarmkit/manager/orchestrator"
 	"testing"
@@ -751,6 +754,198 @@ func TestServiceRemoveDeadTasks(t *testing.T) {
 	assert.Len(t, foundTasks, 0)
 }
+
+// TestTaskReaperBatching tests that the batching logic for the task reaper
+// runs correctly.
+func TestTaskReaperBatching(t *testing.T) {
+	// create a canned context and store to use with this task reaper
+	ctx := context.Background()
+	s := store.NewMemoryStore(nil)
+	assert.NotNil(t, s)
+	defer s.Close()
+
+	var (
+		task1, task2, task3 *api.Task
+		tasks               []*api.Task
+	)
+
+	// set up all of the test fixtures
+	assert.NoError(t, s.Update(func(tx store.Tx) error {
+		// we need a cluster object, because we need to set the retention limit
+		// to a low value
+		assert.NoError(t, store.CreateCluster(tx, &api.Cluster{
+			ID: identity.NewID(),
+			Spec: api.ClusterSpec{
+				Annotations: api.Annotations{
+					Name: store.DefaultClusterName,
+				},
+				Orchestration: api.OrchestrationConfig{
+					TaskHistoryRetentionLimit: 1,
+				},
+			},
+		}))
+
+		task1 = &api.Task{
+			ID:           "foo",
+			ServiceID:    "bar",
+			Slot:         0,
+			DesiredState: api.TaskStateShutdown,
+			Status: api.TaskStatus{
+				State: api.TaskStateShutdown,
+			},
+		}
+		// we need to create all of the tasks used in this test, because we'll
+		// be using task update events to trigger reaper behavior.
+		assert.NoError(t, store.CreateTask(tx, task1))
+
+		task2 = &api.Task{
+			ID:           "foo2",
+			ServiceID:    "bar",
+			Slot:         1,
+			DesiredState: api.TaskStateShutdown,
+			Status: api.TaskStatus{
+				State: api.TaskStateShutdown,
+			},
+		}
+		assert.NoError(t, store.CreateTask(tx, task2))
+
+		tasks = make([]*api.Task, maxDirty+1)
+		for i := 0; i < maxDirty+1; i++ {
+			tasks[i] = &api.Task{
+				ID:        fmt.Sprintf("baz%v", i),
+				ServiceID: "bar",
+				// every task in a different slot, so they don't get cleaned up
+				// based on exceeding the retention limit
+				Slot:         uint64(i),
+				DesiredState: api.TaskStateShutdown,
+				Status: api.TaskStatus{
+					State: api.TaskStateShutdown,
+				},
+			}
+			if err := store.CreateTask(tx, tasks[i]); err != nil {
+				return err
+			}
+		}
+
+		task3 = &api.Task{
+			ID:           "foo3",
+			ServiceID:    "bar",
+			Slot:         2,
+			DesiredState: api.TaskStateShutdown,
+			Status: api.TaskStatus{
+				State: api.TaskStateShutdown,
+			},
+		}
+		assert.NoError(t, store.CreateTask(tx, task3))
+		return nil
+	}))
+
+	// now create the task reaper
+	taskReaper := New(s)
+	taskReaper.tickSignal = make(chan struct{}, 1)
+	defer taskReaper.Stop()
+	go taskReaper.Run(ctx)
+
+	// None of the tasks we've created are eligible for deletion. We should
+	// see no task delete events. Wait for a tick signal, or for twice the
+	// batching interval to elapse, to verify that no tick will occur.
+	select {
+	case <-taskReaper.tickSignal:
+		t.Fatalf("the taskreaper ticked when it should not have")
+	case <-time.After(reaperBatchingInterval * 2):
+		// ok, looks good, moving on
+	}
+
+	// update task1 to die
+	assert.NoError(t, s.Update(func(tx store.Tx) error {
+		task1.DesiredState = api.TaskStateRemove
+		return store.UpdateTask(tx, task1)
+	}))
+
+	// the task should be added to the cleanup map and a tick should occur
+	// shortly. give it an extra 50ms for overhead
+	select {
+	case <-taskReaper.tickSignal:
+	case <-time.After(reaperBatchingInterval + (50 * time.Millisecond)):
+		t.Fatalf("the taskreaper should have ticked but did not")
+	}
+
+	// now wait and make sure the task reaper does not tick again
+	select {
+	case <-taskReaper.tickSignal:
+		t.Fatalf("the taskreaper should not have ticked but did")
+	case <-time.After(reaperBatchingInterval * 2):
+	}
+
+	// now make sure we'll tick again if we update another task to die
+	assert.NoError(t, s.Update(func(tx store.Tx) error {
+		task2.DesiredState = api.TaskStateRemove
+		return store.UpdateTask(tx, task2)
+	}))
+
+	select {
+	case <-taskReaper.tickSignal:
+	case <-time.After(reaperBatchingInterval + (50 * time.Millisecond)):
+		t.Fatalf("the taskreaper should have ticked by now but did not")
+	}
+
+	// again, now wait and make sure the task reaper does not tick again
+	select {
+	case <-taskReaper.tickSignal:
+		t.Fatalf("the taskreaper should not have ticked but did")
+	case <-time.After(reaperBatchingInterval * 2):
+	}
+
+	// now create a large batch of tasks. this should cause a tick immediately,
+	// with no waiting. processing all of these events should easily fit within
+	// the batching interval, so we should expect exactly 1 tick immediately
+	// after and no more
+	assert.NoError(t, s.Update(func(tx store.Tx) error {
+		for _, task := range tasks {
+			task.DesiredState = api.TaskStateRemove
+			assert.NoError(t, store.UpdateTask(tx, task))
+		}
+		return nil
+	}))
+
+	select {
+	case <-taskReaper.tickSignal:
+	case <-time.After(reaperBatchingInterval):
+		// tight bound on how long it should take to tick. we should tick
+		// before the reaper batching interval. this should only POSSIBLY fail
+		// on a really slow system, where processing the 1000+ incoming events
+		// takes longer than the reaperBatchingInterval. if this test flakes
+		// here, that's probably why.
+		t.Fatalf("we should have ticked immediately, but did not")
+	}
+
+	// once again, wait and make sure the task reaper does not tick again
+	select {
+	case <-taskReaper.tickSignal:
+		t.Fatalf("the taskreaper should not have ticked but did")
+	case <-time.After(reaperBatchingInterval * 2):
+	}
+
+	// now, before we wrap up, make sure the task reaper still works off the
+	// timer
+	assert.NoError(t, s.Update(func(tx store.Tx) error {
+		task3.DesiredState = api.TaskStateRemove
+		return store.UpdateTask(tx, task3)
+	}))
+
+	select {
+	case <-taskReaper.tickSignal:
+	case <-time.After(reaperBatchingInterval + (50 * time.Millisecond)):
+		t.Fatalf("the taskreaper should have ticked by now but did not")
+	}
+
+	// again, now wait and make sure the task reaper does not tick again
+	select {
+	case <-taskReaper.tickSignal:
+		t.Fatalf("the taskreaper should not have ticked but did")
+	case <-time.After(reaperBatchingInterval * 2):
+	}
+}
+
 // TestServiceRemoveDeadTasks tests removal of
 // tasks in state < TaskStateAssigned.
 func TestServiceRemoveUnassignedTasks(t *testing.T) {
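
Note (illustrative only, not part of the patch series): the stop-and-drain timer pattern that PATCH 5/5 applies to the reaper's batching loop can be shown in isolation. The sketch below uses hypothetical names (events, batchingInterval) and a shortened interval; it only demonstrates the general technique of arming a stopped timer on the first queued event and draining timer.C before reuse, so a stale expiration is never mistaken for a fresh one.

	package main

	import (
		"fmt"
		"time"
	)

	func main() {
		const batchingInterval = 100 * time.Millisecond

		// Create the timer and immediately stop it; it should only be armed
		// once the first event of a batch arrives.
		timer := time.NewTimer(batchingInterval)
		if !timer.Stop() {
			// Drain a value that may already have been delivered, so a later
			// receive does not observe a stale expiration.
			select {
			case <-timer.C:
			default:
			}
		}
		stopped := true

		// A buffered channel stands in for the reaper's event source.
		events := make(chan struct{}, 3)
		events <- struct{}{}
		events <- struct{}{}
		events <- struct{}{}

		for {
			select {
			case <-events:
				// Arm the timer only if it is currently stopped, so repeated
				// events do not keep pushing the deadline out.
				if stopped {
					timer.Reset(batchingInterval)
					stopped = false
				}
			case <-timer.C:
				// The timer fired and its channel is now empty, so it can be
				// Reset on the next event without draining.
				stopped = true
				fmt.Println("flush batch")
				return
			}
		}
	}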