diff --git a/executor/containerdexecutor/executor.go b/executor/containerdexecutor/executor.go index 83de3b143bff..9e8dd1a71f2d 100644 --- a/executor/containerdexecutor/executor.go +++ b/executor/containerdexecutor/executor.go @@ -172,6 +172,7 @@ func (w *containerdExecutor) Run(ctx context.Context, id string, root cache.Moun return err } defer cleanup() + spec.Process.Terminal = meta.Tty container, err := w.client.NewContainer(ctx, id, containerd.WithSpec(spec), @@ -195,59 +196,24 @@ func (w *containerdExecutor) Run(ctx context.Context, id string, root cache.Moun if err != nil { return err } + defer func() { if _, err1 := task.Delete(context.TODO()); err == nil && err1 != nil { err = errors.Wrapf(err1, "failed to delete task %s", id) } }() - if err := task.Start(ctx); err != nil { - return err - } - - if started != nil { + err = w.runProcess(ctx, task, process.Resize, func() { startedOnce.Do(func() { - close(started) - }) - } - statusCh, err := task.Wait(context.Background()) - if err != nil { - return err - } - - var cancel func() - ctxDone := ctx.Done() - for { - select { - case <-ctxDone: - ctxDone = nil - var killCtx context.Context - killCtx, cancel = context.WithTimeout(context.Background(), 10*time.Second) - task.Kill(killCtx, syscall.SIGKILL) - case status := <-statusCh: - if cancel != nil { - cancel() - } - if status.ExitCode() != 0 { - var err error - if status.ExitCode() == containerd.UnknownExitStatus && status.Error() != nil { - err = errors.Wrap(status.Error(), "failure waiting for process") - } else { - err = errors.Errorf("process returned non-zero exit code: %d", status.ExitCode()) - } - select { - case <-ctx.Done(): - err = errors.Wrap(ctx.Err(), err.Error()) - default: - } - return err + if started != nil { + close(started) } - return nil - } - } + }) + }) + return err } -func (w *containerdExecutor) Exec(ctx context.Context, id string, process executor.ProcessInfo) error { +func (w *containerdExecutor) Exec(ctx context.Context, id string, 
process executor.ProcessInfo) (err error) { meta := process.Meta // first verify the container is running, if we get an error assume the container @@ -329,5 +295,87 @@ func (w *containerdExecutor) Exec(ctx context.Context, id string, process execut if err != nil { return errors.WithStack(err) } - return taskProcess.Start(ctx) + + err = w.runProcess(ctx, taskProcess, process.Resize, nil) + return err +} + +func (w *containerdExecutor) runProcess(ctx context.Context, p containerd.Process, resize <-chan executor.WinSize, started func()) error { + // Not using `ctx` here because the context passed only affects the statusCh which we + // don't want cancelled when ctx.Done is sent. We want to process statusCh on cancel. + statusCh, err := p.Wait(context.Background()) + if err != nil { + return err + } + + err = p.Start(ctx) + if err != nil { + return err + } + + if started != nil { + started() + } + + p.CloseIO(ctx, containerd.WithStdinCloser) + + // resize in separate go loop so it does not potentially block + // the container cancel/exit status loop below. 
+ resizeCtx, resizeCancel := context.WithCancel(ctx) + defer resizeCancel() + go func() { + for { + select { + case <-resizeCtx.Done(): + return + case size, ok := <-resize: + if !ok { + return // chan closed + } + err = p.Resize(resizeCtx, size.Cols, size.Rows) + if err != nil { + logrus.Warnf("Failed to resize %s: %s", p.ID(), err) + } + } + } + }() + + var cancel func() + var killCtxDone <-chan struct{} + ctxDone := ctx.Done() + for { + select { + case <-ctxDone: + ctxDone = nil + var killCtx context.Context + killCtx, cancel = context.WithTimeout(context.Background(), 10*time.Second) + killCtxDone = killCtx.Done() + p.Kill(killCtx, syscall.SIGKILL) + case status := <-statusCh: + if cancel != nil { + cancel() + } + if status.ExitCode() != 0 { + exitErr := &executor.ExitError{ + ExitCode: status.ExitCode(), + Err: status.Error(), + } + if status.ExitCode() == containerd.UnknownExitStatus && status.Error() != nil { + exitErr.Err = errors.Wrap(status.Error(), "failure waiting for process") + } + select { + case <-ctx.Done(): + exitErr.Err = errors.Wrap(ctx.Err(), exitErr.Error()) + default: + } + return exitErr + } + return nil + case <-killCtxDone: + if cancel != nil { + cancel() + } + return errors.Errorf("failed to kill process on cancel") + } + } } diff --git a/executor/executor.go b/executor/executor.go index 5ab425253a1c..b0980f368905 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -2,6 +2,7 @@ package executor import ( "context" + "fmt" "io" "net" @@ -28,10 +29,16 @@ type Mount struct { Readonly bool } +type WinSize struct { + Rows uint32 + Cols uint32 +} + type ProcessInfo struct { Meta Meta Stdin io.ReadCloser Stdout, Stderr io.WriteCloser + Resize <-chan WinSize } type Executor interface { @@ -48,3 +55,24 @@ type HostIP struct { Host string IP net.IP } + +// ExitError will be returned from Run and Exec when the container process exits with +// a non-zero exit code. 
// ExitError is returned from Run and Exec when the container process exits
// with a non-zero exit code.
type ExitError struct {
	// ExitCode is the exit code reported for the process.
	ExitCode uint32
	// Err is the optional underlying error; it may be nil, in which case the
	// exit code alone describes the failure.
	Err error
}

// Error returns the message of the wrapped error when one is set, and
// "exit code: N" otherwise.
func (err *ExitError) Error() string {
	if err.Err != nil {
		return err.Err.Error()
	}
	return fmt.Sprintf("exit code: %d", err.ExitCode)
}

// Unwrap returns the wrapped error, or nil when there is none, so that
// errors.Is and errors.As stop at this error instead of descending into a
// freshly fabricated one.
func (err *ExitError) Unwrap() error {
	return err.Err
}
@@ -406,9 +412,21 @@ func (w *runcExecutor) Exec(ctx context.Context, id string, process executor.Pro spec.Process.Env = process.Meta.Env } - return w.runc.Exec(ctx, id, *spec.Process, &runc.ExecOpts{ + err = w.runc.Exec(ctx, id, *spec.Process, &runc.ExecOpts{ IO: &forwardIO{stdin: process.Stdin, stdout: process.Stdout, stderr: process.Stderr}, }) + + var exitError *exec.ExitError + if errors.As(err, &exitError) { + err = &executor.ExitError{ + ExitCode: uint32(exitError.ExitCode()), + Err: err, + } + return err + } else if err != nil { + return err + } + return nil } type forwardIO struct { diff --git a/worker/tests/common.go b/worker/tests/common.go index 82c9e1d94d8a..18b13408490d 100644 --- a/worker/tests/common.go +++ b/worker/tests/common.go @@ -46,9 +46,41 @@ func TestWorkerExec(t *testing.T, w *base.Worker) { id := identity.NewID() + // verify pid1 exits when stdin sees EOF + ctxTimeout, cancelTimeout := context.WithTimeout(ctx, 5*time.Second) + started := make(chan struct{}) + pipeR, pipeW := io.Pipe() + go func() { + select { + case <-ctxTimeout.Done(): + t.Error("Unexpected timeout waiting for pid1 to start") + case <-started: + pipeW.Write([]byte("hello")) + pipeW.Close() + } + }() + stdout := bytes.NewBuffer(nil) + stderr := bytes.NewBuffer(nil) + err = w.WorkerOpt.Executor.Run(ctxTimeout, id, root, nil, executor.ProcessInfo{ + Meta: executor.Meta{ + Args: []string{"cat"}, + Cwd: "/", + Env: []string{"PATH=/bin:/usr/bin:/sbin:/usr/sbin"}, + }, + Stdin: pipeR, + Stdout: &nopCloser{stdout}, + Stderr: &nopCloser{stderr}, + }, started) + cancelTimeout() + t.Logf("Stdout: %s", stdout.String()) + t.Logf("Stderr: %s", stderr.String()) + require.NoError(t, err) + require.Equal(t, "hello", stdout.String()) + require.Empty(t, stderr.String()) + // first start pid1 in the background eg := errgroup.Group{} - started := make(chan struct{}) + started = make(chan struct{}) eg.Go(func() error { return w.WorkerOpt.Executor.Run(ctx, id, root, nil, executor.ProcessInfo{ 
Meta: executor.Meta{ @@ -65,8 +97,8 @@ func TestWorkerExec(t *testing.T, w *base.Worker) { t.Error("Unexpected timeout waiting for pid1 to start") } - stdout := bytes.NewBuffer(nil) - stderr := bytes.NewBuffer(nil) + stdout.Reset() + stderr.Reset() // verify pid1 is the sleep command via Exec err = w.WorkerOpt.Executor.Exec(ctx, id, executor.ProcessInfo{ @@ -120,7 +152,7 @@ func TestWorkerExec(t *testing.T, w *base.Worker) { err = eg.Wait() // we expect pid1 to get canceled after we test the exec - require.EqualError(t, errors.Cause(err), context.Canceled.Error()) + require.True(t, errors.Is(err, context.Canceled)) err = snap.Release(ctx) require.NoError(t, err)