From 1da8b671c469d685b1bbb90e2209a6648d2316be Mon Sep 17 00:00:00 2001 From: Cory Bennett Date: Fri, 25 Jun 2021 16:23:32 +0000 Subject: [PATCH 1/2] [#2112] progress.Controller should own the progress.Writer to prevent leaks Signed-off-by: Cory Bennett --- source/containerimage/pull.go | 5 +-- util/progress/controller/controller.go | 43 ++++++++++++++------------ worker/base/worker.go | 4 +-- 3 files changed, 25 insertions(+), 27 deletions(-) diff --git a/source/containerimage/pull.go b/source/containerimage/pull.go index 588cce92fe98..2c27c2d02b5f 100644 --- a/source/containerimage/pull.go +++ b/source/containerimage/pull.go @@ -200,10 +200,7 @@ func (p *puller) CacheKey(ctx context.Context, g session.Group, index int) (cach } if len(p.manifest.Descriptors) > 0 { - pw, _, _ := progress.FromContext(ctx) - progressController := &controller.Controller{ - Writer: pw, - } + progressController := &controller.Controller{} if p.vtx != nil { progressController.Digest = p.vtx.Digest() progressController.Name = p.vtx.Name() diff --git a/util/progress/controller/controller.go b/util/progress/controller/controller.go index 0c44aa67e1f9..c61cab6b33ad 100644 --- a/util/progress/controller/controller.go +++ b/util/progress/controller/controller.go @@ -13,57 +13,60 @@ import ( type Controller struct { count int64 started *time.Time + writer progress.Writer Digest digest.Digest Name string - Writer progress.Writer } var _ progress.Controller = &Controller{} func (c *Controller) Start(ctx context.Context) (context.Context, func(error)) { - if c.Digest == "" { - return progress.WithProgress(ctx, c.Writer), func(error) {} - } - if atomic.AddInt64(&c.count, 1) == 1 { if c.started == nil { now := time.Now() c.started = &now + c.writer, _, ctx = progress.FromContext(ctx) + } + + if c.Digest != "" { + c.writer.Write(c.Digest.String(), client.Vertex{ + Digest: c.Digest, + Name: c.Name, + Started: c.started, + }) } - c.Writer.Write(c.Digest.String(), client.Vertex{ - Digest: c.Digest, - Name: c.Name, - Started: c.started, - }) } - return progress.WithProgress(ctx, c.Writer), func(err error) { + return progress.WithProgress(ctx, c.writer), func(err error) { if atomic.AddInt64(&c.count, -1) == 0 { now := time.Now() var errString string if err != nil { errString = err.Error() } - c.Writer.Write(c.Digest.String(), client.Vertex{ - Digest: c.Digest, - Name: c.Name, - Started: c.started, - Completed: &now, - Error: errString, - }) + if c.Digest != "" { + c.writer.Write(c.Digest.String(), client.Vertex{ + Digest: c.Digest, + Name: c.Name, + Started: c.started, + Completed: &now, + Error: errString, + }) + } + c.writer.Close() } } } func (c *Controller) Status(id string, action string) func() { start := time.Now() - c.Writer.Write(id, progress.Status{ + c.writer.Write(id, progress.Status{ Action: action, Started: &start, }) return func() { complete := time.Now() - c.Writer.Write(id, progress.Status{ + c.writer.Write(id, progress.Status{ Action: action, Started: &start, Completed: &complete, diff --git a/worker/base/worker.go b/worker/base/worker.go index e078d105e1e2..c13c669f19f7 100644 --- a/worker/base/worker.go +++ b/worker/base/worker.go @@ -42,7 +42,6 @@ import ( "github.com/moby/buildkit/source/http" "github.com/moby/buildkit/source/local" "github.com/moby/buildkit/util/archutil" - "github.com/moby/buildkit/util/progress" "github.com/moby/buildkit/util/progress/controller" "github.com/moby/buildkit/worker" digest "github.com/opencontainers/go-digest" @@ -373,10 +372,9 @@ func (w *Worker) Exporter(name string, sm *session.Manager) (exporter.Exporter, } func (w *Worker) FromRemote(ctx context.Context, remote *solver.Remote) (ref cache.ImmutableRef, err error) { - pw, _, _ := progress.FromContext(ctx) descHandler := &cache.DescHandler{ Provider: func(session.Group) content.Provider { return remote.Provider }, - Progress: &controller.Controller{Writer: pw}, + Progress: &controller.Controller{}, } descHandlers := cache.DescHandlers(make(map[digest.Digest]*cache.DescHandler)) for _, desc := range remote.Descriptors { From b1d441b17521416b66ff8a15ba144e44b93321b6 Mon Sep 17 00:00:00 2001 From: Cory Bennett Date: Mon, 28 Jun 2021 16:49:32 +0000 Subject: [PATCH 2/2] [#2112] progress.FromContext returns a writer factory this allows progress.Controller to manage the writer lifecycle Signed-off-by: Cory Bennett --- cache/remotecache/export.go | 2 +- exporter/containerimage/writer.go | 2 +- exporter/local/export.go | 2 +- exporter/oci/export.go | 2 +- exporter/tar/export.go | 2 +- solver/jobs.go | 6 ++-- solver/llbsolver/solver.go | 8 ++--- source/containerimage/pull.go | 6 ++-- source/local/local.go | 2 +- util/flightcontrol/flightcontrol.go | 4 +-- util/progress/controller/controller.go | 7 +++-- util/progress/logs/logs.go | 4 +-- util/progress/multireader.go | 2 +- util/progress/progress.go | 41 ++++++++++++++++++-------- util/progress/progress_test.go | 6 ++-- util/pull/pullprogress/progress.go | 2 +- util/push/push.go | 2 +- worker/base/worker.go | 5 +++- 18 files changed, 63 insertions(+), 42 deletions(-) diff --git a/cache/remotecache/export.go b/cache/remotecache/export.go index b39045c82584..64345f26ab6a 100644 --- a/cache/remotecache/export.go +++ b/cache/remotecache/export.go @@ -25,7 +25,7 @@ import ( type ResolveCacheExporterFunc func(ctx context.Context, g session.Group, attrs map[string]string) (Exporter, error) func oneOffProgress(ctx context.Context, id string) func(err error) error { - pw, _, _ := progress.FromContext(ctx) + pw, _, _ := progress.NewFromContext(ctx) now := time.Now() st := progress.Status{ Started: &now, diff --git a/exporter/containerimage/writer.go b/exporter/containerimage/writer.go index ff88f68ed5ad..58de967dea39 100644 --- a/exporter/containerimage/writer.go +++ b/exporter/containerimage/writer.go @@ -511,7 +511,7 @@ func getRefMetadata(ref cache.ImmutableRef, limit int) []refMetadata { } func oneOffProgress(ctx context.Context, id string) func(err error) error { - pw, _, _ := progress.FromContext(ctx) + pw, _, _ := progress.NewFromContext(ctx) now := time.Now() st := progress.Status{ Started: &now, diff --git a/exporter/local/export.go b/exporter/local/export.go index d772776a9abd..03f4b2df0283 100644 --- a/exporter/local/export.go +++ b/exporter/local/export.go @@ -143,7 +143,7 @@ func (e *localExporterInstance) Export(ctx context.Context, inp exporter.Source, func newProgressHandler(ctx context.Context, id string) func(int, bool) { limiter := rate.NewLimiter(rate.Every(100*time.Millisecond), 1) - pw, _, _ := progress.FromContext(ctx) + pw, _, _ := progress.NewFromContext(ctx) now := time.Now() st := progress.Status{ Started: &now, diff --git a/exporter/oci/export.go b/exporter/oci/export.go index c79e0e441c71..c9cc1c3155f3 100644 --- a/exporter/oci/export.go +++ b/exporter/oci/export.go @@ -229,7 +229,7 @@ func (e *imageExporterInstance) Export(ctx context.Context, src exporter.Source, } func oneOffProgress(ctx context.Context, id string) func(err error) error { - pw, _, _ := progress.FromContext(ctx) + pw, _, _ := progress.NewFromContext(ctx) now := time.Now() st := progress.Status{ Started: &now, diff --git a/exporter/tar/export.go b/exporter/tar/export.go index 79e98cd6a139..5a5d8ee01a7a 100644 --- a/exporter/tar/export.go +++ b/exporter/tar/export.go @@ -153,7 +153,7 @@ func (e *localExporterInstance) Export(ctx context.Context, inp exporter.Source, } func oneOffProgress(ctx context.Context, id string) func(err error) error { - pw, _, _ := progress.FromContext(ctx) + pw, _, _ := progress.NewFromContext(ctx) now := time.Now() st := progress.Status{ Started: &now, diff --git a/solver/jobs.go b/solver/jobs.go index 0a3845f4272a..9e7a516e1527 100644 --- a/solver/jobs.go +++ b/solver/jobs.go @@ -433,7 +433,7 @@ func (jl *Solver) NewJob(id string) (*Job, error) { } pr, ctx, progressCloser := progress.NewContext(context.Background()) - pw, _, _ := progress.FromContext(ctx) // TODO: expose progress.Pipe() + pw, _, _ := progress.NewFromContext(ctx) // TODO: expose progress.Pipe() _, span := trace.NewNoopTracerProvider().Tracer("").Start(ctx, "") j := &Job{ @@ -881,7 +881,7 @@ func (v *vertexWithCacheOptions) Inputs() []Edge { } func notifyStarted(ctx context.Context, v *client.Vertex, cached bool) { - pw, _, _ := progress.FromContext(ctx) + pw, _, _ := progress.NewFromContext(ctx) defer pw.Close() now := time.Now() v.Started = &now @@ -891,7 +891,7 @@ func notifyStarted(ctx context.Context, v *client.Vertex, cached bool) { } func notifyCompleted(ctx context.Context, v *client.Vertex, err error, cached bool) { - pw, _, _ := progress.FromContext(ctx) + pw, _, _ := progress.NewFromContext(ctx) defer pw.Close() now := time.Now() if v.Started == nil { diff --git a/solver/llbsolver/solver.go b/solver/llbsolver/solver.go index 31e8afca7a5e..063cdfc9ee9a 100644 --- a/solver/llbsolver/solver.go +++ b/solver/llbsolver/solver.go @@ -327,7 +327,7 @@ func allWorkers(wc *worker.Controller) func(func(w worker.Worker) error) error { } func oneOffProgress(ctx context.Context, id string) func(err error) error { - pw, _, _ := progress.FromContext(ctx) + pw, _, _ := progress.NewFromContext(ctx) now := time.Now() st := progress.Status{ Started: &now, @@ -352,7 +352,7 @@ func inBuilderContext(ctx context.Context, b solver.Builder, name, id string, f Name: name, } return b.InContext(ctx, func(ctx context.Context, g session.Group) error { - pw, _, ctx := progress.FromContext(ctx, progress.WithMetadata("vertex", v.Digest)) + pw, _, ctx := progress.NewFromContext(ctx, progress.WithMetadata("vertex", v.Digest)) notifyStarted(ctx, &v, false) defer pw.Close() err := f(ctx, g) @@ -362,7 +362,7 @@ func inBuilderContext(ctx context.Context, b solver.Builder, name, id string, f } func notifyStarted(ctx context.Context, v *client.Vertex, cached bool) { - pw, _, _ := progress.FromContext(ctx) + pw, _, _ := progress.NewFromContext(ctx) defer pw.Close() now := time.Now() v.Started = &now @@ -372,7 +372,7 @@ func notifyStarted(ctx context.Context, v *client.Vertex, cached bool) { } func notifyCompleted(ctx context.Context, v *client.Vertex, err error, cached bool) { - pw, _, _ := progress.FromContext(ctx) + pw, _, _ := progress.NewFromContext(ctx) defer pw.Close() now := time.Now() if v.Started == nil { diff --git a/source/containerimage/pull.go b/source/containerimage/pull.go index 2c27c2d02b5f..57264beb7026 100644 --- a/source/containerimage/pull.go +++ b/source/containerimage/pull.go @@ -200,7 +200,9 @@ func (p *puller) CacheKey(ctx context.Context, g session.Group, index int) (cach } if len(p.manifest.Descriptors) > 0 { - progressController := &controller.Controller{} + progressController := &controller.Controller{ + WriterFactory: progress.FromContext(ctx), + } if p.vtx != nil { progressController.Digest = p.vtx.Digest() progressController.Name = p.vtx.Name() @@ -366,7 +368,7 @@ func cacheKeyFromConfig(dt []byte) digest.Digest { } func oneOffProgress(ctx context.Context, id string) func(err error) error { - pw, _, _ := progress.FromContext(ctx) + pw, _, _ := progress.NewFromContext(ctx) now := time.Now() st := progress.Status{ Started: &now, diff --git a/source/local/local.go b/source/local/local.go index 3d385ab18fd6..8ce22eefd421 100644 --- a/source/local/local.go +++ b/source/local/local.go @@ -247,7 +247,7 @@ func (ls *localSourceHandler) snapshot(ctx context.Context, s session.Group, cal func newProgressHandler(ctx context.Context, id string) func(int, bool) { limiter := rate.NewLimiter(rate.Every(100*time.Millisecond), 1) - pw, _, _ := progress.FromContext(ctx) + pw, _, _ := progress.NewFromContext(ctx) now := time.Now() st := progress.Status{ Started: &now, diff --git a/util/flightcontrol/flightcontrol.go b/util/flightcontrol/flightcontrol.go index 201dbc58b4bb..3c1b673e15df 100644 --- a/util/flightcontrol/flightcontrol.go +++ b/util/flightcontrol/flightcontrol.go @@ -136,7 +136,7 @@ func (c *call) wait(ctx context.Context) (v interface{}, err error) { <-c.cleaned return nil, errRetry } - pw, ok, _ := progress.FromContext(ctx) + pw, ok, _ := progress.NewFromContext(ctx) if ok { c.progressState.add(pw) } @@ -149,7 +149,7 @@ func (c *call) wait(ctx context.Context) (v interface{}, err error) { default: } - pw, ok, ctx := progress.FromContext(ctx) + pw, ok, ctx := progress.NewFromContext(ctx) if ok { c.progressState.add(pw) } diff --git a/util/progress/controller/controller.go b/util/progress/controller/controller.go index c61cab6b33ad..74bcb5bdce05 100644 --- a/util/progress/controller/controller.go +++ b/util/progress/controller/controller.go @@ -15,8 +15,9 @@ type Controller struct { started *time.Time writer progress.Writer - Digest digest.Digest - Name string + Digest digest.Digest + Name string + WriterFactory progress.WriterFactory } var _ progress.Controller = &Controller{} @@ -26,7 +27,7 @@ func (c *Controller) Start(ctx context.Context) (context.Context, func(error)) { if c.started == nil { now := time.Now() c.started = &now - c.writer, _, ctx = progress.FromContext(ctx) + c.writer, _, _ = c.WriterFactory(ctx) } if c.Digest != "" { diff --git a/util/progress/logs/logs.go b/util/progress/logs/logs.go index 15944d8c6205..07c7bd898c45 100644 --- a/util/progress/logs/logs.go +++ b/util/progress/logs/logs.go @@ -32,7 +32,7 @@ func NewLogStreams(ctx context.Context, printOutput bool) (io.WriteCloser, io.Wr } func newStreamWriter(ctx context.Context, stream int, printOutput bool) io.WriteCloser { - pw, _, _ := progress.FromContext(ctx) + pw, _, _ := progress.NewFromContext(ctx) return &streamWriter{ pw: pw, stream: stream, @@ -132,7 +132,7 @@ func (sw *streamWriter) Close() error { func LoggerFromContext(ctx context.Context) func([]byte) { return func(dt []byte) { - pw, _, _ := progress.FromContext(ctx) + pw, _, _ := progress.NewFromContext(ctx) defer pw.Close() pw.Write(identity.NewID(), client.VertexLog{ Stream: stderr, diff --git a/util/progress/multireader.go b/util/progress/multireader.go index 2bd3f2ca8616..8d8bbf54c505 100644 --- a/util/progress/multireader.go +++ b/util/progress/multireader.go @@ -28,7 +28,7 @@ func (mr *MultiReader) Reader(ctx context.Context) Reader { defer mr.mu.Unlock() pr, ctx, closeWriter := NewContext(ctx) - pw, _, ctx := FromContext(ctx) + pw, _, ctx := NewFromContext(ctx) w := pw.(*progressWriter) mr.writers[w] = closeWriter diff --git a/util/progress/progress.go b/util/progress/progress.go index 3ce212948c2d..83ca6672a890 100644 --- a/util/progress/progress.go +++ b/util/progress/progress.go @@ -18,22 +18,37 @@ type contextKeyT string var contextKey = contextKeyT("buildkit/util/progress") -// FromContext returns a progress writer from a context. -func FromContext(ctx context.Context, opts ...WriterOption) (Writer, bool, context.Context) { +// WriterFactory will generate a new progress Writer and return a new Context +// with the new Writer stored. It is the callers responsibility to Close the +// returned Writer to avoid resource leaks. +type WriterFactory func(ctx context.Context) (Writer, bool, context.Context) + +// FromContext returns a WriterFactory to generate new progress writers based +// on a Writer previously stored in the Context. +func FromContext(ctx context.Context, opts ...WriterOption) WriterFactory { v := ctx.Value(contextKey) - pw, ok := v.(*progressWriter) - if !ok { - if pw, ok := v.(*MultiWriter); ok { - return pw, true, ctx + return func(ctx context.Context) (Writer, bool, context.Context) { + pw, ok := v.(*progressWriter) + if !ok { + if pw, ok := v.(*MultiWriter); ok { + return pw, true, ctx + } + return &noOpWriter{}, false, ctx } - return &noOpWriter{}, false, ctx - } - pw = newWriter(pw) - for _, o := range opts { - o(pw) + pw = newWriter(pw) + for _, o := range opts { + o(pw) + } + ctx = context.WithValue(ctx, contextKey, pw) + return pw, true, ctx } - ctx = context.WithValue(ctx, contextKey, pw) - return pw, true, ctx +} + +// NewFromContext creates a new Writer based on a Writer previously stored +// in the Context and returns a new Context with the new Writer stored. It is +// the callers responsibility to Close the returned Writer to avoid resource leaks. +func NewFromContext(ctx context.Context, opts ...WriterOption) (Writer, bool, context.Context) { + return FromContext(ctx, opts...)(ctx) } type WriterOption func(Writer) diff --git a/util/progress/progress_test.go b/util/progress/progress_test.go index 39fe16c58aa4..ea9f57b1bfdf 100644 --- a/util/progress/progress_test.go +++ b/util/progress/progress_test.go @@ -25,7 +25,7 @@ func TestProgress(t *testing.T) { return saveProgress(ctx, pr, &trace) }) - pw, _, ctx := FromContext(ctx, WithMetadata("tag", "foo")) + pw, _, ctx := NewFromContext(ctx, WithMetadata("tag", "foo")) s, err = calc(ctx, 5, "calc") pw.Close() assert.NoError(t, err) @@ -66,7 +66,7 @@ func TestProgressNested(t *testing.T) { } func calc(ctx context.Context, total int, name string) (int, error) { - pw, _, ctx := FromContext(ctx) + pw, _, ctx := NewFromContext(ctx) defer pw.Close() sum := 0 @@ -91,7 +91,7 @@ func calc(ctx context.Context, total int, name string) (int, error) { func reduceCalc(ctx context.Context, total int) (int, error) { eg, ctx := errgroup.WithContext(ctx) - pw, _, ctx := FromContext(ctx) + pw, _, ctx := NewFromContext(ctx) defer pw.Close() pw.Write("reduce", Status{Action: "starting"}) diff --git a/util/pull/pullprogress/progress.go b/util/pull/pullprogress/progress.go index f16a06998d20..3f10098e79c5 100644 --- a/util/pull/pullprogress/progress.go +++ b/util/pull/pullprogress/progress.go @@ -96,7 +96,7 @@ func trackProgress(ctx context.Context, desc ocispec.Descriptor, manager PullMan ticker.Stop() }() - pw, _, _ := progress.FromContext(ctx) + pw, _, _ := progress.NewFromContext(ctx) defer pw.Close() ingestRef := remotes.MakeRefKey(ctx, desc) diff --git a/util/push/push.go b/util/push/push.go index acef8a64bee4..4cc19daacdab 100644 --- a/util/push/push.go +++ b/util/push/push.go @@ -183,7 +183,7 @@ func annotateDistributionSourceHandler(manager content.Manager, annotations map[ } func oneOffProgress(ctx context.Context, id string) func(err error) error { - pw, _, _ := progress.FromContext(ctx) + pw, _, _ := progress.NewFromContext(ctx) now := time.Now() st := progress.Status{ Started: &now, diff --git a/worker/base/worker.go b/worker/base/worker.go index c13c669f19f7..f174152725d3 100644 --- a/worker/base/worker.go +++ b/worker/base/worker.go @@ -42,6 +42,7 @@ import ( "github.com/moby/buildkit/source/http" "github.com/moby/buildkit/source/local" "github.com/moby/buildkit/util/archutil" + "github.com/moby/buildkit/util/progress" "github.com/moby/buildkit/util/progress/controller" "github.com/moby/buildkit/worker" digest "github.com/opencontainers/go-digest" @@ -374,7 +375,9 @@ func (w *Worker) Exporter(name string, sm *session.Manager) (exporter.Exporter, func (w *Worker) FromRemote(ctx context.Context, remote *solver.Remote) (ref cache.ImmutableRef, err error) { descHandler := &cache.DescHandler{ Provider: func(session.Group) content.Provider { return remote.Provider }, - Progress: &controller.Controller{}, + Progress: &controller.Controller{ + WriterFactory: progress.FromContext(ctx), + }, } descHandlers := cache.DescHandlers(make(map[digest.Digest]*cache.DescHandler)) for _, desc := range remote.Descriptors {