From bae9267751357d1e902c424d4c1c1fbd32abd158 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Gronowski?= Date: Wed, 6 Jul 2022 11:42:37 +0200 Subject: [PATCH] containerd: Push progress MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Paweł Gronowski --- daemon/containerd/progress.go | 195 ++++++++++++++++++++++------------ daemon/containerd/service.go | 40 ++++--- 2 files changed, 157 insertions(+), 78 deletions(-) diff --git a/daemon/containerd/progress.go b/daemon/containerd/progress.go index 31367a2f64bc0..b71f0971fca55 100644 --- a/daemon/containerd/progress.go +++ b/daemon/containerd/progress.go @@ -7,106 +7,169 @@ import ( "time" "github.com/containerd/containerd/content" - "github.com/containerd/containerd/errdefs" + cerrdefs "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/log" "github.com/containerd/containerd/remotes" + "github.com/containerd/containerd/remotes/docker" "github.com/docker/docker/pkg/progress" "github.com/docker/docker/pkg/streamformatter" "github.com/docker/docker/pkg/stringid" "github.com/opencontainers/go-digest" - "github.com/opencontainers/image-spec/specs-go/v1" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/sirupsen/logrus" ) -func showProgress(ctx context.Context, ongoing *jobs, cs content.Store, w io.Writer, stop chan struct{}) { +type updateProgressFunc func(ctx context.Context, ongoing *jobs, output progress.Output, start time.Time) error + +func showProgress(ctx context.Context, ongoing *jobs, w io.Writer, updateFunc updateProgressFunc) func() { + stop := make(chan struct{}) + ctx, cancelProgress := context.WithCancel(ctx) + var ( out = streamformatter.NewJSONProgressOutput(w, false) ticker = time.NewTicker(100 * time.Millisecond) start = time.Now() done bool ) - defer ticker.Stop() -outer: - for { - select { - case <-ticker.C: - if !ongoing.IsResolved() { - continue - } + for _, j := range ongoing.Jobs() { + id := stringid.TruncateID(j.Digest.Encoded()) + progress.Update(out, id, "Preparing") + } - pulling := map[string]content.Status{} - if !done { - actives, err := cs.ListStatuses(ctx, "") - if err != nil { - log.G(ctx).WithError(err).Error("status check failed") + go func() { + defer func() { + ticker.Stop() + stop <- struct{}{} + }() + + for { + select { + case <-ticker.C: + if !ongoing.IsResolved() { continue } - // update status of status entries! - for _, status := range actives { - pulling[status.Ref] = status + err := updateFunc(ctx, ongoing, out, start) + if err != nil { + logrus.WithError(err).Error("Updating progress failed") + return } - } - // update inactive jobs - for _, j := range ongoing.Jobs() { - key := remotes.MakeRefKey(ctx, j) - if info, ok := pulling[key]; ok { - out.WriteProgress(progress.Progress{ - ID: stringid.TruncateID(j.Digest.Encoded()), - Action: "Downloading", - Current: info.Offset, - Total: info.Total, - }) - continue + if done { + return } + case <-ctx.Done(): + done = true + } + } + }() - info, err := cs.Info(ctx, j.Digest) - if err != nil { - if !errdefs.IsNotFound(err) { - log.G(ctx).WithError(err).Error("failed to get content info") - continue outer - } - } else if info.CreatedAt.After(start) { - out.WriteProgress(progress.Progress{ - ID: stringid.TruncateID(j.Digest.Encoded()), - Action: "Download complete", - HideCounts: true, - LastUpdate: true, - }) - ongoing.Remove(j) + return func() { + cancelProgress() + <-stop + } +} + +func pushProgress(tracker docker.StatusTracker) updateProgressFunc { + return func(ctx context.Context, ongoing *jobs, out progress.Output, start time.Time) error { + for _, j := range ongoing.Jobs() { + key := remotes.MakeRefKey(ctx, j) + id := stringid.TruncateID(j.Digest.Encoded()) + + status, err := tracker.GetStatus(key) + if err != nil { + if cerrdefs.IsNotFound(err) { + progress.Update(out, id, "Waiting") + continue } else { - out.WriteProgress(progress.Progress{ - ID: stringid.TruncateID(j.Digest.Encoded()), - Action: "Exists", - HideCounts: true, - LastUpdate: true, - }) - ongoing.Remove(j) + return err } + + } + + logrus.WithField("status", status).WithField("id", id).Debug("Status update") + + if status.Committed && status.Offset >= status.Total { + progress.Update(out, id, "Pushed") + ongoing.Remove(j) + continue + } + + out.WriteProgress(progress.Progress{ + ID: id, + Action: "Pushing", + Current: status.Offset, + Total: status.Total, + }) + } + + return nil + } +} + +func pullProgress(cs content.Store) updateProgressFunc { + return func(ctx context.Context, ongoing *jobs, out progress.Output, start time.Time) error { + pulling := map[string]content.Status{} + actives, err := cs.ListStatuses(ctx, "") + if err != nil { + log.G(ctx).WithError(err).Error("status check failed") + return nil + } + // update status of status entries! + for _, status := range actives { + pulling[status.Ref] = status + } + + for _, j := range ongoing.Jobs() { + key := remotes.MakeRefKey(ctx, j) + if info, ok := pulling[key]; ok { + out.WriteProgress(progress.Progress{ + ID: stringid.TruncateID(j.Digest.Encoded()), + Action: "Downloading", + Current: info.Offset, + Total: info.Total, + }) + continue } - if done { - return + + info, err := cs.Info(ctx, j.Digest) + if err != nil { + if !cerrdefs.IsNotFound(err) { + return err + } + } else if info.CreatedAt.After(start) { + out.WriteProgress(progress.Progress{ + ID: stringid.TruncateID(j.Digest.Encoded()), + Action: "Download complete", + HideCounts: true, + LastUpdate: true, + }) + ongoing.Remove(j) + } else { + out.WriteProgress(progress.Progress{ + ID: stringid.TruncateID(j.Digest.Encoded()), + Action: "Exists", + HideCounts: true, + LastUpdate: true, + }) + ongoing.Remove(j) } - case <-stop: - done = true // allow ui to update once more - case <-ctx.Done(): - return } + return nil } } -// jobs holds a list of layers being downloaded to pull reference set by name type jobs struct { name string - resolved bool // resolved is set to true once remote image metadata has been downloaded from registry - descs map[digest.Digest]v1.Descriptor + resolved bool // resolved is set to true once all jobs are added + descs map[digest.Digest]ocispec.Descriptor mu sync.Mutex } // newJobs creates a new instance of the job status tracker func newJobs() *jobs { return &jobs{ - descs: map[digest.Digest]v1.Descriptor{}, + descs: map[digest.Digest]ocispec.Descriptor{}, } } @@ -118,7 +181,7 @@ func (j *jobs) IsResolved() bool { } // Add adds a descriptor to be tracked -func (j *jobs) Add(desc v1.Descriptor) { +func (j *jobs) Add(desc ocispec.Descriptor) { j.mu.Lock() defer j.mu.Unlock() @@ -130,7 +193,7 @@ func (j *jobs) Add(desc v1.Descriptor) { } // Remove removes a descriptor -func (j *jobs) Remove(desc v1.Descriptor) { +func (j *jobs) Remove(desc ocispec.Descriptor) { j.mu.Lock() defer j.mu.Unlock() @@ -138,11 +201,11 @@ func (j *jobs) Remove(desc v1.Descriptor) { } // Jobs returns a list of all tracked descriptors -func (j *jobs) Jobs() []v1.Descriptor { +func (j *jobs) Jobs() []ocispec.Descriptor { j.mu.Lock() defer j.mu.Unlock() - descs := make([]v1.Descriptor, 0, len(j.descs)) + descs := make([]ocispec.Descriptor, 0, len(j.descs)) for _, d := range j.descs { descs = append(descs, d) } diff --git a/daemon/containerd/service.go b/daemon/containerd/service.go index 59e05d4319d0b..5ec15ce603f0a 100644 --- a/daemon/containerd/service.go +++ b/daemon/containerd/service.go @@ -74,7 +74,7 @@ func (cs *containerdStore) PullImage(ctx context.Context, image, tag string, pla } } - resolver := newResolverFromAuthConfig(authConfig) + resolver, _ := newResolverFromAuthConfig(authConfig) opts = append(opts, containerd.WithResolver(resolver)) jobs := newJobs() @@ -86,11 +86,8 @@ func (cs *containerdStore) PullImage(ctx context.Context, image, tag string, pla }) opts = append(opts, containerd.WithImageHandler(h)) - stop := make(chan struct{}) - go func() { - showProgress(ctx, jobs, cs.client.ContentStore(), outStream, stop) - stop <- struct{}{} - }() + finishProgress := showProgress(ctx, jobs, outStream, pullProgress(cs.client.ContentStore())) + defer finishProgress() img, err := cs.client.Pull(ctx, ref.String(), opts...) if err != nil { @@ -107,8 +104,6 @@ func (cs *containerdStore) PullImage(ctx context.Context, image, tag string, pla return err } } - stop <- struct{}{} - <-stop return err } @@ -279,8 +274,9 @@ func (cs *containerdStore) setupFilters(ctx context.Context, opts types.ImageLis return filters, nil } -func newResolverFromAuthConfig(authConfig *types.AuthConfig) remotes.Resolver { +func newResolverFromAuthConfig(authConfig *types.AuthConfig) (remotes.Resolver, docker.StatusTracker) { opts := []docker.RegistryOpt{} + if authConfig != nil { authorizer := docker.NewDockerAuthorizer(docker.WithAuthCreds(func(_ string) (string, string, error) { if authConfig.IdentityToken != "" { @@ -292,9 +288,12 @@ func newResolverFromAuthConfig(authConfig *types.AuthConfig) remotes.Resolver { opts = append(opts, docker.WithAuthorizer(authorizer)) } + tracker := docker.NewInMemoryTracker() + return docker.NewResolver(docker.ResolverOptions{ - Hosts: docker.ConfigureDefaultRegistries(opts...), - }) + Hosts: docker.ConfigureDefaultRegistries(opts...), + Tracker: tracker, + }), tracker } func (cs *containerdStore) LogImageEvent(imageID, refName, action string) { @@ -465,6 +464,7 @@ func (cs *containerdStore) PushImage(ctx context.Context, image, tag string, met } is := cs.client.ImageService() + store := cs.client.ContentStore() img, err := is.Get(ctx, ref.String()) if err != nil { @@ -486,13 +486,29 @@ func (cs *containerdStore) PushImage(ctx context.Context, image, tag string, met defer cs.client.ImageService().Delete(ctx, platformImg.Name, containerdimages.SynchronousDelete()) } + jobs := newJobs() + imageHandler := containerdimages.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) (subdescs []ocispec.Descriptor, err error) { logrus.WithField("desc", desc).Debug("Pushing") + if desc.MediaType != containerdimages.MediaTypeDockerSchema1Manifest { + children, err := containerdimages.Children(ctx, store, desc) + + if err == nil { + for _, c := range children { + jobs.Add(c) + } + } + + jobs.Add(desc) + } return nil, nil }) imageHandler = remotes.SkipNonDistributableBlobs(imageHandler) - resolver := newResolverFromAuthConfig(authConfig) + resolver, tracker := newResolverFromAuthConfig(authConfig) + + finishProgress := showProgress(ctx, jobs, outStream, pushProgress(tracker)) + defer finishProgress() logrus.WithField("desc", target).WithField("ref", ref.String()).Info("Pushing desc to remote ref") err = cs.client.Push(ctx, ref.String(), target,