From 6a454da90ce5065a76f57734f15f62ea59e95604 Mon Sep 17 00:00:00 2001 From: Olli Janatuinen Date: Wed, 10 Oct 2018 18:21:02 +0000 Subject: [PATCH 1/6] Increased wait time on test utils WaitForCluster and WatchTaskCreate Signed-off-by: Olli Janatuinen (cherry picked from commit 5f167cab731ee75bc6cf3888fd2a4a5b5194f924) Signed-off-by: Sebastiaan van Stijn --- agent/agent_test.go | 2 +- manager/orchestrator/testutils/testutils.go | 2 +- manager/state/raft/testutils/testutils.go | 13 +++++++++++-- 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/agent/agent_test.go b/agent/agent_test.go index 49c33b908a..0522b5bb88 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -355,7 +355,7 @@ func TestSessionReconnectsIfDispatcherErrors(t *testing.T) { return fmt.Errorf("expecting 2 closed sessions, got %d", len(closedSessions)) } return nil - }, 5*time.Second)) + }, 10*time.Second)) } type testSessionTracker struct { diff --git a/manager/orchestrator/testutils/testutils.go b/manager/orchestrator/testutils/testutils.go index 5c6fe2de43..0b3b851ee0 100644 --- a/manager/orchestrator/testutils/testutils.go +++ b/manager/orchestrator/testutils/testutils.go @@ -22,7 +22,7 @@ func WatchTaskCreate(t *testing.T, watch chan events.Event) *api.Task { if _, ok := event.(api.EventUpdateTask); ok { assert.FailNow(t, "got EventUpdateTask when expecting EventCreateTask", fmt.Sprint(event)) } - case <-time.After(time.Second): + case <-time.After(2 * time.Second): assert.FailNow(t, "no task creation") } } diff --git a/manager/state/raft/testutils/testutils.go b/manager/state/raft/testutils/testutils.go index 91dc6c4c69..de247e4b6a 100644 --- a/manager/state/raft/testutils/testutils.go +++ b/manager/state/raft/testutils/testutils.go @@ -60,13 +60,15 @@ func AdvanceTicks(clockSource *fakeclock.FakeClock, ticks int) { func WaitForCluster(t *testing.T, clockSource *fakeclock.FakeClock, nodes map[uint64]*TestNode) { err := testutils.PollFunc(clockSource, func() error { var prev *etcdraft.Status + var leadNode *TestNode nodeLoop: for _, n := range nodes { if prev == nil { prev = new(etcdraft.Status) *prev = n.Status() for _, n2 := range nodes { - if n2.Config.ID == prev.Lead && n2.ReadyForProposals() { + if n2.Config.ID == prev.Lead { + leadNode = n2 continue nodeLoop } } @@ -84,7 +86,14 @@ func WaitForCluster(t *testing.T, clockSource *fakeclock.FakeClock, nodes map[ui } return errors.New("did not find leader in member list") } - return nil + // Don't raise error just because test machine is running slowly + for i := 0; i < 5; i++ { + if leadNode.ReadyForProposals() { + return nil + } + time.Sleep(2 * time.Second) + } + return errors.New("leader is not ready") }) require.NoError(t, err) } From 6a2573e3e576827453ca3a8d37c8edfbb47c9721 Mon Sep 17 00:00:00 2001 From: Drew Erny Date: Wed, 16 Jan 2019 12:33:43 -0600 Subject: [PATCH 2/6] attempt to fix weirdly broken tests Signed-off-by: Drew Erny (cherry picked from commit be26111c4a48c44fac04c17c69fd2504aea6db91) Signed-off-by: Sebastiaan van Stijn --- .../orchestrator/replicated/update_test.go | 84 +++++++++++++++++-- 1 file changed, 75 insertions(+), 9 deletions(-) diff --git a/manager/orchestrator/replicated/update_test.go b/manager/orchestrator/replicated/update_test.go index 45dacac6f6..325a0141aa 100644 --- a/manager/orchestrator/replicated/update_test.go +++ b/manager/orchestrator/replicated/update_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/docker/go-events" "github.com/docker/swarmkit/api" "github.com/docker/swarmkit/manager/orchestrator/testutils" "github.com/docker/swarmkit/manager/state" @@ -26,14 +27,19 @@ func TestUpdaterRollback(t *testing.T) { } func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_FailureAction, setMonitor bool, useSpecVersion bool) { - ctx := context.Background() + // this test should complete within 20 seconds. if not, bail out + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + s := store.NewMemoryStore(nil) assert.NotNil(t, s) defer s.Close() orchestrator := NewReplicatedOrchestrator(s) - defer orchestrator.Stop() + // TODO(dperny): these are used with atomic.StoreUint32 and + // atomic.LoadUint32. using atomic primitives is bad practice and easy to + // mess up var ( failImage1 uint32 failImage2 uint32 @@ -51,7 +57,12 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa go func() { failedLast := false for { - e := <-watchUpdate + var e events.Event + select { + case e = <-watchUpdate: + case <-ctx.Done(): + return + } task := e.(api.EventUpdateTask).Task if task.DesiredState == task.Status.State { continue @@ -141,8 +152,32 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa assert.NoError(t, err) // Start the orchestrator. + var orchestratorError error + orchestratorDone := make(chan struct{}) + // verify that the orchestrator has had a chance to run by blocking the + // main test routine until it has. + orchestratorRan := make(chan struct{}) go func() { - assert.NoError(t, orchestrator.Run(ctx)) + close(orchestratorRan) + // try not to fail the test in go routines. it's racey. instead, save + // the error and check it in a defer + orchestratorError = orchestrator.Run(ctx) + close(orchestratorDone) + }() + + select { + case <-orchestratorRan: + case <-ctx.Done(): + t.Error("orchestrator did not start before test timed out") + } + + defer func() { + orchestrator.Stop() + select { + case <-ctx.Done(): + case <-orchestratorDone: + assert.NoError(t, orchestratorError) + } }() observedTask := testutils.WatchTaskCreate(t, watchCreate) @@ -195,7 +230,13 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa // Should get to the ROLLBACK_STARTED state for { - e := <-watchServiceUpdate + var e events.Event + select { + case e = <-watchServiceUpdate: + case <-ctx.Done(): + t.Error("test timed out before watchServiceUpdate provided an event") + return + } if e.(api.EventUpdateService).Service.UpdateStatus == nil { continue } @@ -224,7 +265,14 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa // Should end up in ROLLBACK_COMPLETED state for { - e := <-watchServiceUpdate + var e events.Event + select { + case e = <-watchServiceUpdate: + case <-ctx.Done(): + t.Error("test timed out before watchServiceUpdate provided an event") + return + } + if e.(api.EventUpdateService).Service.UpdateStatus.State == api.UpdateStatus_ROLLBACK_COMPLETED { break } @@ -265,7 +313,13 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa // Should get to the ROLLBACK_STARTED state for { - e := <-watchServiceUpdate + var e events.Event + select { + case e = <-watchServiceUpdate: + case <-ctx.Done(): + t.Error("test timed out before watchServiceUpdate provided an event") + return + } if e.(api.EventUpdateService).Service.UpdateStatus == nil { continue } @@ -290,7 +344,13 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa case api.UpdateConfig_PAUSE: // Should end up in ROLLBACK_PAUSED state for { - e := <-watchServiceUpdate + var e events.Event + select { + case e = <-watchServiceUpdate: + case <-ctx.Done(): + t.Error("test timed out before watchServiceUpdate provided an event") + return + } if e.(api.EventUpdateService).Service.UpdateStatus.State == api.UpdateStatus_ROLLBACK_PAUSED { return } @@ -298,7 +358,13 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa case api.UpdateConfig_CONTINUE: // Should end up in ROLLBACK_COMPLETE state for { - e := <-watchServiceUpdate + var e events.Event + select { + case e = <-watchServiceUpdate: + case <-ctx.Done(): + t.Error("test timed out before watchServiceUpdate provided an event") + return + } if e.(api.EventUpdateService).Service.UpdateStatus.State == api.UpdateStatus_ROLLBACK_COMPLETED { return } From 6db8d2da5d34e8fdc68fc2172f093269c3a9c046 Mon Sep 17 00:00:00 2001 From: Drew Erny Date: Wed, 16 Jan 2019 13:09:11 -0600 Subject: [PATCH 3/6] Fix flaky tests It is likely that a large portion of test flakiness, especially in CI, comes from the fact that swarmkit components under test are started in goroutines, but those goroutines never have an opportunity to run. This adds code ensuring those goroutines are scheduled and run, which should hopefully solve many inexplicably flaky tests. Additionally, increased test timeouts, to hopefully cover a few more flaky cases. Finally, removed direct use of the atomic package, in favor of less efficient but higher-level mutexes. Signed-off-by: Drew Erny (cherry picked from commit 06a356671bc11e4fd5d754f257f9c5f93ec5c563) Signed-off-by: Sebastiaan van Stijn --- manager/orchestrator/global/global_test.go | 36 +++++----- .../orchestrator/replicated/update_test.go | 72 ++++++++++--------- .../taskreaper/task_reaper_test.go | 34 +++++---- manager/orchestrator/testutils/testutils.go | 20 +++++- manager/orchestrator/update/updater.go | 5 +- 5 files changed, 94 insertions(+), 73 deletions(-) diff --git a/manager/orchestrator/global/global_test.go b/manager/orchestrator/global/global_test.go index c7cede8d94..abb24fc307 100644 --- a/manager/orchestrator/global/global_test.go +++ b/manager/orchestrator/global/global_test.go @@ -115,9 +115,9 @@ func setup(t *testing.T, store *store.MemoryStore, watch chan events.Event) *Orc ctx := context.Background() // Start the global orchestrator. global := NewGlobalOrchestrator(store) - go func() { + testutils.EnsureRuns(func() { assert.NoError(t, global.Run(ctx)) - }() + }) addService(t, store, service1) testutils.Expect(t, watch, api.EventCreateService{}) @@ -579,9 +579,9 @@ func TestInitializationRejectedTasks(t *testing.T) { orchestrator := NewGlobalOrchestrator(s) defer orchestrator.Stop() - go func() { + testutils.EnsureRuns(func() { assert.NoError(t, orchestrator.Run(ctx)) - }() + }) observedTask1 := testutils.WatchTaskUpdate(t, watch) assert.Equal(t, observedTask1.ID, "task1") @@ -642,9 +642,9 @@ func TestInitializationFailedTasks(t *testing.T) { orchestrator := NewGlobalOrchestrator(s) defer orchestrator.Stop() - go func() { + testutils.EnsureRuns(func() { assert.NoError(t, orchestrator.Run(ctx)) - }() + }) observedTask1 := testutils.WatchTaskUpdate(t, watch) assert.Equal(t, observedTask1.ID, "task1") @@ -734,9 +734,9 @@ func TestInitializationExtraTask(t *testing.T) { orchestrator := NewGlobalOrchestrator(s) defer orchestrator.Stop() - go func() { + testutils.EnsureRuns(func() { assert.NoError(t, orchestrator.Run(ctx)) - }() + }) observedTask1 := testutils.WatchTaskUpdate(t, watch) assert.True(t, observedTask1.ID == "task1" || observedTask1.ID == "task2") @@ -814,9 +814,9 @@ func TestInitializationMultipleServices(t *testing.T) { orchestrator := NewGlobalOrchestrator(s) defer orchestrator.Stop() - go func() { + testutils.EnsureRuns(func() { assert.NoError(t, orchestrator.Run(ctx)) - }() + }) // Nothing should happen because both tasks are up to date. select { @@ -955,9 +955,9 @@ func TestInitializationTaskWithoutService(t *testing.T) { orchestrator := NewGlobalOrchestrator(s) defer orchestrator.Stop() - go func() { + testutils.EnsureRuns(func() { assert.NoError(t, orchestrator.Run(ctx)) - }() + }) observedTask1 := testutils.WatchTaskDelete(t, watch) assert.Equal(t, observedTask1.ID, "task2") @@ -1013,9 +1013,9 @@ func TestInitializationTaskOnDrainedNode(t *testing.T) { orchestrator := NewGlobalOrchestrator(s) defer orchestrator.Stop() - go func() { + testutils.EnsureRuns(func() { assert.NoError(t, orchestrator.Run(ctx)) - }() + }) observedTask1 := testutils.WatchTaskUpdate(t, watch) assert.Equal(t, observedTask1.ID, "task1") @@ -1085,9 +1085,9 @@ func TestInitializationTaskOnNonexistentNode(t *testing.T) { orchestrator := NewGlobalOrchestrator(s) defer orchestrator.Stop() - go func() { + testutils.EnsureRuns(func() { assert.NoError(t, orchestrator.Run(ctx)) - }() + }) observedTask1 := testutils.WatchTaskUpdate(t, watch) assert.Equal(t, observedTask1.ID, "task1") @@ -1254,9 +1254,9 @@ func TestInitializationRestartHistory(t *testing.T) { orchestrator := NewGlobalOrchestrator(s) defer orchestrator.Stop() - go func() { + testutils.EnsureRuns(func() { assert.NoError(t, orchestrator.Run(ctx)) - }() + }) // Fail the running task s.Update(func(tx store.Tx) error { diff --git a/manager/orchestrator/replicated/update_test.go b/manager/orchestrator/replicated/update_test.go index 325a0141aa..53d6da728e 100644 --- a/manager/orchestrator/replicated/update_test.go +++ b/manager/orchestrator/replicated/update_test.go @@ -2,7 +2,7 @@ package replicated import ( "context" - "sync/atomic" + "sync" "testing" "time" @@ -27,7 +27,7 @@ func TestUpdaterRollback(t *testing.T) { } func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_FailureAction, setMonitor bool, useSpecVersion bool) { - // this test should complete within 20 seconds. if not, bail out + // this test should complete within 30 seconds. if not, bail out ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() @@ -37,14 +37,15 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa orchestrator := NewReplicatedOrchestrator(s) - // TODO(dperny): these are used with atomic.StoreUint32 and - // atomic.LoadUint32. using atomic primitives is bad practice and easy to - // mess up + // These variables will be used to signal that The Fail Loop should start + // failing these tasks. Once they're closed, The Failing Can Begin. var ( - failImage1 uint32 - failImage2 uint32 + failMu sync.Mutex + failImage1 bool ) + // create a watch for task creates, which we will use to verify that the + // updater works correctly. watchCreate, cancelCreate := state.Watch(s.WatchQueue(), api.EventCreateTask{}) defer cancelCreate() @@ -54,8 +55,13 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa // Fail new tasks the updater tries to run watchUpdate, cancelUpdate := state.Watch(s.WatchQueue(), api.EventUpdateTask{}) defer cancelUpdate() - go func() { + + // We're gonna call this big chunk here "The Fail Loop". its job is to put + // tasks into a Failed state in certain conditions. + testutils.EnsureRuns(func() { failedLast := false + // typical go pattern: infinite for loop in a goroutine, exits on + // ctx.Done for { var e events.Event select { @@ -67,15 +73,26 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa if task.DesiredState == task.Status.State { continue } - if task.DesiredState == api.TaskStateRunning && task.Status.State != api.TaskStateFailed && task.Status.State != api.TaskStateRunning { + // This used to have a 3rd clause, + // "&& task.Status.State != api.TaskStateRunning" + // however, this is unneeded. If DesiredState is Running, then + // actual state cannot be Running, because that would get caught + // in the condition about (DesiredState == State) + if task.DesiredState == api.TaskStateRunning && task.Status.State != api.TaskStateFailed { err := s.Update(func(tx store.Tx) error { task = store.GetTask(tx, task.ID) - // Never fail two image2 tasks in a row, so there's a mix of - // failed and successful tasks for the rollback. - if task.Spec.GetContainer().Image == "image1" && atomic.LoadUint32(&failImage1) == 1 { + // lock mutex governing access to failImage1. + failMu.Lock() + defer failMu.Unlock() + // we should start failing tasks with image1 only after1 + if task.Spec.GetContainer().Image == "image1" && failImage1 { + // only fail the task if we can read from failImage1 + // (which will only be true if it's closed) task.Status.State = api.TaskStateFailed failedLast = true - } else if task.Spec.GetContainer().Image == "image2" && atomic.LoadUint32(&failImage2) == 1 && !failedLast { + } else if task.Spec.GetContainer().Image == "image2" && !failedLast { + // Never fail two image2 tasks in a row, so there's a mix of + // failed and successful tasks for the rollback. task.Status.State = api.TaskStateFailed failedLast = true } else { @@ -94,7 +111,7 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa assert.NoError(t, err) } } - }() + }) // Create a service with four replicas specified before the orchestrator // is started. This should result in two tasks when the orchestrator @@ -153,23 +170,9 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa // Start the orchestrator. var orchestratorError error - orchestratorDone := make(chan struct{}) - // verify that the orchestrator has had a chance to run by blocking the - // main test routine until it has. - orchestratorRan := make(chan struct{}) - go func() { - close(orchestratorRan) - // try not to fail the test in go routines. it's racey. instead, save - // the error and check it in a defer + orchestratorDone := testutils.EnsureRuns(func() { orchestratorError = orchestrator.Run(ctx) - close(orchestratorDone) - }() - - select { - case <-orchestratorRan: - case <-ctx.Done(): - t.Error("orchestrator did not start before test timed out") - } + }) defer func() { orchestrator.Stop() @@ -196,8 +199,6 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa assert.Equal(t, observedTask.Status.State, api.TaskStateNew) assert.Equal(t, observedTask.Spec.GetContainer().Image, "image1") - atomic.StoreUint32(&failImage2, 1) - // Start a rolling update err = s.Update(func(tx store.Tx) error { s1 := store.GetService(tx, "id1") @@ -268,6 +269,7 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa var e events.Event select { case e = <-watchServiceUpdate: + t.Log("service was updated") case <-ctx.Done(): t.Error("test timed out before watchServiceUpdate provided an event") return @@ -278,10 +280,12 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa } } - atomic.StoreUint32(&failImage1, 1) - // Repeat the rolling update but this time fail the tasks that the // rollback creates. + failMu.Lock() + failImage1 = true + failMu.Unlock() + err = s.Update(func(tx store.Tx) error { s1 := store.GetService(tx, "id1") require.NotNil(t, s1) diff --git a/manager/orchestrator/taskreaper/task_reaper_test.go b/manager/orchestrator/taskreaper/task_reaper_test.go index e347ec709c..0c6b9a9849 100644 --- a/manager/orchestrator/taskreaper/task_reaper_test.go +++ b/manager/orchestrator/taskreaper/task_reaper_test.go @@ -173,7 +173,7 @@ func TestTaskReaperInit(t *testing.T) { reaper := New(s) // Now, start the reaper - go reaper.Run(ctx) + testutils.EnsureRuns(func() { reaper.Run(ctx) }) // And then stop the reaper. This will cause the reaper to run through its // whole init phase and then immediately enter the loop body, get the stop @@ -259,10 +259,10 @@ func TestTaskHistory(t *testing.T) { assert.NoError(t, err) // Start the orchestrator. - go func() { + testutils.EnsureRuns(func() { assert.NoError(t, orchestrator.Run(ctx)) - }() - go taskReaper.Run(ctx) + }) + testutils.EnsureRuns(func() { taskReaper.Run(ctx) }) observedTask1 := testutils.WatchTaskCreate(t, watch) assert.Equal(t, observedTask1.Status.State, api.TaskStateNew) @@ -394,10 +394,8 @@ func TestTaskStateRemoveOnScaledown(t *testing.T) { assert.NoError(t, err) // Start the orchestrator. - go func() { - assert.NoError(t, orchestrator.Run(ctx)) - }() - go taskReaper.Run(ctx) + testutils.EnsureRuns(func() { assert.NoError(t, orchestrator.Run(ctx)) }) + testutils.EnsureRuns(func() { taskReaper.Run(ctx) }) observedTask1 := testutils.WatchTaskCreate(t, watch) assert.Equal(t, observedTask1.Status.State, api.TaskStateNew) @@ -526,10 +524,10 @@ func TestTaskStateRemoveOnServiceRemoval(t *testing.T) { assert.NoError(t, err) // Start the orchestrator. - go func() { + testutils.EnsureRuns(func() { assert.NoError(t, orchestrator.Run(ctx)) - }() - go taskReaper.Run(ctx) + }) + testutils.EnsureRuns(func() { taskReaper.Run(ctx) }) observedTask1 := testutils.WatchTaskCreate(t, watch) assert.Equal(t, observedTask1.Status.State, api.TaskStateNew) @@ -664,10 +662,10 @@ func TestServiceRemoveDeadTasks(t *testing.T) { assert.NoError(t, err) // Start the orchestrator and the reaper. - go func() { + testutils.EnsureRuns(func() { assert.NoError(t, orchestrator.Run(ctx)) - }() - go taskReaper.Run(ctx) + }) + testutils.EnsureRuns(func() { taskReaper.Run(ctx) }) observedTask1 := testutils.WatchTaskCreate(t, watch) assert.Equal(t, api.TaskStateNew, observedTask1.Status.State) @@ -843,7 +841,7 @@ func TestTaskReaperBatching(t *testing.T) { taskReaper := New(s) taskReaper.tickSignal = make(chan struct{}, 1) defer taskReaper.Stop() - go taskReaper.Run(ctx) + testutils.EnsureRuns(func() { taskReaper.Run(ctx) }) // None of the tasks we've created are eligible for deletion. We should // see no task delete events. Wait for a tick signal, or 500ms to pass, to @@ -1010,10 +1008,10 @@ func TestServiceRemoveUnassignedTasks(t *testing.T) { assert.NoError(t, err) // Start the orchestrator. - go func() { + testutils.EnsureRuns(func() { assert.NoError(t, orchestrator.Run(ctx)) - }() - go taskReaper.Run(ctx) + }) + testutils.EnsureRuns(func() { taskReaper.Run(ctx) }) observedTask1 := testutils.WatchTaskCreate(t, watch) assert.Equal(t, api.TaskStateNew, observedTask1.Status.State) diff --git a/manager/orchestrator/testutils/testutils.go b/manager/orchestrator/testutils/testutils.go index 0b3b851ee0..09602ec256 100644 --- a/manager/orchestrator/testutils/testutils.go +++ b/manager/orchestrator/testutils/testutils.go @@ -11,6 +11,22 @@ import ( "github.com/stretchr/testify/assert" ) +// EnsureRuns takes a closure and runs it in a goroutine, blocking until the +// goroutine has had an opportunity to run. It returns a channel which will be +// closed when the provided closure exits. +func EnsureRuns(closure func()) <-chan struct{} { + started := make(chan struct{}) + stopped := make(chan struct{}) + go func() { + close(started) + closure() + close(stopped) + }() + + <-started + return stopped +} + // WatchTaskCreate waits for a task to be created. func WatchTaskCreate(t *testing.T, watch chan events.Event) *api.Task { for { @@ -22,7 +38,7 @@ func WatchTaskCreate(t *testing.T, watch chan events.Event) *api.Task { if _, ok := event.(api.EventUpdateTask); ok { assert.FailNow(t, "got EventUpdateTask when expecting EventCreateTask", fmt.Sprint(event)) } - case <-time.After(2 * time.Second): + case <-time.After(3 * time.Second): assert.FailNow(t, "no task creation") } } @@ -39,7 +55,7 @@ func WatchTaskUpdate(t *testing.T, watch chan events.Event) *api.Task { if _, ok := event.(api.EventCreateTask); ok { assert.FailNow(t, "got EventCreateTask when expecting EventUpdateTask", fmt.Sprint(event)) } - case <-time.After(time.Second): + case <-time.After(2 * time.Second): assert.FailNow(t, "no task update") } } diff --git a/manager/orchestrator/update/updater.go b/manager/orchestrator/update/updater.go index 7c977dba1c..4e6a2cc0bd 100644 --- a/manager/orchestrator/update/updater.go +++ b/manager/orchestrator/update/updater.go @@ -501,7 +501,10 @@ func (u *Updater) removeOldTasks(ctx context.Context, batch *store.Batch, remove return fmt.Errorf("task %s not found while trying to shut it down", original.ID) } if t.DesiredState > api.TaskStateRunning { - return fmt.Errorf("task %s was already shut down when reached by updater", original.ID) + return fmt.Errorf( + "task %s was already shut down when reached by updater (state: %v)", + original.ID, t.DesiredState, + ) } t.DesiredState = api.TaskStateShutdown return store.UpdateTask(tx, t) From ba217b76c4c856b8d727ae46a88859efbb170c7f Mon Sep 17 00:00:00 2001 From: Drew Erny Date: Thu, 20 Jun 2019 14:02:39 -0500 Subject: [PATCH 4/6] add golangci-lint gometalinter is deprecated, and golangci-lint is its recommended successor. This commit adds golangci-lint as the linter for swarmkit. In addition, golangci-lint found a few issues in the code that were not yet identified, and so those issues have been fixed. Signed-off-by: Drew Erny (cherry picked from commit 27c2d27e23e76243bb49ec5d803a6e40b3f96f7a) Signed-off-by: Sebastiaan van Stijn --- .golangci.yml | 14 ++++++++++++++ .gometalinter.json | 17 ----------------- agent/agent.go | 2 +- agent/exec/dockerapi/controller.go | 2 +- agent/session.go | 4 ++-- cmd/swarmctl/service/flagparser/tmpfs.go | 2 +- direct.mk | 6 +++--- manager/dispatcher/dispatcher.go | 2 +- manager/drivers/provider.go | 2 +- manager/orchestrator/restart/restart.go | 9 +-------- 10 files changed, 25 insertions(+), 35 deletions(-) create mode 100644 .golangci.yml delete mode 100644 .gometalinter.json diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 0000000000..694ae5c372 --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,14 @@ +run: + tests: false +linters: + disable-all: true + enable: + - misspell + - gofmt + - goimports + - golint + - ineffassign + - deadcode + - unconvert + - govet + diff --git a/.gometalinter.json b/.gometalinter.json deleted file mode 100644 index 6710a180dc..0000000000 --- a/.gometalinter.json +++ /dev/null @@ -1,17 +0,0 @@ -{ - "Vendor": true, - "Exclude": [ - ".*\\.pb\\.go" - ], - "Enable": [ - "vet", - "misspell", - "gofmt", - "goimports", - "golint", - "ineffassign", - "deadcode", - "unconvert" - ], - "Deadline": "2m" -} diff --git a/agent/agent.go b/agent/agent.go index 743072f9da..58ebff5934 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -575,7 +575,7 @@ func (a *Agent) nodeDescriptionWithHostname(ctx context.Context, tlsInfo *api.No // Override hostname and TLS info if desc != nil { - if a.config.Hostname != "" && desc != nil { + if a.config.Hostname != "" { desc.Hostname = a.config.Hostname } desc.TLSInfo = tlsInfo diff --git a/agent/exec/dockerapi/controller.go b/agent/exec/dockerapi/controller.go index 1450fbdc36..abb2e15e90 100644 --- a/agent/exec/dockerapi/controller.go +++ b/agent/exec/dockerapi/controller.go @@ -654,7 +654,7 @@ func parsePortMap(portMap nat.PortMap) ([]*api.PortConfig, error) { return nil, err } - protocol := api.ProtocolTCP + var protocol api.PortConfig_Protocol switch strings.ToLower(parts[1]) { case "tcp": protocol = api.ProtocolTCP diff --git a/agent/session.go b/agent/session.go index 526953509b..152b5c4546 100644 --- a/agent/session.go +++ b/agent/session.go @@ -136,7 +136,7 @@ func (s *session) start(ctx context.Context, description *api.NodeDescription) e // `ctx` is done and hence fail to propagate the timeout error to the agent. // If the error is not propogated to the agent, the agent will not close // the session or rebuild a new session. - sessionCtx, cancelSession := context.WithCancel(ctx) // nolint: vet + sessionCtx, cancelSession := context.WithCancel(ctx) //nolint:govet // Need to run Session in a goroutine since there's no way to set a // timeout for an individual Recv call in a stream. @@ -159,7 +159,7 @@ func (s *session) start(ctx context.Context, description *api.NodeDescription) e select { case err := <-errChan: if err != nil { - return err // nolint: vet + return err //nolint:govet } case <-time.After(dispatcherRPCTimeout): cancelSession() diff --git a/cmd/swarmctl/service/flagparser/tmpfs.go b/cmd/swarmctl/service/flagparser/tmpfs.go index 0d7a0e276e..aab4509bb2 100644 --- a/cmd/swarmctl/service/flagparser/tmpfs.go +++ b/cmd/swarmctl/service/flagparser/tmpfs.go @@ -64,7 +64,7 @@ func parseTmpfs(flags *pflag.FlagSet, spec *api.ServiceSpec) error { // remove suffix and try again suffix := meat[len(meat)-1] meat = meat[:len(meat)-1] - var multiplier int64 = 1 + var multiplier int64 switch suffix { case 'g': multiplier = 1 << 30 diff --git a/direct.mk b/direct.mk index 8ac8348c22..fd1c32038b 100644 --- a/direct.mk +++ b/direct.mk @@ -17,8 +17,8 @@ version/version.go: setup: ## install dependencies @echo "🐳 $@" # TODO(stevvooe): Install these from the vendor directory - @go get -u github.com/alecthomas/gometalinter - @gometalinter --install + # install golangci-lint version 1.17.1 to ./bin/golangci-lint + @curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | sh -s v1.17.1 @go get -u github.com/lk4d4/vndr @go get -u github.com/stevvooe/protobuild @@ -43,7 +43,7 @@ checkprotos: generate ## check if protobufs needs to be generated again check: fmt-proto check: ## Run various source code validation tools @echo "🐳 $@" - @gometalinter ./... + @./bin/golangci-lint run .PHONY: fmt-proto fmt-proto: diff --git a/manager/dispatcher/dispatcher.go b/manager/dispatcher/dispatcher.go index 6149806470..d1db2fdc83 100644 --- a/manager/dispatcher/dispatcher.go +++ b/manager/dispatcher/dispatcher.go @@ -238,7 +238,7 @@ func (d *Dispatcher) Run(ctx context.Context) error { if err != nil { return err } - if err == nil && len(clusters) == 1 { + if len(clusters) == 1 { heartbeatPeriod, err := gogotypes.DurationFromProto(clusters[0].Spec.Dispatcher.HeartbeatPeriod) if err == nil && heartbeatPeriod > 0 { d.config.HeartbeatPeriod = heartbeatPeriod diff --git a/manager/drivers/provider.go b/manager/drivers/provider.go index 0d9be6119d..97c36fe73d 100644 --- a/manager/drivers/provider.go +++ b/manager/drivers/provider.go @@ -22,7 +22,7 @@ func (m *DriverProvider) NewSecretDriver(driver *api.Driver) (*SecretDriver, err if m.pluginGetter == nil { return nil, fmt.Errorf("plugin getter is nil") } - if driver == nil && driver.Name == "" { + if driver == nil || driver.Name == "" { return nil, fmt.Errorf("driver specification is nil") } // Search for the specified plugin diff --git a/manager/orchestrator/restart/restart.go b/manager/orchestrator/restart/restart.go index c034183ba2..c79d02d98f 100644 --- a/manager/orchestrator/restart/restart.go +++ b/manager/orchestrator/restart/restart.go @@ -508,20 +508,13 @@ func (r *Supervisor) Cancel(taskID string) { <-delay.doneCh } -// CancelAll aborts all pending restarts and waits for any instances of -// StartNow that have already triggered to complete. +// CancelAll aborts all pending restarts func (r *Supervisor) CancelAll() { - var cancelled []delayedStart - r.mu.Lock() for _, delay := range r.delays { delay.cancel() } r.mu.Unlock() - - for _, delay := range cancelled { - <-delay.doneCh - } } // ClearServiceHistory forgets restart history related to a given service ID. From 177caa5af91a2cacbccf42426145fdb464a48c8b Mon Sep 17 00:00:00 2001 From: nmengin Date: Fri, 12 Jul 2019 18:03:08 +0200 Subject: [PATCH 5/6] Set bigger grpc value to initialize connection broker Signed-off-by: nmengin (cherry picked from commit 127e816ed8c8de4c981e02c9d0b45c70d3baa824) Signed-off-by: Sebastiaan van Stijn --- agent/session.go | 2 ++ manager/manager.go | 2 ++ node/node.go | 2 ++ 3 files changed, 6 insertions(+) diff --git a/agent/session.go b/agent/session.go index 152b5c4546..2e7f1b6a37 100644 --- a/agent/session.go +++ b/agent/session.go @@ -3,6 +3,7 @@ package agent import ( "context" "errors" + "math" "sync" "time" @@ -64,6 +65,7 @@ func newSession(ctx context.Context, agent *Agent, delay time.Duration, sessionI cc, err := agent.config.ConnBroker.Select( grpc.WithTransportCredentials(agent.config.Credentials), grpc.WithTimeout(dispatcherRPCTimeout), + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)), ) if err != nil { diff --git a/manager/manager.go b/manager/manager.go index 90b2815d68..ba7b005665 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -4,6 +4,7 @@ import ( "context" "crypto/tls" "fmt" + "math" "net" "os" "path/filepath" @@ -758,6 +759,7 @@ func (m *Manager) updateKEK(ctx context.Context, cluster *api.Cluster) error { func(addr string, timeout time.Duration) (net.Conn, error) { return xnet.DialTimeoutLocal(addr, timeout) }), + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)), ) if err != nil { logger.WithError(err).Error("failed to connect to local manager socket after locking the cluster") diff --git a/node/node.go b/node/node.go index 92bd74884e..1d3e6c4f50 100644 --- a/node/node.go +++ b/node/node.go @@ -6,6 +6,7 @@ import ( "crypto/tls" "encoding/json" "io/ioutil" + "math" "net" "os" "path/filepath" @@ -896,6 +897,7 @@ func (n *Node) initManagerConnection(ctx context.Context, ready chan<- struct{}) opts := []grpc.DialOption{ grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor), grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor), + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)), } insecureCreds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true}) opts = append(opts, grpc.WithTransportCredentials(insecureCreds)) From bfce89e3eeb7ff6cd3e2dc9b887e4a291f13a2b0 Mon Sep 17 00:00:00 2001 From: Drew Erny Date: Tue, 16 Jul 2019 12:26:54 -0500 Subject: [PATCH 6/6] Fix update out of sequence A simple but old error has recently become evident. Due to the fact that we read an object and then write it back across the boundaries of a transaction, it is possible for the task object to have changed in between transactions. This would cause the attempt to write out the old task to suffer an "Update out of sequence" error. This fix simply reads the latest version of the task back out within the boundary of a transaction to avoid the race. Signed-off-by: Drew Erny (cherry picked from commit d68ac46e3b11d7384472677d210bb0ce941284dc) Signed-off-by: Sebastiaan van Stijn --- manager/orchestrator/service.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/manager/orchestrator/service.go b/manager/orchestrator/service.go index 037e493b30..c5d298c516 100644 --- a/manager/orchestrator/service.go +++ b/manager/orchestrator/service.go @@ -47,22 +47,27 @@ func SetServiceTasksRemove(ctx context.Context, s *store.MemoryStore, service *a err = s.Batch(func(batch *store.Batch) error { for _, t := range tasks { err := batch.Update(func(tx store.Tx) error { + // the task may have changed for some reason in the meantime + // since we read it out, so we need to get from the store again + // within the boundaries of a transaction + latestTask := store.GetTask(tx, t.ID) + // time travel is not allowed. if the current desired state is // above the one we're trying to go to we can't go backwards. // we have nothing to do and we should skip to the next task - if t.DesiredState > api.TaskStateRemove { + if latestTask.DesiredState > api.TaskStateRemove { // log a warning, though. we shouln't be trying to rewrite // a state to an earlier state log.G(ctx).Warnf( "cannot update task %v in desired state %v to an earlier desired state %v", - t.ID, t.DesiredState, api.TaskStateRemove, + latestTask.ID, latestTask.DesiredState, api.TaskStateRemove, ) return nil } // update desired state to REMOVE - t.DesiredState = api.TaskStateRemove + latestTask.DesiredState = api.TaskStateRemove - if err := store.UpdateTask(tx, t); err != nil { + if err := store.UpdateTask(tx, latestTask); err != nil { log.G(ctx).WithError(err).Errorf("failed transaction: update task desired state to REMOVE") } return nil