From 23ec51611ff41c58db49a963f6915e63dc90356e Mon Sep 17 00:00:00 2001 From: Erik Sipsma Date: Wed, 21 Jul 2021 22:53:16 +0000 Subject: [PATCH 1/4] cache: Fix flightcontrol use in computeBlobChain. Previously, the flightcontrol group was being given a key just set to the ref's ID, which meant that concurrent calls using different values of compressionType, createIfNeeded and forceCompression would incorrectly be de-duplicated. The change here splits up the flightcontrol group into a few separate calls and ensures that all the correct input variables are put into the flightcontrol keys. Signed-off-by: Erik Sipsma --- cache/blobs.go | 240 ++++++++++++++++--------------- cache/manager_test.go | 244 +++++++++++++++++++++++++++++++- cache/refs.go | 6 +- cache/remote.go | 11 +- util/compression/compression.go | 11 ++ 5 files changed, 389 insertions(+), 123 deletions(-) diff --git a/cache/blobs.go b/cache/blobs.go index f0a11c6277a8..136d2cd14f99 100644 --- a/cache/blobs.go +++ b/cache/blobs.go @@ -2,6 +2,7 @@ package cache import ( "context" + "fmt" "github.com/containerd/containerd/diff" "github.com/containerd/containerd/leases" @@ -44,29 +45,19 @@ func (sr *immutableRef) computeBlobChain(ctx context.Context, createIfNeeded boo } func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool, compressionType compression.Type, forceCompression bool, s session.Group) error { - baseCtx := ctx eg, ctx := errgroup.WithContext(ctx) - var currentDescr ocispecs.Descriptor if sr.parent != nil { eg.Go(func() error { return computeBlobChain(ctx, sr.parent, createIfNeeded, compressionType, forceCompression, s) }) } + eg.Go(func() error { - dp, err := g.Do(ctx, sr.ID(), func(ctx context.Context) (interface{}, error) { - refInfo := sr.Info() - if refInfo.Blob != "" { - if forceCompression { - desc, err := sr.ociDesc() - if err != nil { - return nil, err - } - if err := ensureCompression(ctx, sr, desc, compressionType, s); err != nil { - return nil, err - } - } - return nil, nil - } else if !createIfNeeded { + v, err := g.Do(ctx, fmt.Sprintf("%s-%t", sr.ID(), createIfNeeded), func(ctx context.Context) (interface{}, error) { + if getBlob(sr.md) != "" { + return sr.ociDesc() + } + if !createIfNeeded { return nil, errors.WithStack(ErrNoBlobs) } @@ -80,91 +71,83 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool return nil, errors.Errorf("unknown layer compression type: %q", compressionType) } - var descr ocispecs.Descriptor - var err error - - if descr.Digest == "" { - // reference needs to be committed - var lower []mount.Mount - if sr.parent != nil { - m, err := sr.parent.Mount(ctx, true, s) - if err != nil { - return nil, err - } - var release func() error - lower, release, err = m.Mount() - if err != nil { - return nil, err - } - if release != nil { - defer release() - } - } - m, err := sr.Mount(ctx, true, s) + var lower []mount.Mount + if sr.parent != nil { + m, err := sr.parent.Mount(ctx, true, s) if err != nil { return nil, err } - upper, release, err := m.Mount() + var release func() error + lower, release, err = m.Mount() if err != nil { return nil, err } if release != nil { defer release() } - descr, err = sr.cm.Differ.Compare(ctx, lower, upper, - diff.WithMediaType(mediaType), - diff.WithReference(sr.ID()), - ) - if err != nil { - return nil, err - } + } + m, err := sr.Mount(ctx, true, s) + if err != nil { + return nil, err + } + upper, release, err := m.Mount() + if err != nil { + return nil, err + } + if release != nil { + defer release() + } + desc, err := sr.cm.Differ.Compare(ctx, lower, upper, + diff.WithMediaType(mediaType), + diff.WithReference(sr.ID()), + ) + if err != nil { + return nil, err } - if descr.Annotations == nil { - descr.Annotations = map[string]string{} + if desc.Annotations == nil { + desc.Annotations = map[string]string{} } - info, err := sr.cm.ContentStore.Info(ctx, descr.Digest) + info, err := sr.cm.ContentStore.Info(ctx, desc.Digest) if err != nil { return nil, err } if diffID, ok := info.Labels[containerdUncompressed]; ok { - descr.Annotations[containerdUncompressed] = diffID - } else if compressionType == compression.Uncompressed { - descr.Annotations[containerdUncompressed] = descr.Digest.String() + desc.Annotations[containerdUncompressed] = diffID + } else if mediaType == ocispecs.MediaTypeImageLayer { + desc.Annotations[containerdUncompressed] = desc.Digest.String() } else { return nil, errors.Errorf("unknown layer compression type") } - if forceCompression { - if err := ensureCompression(ctx, sr, descr, compressionType, s); err != nil { - return nil, err - } + if err := sr.setBlob(ctx, desc); err != nil { + return nil, err } - return descr, nil - + return desc, nil }) if err != nil { return err } + descr, ok := v.(ocispecs.Descriptor) + if !ok { + return fmt.Errorf("invalid descriptor returned by differ while computing blob for %s", sr.ID()) + } - if dp != nil { - currentDescr = dp.(ocispecs.Descriptor) + if forceCompression { + if err := ensureCompression(ctx, sr, descr, compressionType, s); err != nil { + return err + } } return nil }) - err := eg.Wait() - if err != nil { + + if err := eg.Wait(); err != nil { return err } - if currentDescr.Digest != "" { - if err := sr.setBlob(baseCtx, currentDescr); err != nil { - return err - } - } - return nil + return sr.setChains(ctx) } // setBlob associates a blob with the cache record. @@ -183,10 +166,15 @@ func (sr *immutableRef) setBlob(ctx context.Context, desc ocispecs.Descriptor) e return err } + compressionType := compression.FromMediaType(desc.MediaType) + if compressionType == compression.UnknownCompression { + return errors.Errorf("unhandled layer media type: %q", desc.MediaType) + } + sr.mu.Lock() defer sr.mu.Unlock() - if getChainID(sr.md) != "" { + if getBlob(sr.md) != "" { return nil } @@ -194,18 +182,6 @@ func (sr *immutableRef) setBlob(ctx context.Context, desc ocispecs.Descriptor) e return err } - p := sr.parent - var parentChainID digest.Digest - var parentBlobChainID digest.Digest - if p != nil { - pInfo := p.Info() - if pInfo.ChainID == "" || pInfo.BlobChainID == "" { - return errors.Errorf("failed to set blob for reference with non-addressable parent") - } - parentChainID = pInfo.ChainID - parentBlobChainID = pInfo.BlobChainID - } - if err := sr.cm.LeaseManager.AddResource(ctx, leases.Lease{ID: sr.ID()}, leases.Resource{ ID: desc.Digest.String(), Type: "content", @@ -215,16 +191,45 @@ func (sr *immutableRef) setBlob(ctx context.Context, desc ocispecs.Descriptor) e queueDiffID(sr.md, diffID.String()) queueBlob(sr.md, desc.Digest.String()) - chainID := diffID - blobChainID := imagespecidentity.ChainID([]digest.Digest{desc.Digest, diffID}) - if parentChainID != "" { - chainID = imagespecidentity.ChainID([]digest.Digest{parentChainID, chainID}) - blobChainID = imagespecidentity.ChainID([]digest.Digest{parentBlobChainID, blobChainID}) + queueMediaType(sr.md, desc.MediaType) + queueBlobSize(sr.md, desc.Size) + if err := sr.md.Commit(); err != nil { + return err } + + if err := sr.addCompressionBlob(ctx, desc.Digest, compressionType); err != nil { + return err + } + return nil +} + +func (sr *immutableRef) setChains(ctx context.Context) error { + if _, ok := leases.FromContext(ctx); !ok { + return errors.Errorf("missing lease requirement for setChains") + } + + sr.mu.Lock() + defer sr.mu.Unlock() + + if getChainID(sr.md) != "" { + return nil + } + + var chainIDs []digest.Digest + var blobChainIDs []digest.Digest + if sr.parent != nil { + chainIDs = append(chainIDs, digest.Digest(getChainID(sr.parent.md))) + blobChainIDs = append(blobChainIDs, digest.Digest(getBlobChainID(sr.parent.md))) + } + diffID := digest.Digest(getDiffID(sr.md)) + chainIDs = append(chainIDs, diffID) + blobChainIDs = append(blobChainIDs, imagespecidentity.ChainID([]digest.Digest{digest.Digest(getBlob(sr.md)), diffID})) + + chainID := imagespecidentity.ChainID(chainIDs) + blobChainID := imagespecidentity.ChainID(blobChainIDs) + queueChainID(sr.md, chainID.String()) queueBlobChainID(sr.md, blobChainID.String()) - queueMediaType(sr.md, desc.MediaType) - queueBlobSize(sr.md, desc.Size) if err := sr.md.Commit(); err != nil { return err } @@ -243,36 +248,39 @@ func isTypeWindows(sr *immutableRef) bool { // ensureCompression ensures the specified ref has the blob of the specified compression Type. func ensureCompression(ctx context.Context, ref *immutableRef, desc ocispecs.Descriptor, compressionType compression.Type, s session.Group) error { - // Resolve converters - layerConvertFunc, _, err := getConverters(desc, compressionType) - if err != nil { - return err - } else if layerConvertFunc == nil { - return nil // no need to convert - } + _, err := g.Do(ctx, fmt.Sprintf("%s-%d", desc.Digest, compressionType), func(ctx context.Context) (interface{}, error) { + // Resolve converters + layerConvertFunc, _, err := getConverters(desc, compressionType) + if err != nil { + return nil, err + } else if layerConvertFunc == nil { + return nil, nil // no need to convert + } - // First, lookup local content store - if _, err := ref.getCompressionBlob(ctx, compressionType); err == nil { - return nil // found the compression variant. no need to convert. - } + // First, lookup local content store + if _, err := ref.getCompressionBlob(ctx, compressionType); err == nil { + return nil, nil // found the compression variant. no need to convert. + } - // Convert layer compression type - if err := (lazyRefProvider{ - ref: ref, - desc: desc, - dh: ref.descHandlers[desc.Digest], - session: s, - }).Unlazy(ctx); err != nil { - return err - } - newDesc, err := layerConvertFunc(ctx, ref.cm.ContentStore, desc) - if err != nil { - return err - } + // Convert layer compression type + if err := (lazyRefProvider{ + ref: ref, + desc: desc, + dh: ref.descHandlers[desc.Digest], + session: s, + }).Unlazy(ctx); err != nil { + return nil, err + } + newDesc, err := layerConvertFunc(ctx, ref.cm.ContentStore, desc) + if err != nil { + return nil, err + } - // Start to track converted layer - if err := ref.addCompressionBlob(ctx, newDesc.Digest, compressionType); err != nil { - return err - } - return nil + // Start to track converted layer + if err := ref.addCompressionBlob(ctx, newDesc.Digest, compressionType); err != nil { + return nil, err + } + return nil, nil + }) + return err } diff --git a/cache/manager_test.go b/cache/manager_test.go index c5f2c0f9e6a9..34242990f830 100644 --- a/cache/manager_test.go +++ b/cache/manager_test.go @@ -5,16 +5,20 @@ import ( "bytes" "compress/gzip" "context" + "fmt" "io" "io/ioutil" "os" "path/filepath" "runtime" + "strconv" "testing" + "time" "github.com/containerd/containerd/content" "github.com/containerd/containerd/content/local" "github.com/containerd/containerd/diff/apply" + "github.com/containerd/containerd/diff/walking" "github.com/containerd/containerd/leases" ctdmetadata "github.com/containerd/containerd/metadata" "github.com/containerd/containerd/namespaces" @@ -22,14 +26,19 @@ import ( "github.com/containerd/containerd/snapshots/native" "github.com/moby/buildkit/cache/metadata" "github.com/moby/buildkit/client" + "github.com/moby/buildkit/session" "github.com/moby/buildkit/snapshot" containerdsnapshot "github.com/moby/buildkit/snapshot/containerd" + "github.com/moby/buildkit/util/compression" + "github.com/moby/buildkit/util/contentutil" "github.com/moby/buildkit/util/leaseutil" + "github.com/moby/buildkit/util/winlayers" digest "github.com/opencontainers/go-digest" ocispecs "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" "github.com/stretchr/testify/require" bolt "go.etcd.io/bbolt" + "golang.org/x/sync/errgroup" ) type cmOpt struct { @@ -116,15 +125,17 @@ func newCacheManager(ctx context.Context, opt cmOpt) (co *cmOut, cleanup func() return nil, nil, err } - lm := ctdmetadata.NewLeaseManager(mdb) + store = containerdsnapshot.NewContentStore(mdb.ContentStore(), ns) + lm := leaseutil.WithNamespace(ctdmetadata.NewLeaseManager(mdb), ns) cm, err := NewManager(ManagerOpt{ Snapshotter: snapshot.FromContainerdSnapshotter(opt.snapshotterName, containerdsnapshot.NSSnapshotter(ns, mdb.Snapshotter(opt.snapshotterName)), nil), MetadataStore: md, - ContentStore: mdb.ContentStore(), - LeaseManager: leaseutil.WithNamespace(lm, ns), + ContentStore: store, + LeaseManager: lm, GarbageCollect: mdb.GarbageCollect, - Applier: apply.NewFileSystemApplier(mdb.ContentStore()), + Applier: winlayers.NewFileSystemApplierWithWindows(store, apply.NewFileSystemApplier(store)), + Differ: winlayers.NewWalkingDiffWithWindows(store, walking.NewWalkingDiff(store)), }) if err != nil { return nil, nil, err @@ -132,7 +143,7 @@ func newCacheManager(ctx context.Context, opt cmOpt) (co *cmOut, cleanup func() return &cmOut{ manager: cm, lm: lm, - cs: mdb.ContentStore(), + cs: store, }, cleanup, nil } @@ -506,6 +517,8 @@ func TestExtractOnMutable(t *testing.T) { err = snap.(*immutableRef).setBlob(leaseCtx, desc) done(context.TODO()) require.NoError(t, err) + err = snap.(*immutableRef).setChains(leaseCtx) + require.NoError(t, err) snap2, err := cm.GetByBlob(ctx, desc2, snap) require.NoError(t, err) @@ -623,6 +636,8 @@ func TestSetBlob(t *testing.T) { err = snap.(*immutableRef).setBlob(ctx, desc) require.NoError(t, err) + err = snap.(*immutableRef).setChains(ctx) + require.NoError(t, err) info = snap.Info() require.Equal(t, desc.Annotations["containerd.io/uncompressed"], string(info.DiffID)) @@ -647,6 +662,8 @@ func TestSetBlob(t *testing.T) { err = snap2.(*immutableRef).setBlob(ctx, desc2) require.NoError(t, err) + err = snap2.(*immutableRef).setChains(ctx) + require.NoError(t, err) info2 := snap2.Info() require.Equal(t, desc2.Annotations["containerd.io/uncompressed"], string(info2.DiffID)) @@ -997,6 +1014,171 @@ func TestLazyCommit(t *testing.T) { require.Equal(t, true, errors.Is(err, errNotFound)) } +func TestGetRemote(t *testing.T) { + t.Parallel() + // windows fails when lazy blob is being extracted with "invalid windows mount type: 'bind'" + if runtime.GOOS != "linux" { + t.Skipf("unsupported GOOS: %s", runtime.GOOS) + } + + ctx := namespaces.WithNamespace(context.Background(), "buildkit-test") + + tmpdir, err := ioutil.TempDir("", "cachemanager") + require.NoError(t, err) + defer os.RemoveAll(tmpdir) + + snapshotter, err := native.NewSnapshotter(filepath.Join(tmpdir, "snapshots")) + require.NoError(t, err) + + co, cleanup, err := newCacheManager(ctx, cmOpt{ + snapshotter: snapshotter, + snapshotterName: "native", + }) + require.NoError(t, err) + defer cleanup() + cm := co.manager + + ctx, done, err := leaseutil.WithLease(ctx, co.lm, leaseutil.MakeTemporary) + require.NoError(t, err) + defer done(context.TODO()) + + contentBuffer := contentutil.NewBuffer() + + descHandlers := DescHandlers(map[digest.Digest]*DescHandler{}) + + // make some lazy refs from blobs + expectedContent := map[digest.Digest]struct{}{} + var descs []ocispecs.Descriptor + for i := 0; i < 2; i++ { + blobmap := map[string]string{"foo": strconv.Itoa(i)} + blobBytes, desc, err := mapToBlob(blobmap, true) + require.NoError(t, err) + + expectedContent[desc.Digest] = struct{}{} + descs = append(descs, desc) + + cw, err := contentBuffer.Writer(ctx) + require.NoError(t, err) + _, err = cw.Write(blobBytes) + require.NoError(t, err) + err = cw.Commit(ctx, 0, cw.Digest()) + require.NoError(t, err) + + descHandlers[desc.Digest] = &DescHandler{ + Provider: func(_ session.Group) content.Provider { return contentBuffer }, + } + + _, uncompressedDesc, err := mapToBlob(blobmap, false) + require.NoError(t, err) + expectedContent[uncompressedDesc.Digest] = struct{}{} + } + + // Create 3 levels of mutable refs, where each parent ref has 2 children (this tests parallel creation of + // overlapping blob chains). + lazyRef, err := cm.GetByBlob(ctx, descs[0], nil, descHandlers) + require.NoError(t, err) + + refs := []ImmutableRef{lazyRef} + for i := 0; i < 3; i++ { + var newRefs []ImmutableRef + for j, ir := range refs { + for k := 0; k < 2; k++ { + mutRef, err := cm.New(ctx, ir, nil, descHandlers) + require.NoError(t, err) + + m, err := mutRef.Mount(ctx, false, nil) + require.NoError(t, err) + + lm := snapshot.LocalMounter(m) + target, err := lm.Mount() + require.NoError(t, err) + + f, err := os.Create(filepath.Join(target, fmt.Sprintf("%d-%d-%d", i, j, k))) + require.NoError(t, err) + err = os.Chtimes(f.Name(), time.Unix(0, 0), time.Unix(0, 0)) + require.NoError(t, err) + + _, desc, err := fileToBlob(f, true) + require.NoError(t, err) + expectedContent[desc.Digest] = struct{}{} + _, desc, err = fileToBlob(f, false) + require.NoError(t, err) + expectedContent[desc.Digest] = struct{}{} + + f.Close() + err = lm.Unmount() + require.NoError(t, err) + + immutRef, err := mutRef.Commit(ctx) + require.NoError(t, err) + newRefs = append(newRefs, immutRef) + } + } + refs = newRefs + } + + // also test the original lazyRef to get coverage for refs that don't have to be extracted from the snapshotter + lazyRef2, err := cm.GetByBlob(ctx, descs[1], nil, descHandlers) + require.NoError(t, err) + refs = append(refs, lazyRef2) + + checkNumBlobs(ctx, t, co.cs, 1) + + // Call GetRemote on all the refs + eg, egctx := errgroup.WithContext(ctx) + for _, ir := range refs { + ir := ir.(*immutableRef) + for _, compressionType := range []compression.Type{compression.Uncompressed, compression.Gzip} { + compressionType := compressionType + eg.Go(func() error { + remote, err := ir.GetRemote(egctx, true, compressionType, true, nil) + require.NoError(t, err) + refChain := ir.parentRefChain() + for i, desc := range remote.Descriptors { + switch compressionType { + case compression.Uncompressed: + require.Equal(t, ocispecs.MediaTypeImageLayer, desc.MediaType) + case compression.Gzip: + require.Equal(t, ocispecs.MediaTypeImageLayerGzip, desc.MediaType) + default: + require.Fail(t, "unhandled media type", compressionType) + } + require.Contains(t, expectedContent, desc.Digest) + + r := refChain[i] + isLazy, err := r.isLazy(egctx) + require.NoError(t, err) + info, err := r.getCompressionBlob(egctx, compressionType) + if isLazy { + require.Error(t, err) + } else { + require.NoError(t, err) + require.Equal(t, info.Digest, desc.Digest) + } + } + return nil + }) + } + } + require.NoError(t, eg.Wait()) + + // verify there's a 1-to-1 mapping between the content store and what we expected to be there + err = co.cs.Walk(ctx, func(info content.Info) error { + var matched bool + for expected := range expectedContent { + if info.Digest == expected { + delete(expectedContent, expected) + matched = true + break + } + } + require.True(t, matched, "match for blob: %s", info.Digest) + return nil + }) + require.NoError(t, err) + require.Equal(t, map[digest.Digest]struct{}{}, expectedContent) +} + func checkDiskUsage(ctx context.Context, t *testing.T, cm Manager, inuse, unused int) { du, err := cm.DiskUsage(ctx, client.DiskUsageInfo{}) require.NoError(t, err) @@ -1093,3 +1275,55 @@ func mapToBlob(m map[string]string, compress bool) ([]byte, ocispecs.Descriptor, }, }, nil } + +func fileToBlob(file *os.File, compress bool) ([]byte, ocispecs.Descriptor, error) { + buf := bytes.NewBuffer(nil) + sha := digest.SHA256.Digester() + + var dest io.WriteCloser = bufferCloser{buf} + if compress { + dest = gzip.NewWriter(buf) + } + tw := tar.NewWriter(io.MultiWriter(sha.Hash(), dest)) + + info, err := file.Stat() + if err != nil { + return nil, ocispecs.Descriptor{}, err + } + + fi, err := tar.FileInfoHeader(info, "") + if err != nil { + return nil, ocispecs.Descriptor{}, err + } + fi.Format = tar.FormatPAX + fi.ModTime = fi.ModTime.Truncate(time.Second) + fi.AccessTime = time.Time{} + fi.ChangeTime = time.Time{} + + if err := tw.WriteHeader(fi); err != nil { + return nil, ocispecs.Descriptor{}, err + } + if _, err := io.Copy(tw, file); err != nil { + return nil, ocispecs.Descriptor{}, err + } + + if err := tw.Close(); err != nil { + return nil, ocispecs.Descriptor{}, err + } + if err := dest.Close(); err != nil { + return nil, ocispecs.Descriptor{}, err + } + + mediaType := ocispecs.MediaTypeImageLayer + if compress { + mediaType = ocispecs.MediaTypeImageLayerGzip + } + return buf.Bytes(), ocispecs.Descriptor{ + Digest: digest.FromBytes(buf.Bytes()), + MediaType: mediaType, + Size: int64(buf.Len()), + Annotations: map[string]string{ + "containerd.io/uncompressed": sha.Digest().String(), + }, + }, nil +} diff --git a/cache/refs.go b/cache/refs.go index 8257a7dabf58..19d5754d0b91 100644 --- a/cache/refs.go +++ b/cache/refs.go @@ -403,7 +403,11 @@ func (sr *immutableRef) getCompressionBlob(ctx context.Context, compressionType if err != nil { return content.Info{}, err } - return cs.Info(ctx, dgst) + info, err := cs.Info(ctx, dgst) + if err != nil { + return content.Info{}, err + } + return info, nil } return content.Info{}, errdefs.ErrNotFound } diff --git a/cache/remote.go b/cache/remote.go index 630071ddd9e0..d19c3e9eb732 100644 --- a/cache/remote.go +++ b/cache/remote.go @@ -224,7 +224,16 @@ func (p lazyRefProvider) Unlazy(ctx context.Context) error { } } } - return nil, err + + compressionType := compression.FromMediaType(p.desc.MediaType) + if compressionType == compression.UnknownCompression { + return nil, errors.Errorf("unhandled layer media type: %q", p.desc.MediaType) + } + + if err := p.ref.addCompressionBlob(ctx, p.desc.Digest, compressionType); err != nil { + return nil, err + } + return nil, nil }) return err } diff --git a/util/compression/compression.go b/util/compression/compression.go index 1773ad669ae7..8386e7a13a2a 100644 --- a/util/compression/compression.go +++ b/util/compression/compression.go @@ -40,6 +40,17 @@ func (ct Type) String() string { } } +func FromMediaType(mediaType string) Type { + switch toOCILayerType[mediaType] { + case ocispecs.MediaTypeImageLayer: + return Uncompressed + case ocispecs.MediaTypeImageLayerGzip: + return Gzip + default: + return UnknownCompression + } +} + // DetectLayerMediaType returns media type from existing blob data. func DetectLayerMediaType(ctx context.Context, cs content.Store, id digest.Digest, oci bool) (string, error) { ra, err := cs.ReaderAt(ctx, ocispecs.Descriptor{Digest: id}) From 808091a4e1918558c93447d6662fe43e3c84f5b9 Mon Sep 17 00:00:00 2001 From: Erik Sipsma Date: Wed, 21 Jul 2021 23:05:55 +0000 Subject: [PATCH 2/4] solver: include cachemap index in flightcontrol. Signed-off-by: Erik Sipsma --- solver/jobs.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/solver/jobs.go b/solver/jobs.go index b5cc3ecedaa5..0533366b2248 100644 --- a/solver/jobs.go +++ b/solver/jobs.go @@ -715,7 +715,7 @@ func (s *sharedOp) CacheMap(ctx context.Context, index int) (resp *cacheMapResp, if err != nil { return nil, err } - res, err := s.g.Do(ctx, "cachemap", func(ctx context.Context) (ret interface{}, retErr error) { + res, err := s.g.Do(ctx, fmt.Sprintf("cachemap-%d", index), func(ctx context.Context) (ret interface{}, retErr error) { if s.cacheRes != nil && s.cacheDone || index < len(s.cacheRes) { return s.cacheRes, nil } From 4237175ac7b0cf1b4ffbba8a5c2c87558d9ad3db Mon Sep 17 00:00:00 2001 From: Erik Sipsma Date: Wed, 21 Jul 2021 23:22:28 +0000 Subject: [PATCH 3/4] pull: use resolvemode in flightcontrol key. Signed-off-by: Erik Sipsma --- source/containerimage/pull.go | 1 + 1 file changed, 1 insertion(+) diff --git a/source/containerimage/pull.go b/source/containerimage/pull.go index ad02a08d93b3..f060a156dbbb 100644 --- a/source/containerimage/pull.go +++ b/source/containerimage/pull.go @@ -83,6 +83,7 @@ func (is *Source) ResolveImageConfig(ctx context.Context, ref string, opt llb.Re if err != nil { return "", nil, err } + key += rm.String() res, err := is.g.Do(ctx, key, func(ctx context.Context) (interface{}, error) { res := resolver.DefaultPool.GetResolver(is.RegistryHosts, ref, "pull", sm, g).WithImageStore(is.ImageStore, rm) From b087e19e05d5aa7b5b931d5c97c79090bf206f37 Mon Sep 17 00:00:00 2001 From: Erik Sipsma Date: Wed, 21 Jul 2021 23:51:14 +0000 Subject: [PATCH 4/4] util: remove outdated flightcontrol test assertion. The test was making an assertion that is no longer expected to always be true after #2195, which purposely made flightcontrol less deterministic. This lead to occasional failures. Signed-off-by: Erik Sipsma --- util/flightcontrol/flightcontrol_test.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/util/flightcontrol/flightcontrol_test.go b/util/flightcontrol/flightcontrol_test.go index 634a1e85b4e1..9953775442a8 100644 --- a/util/flightcontrol/flightcontrol_test.go +++ b/util/flightcontrol/flightcontrol_test.go @@ -193,15 +193,11 @@ func TestCancelBoth(t *testing.T) { assert.NoError(t, err) assert.Equal(t, ret1, "bar") - f2 := testFunc(100*time.Millisecond, "baz", &counter) - ret1, err = g.Do(context.TODO(), "foo", f2) - assert.NoError(t, err) - assert.Equal(t, ret1, "baz") ret1, err = g.Do(context.TODO(), "abc", f) assert.NoError(t, err) assert.Equal(t, ret1, "bar") - assert.Equal(t, counter, int64(4)) + assert.Equal(t, counter, int64(3)) } func TestContention(t *testing.T) {