Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
232 changes: 137 additions & 95 deletions api/objects.pb.go

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions api/objects.proto
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,12 @@ message Task {
// is only written by the manager.
TaskState desired_state = 10;

// DontRestart indicates that the restart supervisor decided not to
// start a replacement task for this task. This flag records the
// decision so that orchestrators can honor it when they do
// service-level reconciliation.
bool dont_restart = 16;

// List of network attachments by the task.
repeated NetworkAttachment networks = 11;

Expand Down
9 changes: 6 additions & 3 deletions manager/orchestrator/global/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,11 @@ func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []strin
nodeTasks[serviceID] = make(map[string][]*api.Task)

for _, t := range tasks {
if t.DesiredState <= api.TaskStateRunning {
// Collect all running instances of this service
if t.DesiredState <= api.TaskStateRunning || t.DontRestart {
// Collect all runnable instances of this service,
// and instances that were not be restarted due
// to restart policy but may be updated if the
// service spec changed.
nodeTasks[serviceID][t.NodeID] = append(nodeTasks[serviceID][t.NodeID], t)
}
}
Expand Down Expand Up @@ -405,7 +408,7 @@ func (g *Orchestrator) reconcileOneNode(ctx context.Context, node *api.Node) {
if t.ServiceID != serviceID {
continue
}
if t.DesiredState <= api.TaskStateRunning {
if t.DesiredState <= api.TaskStateRunning || t.DontRestart {
tasks[serviceID] = append(tasks[serviceID], t)
}
}
Expand Down
28 changes: 21 additions & 7 deletions manager/orchestrator/global/global_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func TestNodeAvailability(t *testing.T) {
testutils.Expect(t, watch, state.EventCommit{})

// updating the service shouldn't restart the task
updateService(t, store, service1)
updateService(t, store, service1, true)
testutils.Expect(t, watch, api.EventUpdateService{})
testutils.Expect(t, watch, state.EventCommit{})
select {
Expand Down Expand Up @@ -241,7 +241,7 @@ func TestNodeAvailability(t *testing.T) {
testutils.Expect(t, watch, state.EventCommit{})

// updating the service shouldn't restart the task
updateService(t, store, service1)
updateService(t, store, service1, true)
testutils.Expect(t, watch, api.EventUpdateService{})
testutils.Expect(t, watch, state.EventCommit{})
select {
Expand Down Expand Up @@ -277,7 +277,7 @@ func TestNodeState(t *testing.T) {
testutils.Expect(t, watch, state.EventCommit{})

// updating the service shouldn't restart the task
updateService(t, store, service1)
updateService(t, store, service1, true)
testutils.Expect(t, watch, api.EventUpdateService{})
testutils.Expect(t, watch, state.EventCommit{})
select {
Expand Down Expand Up @@ -425,8 +425,20 @@ func TestTaskFailure(t *testing.T) {
case <-time.After(100 * time.Millisecond):
}

// update the service. now the task should be recreated.
updateService(t, store, serviceNoRestart)
// update the service with no spec changes, to trigger a
// reconciliation. the task should still not be updated.
updateService(t, store, serviceNoRestart, false)
testutils.Expect(t, watch, api.EventUpdateService{})
testutils.Expect(t, watch, state.EventCommit{})

select {
case event := <-watch:
t.Fatalf("got unexpected event %T: %+v", event, event)
case <-time.After(100 * time.Millisecond):
}

// update the service with spec changes. now the task should be recreated.
updateService(t, store, serviceNoRestart, true)
testutils.Expect(t, watch, api.EventUpdateService{})
testutils.Expect(t, watch, state.EventCommit{})

Expand All @@ -444,11 +456,13 @@ func addService(t *testing.T, s *store.MemoryStore, service *api.Service) {
})
}

func updateService(t *testing.T, s *store.MemoryStore, service *api.Service) {
func updateService(t *testing.T, s *store.MemoryStore, service *api.Service, force bool) {
s.Update(func(tx store.Tx) error {
service := store.GetService(tx, service.ID)
require.NotNil(t, service)
service.Spec.Task.ForceUpdate++
if force {
service.Spec.Task.ForceUpdate++
}
assert.NoError(t, store.UpdateService(tx, service))
return nil
})
Expand Down
Loading