Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 29 additions & 28 deletions cache/blobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import (
"golang.org/x/sync/errgroup"
)

var g flightcontrol.Group
var g flightcontrol.Group[struct{}]
var gFileList flightcontrol.Group[[]string]

const containerdUncompressed = "containerd.io/uncompressed"

Expand Down Expand Up @@ -86,12 +87,12 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool

if _, ok := filter[sr.ID()]; ok {
eg.Go(func() error {
_, err := g.Do(ctx, fmt.Sprintf("%s-%t", sr.ID(), createIfNeeded), func(ctx context.Context) (interface{}, error) {
_, err := g.Do(ctx, fmt.Sprintf("%s-%t", sr.ID(), createIfNeeded), func(ctx context.Context) (struct{}, error) {
if sr.getBlob() != "" {
return nil, nil
return struct{}{}, nil
}
if !createIfNeeded {
return nil, errors.WithStack(ErrNoBlobs)
return struct{}{}, errors.WithStack(ErrNoBlobs)
}

compressorFunc, finalize := comp.Type.Compress(ctx, comp)
Expand All @@ -108,12 +109,12 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
if lowerRef != nil {
m, err := lowerRef.Mount(ctx, true, s)
if err != nil {
return nil, err
return struct{}{}, err
}
var release func() error
lower, release, err = m.Mount()
if err != nil {
return nil, err
return struct{}{}, err
}
if release != nil {
defer release()
Expand All @@ -131,12 +132,12 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
if upperRef != nil {
m, err := upperRef.Mount(ctx, true, s)
if err != nil {
return nil, err
return struct{}{}, err
}
var release func() error
upper, release, err = m.Mount()
if err != nil {
return nil, err
return struct{}{}, err
}
if release != nil {
defer release()
Expand All @@ -151,7 +152,7 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
if forceOvlStr := os.Getenv("BUILDKIT_DEBUG_FORCE_OVERLAY_DIFF"); forceOvlStr != "" && sr.kind() != Diff {
enableOverlay, err = strconv.ParseBool(forceOvlStr)
if err != nil {
return nil, errors.Wrapf(err, "invalid boolean in BUILDKIT_DEBUG_FORCE_OVERLAY_DIFF")
return struct{}{}, errors.Wrapf(err, "invalid boolean in BUILDKIT_DEBUG_FORCE_OVERLAY_DIFF")
}
fallback = false // prohibit fallback on debug
} else if !isTypeWindows(sr) {
Expand All @@ -173,10 +174,10 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
if !ok || err != nil {
if !fallback {
if !ok {
return nil, errors.Errorf("overlay mounts not detected (lower=%+v,upper=%+v)", lower, upper)
return struct{}{}, errors.Errorf("overlay mounts not detected (lower=%+v,upper=%+v)", lower, upper)
}
if err != nil {
return nil, errors.Wrapf(err, "failed to compute overlay diff")
return struct{}{}, errors.Wrapf(err, "failed to compute overlay diff")
}
}
if logWarnOnErr {
Expand Down Expand Up @@ -209,7 +210,7 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
diff.WithCompressor(compressorFunc),
)
if err != nil {
return nil, err
return struct{}{}, err
}
}

Expand All @@ -219,29 +220,29 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
if finalize != nil {
a, err := finalize(ctx, sr.cm.ContentStore)
if err != nil {
return nil, errors.Wrapf(err, "failed to finalize compression")
return struct{}{}, errors.Wrapf(err, "failed to finalize compression")
}
for k, v := range a {
desc.Annotations[k] = v
}
}
info, err := sr.cm.ContentStore.Info(ctx, desc.Digest)
if err != nil {
return nil, err
return struct{}{}, err
}

if diffID, ok := info.Labels[containerdUncompressed]; ok {
desc.Annotations[containerdUncompressed] = diffID
} else if mediaType == ocispecs.MediaTypeImageLayer {
desc.Annotations[containerdUncompressed] = desc.Digest.String()
} else {
return nil, errors.Errorf("unknown layer compression type")
return struct{}{}, errors.Errorf("unknown layer compression type")
}

if err := sr.setBlob(ctx, desc); err != nil {
return nil, err
return struct{}{}, err
}
return nil, nil
return struct{}{}, nil
})
if err != nil {
return err
Expand Down Expand Up @@ -415,29 +416,29 @@ func isTypeWindows(sr *immutableRef) bool {

// ensureCompression ensures the specified ref has the blob of the specified compression Type.
func ensureCompression(ctx context.Context, ref *immutableRef, comp compression.Config, s session.Group) error {
_, err := g.Do(ctx, fmt.Sprintf("%s-%s", ref.ID(), comp.Type), func(ctx context.Context) (interface{}, error) {
_, err := g.Do(ctx, fmt.Sprintf("ensureComp-%s-%s", ref.ID(), comp.Type), func(ctx context.Context) (struct{}, error) {
desc, err := ref.ociDesc(ctx, ref.descHandlers, true)
if err != nil {
return nil, err
return struct{}{}, err
}

// Resolve converters
layerConvertFunc, err := getConverter(ctx, ref.cm.ContentStore, desc, comp)
if err != nil {
return nil, err
return struct{}{}, err
} else if layerConvertFunc == nil {
if isLazy, err := ref.isLazy(ctx); err != nil {
return nil, err
return struct{}{}, err
} else if isLazy {
// This ref can be used as the specified compressionType. Keep it lazy.
return nil, nil
return struct{}{}, nil
}
return nil, ref.linkBlob(ctx, desc)
return struct{}{}, ref.linkBlob(ctx, desc)
}

// First, lookup local content store
if _, err := ref.getBlobWithCompression(ctx, comp.Type); err == nil {
return nil, nil // found the compression variant. no need to convert.
return struct{}{}, nil // found the compression variant. no need to convert.
}

// Convert layer compression type
Expand All @@ -447,18 +448,18 @@ func ensureCompression(ctx context.Context, ref *immutableRef, comp compression.
dh: ref.descHandlers[desc.Digest],
session: s,
}).Unlazy(ctx); err != nil {
return nil, err
return struct{}{}, err
}
newDesc, err := layerConvertFunc(ctx, ref.cm.ContentStore, desc)
if err != nil {
return nil, errors.Wrapf(err, "failed to convert")
return struct{}{}, errors.Wrapf(err, "failed to convert")
}

// Start to track converted layer
if err := ref.linkBlob(ctx, *newDesc); err != nil {
return nil, errors.Wrapf(err, "failed to add compression blob")
return struct{}{}, errors.Wrapf(err, "failed to add compression blob")
}
return nil, nil
return struct{}{}, nil
})
return err
}
9 changes: 1 addition & 8 deletions cache/filelist.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const keyFileList = "filelist"
// are in the tar stream (AUFS whiteout format). If the reference does not have a
// a blob associated with it, the list is empty.
func (sr *immutableRef) FileList(ctx context.Context, s session.Group) ([]string, error) {
res, err := g.Do(ctx, fmt.Sprintf("filelist-%s", sr.ID()), func(ctx context.Context) (interface{}, error) {
return gFileList.Do(ctx, fmt.Sprintf("filelist-%s", sr.ID()), func(ctx context.Context) ([]string, error) {
dt, err := sr.GetExternal(keyFileList)
if err == nil && dt != nil {
var files []string
Expand Down Expand Up @@ -80,11 +80,4 @@ func (sr *immutableRef) FileList(ctx context.Context, s session.Group) ([]string
}
return files, nil
})
if err != nil {
return nil, err
}
if res == nil {
return nil, nil
}
return res.([]string), nil
}
2 changes: 1 addition & 1 deletion cache/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ type cacheManager struct {
mountPool sharableMountPool

muPrune sync.Mutex // make sure parallel prune is not allowed so there will not be inconsistent results
unlazyG flightcontrol.Group
unlazyG flightcontrol.Group[struct{}]
}

func NewManager(opt ManagerOpt) (Manager, error) {
Expand Down
26 changes: 11 additions & 15 deletions cache/refs.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ type cacheRecord struct {

mountCache snapshot.Mountable

sizeG flightcontrol.Group
sizeG flightcontrol.Group[int64]

// these are filled if multiple refs point to same data
equalMutable *mutableRef
Expand Down Expand Up @@ -325,7 +325,7 @@ func (cr *cacheRecord) viewSnapshotID() string {

func (cr *cacheRecord) size(ctx context.Context) (int64, error) {
// this expects that usage() is implemented lazily
s, err := cr.sizeG.Do(ctx, cr.ID(), func(ctx context.Context) (interface{}, error) {
return cr.sizeG.Do(ctx, cr.ID(), func(ctx context.Context) (int64, error) {
cr.mu.Lock()
s := cr.getSize()
if s != sizeUnknown {
Expand All @@ -346,7 +346,7 @@ func (cr *cacheRecord) size(ctx context.Context) (int64, error) {
isDead := cr.isDead()
cr.mu.Unlock()
if isDead {
return int64(0), nil
return 0, nil
}
if !errors.Is(err, errdefs.ErrNotFound) {
return s, errors.Wrapf(err, "failed to get usage for %s", cr.ID())
Expand Down Expand Up @@ -379,10 +379,6 @@ func (cr *cacheRecord) size(ctx context.Context) (int64, error) {
cr.mu.Unlock()
return usage.Size, nil
})
if err != nil {
return 0, err
}
return s.(int64), nil
}

// caller must hold cr.mu
Expand Down Expand Up @@ -1057,7 +1053,7 @@ func (sr *immutableRef) withRemoteSnapshotLabelsStargzMode(ctx context.Context,
}

func (sr *immutableRef) prepareRemoteSnapshotsStargzMode(ctx context.Context, s session.Group) error {
_, err := sr.sizeG.Do(ctx, sr.ID()+"-prepare-remote-snapshot", func(ctx context.Context) (_ interface{}, rerr error) {
_, err := g.Do(ctx, sr.ID()+"-prepare-remote-snapshot", func(ctx context.Context) (_ struct{}, rerr error) {
dhs := sr.descHandlers
for _, r := range sr.layerChain() {
r := r
Expand All @@ -1069,7 +1065,7 @@ func (sr *immutableRef) prepareRemoteSnapshotsStargzMode(ctx context.Context, s
dh := dhs[digest.Digest(r.getBlob())]
if dh == nil {
// We cannot prepare remote snapshots without descHandler.
return nil, nil
return struct{}{}, nil
}

// tmpLabels contains dh.SnapshotLabels + session IDs. All keys contain
Expand Down Expand Up @@ -1121,7 +1117,7 @@ func (sr *immutableRef) prepareRemoteSnapshotsStargzMode(ctx context.Context, s
break
}

return nil, nil
return struct{}{}, nil
})
return err
}
Expand All @@ -1144,18 +1140,18 @@ func makeTmpLabelsStargzMode(labels map[string]string, s session.Group) (fields
}

func (sr *immutableRef) unlazy(ctx context.Context, dhs DescHandlers, pg progress.Controller, s session.Group, topLevel bool) error {
_, err := sr.sizeG.Do(ctx, sr.ID()+"-unlazy", func(ctx context.Context) (_ interface{}, rerr error) {
_, err := g.Do(ctx, sr.ID()+"-unlazy", func(ctx context.Context) (_ struct{}, rerr error) {
if _, err := sr.cm.Snapshotter.Stat(ctx, sr.getSnapshotID()); err == nil {
return nil, nil
return struct{}{}, nil
}

switch sr.kind() {
case Merge, Diff:
return nil, sr.unlazyDiffMerge(ctx, dhs, pg, s, topLevel)
return struct{}{}, sr.unlazyDiffMerge(ctx, dhs, pg, s, topLevel)
case Layer, BaseLayer:
return nil, sr.unlazyLayer(ctx, dhs, pg, s)
return struct{}{}, sr.unlazyLayer(ctx, dhs, pg, s)
}
return nil, nil
return struct{}{}, nil
})
return err
}
Expand Down
14 changes: 7 additions & 7 deletions cache/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,11 +305,11 @@ func (p lazyRefProvider) ReaderAt(ctx context.Context, desc ocispecs.Descriptor)
}

func (p lazyRefProvider) Unlazy(ctx context.Context) error {
_, err := p.ref.cm.unlazyG.Do(ctx, string(p.desc.Digest), func(ctx context.Context) (_ interface{}, rerr error) {
_, err := p.ref.cm.unlazyG.Do(ctx, string(p.desc.Digest), func(ctx context.Context) (_ struct{}, rerr error) {
if isLazy, err := p.ref.isLazy(ctx); err != nil {
return nil, err
return struct{}{}, err
} else if !isLazy {
return nil, nil
return struct{}{}, nil
}
defer func() {
if rerr == nil {
Expand All @@ -320,7 +320,7 @@ func (p lazyRefProvider) Unlazy(ctx context.Context) error {
if p.dh == nil {
// shouldn't happen, if you have a lazy immutable ref it already should be validated
// that descriptor handlers exist for it
return nil, errors.New("unexpected nil descriptor handler")
return struct{}{}, errors.New("unexpected nil descriptor handler")
}

if p.dh.Progress != nil {
Expand All @@ -337,20 +337,20 @@ func (p lazyRefProvider) Unlazy(ctx context.Context) error {
Manager: p.ref.cm.ContentStore,
}, p.desc, p.dh.Ref, logs.LoggerFromContext(ctx))
if err != nil {
return nil, err
return struct{}{}, err
}

if imageRefs := p.ref.getImageRefs(); len(imageRefs) > 0 {
// just use the first image ref, it's arbitrary
imageRef := imageRefs[0]
if p.ref.GetDescription() == "" {
if err := p.ref.SetDescription("pulled from " + imageRef); err != nil {
return nil, err
return struct{}{}, err
}
}
}

return nil, nil
return struct{}{}, nil
})
return err
}
4 changes: 2 additions & 2 deletions client/llb/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type asyncState struct {
target State
set bool
err error
g flightcontrol.Group
g flightcontrol.Group[State]
}

func (as *asyncState) Output() Output {
Expand Down Expand Up @@ -53,7 +53,7 @@ func (as *asyncState) ToInput(ctx context.Context, c *Constraints) (*pb.Input, e
}

func (as *asyncState) Do(ctx context.Context, c *Constraints) error {
_, err := as.g.Do(ctx, "", func(ctx context.Context) (interface{}, error) {
_, err := as.g.Do(ctx, "", func(ctx context.Context) (State, error) {
if as.set {
return as.target, as.err
}
Expand Down
4 changes: 2 additions & 2 deletions executor/oci/hosts.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ func GetHostsFile(ctx context.Context, stateDir string, extraHosts []executor.Ho
return makeHostsFile(stateDir, extraHosts, idmap, hostname)
}

_, err := g.Do(ctx, stateDir, func(ctx context.Context) (interface{}, error) {
_, err := g.Do(ctx, stateDir, func(ctx context.Context) (struct{}, error) {
_, _, err := makeHostsFile(stateDir, nil, idmap, hostname)
return nil, err
return struct{}{}, err
})
if err != nil {
return "", nil, err
Expand Down
Loading