Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 18 additions & 18 deletions manager/orchestrator/global/global_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,9 @@ func setup(t *testing.T, store *store.MemoryStore, watch chan events.Event) *Orc
ctx := context.Background()
// Start the global orchestrator.
global := NewGlobalOrchestrator(store)
go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, global.Run(ctx))
}()
})

addService(t, store, service1)
testutils.Expect(t, watch, api.EventCreateService{})
Expand Down Expand Up @@ -579,9 +579,9 @@ func TestInitializationRejectedTasks(t *testing.T) {
orchestrator := NewGlobalOrchestrator(s)
defer orchestrator.Stop()

go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
})

observedTask1 := testutils.WatchTaskUpdate(t, watch)
assert.Equal(t, observedTask1.ID, "task1")
Expand Down Expand Up @@ -642,9 +642,9 @@ func TestInitializationFailedTasks(t *testing.T) {
orchestrator := NewGlobalOrchestrator(s)
defer orchestrator.Stop()

go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
})

observedTask1 := testutils.WatchTaskUpdate(t, watch)
assert.Equal(t, observedTask1.ID, "task1")
Expand Down Expand Up @@ -734,9 +734,9 @@ func TestInitializationExtraTask(t *testing.T) {
orchestrator := NewGlobalOrchestrator(s)
defer orchestrator.Stop()

go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
})

observedTask1 := testutils.WatchTaskUpdate(t, watch)
assert.True(t, observedTask1.ID == "task1" || observedTask1.ID == "task2")
Expand Down Expand Up @@ -814,9 +814,9 @@ func TestInitializationMultipleServices(t *testing.T) {
orchestrator := NewGlobalOrchestrator(s)
defer orchestrator.Stop()

go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
})

// Nothing should happen because both tasks are up to date.
select {
Expand Down Expand Up @@ -955,9 +955,9 @@ func TestInitializationTaskWithoutService(t *testing.T) {
orchestrator := NewGlobalOrchestrator(s)
defer orchestrator.Stop()

go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
})

observedTask1 := testutils.WatchTaskDelete(t, watch)
assert.Equal(t, observedTask1.ID, "task2")
Expand Down Expand Up @@ -1013,9 +1013,9 @@ func TestInitializationTaskOnDrainedNode(t *testing.T) {
orchestrator := NewGlobalOrchestrator(s)
defer orchestrator.Stop()

go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
})

observedTask1 := testutils.WatchTaskUpdate(t, watch)
assert.Equal(t, observedTask1.ID, "task1")
Expand Down Expand Up @@ -1085,9 +1085,9 @@ func TestInitializationTaskOnNonexistentNode(t *testing.T) {
orchestrator := NewGlobalOrchestrator(s)
defer orchestrator.Stop()

go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
})

observedTask1 := testutils.WatchTaskUpdate(t, watch)
assert.Equal(t, observedTask1.ID, "task1")
Expand Down Expand Up @@ -1254,9 +1254,9 @@ func TestInitializationRestartHistory(t *testing.T) {
orchestrator := NewGlobalOrchestrator(s)
defer orchestrator.Stop()

go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
})

// Fail the running task
s.Update(func(tx store.Tx) error {
Expand Down
72 changes: 38 additions & 34 deletions manager/orchestrator/replicated/update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package replicated

import (
"context"
"sync/atomic"
"sync"
"testing"
"time"

Expand All @@ -27,7 +27,7 @@ func TestUpdaterRollback(t *testing.T) {
}

func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_FailureAction, setMonitor bool, useSpecVersion bool) {
// this test should complete within 20 seconds. if not, bail out
// this test should complete within 30 seconds. if not, bail out
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

Expand All @@ -37,14 +37,15 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa

orchestrator := NewReplicatedOrchestrator(s)

// TODO(dperny): these are used with atomic.StoreUint32 and
// atomic.LoadUint32. using atomic primitives is bad practice and easy to
// mess up
// These variables will be used to signal that The Fail Loop should start
// failing these tasks. Once they're closed, The Failing Can Begin.
var (
failImage1 uint32
failImage2 uint32
failMu sync.Mutex
failImage1 bool
)

// create a watch for task creates, which we will use to verify that the
// updater works correctly.
watchCreate, cancelCreate := state.Watch(s.WatchQueue(), api.EventCreateTask{})
defer cancelCreate()

Expand All @@ -54,8 +55,13 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa
// Fail new tasks the updater tries to run
watchUpdate, cancelUpdate := state.Watch(s.WatchQueue(), api.EventUpdateTask{})
defer cancelUpdate()
go func() {

// We're gonna call this big chunk here "The Fail Loop". its job is to put
// tasks into a Failed state in certain conditions.
testutils.EnsureRuns(func() {
failedLast := false
// typical go pattern: infinite for loop in a goroutine, exits on
// ctx.Done
for {
var e events.Event
select {
Expand All @@ -67,15 +73,26 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa
if task.DesiredState == task.Status.State {
continue
}
if task.DesiredState == api.TaskStateRunning && task.Status.State != api.TaskStateFailed && task.Status.State != api.TaskStateRunning {
// This used to have a 3rd clause,
// "&& task.Status.State != api.TaskStateRunning"
// however, this is unneeded. If DesiredState is Running, then
// actual state cannot be Running, because that would get caught
// in the condition about (DesiredState == State)
if task.DesiredState == api.TaskStateRunning && task.Status.State != api.TaskStateFailed {
err := s.Update(func(tx store.Tx) error {
task = store.GetTask(tx, task.ID)
// Never fail two image2 tasks in a row, so there's a mix of
// failed and successful tasks for the rollback.
if task.Spec.GetContainer().Image == "image1" && atomic.LoadUint32(&failImage1) == 1 {
// lock mutex governing access to failImage1.
failMu.Lock()
defer failMu.Unlock()
// we should start failing tasks with image1 only after1
if task.Spec.GetContainer().Image == "image1" && failImage1 {
// only fail the task if we can read from failImage1
// (which will only be true if it's closed)
task.Status.State = api.TaskStateFailed
failedLast = true
} else if task.Spec.GetContainer().Image == "image2" && atomic.LoadUint32(&failImage2) == 1 && !failedLast {
} else if task.Spec.GetContainer().Image == "image2" && !failedLast {
// Never fail two image2 tasks in a row, so there's a mix of
// failed and successful tasks for the rollback.
task.Status.State = api.TaskStateFailed
failedLast = true
} else {
Expand All @@ -94,7 +111,7 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa
assert.NoError(t, err)
}
}
}()
})

// Create a service with four replicas specified before the orchestrator
// is started. This should result in two tasks when the orchestrator
Expand Down Expand Up @@ -153,23 +170,9 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa

// Start the orchestrator.
var orchestratorError error
orchestratorDone := make(chan struct{})
// verify that the orchestrator has had a chance to run by blocking the
// main test routine until it has.
orchestratorRan := make(chan struct{})
go func() {
close(orchestratorRan)
// try not to fail the test in go routines. it's racey. instead, save
// the error and check it in a defer
orchestratorDone := testutils.EnsureRuns(func() {
orchestratorError = orchestrator.Run(ctx)
close(orchestratorDone)
}()

select {
case <-orchestratorRan:
case <-ctx.Done():
t.Error("orchestrator did not start before test timed out")
}
})

defer func() {
orchestrator.Stop()
Expand All @@ -196,8 +199,6 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa
assert.Equal(t, observedTask.Status.State, api.TaskStateNew)
assert.Equal(t, observedTask.Spec.GetContainer().Image, "image1")

atomic.StoreUint32(&failImage2, 1)

// Start a rolling update
err = s.Update(func(tx store.Tx) error {
s1 := store.GetService(tx, "id1")
Expand Down Expand Up @@ -268,6 +269,7 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa
var e events.Event
select {
case e = <-watchServiceUpdate:
t.Log("service was updated")
case <-ctx.Done():
t.Error("test timed out before watchServiceUpdate provided an event")
return
Expand All @@ -278,10 +280,12 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa
}
}

atomic.StoreUint32(&failImage1, 1)

// Repeat the rolling update but this time fail the tasks that the
// rollback creates.
failMu.Lock()
failImage1 = true
failMu.Unlock()

err = s.Update(func(tx store.Tx) error {
s1 := store.GetService(tx, "id1")
require.NotNil(t, s1)
Expand Down
34 changes: 16 additions & 18 deletions manager/orchestrator/taskreaper/task_reaper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func TestTaskReaperInit(t *testing.T) {
reaper := New(s)

// Now, start the reaper
go reaper.Run(ctx)
testutils.EnsureRuns(func() { reaper.Run(ctx) })

// And then stop the reaper. This will cause the reaper to run through its
// whole init phase and then immediately enter the loop body, get the stop
Expand Down Expand Up @@ -259,10 +259,10 @@ func TestTaskHistory(t *testing.T) {
assert.NoError(t, err)

// Start the orchestrator.
go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
go taskReaper.Run(ctx)
})
testutils.EnsureRuns(func() { taskReaper.Run(ctx) })

observedTask1 := testutils.WatchTaskCreate(t, watch)
assert.Equal(t, observedTask1.Status.State, api.TaskStateNew)
Expand Down Expand Up @@ -394,10 +394,8 @@ func TestTaskStateRemoveOnScaledown(t *testing.T) {
assert.NoError(t, err)

// Start the orchestrator.
go func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
go taskReaper.Run(ctx)
testutils.EnsureRuns(func() { assert.NoError(t, orchestrator.Run(ctx)) })
testutils.EnsureRuns(func() { taskReaper.Run(ctx) })

observedTask1 := testutils.WatchTaskCreate(t, watch)
assert.Equal(t, observedTask1.Status.State, api.TaskStateNew)
Expand Down Expand Up @@ -526,10 +524,10 @@ func TestTaskStateRemoveOnServiceRemoval(t *testing.T) {
assert.NoError(t, err)

// Start the orchestrator.
go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
go taskReaper.Run(ctx)
})
testutils.EnsureRuns(func() { taskReaper.Run(ctx) })

observedTask1 := testutils.WatchTaskCreate(t, watch)
assert.Equal(t, observedTask1.Status.State, api.TaskStateNew)
Expand Down Expand Up @@ -664,10 +662,10 @@ func TestServiceRemoveDeadTasks(t *testing.T) {
assert.NoError(t, err)

// Start the orchestrator and the reaper.
go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
go taskReaper.Run(ctx)
})
testutils.EnsureRuns(func() { taskReaper.Run(ctx) })

observedTask1 := testutils.WatchTaskCreate(t, watch)
assert.Equal(t, api.TaskStateNew, observedTask1.Status.State)
Expand Down Expand Up @@ -843,7 +841,7 @@ func TestTaskReaperBatching(t *testing.T) {
taskReaper := New(s)
taskReaper.tickSignal = make(chan struct{}, 1)
defer taskReaper.Stop()
go taskReaper.Run(ctx)
testutils.EnsureRuns(func() { taskReaper.Run(ctx) })

// None of the tasks we've created are eligible for deletion. We should
// see no task delete events. Wait for a tick signal, or 500ms to pass, to
Expand Down Expand Up @@ -1010,10 +1008,10 @@ func TestServiceRemoveUnassignedTasks(t *testing.T) {
assert.NoError(t, err)

// Start the orchestrator.
go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
go taskReaper.Run(ctx)
})
testutils.EnsureRuns(func() { taskReaper.Run(ctx) })

observedTask1 := testutils.WatchTaskCreate(t, watch)
assert.Equal(t, api.TaskStateNew, observedTask1.Status.State)
Expand Down
20 changes: 18 additions & 2 deletions manager/orchestrator/testutils/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,22 @@ import (
"github.com/stretchr/testify/assert"
)

// EnsureRuns takes a closure and runs it in a goroutine, blocking until the
// goroutine has had an opportunity to run. It returns a channel which will be
// closed when the provided closure exits.
func EnsureRuns(closure func()) <-chan struct{} {
started := make(chan struct{})
stopped := make(chan struct{})
go func() {
close(started)
closure()
close(stopped)
}()

<-started
return stopped
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whats the use of the returned channel? It doesn't seem to be used in any of the calls

}

// WatchTaskCreate waits for a task to be created.
func WatchTaskCreate(t *testing.T, watch chan events.Event) *api.Task {
for {
Expand All @@ -22,7 +38,7 @@ func WatchTaskCreate(t *testing.T, watch chan events.Event) *api.Task {
if _, ok := event.(api.EventUpdateTask); ok {
assert.FailNow(t, "got EventUpdateTask when expecting EventCreateTask", fmt.Sprint(event))
}
case <-time.After(2 * time.Second):
case <-time.After(3 * time.Second):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whats the rational behind increasing this by 50% here and 100% below?

assert.FailNow(t, "no task creation")
}
}
Expand All @@ -39,7 +55,7 @@ func WatchTaskUpdate(t *testing.T, watch chan events.Event) *api.Task {
if _, ok := event.(api.EventCreateTask); ok {
assert.FailNow(t, "got EventCreateTask when expecting EventUpdateTask", fmt.Sprint(event))
}
case <-time.After(time.Second):
case <-time.After(2 * time.Second):
assert.FailNow(t, "no task update")
}
}
Expand Down
Loading