From ce4f347898357e7dcf025409716a44789b1a7f5a Mon Sep 17 00:00:00 2001 From: Kevin Parsons Date: Tue, 11 May 2021 10:19:02 -0700 Subject: [PATCH] internal/cmd: Close individual IO pipes when the relay finishes The shim is expected to close its end of the IO pipes from the gcs when it is done using them. This is done to ensure that no data is left buffered in the pipes on the gcs's end. Previously, this was accomplished via the ioChannel closing its underlying connection if Read returned EOF. However, this is not sufficiently robust, as it will not work in cases where the shim's IO relay breaks on the write end (e.g. if CRI has gone away). To resolve this, we now expose individual methods on cow.Process to close each IO pipe (in/out/err), and call those from the Cmd implementation once the IO relay completes. This should be a good first-pass fix here, until we can apply some more focused cleanup to the IO relay code in the future. Some minor renaming/cleanup as well. Signed-off-by: Kevin Parsons --- internal/cmd/cmd.go | 23 ++++++++++----- internal/cmd/cmd_test.go | 8 +++++ internal/cow/cow.go | 6 ++++ internal/gcs/iochannel.go | 9 +----- internal/gcs/process.go | 31 +++++++++++++++---- internal/hcs/process.go | 49 +++++++++++++++++++++++++++++++ internal/jobcontainers/process.go | 14 +++++++++ 7 files changed, 118 insertions(+), 22 deletions(-) diff --git a/internal/cmd/cmd.go b/internal/cmd/cmd.go index eea00e8796..685e703c3d 100644 --- a/internal/cmd/cmd.go +++ b/internal/cmd/cmd.go @@ -101,6 +101,7 @@ func Command(host cow.ProcessHost, name string, arg ...string) *Cmd { Spec: &specs.Process{ Args: append([]string{name}, arg...), }, + Log: logrus.NewEntry(logrus.StandardLogger()), ExitState: &ExitState{}, } if host.OS() == "windows" { @@ -120,7 +121,8 @@ func CommandContext(ctx context.Context, host cow.ProcessHost, name string, arg return cmd } -func copyAndLog(w io.Writer, r io.Reader, log *logrus.Entry, name string) (int64, error) { +// relayIO is a glorified io.Copy that also logs when the copy has completed. +func relayIO(w io.Writer, r io.Reader, log *logrus.Entry, name string) (int64, error) { n, err := io.Copy(w, r) if log != nil { lvl := logrus.DebugLevel @@ -132,7 +134,7 @@ func copyAndLog(w io.Writer, r io.Reader, log *logrus.Entry, name string) (int64 lvl = logrus.ErrorLevel log = log.WithError(err) } - log.Log(lvl, "command copy complete") + log.Log(lvl, "Cmd IO relay complete") } return n, err } @@ -207,7 +209,7 @@ func (c *Cmd) Start() error { // us or the caller to reliably unblock the c.Stdin read when the // process exits. go func() { - _, err := copyAndLog(stdin, c.Stdin, c.Log, "stdin") + _, err := relayIO(stdin, c.Stdin, c.Log, "stdin") // Report the stdin copy error. If the process has exited, then the // caller may never see it, but if the error was due to a failure in // stdin read, then it is likely the process is still running. @@ -215,23 +217,28 @@ func (c *Cmd) Start() error { c.stdinErr.Store(err) } // Notify the process that there is no more input. - err = p.CloseStdin(context.TODO()) - if err != nil && c.Log != nil { - c.Log.WithError(err).Warn("failed to close pod stdin") + if err := p.CloseStdin(context.TODO()); err != nil && c.Log != nil { + c.Log.WithError(err).Warn("failed to close Cmd stdin") } }() } if c.Stdout != nil { c.iogrp.Go(func() error { - _, err := copyAndLog(c.Stdout, stdout, c.Log, "stdout") + _, err := relayIO(c.Stdout, stdout, c.Log, "stdout") + if err := p.CloseStdout(context.TODO()); err != nil { + c.Log.WithError(err).Warn("failed to close Cmd stdout") + } return err }) } if c.Stderr != nil { c.iogrp.Go(func() error { - _, err := copyAndLog(c.Stderr, stderr, c.Log, "stderr") + _, err := relayIO(c.Stderr, stderr, c.Log, "stderr") + if err := p.CloseStderr(context.TODO()); err != nil { + c.Log.WithError(err).Warn("failed to close Cmd stderr") + } return err }) } diff --git a/internal/cmd/cmd_test.go b/internal/cmd/cmd_test.go index e077ecf492..5a7b7b98c3 100644 --- a/internal/cmd/cmd_test.go +++ b/internal/cmd/cmd_test.go @@ -106,6 +106,14 @@ func (p *localProcess) CloseStdin(ctx context.Context) error { return p.stdin.Close() } +func (p *localProcess) CloseStdout(ctx context.Context) error { + return p.stdout.Close() +} + +func (p *localProcess) CloseStderr(ctx context.Context) error { + return p.stderr.Close() +} + func (p *localProcess) ExitCode() (int, error) { select { case <-p.ch: diff --git a/internal/cow/cow.go b/internal/cow/cow.go index 900d5fa8d7..27a62a7238 100644 --- a/internal/cow/cow.go +++ b/internal/cow/cow.go @@ -17,6 +17,12 @@ type Process interface { // CloseStdin causes the process's stdin handle to receive EOF/EPIPE/whatever // is appropriate to indicate that no more data is available. CloseStdin(ctx context.Context) error + // CloseStdout closes the stdout connection to the process. It is used to indicate + // that we are done receiving output on the shim side. + CloseStdout(ctx context.Context) error + // CloseStderr closes the stderr connection to the process. It is used to indicate + // that we are done receiving output on the shim side. + CloseStderr(ctx context.Context) error // Pid returns the process ID. Pid() int // Stdio returns the stdio streams for a process. These may be nil if a stream diff --git a/internal/gcs/iochannel.go b/internal/gcs/iochannel.go index 14a9077b2f..5af6b81aaf 100644 --- a/internal/gcs/iochannel.go +++ b/internal/gcs/iochannel.go @@ -1,7 +1,6 @@ package gcs import ( - "io" "net" ) @@ -56,13 +55,7 @@ func (c *ioChannel) Read(b []byte) (int, error) { if c.c == nil { return 0, c.err } - n, err := c.c.Read(b) - if err == io.EOF { - // Close the underlying connection so that the VM - // knows that all data has been read. - c.c.Close() - } - return n, err + return c.c.Read(b) } func (c *ioChannel) Write(b []byte) (int, error) { diff --git a/internal/gcs/process.go b/internal/gcs/process.go index f496522c87..628cb8b0d7 100644 --- a/internal/gcs/process.go +++ b/internal/gcs/process.go @@ -128,16 +128,13 @@ func (p *Process) Close() error { trace.StringAttribute("cid", p.cid), trace.Int64Attribute("pid", int64(p.id))) - err := p.stdin.Close() - if err != nil { + if err := p.stdin.Close(); err != nil { log.G(ctx).WithError(err).Warn("close stdin failed") } - err = p.stdout.Close() - if err != nil { + if err := p.stdout.Close(); err != nil { log.G(ctx).WithError(err).Warn("close stdout failed") } - err = p.stderr.Close() - if err != nil { + if err := p.stderr.Close(); err != nil { log.G(ctx).WithError(err).Warn("close stderr failed") } return nil @@ -158,6 +155,28 @@ func (p *Process) CloseStdin(ctx context.Context) (err error) { return p.stdinCloseWriteErr } +func (p *Process) CloseStdout(ctx context.Context) (err error) { + ctx, span := trace.StartSpan(ctx, "gcs::Process::CloseStdout") //nolint:ineffassign,staticcheck + defer span.End() + defer func() { oc.SetSpanStatus(span, err) }() + span.AddAttributes( + trace.StringAttribute("cid", p.cid), + trace.Int64Attribute("pid", int64(p.id))) + + return p.stdout.Close() +} + +func (p *Process) CloseStderr(ctx context.Context) (err error) { + ctx, span := trace.StartSpan(ctx, "gcs::Process::CloseStderr") //nolint:ineffassign,staticcheck + defer span.End() + defer func() { oc.SetSpanStatus(span, err) }() + span.AddAttributes( + trace.StringAttribute("cid", p.cid), + trace.Int64Attribute("pid", int64(p.id))) + + return p.stderr.Close() +} + // ExitCode returns the process's exit code, or an error if the process is still // running or the exit code is otherwise unknown. func (p *Process) ExitCode() (_ int, err error) { diff --git a/internal/hcs/process.go b/internal/hcs/process.go index 2707a1d214..bd34e37292 100644 --- a/internal/hcs/process.go +++ b/internal/hcs/process.go @@ -361,6 +361,55 @@ func (process *Process) CloseStdin(ctx context.Context) error { return nil } +func (process *Process) CloseStdout(ctx context.Context) (err error) { + ctx, span := trace.StartSpan(ctx, "hcs::Process::CloseStdout") //nolint:ineffassign,staticcheck + defer span.End() + defer func() { oc.SetSpanStatus(span, err) }() + span.AddAttributes( + trace.StringAttribute("cid", process.SystemID()), + trace.Int64Attribute("pid", int64(process.processID))) + + process.handleLock.Lock() + defer process.handleLock.Unlock() + + if process.handle == 0 { + return nil + } + + process.stdioLock.Lock() + defer process.stdioLock.Unlock() + if process.stdout != nil { + process.stdout.Close() + process.stdout = nil + } + return nil +} + +func (process *Process) CloseStderr(ctx context.Context) (err error) { + ctx, span := trace.StartSpan(ctx, "hcs::Process::CloseStderr") //nolint:ineffassign,staticcheck + defer span.End() + defer func() { oc.SetSpanStatus(span, err) }() + span.AddAttributes( + trace.StringAttribute("cid", process.SystemID()), + trace.Int64Attribute("pid", int64(process.processID))) + + process.handleLock.Lock() + defer process.handleLock.Unlock() + + if process.handle == 0 { + return nil + } + + process.stdioLock.Lock() + defer process.stdioLock.Unlock() + if process.stderr != nil { + process.stderr.Close() + process.stderr = nil + + } + return nil +} + // Close cleans up any state associated with the process but does not kill // or wait on it. func (process *Process) Close() (err error) { diff --git a/internal/jobcontainers/process.go b/internal/jobcontainers/process.go index 8d8e9e0f3b..2cccc8a1d3 100644 --- a/internal/jobcontainers/process.go +++ b/internal/jobcontainers/process.go @@ -95,6 +95,20 @@ func (p *JobProcess) CloseStdin(ctx context.Context) error { return p.stdin.Close() } +// CloseStdout closes the stdout pipe of the process. +func (p *JobProcess) CloseStdout(ctx context.Context) error { + p.stdioLock.Lock() + defer p.stdioLock.Unlock() + return p.stdout.Close() +} + +// CloseStderr closes the stderr pipe of the process. +func (p *JobProcess) CloseStderr(ctx context.Context) error { + p.stdioLock.Lock() + defer p.stdioLock.Unlock() + return p.stderr.Close() +} + // Wait waits for the process to exit. If the process has already exited returns // the previous error (if any). func (p *JobProcess) Wait() error {