Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions pkg/buses/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
)

type bus struct {
busRef BusReference
ref BusReference
handlerFuncs EventHandlerFuncs

reconciler *Reconciler
Expand Down Expand Up @@ -74,7 +74,7 @@ type BusProvisioner interface {
// NewBusProvisioner creates a new provisioner for a specific bus.
// EventHandlerFuncs are used to be notified when a channel or subscription is
// created, updated or removed.
func NewBusProvisioner(busRef BusReference, handlerFuncs EventHandlerFuncs, opts *BusOpts) BusProvisioner {
func NewBusProvisioner(ref BusReference, handlerFuncs EventHandlerFuncs, opts *BusOpts) BusProvisioner {
if opts == nil {
opts = &BusOpts{}
}
Expand All @@ -95,7 +95,7 @@ func NewBusProvisioner(busRef BusReference, handlerFuncs EventHandlerFuncs, opts
}

return &bus{
busRef: busRef,
ref: ref,
handlerFuncs: handlerFuncs,

cache: opts.Cache,
Expand All @@ -109,13 +109,13 @@ func NewBusProvisioner(busRef BusReference, handlerFuncs EventHandlerFuncs, opts
// infrastructure.
type BusDispatcher interface {
Run(threadiness int, stopCh <-chan struct{})
DispatchMessage(subscriptionRef SubscriptionReference, message *Message) error
DispatchMessage(subscription SubscriptionReference, message *Message) error
}

// NewBusDispatcher creates a new dispatcher for a specific bus.
// EventHandlerFuncs are used to be notified when a subscription is created,
// updated or removed, or a message is received.
func NewBusDispatcher(busRef BusReference, handlerFuncs EventHandlerFuncs, opts *BusOpts) BusDispatcher {
func NewBusDispatcher(ref BusReference, handlerFuncs EventHandlerFuncs, opts *BusOpts) BusDispatcher {
var b *bus

if opts == nil {
Expand All @@ -140,13 +140,13 @@ func NewBusDispatcher(busRef BusReference, handlerFuncs EventHandlerFuncs, opts
opts.MessageDispatcher = NewMessageDispatcher(opts.Logger.Named(dispatcherLoggingComponent))
}
if opts.MessageReceiver == nil {
opts.MessageReceiver = NewMessageReceiver(func(channelRef ChannelReference, message *Message) error {
return b.receiveMessage(channelRef, message)
opts.MessageReceiver = NewMessageReceiver(func(channel ChannelReference, message *Message) error {
return b.receiveMessage(channel, message)
}, opts.Logger.Named(receiverLoggingComponent))
}

b = &bus{
busRef: busRef,
ref: ref,
handlerFuncs: handlerFuncs,

cache: opts.Cache,
Expand All @@ -162,7 +162,7 @@ func NewBusDispatcher(busRef BusReference, handlerFuncs EventHandlerFuncs, opts

// Run starts the bus's processing.
func (b bus) Run(threadiness int, stopCh <-chan struct{}) {
go b.reconciler.Run(b.busRef, threadiness, stopCh)
go b.reconciler.Run(b.ref, threadiness, stopCh)
b.reconciler.WaitForCacheSync(stopCh)
if b.receiver != nil {
go b.receiver.Run(stopCh)
Expand All @@ -171,20 +171,20 @@ func (b bus) Run(threadiness int, stopCh <-chan struct{}) {
<-stopCh
}

func (b *bus) receiveMessage(channelRef ChannelReference, message *Message) error {
_, err := b.cache.Channel(channelRef)
func (b *bus) receiveMessage(channel ChannelReference, message *Message) error {
_, err := b.cache.Channel(channel)
if err != nil {
return ErrUnknownChannel
}
return b.handlerFuncs.onReceiveMessage(channelRef, message)
return b.handlerFuncs.onReceiveMessage(channel, message)
}

// DispatchMessage sends a message to a subscriber. This function is only
// avilable for bus dispatchers.
func (b *bus) DispatchMessage(subscriptionRef SubscriptionReference, message *Message) error {
subscription, err := b.cache.Subscription(subscriptionRef)
func (b *bus) DispatchMessage(ref SubscriptionReference, message *Message) error {
subscription, err := b.cache.Subscription(ref)
if err != nil {
return fmt.Errorf("unable to dispatch to unknown subscription %q", subscriptionRef.String())
return fmt.Errorf("unable to dispatch to unknown subscription %q", ref.String())
}
return b.dispatchMessage(subscription, message)
}
Expand Down
28 changes: 14 additions & 14 deletions pkg/buses/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,20 @@ type Cache struct {

// Channel returns a cached channel for provided reference or an error if the
// channel is not in the cache.
func (c *Cache) Channel(channelRef ChannelReference) (*channelsv1alpha1.Channel, error) {
channel, ok := c.channels[channelRef]
func (c *Cache) Channel(ref ChannelReference) (*channelsv1alpha1.Channel, error) {
channel, ok := c.channels[ref]
if !ok {
return nil, fmt.Errorf("unknown channel %q", channelRef.String())
return nil, fmt.Errorf("unknown channel %q", ref.String())
}
return channel, nil
}

// Subscription returns a cached subscription for provided reference or an
// error if the subscription is not in the cache.
func (c *Cache) Subscription(subscriptionRef SubscriptionReference) (*channelsv1alpha1.Subscription, error) {
subscription, ok := c.subscriptions[subscriptionRef]
func (c *Cache) Subscription(ref SubscriptionReference) (*channelsv1alpha1.Subscription, error) {
subscription, ok := c.subscriptions[ref]
if !ok {
return nil, fmt.Errorf("unknown subscription %q", subscriptionRef.String())
return nil, fmt.Errorf("unknown subscription %q", ref.String())
}
return subscription, nil
}
Expand All @@ -65,17 +65,17 @@ func (c *Cache) AddChannel(channel *channelsv1alpha1.Channel) {
if channel == nil {
return
}
channelRef := NewChannelReference(channel)
c.channels[channelRef] = channel
ref := NewChannelReference(channel)
c.channels[ref] = channel
}

// RemoveChannel removes the provided channel from the cache.
func (c *Cache) RemoveChannel(channel *channelsv1alpha1.Channel) {
if channel == nil {
return
}
channelRef := NewChannelReference(channel)
delete(c.channels, channelRef)
ref := NewChannelReference(channel)
delete(c.channels, ref)
}

// AddSubscription adds, or updates, the provided subscription to the cache for
Expand All @@ -84,15 +84,15 @@ func (c *Cache) AddSubscription(subscription *channelsv1alpha1.Subscription) {
if subscription == nil {
return
}
subscriptionRef := NewSubscriptionReference(subscription)
c.subscriptions[subscriptionRef] = subscription
ref := NewSubscriptionReference(subscription)
c.subscriptions[ref] = subscription
}

// RemoveSubscription removes the provided subscription from the cache.
func (c *Cache) RemoveSubscription(subscription *channelsv1alpha1.Subscription) {
if subscription == nil {
return
}
subscriptionRef := NewSubscriptionReference(subscription)
delete(c.subscriptions, subscriptionRef)
ref := NewSubscriptionReference(subscription)
delete(c.subscriptions, ref)
}
44 changes: 22 additions & 22 deletions pkg/buses/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ const (

func TestCacheErrsForUnknownChannel(t *testing.T) {
cache := buses.NewCache()
channelRef := buses.NewChannelReferenceFromNames(cacheTestChannel, cacheDefaultNamespace)
ref := buses.NewChannelReferenceFromNames(cacheTestChannel, cacheDefaultNamespace)
var expected *channelsv1alpha1.Channel
actual, err := cache.Channel(channelRef)
actual, err := cache.Channel(ref)
if err == nil {
t.Errorf("%s expected: %+v got: %+v", "Error", "<error>", err)
}
Expand All @@ -45,10 +45,10 @@ func TestCacheErrsForUnknownChannel(t *testing.T) {

func TestCacheRetrievesKnownChannel(t *testing.T) {
cache := buses.NewCache()
channelRef := buses.NewChannelReferenceFromNames(cacheTestChannel, cacheDefaultNamespace)
expected := makeChannel(channelRef)
ref := buses.NewChannelReferenceFromNames(cacheTestChannel, cacheDefaultNamespace)
expected := makeChannel(ref)
cache.AddChannel(expected)
actual, err := cache.Channel(channelRef)
actual, err := cache.Channel(ref)
if err != nil {
t.Errorf("%s expected: %+v got: %+v", "Unexpected error", nil, err)
}
Expand All @@ -59,12 +59,12 @@ func TestCacheRetrievesKnownChannel(t *testing.T) {

func TestCacheRemovesKnownChannel(t *testing.T) {
cache := buses.NewCache()
channelRef := buses.NewChannelReferenceFromNames(cacheTestChannel, cacheDefaultNamespace)
channel := makeChannel(channelRef)
ref := buses.NewChannelReferenceFromNames(cacheTestChannel, cacheDefaultNamespace)
channel := makeChannel(ref)
cache.AddChannel(channel)
cache.RemoveChannel(channel)
var expected *channelsv1alpha1.Channel
actual, err := cache.Channel(channelRef)
actual, err := cache.Channel(ref)
if err == nil {
t.Errorf("%s expected: %+v got: %+v", "Unexpected error", nil, err)
}
Expand All @@ -82,9 +82,9 @@ func TestCacheNilChannel(t *testing.T) {

func TestCacheErrsForUnknownSubscription(t *testing.T) {
cache := buses.NewCache()
subscriptionRef := buses.NewSubscriptionReferenceFromNames(cacheTestSubscription, cacheDefaultNamespace)
ref := buses.NewSubscriptionReferenceFromNames(cacheTestSubscription, cacheDefaultNamespace)
var expected *channelsv1alpha1.Subscription
actual, err := cache.Subscription(subscriptionRef)
actual, err := cache.Subscription(ref)
if err == nil {
t.Errorf("%s expected: %+v got: %+v", "Error", "<error>", err)
}
Expand All @@ -95,10 +95,10 @@ func TestCacheErrsForUnknownSubscription(t *testing.T) {

func TestCacheRetrievesKnownSubscription(t *testing.T) {
cache := buses.NewCache()
subscriptionRef := buses.NewSubscriptionReferenceFromNames(cacheTestSubscription, cacheDefaultNamespace)
expected := makeSubscription(subscriptionRef)
ref := buses.NewSubscriptionReferenceFromNames(cacheTestSubscription, cacheDefaultNamespace)
expected := makeSubscription(ref)
cache.AddSubscription(expected)
actual, err := cache.Subscription(subscriptionRef)
actual, err := cache.Subscription(ref)
if err != nil {
t.Errorf("%s expected: %+v got: %+v", "Unexpected error", nil, err)
}
Expand All @@ -109,12 +109,12 @@ func TestCacheRetrievesKnownSubscription(t *testing.T) {

func TestCacheRemovesKnownSubscription(t *testing.T) {
cache := buses.NewCache()
subscriptionRef := buses.NewSubscriptionReferenceFromNames(cacheTestSubscription, cacheDefaultNamespace)
subscription := makeSubscription(subscriptionRef)
ref := buses.NewSubscriptionReferenceFromNames(cacheTestSubscription, cacheDefaultNamespace)
subscription := makeSubscription(ref)
cache.AddSubscription(subscription)
cache.RemoveSubscription(subscription)
var expected *channelsv1alpha1.Subscription
actual, err := cache.Subscription(subscriptionRef)
actual, err := cache.Subscription(ref)
if err == nil {
t.Errorf("%s expected: %+v got: %+v", "Unexpected error", nil, err)
}
Expand All @@ -130,20 +130,20 @@ func TestCacheNilSubscription(t *testing.T) {
cache.RemoveSubscription(subscription)
}

func makeChannel(channelRef buses.ChannelReference) *channelsv1alpha1.Channel {
func makeChannel(ref buses.ChannelReference) *channelsv1alpha1.Channel {
return &channelsv1alpha1.Channel{
ObjectMeta: metav1.ObjectMeta{
Name: channelRef.Name,
Namespace: channelRef.Namespace,
Name: ref.Name,
Namespace: ref.Namespace,
},
}
}

func makeSubscription(subscriptionRef buses.SubscriptionReference) *channelsv1alpha1.Subscription {
func makeSubscription(ref buses.SubscriptionReference) *channelsv1alpha1.Subscription {
return &channelsv1alpha1.Subscription{
ObjectMeta: metav1.ObjectMeta{
Name: subscriptionRef.Name,
Namespace: subscriptionRef.Namespace,
Name: ref.Name,
Namespace: ref.Namespace,
},
}
}
Loading