Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 148 additions & 64 deletions cli/command/service/progress/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"os"
"os/signal"
"strings"
"time"

"github.com/docker/docker/api/types"
Expand All @@ -29,6 +30,13 @@ var (
swarm.TaskStateReady: 7,
swarm.TaskStateStarting: 8,
swarm.TaskStateRunning: 9,

// The following states are not actually shown in progress
// output, but are used internally for ordering.
swarm.TaskStateComplete: 10,
swarm.TaskStateShutdown: 11,
swarm.TaskStateFailed: 12,
swarm.TaskStateRejected: 13,
}

longestState int
Expand All @@ -40,22 +48,26 @@ const (
)

type progressUpdater interface {
update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]swarm.Node, rollback bool) (bool, error)
update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]struct{}, rollback bool) (bool, error)
}

func init() {
for state := range numberedStates {
if len(state) > longestState {
if !terminalState(state) && len(state) > longestState {
longestState = len(state)
}
}
}

func terminalState(state swarm.TaskState) bool {
return numberedStates[state] > numberedStates[swarm.TaskStateRunning]
}

func stateToProgress(state swarm.TaskState, rollback bool) int64 {
if !rollback {
return numberedStates[state]
}
return int64(len(numberedStates)) - numberedStates[state]
return numberedStates[swarm.TaskStateRunning] - numberedStates[state]
}

// ServiceProgress outputs progress information for convergence of a service.
Expand Down Expand Up @@ -187,16 +199,16 @@ func ServiceProgress(ctx context.Context, client client.APIClient, serviceID str
}
}

func getActiveNodes(ctx context.Context, client client.APIClient) (map[string]swarm.Node, error) {
func getActiveNodes(ctx context.Context, client client.APIClient) (map[string]struct{}, error) {
nodes, err := client.NodeList(ctx, types.NodeListOptions{})
if err != nil {
return nil, err
}

activeNodes := make(map[string]swarm.Node)
activeNodes := make(map[string]struct{})
for _, n := range nodes {
if n.Status.State != swarm.NodeStateDown {
activeNodes[n.ID] = n
activeNodes[n.ID] = struct{}{}
}
}
return activeNodes, nil
Expand Down Expand Up @@ -230,6 +242,18 @@ func writeOverallProgress(progressOut progress.Output, numerator, denominator in
})
}

func truncError(errMsg string) string {
// Remove newlines from the error, which corrupt the output.
errMsg = strings.Replace(errMsg, "\n", " ", -1)

// Limit the length to 75 characters, so that even on narrow terminals
// this will not overflow to the next line.
if len(errMsg) > 75 {
errMsg = errMsg[:74] + "…"
}
return errMsg
}

type replicatedProgressUpdater struct {
progressOut progress.Output

Expand All @@ -241,8 +265,7 @@ type replicatedProgressUpdater struct {
done bool
}

// nolint: gocyclo
func (u *replicatedProgressUpdater) update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]swarm.Node, rollback bool) (bool, error) {
func (u *replicatedProgressUpdater) update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]struct{}, rollback bool) (bool, error) {
if service.Spec.Mode.Replicated == nil || service.Spec.Mode.Replicated.Replicas == nil {
return false, errors.New("no replica count")
}
Expand All @@ -262,27 +285,7 @@ func (u *replicatedProgressUpdater) update(service swarm.Service, tasks []swarm.
u.initialized = true
}

// If there are multiple tasks with the same slot number, favor the one
// with the *lowest* desired state. This can happen in restart
// scenarios.
tasksBySlot := make(map[int]swarm.Task)
for _, task := range tasks {
if numberedStates[task.DesiredState] == 0 {
continue
}
if existingTask, ok := tasksBySlot[task.Slot]; ok {
if numberedStates[existingTask.DesiredState] <= numberedStates[task.DesiredState] {
continue
}
}
if task.NodeID != "" {
if _, nodeActive := activeNodes[task.NodeID]; nodeActive {
tasksBySlot[task.Slot] = task
}
} else {
tasksBySlot[task.Slot] = task
}
}
tasksBySlot := u.tasksBySlot(tasks, activeNodes)

// If we had reached a converged state, check if we are still converged.
if u.done {
Expand All @@ -303,18 +306,11 @@ func (u *replicatedProgressUpdater) update(service swarm.Service, tasks []swarm.
u.slotMap[task.Slot] = mappedSlot
}

if !u.done && replicas <= maxProgressBars && uint64(mappedSlot) <= replicas {
u.progressOut.WriteProgress(progress.Progress{
ID: fmt.Sprintf("%d/%d", mappedSlot, replicas),
Action: fmt.Sprintf("%-[1]*s", longestState, task.Status.State),
Current: stateToProgress(task.Status.State, rollback),
Total: maxProgress,
HideCounts: true,
})
}
if task.Status.State == swarm.TaskStateRunning {
if !terminalState(task.DesiredState) && task.Status.State == swarm.TaskStateRunning {
running++
}

u.writeTaskProgress(task, mappedSlot, replicas, rollback)
}

if !u.done {
Expand All @@ -328,30 +324,72 @@ func (u *replicatedProgressUpdater) update(service swarm.Service, tasks []swarm.
return running == replicas, nil
}

type globalProgressUpdater struct {
progressOut progress.Output

initialized bool
done bool
}

func (u *globalProgressUpdater) update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]swarm.Node, rollback bool) (bool, error) {
// If there are multiple tasks with the same node ID, favor the one
func (u *replicatedProgressUpdater) tasksBySlot(tasks []swarm.Task, activeNodes map[string]struct{}) map[int]swarm.Task {
// If there are multiple tasks with the same slot number, favor the one
// with the *lowest* desired state. This can happen in restart
// scenarios.
tasksByNode := make(map[string]swarm.Task)
tasksBySlot := make(map[int]swarm.Task)
for _, task := range tasks {
if numberedStates[task.DesiredState] == 0 {
if numberedStates[task.DesiredState] == 0 || numberedStates[task.Status.State] == 0 {
continue
}
if existingTask, ok := tasksByNode[task.NodeID]; ok {
if numberedStates[existingTask.DesiredState] <= numberedStates[task.DesiredState] {
if existingTask, ok := tasksBySlot[task.Slot]; ok {
if numberedStates[existingTask.DesiredState] < numberedStates[task.DesiredState] {
continue
}
// If the desired states match, observed state breaks
// ties. This can happen with the "start first" service
// update mode.
if numberedStates[existingTask.DesiredState] == numberedStates[task.DesiredState] &&
numberedStates[existingTask.Status.State] <= numberedStates[task.Status.State] {
continue
}
}
tasksByNode[task.NodeID] = task
if task.NodeID != "" {
if _, nodeActive := activeNodes[task.NodeID]; !nodeActive {
continue
}
}
tasksBySlot[task.Slot] = task
}

return tasksBySlot
}

func (u *replicatedProgressUpdater) writeTaskProgress(task swarm.Task, mappedSlot int, replicas uint64, rollback bool) {
if u.done || replicas > maxProgressBars || uint64(mappedSlot) > replicas {
return
}

if task.Status.Err != "" {
u.progressOut.WriteProgress(progress.Progress{
ID: fmt.Sprintf("%d/%d", mappedSlot, replicas),
Action: truncError(task.Status.Err),
})
return
}

if !terminalState(task.DesiredState) && !terminalState(task.Status.State) {
u.progressOut.WriteProgress(progress.Progress{
ID: fmt.Sprintf("%d/%d", mappedSlot, replicas),
Action: fmt.Sprintf("%-[1]*s", longestState, task.Status.State),
Current: stateToProgress(task.Status.State, rollback),
Total: maxProgress,
HideCounts: true,
})
}
}

type globalProgressUpdater struct {
progressOut progress.Output

initialized bool
done bool
}

func (u *globalProgressUpdater) update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]struct{}, rollback bool) (bool, error) {
tasksByNode := u.tasksByNode(tasks)

// We don't have perfect knowledge of how many nodes meet the
// constraints for this service. But the orchestrator creates tasks
// for all eligible nodes at the same time, so we should see all those
Expand Down Expand Up @@ -387,19 +425,12 @@ func (u *globalProgressUpdater) update(service swarm.Service, tasks []swarm.Task
running := 0

for _, task := range tasksByNode {
if node, nodeActive := activeNodes[task.NodeID]; nodeActive {
if !u.done && nodeCount <= maxProgressBars {
u.progressOut.WriteProgress(progress.Progress{
ID: stringid.TruncateID(node.ID),
Action: fmt.Sprintf("%-[1]*s", longestState, task.Status.State),
Current: stateToProgress(task.Status.State, rollback),
Total: maxProgress,
HideCounts: true,
})
}
if task.Status.State == swarm.TaskStateRunning {
if _, nodeActive := activeNodes[task.NodeID]; nodeActive {
if !terminalState(task.DesiredState) && task.Status.State == swarm.TaskStateRunning {
running++
}

u.writeTaskProgress(task, nodeCount, rollback)
}
}

Expand All @@ -413,3 +444,56 @@ func (u *globalProgressUpdater) update(service swarm.Service, tasks []swarm.Task

return running == nodeCount, nil
}

func (u *globalProgressUpdater) tasksByNode(tasks []swarm.Task) map[string]swarm.Task {
// If there are multiple tasks with the same node ID, favor the one
// with the *lowest* desired state. This can happen in restart
// scenarios.
tasksByNode := make(map[string]swarm.Task)
for _, task := range tasks {
if numberedStates[task.DesiredState] == 0 || numberedStates[task.Status.State] == 0 {
continue
}
if existingTask, ok := tasksByNode[task.NodeID]; ok {
if numberedStates[existingTask.DesiredState] < numberedStates[task.DesiredState] {
continue
}

// If the desired states match, observed state breaks
// ties. This can happen with the "start first" service
// update mode.
if numberedStates[existingTask.DesiredState] == numberedStates[task.DesiredState] &&
numberedStates[existingTask.Status.State] <= numberedStates[task.Status.State] {
continue
}

}
tasksByNode[task.NodeID] = task
}

return tasksByNode
}

func (u *globalProgressUpdater) writeTaskProgress(task swarm.Task, nodeCount int, rollback bool) {
if u.done || nodeCount > maxProgressBars {
return
}

if task.Status.Err != "" {
u.progressOut.WriteProgress(progress.Progress{
ID: stringid.TruncateID(task.NodeID),
Action: truncError(task.Status.Err),
})
return
}

if !terminalState(task.DesiredState) && !terminalState(task.Status.State) {
u.progressOut.WriteProgress(progress.Progress{
ID: stringid.TruncateID(task.NodeID),
Action: fmt.Sprintf("%-[1]*s", longestState, task.Status.State),
Current: stateToProgress(task.Status.State, rollback),
Total: maxProgress,
HideCounts: true,
})
}
}
Loading