diff --git a/manager/orchestrator/global/global_test.go b/manager/orchestrator/global/global_test.go
index c7cede8d94..abb24fc307 100644
--- a/manager/orchestrator/global/global_test.go
+++ b/manager/orchestrator/global/global_test.go
@@ -115,9 +115,9 @@ func setup(t *testing.T, store *store.MemoryStore, watch chan events.Event) *Orc
 	ctx := context.Background()
 	// Start the global orchestrator.
 	global := NewGlobalOrchestrator(store)
-	go func() {
+	testutils.EnsureRuns(func() {
 		assert.NoError(t, global.Run(ctx))
-	}()
+	})
 
 	addService(t, store, service1)
 	testutils.Expect(t, watch, api.EventCreateService{})
@@ -579,9 +579,9 @@ func TestInitializationRejectedTasks(t *testing.T) {
 	orchestrator := NewGlobalOrchestrator(s)
 	defer orchestrator.Stop()
 
-	go func() {
+	testutils.EnsureRuns(func() {
 		assert.NoError(t, orchestrator.Run(ctx))
-	}()
+	})
 
 	observedTask1 := testutils.WatchTaskUpdate(t, watch)
 	assert.Equal(t, observedTask1.ID, "task1")
@@ -642,9 +642,9 @@ func TestInitializationFailedTasks(t *testing.T) {
 	orchestrator := NewGlobalOrchestrator(s)
 	defer orchestrator.Stop()
 
-	go func() {
+	testutils.EnsureRuns(func() {
 		assert.NoError(t, orchestrator.Run(ctx))
-	}()
+	})
 
 	observedTask1 := testutils.WatchTaskUpdate(t, watch)
 	assert.Equal(t, observedTask1.ID, "task1")
@@ -734,9 +734,9 @@ func TestInitializationExtraTask(t *testing.T) {
 	orchestrator := NewGlobalOrchestrator(s)
 	defer orchestrator.Stop()
 
-	go func() {
+	testutils.EnsureRuns(func() {
 		assert.NoError(t, orchestrator.Run(ctx))
-	}()
+	})
 
 	observedTask1 := testutils.WatchTaskUpdate(t, watch)
 	assert.True(t, observedTask1.ID == "task1" || observedTask1.ID == "task2")
@@ -814,9 +814,9 @@ func TestInitializationMultipleServices(t *testing.T) {
 	orchestrator := NewGlobalOrchestrator(s)
 	defer orchestrator.Stop()
 
-	go func() {
+	testutils.EnsureRuns(func() {
 		assert.NoError(t, orchestrator.Run(ctx))
-	}()
+	})
 
 	// Nothing should happen because both tasks are up to date.
 	select {
@@ -955,9 +955,9 @@ func TestInitializationTaskWithoutService(t *testing.T) {
 	orchestrator := NewGlobalOrchestrator(s)
 	defer orchestrator.Stop()
 
-	go func() {
+	testutils.EnsureRuns(func() {
 		assert.NoError(t, orchestrator.Run(ctx))
-	}()
+	})
 
 	observedTask1 := testutils.WatchTaskDelete(t, watch)
 	assert.Equal(t, observedTask1.ID, "task2")
@@ -1013,9 +1013,9 @@ func TestInitializationTaskOnDrainedNode(t *testing.T) {
 	orchestrator := NewGlobalOrchestrator(s)
 	defer orchestrator.Stop()
 
-	go func() {
+	testutils.EnsureRuns(func() {
 		assert.NoError(t, orchestrator.Run(ctx))
-	}()
+	})
 
 	observedTask1 := testutils.WatchTaskUpdate(t, watch)
 	assert.Equal(t, observedTask1.ID, "task1")
@@ -1085,9 +1085,9 @@ func TestInitializationTaskOnNonexistentNode(t *testing.T) {
 	orchestrator := NewGlobalOrchestrator(s)
 	defer orchestrator.Stop()
 
-	go func() {
+	testutils.EnsureRuns(func() {
 		assert.NoError(t, orchestrator.Run(ctx))
-	}()
+	})
 
 	observedTask1 := testutils.WatchTaskUpdate(t, watch)
 	assert.Equal(t, observedTask1.ID, "task1")
@@ -1254,9 +1254,9 @@ func TestInitializationRestartHistory(t *testing.T) {
 	orchestrator := NewGlobalOrchestrator(s)
 	defer orchestrator.Stop()
 
-	go func() {
+	testutils.EnsureRuns(func() {
 		assert.NoError(t, orchestrator.Run(ctx))
-	}()
+	})
 
 	// Fail the running task
 	s.Update(func(tx store.Tx) error {
diff --git a/manager/orchestrator/replicated/update_test.go b/manager/orchestrator/replicated/update_test.go
index 325a0141aa..53d6da728e 100644
--- a/manager/orchestrator/replicated/update_test.go
+++ b/manager/orchestrator/replicated/update_test.go
@@ -2,7 +2,7 @@ package replicated
 
 import (
 	"context"
-	"sync/atomic"
+	"sync"
 	"testing"
 	"time"
 
@@ -27,7 +27,7 @@ func TestUpdaterRollback(t *testing.T) {
 }
 
 func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_FailureAction, setMonitor bool, useSpecVersion bool) {
-	// this test should complete within 20 seconds. if not, bail out
+	// this test should complete within 30 seconds. if not, bail out
 	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
 	defer cancel()
 
@@ -37,14 +37,15 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa
 
 	orchestrator := NewReplicatedOrchestrator(s)
 
-	// TODO(dperny): these are used with atomic.StoreUint32 and
-	// atomic.LoadUint32. using atomic primitives is bad practice and easy to
-	// mess up
+	// These variables will be used to signal that The Fail Loop should start
+	// failing these tasks. Once failImage1 is true, The Failing Can Begin.
 	var (
-		failImage1 uint32
-		failImage2 uint32
+		failMu     sync.Mutex
+		failImage1 bool
 	)
 
+	// create a watch for task creates, which we will use to verify that the
+	// updater works correctly.
 	watchCreate, cancelCreate := state.Watch(s.WatchQueue(), api.EventCreateTask{})
 	defer cancelCreate()
 
@@ -54,8 +55,13 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa
 	// Fail new tasks the updater tries to run
 	watchUpdate, cancelUpdate := state.Watch(s.WatchQueue(), api.EventUpdateTask{})
 	defer cancelUpdate()
-	go func() {
+
+	// We're gonna call this big chunk here "The Fail Loop". Its job is to put
+	// tasks into a Failed state in certain conditions.
+	testutils.EnsureRuns(func() {
 		failedLast := false
+		// typical go pattern: infinite for loop in a goroutine, exits on
+		// ctx.Done
 		for {
 			var e events.Event
 			select {
@@ -67,15 +73,26 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa
 			if task.DesiredState == task.Status.State {
 				continue
 			}
-			if task.DesiredState == api.TaskStateRunning && task.Status.State != api.TaskStateFailed && task.Status.State != api.TaskStateRunning {
+			// This used to have a 3rd clause,
+			// "&& task.Status.State != api.TaskStateRunning"
+			// however, this is unneeded. If DesiredState is Running, then
+			// actual state cannot be Running, because that would get caught
+			// in the condition above (DesiredState == State)
+			if task.DesiredState == api.TaskStateRunning && task.Status.State != api.TaskStateFailed {
 				err := s.Update(func(tx store.Tx) error {
 					task = store.GetTask(tx, task.ID)
-					// Never fail two image2 tasks in a row, so there's a mix of
-					// failed and successful tasks for the rollback.
-					if task.Spec.GetContainer().Image == "image1" && atomic.LoadUint32(&failImage1) == 1 {
+					// lock mutex governing access to failImage1.
+					failMu.Lock()
+					defer failMu.Unlock()
+					// we should start failing tasks with image1 only after failImage1 is set
+					if task.Spec.GetContainer().Image == "image1" && failImage1 {
+						// only fail the task if failImage1 has been set, which
+						// happens right before the second rolling update
 						task.Status.State = api.TaskStateFailed
 						failedLast = true
-					} else if task.Spec.GetContainer().Image == "image2" && atomic.LoadUint32(&failImage2) == 1 && !failedLast {
+					} else if task.Spec.GetContainer().Image == "image2" && !failedLast {
+						// Never fail two image2 tasks in a row, so there's a mix of
+						// failed and successful tasks for the rollback.
 						task.Status.State = api.TaskStateFailed
 						failedLast = true
 					} else {
@@ -94,7 +111,7 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa
 				assert.NoError(t, err)
 			}
 		}
-	}()
+	})
 
 	// Create a service with four replicas specified before the orchestrator
 	// is started. This should result in two tasks when the orchestrator
@@ -153,23 +170,9 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa
 
 	// Start the orchestrator.
 	var orchestratorError error
-	orchestratorDone := make(chan struct{})
-	// verify that the orchestrator has had a chance to run by blocking the
-	// main test routine until it has.
-	orchestratorRan := make(chan struct{})
-	go func() {
-		close(orchestratorRan)
-		// try not to fail the test in go routines. it's racey. instead, save
-		// the error and check it in a defer
+	orchestratorDone := testutils.EnsureRuns(func() {
 		orchestratorError = orchestrator.Run(ctx)
-		close(orchestratorDone)
-	}()
-
-	select {
-	case <-orchestratorRan:
-	case <-ctx.Done():
-		t.Error("orchestrator did not start before test timed out")
-	}
+	})
 
 	defer func() {
 		orchestrator.Stop()
@@ -196,8 +199,6 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa
 		assert.Equal(t, observedTask.Status.State, api.TaskStateNew)
 		assert.Equal(t, observedTask.Spec.GetContainer().Image, "image1")
 
-	atomic.StoreUint32(&failImage2, 1)
-
 	// Start a rolling update
 	err = s.Update(func(tx store.Tx) error {
 		s1 := store.GetService(tx, "id1")
@@ -268,6 +269,7 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa
 		var e events.Event
 		select {
 		case e = <-watchServiceUpdate:
+			t.Log("service was updated")
 		case <-ctx.Done():
 			t.Error("test timed out before watchServiceUpdate provided an event")
 			return
@@ -278,10 +280,12 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa
 		}
 	}
 
-	atomic.StoreUint32(&failImage1, 1)
-
 	// Repeat the rolling update but this time fail the tasks that the
 	// rollback creates.
+	failMu.Lock()
+	failImage1 = true
+	failMu.Unlock()
+
 	err = s.Update(func(tx store.Tx) error {
 		s1 := store.GetService(tx, "id1")
 		require.NotNil(t, s1)
diff --git a/manager/orchestrator/taskreaper/task_reaper_test.go b/manager/orchestrator/taskreaper/task_reaper_test.go
index e347ec709c..0c6b9a9849 100644
--- a/manager/orchestrator/taskreaper/task_reaper_test.go
+++ b/manager/orchestrator/taskreaper/task_reaper_test.go
@@ -173,7 +173,7 @@ func TestTaskReaperInit(t *testing.T) {
 	reaper := New(s)
 
 	// Now, start the reaper
-	go reaper.Run(ctx)
+	testutils.EnsureRuns(func() { reaper.Run(ctx) })
 
 	// And then stop the reaper. This will cause the reaper to run through its
 	// whole init phase and then immediately enter the loop body, get the stop
@@ -259,10 +259,10 @@ func TestTaskHistory(t *testing.T) {
 	assert.NoError(t, err)
 
 	// Start the orchestrator.
-	go func() {
+	testutils.EnsureRuns(func() {
 		assert.NoError(t, orchestrator.Run(ctx))
-	}()
-	go taskReaper.Run(ctx)
+	})
+	testutils.EnsureRuns(func() { taskReaper.Run(ctx) })
 
 	observedTask1 := testutils.WatchTaskCreate(t, watch)
 	assert.Equal(t, observedTask1.Status.State, api.TaskStateNew)
@@ -394,10 +394,8 @@ func TestTaskStateRemoveOnScaledown(t *testing.T) {
 	assert.NoError(t, err)
 
 	// Start the orchestrator.
-	go func() {
-		assert.NoError(t, orchestrator.Run(ctx))
-	}()
-	go taskReaper.Run(ctx)
+	testutils.EnsureRuns(func() { assert.NoError(t, orchestrator.Run(ctx)) })
+	testutils.EnsureRuns(func() { taskReaper.Run(ctx) })
 
 	observedTask1 := testutils.WatchTaskCreate(t, watch)
 	assert.Equal(t, observedTask1.Status.State, api.TaskStateNew)
@@ -526,10 +524,10 @@ func TestTaskStateRemoveOnServiceRemoval(t *testing.T) {
 	assert.NoError(t, err)
 
 	// Start the orchestrator.
-	go func() {
+	testutils.EnsureRuns(func() {
 		assert.NoError(t, orchestrator.Run(ctx))
-	}()
-	go taskReaper.Run(ctx)
+	})
+	testutils.EnsureRuns(func() { taskReaper.Run(ctx) })
 
 	observedTask1 := testutils.WatchTaskCreate(t, watch)
 	assert.Equal(t, observedTask1.Status.State, api.TaskStateNew)
@@ -664,10 +662,10 @@ func TestServiceRemoveDeadTasks(t *testing.T) {
 	assert.NoError(t, err)
 
 	// Start the orchestrator and the reaper.
-	go func() {
+	testutils.EnsureRuns(func() {
 		assert.NoError(t, orchestrator.Run(ctx))
-	}()
-	go taskReaper.Run(ctx)
+	})
+	testutils.EnsureRuns(func() { taskReaper.Run(ctx) })
 
 	observedTask1 := testutils.WatchTaskCreate(t, watch)
 	assert.Equal(t, api.TaskStateNew, observedTask1.Status.State)
@@ -843,7 +841,7 @@ func TestTaskReaperBatching(t *testing.T) {
 	taskReaper := New(s)
 	taskReaper.tickSignal = make(chan struct{}, 1)
 	defer taskReaper.Stop()
-	go taskReaper.Run(ctx)
+	testutils.EnsureRuns(func() { taskReaper.Run(ctx) })
 
 	// None of the tasks we've created are eligible for deletion. We should
 	// see no task delete events. Wait for a tick signal, or 500ms to pass, to
@@ -1010,10 +1008,10 @@ func TestServiceRemoveUnassignedTasks(t *testing.T) {
 	assert.NoError(t, err)
 
 	// Start the orchestrator.
-	go func() {
+	testutils.EnsureRuns(func() {
 		assert.NoError(t, orchestrator.Run(ctx))
-	}()
-	go taskReaper.Run(ctx)
+	})
+	testutils.EnsureRuns(func() { taskReaper.Run(ctx) })
 
 	observedTask1 := testutils.WatchTaskCreate(t, watch)
 	assert.Equal(t, api.TaskStateNew, observedTask1.Status.State)
diff --git a/manager/orchestrator/testutils/testutils.go b/manager/orchestrator/testutils/testutils.go
index 0b3b851ee0..09602ec256 100644
--- a/manager/orchestrator/testutils/testutils.go
+++ b/manager/orchestrator/testutils/testutils.go
@@ -11,6 +11,22 @@ import (
 	"github.com/stretchr/testify/assert"
 )
 
+// EnsureRuns takes a closure and runs it in a goroutine, blocking until the
+// goroutine has had an opportunity to run. It returns a channel which will be
+// closed when the provided closure exits.
+func EnsureRuns(closure func()) <-chan struct{} {
+	started := make(chan struct{})
+	stopped := make(chan struct{})
+	go func() {
+		close(started)
+		closure()
+		close(stopped)
+	}()
+
+	<-started
+	return stopped
+}
+
 // WatchTaskCreate waits for a task to be created.
 func WatchTaskCreate(t *testing.T, watch chan events.Event) *api.Task {
 	for {
@@ -22,7 +38,7 @@ func WatchTaskCreate(t *testing.T, watch chan events.Event) *api.Task {
 			if _, ok := event.(api.EventUpdateTask); ok {
 				assert.FailNow(t, "got EventUpdateTask when expecting EventCreateTask", fmt.Sprint(event))
 			}
-		case <-time.After(2 * time.Second):
+		case <-time.After(3 * time.Second):
 			assert.FailNow(t, "no task creation")
 		}
 	}
@@ -39,7 +55,7 @@ func WatchTaskUpdate(t *testing.T, watch chan events.Event) *api.Task {
 			if _, ok := event.(api.EventCreateTask); ok {
 				assert.FailNow(t, "got EventCreateTask when expecting EventUpdateTask", fmt.Sprint(event))
 			}
-		case <-time.After(time.Second):
+		case <-time.After(2 * time.Second):
 			assert.FailNow(t, "no task update")
 		}
 	}
diff --git a/manager/orchestrator/update/updater.go b/manager/orchestrator/update/updater.go
index 7c977dba1c..4e6a2cc0bd 100644
--- a/manager/orchestrator/update/updater.go
+++ b/manager/orchestrator/update/updater.go
@@ -501,7 +501,10 @@ func (u *Updater) removeOldTasks(ctx context.Context, batch *store.Batch, remove
 			return fmt.Errorf("task %s not found while trying to shut it down", original.ID)
 		}
 		if t.DesiredState > api.TaskStateRunning {
-			return fmt.Errorf("task %s was already shut down when reached by updater", original.ID)
+			return fmt.Errorf(
+				"task %s was already shut down when reached by updater (state: %v)",
+				original.ID, t.DesiredState,
+			)
 		}
 		t.DesiredState = api.TaskStateShutdown
 		return store.UpdateTask(tx, t)
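
Usage note for reviewers: the sketch below shows how the new EnsureRuns helper is meant to be composed in a test, following the pattern this diff applies to the orchestrators and the task reaper. The runner type, the test name, and the import path are illustrative assumptions, not part of the change.

```go
package example

import (
	"context"
	"testing"

	// assumed import path for the helper added in this diff
	"github.com/docker/swarmkit/manager/orchestrator/testutils"
)

// runner stands in for any component with a blocking Run method, like the
// orchestrators and the task reaper touched by this diff.
type runner struct{}

func (r *runner) Run(ctx context.Context) error {
	<-ctx.Done()
	return nil
}

func TestEnsureRunsUsage(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	r := &runner{}

	// EnsureRuns blocks until the spawned goroutine has begun executing, so
	// the component is started before the test body proceeds, and it returns
	// a channel that is closed once the closure exits.
	done := testutils.EnsureRuns(func() { _ = r.Run(ctx) })

	// ... exercise the component here ...

	// Shut the component down and wait for it to exit before the test (and
	// anything it owns, such as an in-memory store) is torn down.
	cancel()
	<-done
}
```

Compared to a bare `go func()`, this removes the start-up race that the deleted `orchestratorRan` channel in update_test.go worked around, and it gives callers a done channel to block on during teardown.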