Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions daemon/containerd/image_pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ func (i *ImageService) PullImage(ctx context.Context, image, tagOrDigest string,
opts = append(opts, containerd.WithImageHandler(h))
opts = i.applySnapshotterOpts(opts, ref)

if i.limits.MaxConcurrentDownloads > 0 {
// TODO(vvoland): use the i.downloadLimiter directly
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what this TODO means?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Intent here is to use the same semaphore.Weighted for all pulls - to limit the downloads across whole engine and not one pull.

opts = append(opts, containerd.WithMaxConcurrentDownloads(i.limits.MaxConcurrentDownloads))
Comment on lines +63 to +64
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For pulls, the maximum downloads is currently respected across one pull - not the whole engine. Pull creates its own semaphore.Weighted inside the private fetch function which makes it impossible to share one limiter for all pulls.
Possibly at some point we would need to have our own implementation of Pull anyway, so maybe it's fine for now? I also have a quick patch PoC for containerd that would allow to pass the semaphore.Weighted directly. So I could try to clean it up a bit and make a it PR for it upstream, WDYT?

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is fine, from the help: Set the max concurrent downloads for each pull (default 3)

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The help text is misleading then - limit is per whole engine.

}

out := streamformatter.NewJSONProgressOutput(outStream, false)
finishProgress := showProgress(ctx, jobs, out, pullProgress(i.client.ContentStore(), true))
defer finishProgress()
Expand Down
4 changes: 1 addition & 3 deletions daemon/containerd/image_push.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sync/semaphore"
)

// PushImage initiates a push operation on the repository named localName.
Expand Down Expand Up @@ -92,8 +91,7 @@ func (i *ImageService) PushImage(ctx context.Context, image, tag string, metaHea
}
}()

var limiter *semaphore.Weighted = nil // TODO: Respect max concurrent downloads/uploads
pusher := newLazyPusher(store, resolver, jobs, limiter, limiter)
pusher := newLazyPusher(store, resolver, jobs, i.downloadLimiter, i.uploadLimiter)

leasedCtx, release, err := i.client.WithLease(ctx)
if err != nil {
Expand Down
19 changes: 18 additions & 1 deletion daemon/containerd/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/opencontainers/go-digest"
"github.com/opencontainers/image-spec/identity"
"github.com/pkg/errors"
"golang.org/x/sync/semaphore"
"golang.org/x/sync/singleflight"
)

Expand All @@ -31,20 +32,36 @@ type ImageService struct {
pruneRunning int32
registryHosts RegistryHostsProvider
registryService *registry.Service
limits Limits
downloadLimiter *semaphore.Weighted
uploadLimiter *semaphore.Weighted
}

type Limits struct {
// MaxConcurrentDownloads is the maximum number of downloads that
// may take place at a time for each pull.
MaxConcurrentDownloads int

// MaxConcurrentUploads is the maximum number of uploads that
// may take place at a time.
MaxConcurrentUploads int
}

type RegistryHostsProvider interface {
RegistryHosts() docker.RegistryHosts
}

// NewService creates a new ImageService.
func NewService(c *containerd.Client, containers container.Store, snapshotter string, hostsProvider RegistryHostsProvider, registry *registry.Service) *ImageService {
func NewService(c *containerd.Client, limits Limits, containers container.Store, snapshotter string, hostsProvider RegistryHostsProvider, registry *registry.Service) *ImageService {
return &ImageService{
client: c,
containers: containers,
limits: limits,
snapshotter: snapshotter,
registryHosts: hostsProvider,
registryService: registry,
downloadLimiter: semaphore.NewWeighted(int64(limits.MaxConcurrentDownloads)),
uploadLimiter: semaphore.NewWeighted(int64(limits.MaxConcurrentUploads)),
}
}

Expand Down
10 changes: 7 additions & 3 deletions daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -1020,7 +1020,11 @@ func NewDaemon(ctx context.Context, config *config.Config, pluginStore *plugin.S
if err := configureKernelSecuritySupport(config, driverName); err != nil {
return nil, err
}
d.imageService = ctrd.NewService(d.containerdCli, d.containers, driverName, d, d.registryService)
limits := ctrd.Limits{
MaxConcurrentDownloads: config.MaxConcurrentDownloads,
MaxConcurrentUploads: config.MaxConcurrentUploads,
}
d.imageService = ctrd.NewService(d.containerdCli, limits, d.containers, driverName, d, d.registryService)
} else {
layerStore, err := layer.NewStoreFromOptions(layer.StoreOptions{
Root: config.Root,
Expand Down Expand Up @@ -1121,10 +1125,10 @@ func NewDaemon(ctx context.Context, config *config.Config, pluginStore *plugin.S
// if migration is called from daemon/images. layerStore might move as well.
d.imageService = images.NewImageService(imgSvcConfig)

logrus.Debugf("Max Concurrent Downloads: %d", imgSvcConfig.MaxConcurrentDownloads)
logrus.Debugf("Max Concurrent Uploads: %d", imgSvcConfig.MaxConcurrentUploads)
logrus.Debugf("Max Download Attempts: %d", imgSvcConfig.MaxDownloadAttempts)
}
logrus.Debugf("Max Concurrent Downloads: %d", config.MaxConcurrentDownloads)
logrus.Debugf("Max Concurrent Uploads: %d", config.MaxConcurrentUploads)

go d.execCommandGC()

Expand Down