11 changes: 11 additions & 0 deletions manager/scheduler/scheduler.go
@@ -702,9 +702,20 @@ func (s *Scheduler) scheduleNTasksOnNodes(ctx context.Context, n int, taskGroup
return tasksScheduled
}

// noSuitableNode checks unassigned tasks and makes sure they have an existing service in the store before
// updating the task status and adding it back to: schedulingDecisions, unassignedTasks and allTasks
Contributor:
nit: maybe also say how tasks end up in the noSuitableNode state?

func (s *Scheduler) noSuitableNode(ctx context.Context, taskGroup map[string]*api.Task, schedulingDecisions map[string]schedulingDecision) {
Contributor:
Can I also request that you add a comment for this function and how it's intended to be used? Thanks!

Contributor Author:
roger that!

explanation := s.pipeline.Explain()
for _, t := range taskGroup {
var service *api.Service
s.store.View(func(tx store.ReadTx) {
service = store.GetService(tx, t.ServiceID)
})
if service == nil {
log.G(ctx).WithField("task.id", t.ID).Debug("removing task from the scheduler")
Contributor:
we should probably remove the task from the taskGroup?

Contributor Author:
No need to: this map originates in tick from tasksByCommonSpec, is built out of unassignedTasks, and is reset on every tick. By skipping the task here we are not adding it back to unassignedTasks.

Unless you mean to delete it in the current scope of noSuitableNode? If that is the case, I am not sure what the benefit would be.
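For readers following the thread, here is a minimal sketch of the flow the author describes, assuming the surrounding scheduler package context. It is an illustration, not the actual swarmkit source; taskGroupKey is a hypothetical grouping helper standing in for tasksByCommonSpec, and in the real flow noSuitableNode runs only for groups that could not be placed.

// tickSketch illustrates why noSuitableNode does not need to delete from
// taskGroup: the grouping is rebuilt from unassignedTasks on every tick.
func (s *Scheduler) tickSketch(ctx context.Context) {
	// Group the still-unassigned tasks; this map is built from scratch
	// on each tick.
	taskGroups := make(map[string]map[string]*api.Task)
	for _, t := range s.unassignedTasks {
		key := taskGroupKey(t) // hypothetical key helper
		if taskGroups[key] == nil {
			taskGroups[key] = make(map[string]*api.Task)
		}
		taskGroups[key][t.ID] = t
	}

	// unassignedTasks is reset here. Only tasks that the scheduling pass
	// re-adds (tasks whose service still exists) survive to the next
	// tick; a task that noSuitableNode skips simply drops out.
	s.unassignedTasks = make(map[string]*api.Task)

	schedulingDecisions := make(map[string]schedulingDecision)
	for _, group := range taskGroups {
		s.noSuitableNode(ctx, group, schedulingDecisions)
	}
}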

continue
}

log.G(ctx).WithField("task.id", t.ID).Debug("no suitable node available for task")

newT := *t
135 changes: 129 additions & 6 deletions manager/scheduler/scheduler_test.go
@@ -1114,6 +1114,7 @@ func TestSchedulerNoReadyNodes(t *testing.T) {
ctx := context.Background()
initialTask := &api.Task{
ID: "id1",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
ServiceAnnotations: api.Annotations{
Name: "name1",
@@ -1128,7 +1129,8 @@ func TestSchedulerNoReadyNodes(t *testing.T) {
defer s.Close()

err := s.Update(func(tx store.Tx) error {
// Add initial task
// Add initial service and task
assert.NoError(t, store.CreateService(tx, &api.Service{ID: "serviceID1"}))
assert.NoError(t, store.CreateTask(tx, initialTask))
return nil
})
@@ -1536,6 +1538,7 @@ func TestSchedulerResourceConstraint(t *testing.T) {

initialTask := &api.Task{
ID: "id1",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
@@ -1559,12 +1562,17 @@
},
}

initialService := &api.Service{
ID: "serviceID1",
}

s := store.NewMemoryStore(nil)
assert.NotNil(t, s)
defer s.Close()

err := s.Update(func(tx store.Tx) error {
// Add initial node and task
// Add initial node, service and task
assert.NoError(t, store.CreateService(tx, initialService))
assert.NoError(t, store.CreateTask(tx, initialTask))
assert.NoError(t, store.CreateNode(tx, underprovisionedNode))
assert.NoError(t, store.CreateNode(tx, nonready1))
@@ -1788,6 +1796,7 @@ func TestSchedulerResourceConstraintDeadTask(t *testing.T) {
bigTask1 := &api.Task{
DesiredState: api.TaskStateRunning,
ID: "id1",
ServiceID: "serviceID1",
Spec: api.TaskSpec{
Resources: &api.ResourceRequirements{
Reservations: &api.Resources{
@@ -1809,12 +1818,17 @@
bigTask2 := bigTask1.Copy()
bigTask2.ID = "id2"

bigService := &api.Service{
ID: "serviceID1",
}

s := store.NewMemoryStore(nil)
assert.NotNil(t, s)
defer s.Close()

err := s.Update(func(tx store.Tx) error {
// Add initial node and task
// Add initial node, service and task
assert.NoError(t, store.CreateService(tx, bigService))
assert.NoError(t, store.CreateNode(tx, node))
assert.NoError(t, store.CreateTask(tx, bigTask1))
return nil
@@ -1951,6 +1965,7 @@ func TestSchedulerCompatiblePlatform(t *testing.T) {
// task1 - has a node it can run on
task1 := &api.Task{
ID: "id1",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
ServiceAnnotations: api.Annotations{
Name: "name1",
@@ -1973,6 +1988,7 @@
// task2 - has no node it can run on
task2 := &api.Task{
ID: "id2",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
ServiceAnnotations: api.Annotations{
Name: "name2",
@@ -1995,6 +2011,7 @@
// task3 - no platform constraints, should run on any node
task3 := &api.Task{
ID: "id3",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
ServiceAnnotations: api.Annotations{
Name: "name3",
@@ -2007,6 +2024,7 @@
// task4 - only OS constraint, is runnable on any linux node
task4 := &api.Task{
ID: "id4",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
ServiceAnnotations: api.Annotations{
Name: "name4",
@@ -2029,6 +2047,7 @@
// task5 - supported on multiple platforms
task5 := &api.Task{
ID: "id5",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
ServiceAnnotations: api.Annotations{
Name: "name5",
@@ -2103,12 +2122,16 @@
Description: &api.NodeDescription{},
}

service1 := &api.Service{
ID: "serviceID1",
}
s := store.NewMemoryStore(nil)
assert.NotNil(t, s)
defer s.Close()

err := s.Update(func(tx store.Tx) error {
// Add initial task and nodes to the store
// Add initial task, service and nodes to the store
assert.NoError(t, store.CreateService(tx, service1))
assert.NoError(t, store.CreateTask(tx, task1))
assert.NoError(t, store.CreateNode(tx, node1))
assert.NoError(t, store.CreateNode(tx, node2))
@@ -2168,6 +2191,86 @@
assert.Regexp(t, assignment4.NodeID, "(node1|node2)")
}

// TestSchedulerUnassignedMap tests that unassigned tasks are deleted from unassignedTasks when the service is removed
func TestSchedulerUnassignedMap(t *testing.T) {
ctx := context.Background()
// create a service and a task with OS constraint that is not met
task1 := &api.Task{
ID: "id1",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
ServiceAnnotations: api.Annotations{
Name: "name1",
},
Status: api.TaskStatus{
State: api.TaskStatePending,
},
Spec: api.TaskSpec{
Placement: &api.Placement{
Platforms: []*api.Platform{
{
Architecture: "amd64",
OS: "windows",
},
},
},
},
}

node1 := &api.Node{
ID: "node1",
Spec: api.NodeSpec{
Annotations: api.Annotations{
Name: "node1",
},
},
Status: api.NodeStatus{
State: api.NodeStatus_READY,
},
Description: &api.NodeDescription{
Platform: &api.Platform{
Architecture: "x86_64",
OS: "linux",
},
},
}

service1 := &api.Service{
ID: "serviceID1",
}

s := store.NewMemoryStore(nil)
assert.NotNil(t, s)
defer s.Close()

err := s.Update(func(tx store.Tx) error {
// Add initial task, service and nodes to the store
assert.NoError(t, store.CreateService(tx, service1))
assert.NoError(t, store.CreateTask(tx, task1))
assert.NoError(t, store.CreateNode(tx, node1))
return nil
})
assert.NoError(t, err)

scheduler := New(s)
scheduler.unassignedTasks["id1"] = task1

scheduler.tick(ctx)
// task1 is in the unassigned map
assert.Contains(t, scheduler.unassignedTasks, task1.ID)

// delete the service of an unassigned task
err = s.Update(func(tx store.Tx) error {
assert.NoError(t, store.DeleteService(tx, service1.ID))
Contributor:
It might be a much simpler test to call tick() directly.

Contributor Author:
I was testing the full-stack call and letting the scheduler do its work, but in this case I made sure to limit the change to the unassigned map and the tick() call.

return nil
})
assert.NoError(t, err)

scheduler.tick(ctx)
// task1 is removed from the unassigned map
assert.NotContains(t, scheduler.unassignedTasks, task1.ID)
}
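To run just the new test locally, the standard Go test runner should suffice, e.g.:

go test ./manager/scheduler/ -run TestSchedulerUnassignedMap -v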

func TestPreassignedTasks(t *testing.T) {
ctx := context.Background()
initialNodeSet := []*api.Node{
@@ -2523,6 +2626,7 @@ func TestSchedulerPluginConstraint(t *testing.T) {
// Task0: bind mount
t0 := &api.Task{
ID: "task0_ID",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
@@ -2548,6 +2652,7 @@
// Task1: vol plugin1
t1 := &api.Task{
ID: "task1_ID",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
@@ -2574,6 +2679,7 @@
// Task2: vol plugin1, vol plugin2
t2 := &api.Task{
ID: "task2_ID",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
@@ -2606,6 +2712,7 @@
// Task3: vol plugin1, network plugin1
t3 := &api.Task{
ID: "task3_ID",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Networks: []*api.NetworkAttachment{
{
@@ -2646,6 +2753,7 @@
// Task4: log plugin1
t4 := &api.Task{
ID: "task4_ID",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
@@ -2663,6 +2771,7 @@
// Task5: log plugin1
t5 := &api.Task{
ID: "task5_ID",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
@@ -2681,6 +2790,7 @@
// no logging
t6 := &api.Task{
ID: "task6_ID",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
@@ -2699,6 +2809,7 @@
// log driver with no name
t7 := &api.Task{
ID: "task7_ID",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
@@ -2718,12 +2829,16 @@
},
}

s1 := &api.Service{
ID: "serviceID1",
}
s := store.NewMemoryStore(nil)
assert.NotNil(t, s)
defer s.Close()

// Add initial node and task
// Add initial node, service and task
err := s.Update(func(tx store.Tx) error {
assert.NoError(t, store.CreateService(tx, s1))
assert.NoError(t, store.CreateTask(tx, t1))
assert.NoError(t, store.CreateNode(tx, n1))
return nil
@@ -3014,6 +3129,7 @@ func TestSchedulerHostPort(t *testing.T) {

task1 := &api.Task{
ID: "id1",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
@@ -3038,6 +3154,7 @@
}
task2 := &api.Task{
ID: "id2",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
@@ -3062,6 +3179,7 @@
}
task3 := &api.Task{
ID: "id3",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
@@ -3090,12 +3208,17 @@
},
}

service1 := &api.Service{
ID: "serviceID1",
}

s := store.NewMemoryStore(nil)
assert.NotNil(t, s)
defer s.Close()

err := s.Update(func(tx store.Tx) error {
// Add initial node and task
// Add initial node, service and task
assert.NoError(t, store.CreateService(tx, service1))
assert.NoError(t, store.CreateTask(tx, task1))
assert.NoError(t, store.CreateTask(tx, task2))
return nil