diff --git a/client/build_test.go b/client/build_test.go index 75ebce6d3b32..763207c00c92 100644 --- a/client/build_test.go +++ b/client/build_test.go @@ -45,7 +45,9 @@ func TestClientGatewayIntegration(t *testing.T) { testClientGatewayContainerPID1Exit, testClientGatewayContainerMounts, testClientGatewayContainerPID1Tty, + testClientGatewayContainerCancelPID1Tty, testClientGatewayContainerExecTty, + testClientGatewayContainerCancelExecTty, testClientSlowCacheRootfsRef, testClientGatewayContainerPlatformPATH, testClientGatewayExecError, @@ -923,6 +925,77 @@ func testClientGatewayContainerPID1Tty(t *testing.T, sb integration.Sandbox) { checkAllReleasable(t, c, sb, true) } +// testClientGatewayContainerCancelPID1Tty is testing that the tty will cleanly +// shutdown on context cancel +func testClientGatewayContainerCancelPID1Tty(t *testing.T, sb integration.Sandbox) { + requiresLinux(t) + ctx := sb.Context() + + c, err := New(ctx, sb.Address()) + require.NoError(t, err) + defer c.Close() + + product := "buildkit_test" + + inputR, inputW := io.Pipe() + output := bytes.NewBuffer(nil) + + b := func(ctx context.Context, c client.Client) (*client.Result, error) { + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + st := llb.Image("busybox:latest") + + def, err := st.Marshal(ctx) + if err != nil { + return nil, errors.Wrap(err, "failed to marshal state") + } + + r, err := c.Solve(ctx, client.SolveRequest{ + Definition: def.ToPB(), + }) + if err != nil { + return nil, errors.Wrap(err, "failed to solve") + } + + ctr, err := c.NewContainer(ctx, client.NewContainerRequest{ + Mounts: []client.Mount{{ + Dest: "/", + MountType: pb.MountType_BIND, + Ref: r.Ref, + }}, + }) + require.NoError(t, err) + defer ctr.Release(ctx) + + prompt := newTestPrompt(ctx, t, inputW, output) + pid1, err := ctr.Start(ctx, client.StartRequest{ + Args: []string{"sh"}, + Tty: true, + Stdin: inputR, + Stdout: &nopCloser{output}, + Stderr: &nopCloser{output}, + Env: 
[]string{fmt.Sprintf("PS1=%s", prompt.String())}, + }) + require.NoError(t, err) + prompt.SendExpect("echo hi", "hi") + cancel() + + err = pid1.Wait() + require.ErrorIs(t, err, context.Canceled) + + return &client.Result{}, err + } + + _, err = c.Build(ctx, SolveOpt{}, product, b, nil) + require.Error(t, err) + + inputW.Close() + inputR.Close() + + checkAllReleasable(t, c, sb, true) +} + type testPrompt struct { ctx context.Context t *testing.T @@ -1071,6 +1144,87 @@ func testClientGatewayContainerExecTty(t *testing.T, sb integration.Sandbox) { checkAllReleasable(t, c, sb, true) } +// testClientGatewayContainerCancelExecTty is testing the tty shuts down cleanly +// on context cancel +func testClientGatewayContainerCancelExecTty(t *testing.T, sb integration.Sandbox) { + requiresLinux(t) + ctx := sb.Context() + + c, err := New(ctx, sb.Address()) + require.NoError(t, err) + defer c.Close() + + product := "buildkit_test" + + inputR, inputW := io.Pipe() + output := bytes.NewBuffer(nil) + b := func(ctx context.Context, c client.Client) (*client.Result, error) { + ctx, timeout := context.WithTimeout(ctx, 10*time.Second) + defer timeout() + st := llb.Image("busybox:latest") + + def, err := st.Marshal(ctx) + if err != nil { + return nil, errors.Wrap(err, "failed to marshal state") + } + + r, err := c.Solve(ctx, client.SolveRequest{ + Definition: def.ToPB(), + }) + if err != nil { + return nil, errors.Wrap(err, "failed to solve") + } + + ctr, err := c.NewContainer(ctx, client.NewContainerRequest{ + Mounts: []client.Mount{{ + Dest: "/", + MountType: pb.MountType_BIND, + Ref: r.Ref, + }}, + }) + require.NoError(t, err) + + pid1, err := ctr.Start(ctx, client.StartRequest{ + Args: []string{"sleep", "10"}, + }) + require.NoError(t, err) + + defer pid1.Wait() + defer ctr.Release(ctx) + + execCtx, cancel := context.WithCancel(ctx) + defer cancel() + + prompt := newTestPrompt(execCtx, t, inputW, output) + pid2, err := ctr.Start(execCtx, client.StartRequest{ + Args: []string{"sh"}, + Tty: 
true, + Stdin: inputR, + Stdout: &nopCloser{output}, + Stderr: &nopCloser{output}, + Env: []string{fmt.Sprintf("PS1=%s", prompt.String())}, + }) + require.NoError(t, err) + + prompt.SendExpect("echo hi", "hi") + cancel() + + err = pid2.Wait() + require.ErrorIs(t, err, context.Canceled) + + return &client.Result{}, err + } + + _, err = c.Build(ctx, SolveOpt{}, product, b, nil) + require.Error(t, err) + require.Contains(t, err.Error(), context.Canceled.Error()) + + inputW.Close() + inputR.Close() + + checkAllReleasable(t, c, sb, true) +} + func testClientSlowCacheRootfsRef(t *testing.T, sb integration.Sandbox) { requiresLinux(t) diff --git a/client/client_test.go b/client/client_test.go index 6ca36d2a71a6..348d81070f96 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -195,6 +195,7 @@ func TestIntegration(t *testing.T) { testMountStubsDirectory, testMountStubsTimestamp, testSourcePolicy, + testLLBMountPerformance, ) } @@ -8945,3 +8946,31 @@ func testSourcePolicy(t *testing.T, sb integration.Sandbox) { require.ErrorContains(t, err, sourcepolicy.ErrSourceDenied.Error()) }) } + +func testLLBMountPerformance(t *testing.T, sb integration.Sandbox) { + c, err := New(sb.Context(), sb.Address()) + require.NoError(t, err) + defer c.Close() + + mntInput := llb.Image("busybox:latest") + st := llb.Image("busybox:latest") + var mnts []llb.State + for i := 0; i < 20; i++ { + execSt := st.Run( + llb.Args([]string{"true"}), + ) + mnts = append(mnts, mntInput) + for j := range mnts { + mnts[j] = execSt.AddMount(fmt.Sprintf("/tmp/bin%d", j), mnts[j], llb.SourcePath("/bin")) + } + st = execSt.Root() + } + + def, err := st.Marshal(sb.Context()) + require.NoError(t, err) + + timeoutCtx, cancel := context.WithTimeout(sb.Context(), time.Minute) + defer cancel() + _, err = c.Solve(timeoutCtx, def, SolveOpt{}, nil) + require.NoError(t, err) +} diff --git a/executor/runcexecutor/executor.go b/executor/runcexecutor/executor.go index 213ebb73665a..19f7fbd812d6 100644 --- 
a/executor/runcexecutor/executor.go +++ b/executor/runcexecutor/executor.go @@ -92,7 +92,7 @@ func New(opt Opt, networkProviders map[pb.NetMode]network.Provider) (executor.Ex root := opt.Root - if err := os.MkdirAll(root, 0711); err != nil { + if err := os.MkdirAll(root, 0o711); err != nil { return nil, errors.Wrapf(err, "failed to create %s", root) } @@ -205,7 +205,7 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root executor.Mount, } bundle := filepath.Join(w.root, id) - if err := os.Mkdir(bundle, 0711); err != nil { + if err := os.Mkdir(bundle, 0o711); err != nil { return err } defer os.RemoveAll(bundle) @@ -216,7 +216,7 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root executor.Mount, } rootFSPath := filepath.Join(bundle, "rootfs") - if err := idtools.MkdirAllAndChown(rootFSPath, 0700, identity); err != nil { + if err := idtools.MkdirAllAndChown(rootFSPath, 0o700, identity); err != nil { return err } if err := mount.All(rootMount, rootFSPath); err != nil { @@ -270,7 +270,7 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root executor.Mount, return errors.Wrapf(err, "working dir %s points to invalid target", newp) } if _, err := os.Stat(newp); err != nil { - if err := idtools.MkdirAllAndChown(newp, 0755, identity); err != nil { + if err := idtools.MkdirAllAndChown(newp, 0o755, identity); err != nil { return errors.Wrapf(err, "failed to create working directory %s", newp) } } @@ -287,42 +287,10 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root executor.Mount, return err } - // runCtx/killCtx is used for extra check in case the kill command blocks - runCtx, cancelRun := context.WithCancel(context.Background()) - defer cancelRun() - - ended := make(chan struct{}) - go func() { - for { - select { - case <-ctx.Done(): - killCtx, timeout := context.WithTimeout(context.Background(), 7*time.Second) - if err := w.runc.Kill(killCtx, id, int(syscall.SIGKILL), nil); err != nil { - bklog.G(ctx).Errorf("failed to 
kill runc %s: %+v", id, err) - select { - case <-killCtx.Done(): - timeout() - cancelRun() - return - default: - } - } - timeout() - select { - case <-time.After(50 * time.Millisecond): - case <-ended: - return - } - case <-ended: - return - } - } - }() - bklog.G(ctx).Debugf("> creating %s %v", id, meta.Args) trace.SpanFromContext(ctx).AddEvent("Container created") - err = w.run(runCtx, id, bundle, process, func() { + err = w.run(ctx, id, bundle, process, func() { startedOnce.Do(func() { trace.SpanFromContext(ctx).AddEvent("Container started") if started != nil { @@ -330,7 +298,6 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root executor.Mount, } }) }) - close(ended) return exitError(ctx, err) } @@ -462,23 +429,87 @@ func (s *forwardIO) Stderr() io.ReadCloser { return nil } -// startingProcess is to track the os process so we can send signals to it. -type startingProcess struct { - Process *os.Process - ready chan struct{} +// procHandle is to track the os process so we can send signals to it. +type procHandle struct { + Process *os.Process + ready chan struct{} + ended chan struct{} + shutdown func() } -// Release will free resources with a startingProcess. -func (p *startingProcess) Release() { +// runcProcessHandle will create a procHandle that will be monitored, where +// on ctx.Done the process will be killed. If the kill fails, then the cancel +// will be called. This is to allow for runc to go through its normal shutdown +// procedure if the ctx is canceled and to ensure there are no zombie processes +// left by runc. 
+func runcProcessHandle(ctx context.Context, id string) (*procHandle, context.Context) { + runcCtx, cancel := context.WithCancel(context.Background()) + p := &procHandle{ + ready: make(chan struct{}), + ended: make(chan struct{}), + shutdown: cancel, + } + // preserve the logger on the context used for the runc process handling + runcCtx = bklog.WithLogger(runcCtx, bklog.G(ctx)) + + go func() { + // Wait for pid + select { + case <-ctx.Done(): + return // nothing to kill + case <-p.ready: + } + + for { + select { + case <-ctx.Done(): + killCtx, timeout := context.WithTimeout(context.Background(), 7*time.Second) + if err := p.Process.Kill(); err != nil { + bklog.G(ctx).Errorf("failed to kill runc %s: %+v", id, err) + select { + case <-killCtx.Done(): + timeout() + cancel() + return + default: + } + } + timeout() + select { + case <-time.After(50 * time.Millisecond): + case <-p.ended: + return + } + case <-p.ended: + return + } + } + }() + + return p, runcCtx +} + +// Release will free resources with a procHandle. +func (p *procHandle) Release() { + close(p.ended) if p.Process != nil { p.Process.Release() } } +// Shutdown should be called after the runc process has exited. This will allow +// the signal handling and tty resize loops to exit, terminating the +// goroutines. +func (p *procHandle) Shutdown() { + if p.shutdown != nil { + p.shutdown() + } +} + // WaitForReady will wait until the Process has been populated or the // provided context was cancelled. This should be called before using // the Process field. -func (p *startingProcess) WaitForReady(ctx context.Context) error { +func (p *procHandle) WaitForReady(ctx context.Context) error { select { case <-ctx.Done(): return ctx.Err() @@ -490,7 +521,7 @@ func (p *startingProcess) WaitForReady(ctx context.Context) error { // WaitForStart will record the pid reported by Runc via the channel. // We wait for up to 10s for the runc process to start. 
If the started // callback is non-nil it will be called after receiving the pid. -func (p *startingProcess) WaitForStart(ctx context.Context, startedCh <-chan int, started func()) error { +func (p *procHandle) WaitForStart(ctx context.Context, startedCh <-chan int, started func()) error { startedCtx, timeout := context.WithTimeout(ctx, 10*time.Second) defer timeout() var err error @@ -515,7 +546,7 @@ func (p *startingProcess) WaitForStart(ctx context.Context, startedCh <-chan int // handleSignals will wait until the runcProcess is ready then will // send each signal received on the channel to the process. -func handleSignals(ctx context.Context, runcProcess *startingProcess, signals <-chan syscall.Signal) error { +func handleSignals(ctx context.Context, runcProcess *procHandle, signals <-chan syscall.Signal) error { if signals == nil { return nil } diff --git a/executor/runcexecutor/executor_common.go b/executor/runcexecutor/executor_common.go index 447c4a96b958..44c696fff10b 100644 --- a/executor/runcexecutor/executor_common.go +++ b/executor/runcexecutor/executor_common.go @@ -49,23 +49,20 @@ type runcCall func(ctx context.Context, started chan<- int, io runc.IO) error // is only supported for linux, so this really just handles signal propagation // to the started runc process. 
func (w *runcExecutor) commonCall(ctx context.Context, id, bundle string, process executor.ProcessInfo, started func(), call runcCall) error { - runcProcess := &startingProcess{ - ready: make(chan struct{}), - } + runcProcess, ctx := runcProcessHandle(ctx, id) defer runcProcess.Release() - var eg errgroup.Group - egCtx, cancel := context.WithCancel(ctx) + eg, ctx := errgroup.WithContext(ctx) defer eg.Wait() - defer cancel() + defer runcProcess.Shutdown() startedCh := make(chan int, 1) eg.Go(func() error { - return runcProcess.WaitForStart(egCtx, startedCh, started) + return runcProcess.WaitForStart(ctx, startedCh, started) }) eg.Go(func() error { - return handleSignals(egCtx, runcProcess, process.Signal) + return handleSignals(ctx, runcProcess, process.Signal) }) return call(ctx, startedCh, &forwardIO{stdin: process.Stdin, stdout: process.Stdout, stderr: process.Stderr}) diff --git a/executor/runcexecutor/executor_linux.go b/executor/runcexecutor/executor_linux.go index 15ea812a5a23..dbf73069274e 100644 --- a/executor/runcexecutor/executor_linux.go +++ b/executor/runcexecutor/executor_linux.go @@ -44,23 +44,20 @@ func (w *runcExecutor) exec(ctx context.Context, id, bundle string, specsProcess type runcCall func(ctx context.Context, started chan<- int, io runc.IO) error func (w *runcExecutor) callWithIO(ctx context.Context, id, bundle string, process executor.ProcessInfo, started func(), call runcCall) error { - runcProcess := &startingProcess{ - ready: make(chan struct{}), - } + runcProcess, ctx := runcProcessHandle(ctx, id) defer runcProcess.Release() - var eg errgroup.Group - egCtx, cancel := context.WithCancel(ctx) + eg, ctx := errgroup.WithContext(ctx) defer eg.Wait() - defer cancel() + defer runcProcess.Shutdown() startedCh := make(chan int, 1) eg.Go(func() error { - return runcProcess.WaitForStart(egCtx, startedCh, started) + return runcProcess.WaitForStart(ctx, startedCh, started) }) eg.Go(func() error { - return handleSignals(egCtx, runcProcess, 
process.Signal) + return handleSignals(ctx, runcProcess, process.Signal) }) if !process.Meta.Tty { @@ -84,7 +81,7 @@ func (w *runcExecutor) callWithIO(ctx context.Context, id, bundle string, proces } pts.Close() ptm.Close() - cancel() // this will shutdown resize and signal loops + runcProcess.Shutdown() err := eg.Wait() if err != nil { bklog.G(ctx).Warningf("error while shutting down tty io: %s", err) @@ -119,13 +116,13 @@ func (w *runcExecutor) callWithIO(ctx context.Context, id, bundle string, proces } eg.Go(func() error { - err := runcProcess.WaitForReady(egCtx) + err := runcProcess.WaitForReady(ctx) if err != nil { return err } for { select { - case <-egCtx.Done(): + case <-ctx.Done(): return nil case resize := <-process.Resize: err = ptm.Resize(console.WinSize{ diff --git a/frontend/gateway/grpcclient/client.go b/frontend/gateway/grpcclient/client.go index 1b000a816e37..252617ffa047 100644 --- a/frontend/gateway/grpcclient/client.go +++ b/frontend/gateway/grpcclient/client.go @@ -927,11 +927,11 @@ func (ctr *container) Start(ctx context.Context, req client.StartRequest) (clien if msg == nil { // empty message from ctx cancel, so just start shutting down - // input, but continue processing more exit/done messages + // input closeDoneOnce.Do(func() { close(done) }) - continue + return ctx.Err() } if file := msg.GetFile(); file != nil { diff --git a/solver/llbsolver/history.go b/solver/llbsolver/history.go index c8310cc48ebb..09aa19855e07 100644 --- a/solver/llbsolver/history.go +++ b/solver/llbsolver/history.go @@ -102,13 +102,13 @@ func (h *HistoryQueue) gc() error { } // in order for record to get deleted by gc it exceed both maxentries and maxage criteria - if len(records) < int(h.CleanConfig.MaxEntries) { return nil } + // sort array by newest records first sort.Slice(records, func(i, j int) bool { - return records[i].CompletedAt.Before(*records[j].CompletedAt) + return records[i].CompletedAt.After(*records[j].CompletedAt) }) h.mu.Lock() diff --git 
a/solver/llbsolver/solver.go b/solver/llbsolver/solver.go index 2f7ba61e5f8f..d65a9e6490c7 100644 --- a/solver/llbsolver/solver.go +++ b/solver/llbsolver/solver.go @@ -423,15 +423,6 @@ func (s *Solver) Solve(ctx context.Context, id string, sessionID string, req fro if internal { defer j.CloseProgress() - } else { - rec, err1 := s.recordBuildHistory(ctx, id, req, exp, j) - if err != nil { - defer j.CloseProgress() - return nil, err1 - } - defer func() { - err = rec(resProv, descref, err) - }() } set, err := entitlements.WhiteList(ent, supportedEntitlements(s.entitlements)) @@ -447,14 +438,32 @@ func (s *Solver) Solve(ctx context.Context, id string, sessionID string, req fro j.SessionID = sessionID br := s.bridge(j) + var fwd gateway.LLBBridgeForwarder if s.gatewayForwarder != nil && req.Definition == nil && req.Frontend == "" { - fwd := gateway.NewBridgeForwarder(ctx, br, s.workerController, req.FrontendInputs, sessionID, s.sm) + fwd = gateway.NewBridgeForwarder(ctx, br, s.workerController, req.FrontendInputs, sessionID, s.sm) defer fwd.Discard() + // Register build before calling s.recordBuildHistory, because + // s.recordBuildHistory can block for several seconds on + // LeaseManager calls, and there is a fixed 3s timeout in + // GatewayForwarder on build registration. 
if err := s.gatewayForwarder.RegisterBuild(ctx, id, fwd); err != nil { return nil, err } defer s.gatewayForwarder.UnregisterBuild(ctx, id) + } + + if !internal { + rec, err1 := s.recordBuildHistory(ctx, id, req, exp, j) + if err1 != nil { + defer j.CloseProgress() + return nil, err1 + } + defer func() { + err = rec(resProv, descref, err) + }() + } + if fwd != nil { var err error select { case <-fwd.Done(): diff --git a/solver/llbsolver/vertex.go b/solver/llbsolver/vertex.go index 6901332d2b65..41a31bb9bbba 100644 --- a/solver/llbsolver/vertex.go +++ b/solver/llbsolver/vertex.go @@ -210,6 +210,7 @@ func recomputeDigests(ctx context.Context, all map[digest.Digest]*pb.Op, visited } if !mutated { + visited[dgst] = dgst return dgst, nil } @@ -274,7 +275,7 @@ func loadLLB(ctx context.Context, def *pb.Definition, polEngine SourcePolicyEval for { newDgst, ok := mutatedDigests[lastDgst] - if !ok { + if !ok || newDgst == lastDgst { break } lastDgst = newDgst