From 63187afb5f28dd52d38b04927afeef9177ba9d46 Mon Sep 17 00:00:00 2001 From: Nalin Dahyabhai Date: Fri, 28 Oct 2016 15:57:39 -0400 Subject: [PATCH] Attempt to mimic changes from upstream PR#27467 Problem: if a process that's been started for 'docker exec' exits fast enough, the daemon can receive a 'process exited' update from containerd before it starts passing stdio data back and forth, losing its output. Why it happens: the 'process exited' state change message is only processed after acquiring a lock for the container, and while most of the exec() setup is done while holding that lock, the lock is freed when libcontainerd calls back to set up the passing of stdio data. Proposed change: call getExecConfig() before AddProcess(), and modify AddProcess() to take a callback that the daemon can hand it that remembers the exec.Config value that was retrieved. The effect should be the same as that of the changes in https://github.com/docker/docker/pull/27467, but without as much refactoring. Signed-off-by: Nalin Dahyabhai --- daemon/exec.go | 2 +- daemon/monitor.go | 49 ++++++++++++++++++++++++++++-- libcontainerd/client_linux.go | 10 ++---- libcontainerd/client_solaris.go | 6 +++- libcontainerd/client_windows.go | 4 +-- libcontainerd/container_linux.go | 2 +- libcontainerd/container_windows.go | 2 +- libcontainerd/types.go | 6 ++-- 8 files changed, 64 insertions(+), 17 deletions(-) diff --git a/daemon/exec.go b/daemon/exec.go index d57b6875d8b8a..ce1ea6eb91873 100644 --- a/daemon/exec.go +++ b/daemon/exec.go @@ -204,7 +204,7 @@ func (d *Daemon) ContainerExecStart(ctx context.Context, name string, stdin io.R attachErr := container.AttachStreams(ctx, ec.StreamConfig, ec.OpenStdin, true, ec.Tty, cStdin, cStdout, cStderr, ec.DetachKeys) - if err := d.containerd.AddProcess(ctx, c.ID, name, p); err != nil { + if err := d.containerd.AddProcess(ctx, c.ID, name, p, d.AttachExecStreams(ec)); err != nil { return err } diff --git a/daemon/monitor.go b/daemon/monitor.go index 1f97efb4726f2..2da5da57f240a 100644 --- a/daemon/monitor.go +++ b/daemon/monitor.go @@ -8,6 +8,7 @@ import ( "strconv" "github.com/Sirupsen/logrus" + "github.com/docker/docker/daemon/exec" "github.com/docker/docker/libcontainerd" "github.com/docker/docker/runconfig" ) @@ -101,10 +102,18 @@ func (daemon *Daemon) StateChanged(id string, e libcontainerd.StateInfo) error { return nil } -// AttachStreams is called by libcontainerd to connect the stdio. -func (daemon *Daemon) AttachStreams(id string, iop libcontainerd.IOPipe) error { +// AttachStreams is called by libcontainerd to connect the stdio to a container. +func (daemon *Daemon) AttachContainerStreams(id string, iop libcontainerd.IOPipe) error { var s *runconfig.StreamConfig c := daemon.containers.Get(id) + + // Ensure we're only called for the container's main process. There's + // logic to be simplified in this function after this, but we're trying + // to keep the patch small. + if c == nil { + return fmt.Errorf("no such container: %s", id) + } + if c == nil { ec, err := daemon.getExecConfig(id) if err != nil { @@ -154,3 +163,39 @@ func (daemon *Daemon) AttachStreams(id string, iop libcontainerd.IOPipe) error { return nil } + +// AttachExecStreams is used as a callback for libcontainerd to connect the +// stdio streams of an exec process to a dockerd client, using the StreamConfig +// in the exec.Config rather than the container's main PID's stdio. +func (daemon *Daemon) AttachExecStreams(config *exec.Config) func(id string, iop libcontainerd.IOPipe) error { + return func(id string, iop libcontainerd.IOPipe) error { + s := config.StreamConfig + copyFunc := func(w io.Writer, r io.Reader) { + s.Add(1) + go func() { + if _, err := io.Copy(w, r); err != nil { + logrus.Errorf("%v stream copy error: %v", id, err) + } + s.Done() + }() + } + + if iop.Stdout != nil { + copyFunc(s.Stdout(), iop.Stdout) + } + if iop.Stderr != nil { + copyFunc(s.Stderr(), iop.Stderr) + } + + if stdin := s.Stdin(); stdin != nil { + if iop.Stdin != nil { + go func() { + io.Copy(iop.Stdin, stdin) + iop.Stdin.Close() + }() + } + } + + return nil + } +} diff --git a/libcontainerd/client_linux.go b/libcontainerd/client_linux.go index ef504cad5939e..e4108ab9cc808 100644 --- a/libcontainerd/client_linux.go +++ b/libcontainerd/client_linux.go @@ -30,7 +30,7 @@ type client struct { liveRestore bool } -func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, specp Process) error { +func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, specp Process, attachStdio ProcessStreamAttacher) error { clnt.lock(containerID) defer clnt.unlock(containerID) container, err := clnt.getContainer(containerID) @@ -98,13 +98,9 @@ func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendly container.processes[processFriendlyName] = p - clnt.unlock(containerID) - - if err := clnt.backend.AttachStreams(processFriendlyName, *iopipe); err != nil { - clnt.lock(containerID) + if err := attachStdio(processFriendlyName, *iopipe); err != nil { return err } - clnt.lock(containerID) return nil } @@ -422,7 +418,7 @@ func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Ev return err } - if err := clnt.backend.AttachStreams(containerID, *iopipe); err != nil { + if err := clnt.backend.AttachContainerStreams(containerID, *iopipe); err != nil { return err } diff --git a/libcontainerd/client_solaris.go b/libcontainerd/client_solaris.go index 1c14d301b5925..b755f0d72022b 100644 --- a/libcontainerd/client_solaris.go +++ b/libcontainerd/client_solaris.go @@ -8,7 +8,7 @@ type client struct { // Platform specific properties below here. } -func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, specp Process) error { +func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, specp Process, attachStdio ProcessStreamAttacher) error { return nil } @@ -32,6 +32,10 @@ func (clnt *client) Resume(containerID string) error { return nil } +func (clnt *client) SignalProcess(containerID string, processFriendlyName string, sig int) error { + return nil +} + func (clnt *client) Stats(containerID string) (*Stats, error) { return nil, nil } diff --git a/libcontainerd/client_windows.go b/libcontainerd/client_windows.go index 431574a4d3f83..ea3d00c5f694d 100644 --- a/libcontainerd/client_windows.go +++ b/libcontainerd/client_windows.go @@ -171,7 +171,7 @@ func (clnt *client) Create(containerID string, spec Spec, options ...CreateOptio // AddProcess is the handler for adding a process to an already running // container. It's called through docker exec. -func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, procToAdd Process) error { +func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, procToAdd Process, attachStdio ProcessStreamAttacher) error { clnt.lock(containerID) defer clnt.unlock(containerID) container, err := clnt.getContainer(containerID) @@ -254,7 +254,7 @@ func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendly clnt.unlock(containerID) // Tell the engine to attach streams back to the client - if err := clnt.backend.AttachStreams(processFriendlyName, *iopipe); err != nil { + if err := attachStdio(processFriendlyName, *iopipe); err != nil { clnt.lock(containerID) return err } diff --git a/libcontainerd/container_linux.go b/libcontainerd/container_linux.go index 454478b5c2a03..1ac68981407ab 100644 --- a/libcontainerd/container_linux.go +++ b/libcontainerd/container_linux.go @@ -116,7 +116,7 @@ func (ctr *container) start() error { } ctr.startedAt = time.Now() - if err := ctr.client.backend.AttachStreams(ctr.containerID, *iopipe); err != nil { + if err := ctr.client.backend.AttachContainerStreams(ctr.containerID, *iopipe); err != nil { return err } ctr.systemPid = systemPid(resp.Container) diff --git a/libcontainerd/container_windows.go b/libcontainerd/container_windows.go index 31c2227df0ae8..7944be41e85a7 100644 --- a/libcontainerd/container_windows.go +++ b/libcontainerd/container_windows.go @@ -142,7 +142,7 @@ func (ctr *container) start() error { ctr.client.appendContainer(ctr) - if err := ctr.client.backend.AttachStreams(ctr.containerID, *iopipe); err != nil { + if err := ctr.client.backend.AttachContainerStreams(ctr.containerID, *iopipe); err != nil { // OK to return the error here, as waitExit will handle tear-down in HCS return err } diff --git a/libcontainerd/types.go b/libcontainerd/types.go index 6f452c1c3b5c0..66060ddd2f3e0 100644 --- a/libcontainerd/types.go +++ b/libcontainerd/types.go @@ -31,15 +31,17 @@ type CommonStateInfo struct { // FIXME: event? // Backend defines callbacks that the client of the library needs to implement. type Backend interface { StateChanged(containerID string, state StateInfo) error - AttachStreams(processFriendlyName string, io IOPipe) error + AttachContainerStreams(processFriendlyName string, io IOPipe) error } +type ProcessStreamAttacher func(processFriendlyName string, io IOPipe) error + // Client provides access to containerd features. type Client interface { Create(containerID string, spec Spec, options ...CreateOption) error Signal(containerID string, sig int) error SignalProcess(containerID string, processFriendlyName string, sig int) error - AddProcess(ctx context.Context, containerID, processFriendlyName string, process Process) error + AddProcess(ctx context.Context, containerID, processFriendlyName string, process Process, attachStdio ProcessStreamAttacher) error Resize(containerID, processFriendlyName string, width, height int) error Pause(containerID string) error Resume(containerID string) error