Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions manager/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,9 +702,20 @@ func (s *Scheduler) scheduleNTasksOnNodes(ctx context.Context, n int, taskGroup
return tasksScheduled
}

// noSuitableNode checks unassigned tasks and make sure they have an existing service in the store before
// updating the task status and adding it back to: schedulingDecisions, unassignedTasks and allTasks
func (s *Scheduler) noSuitableNode(ctx context.Context, taskGroup map[string]*api.Task, schedulingDecisions map[string]schedulingDecision) {
explanation := s.pipeline.Explain()
for _, t := range taskGroup {
var service *api.Service
s.store.View(func(tx store.ReadTx) {
service = store.GetService(tx, t.ServiceID)
})
if service == nil {
log.G(ctx).WithField("task.id", t.ID).Debug("removing task from the scheduler")
continue
}

log.G(ctx).WithField("task.id", t.ID).Debug("no suitable node available for task")

newT := *t
Expand Down
135 changes: 129 additions & 6 deletions manager/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1114,6 +1114,7 @@ func TestSchedulerNoReadyNodes(t *testing.T) {
ctx := context.Background()
initialTask := &api.Task{
ID: "id1",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
ServiceAnnotations: api.Annotations{
Name: "name1",
Expand All @@ -1128,7 +1129,8 @@ func TestSchedulerNoReadyNodes(t *testing.T) {
defer s.Close()

err := s.Update(func(tx store.Tx) error {
// Add initial task
// Add initial service and task
assert.NoError(t, store.CreateService(tx, &api.Service{ID: "serviceID1"}))
assert.NoError(t, store.CreateTask(tx, initialTask))
return nil
})
Expand Down Expand Up @@ -1536,6 +1538,7 @@ func TestSchedulerResourceConstraint(t *testing.T) {

initialTask := &api.Task{
ID: "id1",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
Expand All @@ -1559,12 +1562,17 @@ func TestSchedulerResourceConstraint(t *testing.T) {
},
}

initialService := &api.Service{
ID: "serviceID1",
}

s := store.NewMemoryStore(nil)
assert.NotNil(t, s)
defer s.Close()

err := s.Update(func(tx store.Tx) error {
// Add initial node and task
// Add initial node, service and task
assert.NoError(t, store.CreateService(tx, initialService))
assert.NoError(t, store.CreateTask(tx, initialTask))
assert.NoError(t, store.CreateNode(tx, underprovisionedNode))
assert.NoError(t, store.CreateNode(tx, nonready1))
Expand Down Expand Up @@ -1788,6 +1796,7 @@ func TestSchedulerResourceConstraintDeadTask(t *testing.T) {
bigTask1 := &api.Task{
DesiredState: api.TaskStateRunning,
ID: "id1",
ServiceID: "serviceID1",
Spec: api.TaskSpec{
Resources: &api.ResourceRequirements{
Reservations: &api.Resources{
Expand All @@ -1809,12 +1818,17 @@ func TestSchedulerResourceConstraintDeadTask(t *testing.T) {
bigTask2 := bigTask1.Copy()
bigTask2.ID = "id2"

bigService := &api.Service{
ID: "serviceID1",
}

s := store.NewMemoryStore(nil)
assert.NotNil(t, s)
defer s.Close()

err := s.Update(func(tx store.Tx) error {
// Add initial node and task
// Add initial node, service and task
assert.NoError(t, store.CreateService(tx, bigService))
assert.NoError(t, store.CreateNode(tx, node))
assert.NoError(t, store.CreateTask(tx, bigTask1))
return nil
Expand Down Expand Up @@ -1951,6 +1965,7 @@ func TestSchedulerCompatiblePlatform(t *testing.T) {
// task1 - has a node it can run on
task1 := &api.Task{
ID: "id1",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
ServiceAnnotations: api.Annotations{
Name: "name1",
Expand All @@ -1973,6 +1988,7 @@ func TestSchedulerCompatiblePlatform(t *testing.T) {
// task2 - has no node it can run on
task2 := &api.Task{
ID: "id2",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
ServiceAnnotations: api.Annotations{
Name: "name2",
Expand All @@ -1995,6 +2011,7 @@ func TestSchedulerCompatiblePlatform(t *testing.T) {
// task3 - no platform constraints, should run on any node
task3 := &api.Task{
ID: "id3",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
ServiceAnnotations: api.Annotations{
Name: "name3",
Expand All @@ -2007,6 +2024,7 @@ func TestSchedulerCompatiblePlatform(t *testing.T) {
// task4 - only OS constraint, is runnable on any linux node
task4 := &api.Task{
ID: "id4",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
ServiceAnnotations: api.Annotations{
Name: "name4",
Expand All @@ -2029,6 +2047,7 @@ func TestSchedulerCompatiblePlatform(t *testing.T) {
// task5 - supported on multiple platforms
task5 := &api.Task{
ID: "id5",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
ServiceAnnotations: api.Annotations{
Name: "name5",
Expand Down Expand Up @@ -2103,12 +2122,16 @@ func TestSchedulerCompatiblePlatform(t *testing.T) {
Description: &api.NodeDescription{},
}

service1 := &api.Service{
ID: "serviceID1",
}
s := store.NewMemoryStore(nil)
assert.NotNil(t, s)
defer s.Close()

err := s.Update(func(tx store.Tx) error {
// Add initial task and nodes to the store
// Add initial task, service and nodes to the store
assert.NoError(t, store.CreateService(tx, service1))
assert.NoError(t, store.CreateTask(tx, task1))
assert.NoError(t, store.CreateNode(tx, node1))
assert.NoError(t, store.CreateNode(tx, node2))
Expand Down Expand Up @@ -2168,6 +2191,86 @@ func TestSchedulerCompatiblePlatform(t *testing.T) {
assert.Regexp(t, assignment4.NodeID, "(node1|node2)")
}

// TestSchedulerUnassignedMap tests that unassigned tasks are deleted from unassignedTasks when the service is removed
func TestSchedulerUnassignedMap(t *testing.T) {
ctx := context.Background()
// create a service and a task with OS constraint that is not met
task1 := &api.Task{
ID: "id1",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
ServiceAnnotations: api.Annotations{
Name: "name1",
},
Status: api.TaskStatus{
State: api.TaskStatePending,
},
Spec: api.TaskSpec{
Placement: &api.Placement{
Platforms: []*api.Platform{
{
Architecture: "amd64",
OS: "windows",
},
},
},
},
}

node1 := &api.Node{
ID: "node1",
Spec: api.NodeSpec{
Annotations: api.Annotations{
Name: "node1",
},
},
Status: api.NodeStatus{
State: api.NodeStatus_READY,
},
Description: &api.NodeDescription{
Platform: &api.Platform{
Architecture: "x86_64",
OS: "linux",
},
},
}

service1 := &api.Service{
ID: "serviceID1",
}

s := store.NewMemoryStore(nil)
assert.NotNil(t, s)
defer s.Close()

err := s.Update(func(tx store.Tx) error {
// Add initial task, service and nodes to the store
assert.NoError(t, store.CreateService(tx, service1))
assert.NoError(t, store.CreateTask(tx, task1))
assert.NoError(t, store.CreateNode(tx, node1))
return nil
})
assert.NoError(t, err)

scheduler := New(s)
scheduler.unassignedTasks["id1"] = task1

scheduler.tick(ctx)
// task1 is in the unassigned map
assert.Contains(t, scheduler.unassignedTasks, task1.ID)

// delete the service of an unassigned task
err = s.Update(func(tx store.Tx) error {
assert.NoError(t, store.DeleteService(tx, service1.ID))
return nil
})
assert.NoError(t, err)

scheduler.tick(ctx)
// task1 is removed from the unassigned map
assert.NotContains(t, scheduler.unassignedTasks, task1.ID)
}

func TestPreassignedTasks(t *testing.T) {
ctx := context.Background()
initialNodeSet := []*api.Node{
Expand Down Expand Up @@ -2523,6 +2626,7 @@ func TestSchedulerPluginConstraint(t *testing.T) {
// Task0: bind mount
t0 := &api.Task{
ID: "task0_ID",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
Expand All @@ -2548,6 +2652,7 @@ func TestSchedulerPluginConstraint(t *testing.T) {
// Task1: vol plugin1
t1 := &api.Task{
ID: "task1_ID",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
Expand All @@ -2574,6 +2679,7 @@ func TestSchedulerPluginConstraint(t *testing.T) {
// Task2: vol plugin1, vol plugin2
t2 := &api.Task{
ID: "task2_ID",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
Expand Down Expand Up @@ -2606,6 +2712,7 @@ func TestSchedulerPluginConstraint(t *testing.T) {
// Task3: vol plugin1, network plugin1
t3 := &api.Task{
ID: "task3_ID",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Networks: []*api.NetworkAttachment{
{
Expand Down Expand Up @@ -2646,6 +2753,7 @@ func TestSchedulerPluginConstraint(t *testing.T) {
// Task4: log plugin1
t4 := &api.Task{
ID: "task4_ID",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
Expand All @@ -2663,6 +2771,7 @@ func TestSchedulerPluginConstraint(t *testing.T) {
// Task5: log plugin1
t5 := &api.Task{
ID: "task5_ID",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
Expand All @@ -2681,6 +2790,7 @@ func TestSchedulerPluginConstraint(t *testing.T) {
// no logging
t6 := &api.Task{
ID: "task6_ID",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
Expand All @@ -2699,6 +2809,7 @@ func TestSchedulerPluginConstraint(t *testing.T) {
// log driver with no name
t7 := &api.Task{
ID: "task7_ID",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
Expand All @@ -2718,12 +2829,16 @@ func TestSchedulerPluginConstraint(t *testing.T) {
},
}

s1 := &api.Service{
ID: "serviceID1",
}
s := store.NewMemoryStore(nil)
assert.NotNil(t, s)
defer s.Close()

// Add initial node and task
// Add initial node, service and task
err := s.Update(func(tx store.Tx) error {
assert.NoError(t, store.CreateService(tx, s1))
assert.NoError(t, store.CreateTask(tx, t1))
assert.NoError(t, store.CreateNode(tx, n1))
return nil
Expand Down Expand Up @@ -3014,6 +3129,7 @@ func TestSchedulerHostPort(t *testing.T) {

task1 := &api.Task{
ID: "id1",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
Expand All @@ -3038,6 +3154,7 @@ func TestSchedulerHostPort(t *testing.T) {
}
task2 := &api.Task{
ID: "id2",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
Expand All @@ -3062,6 +3179,7 @@ func TestSchedulerHostPort(t *testing.T) {
}
task3 := &api.Task{
ID: "id3",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
Expand Down Expand Up @@ -3090,12 +3208,17 @@ func TestSchedulerHostPort(t *testing.T) {
},
}

service1 := &api.Service{
ID: "serviceID1",
}

s := store.NewMemoryStore(nil)
assert.NotNil(t, s)
defer s.Close()

err := s.Update(func(tx store.Tx) error {
// Add initial node and task
// Add initial node, service and task
assert.NoError(t, store.CreateService(tx, service1))
assert.NoError(t, store.CreateTask(tx, task1))
assert.NoError(t, store.CreateTask(tx, task2))
return nil
Expand Down