Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 50 additions & 4 deletions manager/orchestrator/restart.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,21 @@ type instanceRestartInfo struct {
}

// delayedStart tracks a single pending (delayed) task restart so that
// later restart requests for the same task can wait on it or cancel it.
type delayedStart struct {
	// cancel is called to cancel the delayed start.
	cancel func()
	// doneCh is closed once the delay has elapsed (or the delayed start
	// was cancelled), signalling any goroutine waiting on this restart.
	doneCh chan struct{}

	// waiter is set to true if the next restart is waiting for this delay
	// to complete.
	waiter bool
}

// RestartSupervisor initiates and manages restarts. It's responsible for
// delaying restarts when applicable.
type RestartSupervisor struct {
mu sync.Mutex
store *store.MemoryStore
delays map[string]delayedStart
delays map[string]*delayedStart
history map[instanceTuple]*instanceRestartInfo
historyByService map[string]map[instanceTuple]struct{}
taskTimeout time.Duration
Expand All @@ -50,18 +55,59 @@ type RestartSupervisor struct {
// NewRestartSupervisor creates a new RestartSupervisor backed by the given
// store, with empty delay/history tracking maps and the default timeout for
// shutting down old tasks.
func NewRestartSupervisor(store *store.MemoryStore) *RestartSupervisor {
	return &RestartSupervisor{
		store: store,
		// delays holds *delayedStart so entries can be mutated in place
		// (e.g. setting the waiter flag) while referenced under the lock.
		delays:           make(map[string]*delayedStart),
		history:          make(map[instanceTuple]*instanceRestartInfo),
		historyByService: make(map[string]map[instanceTuple]struct{}),
		taskTimeout:      defaultOldTaskTimeout,
	}
}

// waitRestart blocks until the previous restart delay for taskID has
// elapsed (or ctx is cancelled), then triggers the follow-up restart of
// the task inside a store transaction.
func (r *RestartSupervisor) waitRestart(ctx context.Context, oldDelay *delayedStart, cluster *api.Cluster, taskID string) {
	// Block until the pending delay completes; bail out on cancellation.
	select {
	case <-ctx.Done():
		return
	case <-oldDelay.doneCh:
	}

	// Kick off the next restart. The task or its service may have been
	// removed in the meantime, in which case there is nothing to do.
	updateErr := r.store.Update(func(tx store.Tx) error {
		task := store.GetTask(tx, taskID)
		if task == nil {
			return nil
		}
		svc := store.GetService(tx, task.ServiceID)
		if svc == nil {
			return nil
		}
		return r.Restart(ctx, tx, cluster, svc, *task)
	})
	if updateErr != nil {
		log.G(ctx).WithError(updateErr).Errorf("failed to restart task after waiting for previous restart")
	}
}

// Restart initiates a new task to replace t if appropriate under the service's
// restart policy.
func (r *RestartSupervisor) Restart(ctx context.Context, tx store.Tx, cluster *api.Cluster, service *api.Service, t api.Task) error {
// TODO(aluzzardi): This function should not depend on `service`.

// Is the old task still in the process of restarting? If so, wait for
// its restart delay to elapse, to avoid tight restart loops (for
// example, when the image doesn't exist).
r.mu.Lock()
oldDelay, ok := r.delays[t.ID]
if ok {
if !oldDelay.waiter {
oldDelay.waiter = true
go r.waitRestart(ctx, oldDelay, cluster, t.ID)
}
r.mu.Unlock()
return nil
}
r.mu.Unlock()

t.DesiredState = api.TaskStateShutdown
err := store.UpdateTask(tx, &t)
if err != nil {
Expand All @@ -87,7 +133,7 @@ func (r *RestartSupervisor) Restart(ctx context.Context, tx store.Tx, cluster *a

n := store.GetNode(tx, t.NodeID)

restartTask.DesiredState = api.TaskStateAccepted
restartTask.DesiredState = api.TaskStateReady

var restartDelay time.Duration
// Restart delay is not applied to drained nodes
Expand Down Expand Up @@ -254,7 +300,7 @@ func (r *RestartSupervisor) DelayStart(ctx context.Context, _ store.Tx, oldTask
<-oldDelay.doneCh
r.mu.Lock()
}
r.delays[newTaskID] = delayedStart{cancel: cancel, doneCh: doneCh}
r.delays[newTaskID] = &delayedStart{cancel: cancel, doneCh: doneCh}
r.mu.Unlock()

var watch chan events.Event
Expand Down
14 changes: 7 additions & 7 deletions manager/orchestrator/restart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func TestOrchestratorRestartOnFailure(t *testing.T) {

observedTask3 := watchTaskCreate(t, watch)
assert.Equal(t, observedTask3.Status.State, api.TaskStateNew)
assert.Equal(t, observedTask3.DesiredState, api.TaskStateAccepted)
assert.Equal(t, observedTask3.DesiredState, api.TaskStateReady)
assert.Equal(t, observedTask3.ServiceAnnotations.Name, "name1")

expectCommit(t, watch)
Expand Down Expand Up @@ -380,7 +380,7 @@ func TestOrchestratorRestartDelay(t *testing.T) {
observedTask3 := watchTaskCreate(t, watch)
expectCommit(t, watch)
assert.Equal(t, observedTask3.Status.State, api.TaskStateNew)
assert.Equal(t, observedTask3.DesiredState, api.TaskStateAccepted)
assert.Equal(t, observedTask3.DesiredState, api.TaskStateReady)
assert.Equal(t, observedTask3.ServiceAnnotations.Name, "name1")

observedTask4 := watchTaskUpdate(t, watch)
Expand Down Expand Up @@ -470,7 +470,7 @@ func TestOrchestratorRestartMaxAttempts(t *testing.T) {
observedTask3 := watchTaskCreate(t, watch)
expectCommit(t, watch)
assert.Equal(t, observedTask3.Status.State, api.TaskStateNew)
assert.Equal(t, observedTask3.DesiredState, api.TaskStateAccepted)
assert.Equal(t, observedTask3.DesiredState, api.TaskStateReady)
assert.Equal(t, observedTask3.ServiceAnnotations.Name, "name1")

observedTask4 := watchTaskUpdate(t, watch)
Expand Down Expand Up @@ -503,7 +503,7 @@ func TestOrchestratorRestartMaxAttempts(t *testing.T) {
observedTask5 := watchTaskCreate(t, watch)
expectCommit(t, watch)
assert.Equal(t, observedTask5.Status.State, api.TaskStateNew)
assert.Equal(t, observedTask5.DesiredState, api.TaskStateAccepted)
assert.Equal(t, observedTask5.DesiredState, api.TaskStateReady)

observedTask6 := watchTaskUpdate(t, watch) // task gets started after a delay
expectCommit(t, watch)
Expand Down Expand Up @@ -602,7 +602,7 @@ func TestOrchestratorRestartWindow(t *testing.T) {
observedTask3 := watchTaskCreate(t, watch)
expectCommit(t, watch)
assert.Equal(t, observedTask3.Status.State, api.TaskStateNew)
assert.Equal(t, observedTask3.DesiredState, api.TaskStateAccepted)
assert.Equal(t, observedTask3.DesiredState, api.TaskStateReady)
assert.Equal(t, observedTask3.ServiceAnnotations.Name, "name1")

observedTask4 := watchTaskUpdate(t, watch)
Expand Down Expand Up @@ -635,7 +635,7 @@ func TestOrchestratorRestartWindow(t *testing.T) {
observedTask5 := watchTaskCreate(t, watch)
expectCommit(t, watch)
assert.Equal(t, observedTask5.Status.State, api.TaskStateNew)
assert.Equal(t, observedTask5.DesiredState, api.TaskStateAccepted)
assert.Equal(t, observedTask5.DesiredState, api.TaskStateReady)
assert.Equal(t, observedTask5.ServiceAnnotations.Name, "name1")

observedTask6 := watchTaskUpdate(t, watch) // task gets started after a delay
Expand Down Expand Up @@ -683,7 +683,7 @@ func TestOrchestratorRestartWindow(t *testing.T) {
observedTask7 := watchTaskCreate(t, watch)
expectCommit(t, watch)
assert.Equal(t, observedTask7.Status.State, api.TaskStateNew)
assert.Equal(t, observedTask7.DesiredState, api.TaskStateAccepted)
assert.Equal(t, observedTask7.DesiredState, api.TaskStateReady)

observedTask8 := watchTaskUpdate(t, watch)
after = time.Now()
Expand Down
7 changes: 7 additions & 0 deletions manager/orchestrator/task_reaper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/docker/swarmkit/identity"
"github.com/docker/swarmkit/manager/state"
"github.com/docker/swarmkit/manager/state/store"
"github.com/docker/swarmkit/protobuf/ptypes"
"github.com/stretchr/testify/assert"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -54,6 +55,12 @@ func TestTaskHistory(t *testing.T) {
Replicas: 2,
},
},
Task: api.TaskSpec{
Restart: &api.RestartPolicy{
Condition: api.RestartOnAny,
Delay: ptypes.DurationProto(0),
},
},
},
}
assert.NoError(t, store.CreateService(tx, j1))
Expand Down
4 changes: 2 additions & 2 deletions manager/orchestrator/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (r *ReplicatedOrchestrator) initTasks(ctx context.Context, readTx store.Rea
continue
}
// TODO(aluzzardi): This is shady. We should have a more generic condition.
if t.DesiredState != api.TaskStateAccepted || !isReplicatedService(service) {
if t.DesiredState != api.TaskStateReady || !isReplicatedService(service) {
continue
}
restartDelay := defaultRestartDelay
Expand All @@ -80,7 +80,7 @@ func (r *ReplicatedOrchestrator) initTasks(ctx context.Context, readTx store.Rea
_ = batch.Update(func(tx store.Tx) error {
t := store.GetTask(tx, t.ID)
// TODO(aluzzardi): This is shady as well. We should have a more generic condition.
if t == nil || t.DesiredState != api.TaskStateAccepted {
if t == nil || t.DesiredState != api.TaskStateReady {
return nil
}
r.restarts.DelayStart(ctx, tx, nil, t.ID, restartDelay, true)
Expand Down