diff --git a/cache/blobs.go b/cache/blobs.go index 548601f58ffa..22cda1863c2c 100644 --- a/cache/blobs.go +++ b/cache/blobs.go @@ -241,7 +241,7 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool return nil, errors.Errorf("unknown layer compression type") } - if err := sr.setBlob(ctx, comp.Type, desc); err != nil { + if err := sr.setBlob(ctx, desc); err != nil { return nil, err } return nil, nil @@ -267,10 +267,15 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool // setBlob associates a blob with the cache record. // A lease must be held for the blob when calling this function -func (sr *immutableRef) setBlob(ctx context.Context, compressionType compression.Type, desc ocispecs.Descriptor) error { +func (sr *immutableRef) setBlob(ctx context.Context, desc ocispecs.Descriptor) (rerr error) { if _, ok := leases.FromContext(ctx); !ok { return errors.Errorf("missing lease requirement for setBlob") } + defer func() { + if rerr == nil { + rerr = sr.linkBlob(ctx, desc) + } + }() diffID, err := diffIDFromDescriptor(desc) if err != nil { @@ -280,10 +285,6 @@ func (sr *immutableRef) setBlob(ctx context.Context, compressionType compression return err } - if compressionType == compression.UnknownCompression { - return errors.Errorf("unhandled layer media type: %q", desc.MediaType) - } - sr.mu.Lock() defer sr.mu.Unlock() @@ -311,9 +312,6 @@ func (sr *immutableRef) setBlob(ctx context.Context, compressionType compression return err } - if err := sr.addCompressionBlob(ctx, desc, compressionType); err != nil { - return err - } return nil } @@ -437,11 +435,11 @@ func ensureCompression(ctx context.Context, ref *immutableRef, comp compression. // This ref can be used as the specified compressionType. Keep it lazy. 
return nil, nil } - return nil, ref.addCompressionBlob(ctx, desc, comp.Type) + return nil, ref.linkBlob(ctx, desc) } // First, lookup local content store - if _, err := ref.getCompressionBlob(ctx, comp.Type); err == nil { + if _, err := ref.getBlobWithCompression(ctx, comp.Type); err == nil { return nil, nil // found the compression variant. no need to convert. } @@ -460,7 +458,7 @@ func ensureCompression(ctx context.Context, ref *immutableRef, comp compression. } // Start to track converted layer - if err := ref.addCompressionBlob(ctx, *newDesc, comp.Type); err != nil { + if err := ref.linkBlob(ctx, *newDesc); err != nil { return nil, errors.Wrapf(err, "failed to add compression blob") } return nil, nil diff --git a/cache/converter.go b/cache/converter.go index 949883cefd7b..a7e4df193aff 100644 --- a/cache/converter.go +++ b/cache/converter.go @@ -14,6 +14,7 @@ import ( "github.com/containerd/containerd/images/converter" "github.com/containerd/containerd/labels" "github.com/moby/buildkit/identity" + "github.com/moby/buildkit/util/bklog" "github.com/moby/buildkit/util/compression" digest "github.com/opencontainers/go-digest" ocispecs "github.com/opencontainers/image-spec/specs-go/v1" @@ -129,6 +130,7 @@ var bufioPool = sync.Pool{ } func (c *conversion) convert(ctx context.Context, cs content.Store, desc ocispecs.Descriptor) (*ocispecs.Descriptor, error) { + bklog.G(ctx).WithField("blob", desc).WithField("target", c.target).Debugf("converting blob to the target compression") // prepare the source and destination labelz := make(map[string]string) ref := fmt.Sprintf("convert-from-%s-to-%s-%s", desc.Digest, c.target.Type.String(), identity.NewID()) diff --git a/cache/manager_test.go b/cache/manager_test.go index dcde55aa13ec..78d31473c6e6 100644 --- a/cache/manager_test.go +++ b/cache/manager_test.go @@ -5,6 +5,7 @@ import ( "bytes" "compress/gzip" "context" + "encoding/binary" "fmt" "io" "io/ioutil" @@ -597,11 +598,7 @@ func TestExtractOnMutable(t *testing.T) { 
leaseCtx, done, err := leaseutil.WithLease(ctx, co.lm, leases.WithExpiration(0)) require.NoError(t, err) - compressionType := compression.FromMediaType(desc.MediaType) - if compressionType == compression.UnknownCompression { - t.Errorf("unhandled layer media type: %q", desc.MediaType) - } - err = snap.(*immutableRef).setBlob(leaseCtx, compressionType, desc) + err = snap.(*immutableRef).setBlob(leaseCtx, desc) done(context.TODO()) require.NoError(t, err) err = snap.(*immutableRef).computeChainMetadata(leaseCtx, map[string]struct{}{snap.ID(): {}}) @@ -713,7 +710,7 @@ func TestSetBlob(t *testing.T) { err = content.WriteBlob(ctx, co.cs, "ref1", bytes.NewBuffer(b), desc) require.NoError(t, err) - err = snap.(*immutableRef).setBlob(ctx, compression.UnknownCompression, ocispecs.Descriptor{ + err = snap.(*immutableRef).setBlob(ctx, ocispecs.Descriptor{ Digest: digest.FromBytes([]byte("foobar")), Annotations: map[string]string{ "containerd.io/uncompressed": digest.FromBytes([]byte("foobar2")).String(), @@ -721,11 +718,7 @@ func TestSetBlob(t *testing.T) { }) require.Error(t, err) - compressionType := compression.FromMediaType(desc.MediaType) - if compressionType == compression.UnknownCompression { - t.Errorf("unhandled layer media type: %q", desc.MediaType) - } - err = snap.(*immutableRef).setBlob(ctx, compressionType, desc) + err = snap.(*immutableRef).setBlob(ctx, desc) require.NoError(t, err) err = snap.(*immutableRef).computeChainMetadata(ctx, map[string]struct{}{snap.ID(): {}}) require.NoError(t, err) @@ -751,11 +744,7 @@ func TestSetBlob(t *testing.T) { err = content.WriteBlob(ctx, co.cs, "ref2", bytes.NewBuffer(b2), desc2) require.NoError(t, err) - compressionType2 := compression.FromMediaType(desc2.MediaType) - if compressionType2 == compression.UnknownCompression { - t.Errorf("unhandled layer media type: %q", desc2.MediaType) - } - err = snap2.(*immutableRef).setBlob(ctx, compressionType2, desc2) + err = snap2.(*immutableRef).setBlob(ctx, desc2) require.NoError(t, 
err) err = snap2.(*immutableRef).computeChainMetadata(ctx, map[string]struct{}{snap.ID(): {}, snap2.ID(): {}}) require.NoError(t, err) @@ -1110,6 +1099,396 @@ func TestLazyCommit(t *testing.T) { require.Equal(t, true, errors.Is(err, errNotFound)) } +func TestLoopLeaseContent(t *testing.T) { + t.Parallel() + // windows fails when lazy blob is being extracted with "invalid windows mount type: 'bind'" + if runtime.GOOS != "linux" { + t.Skipf("unsupported GOOS: %s", runtime.GOOS) + } + + ctx := namespaces.WithNamespace(context.Background(), "buildkit-test") + + tmpdir, err := ioutil.TempDir("", "cachemanager") + require.NoError(t, err) + defer os.RemoveAll(tmpdir) + + snapshotter, err := native.NewSnapshotter(filepath.Join(tmpdir, "snapshots")) + require.NoError(t, err) + + co, cleanup, err := newCacheManager(ctx, cmOpt{ + snapshotter: snapshotter, + snapshotterName: "native", + }) + require.NoError(t, err) + defer cleanup() + cm := co.manager + + ctx, done, err := leaseutil.WithLease(ctx, co.lm, leaseutil.MakeTemporary) + require.NoError(t, err) + defer done(ctx) + + // store an uncompressed blob to the content store + compressionLoop := []compression.Type{compression.Uncompressed, compression.Gzip, compression.Zstd, compression.EStargz} + blobBytes, orgDesc, err := mapToBlob(map[string]string{"foo": "1"}, false) + require.NoError(t, err) + contentBuffer := contentutil.NewBuffer() + descHandlers := DescHandlers(map[digest.Digest]*DescHandler{}) + cw, err := contentBuffer.Writer(ctx, content.WithRef(fmt.Sprintf("write-test-blob-%s", orgDesc.Digest))) + require.NoError(t, err) + _, err = cw.Write(blobBytes) + require.NoError(t, err) + require.NoError(t, cw.Commit(ctx, 0, cw.Digest())) + descHandlers[orgDesc.Digest] = &DescHandler{ + Provider: func(_ session.Group) content.Provider { return contentBuffer }, + } + + // Create a compression loop + ref, err := cm.GetByBlob(ctx, orgDesc, nil, descHandlers) + require.NoError(t, err) + allRefs := []ImmutableRef{ref} + defer 
func() { + for _, ref := range allRefs { + ref.Release(ctx) + } + }() + var chain []ocispecs.Descriptor + for _, compressionType := range compressionLoop { + remotes, err := ref.GetRemotes(ctx, true, config.RefConfig{Compression: compression.New(compressionType).SetForce(true)}, false, nil) + require.NoError(t, err) + require.Equal(t, 1, len(remotes)) + require.Equal(t, 1, len(remotes[0].Descriptors)) + + desc := remotes[0].Descriptors[0] + chain = append(chain, desc) + ref, err = cm.GetByBlob(ctx, desc, nil, descHandlers) + require.NoError(t, err) + allRefs = append(allRefs, ref) + } + require.Equal(t, len(compressionLoop), len(chain)) + require.NoError(t, ref.(*immutableRef).linkBlob(ctx, chain[0])) // This creates a loop + + // Make sure a loop is created + visited := make(map[digest.Digest]struct{}) + gotChain := []digest.Digest{orgDesc.Digest} + cur := orgDesc + previous := chain[len(chain)-1].Digest + for i := 0; i < 1000; i++ { + dgst := cur.Digest + visited[dgst] = struct{}{} + info, err := co.cs.Info(ctx, dgst) + if err != nil && !errors.Is(err, errdefs.ErrNotFound) { + require.NoError(t, err) + } + var children []ocispecs.Descriptor + for k, dgstS := range info.Labels { + if !strings.HasPrefix(k, blobVariantGCLabel) { + continue + } + cDgst, err := digest.Parse(dgstS) + if err != nil || cDgst == dgst || previous == cDgst { + continue + } + cDesc, err := getBlobDesc(ctx, co.cs, cDgst) + require.NoError(t, err) + children = append(children, cDesc) + } + require.Equal(t, 1, len(children), "previous=%v, cur=%v, labels: %+v", previous, cur, info.Labels) + previous = cur.Digest + cur = children[0] + if _, ok := visited[cur.Digest]; ok { + break + } + gotChain = append(gotChain, cur.Digest) + } + require.Equal(t, len(chain), len(gotChain)) + + // Prune all refs + require.NoError(t, done(ctx)) + for _, ref := range allRefs { + ref.Release(ctx) + } + ensurePrune(ctx, t, cm, len(gotChain)-1, 10) + + // Check if contents are cleaned up + for _, d := range gotChain { 
+ _, err := co.cs.Info(ctx, d) + require.ErrorIs(t, err, errdefs.ErrNotFound) + } +} + +func TestSharingCompressionVariant(t *testing.T) { + t.Parallel() + // windows fails when lazy blob is being extracted with "invalid windows mount type: 'bind'" + if runtime.GOOS != "linux" { + t.Skipf("unsupported GOOS: %s", runtime.GOOS) + } + + ctx := namespaces.WithNamespace(context.Background(), "buildkit-test") + + tmpdir, err := ioutil.TempDir("", "cachemanager") + require.NoError(t, err) + defer os.RemoveAll(tmpdir) + + snapshotter, err := native.NewSnapshotter(filepath.Join(tmpdir, "snapshots")) + require.NoError(t, err) + + co, cleanup, err := newCacheManager(ctx, cmOpt{ + snapshotter: snapshotter, + snapshotterName: "native", + }) + require.NoError(t, err) + defer cleanup() + + ctx, done, err := leaseutil.WithLease(ctx, co.lm, leaseutil.MakeTemporary) + require.NoError(t, err) + defer done(context.TODO()) + + allCompressions := []compression.Type{compression.Uncompressed, compression.Gzip, compression.Zstd, compression.EStargz} + + do := func(test func(testCaseSharingCompressionVariant)) { + for _, a := range exclude(allCompressions, compression.Uncompressed) { + for _, aV1 := range exclude(allCompressions, a) { + for _, aV2 := range exclude(allCompressions, a, aV1) { + for _, b := range []compression.Type{aV1, aV2} { + for _, bV1 := range exclude(allCompressions, a, aV1, aV2) { + test(testCaseSharingCompressionVariant{ + a: a, + aVariants: []compression.Type{aV1, aV2}, + b: b, + bVariants: []compression.Type{bV1, a}, + }) + } + } + } + } + } + } + + t.Logf("Test cases with possible compression types") + do(func(testCase testCaseSharingCompressionVariant) { + testCase.checkPrune = true + testSharingCompressionVariant(ctx, t, co, testCase) + require.NoError(t, co.manager.Prune(ctx, nil, client.PruneInfo{All: true})) + checkDiskUsage(ctx, t, co.manager, 0, 0) + }) + + t.Logf("Test case with many parallel operation") + eg, egctx := errgroup.WithContext(ctx) + 
do(func(testCase testCaseSharingCompressionVariant) { + eg.Go(func() error { + testCase.checkPrune = false + testSharingCompressionVariant(egctx, t, co, testCase) + return nil + }) + }) + require.NoError(t, eg.Wait()) +} + +func exclude(s []compression.Type, ts ...compression.Type) (res []compression.Type) { +EachElem: + for _, v := range s { + for _, t := range ts { + if v == t { + continue EachElem + } + } + res = append(res, v) + } + return +} + +// testCaseSharingCompressionVariant is one test case configuration for testSharingCompressionVariant. +// This configures two refs A and B. +// A creates compression variants configured by aVariants and +// B creates compression variants configured by bVariants. +// This test checks if aVariants are visible from B and bVariants are visible from A. +type testCaseSharingCompressionVariant struct { + // a is the compression of the initial immutableRef's (called A) blob + a compression.Type + + // aVariants are the compression variants created from A + aVariants []compression.Type + + // b is another immutableRef (called B) which has one of the compression variants of A + b compression.Type + + // bVariants are compression variants created from B + bVariants []compression.Type + + // checkPrune is whether checking prune API. must be false if run tests in parallel. + checkPrune bool +} + +func testSharingCompressionVariant(ctx context.Context, t *testing.T, co *cmOut, testCase testCaseSharingCompressionVariant) { + var ( + cm = co.manager + allCompressions = append(append([]compression.Type{testCase.a, testCase.b}, testCase.aVariants...), testCase.bVariants...) 
+ orgContent = map[string]string{"foo": "1"} + ) + test := func(customized bool) { + defer cm.Prune(ctx, nil, client.PruneInfo{}) + + // Prepare the original content + _, orgContentDesc, err := mapToBlob(orgContent, false) + require.NoError(t, err) + blobBytes, aDesc, err := mapToBlobWithCompression(orgContent, func(w io.Writer) (io.WriteCloser, string, error) { + cw, err := getCompressor(w, testCase.a, customized) + if err != nil { + return nil, "", err + } + return cw, testCase.a.DefaultMediaType(), nil + }) + require.NoError(t, err) + contentBuffer := contentutil.NewBuffer() + descHandlers := DescHandlers(map[digest.Digest]*DescHandler{}) + cw, err := contentBuffer.Writer(ctx, content.WithRef(fmt.Sprintf("write-test-blob-%s", aDesc.Digest))) + require.NoError(t, err) + _, err = cw.Write(blobBytes) + require.NoError(t, err) + require.NoError(t, cw.Commit(ctx, 0, cw.Digest())) + descHandlers[aDesc.Digest] = &DescHandler{ + Provider: func(_ session.Group) content.Provider { return contentBuffer }, + } + + // Create compression variants + aRef, err := cm.GetByBlob(ctx, aDesc, nil, descHandlers) + require.NoError(t, err) + defer aRef.Release(ctx) + var bDesc ocispecs.Descriptor + for _, compressionType := range testCase.aVariants { + remotes, err := aRef.GetRemotes(ctx, true, config.RefConfig{Compression: compression.New(compressionType).SetForce(true)}, false, nil) + require.NoError(t, err) + require.Equal(t, 1, len(remotes)) + require.Equal(t, 1, len(remotes[0].Descriptors)) + if compressionType == testCase.b { + bDesc = remotes[0].Descriptors[0] + } + } + require.NotEqual(t, "", bDesc.Digest, "compression B must be chosen from the variants of A") + bRef, err := cm.GetByBlob(ctx, bDesc, nil, descHandlers) + require.NoError(t, err) + defer bRef.Release(ctx) + for _, compressionType := range testCase.bVariants { + remotes, err := bRef.GetRemotes(ctx, true, config.RefConfig{Compression: compression.New(compressionType).SetForce(true)}, false, nil) + require.NoError(t, 
err) + require.Equal(t, 1, len(remotes)) + require.Equal(t, 1, len(remotes[0].Descriptors)) + } + + // check if all compression variants are available on both refs + checkCompression := func(desc ocispecs.Descriptor, compressionType compression.Type) { + require.Equal(t, compressionType.DefaultMediaType(), desc.MediaType, "compression: %v", compressionType) + if compressionType == compression.EStargz { + ok, err := isEStargz(ctx, co.cs, desc.Digest) + require.NoError(t, err, "compression: %v", compressionType) + require.True(t, ok, "compression: %v", compressionType) + } + } + for _, c := range allCompressions { + aDesc, err := aRef.(*immutableRef).getBlobWithCompression(ctx, c) + require.NoError(t, err, "compression: %v", c) + bDesc, err := bRef.(*immutableRef).getBlobWithCompression(ctx, c) + require.NoError(t, err, "compression: %v", c) + checkCompression(aDesc, c) + checkCompression(bDesc, c) + } + + // check if compression variants are available on B still after A is released + if testCase.checkPrune && aRef.ID() != bRef.ID() { + require.NoError(t, aRef.Release(ctx)) + ensurePrune(ctx, t, cm, 1, 10) + checkDiskUsage(ctx, t, co.manager, 1, 0) + for _, c := range allCompressions { + _, err = bRef.(*immutableRef).getBlobWithCompression(ctx, c) + require.NoError(t, err) + } + } + + // check if contents are valid + for _, c := range allCompressions { + bDesc, err := bRef.(*immutableRef).getBlobWithCompression(ctx, c) + require.NoError(t, err, "compression: %v", c) + uDgst := bDesc.Digest + if c != compression.Uncompressed { + convertFunc, err := getConverter(ctx, co.cs, bDesc, compression.New(compression.Uncompressed)) + require.NoError(t, err, "compression: %v", c) + uDesc, err := convertFunc(ctx, co.cs, bDesc) + require.NoError(t, err, "compression: %v", c) + uDgst = uDesc.Digest + } + require.Equal(t, uDgst, orgContentDesc.Digest, "compression: %v", c) + } + } + for _, customized := range []bool{true, false} { + // tests in two patterns: whether making the 
initial blob customized + test(customized) + } +} + +func ensurePrune(ctx context.Context, t *testing.T, cm Manager, pruneNum, maxRetry int) { + sum := 0 + for i := 0; i <= maxRetry; i++ { + buf := pruneResultBuffer() + require.NoError(t, cm.Prune(ctx, buf.C, client.PruneInfo{All: true})) + buf.close() + sum += len(buf.all) + if sum >= pruneNum { + return + } + time.Sleep(100 * time.Millisecond) + t.Logf("Retrying to prune (%v)", i) + } + require.Equal(t, true, sum >= pruneNum, "actual=%v, expected=%v", sum, pruneNum) +} + +func getCompressor(w io.Writer, compressionType compression.Type, customized bool) (io.WriteCloser, error) { + switch compressionType { + case compression.Uncompressed: + return nil, fmt.Errorf("compression is not requested: %v", compressionType) + case compression.Gzip: + if customized { + gz, _ := gzip.NewWriterLevel(w, gzip.NoCompression) + gz.Header.Comment = "hello" + gz.Close() + } + return gzip.NewWriter(w), nil + case compression.EStargz: + done := make(chan struct{}) + pr, pw := io.Pipe() + level := gzip.BestCompression + if customized { + level = gzip.BestSpeed + } + go func() { + defer close(done) + gw := estargz.NewWriterLevel(w, level) + if err := gw.AppendTarLossLess(pr); err != nil { + pr.CloseWithError(err) + return + } + if _, err := gw.Close(); err != nil { + pr.CloseWithError(err) + return + } + pr.Close() + }() + return &writeCloser{pw, func() error { <-done; return nil }}, nil + case compression.Zstd: + if customized { + skippableFrameMagic := []byte{0x50, 0x2a, 0x4d, 0x18} + s := []byte("hello") + size := make([]byte, 4) + binary.LittleEndian.PutUint32(size, uint32(len(s))) + if _, err := w.Write(append(append(skippableFrameMagic, size...), s...)); err != nil { + return nil, err + } + } + return zstd.NewWriter(w) + default: + return nil, fmt.Errorf("unknown compression type: %q", compressionType) + } +} + func TestConversion(t *testing.T) { t.Parallel() if runtime.GOOS != "linux" { @@ -1388,7 +1767,7 @@ func 
TestGetRemotes(t *testing.T) { if needs { require.False(t, isLazy, "layer %q requires conversion so it must be unlazied", desc.Digest) } - bDesc, err := r.getCompressionBlob(egctx, compressionType) + bDesc, err := r.getBlobWithCompression(egctx, compressionType) if isLazy { require.Error(t, err) } else { @@ -2076,12 +2455,26 @@ func (b bufferCloser) Close() error { } func mapToBlob(m map[string]string, compress bool) ([]byte, ocispecs.Descriptor, error) { + if !compress { + return mapToBlobWithCompression(m, nil) + } + return mapToBlobWithCompression(m, func(w io.Writer) (io.WriteCloser, string, error) { + return gzip.NewWriter(w), ocispecs.MediaTypeImageLayerGzip, nil + }) +} + +func mapToBlobWithCompression(m map[string]string, compress func(io.Writer) (io.WriteCloser, string, error)) ([]byte, ocispecs.Descriptor, error) { buf := bytes.NewBuffer(nil) sha := digest.SHA256.Digester() var dest io.WriteCloser = bufferCloser{buf} - if compress { - dest = gzip.NewWriter(buf) + mediaType := ocispecs.MediaTypeImageLayer + if compress != nil { + var err error + dest, mediaType, err = compress(buf) + if err != nil { + return nil, ocispecs.Descriptor{}, err + } } tw := tar.NewWriter(io.MultiWriter(sha.Hash(), dest)) @@ -2103,10 +2496,6 @@ func mapToBlob(m map[string]string, compress bool) ([]byte, ocispecs.Descriptor, return nil, ocispecs.Descriptor{}, err } - mediaType := ocispecs.MediaTypeImageLayer - if compress { - mediaType = ocispecs.MediaTypeImageLayerGzip - } return buf.Bytes(), ocispecs.Descriptor{ Digest: digest.FromBytes(buf.Bytes()), MediaType: mediaType, diff --git a/cache/refs.go b/cache/refs.go index 4cf5f2404b18..32f4ccff022b 100644 --- a/cache/refs.go +++ b/cache/refs.go @@ -338,24 +338,21 @@ func (cr *cacheRecord) size(ctx context.Context) (int64, error) { } } if dgst := cr.getBlob(); dgst != "" { + added := make(map[digest.Digest]struct{}) info, err := cr.cm.ContentStore.Info(ctx, digest.Digest(dgst)) if err == nil { usage.Size += info.Size + 
added[digest.Digest(dgst)] = struct{}{} } - for k, v := range info.Labels { - // accumulate size of compression variant blobs - if strings.HasPrefix(k, compressionVariantDigestLabelPrefix) { - if cdgst, err := digest.Parse(v); err == nil { - if digest.Digest(dgst) == cdgst { - // do not double count if the label points to this content itself. - continue - } - if info, err := cr.cm.ContentStore.Info(ctx, cdgst); err == nil { - usage.Size += info.Size - } + walkBlobVariantsOnly(ctx, cr.cm.ContentStore, digest.Digest(dgst), func(desc ocispecs.Descriptor) bool { + if _, ok := added[desc.Digest]; !ok { + if info, err := cr.cm.ContentStore.Info(ctx, desc.Digest); err == nil { + usage.Size += info.Size + added[desc.Digest] = struct{}{} } } - } + return true + }, nil) } cr.mu.Lock() cr.queueSize(usage.Size) @@ -687,109 +684,125 @@ func (sr *immutableRef) ociDesc(ctx context.Context, dhs DescHandlers, preferNon } const ( - compressionVariantDigestLabelPrefix = "buildkit.io/compression/digest." - compressionVariantAnnotationsLabelPrefix = "buildkit.io/compression/annotation." - compressionVariantMediaTypeLabel = "buildkit.io/compression/mediatype" + blobVariantGCLabel = "containerd.io/gc.ref.content.blob-" + blobAnnotationsLabelPrefix = "buildkit.io/blob/annotation." 
+ blobMediaTypeLabel = "buildkit.io/blob/mediatype" ) -func compressionVariantDigestLabel(compressionType compression.Type) string { - return compressionVariantDigestLabelPrefix + compressionType.String() -} - -func getCompressionVariants(ctx context.Context, cs content.Store, dgst digest.Digest) (res []compression.Type, _ error) { - info, err := cs.Info(ctx, dgst) - if errors.Is(err, errdefs.ErrNotFound) { - return nil, nil - } else if err != nil { - return nil, err - } - for k := range info.Labels { - if strings.HasPrefix(k, compressionVariantDigestLabelPrefix) { - if t := compression.Parse(strings.TrimPrefix(k, compressionVariantDigestLabelPrefix)); t != compression.UnknownCompression { - res = append(res, t) - } - } +// linkBlob makes a link between this ref and the passed blob. The linked blob can be +// acquired during walkBlob. This is useful to associate a compression variant blob to +// this ref. This doesn't record the blob to the cache record (i.e. the passed blob can't +// be acquired through getBlob). Use setBlob for that purpose. 
+func (sr *immutableRef) linkBlob(ctx context.Context, desc ocispecs.Descriptor) error { + cs := sr.cm.ContentStore + blobDigest := sr.getBlob() + info, err := cs.Info(ctx, blobDigest) + if err != nil { + return err } - return -} - -func (sr *immutableRef) getCompressionBlob(ctx context.Context, compressionType compression.Type) (ocispecs.Descriptor, error) { - return getCompressionVariantBlob(ctx, sr.cm.ContentStore, sr.getBlob(), compressionType) -} - -func getCompressionVariantBlob(ctx context.Context, cs content.Store, dgst digest.Digest, compressionType compression.Type) (ocispecs.Descriptor, error) { - info, err := cs.Info(ctx, dgst) + vInfo, err := cs.Info(ctx, desc.Digest) if err != nil { - return ocispecs.Descriptor{}, err + return err } - dgstS, ok := info.Labels[compressionVariantDigestLabel(compressionType)] - if ok { - dgst, err := digest.Parse(dgstS) - if err != nil { - return ocispecs.Descriptor{}, err - } - return getBlobDesc(ctx, cs, dgst) + vInfo.Labels = map[string]string{ + blobVariantGCLabel + blobDigest.String(): blobDigest.String(), } - return ocispecs.Descriptor{}, errdefs.ErrNotFound -} - -func (sr *immutableRef) addCompressionBlob(ctx context.Context, desc ocispecs.Descriptor, compressionType compression.Type) error { - cs := sr.cm.ContentStore - if err := sr.cm.LeaseManager.AddResource(ctx, leases.Lease{ID: sr.ID()}, leases.Resource{ - ID: desc.Digest.String(), - Type: "content", - }); err != nil { + vInfo = addBlobDescToInfo(desc, vInfo) + if _, err := cs.Update(ctx, vInfo, fieldsFromLabels(vInfo.Labels)...); err != nil { return err } - info, err := cs.Info(ctx, sr.getBlob()) - if err != nil { + // let the future call to size() recalculate the new size + sr.mu.Lock() + sr.queueSize(sizeUnknown) + if err := sr.commitMetadata(); err != nil { + sr.mu.Unlock() return err } - if info.Labels == nil { - info.Labels = make(map[string]string) + sr.mu.Unlock() + if desc.Digest == blobDigest { + return nil } - cachedVariantLabel := 
compressionVariantDigestLabel(compressionType) - info.Labels[cachedVariantLabel] = desc.Digest.String() - if _, err := cs.Update(ctx, info, "labels."+cachedVariantLabel); err != nil { - return err + info.Labels = map[string]string{ + blobVariantGCLabel + desc.Digest.String(): desc.Digest.String(), } + _, err = cs.Update(ctx, info, fieldsFromLabels(info.Labels)...) + return err +} - info, err = cs.Info(ctx, desc.Digest) +func (sr *immutableRef) getBlobWithCompression(ctx context.Context, compressionType compression.Type) (ocispecs.Descriptor, error) { + if _, err := sr.cm.ContentStore.Info(ctx, sr.getBlob()); err != nil { + return ocispecs.Descriptor{}, err + } + desc, err := sr.ociDesc(ctx, nil, true) if err != nil { - return err + return ocispecs.Descriptor{}, err } - var fields []string - info.Labels = map[string]string{ - compressionVariantMediaTypeLabel: desc.MediaType, + return getBlobWithCompression(ctx, sr.cm.ContentStore, desc, compressionType) +} + +func getBlobWithCompression(ctx context.Context, cs content.Store, desc ocispecs.Descriptor, compressionType compression.Type) (ocispecs.Descriptor, error) { + if compressionType == compression.UnknownCompression { + return ocispecs.Descriptor{}, fmt.Errorf("cannot get unknown compression type") } - fields = append(fields, "labels."+compressionVariantMediaTypeLabel) - for k, v := range filterAnnotationsForSave(desc.Annotations) { - k2 := compressionVariantAnnotationsLabelPrefix + k - info.Labels[k2] = v - fields = append(fields, "labels."+k2) + var target *ocispecs.Descriptor + if err := walkBlob(ctx, cs, desc, func(desc ocispecs.Descriptor) bool { + if needs, err := needsConversion(ctx, cs, desc, compressionType); err == nil && !needs { + target = &desc + return false + } + return true + }); err != nil || target == nil { + return ocispecs.Descriptor{}, errdefs.ErrNotFound } - if _, err := cs.Update(ctx, info, fields...); err != nil { + return *target, nil +} + +func walkBlob(ctx context.Context, cs 
content.Store, desc ocispecs.Descriptor, f func(ocispecs.Descriptor) bool) error { + if !f(desc) { + return nil + } + if _, err := walkBlobVariantsOnly(ctx, cs, desc.Digest, func(desc ocispecs.Descriptor) bool { return f(desc) }, nil); err != nil { return err } - return nil } -func filterAnnotationsForSave(a map[string]string) (b map[string]string) { - if a == nil { - return nil +func walkBlobVariantsOnly(ctx context.Context, cs content.Store, dgst digest.Digest, f func(ocispecs.Descriptor) bool, visited map[digest.Digest]struct{}) (bool, error) { + if visited == nil { + visited = make(map[digest.Digest]struct{}) } - for _, k := range append(eStargzAnnotations, containerdUncompressed) { - v, ok := a[k] - if !ok { + visited[dgst] = struct{}{} + info, err := cs.Info(ctx, dgst) + if errors.Is(err, errdefs.ErrNotFound) { + return true, nil + } else if err != nil { + return false, err + } + var children []digest.Digest + for k, dgstS := range info.Labels { + if !strings.HasPrefix(k, blobVariantGCLabel) { continue } - if b == nil { - b = make(map[string]string) + cDgst, err := digest.Parse(dgstS) + if err != nil || cDgst == dgst { + continue } - b[k] = v + if cDesc, err := getBlobDesc(ctx, cs, cDgst); err == nil { + if !f(cDesc) { + return false, nil + } + } + children = append(children, cDgst) } - return + for _, c := range children { + if _, isVisited := visited[c]; isVisited { + continue + } + if isContinue, err := walkBlobVariantsOnly(ctx, cs, c, f, visited); !isContinue || err != nil { + return isContinue, err + } + } + return true, nil } func getBlobDesc(ctx context.Context, cs content.Store, dgst digest.Digest) (ocispecs.Descriptor, error) { @@ -800,27 +813,68 @@ func getBlobDesc(ctx context.Context, cs content.Store, dgst digest.Digest) (oci if info.Labels == nil { return ocispecs.Descriptor{}, fmt.Errorf("no blob metadata is stored for %q", info.Digest) } - mt, ok := info.Labels[compressionVariantMediaTypeLabel] + mt, ok := info.Labels[blobMediaTypeLabel] if !ok 
{ return ocispecs.Descriptor{}, fmt.Errorf("no media type is stored for %q", info.Digest) } - desc := ocispecs.Descriptor{ Digest: info.Digest, Size: info.Size, MediaType: mt, } for k, v := range info.Labels { - if strings.HasPrefix(k, compressionVariantAnnotationsLabelPrefix) { + if strings.HasPrefix(k, blobAnnotationsLabelPrefix) { if desc.Annotations == nil { desc.Annotations = make(map[string]string) } - desc.Annotations[strings.TrimPrefix(k, compressionVariantAnnotationsLabelPrefix)] = v + desc.Annotations[strings.TrimPrefix(k, blobAnnotationsLabelPrefix)] = v } } + if len(desc.URLs) == 0 { + // If there are no URLs, there is no reason to have this be non-distributable + desc.MediaType = layerToDistributable(desc.MediaType) + } return desc, nil } +func addBlobDescToInfo(desc ocispecs.Descriptor, info content.Info) content.Info { + if _, ok := info.Labels[blobMediaTypeLabel]; ok { + return info // descriptor information already stored + } + if info.Labels == nil { + info.Labels = make(map[string]string) + } + info.Labels[blobMediaTypeLabel] = desc.MediaType + for k, v := range filterAnnotationsForSave(desc.Annotations) { + info.Labels[blobAnnotationsLabelPrefix+k] = v + } + return info +} + +func filterAnnotationsForSave(a map[string]string) (b map[string]string) { + if a == nil { + return nil + } + for _, k := range append(eStargzAnnotations, containerdUncompressed) { + v, ok := a[k] + if !ok { + continue + } + if b == nil { + b = make(map[string]string) + } + b[k] = v + } + return +} + +func fieldsFromLabels(labels map[string]string) (fields []string) { + for k := range labels { + fields = append(fields, "labels."+k) + } + return +} + func (sr *immutableRef) Mount(ctx context.Context, readonly bool, s session.Group) (_ snapshot.Mountable, rerr error) { if sr.equalMutable != nil && !readonly { if err := sr.Finalize(ctx); err != nil { diff --git a/cache/remote.go b/cache/remote.go index 6cc64c57be57..d0ac594b6ac8 100644 --- a/cache/remote.go +++ 
b/cache/remote.go @@ -12,6 +12,7 @@ import ( "github.com/moby/buildkit/cache/config" "github.com/moby/buildkit/session" "github.com/moby/buildkit/solver" + "github.com/moby/buildkit/util/bklog" "github.com/moby/buildkit/util/compression" "github.com/moby/buildkit/util/contentutil" "github.com/moby/buildkit/util/leaseutil" @@ -53,7 +54,7 @@ func (sr *immutableRef) GetRemotes(ctx context.Context, createIfNeeded bool, ref // compression with all combination of copmressions res := []*solver.Remote{remote} topmost, parentChain := remote.Descriptors[len(remote.Descriptors)-1], remote.Descriptors[:len(remote.Descriptors)-1] - vDesc, err := getCompressionVariantBlob(ctx, sr.cm.ContentStore, topmost.Digest, refCfg.Compression.Type) + vDesc, err := getBlobWithCompression(ctx, sr.cm.ContentStore, topmost, refCfg.Compression.Type) if err != nil { return res, nil // compression variant doesn't exist. return the main blob only. } @@ -108,16 +109,16 @@ func getAvailableBlobs(ctx context.Context, cs content.Store, chain *solver.Remo if err != nil { return nil, err } - compressions, err := getCompressionVariants(ctx, cs, target.Digest) - if err != nil { - return nil, err + var descs []ocispecs.Descriptor + if err := walkBlob(ctx, cs, target, func(desc ocispecs.Descriptor) bool { + descs = append(descs, desc) + return true + }); err != nil { + bklog.G(ctx).WithError(err).Warn("failed to walk variant blob") // is not a critical error at this moment. } var res []*solver.Remote - for _, c := range compressions { - desc, err := getCompressionVariantBlob(ctx, cs, target.Digest, c) - if err != nil { - return nil, err - } + for _, desc := range descs { + desc := desc if len(parents) == 0 { // bottommost ref res = append(res, &solver.Remote{ Descriptors: []ocispecs.Descriptor{desc}, @@ -217,9 +218,9 @@ func (sr *immutableRef) getRemote(ctx context.Context, createIfNeeded bool, refC } else if needs { // ensure the compression type. 
// compressed blob must be created and stored in the content store. - blobDesc, err := ref.getCompressionBlob(ctx, refCfg.Compression.Type) + blobDesc, err := getBlobWithCompressionWithRetry(ctx, ref, refCfg.Compression, s) if err != nil { - return nil, errors.Wrapf(err, "compression blob for %q not found", refCfg.Compression.Type) + return nil, errors.Wrapf(err, "failed to get compression blob %q", refCfg.Compression.Type) } newDesc := desc newDesc.MediaType = blobDesc.MediaType @@ -251,6 +252,16 @@ func (sr *immutableRef) getRemote(ctx context.Context, createIfNeeded bool, refC return remote, nil } +func getBlobWithCompressionWithRetry(ctx context.Context, ref *immutableRef, comp compression.Config, s session.Group) (ocispecs.Descriptor, error) { + if blobDesc, err := ref.getBlobWithCompression(ctx, comp.Type); err == nil { + return blobDesc, nil + } + if err := ensureCompression(ctx, ref, comp, s); err != nil { + return ocispecs.Descriptor{}, errors.Wrapf(err, "failed to get and ensure compression type of %q", comp.Type) + } + return ref.getBlobWithCompression(ctx, comp.Type) +} + type lazyMultiProvider struct { mprovider *contentutil.MultiProvider plist []lazyRefProvider @@ -300,6 +311,11 @@ func (p lazyRefProvider) Unlazy(ctx context.Context) error { } else if !isLazy { return nil, nil } + defer func() { + if rerr == nil { + rerr = p.ref.linkBlob(ctx, p.desc) + } + }() if p.dh == nil { // shouldn't happen, if you have a lazy immutable ref it already should be validated @@ -334,14 +350,6 @@ func (p lazyRefProvider) Unlazy(ctx context.Context) error { } } - compressionType := compression.FromMediaType(p.desc.MediaType) - if compressionType == compression.UnknownCompression { - return nil, errors.Errorf("unhandled layer media type: %q", p.desc.MediaType) - } - - if err := p.ref.addCompressionBlob(ctx, p.desc, compressionType); err != nil { - return nil, err - } return nil, nil }) return err