Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/buses/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func NewBusProvisioner(ref BusReference, handlerFuncs EventHandlerFuncs, opts *B
}
if opts.Reconciler == nil {
opts.Reconciler = NewReconciler(
Provisioner, opts.MasterURL, opts.KubeConfig, opts.Cache,
ref, Provisioner, opts.MasterURL, opts.KubeConfig, opts.Cache,
handlerFuncs, opts.Logger.Named(reconcilerLoggingComponent),
)
}
Expand Down Expand Up @@ -132,7 +132,7 @@ func NewBusDispatcher(ref BusReference, handlerFuncs EventHandlerFuncs, opts *Bu
}
if opts.Reconciler == nil {
opts.Reconciler = NewReconciler(
Dispatcher, opts.MasterURL, opts.KubeConfig, opts.Cache,
ref, Dispatcher, opts.MasterURL, opts.KubeConfig, opts.Cache,
handlerFuncs, opts.Logger.Named(reconcilerLoggingComponent),
)
}
Expand Down Expand Up @@ -162,7 +162,7 @@ func NewBusDispatcher(ref BusReference, handlerFuncs EventHandlerFuncs, opts *Bu

// Run starts the bus's processing.
func (b bus) Run(threadiness int, stopCh <-chan struct{}) {
go b.reconciler.Run(b.ref, threadiness, stopCh)
go b.reconciler.Run(threadiness, stopCh)
b.reconciler.WaitForCacheSync(stopCh)
if b.receiver != nil {
go b.receiver.Run(stopCh)
Expand Down
164 changes: 78 additions & 86 deletions pkg/buses/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ const (
// Subscribe/Unsubscribe happen.
type Reconciler struct {
bus channelsv1alpha1.GenericBus
ref BusReference
Comment thread
grantr marked this conversation as resolved.
handler EventHandlerFuncs
cache *Cache

Expand Down Expand Up @@ -97,6 +98,7 @@ type Reconciler struct {
// kubeconfig: the path of a kubeconfig file to create a client connection to the masterURL with
// handler: a EventHandlerFuncs with handler functions for the reconciler to call
func NewReconciler(
ref BusReference,
component, masterURL, kubeconfig string,
cache *Cache, handler EventHandlerFuncs,
logger *zap.SugaredLogger,
Expand Down Expand Up @@ -133,6 +135,7 @@ func NewReconciler(

reconciler := &Reconciler{
bus: nil,
ref: ref,
handler: handler,
cache: cache,

Expand All @@ -154,44 +157,47 @@ func NewReconciler(
}

logger.Info("Setting up event handlers")
// Set up an event handler for when Bus resources change
busInformer.Informer().AddEventHandler(informercache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
bus := obj.(*channelsv1alpha1.Bus)
reconciler.workqueue.AddRateLimited(makeWorkqueueKeyForBus(bus))
},
UpdateFunc: func(old, new interface{}) {
oldBus := old.(*channelsv1alpha1.Bus)
newBus := new.(*channelsv1alpha1.Bus)

if oldBus.ResourceVersion == newBus.ResourceVersion {
// Periodic resync will send update events for all known Buses.
// Two different versions of the same Bus will always have different RVs.
return
}

reconciler.workqueue.AddRateLimited(makeWorkqueueKeyForBus(newBus))
},
})
// Set up an event handler for when ClusterBus resources change
clusterBusInformer.Informer().AddEventHandler(informercache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
clusterBus := obj.(*channelsv1alpha1.ClusterBus)
reconciler.workqueue.AddRateLimited(makeWorkqueueKeyForClusterBus(clusterBus))
},
UpdateFunc: func(old, new interface{}) {
oldClusterBus := old.(*channelsv1alpha1.ClusterBus)
newClusterBus := new.(*channelsv1alpha1.ClusterBus)

if oldClusterBus.ResourceVersion == newClusterBus.ResourceVersion {
// Periodic resync will send update events for all known ClusterBuses.
// Two different versions of the same ClusterBus will always have different RVs.
return
}

reconciler.workqueue.AddRateLimited(makeWorkqueueKeyForClusterBus(newClusterBus))
},
})
if reconciler.ref.IsNamespaced() {
// Set up an event handler for when Bus resources change
busInformer.Informer().AddEventHandler(informercache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
bus := obj.(*channelsv1alpha1.Bus)
reconciler.workqueue.AddRateLimited(makeWorkqueueKeyForBus(bus))
},
UpdateFunc: func(old, new interface{}) {
oldBus := old.(*channelsv1alpha1.Bus)
newBus := new.(*channelsv1alpha1.Bus)

if oldBus.ResourceVersion == newBus.ResourceVersion {
// Periodic resync will send update events for all known Buses.
// Two different versions of the same Bus will always have different RVs.
return
}

reconciler.workqueue.AddRateLimited(makeWorkqueueKeyForBus(newBus))
},
})
} else {
// Set up an event handler for when ClusterBus resources change
clusterBusInformer.Informer().AddEventHandler(informercache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
clusterBus := obj.(*channelsv1alpha1.ClusterBus)
reconciler.workqueue.AddRateLimited(makeWorkqueueKeyForClusterBus(clusterBus))
},
UpdateFunc: func(old, new interface{}) {
oldClusterBus := old.(*channelsv1alpha1.ClusterBus)
newClusterBus := new.(*channelsv1alpha1.ClusterBus)

if oldClusterBus.ResourceVersion == newClusterBus.ResourceVersion {
// Periodic resync will send update events for all known ClusterBuses.
// Two different versions of the same ClusterBus will always have different RVs.
return
}

reconciler.workqueue.AddRateLimited(makeWorkqueueKeyForClusterBus(newClusterBus))
},
})
}
// Set up an event handler for when Channel resources change
channelInformer.Informer().AddEventHandler(informercache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
Expand Down Expand Up @@ -284,7 +290,7 @@ func (r *Reconciler) RecordSubscriptionEventf(ref SubscriptionReference, eventty
// as syncing informer caches and starting workers. It will block until stopCh
// is closed, at which point it will shutdown the workqueue and wait for
// workers to finish processing their current work items.
func (r *Reconciler) Run(ref BusReference, threadiness int, stopCh <-chan struct{}) error {
func (r *Reconciler) Run(threadiness int, stopCh <-chan struct{}) error {
defer runtime.HandleCrash()
defer r.workqueue.ShutDown()

Expand All @@ -298,20 +304,20 @@ func (r *Reconciler) Run(ref BusReference, threadiness int, stopCh <-chan struct
return err
}

if len(ref.Namespace) == 0 {
// reconciler is for a ClusterBus
clusterBus, err := r.clusterBusesLister.Get(ref.Name)
if r.ref.IsNamespaced() {
// reconciler is for a namespaced Bus
bus, err := r.busesLister.Buses(r.ref.Namespace).Get(r.ref.Name)
if err != nil {
r.logger.Fatalf("Unknown clusterbus %q: %v", ref.Name, err)
r.logger.Fatalf("Unknown bus %q: %v", r.ref.String(), err)
}
r.bus = clusterBus.DeepCopy()
r.bus = bus.DeepCopy()
} else {
// reconciler is for a namespaced Bus
bus, err := r.busesLister.Buses(ref.Namespace).Get(ref.Name)
// reconciler is for a ClusterBus
clusterBus, err := r.clusterBusesLister.Get(r.ref.Name)
if err != nil {
r.logger.Fatalf("Unknown bus %q: %v", ref, err)
r.logger.Fatalf("Unknown clusterbus %q: %v", r.ref.String(), err)
}
r.bus = bus.DeepCopy()
r.bus = clusterBus.DeepCopy()
}

r.logger.Info("Starting workers")
Expand All @@ -330,7 +336,14 @@ func (r *Reconciler) Run(ref BusReference, threadiness int, stopCh <-chan struct
// WaitForCacheSync blocks returning until the reconciler's informers have
// synchronized. It returns an error if the caches cannot sync.
func (r *Reconciler) WaitForCacheSync(stopCh <-chan struct{}) error {
if ok := informercache.WaitForCacheSync(stopCh, r.busesSynced, r.clusterBusesSynced, r.channelsSynced, r.subscriptionsSynced); !ok {
var busesSynced informercache.InformerSynced
// get correct synced reference for bus type
if r.ref.IsNamespaced() {
busesSynced = r.busesSynced
} else {
busesSynced = r.clusterBusesSynced
}
if ok := informercache.WaitForCacheSync(stopCh, busesSynced, r.channelsSynced, r.subscriptionsSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}
return nil
Expand Down Expand Up @@ -365,7 +378,7 @@ func (r *Reconciler) processNextWorkItem() bool {
var key string
var ok bool
// We expect strings to come off the workqueue. These are of the
// form namespace/name. We do this as the delayed nature of the
// form kind/namespace/name. We do this as the delayed nature of the
// workqueue means the items in the informer cache may actually be
// more up to date that when the item was initially put onto the
// workqueue.
Expand All @@ -377,7 +390,8 @@ func (r *Reconciler) processNextWorkItem() bool {
runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return nil
}
// Run the syncHandler, passing it the name string of the resource to be synced.
// Run the syncHandler, passing it the kind/namespace/name key of the
// resource to be synced.
if err := r.syncHandler(key); err != nil {
r.workqueue.AddRateLimited(obj)
return fmt.Errorf("error syncing reconciler '%s': %v", key, err)
Expand All @@ -401,18 +415,13 @@ func (r *Reconciler) processNextWorkItem() bool {
// converge the two. It then updates the Status block of the resource with the
// current status.
func (r *Reconciler) syncHandler(key string) error {
// Convert the namespace/name string into a distinct namespace and name
// Convert the kind/namespace/name string into a distinct kind, namespace and name
kind, namespace, name, err := splitWorkqueueKey(key)
if err != nil {
runtime.HandleError(fmt.Errorf("invalid resource key: %s", key))
return nil
}

if r.bus == nil && !(kind == busKind || kind == clusterBusKind) {
// don't attempt to sync until we have seen the bus for this reconciler
return fmt.Errorf("Unknown bus for reconciler")
}

switch kind {
case busKind:
err = r.syncBus(namespace, name)
Expand Down Expand Up @@ -448,7 +457,7 @@ func (r *Reconciler) syncBus(namespace string, name string) error {
}

// Sync the Bus
err = r.createOrUpdateBus(bus)
err = r.createOrUpdateBus(bus.DeepCopy())
if err != nil {
return err
}
Expand All @@ -470,7 +479,7 @@ func (r *Reconciler) syncClusterBus(name string) error {
}

// Sync the ClusterBus
err = r.createOrUpdateClusterBus(clusterBus)
err = r.createOrUpdateBus(clusterBus.DeepCopy())
if err != nil {
return err
}
Expand Down Expand Up @@ -530,36 +539,19 @@ func (r *Reconciler) syncSubscription(namespace string, name string) error {
return nil
}

func (r *Reconciler) createOrUpdateBus(bus *channelsv1alpha1.Bus) error {
if r.bus.GetObjectKind().GroupVersionKind().Kind != bus.Kind ||
bus.Namespace != r.bus.GetObjectMeta().GetNamespace() ||
bus.Name != r.bus.GetObjectMeta().GetName() {
// this is not our bus
return nil
}

previousBus := r.bus
r.bus = bus.DeepCopy()
if !equality.Semantic.DeepEqual(r.bus.GetSpec(), previousBus.GetSpec()) {
err := r.handler.onBus(r.bus, r)
if err != nil {
return err
}
}

return nil
}

func (r *Reconciler) createOrUpdateClusterBus(clusterBus *channelsv1alpha1.ClusterBus) error {
if r.bus.GetObjectKind().GroupVersionKind().Kind != clusterBus.Kind ||
clusterBus.Name != r.bus.GetObjectMeta().GetName() {
// this is not our clusterbus
func (r *Reconciler) createOrUpdateBus(bus channelsv1alpha1.GenericBus) error {
if r.ref != NewBusReference(bus) {
// not the bus for this reconciler
return nil
}

previousBus := r.bus
r.bus = clusterBus.DeepCopy()
if !equality.Semantic.DeepEqual(r.bus.GetSpec(), previousBus.GetSpec()) {
// stash the new bus on the reconciler while retaining the old bus. This
// operation is threadsafe because there is only a single Bus/ClusterBus
// that is valid for the Reconciler and the workqueue guarantees that it
// will not emit the same key concurrently. Any bus received is an updated
// revision of the current bus.
bus, r.bus = r.bus, bus
if !equality.Semantic.DeepEqual(r.bus.GetSpec(), bus.GetSpec()) {
err := r.handler.onBus(r.bus, r)
if err != nil {
return err
Expand Down
7 changes: 6 additions & 1 deletion pkg/buses/references.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,13 @@ func NewBusReferenceFromNames(name, namespace string) BusReference {
}
}

// IsNamespaced returns true is the reference is for a Bus and not a ClusterBus
func (r *BusReference) IsNamespaced() bool {
return r.Namespace != ""
}

func (r *BusReference) String() string {
if r.Namespace != "" {
if r.IsNamespaced() {
return fmt.Sprintf("%s/%s", r.Namespace, r.Name)
}
return r.Name
Expand Down
23 changes: 23 additions & 0 deletions pkg/buses/references_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,29 @@ func TestNewBusReferenceFromNames_ClusterBus(t *testing.T) {
}
}

func TestBusReference_IsNamespaced(t *testing.T) {
busRef := buses.BusReference{
Name: referencesTestBusName,
Namespace: referencesTestNamespace,
}
expected := true
actual := busRef.IsNamespaced()
if expected != actual {
t.Errorf("%s expected: %+v got: %+v", "IsNamespaced", expected, actual)
}
}

func TestBusReference_IsNamespaced_ClusterBus(t *testing.T) {
busRef := buses.BusReference{
Name: referencesTestClusterBusName,
}
expected := false
actual := busRef.IsNamespaced()
if expected != actual {
t.Errorf("%s expected: %+v got: %+v", "IsNamespaced", expected, actual)
}
}

func TestBusReference_String(t *testing.T) {
ref := buses.BusReference{
Name: referencesTestBusName,
Expand Down