From 4b456f17f4cf708a5b84bf8f3d594165a4ccaaf8 Mon Sep 17 00:00:00 2001 From: Cory Bennett Date: Thu, 30 Jul 2020 23:44:42 +0000 Subject: [PATCH 1/5] wrap errors from executor Run/Exec to allow access to exit code Signed-off-by: Cory Bennett --- executor/containerdexecutor/executor.go | 111 ++++++++++++++---------- executor/executor.go | 30 +++++++ executor/runcexecutor/executor.go | 26 ++++-- 3 files changed, 118 insertions(+), 49 deletions(-) diff --git a/executor/containerdexecutor/executor.go b/executor/containerdexecutor/executor.go index 83de3b143bff..214bec6213c5 100644 --- a/executor/containerdexecutor/executor.go +++ b/executor/containerdexecutor/executor.go @@ -172,6 +172,7 @@ func (w *containerdExecutor) Run(ctx context.Context, id string, root cache.Moun return err } defer cleanup() + spec.Process.Terminal = meta.Tty container, err := w.client.NewContainer(ctx, id, containerd.WithSpec(spec), @@ -195,59 +196,24 @@ func (w *containerdExecutor) Run(ctx context.Context, id string, root cache.Moun if err != nil { return err } + defer func() { if _, err1 := task.Delete(context.TODO()); err == nil && err1 != nil { err = errors.Wrapf(err1, "failed to delete task %s", id) } }() - if err := task.Start(ctx); err != nil { - return err - } - - if started != nil { + err = w.runProcess(ctx, task, process.Resize, func() { startedOnce.Do(func() { - close(started) - }) - } - statusCh, err := task.Wait(context.Background()) - if err != nil { - return err - } - - var cancel func() - ctxDone := ctx.Done() - for { - select { - case <-ctxDone: - ctxDone = nil - var killCtx context.Context - killCtx, cancel = context.WithTimeout(context.Background(), 10*time.Second) - task.Kill(killCtx, syscall.SIGKILL) - case status := <-statusCh: - if cancel != nil { - cancel() - } - if status.ExitCode() != 0 { - var err error - if status.ExitCode() == containerd.UnknownExitStatus && status.Error() != nil { - err = errors.Wrap(status.Error(), "failure waiting for process") - } else { - err = errors.Errorf("process returned non-zero exit code: %d", status.ExitCode()) - } - select { - case <-ctx.Done(): - err = errors.Wrap(ctx.Err(), err.Error()) - default: - } - return err + if started != nil { + close(started) } - return nil - } - } + }) + }) + return err } -func (w *containerdExecutor) Exec(ctx context.Context, id string, process executor.ProcessInfo) error { +func (w *containerdExecutor) Exec(ctx context.Context, id string, process executor.ProcessInfo) (err error) { meta := process.Meta // first verify the container is running, if we get an error assume the container @@ -329,5 +295,62 @@ func (w *containerdExecutor) Exec(ctx context.Context, id string, process execut if err != nil { return errors.WithStack(err) } - return taskProcess.Start(ctx) + + return w.runProcess(ctx, taskProcess, process.Resize, nil) +} + +func (w *containerdExecutor) runProcess(ctx context.Context, p containerd.Process, resize <-chan executor.WinSize, started func()) (err error) { + err = p.Start(ctx) + if err != nil { + return err + } + + if started != nil { + started() + } + + statusCh, err := p.Wait(context.Background()) + if err != nil { + return err + } + + var cancel func() + ctxDone := ctx.Done() + for { + select { + case <-ctxDone: + ctxDone = nil + var killCtx context.Context + killCtx, cancel = context.WithTimeout(context.Background(), 10*time.Second) + p.Kill(killCtx, syscall.SIGKILL) + case size := <-resize: + err := p.Resize(ctx, size.Cols, size.Rows) + if err != nil { + cancel() + return err + } + case status := <-statusCh: + if cancel != nil { + cancel() + } + if status.ExitCode() != 0 { + var err error + if status.ExitCode() == containerd.UnknownExitStatus && status.Error() != nil { + err = errors.Wrap(status.Error(), "failure waiting for process") + } else { + err = &executor.ExitError{ + ExitCode: status.ExitCode(), + Err: status.Error(), + } + } + select { + case <-ctx.Done(): + err = errors.Wrap(ctx.Err(), err.Error()) + default: + } + return err + } + return nil + } + } } diff --git a/executor/executor.go b/executor/executor.go index 5ab425253a1c..54435bca89d6 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -2,6 +2,7 @@ package executor import ( "context" + "fmt" "io" "net" @@ -28,10 +29,18 @@ type Mount struct { Readonly bool } +type WinSize struct { + Rows uint32 + Cols uint32 + Xpixel uint32 + Ypixel uint32 +} + type ProcessInfo struct { Meta Meta Stdin io.ReadCloser Stdout, Stderr io.WriteCloser + Resize <-chan WinSize } type Executor interface { @@ -48,3 +57,24 @@ type HostIP struct { Host string IP net.IP } + +// ExitError will be returned from Run and Exec when the container process exits with +// a non-zero exit code. +type ExitError struct { + ExitCode uint32 + Err error +} + +func (err *ExitError) Error() string { + if err.Err != nil { + return err.Err.Error() + } + return fmt.Sprintf("exit code: %d", err.ExitCode) +} + +func (err *ExitError) Unwrap() error { + if err.Err == nil { + return fmt.Errorf("exit code: %d", err.ExitCode) + } + return err.Err +} diff --git a/executor/runcexecutor/executor.go b/executor/runcexecutor/executor.go index 5a27732bb946..c0d2081956dd 100644 --- a/executor/runcexecutor/executor.go +++ b/executor/runcexecutor/executor.go @@ -271,7 +271,10 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, } } - spec.Process.Terminal = meta.Tty + if meta.Tty { + return errors.New("tty with runc not implemented") + } + spec.Process.OOMScoreAdj = w.oomScoreAdj if w.rootless { if err := rootlessspecconv.ToRootless(spec); err != nil { @@ -329,8 +332,9 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, close(ended) if status != 0 || err != nil { - if err == nil { - err = errors.Errorf("exit code: %d", status) + err = &executor.ExitError{ + ExitCode: uint32(status), + Err: err, } select { case <-ctx.Done(): @@ -343,7 +347,7 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, return nil } -func (w *runcExecutor) Exec(ctx context.Context, id string, process executor.ProcessInfo) error { +func (w *runcExecutor) Exec(ctx context.Context, id string, process executor.ProcessInfo) (err error) { // first verify the container is running, if we get an error assume the container // is in the process of being created and check again every 100ms or until // context is canceled. @@ -406,9 +410,21 @@ func (w *runcExecutor) Exec(ctx context.Context, id string, process executor.Pro spec.Process.Env = process.Meta.Env } - return w.runc.Exec(ctx, id, *spec.Process, &runc.ExecOpts{ + err = w.runc.Exec(ctx, id, *spec.Process, &runc.ExecOpts{ IO: &forwardIO{stdin: process.Stdin, stdout: process.Stdout, stderr: process.Stderr}, }) + + var exitError *exec.ExitError + if errors.As(err, &exitError) { + err = &executor.ExitError{ + ExitCode: uint32(exitError.ExitCode()), + Err: err, + } + return err + } else if err != nil { + return err + } + return nil } type forwardIO struct { From 93344a9d24743107b4d0f9daff3d9d77795e78ec Mon Sep 17 00:00:00 2001 From: Cory Bennett Date: Fri, 31 Jul 2020 20:20:59 +0000 Subject: [PATCH 2/5] remove *pixel from winsize struct, tweak ExitError handling for ctx.Err Signed-off-by: Cory Bennett --- executor/containerdexecutor/executor.go | 20 +++++++++----------- executor/executor.go | 6 ++---- executor/runcexecutor/executor.go | 6 ++++-- 3 files changed, 15 insertions(+), 17 deletions(-) diff --git a/executor/containerdexecutor/executor.go b/executor/containerdexecutor/executor.go index 214bec6213c5..5f421da260b3 100644 --- a/executor/containerdexecutor/executor.go +++ b/executor/containerdexecutor/executor.go @@ -299,8 +299,8 @@ func (w *containerdExecutor) Exec(ctx context.Context, id string, process execut return w.runProcess(ctx, taskProcess, process.Resize, nil) } -func (w *containerdExecutor) runProcess(ctx context.Context, p containerd.Process, resize <-chan executor.WinSize, started func()) (err error) { - err = p.Start(ctx) +func (w *containerdExecutor) runProcess(ctx context.Context, p containerd.Process, resize <-chan executor.WinSize, started func()) error { + err := p.Start(ctx) if err != nil { return err } @@ -334,21 +334,19 @@ func (w *containerdExecutor) runProcess(ctx context.Context, p containerd.Proces cancel() } if status.ExitCode() != 0 { - var err error + exitErr := &executor.ExitError{ + ExitCode: status.ExitCode(), + Err: status.Error(), + } if status.ExitCode() == containerd.UnknownExitStatus && status.Error() != nil { - err = errors.Wrap(status.Error(), "failure waiting for process") - } else { - err = &executor.ExitError{ - ExitCode: status.ExitCode(), - Err: status.Error(), - } + exitErr.Err = errors.Wrap(status.Error(), "failure waiting for process") } select { case <-ctx.Done(): - err = errors.Wrap(ctx.Err(), err.Error()) + exitErr.Err = errors.Wrap(ctx.Err(), exitErr.Error()) default: } - return err + return exitErr } return nil } diff --git a/executor/executor.go b/executor/executor.go index 54435bca89d6..b0980f368905 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -30,10 +30,8 @@ type Mount struct { } type WinSize struct { - Rows uint32 - Cols uint32 - Xpixel uint32 - Ypixel uint32 + Rows uint32 + Cols uint32 } type ProcessInfo struct { diff --git a/executor/runcexecutor/executor.go b/executor/runcexecutor/executor.go index c0d2081956dd..dbe3964bee48 100644 --- a/executor/runcexecutor/executor.go +++ b/executor/runcexecutor/executor.go @@ -332,13 +332,15 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, close(ended) if status != 0 || err != nil { - err = &executor.ExitError{ + exitErr := &executor.ExitError{ ExitCode: uint32(status), Err: err, } + err = exitErr select { case <-ctx.Done(): - return errors.Wrapf(ctx.Err(), err.Error()) + exitErr.Err = errors.Wrapf(ctx.Err(), exitErr.Error()) + return exitErr default: return stack.Enable(err) } From f781f83a89fc93a73554973fef96e16f5828ec1a Mon Sep 17 00:00:00 2001 From: Cory Bennett Date: Sat, 1 Aug 2020 06:09:14 +0000 Subject: [PATCH 3/5] fix containerd executor Run/Exec to close container input on eof from stdin Signed-off-by: Cory Bennett --- executor/containerdexecutor/executor.go | 15 ++++++---- worker/tests/common.go | 40 ++++++++++++++++++++++--- 2 files changed, 45 insertions(+), 10 deletions(-) diff --git a/executor/containerdexecutor/executor.go b/executor/containerdexecutor/executor.go index 5f421da260b3..2e06c984ad93 100644 --- a/executor/containerdexecutor/executor.go +++ b/executor/containerdexecutor/executor.go @@ -296,11 +296,17 @@ func (w *containerdExecutor) Exec(ctx context.Context, id string, process execut return errors.WithStack(err) } - return w.runProcess(ctx, taskProcess, process.Resize, nil) + err = w.runProcess(ctx, taskProcess, process.Resize, nil) + return err } func (w *containerdExecutor) runProcess(ctx context.Context, p containerd.Process, resize <-chan executor.WinSize, started func()) error { - err := p.Start(ctx) + statusCh, err := p.Wait(context.Background()) + if err != nil { + return err + } + + err = p.Start(ctx) if err != nil { return err } @@ -309,10 +315,7 @@ func (w *containerdExecutor) runProcess(ctx context.Context, p containerd.Proces started() } - statusCh, err := p.Wait(context.Background()) - if err != nil { - return err - } + p.CloseIO(ctx, containerd.WithStdinCloser) var cancel func() ctxDone := ctx.Done() diff --git a/worker/tests/common.go b/worker/tests/common.go index 82c9e1d94d8a..18b13408490d 100644 --- a/worker/tests/common.go +++ b/worker/tests/common.go @@ -46,9 +46,41 @@ func TestWorkerExec(t *testing.T, w *base.Worker) { id := identity.NewID() + // verify pid1 exits when stdin sees EOF + ctxTimeout, cancelTimeout := context.WithTimeout(ctx, 5*time.Second) + started := make(chan struct{}) + pipeR, pipeW := io.Pipe() + go func() { + select { + case <-ctxTimeout.Done(): + t.Error("Unexpected timeout waiting for pid1 to start") + case <-started: + pipeW.Write([]byte("hello")) + pipeW.Close() + } + }() + stdout := bytes.NewBuffer(nil) + stderr := bytes.NewBuffer(nil) + err = w.WorkerOpt.Executor.Run(ctxTimeout, id, root, nil, executor.ProcessInfo{ + Meta: executor.Meta{ + Args: []string{"cat"}, + Cwd: "/", + Env: []string{"PATH=/bin:/usr/bin:/sbin:/usr/sbin"}, + }, + Stdin: pipeR, + Stdout: &nopCloser{stdout}, + Stderr: &nopCloser{stderr}, + }, started) + cancelTimeout() + t.Logf("Stdout: %s", stdout.String()) + t.Logf("Stderr: %s", stderr.String()) + require.NoError(t, err) + require.Equal(t, "hello", stdout.String()) + require.Empty(t, stderr.String()) + // first start pid1 in the background eg := errgroup.Group{} - started := make(chan struct{}) + started = make(chan struct{}) eg.Go(func() error { return w.WorkerOpt.Executor.Run(ctx, id, root, nil, executor.ProcessInfo{ Meta: executor.Meta{ @@ -65,8 +97,8 @@ func TestWorkerExec(t *testing.T, w *base.Worker) { t.Error("Unexpected timeout waiting for pid1 to start") } - stdout := bytes.NewBuffer(nil) - stderr := bytes.NewBuffer(nil) + stdout.Reset() + stderr.Reset() // verify pid1 is the sleep command via Exec err = w.WorkerOpt.Executor.Exec(ctx, id, executor.ProcessInfo{ @@ -120,7 +152,7 @@ func TestWorkerExec(t *testing.T, w *base.Worker) { err = eg.Wait() // we expect pid1 to get canceled after we test the exec - require.EqualError(t, errors.Cause(err), context.Canceled.Error()) + require.True(t, errors.Is(err, context.Canceled)) err = snap.Release(ctx) require.NoError(t, err) From 86e246a874f66697aefe48c58e4a9d39d6e0af22 Mon Sep 17 00:00:00 2001 From: Cory Bennett Date: Sat, 1 Aug 2020 22:39:26 +0000 Subject: [PATCH 4/5] only warn on resize errors prevent resize from blocking exit fix edgecase where kill signal never reaches process Signed-off-by: Cory Bennett --- executor/containerdexecutor/executor.go | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/executor/containerdexecutor/executor.go b/executor/containerdexecutor/executor.go index 2e06c984ad93..3d5d4b3eaa4f 100644 --- a/executor/containerdexecutor/executor.go +++ b/executor/containerdexecutor/executor.go @@ -2,6 +2,7 @@ package containerdexecutor import ( "context" + "fmt" "os" "path/filepath" "strings" @@ -318,6 +319,7 @@ func (w *containerdExecutor) runProcess(ctx context.Context, p containerd.Proces p.CloseIO(ctx, containerd.WithStdinCloser) var cancel func() + var killCtxDone <-chan struct{} ctxDone := ctx.Done() for { select { @@ -325,13 +327,8 @@ func (w *containerdExecutor) runProcess(ctx context.Context, p containerd.Proces ctxDone = nil var killCtx context.Context killCtx, cancel = context.WithTimeout(context.Background(), 10*time.Second) + killCtxDone = killCtx.Done() p.Kill(killCtx, syscall.SIGKILL) - case size := <-resize: - err := p.Resize(ctx, size.Cols, size.Rows) - if err != nil { - cancel() - return err - } case status := <-statusCh: if cancel != nil { cancel() @@ -352,6 +349,20 @@ func (w *containerdExecutor) runProcess(ctx context.Context, p containerd.Proces return exitErr } return nil + case <-killCtxDone: + if cancel != nil { + cancel() + } + return fmt.Errorf("failed to kill process on cancel") + case size := <-resize: + ctxTimeout, cancelTimeout := context.WithTimeout(ctx, time.Second) + go func() { + defer cancelTimeout() + err = p.Resize(ctxTimeout, size.Cols, size.Rows) + if err != nil { + logrus.Warnf("Failed to resize %s: %s", p.ID(), err) + } + }() } } } From 19c0077b49ca3061314194c7d4e0b59fc184ff37 Mon Sep 17 00:00:00 2001 From: Cory Bennett Date: Mon, 3 Aug 2020 00:27:28 +0000 Subject: [PATCH 5/5] update container resize events in sequence, also move it out of exit/cancel loop to prevent blocking. Signed-off-by: Cory Bennett --- executor/containerdexecutor/executor.go | 35 +++++++++++++++++-------- 1 file changed, 24 insertions(+), 11 deletions(-) diff --git a/executor/containerdexecutor/executor.go b/executor/containerdexecutor/executor.go index 3d5d4b3eaa4f..9e8dd1a71f2d 100644 --- a/executor/containerdexecutor/executor.go +++ b/executor/containerdexecutor/executor.go @@ -2,7 +2,6 @@ package containerdexecutor import ( "context" - "fmt" "os" "path/filepath" "strings" @@ -302,6 +301,8 @@ func (w *containerdExecutor) Exec(ctx context.Context, id string, process execut } func (w *containerdExecutor) runProcess(ctx context.Context, p containerd.Process, resize <-chan executor.WinSize, started func()) error { + // Not using `ctx` here because the context passed only affects the statusCh which we + // don't want cancelled when ctx.Done is sent. We want to process statusCh on cancel. statusCh, err := p.Wait(context.Background()) if err != nil { return err @@ -318,6 +319,27 @@ func (w *containerdExecutor) runProcess(ctx context.Context, p containerd.Proces p.CloseIO(ctx, containerd.WithStdinCloser) + // resize in separate go loop so it does not potentially block + // the container cancel/exit status loop below. + resizeCtx, resizeCancel := context.WithCancel(ctx) + defer resizeCancel() + go func() { + for { + select { + case <-resizeCtx.Done(): + return + case size, ok := <-resize: + if !ok { + return // chan closed + } + err = p.Resize(resizeCtx, size.Cols, size.Rows) + if err != nil { + logrus.Warnf("Failed to resize %s: %s", p.ID(), err) + } + } + } + }() + var cancel func() var killCtxDone <-chan struct{} ctxDone := ctx.Done() @@ -353,16 +375,7 @@ func (w *containerdExecutor) runProcess(ctx context.Context, p containerd.Proces if cancel != nil { cancel() } - return fmt.Errorf("failed to kill process on cancel") - case size := <-resize: - ctxTimeout, cancelTimeout := context.WithTimeout(ctx, time.Second) - go func() { - defer cancelTimeout() - err = p.Resize(ctxTimeout, size.Cols, size.Rows) - if err != nil { - logrus.Warnf("Failed to resize %s: %s", p.ID(), err) - } - }() + return errors.Errorf("failed to kill process on cancel") } } }