From 303bca8a190c11b065f8ccd81dd107c1abb49a10 Mon Sep 17 00:00:00 2001 From: Aaron Lehmann Date: Wed, 20 Jul 2016 18:50:20 -0700 Subject: [PATCH] Put restarted tasks in READY state We used to put restarted tasks in READY state. This makes sense because then they can go ahead and pull an image while we wait for the restart delay to elapse. However, #715 changed the restart supervisor to put restarted tasks into ACCEPTED to work around a tight restart loop when an image doesn't exist. The problem was that the task would fail immediately, leading the orchestrator to request a new restart, which would cancel the ongoing restart delay. As a better fix for this, put tasks in READY, but when a restart is requested and there is already one in progress for the old task, we wait for that restart to complete before starting the new one. Signed-off-by: Aaron Lehmann --- manager/orchestrator/restart.go | 54 ++++++++++++++++++++++-- manager/orchestrator/restart_test.go | 14 +++--- manager/orchestrator/task_reaper_test.go | 7 +++ manager/orchestrator/tasks.go | 4 +- 4 files changed, 66 insertions(+), 13 deletions(-) diff --git a/manager/orchestrator/restart.go b/manager/orchestrator/restart.go index 72cbcc378d..851426725f 100644 --- a/manager/orchestrator/restart.go +++ b/manager/orchestrator/restart.go @@ -31,8 +31,13 @@ type instanceRestartInfo struct { } type delayedStart struct { + // cancel is called to cancel the delayed start. cancel func() doneCh chan struct{} + + // waiter is set to true if the next restart is waiting for this delay + // to complete. + waiter bool } // RestartSupervisor initiates and manages restarts. It's responsible for @@ -40,7 +45,7 @@ type delayedStart struct { type RestartSupervisor struct { mu sync.Mutex store *store.MemoryStore - delays map[string]delayedStart + delays map[string]*delayedStart history map[instanceTuple]*instanceRestartInfo historyByService map[string]map[instanceTuple]struct{} taskTimeout time.Duration @@ -50,18 +55,59 @@ type RestartSupervisor struct { func NewRestartSupervisor(store *store.MemoryStore) *RestartSupervisor { return &RestartSupervisor{ store: store, - delays: make(map[string]delayedStart), + delays: make(map[string]*delayedStart), history: make(map[instanceTuple]*instanceRestartInfo), historyByService: make(map[string]map[instanceTuple]struct{}), taskTimeout: defaultOldTaskTimeout, } } +func (r *RestartSupervisor) waitRestart(ctx context.Context, oldDelay *delayedStart, cluster *api.Cluster, taskID string) { + // Wait for the last restart delay to elapse. + select { + case <-oldDelay.doneCh: + case <-ctx.Done(): + return + } + + // Start the next restart + err := r.store.Update(func(tx store.Tx) error { + t := store.GetTask(tx, taskID) + if t == nil { + return nil + } + service := store.GetService(tx, t.ServiceID) + if service == nil { + return nil + } + return r.Restart(ctx, tx, cluster, service, *t) + }) + + if err != nil { + log.G(ctx).WithError(err).Errorf("failed to restart task after waiting for previous restart") + } +} + // Restart initiates a new task to replace t if appropriate under the service's // restart policy. func (r *RestartSupervisor) Restart(ctx context.Context, tx store.Tx, cluster *api.Cluster, service *api.Service, t api.Task) error { // TODO(aluzzardi): This function should not depend on `service`. + // Is the old task still in the process of restarting? If so, wait for + // its restart delay to elapse, to avoid tight restart loops (for + // example, when the image doesn't exist). + r.mu.Lock() + oldDelay, ok := r.delays[t.ID] + if ok { + if !oldDelay.waiter { + oldDelay.waiter = true + go r.waitRestart(ctx, oldDelay, cluster, t.ID) + } + r.mu.Unlock() + return nil + } + r.mu.Unlock() + t.DesiredState = api.TaskStateShutdown err := store.UpdateTask(tx, &t) if err != nil { @@ -87,7 +133,7 @@ func (r *RestartSupervisor) Restart(ctx context.Context, tx store.Tx, cluster *a n := store.GetNode(tx, t.NodeID) - restartTask.DesiredState = api.TaskStateAccepted + restartTask.DesiredState = api.TaskStateReady var restartDelay time.Duration // Restart delay does not applied to drained nodes @@ -254,7 +300,7 @@ func (r *RestartSupervisor) DelayStart(ctx context.Context, _ store.Tx, oldTask <-oldDelay.doneCh r.mu.Lock() } - r.delays[newTaskID] = delayedStart{cancel: cancel, doneCh: doneCh} + r.delays[newTaskID] = &delayedStart{cancel: cancel, doneCh: doneCh} r.mu.Unlock() var watch chan events.Event diff --git a/manager/orchestrator/restart_test.go b/manager/orchestrator/restart_test.go index def1945542..3c1a6e4204 100644 --- a/manager/orchestrator/restart_test.go +++ b/manager/orchestrator/restart_test.go @@ -184,7 +184,7 @@ func TestOrchestratorRestartOnFailure(t *testing.T) { observedTask3 := watchTaskCreate(t, watch) assert.Equal(t, observedTask3.Status.State, api.TaskStateNew) - assert.Equal(t, observedTask3.DesiredState, api.TaskStateAccepted) + assert.Equal(t, observedTask3.DesiredState, api.TaskStateReady) assert.Equal(t, observedTask3.ServiceAnnotations.Name, "name1") expectCommit(t, watch) @@ -380,7 +380,7 @@ func TestOrchestratorRestartDelay(t *testing.T) { observedTask3 := watchTaskCreate(t, watch) expectCommit(t, watch) assert.Equal(t, observedTask3.Status.State, api.TaskStateNew) - assert.Equal(t, observedTask3.DesiredState, api.TaskStateAccepted) + assert.Equal(t, observedTask3.DesiredState, api.TaskStateReady) assert.Equal(t, observedTask3.ServiceAnnotations.Name, "name1") observedTask4 := watchTaskUpdate(t, watch) @@ -470,7 +470,7 @@ func TestOrchestratorRestartMaxAttempts(t *testing.T) { observedTask3 := watchTaskCreate(t, watch) expectCommit(t, watch) assert.Equal(t, observedTask3.Status.State, api.TaskStateNew) - assert.Equal(t, observedTask3.DesiredState, api.TaskStateAccepted) + assert.Equal(t, observedTask3.DesiredState, api.TaskStateReady) assert.Equal(t, observedTask3.ServiceAnnotations.Name, "name1") observedTask4 := watchTaskUpdate(t, watch) @@ -503,7 +503,7 @@ func TestOrchestratorRestartMaxAttempts(t *testing.T) { observedTask5 := watchTaskCreate(t, watch) expectCommit(t, watch) assert.Equal(t, observedTask5.Status.State, api.TaskStateNew) - assert.Equal(t, observedTask5.DesiredState, api.TaskStateAccepted) + assert.Equal(t, observedTask5.DesiredState, api.TaskStateReady) observedTask6 := watchTaskUpdate(t, watch) // task gets started after a delay expectCommit(t, watch) @@ -602,7 +602,7 @@ func TestOrchestratorRestartWindow(t *testing.T) { observedTask3 := watchTaskCreate(t, watch) expectCommit(t, watch) assert.Equal(t, observedTask3.Status.State, api.TaskStateNew) - assert.Equal(t, observedTask3.DesiredState, api.TaskStateAccepted) + assert.Equal(t, observedTask3.DesiredState, api.TaskStateReady) assert.Equal(t, observedTask3.ServiceAnnotations.Name, "name1") observedTask4 := watchTaskUpdate(t, watch) @@ -635,7 +635,7 @@ func TestOrchestratorRestartWindow(t *testing.T) { observedTask5 := watchTaskCreate(t, watch) expectCommit(t, watch) assert.Equal(t, observedTask5.Status.State, api.TaskStateNew) - assert.Equal(t, observedTask5.DesiredState, api.TaskStateAccepted) + assert.Equal(t, observedTask5.DesiredState, api.TaskStateReady) assert.Equal(t, observedTask5.ServiceAnnotations.Name, "name1") observedTask6 := watchTaskUpdate(t, watch) // task gets started after a delay @@ -683,7 +683,7 @@ func TestOrchestratorRestartWindow(t *testing.T) { observedTask7 := watchTaskCreate(t, watch) expectCommit(t, watch) assert.Equal(t, observedTask7.Status.State, api.TaskStateNew) - assert.Equal(t, observedTask7.DesiredState, api.TaskStateAccepted) + assert.Equal(t, observedTask7.DesiredState, api.TaskStateReady) observedTask8 := watchTaskUpdate(t, watch) after = time.Now() diff --git a/manager/orchestrator/task_reaper_test.go b/manager/orchestrator/task_reaper_test.go index 4ef347cc06..df315edc11 100644 --- a/manager/orchestrator/task_reaper_test.go +++ b/manager/orchestrator/task_reaper_test.go @@ -7,6 +7,7 @@ import ( "github.com/docker/swarmkit/identity" "github.com/docker/swarmkit/manager/state" "github.com/docker/swarmkit/manager/state/store" + "github.com/docker/swarmkit/protobuf/ptypes" "github.com/stretchr/testify/assert" "golang.org/x/net/context" ) @@ -54,6 +55,12 @@ func TestTaskHistory(t *testing.T) { Replicas: 2, }, }, + Task: api.TaskSpec{ + Restart: &api.RestartPolicy{ + Condition: api.RestartOnAny, + Delay: ptypes.DurationProto(0), + }, + }, }, } assert.NoError(t, store.CreateService(tx, j1)) diff --git a/manager/orchestrator/tasks.go b/manager/orchestrator/tasks.go index c6d4c2f368..46fc8db3b9 100644 --- a/manager/orchestrator/tasks.go +++ b/manager/orchestrator/tasks.go @@ -56,7 +56,7 @@ func (r *ReplicatedOrchestrator) initTasks(ctx context.Context, readTx store.Rea continue } // TODO(aluzzardi): This is shady. We should have a more generic condition. - if t.DesiredState != api.TaskStateAccepted || !isReplicatedService(service) { + if t.DesiredState != api.TaskStateReady || !isReplicatedService(service) { continue } restartDelay := defaultRestartDelay @@ -80,7 +80,7 @@ func (r *ReplicatedOrchestrator) initTasks(ctx context.Context, readTx store.Rea _ = batch.Update(func(tx store.Tx) error { t := store.GetTask(tx, t.ID) // TODO(aluzzardi): This is shady as well. We should have a more generic condition. - if t == nil || t.DesiredState != api.TaskStateAccepted { + if t == nil || t.DesiredState != api.TaskStateReady { return nil } r.restarts.DelayStart(ctx, tx, nil, t.ID, restartDelay, true)