Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 20 additions & 8 deletions manager/allocator/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"fmt"
"time"

"github.com/docker/go-events"
events "github.com/docker/go-events"
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change shouldn't be necessary.

"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/log"
"github.com/docker/swarmkit/manager/allocator/cnmallocator"
Expand Down Expand Up @@ -529,6 +529,7 @@ func (a *Allocator) allocateTasks(ctx context.Context, existingAddressesOnly boo
nc = a.netCtx
tasks []*api.Task
allocatedTasks []*api.Task
errorTasks []*api.Task
err error
)
a.store.View(func(tx store.ReadTx) {
Expand Down Expand Up @@ -587,14 +588,16 @@ func (a *Allocator) allocateTasks(ctx context.Context, existingAddressesOnly boo
if err == nil {
allocatedTasks = append(allocatedTasks, t)
} else if err != errNoChanges {
errorTasks = append(errorTasks, t)
log.G(ctx).WithError(err).Errorf("failed allocating task %s during init", t.ID)
updateTaskError(t, fmt.Sprintf("failed allocating task during init: %s", err))
nc.unallocatedTasks[t.ID] = t
}
}

if err := a.store.Batch(func(batch *store.Batch) error {
for _, t := range allocatedTasks {
if err := a.commitAllocatedTask(ctx, batch, t); err != nil {
for _, t := range append(allocatedTasks, errorTasks...) {
if err := a.commitTask(ctx, batch, t); err != nil {
log.G(ctx).WithError(err).Errorf("failed committing allocation of task %s during init", t.ID)
}
}
Expand Down Expand Up @@ -1001,7 +1004,7 @@ func (a *Allocator) allocateTask(ctx context.Context, t *api.Task) (err error) {
return nil
}

func (a *Allocator) commitAllocatedTask(ctx context.Context, batch *store.Batch, t *api.Task) error {
func (a *Allocator) commitTask(ctx context.Context, batch *store.Batch, t *api.Task) error {
return batch.Update(func(tx store.Tx) error {
err := store.UpdateTask(tx, t)

Expand Down Expand Up @@ -1106,26 +1109,29 @@ func (a *Allocator) procTasksNetwork(ctx context.Context, onRetry bool) {
quiet = true
}
allocatedTasks := make([]*api.Task, 0, len(toAllocate))
errorTasks := make([]*api.Task, 0, len(toAllocate))
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where do tasks get added to this slice?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whoops, forgot to update this code block. Fixed.


for _, t := range toAllocate {
if err := a.allocateTask(ctx, t); err == nil {
allocatedTasks = append(allocatedTasks, t)
} else if err != errNoChanges {
errorTasks = append(errorTasks, t)
if quiet {
log.G(ctx).WithError(err).Debug("task allocation failure")
} else {
log.G(ctx).WithError(err).Error("task allocation failure")
}
updateTaskError(t, fmt.Sprintf("task allocation failure: %s", err))
}
}

if len(allocatedTasks) == 0 {
if len(allocatedTasks) == 0 && len(errorTasks) == 0 {
return
}

err := a.store.Batch(func(batch *store.Batch) error {
for _, t := range allocatedTasks {
err := a.commitAllocatedTask(ctx, batch, t)
for _, t := range append(allocatedTasks, errorTasks...) {
err := a.commitTask(ctx, batch, t)
if err != nil {
log.G(ctx).WithError(err).Error("task allocation commit failure")
continue
Expand Down Expand Up @@ -1157,13 +1163,19 @@ func PredefinedNetworks() []networkallocator.PredefinedNetworkData {
return cnmallocator.PredefinedNetworks()
}

// updateTaskStatus sets TaskStatus and updates timestamp.
// updateTaskStatus sets TaskStatus and message and updates timestamp.
func updateTaskStatus(t *api.Task, newStatus api.TaskState, message string) {
t.Status.State = newStatus
t.Status.Message = message
t.Status.Timestamp = ptypes.MustTimestampProto(time.Now())
}

// updateTaskError sets the task error and updates timestamp.
func updateTaskError(t *api.Task, message string) {
t.Status.Err = message
t.Status.Timestamp = ptypes.MustTimestampProto(time.Now())
}

// IsIngressNetwork returns whether the passed network is an ingress network.
func IsIngressNetwork(nw *api.Network) bool {
return networkallocator.IsIngressNetwork(nw)
Expand Down