diff --git a/manager/orchestrator/global/global.go b/manager/orchestrator/global/global.go index f5e6b3afc0..b89a105fec 100644 --- a/manager/orchestrator/global/global.go +++ b/manager/orchestrator/global/global.go @@ -159,13 +159,13 @@ func (g *Orchestrator) Run(ctx context.Context) error { switch v.Node.Status.State { // NodeStatus_DISCONNECTED is a transient state, no need to make any change case api.NodeStatus_DOWN: - g.removeTasksFromNode(ctx, v.Node) + g.foreachTaskFromNode(ctx, v.Node, g.shutdownTask) case api.NodeStatus_READY: // node could come back to READY from DOWN or DISCONNECT g.reconcileOneNode(ctx, v.Node) } case api.EventDeleteNode: - g.removeTasksFromNode(ctx, v.Node) + g.foreachTaskFromNode(ctx, v.Node, g.deleteTask) delete(g.nodes, v.Node.ID) case api.EventUpdateTask: g.handleTaskChange(ctx, v.Task) @@ -201,7 +201,7 @@ func (g *Orchestrator) FixTask(ctx context.Context, batch *store.Batch, t *api.T } // if the node no longer valid, remove the task if t.NodeID == "" || orchestrator.InvalidNode(node) { - g.removeTask(ctx, batch, t) + g.shutdownTask(ctx, batch, t) return } @@ -236,7 +236,7 @@ func (g *Orchestrator) Stop() { g.restarts.CancelAll() } -func (g *Orchestrator) removeTasksFromNode(ctx context.Context, node *api.Node) { +func (g *Orchestrator) foreachTaskFromNode(ctx context.Context, node *api.Node, cb func(context.Context, *store.Batch, *api.Task)) { var ( tasks []*api.Task err error @@ -245,7 +245,7 @@ func (g *Orchestrator) removeTasksFromNode(ctx context.Context, node *api.Node) tasks, err = store.FindTasks(tx, store.ByNodeID(node.ID)) }) if err != nil { - log.G(ctx).WithError(err).Errorf("global orchestrator: removeTasksFromNode failed finding tasks") + log.G(ctx).WithError(err).Errorf("global orchestrator: foreachTaskFromNode failed finding tasks") return } @@ -253,13 +253,13 @@ func (g *Orchestrator) removeTasksFromNode(ctx context.Context, node *api.Node) for _, t := range tasks { // Global orchestrator only removes tasks from globalServices if _, exists := g.globalServices[t.ServiceID]; exists { - g.removeTask(ctx, batch, t) + cb(ctx, batch, t) } } return nil }) if err != nil { - log.G(ctx).WithError(err).Errorf("global orchestrator: removeTasksFromNode failed batching tasks") + log.G(ctx).WithError(err).Errorf("global orchestrator: foreachTaskFromNode failed batching tasks") } } @@ -314,7 +314,7 @@ func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []strin // if restart policy considers this node has finished its task // it should remove all running tasks if _, exists := nodeCompleted[serviceID][nodeID]; exists || !meetsConstraints { - g.removeTasks(ctx, batch, ntasks) + g.shutdownTasks(ctx, batch, ntasks) continue } @@ -340,7 +340,7 @@ func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []strin // These must be associated with nodes that are drained, or // nodes that no longer exist. for _, ntasks := range nodeTasks[serviceID] { - g.removeTasks(ctx, batch, ntasks) + g.shutdownTasks(ctx, batch, ntasks) } } return nil @@ -382,7 +382,7 @@ func (g *Orchestrator) updateService(service *api.Service) { func (g *Orchestrator) reconcileOneNode(ctx context.Context, node *api.Node) { if node.Spec.Availability == api.NodeAvailabilityDrain { log.G(ctx).Debugf("global orchestrator: node %s in drain state, removing tasks from it", node.ID) - g.removeTasksFromNode(ctx, node) + g.foreachTaskFromNode(ctx, node, g.shutdownTask) return } @@ -447,7 +447,7 @@ func (g *Orchestrator) reconcileServicesOneNode(ctx context.Context, serviceIDs // if restart policy considers this node has finished its task // it should remove all running tasks if completed[serviceID] { - g.removeTasks(ctx, batch, tasks[serviceID]) + g.shutdownTasks(ctx, batch, tasks[serviceID]) continue } @@ -491,7 +491,7 @@ func (g *Orchestrator) reconcileServicesOneNode(ctx context.Context, serviceIDs } else { dirtyTasks = append(dirtyTasks, cleanTasks[1:]...) } - g.removeTasks(ctx, batch, dirtyTasks) + g.shutdownTasks(ctx, batch, dirtyTasks) } } return nil @@ -542,7 +542,7 @@ func (g *Orchestrator) tickTasks(ctx context.Context) { g.restartTasks = make(map[string]struct{}) } -func (g *Orchestrator) removeTask(ctx context.Context, batch *store.Batch, t *api.Task) { +func (g *Orchestrator) shutdownTask(ctx context.Context, batch *store.Batch, t *api.Task) { // set existing task DesiredState to TaskStateShutdown // TODO(aaronl): optimistic update? err := batch.Update(func(tx store.Tx) error { @@ -554,7 +554,7 @@ func (g *Orchestrator) removeTask(ctx context.Context, batch *store.Batch, t *ap return nil }) if err != nil { - log.G(ctx).WithError(err).Errorf("global orchestrator: removeTask failed to remove %s", t.ID) + log.G(ctx).WithError(err).Errorf("global orchestrator: shutdownTask failed to shut down %s", t.ID) } } @@ -572,9 +572,18 @@ func (g *Orchestrator) addTask(ctx context.Context, batch *store.Batch, service } } -func (g *Orchestrator) removeTasks(ctx context.Context, batch *store.Batch, tasks []*api.Task) { +func (g *Orchestrator) shutdownTasks(ctx context.Context, batch *store.Batch, tasks []*api.Task) { for _, t := range tasks { - g.removeTask(ctx, batch, t) + g.shutdownTask(ctx, batch, t) + } +} + +func (g *Orchestrator) deleteTask(ctx context.Context, batch *store.Batch, t *api.Task) { + err := batch.Update(func(tx store.Tx) error { + return store.DeleteTask(tx, t.ID) + }) + if err != nil { + log.G(ctx).WithError(err).Errorf("global orchestrator: deleteTask failed to delete %s", t.ID) } } diff --git a/manager/orchestrator/global/global_test.go b/manager/orchestrator/global/global_test.go index f2ba0c292c..886ed18157 100644 --- a/manager/orchestrator/global/global_test.go +++ b/manager/orchestrator/global/global_test.go @@ -155,7 +155,7 @@ func TestDeleteNode(t *testing.T) { deleteNode(t, store, node1) // task should be set to dead - observedTask := testutils.WatchShutdownTask(t, watch) + observedTask := testutils.WatchTaskDelete(t, watch) assert.Equal(t, observedTask.ServiceAnnotations.Name, "name1") assert.Equal(t, observedTask.NodeID, "nodeid1") }