Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions manager/controlapi/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,12 +265,23 @@ func orphanNodeTasks(tx store.Tx, nodeID string) error {
return err
}
for _, task := range tasks {
task.Status = api.TaskStatus{
Timestamp: gogotypes.TimestampNow(),
State: api.TaskStateOrphaned,
Message: "Task belonged to a node that has been deleted",
// this operation must occur within the same transaction boundary. If
// we cannot accomplish this task orphaning in the same transaction, we
// could crash or die between transactions and not get a chance to do
// this. however, in cases were there is an exceptionally large number
// of tasks for a node, this may cause the transaction to exceed the
// max message size.
//
// therefore, we restrict updating to only tasks in a non-terminal
// state. Tasks in a terminal state do not need to be updated.
if task.Status.State < api.TaskStateCompleted {
task.Status = api.TaskStatus{
Timestamp: gogotypes.TimestampNow(),
State: api.TaskStateOrphaned,
Message: "Task belonged to a node that has been deleted",
}
store.UpdateTask(tx, task)
}
store.UpdateTask(tx, task)
}
return nil
}
Expand Down
28 changes: 24 additions & 4 deletions manager/controlapi/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -942,7 +942,7 @@ func TestUpdateNodeDemote(t *testing.T) {
testUpdateNodeDemote(t)
}

// TestRemoveNodeAttachments tests the unexported removeNodeAttachments
// TestRemoveNodeAttachments tests the unexported orphanNodeTasks
func TestOrphanNodeTasks(t *testing.T) {
// first, set up a store and all that
ts := newTestServer(t)
Expand Down Expand Up @@ -1081,7 +1081,28 @@ func TestOrphanNodeTasks(t *testing.T) {
},
},
}
return store.CreateTask(tx, task4)
if err := store.CreateTask(tx, task4); err != nil {
return err
}

// 5.) A regular task that's already in a terminal state on the node,
// which does not need to be updated.
task5 := &api.Task{
ID: "task5",
NodeID: "id2",
DesiredState: api.TaskStateRunning,
Status: api.TaskStatus{
// use TaskStateCompleted, as this is the earliest terminal
// state (this ensures we don't actually use <= instead of <)
State: api.TaskStateCompleted,
},
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
Container: &api.ContainerSpec{},
},
},
}
return store.CreateTask(tx, task5)
})
require.NoError(t, err)

Expand All @@ -1096,8 +1117,7 @@ func TestOrphanNodeTasks(t *testing.T) {
ts.Store.View(func(tx store.ReadTx) {
tasks, err := store.FindTasks(tx, store.All)
require.NoError(t, err)
// should only be 3 tasks left
require.Len(t, tasks, 4)
require.Len(t, tasks, 5)
// and the list should not contain task1 or task2
for _, task := range tasks {
require.NotNil(t, task)
Expand Down