Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 78 additions & 47 deletions executor/runcexecutor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func New(opt Opt, networkProviders map[pb.NetMode]network.Provider) (executor.Ex

root := opt.Root

if err := os.MkdirAll(root, 0711); err != nil {
if err := os.MkdirAll(root, 0o711); err != nil {
return nil, errors.Wrapf(err, "failed to create %s", root)
}

Expand Down Expand Up @@ -205,7 +205,7 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root executor.Mount,
}
bundle := filepath.Join(w.root, id)

if err := os.Mkdir(bundle, 0711); err != nil {
if err := os.Mkdir(bundle, 0o711); err != nil {
return err
}
defer os.RemoveAll(bundle)
Expand All @@ -216,7 +216,7 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root executor.Mount,
}

rootFSPath := filepath.Join(bundle, "rootfs")
if err := idtools.MkdirAllAndChown(rootFSPath, 0700, identity); err != nil {
if err := idtools.MkdirAllAndChown(rootFSPath, 0o700, identity); err != nil {
return err
}
if err := mount.All(rootMount, rootFSPath); err != nil {
Expand Down Expand Up @@ -270,7 +270,7 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root executor.Mount,
return errors.Wrapf(err, "working dir %s points to invalid target", newp)
}
if _, err := os.Stat(newp); err != nil {
if err := idtools.MkdirAllAndChown(newp, 0755, identity); err != nil {
if err := idtools.MkdirAllAndChown(newp, 0o755, identity); err != nil {
return errors.Wrapf(err, "failed to create working directory %s", newp)
}
}
Expand All @@ -287,50 +287,17 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root executor.Mount,
return err
}

// runCtx/killCtx is used for extra check in case the kill command blocks
runCtx, cancelRun := context.WithCancel(context.Background())
defer cancelRun()

ended := make(chan struct{})
go func() {
for {
select {
case <-ctx.Done():
killCtx, timeout := context.WithTimeout(context.Background(), 7*time.Second)
if err := w.runc.Kill(killCtx, id, int(syscall.SIGKILL), nil); err != nil {
bklog.G(ctx).Errorf("failed to kill runc %s: %+v", id, err)
select {
case <-killCtx.Done():
timeout()
cancelRun()
return
default:
}
}
timeout()
select {
case <-time.After(50 * time.Millisecond):
case <-ended:
return
}
case <-ended:
return
}
}
}()

bklog.G(ctx).Debugf("> creating %s %v", id, meta.Args)

trace.SpanFromContext(ctx).AddEvent("Container created")
err = w.run(runCtx, id, bundle, process, func() {
err = w.run(ctx, id, bundle, process, func() {
startedOnce.Do(func() {
trace.SpanFromContext(ctx).AddEvent("Container started")
if started != nil {
close(started)
}
})
})
close(ended)
return exitError(ctx, err)
}

Expand Down Expand Up @@ -462,23 +429,87 @@ func (s *forwardIO) Stderr() io.ReadCloser {
return nil
}

// startingProcess is to track the os process so we can send signals to it.
type startingProcess struct {
Process *os.Process
ready chan struct{}
// procHandle is to track the os process so we can send signals to it.
type procHandle struct {
Process *os.Process
ready chan struct{}
ended chan struct{}
shutdown func()
}

// Release will free resources with a startingProcess.
func (p *startingProcess) Release() {
// runcProcessHandle will create a procHandle that will be monitored, where
// on ctx.Done the process will be killed. If the kill fails, then the cancel
// will be called. This is to allow for runc to go through its normal shutdown
// procedure if the ctx is canceled and to ensure there are no zombie processes
// left by runc.
func runcProcessHandle(ctx context.Context, id string) (*procHandle, context.Context) {
runcCtx, cancel := context.WithCancel(context.Background())
p := &procHandle{
ready: make(chan struct{}),
ended: make(chan struct{}),
shutdown: cancel,
}
// preserve the logger on the context used for the runc process handling
runcCtx = bklog.WithLogger(runcCtx, bklog.G(ctx))

go func() {
// Wait for pid
select {
case <-ctx.Done():
return // nothing to kill
case <-p.ready:
}

for {
select {
case <-ctx.Done():
killCtx, timeout := context.WithTimeout(context.Background(), 7*time.Second)
if err := p.Process.Kill(); err != nil {
bklog.G(ctx).Errorf("failed to kill runc %s: %+v", id, err)
select {
case <-killCtx.Done():
timeout()
cancel()
return
default:
}
}
timeout()
select {
case <-time.After(50 * time.Millisecond):
case <-p.ended:
return
}
case <-p.ended:
return
}
}
}()

return p, runcCtx
}

// Release will free resources with a procHandle.
func (p *procHandle) Release() {
close(p.ended)
if p.Process != nil {
p.Process.Release()
}
}

// Shutdown should be called after the runc process has exited. This will allow
// the signal handling and tty resize loops to exit, terminating the
// goroutines.
func (p *procHandle) Shutdown() {
if p.shutdown != nil {
p.shutdown()
}
}

// WaitForReady will wait until the Process has been populated or the
// provided context was cancelled. This should be called before using
// the Process field.
func (p *startingProcess) WaitForReady(ctx context.Context) error {
func (p *procHandle) WaitForReady(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
Expand All @@ -490,7 +521,7 @@ func (p *startingProcess) WaitForReady(ctx context.Context) error {
// WaitForStart will record the pid reported by Runc via the channel.
// We wait for up to 10s for the runc process to start. If the started
// callback is non-nil it will be called after receiving the pid.
func (p *startingProcess) WaitForStart(ctx context.Context, startedCh <-chan int, started func()) error {
func (p *procHandle) WaitForStart(ctx context.Context, startedCh <-chan int, started func()) error {
startedCtx, timeout := context.WithTimeout(ctx, 10*time.Second)
defer timeout()
var err error
Expand All @@ -515,7 +546,7 @@ func (p *startingProcess) WaitForStart(ctx context.Context, startedCh <-chan int

// handleSignals will wait until the runcProcess is ready then will
// send each signal received on the channel to the process.
func handleSignals(ctx context.Context, runcProcess *startingProcess, signals <-chan syscall.Signal) error {
func handleSignals(ctx context.Context, runcProcess *procHandle, signals <-chan syscall.Signal) error {
if signals == nil {
return nil
}
Expand Down
13 changes: 5 additions & 8 deletions executor/runcexecutor/executor_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,23 +49,20 @@ type runcCall func(ctx context.Context, started chan<- int, io runc.IO) error
// is only supported for linux, so this really just handles signal propagation
// to the started runc process.
func (w *runcExecutor) commonCall(ctx context.Context, id, bundle string, process executor.ProcessInfo, started func(), call runcCall) error {
runcProcess := &startingProcess{
ready: make(chan struct{}),
}
runcProcess, ctx := runcProcessHandle(ctx, id)
defer runcProcess.Release()

var eg errgroup.Group
egCtx, cancel := context.WithCancel(ctx)
eg, ctx := errgroup.WithContext(ctx)
defer eg.Wait()
defer cancel()
defer runcProcess.Shutdown()

startedCh := make(chan int, 1)
eg.Go(func() error {
return runcProcess.WaitForStart(egCtx, startedCh, started)
return runcProcess.WaitForStart(ctx, startedCh, started)
})

eg.Go(func() error {
return handleSignals(egCtx, runcProcess, process.Signal)
return handleSignals(ctx, runcProcess, process.Signal)
})

return call(ctx, startedCh, &forwardIO{stdin: process.Stdin, stdout: process.Stdout, stderr: process.Stderr})
Expand Down
19 changes: 8 additions & 11 deletions executor/runcexecutor/executor_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,23 +44,20 @@ func (w *runcExecutor) exec(ctx context.Context, id, bundle string, specsProcess
type runcCall func(ctx context.Context, started chan<- int, io runc.IO) error

func (w *runcExecutor) callWithIO(ctx context.Context, id, bundle string, process executor.ProcessInfo, started func(), call runcCall) error {
runcProcess := &startingProcess{
ready: make(chan struct{}),
}
runcProcess, ctx := runcProcessHandle(ctx, id)
defer runcProcess.Release()

var eg errgroup.Group
egCtx, cancel := context.WithCancel(ctx)
eg, ctx := errgroup.WithContext(ctx)
defer eg.Wait()
defer cancel()
defer runcProcess.Shutdown()

startedCh := make(chan int, 1)
eg.Go(func() error {
return runcProcess.WaitForStart(egCtx, startedCh, started)
return runcProcess.WaitForStart(ctx, startedCh, started)
})

eg.Go(func() error {
return handleSignals(egCtx, runcProcess, process.Signal)
return handleSignals(ctx, runcProcess, process.Signal)
})

if !process.Meta.Tty {
Expand All @@ -84,7 +81,7 @@ func (w *runcExecutor) callWithIO(ctx context.Context, id, bundle string, proces
}
pts.Close()
ptm.Close()
cancel() // this will shutdown resize and signal loops
runcProcess.Shutdown()
err := eg.Wait()
if err != nil {
bklog.G(ctx).Warningf("error while shutting down tty io: %s", err)
Expand Down Expand Up @@ -119,13 +116,13 @@ func (w *runcExecutor) callWithIO(ctx context.Context, id, bundle string, proces
}

eg.Go(func() error {
err := runcProcess.WaitForReady(egCtx)
err := runcProcess.WaitForReady(ctx)
if err != nil {
return err
}
for {
select {
case <-egCtx.Done():
case <-ctx.Done():
return nil
case resize := <-process.Resize:
err = ptm.Resize(console.WinSize{
Expand Down