From f970ef971e3bd409315bf8bb7d964035007697ec Mon Sep 17 00:00:00 2001 From: Olli Janatuinen Date: Wed, 10 Oct 2018 18:21:02 +0000 Subject: [PATCH 1/4] Increased wait time on test utils WaitForCluster and WatchTaskCreate Signed-off-by: Olli Janatuinen (cherry picked from commit 5f167cab731ee75bc6cf3888fd2a4a5b5194f924) Signed-off-by: Sebastiaan van Stijn --- agent/agent_test.go | 2 +- manager/orchestrator/testutils/testutils.go | 2 +- manager/state/raft/testutils/testutils.go | 13 +++++++++++-- 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/agent/agent_test.go b/agent/agent_test.go index 8b84e957b2..be31631eb6 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -354,7 +354,7 @@ func TestSessionReconnectsIfDispatcherErrors(t *testing.T) { return fmt.Errorf("expecting 2 closed sessions, got %d", len(closedSessions)) } return nil - }, 5*time.Second)) + }, 10*time.Second)) } type testSessionTracker struct { diff --git a/manager/orchestrator/testutils/testutils.go b/manager/orchestrator/testutils/testutils.go index 5c6fe2de43..0b3b851ee0 100644 --- a/manager/orchestrator/testutils/testutils.go +++ b/manager/orchestrator/testutils/testutils.go @@ -22,7 +22,7 @@ func WatchTaskCreate(t *testing.T, watch chan events.Event) *api.Task { if _, ok := event.(api.EventUpdateTask); ok { assert.FailNow(t, "got EventUpdateTask when expecting EventCreateTask", fmt.Sprint(event)) } - case <-time.After(time.Second): + case <-time.After(2 * time.Second): assert.FailNow(t, "no task creation") } } diff --git a/manager/state/raft/testutils/testutils.go b/manager/state/raft/testutils/testutils.go index dae38b9afa..87f02b289f 100644 --- a/manager/state/raft/testutils/testutils.go +++ b/manager/state/raft/testutils/testutils.go @@ -61,13 +61,15 @@ func AdvanceTicks(clockSource *fakeclock.FakeClock, ticks int) { func WaitForCluster(t *testing.T, clockSource *fakeclock.FakeClock, nodes map[uint64]*TestNode) { err := testutils.PollFunc(clockSource, func() error { var prev *etcdraft.Status + var leadNode *TestNode nodeLoop: for _, n := range nodes { if prev == nil { prev = new(etcdraft.Status) *prev = n.Status() for _, n2 := range nodes { - if n2.Config.ID == prev.Lead && n2.ReadyForProposals() { + if n2.Config.ID == prev.Lead { + leadNode = n2 continue nodeLoop } } @@ -85,7 +87,14 @@ func WaitForCluster(t *testing.T, clockSource *fakeclock.FakeClock, nodes map[ui } return errors.New("did not find leader in member list") } - return nil + // Don't raise error just because test machine is running slowly + for i := 0; i < 5; i++ { + if leadNode.ReadyForProposals() { + return nil + } + time.Sleep(2 * time.Second) + } + return errors.New("leader is not ready") }) require.NoError(t, err) } From 2d28e8fda58d489bd17b1b92c60c8a71ebae762a Mon Sep 17 00:00:00 2001 From: Drew Erny Date: Wed, 16 Jan 2019 12:33:43 -0600 Subject: [PATCH 2/4] attempt to fix weirdly broken tests Signed-off-by: Drew Erny (cherry picked from commit be26111c4a48c44fac04c17c69fd2504aea6db91) Signed-off-by: Sebastiaan van Stijn --- .../orchestrator/replicated/update_test.go | 84 +++++++++++++++++-- 1 file changed, 75 insertions(+), 9 deletions(-) diff --git a/manager/orchestrator/replicated/update_test.go b/manager/orchestrator/replicated/update_test.go index 8170b16a0e..ccc084b846 100644 --- a/manager/orchestrator/replicated/update_test.go +++ b/manager/orchestrator/replicated/update_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/docker/go-events" "github.com/docker/swarmkit/api" 
"github.com/docker/swarmkit/manager/orchestrator/testutils" "github.com/docker/swarmkit/manager/state" @@ -26,14 +27,19 @@ func TestUpdaterRollback(t *testing.T) { } func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_FailureAction, setMonitor bool, useSpecVersion bool) { - ctx := context.Background() + // this test should complete within 20 seconds. if not, bail out + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + s := store.NewMemoryStore(nil) assert.NotNil(t, s) defer s.Close() orchestrator := NewReplicatedOrchestrator(s) - defer orchestrator.Stop() + // TODO(dperny): these are used with atomic.StoreUint32 and + // atomic.LoadUint32. using atomic primitives is bad practice and easy to + // mess up var ( failImage1 uint32 failImage2 uint32 @@ -51,7 +57,12 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa go func() { failedLast := false for { - e := <-watchUpdate + var e events.Event + select { + case e = <-watchUpdate: + case <-ctx.Done(): + return + } task := e.(api.EventUpdateTask).Task if task.DesiredState == task.Status.State { continue @@ -141,8 +152,32 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa assert.NoError(t, err) // Start the orchestrator. + var orchestratorError error + orchestratorDone := make(chan struct{}) + // verify that the orchestrator has had a chance to run by blocking the + // main test routine until it has. + orchestratorRan := make(chan struct{}) go func() { - assert.NoError(t, orchestrator.Run(ctx)) + close(orchestratorRan) + // try not to fail the test in go routines. it's racey. instead, save + // the error and check it in a defer + orchestratorError = orchestrator.Run(ctx) + close(orchestratorDone) + }() + + select { + case <-orchestratorRan: + case <-ctx.Done(): + t.Error("orchestrator did not start before test timed out") + } + + defer func() { + orchestrator.Stop() + select { + case <-ctx.Done(): + case <-orchestratorDone: + assert.NoError(t, orchestratorError) + } }() observedTask := testutils.WatchTaskCreate(t, watchCreate) @@ -195,7 +230,13 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa // Should get to the ROLLBACK_STARTED state for { - e := <-watchServiceUpdate + var e events.Event + select { + case e = <-watchServiceUpdate: + case <-ctx.Done(): + t.Error("test timed out before watchServiceUpdate provided an event") + return + } if e.(api.EventUpdateService).Service.UpdateStatus == nil { continue } @@ -224,7 +265,14 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa // Should end up in ROLLBACK_COMPLETED state for { - e := <-watchServiceUpdate + var e events.Event + select { + case e = <-watchServiceUpdate: + case <-ctx.Done(): + t.Error("test timed out before watchServiceUpdate provided an event") + return + } + if e.(api.EventUpdateService).Service.UpdateStatus.State == api.UpdateStatus_ROLLBACK_COMPLETED { break } @@ -265,7 +313,13 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa // Should get to the ROLLBACK_STARTED state for { - e := <-watchServiceUpdate + var e events.Event + select { + case e = <-watchServiceUpdate: + case <-ctx.Done(): + t.Error("test timed out before watchServiceUpdate provided an event") + return + } if e.(api.EventUpdateService).Service.UpdateStatus == nil { continue } @@ -290,7 +344,13 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa case 
api.UpdateConfig_PAUSE: // Should end up in ROLLBACK_PAUSED state for { - e := <-watchServiceUpdate + var e events.Event + select { + case e = <-watchServiceUpdate: + case <-ctx.Done(): + t.Error("test timed out before watchServiceUpdate provided an event") + return + } if e.(api.EventUpdateService).Service.UpdateStatus.State == api.UpdateStatus_ROLLBACK_PAUSED { return } @@ -298,7 +358,13 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa case api.UpdateConfig_CONTINUE: // Should end up in ROLLBACK_COMPLETE state for { - e := <-watchServiceUpdate + var e events.Event + select { + case e = <-watchServiceUpdate: + case <-ctx.Done(): + t.Error("test timed out before watchServiceUpdate provided an event") + return + } if e.(api.EventUpdateService).Service.UpdateStatus.State == api.UpdateStatus_ROLLBACK_COMPLETED { return } From f5adf368c385661a37ac17304f821aefa68b8f3c Mon Sep 17 00:00:00 2001 From: Drew Erny Date: Wed, 16 Jan 2019 13:09:11 -0600 Subject: [PATCH 3/4] Fix flaky tests It is likely that a large portion of test flakiness, especially in CI, comes from the fact that swarmkit components under test are started in goroutines, but those goroutines never have an opportunity to run. This adds code ensuring those goroutines are scheduled and run, which should hopefully solve many inexplicably flaky tests. Additionally, increased test timeouts, to hopefully cover a few more flaky cases. Finally, removed direct use of the atomic package, in favor of less efficient but higher-level mutexes. Signed-off-by: Drew Erny (cherry picked from commit 06a356671bc11e4fd5d754f257f9c5f93ec5c563) Signed-off-by: Sebastiaan van Stijn --- manager/orchestrator/global/global_test.go | 36 +++++----- .../orchestrator/replicated/update_test.go | 72 ++++++++++--------- .../taskreaper/task_reaper_test.go | 34 +++++---- manager/orchestrator/testutils/testutils.go | 20 +++++- manager/orchestrator/update/updater.go | 5 +- 5 files changed, 94 insertions(+), 73 deletions(-) diff --git a/manager/orchestrator/global/global_test.go b/manager/orchestrator/global/global_test.go index 4a88f763c9..9d83e3c14c 100644 --- a/manager/orchestrator/global/global_test.go +++ b/manager/orchestrator/global/global_test.go @@ -115,9 +115,9 @@ func setup(t *testing.T, store *store.MemoryStore, watch chan events.Event) *Orc ctx := context.Background() // Start the global orchestrator. 
global := NewGlobalOrchestrator(store) - go func() { + testutils.EnsureRuns(func() { assert.NoError(t, global.Run(ctx)) - }() + }) addService(t, store, service1) testutils.Expect(t, watch, api.EventCreateService{}) @@ -579,9 +579,9 @@ func TestInitializationRejectedTasks(t *testing.T) { orchestrator := NewGlobalOrchestrator(s) defer orchestrator.Stop() - go func() { + testutils.EnsureRuns(func() { assert.NoError(t, orchestrator.Run(ctx)) - }() + }) observedTask1 := testutils.WatchTaskUpdate(t, watch) assert.Equal(t, observedTask1.ID, "task1") @@ -642,9 +642,9 @@ func TestInitializationFailedTasks(t *testing.T) { orchestrator := NewGlobalOrchestrator(s) defer orchestrator.Stop() - go func() { + testutils.EnsureRuns(func() { assert.NoError(t, orchestrator.Run(ctx)) - }() + }) observedTask1 := testutils.WatchTaskUpdate(t, watch) assert.Equal(t, observedTask1.ID, "task1") @@ -734,9 +734,9 @@ func TestInitializationExtraTask(t *testing.T) { orchestrator := NewGlobalOrchestrator(s) defer orchestrator.Stop() - go func() { + testutils.EnsureRuns(func() { assert.NoError(t, orchestrator.Run(ctx)) - }() + }) observedTask1 := testutils.WatchTaskUpdate(t, watch) assert.True(t, observedTask1.ID == "task1" || observedTask1.ID == "task2") @@ -814,9 +814,9 @@ func TestInitializationMultipleServices(t *testing.T) { orchestrator := NewGlobalOrchestrator(s) defer orchestrator.Stop() - go func() { + testutils.EnsureRuns(func() { assert.NoError(t, orchestrator.Run(ctx)) - }() + }) // Nothing should happen because both tasks are up to date. select { @@ -955,9 +955,9 @@ func TestInitializationTaskWithoutService(t *testing.T) { orchestrator := NewGlobalOrchestrator(s) defer orchestrator.Stop() - go func() { + testutils.EnsureRuns(func() { assert.NoError(t, orchestrator.Run(ctx)) - }() + }) observedTask1 := testutils.WatchTaskDelete(t, watch) assert.Equal(t, observedTask1.ID, "task2") @@ -1013,9 +1013,9 @@ func TestInitializationTaskOnDrainedNode(t *testing.T) { orchestrator := NewGlobalOrchestrator(s) defer orchestrator.Stop() - go func() { + testutils.EnsureRuns(func() { assert.NoError(t, orchestrator.Run(ctx)) - }() + }) observedTask1 := testutils.WatchTaskUpdate(t, watch) assert.Equal(t, observedTask1.ID, "task1") @@ -1085,9 +1085,9 @@ func TestInitializationTaskOnNonexistentNode(t *testing.T) { orchestrator := NewGlobalOrchestrator(s) defer orchestrator.Stop() - go func() { + testutils.EnsureRuns(func() { assert.NoError(t, orchestrator.Run(ctx)) - }() + }) observedTask1 := testutils.WatchTaskUpdate(t, watch) assert.Equal(t, observedTask1.ID, "task1") @@ -1254,9 +1254,9 @@ func TestInitializationRestartHistory(t *testing.T) { orchestrator := NewGlobalOrchestrator(s) defer orchestrator.Stop() - go func() { + testutils.EnsureRuns(func() { assert.NoError(t, orchestrator.Run(ctx)) - }() + }) // Fail the running task s.Update(func(tx store.Tx) error { diff --git a/manager/orchestrator/replicated/update_test.go b/manager/orchestrator/replicated/update_test.go index ccc084b846..cc6d6e820e 100644 --- a/manager/orchestrator/replicated/update_test.go +++ b/manager/orchestrator/replicated/update_test.go @@ -1,7 +1,7 @@ package replicated import ( - "sync/atomic" + "sync" "testing" "time" @@ -27,7 +27,7 @@ func TestUpdaterRollback(t *testing.T) { } func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_FailureAction, setMonitor bool, useSpecVersion bool) { - // this test should complete within 20 seconds. if not, bail out + // this test should complete within 30 seconds. 
if not, bail out ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() @@ -37,14 +37,15 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa orchestrator := NewReplicatedOrchestrator(s) - // TODO(dperny): these are used with atomic.StoreUint32 and - // atomic.LoadUint32. using atomic primitives is bad practice and easy to - // mess up + // These variables will be used to signal that The Fail Loop should start + // failing these tasks. Once they're closed, The Failing Can Begin. var ( - failImage1 uint32 - failImage2 uint32 + failMu sync.Mutex + failImage1 bool ) + // create a watch for task creates, which we will use to verify that the + // updater works correctly. watchCreate, cancelCreate := state.Watch(s.WatchQueue(), api.EventCreateTask{}) defer cancelCreate() @@ -54,8 +55,13 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa // Fail new tasks the updater tries to run watchUpdate, cancelUpdate := state.Watch(s.WatchQueue(), api.EventUpdateTask{}) defer cancelUpdate() - go func() { + + // We're gonna call this big chunk here "The Fail Loop". its job is to put + // tasks into a Failed state in certain conditions. + testutils.EnsureRuns(func() { failedLast := false + // typical go pattern: infinite for loop in a goroutine, exits on + // ctx.Done for { var e events.Event select { @@ -67,15 +73,26 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa if task.DesiredState == task.Status.State { continue } - if task.DesiredState == api.TaskStateRunning && task.Status.State != api.TaskStateFailed && task.Status.State != api.TaskStateRunning { + // This used to have a 3rd clause, + // "&& task.Status.State != api.TaskStateRunning" + // however, this is unneeded. If DesiredState is Running, then + // actual state cannot be Running, because that would get caught + // in the condition about (DesiredState == State) + if task.DesiredState == api.TaskStateRunning && task.Status.State != api.TaskStateFailed { err := s.Update(func(tx store.Tx) error { task = store.GetTask(tx, task.ID) - // Never fail two image2 tasks in a row, so there's a mix of - // failed and successful tasks for the rollback. - if task.Spec.GetContainer().Image == "image1" && atomic.LoadUint32(&failImage1) == 1 { + // lock mutex governing access to failImage1. + failMu.Lock() + defer failMu.Unlock() + // we should start failing tasks with image1 only after1 + if task.Spec.GetContainer().Image == "image1" && failImage1 { + // only fail the task if we can read from failImage1 + // (which will only be true if it's closed) task.Status.State = api.TaskStateFailed failedLast = true - } else if task.Spec.GetContainer().Image == "image2" && atomic.LoadUint32(&failImage2) == 1 && !failedLast { + } else if task.Spec.GetContainer().Image == "image2" && !failedLast { + // Never fail two image2 tasks in a row, so there's a mix of + // failed and successful tasks for the rollback. task.Status.State = api.TaskStateFailed failedLast = true } else { @@ -94,7 +111,7 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa assert.NoError(t, err) } } - }() + }) // Create a service with four replicas specified before the orchestrator // is started. This should result in two tasks when the orchestrator @@ -153,23 +170,9 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa // Start the orchestrator. 
var orchestratorError error - orchestratorDone := make(chan struct{}) - // verify that the orchestrator has had a chance to run by blocking the - // main test routine until it has. - orchestratorRan := make(chan struct{}) - go func() { - close(orchestratorRan) - // try not to fail the test in go routines. it's racey. instead, save - // the error and check it in a defer + orchestratorDone := testutils.EnsureRuns(func() { orchestratorError = orchestrator.Run(ctx) - close(orchestratorDone) - }() - - select { - case <-orchestratorRan: - case <-ctx.Done(): - t.Error("orchestrator did not start before test timed out") - } + }) defer func() { orchestrator.Stop() @@ -196,8 +199,6 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa assert.Equal(t, observedTask.Status.State, api.TaskStateNew) assert.Equal(t, observedTask.Spec.GetContainer().Image, "image1") - atomic.StoreUint32(&failImage2, 1) - // Start a rolling update err = s.Update(func(tx store.Tx) error { s1 := store.GetService(tx, "id1") @@ -268,6 +269,7 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa var e events.Event select { case e = <-watchServiceUpdate: + t.Log("service was updated") case <-ctx.Done(): t.Error("test timed out before watchServiceUpdate provided an event") return @@ -278,10 +280,12 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa } } - atomic.StoreUint32(&failImage1, 1) - // Repeat the rolling update but this time fail the tasks that the // rollback creates. + failMu.Lock() + failImage1 = true + failMu.Unlock() + err = s.Update(func(tx store.Tx) error { s1 := store.GetService(tx, "id1") require.NotNil(t, s1) diff --git a/manager/orchestrator/taskreaper/task_reaper_test.go b/manager/orchestrator/taskreaper/task_reaper_test.go index e347ec709c..0c6b9a9849 100644 --- a/manager/orchestrator/taskreaper/task_reaper_test.go +++ b/manager/orchestrator/taskreaper/task_reaper_test.go @@ -173,7 +173,7 @@ func TestTaskReaperInit(t *testing.T) { reaper := New(s) // Now, start the reaper - go reaper.Run(ctx) + testutils.EnsureRuns(func() { reaper.Run(ctx) }) // And then stop the reaper. This will cause the reaper to run through its // whole init phase and then immediately enter the loop body, get the stop @@ -259,10 +259,10 @@ func TestTaskHistory(t *testing.T) { assert.NoError(t, err) // Start the orchestrator. - go func() { + testutils.EnsureRuns(func() { assert.NoError(t, orchestrator.Run(ctx)) - }() - go taskReaper.Run(ctx) + }) + testutils.EnsureRuns(func() { taskReaper.Run(ctx) }) observedTask1 := testutils.WatchTaskCreate(t, watch) assert.Equal(t, observedTask1.Status.State, api.TaskStateNew) @@ -394,10 +394,8 @@ func TestTaskStateRemoveOnScaledown(t *testing.T) { assert.NoError(t, err) // Start the orchestrator. - go func() { - assert.NoError(t, orchestrator.Run(ctx)) - }() - go taskReaper.Run(ctx) + testutils.EnsureRuns(func() { assert.NoError(t, orchestrator.Run(ctx)) }) + testutils.EnsureRuns(func() { taskReaper.Run(ctx) }) observedTask1 := testutils.WatchTaskCreate(t, watch) assert.Equal(t, observedTask1.Status.State, api.TaskStateNew) @@ -526,10 +524,10 @@ func TestTaskStateRemoveOnServiceRemoval(t *testing.T) { assert.NoError(t, err) // Start the orchestrator. 
- go func() { + testutils.EnsureRuns(func() { assert.NoError(t, orchestrator.Run(ctx)) - }() - go taskReaper.Run(ctx) + }) + testutils.EnsureRuns(func() { taskReaper.Run(ctx) }) observedTask1 := testutils.WatchTaskCreate(t, watch) assert.Equal(t, observedTask1.Status.State, api.TaskStateNew) @@ -664,10 +662,10 @@ func TestServiceRemoveDeadTasks(t *testing.T) { assert.NoError(t, err) // Start the orchestrator and the reaper. - go func() { + testutils.EnsureRuns(func() { assert.NoError(t, orchestrator.Run(ctx)) - }() - go taskReaper.Run(ctx) + }) + testutils.EnsureRuns(func() { taskReaper.Run(ctx) }) observedTask1 := testutils.WatchTaskCreate(t, watch) assert.Equal(t, api.TaskStateNew, observedTask1.Status.State) @@ -843,7 +841,7 @@ func TestTaskReaperBatching(t *testing.T) { taskReaper := New(s) taskReaper.tickSignal = make(chan struct{}, 1) defer taskReaper.Stop() - go taskReaper.Run(ctx) + testutils.EnsureRuns(func() { taskReaper.Run(ctx) }) // None of the tasks we've created are eligible for deletion. We should // see no task delete events. Wait for a tick signal, or 500ms to pass, to @@ -1010,10 +1008,10 @@ func TestServiceRemoveUnassignedTasks(t *testing.T) { assert.NoError(t, err) // Start the orchestrator. - go func() { + testutils.EnsureRuns(func() { assert.NoError(t, orchestrator.Run(ctx)) - }() - go taskReaper.Run(ctx) + }) + testutils.EnsureRuns(func() { taskReaper.Run(ctx) }) observedTask1 := testutils.WatchTaskCreate(t, watch) assert.Equal(t, api.TaskStateNew, observedTask1.Status.State) diff --git a/manager/orchestrator/testutils/testutils.go b/manager/orchestrator/testutils/testutils.go index 0b3b851ee0..09602ec256 100644 --- a/manager/orchestrator/testutils/testutils.go +++ b/manager/orchestrator/testutils/testutils.go @@ -11,6 +11,22 @@ import ( "github.com/stretchr/testify/assert" ) +// EnsureRuns takes a closure and runs it in a goroutine, blocking until the +// goroutine has had an opportunity to run. It returns a channel which will be +// closed when the provided closure exits. +func EnsureRuns(closure func()) <-chan struct{} { + started := make(chan struct{}) + stopped := make(chan struct{}) + go func() { + close(started) + closure() + close(stopped) + }() + + <-started + return stopped +} + // WatchTaskCreate waits for a task to be created. 
func WatchTaskCreate(t *testing.T, watch chan events.Event) *api.Task { for { @@ -22,7 +38,7 @@ func WatchTaskCreate(t *testing.T, watch chan events.Event) *api.Task { if _, ok := event.(api.EventUpdateTask); ok { assert.FailNow(t, "got EventUpdateTask when expecting EventCreateTask", fmt.Sprint(event)) } - case <-time.After(2 * time.Second): + case <-time.After(3 * time.Second): assert.FailNow(t, "no task creation") } } @@ -39,7 +55,7 @@ func WatchTaskUpdate(t *testing.T, watch chan events.Event) *api.Task { if _, ok := event.(api.EventCreateTask); ok { assert.FailNow(t, "got EventCreateTask when expecting EventUpdateTask", fmt.Sprint(event)) } - case <-time.After(time.Second): + case <-time.After(2 * time.Second): assert.FailNow(t, "no task update") } } diff --git a/manager/orchestrator/update/updater.go b/manager/orchestrator/update/updater.go index 5a7d61231c..0a690648fc 100644 --- a/manager/orchestrator/update/updater.go +++ b/manager/orchestrator/update/updater.go @@ -502,7 +502,10 @@ func (u *Updater) removeOldTasks(ctx context.Context, batch *store.Batch, remove return fmt.Errorf("task %s not found while trying to shut it down", original.ID) } if t.DesiredState > api.TaskStateRunning { - return fmt.Errorf("task %s was already shut down when reached by updater", original.ID) + return fmt.Errorf( + "task %s was already shut down when reached by updater (state: %v)", + original.ID, t.DesiredState, + ) } t.DesiredState = api.TaskStateShutdown return store.UpdateTask(tx, t) From 5fca4d731cd6c28628fbeade010efee518309996 Mon Sep 17 00:00:00 2001 From: Drew Erny Date: Tue, 16 Jul 2019 12:26:54 -0500 Subject: [PATCH 4/4] Fix update out of sequence A simple but old error has recently become evident. Due to the fact that we read an object and then write it back across the boundaries of a transaction, it is possible for the task object to have changed in between transactions. This would cause the attempt to write out the old task to suffer an "Update out of sequence" error. This fix simply reads the latest version of the task back out within the boundary of a transaction to avoid the race. Signed-off-by: Drew Erny (cherry picked from commit d68ac46e3b11d7384472677d210bb0ce941284dc) Signed-off-by: Sebastiaan van Stijn --- manager/orchestrator/service.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/manager/orchestrator/service.go b/manager/orchestrator/service.go index 7356c38cd5..c9912cdced 100644 --- a/manager/orchestrator/service.go +++ b/manager/orchestrator/service.go @@ -46,22 +46,27 @@ func SetServiceTasksRemove(ctx context.Context, s *store.MemoryStore, service *a err = s.Batch(func(batch *store.Batch) error { for _, t := range tasks { err := batch.Update(func(tx store.Tx) error { + // the task may have changed for some reason in the meantime + // since we read it out, so we need to get from the store again + // within the boundaries of a transaction + latestTask := store.GetTask(tx, t.ID) + // time travel is not allowed. if the current desired state is // above the one we're trying to go to we can't go backwards. // we have nothing to do and we should skip to the next task - if t.DesiredState > api.TaskStateRemove { + if latestTask.DesiredState > api.TaskStateRemove { // log a warning, though. 
we shouln't be trying to rewrite // a state to an earlier state log.G(ctx).Warnf( "cannot update task %v in desired state %v to an earlier desired state %v", - t.ID, t.DesiredState, api.TaskStateRemove, + latestTask.ID, latestTask.DesiredState, api.TaskStateRemove, ) return nil } // update desired state to REMOVE - t.DesiredState = api.TaskStateRemove + latestTask.DesiredState = api.TaskStateRemove - if err := store.UpdateTask(tx, t); err != nil { + if err := store.UpdateTask(tx, latestTask); err != nil { log.G(ctx).WithError(err).Errorf("failed transaction: update task desired state to REMOVE") } return nil
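
For readers unfamiliar with the "update out of sequence" error mentioned in the last commit message, below is a minimal, self-contained Go sketch of the race it describes. The memStore, task, view, getTask and updateTask names are hypothetical stand-ins, not swarmkit's real MemoryStore API; the sketch only assumes a store that rejects writes carrying a stale object version, which is the behaviour the patch works around by re-reading the task inside the transaction that writes it.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// task is a stand-in for api.Task: an ID, a desired state, and the version
// counter the store uses for optimistic concurrency control.
type task struct {
	ID           string
	DesiredState int
	Version      uint64
}

// memStore mimics the relevant behaviour of a versioned store: every
// successful update bumps the stored version, and an update whose input
// carries a stale version is rejected.
type memStore struct {
	mu    sync.Mutex
	tasks map[string]task
}

var errOutOfSequence = errors.New("update out of sequence")

// view runs fn while holding the store lock, standing in for a transaction.
func (s *memStore) view(fn func()) {
	s.mu.Lock()
	defer s.mu.Unlock()
	fn()
}

// getTask and updateTask are only called from inside view, so they do not
// take the lock themselves.
func (s *memStore) getTask(id string) task { return s.tasks[id] }

func (s *memStore) updateTask(t task) error {
	cur := s.tasks[t.ID]
	if t.Version != cur.Version {
		return errOutOfSequence // caller wrote back a stale copy
	}
	t.Version++
	s.tasks[t.ID] = t
	return nil
}

func main() {
	s := &memStore{tasks: map[string]task{"t1": {ID: "t1"}}}

	// Racy pattern: read t1 outside the "transaction" that will write it...
	var stale task
	s.view(func() { stale = s.getTask("t1") })

	// ...someone else updates the same task in between...
	s.view(func() {
		t := s.getTask("t1")
		t.DesiredState = 1
		_ = s.updateTask(t)
	})

	// ...and writing the stale copy back now fails.
	s.view(func() {
		stale.DesiredState = 2
		fmt.Println("stale write:", s.updateTask(stale)) // update out of sequence
	})

	// Fixed pattern (what the patch does): re-read the latest copy inside the
	// same transaction that performs the write.
	s.view(func() {
		latest := s.getTask("t1")
		latest.DesiredState = 2
		fmt.Println("fresh write:", s.updateTask(latest)) // nil
	})
}

The hunk above follows the second pattern: store.GetTask is called again inside batch.Update, so the version written back always matches what the store currently holds.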