Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,9 @@ func (a *Agent) run(ctx context.Context) {
select {
case operation := <-sessionq:
operation.response <- operation.fn(session)
case msg := <-session.tasks:
if err := a.worker.Assign(ctx, msg.Tasks); err != nil {
log.G(ctx).WithError(err).Error("task assignment failed")
case msg := <-session.assignments:
if err := a.worker.Update(ctx, msg.UpdateTasks, msg.RemoveTasks, msg.Type); err != nil {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This leaks the assignment message type into the worker model.

This should be something like the following:

switch msg.Type {
case COMPLETE:
   a.worker.Assign(...)
case INCREMENTAL:
   a.worker.Update(...)
}

log.G(ctx).WithError(err).Error("failed to update worker assignments")
}
case msg := <-session.messages:
if err := a.handleSessionMessage(ctx, msg); err != nil {
Expand Down
61 changes: 41 additions & 20 deletions agent/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"time"

"github.com/Sirupsen/logrus"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/log"
"github.com/docker/swarmkit/picker"
Expand Down Expand Up @@ -31,26 +32,26 @@ type session struct {
conn *grpc.ClientConn
addr string

agent *Agent
sessionID string
session api.Dispatcher_SessionClient
errs chan error
messages chan *api.SessionMessage
tasks chan *api.TasksMessage
agent *Agent
sessionID string
session api.Dispatcher_SessionClient
errs chan error
messages chan *api.SessionMessage
assignments chan *api.AssignmentsMessage

registered chan struct{} // closed registration
closed chan struct{}
}

func newSession(ctx context.Context, agent *Agent, delay time.Duration, sessionID string) *session {
s := &session{
agent: agent,
sessionID: sessionID,
errs: make(chan error, 1),
messages: make(chan *api.SessionMessage),
tasks: make(chan *api.TasksMessage),
registered: make(chan struct{}),
closed: make(chan struct{}),
agent: agent,
sessionID: sessionID,
errs: make(chan error, 1),
messages: make(chan *api.SessionMessage),
assignments: make(chan *api.AssignmentsMessage),
registered: make(chan struct{}),
closed: make(chan struct{}),
}
peer, err := agent.config.Managers.Select()
if err != nil {
Expand Down Expand Up @@ -215,22 +216,42 @@ func (s *session) handleSessionMessage(ctx context.Context, msg *api.SessionMess
}

func (s *session) watch(ctx context.Context) error {
log.G(ctx).Debugf("(*session).watch")
log := log.G(ctx).WithFields(logrus.Fields{"method": "(*session).watch"})
log.Debugf("")
var (
tasksWatch api.Dispatcher_TasksClient
resp *api.AssignmentsMessage
)
client := api.NewDispatcherClient(s.conn)
watch, err := client.Tasks(ctx, &api.TasksRequest{
SessionID: s.sessionID})
assignmentWatch, err := client.Assignments(ctx, &api.AssignmentsRequest{SessionID: s.sessionID})
if err != nil {
return err
}

for {
resp, err := watch.Recv()
if err != nil {
return err
if assignmentWatch != nil {
resp, err = assignmentWatch.Recv()
// If we get a code = 12 desc = unknown method Assignments, try to use tasks
if err != nil && grpc.Code(err) == codes.Unimplemented {
log.WithError(err).Errorf("falling back to Tasks")
assignmentWatch = nil
tasksWatch, err = client.Tasks(ctx, &api.TasksRequest{SessionID: s.sessionID})
}
if err != nil {
return err
}
}
if tasksWatch != nil {
var taskResp *api.TasksMessage
taskResp, err = tasksWatch.Recv()
if err != nil {
return err
}
resp = &api.AssignmentsMessage{Type: api.AssignmentsMessage_COMPLETE, UpdateTasks: taskResp.Tasks}
}

select {
case s.tasks <- resp:
case s.assignments <- resp:
case <-s.closed:
return errSessionClosed
case <-ctx.Done():
Expand Down
120 changes: 117 additions & 3 deletions agent/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ type Worker interface {
// Init prepares the worker for task assignment.
Init(ctx context.Context) error

// Assign the set of tasks to the worker. Tasks outside of this set will be
// removed.
Assign(ctx context.Context, tasks []*api.Task) error
// Update the set of tasks to the worker.
Update(ctx context.Context, added []*api.Task, removed []string, mode api.AssignmentsMessage_AssignmentType) error

// Listen to updates about tasks controlled by the worker. When first
// called, the reporter will receive all updates for all tasks controlled
Expand Down Expand Up @@ -171,6 +170,121 @@ func (w *worker) Assign(ctx context.Context, tasks []*api.Task) error {
return tx.Commit()
}

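// Update applies a set of task assignment changes to the worker.
// Tasks in the added set are stored and either updated or started. When mode
// is COMPLETE, every task not in the added set is unassigned; when mode is
// INCREMENTAL, only the tasks in the removed set are unassigned.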
func (w *worker) Update(ctx context.Context, added []*api.Task, removed []string, mode api.AssignmentsMessage_AssignmentType) error {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we refactor Assign to leverage these components?

w.mu.Lock()
defer w.mu.Unlock()

tx, err := w.db.Begin(true)
if err != nil {
log.G(ctx).WithError(err).Error("failed starting transaction against task database")
return err
}
defer tx.Rollback()

log.G(ctx).WithFields(logrus.Fields{
"len(added)": len(added),
"len(removed)": len(removed),
"mode": mode.String(),
}).Debug("(*worker).Update")
assigned := map[string]struct{}{}

for _, task := range added {
log.G(ctx).WithFields(
logrus.Fields{
"task.id": task.ID,
"task.desiredstate": task.DesiredState}).Debug("assigned")
if err := PutTask(tx, task); err != nil {
return err
}

if err := SetTaskAssignment(tx, task.ID, true); err != nil {
return err
}

if mgr, ok := w.taskManagers[task.ID]; ok {
if err := mgr.Update(ctx, task); err != nil && err != ErrClosed {
log.G(ctx).WithError(err).Error("failed updating assigned task")
}
} else {
// we may have still seen the task, let's grab the status from
// storage and replace it with our status, if we have it.
status, err := GetTaskStatus(tx, task.ID)
if err != nil {
if err != errTaskUnknown {
return err
}

// never seen before, register the provided status
if err := PutTaskStatus(tx, task.ID, &task.Status); err != nil {
return err
}

status = &task.Status
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't find any use of status after this line.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was copied from above. It looks like it was supposed to be this:

if status, err := GetTaskStatus(tx, task.ID); err != nil {
  if err != errTaskUnknown {
    return err
  }

  if err := PutTaskStatus(tx, task.ID, &task.Status); err != nil {
    return err
  }
} else {
  task.Status = status
}

} else {
task.Status = *status // overwrite the stale manager status with ours.
}

w.startTask(ctx, tx, task)
}

assigned[task.ID] = struct{}{}
}

closeManager := func(tm *taskManager) {
// when a task is no longer assigned, we shutdown the task manager for
// it and leave cleanup to the sweeper.
if err := tm.Close(); err != nil {
log.G(ctx).WithError(err).Error("error closing task manager")
}
}

// removeTaskAssignment marks taskID as no longer assigned in the task
// database. A failure is logged with the task ID attached and returned to the
// caller, which skips closing the task manager for that task when it is non-nil.
removeTaskAssignment := func(taskID string) error {
	ctx := log.WithLogger(ctx, log.G(ctx).WithField("task.id", taskID))
	// Keep the result in a closure-local variable. Declaring it in the if
	// statement's initializer would scope it to the if block, and the return
	// below would then pick up the enclosing function's err instead, hiding
	// the failure from the caller.
	err := SetTaskAssignment(tx, taskID, false)
	if err != nil {
		log.G(ctx).WithError(err).Error("error setting task assignment in database")
	}
	return err
}

// If this was a complete set of assignments, we're going to remove all the remaining
// tasks.
if mode == api.AssignmentsMessage_COMPLETE {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

switch mode { }?

for id, tm := range w.taskManagers {
if _, ok := assigned[id]; ok {
continue
}

err := removeTaskAssignment(id)
if err == nil {
delete(w.taskManagers, id)
go closeManager(tm)
}
}
}

// If this was an incremental set of assignments, we're going to remove only the tasks
// in the removed set
if mode == api.AssignmentsMessage_INCREMENTAL {
for _, taskID := range removed {
err := removeTaskAssignment(taskID)
if err != nil {
continue
}

tm, ok := w.taskManagers[taskID]
if ok {
delete(w.taskManagers, taskID)
go closeManager(tm)
}
}
}

return tx.Commit()
}

func (w *worker) Listen(ctx context.Context, reporter StatusReporter) {
w.mu.Lock()
defer w.mu.Unlock()
Expand Down
Loading