4 changes: 3 additions & 1 deletion agent/exec/controller.go
@@ -286,7 +286,9 @@ func Do(ctx context.Context, task *api.Task, ctlr Controller) (*api.TaskStatus,
status.PortStatus = portStatus
}()

if task.DesiredState == api.TaskStateShutdown {
// this branch bounds the largest state achievable in the agent to
// SHUTDOWN, which is exactly the correct behavior for the agent.
if task.DesiredState >= api.TaskStateShutdown {
if status.State >= api.TaskStateCompleted {
return noop()
}
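The new `>=` comparison relies on TaskState values being numerically ordered (the NOTE in api/types.proto below calls the state a lamport clock). A minimal runnable sketch of the bound, using only constants whose values appear in this PR's types.proto hunk (local stand-ins, not the swarmkit api package):

```go
package main

import "fmt"

// TaskState mirrors the numeric enum values visible in the api/types.proto
// hunk of this PR; the ordering is what makes ">=" act as an upper bound.
type TaskState int32

const (
	TaskStateShutdown TaskState = 640
	TaskStateFailed   TaskState = 704
	TaskStateRejected TaskState = 768
	TaskStateRemove   TaskState = 800
	TaskStateOrphaned TaskState = 832
)

func main() {
	// Both SHUTDOWN and the new REMOVE desired state take the shutdown
	// branch, so the agent never drives a task past SHUTDOWN on its own.
	for _, desired := range []TaskState{TaskStateShutdown, TaskStateRemove} {
		fmt.Printf("desired=%d takes shutdown branch: %v\n",
			desired, desired >= TaskStateShutdown)
	}
}
```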
68 changes: 68 additions & 0 deletions agent/exec/controller_test.go
@@ -368,6 +368,74 @@ func TestShutdown(t *testing.T) {
})
}

// TestDesiredStateRemove checks that the agent maintains SHUTDOWN as the
// maximum state it will drive a task to. This is particularly relevant for
// the case where a service scale-down or deletion sets the desired state of
// the tasks to be removed to REMOVE.
func TestDesiredStateRemove(t *testing.T) {
var (
task = newTestTask(t, api.TaskStateNew, api.TaskStateRemove)
ctx, ctlr, finish = buildTestEnv(t, task)
)
defer func() {
finish()
assert.Equal(t, 1, ctlr.calls["Shutdown"])
}()
ctlr.ShutdownFn = func(_ context.Context) error {
return nil
}

checkDo(ctx, t, task, ctlr, &api.TaskStatus{
State: api.TaskStateShutdown,
Message: "shutdown",
})
}

// TestDesiredStateRemoveOnlyNonterminal checks that the agent will only stop
// a container on REMOVE if it's not already in a terminal state. If the
// container is already in a terminal state (like COMPLETE), the agent should
// take no action.
func TestDesiredStateRemoveOnlyNonterminal(t *testing.T) {
// go through all terminal states, just for completeness' sake
for _, state := range []api.TaskState{
api.TaskStateCompleted,
api.TaskStateShutdown,
api.TaskStateFailed,
api.TaskStateRejected,
api.TaskStateRemove,
// no TaskStateOrphaned because that's not a state the task can be in
// on the agent
} {
// capture state variable here to run in parallel
state := state
t.Run(state.String(), func(t *testing.T) {
// go parallel to go faster
t.Parallel()
var (
// create a new task, actual state `state`, desired state
// shutdown
task = newTestTask(t, state, api.TaskStateShutdown)
ctx, ctlr, finish = buildTestEnv(t, task)
)
// make the shutdown function a noop
ctlr.ShutdownFn = func(_ context.Context) error {
return nil
}

// Note that we check for the error ErrTaskNoop, which is returned
// because nothing happens
checkDo(ctx, t, task, ctlr, &api.TaskStatus{
State: state,
}, ErrTaskNoop)
defer func() {
finish()
// we should never have called shutdown
assert.Equal(t, 0, ctlr.calls["Shutdown"])
}()
})
}
}

// StatuserController is used to create a new Controller, which is also a ContainerStatuser.
// We cannot add ContainerStatus() to the Controller, due to the check in controller.go:242
type StatuserController struct {
606 changes: 309 additions & 297 deletions api/types.pb.go

Large diffs are not rendered by default.

11 changes: 10 additions & 1 deletion api/types.proto
@@ -410,7 +410,16 @@ enum TaskState {
SHUTDOWN = 640 [(gogoproto.enumvalue_customname)="TaskStateShutdown"]; // orchestrator requested shutdown
FAILED = 704 [(gogoproto.enumvalue_customname)="TaskStateFailed"]; // task execution failed with error
REJECTED = 768 [(gogoproto.enumvalue_customname)="TaskStateRejected"]; // task could not be executed here.
ORPHANED = 832 [(gogoproto.enumvalue_customname)="TaskStateOrphaned"]; // The node on which this task is scheduled is Down for too long
// TaskStateRemove is used to correctly handle service deletions and scale
// downs. This allows us to keep track of tasks that have been marked for
// deletion, but can't yet be removed because the agent is in the process of
// shutting them down. Once the agent has shut down tasks with desired state
// REMOVE, the task reaper is responsible for removing them.
REMOVE = 800 [(gogoproto.enumvalue_customname)="TaskStateRemove"];
// TaskStateOrphaned is used to free up resources associated with service
// tasks on unresponsive nodes without having to delete those tasks. This
// state is directly assigned to the task by the orchestrator.
ORPHANED = 832 [(gogoproto.enumvalue_customname)="TaskStateOrphaned"];

Contributor: Did you mean to pick these changes too? They were part of a different comment.

Collaborator (Author): Were they? I think this was just a reword of the comment for ORPHANED that I did.

@anshulpundir those were reworded in b9e2c2f

// NOTE(stevvooe): The state of a task is actually a lamport clock, in that
// given two observations, the greater of the two can be considered
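The REMOVE comment above describes a three-actor lifecycle: the orchestrator marks a task for removal, the agent shuts it down, and the task reaper deletes it. A minimal sketch of the reaper's eligibility check implied by those rules; the `Task` struct, the `reapable` helper, and the RUNNING value are illustrative assumptions, since the reaper itself is not part of this diff:

```go
package main

import "fmt"

type TaskState int32

const (
	TaskStateRunning  TaskState = 448 // assumed value; not shown in this diff
	TaskStateShutdown TaskState = 640
	TaskStateRemove   TaskState = 800
)

// Task stands in for api.Task with only the two fields that matter here.
type Task struct {
	ID           string
	DesiredState TaskState
	ActualState  TaskState
}

// reapable captures the division of labor from the proto comment: only after
// the agent has driven a REMOVE-marked task to SHUTDOWN (or a later terminal
// state) may the task reaper delete it from the store.
func reapable(t Task) bool {
	return t.DesiredState == TaskStateRemove && t.ActualState >= TaskStateShutdown
}

func main() {
	t := Task{ID: "task7", DesiredState: TaskStateRemove, ActualState: TaskStateRunning}
	fmt.Println(reapable(t)) // false: the agent has not shut it down yet
	t.ActualState = TaskStateShutdown
	fmt.Println(reapable(t)) // true: safe for the task reaper to remove
}
```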
2 changes: 1 addition & 1 deletion manager/orchestrator/global/global.go
@@ -147,7 +147,7 @@ func (g *Orchestrator) Run(ctx context.Context) error {
if !orchestrator.IsGlobalService(v.Service) {
continue
}
orchestrator.DeleteServiceTasks(ctx, g.store, v.Service)
orchestrator.SetServiceTasksRemove(ctx, g.store, v.Service)
// delete the service from service map
delete(g.globalServices, v.Service.ID)
g.restarts.ClearServiceHistory(v.Service.ID)
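`SetServiceTasksRemove` replaces `DeleteServiceTasks` here, but its body is not shown in this diff. A plausible sketch of what such a helper does, assuming swarmkit's store helpers (`store.FindTasks`, `store.ByServiceID`, `store.UpdateTask`) and the `log` package already used elsewhere in this PR; the real implementation may differ:

```go
package orchestrator

import (
	"context"

	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/log"
	"github.com/docker/swarmkit/manager/state/store"
)

// SetServiceTasksRemove (sketch): mark every task of a service for removal
// instead of deleting it outright. The task reaper deletes each task once
// the agent has shut it down; see the REMOVE state in api/types.proto.
func SetServiceTasksRemove(ctx context.Context, s *store.MemoryStore, service *api.Service) {
	var tasks []*api.Task
	s.View(func(tx store.ReadTx) {
		tasks, _ = store.FindTasks(tx, store.ByServiceID(service.ID))
	})

	err := s.Batch(func(batch *store.Batch) error {
		for _, t := range tasks {
			t := t // capture for the closure
			if err := batch.Update(func(tx store.Tx) error {
				t.DesiredState = api.TaskStateRemove
				return store.UpdateTask(tx, t)
			}); err != nil {
				return err
			}
		}
		return nil
	})
	if err != nil {
		log.G(ctx).WithError(err).Error("failed to mark service tasks for removal")
	}
}
```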
2 changes: 1 addition & 1 deletion manager/orchestrator/global/global_test.go
@@ -216,7 +216,7 @@ func TestDeleteService(t *testing.T) {

deleteService(t, store, service1)
// task should be updated with desired state REMOVE
observedTask := testutils.WatchTaskDelete(t, watch)
observedTask := testutils.WatchTaskUpdate(t, watch)
assert.Equal(t, observedTask.ServiceAnnotations.Name, "name1")
assert.Equal(t, observedTask.NodeID, "nodeid1")
}
Expand Down
24 changes: 13 additions & 11 deletions manager/orchestrator/replicated/replicated_test.go
@@ -154,13 +154,13 @@ func TestReplicatedOrchestrator(t *testing.T) {
})
assert.NoError(t, err)

observedDeletion1 := testutils.WatchTaskDelete(t, watch)
assert.Equal(t, observedDeletion1.Status.State, api.TaskStateNew)
assert.Equal(t, observedDeletion1.ServiceAnnotations.Name, "name2")
observedUpdateRemove1 := testutils.WatchTaskUpdate(t, watch)
assert.Equal(t, observedUpdateRemove1.DesiredState, api.TaskStateRemove)
assert.Equal(t, observedUpdateRemove1.ServiceAnnotations.Name, "name2")

observedDeletion2 := testutils.WatchTaskDelete(t, watch)
assert.Equal(t, observedDeletion2.Status.State, api.TaskStateNew)
assert.Equal(t, observedDeletion2.ServiceAnnotations.Name, "name2")
observedUpdateRemove2 := testutils.WatchTaskUpdate(t, watch)
assert.Equal(t, observedUpdateRemove2.DesiredState, api.TaskStateRemove)
assert.Equal(t, observedUpdateRemove2.ServiceAnnotations.Name, "name2")

// There should be one remaining task attached to service id2/name2.
var liveTasks []*api.Task
@@ -383,10 +383,11 @@ func TestReplicatedScaleDown(t *testing.T) {

// Replicas was set to 6, but we started with 7 tasks. task7 should
// be the one the orchestrator chose to shut down because it was not
// assigned yet.
// assigned yet. The desired state of task7 will be set to "REMOVE".

observedShutdown := testutils.WatchTaskDelete(t, watch)
assert.Equal(t, "task7", observedShutdown.ID)
observedUpdateRemove := testutils.WatchTaskUpdate(t, watch)
assert.Equal(t, api.TaskStateRemove, observedUpdateRemove.DesiredState)
assert.Equal(t, "task7", observedUpdateRemove.ID)

// Now scale down to 2 instances.
err = s.Update(func(tx store.Tx) error {
@@ -406,8 +407,9 @@

shutdowns := make(map[string]int)
for i := 0; i != 4; i++ {
observedShutdown := testutils.WatchTaskDelete(t, watch)
shutdowns[observedShutdown.NodeID]++
observedUpdateDesiredRemove := testutils.WatchTaskUpdate(t, watch)
assert.Equal(t, api.TaskStateRemove, observedUpdateDesiredRemove.DesiredState)
shutdowns[observedUpdateDesiredRemove.NodeID]++
}

assert.Equal(t, 1, shutdowns["node1"])
42 changes: 38 additions & 4 deletions manager/orchestrator/replicated/services.go
@@ -50,7 +50,7 @@ func (r *Orchestrator) handleServiceEvent(ctx context.Context, event events.Even
if !orchestrator.IsReplicatedService(v.Service) {
return
}
orchestrator.DeleteServiceTasks(ctx, r.store, v.Service)
orchestrator.SetServiceTasksRemove(ctx, r.store, v.Service)
r.restarts.ClearServiceHistory(v.Service.ID)
delete(r.reconcileServices, v.Service.ID)
case api.EventCreateService:
@@ -86,6 +86,12 @@ func (r *Orchestrator) resolveService(ctx context.Context, task *api.Task) *api.
return service
}

// reconcile decides what actions must be taken depending on the number of
// specified slots and the number of actually running slots. If there are
// fewer running slots than specified, it creates new tasks. If there are
// more running slots than specified, it decides which slots must be removed
// and sets the desired state of those tasks to REMOVE (the actual removal
// is handled by the task reaper, after the agent shuts the tasks down).
func (r *Orchestrator) reconcile(ctx context.Context, service *api.Service) {
runningSlots, deadSlots, err := orchestrator.GetRunnableAndDeadSlots(r.store, service.ID)
if err != nil {
@@ -157,7 +163,11 @@ func (r *Orchestrator) reconcile(ctx context.Context, service *api.Service) {
r.updater.Update(ctx, r.cluster, service, sortedSlots[:specifiedSlots])
err = r.store.Batch(func(batch *store.Batch) error {
r.deleteTasksMap(ctx, batch, deadSlots)
r.deleteTasks(ctx, batch, sortedSlots[specifiedSlots:])
// for all slots that we are removing, we set the desired state of those tasks
// to REMOVE. Then, the agent is responsible for shutting them down, and the
// task reaper is responsible for actually removing them from the store after
// shutdown.
r.setTasksDesiredState(ctx, batch, sortedSlots[specifiedSlots:], api.TaskStateRemove)
return nil
})
if err != nil {
@@ -198,10 +208,34 @@ func (r *Orchestrator) addTasks(ctx context.Context, batch *store.Batch, service
}
}

func (r *Orchestrator) deleteTasks(ctx context.Context, batch *store.Batch, slots []orchestrator.Slot) {
// setTasksDesiredState sets the desired state for all tasks for the given slots to the
// requested state
func (r *Orchestrator) setTasksDesiredState(ctx context.Context, batch *store.Batch, slots []orchestrator.Slot, newDesiredState api.TaskState) {
for _, slot := range slots {
for _, t := range slot {
r.deleteTask(ctx, batch, t)
err := batch.Update(func(tx store.Tx) error {
// time travel is not allowed. if the current desired state is
// above the one we're trying to go to we can't go backwards.
// we have nothing to do and we should skip to the next task
if t.DesiredState > newDesiredState {
// log a warning, though. we shouldn't be trying to rewrite
// a state to an earlier state
log.G(ctx).Warnf(
"cannot update task %v in desired state %v to an earlier desired state %v",
t.ID, t.DesiredState, newDesiredState,
)
return nil
}
// update desired state
t.DesiredState = newDesiredState

return store.UpdateTask(tx, t)
})

// log an error if we get one
if err != nil {
log.G(ctx).WithError(err).Errorf("failed to update task to %v", newDesiredState.String())
}
}
}
}
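The `t.DesiredState > newDesiredState` guard above is what keeps desired-state changes monotonic. The same rule, distilled into a standalone runnable sketch (`advanceDesiredState` is a hypothetical name, not swarmkit API):

```go
package main

import "fmt"

type TaskState int32

const (
	TaskStateShutdown TaskState = 640
	TaskStateRemove   TaskState = 800
)

// advanceDesiredState distills the "time travel is not allowed" guard from
// setTasksDesiredState: a desired state may only move forward. The bool
// reports whether the update was applied; the caller logs a warning when
// it was not.
func advanceDesiredState(current, requested TaskState) (TaskState, bool) {
	if current > requested {
		return current, false
	}
	return requested, true
}

func main() {
	fmt.Println(advanceDesiredState(TaskStateShutdown, TaskStateRemove)) // 800 true
	fmt.Println(advanceDesiredState(TaskStateRemove, TaskStateShutdown)) // 800 false
}
```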