diff --git a/cache/opts.go b/cache/opts.go index 911def3e1f38..a66c0e094f3a 100644 --- a/cache/opts.go +++ b/cache/opts.go @@ -13,6 +13,7 @@ type DescHandler struct { Provider func(session.Group) content.Provider Progress progress.Controller SnapshotLabels map[string]string + Ref string // string representation of desc origin, can be used as a sync key } type DescHandlers map[digest.Digest]*DescHandler diff --git a/cache/remote.go b/cache/remote.go index 3d21a3fb3e66..d8611cb5f88a 100644 --- a/cache/remote.go +++ b/cache/remote.go @@ -208,7 +208,7 @@ func (p lazyRefProvider) Unlazy(ctx context.Context) error { err := contentutil.Copy(ctx, p.ref.cm.ContentStore, &pullprogress.ProviderWithProgress{ Provider: p.dh.Provider(p.session), Manager: p.ref.cm.ContentStore, - }, p.desc, logs.LoggerFromContext(ctx)) + }, p.desc, p.dh.Ref, logs.LoggerFromContext(ctx)) if err != nil { return nil, err } diff --git a/cache/remotecache/export.go b/cache/remotecache/export.go index 64345f26ab6a..ded6d5cb9871 100644 --- a/cache/remotecache/export.go +++ b/cache/remotecache/export.go @@ -58,11 +58,12 @@ type contentCacheExporter struct { chains *v1.CacheChains ingester content.Ingester oci bool + ref string } -func NewExporter(ingester content.Ingester, oci bool) Exporter { +func NewExporter(ingester content.Ingester, ref string, oci bool) Exporter { cc := v1.NewCacheChains() - return &contentCacheExporter{CacheExporterTarget: cc, chains: cc, ingester: ingester, oci: oci} + return &contentCacheExporter{CacheExporterTarget: cc, chains: cc, ingester: ingester, oci: oci, ref: ref} } func (ce *contentCacheExporter) Finalize(ctx context.Context) (map[string]string, error) { @@ -95,7 +96,7 @@ func (ce *contentCacheExporter) Finalize(ctx context.Context) (map[string]string return nil, errors.Errorf("missing blob %s", l.Blob) } layerDone := oneOffProgress(ctx, fmt.Sprintf("writing layer %s", l.Blob)) - if err := contentutil.Copy(ctx, ce.ingester, dgstPair.Provider, dgstPair.Descriptor, logs.LoggerFromContext(ctx)); err != nil { + if err := contentutil.Copy(ctx, ce.ingester, dgstPair.Provider, dgstPair.Descriptor, ce.ref, logs.LoggerFromContext(ctx)); err != nil { return nil, layerDone(errors.Wrap(err, "error writing layer blob")) } layerDone(nil) diff --git a/cache/remotecache/local/local.go b/cache/remotecache/local/local.go index 2ee194afdfd0..3971d0202138 100644 --- a/cache/remotecache/local/local.go +++ b/cache/remotecache/local/local.go @@ -42,7 +42,7 @@ func ResolveCacheExporterFunc(sm *session.Manager) remotecache.ResolveCacheExpor if err != nil { return nil, err } - return remotecache.NewExporter(cs, ociMediatypes), nil + return remotecache.NewExporter(cs, "", ociMediatypes), nil } } diff --git a/cache/remotecache/registry/registry.go b/cache/remotecache/registry/registry.go index 281d9fa4a3f3..a704693e92a4 100644 --- a/cache/remotecache/registry/registry.go +++ b/cache/remotecache/registry/registry.go @@ -52,7 +52,7 @@ func ResolveCacheExporterFunc(sm *session.Manager, hosts docker.RegistryHosts) r if err != nil { return nil, err } - return remotecache.NewExporter(contentutil.FromPusher(pusher), ociMediatypes), nil + return remotecache.NewExporter(contentutil.FromPusher(pusher), ref, ociMediatypes), nil } } diff --git a/source/containerimage/pull.go b/source/containerimage/pull.go index 57264beb7026..4236650ad5fd 100644 --- a/source/containerimage/pull.go +++ b/source/containerimage/pull.go @@ -237,6 +237,7 @@ func (p *puller) CacheKey(ctx context.Context, g session.Group, index int) (cach Provider: p.manifest.Provider, Progress: progressController, SnapshotLabels: labels, + Ref: p.manifest.Ref, } } } diff --git a/util/contentutil/copy.go b/util/contentutil/copy.go index b471d8b94850..b4d68d73d5a6 100644 --- a/util/contentutil/copy.go +++ b/util/contentutil/copy.go @@ -13,8 +13,8 @@ import ( "github.com/pkg/errors" ) -func Copy(ctx context.Context, ingester content.Ingester, provider content.Provider, desc ocispec.Descriptor, logger func([]byte)) error { - if _, err := retryhandler.New(remotes.FetchHandler(ingester, &localFetcher{provider}), logger)(ctx, desc); err != nil { +func Copy(ctx context.Context, ingester content.Ingester, provider content.Provider, desc ocispec.Descriptor, ref string, logger func([]byte)) error { + if _, err := retryhandler.New(remotes.FetchHandler(ingester, &localFetcher{provider}), ref, logger)(ctx, desc); err != nil { return err } return nil @@ -65,7 +65,7 @@ func CopyChain(ctx context.Context, ingester content.Ingester, provider content. handlers := []images.Handler{ images.ChildrenHandler(provider), filterHandler, - retryhandler.New(remotes.FetchHandler(ingester, &localFetcher{provider}), func(_ []byte) {}), + retryhandler.New(remotes.FetchHandler(ingester, &localFetcher{provider}), "", func(_ []byte) {}), } if err := images.Dispatch(ctx, images.Handlers(handlers...), nil, desc); err != nil { @@ -73,7 +73,7 @@ func CopyChain(ctx context.Context, ingester content.Ingester, provider content. } for i := len(manifestStack) - 1; i >= 0; i-- { - if err := Copy(ctx, ingester, provider, manifestStack[i], nil); err != nil { + if err := Copy(ctx, ingester, provider, manifestStack[i], "", nil); err != nil { return errors.WithStack(err) } } diff --git a/util/contentutil/copy_test.go b/util/contentutil/copy_test.go index 90ba55d34840..5a35b2b122a7 100644 --- a/util/contentutil/copy_test.go +++ b/util/contentutil/copy_test.go @@ -21,7 +21,7 @@ func TestCopy(t *testing.T) { err := content.WriteBlob(ctx, b0, "foo", bytes.NewBuffer([]byte("foobar")), ocispec.Descriptor{Size: -1}) require.NoError(t, err) - err = Copy(ctx, b1, b0, ocispec.Descriptor{Digest: digest.FromBytes([]byte("foobar")), Size: -1}, nil) + err = Copy(ctx, b1, b0, ocispec.Descriptor{Digest: digest.FromBytes([]byte("foobar")), Size: -1}, "", nil) require.NoError(t, err) dt, err := content.ReadBlob(ctx, b1, ocispec.Descriptor{Digest: digest.FromBytes([]byte("foobar"))}) diff --git a/util/contentutil/fetcher_test.go b/util/contentutil/fetcher_test.go index 57ba6cc966a6..e96272a10580 100644 --- a/util/contentutil/fetcher_test.go +++ b/util/contentutil/fetcher_test.go @@ -26,7 +26,7 @@ func TestFetcher(t *testing.T) { p := FromFetcher(f) b1 := NewBuffer() - err = Copy(ctx, b1, p, ocispec.Descriptor{Digest: digest.FromBytes([]byte("foobar")), Size: -1}, nil) + err = Copy(ctx, b1, p, ocispec.Descriptor{Digest: digest.FromBytes([]byte("foobar")), Size: -1}, "", nil) require.NoError(t, err) dt, err := content.ReadBlob(ctx, b1, ocispec.Descriptor{Digest: digest.FromBytes([]byte("foobar"))}) diff --git a/util/imageutil/config.go b/util/imageutil/config.go index 0be587058a6c..f514c9dedfec 100644 --- a/util/imageutil/config.go +++ b/util/imageutil/config.go @@ -101,7 +101,7 @@ func Config(ctx context.Context, str string, resolver remotes.Resolver, cache Co children := childrenConfigHandler(cache, platform) handlers := []images.Handler{ - retryhandler.New(remotes.FetchHandler(cache, fetcher), func(_ []byte) {}), + retryhandler.New(remotes.FetchHandler(cache, fetcher), str, func(_ []byte) {}), children, } if err := images.Dispatch(ctx, images.Handlers(handlers...), nil, desc); err != nil { diff --git a/util/pull/pull.go b/util/pull/pull.go index ee9d67ee46c2..befdb8c08d51 100644 --- a/util/pull/pull.go +++ b/util/pull/pull.go @@ -148,7 +148,7 @@ func (p *Puller) PullManifests(ctx context.Context) (*PulledManifests, error) { } handlers = append(handlers, filterLayerBlobs(metadata, &mu), - retryhandler.New(remotes.FetchHandler(p.ContentStore, fetcher), logs.LoggerFromContext(ctx)), + retryhandler.New(remotes.FetchHandler(p.ContentStore, fetcher), p.ref, logs.LoggerFromContext(ctx)), childrenHandler, dslHandler, ) diff --git a/util/push/push.go b/util/push/push.go index 4cc19daacdab..9cd1aaca2b61 100644 --- a/util/push/push.go +++ b/util/push/push.go @@ -86,7 +86,7 @@ func Push(ctx context.Context, sm *session.Manager, sid string, provider content } }) - pushHandler := retryhandler.New(remotes.PushHandler(pusher, provider), logs.LoggerFromContext(ctx)) + pushHandler := retryhandler.New(remotes.PushHandler(pusher, provider), ref, logs.LoggerFromContext(ctx)) pushUpdateSourceHandler, err := updateDistributionSourceHandler(manager, pushHandler, ref) if err != nil { return err diff --git a/util/resolver/retryhandler/retry.go b/util/resolver/retryhandler/retry.go index 36cf6af99e5b..5b461259a7ff 100644 --- a/util/resolver/retryhandler/retry.go +++ b/util/resolver/retryhandler/retry.go @@ -5,17 +5,43 @@ import ( "fmt" "io" "net" + "sync" "syscall" "time" "github.com/containerd/containerd/images" remoteserrors "github.com/containerd/containerd/remotes/errors" + "github.com/docker/distribution/reference" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" + "golang.org/x/sync/semaphore" ) -func New(f images.HandlerFunc, logger func([]byte)) images.HandlerFunc { +var mu sync.Mutex +var sem = map[string]*semaphore.Weighted{} + +const connsPerHost = 4 + +func New(f images.HandlerFunc, ref string, logger func([]byte)) images.HandlerFunc { + if ref != "" { + if named, err := reference.ParseNormalizedNamed(ref); err == nil { + ref = reference.Domain(named) + } + } + return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { + mu.Lock() + s, ok := sem[ref] + if !ok { + s = semaphore.NewWeighted(connsPerHost) + sem[ref] = s + } + mu.Unlock() + if err := s.Acquire(ctx, 1); err != nil { + return nil, err + } + defer s.Release(1) + backoff := time.Second for { descs, err := f(ctx, desc)