Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion daemon/containerd/image_pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/docker/distribution/reference"
"github.com/docker/docker/api/types/registry"
"github.com/docker/docker/errdefs"
"github.com/docker/docker/pkg/streamformatter"
"github.com/opencontainers/go-digest"
specs "github.com/opencontainers/image-spec/specs-go/v1"
)
Expand Down Expand Up @@ -58,7 +59,8 @@ func (i *ImageService) PullImage(ctx context.Context, image, tagOrDigest string,
opts = append(opts, containerd.WithImageHandler(h))
opts = i.applySnapshotterOpts(opts, ref)

finishProgress := showProgress(ctx, jobs, outStream, pullProgress(i.client.ContentStore()))
out := streamformatter.NewJSONProgressOutput(outStream, false)
finishProgress := showProgress(ctx, jobs, out, pullProgress(i.client.ContentStore(), true))
defer finishProgress()

_, err = i.client.Pull(ctx, ref.String(), opts...)
Expand Down
251 changes: 21 additions & 230 deletions daemon/containerd/image_push.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,24 @@ package containerd

import (
"context"
"encoding/json"
"io"
"strings"

"github.com/containerd/containerd/content"
cerrdefs "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
containerdimages "github.com/containerd/containerd/images"
"github.com/containerd/containerd/images/converter"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/remotes"
"github.com/docker/distribution/reference"
"github.com/docker/docker/api/types/registry"
"github.com/docker/docker/errdefs"
"github.com/opencontainers/go-digest"
"github.com/docker/docker/pkg/progress"
"github.com/docker/docker/pkg/streamformatter"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sync/semaphore"
)

// PushImage initiates a push operation on the repository named localName.
func (i *ImageService) PushImage(ctx context.Context, image, tag string, metaHeaders map[string][]string, authConfig *registry.AuthConfig, outStream io.Writer) error {
func (i *ImageService) PushImage(ctx context.Context, image, tag string, metaHeaders map[string][]string, authConfig *registry.AuthConfig, outStream io.Writer) (outerr error) {
// TODO: Pass this from user?
platformMatcher := platforms.All

Expand All @@ -50,8 +45,8 @@ func (i *ImageService) PushImage(ctx context.Context, image, tag string, metaHea

target := img.Target

// Create a temporary image which is stripped from content that references other platforms.
// We or the remote may not have them and referencing them will end with an error.
// If user requested specific platforms to push, then create a manifest
// list with only the matching platforms.
if platformMatcher != platforms.All {
tmpRef := ref.String() + "-tmp-platformspecific"
platformImg, err := converter.Convert(ctx, i.client, tmpRef, ref.String(), converter.WithPlatform(platformMatcher))
Expand All @@ -65,10 +60,13 @@ func (i *ImageService) PushImage(ctx context.Context, image, tag string, metaHea

jobs := newJobs()

resolver, tracker := newResolverFromAuthConfig(authConfig)

imageHandler := containerdimages.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) (subdescs []ocispec.Descriptor, err error) {
logrus.WithField("digest", desc.Digest.String()).
WithField("mediaType", desc.MediaType).
Debug("Pushing")

if desc.MediaType != containerdimages.MediaTypeDockerSchema1Manifest {
children, err := containerdimages.Children(ctx, store, desc)
if err != nil {
Expand All @@ -85,230 +83,23 @@ func (i *ImageService) PushImage(ctx context.Context, image, tag string, metaHea
})
imageHandler = remotes.SkipNonDistributableBlobs(imageHandler)

resolver, tracker := newResolverFromAuthConfig(authConfig)

finishProgress := showProgress(ctx, jobs, outStream, pushProgress(tracker))
defer finishProgress()

return lazyPush(ctx, store, ref.String(), target, resolver, imageHandler)
}

// lazyPush uploads the provided content to a remote resource. It also attempts to
// handle push of content, which is not present locally in the store.
func lazyPush(ctx context.Context, store content.Store, ref string, desc ocispec.Descriptor, resolver remotes.Resolver, imagesHandler containerdimages.HandlerFunc) error {
// Annotate ref with digest to push only push tag for single digest
if !strings.Contains(ref, "@") {
ref = ref + "@" + desc.Digest.String()
}

pusher, err := resolver.Pusher(ctx, ref)
if err != nil {
return err
}

wrapper := func(h images.Handler) images.Handler {
return images.Handlers(imagesHandler, h)
}

sources, err := collectSources(ctx, desc, store)
if err != nil {
return err
}

lazyStore := newLazyContentStore(store, sources)

var limiter *semaphore.Weighted
return remotes.PushContent(ctx, pusher, desc, lazyStore, limiter, platforms.All, wrapper)
}

func findLazyChildren(ctx context.Context, desc ocispec.Descriptor, store content.Store) ([]ocispec.Descriptor, error) {
// Collect to hashset to remove duplicates
set := map[string]ocispec.Descriptor{}

// Do a breadth-first search starting from this descriptor
queue := []ocispec.Descriptor{desc}
for len(queue) > 0 {
child := queue[0]
queue = queue[1:]

if containerdimages.IsNonDistributable(child.MediaType) {
continue
}

_, err := store.ReaderAt(ctx, child)
if err != nil {
if cerrdefs.IsNotFound(err) {
set[child.Digest.String()] = child
continue
}
return nil, err
out := streamformatter.NewJSONProgressOutput(outStream, false)
finishProgress := showProgress(ctx, jobs, out, combineProgress(pushProgress(tracker), pullProgress(store, false)))
defer func() {
finishProgress()
if outerr == nil {
progress.Messagef(out, "", "%s: digest: %s, size: %d", tag, target.Digest.String(), target.Size)
}
}()

newChildren, err := containerdimages.Children(ctx, store, child)
if err != nil {
return nil, err
}

if len(newChildren) > 0 {
queue = append(queue, newChildren...)
}
}
var limiter *semaphore.Weighted = nil // TODO: Respect max concurrent downloads/uploads
pusher := newLazyPusher(store, resolver, jobs, limiter, limiter)

result := make([]ocispec.Descriptor, 0, len(set))
for _, desc := range set {
result = append(result, desc)
logrus.WithField("digest", desc.Digest.String()).
WithField("mediaType", desc.MediaType).
Debug("lazy children found")
}

return result, nil
}

// peekNotJson does a small peek of the content to check if content is definitely not JSON.
// It returns true if content is definitely not JSON, or false if it was unable to detect if it's
// JSON or not.
func peekNotJson(ctx context.Context, store content.Store, desc ocispec.Descriptor) (bool, error) {
readerAt, err := store.ReaderAt(ctx, desc)
leasedCtx, release, err := i.client.WithLease(ctx)
if err != nil {
logrus.WithError(err).WithField("digest", desc.Digest).Debug("failed to create reader to peek for json")
return false, err
}

buffer := []byte{0}
n, err := readerAt.ReadAt(buffer, 0)
if n != 1 || err != nil {
logrus.WithError(err).WithField("digest", desc.Digest).Debug("failed to peek json")
return false, err
}

// It doesn't start with {, then it's not a json.
return rune(buffer[0]) != '{', nil
}

func collectSources(ctx context.Context, desc ocispec.Descriptor, store content.Store) (map[digest.Digest]distributionSource, error) {
lazyChildren, err := findLazyChildren(ctx, desc, store)
if err != nil {
logrus.WithField("digest", desc.Digest.String()).
WithField("mediaType", desc.MediaType).
WithError(err).Error("failed to find lazy children referenced by descriptor")
return nil, err
}

sources := map[digest.Digest]distributionSource{}

success := errors.New("success, found the source but can't return earlier without an error")
err = store.Walk(ctx, func(i content.Info) error {
source := extractDistributionSource(i.Labels)

// Nah, we're looking for a parent of this lazy child.
// This one will not provide us with the source.
if source.value == "" {
return nil
}

desc := ocispec.Descriptor{Digest: i.Digest}

// Do a simple peek of the content to avoid big blobs which definitely aren't json.
notJson, err := peekNotJson(ctx, store, desc)
if err != nil {
return err
}
if notJson {
logrus.WithField("digest", i.Digest).Debug("skipping, definitely not a json")
return nil
}

// Read the manifest
blob, err := content.ReadBlob(ctx, store, desc)
if err != nil {
logrus.WithError(err).WithField("digest", i.Digest).Error("error reading blob")
return err
}

// Manifests and indexes have different children.
// Index stores other manifests and manifests store layers.
// To avoid unmarshaling the blob separately as manifest and index
// this holds fields that contains them both and the media type.
var indexOrManifest struct {
MediaType string `json:"mediaType,omitempty"`
Manifests []ocispec.Descriptor `json:"manifests,omitempty"`
Layers []ocispec.Descriptor `json:"layers,omitempty"`
}

err = json.Unmarshal(blob, &indexOrManifest)
if err != nil {
return nil
}

mediaType := indexOrManifest.MediaType
// Just in case, check if it really is manifest or index.
if !containerdimages.IsManifestType(mediaType) && !containerdimages.IsIndexType(mediaType) {
return nil
}
if len(indexOrManifest.Layers) == 0 && len(indexOrManifest.Manifests) == 0 {
return nil
}

// Look if this manifest/index specifies any of the lazy children
children := append(indexOrManifest.Layers, indexOrManifest.Manifests...)
for _, layer := range children {
for idx, wanted := range lazyChildren {
if layer.Digest == wanted.Digest {
// Found it!
sources[wanted.Digest] = source

// Don't look for it anymore
if len(lazyChildren) > 1 {
lastIdx := len(lazyChildren) - 1
lazyChildren[idx] = lazyChildren[lastIdx]
lazyChildren = lazyChildren[:lastIdx]
} else {
// We found all lazy children, let's end the walk.
lazyChildren = lazyChildren[:0]
return success
}
}
}
}

return nil
})

if err == success {
err = nil
}
if len(lazyChildren) > 0 {
msg := "missing blobs with no source: "
for idx, c := range lazyChildren {
if idx != 0 {
msg += ", "
}
msg += c.Digest.String()
}
err = errdefs.NotFound(errors.New(msg))
}

return sources, err
}

func extractDistributionSource(labels map[string]string) distributionSource {
var source distributionSource

// Check if this blob has a distributionSource label
// if yes, read it as source
for k, v := range labels {
if strings.HasPrefix(k, "containerd.io/distribution.source.") {
source.key = k
source.value = v
break
}
return err
}
defer release(leasedCtx)

return source
}

type distributionSource struct {
key string
value string
return pusher.push(leasedCtx, ref, target, imageHandler)
}
Loading