Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 18 additions & 16 deletions manager/orchestrator/global/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (g *Orchestrator) FixTask(ctx context.Context, batch *store.Batch, t *api.T
node = g.nodes[t.NodeID]
}
// if the node no longer valid, remove the task
if t.NodeID == "" || orchestrator.InvalidNode(node) {
if node == nil || node.Spec.Availability == api.NodeAvailabilityDrain {
g.shutdownTask(ctx, batch, t)
return
}
Expand Down Expand Up @@ -295,9 +295,14 @@ func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []strin
continue
}

if node.Spec.Availability == api.NodeAvailabilityPause {
// the node is paused, so we won't add or update
// any tasks
if node.Spec.Availability == api.NodeAvailabilityPause ||
node.Status.State == api.NodeStatus_DOWN {
// The node is paused or down, so we
// won't add or update any tasks. When
// the node is unpaused or comes back
// up, it will trigger node
// reconciliation, correcting anything
// we might have skipped here.
continue
}

Expand Down Expand Up @@ -334,7 +339,7 @@ func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []strin

// updateNode updates g.nodes based on the current node value
func (g *Orchestrator) updateNode(node *api.Node) {
if node.Spec.Availability == api.NodeAvailabilityDrain || node.Status.State == api.NodeStatus_DOWN {
if node.Spec.Availability == api.NodeAvailabilityDrain {
delete(g.nodes, node.ID)
} else {
g.nodes[node.ID] = node
Expand Down Expand Up @@ -363,14 +368,12 @@ func (g *Orchestrator) reconcileOneNode(ctx context.Context, node *api.Node) {
return
}

if node.Status.State == api.NodeStatus_DOWN {
log.G(ctx).Debugf("global orchestrator: node %s is down, shutting down its tasks", node.ID)
g.foreachTaskFromNode(ctx, node, g.shutdownTask)
return
}

if node.Spec.Availability == api.NodeAvailabilityPause {
// the node is paused, so we won't add or update tasks
if node.Spec.Availability == api.NodeAvailabilityPause ||
node.Status.State == api.NodeStatus_DOWN {
// The node is paused or down, so we won't add or update any
// tasks. When the node is unpaused or comes back up, it will
// trigger node reconciliation, correcting anything we might
// have skipped here.
return
}

Expand Down Expand Up @@ -490,13 +493,12 @@ func (g *Orchestrator) tickTasks(ctx context.Context) {
return nil
}

if node.Spec.Availability == api.NodeAvailabilityPause ||
!constraint.NodeMatches(serviceEntry.constraints, node) {
if !constraint.NodeMatches(serviceEntry.constraints, node) {
t.DesiredState = api.TaskStateShutdown
return store.UpdateTask(tx, t)
}

return g.restarts.Restart(ctx, tx, g.cluster, service, *t)
return g.restarts.Restart(ctx, tx, g.cluster, service, *t, false)
})
if err != nil {
log.G(ctx).WithError(err).Errorf("orchestrator restartTask transaction failed")
Expand Down
31 changes: 18 additions & 13 deletions manager/orchestrator/global/global_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,16 +266,20 @@ func TestNodeState(t *testing.T) {
defer orchestrator.Stop()

testutils.WatchTaskCreate(t, watch)
testutils.Expect(t, watch, state.EventCommit{})

// set node1 to down
updateNodeState(t, store, node1, api.NodeStatus_DOWN)

// task should be set to dead
observedTask1 := testutils.WatchShutdownTask(t, watch)
assert.Equal(t, observedTask1.ServiceAnnotations.Name, "name1")
assert.Equal(t, observedTask1.NodeID, "nodeid1")
testutils.Expect(t, watch, api.EventUpdateNode{})
testutils.Expect(t, watch, state.EventCommit{})

// nothing should happen
select {
case event := <-watch:
t.Fatalf("got unexpected event %T: %+v", event, event)
case <-time.After(100 * time.Millisecond):
}

// updating the service shouldn't restart the task
updateService(t, store, service1)
testutils.Expect(t, watch, api.EventUpdateService{})
Expand All @@ -288,7 +292,7 @@ func TestNodeState(t *testing.T) {

// set node1 to ready
updateNodeState(t, store, node1, api.NodeStatus_READY)
// task should be added back
// task should be updated now
observedTask2 := testutils.WatchTaskCreate(t, watch)
assert.Equal(t, observedTask2.Status.State, api.TaskStateNew)
assert.Equal(t, observedTask2.ServiceAnnotations.Name, "name1")
Expand Down Expand Up @@ -414,9 +418,6 @@ func TestTaskFailure(t *testing.T) {
failTask(t, store, observedTask3)
testutils.Expect(t, watch, api.EventUpdateTask{})
testutils.Expect(t, watch, state.EventCommit{})
observedTask4 := testutils.WatchTaskUpdate(t, watch)
assert.Equal(t, observedTask4.DesiredState, api.TaskStateShutdown)
testutils.Expect(t, watch, state.EventCommit{})

// the task should not be recreated
select {
Expand All @@ -430,10 +431,14 @@ func TestTaskFailure(t *testing.T) {
testutils.Expect(t, watch, api.EventUpdateService{})
testutils.Expect(t, watch, state.EventCommit{})

observedTask5 := testutils.WatchTaskCreate(t, watch)
assert.Equal(t, observedTask5.Status.State, api.TaskStateNew)
assert.Equal(t, observedTask5.ServiceAnnotations.Name, "norestart")
assert.Equal(t, observedTask5.NodeID, "nodeid1")
observedTask4 := testutils.WatchTaskCreate(t, watch)
assert.Equal(t, observedTask4.Status.State, api.TaskStateNew)
assert.Equal(t, observedTask4.ServiceAnnotations.Name, "norestart")
assert.Equal(t, observedTask4.NodeID, "nodeid1")

// old task gets shut down as the new one is created
observedTask5 := testutils.WatchTaskUpdate(t, watch)
assert.Equal(t, observedTask5.DesiredState, api.TaskStateShutdown)
testutils.Expect(t, watch, state.EventCommit{})
}

Expand Down
15 changes: 6 additions & 9 deletions manager/orchestrator/replicated/drain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,28 +216,25 @@ func TestDrain(t *testing.T) {
assert.NoError(t, orchestrator.Run(ctx))
}()

// id2 and id5 should be killed immediately
// id5 should be killed immediately
deletion1 := testutils.WatchShutdownTask(t, watch)
deletion2 := testutils.WatchShutdownTask(t, watch)

assert.Regexp(t, "id(2|5)", deletion1.ID)
assert.Regexp(t, "id(2|5)", deletion1.NodeID)
assert.Regexp(t, "id(2|5)", deletion2.ID)
assert.Regexp(t, "id(2|5)", deletion2.NodeID)
assert.Equal(t, "id5", deletion1.ID)
assert.Equal(t, "id5", deletion1.NodeID)

// Create a new task, assigned to node id2
// Create a new task, assigned to node id5
err = s.Update(func(tx store.Tx) error {
task := initialTaskSet[2].Copy()
task.ID = "newtask"
task.NodeID = "id2"
task.NodeID = "id5"
assert.NoError(t, store.CreateTask(tx, task))
return nil
})
assert.NoError(t, err)

deletion3 := testutils.WatchShutdownTask(t, watch)
assert.Equal(t, "newtask", deletion3.ID)
assert.Equal(t, "id2", deletion3.NodeID)
assert.Equal(t, "id5", deletion3.NodeID)

// Set node id4 to the DRAINED state
err = s.Update(func(tx store.Tx) error {
Expand Down
9 changes: 7 additions & 2 deletions manager/orchestrator/replicated/replicated.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@ type Orchestrator struct {
store *store.MemoryStore

reconcileServices map[string]*api.Service
restartTasks map[string]struct{}

// restartTasks' keys are tasks that need to have Restart called.
// The value is whether to force the desired state to shutdown
// even when the restart policy does not call for this, for example
// when a node is drained.
restartTasks map[string]bool

// stopChan signals to the state machine to stop running.
stopChan chan struct{}
Expand All @@ -37,7 +42,7 @@ func NewReplicatedOrchestrator(store *store.MemoryStore) *Orchestrator {
stopChan: make(chan struct{}),
doneChan: make(chan struct{}),
reconcileServices: make(map[string]*api.Service),
restartTasks: make(map[string]struct{}),
restartTasks: make(map[string]bool),
updater: updater,
restarts: restartSupervisor,
}
Expand Down
60 changes: 34 additions & 26 deletions manager/orchestrator/replicated/restart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,6 @@ func TestOrchestratorRestartOnFailure(t *testing.T) {
testutils.Expect(t, watch, state.EventCommit{})
testutils.Expect(t, watch, api.EventUpdateTask{})
testutils.Expect(t, watch, state.EventCommit{})
testutils.Expect(t, watch, api.EventUpdateTask{})
testutils.Expect(t, watch, state.EventCommit{})

select {
case <-watch:
Expand Down Expand Up @@ -284,8 +282,6 @@ func TestOrchestratorRestartOnNone(t *testing.T) {
testutils.Expect(t, watch, state.EventCommit{})
testutils.Expect(t, watch, api.EventUpdateTask{})
testutils.Expect(t, watch, state.EventCommit{})
testutils.Expect(t, watch, api.EventUpdateTask{})
testutils.Expect(t, watch, state.EventCommit{})

select {
case <-watch:
Expand All @@ -303,8 +299,6 @@ func TestOrchestratorRestartOnNone(t *testing.T) {
assert.NoError(t, err)
testutils.Expect(t, watch, api.EventUpdateTask{})
testutils.Expect(t, watch, state.EventCommit{})
testutils.Expect(t, watch, api.EventUpdateTask{})
testutils.Expect(t, watch, state.EventCommit{})

select {
case <-watch:
Expand Down Expand Up @@ -414,7 +408,7 @@ func TestOrchestratorRestartMaxAttempts(t *testing.T) {
orchestrator := NewReplicatedOrchestrator(s)
defer orchestrator.Stop()

watch, cancel := state.Watch(s.WatchQueue() /*api.EventCreateTask{}, api.EventUpdateTask{}*/)
watch, cancel := state.Watch(s.WatchQueue(), api.EventCreateTask{}, api.EventUpdateTask{})
defer cancel()

// Create a service with two instances specified before the orchestrator is
Expand Down Expand Up @@ -457,16 +451,44 @@ func TestOrchestratorRestartMaxAttempts(t *testing.T) {
assert.NoError(t, orchestrator.Run(ctx))
}()

testRestart := func() {
testRestart := func(serviceUpdated bool) {
observedTask1 := testutils.WatchTaskCreate(t, watch)
assert.Equal(t, observedTask1.Status.State, api.TaskStateNew)
assert.Equal(t, observedTask1.ServiceAnnotations.Name, "name1")
if serviceUpdated {
shutdownTask := testutils.WatchTaskUpdate(t, watch)
runnableTask := testutils.WatchTaskUpdate(t, watch)

assert.Equal(t, api.TaskStateShutdown, shutdownTask.DesiredState)
err = s.Update(func(tx store.Tx) error {
task := shutdownTask.Copy()
task.Status.State = api.TaskStateShutdown
assert.NoError(t, store.UpdateTask(tx, task))
return nil
})
assert.NoError(t, err)

testutils.Expect(t, watch, api.EventUpdateTask{})

assert.Equal(t, api.TaskStateRunning, runnableTask.DesiredState)
err = s.Update(func(tx store.Tx) error {
task := runnableTask.Copy()
task.Status.State = api.TaskStateRunning
assert.NoError(t, store.UpdateTask(tx, task))
return nil
})
assert.NoError(t, err)

testutils.Expect(t, watch, api.EventUpdateTask{})
}

observedTask2 := testutils.WatchTaskCreate(t, watch)
assert.Equal(t, observedTask2.Status.State, api.TaskStateNew)
assert.Equal(t, observedTask2.ServiceAnnotations.Name, "name1")

testutils.Expect(t, watch, state.EventCommit{})
if serviceUpdated {
testutils.Expect(t, watch, api.EventUpdateTask{})
testutils.Expect(t, watch, api.EventUpdateTask{})
}

// Fail the first task. Confirm that it gets restarted.
updatedTask1 := observedTask1.Copy()
Expand All @@ -478,17 +500,14 @@ func TestOrchestratorRestartMaxAttempts(t *testing.T) {
})
assert.NoError(t, err)
testutils.Expect(t, watch, api.EventUpdateTask{})
testutils.Expect(t, watch, state.EventCommit{})
testutils.Expect(t, watch, api.EventUpdateTask{})

observedTask3 := testutils.WatchTaskCreate(t, watch)
testutils.Expect(t, watch, state.EventCommit{})
assert.Equal(t, observedTask3.Status.State, api.TaskStateNew)
assert.Equal(t, observedTask3.DesiredState, api.TaskStateReady)
assert.Equal(t, observedTask3.ServiceAnnotations.Name, "name1")

observedTask4 := testutils.WatchTaskUpdate(t, watch)
testutils.Expect(t, watch, state.EventCommit{})
after := time.Now()

// At least 100 ms should have elapsed. Only check the lower bound,
Expand All @@ -510,16 +529,13 @@ func TestOrchestratorRestartMaxAttempts(t *testing.T) {
})
assert.NoError(t, err)
testutils.Expect(t, watch, api.EventUpdateTask{})
testutils.Expect(t, watch, state.EventCommit{})
testutils.Expect(t, watch, api.EventUpdateTask{})

observedTask5 := testutils.WatchTaskCreate(t, watch)
testutils.Expect(t, watch, state.EventCommit{})
assert.Equal(t, observedTask5.Status.State, api.TaskStateNew)
assert.Equal(t, observedTask5.DesiredState, api.TaskStateReady)

observedTask6 := testutils.WatchTaskUpdate(t, watch) // task gets started after a delay
testutils.Expect(t, watch, state.EventCommit{})
assert.Equal(t, observedTask6.Status.State, api.TaskStateNew)
assert.Equal(t, observedTask6.DesiredState, api.TaskStateRunning)
assert.Equal(t, observedTask6.ServiceAnnotations.Name, "name1")
Expand All @@ -533,9 +549,6 @@ func TestOrchestratorRestartMaxAttempts(t *testing.T) {
})
assert.NoError(t, err)
testutils.Expect(t, watch, api.EventUpdateTask{})
testutils.Expect(t, watch, state.EventCommit{})
testutils.Expect(t, watch, api.EventUpdateTask{})
testutils.Expect(t, watch, state.EventCommit{})

select {
case <-watch:
Expand All @@ -552,9 +565,6 @@ func TestOrchestratorRestartMaxAttempts(t *testing.T) {
})
assert.NoError(t, err)
testutils.Expect(t, watch, api.EventUpdateTask{})
testutils.Expect(t, watch, state.EventCommit{})
testutils.Expect(t, watch, api.EventUpdateTask{})
testutils.Expect(t, watch, state.EventCommit{})

select {
case <-watch:
Expand All @@ -563,7 +573,7 @@ func TestOrchestratorRestartMaxAttempts(t *testing.T) {
}
}

testRestart()
testRestart(false)

// Update the service spec
err = s.Update(func(tx store.Tx) error {
Expand All @@ -576,7 +586,7 @@ func TestOrchestratorRestartMaxAttempts(t *testing.T) {
})
assert.NoError(t, err)

testRestart()
testRestart(true)
}

func TestOrchestratorRestartWindow(t *testing.T) {
Expand Down Expand Up @@ -704,8 +714,6 @@ func TestOrchestratorRestartWindow(t *testing.T) {
assert.NoError(t, err)
testutils.Expect(t, watch, api.EventUpdateTask{})
testutils.Expect(t, watch, state.EventCommit{})
testutils.Expect(t, watch, api.EventUpdateTask{})
testutils.Expect(t, watch, state.EventCommit{})

select {
case <-watch:
Expand Down
Loading