Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
240 changes: 124 additions & 116 deletions cache/blobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cache

import (
"context"
"fmt"

"github.com/containerd/containerd/diff"
"github.com/containerd/containerd/leases"
Expand Down Expand Up @@ -44,29 +45,19 @@ func (sr *immutableRef) computeBlobChain(ctx context.Context, createIfNeeded boo
}

func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool, compressionType compression.Type, forceCompression bool, s session.Group) error {
baseCtx := ctx
eg, ctx := errgroup.WithContext(ctx)
var currentDescr ocispecs.Descriptor
if sr.parent != nil {
eg.Go(func() error {
return computeBlobChain(ctx, sr.parent, createIfNeeded, compressionType, forceCompression, s)
})
}

eg.Go(func() error {
dp, err := g.Do(ctx, sr.ID(), func(ctx context.Context) (interface{}, error) {
refInfo := sr.Info()
if refInfo.Blob != "" {
if forceCompression {
desc, err := sr.ociDesc()
if err != nil {
return nil, err
}
if err := ensureCompression(ctx, sr, desc, compressionType, s); err != nil {
return nil, err
}
}
return nil, nil
} else if !createIfNeeded {
v, err := g.Do(ctx, fmt.Sprintf("%s-%t", sr.ID(), createIfNeeded), func(ctx context.Context) (interface{}, error) {
Comment thread
tonistiigi marked this conversation as resolved.
if getBlob(sr.md) != "" {
return sr.ociDesc()
}
if !createIfNeeded {
return nil, errors.WithStack(ErrNoBlobs)
}

Expand All @@ -80,91 +71,83 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
return nil, errors.Errorf("unknown layer compression type: %q", compressionType)
}

var descr ocispecs.Descriptor
var err error

if descr.Digest == "" {
// reference needs to be committed
var lower []mount.Mount
if sr.parent != nil {
m, err := sr.parent.Mount(ctx, true, s)
if err != nil {
return nil, err
}
var release func() error
lower, release, err = m.Mount()
if err != nil {
return nil, err
}
if release != nil {
defer release()
}
}
m, err := sr.Mount(ctx, true, s)
var lower []mount.Mount
if sr.parent != nil {
m, err := sr.parent.Mount(ctx, true, s)
if err != nil {
return nil, err
}
upper, release, err := m.Mount()
var release func() error
lower, release, err = m.Mount()
if err != nil {
return nil, err
}
if release != nil {
defer release()
}
descr, err = sr.cm.Differ.Compare(ctx, lower, upper,
diff.WithMediaType(mediaType),
diff.WithReference(sr.ID()),
)
if err != nil {
return nil, err
}
}
m, err := sr.Mount(ctx, true, s)
if err != nil {
return nil, err
}
upper, release, err := m.Mount()
if err != nil {
return nil, err
}
if release != nil {
defer release()
}
desc, err := sr.cm.Differ.Compare(ctx, lower, upper,
diff.WithMediaType(mediaType),
diff.WithReference(sr.ID()),
)
if err != nil {
return nil, err
}

if descr.Annotations == nil {
descr.Annotations = map[string]string{}
if desc.Annotations == nil {
desc.Annotations = map[string]string{}
}

info, err := sr.cm.ContentStore.Info(ctx, descr.Digest)
info, err := sr.cm.ContentStore.Info(ctx, desc.Digest)
if err != nil {
return nil, err
}

if diffID, ok := info.Labels[containerdUncompressed]; ok {
descr.Annotations[containerdUncompressed] = diffID
} else if compressionType == compression.Uncompressed {
descr.Annotations[containerdUncompressed] = descr.Digest.String()
desc.Annotations[containerdUncompressed] = diffID
} else if mediaType == ocispecs.MediaTypeImageLayer {
desc.Annotations[containerdUncompressed] = desc.Digest.String()
} else {
return nil, errors.Errorf("unknown layer compression type")
}

if forceCompression {
if err := ensureCompression(ctx, sr, descr, compressionType, s); err != nil {
return nil, err
}
if err := sr.setBlob(ctx, desc); err != nil {
return nil, err
}

return descr, nil

return desc, nil
})
if err != nil {
return err
}
descr, ok := v.(ocispecs.Descriptor)
if !ok {
return fmt.Errorf("invalid descriptor returned by differ while computing blob for %s", sr.ID())
}

if dp != nil {
currentDescr = dp.(ocispecs.Descriptor)
if forceCompression {
if err := ensureCompression(ctx, sr, descr, compressionType, s); err != nil {
return err
}
}
return nil
})
err := eg.Wait()
if err != nil {

if err := eg.Wait(); err != nil {
return err
}
if currentDescr.Digest != "" {
if err := sr.setBlob(baseCtx, currentDescr); err != nil {
return err
}
}
return nil
return sr.setChains(ctx)
}

// setBlob associates a blob with the cache record.
Expand All @@ -183,29 +166,22 @@ func (sr *immutableRef) setBlob(ctx context.Context, desc ocispecs.Descriptor) e
return err
}

compressionType := compression.FromMediaType(desc.MediaType)
if compressionType == compression.UnknownCompression {
return errors.Errorf("unhandled layer media type: %q", desc.MediaType)
}

sr.mu.Lock()
defer sr.mu.Unlock()

if getChainID(sr.md) != "" {
if getBlob(sr.md) != "" {
return nil
}

if err := sr.finalize(ctx); err != nil {
return err
}

p := sr.parent
var parentChainID digest.Digest
var parentBlobChainID digest.Digest
if p != nil {
pInfo := p.Info()
if pInfo.ChainID == "" || pInfo.BlobChainID == "" {
return errors.Errorf("failed to set blob for reference with non-addressable parent")
}
parentChainID = pInfo.ChainID
parentBlobChainID = pInfo.BlobChainID
}

if err := sr.cm.LeaseManager.AddResource(ctx, leases.Lease{ID: sr.ID()}, leases.Resource{
ID: desc.Digest.String(),
Type: "content",
Expand All @@ -215,16 +191,45 @@ func (sr *immutableRef) setBlob(ctx context.Context, desc ocispecs.Descriptor) e

queueDiffID(sr.md, diffID.String())
queueBlob(sr.md, desc.Digest.String())
chainID := diffID
blobChainID := imagespecidentity.ChainID([]digest.Digest{desc.Digest, diffID})
if parentChainID != "" {
chainID = imagespecidentity.ChainID([]digest.Digest{parentChainID, chainID})
blobChainID = imagespecidentity.ChainID([]digest.Digest{parentBlobChainID, blobChainID})
queueMediaType(sr.md, desc.MediaType)
queueBlobSize(sr.md, desc.Size)
if err := sr.md.Commit(); err != nil {
return err
}

if err := sr.addCompressionBlob(ctx, desc.Digest, compressionType); err != nil {
return err
}
return nil
}

func (sr *immutableRef) setChains(ctx context.Context) error {
if _, ok := leases.FromContext(ctx); !ok {
return errors.Errorf("missing lease requirement for setChains")
}

sr.mu.Lock()
defer sr.mu.Unlock()

if getChainID(sr.md) != "" {
return nil
}

var chainIDs []digest.Digest
var blobChainIDs []digest.Digest
if sr.parent != nil {
chainIDs = append(chainIDs, digest.Digest(getChainID(sr.parent.md)))
blobChainIDs = append(blobChainIDs, digest.Digest(getBlobChainID(sr.parent.md)))
}
diffID := digest.Digest(getDiffID(sr.md))
chainIDs = append(chainIDs, diffID)
blobChainIDs = append(blobChainIDs, imagespecidentity.ChainID([]digest.Digest{digest.Digest(getBlob(sr.md)), diffID}))

chainID := imagespecidentity.ChainID(chainIDs)
blobChainID := imagespecidentity.ChainID(blobChainIDs)

queueChainID(sr.md, chainID.String())
queueBlobChainID(sr.md, blobChainID.String())
queueMediaType(sr.md, desc.MediaType)
queueBlobSize(sr.md, desc.Size)
if err := sr.md.Commit(); err != nil {
return err
}
Expand All @@ -243,36 +248,39 @@ func isTypeWindows(sr *immutableRef) bool {

// ensureCompression ensures the specified ref has the blob of the specified compression Type.
func ensureCompression(ctx context.Context, ref *immutableRef, desc ocispecs.Descriptor, compressionType compression.Type, s session.Group) error {
// Resolve converters
layerConvertFunc, _, err := getConverters(desc, compressionType)
if err != nil {
return err
} else if layerConvertFunc == nil {
return nil // no need to convert
}
_, err := g.Do(ctx, fmt.Sprintf("%s-%d", desc.Digest, compressionType), func(ctx context.Context) (interface{}, error) {
// Resolve converters
layerConvertFunc, _, err := getConverters(desc, compressionType)
if err != nil {
return nil, err
} else if layerConvertFunc == nil {
return nil, nil // no need to convert
}

// First, lookup local content store
if _, err := ref.getCompressionBlob(ctx, compressionType); err == nil {
return nil // found the compression variant. no need to convert.
}
// First, lookup local content store
if _, err := ref.getCompressionBlob(ctx, compressionType); err == nil {
return nil, nil // found the compression variant. no need to convert.
}

// Convert layer compression type
if err := (lazyRefProvider{
ref: ref,
desc: desc,
dh: ref.descHandlers[desc.Digest],
session: s,
}).Unlazy(ctx); err != nil {
return err
}
newDesc, err := layerConvertFunc(ctx, ref.cm.ContentStore, desc)
if err != nil {
return err
}
// Convert layer compression type
if err := (lazyRefProvider{
ref: ref,
desc: desc,
dh: ref.descHandlers[desc.Digest],
session: s,
}).Unlazy(ctx); err != nil {
return nil, err
}
newDesc, err := layerConvertFunc(ctx, ref.cm.ContentStore, desc)
if err != nil {
return nil, err
}

// Start to track converted layer
if err := ref.addCompressionBlob(ctx, newDesc.Digest, compressionType); err != nil {
return err
}
return nil
// Start to track converted layer
if err := ref.addCompressionBlob(ctx, newDesc.Digest, compressionType); err != nil {
return nil, err
}
return nil, nil
})
return err
}
Loading