diff --git a/agent/agent.go b/agent/agent.go index 8f686a0a07..a13c5970ec 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -365,7 +365,7 @@ func (a *Agent) UpdateTaskStatus(ctx context.Context, taskID string, status *api err = nil // dispatcher no longer cares about this task. } else { log.G(ctx).WithError(err).Error("closing session after fatal error") - session.close() + session.errs <- err } } else { log.G(ctx).Debug("task status reported") diff --git a/agent/reporter.go b/agent/reporter.go index eac4b3267a..9ec779262c 100644 --- a/agent/reporter.go +++ b/agent/reporter.go @@ -3,12 +3,15 @@ package agent import ( "reflect" "sync" + "time" "github.com/docker/swarmkit/api" "github.com/docker/swarmkit/log" "golang.org/x/net/context" ) +const updateDelay = 4 * time.Second + // StatusReporter receives updates to task status. Method may be called // concurrently, so implementations should be goroutine-safe. type StatusReporter interface { @@ -92,10 +95,26 @@ func (sr *statusReporter) run(ctx context.Context) { }() for { - if len(sr.statuses) == 0 { - sr.cond.Wait() + exitCh := make(chan struct{}) + + if len(sr.statuses) != 0 { + // if it's retry, then wait some time if there is no new updates reported + // it helps to keep retry loop less tight on update failures + go func() { + after := time.NewTimer(updateDelay) + defer after.Stop() + select { + case <-after.C: + sr.cond.Signal() + case <-exitCh: + } + }() } + sr.cond.Wait() + // exit timer goroutine if there was one + close(exitCh) + if sr.closed { // TODO(stevvooe): Add support here for waiting until all // statuses are flushed before shutting down. @@ -123,6 +142,9 @@ func (sr *statusReporter) run(ctx context.Context) { if _, ok := sr.statuses[taskID]; !ok { sr.statuses[taskID] = status } + // if task update failed - it's better to give some time to agent + // to recover from possible error before try to update other tasks + break } } }