Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions pkg/image/controller/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

utilwait "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/klog"
)

// NOTE: scheduler's semantics do not lend it for reuse elsewhere and its use in
Expand Down Expand Up @@ -33,7 +34,6 @@ type bucket map[interface{}]interface{}
// newScheduler creates a scheduler with bucketCount buckets, a rate limiter for restricting
// the rate at which buckets are processed, and a function to invoke when items are scanned in
// a bucket.
// TODO: remove DEBUG statements from this file once this logic has been adequately validated.
func newScheduler(bucketCount int, bucketLimiter flowcontrol.RateLimiter, fn func(key, value interface{})) *scheduler {
// Add one more bucket to serve as the "current" bucket
bucketCount++
Expand All @@ -58,9 +58,11 @@ func (s *scheduler) RunUntil(ch <-chan struct{}) {
func (s *scheduler) RunOnce() {
key, value, last := s.next()
if last {
klog.V(5).Infof("Image controller scheduler: waiting for limit")
s.limiter.Accept()
return
}
klog.V(5).Infof("Image controller scheduler: handle %s", key)
s.handle(key, value)
}

Expand All @@ -74,6 +76,7 @@ func (s *scheduler) at(inc int) int {
func (s *scheduler) next() (interface{}, interface{}, bool) {
s.mu.Lock()
defer s.mu.Unlock()
klog.V(5).Infof("Image controller scheduler: queue (%d):\n %#v", s.position, s.buckets)

last := s.buckets[s.position]
// Grab the first item in the bucket, move it to the end and return it.
Expand All @@ -84,6 +87,7 @@ func (s *scheduler) next() (interface{}, interface{}, bool) {
}
// The bucket was empty. Advance to the next bucket.
s.position = s.at(1)
klog.V(5).Infof("Image controller scheduler: bucket was empty, advance to %d of %d buckets", s.position, len(s.buckets))
return nil, nil, true
}

Expand Down Expand Up @@ -116,6 +120,7 @@ func (s *scheduler) Add(key, value interface{}) {
least = size
}
}
klog.V(5).Infof("Image controller scheduler: add %s to bucket %d", key, target)
s.buckets[target][key] = value
}

Expand All @@ -126,13 +131,14 @@ func (s *scheduler) Remove(key, value interface{}) bool {
defer s.mu.Unlock()

match := true
for _, bucket := range s.buckets {
for i, bucket := range s.buckets {
if value != nil {
if old, ok := bucket[key]; ok && old != value {
match = false
continue
}
}
klog.V(5).Infof("Image controller scheduler: remove %s from bucket %d", key, i)
delete(bucket, key)
}
return match
Expand Down