diff --git a/cache/blobs.go b/cache/blobs.go index 88ea08b8a00b..33e9693f1982 100644 --- a/cache/blobs.go +++ b/cache/blobs.go @@ -22,7 +22,8 @@ import ( "golang.org/x/sync/errgroup" ) -var g flightcontrol.Group +var g flightcontrol.Group[struct{}] +var gFileList flightcontrol.Group[[]string] const containerdUncompressed = "containerd.io/uncompressed" @@ -86,12 +87,12 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool if _, ok := filter[sr.ID()]; ok { eg.Go(func() error { - _, err := g.Do(ctx, fmt.Sprintf("%s-%t", sr.ID(), createIfNeeded), func(ctx context.Context) (interface{}, error) { + _, err := g.Do(ctx, fmt.Sprintf("%s-%t", sr.ID(), createIfNeeded), func(ctx context.Context) (struct{}, error) { if sr.getBlob() != "" { - return nil, nil + return struct{}{}, nil } if !createIfNeeded { - return nil, errors.WithStack(ErrNoBlobs) + return struct{}{}, errors.WithStack(ErrNoBlobs) } compressorFunc, finalize := comp.Type.Compress(ctx, comp) @@ -108,12 +109,12 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool if lowerRef != nil { m, err := lowerRef.Mount(ctx, true, s) if err != nil { - return nil, err + return struct{}{}, err } var release func() error lower, release, err = m.Mount() if err != nil { - return nil, err + return struct{}{}, err } if release != nil { defer release() @@ -131,12 +132,12 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool if upperRef != nil { m, err := upperRef.Mount(ctx, true, s) if err != nil { - return nil, err + return struct{}{}, err } var release func() error upper, release, err = m.Mount() if err != nil { - return nil, err + return struct{}{}, err } if release != nil { defer release() @@ -151,7 +152,7 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool if forceOvlStr := os.Getenv("BUILDKIT_DEBUG_FORCE_OVERLAY_DIFF"); forceOvlStr != "" && sr.kind() != Diff { enableOverlay, err = strconv.ParseBool(forceOvlStr) if err != nil { - return nil, errors.Wrapf(err, "invalid boolean in BUILDKIT_DEBUG_FORCE_OVERLAY_DIFF") + return struct{}{}, errors.Wrapf(err, "invalid boolean in BUILDKIT_DEBUG_FORCE_OVERLAY_DIFF") } fallback = false // prohibit fallback on debug } else if !isTypeWindows(sr) { @@ -173,10 +174,10 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool if !ok || err != nil { if !fallback { if !ok { - return nil, errors.Errorf("overlay mounts not detected (lower=%+v,upper=%+v)", lower, upper) + return struct{}{}, errors.Errorf("overlay mounts not detected (lower=%+v,upper=%+v)", lower, upper) } if err != nil { - return nil, errors.Wrapf(err, "failed to compute overlay diff") + return struct{}{}, errors.Wrapf(err, "failed to compute overlay diff") } } if logWarnOnErr { @@ -209,7 +210,7 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool diff.WithCompressor(compressorFunc), ) if err != nil { - return nil, err + return struct{}{}, err } } @@ -219,7 +220,7 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool if finalize != nil { a, err := finalize(ctx, sr.cm.ContentStore) if err != nil { - return nil, errors.Wrapf(err, "failed to finalize compression") + return struct{}{}, errors.Wrapf(err, "failed to finalize compression") } for k, v := range a { desc.Annotations[k] = v @@ -227,7 +228,7 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool } info, err := sr.cm.ContentStore.Info(ctx, desc.Digest) if err != nil { - return nil, err + return struct{}{}, err } if diffID, ok := info.Labels[containerdUncompressed]; ok { @@ -235,13 +236,13 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool } else if mediaType == ocispecs.MediaTypeImageLayer { desc.Annotations[containerdUncompressed] = desc.Digest.String() } else { - return nil, errors.Errorf("unknown layer compression type") + return struct{}{}, errors.Errorf("unknown layer compression type") } if err := sr.setBlob(ctx, desc); err != nil { - return nil, err + return struct{}{}, err } - return nil, nil + return struct{}{}, nil }) if err != nil { return err @@ -415,29 +416,29 @@ func isTypeWindows(sr *immutableRef) bool { // ensureCompression ensures the specified ref has the blob of the specified compression Type. func ensureCompression(ctx context.Context, ref *immutableRef, comp compression.Config, s session.Group) error { - _, err := g.Do(ctx, fmt.Sprintf("%s-%s", ref.ID(), comp.Type), func(ctx context.Context) (interface{}, error) { + _, err := g.Do(ctx, fmt.Sprintf("ensureComp-%s-%s", ref.ID(), comp.Type), func(ctx context.Context) (struct{}, error) { desc, err := ref.ociDesc(ctx, ref.descHandlers, true) if err != nil { - return nil, err + return struct{}{}, err } // Resolve converters layerConvertFunc, err := getConverter(ctx, ref.cm.ContentStore, desc, comp) if err != nil { - return nil, err + return struct{}{}, err } else if layerConvertFunc == nil { if isLazy, err := ref.isLazy(ctx); err != nil { - return nil, err + return struct{}{}, err } else if isLazy { // This ref can be used as the specified compressionType. Keep it lazy. - return nil, nil + return struct{}{}, nil } - return nil, ref.linkBlob(ctx, desc) + return struct{}{}, ref.linkBlob(ctx, desc) } // First, lookup local content store if _, err := ref.getBlobWithCompression(ctx, comp.Type); err == nil { - return nil, nil // found the compression variant. no need to convert. + return struct{}{}, nil // found the compression variant. no need to convert. } // Convert layer compression type @@ -447,18 +448,18 @@ func ensureCompression(ctx context.Context, ref *immutableRef, comp compression. dh: ref.descHandlers[desc.Digest], session: s, }).Unlazy(ctx); err != nil { - return nil, err + return struct{}{}, err } newDesc, err := layerConvertFunc(ctx, ref.cm.ContentStore, desc) if err != nil { - return nil, errors.Wrapf(err, "failed to convert") + return struct{}{}, errors.Wrapf(err, "failed to convert") } // Start to track converted layer if err := ref.linkBlob(ctx, *newDesc); err != nil { - return nil, errors.Wrapf(err, "failed to add compression blob") + return struct{}{}, errors.Wrapf(err, "failed to add compression blob") } - return nil, nil + return struct{}{}, nil }) return err } diff --git a/cache/filelist.go b/cache/filelist.go index c2c7921fd5db..0cb2e9b60ac4 100644 --- a/cache/filelist.go +++ b/cache/filelist.go @@ -20,7 +20,7 @@ const keyFileList = "filelist" // are in the tar stream (AUFS whiteout format). If the reference does not have a // a blob associated with it, the list is empty. func (sr *immutableRef) FileList(ctx context.Context, s session.Group) ([]string, error) { - res, err := g.Do(ctx, fmt.Sprintf("filelist-%s", sr.ID()), func(ctx context.Context) (interface{}, error) { + return gFileList.Do(ctx, fmt.Sprintf("filelist-%s", sr.ID()), func(ctx context.Context) ([]string, error) { dt, err := sr.GetExternal(keyFileList) if err == nil && dt != nil { var files []string @@ -80,11 +80,4 @@ func (sr *immutableRef) FileList(ctx context.Context, s session.Group) ([]string } return files, nil }) - if err != nil { - return nil, err - } - if res == nil { - return nil, nil - } - return res.([]string), nil } diff --git a/cache/manager.go b/cache/manager.go index 48a6f612fb0c..64322055ef67 100644 --- a/cache/manager.go +++ b/cache/manager.go @@ -93,7 +93,7 @@ type cacheManager struct { mountPool sharableMountPool muPrune sync.Mutex // make sure parallel prune is not allowed so there will not be inconsistent results - unlazyG flightcontrol.Group + unlazyG flightcontrol.Group[struct{}] } func NewManager(opt ManagerOpt) (Manager, error) { diff --git a/cache/refs.go b/cache/refs.go index 338dbd664c96..e448f94b29d2 100644 --- a/cache/refs.go +++ b/cache/refs.go @@ -90,7 +90,7 @@ type cacheRecord struct { mountCache snapshot.Mountable - sizeG flightcontrol.Group + sizeG flightcontrol.Group[int64] // these are filled if multiple refs point to same data equalMutable *mutableRef @@ -325,7 +325,7 @@ func (cr *cacheRecord) viewSnapshotID() string { func (cr *cacheRecord) size(ctx context.Context) (int64, error) { // this expects that usage() is implemented lazily - s, err := cr.sizeG.Do(ctx, cr.ID(), func(ctx context.Context) (interface{}, error) { + return cr.sizeG.Do(ctx, cr.ID(), func(ctx context.Context) (int64, error) { cr.mu.Lock() s := cr.getSize() if s != sizeUnknown { @@ -346,7 +346,7 @@ func (cr *cacheRecord) size(ctx context.Context) (int64, error) { isDead := cr.isDead() cr.mu.Unlock() if isDead { - return int64(0), nil + return 0, nil } if !errors.Is(err, errdefs.ErrNotFound) { return s, errors.Wrapf(err, "failed to get usage for %s", cr.ID()) @@ -379,10 +379,6 @@ func (cr *cacheRecord) size(ctx context.Context) (int64, error) { cr.mu.Unlock() return usage.Size, nil }) - if err != nil { - return 0, err - } - return s.(int64), nil } // caller must hold cr.mu @@ -1057,7 +1053,7 @@ func (sr *immutableRef) withRemoteSnapshotLabelsStargzMode(ctx context.Context, } func (sr *immutableRef) prepareRemoteSnapshotsStargzMode(ctx context.Context, s session.Group) error { - _, err := sr.sizeG.Do(ctx, sr.ID()+"-prepare-remote-snapshot", func(ctx context.Context) (_ interface{}, rerr error) { + _, err := g.Do(ctx, sr.ID()+"-prepare-remote-snapshot", func(ctx context.Context) (_ struct{}, rerr error) { dhs := sr.descHandlers for _, r := range sr.layerChain() { r := r @@ -1069,7 +1065,7 @@ func (sr *immutableRef) prepareRemoteSnapshotsStargzMode(ctx context.Context, s dh := dhs[digest.Digest(r.getBlob())] if dh == nil { // We cannot prepare remote snapshots without descHandler. - return nil, nil + return struct{}{}, nil } // tmpLabels contains dh.SnapshotLabels + session IDs. All keys contain @@ -1121,7 +1117,7 @@ func (sr *immutableRef) prepareRemoteSnapshotsStargzMode(ctx context.Context, s break } - return nil, nil + return struct{}{}, nil }) return err } @@ -1144,18 +1140,18 @@ func makeTmpLabelsStargzMode(labels map[string]string, s session.Group) (fields } func (sr *immutableRef) unlazy(ctx context.Context, dhs DescHandlers, pg progress.Controller, s session.Group, topLevel bool) error { - _, err := sr.sizeG.Do(ctx, sr.ID()+"-unlazy", func(ctx context.Context) (_ interface{}, rerr error) { + _, err := g.Do(ctx, sr.ID()+"-unlazy", func(ctx context.Context) (_ struct{}, rerr error) { if _, err := sr.cm.Snapshotter.Stat(ctx, sr.getSnapshotID()); err == nil { - return nil, nil + return struct{}{}, nil } switch sr.kind() { case Merge, Diff: - return nil, sr.unlazyDiffMerge(ctx, dhs, pg, s, topLevel) + return struct{}{}, sr.unlazyDiffMerge(ctx, dhs, pg, s, topLevel) case Layer, BaseLayer: - return nil, sr.unlazyLayer(ctx, dhs, pg, s) + return struct{}{}, sr.unlazyLayer(ctx, dhs, pg, s) } - return nil, nil + return struct{}{}, nil }) return err } diff --git a/cache/remote.go b/cache/remote.go index b80bd79cfb0e..cfafef4cb57a 100644 --- a/cache/remote.go +++ b/cache/remote.go @@ -305,11 +305,11 @@ func (p lazyRefProvider) ReaderAt(ctx context.Context, desc ocispecs.Descriptor) } func (p lazyRefProvider) Unlazy(ctx context.Context) error { - _, err := p.ref.cm.unlazyG.Do(ctx, string(p.desc.Digest), func(ctx context.Context) (_ interface{}, rerr error) { + _, err := p.ref.cm.unlazyG.Do(ctx, string(p.desc.Digest), func(ctx context.Context) (_ struct{}, rerr error) { if isLazy, err := p.ref.isLazy(ctx); err != nil { - return nil, err + return struct{}{}, err } else if !isLazy { - return nil, nil + return struct{}{}, nil } defer func() { if rerr == nil { @@ -320,7 +320,7 @@ func (p lazyRefProvider) Unlazy(ctx context.Context) error { if p.dh == nil { // shouldn't happen, if you have a lazy immutable ref it already should be validated // that descriptor handlers exist for it - return nil, errors.New("unexpected nil descriptor handler") + return struct{}{}, errors.New("unexpected nil descriptor handler") } if p.dh.Progress != nil { @@ -337,7 +337,7 @@ func (p lazyRefProvider) Unlazy(ctx context.Context) error { Manager: p.ref.cm.ContentStore, }, p.desc, p.dh.Ref, logs.LoggerFromContext(ctx)) if err != nil { - return nil, err + return struct{}{}, err } if imageRefs := p.ref.getImageRefs(); len(imageRefs) > 0 { @@ -345,12 +345,12 @@ func (p lazyRefProvider) Unlazy(ctx context.Context) error { imageRef := imageRefs[0] if p.ref.GetDescription() == "" { if err := p.ref.SetDescription("pulled from " + imageRef); err != nil { - return nil, err + return struct{}{}, err } } } - return nil, nil + return struct{}{}, nil }) return err } diff --git a/client/llb/async.go b/client/llb/async.go index 73d2a92fa11e..8771c71978f8 100644 --- a/client/llb/async.go +++ b/client/llb/async.go @@ -15,7 +15,7 @@ type asyncState struct { target State set bool err error - g flightcontrol.Group + g flightcontrol.Group[State] } func (as *asyncState) Output() Output { @@ -53,7 +53,7 @@ func (as *asyncState) ToInput(ctx context.Context, c *Constraints) (*pb.Input, e } func (as *asyncState) Do(ctx context.Context, c *Constraints) error { - _, err := as.g.Do(ctx, "", func(ctx context.Context) (interface{}, error) { + _, err := as.g.Do(ctx, "", func(ctx context.Context) (State, error) { if as.set { return as.target, as.err } diff --git a/executor/oci/hosts.go b/executor/oci/hosts.go index 0d193555c941..0de29f8a8d79 100644 --- a/executor/oci/hosts.go +++ b/executor/oci/hosts.go @@ -20,9 +20,9 @@ func GetHostsFile(ctx context.Context, stateDir string, extraHosts []executor.Ho return makeHostsFile(stateDir, extraHosts, idmap, hostname) } - _, err := g.Do(ctx, stateDir, func(ctx context.Context) (interface{}, error) { + _, err := g.Do(ctx, stateDir, func(ctx context.Context) (struct{}, error) { _, _, err := makeHostsFile(stateDir, nil, idmap, hostname) - return nil, err + return struct{}{}, err }) if err != nil { return "", nil, err diff --git a/executor/oci/resolvconf.go b/executor/oci/resolvconf.go index aa4c27c50b23..9db0b3dfaad6 100644 --- a/executor/oci/resolvconf.go +++ b/executor/oci/resolvconf.go @@ -11,7 +11,7 @@ import ( "github.com/pkg/errors" ) -var g flightcontrol.Group +var g flightcontrol.Group[struct{}] var notFirstRun bool var lastNotEmpty bool @@ -26,7 +26,7 @@ type DNSConfig struct { func GetResolvConf(ctx context.Context, stateDir string, idmap *idtools.IdentityMapping, dns *DNSConfig) (string, error) { p := filepath.Join(stateDir, "resolv.conf") - _, err := g.Do(ctx, stateDir, func(ctx context.Context) (interface{}, error) { + _, err := g.Do(ctx, stateDir, func(ctx context.Context) (struct{}, error) { generate := !notFirstRun notFirstRun = true @@ -34,7 +34,7 @@ func GetResolvConf(ctx context.Context, stateDir string, idmap *idtools.Identity fi, err := os.Stat(p) if err != nil { if !errors.Is(err, os.ErrNotExist) { - return "", err + return struct{}{}, err } generate = true } @@ -42,7 +42,7 @@ func GetResolvConf(ctx context.Context, stateDir string, idmap *idtools.Identity fiMain, err := os.Stat(resolvconfPath()) if err != nil { if !errors.Is(err, os.ErrNotExist) { - return nil, err + return struct{}{}, err } if lastNotEmpty { generate = true @@ -57,12 +57,12 @@ func GetResolvConf(ctx context.Context, stateDir string, idmap *idtools.Identity } if !generate { - return "", nil + return struct{}{}, nil } dt, err := os.ReadFile(resolvconfPath()) if err != nil && !errors.Is(err, os.ErrNotExist) { - return "", err + return struct{}{}, err } var f *resolvconf.File @@ -85,31 +85,31 @@ func GetResolvConf(ctx context.Context, stateDir string, idmap *idtools.Identity f, err = resolvconf.Build(tmpPath, dnsNameservers, dnsSearchDomains, dnsOptions) if err != nil { - return "", err + return struct{}{}, err } dt = f.Content } f, err = resolvconf.FilterResolvDNS(dt, true) if err != nil { - return "", err + return struct{}{}, err } if err := os.WriteFile(tmpPath, f.Content, 0644); err != nil { - return "", err + return struct{}{}, err } if idmap != nil { root := idmap.RootPair() if err := os.Chown(tmpPath, root.UID, root.GID); err != nil { - return "", err + return struct{}{}, err } } if err := os.Rename(tmpPath, p); err != nil { - return "", err + return struct{}{}, err } - return "", nil + return struct{}{}, nil }) if err != nil { return "", err diff --git a/frontend/dockerui/config.go b/frontend/dockerui/config.go index 99ddb80b1475..12ec2c6880e0 100644 --- a/frontend/dockerui/config.go +++ b/frontend/dockerui/config.go @@ -77,7 +77,7 @@ type Client struct { client client.Client ignoreCache []string bctx *buildContext - g flightcontrol.Group + g flightcontrol.Group[*buildContext] bopts client.BuildOpts dockerignore []byte @@ -280,7 +280,7 @@ func (bc *Client) init() error { } func (bc *Client) buildContext(ctx context.Context) (*buildContext, error) { - bctx, err := bc.g.Do(ctx, "initcontext", func(ctx context.Context) (interface{}, error) { + return bc.g.Do(ctx, "initcontext", func(ctx context.Context) (*buildContext, error) { if bc.bctx != nil { return bc.bctx, nil } @@ -290,10 +290,6 @@ func (bc *Client) buildContext(ctx context.Context) (*buildContext, error) { } return bctx, err }) - if err != nil { - return nil, err - } - return bctx.(*buildContext), nil } func (bc *Client) ReadEntrypoint(ctx context.Context, lang string, opts ...llb.LocalOption) (*Source, error) { diff --git a/solver/jobs.go b/solver/jobs.go index 6aff0c3cd4c9..ec203257e3cd 100644 --- a/solver/jobs.go +++ b/solver/jobs.go @@ -661,9 +661,11 @@ type execRes struct { } type sharedOp struct { - resolver ResolveOpFunc - st *state - g flightcontrol.Group + resolver ResolveOpFunc + st *state + gDigest flightcontrol.Group[digest.Digest] + gCacheRes flightcontrol.Group[[]*CacheMap] + gExecRes flightcontrol.Group[*execRes] opOnce sync.Once op Op @@ -725,7 +727,7 @@ func (s *sharedOp) CalcSlowCache(ctx context.Context, index Index, p PreprocessF err = errdefs.WrapVertex(err, s.st.origDigest) }() flightControlKey := fmt.Sprintf("slow-compute-%d", index) - key, err := s.g.Do(ctx, flightControlKey, func(ctx context.Context) (interface{}, error) { + key, err := s.gDigest.Do(ctx, flightControlKey, func(ctx context.Context) (digest.Digest, error) { s.slowMu.Lock() // TODO: add helpers for these stored values if res, ok := s.slowCacheRes[index]; ok { @@ -734,7 +736,7 @@ func (s *sharedOp) CalcSlowCache(ctx context.Context, index Index, p PreprocessF } if err := s.slowCacheErr[index]; err != nil { s.slowMu.Unlock() - return nil, err + return "", err } s.slowMu.Unlock() @@ -742,7 +744,7 @@ func (s *sharedOp) CalcSlowCache(ctx context.Context, index Index, p PreprocessF if p != nil { st := s.st.solver.getState(s.st.vtx.Inputs()[index]) if st == nil { - return nil, errors.Errorf("failed to get state for index %d on %v", index, s.st.vtx.Name()) + return "", errors.Errorf("failed to get state for index %d on %v", index, s.st.vtx.Name()) } ctx2 := progress.WithProgress(ctx, st.mpw) if st.mspan.Span != nil { @@ -793,7 +795,7 @@ func (s *sharedOp) CalcSlowCache(ctx context.Context, index Index, p PreprocessF notifyCompleted(err, false) return "", err } - return key.(digest.Digest), nil + return key, nil } func (s *sharedOp) CacheMap(ctx context.Context, index int) (resp *cacheMapResp, err error) { @@ -806,7 +808,7 @@ func (s *sharedOp) CacheMap(ctx context.Context, index int) (resp *cacheMapResp, return nil, err } flightControlKey := fmt.Sprintf("cachemap-%d", index) - res, err := s.g.Do(ctx, flightControlKey, func(ctx context.Context) (ret interface{}, retErr error) { + res, err := s.gCacheRes.Do(ctx, flightControlKey, func(ctx context.Context) (ret []*CacheMap, retErr error) { if s.cacheRes != nil && s.cacheDone || index < len(s.cacheRes) { return s.cacheRes, nil } @@ -862,11 +864,11 @@ func (s *sharedOp) CacheMap(ctx context.Context, index int) (resp *cacheMapResp, return nil, err } - if len(res.([]*CacheMap)) <= index { + if len(res) <= index { return s.CacheMap(ctx, index) } - return &cacheMapResp{CacheMap: res.([]*CacheMap)[index], complete: s.cacheDone}, nil + return &cacheMapResp{CacheMap: res[index], complete: s.cacheDone}, nil } func (s *sharedOp) Exec(ctx context.Context, inputs []Result) (outputs []Result, exporters []ExportableCacheKey, err error) { @@ -879,7 +881,7 @@ func (s *sharedOp) Exec(ctx context.Context, inputs []Result) (outputs []Result, return nil, nil, err } flightControlKey := "exec" - res, err := s.g.Do(ctx, flightControlKey, func(ctx context.Context) (ret interface{}, retErr error) { + res, err := s.gExecRes.Do(ctx, flightControlKey, func(ctx context.Context) (ret *execRes, retErr error) { if s.execDone { if s.execErr != nil { return nil, s.execErr @@ -941,8 +943,7 @@ func (s *sharedOp) Exec(ctx context.Context, inputs []Result) (outputs []Result, if res == nil || err != nil { return nil, nil, err } - r := res.(*execRes) - return unwrapShared(r.execRes), r.execExporters, nil + return unwrapShared(res.execRes), res.execExporters, nil } func (s *sharedOp) getOp() (Op, error) { diff --git a/solver/llbsolver/bridge.go b/solver/llbsolver/bridge.go index 2ba05d724e70..c34f8c061345 100644 --- a/solver/llbsolver/bridge.go +++ b/solver/llbsolver/bridge.go @@ -155,7 +155,7 @@ type resultProxy struct { id string b *provenanceBridge req frontend.SolveRequest - g flightcontrol.Group + g flightcontrol.Group[solver.CachedResult] mu sync.Mutex released bool v solver.CachedResult @@ -244,7 +244,7 @@ func (rp *resultProxy) Result(ctx context.Context) (res solver.CachedResult, err defer func() { err = rp.wrapError(err) }() - r, err := rp.g.Do(ctx, "result", func(ctx context.Context) (interface{}, error) { + return rp.g.Do(ctx, "result", func(ctx context.Context) (solver.CachedResult, error) { rp.mu.Lock() if rp.released { rp.mu.Unlock() @@ -288,10 +288,6 @@ func (rp *resultProxy) Result(ctx context.Context) (res solver.CachedResult, err rp.mu.Unlock() return v, err }) - if r != nil { - return r.(solver.CachedResult), nil - } - return nil, err } func (b *llbBridge) ResolveImageConfig(ctx context.Context, ref string, opt llb.ResolveImageConfigOpt) (dgst digest.Digest, config []byte, err error) { diff --git a/solver/llbsolver/ops/file.go b/solver/llbsolver/ops/file.go index 4f80ddfb65b5..db81201f1ac5 100644 --- a/solver/llbsolver/ops/file.go +++ b/solver/llbsolver/ops/file.go @@ -296,7 +296,7 @@ type FileOpSolver struct { mu sync.Mutex outs map[int]int ins map[int]input - g flightcontrol.Group + g flightcontrol.Group[input] } type input struct { @@ -405,7 +405,7 @@ func (s *FileOpSolver) validate(idx int, inputs []fileoptypes.Ref, actions []*pb } func (s *FileOpSolver) getInput(ctx context.Context, idx int, inputs []fileoptypes.Ref, actions []*pb.FileAction, g session.Group) (input, error) { - inp, err := s.g.Do(ctx, fmt.Sprintf("inp-%d", idx), func(ctx context.Context) (_ interface{}, err error) { + return s.g.Do(ctx, fmt.Sprintf("inp-%d", idx), func(ctx context.Context) (_ input, err error) { s.mu.Lock() inp := s.ins[idx] s.mu.Unlock() @@ -547,17 +547,17 @@ func (s *FileOpSolver) getInput(ctx context.Context, idx int, inputs []fileoptyp eg.Go(loadInput(ctx)) eg.Go(loadSecondaryInput(ctx)) if err := eg.Wait(); err != nil { - return nil, err + return input{}, err } } else { if action.Input != -1 { if err := loadInput(ctx)(); err != nil { - return nil, err + return input{}, err } } if action.SecondaryInput != -1 { if err := loadSecondaryInput(ctx)(); err != nil { - return nil, err + return input{}, err } } } @@ -565,7 +565,7 @@ func (s *FileOpSolver) getInput(ctx context.Context, idx int, inputs []fileoptyp if inpMount == nil { m, err := s.r.Prepare(ctx, nil, false, g) if err != nil { - return nil, err + return input{}, err } inpMount = m } @@ -574,46 +574,46 @@ func (s *FileOpSolver) getInput(ctx context.Context, idx int, inputs []fileoptyp case *pb.FileAction_Mkdir: user, group, err := loadOwner(ctx, a.Mkdir.Owner) if err != nil { - return nil, err + return input{}, err } if err := s.b.Mkdir(ctx, inpMount, user, group, *a.Mkdir); err != nil { - return nil, err + return input{}, err } case *pb.FileAction_Mkfile: user, group, err := loadOwner(ctx, a.Mkfile.Owner) if err != nil { - return nil, err + return input{}, err } if err := s.b.Mkfile(ctx, inpMount, user, group, *a.Mkfile); err != nil { - return nil, err + return input{}, err } case *pb.FileAction_Rm: if err := s.b.Rm(ctx, inpMount, *a.Rm); err != nil { - return nil, err + return input{}, err } case *pb.FileAction_Copy: if inpMountSecondary == nil { m, err := s.r.Prepare(ctx, nil, true, g) if err != nil { - return nil, err + return input{}, err } inpMountSecondary = m } user, group, err := loadOwner(ctx, a.Copy.Owner) if err != nil { - return nil, err + return input{}, err } if err := s.b.Copy(ctx, inpMountSecondary, inpMount, user, group, *a.Copy); err != nil { - return nil, err + return input{}, err } default: - return nil, errors.Errorf("invalid action type %T", action.Action) + return input{}, errors.Errorf("invalid action type %T", action.Action) } if inp.requiresCommit { ref, err := s.r.Commit(ctx, inpMount) if err != nil { - return nil, err + return input{}, err } inp.ref = ref } else { @@ -624,10 +624,6 @@ func (s *FileOpSolver) getInput(ctx context.Context, idx int, inputs []fileoptyp s.mu.Unlock() return inp, nil }) - if err != nil { - return input{}, err - } - return inp.(input), err } func isDefaultIndexes(idxs [][]int) bool { diff --git a/source/containerimage/pull.go b/source/containerimage/pull.go index 509d2a994660..08b1bcc678bc 100644 --- a/source/containerimage/pull.go +++ b/source/containerimage/pull.go @@ -60,9 +60,14 @@ type SourceOpt struct { LeaseManager leases.Manager } +type resolveImageResult struct { + dgst digest.Digest + dt []byte +} + type Source struct { SourceOpt - g flightcontrol.Group + g flightcontrol.Group[*resolveImageResult] } var _ source.Source = &Source{} @@ -83,11 +88,6 @@ func (is *Source) ID() string { } func (is *Source) ResolveImageConfig(ctx context.Context, ref string, opt llb.ResolveImageConfigOpt, sm *session.Manager, g session.Group) (digest.Digest, []byte, error) { - type t struct { - dgst digest.Digest - dt []byte - } - var typed *t key := ref if platform := opt.Platform; platform != nil { key += platforms.Format(*platform) @@ -110,18 +110,17 @@ func (is *Source) ResolveImageConfig(ctx context.Context, ref string, opt llb.Re rslvr = getOCILayoutResolver(opt.Store, sm, g) } key += rm.String() - res, err := is.g.Do(ctx, key, func(ctx context.Context) (interface{}, error) { + res, err := is.g.Do(ctx, key, func(ctx context.Context) (*resolveImageResult, error) { dgst, dt, err := imageutil.Config(ctx, ref, rslvr, is.ContentStore, is.LeaseManager, opt.Platform) if err != nil { return nil, err } - return &t{dgst: dgst, dt: dt}, nil + return &resolveImageResult{dgst: dgst, dt: dt}, nil }) if err != nil { return "", nil, err } - typed = res.(*t) - return typed.dgst, typed.dt, nil + return res.dgst, res.dt, nil } func (is *Source) Resolve(ctx context.Context, id source.Identifier, sm *session.Manager, vtx solver.Vertex) (source.SourceInstance, error) { @@ -205,7 +204,7 @@ type puller struct { ResolverType store llb.ResolveImageConfigOptStore - g flightcontrol.Group + g flightcontrol.Group[struct{}] cacheKeyErr error cacheKeyDone bool releaseTmpLeases func(context.Context) error @@ -255,9 +254,9 @@ func (p *puller) CacheKey(ctx context.Context, g session.Group, index int) (cach // be canceled before the progress output is complete progressFactory := progress.FromContext(ctx) - _, err = p.g.Do(ctx, "", func(ctx context.Context) (_ interface{}, err error) { + _, err = p.g.Do(ctx, "", func(ctx context.Context) (_ struct{}, err error) { if p.cacheKeyErr != nil || p.cacheKeyDone { - return nil, p.cacheKeyErr + return struct{}{}, p.cacheKeyErr } defer func() { if !errdefs.IsCanceled(ctx, err) { @@ -266,7 +265,7 @@ func (p *puller) CacheKey(ctx context.Context, g session.Group, index int) (cach }() ctx, done, err := leaseutil.WithLease(ctx, p.LeaseManager, leases.WithExpiration(5*time.Minute), leaseutil.MakeTemporary) if err != nil { - return nil, err + return struct{}{}, err } p.releaseTmpLeases = done defer imageutil.AddLease(done) @@ -278,12 +277,12 @@ func (p *puller) CacheKey(ctx context.Context, g session.Group, index int) (cach p.manifest, err = p.PullManifests(ctx, getResolver) if err != nil { - return nil, err + return struct{}{}, err } if ll := p.layerLimit; ll != nil { if *ll > len(p.manifest.Descriptors) { - return nil, errors.Errorf("layer limit %d is greater than the number of layers in the image %d", *ll, len(p.manifest.Descriptors)) + return struct{}{}, errors.Errorf("layer limit %d is greater than the number of layers in the image %d", *ll, len(p.manifest.Descriptors)) } p.manifest.Descriptors = p.manifest.Descriptors[:*ll] } @@ -320,21 +319,21 @@ func (p *puller) CacheKey(ctx context.Context, g session.Group, index int) (cach desc := p.manifest.MainManifestDesc k, err := mainManifestKey(ctx, desc, p.Platform, p.layerLimit) if err != nil { - return nil, err + return struct{}{}, err } p.manifestKey = k.String() dt, err := content.ReadBlob(ctx, p.ContentStore, p.manifest.ConfigDesc) if err != nil { - return nil, err + return struct{}{}, err } ck, err := cacheKeyFromConfig(dt, p.layerLimit) if err != nil { - return nil, err + return struct{}{}, err } p.configKey = ck.String() p.cacheKeyDone = true - return nil, nil + return struct{}{}, nil }) if err != nil { return "", "", nil, false, err diff --git a/util/flightcontrol/flightcontrol.go b/util/flightcontrol/flightcontrol.go index 3c1b673e15df..82ed25205fe4 100644 --- a/util/flightcontrol/flightcontrol.go +++ b/util/flightcontrol/flightcontrol.go @@ -25,13 +25,13 @@ type contextKeyT string var contextKey = contextKeyT("buildkit/util/flightcontrol.progress") // Group is a flightcontrol synchronization group -type Group struct { - mu sync.Mutex // protects m - m map[string]*call // lazily initialized +type Group[T any] struct { + mu sync.Mutex // protects m + m map[string]*call[T] // lazily initialized } // Do executes a context function syncronized by the key -func (g *Group) Do(ctx context.Context, key string, fn func(ctx context.Context) (interface{}, error)) (v interface{}, err error) { +func (g *Group[T]) Do(ctx context.Context, key string, fn func(ctx context.Context) (T, error)) (v T, err error) { var backoff time.Duration for { v, err = g.do(ctx, key, fn) @@ -53,10 +53,10 @@ func (g *Group) Do(ctx context.Context, key string, fn func(ctx context.Context) } } -func (g *Group) do(ctx context.Context, key string, fn func(ctx context.Context) (interface{}, error)) (interface{}, error) { +func (g *Group[T]) do(ctx context.Context, key string, fn func(ctx context.Context) (T, error)) (T, error) { g.mu.Lock() if g.m == nil { - g.m = make(map[string]*call) + g.m = make(map[string]*call[T]) } if c, ok := g.m[key]; ok { // register 2nd waiter @@ -78,16 +78,16 @@ func (g *Group) do(ctx context.Context, key string, fn func(ctx context.Context) return c.wait(ctx) } -type call struct { +type call[T any] struct { mu sync.Mutex - result interface{} + result T err error ready chan struct{} cleaned chan struct{} - ctx *sharedContext + ctx *sharedContext[T] ctxs []context.Context - fn func(ctx context.Context) (interface{}, error) + fn func(ctx context.Context) (T, error) once sync.Once closeProgressWriter func() @@ -95,8 +95,8 @@ type call struct { progressCtx context.Context } -func newCall(fn func(ctx context.Context) (interface{}, error)) *call { - c := &call{ +func newCall[T any](fn func(ctx context.Context) (T, error)) *call[T] { + c := &call[T]{ fn: fn, ready: make(chan struct{}), cleaned: make(chan struct{}), @@ -114,7 +114,7 @@ func newCall(fn func(ctx context.Context) (interface{}, error)) *call { return c } -func (c *call) run() { +func (c *call[T]) run() { defer c.closeProgressWriter() ctx, cancel := context.WithCancel(c.ctx) defer cancel() @@ -126,7 +126,8 @@ func (c *call) run() { close(c.ready) } -func (c *call) wait(ctx context.Context) (v interface{}, err error) { +func (c *call[T]) wait(ctx context.Context) (v T, err error) { + var empty T c.mu.Lock() // detect case where caller has just returned, let it clean up before select { @@ -134,7 +135,7 @@ func (c *call) wait(ctx context.Context) (v interface{}, err error) { c.mu.Unlock() if c.err != nil { // on error retry <-c.cleaned - return nil, errRetry + return empty, errRetry } pw, ok, _ := progress.NewFromContext(ctx) if ok { @@ -145,7 +146,7 @@ func (c *call) wait(ctx context.Context) (v interface{}, err error) { case <-c.ctx.done: // could return if no error c.mu.Unlock() <-c.cleaned - return nil, errRetry + return empty, errRetry default: } @@ -174,13 +175,13 @@ func (c *call) wait(ctx context.Context) (v interface{}, err error) { if ok { c.progressState.close(pw) } - return nil, ctx.Err() + return empty, ctx.Err() case <-c.ready: return c.result, c.err // shared not implemented yet } } -func (c *call) Deadline() (deadline time.Time, ok bool) { +func (c *call[T]) Deadline() (deadline time.Time, ok bool) { c.mu.Lock() defer c.mu.Unlock() for _, ctx := range c.ctxs { @@ -196,11 +197,11 @@ func (c *call) Deadline() (deadline time.Time, ok bool) { return time.Time{}, false } -func (c *call) Done() <-chan struct{} { +func (c *call[T]) Done() <-chan struct{} { return c.ctx.done } -func (c *call) Err() error { +func (c *call[T]) Err() error { select { case <-c.ctx.Done(): return c.ctx.err @@ -209,7 +210,7 @@ func (c *call) Err() error { } } -func (c *call) Value(key interface{}) interface{} { +func (c *call[T]) Value(key interface{}) interface{} { if key == contextKey { return c.progressState } @@ -239,17 +240,17 @@ func (c *call) Value(key interface{}) interface{} { return nil } -type sharedContext struct { - *call +type sharedContext[T any] struct { + *call[T] done chan struct{} err error } -func newContext(c *call) *sharedContext { - return &sharedContext{call: c, done: make(chan struct{})} +func newContext[T any](c *call[T]) *sharedContext[T] { + return &sharedContext[T]{call: c, done: make(chan struct{})} } -func (sc *sharedContext) checkDone() bool { +func (sc *sharedContext[T]) checkDone() bool { sc.mu.Lock() select { case <-sc.done: diff --git a/util/flightcontrol/flightcontrol_test.go b/util/flightcontrol/flightcontrol_test.go index 9953775442a8..3c8aebdfd8aa 100644 --- a/util/flightcontrol/flightcontrol_test.go +++ b/util/flightcontrol/flightcontrol_test.go @@ -15,7 +15,7 @@ import ( func TestNoCancel(t *testing.T) { t.Parallel() - g := &Group{} + g := &Group[string]{} eg, ctx := errgroup.WithContext(context.Background()) var r1, r2 string var counter int64 @@ -25,7 +25,7 @@ func TestNoCancel(t *testing.T) { if err != nil { return err } - r1 = ret1.(string) + r1 = ret1 return nil }) eg.Go(func() error { @@ -33,7 +33,7 @@ func TestNoCancel(t *testing.T) { if err != nil { return err } - r2 = ret2.(string) + r2 = ret2 return nil }) err := eg.Wait() @@ -45,7 +45,7 @@ func TestNoCancel(t *testing.T) { func TestCancelOne(t *testing.T) { t.Parallel() - g := &Group{} + g := &Group[string]{} eg, ctx := errgroup.WithContext(context.Background()) var r1, r2 string var counter int64 @@ -56,7 +56,7 @@ func TestCancelOne(t *testing.T) { assert.Error(t, err) require.Equal(t, true, errors.Is(err, context.Canceled)) if err == nil { - r1 = ret1.(string) + r1 = ret1 } return nil }) @@ -65,7 +65,7 @@ func TestCancelOne(t *testing.T) { if err != nil { return err } - r2 = ret2.(string) + r2 = ret2 return nil }) eg.Go(func() error { @@ -87,7 +87,7 @@ func TestCancelOne(t *testing.T) { func TestCancelRace(t *testing.T) { // t.Parallel() // disabled for better timing consistency. works with parallel as well - g := &Group{} + g := &Group[struct{}]{} ctx, cancel := context.WithCancel(context.Background()) kick := make(chan struct{}) @@ -96,11 +96,11 @@ func TestCancelRace(t *testing.T) { count := 0 // first run cancels context, second returns cleanly - f := func(ctx context.Context) (interface{}, error) { + f := func(ctx context.Context) (struct{}, error) { done := ctx.Done() if count > 0 { time.Sleep(100 * time.Millisecond) - return nil, nil + return struct{}{}, nil } go func() { for { @@ -118,10 +118,10 @@ func TestCancelRace(t *testing.T) { time.Sleep(50 * time.Millisecond) select { case <-done: - return nil, ctx.Err() + return struct{}{}, ctx.Err() case <-time.After(200 * time.Millisecond): } - return nil, nil + return struct{}{}, nil } go func() { @@ -141,7 +141,7 @@ func TestCancelRace(t *testing.T) { func TestCancelBoth(t *testing.T) { t.Parallel() - g := &Group{} + g := &Group[string]{} eg, ctx := errgroup.WithContext(context.Background()) var r1, r2 string var counter int64 @@ -153,7 +153,7 @@ func TestCancelBoth(t *testing.T) { assert.Error(t, err) require.Equal(t, true, errors.Is(err, context.Canceled)) if err == nil { - r1 = ret1.(string) + r1 = ret1 } return nil }) @@ -162,7 +162,7 @@ func TestCancelBoth(t *testing.T) { assert.Error(t, err) require.Equal(t, true, errors.Is(err, context.Canceled)) if err == nil { - r2 = ret2.(string) + r2 = ret2 } return nil }) @@ -207,13 +207,13 @@ func TestContention(t *testing.T) { wg := sync.WaitGroup{} wg.Add(threads) - g := &Group{} + g := &Group[int]{} for i := 0; i < threads; i++ { for j := 0; j < perthread; j++ { - _, err := g.Do(context.TODO(), "foo", func(ctx context.Context) (interface{}, error) { + _, err := g.Do(context.TODO(), "foo", func(ctx context.Context) (int, error) { time.Sleep(time.Microsecond) - return nil, nil + return 0, nil }) require.NoError(t, err) } @@ -223,12 +223,12 @@ func TestContention(t *testing.T) { wg.Wait() } -func testFunc(wait time.Duration, ret string, counter *int64) func(ctx context.Context) (interface{}, error) { - return func(ctx context.Context) (interface{}, error) { +func testFunc(wait time.Duration, ret string, counter *int64) func(ctx context.Context) (string, error) { + return func(ctx context.Context) (string, error) { atomic.AddInt64(counter, 1) select { case <-ctx.Done(): - return nil, ctx.Err() + return "", ctx.Err() case <-time.After(wait): return ret, nil } diff --git a/util/pull/pull.go b/util/pull/pull.go index 74b843685dd8..9527953c48e3 100644 --- a/util/pull/pull.go +++ b/util/pull/pull.go @@ -32,7 +32,7 @@ type Puller struct { Src reference.Spec Platform ocispecs.Platform - g flightcontrol.Group + g flightcontrol.Group[struct{}] resolveErr error resolveDone bool desc ocispecs.Descriptor @@ -54,9 +54,9 @@ type PulledManifests struct { } func (p *Puller) resolve(ctx context.Context, resolver remotes.Resolver) error { - _, err := p.g.Do(ctx, "", func(ctx context.Context) (_ interface{}, err error) { + _, err := p.g.Do(ctx, "", func(ctx context.Context) (_ struct{}, err error) { if p.resolveErr != nil || p.resolveDone { - return nil, p.resolveErr + return struct{}{}, p.resolveErr } defer func() { if !errors.Is(err, context.Canceled) { @@ -68,12 +68,12 @@ func (p *Puller) resolve(ctx context.Context, resolver remotes.Resolver) error { } ref, desc, err := resolver.Resolve(ctx, p.Src.String()) if err != nil { - return nil, err + return struct{}{}, err } p.desc = desc p.ref = ref p.resolveDone = true - return nil, nil + return struct{}{}, nil }) return err } diff --git a/util/push/push.go b/util/push/push.go index e2d6b190e841..bef56e5ba33c 100644 --- a/util/push/push.go +++ b/util/push/push.go @@ -299,12 +299,12 @@ func updateDistributionSourceHandler(manager content.Manager, pushF images.Handl } func dedupeHandler(h images.HandlerFunc) images.HandlerFunc { - var g flightcontrol.Group + var g flightcontrol.Group[[]ocispecs.Descriptor] res := map[digest.Digest][]ocispecs.Descriptor{} var mu sync.Mutex return images.HandlerFunc(func(ctx context.Context, desc ocispecs.Descriptor) ([]ocispecs.Descriptor, error) { - res, err := g.Do(ctx, desc.Digest.String(), func(ctx context.Context) (interface{}, error) { + return g.Do(ctx, desc.Digest.String(), func(ctx context.Context) ([]ocispecs.Descriptor, error) { mu.Lock() if r, ok := res[desc.Digest]; ok { mu.Unlock() @@ -322,12 +322,5 @@ func dedupeHandler(h images.HandlerFunc) images.HandlerFunc { mu.Unlock() return children, nil }) - if err != nil { - return nil, err - } - if res == nil { - return nil, nil - } - return res.([]ocispecs.Descriptor), nil }) } diff --git a/util/resolver/authorizer.go b/util/resolver/authorizer.go index d97d32dd6f5d..6c89cf74191f 100644 --- a/util/resolver/authorizer.go +++ b/util/resolver/authorizer.go @@ -33,7 +33,7 @@ type authHandlerNS struct { hosts map[string][]docker.RegistryHost muHosts sync.Mutex sm *session.Manager - g flightcontrol.Group + g flightcontrol.Group[[]docker.RegistryHost] } func newAuthHandlerNS(sm *session.Manager) *authHandlerNS { @@ -230,7 +230,7 @@ type authResult struct { // authHandler is used to handle auth request per registry server. type authHandler struct { - g flightcontrol.Group + g flightcontrol.Group[*authResult] client *http.Client @@ -295,7 +295,7 @@ func (ah *authHandler) doBearerAuth(ctx context.Context, sm *session.Manager, g // Docs: https://docs.docker.com/registry/spec/auth/scope scoped := strings.Join(to.Scopes, " ") - res, err := ah.g.Do(ctx, scoped, func(ctx context.Context) (interface{}, error) { + res, err := ah.g.Do(ctx, scoped, func(ctx context.Context) (*authResult, error) { ah.scopedTokensMu.Lock() r, exist := ah.scopedTokens[scoped] ah.scopedTokensMu.Unlock() @@ -313,15 +313,10 @@ func (ah *authHandler) doBearerAuth(ctx context.Context, sm *session.Manager, g ah.scopedTokensMu.Unlock() return r, nil }) - if err != nil || res == nil { return "", err } - r := res.(*authResult) - if r == nil { - return "", nil - } - return r.token, nil + return res.token, nil } func (ah *authHandler) fetchToken(ctx context.Context, sm *session.Manager, g session.Group, to auth.TokenOptions) (r *authResult, err error) { diff --git a/util/resolver/pool.go b/util/resolver/pool.go index 292ca2e6142d..7b6a2ef50d1f 100644 --- a/util/resolver/pool.go +++ b/util/resolver/pool.go @@ -131,7 +131,7 @@ type Resolver struct { // HostsFunc implements registry configuration of this Resolver func (r *Resolver) HostsFunc(host string) ([]docker.RegistryHost, error) { return func(domain string) ([]docker.RegistryHost, error) { - v, err := r.handler.g.Do(context.TODO(), domain, func(ctx context.Context) (interface{}, error) { + v, err := r.handler.g.Do(context.TODO(), domain, func(ctx context.Context) ([]docker.RegistryHost, error) { // long lock not needed because flightcontrol.Do r.handler.muHosts.Lock() v, ok := r.handler.hosts[domain] @@ -151,13 +151,12 @@ func (r *Resolver) HostsFunc(host string) ([]docker.RegistryHost, error) { if err != nil || v == nil { return nil, err } - vv := v.([]docker.RegistryHost) - if len(vv) == 0 { + if len(v) == 0 { return nil, nil } // make a copy so authorizer is set on unique instance - res := make([]docker.RegistryHost, len(vv)) - copy(res, vv) + res := make([]docker.RegistryHost, len(v)) + copy(res, v) auth := newDockerAuthorizer(res[0].Client, r.handler, r.sm, r.g) for i := range res { res[i].Authorizer = auth