Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cache/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type DescHandler struct {
Provider func(session.Group) content.Provider
Progress progress.Controller
SnapshotLabels map[string]string
Ref string // string representation of desc origin, can be used as a sync key
}

type DescHandlers map[digest.Digest]*DescHandler
Expand Down
2 changes: 1 addition & 1 deletion cache/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func (p lazyRefProvider) Unlazy(ctx context.Context) error {
err := contentutil.Copy(ctx, p.ref.cm.ContentStore, &pullprogress.ProviderWithProgress{
Provider: p.dh.Provider(p.session),
Manager: p.ref.cm.ContentStore,
}, p.desc, logs.LoggerFromContext(ctx))
}, p.desc, p.dh.Ref, logs.LoggerFromContext(ctx))
if err != nil {
return nil, err
}
Expand Down
7 changes: 4 additions & 3 deletions cache/remotecache/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,12 @@ type contentCacheExporter struct {
chains *v1.CacheChains
ingester content.Ingester
oci bool
ref string
}

func NewExporter(ingester content.Ingester, oci bool) Exporter {
func NewExporter(ingester content.Ingester, ref string, oci bool) Exporter {
cc := v1.NewCacheChains()
return &contentCacheExporter{CacheExporterTarget: cc, chains: cc, ingester: ingester, oci: oci}
return &contentCacheExporter{CacheExporterTarget: cc, chains: cc, ingester: ingester, oci: oci, ref: ref}
}

func (ce *contentCacheExporter) Finalize(ctx context.Context) (map[string]string, error) {
Expand Down Expand Up @@ -95,7 +96,7 @@ func (ce *contentCacheExporter) Finalize(ctx context.Context) (map[string]string
return nil, errors.Errorf("missing blob %s", l.Blob)
}
layerDone := oneOffProgress(ctx, fmt.Sprintf("writing layer %s", l.Blob))
if err := contentutil.Copy(ctx, ce.ingester, dgstPair.Provider, dgstPair.Descriptor, logs.LoggerFromContext(ctx)); err != nil {
if err := contentutil.Copy(ctx, ce.ingester, dgstPair.Provider, dgstPair.Descriptor, ce.ref, logs.LoggerFromContext(ctx)); err != nil {
return nil, layerDone(errors.Wrap(err, "error writing layer blob"))
}
layerDone(nil)
Expand Down
2 changes: 1 addition & 1 deletion cache/remotecache/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func ResolveCacheExporterFunc(sm *session.Manager) remotecache.ResolveCacheExpor
if err != nil {
return nil, err
}
return remotecache.NewExporter(cs, ociMediatypes), nil
return remotecache.NewExporter(cs, "", ociMediatypes), nil
}
}

Expand Down
2 changes: 1 addition & 1 deletion cache/remotecache/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func ResolveCacheExporterFunc(sm *session.Manager, hosts docker.RegistryHosts) r
if err != nil {
return nil, err
}
return remotecache.NewExporter(contentutil.FromPusher(pusher), ociMediatypes), nil
return remotecache.NewExporter(contentutil.FromPusher(pusher), ref, ociMediatypes), nil
}
}

Expand Down
1 change: 1 addition & 0 deletions source/containerimage/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ func (p *puller) CacheKey(ctx context.Context, g session.Group, index int) (cach
Provider: p.manifest.Provider,
Progress: progressController,
SnapshotLabels: labels,
Ref: p.manifest.Ref,
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions util/contentutil/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
"github.com/pkg/errors"
)

func Copy(ctx context.Context, ingester content.Ingester, provider content.Provider, desc ocispec.Descriptor, logger func([]byte)) error {
if _, err := retryhandler.New(remotes.FetchHandler(ingester, &localFetcher{provider}), logger)(ctx, desc); err != nil {
func Copy(ctx context.Context, ingester content.Ingester, provider content.Provider, desc ocispec.Descriptor, ref string, logger func([]byte)) error {
if _, err := retryhandler.New(remotes.FetchHandler(ingester, &localFetcher{provider}), ref, logger)(ctx, desc); err != nil {
return err
}
return nil
Expand Down Expand Up @@ -65,15 +65,15 @@ func CopyChain(ctx context.Context, ingester content.Ingester, provider content.
handlers := []images.Handler{
images.ChildrenHandler(provider),
filterHandler,
retryhandler.New(remotes.FetchHandler(ingester, &localFetcher{provider}), func(_ []byte) {}),
retryhandler.New(remotes.FetchHandler(ingester, &localFetcher{provider}), "", func(_ []byte) {}),
}

if err := images.Dispatch(ctx, images.Handlers(handlers...), nil, desc); err != nil {
return errors.WithStack(err)
}

for i := len(manifestStack) - 1; i >= 0; i-- {
if err := Copy(ctx, ingester, provider, manifestStack[i], nil); err != nil {
if err := Copy(ctx, ingester, provider, manifestStack[i], "", nil); err != nil {
return errors.WithStack(err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion util/contentutil/copy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestCopy(t *testing.T) {
err := content.WriteBlob(ctx, b0, "foo", bytes.NewBuffer([]byte("foobar")), ocispec.Descriptor{Size: -1})
require.NoError(t, err)

err = Copy(ctx, b1, b0, ocispec.Descriptor{Digest: digest.FromBytes([]byte("foobar")), Size: -1}, nil)
err = Copy(ctx, b1, b0, ocispec.Descriptor{Digest: digest.FromBytes([]byte("foobar")), Size: -1}, "", nil)
require.NoError(t, err)

dt, err := content.ReadBlob(ctx, b1, ocispec.Descriptor{Digest: digest.FromBytes([]byte("foobar"))})
Expand Down
2 changes: 1 addition & 1 deletion util/contentutil/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestFetcher(t *testing.T) {
p := FromFetcher(f)

b1 := NewBuffer()
err = Copy(ctx, b1, p, ocispec.Descriptor{Digest: digest.FromBytes([]byte("foobar")), Size: -1}, nil)
err = Copy(ctx, b1, p, ocispec.Descriptor{Digest: digest.FromBytes([]byte("foobar")), Size: -1}, "", nil)
require.NoError(t, err)

dt, err := content.ReadBlob(ctx, b1, ocispec.Descriptor{Digest: digest.FromBytes([]byte("foobar"))})
Expand Down
2 changes: 1 addition & 1 deletion util/imageutil/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func Config(ctx context.Context, str string, resolver remotes.Resolver, cache Co
children := childrenConfigHandler(cache, platform)

handlers := []images.Handler{
retryhandler.New(remotes.FetchHandler(cache, fetcher), func(_ []byte) {}),
retryhandler.New(remotes.FetchHandler(cache, fetcher), str, func(_ []byte) {}),
children,
}
if err := images.Dispatch(ctx, images.Handlers(handlers...), nil, desc); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion util/pull/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (p *Puller) PullManifests(ctx context.Context) (*PulledManifests, error) {
}
handlers = append(handlers,
filterLayerBlobs(metadata, &mu),
retryhandler.New(remotes.FetchHandler(p.ContentStore, fetcher), logs.LoggerFromContext(ctx)),
retryhandler.New(remotes.FetchHandler(p.ContentStore, fetcher), p.ref, logs.LoggerFromContext(ctx)),
childrenHandler,
dslHandler,
)
Expand Down
2 changes: 1 addition & 1 deletion util/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func Push(ctx context.Context, sm *session.Manager, sid string, provider content
}
})

pushHandler := retryhandler.New(remotes.PushHandler(pusher, provider), logs.LoggerFromContext(ctx))
pushHandler := retryhandler.New(remotes.PushHandler(pusher, provider), ref, logs.LoggerFromContext(ctx))
pushUpdateSourceHandler, err := updateDistributionSourceHandler(manager, pushHandler, ref)
if err != nil {
return err
Expand Down
28 changes: 27 additions & 1 deletion util/resolver/retryhandler/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,43 @@ import (
"fmt"
"io"
"net"
"sync"
"syscall"
"time"

"github.com/containerd/containerd/images"
remoteserrors "github.com/containerd/containerd/remotes/errors"
"github.com/docker/distribution/reference"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"golang.org/x/sync/semaphore"
)

func New(f images.HandlerFunc, logger func([]byte)) images.HandlerFunc {
var mu sync.Mutex
var sem = map[string]*semaphore.Weighted{}

const connsPerHost = 4

func New(f images.HandlerFunc, ref string, logger func([]byte)) images.HandlerFunc {
if ref != "" {
if named, err := reference.ParseNormalizedNamed(ref); err == nil {
ref = reference.Domain(named)
}
}

return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
mu.Lock()
s, ok := sem[ref]
if !ok {
s = semaphore.NewWeighted(connsPerHost)
sem[ref] = s
}
mu.Unlock()
if err := s.Acquire(ctx, 1); err != nil {
return nil, err
}
defer s.Release(1)

backoff := time.Second
for {
descs, err := f(ctx, desc)
Expand Down