187 changes: 114 additions & 73 deletions manager/orchestrator/replicated/restart_test.go
@@ -10,6 +10,7 @@ import (
"github.com/docker/swarmkit/manager/state/store"
gogotypes "github.com/gogo/protobuf/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"
)

@@ -442,6 +443,9 @@ func TestOrchestratorRestartMaxAttempts(t *testing.T) {
},
},
},
SpecVersion: &api.Version{
Index: 1,
},
}
assert.NoError(t, store.CreateService(tx, j1))
return nil
@@ -453,89 +457,126 @@ func TestOrchestratorRestartMaxAttempts(t *testing.T) {
assert.NoError(t, orchestrator.Run(ctx))
}()

observedTask1 := testutils.WatchTaskCreate(t, watch)
assert.Equal(t, observedTask1.Status.State, api.TaskStateNew)
assert.Equal(t, observedTask1.ServiceAnnotations.Name, "name1")

observedTask2 := testutils.WatchTaskCreate(t, watch)
assert.Equal(t, observedTask2.Status.State, api.TaskStateNew)
assert.Equal(t, observedTask2.ServiceAnnotations.Name, "name1")

// Fail the first task. Confirm that it gets restarted.
updatedTask1 := observedTask1.Copy()
updatedTask1.Status = api.TaskStatus{State: api.TaskStateFailed}
before := time.Now()
err = s.Update(func(tx store.Tx) error {
assert.NoError(t, store.UpdateTask(tx, updatedTask1))
return nil
})
assert.NoError(t, err)
testutils.Expect(t, watch, state.EventCommit{})
testutils.Expect(t, watch, api.EventUpdateTask{})
testutils.Expect(t, watch, state.EventCommit{})
testutils.Expect(t, watch, api.EventUpdateTask{})

observedTask3 := testutils.WatchTaskCreate(t, watch)
testutils.Expect(t, watch, state.EventCommit{})
assert.Equal(t, observedTask3.Status.State, api.TaskStateNew)
assert.Equal(t, observedTask3.DesiredState, api.TaskStateReady)
assert.Equal(t, observedTask3.ServiceAnnotations.Name, "name1")
testRestart := func() {
observedTask1 := testutils.WatchTaskCreate(t, watch)
assert.Equal(t, observedTask1.Status.State, api.TaskStateNew)
assert.Equal(t, observedTask1.ServiceAnnotations.Name, "name1")

observedTask2 := testutils.WatchTaskCreate(t, watch)
assert.Equal(t, observedTask2.Status.State, api.TaskStateNew)
assert.Equal(t, observedTask2.ServiceAnnotations.Name, "name1")

testutils.Expect(t, watch, state.EventCommit{})

// Fail the first task. Confirm that it gets restarted.
updatedTask1 := observedTask1.Copy()
updatedTask1.Status = api.TaskStatus{State: api.TaskStateFailed}
before := time.Now()
err = s.Update(func(tx store.Tx) error {
assert.NoError(t, store.UpdateTask(tx, updatedTask1))
return nil
})
assert.NoError(t, err)
testutils.Expect(t, watch, api.EventUpdateTask{})
testutils.Expect(t, watch, state.EventCommit{})
testutils.Expect(t, watch, api.EventUpdateTask{})

observedTask3 := testutils.WatchTaskCreate(t, watch)
testutils.Expect(t, watch, state.EventCommit{})
assert.Equal(t, observedTask3.Status.State, api.TaskStateNew)
assert.Equal(t, observedTask3.DesiredState, api.TaskStateReady)
assert.Equal(t, observedTask3.ServiceAnnotations.Name, "name1")

observedTask4 := testutils.WatchTaskUpdate(t, watch)
testutils.Expect(t, watch, state.EventCommit{})
after := time.Now()

// At least 100 ms should have elapsed. Only check the lower bound,
// because the system may be slow and it could have taken longer.
if after.Sub(before) < 100*time.Millisecond {
t.Fatal("restart delay should have elapsed")
}

observedTask4 := testutils.WatchTaskUpdate(t, watch)
after := time.Now()
assert.Equal(t, observedTask4.Status.State, api.TaskStateNew)
assert.Equal(t, observedTask4.DesiredState, api.TaskStateRunning)
assert.Equal(t, observedTask4.ServiceAnnotations.Name, "name1")

// Fail the second task. Confirm that it gets restarted.
updatedTask2 := observedTask2.Copy()
updatedTask2.Status = api.TaskStatus{State: api.TaskStateFailed}
err = s.Update(func(tx store.Tx) error {
assert.NoError(t, store.UpdateTask(tx, updatedTask2))
return nil
})
assert.NoError(t, err)
testutils.Expect(t, watch, api.EventUpdateTask{})
testutils.Expect(t, watch, state.EventCommit{})
testutils.Expect(t, watch, api.EventUpdateTask{})

observedTask5 := testutils.WatchTaskCreate(t, watch)
testutils.Expect(t, watch, state.EventCommit{})
assert.Equal(t, observedTask5.Status.State, api.TaskStateNew)
assert.Equal(t, observedTask5.DesiredState, api.TaskStateReady)

observedTask6 := testutils.WatchTaskUpdate(t, watch) // task gets started after a delay
testutils.Expect(t, watch, state.EventCommit{})
assert.Equal(t, observedTask6.Status.State, api.TaskStateNew)
assert.Equal(t, observedTask6.DesiredState, api.TaskStateRunning)
assert.Equal(t, observedTask6.ServiceAnnotations.Name, "name1")

// Fail the first instance again. It should not be restarted.
updatedTask1 = observedTask3.Copy()
updatedTask1.Status = api.TaskStatus{State: api.TaskStateFailed}
err = s.Update(func(tx store.Tx) error {
assert.NoError(t, store.UpdateTask(tx, updatedTask1))
return nil
})
assert.NoError(t, err)
testutils.Expect(t, watch, api.EventUpdateTask{})
testutils.Expect(t, watch, state.EventCommit{})
testutils.Expect(t, watch, api.EventUpdateTask{})
testutils.Expect(t, watch, state.EventCommit{})

select {
case <-watch:
t.Fatal("got unexpected event")
case <-time.After(200 * time.Millisecond):
}

// At least 100 ms should have elapsed. Only check the lower bound,
// because the system may be slow and it could have taken longer.
if after.Sub(before) < 100*time.Millisecond {
t.Fatal("restart delay should have elapsed")
// Fail the second instance again. It should not be restarted.
updatedTask2 = observedTask5.Copy()
updatedTask2.Status = api.TaskStatus{State: api.TaskStateFailed}
err = s.Update(func(tx store.Tx) error {
assert.NoError(t, store.UpdateTask(tx, updatedTask2))
return nil
})
assert.NoError(t, err)
testutils.Expect(t, watch, api.EventUpdateTask{})
testutils.Expect(t, watch, state.EventCommit{})
testutils.Expect(t, watch, api.EventUpdateTask{})
testutils.Expect(t, watch, state.EventCommit{})

select {
case <-watch:
t.Fatal("got unexpected event")
case <-time.After(200 * time.Millisecond):
}
}

assert.Equal(t, observedTask4.Status.State, api.TaskStateNew)
assert.Equal(t, observedTask4.DesiredState, api.TaskStateRunning)
assert.Equal(t, observedTask4.ServiceAnnotations.Name, "name1")

// Fail the second task. Confirm that it gets restarted.
updatedTask2 := observedTask2.Copy()
updatedTask2.Status = api.TaskStatus{State: api.TaskStateFailed}
err = s.Update(func(tx store.Tx) error {
assert.NoError(t, store.UpdateTask(tx, updatedTask2))
return nil
})
assert.NoError(t, err)
testutils.Expect(t, watch, state.EventCommit{})
testutils.Expect(t, watch, api.EventUpdateTask{})
testutils.Expect(t, watch, state.EventCommit{})
testutils.Expect(t, watch, api.EventUpdateTask{})

observedTask5 := testutils.WatchTaskCreate(t, watch)
testutils.Expect(t, watch, state.EventCommit{})
assert.Equal(t, observedTask5.Status.State, api.TaskStateNew)
assert.Equal(t, observedTask5.DesiredState, api.TaskStateReady)
testRestart()

observedTask6 := testutils.WatchTaskUpdate(t, watch) // task gets started after a delay
testutils.Expect(t, watch, state.EventCommit{})
assert.Equal(t, observedTask6.Status.State, api.TaskStateNew)
assert.Equal(t, observedTask6.DesiredState, api.TaskStateRunning)
assert.Equal(t, observedTask6.ServiceAnnotations.Name, "name1")

// Fail the first instance again. It should not be restarted.
updatedTask1 = observedTask3.Copy()
updatedTask1.Status = api.TaskStatus{State: api.TaskStateFailed}
// Update the service spec
err = s.Update(func(tx store.Tx) error {
assert.NoError(t, store.UpdateTask(tx, updatedTask1))
s := store.GetService(tx, "id1")
require.NotNil(t, s)
s.Spec.Task.GetContainer().Image = "newimage"
s.SpecVersion.Index = 2
assert.NoError(t, store.UpdateService(tx, s))
return nil
})
assert.NoError(t, err)
testutils.Expect(t, watch, api.EventUpdateTask{})
testutils.Expect(t, watch, state.EventCommit{})
testutils.Expect(t, watch, api.EventUpdateTask{})
testutils.Expect(t, watch, state.EventCommit{})

select {
case <-watch:
t.Fatal("got unexpected event")
case <-time.After(200 * time.Millisecond):
}
testRestart()
}

func TestOrchestratorRestartWindow(t *testing.T) {
52 changes: 28 additions & 24 deletions manager/orchestrator/restart/restart.go
@@ -30,6 +30,13 @@ type instanceRestartInfo struct {
// Restart.MaxAttempts and Restart.Window are both
// nonzero.
restartedInstances *list.List
// Why is specVersion in this structure and not in the map key? While
// putting it in the key would be a very simple solution, it wouldn't
// be easy to clean up map entries corresponding to old specVersions.
// Making the key version-agnostic and clearing the value whenever the
// version changes avoids the issue of stale map entries for old
// versions.
specVersion api.Version
}
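
A minimal, self-contained sketch of the approach the comment above describes (hypothetical names, not the swarmkit types): the map key identifies only the instance, and the stored record is reset in place whenever a different spec version shows up, so entries for old versions never linger.

```go
package main

import "fmt"

// Hypothetical, simplified stand-ins for the swarmkit types.
type version struct{ Index uint64 }

type restartInfo struct {
	totalRestarts uint64
	specVersion   version
}

// record tracks a restart for an instance. The map key ("slot" here) is
// version-agnostic; the stored record is reset in place whenever a different
// spec version is observed, so stale entries for old versions cannot pile up.
func record(history map[int]*restartInfo, slot int, v version) {
	info, ok := history[slot]
	if !ok {
		info = &restartInfo{}
		history[slot] = info
	}
	if info.specVersion != v {
		// The service was updated: past failures shouldn't count against
		// the new spec, so start the history over.
		*info = restartInfo{specVersion: v}
	}
	info.totalRestarts++
}

func main() {
	history := make(map[int]*restartInfo)
	record(history, 1, version{Index: 1})
	record(history, 1, version{Index: 1})
	record(history, 1, version{Index: 2}) // spec changed: counter starts over
	fmt.Println(history[1].totalRestarts) // prints 1
}
```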

type delayedStart struct {
@@ -54,8 +61,7 @@ type Supervisor struct {
mu sync.Mutex
store *store.MemoryStore
delays map[string]*delayedStart
history map[instanceTuple]*instanceRestartInfo
historyByService map[string]map[instanceTuple]struct{}
historyByService map[string]map[instanceTuple]*instanceRestartInfo
TaskTimeout time.Duration
}

@@ -64,8 +70,7 @@ func NewSupervisor(store *store.MemoryStore) *Supervisor {
return &Supervisor{
store: store,
delays: make(map[string]*delayedStart),
history: make(map[instanceTuple]*instanceRestartInfo),
historyByService: make(map[string]map[instanceTuple]struct{}),
historyByService: make(map[string]map[instanceTuple]*instanceRestartInfo),
TaskTimeout: defaultOldTaskTimeout,
}
}
@@ -214,8 +219,8 @@ func (r *Supervisor) shouldRestart(ctx context.Context, t *api.Task, service *ap
r.mu.Lock()
defer r.mu.Unlock()

restartInfo := r.history[instanceTuple]
if restartInfo == nil {
restartInfo := r.historyByService[t.ServiceID][instanceTuple]
Member:
Perhaps I'm mis-interpreting the TODO at the top;

// TODO(aluzzardi): This function should not depend on `service`.

But doesn't this make it even more dependent on service? (I agree it's cleaner 😄)

Collaborator (author):
I believe the TODO is saying that service should not be passed in as a function parameter. I don't see how this function could work without being aware of the service ID.

Member:
Was thinking if the TODO meant: instance IDs are globally unique, thus we only need to track per instance.

Collaborator (author):
No, we don't have a concept of an instance ID. We track instances as a combination of service ID, and either a slot number or node ID.
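
For illustration, a rough sketch of what tracking instances as a combination of service ID plus slot number or node ID can look like, and why the two-level historyByService map makes the lookup above cheap. The names are hypothetical simplifications, not the actual swarmkit definitions.

```go
package main

import "fmt"

// Hypothetical sketch of the keying scheme described above.
type instanceTuple struct {
	slot      uint64 // slot number for replicated services; zero otherwise
	serviceID string
	nodeID    string // node ID for global services; empty otherwise
}

type instanceRestartInfo struct{ totalRestarts uint64 }

// Two-level history: outer key is the service ID, inner key is the instance
// within that service, so a whole service's history can be dropped in one step.
type supervisor struct {
	historyByService map[string]map[instanceTuple]*instanceRestartInfo
}

// lookup needs no existence checks: indexing a nil inner map in Go simply
// yields the zero value (nil), mirroring the historyByService lookup above.
func (s *supervisor) lookup(serviceID string, tuple instanceTuple) *instanceRestartInfo {
	return s.historyByService[serviceID][tuple]
}

func main() {
	s := &supervisor{historyByService: make(map[string]map[instanceTuple]*instanceRestartInfo)}
	replica := instanceTuple{slot: 1, serviceID: "id1"}    // replicated-service instance
	global := instanceTuple{serviceID: "id1", nodeID: "n"} // global-service instance
	fmt.Println(s.lookup("id1", replica) == nil, s.lookup("id1", global) == nil) // true true
}
```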

if restartInfo == nil || (t.SpecVersion != nil && *t.SpecVersion != restartInfo.specVersion) {
return true
}

@@ -268,17 +273,26 @@ func (r *Supervisor) recordRestartHistory(restartTask *api.Task) {
r.mu.Lock()
defer r.mu.Unlock()

if r.history[tuple] == nil {
r.history[tuple] = &instanceRestartInfo{}
if r.historyByService[restartTask.ServiceID] == nil {
r.historyByService[restartTask.ServiceID] = make(map[instanceTuple]*instanceRestartInfo)
}
if r.historyByService[restartTask.ServiceID][tuple] == nil {
r.historyByService[restartTask.ServiceID][tuple] = &instanceRestartInfo{}
}

restartInfo := r.history[tuple]
restartInfo.totalRestarts++
restartInfo := r.historyByService[restartTask.ServiceID][tuple]

if r.historyByService[restartTask.ServiceID] == nil {
r.historyByService[restartTask.ServiceID] = make(map[instanceTuple]struct{})
if restartTask.SpecVersion != nil && *restartTask.SpecVersion != restartInfo.specVersion {
// This task has a different SpecVersion from the one we're
// tracking. Most likely, the service was updated. Past failures
// shouldn't count against the new service definition, so clear
// the history for this instance.
*restartInfo = instanceRestartInfo{
specVersion: *restartTask.SpecVersion,
}
}
r.historyByService[restartTask.ServiceID][tuple] = struct{}{}

restartInfo.totalRestarts++
Member:
So do we track max attempted restarts for a task, not for a service? Does this mean that if I have a service with two replicas, and one of those replicas keeps failing, that the replica is no longer restarted, but the other replica is kept running? (i.e., the service running in degraded mode)

Member:
Guess I didn't realise that (it's confusing which options apply to the service as a whole, and which ones apply to individual tasks).

Collaborator (author):
It's per "instance", so either per-replica or per-node depending on whether it's a replicated or global service.

I believe the reasoning for this was that if you have one node that's broken or misbehaving and has tasks restarting in a loop, you don't want that to impact the ability to restart other tasks when they encounter occasional problems. By tracking restarts per-instance, the instances don't share a global maximum number of restarts. It seems more useful overall.

But I agree it can be confusing.

Member:
Perhaps a future addition; on failure, re-schedule task 🤔

Collaborator (author):
> Perhaps a future addition; on failure, re-schedule task

We actually do have a mechanism that reschedules tasks after they have failed on the same node repeatedly: #1552

Of course, it only applies to replicated services.

Member:
TIL @mstanleyjones not sure if we documented that?

Reply:
I believe we have. It sounds familiar.
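
A toy illustration of the per-instance limit discussed here, heavily simplified relative to the real shouldRestart logic: each instance spends its own attempt budget, so one crash-looping replica hitting MaxAttempts does not stop the other replicas from being restarted.

```go
package main

import "fmt"

// maxAttempts plays the role of Restart.MaxAttempts in this toy example.
const maxAttempts = 3

func main() {
	// Per-instance counters: slot number -> restarts consumed so far.
	restarts := map[int]int{}

	shouldRestart := func(slot int) bool {
		return restarts[slot] < maxAttempts
	}

	// Slot 1 crash-loops and burns through its own budget...
	for i := 0; i < 5; i++ {
		if shouldRestart(1) {
			restarts[1]++
		}
	}
	fmt.Println(shouldRestart(1)) // false: slot 1 has used its attempts
	// ...but slot 2's budget is untouched, so it can still be restarted.
	fmt.Println(shouldRestart(2)) // true
}
```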


if restartTask.Spec.Restart.Window != nil && (restartTask.Spec.Restart.Window.Seconds != 0 || restartTask.Spec.Restart.Window.Nanos != 0) {
if restartInfo.restartedInstances == nil {
@@ -432,16 +446,6 @@ func (r *Supervisor) CancelAll() {
// ClearServiceHistory forgets restart history related to a given service ID.
func (r *Supervisor) ClearServiceHistory(serviceID string) {
r.mu.Lock()
defer r.mu.Unlock()

tuples := r.historyByService[serviceID]
if tuples == nil {
return
}

delete(r.historyByService, serviceID)

Member:
nit: Wondering if this should also re-initialize the history

Collaborator (author):
This is called when the service is deleted, so we want the map entry for it to be gone.

Member:
ah makes sense
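
To make the trade-off concrete, a tiny sketch (generic Go, not the actual method) of why deletion is used here rather than re-initialization: the service itself is gone, so nothing should remain under its key.

```go
package main

import "fmt"

func main() {
	// historyByService-style map, keyed by service ID (value types simplified).
	history := map[string]map[int]int{
		"id1": {1: 2, 2: 1},
	}

	// The service was deleted, so drop its entry outright; re-initializing it
	// to an empty map would keep a useless entry alive for a removed service.
	delete(history, "id1")

	_, stillTracked := history["id1"]
	fmt.Println(stillTracked) // false
}
```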

for t := range tuples {
delete(r.history, t)
}
r.mu.Unlock()
}