diff --git a/manager/controlapi/node.go b/manager/controlapi/node.go index 5308b7419e..6e8bdba5bd 100644 --- a/manager/controlapi/node.go +++ b/manager/controlapi/node.go @@ -265,12 +265,23 @@ func orphanNodeTasks(tx store.Tx, nodeID string) error { return err } for _, task := range tasks { - task.Status = api.TaskStatus{ - Timestamp: gogotypes.TimestampNow(), - State: api.TaskStateOrphaned, - Message: "Task belonged to a node that has been deleted", + // this operation must occur within the same transaction boundary. If + // we cannot accomplish this task orphaning in the same transaction, we + // could crash or die between transactions and not get a chance to do + // this. however, in cases were there is an exceptionally large number + // of tasks for a node, this may cause the transaction to exceed the + // max message size. + // + // therefore, we restrict updating to only tasks in a non-terminal + // state. Tasks in a terminal state do not need to be updated. + if task.Status.State < api.TaskStateCompleted { + task.Status = api.TaskStatus{ + Timestamp: gogotypes.TimestampNow(), + State: api.TaskStateOrphaned, + Message: "Task belonged to a node that has been deleted", + } + store.UpdateTask(tx, task) } - store.UpdateTask(tx, task) } return nil } diff --git a/manager/controlapi/node_test.go b/manager/controlapi/node_test.go index 24e5ee95e4..735dce3703 100644 --- a/manager/controlapi/node_test.go +++ b/manager/controlapi/node_test.go @@ -942,7 +942,7 @@ func TestUpdateNodeDemote(t *testing.T) { testUpdateNodeDemote(t) } -// TestRemoveNodeAttachments tests the unexported removeNodeAttachments +// TestRemoveNodeAttachments tests the unexported orphanNodeTasks func TestOrphanNodeTasks(t *testing.T) { // first, set up a store and all that ts := newTestServer(t) @@ -1081,7 +1081,28 @@ func TestOrphanNodeTasks(t *testing.T) { }, }, } - return store.CreateTask(tx, task4) + if err := store.CreateTask(tx, task4); err != nil { + return err + } + + // 5.) A regular task that's already in a terminal state on the node, + // which does not need to be updated. + task5 := &api.Task{ + ID: "task5", + NodeID: "id2", + DesiredState: api.TaskStateRunning, + Status: api.TaskStatus{ + // use TaskStateCompleted, as this is the earliest terminal + // state (this ensures we don't actually use <= instead of <) + State: api.TaskStateCompleted, + }, + Spec: api.TaskSpec{ + Runtime: &api.TaskSpec_Container{ + Container: &api.ContainerSpec{}, + }, + }, + } + return store.CreateTask(tx, task5) }) require.NoError(t, err) @@ -1096,8 +1117,7 @@ func TestOrphanNodeTasks(t *testing.T) { ts.Store.View(func(tx store.ReadTx) { tasks, err := store.FindTasks(tx, store.All) require.NoError(t, err) - // should only be 3 tasks left - require.Len(t, tasks, 4) + require.Len(t, tasks, 5) // and the list should not contain task1 or task2 for _, task := range tasks { require.NotNil(t, task)