diff --git a/pkg/buses/bus.go b/pkg/buses/bus.go index cda926757ec..ccc2596eec8 100644 --- a/pkg/buses/bus.go +++ b/pkg/buses/bus.go @@ -89,7 +89,7 @@ func NewBusProvisioner(ref BusReference, handlerFuncs EventHandlerFuncs, opts *B } if opts.Reconciler == nil { opts.Reconciler = NewReconciler( - Provisioner, opts.MasterURL, opts.KubeConfig, opts.Cache, + ref, Provisioner, opts.MasterURL, opts.KubeConfig, opts.Cache, handlerFuncs, opts.Logger.Named(reconcilerLoggingComponent), ) } @@ -132,7 +132,7 @@ func NewBusDispatcher(ref BusReference, handlerFuncs EventHandlerFuncs, opts *Bu } if opts.Reconciler == nil { opts.Reconciler = NewReconciler( - Dispatcher, opts.MasterURL, opts.KubeConfig, opts.Cache, + ref, Dispatcher, opts.MasterURL, opts.KubeConfig, opts.Cache, handlerFuncs, opts.Logger.Named(reconcilerLoggingComponent), ) } @@ -162,7 +162,7 @@ func NewBusDispatcher(ref BusReference, handlerFuncs EventHandlerFuncs, opts *Bu // Run starts the bus's processing. func (b bus) Run(threadiness int, stopCh <-chan struct{}) { - go b.reconciler.Run(b.ref, threadiness, stopCh) + go b.reconciler.Run(threadiness, stopCh) b.reconciler.WaitForCacheSync(stopCh) if b.receiver != nil { go b.receiver.Run(stopCh) diff --git a/pkg/buses/reconciler.go b/pkg/buses/reconciler.go index db9e1f4c00a..b5d3892a9de 100644 --- a/pkg/buses/reconciler.go +++ b/pkg/buses/reconciler.go @@ -63,6 +63,7 @@ const ( // Subscribe/Unsubscribe happen. type Reconciler struct { bus channelsv1alpha1.GenericBus + ref BusReference handler EventHandlerFuncs cache *Cache @@ -97,6 +98,7 @@ type Reconciler struct { // kubeconfig: the path of a kubeconfig file to create a client connection to the masterURL with // handler: a EventHandlerFuncs with handler functions for the reconciler to call func NewReconciler( + ref BusReference, component, masterURL, kubeconfig string, cache *Cache, handler EventHandlerFuncs, logger *zap.SugaredLogger, @@ -133,6 +135,7 @@ func NewReconciler( reconciler := &Reconciler{ bus: nil, + ref: ref, handler: handler, cache: cache, @@ -154,44 +157,47 @@ func NewReconciler( } logger.Info("Setting up event handlers") - // Set up an event handler for when Bus resources change - busInformer.Informer().AddEventHandler(informercache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - bus := obj.(*channelsv1alpha1.Bus) - reconciler.workqueue.AddRateLimited(makeWorkqueueKeyForBus(bus)) - }, - UpdateFunc: func(old, new interface{}) { - oldBus := old.(*channelsv1alpha1.Bus) - newBus := new.(*channelsv1alpha1.Bus) - - if oldBus.ResourceVersion == newBus.ResourceVersion { - // Periodic resync will send update events for all known Buses. - // Two different versions of the same Bus will always have different RVs. - return - } - - reconciler.workqueue.AddRateLimited(makeWorkqueueKeyForBus(newBus)) - }, - }) - // Set up an event handler for when ClusterBus resources change - clusterBusInformer.Informer().AddEventHandler(informercache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - clusterBus := obj.(*channelsv1alpha1.ClusterBus) - reconciler.workqueue.AddRateLimited(makeWorkqueueKeyForClusterBus(clusterBus)) - }, - UpdateFunc: func(old, new interface{}) { - oldClusterBus := old.(*channelsv1alpha1.ClusterBus) - newClusterBus := new.(*channelsv1alpha1.ClusterBus) - - if oldClusterBus.ResourceVersion == newClusterBus.ResourceVersion { - // Periodic resync will send update events for all known ClusterBuses. - // Two different versions of the same ClusterBus will always have different RVs. - return - } - - reconciler.workqueue.AddRateLimited(makeWorkqueueKeyForClusterBus(newClusterBus)) - }, - }) + if reconciler.ref.IsNamespaced() { + // Set up an event handler for when Bus resources change + busInformer.Informer().AddEventHandler(informercache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + bus := obj.(*channelsv1alpha1.Bus) + reconciler.workqueue.AddRateLimited(makeWorkqueueKeyForBus(bus)) + }, + UpdateFunc: func(old, new interface{}) { + oldBus := old.(*channelsv1alpha1.Bus) + newBus := new.(*channelsv1alpha1.Bus) + + if oldBus.ResourceVersion == newBus.ResourceVersion { + // Periodic resync will send update events for all known Buses. + // Two different versions of the same Bus will always have different RVs. + return + } + + reconciler.workqueue.AddRateLimited(makeWorkqueueKeyForBus(newBus)) + }, + }) + } else { + // Set up an event handler for when ClusterBus resources change + clusterBusInformer.Informer().AddEventHandler(informercache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + clusterBus := obj.(*channelsv1alpha1.ClusterBus) + reconciler.workqueue.AddRateLimited(makeWorkqueueKeyForClusterBus(clusterBus)) + }, + UpdateFunc: func(old, new interface{}) { + oldClusterBus := old.(*channelsv1alpha1.ClusterBus) + newClusterBus := new.(*channelsv1alpha1.ClusterBus) + + if oldClusterBus.ResourceVersion == newClusterBus.ResourceVersion { + // Periodic resync will send update events for all known ClusterBuses. + // Two different versions of the same ClusterBus will always have different RVs. + return + } + + reconciler.workqueue.AddRateLimited(makeWorkqueueKeyForClusterBus(newClusterBus)) + }, + }) + } // Set up an event handler for when Channel resources change channelInformer.Informer().AddEventHandler(informercache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { @@ -284,7 +290,7 @@ func (r *Reconciler) RecordSubscriptionEventf(ref SubscriptionReference, eventty // as syncing informer caches and starting workers. It will block until stopCh // is closed, at which point it will shutdown the workqueue and wait for // workers to finish processing their current work items. -func (r *Reconciler) Run(ref BusReference, threadiness int, stopCh <-chan struct{}) error { +func (r *Reconciler) Run(threadiness int, stopCh <-chan struct{}) error { defer runtime.HandleCrash() defer r.workqueue.ShutDown() @@ -298,20 +304,20 @@ func (r *Reconciler) Run(ref BusReference, threadiness int, stopCh <-chan struct return err } - if len(ref.Namespace) == 0 { - // reconciler is for a ClusterBus - clusterBus, err := r.clusterBusesLister.Get(ref.Name) + if r.ref.IsNamespaced() { + // reconciler is for a namespaced Bus + bus, err := r.busesLister.Buses(r.ref.Namespace).Get(r.ref.Name) if err != nil { - r.logger.Fatalf("Unknown clusterbus %q: %v", ref.Name, err) + r.logger.Fatalf("Unknown bus %q: %v", r.ref.String(), err) } - r.bus = clusterBus.DeepCopy() + r.bus = bus.DeepCopy() } else { - // reconciler is for a namespaced Bus - bus, err := r.busesLister.Buses(ref.Namespace).Get(ref.Name) + // reconciler is for a ClusterBus + clusterBus, err := r.clusterBusesLister.Get(r.ref.Name) if err != nil { - r.logger.Fatalf("Unknown bus %q: %v", ref, err) + r.logger.Fatalf("Unknown clusterbus %q: %v", r.ref.String(), err) } - r.bus = bus.DeepCopy() + r.bus = clusterBus.DeepCopy() } r.logger.Info("Starting workers") @@ -330,7 +336,14 @@ func (r *Reconciler) Run(ref BusReference, threadiness int, stopCh <-chan struct // WaitForCacheSync blocks returning until the reconciler's informers have // synchronized. It returns an error if the caches cannot sync. func (r *Reconciler) WaitForCacheSync(stopCh <-chan struct{}) error { - if ok := informercache.WaitForCacheSync(stopCh, r.busesSynced, r.clusterBusesSynced, r.channelsSynced, r.subscriptionsSynced); !ok { + var busesSynced informercache.InformerSynced + // get correct synced reference for bus type + if r.ref.IsNamespaced() { + busesSynced = r.busesSynced + } else { + busesSynced = r.clusterBusesSynced + } + if ok := informercache.WaitForCacheSync(stopCh, busesSynced, r.channelsSynced, r.subscriptionsSynced); !ok { return fmt.Errorf("failed to wait for caches to sync") } return nil @@ -365,7 +378,7 @@ func (r *Reconciler) processNextWorkItem() bool { var key string var ok bool // We expect strings to come off the workqueue. These are of the - // form namespace/name. We do this as the delayed nature of the + // form kind/namespace/name. We do this as the delayed nature of the // workqueue means the items in the informer cache may actually be // more up to date that when the item was initially put onto the // workqueue. @@ -377,7 +390,8 @@ func (r *Reconciler) processNextWorkItem() bool { runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) return nil } - // Run the syncHandler, passing it the name string of the resource to be synced. + // Run the syncHandler, passing it the kind/namespace/name key of the + // resource to be synced. if err := r.syncHandler(key); err != nil { r.workqueue.AddRateLimited(obj) return fmt.Errorf("error syncing reconciler '%s': %v", key, err) @@ -401,18 +415,13 @@ func (r *Reconciler) processNextWorkItem() bool { // converge the two. It then updates the Status block of the resource with the // current status. func (r *Reconciler) syncHandler(key string) error { - // Convert the namespace/name string into a distinct namespace and name + // Convert the kind/namespace/name string into a distinct kind, namespace and name kind, namespace, name, err := splitWorkqueueKey(key) if err != nil { runtime.HandleError(fmt.Errorf("invalid resource key: %s", key)) return nil } - if r.bus == nil && !(kind == busKind || kind == clusterBusKind) { - // don't attempt to sync until we have seen the bus for this reconciler - return fmt.Errorf("Unknown bus for reconciler") - } - switch kind { case busKind: err = r.syncBus(namespace, name) @@ -448,7 +457,7 @@ func (r *Reconciler) syncBus(namespace string, name string) error { } // Sync the Bus - err = r.createOrUpdateBus(bus) + err = r.createOrUpdateBus(bus.DeepCopy()) if err != nil { return err } @@ -470,7 +479,7 @@ func (r *Reconciler) syncClusterBus(name string) error { } // Sync the ClusterBus - err = r.createOrUpdateClusterBus(clusterBus) + err = r.createOrUpdateBus(clusterBus.DeepCopy()) if err != nil { return err } @@ -530,36 +539,19 @@ func (r *Reconciler) syncSubscription(namespace string, name string) error { return nil } -func (r *Reconciler) createOrUpdateBus(bus *channelsv1alpha1.Bus) error { - if r.bus.GetObjectKind().GroupVersionKind().Kind != bus.Kind || - bus.Namespace != r.bus.GetObjectMeta().GetNamespace() || - bus.Name != r.bus.GetObjectMeta().GetName() { - // this is not our bus - return nil - } - - previousBus := r.bus - r.bus = bus.DeepCopy() - if !equality.Semantic.DeepEqual(r.bus.GetSpec(), previousBus.GetSpec()) { - err := r.handler.onBus(r.bus, r) - if err != nil { - return err - } - } - - return nil -} - -func (r *Reconciler) createOrUpdateClusterBus(clusterBus *channelsv1alpha1.ClusterBus) error { - if r.bus.GetObjectKind().GroupVersionKind().Kind != clusterBus.Kind || - clusterBus.Name != r.bus.GetObjectMeta().GetName() { - // this is not our clusterbus +func (r *Reconciler) createOrUpdateBus(bus channelsv1alpha1.GenericBus) error { + if r.ref != NewBusReference(bus) { + // not the bus for this reconciler return nil } - previousBus := r.bus - r.bus = clusterBus.DeepCopy() - if !equality.Semantic.DeepEqual(r.bus.GetSpec(), previousBus.GetSpec()) { + // stash the new bus on the reconciler while retaining the old bus. This + // operation is threadsafe because there is only a single Bus/ClusterBus + // that is valid for the Reconciler and the workqueue guarantees that it + // will not emit the same key concurrently. Any bus received is an updated + // revision of the current bus. + bus, r.bus = r.bus, bus + if !equality.Semantic.DeepEqual(r.bus.GetSpec(), bus.GetSpec()) { err := r.handler.onBus(r.bus, r) if err != nil { return err diff --git a/pkg/buses/references.go b/pkg/buses/references.go index 0f4029ed8a3..1a0b58608ca 100644 --- a/pkg/buses/references.go +++ b/pkg/buses/references.go @@ -46,8 +46,13 @@ func NewBusReferenceFromNames(name, namespace string) BusReference { } } +// IsNamespaced returns true is the reference is for a Bus and not a ClusterBus +func (r *BusReference) IsNamespaced() bool { + return r.Namespace != "" +} + func (r *BusReference) String() string { - if r.Namespace != "" { + if r.IsNamespaced() { return fmt.Sprintf("%s/%s", r.Namespace, r.Name) } return r.Name diff --git a/pkg/buses/references_test.go b/pkg/buses/references_test.go index 84854f2adec..febff414ec1 100644 --- a/pkg/buses/references_test.go +++ b/pkg/buses/references_test.go @@ -86,6 +86,29 @@ func TestNewBusReferenceFromNames_ClusterBus(t *testing.T) { } } +func TestBusReference_IsNamespaced(t *testing.T) { + busRef := buses.BusReference{ + Name: referencesTestBusName, + Namespace: referencesTestNamespace, + } + expected := true + actual := busRef.IsNamespaced() + if expected != actual { + t.Errorf("%s expected: %+v got: %+v", "IsNamespaced", expected, actual) + } +} + +func TestBusReference_IsNamespaced_ClusterBus(t *testing.T) { + busRef := buses.BusReference{ + Name: referencesTestClusterBusName, + } + expected := false + actual := busRef.IsNamespaced() + if expected != actual { + t.Errorf("%s expected: %+v got: %+v", "IsNamespaced", expected, actual) + } +} + func TestBusReference_String(t *testing.T) { ref := buses.BusReference{ Name: referencesTestBusName,