Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 122 additions & 2 deletions manager/orchestrator/replicated/task_reaper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,11 +493,9 @@ func TestServiceRemoveDeadTasks(t *testing.T) {

// Set both task states to RUNNING.
updatedTask1 := observedTask1.Copy()
updatedTask1.DesiredState = api.TaskStateRunning
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@anshulpundir sorry I don't follow why this is removed?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because desired state is api.TaskStateRunning when a task is created.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, makes sense.

updatedTask1.Status.State = api.TaskStateRunning
updatedTask1.ServiceAnnotations = api.Annotations{Name: "original"}
updatedTask2 := observedTask2.Copy()
updatedTask2.DesiredState = api.TaskStateRunning
updatedTask2.Status.State = api.TaskStateRunning
updatedTask2.ServiceAnnotations = api.Annotations{Name: "original"}
err = s.Update(func(tx store.Tx) error {
Expand Down Expand Up @@ -569,3 +567,125 @@ func TestServiceRemoveDeadTasks(t *testing.T) {
assert.NoError(t, err)
assert.Len(t, foundTasks, 0)
}

// TestServiceRemoveUnassignedTasks tests removal of tasks in state
// < TaskStateAssigned. It creates a one-replica service, moves the resulting
// task to PENDING, forces a service update so that the task is marked for
// shutdown and replaced, and then expects the task reaper to delete the
// superseded task, leaving exactly one task in the store.
func TestServiceRemoveUnassignedTasks(t *testing.T) {
	ctx := context.Background()
	s := store.NewMemoryStore(nil)
	assert.NotNil(t, s)
	defer s.Close()

	assert.NoError(t, s.Update(func(tx store.Tx) error {
		assert.NoError(t, store.CreateCluster(tx, &api.Cluster{
			ID: identity.NewID(),
			Spec: api.ClusterSpec{
				Annotations: api.Annotations{
					Name: store.DefaultClusterName,
				},
				Orchestration: api.OrchestrationConfig{
					// Retain a single task of history. The test expects the
					// superseded task to be reaped once its replacement
					// exists, leaving one task in the store.
					TaskHistoryRetentionLimit: 1,
				},
			},
		}))
		return nil
	}))

	taskReaper := taskreaper.New(s)
	defer taskReaper.Stop()
	orchestrator := NewReplicatedOrchestrator(s)
	defer orchestrator.Stop()

	// Watch every event (no filter): the expectations below rely on seeing
	// commit and service events in addition to task events.
	watch, cancel := state.Watch(s.WatchQueue())
	defer cancel()

	service1 := &api.Service{
		ID: "id1",
		Spec: api.ServiceSpec{
			Annotations: api.Annotations{
				Name: "name1",
			},
			Mode: &api.ServiceSpec_Replicated{
				Replicated: &api.ReplicatedService{
					Replicas: 1,
				},
			},
			Task: api.TaskSpec{
				Restart: &api.RestartPolicy{
					// Turn off restart to get an accurate count on tasks.
					Condition: api.RestartOnNone,
					Delay:     gogotypes.DurationProto(0),
				},
			},
		},
	}

	// Create a service with one replica specified before the orchestrator is
	// started. This should result in one task when the orchestrator
	// starts up.
	err := s.Update(func(tx store.Tx) error {
		assert.NoError(t, store.CreateService(tx, service1))
		return nil
	})
	assert.NoError(t, err)

	// Start the orchestrator and the task reaper.
	go func() {
		assert.NoError(t, orchestrator.Run(ctx))
	}()
	go taskReaper.Run(ctx)

	observedTask1 := testutils.WatchTaskCreate(t, watch)
	assert.Equal(t, api.TaskStateNew, observedTask1.Status.State)
	assert.Equal(t, "name1", observedTask1.ServiceAnnotations.Name)

	// Set the task state to PENDING to simulate allocation. The service
	// annotation is changed to "original" so the old task can be told apart
	// from its replacement later on.
	updatedTask1 := observedTask1.Copy()
	updatedTask1.Status.State = api.TaskStatePending
	updatedTask1.ServiceAnnotations = api.Annotations{Name: "original"}
	err = s.Update(func(tx store.Tx) error {
		assert.NoError(t, store.UpdateTask(tx, updatedTask1))
		return nil
	})
	require.NoError(t, err)

	testutils.Expect(t, watch, state.EventCommit{})
	testutils.Expect(t, watch, api.EventUpdateTask{})
	testutils.Expect(t, watch, state.EventCommit{})

	service1.Spec.Task.ForceUpdate++
	// This should shutdown the previous task and create a new one.
	err = s.Update(func(tx store.Tx) error {
		assert.NoError(t, store.UpdateService(tx, service1))
		return nil
	})
	// Previously this error was silently dropped.
	require.NoError(t, err)
	testutils.Expect(t, watch, api.EventUpdateService{})
	testutils.Expect(t, watch, state.EventCommit{})

	// New task should be created and old task marked for SHUTDOWN.
	observedTask1 = testutils.WatchTaskCreate(t, watch)
	assert.Equal(t, api.TaskStateNew, observedTask1.Status.State)
	assert.Equal(t, "name1", observedTask1.ServiceAnnotations.Name)

	shutdownTask := testutils.WatchTaskUpdate(t, watch)
	assert.Equal(t, api.TaskStateShutdown, shutdownTask.DesiredState)
	assert.Equal(t, "original", shutdownTask.ServiceAnnotations.Name)

	testutils.Expect(t, watch, state.EventCommit{})

	// Task reaper should delete the task previously marked for SHUTDOWN,
	// even though it never progressed past PENDING.
	deletedTask1 := testutils.WatchTaskDelete(t, watch)
	assert.Equal(t, api.TaskStatePending, deletedTask1.Status.State)
	assert.Equal(t, "original", deletedTask1.ServiceAnnotations.Name)

	testutils.Expect(t, watch, state.EventCommit{})

	// Only the replacement task should remain in the store.
	var foundTasks []*api.Task
	s.View(func(tx store.ReadTx) {
		foundTasks, err = store.FindTasks(tx, store.All)
	})
	assert.NoError(t, err)
	assert.Len(t, foundTasks, 1)
}
7 changes: 6 additions & 1 deletion manager/orchestrator/taskreaper/task_reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,12 @@ func (tr *TaskReaper) tick() {

runningTasks := 0
for _, t := range historicTasks {
if t.DesiredState <= api.TaskStateRunning || t.Status.State <= api.TaskStateRunning {
// Skip tasks which are desired to be running but the current state
// is less than or equal to running.
// This check is important to ignore tasks which are running or need to be running,
// but to delete tasks which are either past running,
// or have not reached running but need to be shutdown (because of a service update, for example).
if t.DesiredState == api.TaskStateRunning && t.Status.State <= api.TaskStateRunning {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this is really needed. Don't new tasks always start with desired state running?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure why you think this is not needed. We need to shutdown tasks with desired state SHUTDOWN and actual state < ASSIGNED.

This condition is to skip tasks which are not those covered by the condition above. LMK if this is not clear and we can discuss IRL. @nishanttotla

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh oops, I missed the change from || to &&, because previously it would count a task with desired state of SHUTDOWN and current state of PENDING to be running - thanks for fixing.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, I missed it too. This is needed, sorry for the slip.

// Don't delete running tasks
runningTasks++
continue
Expand Down
6 changes: 6 additions & 0 deletions manager/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ func (s *Scheduler) setupTasksList(tx store.ReadTx) error {
continue
}

// Also ignore tasks that have not yet been assigned but desired state is beyond TaskStateRunning
// This can happen if you update, delete or scale down a service before its tasks were assigned.
if t.Status.State == api.TaskStatePending && t.DesiredState > api.TaskStateRunning {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this comparison not t.Status.State <= api.TaskStatePending

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The < is already handled in the condition above @nishanttotla

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@anshulpundir sorry maybe I'm missing something, but for case api.EventUpdateTask:, I don't see a specific condition about this.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad, please ignore my comments. I got confused between two changes.

continue
}

s.allTasks[t.ID] = t
if t.NodeID == "" {
s.enqueue(t)
Expand Down
88 changes: 88 additions & 0 deletions manager/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2277,6 +2277,94 @@ func TestPreassignedTasks(t *testing.T) {
assert.Equal(t, assignment3.NodeID, "node2")
}

// TestIgnoreTasks prepopulates the store with one ready node and three
// PENDING tasks whose desired states are running, shutdown and remove, starts
// the scheduler, and checks that the first assignment observed is for task1
// (the task whose desired state is running) on node1.
func TestIgnoreTasks(t *testing.T) {
ctx := context.Background()
// A single READY node for the scheduler to place tasks on.
initialNodeSet := []*api.Node{
{
ID: "node1",
Spec: api.NodeSpec{
Annotations: api.Annotations{
Name: "name1",
},
},
Status: api.NodeStatus{
State: api.NodeStatus_READY,
},
},
}

// Tasks with desired state running, shutdown, remove.
// All three are in PENDING state; task2 and task3 already carry a NodeID,
// task1 does not.
initialTaskSet := []*api.Task{
{
ID: "task1",
DesiredState: api.TaskStateRunning,
ServiceAnnotations: api.Annotations{
Name: "name1",
},

Status: api.TaskStatus{
State: api.TaskStatePending,
},
},
{
ID: "task2",
DesiredState: api.TaskStateShutdown,
ServiceAnnotations: api.Annotations{
Name: "name2",
},
Status: api.TaskStatus{
State: api.TaskStatePending,
},
NodeID: initialNodeSet[0].ID,
},
{
ID: "task3",
DesiredState: api.TaskStateRemove,
ServiceAnnotations: api.Annotations{
Name: "name2",
},
Status: api.TaskStatus{
State: api.TaskStatePending,
},
NodeID: initialNodeSet[0].ID,
},
}

s := store.NewMemoryStore(nil)
assert.NotNil(t, s)
defer s.Close()

err := s.Update(func(tx store.Tx) error {
// Prepopulate nodes
for _, n := range initialNodeSet {
assert.NoError(t, store.CreateNode(tx, n))
}

// Prepopulate tasks
for _, task := range initialTaskSet {
assert.NoError(t, store.CreateTask(tx, task))
}
return nil
})
assert.NoError(t, err)

scheduler := New(s)

// Only task update events are watched; assignments are read from them.
watch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateTask{})
defer cancel()

// Run the scheduler in the background for the rest of the test.
go func() {
assert.NoError(t, scheduler.Run(ctx))
}()

// task1 is the only task expected to be assigned, since the other two
// tasks (PENDING with a desired state beyond running) are ignored by the
// scheduler.
// Normally task2/task3 would be handled first since they are preassigned
// tasks (their NodeID is already set).
assignment3 := watchAssignment(t, watch)
assert.Equal(t, assignment3.ID, "task1")
assert.Equal(t, assignment3.NodeID, "node1")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also assert here that task2/task3 are still in the same state?

}

func watchAssignmentFailure(t *testing.T, watch chan events.Event) *api.Task {
for {
select {
Expand Down