diff --git a/Gopkg.lock b/Gopkg.lock index 6a0993d051d9..336a1e0a304f 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -325,7 +325,7 @@ revision = "f0d7bb60956f88d4743f5fea66b5607b96d262c9" [[projects]] - digest = "1:04a13ba6568d48b986062c9f08455e4fe2a3681a0ad79ec6def45070a519506d" + digest = "1:c66ac8b6d6f89ac293db6948a2518029e16e303c2a0b7713b6fe1ca0b015883e" name = "github.com/knative/pkg" packages = [ "apis", @@ -361,7 +361,7 @@ "webhook", ] pruneopts = "NUT" - revision = "0122abd98318e345b107e01137ff05c33aa637d2" + revision = "8fc80deb200e7853795e6baf4029f406ca9534b8" [[projects]] branch = "master" diff --git a/Gopkg.toml b/Gopkg.toml index 02d285cc72f9..a26a64098c04 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -33,8 +33,8 @@ required = [ [[constraint]] name = "github.com/knative/pkg" - # HEAD as of 2018-09-06 - revision = "0122abd98318e345b107e01137ff05c33aa637d2" + # HEAD as of 2018-09-12 + revision = "8fc80deb200e7853795e6baf4029f406ca9534b8" [[constraint]] name = "github.com/knative/caching" diff --git a/cmd/activator/main.go b/cmd/activator/main.go index 021491e41c86..df98c02f419b 100644 --- a/cmd/activator/main.go +++ b/cmd/activator/main.go @@ -124,7 +124,7 @@ func main() { }() // Watch the logging config map and dynamically update logging levels. - configMapWatcher := configmap.NewDefaultWatcher(kubeClient, system.Namespace) + configMapWatcher := configmap.NewInformedWatcher(kubeClient, system.Namespace) configMapWatcher.Watch(logging.ConfigName, logging.UpdateLevelFromConfigMap(logger, atomicLevel, logLevelKey)) if err = configMapWatcher.Start(stopCh); err != nil { logger.Fatalf("failed to start configuration manager: %v", err) diff --git a/cmd/autoscaler/main.go b/cmd/autoscaler/main.go index b5578bb82518..c9cb046a58be 100644 --- a/cmd/autoscaler/main.go +++ b/cmd/autoscaler/main.go @@ -91,7 +91,7 @@ func main() { } // Watch the logging config map and dynamically update logging levels. - configMapWatcher := configmap.NewDefaultWatcher(kubeClientSet, system.Namespace) + configMapWatcher := configmap.NewInformedWatcher(kubeClientSet, system.Namespace) configMapWatcher.Watch(logging.ConfigName, logging.UpdateLevelFromConfigMap(logger, atomicLevel, logLevelKey)) // This is based on how Kubernetes sets up its scale client based on discovery: diff --git a/cmd/controller/main.go b/cmd/controller/main.go index 0e9c317c9c13..4ee016af6ebe 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -114,7 +114,7 @@ func main() { time.Minute*5, system.Namespace, nil) vpaInformerFactory := vpainformers.NewSharedInformerFactory(vpaClient, time.Second*30) - configMapWatcher := configmap.NewDefaultWatcher(kubeClient, system.Namespace) + configMapWatcher := configmap.NewInformedWatcher(kubeClient, system.Namespace) opt := reconciler.Options{ KubeClientSet: kubeClient, diff --git a/cmd/webhook/main.go b/cmd/webhook/main.go index 9738adfb5637..befa816d6a44 100644 --- a/cmd/webhook/main.go +++ b/cmd/webhook/main.go @@ -70,7 +70,7 @@ func main() { } // Watch the logging config map and dynamically update logging levels. - configMapWatcher := configmap.NewDefaultWatcher(kubeClient, system.Namespace) + configMapWatcher := configmap.NewInformedWatcher(kubeClient, system.Namespace) configMapWatcher.Watch(logging.ConfigName, logging.UpdateLevelFromConfigMap(logger, atomicLevel, logLevelKey)) if err = configMapWatcher.Start(stopCh); err != nil { logger.Fatalf("failed to start configuration manager: %v", err) diff --git a/pkg/reconciler/v1alpha1/autoscaling/autoscaling_test.go b/pkg/reconciler/v1alpha1/autoscaling/autoscaling_test.go index 078fd7cbed36..ce99e1937979 100644 --- a/pkg/reconciler/v1alpha1/autoscaling/autoscaling_test.go +++ b/pkg/reconciler/v1alpha1/autoscaling/autoscaling_test.go @@ -47,7 +47,7 @@ var ( ) func newConfigWatcher() configmap.Watcher { - return configmap.NewFixedWatcher( + return configmap.NewStaticWatcher( &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Namespace: system.Namespace, diff --git a/pkg/reconciler/v1alpha1/revision/queueing_test.go b/pkg/reconciler/v1alpha1/revision/queueing_test.go index 37a1d0cd524a..8bdf8ae55a4f 100644 --- a/pkg/reconciler/v1alpha1/revision/queueing_test.go +++ b/pkg/reconciler/v1alpha1/revision/queueing_test.go @@ -148,7 +148,7 @@ func newTestController(t *testing.T, servingObjects ...runtime.Object) ( servingClient = fakeclientset.NewSimpleClientset(servingObjects...) vpaClient = fakevpaclientset.NewSimpleClientset() - configMapWatcher = configmap.NewFixedWatcher(&corev1.ConfigMap{ + configMapWatcher = configmap.NewStaticWatcher(&corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Namespace: system.Namespace, Name: config.NetworkConfigName, diff --git a/pkg/reconciler/v1alpha1/revision/revision_test.go b/pkg/reconciler/v1alpha1/revision/revision_test.go index 6fc934186c1c..1ae0e58a6809 100644 --- a/pkg/reconciler/v1alpha1/revision/revision_test.go +++ b/pkg/reconciler/v1alpha1/revision/revision_test.go @@ -186,7 +186,7 @@ func newTestControllerWithConfig(t *testing.T, controllerConfig *config.Controll cms = append(cms, cm) } - configMapWatcher = configmap.NewFixedWatcher(cms...) + configMapWatcher = configmap.NewStaticWatcher(cms...) // Create informer factories with fake clients. The second parameter sets the // resync period to zero, disabling it. diff --git a/pkg/reconciler/v1alpha1/route/queueing_test.go b/pkg/reconciler/v1alpha1/route/queueing_test.go index b08df0a47bfe..bb5d60602d8f 100644 --- a/pkg/reconciler/v1alpha1/route/queueing_test.go +++ b/pkg/reconciler/v1alpha1/route/queueing_test.go @@ -64,7 +64,7 @@ func TestNewRouteCallsSyncHandler(t *testing.T) { // Create fake clients kubeClient := fakekubeclientset.NewSimpleClientset() - configMapWatcher := configmap.NewFixedWatcher(&corev1.ConfigMap{ + configMapWatcher := configmap.NewStaticWatcher(&corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Name: config.DomainConfigName, Namespace: system.Namespace, diff --git a/pkg/reconciler/v1alpha1/route/route_test.go b/pkg/reconciler/v1alpha1/route/route_test.go index 35f9de4bde2d..d0a96824e619 100644 --- a/pkg/reconciler/v1alpha1/route/route_test.go +++ b/pkg/reconciler/v1alpha1/route/route_test.go @@ -205,7 +205,7 @@ func newTestSetup(t *testing.T, configs ...*corev1.ConfigMap) ( cms = append(cms, cm) } - configMapWatcher = configmap.NewFixedWatcher(cms...) + configMapWatcher = configmap.NewStaticWatcher(cms...) sharedClient = fakesharedclientset.NewSimpleClientset() servingClient = fakeclientset.NewSimpleClientset() diff --git a/vendor/github.com/knative/pkg/configmap/default.go b/vendor/github.com/knative/pkg/configmap/default.go deleted file mode 100644 index 0f30cf6d9288..000000000000 --- a/vendor/github.com/knative/pkg/configmap/default.go +++ /dev/null @@ -1,128 +0,0 @@ -/* -Copyright 2018 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package configmap - -import ( - "errors" - "sync" - - corev1 "k8s.io/api/core/v1" - informers "k8s.io/client-go/informers" - corev1informers "k8s.io/client-go/informers/core/v1" - "k8s.io/client-go/tools/cache" -) - -// defaultImpl provides a default informer-based implementation of Watcher. -type defaultImpl struct { - sif informers.SharedInformerFactory - informer corev1informers.ConfigMapInformer - ns string - - // Guards mutations to defaultImpl fields - m sync.Mutex - - observers map[string][]Observer - started bool -} - -// Asserts that defaultImpl implements Watcher. -var _ Watcher = (*defaultImpl)(nil) - -// Watch implements Watcher -func (di *defaultImpl) Watch(name string, w Observer) { - di.m.Lock() - defer di.m.Unlock() - - if di.observers == nil { - di.observers = make(map[string][]Observer) - } - - wl, _ := di.observers[name] - di.observers[name] = append(wl, w) -} - -// Start implements Watcher -func (di *defaultImpl) Start(stopCh <-chan struct{}) error { - if err := di.registerCallbackAndStartInformer(stopCh); err != nil { - return err - } - - // Wait until it has been synced (WITHOUT holding the mutex, so callbacks happen) - if ok := cache.WaitForCacheSync(stopCh, di.informer.Informer().HasSynced); !ok { - return errors.New("Error waiting for ConfigMap informer to sync.") - } - - return di.checkObservedResourcesExist() -} - -func (di *defaultImpl) registerCallbackAndStartInformer(stopCh <-chan struct{}) error { - di.m.Lock() - defer di.m.Unlock() - if di.started { - return errors.New("Watcher already started!") - } - di.started = true - - di.informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: di.addConfigMapEvent, - UpdateFunc: di.updateConfigMapEvent, - }) - - // Start the shared informer factory (non-blocking) - di.sif.Start(stopCh) - return nil -} - -func (di *defaultImpl) checkObservedResourcesExist() error { - di.m.Lock() - defer di.m.Unlock() - // Check that all objects with Observers exist in our informers. - for k := range di.observers { - _, err := di.informer.Lister().ConfigMaps(di.ns).Get(k) - if err != nil { - return err - } - } - return nil -} - -func (di *defaultImpl) addConfigMapEvent(obj interface{}) { - // If the ConfigMap update is outside of our namespace, then quickly disregard it. - configMap := obj.(*corev1.ConfigMap) - if configMap.Namespace != di.ns { - // Outside of our namespace. - // This shouldn't happen due to our filtered informer. - return - } - - // Within our namespace, take the lock and see if there are any registered observers. - di.m.Lock() - defer di.m.Unlock() - wl, ok := di.observers[configMap.Name] - if !ok { - return // No observers. - } - - // Iterate over the observers and invoke their callbacks. - for _, w := range wl { - w(configMap) - } -} - -func (di *defaultImpl) updateConfigMapEvent(old, new interface{}) { - di.addConfigMapEvent(new) -} diff --git a/vendor/github.com/knative/pkg/configmap/informed_watcher.go b/vendor/github.com/knative/pkg/configmap/informed_watcher.go new file mode 100644 index 000000000000..9da18f7778e7 --- /dev/null +++ b/vendor/github.com/knative/pkg/configmap/informed_watcher.go @@ -0,0 +1,122 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +istributed under the License is istributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package configmap + +import ( + "errors" + "time" + + corev1 "k8s.io/api/core/v1" + informers "k8s.io/client-go/informers" + corev1informers "k8s.io/client-go/informers/core/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" +) + +// NewDefaultWatcher creates a new default configmap.Watcher instance. +// +// Deprecated: Use NewInformedWatcher +func NewDefaultWatcher(kc kubernetes.Interface, namespace string) *InformedWatcher { + return NewInformedWatcher(kc, namespace) +} + +// NewInformedWatcher watchers a Kubernetes namespace for configmap changs +func NewInformedWatcher(kc kubernetes.Interface, namespace string) *InformedWatcher { + sif := informers.NewSharedInformerFactoryWithOptions( + kc, + 5*time.Minute, + informers.WithNamespace(namespace), + ) + + return &InformedWatcher{ + sif: sif, + informer: sif.Core().V1().ConfigMaps(), + ManualWatcher: ManualWatcher{ + Namespace: namespace, + }, + } +} + +// InformedWatcher provides an informer-based implementation of Watcher. +type InformedWatcher struct { + sif informers.SharedInformerFactory + informer corev1informers.ConfigMapInformer + started bool + + // Embedding this struct allows us to reuse the logic + // of registering and notifying observers. This simplifies the + // InformedWatcher to just setting up the Kubernetes informer + ManualWatcher +} + +// Asserts that InformedWatcher implements Watcher. +var _ Watcher = (*InformedWatcher)(nil) + +// Start implements Watcher +func (i *InformedWatcher) Start(stopCh <-chan struct{}) error { + if err := i.registerCallbackAndStartInformer(stopCh); err != nil { + return err + } + + // Wait until it has been synced (WITHOUT holing the mutex, so callbacks happen) + if ok := cache.WaitForCacheSync(stopCh, i.informer.Informer().HasSynced); !ok { + return errors.New("Error waiting for ConfigMap informer to sync.") + } + + return i.checkObservedResourcesExist() +} + +func (i *InformedWatcher) registerCallbackAndStartInformer(stopCh <-chan struct{}) error { + i.m.Lock() + defer i.m.Unlock() + if i.started { + return errors.New("Watcher already started!") + } + i.started = true + + i.informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: i.addConfigMapEvent, + UpdateFunc: i.updateConfigMapEvent, + }) + + // Start the shared informer factory (non-blocking) + i.sif.Start(stopCh) + return nil +} + +func (i *InformedWatcher) checkObservedResourcesExist() error { + i.m.Lock() + defer i.m.Unlock() + // Check that all objects with Observers exist in our informers. + for k := range i.observers { + _, err := i.informer.Lister().ConfigMaps(i.Namespace).Get(k) + if err != nil { + return err + } + } + return nil +} + +func (i *InformedWatcher) addConfigMapEvent(obj interface{}) { + configMap := obj.(*corev1.ConfigMap) + i.OnChange(configMap) +} + +func (i *InformedWatcher) updateConfigMapEvent(old, new interface{}) { + configMap := new.(*corev1.ConfigMap) + i.OnChange(configMap) +} diff --git a/vendor/github.com/knative/pkg/configmap/manual_watcher.go b/vendor/github.com/knative/pkg/configmap/manual_watcher.go new file mode 100644 index 000000000000..b14c5ac7b004 --- /dev/null +++ b/vendor/github.com/knative/pkg/configmap/manual_watcher.go @@ -0,0 +1,71 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package configmap + +import ( + "sync" + + corev1 "k8s.io/api/core/v1" +) + +// ManualWatcher will notify Observers when a ConfigMap is manually reported as changed +type ManualWatcher struct { + Namespace string + + // Guards mutations to defaultImpl fields + m sync.Mutex + + started bool + observers map[string][]Observer +} + +var _ Watcher = (*ManualWatcher)(nil) + +// Watch implements Watcher +func (w *ManualWatcher) Watch(name string, o Observer) { + w.m.Lock() + defer w.m.Unlock() + + if w.observers == nil { + w.observers = make(map[string][]Observer) + } + + wl, _ := w.observers[name] + w.observers[name] = append(wl, o) +} + +func (w *ManualWatcher) Start(<-chan struct{}) error { + return nil +} + +func (w *ManualWatcher) OnChange(configMap *corev1.ConfigMap) { + if configMap.Namespace != w.Namespace { + return + } + // Within our namespace, take the lock and see if there are any registered observers. + w.m.Lock() + defer w.m.Unlock() + observers, ok := w.observers[configMap.Name] + if !ok { + return // No observers. + } + + // Iterate over the observers and invoke their callbacks. + for _, o := range observers { + o(configMap) + } +} diff --git a/vendor/github.com/knative/pkg/configmap/fixed.go b/vendor/github.com/knative/pkg/configmap/static_watcher.go similarity index 53% rename from vendor/github.com/knative/pkg/configmap/fixed.go rename to vendor/github.com/knative/pkg/configmap/static_watcher.go index d915e7ae0841..96a01140dbde 100644 --- a/vendor/github.com/knative/pkg/configmap/fixed.go +++ b/vendor/github.com/knative/pkg/configmap/static_watcher.go @@ -17,39 +17,47 @@ limitations under the License. package configmap import ( - "log" + "fmt" corev1 "k8s.io/api/core/v1" ) -// fixedImpl provides a fixed informer-based implementation of Watcher. -type fixedImpl struct { +// NewFixedWatcher returns a StaticWatcher that exposes a collection of ConfigMaps. +// +// Deprecated: Use NewStaticWatcher +func NewFixedWatcher(cms ...*corev1.ConfigMap) *StaticWatcher { + return NewStaticWatcher(cms...) +} + +// NewStaticWatcher returns an StaticWatcher that exposes a collection of ConfigMaps. +func NewStaticWatcher(cms ...*corev1.ConfigMap) *StaticWatcher { + cmm := make(map[string]*corev1.ConfigMap) + for _, cm := range cms { + cmm[cm.Name] = cm + } + return &StaticWatcher{cfgs: cmm} +} + +// StaticWatcher is a Watcher with static ConfigMaps. Callbacks will +// occur when Watch is invoked for a specific Observer +type StaticWatcher struct { cfgs map[string]*corev1.ConfigMap } // Asserts that fixedImpl implements Watcher. -var _ Watcher = (*fixedImpl)(nil) +var _ Watcher = (*StaticWatcher)(nil) // Watch implements Watcher -func (di *fixedImpl) Watch(name string, w Observer) { +func (di *StaticWatcher) Watch(name string, o Observer) { cm, ok := di.cfgs[name] if ok { - w(cm) + o(cm) } else { - log.Printf("Name %q is not found.", name) + panic(fmt.Sprintf("Tried to watch unknown config with name %q", name)) } } // Start implements Watcher -func (di *fixedImpl) Start(stopCh <-chan struct{}) error { +func (di *StaticWatcher) Start(<-chan struct{}) error { return nil } - -// NewFixedWatcher returns an Watcher that exposes the fixed collection of ConfigMaps. -func NewFixedWatcher(cms ...*corev1.ConfigMap) Watcher { - cmm := make(map[string]*corev1.ConfigMap) - for _, cm := range cms { - cmm[cm.Name] = cm - } - return &fixedImpl{cfgs: cmm} -} diff --git a/vendor/github.com/knative/pkg/configmap/watcher.go b/vendor/github.com/knative/pkg/configmap/watcher.go index ccb75f36cebd..d248bbd73a52 100644 --- a/vendor/github.com/knative/pkg/configmap/watcher.go +++ b/vendor/github.com/knative/pkg/configmap/watcher.go @@ -17,11 +17,7 @@ limitations under the License. package configmap import ( - "time" - corev1 "k8s.io/api/core/v1" - kubeinformers "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes" ) // Observer is the signature of the callbacks that notify an observer of the latest @@ -40,15 +36,3 @@ type Watcher interface { // initial state of the ConfigMaps they are watching. Start(<-chan struct{}) error } - -// NewDefaultWatcher creates a new default configmap.Watcher instance. -func NewDefaultWatcher(kc kubernetes.Interface, ns string) Watcher { - sif := kubeinformers.NewFilteredSharedInformerFactory( - kc, 5*time.Minute, ns, nil) - - return &defaultImpl{ - sif: sif, - informer: sif.Core().V1().ConfigMaps(), - ns: ns, - } -}