Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion agent/exec/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,9 @@ func Do(ctx context.Context, task *api.Task, ctlr Controller) (*api.TaskStatus,
status.PortStatus = portStatus
}()

if task.DesiredState == api.TaskStateShutdown {
// this branch bounds the largest state achievable in the agent as SHUTDOWN, which
// is exactly the correct behavior for the agent.
if task.DesiredState >= api.TaskStateShutdown {
if status.State >= api.TaskStateCompleted {
return noop()
}
Expand Down
23 changes: 23 additions & 0 deletions agent/exec/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,29 @@ func TestShutdown(t *testing.T) {
})
}

// TestDesiredStateRemove verifies that SHUTDOWN remains the highest state
// the agent will drive a task to. This matters for service scale-downs and
// deletions, which mark doomed tasks with the desired state REMOVE: the
// agent must still only shut them down, leaving removal to the task reaper.
func TestDesiredStateRemove(t *testing.T) {
	task := newTestTask(t, api.TaskStateNew, api.TaskStateRemove)
	ctx, ctlr, finish := buildTestEnv(t, task)

	// Shutdown must be invoked exactly once before the environment is torn down.
	defer func() {
		finish()
		assert.Equal(t, 1, ctlr.calls["Shutdown"])
	}()

	ctlr.ShutdownFn = func(_ context.Context) error {
		return nil
	}

	checkDo(ctx, t, task, ctlr, &api.TaskStatus{
		State:   api.TaskStateShutdown,
		Message: "shutdown",
	})
}

// StatuserController is used to create a new Controller, which is also a ContainerStatuser.
// We cannot add ContainerStatus() to the Controller, due to the check in controller.go:242
type StatuserController struct {
Expand Down
7 changes: 7 additions & 0 deletions api/api.pb.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4139,6 +4139,13 @@ file {
66001: "TaskStateRejected"
}
}
value {
name: "REMOVE"
number: 800
options {
66001: "TaskStateRemove"
}
}
value {
name: "ORPHANED"
number: 832
Expand Down
640 changes: 325 additions & 315 deletions api/types.pb.go

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions api/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,12 @@ enum TaskState {
SHUTDOWN = 640 [(gogoproto.enumvalue_customname)="TaskStateShutdown"]; // orchestrator requested shutdown
FAILED = 704 [(gogoproto.enumvalue_customname)="TaskStateFailed"]; // task execution failed with error
REJECTED = 768 [(gogoproto.enumvalue_customname)="TaskStateRejected"]; // task could not be executed here.
// The main purpose of the REMOVE state is to correctly handle service deletions
// and scale downs. This allows us to keep track of tasks that have been marked
// for deletion, but can't yet be removed because the agent is in the process of
// shutting them down. Once the agent has shut down tasks with desired state
// REMOVE, the task reaper is responsible for removing them.
REMOVE = 800 [(gogoproto.enumvalue_customname)="TaskStateRemove"];
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

832 for consistency with the others?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think you missed the ORPHANED = 832 below, which makes this correct at halfway between REJECTED and ORPHANED.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// Remove ...

// The main purpose of this state is to free up resources associated with service tasks on
// unresponsive nodes without having to delete those tasks. This state is directly assigned
// to the task by the orchestrator.
Expand Down
2 changes: 1 addition & 1 deletion manager/orchestrator/global/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (g *Orchestrator) Run(ctx context.Context) error {
if !orchestrator.IsGlobalService(v.Service) {
continue
}
orchestrator.DeleteServiceTasks(ctx, g.store, v.Service)
orchestrator.SetServiceTasksRemove(ctx, g.store, v.Service)
// delete the service from service map
delete(g.globalServices, v.Service.ID)
g.restarts.ClearServiceHistory(v.Service.ID)
Expand Down
2 changes: 1 addition & 1 deletion manager/orchestrator/global/global_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ func TestDeleteService(t *testing.T) {

deleteService(t, store, service1)
	// the task's desired state should be updated to REMOVE (it is no
	// longer deleted directly; the task reaper removes it later)
observedTask := testutils.WatchTaskDelete(t, watch)
observedTask := testutils.WatchTaskUpdate(t, watch)
assert.Equal(t, observedTask.ServiceAnnotations.Name, "name1")
assert.Equal(t, observedTask.NodeID, "nodeid1")
}
Expand Down
24 changes: 13 additions & 11 deletions manager/orchestrator/replicated/replicated_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,13 @@ func TestReplicatedOrchestrator(t *testing.T) {
})
assert.NoError(t, err)

observedDeletion1 := testutils.WatchTaskDelete(t, watch)
assert.Equal(t, observedDeletion1.Status.State, api.TaskStateNew)
assert.Equal(t, observedDeletion1.ServiceAnnotations.Name, "name2")
observedUpdateRemove1 := testutils.WatchTaskUpdate(t, watch)
assert.Equal(t, observedUpdateRemove1.DesiredState, api.TaskStateRemove)
assert.Equal(t, observedUpdateRemove1.ServiceAnnotations.Name, "name2")

observedDeletion2 := testutils.WatchTaskDelete(t, watch)
assert.Equal(t, observedDeletion2.Status.State, api.TaskStateNew)
assert.Equal(t, observedDeletion2.ServiceAnnotations.Name, "name2")
observedUpdateRemove2 := testutils.WatchTaskUpdate(t, watch)
assert.Equal(t, observedUpdateRemove2.DesiredState, api.TaskStateRemove)
assert.Equal(t, observedUpdateRemove2.ServiceAnnotations.Name, "name2")

// There should be one remaining task attached to service id2/name2.
var liveTasks []*api.Task
Expand Down Expand Up @@ -383,10 +383,11 @@ func TestReplicatedScaleDown(t *testing.T) {

// Replicas was set to 6, but we started with 7 tasks. task7 should
// be the one the orchestrator chose to shut down because it was not
// assigned yet.
// assigned yet. The desired state of task7 will be set to "REMOVE"

observedShutdown := testutils.WatchTaskDelete(t, watch)
assert.Equal(t, "task7", observedShutdown.ID)
observedUpdateRemove := testutils.WatchTaskUpdate(t, watch)
assert.Equal(t, api.TaskStateRemove, observedUpdateRemove.DesiredState)
assert.Equal(t, "task7", observedUpdateRemove.ID)

// Now scale down to 2 instances.
err = s.Update(func(tx store.Tx) error {
Expand All @@ -406,8 +407,9 @@ func TestReplicatedScaleDown(t *testing.T) {

shutdowns := make(map[string]int)
for i := 0; i != 4; i++ {
observedShutdown := testutils.WatchTaskDelete(t, watch)
shutdowns[observedShutdown.NodeID]++
observedUpdateDesiredRemove := testutils.WatchTaskUpdate(t, watch)
assert.Equal(t, api.TaskStateRemove, observedUpdateDesiredRemove.DesiredState)
shutdowns[observedUpdateDesiredRemove.NodeID]++
}

assert.Equal(t, 1, shutdowns["node1"])
Expand Down
32 changes: 28 additions & 4 deletions manager/orchestrator/replicated/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (r *Orchestrator) handleServiceEvent(ctx context.Context, event events.Even
if !orchestrator.IsReplicatedService(v.Service) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unrelated by not totally: Please add a comment for this case.

return
}
orchestrator.DeleteServiceTasks(ctx, r.store, v.Service)
orchestrator.SetServiceTasksRemove(ctx, r.store, v.Service)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to make the same change in github.com/swarmkit/manager/orchestrator/global/global.go L150.

Then, you can remove the DeleteServiceTasks method, because it has no other uses in the codebase.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

r.restarts.ClearServiceHistory(v.Service.ID)
delete(r.reconcileServices, v.Service.ID)
case api.EventCreateService:
Expand Down Expand Up @@ -86,6 +86,12 @@ func (r *Orchestrator) resolveService(ctx context.Context, task *api.Task) *api.
return service
}

// reconcile decides what actions must be taken depending on the number of
// specificed slots and actual running slots. If the actual running slots are
// fewer than what is requested, it creates new tasks. If the actual running
// slots are more than requested, then it decides which slots must be removed
// and sets desired state of those tasks to REMOVE (the actual removal is handled
// by the task reaper, after the agent shuts the tasks down).
func (r *Orchestrator) reconcile(ctx context.Context, service *api.Service) {
runningSlots, deadSlots, err := r.updatableAndDeadSlots(ctx, service)
if err != nil {
Expand Down Expand Up @@ -157,7 +163,11 @@ func (r *Orchestrator) reconcile(ctx context.Context, service *api.Service) {
r.updater.Update(ctx, r.cluster, service, sortedSlots[:specifiedSlots])
err = r.store.Batch(func(batch *store.Batch) error {
r.deleteTasksMap(ctx, batch, deadSlots)
r.deleteTasks(ctx, batch, sortedSlots[specifiedSlots:])
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can remove the (*replicated.Orchestrator).deleteTasks method, because it's no longer used.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

// for all slots that we are removing, we set the desired state of those tasks
// to REMOVE. Then, the agent is responsible for shutting them down, and the
// task reaper is responsible for actually removing them from the store after
// shutdown.
r.setTasksDesiredState(ctx, batch, sortedSlots[specifiedSlots:], api.TaskStateRemove)
return nil
})
if err != nil {
Expand Down Expand Up @@ -198,10 +208,24 @@ func (r *Orchestrator) addTasks(ctx context.Context, batch *store.Batch, service
}
}

func (r *Orchestrator) deleteTasks(ctx context.Context, batch *store.Batch, slots []orchestrator.Slot) {
// setTasksDesiredState sets the desired state for all tasks for the given slots to the
// requested state
func (r *Orchestrator) setTasksDesiredState(ctx context.Context, batch *store.Batch, slots []orchestrator.Slot, state api.TaskState) {
for _, slot := range slots {
for _, t := range slot {
r.deleteTask(ctx, batch, t)
err := batch.Update(func(tx store.Tx) error {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not do all the updates in the same txn ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@anshulpundir how do you mean?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nevermind. I realized that updates to the store are batched using store.Batch.

// update desired state
t.DesiredState = state
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's good practice to make sure that the state is being advanced, and never moves backwards. Currently there isn't a state beyond "remove" but we could potentially add one later.


err := store.UpdateTask(tx, t)
if err != nil {
log.G(ctx).WithError(err).Errorf("failed to update task %s desired state to %s", t.ID, state.String())
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This log statement seems redundant with the one below.

}
return err
})
if err != nil {
log.G(ctx).WithError(err).Errorf("failed to update batch to set task desired state to %s", state.String())
}
}
}
}
Expand Down
Loading