Patch summary (manager/orchestrator):

RestartSupervisor.Restart now looks up the task being restarted in r.delays.
If a delayed start is still pending for that task, Restart returns without
restarting it and, at most once per delay (delayedStart.waiter), spawns
waitRestart. waitRestart waits for the delay's doneCh (or for ctx to be
cancelled) and then calls Restart again from inside a store update. The
delays map now holds *delayedStart so the waiter flag is set on the shared
entry.

Replacement tasks are created with DesiredState Ready instead of Accepted.
The initTasks checks and the restart tests are updated to match, and the
service used by TestTaskHistory gets a RestartOnAny policy with a zero delay.

NOTE(review): line breaks and indentation were restored from a
whitespace-collapsed copy of this patch. Blank context lines were placed so
that every hunk matches its header line counts; confirm them against the
target tree before applying.

diff --git a/manager/orchestrator/restart.go b/manager/orchestrator/restart.go
index 72cbcc378d..851426725f 100644
--- a/manager/orchestrator/restart.go
+++ b/manager/orchestrator/restart.go
@@ -31,8 +31,13 @@ type instanceRestartInfo struct {
 }
 
 type delayedStart struct {
+	// cancel is called to cancel the delayed start.
 	cancel func()
 	doneCh chan struct{}
+
+	// waiter is set to true if the next restart is waiting for this delay
+	// to complete.
+	waiter bool
 }
 
 // RestartSupervisor initiates and manages restarts. It's responsible for
@@ -40,7 +45,7 @@ type delayedStart struct {
 type RestartSupervisor struct {
 	mu               sync.Mutex
 	store            *store.MemoryStore
-	delays           map[string]delayedStart
+	delays           map[string]*delayedStart
 	history          map[instanceTuple]*instanceRestartInfo
 	historyByService map[string]map[instanceTuple]struct{}
 	taskTimeout      time.Duration
@@ -50,18 +55,59 @@ type RestartSupervisor struct {
 func NewRestartSupervisor(store *store.MemoryStore) *RestartSupervisor {
 	return &RestartSupervisor{
 		store:            store,
-		delays:           make(map[string]delayedStart),
+		delays:           make(map[string]*delayedStart),
 		history:          make(map[instanceTuple]*instanceRestartInfo),
 		historyByService: make(map[string]map[instanceTuple]struct{}),
 		taskTimeout:      defaultOldTaskTimeout,
 	}
 }
 
+func (r *RestartSupervisor) waitRestart(ctx context.Context, oldDelay *delayedStart, cluster *api.Cluster, taskID string) {
+	// Wait for the last restart delay to elapse.
+	select {
+	case <-oldDelay.doneCh:
+	case <-ctx.Done():
+		return
+	}
+
+	// Start the next restart
+	err := r.store.Update(func(tx store.Tx) error {
+		t := store.GetTask(tx, taskID)
+		if t == nil {
+			return nil
+		}
+		service := store.GetService(tx, t.ServiceID)
+		if service == nil {
+			return nil
+		}
+		return r.Restart(ctx, tx, cluster, service, *t)
+	})
+
+	if err != nil {
+		log.G(ctx).WithError(err).Errorf("failed to restart task after waiting for previous restart")
+	}
+}
+
 // Restart initiates a new task to replace t if appropriate under the service's
 // restart policy.
 func (r *RestartSupervisor) Restart(ctx context.Context, tx store.Tx, cluster *api.Cluster, service *api.Service, t api.Task) error {
 	// TODO(aluzzardi): This function should not depend on `service`.
 
+	// Is the old task still in the process of restarting? If so, wait for
+	// its restart delay to elapse, to avoid tight restart loops (for
+	// example, when the image doesn't exist).
+	r.mu.Lock()
+	oldDelay, ok := r.delays[t.ID]
+	if ok {
+		if !oldDelay.waiter {
+			oldDelay.waiter = true
+			go r.waitRestart(ctx, oldDelay, cluster, t.ID)
+		}
+		r.mu.Unlock()
+		return nil
+	}
+	r.mu.Unlock()
+
 	t.DesiredState = api.TaskStateShutdown
 	err := store.UpdateTask(tx, &t)
 	if err != nil {
@@ -87,7 +133,7 @@ func (r *RestartSupervisor) Restart(ctx context.Context, tx store.Tx, cluster *a
 
 	n := store.GetNode(tx, t.NodeID)
 
-	restartTask.DesiredState = api.TaskStateAccepted
+	restartTask.DesiredState = api.TaskStateReady
 
 	var restartDelay time.Duration
 	// Restart delay does not applied to drained nodes
@@ -254,7 +300,7 @@ func (r *RestartSupervisor) DelayStart(ctx context.Context, _ store.Tx, oldTask
 		<-oldDelay.doneCh
 		r.mu.Lock()
 	}
-	r.delays[newTaskID] = delayedStart{cancel: cancel, doneCh: doneCh}
+	r.delays[newTaskID] = &delayedStart{cancel: cancel, doneCh: doneCh}
 	r.mu.Unlock()
 
 	var watch chan events.Event
diff --git a/manager/orchestrator/restart_test.go b/manager/orchestrator/restart_test.go
index def1945542..3c1a6e4204 100644
--- a/manager/orchestrator/restart_test.go
+++ b/manager/orchestrator/restart_test.go
@@ -184,7 +184,7 @@ func TestOrchestratorRestartOnFailure(t *testing.T) {
 
 	observedTask3 := watchTaskCreate(t, watch)
 	assert.Equal(t, observedTask3.Status.State, api.TaskStateNew)
-	assert.Equal(t, observedTask3.DesiredState, api.TaskStateAccepted)
+	assert.Equal(t, observedTask3.DesiredState, api.TaskStateReady)
 	assert.Equal(t, observedTask3.ServiceAnnotations.Name, "name1")
 
 	expectCommit(t, watch)
@@ -380,7 +380,7 @@ func TestOrchestratorRestartDelay(t *testing.T) {
 	observedTask3 := watchTaskCreate(t, watch)
 	expectCommit(t, watch)
 	assert.Equal(t, observedTask3.Status.State, api.TaskStateNew)
-	assert.Equal(t, observedTask3.DesiredState, api.TaskStateAccepted)
+	assert.Equal(t, observedTask3.DesiredState, api.TaskStateReady)
 	assert.Equal(t, observedTask3.ServiceAnnotations.Name, "name1")
 
 	observedTask4 := watchTaskUpdate(t, watch)
@@ -470,7 +470,7 @@ func TestOrchestratorRestartMaxAttempts(t *testing.T) {
 	observedTask3 := watchTaskCreate(t, watch)
 	expectCommit(t, watch)
 	assert.Equal(t, observedTask3.Status.State, api.TaskStateNew)
-	assert.Equal(t, observedTask3.DesiredState, api.TaskStateAccepted)
+	assert.Equal(t, observedTask3.DesiredState, api.TaskStateReady)
 	assert.Equal(t, observedTask3.ServiceAnnotations.Name, "name1")
 
 	observedTask4 := watchTaskUpdate(t, watch)
@@ -503,7 +503,7 @@ func TestOrchestratorRestartMaxAttempts(t *testing.T) {
 	observedTask5 := watchTaskCreate(t, watch)
 	expectCommit(t, watch)
 	assert.Equal(t, observedTask5.Status.State, api.TaskStateNew)
-	assert.Equal(t, observedTask5.DesiredState, api.TaskStateAccepted)
+	assert.Equal(t, observedTask5.DesiredState, api.TaskStateReady)
 
 	observedTask6 := watchTaskUpdate(t, watch) // task gets started after a delay
 	expectCommit(t, watch)
@@ -602,7 +602,7 @@ func TestOrchestratorRestartWindow(t *testing.T) {
 	observedTask3 := watchTaskCreate(t, watch)
 	expectCommit(t, watch)
 	assert.Equal(t, observedTask3.Status.State, api.TaskStateNew)
-	assert.Equal(t, observedTask3.DesiredState, api.TaskStateAccepted)
+	assert.Equal(t, observedTask3.DesiredState, api.TaskStateReady)
 	assert.Equal(t, observedTask3.ServiceAnnotations.Name, "name1")
 
 	observedTask4 := watchTaskUpdate(t, watch)
@@ -635,7 +635,7 @@ func TestOrchestratorRestartWindow(t *testing.T) {
 	observedTask5 := watchTaskCreate(t, watch)
 	expectCommit(t, watch)
 	assert.Equal(t, observedTask5.Status.State, api.TaskStateNew)
-	assert.Equal(t, observedTask5.DesiredState, api.TaskStateAccepted)
+	assert.Equal(t, observedTask5.DesiredState, api.TaskStateReady)
 	assert.Equal(t, observedTask5.ServiceAnnotations.Name, "name1")
 
 	observedTask6 := watchTaskUpdate(t, watch) // task gets started after a delay
@@ -683,7 +683,7 @@ func TestOrchestratorRestartWindow(t *testing.T) {
 	observedTask7 := watchTaskCreate(t, watch)
 	expectCommit(t, watch)
 	assert.Equal(t, observedTask7.Status.State, api.TaskStateNew)
-	assert.Equal(t, observedTask7.DesiredState, api.TaskStateAccepted)
+	assert.Equal(t, observedTask7.DesiredState, api.TaskStateReady)
 
 	observedTask8 := watchTaskUpdate(t, watch)
 	after = time.Now()
diff --git a/manager/orchestrator/task_reaper_test.go b/manager/orchestrator/task_reaper_test.go
index 4ef347cc06..df315edc11 100644
--- a/manager/orchestrator/task_reaper_test.go
+++ b/manager/orchestrator/task_reaper_test.go
@@ -7,6 +7,7 @@ import (
 	"github.com/docker/swarmkit/identity"
 	"github.com/docker/swarmkit/manager/state"
 	"github.com/docker/swarmkit/manager/state/store"
+	"github.com/docker/swarmkit/protobuf/ptypes"
 	"github.com/stretchr/testify/assert"
 	"golang.org/x/net/context"
 )
@@ -54,6 +55,12 @@ func TestTaskHistory(t *testing.T) {
 						Replicas: 2,
 					},
 				},
+				Task: api.TaskSpec{
+					Restart: &api.RestartPolicy{
+						Condition: api.RestartOnAny,
+						Delay:     ptypes.DurationProto(0),
+					},
+				},
 			},
 		}
 		assert.NoError(t, store.CreateService(tx, j1))
diff --git a/manager/orchestrator/tasks.go b/manager/orchestrator/tasks.go
index c6d4c2f368..46fc8db3b9 100644
--- a/manager/orchestrator/tasks.go
+++ b/manager/orchestrator/tasks.go
@@ -56,7 +56,7 @@ func (r *ReplicatedOrchestrator) initTasks(ctx context.Context, readTx store.Rea
 				continue
 			}
 			// TODO(aluzzardi): This is shady. We should have a more generic condition.
-			if t.DesiredState != api.TaskStateAccepted || !isReplicatedService(service) {
+			if t.DesiredState != api.TaskStateReady || !isReplicatedService(service) {
 				continue
 			}
 			restartDelay := defaultRestartDelay
@@ -80,7 +80,7 @@ func (r *ReplicatedOrchestrator) initTasks(ctx context.Context, readTx store.Rea
 						_ = batch.Update(func(tx store.Tx) error {
 							t := store.GetTask(tx, t.ID)
 							// TODO(aluzzardi): This is shady as well. We should have a more generic condition.
-							if t == nil || t.DesiredState != api.TaskStateAccepted {
+							if t == nil || t.DesiredState != api.TaskStateReady {
 								return nil
 							}
 							r.restarts.DelayStart(ctx, tx, nil, t.ID, restartDelay, true)