Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
bbb6722
Update to containerd 7e3b7dead60d96e9a7b13b8813d1712c7761e327
Jun 19, 2017
39b778a
Update to OCI image-spec f03dbe35d449c54915d235f1a3cf8f585a24babe
Jun 15, 2017
543daf1
containerd: Fix a typo in a comment.
Jun 19, 2017
522e4b8
containerd: Implement ContainerStatuser
May 11, 2017
66d0b33
containerd: Rename containerAdapter.container -> spec
Jun 16, 2017
b142570
containerd: Drop c.task, it is only used for naming, so store the nam…
Jun 16, 2017
1b2aa14
containerd: Port to containerd client library.
Jun 6, 2017
6cbf81b
containerd: Use containerd.GenerateSpec
Jun 20, 2017
88a62bf
containerd: Use containerd.WithPullUnpack and containerd.WithNewRootFS
Jun 8, 2017
4d702a5
containerd: Rework and simplify mount handling
Jun 15, 2017
e59c131
containerd: use client lib for stdio
Jun 15, 2017
a0548ed
containerd: Inline specOpts into create
Jun 15, 2017
60883cc
containerd: Drop container specific directory
Jun 15, 2017
a7fdc45
containerd: rename adapter.create to adapter.prepare
Jun 20, 2017
e41100d
containerd: Check that the adapter has been "prepared" at entry points
Jun 20, 2017
21b8ccf
containerd: Ensure container is destroyed on prepare failure
Jun 20, 2017
90c3b22
containerd: Rediscover existing containers and tasks
Jun 20, 2017
bf4ee98
containerd: consolidate production of the logger
Jun 20, 2017
3a03878
containerd: Avoid direct use of container api, use methods on container
Jun 20, 2017
80c76b2
containerd: Use task.Start instead of open coding
Jun 20, 2017
16fb22b
containerd: move the container delete to the remove method
Jun 20, 2017
112d617
containerd: log tag which fits in with the others
Jun 21, 2017
1bcaaad
containerd: Do not shutdown task on OOM
Jun 21, 2017
236d1a6
containerd: Only return an error from adapter.shutdown()
Jun 21, 2017
585b3b1
containerd: Define a local struct as result of adapter.inspect()
Jun 21, 2017
a751488
containerd: Use Task.Status() instead of opencoded Info()
Jun 21, 2017
f2699f8
containerd: Stop shutting down the container in ContainerStatus()
Jun 21, 2017
8129a34
containerd: Do not shutdown container in Wait() method
Jun 21, 2017
cf2fe62
containerd: Make adapter.shutdown() non-idempotent
Jun 21, 2017
3b814d9
containerd: Use Task.Delete() in adapter.shutdown()
Jun 21, 2017
a214aeb
containerd: Use Task.Wait() rather than open coded event stream
Jun 22, 2017
5e20939
containerd: Drop github.com/docker/docker/pkg/mount.MergeTmpfsOptions
Jun 22, 2017
505f593
Update to containerd 76697ac8cbf357a19beb58e4805a81fe48cf7974
Jun 22, 2017
b7714c6
containerd: Require a namespace from the application.
Jun 23, 2017
33000f1
containerd: Wait indefinitely after sending fallback SIGKILL on shutd…
Jun 26, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
796 changes: 266 additions & 530 deletions agent/exec/containerd/adapter.go

Large diffs are not rendered by default.

114 changes: 34 additions & 80 deletions agent/exec/containerd/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@ package containerd
import (
"fmt"

"github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd"
"github.com/docker/swarmkit/agent/exec"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/log"
"github.com/pkg/errors"
"golang.org/x/net/context"
"google.golang.org/grpc"
)

type controller struct {
Expand All @@ -25,8 +24,8 @@ type controller struct {

var _ exec.Controller = &controller{}

func newController(conn *grpc.ClientConn, containerDir string, task *api.Task, secrets exec.SecretGetter) (exec.Controller, error) {
adapter, err := newContainerAdapter(conn, containerDir, task, secrets)
func newController(client *containerd.Client, task *api.Task, secrets exec.SecretGetter) (exec.Controller, error) {
adapter, err := newContainerAdapter(client, task, secrets)
if err != nil {
return nil, err
}
Expand All @@ -38,6 +37,29 @@ func newController(conn *grpc.ClientConn, containerDir string, task *api.Task, s
}, nil
}

// ContainerStatus returns the container-specific status for the task.
func (r *controller) ContainerStatus(ctx context.Context) (*api.ContainerStatus, error) {
ctnr, err := r.adapter.inspect(ctx)
if err != nil {
if isUnknownContainer(err) {
return nil, nil
}

return nil, err
}

status := &api.ContainerStatus{
ContainerID: ctnr.ID,
PID: int32(ctnr.Pid),
}

if ec, ok := ctnr.ExitStatus.(exec.ExitCoder); ok {
status.ExitCode = int32(ec.ExitCode())
}

return status, nil
}

// Update takes a recent task update and applies it to the container.
func (r *controller) Update(ctx context.Context, t *api.Task) error {
log.G(ctx).Warnf("task updates not yet supported")
Expand Down Expand Up @@ -105,7 +127,7 @@ func (r *controller) Prepare(ctx context.Context) error {
}
}

if err := r.adapter.create(ctx); err != nil {
if err := r.adapter.prepare(ctx); err != nil {
if isContainerCreateNameConflict(err) {
if _, err := r.adapter.inspect(ctx); err != nil {
return err
Expand Down Expand Up @@ -139,15 +161,15 @@ func (r *controller) Start(ctx context.Context) error {
//
// TODO(stevvooe): This is very racy. While reading inspect, another could
// start the process and we could end up starting it twice.
if ctnr.Status != task.StatusCreated {
if ctnr.Status != containerd.Created {
return exec.ErrTaskStarted
}

if err := r.adapter.start(ctx); err != nil {
return errors.Wrap(err, "starting container failed")
}

// TODO(ijc): Wait for HealtCheck to report OK.
// TODO(ijc): Wait for HealthCheck to report OK.

return nil
}
Expand All @@ -160,78 +182,10 @@ func (r *controller) Wait(ctx context.Context) error {
return err
}

// check the initial state and report that.
ctnr, err := r.adapter.inspect(ctx)
if err != nil {
return errors.Wrap(err, "inspecting container failed")
}

shutdownWithExitStatus := func(reason string) error {
exitStatus, err := r.adapter.shutdown(ctx)
if err != nil {
return err
}
log.G(ctx).Errorf("EXIT STATUS %v", exitStatus)
return makeExitError(exitStatus, reason)
}
switch ctnr.Status {
case task.StatusStopped:
return shutdownWithExitStatus("")
}

// We do not disable FailFast for this initial call (like we
// do on the retry below) since we are still halfway through
// setting up the container and if containerd goes away half
// way through we consider that a failure.
eventq, closed, err := r.adapter.events(ctx)
if err != nil {
return err
}

for {
select {
case event := <-eventq:
log.G(ctx).Debugf("Event: %v", event)

switch event.Type {
case task.Event_EXIT:
return shutdownWithExitStatus("")
case task.Event_OOM:
return shutdownWithExitStatus("Container OOMd")
case task.Event_CREATE, task.Event_START, task.Event_EXEC_ADDED, task.Event_PAUSED:
continue
default:
return errors.Errorf("Unknown event type %s\n", event.Type.String())
}
case <-closed:
// restart!
log.G(ctx).Debugf("Restarting event stream")
// We disable FailFast for this call so that gRPC will keep
// retrying while we wait for containerd to come back. Otherwise
// a temporary glitch in the connection (e.g. a containerd restart)
// will result in the task being declared dead even though it is
// likely to be recoverable.
eventq, closed, err = r.adapter.events(ctx, grpc.FailFast(false))
if err != nil {
return err
}

// recheck the container state, if this fails then we may have missed a
ctnr, err := r.adapter.inspect(ctx)
if err != nil {
return errors.Wrap(err, "inspecting container on event restart failed")
}
switch ctnr.Status {
case task.StatusStopped:
return shutdownWithExitStatus("container had exited after event stream restart")
}

case <-ctx.Done():
return ctx.Err()
case <-r.closed:
return r.err
}
}
// TODO(ijc) HealthCheck
// TODO(ijc) Underlying wait is racy
// TODO(ijc) Underlying wait does not handle restart
return r.adapter.wait(ctx)
}

// Shutdown the container cleanly.
Expand All @@ -246,7 +200,7 @@ func (r *controller) Shutdown(ctx context.Context) error {
r.cancelPull()
}

if _, err := r.adapter.shutdown(ctx); err != nil {
if err := r.adapter.shutdown(ctx); err != nil {
if isUnknownContainer(err) {
return nil
}
Expand Down
45 changes: 10 additions & 35 deletions agent/exec/containerd/executor.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package containerd

import (
"net"
"os"
"path/filepath"
"runtime"
"time"

"github.com/containerd/containerd"
"github.com/docker/docker/pkg/sysinfo"
"github.com/docker/docker/pkg/system"
"github.com/docker/swarmkit/agent/exec"
Expand All @@ -15,53 +13,30 @@ import (
"github.com/docker/swarmkit/log"
"github.com/pkg/errors"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
)

type executor struct {
conn *grpc.ClientConn
client *containerd.Client
secrets exec.SecretsManager
genericResources []*api.GenericResource
containerDir string
}

var _ exec.Executor = &executor{}

func getGRPCConnection(ctx context.Context, sock string) (*grpc.ClientConn, error) {
grpclog.SetLogger(log.G(ctx))

dialOpts := []grpc.DialOption{grpc.WithInsecure(), grpc.WithTimeout(100 * time.Second)}
dialOpts = append(dialOpts,
grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("unix", sock, timeout)
},
))

conn, err := grpc.Dial("unix://"+sock, dialOpts...)
if err != nil {
return nil, errors.Wrapf(err, "failed to dial %q", sock)
}

return conn, nil
}

// NewExecutor returns an executor using the given containerd control socket
func NewExecutor(sock, stateDir string, genericResources []*api.GenericResource) (exec.Executor, error) {
ctx := log.WithModule(context.Background(), "containerd")

conn, err := getGRPCConnection(ctx, sock)
func NewExecutor(sock, namespace string, genericResources []*api.GenericResource) (exec.Executor, error) {
if namespace == "" {
return nil, errors.New("A containerd namespace is required")
}
client, err := containerd.New(sock, containerd.WithDefaultNamespace(namespace))
if err != nil {
return nil, err
return nil, errors.Wrap(err, "creating containerd client")
}

containerDir := filepath.Join(stateDir, "containers")

return &executor{
conn: conn,
client: client,
secrets: secrets.NewManager(),
genericResources: genericResources,
containerDir: containerDir,
}, nil
}

Expand Down Expand Up @@ -104,7 +79,7 @@ func (e *executor) Configure(ctx context.Context, node *api.Node) error {

// Controller returns a docker container controller.
func (e *executor) Controller(t *api.Task) (exec.Controller, error) {
ctlr, err := newController(e.conn, e.containerDir, t, secrets.Restrict(e.secrets, t))
ctlr, err := newController(e.client, t, secrets.Restrict(e.secrets, t))
if err != nil {
return nil, err
}
Expand Down
9 changes: 7 additions & 2 deletions cmd/swarmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ var (
if err != nil {
return err
}
containerdNamespace, err := cmd.Flags().GetString("containerd-namespace")
if err != nil {
return err
}

autolockManagers, err := cmd.Flags().GetBool("autolock")
if err != nil {
Expand Down Expand Up @@ -181,8 +185,8 @@ var (
var executor exec.Executor

if containerdAddr != "" {
logrus.Infof("Using containerd via %s", containerdAddr)
executor, err = containerd.NewExecutor(containerdAddr, stateDir, resources)
logrus.Infof("Using containerd via %q with namespace %q", containerdAddr, containerdNamespace)
executor, err = containerd.NewExecutor(containerdAddr, containerdNamespace, resources)
if err != nil {
return err
}
Expand Down Expand Up @@ -275,6 +279,7 @@ func init() {
mainCmd.Flags().StringP("join-token", "", "", "Specifies the secret token required to join the cluster")
mainCmd.Flags().String("engine-addr", "unix:///var/run/docker.sock", "Address of engine instance of agent.")
mainCmd.Flags().String("containerd-addr", "", "Address of containerd instance of agent.")
mainCmd.Flags().String("containerd-namespace", "swarmd", "Namespace to use when using containerd agent.")
mainCmd.Flags().String("hostname", "", "Override reported agent hostname")
mainCmd.Flags().String("advertise-remote-api", "", "Advertise address for remote API")
mainCmd.Flags().String("listen-remote-api", "0.0.0.0:4242", "Listen address for remote API")
Expand Down
8 changes: 3 additions & 5 deletions vendor.conf
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,12 @@ github.com/docker/libnetwork 37e20af882e13dd01ade3658b7aabdae3412118b
github.com/docker/libtrust 9cbd2a1374f46905c68a4eb3694a130610adc62a
github.com/opencontainers/runc b6b70e53451794e8333e9b602cc096b47a20bd0f
github.com/opencontainers/go-digest a6d0ee40d4207ea02364bd3b9e8e77b9159ba1eb
github.com/opencontainers/image-spec f03dbe35d449c54915d235f1a3cf8f585a24babe
github.com/opencontainers/image-spec 372ad780f63454fbbbbcc7cf80e5b90245c13e13

# containerd executor
github.com/containerd/containerd 7fc91b05917e93d474fab9465547d44eacd10ce3
github.com/containerd/continuity f4ad4294c92f596c9241947c416d1297f9faf3ea
github.com/containerd/containerd 76697ac8cbf357a19beb58e4805a81fe48cf7974
github.com/containerd/fifo 69b99525e472735860a5269b75af1970142b3062
github.com/opencontainers/runtime-spec v1.0.0-rc5
github.com/nightlyone/lockfile 1d49c987357a327b5b03aa84cbddd582c328615d
golang.org/x/sync 450f422ab23cf9881c94e2db30cac0eb1b7cf80c

github.com/davecgh/go-spew 5215b55f46b2b919f50a1df0eaa5886afe4e3b3d
Expand All @@ -52,7 +50,7 @@ github.com/hashicorp/golang-lru a0d98a5f288019575c6d1f4bb1573fef2d1fcdc4
github.com/inconshreveable/mousetrap 76626ae9c91c4f2a10f34cad8ce83ea42c93bb75
github.com/phayes/permbits f7e3ac5e859d0b919c5068d581cc4c5d4f4f9bc5
github.com/pivotal-golang/clock 3fd3c1944c59d9742e1cd333672181cd1a6f9fa0
github.com/pkg/errors 01fa4104b9c248c8945d14d9f128454d5b28d595
github.com/pkg/errors 645ef00459ed84a119197bfb8d8205042c6df63d
github.com/pmezard/go-difflib 792786c7400a136282c1664665ae0a8db921c6c2
github.com/rcrowley/go-metrics 51425a2415d21afadfd55cd93432c0bc69e9598d
github.com/spf13/cobra 8e91712f174ced10270cf66615e0a9127e7c4de5
Expand Down
Loading