From 06a356671bc11e4fd5d754f257f9c5f93ec5c563 Mon Sep 17 00:00:00 2001 From: Drew Erny Date: Wed, 16 Jan 2019 13:09:11 -0600 Subject: [PATCH] Fix flaky tests It is likely that a large portion of test flakiness, especially in CI, comes from the fact that swarmkit components under test are started in goroutines, but those goroutines never have an opportunity to run. This adds code ensuring those goroutines are scheduled and run, which should hopefully solve many inexplicably flaky tests. Additionally, increased test timeouts, to hopefully cover a few more flaky cases. Finally, removed direct use of the atomic package, in favor of less efficient but higher-level mutexes. Signed-off-by: Drew Erny --- manager/orchestrator/global/global_test.go | 36 +++++----- .../orchestrator/replicated/update_test.go | 72 ++++++++++--------- .../taskreaper/task_reaper_test.go | 34 +++++---- manager/orchestrator/testutils/testutils.go | 20 +++++- manager/orchestrator/update/updater.go | 5 +- 5 files changed, 94 insertions(+), 73 deletions(-) diff --git a/manager/orchestrator/global/global_test.go b/manager/orchestrator/global/global_test.go index c7cede8d94..abb24fc307 100644 --- a/manager/orchestrator/global/global_test.go +++ b/manager/orchestrator/global/global_test.go @@ -115,9 +115,9 @@ func setup(t *testing.T, store *store.MemoryStore, watch chan events.Event) *Orc ctx := context.Background() // Start the global orchestrator. global := NewGlobalOrchestrator(store) - go func() { + testutils.EnsureRuns(func() { assert.NoError(t, global.Run(ctx)) - }() + }) addService(t, store, service1) testutils.Expect(t, watch, api.EventCreateService{}) @@ -579,9 +579,9 @@ func TestInitializationRejectedTasks(t *testing.T) { orchestrator := NewGlobalOrchestrator(s) defer orchestrator.Stop() - go func() { + testutils.EnsureRuns(func() { assert.NoError(t, orchestrator.Run(ctx)) - }() + }) observedTask1 := testutils.WatchTaskUpdate(t, watch) assert.Equal(t, observedTask1.ID, "task1") @@ -642,9 +642,9 @@ func TestInitializationFailedTasks(t *testing.T) { orchestrator := NewGlobalOrchestrator(s) defer orchestrator.Stop() - go func() { + testutils.EnsureRuns(func() { assert.NoError(t, orchestrator.Run(ctx)) - }() + }) observedTask1 := testutils.WatchTaskUpdate(t, watch) assert.Equal(t, observedTask1.ID, "task1") @@ -734,9 +734,9 @@ func TestInitializationExtraTask(t *testing.T) { orchestrator := NewGlobalOrchestrator(s) defer orchestrator.Stop() - go func() { + testutils.EnsureRuns(func() { assert.NoError(t, orchestrator.Run(ctx)) - }() + }) observedTask1 := testutils.WatchTaskUpdate(t, watch) assert.True(t, observedTask1.ID == "task1" || observedTask1.ID == "task2") @@ -814,9 +814,9 @@ func TestInitializationMultipleServices(t *testing.T) { orchestrator := NewGlobalOrchestrator(s) defer orchestrator.Stop() - go func() { + testutils.EnsureRuns(func() { assert.NoError(t, orchestrator.Run(ctx)) - }() + }) // Nothing should happen because both tasks are up to date. select { @@ -955,9 +955,9 @@ func TestInitializationTaskWithoutService(t *testing.T) { orchestrator := NewGlobalOrchestrator(s) defer orchestrator.Stop() - go func() { + testutils.EnsureRuns(func() { assert.NoError(t, orchestrator.Run(ctx)) - }() + }) observedTask1 := testutils.WatchTaskDelete(t, watch) assert.Equal(t, observedTask1.ID, "task2") @@ -1013,9 +1013,9 @@ func TestInitializationTaskOnDrainedNode(t *testing.T) { orchestrator := NewGlobalOrchestrator(s) defer orchestrator.Stop() - go func() { + testutils.EnsureRuns(func() { assert.NoError(t, orchestrator.Run(ctx)) - }() + }) observedTask1 := testutils.WatchTaskUpdate(t, watch) assert.Equal(t, observedTask1.ID, "task1") @@ -1085,9 +1085,9 @@ func TestInitializationTaskOnNonexistentNode(t *testing.T) { orchestrator := NewGlobalOrchestrator(s) defer orchestrator.Stop() - go func() { + testutils.EnsureRuns(func() { assert.NoError(t, orchestrator.Run(ctx)) - }() + }) observedTask1 := testutils.WatchTaskUpdate(t, watch) assert.Equal(t, observedTask1.ID, "task1") @@ -1254,9 +1254,9 @@ func TestInitializationRestartHistory(t *testing.T) { orchestrator := NewGlobalOrchestrator(s) defer orchestrator.Stop() - go func() { + testutils.EnsureRuns(func() { assert.NoError(t, orchestrator.Run(ctx)) - }() + }) // Fail the running task s.Update(func(tx store.Tx) error { diff --git a/manager/orchestrator/replicated/update_test.go b/manager/orchestrator/replicated/update_test.go index 325a0141aa..53d6da728e 100644 --- a/manager/orchestrator/replicated/update_test.go +++ b/manager/orchestrator/replicated/update_test.go @@ -2,7 +2,7 @@ package replicated import ( "context" - "sync/atomic" + "sync" "testing" "time" @@ -27,7 +27,7 @@ func TestUpdaterRollback(t *testing.T) { } func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_FailureAction, setMonitor bool, useSpecVersion bool) { - // this test should complete within 20 seconds. if not, bail out + // this test should complete within 30 seconds. if not, bail out ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() @@ -37,14 +37,15 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa orchestrator := NewReplicatedOrchestrator(s) - // TODO(dperny): these are used with atomic.StoreUint32 and - // atomic.LoadUint32. using atomic primitives is bad practice and easy to - // mess up + // These variables will be used to signal that The Fail Loop should start + // failing these tasks. Once they're closed, The Failing Can Begin. var ( - failImage1 uint32 - failImage2 uint32 + failMu sync.Mutex + failImage1 bool ) + // create a watch for task creates, which we will use to verify that the + // updater works correctly. watchCreate, cancelCreate := state.Watch(s.WatchQueue(), api.EventCreateTask{}) defer cancelCreate() @@ -54,8 +55,13 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa // Fail new tasks the updater tries to run watchUpdate, cancelUpdate := state.Watch(s.WatchQueue(), api.EventUpdateTask{}) defer cancelUpdate() - go func() { + + // We're gonna call this big chunk here "The Fail Loop". its job is to put + // tasks into a Failed state in certain conditions. + testutils.EnsureRuns(func() { failedLast := false + // typical go pattern: infinite for loop in a goroutine, exits on + // ctx.Done for { var e events.Event select { @@ -67,15 +73,26 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa if task.DesiredState == task.Status.State { continue } - if task.DesiredState == api.TaskStateRunning && task.Status.State != api.TaskStateFailed && task.Status.State != api.TaskStateRunning { + // This used to have a 3rd clause, + // "&& task.Status.State != api.TaskStateRunning" + // however, this is unneeded. If DesiredState is Running, then + // actual state cannot be Running, because that would get caught + // in the condition about (DesiredState == State) + if task.DesiredState == api.TaskStateRunning && task.Status.State != api.TaskStateFailed { err := s.Update(func(tx store.Tx) error { task = store.GetTask(tx, task.ID) - // Never fail two image2 tasks in a row, so there's a mix of - // failed and successful tasks for the rollback. - if task.Spec.GetContainer().Image == "image1" && atomic.LoadUint32(&failImage1) == 1 { + // lock mutex governing access to failImage1. + failMu.Lock() + defer failMu.Unlock() + // we should start failing tasks with image1 only after1 + if task.Spec.GetContainer().Image == "image1" && failImage1 { + // only fail the task if we can read from failImage1 + // (which will only be true if it's closed) task.Status.State = api.TaskStateFailed failedLast = true - } else if task.Spec.GetContainer().Image == "image2" && atomic.LoadUint32(&failImage2) == 1 && !failedLast { + } else if task.Spec.GetContainer().Image == "image2" && !failedLast { + // Never fail two image2 tasks in a row, so there's a mix of + // failed and successful tasks for the rollback. task.Status.State = api.TaskStateFailed failedLast = true } else { @@ -94,7 +111,7 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa assert.NoError(t, err) } } - }() + }) // Create a service with four replicas specified before the orchestrator // is started. This should result in two tasks when the orchestrator @@ -153,23 +170,9 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa // Start the orchestrator. var orchestratorError error - orchestratorDone := make(chan struct{}) - // verify that the orchestrator has had a chance to run by blocking the - // main test routine until it has. - orchestratorRan := make(chan struct{}) - go func() { - close(orchestratorRan) - // try not to fail the test in go routines. it's racey. instead, save - // the error and check it in a defer + orchestratorDone := testutils.EnsureRuns(func() { orchestratorError = orchestrator.Run(ctx) - close(orchestratorDone) - }() - - select { - case <-orchestratorRan: - case <-ctx.Done(): - t.Error("orchestrator did not start before test timed out") - } + }) defer func() { orchestrator.Stop() @@ -196,8 +199,6 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa assert.Equal(t, observedTask.Status.State, api.TaskStateNew) assert.Equal(t, observedTask.Spec.GetContainer().Image, "image1") - atomic.StoreUint32(&failImage2, 1) - // Start a rolling update err = s.Update(func(tx store.Tx) error { s1 := store.GetService(tx, "id1") @@ -268,6 +269,7 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa var e events.Event select { case e = <-watchServiceUpdate: + t.Log("service was updated") case <-ctx.Done(): t.Error("test timed out before watchServiceUpdate provided an event") return @@ -278,10 +280,12 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa } } - atomic.StoreUint32(&failImage1, 1) - // Repeat the rolling update but this time fail the tasks that the // rollback creates. + failMu.Lock() + failImage1 = true + failMu.Unlock() + err = s.Update(func(tx store.Tx) error { s1 := store.GetService(tx, "id1") require.NotNil(t, s1) diff --git a/manager/orchestrator/taskreaper/task_reaper_test.go b/manager/orchestrator/taskreaper/task_reaper_test.go index e347ec709c..0c6b9a9849 100644 --- a/manager/orchestrator/taskreaper/task_reaper_test.go +++ b/manager/orchestrator/taskreaper/task_reaper_test.go @@ -173,7 +173,7 @@ func TestTaskReaperInit(t *testing.T) { reaper := New(s) // Now, start the reaper - go reaper.Run(ctx) + testutils.EnsureRuns(func() { reaper.Run(ctx) }) // And then stop the reaper. This will cause the reaper to run through its // whole init phase and then immediately enter the loop body, get the stop @@ -259,10 +259,10 @@ func TestTaskHistory(t *testing.T) { assert.NoError(t, err) // Start the orchestrator. - go func() { + testutils.EnsureRuns(func() { assert.NoError(t, orchestrator.Run(ctx)) - }() - go taskReaper.Run(ctx) + }) + testutils.EnsureRuns(func() { taskReaper.Run(ctx) }) observedTask1 := testutils.WatchTaskCreate(t, watch) assert.Equal(t, observedTask1.Status.State, api.TaskStateNew) @@ -394,10 +394,8 @@ func TestTaskStateRemoveOnScaledown(t *testing.T) { assert.NoError(t, err) // Start the orchestrator. - go func() { - assert.NoError(t, orchestrator.Run(ctx)) - }() - go taskReaper.Run(ctx) + testutils.EnsureRuns(func() { assert.NoError(t, orchestrator.Run(ctx)) }) + testutils.EnsureRuns(func() { taskReaper.Run(ctx) }) observedTask1 := testutils.WatchTaskCreate(t, watch) assert.Equal(t, observedTask1.Status.State, api.TaskStateNew) @@ -526,10 +524,10 @@ func TestTaskStateRemoveOnServiceRemoval(t *testing.T) { assert.NoError(t, err) // Start the orchestrator. - go func() { + testutils.EnsureRuns(func() { assert.NoError(t, orchestrator.Run(ctx)) - }() - go taskReaper.Run(ctx) + }) + testutils.EnsureRuns(func() { taskReaper.Run(ctx) }) observedTask1 := testutils.WatchTaskCreate(t, watch) assert.Equal(t, observedTask1.Status.State, api.TaskStateNew) @@ -664,10 +662,10 @@ func TestServiceRemoveDeadTasks(t *testing.T) { assert.NoError(t, err) // Start the orchestrator and the reaper. - go func() { + testutils.EnsureRuns(func() { assert.NoError(t, orchestrator.Run(ctx)) - }() - go taskReaper.Run(ctx) + }) + testutils.EnsureRuns(func() { taskReaper.Run(ctx) }) observedTask1 := testutils.WatchTaskCreate(t, watch) assert.Equal(t, api.TaskStateNew, observedTask1.Status.State) @@ -843,7 +841,7 @@ func TestTaskReaperBatching(t *testing.T) { taskReaper := New(s) taskReaper.tickSignal = make(chan struct{}, 1) defer taskReaper.Stop() - go taskReaper.Run(ctx) + testutils.EnsureRuns(func() { taskReaper.Run(ctx) }) // None of the tasks we've created are eligible for deletion. We should // see no task delete events. Wait for a tick signal, or 500ms to pass, to @@ -1010,10 +1008,10 @@ func TestServiceRemoveUnassignedTasks(t *testing.T) { assert.NoError(t, err) // Start the orchestrator. - go func() { + testutils.EnsureRuns(func() { assert.NoError(t, orchestrator.Run(ctx)) - }() - go taskReaper.Run(ctx) + }) + testutils.EnsureRuns(func() { taskReaper.Run(ctx) }) observedTask1 := testutils.WatchTaskCreate(t, watch) assert.Equal(t, api.TaskStateNew, observedTask1.Status.State) diff --git a/manager/orchestrator/testutils/testutils.go b/manager/orchestrator/testutils/testutils.go index 0b3b851ee0..09602ec256 100644 --- a/manager/orchestrator/testutils/testutils.go +++ b/manager/orchestrator/testutils/testutils.go @@ -11,6 +11,22 @@ import ( "github.com/stretchr/testify/assert" ) +// EnsureRuns takes a closure and runs it in a goroutine, blocking until the +// goroutine has had an opportunity to run. It returns a channel which will be +// closed when the provided closure exits. +func EnsureRuns(closure func()) <-chan struct{} { + started := make(chan struct{}) + stopped := make(chan struct{}) + go func() { + close(started) + closure() + close(stopped) + }() + + <-started + return stopped +} + // WatchTaskCreate waits for a task to be created. func WatchTaskCreate(t *testing.T, watch chan events.Event) *api.Task { for { @@ -22,7 +38,7 @@ func WatchTaskCreate(t *testing.T, watch chan events.Event) *api.Task { if _, ok := event.(api.EventUpdateTask); ok { assert.FailNow(t, "got EventUpdateTask when expecting EventCreateTask", fmt.Sprint(event)) } - case <-time.After(2 * time.Second): + case <-time.After(3 * time.Second): assert.FailNow(t, "no task creation") } } @@ -39,7 +55,7 @@ func WatchTaskUpdate(t *testing.T, watch chan events.Event) *api.Task { if _, ok := event.(api.EventCreateTask); ok { assert.FailNow(t, "got EventCreateTask when expecting EventUpdateTask", fmt.Sprint(event)) } - case <-time.After(time.Second): + case <-time.After(2 * time.Second): assert.FailNow(t, "no task update") } } diff --git a/manager/orchestrator/update/updater.go b/manager/orchestrator/update/updater.go index 7c977dba1c..4e6a2cc0bd 100644 --- a/manager/orchestrator/update/updater.go +++ b/manager/orchestrator/update/updater.go @@ -501,7 +501,10 @@ func (u *Updater) removeOldTasks(ctx context.Context, batch *store.Batch, remove return fmt.Errorf("task %s not found while trying to shut it down", original.ID) } if t.DesiredState > api.TaskStateRunning { - return fmt.Errorf("task %s was already shut down when reached by updater", original.ID) + return fmt.Errorf( + "task %s was already shut down when reached by updater (state: %v)", + original.ID, t.DesiredState, + ) } t.DesiredState = api.TaskStateShutdown return store.UpdateTask(tx, t)