Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 53 additions & 10 deletions agent/exec/container/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package container

import (
"fmt"
"sync"

engineapi "github.com/docker/engine-api/client"
"github.com/docker/engine-api/types"
Expand All @@ -13,6 +14,14 @@ import (
"golang.org/x/net/context"
)

type controllerState int

const (
stateNew controllerState = iota
stateCreated
stateRemoved
)

// controller implements agent.Controller against docker's API.
//
// Most operations against docker's API are done through the container name,
Expand All @@ -23,6 +32,8 @@ type controller struct {
adapter *containerAdapter
closed chan struct{}
err error
state controllerState
mtx sync.Mutex
}

var _ exec.Controller = &controller{}
Expand All @@ -39,6 +50,7 @@ func newController(client engineapi.APIClient, task *api.Task) (exec.Controller,
task: task,
adapter: adapter,
closed: make(chan struct{}),
state: stateNew,
}, nil
}

Expand Down Expand Up @@ -76,16 +88,6 @@ func (r *controller) Prepare(ctx context.Context) error {
return err
}

// Make sure all the networks that the task needs are created.
if err := r.adapter.createNetworks(ctx); err != nil {
return err
}

// Make sure all the volumes that the task needs are created.
if err := r.adapter.createVolumes(ctx, r.client); err != nil {
return err
}

if err := r.adapter.pullImage(ctx); err != nil {
// NOTE(stevvooe): We always try to pull the image to make sure we have
// the most up to date version. This will return an error, but we only
Expand All @@ -101,6 +103,27 @@ func (r *controller) Prepare(ctx context.Context) error {
log.G(ctx).WithError(err).Error("pulling image failed")
}

// Make sure the controller has not been removed. Because we
// might lost race to a remover.
r.mtx.Lock()
defer r.mtx.Unlock()

if r.checkState(stateRemoved) {
return exec.ErrControllerRemoved
}

r.updateState(stateCreated)

// Make sure all the networks that the task needs are created.
if err := r.adapter.createNetworks(ctx); err != nil {
return err
}

// Make sure all the volumes that the task needs are created.
if err := r.adapter.createVolumes(ctx, r.client); err != nil {
return err
}

if err := r.adapter.create(ctx); err != nil {
if isContainerCreateNameConflict(err) {
if _, err := r.adapter.inspect(ctx); err != nil {
Expand Down Expand Up @@ -322,6 +345,16 @@ func (r *controller) Remove(ctx context.Context) error {
return err
}

r.mtx.Lock()
defer r.mtx.Unlock()

if !r.checkState(stateCreated) {
return nil //nothing to remove
}

//Transition to removed state no matter it will succeed or fail.
r.updateState(stateRemoved)

// It may be necessary to shut down the task before removing it.
if err := r.Shutdown(ctx); err != nil {
if isUnknownContainer(err) {
Expand Down Expand Up @@ -389,6 +422,16 @@ func (r *controller) checkClosed() error {
}
}

//Caller should lock if necessary
func (r *controller) checkState(desiredState controllerState) bool {
return r.state == desiredState
}

//Caller should lock if necessary
func (r *controller) updateState(newState controllerState) {
r.state = newState
}

type exitError struct {
code int
cause error
Expand Down
3 changes: 3 additions & 0 deletions agent/exec/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ var (
// ErrControllerClosed returned when a task controller has been closed.
ErrControllerClosed = errors.New("exec: controller closed")

// ErrControllerRemoved returned when a task controller has been removed.
ErrControllerRemoved = errors.New("exec: controller removed")

// ErrTaskRetry is returned by Do when an operation failed by should be
// retried. The status should still be reported in this case.
ErrTaskRetry = errors.New("exec: task retry")
Expand Down
9 changes: 5 additions & 4 deletions agent/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,6 @@ func (tm *taskManager) run(ctx context.Context) {
}
}
case <-shutdown:
if cancel != nil {
// cancel outstanding operation.
cancel()
}

// TODO(stevvooe): This should be left for the repear.

Expand All @@ -213,6 +209,11 @@ func (tm *taskManager) run(ctx context.Context) {
log.G(ctx).WithError(err).WithField("task.id", tm.task.ID).Error("remove task failed")
}

if cancel != nil {
// cancel outstanding operation.
cancel()
}

if err := tm.ctlr.Close(); err != nil {
log.G(ctx).WithError(err).Error("error closing controller")
}
Expand Down