Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 92 additions & 44 deletions executor/containerdexecutor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ func (w *containerdExecutor) Run(ctx context.Context, id string, root cache.Moun
return err
}
defer cleanup()
spec.Process.Terminal = meta.Tty

container, err := w.client.NewContainer(ctx, id,
containerd.WithSpec(spec),
Expand All @@ -195,59 +196,24 @@ func (w *containerdExecutor) Run(ctx context.Context, id string, root cache.Moun
if err != nil {
return err
}

defer func() {
if _, err1 := task.Delete(context.TODO()); err == nil && err1 != nil {
err = errors.Wrapf(err1, "failed to delete task %s", id)
}
}()

if err := task.Start(ctx); err != nil {
return err
}

if started != nil {
err = w.runProcess(ctx, task, process.Resize, func() {
startedOnce.Do(func() {
close(started)
})
}
statusCh, err := task.Wait(context.Background())
if err != nil {
return err
}

var cancel func()
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all this wait & error handling code I moved to a common runProcess function to share between Run and Exec. (I failed to realize the previous Exec usage was not synchronous and just returned immediately after starting, whoops)

ctxDone := ctx.Done()
for {
select {
case <-ctxDone:
ctxDone = nil
var killCtx context.Context
killCtx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
task.Kill(killCtx, syscall.SIGKILL)
case status := <-statusCh:
if cancel != nil {
cancel()
}
if status.ExitCode() != 0 {
var err error
if status.ExitCode() == containerd.UnknownExitStatus && status.Error() != nil {
err = errors.Wrap(status.Error(), "failure waiting for process")
} else {
err = errors.Errorf("process returned non-zero exit code: %d", status.ExitCode())
}
select {
case <-ctx.Done():
err = errors.Wrap(ctx.Err(), err.Error())
default:
}
return err
if started != nil {
close(started)
}
return nil
}
}
})
})
return err
}

func (w *containerdExecutor) Exec(ctx context.Context, id string, process executor.ProcessInfo) error {
func (w *containerdExecutor) Exec(ctx context.Context, id string, process executor.ProcessInfo) (err error) {
meta := process.Meta

// first verify the container is running, if we get an error assume the container
Expand Down Expand Up @@ -329,5 +295,87 @@ func (w *containerdExecutor) Exec(ctx context.Context, id string, process execut
if err != nil {
return errors.WithStack(err)
}
return taskProcess.Start(ctx)

err = w.runProcess(ctx, taskProcess, process.Resize, nil)
return err
}

func (w *containerdExecutor) runProcess(ctx context.Context, p containerd.Process, resize <-chan executor.WinSize, started func()) error {
// Not using `ctx` here because the context passed only affects the statusCh which we
// don't want cancelled when ctx.Done is sent. We want to process statusCh on cancel.
statusCh, err := p.Wait(context.Background())
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why context.Background? Maybe leave a comment for future readers as well.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question, I was wondering the same thing, this code was just moved from Run to share with Exec. We need to ask your 3yr younger self :)
https://github.com/moby/buildkit/blame/master/executor/containerdexecutor/executor.go#L213

My hunch is that we wanted to keep the statusCh alive if we get ctx.Done so that we can send sigkill and capture status from that?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at Wait implementation I now understand it. Wait is non-blocking irrelevant from the context passed and context only affects the statusCh that we shouldn't cancel.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a comment with your findings.

if err != nil {
return err
}

err = p.Start(ctx)
if err != nil {
return err
}

if started != nil {
started()
}

p.CloseIO(ctx, containerd.WithStdinCloser)

// resize in separate go loop so it does not potentially block
// the container cancel/exit status loop below.
resizeCtx, resizeCancel := context.WithCancel(ctx)
defer resizeCancel()
go func() {
for {
select {
case <-resizeCtx.Done():
return
case size, ok := <-resize:
if !ok {
return // chan closed
}
err = p.Resize(resizeCtx, size.Cols, size.Rows)
if err != nil {
logrus.Warnf("Failed to resize %s: %s", p.ID(), err)
}
}
}
}()

var cancel func()
var killCtxDone <-chan struct{}
ctxDone := ctx.Done()
for {
select {
case <-ctxDone:
ctxDone = nil
var killCtx context.Context
killCtx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
killCtxDone = killCtx.Done()
p.Kill(killCtx, syscall.SIGKILL)
case status := <-statusCh:
if cancel != nil {
cancel()
}
if status.ExitCode() != 0 {
exitErr := &executor.ExitError{
ExitCode: status.ExitCode(),
Err: status.Error(),
}
if status.ExitCode() == containerd.UnknownExitStatus && status.Error() != nil {
exitErr.Err = errors.Wrap(status.Error(), "failure waiting for process")
}
select {
case <-ctx.Done():
exitErr.Err = errors.Wrap(ctx.Err(), exitErr.Error())
default:
}
return exitErr
}
return nil
case <-killCtxDone:
if cancel != nil {
cancel()
}
return errors.Errorf("failed to kill process on cancel")
}
}
}
28 changes: 28 additions & 0 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package executor

import (
"context"
"fmt"
"io"
"net"

Expand All @@ -28,10 +29,16 @@ type Mount struct {
Readonly bool
}

// WinSize describes the dimensions of a terminal window. It is used to carry
// TTY resize events from the client to a running container process.
type WinSize struct {
	Rows uint32
	Cols uint32
}

// ProcessInfo describes a process to run inside a container: its metadata
// plus the I/O streams and the resize channel wired up by the caller.
type ProcessInfo struct {
	Meta           Meta
	Stdin          io.ReadCloser
	Stdout, Stderr io.WriteCloser
	// Resize receives terminal window-size updates; presumably only consumed
	// when the process runs with a TTY attached — confirm against executors.
	Resize <-chan WinSize
}

type Executor interface {
Expand All @@ -48,3 +55,24 @@ type HostIP struct {
Host string
IP net.IP
}

// ExitError is returned from Run and Exec when the container process
// terminates with a non-zero exit code.
type ExitError struct {
	ExitCode uint32
	Err      error
}

// Error reports the wrapped error's message when one is present, otherwise a
// generic "exit code: N" message built from the recorded code.
func (e *ExitError) Error() string {
	if e.Err == nil {
		return fmt.Sprintf("exit code: %d", e.ExitCode)
	}
	return e.Err.Error()
}

// Unwrap exposes the underlying cause for errors.Is/errors.As. When no cause
// was recorded it synthesizes one from the exit code so callers always have a
// non-nil error to inspect.
func (e *ExitError) Unwrap() error {
	if e.Err == nil {
		return fmt.Errorf("exit code: %d", e.ExitCode)
	}
	return e.Err
}
30 changes: 24 additions & 6 deletions executor/runcexecutor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,10 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable,
}
}

spec.Process.Terminal = meta.Tty
if meta.Tty {
return errors.New("tty with runc not implemented")
}

spec.Process.OOMScoreAdj = w.oomScoreAdj
if w.rootless {
if err := rootlessspecconv.ToRootless(spec); err != nil {
Expand Down Expand Up @@ -329,12 +332,15 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable,
close(ended)

if status != 0 || err != nil {
if err == nil {
err = errors.Errorf("exit code: %d", status)
exitErr := &executor.ExitError{
ExitCode: uint32(status),
Err: err,
}
err = exitErr
select {
case <-ctx.Done():
return errors.Wrapf(ctx.Err(), err.Error())
exitErr.Err = errors.Wrapf(ctx.Err(), exitErr.Error())
return exitErr
default:
return stack.Enable(err)
}
Expand All @@ -343,7 +349,7 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable,
return nil
}

func (w *runcExecutor) Exec(ctx context.Context, id string, process executor.ProcessInfo) error {
func (w *runcExecutor) Exec(ctx context.Context, id string, process executor.ProcessInfo) (err error) {
// first verify the container is running, if we get an error assume the container
// is in the process of being created and check again every 100ms or until
// context is canceled.
Expand Down Expand Up @@ -406,9 +412,21 @@ func (w *runcExecutor) Exec(ctx context.Context, id string, process executor.Pro
spec.Process.Env = process.Meta.Env
}

return w.runc.Exec(ctx, id, *spec.Process, &runc.ExecOpts{
err = w.runc.Exec(ctx, id, *spec.Process, &runc.ExecOpts{
IO: &forwardIO{stdin: process.Stdin, stdout: process.Stdout, stderr: process.Stderr},
})

var exitError *exec.ExitError
if errors.As(err, &exitError) {
err = &executor.ExitError{
ExitCode: uint32(exitError.ExitCode()),
Err: err,
}
return err
} else if err != nil {
return err
}
return nil
}

type forwardIO struct {
Expand Down
40 changes: 36 additions & 4 deletions worker/tests/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,41 @@ func TestWorkerExec(t *testing.T, w *base.Worker) {

id := identity.NewID()

// verify pid1 exits when stdin sees EOF
ctxTimeout, cancelTimeout := context.WithTimeout(ctx, 5*time.Second)
started := make(chan struct{})
pipeR, pipeW := io.Pipe()
go func() {
select {
case <-ctxTimeout.Done():
t.Error("Unexpected timeout waiting for pid1 to start")
case <-started:
pipeW.Write([]byte("hello"))
pipeW.Close()
}
}()
stdout := bytes.NewBuffer(nil)
stderr := bytes.NewBuffer(nil)
err = w.WorkerOpt.Executor.Run(ctxTimeout, id, root, nil, executor.ProcessInfo{
Meta: executor.Meta{
Args: []string{"cat"},
Cwd: "/",
Env: []string{"PATH=/bin:/usr/bin:/sbin:/usr/sbin"},
},
Stdin: pipeR,
Stdout: &nopCloser{stdout},
Stderr: &nopCloser{stderr},
}, started)
cancelTimeout()
t.Logf("Stdout: %s", stdout.String())
t.Logf("Stderr: %s", stderr.String())
require.NoError(t, err)
require.Equal(t, "hello", stdout.String())
require.Empty(t, stderr.String())

// first start pid1 in the background
eg := errgroup.Group{}
started := make(chan struct{})
started = make(chan struct{})
eg.Go(func() error {
return w.WorkerOpt.Executor.Run(ctx, id, root, nil, executor.ProcessInfo{
Meta: executor.Meta{
Expand All @@ -65,8 +97,8 @@ func TestWorkerExec(t *testing.T, w *base.Worker) {
t.Error("Unexpected timeout waiting for pid1 to start")
}

stdout := bytes.NewBuffer(nil)
stderr := bytes.NewBuffer(nil)
stdout.Reset()
stderr.Reset()

// verify pid1 is the sleep command via Exec
err = w.WorkerOpt.Executor.Exec(ctx, id, executor.ProcessInfo{
Expand Down Expand Up @@ -120,7 +152,7 @@ func TestWorkerExec(t *testing.T, w *base.Worker) {

err = eg.Wait()
// we expect pid1 to get canceled after we test the exec
require.EqualError(t, errors.Cause(err), context.Canceled.Error())
require.True(t, errors.Is(err, context.Canceled))

err = snap.Release(ctx)
require.NoError(t, err)
Expand Down