From a86786b621476852542621983e195ecbd649d5c7 Mon Sep 17 00:00:00 2001 From: Scott Andrews Date: Fri, 7 Sep 2018 15:30:20 -0400 Subject: [PATCH 1/5] Cleanup Bus/ClusterBus reconciler - only start the needed informer for either Bus or ClusterBus, we don't need both - consolidate the createOrUpdateBus and createOrUpdateClusterBus methods - use BusReference to check if the updated bus is the reconciled bus Refs #418 --- pkg/buses/bus.go | 4 +- pkg/buses/reconciler.go | 131 ++++++++++++++++------------------- pkg/buses/references.go | 7 +- pkg/buses/references_test.go | 23 ++++++ 4 files changed, 90 insertions(+), 75 deletions(-) diff --git a/pkg/buses/bus.go b/pkg/buses/bus.go index 63614d5bd5e..8056ad29641 100644 --- a/pkg/buses/bus.go +++ b/pkg/buses/bus.go @@ -89,7 +89,7 @@ func NewBusProvisioner(busRef BusReference, handlerFuncs EventHandlerFuncs, opts } if opts.Reconciler == nil { opts.Reconciler = NewReconciler( - Provisioner, opts.MasterURL, opts.KubeConfig, opts.Cache, + busRef, Provisioner, opts.MasterURL, opts.KubeConfig, opts.Cache, handlerFuncs, opts.Logger.Named(reconcilerLoggingComponent), ) } @@ -132,7 +132,7 @@ func NewBusDispatcher(busRef BusReference, handlerFuncs EventHandlerFuncs, opts } if opts.Reconciler == nil { opts.Reconciler = NewReconciler( - Dispatcher, opts.MasterURL, opts.KubeConfig, opts.Cache, + busRef, Dispatcher, opts.MasterURL, opts.KubeConfig, opts.Cache, handlerFuncs, opts.Logger.Named(reconcilerLoggingComponent), ) } diff --git a/pkg/buses/reconciler.go b/pkg/buses/reconciler.go index 31cef13b30e..1fa8cc8eda1 100644 --- a/pkg/buses/reconciler.go +++ b/pkg/buses/reconciler.go @@ -63,6 +63,7 @@ const ( // Subscribe/Unsubscribe happen. type Reconciler struct { bus channelsv1alpha1.GenericBus + ref BusReference handler EventHandlerFuncs cache *Cache @@ -97,6 +98,7 @@ type Reconciler struct { // kubeconfig: the path of a kubeconfig file to create a client connection to the masterURL with // handler: a EventHandlerFuncs with handler functions for the reconciler to call func NewReconciler( + ref BusReference, component, masterURL, kubeconfig string, cache *Cache, handler EventHandlerFuncs, logger *zap.SugaredLogger, @@ -133,6 +135,7 @@ func NewReconciler( reconciler := &Reconciler{ bus: nil, + ref: ref, handler: handler, cache: cache, @@ -154,44 +157,47 @@ func NewReconciler( } logger.Info("Setting up event handlers") - // Set up an event handler for when Bus resources change - busInformer.Informer().AddEventHandler(informercache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - bus := obj.(*channelsv1alpha1.Bus) - reconciler.workqueue.AddRateLimited(makeWorkqueueKeyForBus(bus)) - }, - UpdateFunc: func(old, new interface{}) { - oldBus := old.(*channelsv1alpha1.Bus) - newBus := new.(*channelsv1alpha1.Bus) - - if oldBus.ResourceVersion == newBus.ResourceVersion { - // Periodic resync will send update events for all known Buses. - // Two different versions of the same Bus will always have different RVs. - return - } - - reconciler.workqueue.AddRateLimited(makeWorkqueueKeyForBus(newBus)) - }, - }) - // Set up an event handler for when ClusterBus resources change - clusterBusInformer.Informer().AddEventHandler(informercache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - clusterBus := obj.(*channelsv1alpha1.ClusterBus) - reconciler.workqueue.AddRateLimited(makeWorkqueueKeyForClusterBus(clusterBus)) - }, - UpdateFunc: func(old, new interface{}) { - oldClusterBus := old.(*channelsv1alpha1.ClusterBus) - newClusterBus := new.(*channelsv1alpha1.ClusterBus) - - if oldClusterBus.ResourceVersion == newClusterBus.ResourceVersion { - // Periodic resync will send update events for all known ClusterBuses. - // Two different versions of the same ClusterBus will always have different RVs. - return - } - - reconciler.workqueue.AddRateLimited(makeWorkqueueKeyForClusterBus(newClusterBus)) - }, - }) + if reconciler.ref.IsNamespaced() { + // Set up an event handler for when Bus resources change + busInformer.Informer().AddEventHandler(informercache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + bus := obj.(*channelsv1alpha1.Bus) + reconciler.workqueue.AddRateLimited(makeWorkqueueKeyForBus(bus)) + }, + UpdateFunc: func(old, new interface{}) { + oldBus := old.(*channelsv1alpha1.Bus) + newBus := new.(*channelsv1alpha1.Bus) + + if oldBus.ResourceVersion == newBus.ResourceVersion { + // Periodic resync will send update events for all known Buses. + // Two different versions of the same Bus will always have different RVs. + return + } + + reconciler.workqueue.AddRateLimited(makeWorkqueueKeyForBus(newBus)) + }, + }) + } else { + // Set up an event handler for when ClusterBus resources change + clusterBusInformer.Informer().AddEventHandler(informercache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + clusterBus := obj.(*channelsv1alpha1.ClusterBus) + reconciler.workqueue.AddRateLimited(makeWorkqueueKeyForClusterBus(clusterBus)) + }, + UpdateFunc: func(old, new interface{}) { + oldClusterBus := old.(*channelsv1alpha1.ClusterBus) + newClusterBus := new.(*channelsv1alpha1.ClusterBus) + + if oldClusterBus.ResourceVersion == newClusterBus.ResourceVersion { + // Periodic resync will send update events for all known ClusterBuses. + // Two different versions of the same ClusterBus will always have different RVs. + return + } + + reconciler.workqueue.AddRateLimited(makeWorkqueueKeyForClusterBus(newClusterBus)) + }, + }) + } // Set up an event handler for when Channel resources change channelInformer.Informer().AddEventHandler(informercache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { @@ -330,7 +336,14 @@ func (r *Reconciler) Run(busRef BusReference, threadiness int, stopCh <-chan str // WaitForCacheSync blocks returning until the reconciler's informers have // synchronized. It returns an error if the caches cannot sync. func (r *Reconciler) WaitForCacheSync(stopCh <-chan struct{}) error { - if ok := informercache.WaitForCacheSync(stopCh, r.busesSynced, r.clusterBusesSynced, r.channelsSynced, r.subscriptionsSynced); !ok { + var busesSynced informercache.InformerSynced + // get correct synced reference for bus type + if r.ref.IsNamespaced() { + busesSynced = r.busesSynced + } else { + busesSynced = r.clusterBusesSynced + } + if ok := informercache.WaitForCacheSync(stopCh, busesSynced, r.channelsSynced, r.subscriptionsSynced); !ok { return fmt.Errorf("failed to wait for caches to sync") } return nil @@ -408,11 +421,6 @@ func (r *Reconciler) syncHandler(key string) error { return nil } - if r.bus == nil && !(kind == busKind || kind == clusterBusKind) { - // don't attempt to sync until we have seen the bus for this reconciler - return fmt.Errorf("Unknown bus for reconciler") - } - switch kind { case busKind: err = r.syncBus(namespace, name) @@ -470,7 +478,7 @@ func (r *Reconciler) syncClusterBus(name string) error { } // Sync the ClusterBus - err = r.createOrUpdateClusterBus(clusterBus) + err = r.createOrUpdateBus(clusterBus) if err != nil { return err } @@ -530,36 +538,15 @@ func (r *Reconciler) syncSubscription(namespace string, name string) error { return nil } -func (r *Reconciler) createOrUpdateBus(bus *channelsv1alpha1.Bus) error { - if r.bus.GetObjectKind().GroupVersionKind().Kind != bus.Kind || - bus.Namespace != r.bus.GetObjectMeta().GetNamespace() || - bus.Name != r.bus.GetObjectMeta().GetName() { - // this is not our bus - return nil - } - - previousBus := r.bus - r.bus = bus.DeepCopy() - if !equality.Semantic.DeepEqual(r.bus.GetSpec(), previousBus.GetSpec()) { - err := r.handler.onBus(r.bus, r) - if err != nil { - return err - } - } - - return nil -} - -func (r *Reconciler) createOrUpdateClusterBus(clusterBus *channelsv1alpha1.ClusterBus) error { - if r.bus.GetObjectKind().GroupVersionKind().Kind != clusterBus.Kind || - clusterBus.Name != r.bus.GetObjectMeta().GetName() { - // this is not our clusterbus +func (r *Reconciler) createOrUpdateBus(bus channelsv1alpha1.GenericBus) error { + if r.ref != NewBusReference(bus) { + // not the bus for this reconciler return nil } - previousBus := r.bus - r.bus = clusterBus.DeepCopy() - if !equality.Semantic.DeepEqual(r.bus.GetSpec(), previousBus.GetSpec()) { + // stash the new bus on the reconciler while retaining the old bus + bus, r.bus = r.bus, bus + if !equality.Semantic.DeepEqual(r.bus.GetSpec(), bus.GetSpec()) { err := r.handler.onBus(r.bus, r) if err != nil { return err diff --git a/pkg/buses/references.go b/pkg/buses/references.go index 0f4029ed8a3..1a0b58608ca 100644 --- a/pkg/buses/references.go +++ b/pkg/buses/references.go @@ -46,8 +46,13 @@ func NewBusReferenceFromNames(name, namespace string) BusReference { } } +// IsNamespaced returns true is the reference is for a Bus and not a ClusterBus +func (r *BusReference) IsNamespaced() bool { + return r.Namespace != "" +} + func (r *BusReference) String() string { - if r.Namespace != "" { + if r.IsNamespaced() { return fmt.Sprintf("%s/%s", r.Namespace, r.Name) } return r.Name diff --git a/pkg/buses/references_test.go b/pkg/buses/references_test.go index 91d3f147fa0..ba39e029037 100644 --- a/pkg/buses/references_test.go +++ b/pkg/buses/references_test.go @@ -86,6 +86,29 @@ func TestNewBusReferenceFromNames_ClusterBus(t *testing.T) { } } +func TestBusReference_IsNamespaced(t *testing.T) { + busRef := buses.BusReference{ + Name: referencesTestBusName, + Namespace: referencesTestNamespace, + } + expected := true + actual := busRef.IsNamespaced() + if expected != actual { + t.Errorf("%s expected: %+v got: %+v", "IsNamespaced", expected, actual) + } +} + +func TestBusReference_IsNamespaced_ClusterBus(t *testing.T) { + busRef := buses.BusReference{ + Name: referencesTestClusterBusName, + } + expected := false + actual := busRef.IsNamespaced() + if expected != actual { + t.Errorf("%s expected: %+v got: %+v", "IsNamespaced", expected, actual) + } +} + func TestBusReference_String(t *testing.T) { busRef := buses.BusReference{ Name: referencesTestBusName, From df39751912f8685b32df131eabe90a1ddf41276b Mon Sep 17 00:00:00 2001 From: Scott Andrews Date: Fri, 7 Sep 2018 18:17:52 -0400 Subject: [PATCH 2/5] Remove redudant BusReference passed to Reconciler#Run() --- pkg/buses/bus.go | 2 +- pkg/buses/reconciler.go | 20 ++++++++++---------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/pkg/buses/bus.go b/pkg/buses/bus.go index 8056ad29641..cb88ffca435 100644 --- a/pkg/buses/bus.go +++ b/pkg/buses/bus.go @@ -162,7 +162,7 @@ func NewBusDispatcher(busRef BusReference, handlerFuncs EventHandlerFuncs, opts // Run starts the bus's processing. func (b bus) Run(threadiness int, stopCh <-chan struct{}) { - go b.reconciler.Run(b.busRef, threadiness, stopCh) + go b.reconciler.Run(threadiness, stopCh) b.reconciler.WaitForCacheSync(stopCh) if b.receiver != nil { go b.receiver.Run(stopCh) diff --git a/pkg/buses/reconciler.go b/pkg/buses/reconciler.go index 1fa8cc8eda1..485053fdadf 100644 --- a/pkg/buses/reconciler.go +++ b/pkg/buses/reconciler.go @@ -290,7 +290,7 @@ func (r *Reconciler) RecordSubscriptionEventf(subscriptionRef SubscriptionRefere // as syncing informer caches and starting workers. It will block until stopCh // is closed, at which point it will shutdown the workqueue and wait for // workers to finish processing their current work items. -func (r *Reconciler) Run(busRef BusReference, threadiness int, stopCh <-chan struct{}) error { +func (r *Reconciler) Run(threadiness int, stopCh <-chan struct{}) error { defer runtime.HandleCrash() defer r.workqueue.ShutDown() @@ -304,20 +304,20 @@ func (r *Reconciler) Run(busRef BusReference, threadiness int, stopCh <-chan str return err } - if len(busRef.Namespace) == 0 { - // reconciler is for a ClusterBus - clusterBus, err := r.clusterBusesLister.Get(busRef.Name) + if r.ref.IsNamespaced() { + // reconciler is for a namespaced Bus + bus, err := r.busesLister.Buses(r.ref.Namespace).Get(r.ref.Name) if err != nil { - r.logger.Fatalf("Unknown clusterbus %q: %v", busRef.Name, err) + r.logger.Fatalf("Unknown bus %q: %v", r.ref.String(), err) } - r.bus = clusterBus.DeepCopy() + r.bus = bus.DeepCopy() } else { - // reconciler is for a namespaced Bus - bus, err := r.busesLister.Buses(busRef.Namespace).Get(busRef.Name) + // reconciler is for a ClusterBus + clusterBus, err := r.clusterBusesLister.Get(r.ref.Name) if err != nil { - r.logger.Fatalf("Unknown bus %q: %v", busRef, err) + r.logger.Fatalf("Unknown clusterbus %q: %v", r.ref.String(), err) } - r.bus = bus.DeepCopy() + r.bus = clusterBus.DeepCopy() } r.logger.Info("Starting workers") From 58e83045402b0ffaf6f7d5cc5de2b32a1e960737 Mon Sep 17 00:00:00 2001 From: Scott Andrews Date: Mon, 10 Sep 2018 16:33:17 -0400 Subject: [PATCH 3/5] Create copy of Bus before calling createOrUpdateBus --- pkg/buses/reconciler.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/buses/reconciler.go b/pkg/buses/reconciler.go index 485053fdadf..7083d00c44b 100644 --- a/pkg/buses/reconciler.go +++ b/pkg/buses/reconciler.go @@ -456,7 +456,7 @@ func (r *Reconciler) syncBus(namespace string, name string) error { } // Sync the Bus - err = r.createOrUpdateBus(bus) + err = r.createOrUpdateBus(bus.DeepCopy()) if err != nil { return err } @@ -478,7 +478,7 @@ func (r *Reconciler) syncClusterBus(name string) error { } // Sync the ClusterBus - err = r.createOrUpdateBus(clusterBus) + err = r.createOrUpdateBus(clusterBus.DeepCopy()) if err != nil { return err } From a31e6aa68c2198d7b290b2bc5cb20f09a2358de8 Mon Sep 17 00:00:00 2001 From: Scott Andrews Date: Tue, 11 Sep 2018 18:58:47 -0400 Subject: [PATCH 4/5] Review feedback --- pkg/buses/reconciler.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/buses/reconciler.go b/pkg/buses/reconciler.go index 251d1e05949..d873ec7a901 100644 --- a/pkg/buses/reconciler.go +++ b/pkg/buses/reconciler.go @@ -544,7 +544,11 @@ func (r *Reconciler) createOrUpdateBus(bus channelsv1alpha1.GenericBus) error { return nil } - // stash the new bus on the reconciler while retaining the old bus + // stash the new bus on the reconciler while retaining the old bus. This + // operation is threadsafe because there is only a single Bus/ClusterBus + // that is valid for the Reconciler and the workqueue guarantees that it + // will not emit the same key concurrently. Any bus received is an updated + // revision of the current bus. bus, r.bus = r.bus, bus if !equality.Semantic.DeepEqual(r.bus.GetSpec(), bus.GetSpec()) { err := r.handler.onBus(r.bus, r) From 5f4ffc061ac6314dd01b67dee6d254830f36d388 Mon Sep 17 00:00:00 2001 From: Scott Andrews Date: Tue, 11 Sep 2018 19:19:46 -0400 Subject: [PATCH 5/5] Clarify that the workqueue key is --- pkg/buses/reconciler.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/buses/reconciler.go b/pkg/buses/reconciler.go index d873ec7a901..b5d3892a9de 100644 --- a/pkg/buses/reconciler.go +++ b/pkg/buses/reconciler.go @@ -378,7 +378,7 @@ func (r *Reconciler) processNextWorkItem() bool { var key string var ok bool // We expect strings to come off the workqueue. These are of the - // form namespace/name. We do this as the delayed nature of the + // form kind/namespace/name. We do this as the delayed nature of the // workqueue means the items in the informer cache may actually be // more up to date that when the item was initially put onto the // workqueue. @@ -390,7 +390,8 @@ func (r *Reconciler) processNextWorkItem() bool { runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) return nil } - // Run the syncHandler, passing it the name string of the resource to be synced. + // Run the syncHandler, passing it the kind/namespace/name key of the + // resource to be synced. if err := r.syncHandler(key); err != nil { r.workqueue.AddRateLimited(obj) return fmt.Errorf("error syncing reconciler '%s': %v", key, err) @@ -414,7 +415,7 @@ func (r *Reconciler) processNextWorkItem() bool { // converge the two. It then updates the Status block of the resource with the // current status. func (r *Reconciler) syncHandler(key string) error { - // Convert the namespace/name string into a distinct namespace and name + // Convert the kind/namespace/name string into a distinct kind, namespace and name kind, namespace, name, err := splitWorkqueueKey(key) if err != nil { runtime.HandleError(fmt.Errorf("invalid resource key: %s", key))