Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
253 changes: 238 additions & 15 deletions daemon/containerd/image_push.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,31 @@ package containerd

import (
"context"
"encoding/json"
"io"
"strings"

"github.com/containerd/containerd"
"github.com/containerd/containerd/content"
cerrdefs "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
containerdimages "github.com/containerd/containerd/images"
"github.com/containerd/containerd/images/converter"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/remotes"
"github.com/docker/distribution/reference"
"github.com/docker/docker/api/types/registry"
"github.com/docker/docker/errdefs"
"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sync/semaphore"
)

// PushImage initiates a push operation on the repository named localName.
func (i *ImageService) PushImage(ctx context.Context, image, tag string, metaHeaders map[string][]string, authConfig *registry.AuthConfig, outStream io.Writer) error {
// TODO: Pass this from user?
platformMatcher := platforms.DefaultStrict()
platformMatcher := platforms.All

ref, err := reference.ParseNormalizedNamed(image)
if err != nil {
Expand Down Expand Up @@ -59,18 +66,21 @@ func (i *ImageService) PushImage(ctx context.Context, image, tag string, metaHea
jobs := newJobs()

imageHandler := containerdimages.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) (subdescs []ocispec.Descriptor, err error) {
logrus.WithField("desc", desc).Debug("Pushing")
logrus.WithField("digest", desc.Digest.String()).
WithField("mediaType", desc.MediaType).
Debug("Pushing")
if desc.MediaType != containerdimages.MediaTypeDockerSchema1Manifest {
children, err := containerdimages.Children(ctx, store, desc)

if err == nil {
for _, c := range children {
jobs.Add(c)
}
if err != nil {
return nil, err
}
for _, c := range children {
jobs.Add(c)
}

jobs.Add(desc)
}

return nil, nil
})
imageHandler = remotes.SkipNonDistributableBlobs(imageHandler)
Expand All @@ -80,12 +90,225 @@ func (i *ImageService) PushImage(ctx context.Context, image, tag string, metaHea
finishProgress := showProgress(ctx, jobs, outStream, pushProgress(tracker))
defer finishProgress()

logrus.WithField("desc", target).WithField("ref", ref.String()).Info("Pushing desc to remote ref")
err = i.client.Push(ctx, ref.String(), target,
containerd.WithResolver(resolver),
containerd.WithPlatformMatcher(platformMatcher),
containerd.WithImageHandler(imageHandler),
)
return lazyPush(ctx, store, ref.String(), target, resolver, imageHandler)
}

// lazyPush uploads the provided content to a remote resource. It also attempts to
// handle push of content, which is not present locally in the store.
func lazyPush(ctx context.Context, store content.Store, ref string, desc ocispec.Descriptor, resolver remotes.Resolver, imagesHandler containerdimages.HandlerFunc) error {
// Annotate ref with digest to push only push tag for single digest
if !strings.Contains(ref, "@") {
ref = ref + "@" + desc.Digest.String()
}

pusher, err := resolver.Pusher(ctx, ref)
if err != nil {
return err
}

wrapper := func(h images.Handler) images.Handler {
return images.Handlers(imagesHandler, h)
}

sources, err := collectSources(ctx, desc, store)
if err != nil {
return err
}

lazyStore := newLazyContentStore(store, sources)

var limiter *semaphore.Weighted
return remotes.PushContent(ctx, pusher, desc, lazyStore, limiter, platforms.All, wrapper)
}

func findLazyChildren(ctx context.Context, desc ocispec.Descriptor, store content.Store) ([]ocispec.Descriptor, error) {
// Collect to hashset to remove duplicates
set := map[string]ocispec.Descriptor{}

// Do a breadth-first search starting from this descriptor
queue := []ocispec.Descriptor{desc}
for len(queue) > 0 {
child := queue[0]
queue = queue[1:]

if containerdimages.IsNonDistributable(child.MediaType) {
continue
}

_, err := store.ReaderAt(ctx, child)
if err != nil {
if cerrdefs.IsNotFound(err) {
set[child.Digest.String()] = child
continue
}
return nil, err
}

newChildren, err := containerdimages.Children(ctx, store, child)
if err != nil {
return nil, err
}

if len(newChildren) > 0 {
queue = append(queue, newChildren...)
}
}

result := make([]ocispec.Descriptor, 0, len(set))
for _, desc := range set {
result = append(result, desc)
logrus.WithField("digest", desc.Digest.String()).
WithField("mediaType", desc.MediaType).
Debug("lazy children found")
}

return result, nil
}

// peekNotJson does a small peek of the content to check if content is definitely not JSON.
// It returns true if content is definitely not JSON, or false if it was unable to detect if it's
// JSON or not.
func peekNotJson(ctx context.Context, store content.Store, desc ocispec.Descriptor) (bool, error) {
readerAt, err := store.ReaderAt(ctx, desc)
if err != nil {
logrus.WithError(err).WithField("digest", desc.Digest).Debug("failed to create reader to peek for json")
return false, err
}

buffer := []byte{0}
n, err := readerAt.ReadAt(buffer, 0)
if n != 1 || err != nil {
logrus.WithError(err).WithField("digest", desc.Digest).Debug("failed to peek json")
return false, err
}

// It doesn't start with {, then it's not a json.
return rune(buffer[0]) != '{', nil
}

func collectSources(ctx context.Context, desc ocispec.Descriptor, store content.Store) (map[digest.Digest]distributionSource, error) {
lazyChildren, err := findLazyChildren(ctx, desc, store)
if err != nil {
logrus.WithField("digest", desc.Digest.String()).
WithField("mediaType", desc.MediaType).
WithError(err).Error("failed to find lazy children referenced by descriptor")
return nil, err
}

sources := map[digest.Digest]distributionSource{}

success := errors.New("success, found the source but can't return earlier without an error")
err = store.Walk(ctx, func(i content.Info) error {
source := extractDistributionSource(i.Labels)

// Nah, we're looking for a parent of this lazy child.
// This one will not provide us with the source.
if source.value == "" {
return nil
}

desc := ocispec.Descriptor{Digest: i.Digest}

// Do a simple peek of the content to avoid big blobs which definitely aren't json.
notJson, err := peekNotJson(ctx, store, desc)
Comment thread
rumpl marked this conversation as resolved.
if err != nil {
return err
}
if notJson {
logrus.WithField("digest", i.Digest).Debug("skipping, definitely not a json")
return nil
}

// Read the manifest
blob, err := content.ReadBlob(ctx, store, desc)
if err != nil {
logrus.WithError(err).WithField("digest", i.Digest).Error("error reading blob")
return err
}

// Manifests and indexes have different children.
// Index stores other manifests and manifests store layers.
// To avoid unmarshaling the blob separately as manifest and index
// this holds fields that contains them both and the media type.
var indexOrManifest struct {
Comment thread
rumpl marked this conversation as resolved.
MediaType string `json:"mediaType,omitempty"`
Manifests []ocispec.Descriptor `json:"manifests,omitempty"`
Layers []ocispec.Descriptor `json:"layers,omitempty"`
}

err = json.Unmarshal(blob, &indexOrManifest)
if err != nil {
return nil
}

mediaType := indexOrManifest.MediaType
// Just in case, check if it really is manifest or index.
if !containerdimages.IsManifestType(mediaType) && !containerdimages.IsIndexType(mediaType) {
return nil
}
if len(indexOrManifest.Layers) == 0 && len(indexOrManifest.Manifests) == 0 {
return nil
}

// Look if this manifest/index specifies any of the lazy children
children := append(indexOrManifest.Layers, indexOrManifest.Manifests...)
for _, layer := range children {
for idx, wanted := range lazyChildren {
if layer.Digest == wanted.Digest {
// Found it!
sources[wanted.Digest] = source

// Don't look for it anymore
if len(lazyChildren) > 1 {
lastIdx := len(lazyChildren) - 1
lazyChildren[idx] = lazyChildren[lastIdx]
lazyChildren = lazyChildren[:lastIdx]
} else {
// We found all lazy children, let's end the walk.
lazyChildren = lazyChildren[:0]
return success
}
}
}
}

return nil
})

if err == success {
err = nil
}
if len(lazyChildren) > 0 {
msg := "missing blobs with no source: "
for idx, c := range lazyChildren {
if idx != 0 {
msg += ", "
}
msg += c.Digest.String()
}
err = errdefs.NotFound(errors.New(msg))
}

return sources, err
}

func extractDistributionSource(labels map[string]string) distributionSource {
var source distributionSource

// Check if this blob has a distributionSource label
// if yes, read it as source
for k, v := range labels {
if strings.HasPrefix(k, "containerd.io/distribution.source.") {
source.key = k
source.value = v
break
}
}

return source
}

return err
type distributionSource struct {
key string
value string
}
87 changes: 87 additions & 0 deletions daemon/containerd/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package containerd

import (
"context"

"github.com/containerd/containerd/content"
cerrdefs "github.com/containerd/containerd/errdefs"
"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/sirupsen/logrus"
)

type lazyContentStore struct {
s content.Store
sources map[digest.Digest]distributionSource
}

func newLazyContentStore(s content.Store, sources map[digest.Digest]distributionSource) lazyContentStore {
return lazyContentStore{
s: s,
sources: sources,
}
}

// Delete implements content.Store
func (p lazyContentStore) Delete(ctx context.Context, dgst digest.Digest) error {
return p.s.Delete(ctx, dgst)
}

// Info implements content.Store
func (p lazyContentStore) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) {
info, err := p.s.Info(ctx, dgst)
if err != nil {
if !cerrdefs.IsNotFound(err) {
return info, err
}
s, ok := p.sources[dgst]
if !ok {
return info, err
}

logrus.WithField("digest", dgst).WithField("source", s).Debug("faking")
return content.Info{
Digest: dgst,
Labels: map[string]string{
s.key: s.value,
},
}, nil
}

return info, nil
}

// Update implements content.Store
func (p lazyContentStore) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) {
return p.s.Update(ctx, info, fieldpaths...)
}

// Walk implements content.Store
func (p lazyContentStore) Walk(ctx context.Context, fn content.WalkFunc, filters ...string) error {
return p.s.Walk(ctx, fn, filters...)
}

// ReaderAt implements content.Store
func (p lazyContentStore) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (content.ReaderAt, error) {
return p.s.ReaderAt(ctx, desc)
}

// Abort implements content.Store
func (p lazyContentStore) Abort(ctx context.Context, ref string) error {
return p.s.Abort(ctx, ref)
}

// ListStatuses implements content.Store
func (p lazyContentStore) ListStatuses(ctx context.Context, filters ...string) ([]content.Status, error) {
return p.s.ListStatuses(ctx, filters...)
}

// Status implements content.Store
func (p lazyContentStore) Status(ctx context.Context, ref string) (content.Status, error) {
return p.s.Status(ctx, ref)
}

// Writer implements content.Store
func (p lazyContentStore) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
return p.s.Writer(ctx, opts...)
}