Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 2 additions & 31 deletions manager/orchestrator/global/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,12 +169,6 @@ func (g *Orchestrator) Run(ctx context.Context) error {
delete(g.nodes, v.Node.ID)
case api.EventUpdateTask:
g.handleTaskChange(ctx, v.Task)
case api.EventDeleteTask:
// CLI allows deleting task
if _, exists := g.globalServices[v.Task.ServiceID]; !exists {
continue
}
g.reconcileServicesOneNode(ctx, []string{v.Task.ServiceID}, v.Task.NodeID)
}
case <-g.stopChan:
return nil
Expand Down Expand Up @@ -216,7 +210,7 @@ func (g *Orchestrator) handleTaskChange(ctx context.Context, t *api.Task) {
if _, exists := g.globalServices[t.ServiceID]; !exists {
return
}
// if a task's DesiredState has past running, which
// if a task's DesiredState has passed running, it
// means the task has been processed
if t.DesiredState > api.TaskStateRunning {
return
Expand Down Expand Up @@ -264,7 +258,6 @@ func (g *Orchestrator) foreachTaskFromNode(ctx context.Context, node *api.Node,
}

func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []string) {
nodeCompleted := make(map[string]map[string]struct{})
nodeTasks := make(map[string]map[string][]*api.Task)

g.store.View(func(tx store.ReadTx) {
Expand All @@ -275,20 +268,13 @@ func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []strin
continue
}

// a node may have completed this service
nodeCompleted[serviceID] = make(map[string]struct{})
// nodeID -> task list
nodeTasks[serviceID] = make(map[string][]*api.Task)

for _, t := range tasks {
if t.DesiredState <= api.TaskStateRunning {
// Collect all running instances of this service
nodeTasks[serviceID][t.NodeID] = append(nodeTasks[serviceID][t.NodeID], t)
} else {
// for finished tasks, check restartPolicy
if isTaskCompleted(t, orchestrator.RestartCondition(t)) {
nodeCompleted[serviceID][t.NodeID] = struct{}{}
}
}
}
}
Expand All @@ -311,9 +297,7 @@ func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []strin
ntasks := nodeTasks[serviceID][nodeID]
delete(nodeTasks[serviceID], nodeID)

// if restart policy considers this node has finished its task
// it should remove all running tasks
if _, exists := nodeCompleted[serviceID][nodeID]; exists || !meetsConstraints {
if !meetsConstraints {
g.shutdownTasks(ctx, batch, ntasks)
continue
}
Expand Down Expand Up @@ -400,8 +384,6 @@ func (g *Orchestrator) reconcileServicesOneNode(ctx context.Context, serviceIDs
return
}

// whether each service has completed on the node
completed := make(map[string]bool)
// tasks by service
tasks := make(map[string][]*api.Task)

Expand All @@ -425,10 +407,6 @@ func (g *Orchestrator) reconcileServicesOneNode(ctx context.Context, serviceIDs
}
if t.DesiredState <= api.TaskStateRunning {
tasks[serviceID] = append(tasks[serviceID], t)
} else {
if isTaskCompleted(t, orchestrator.RestartCondition(t)) {
completed[serviceID] = true
}
}
}
}
Expand All @@ -444,13 +422,6 @@ func (g *Orchestrator) reconcileServicesOneNode(ctx context.Context, serviceIDs
continue
}

// if restart policy considers this node has finished its task
// it should remove all running tasks
if completed[serviceID] {
g.shutdownTasks(ctx, batch, tasks[serviceID])
continue
}

if node.Spec.Availability == api.NodeAvailabilityPause {
// the node is paused, so we won't add or update tasks
continue
Expand Down
161 changes: 148 additions & 13 deletions manager/orchestrator/global/global_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,29 @@ var (
},
},
}

serviceNoRestart = &api.Service{
ID: "serviceid3",
Spec: api.ServiceSpec{
Annotations: api.Annotations{
Name: "norestart",
},
Task: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
Container: &api.ContainerSpec{},
},
Restart: &api.RestartPolicy{
Condition: api.RestartOnNone,
},
},
Mode: &api.ServiceSpec_Global{
Global: &api.GlobalService{},
},
},
}
)

func SetupCluster(t *testing.T, store *store.MemoryStore, watch chan events.Event) *api.Task {
func setup(t *testing.T, store *store.MemoryStore, watch chan events.Event) *Orchestrator {
ctx := context.Background()
// Start the global orchestrator.
global := NewGlobalOrchestrator(store)
Expand All @@ -107,8 +127,7 @@ func SetupCluster(t *testing.T, store *store.MemoryStore, watch chan events.Even
testutils.Expect(t, watch, api.EventCreateNode{})
testutils.Expect(t, watch, state.EventCommit{})

// return task creation from orchestrator
return testutils.WatchTaskCreate(t, watch)
return global
}

func TestSetup(t *testing.T) {
Expand All @@ -119,7 +138,10 @@ func TestSetup(t *testing.T) {
watch, cancel := state.Watch(store.WatchQueue() /*state.EventCreateTask{}, state.EventUpdateTask{}*/)
defer cancel()

observedTask1 := SetupCluster(t, store, watch)
orchestrator := setup(t, store, watch)
defer orchestrator.Stop()

observedTask1 := testutils.WatchTaskCreate(t, watch)

assert.Equal(t, observedTask1.Status.State, api.TaskStateNew)
assert.Equal(t, observedTask1.ServiceAnnotations.Name, "name1")
Expand All @@ -134,7 +156,10 @@ func TestAddNode(t *testing.T) {
watch, cancel := state.Watch(store.WatchQueue())
defer cancel()

SetupCluster(t, store, watch)
orchestrator := setup(t, store, watch)
defer orchestrator.Stop()

testutils.WatchTaskCreate(t, watch)

addNode(t, store, node2)
observedTask2 := testutils.WatchTaskCreate(t, watch)
Expand All @@ -151,7 +176,10 @@ func TestDeleteNode(t *testing.T) {
watch, cancel := state.Watch(store.WatchQueue())
defer cancel()

SetupCluster(t, store, watch)
orchestrator := setup(t, store, watch)
defer orchestrator.Stop()

testutils.WatchTaskCreate(t, watch)

deleteNode(t, store, node1)
// task should be set to dead
Expand All @@ -168,7 +196,10 @@ func TestNodeAvailability(t *testing.T) {
watch, cancel := state.Watch(store.WatchQueue())
defer cancel()

SetupCluster(t, store, watch)
orchestrator := setup(t, store, watch)
defer orchestrator.Stop()

testutils.WatchTaskCreate(t, watch)

// set node1 to drain
updateNodeAvailability(t, store, node1, api.NodeAvailabilityDrain)
Expand All @@ -195,7 +226,10 @@ func TestAddService(t *testing.T) {
watch, cancel := state.Watch(store.WatchQueue())
defer cancel()

SetupCluster(t, store, watch)
orchestrator := setup(t, store, watch)
defer orchestrator.Stop()

testutils.WatchTaskCreate(t, watch)

addService(t, store, service2)
observedTask := testutils.WatchTaskCreate(t, watch)
Expand All @@ -212,7 +246,10 @@ func TestDeleteService(t *testing.T) {
watch, cancel := state.Watch(store.WatchQueue())
defer cancel()

SetupCluster(t, store, watch)
orchestrator := setup(t, store, watch)
defer orchestrator.Stop()

testutils.WatchTaskCreate(t, watch)

deleteService(t, store, service1)
// task should be deleted
Expand All @@ -222,32 +259,120 @@ func TestDeleteService(t *testing.T) {
}

// TestRemoveTask checks that deleting a global service's task directly from
// the store produces no follow-up event: after the delete and its commit, the
// watch channel must stay quiet, i.e. the task is not recreated.
func TestRemoveTask(t *testing.T) {
	t.Parallel()

	store := store.NewMemoryStore(nil)
	assert.NotNil(t, store)
	defer store.Close()

	watch, cancel := state.Watch(store.WatchQueue() /*api.EventCreateTask{}, api.EventUpdateTask{}*/)
	defer cancel()

	orchestrator := setup(t, store, watch)
	defer orchestrator.Stop()

	// setup returns the orchestrator itself, so the initial task creation is
	// read from the watch channel here.
	observedTask1 := testutils.WatchTaskCreate(t, watch)
	testutils.Expect(t, watch, state.EventCommit{})

	assert.Equal(t, observedTask1.Status.State, api.TaskStateNew)
	assert.Equal(t, observedTask1.ServiceAnnotations.Name, "name1")
	assert.Equal(t, observedTask1.NodeID, "nodeid1")

	// delete the task
	deleteTask(t, store, observedTask1)
	testutils.Expect(t, watch, api.EventDeleteTask{})
	testutils.Expect(t, watch, state.EventCommit{})

	// the task should not be recreated: any event arriving within the
	// timeout window is a failure.
	select {
	case event := <-watch:
		t.Fatalf("got unexpected event %T: %+v", event, event)
	case <-time.After(100 * time.Millisecond):
	}
}

func TestTaskFailure(t *testing.T) {
t.Parallel()

store := store.NewMemoryStore(nil)
assert.NotNil(t, store)
defer store.Close()

watch, cancel := state.Watch(store.WatchQueue() /*api.EventCreateTask{}, api.EventUpdateTask{}*/)
defer cancel()

// first, try a "restart on any" policy
orchestrator := setup(t, store, watch)
defer orchestrator.Stop()

observedTask1 := testutils.WatchTaskCreate(t, watch)

assert.Equal(t, observedTask1.Status.State, api.TaskStateNew)
assert.Equal(t, observedTask1.ServiceAnnotations.Name, "name1")
assert.Equal(t, observedTask1.NodeID, "nodeid1")

failTask(t, store, observedTask1)

testutils.WatchShutdownTask(t, watch)

// the task should be recreated
observedTask2 := testutils.WatchTaskCreate(t, watch)
assert.Equal(t, observedTask2.Status.State, api.TaskStateNew)
assert.Equal(t, observedTask2.ServiceAnnotations.Name, "name1")
assert.Equal(t, observedTask2.NodeID, "nodeid1")
testutils.Expect(t, watch, state.EventCommit{})
testutils.Expect(t, watch, api.EventUpdateTask{}) // ready->running
testutils.Expect(t, watch, state.EventCommit{})

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to also delete the task here, and assert that it doesn't get re-created, since that behavior has also changed?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added back TestRemoveTask, now expecting different behavior.

// repeat with service set up not to restart
addService(t, store, serviceNoRestart)
testutils.Expect(t, watch, api.EventCreateService{})
testutils.Expect(t, watch, state.EventCommit{})

observedTask3 := testutils.WatchTaskCreate(t, watch)
assert.Equal(t, observedTask3.Status.State, api.TaskStateNew)
assert.Equal(t, observedTask3.ServiceAnnotations.Name, "norestart")
assert.Equal(t, observedTask3.NodeID, "nodeid1")
testutils.Expect(t, watch, state.EventCommit{})

failTask(t, store, observedTask3)
testutils.Expect(t, watch, api.EventUpdateTask{})
testutils.Expect(t, watch, state.EventCommit{})
observedTask4 := testutils.WatchTaskUpdate(t, watch)
assert.Equal(t, observedTask4.DesiredState, api.TaskStateShutdown)
testutils.Expect(t, watch, state.EventCommit{})

// the task should not be recreated
select {
case event := <-watch:
t.Fatalf("got unexpected event %T: %+v", event, event)
case <-time.After(100 * time.Millisecond):
}

// update the service. now the task should be recreated.
updateService(t, store, serviceNoRestart)
testutils.Expect(t, watch, api.EventUpdateService{})
testutils.Expect(t, watch, state.EventCommit{})

observedTask5 := testutils.WatchTaskCreate(t, watch)
assert.Equal(t, observedTask5.Status.State, api.TaskStateNew)
assert.Equal(t, observedTask5.ServiceAnnotations.Name, "norestart")
assert.Equal(t, observedTask5.NodeID, "nodeid1")
testutils.Expect(t, watch, state.EventCommit{})
}

// addService creates the given service in the store inside a single update
// transaction and fails the test if the creation returns an error.
func addService(t *testing.T, s *store.MemoryStore, service *api.Service) {
	s.Update(func(tx store.Tx) error {
		// Pass a copy so the caller's fixture object is never handed to the
		// store, and create it exactly once per call.
		assert.NoError(t, store.CreateService(tx, service.Copy()))
		return nil
	})
}

// updateService fetches the stored service that has the given service's ID,
// increments its Spec.Task.ForceUpdate counter and writes it back within one
// update transaction. The test is aborted if the service is not found.
func updateService(t *testing.T, s *store.MemoryStore, service *api.Service) {
	s.Update(func(tx store.Tx) error {
		// Operate on the object fetched from the store rather than on the
		// caller's argument.
		stored := store.GetService(tx, service.ID)
		require.NotNil(t, stored)

		stored.Spec.Task.ForceUpdate++
		err := store.UpdateService(tx, stored)
		assert.NoError(t, err)
		return nil
	})
}
Expand All @@ -261,7 +386,7 @@ func deleteService(t *testing.T, s *store.MemoryStore, service *api.Service) {

// addNode creates the given node in the store inside a single update
// transaction and fails the test if the creation returns an error.
func addNode(t *testing.T, s *store.MemoryStore, node *api.Node) {
	s.Update(func(tx store.Tx) error {
		// Pass a copy so the caller's fixture object is never handed to the
		// store, and create it exactly once per call.
		assert.NoError(t, store.CreateNode(tx, node.Copy()))
		return nil
	})
}
Expand Down Expand Up @@ -295,6 +420,16 @@ func deleteTask(t *testing.T, s *store.MemoryStore, task *api.Task) {
})
}

// failTask fetches the stored task that has the given task's ID, sets its
// Status.State to api.TaskStateFailed and writes it back within one update
// transaction. The test is aborted if the task is not found.
func failTask(t *testing.T, s *store.MemoryStore, task *api.Task) {
	s.Update(func(tx store.Tx) error {
		// Operate on the object fetched from the store rather than on the
		// caller's argument.
		stored := store.GetTask(tx, task.ID)
		require.NotNil(t, stored)

		stored.Status.State = api.TaskStateFailed
		err := store.UpdateTask(tx, stored)
		assert.NoError(t, err)
		return nil
	})
}

func TestInitializationRejectedTasks(t *testing.T) {
ctx := context.Background()
s := store.NewMemoryStore(nil)
Expand Down
2 changes: 1 addition & 1 deletion manager/orchestrator/testutils/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func Expect(t *testing.T, watch chan events.Event, specifiers ...api.Event) {
}
return
case <-time.After(time.Second):
assert.FailNow(t, "no commit event")
assert.FailNow(t, "no matching event")
}
}
}