diff --git a/manager/scheduler/scheduler.go b/manager/scheduler/scheduler.go index 9ee0b9e5c9..9e708ed1b6 100644 --- a/manager/scheduler/scheduler.go +++ b/manager/scheduler/scheduler.go @@ -702,9 +702,20 @@ func (s *Scheduler) scheduleNTasksOnNodes(ctx context.Context, n int, taskGroup return tasksScheduled } +// noSuitableNode checks unassigned tasks and make sure they have an existing service in the store before +// updating the task status and adding it back to: schedulingDecisions, unassignedTasks and allTasks func (s *Scheduler) noSuitableNode(ctx context.Context, taskGroup map[string]*api.Task, schedulingDecisions map[string]schedulingDecision) { explanation := s.pipeline.Explain() for _, t := range taskGroup { + var service *api.Service + s.store.View(func(tx store.ReadTx) { + service = store.GetService(tx, t.ServiceID) + }) + if service == nil { + log.G(ctx).WithField("task.id", t.ID).Debug("removing task from the scheduler") + continue + } + log.G(ctx).WithField("task.id", t.ID).Debug("no suitable node available for task") newT := *t diff --git a/manager/scheduler/scheduler_test.go b/manager/scheduler/scheduler_test.go index 2126e70397..d387dc4a01 100644 --- a/manager/scheduler/scheduler_test.go +++ b/manager/scheduler/scheduler_test.go @@ -1114,6 +1114,7 @@ func TestSchedulerNoReadyNodes(t *testing.T) { ctx := context.Background() initialTask := &api.Task{ ID: "id1", + ServiceID: "serviceID1", DesiredState: api.TaskStateRunning, ServiceAnnotations: api.Annotations{ Name: "name1", @@ -1128,7 +1129,8 @@ func TestSchedulerNoReadyNodes(t *testing.T) { defer s.Close() err := s.Update(func(tx store.Tx) error { - // Add initial task + // Add initial service and task + assert.NoError(t, store.CreateService(tx, &api.Service{ID: "serviceID1"})) assert.NoError(t, store.CreateTask(tx, initialTask)) return nil }) @@ -1536,6 +1538,7 @@ func TestSchedulerResourceConstraint(t *testing.T) { initialTask := &api.Task{ ID: "id1", + ServiceID: "serviceID1", DesiredState: api.TaskStateRunning, Spec: api.TaskSpec{ Runtime: &api.TaskSpec_Container{ @@ -1559,12 +1562,17 @@ func TestSchedulerResourceConstraint(t *testing.T) { }, } + initialService := &api.Service{ + ID: "serviceID1", + } + s := store.NewMemoryStore(nil) assert.NotNil(t, s) defer s.Close() err := s.Update(func(tx store.Tx) error { - // Add initial node and task + // Add initial node, service and task + assert.NoError(t, store.CreateService(tx, initialService)) assert.NoError(t, store.CreateTask(tx, initialTask)) assert.NoError(t, store.CreateNode(tx, underprovisionedNode)) assert.NoError(t, store.CreateNode(tx, nonready1)) @@ -1788,6 +1796,7 @@ func TestSchedulerResourceConstraintDeadTask(t *testing.T) { bigTask1 := &api.Task{ DesiredState: api.TaskStateRunning, ID: "id1", + ServiceID: "serviceID1", Spec: api.TaskSpec{ Resources: &api.ResourceRequirements{ Reservations: &api.Resources{ @@ -1809,12 +1818,17 @@ func TestSchedulerResourceConstraintDeadTask(t *testing.T) { bigTask2 := bigTask1.Copy() bigTask2.ID = "id2" + bigService := &api.Service{ + ID: "serviceID1", + } + s := store.NewMemoryStore(nil) assert.NotNil(t, s) defer s.Close() err := s.Update(func(tx store.Tx) error { - // Add initial node and task + // Add initial node, service and task + assert.NoError(t, store.CreateService(tx, bigService)) assert.NoError(t, store.CreateNode(tx, node)) assert.NoError(t, store.CreateTask(tx, bigTask1)) return nil @@ -1951,6 +1965,7 @@ func TestSchedulerCompatiblePlatform(t *testing.T) { // task1 - has a node it can run on task1 := &api.Task{ ID: "id1", + ServiceID: "serviceID1", DesiredState: api.TaskStateRunning, ServiceAnnotations: api.Annotations{ Name: "name1", @@ -1973,6 +1988,7 @@ func TestSchedulerCompatiblePlatform(t *testing.T) { // task2 - has no node it can run on task2 := &api.Task{ ID: "id2", + ServiceID: "serviceID1", DesiredState: api.TaskStateRunning, ServiceAnnotations: api.Annotations{ Name: "name2", @@ -1995,6 +2011,7 @@ func TestSchedulerCompatiblePlatform(t *testing.T) { // task3 - no platform constraints, should run on any node task3 := &api.Task{ ID: "id3", + ServiceID: "serviceID1", DesiredState: api.TaskStateRunning, ServiceAnnotations: api.Annotations{ Name: "name3", @@ -2007,6 +2024,7 @@ func TestSchedulerCompatiblePlatform(t *testing.T) { // task4 - only OS constraint, is runnable on any linux node task4 := &api.Task{ ID: "id4", + ServiceID: "serviceID1", DesiredState: api.TaskStateRunning, ServiceAnnotations: api.Annotations{ Name: "name4", @@ -2029,6 +2047,7 @@ func TestSchedulerCompatiblePlatform(t *testing.T) { // task5 - supported on multiple platforms task5 := &api.Task{ ID: "id5", + ServiceID: "serviceID1", DesiredState: api.TaskStateRunning, ServiceAnnotations: api.Annotations{ Name: "name5", @@ -2103,12 +2122,16 @@ func TestSchedulerCompatiblePlatform(t *testing.T) { Description: &api.NodeDescription{}, } + service1 := &api.Service{ + ID: "serviceID1", + } s := store.NewMemoryStore(nil) assert.NotNil(t, s) defer s.Close() err := s.Update(func(tx store.Tx) error { - // Add initial task and nodes to the store + // Add initial task, service and nodes to the store + assert.NoError(t, store.CreateService(tx, service1)) assert.NoError(t, store.CreateTask(tx, task1)) assert.NoError(t, store.CreateNode(tx, node1)) assert.NoError(t, store.CreateNode(tx, node2)) @@ -2168,6 +2191,86 @@ func TestSchedulerCompatiblePlatform(t *testing.T) { assert.Regexp(t, assignment4.NodeID, "(node1|node2)") } +// TestSchedulerUnassignedMap tests that unassigned tasks are deleted from unassignedTasks when the service is removed +func TestSchedulerUnassignedMap(t *testing.T) { + ctx := context.Background() + // create a service and a task with OS constraint that is not met + task1 := &api.Task{ + ID: "id1", + ServiceID: "serviceID1", + DesiredState: api.TaskStateRunning, + ServiceAnnotations: api.Annotations{ + Name: "name1", + }, + Status: api.TaskStatus{ + State: api.TaskStatePending, + }, + Spec: api.TaskSpec{ + Placement: &api.Placement{ + Platforms: []*api.Platform{ + { + Architecture: "amd64", + OS: "windows", + }, + }, + }, + }, + } + + node1 := &api.Node{ + ID: "node1", + Spec: api.NodeSpec{ + Annotations: api.Annotations{ + Name: "node1", + }, + }, + Status: api.NodeStatus{ + State: api.NodeStatus_READY, + }, + Description: &api.NodeDescription{ + Platform: &api.Platform{ + Architecture: "x86_64", + OS: "linux", + }, + }, + } + + service1 := &api.Service{ + ID: "serviceID1", + } + + s := store.NewMemoryStore(nil) + assert.NotNil(t, s) + defer s.Close() + + err := s.Update(func(tx store.Tx) error { + // Add initial task, service and nodes to the store + assert.NoError(t, store.CreateService(tx, service1)) + assert.NoError(t, store.CreateTask(tx, task1)) + assert.NoError(t, store.CreateNode(tx, node1)) + return nil + }) + assert.NoError(t, err) + + scheduler := New(s) + scheduler.unassignedTasks["id1"] = task1 + + scheduler.tick(ctx) + // task1 is in the unassigned map + assert.Contains(t, scheduler.unassignedTasks, task1.ID) + + // delete the service of an unassigned task + err = s.Update(func(tx store.Tx) error { + assert.NoError(t, store.DeleteService(tx, service1.ID)) + return nil + }) + assert.NoError(t, err) + + scheduler.tick(ctx) + // task1 is removed from the unassigned map + assert.NotContains(t, scheduler.unassignedTasks, task1.ID) +} + func TestPreassignedTasks(t *testing.T) { ctx := context.Background() initialNodeSet := []*api.Node{ @@ -2523,6 +2626,7 @@ func TestSchedulerPluginConstraint(t *testing.T) { // Task0: bind mount t0 := &api.Task{ ID: "task0_ID", + ServiceID: "serviceID1", DesiredState: api.TaskStateRunning, Spec: api.TaskSpec{ Runtime: &api.TaskSpec_Container{ @@ -2548,6 +2652,7 @@ func TestSchedulerPluginConstraint(t *testing.T) { // Task1: vol plugin1 t1 := &api.Task{ ID: "task1_ID", + ServiceID: "serviceID1", DesiredState: api.TaskStateRunning, Spec: api.TaskSpec{ Runtime: &api.TaskSpec_Container{ @@ -2574,6 +2679,7 @@ func TestSchedulerPluginConstraint(t *testing.T) { // Task2: vol plugin1, vol plugin2 t2 := &api.Task{ ID: "task2_ID", + ServiceID: "serviceID1", DesiredState: api.TaskStateRunning, Spec: api.TaskSpec{ Runtime: &api.TaskSpec_Container{ @@ -2606,6 +2712,7 @@ func TestSchedulerPluginConstraint(t *testing.T) { // Task3: vol plugin1, network plugin1 t3 := &api.Task{ ID: "task3_ID", + ServiceID: "serviceID1", DesiredState: api.TaskStateRunning, Networks: []*api.NetworkAttachment{ { @@ -2646,6 +2753,7 @@ func TestSchedulerPluginConstraint(t *testing.T) { // Task4: log plugin1 t4 := &api.Task{ ID: "task4_ID", + ServiceID: "serviceID1", DesiredState: api.TaskStateRunning, Spec: api.TaskSpec{ Runtime: &api.TaskSpec_Container{ @@ -2663,6 +2771,7 @@ func TestSchedulerPluginConstraint(t *testing.T) { // Task5: log plugin1 t5 := &api.Task{ ID: "task5_ID", + ServiceID: "serviceID1", DesiredState: api.TaskStateRunning, Spec: api.TaskSpec{ Runtime: &api.TaskSpec_Container{ @@ -2681,6 +2790,7 @@ func TestSchedulerPluginConstraint(t *testing.T) { // no logging t6 := &api.Task{ ID: "task6_ID", + ServiceID: "serviceID1", DesiredState: api.TaskStateRunning, Spec: api.TaskSpec{ Runtime: &api.TaskSpec_Container{ @@ -2699,6 +2809,7 @@ func TestSchedulerPluginConstraint(t *testing.T) { // log driver with no name t7 := &api.Task{ ID: "task7_ID", + ServiceID: "serviceID1", DesiredState: api.TaskStateRunning, Spec: api.TaskSpec{ Runtime: &api.TaskSpec_Container{ @@ -2718,12 +2829,16 @@ func TestSchedulerPluginConstraint(t *testing.T) { }, } + s1 := &api.Service{ + ID: "serviceID1", + } s := store.NewMemoryStore(nil) assert.NotNil(t, s) defer s.Close() - // Add initial node and task + // Add initial node, service and task err := s.Update(func(tx store.Tx) error { + assert.NoError(t, store.CreateService(tx, s1)) assert.NoError(t, store.CreateTask(tx, t1)) assert.NoError(t, store.CreateNode(tx, n1)) return nil @@ -3014,6 +3129,7 @@ func TestSchedulerHostPort(t *testing.T) { task1 := &api.Task{ ID: "id1", + ServiceID: "serviceID1", DesiredState: api.TaskStateRunning, Spec: api.TaskSpec{ Runtime: &api.TaskSpec_Container{ @@ -3038,6 +3154,7 @@ func TestSchedulerHostPort(t *testing.T) { } task2 := &api.Task{ ID: "id2", + ServiceID: "serviceID1", DesiredState: api.TaskStateRunning, Spec: api.TaskSpec{ Runtime: &api.TaskSpec_Container{ @@ -3062,6 +3179,7 @@ func TestSchedulerHostPort(t *testing.T) { } task3 := &api.Task{ ID: "id3", + ServiceID: "serviceID1", DesiredState: api.TaskStateRunning, Spec: api.TaskSpec{ Runtime: &api.TaskSpec_Container{ @@ -3090,12 +3208,17 @@ func TestSchedulerHostPort(t *testing.T) { }, } + service1 := &api.Service{ + ID: "serviceID1", + } + s := store.NewMemoryStore(nil) assert.NotNil(t, s) defer s.Close() err := s.Update(func(tx store.Tx) error { - // Add initial node and task + // Add initial node, service and task + assert.NoError(t, store.CreateService(tx, service1)) assert.NoError(t, store.CreateTask(tx, task1)) assert.NoError(t, store.CreateTask(tx, task2)) return nil