Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions pkg/buses/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,14 +543,14 @@ func (m *Monitor) Run(busNamespace, busName string, threadiness int, stopCh <-ch
if err != nil {
glog.Fatalf("Unknown clusterbus %q", busName)
}
m.bus = clusterBus
m.bus = clusterBus.DeepCopy()
} else {
// monitor is for a namespaced Bus
bus, err := m.busesLister.Buses(busNamespace).Get(busName)
if err != nil {
glog.Fatalf("Unknown bus '%s/%s'", busNamespace, busName)
}
m.bus = bus
m.bus = bus.DeepCopy()
}

glog.Info("Starting workers")
Expand Down Expand Up @@ -787,14 +787,15 @@ func (m *Monitor) getOrCreateChannelSummary(key channelKey) *channelSummary {
}

func (m *Monitor) createOrUpdateBus(bus *channelsv1alpha1.Bus) error {
if bus.Namespace != m.bus.GetObjectMeta().GetNamespace() ||
if m.bus.GetObjectKind().GroupVersionKind().Kind != bus.Kind ||
bus.Namespace != m.bus.GetObjectMeta().GetNamespace() ||
bus.Name != m.bus.GetObjectMeta().GetName() {
// this is not our bus
return nil
}

if !reflect.DeepEqual(m.bus.GetSpec(), bus.Spec) {
m.bus = bus
m.bus = bus.DeepCopy()
err := m.handler.onBus(bus, m)
if err != nil {
return err
Expand All @@ -805,13 +806,14 @@ func (m *Monitor) createOrUpdateBus(bus *channelsv1alpha1.Bus) error {
}

func (m *Monitor) createOrUpdateClusterBus(clusterBus *channelsv1alpha1.ClusterBus) error {
if clusterBus.Name != m.bus.GetObjectMeta().GetName() {
if m.bus.GetObjectKind().GroupVersionKind().Kind != clusterBus.Kind ||
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this mean we were sometimes passing a ClusterBus to createOrUpdateBus and vice-versa?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

createOrUpdateClusterBus will only receive a ClusterBus and createOrUpdateBus will only receive a Bus. The issue is that they both assign to m.bus which is a GenericBus interface that can be either.

The monitor is listening for Bus/ClusterBus so that it can receive updated parameters for channels and subscriptions. Realistically, it should only ever need to listen for either Buses or ClusterBuses, never both.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, this is a method on the monitor; I keep forgetting that these controllers aren't stateless (and each Monitor will see all the Bus & ClusterBus updates and choose which ones apply to its stored bus).

/lgtm
/approve

clusterBus.Name != m.bus.GetObjectMeta().GetName() {
// this is not our clusterbus
return nil
}

if !reflect.DeepEqual(m.bus.GetSpec(), clusterBus.Spec) {
m.bus = clusterBus
m.bus = clusterBus.DeepCopy()
err := m.handler.onBus(clusterBus, m)
if err != nil {
return err
Expand Down