diff --git a/daemon/containerd/image_pull.go b/daemon/containerd/image_pull.go index 167225753f18d..ddf4e87d1c84e 100644 --- a/daemon/containerd/image_pull.go +++ b/daemon/containerd/image_pull.go @@ -59,6 +59,11 @@ func (i *ImageService) PullImage(ctx context.Context, image, tagOrDigest string, opts = append(opts, containerd.WithImageHandler(h)) opts = i.applySnapshotterOpts(opts, ref) + if i.limits.MaxConcurrentDownloads > 0 { + // TODO(vvoland): use the i.downloadLimiter directly + opts = append(opts, containerd.WithMaxConcurrentDownloads(i.limits.MaxConcurrentDownloads)) + } + out := streamformatter.NewJSONProgressOutput(outStream, false) finishProgress := showProgress(ctx, jobs, out, pullProgress(i.client.ContentStore(), true)) defer finishProgress() diff --git a/daemon/containerd/image_push.go b/daemon/containerd/image_push.go index 29d6f9758ff55..01bdcc9dc3eca 100644 --- a/daemon/containerd/image_push.go +++ b/daemon/containerd/image_push.go @@ -15,7 +15,6 @@ import ( ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" "github.com/sirupsen/logrus" - "golang.org/x/sync/semaphore" ) // PushImage initiates a push operation on the repository named localName. @@ -92,8 +91,7 @@ func (i *ImageService) PushImage(ctx context.Context, image, tag string, metaHea } }() - var limiter *semaphore.Weighted = nil // TODO: Respect max concurrent downloads/uploads - pusher := newLazyPusher(store, resolver, jobs, limiter, limiter) + pusher := newLazyPusher(store, resolver, jobs, i.downloadLimiter, i.uploadLimiter) leasedCtx, release, err := i.client.WithLease(ctx) if err != nil { diff --git a/daemon/containerd/service.go b/daemon/containerd/service.go index c004ad208bb29..e3b3f7c963593 100644 --- a/daemon/containerd/service.go +++ b/daemon/containerd/service.go @@ -19,6 +19,7 @@ import ( "github.com/opencontainers/go-digest" "github.com/opencontainers/image-spec/identity" "github.com/pkg/errors" + "golang.org/x/sync/semaphore" "golang.org/x/sync/singleflight" ) @@ -31,6 +32,19 @@ type ImageService struct { pruneRunning int32 registryHosts RegistryHostsProvider registryService *registry.Service + limits Limits + downloadLimiter *semaphore.Weighted + uploadLimiter *semaphore.Weighted +} + +type Limits struct { + // MaxConcurrentDownloads is the maximum number of downloads that + // may take place at a time for each pull. + MaxConcurrentDownloads int + + // MaxConcurrentUploads is the maximum number of uploads that + // may take place at a time. + MaxConcurrentUploads int } type RegistryHostsProvider interface { @@ -38,13 +52,16 @@ type RegistryHostsProvider interface { } // NewService creates a new ImageService. -func NewService(c *containerd.Client, containers container.Store, snapshotter string, hostsProvider RegistryHostsProvider, registry *registry.Service) *ImageService { +func NewService(c *containerd.Client, limits Limits, containers container.Store, snapshotter string, hostsProvider RegistryHostsProvider, registry *registry.Service) *ImageService { return &ImageService{ client: c, containers: containers, + limits: limits, snapshotter: snapshotter, registryHosts: hostsProvider, registryService: registry, + downloadLimiter: semaphore.NewWeighted(int64(limits.MaxConcurrentDownloads)), + uploadLimiter: semaphore.NewWeighted(int64(limits.MaxConcurrentUploads)), } } diff --git a/daemon/daemon.go b/daemon/daemon.go index a7e46c0bf1420..0fae091804397 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -1020,7 +1020,11 @@ func NewDaemon(ctx context.Context, config *config.Config, pluginStore *plugin.S if err := configureKernelSecuritySupport(config, driverName); err != nil { return nil, err } - d.imageService = ctrd.NewService(d.containerdCli, d.containers, driverName, d, d.registryService) + limits := ctrd.Limits{ + MaxConcurrentDownloads: config.MaxConcurrentDownloads, + MaxConcurrentUploads: config.MaxConcurrentUploads, + } + d.imageService = ctrd.NewService(d.containerdCli, limits, d.containers, driverName, d, d.registryService) } else { layerStore, err := layer.NewStoreFromOptions(layer.StoreOptions{ Root: config.Root, @@ -1121,10 +1125,10 @@ func NewDaemon(ctx context.Context, config *config.Config, pluginStore *plugin.S // if migration is called from daemon/images. layerStore might move as well. d.imageService = images.NewImageService(imgSvcConfig) - logrus.Debugf("Max Concurrent Downloads: %d", imgSvcConfig.MaxConcurrentDownloads) - logrus.Debugf("Max Concurrent Uploads: %d", imgSvcConfig.MaxConcurrentUploads) logrus.Debugf("Max Download Attempts: %d", imgSvcConfig.MaxDownloadAttempts) } + logrus.Debugf("Max Concurrent Downloads: %d", config.MaxConcurrentDownloads) + logrus.Debugf("Max Concurrent Uploads: %d", config.MaxConcurrentUploads) go d.execCommandGC()