Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 25 additions & 16 deletions manager/orchestrator/global/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,13 @@ func (g *Orchestrator) Run(ctx context.Context) error {
switch v.Node.Status.State {
// NodeStatus_DISCONNECTED is a transient state, no need to make any change
case api.NodeStatus_DOWN:
g.removeTasksFromNode(ctx, v.Node)
g.foreachTaskFromNode(ctx, v.Node, g.shutdownTask)
case api.NodeStatus_READY:
// node could come back to READY from DOWN or DISCONNECT
g.reconcileOneNode(ctx, v.Node)
}
case api.EventDeleteNode:
g.removeTasksFromNode(ctx, v.Node)
g.foreachTaskFromNode(ctx, v.Node, g.deleteTask)
delete(g.nodes, v.Node.ID)
case api.EventUpdateTask:
g.handleTaskChange(ctx, v.Task)
Expand Down Expand Up @@ -201,7 +201,7 @@ func (g *Orchestrator) FixTask(ctx context.Context, batch *store.Batch, t *api.T
}
// if the node is no longer valid, shut down the task
if t.NodeID == "" || orchestrator.InvalidNode(node) {
g.removeTask(ctx, batch, t)
g.shutdownTask(ctx, batch, t)
return
}

Expand Down Expand Up @@ -236,7 +236,7 @@ func (g *Orchestrator) Stop() {
g.restarts.CancelAll()
}

func (g *Orchestrator) removeTasksFromNode(ctx context.Context, node *api.Node) {
func (g *Orchestrator) foreachTaskFromNode(ctx context.Context, node *api.Node, cb func(context.Context, *store.Batch, *api.Task)) {
var (
tasks []*api.Task
err error
Expand All @@ -245,21 +245,21 @@ func (g *Orchestrator) removeTasksFromNode(ctx context.Context, node *api.Node)
tasks, err = store.FindTasks(tx, store.ByNodeID(node.ID))
})
if err != nil {
log.G(ctx).WithError(err).Errorf("global orchestrator: removeTasksFromNode failed finding tasks")
log.G(ctx).WithError(err).Errorf("global orchestrator: foreachTaskFromNode failed finding tasks")
return
}

err = g.store.Batch(func(batch *store.Batch) error {
for _, t := range tasks {
// Global orchestrator only acts on tasks that belong to globalServices
if _, exists := g.globalServices[t.ServiceID]; exists {
g.removeTask(ctx, batch, t)
cb(ctx, batch, t)
}
}
return nil
})
if err != nil {
log.G(ctx).WithError(err).Errorf("global orchestrator: removeTasksFromNode failed batching tasks")
log.G(ctx).WithError(err).Errorf("global orchestrator: foreachTaskFromNode failed batching tasks")
}
}

Expand Down Expand Up @@ -314,7 +314,7 @@ func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []strin
// if the restart policy considers this node to have finished its task,
// all of its running tasks should be shut down
if _, exists := nodeCompleted[serviceID][nodeID]; exists || !meetsConstraints {
g.removeTasks(ctx, batch, ntasks)
g.shutdownTasks(ctx, batch, ntasks)
continue
}

Expand All @@ -340,7 +340,7 @@ func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []strin
// These must be associated with nodes that are drained, or
// nodes that no longer exist.
for _, ntasks := range nodeTasks[serviceID] {
g.removeTasks(ctx, batch, ntasks)
g.shutdownTasks(ctx, batch, ntasks)
}
}
return nil
Expand Down Expand Up @@ -382,7 +382,7 @@ func (g *Orchestrator) updateService(service *api.Service) {
func (g *Orchestrator) reconcileOneNode(ctx context.Context, node *api.Node) {
if node.Spec.Availability == api.NodeAvailabilityDrain {
log.G(ctx).Debugf("global orchestrator: node %s in drain state, removing tasks from it", node.ID)
g.removeTasksFromNode(ctx, node)
g.foreachTaskFromNode(ctx, node, g.shutdownTask)
return
}

Expand Down Expand Up @@ -447,7 +447,7 @@ func (g *Orchestrator) reconcileServicesOneNode(ctx context.Context, serviceIDs
// if the restart policy considers this node to have finished its task,
// all of its running tasks should be shut down
if completed[serviceID] {
g.removeTasks(ctx, batch, tasks[serviceID])
g.shutdownTasks(ctx, batch, tasks[serviceID])
continue
}

Expand Down Expand Up @@ -491,7 +491,7 @@ func (g *Orchestrator) reconcileServicesOneNode(ctx context.Context, serviceIDs
} else {
dirtyTasks = append(dirtyTasks, cleanTasks[1:]...)
}
g.removeTasks(ctx, batch, dirtyTasks)
g.shutdownTasks(ctx, batch, dirtyTasks)
}
}
return nil
Expand Down Expand Up @@ -542,7 +542,7 @@ func (g *Orchestrator) tickTasks(ctx context.Context) {
g.restartTasks = make(map[string]struct{})
}

func (g *Orchestrator) removeTask(ctx context.Context, batch *store.Batch, t *api.Task) {
func (g *Orchestrator) shutdownTask(ctx context.Context, batch *store.Batch, t *api.Task) {
// set existing task DesiredState to TaskStateShutdown
// TODO(aaronl): optimistic update?
err := batch.Update(func(tx store.Tx) error {
Expand All @@ -554,7 +554,7 @@ func (g *Orchestrator) removeTask(ctx context.Context, batch *store.Batch, t *ap
return nil
})
if err != nil {
log.G(ctx).WithError(err).Errorf("global orchestrator: removeTask failed to remove %s", t.ID)
log.G(ctx).WithError(err).Errorf("global orchestrator: shutdownTask failed to shut down %s", t.ID)
}
}

Expand All @@ -572,9 +572,18 @@ func (g *Orchestrator) addTask(ctx context.Context, batch *store.Batch, service
}
}

func (g *Orchestrator) removeTasks(ctx context.Context, batch *store.Batch, tasks []*api.Task) {
func (g *Orchestrator) shutdownTasks(ctx context.Context, batch *store.Batch, tasks []*api.Task) {
for _, t := range tasks {
g.removeTask(ctx, batch, t)
g.shutdownTask(ctx, batch, t)
}
}

// deleteTask deletes task t from the store through the supplied batch.
// If the deletion fails, the error is logged and not returned to the caller.
func (g *Orchestrator) deleteTask(ctx context.Context, batch *store.Batch, t *api.Task) {
	// Named so the batch call below reads as a single statement.
	removeFromStore := func(tx store.Tx) error {
		return store.DeleteTask(tx, t.ID)
	}

	if err := batch.Update(removeFromStore); err != nil {
		log.G(ctx).WithError(err).Errorf("global orchestrator: deleteTask failed to delete %s", t.ID)
	}
}

Expand Down
2 changes: 1 addition & 1 deletion manager/orchestrator/global/global_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func TestDeleteNode(t *testing.T) {

deleteNode(t, store, node1)
// task should be deleted
observedTask := testutils.WatchShutdownTask(t, watch)
observedTask := testutils.WatchTaskDelete(t, watch)
assert.Equal(t, observedTask.ServiceAnnotations.Name, "name1")
assert.Equal(t, observedTask.NodeID, "nodeid1")
}
Expand Down