diff --git a/pkg/buses/bus.go b/pkg/buses/bus.go index 63614d5bd5e..cda926757ec 100644 --- a/pkg/buses/bus.go +++ b/pkg/buses/bus.go @@ -25,7 +25,7 @@ import ( ) type bus struct { - busRef BusReference + ref BusReference handlerFuncs EventHandlerFuncs reconciler *Reconciler @@ -74,7 +74,7 @@ type BusProvisioner interface { // NewBusProvisioner creates a new provisioner for a specific bus. // EventHandlerFuncs are used to be notified when a channel or subscription is // created, updated or removed. -func NewBusProvisioner(busRef BusReference, handlerFuncs EventHandlerFuncs, opts *BusOpts) BusProvisioner { +func NewBusProvisioner(ref BusReference, handlerFuncs EventHandlerFuncs, opts *BusOpts) BusProvisioner { if opts == nil { opts = &BusOpts{} } @@ -95,7 +95,7 @@ func NewBusProvisioner(busRef BusReference, handlerFuncs EventHandlerFuncs, opts } return &bus{ - busRef: busRef, + ref: ref, handlerFuncs: handlerFuncs, cache: opts.Cache, @@ -109,13 +109,13 @@ func NewBusProvisioner(busRef BusReference, handlerFuncs EventHandlerFuncs, opts // infrastructure. type BusDispatcher interface { Run(threadiness int, stopCh <-chan struct{}) - DispatchMessage(subscriptionRef SubscriptionReference, message *Message) error + DispatchMessage(subscription SubscriptionReference, message *Message) error } // NewBusDispatcher creates a new dispatcher for a specific bus. // EventHandlerFuncs are used to be notified when a subscription is created, // updated or removed, or a message is received. -func NewBusDispatcher(busRef BusReference, handlerFuncs EventHandlerFuncs, opts *BusOpts) BusDispatcher { +func NewBusDispatcher(ref BusReference, handlerFuncs EventHandlerFuncs, opts *BusOpts) BusDispatcher { var b *bus if opts == nil { @@ -140,13 +140,13 @@ func NewBusDispatcher(busRef BusReference, handlerFuncs EventHandlerFuncs, opts opts.MessageDispatcher = NewMessageDispatcher(opts.Logger.Named(dispatcherLoggingComponent)) } if opts.MessageReceiver == nil { - opts.MessageReceiver = NewMessageReceiver(func(channelRef ChannelReference, message *Message) error { - return b.receiveMessage(channelRef, message) + opts.MessageReceiver = NewMessageReceiver(func(channel ChannelReference, message *Message) error { + return b.receiveMessage(channel, message) }, opts.Logger.Named(receiverLoggingComponent)) } b = &bus{ - busRef: busRef, + ref: ref, handlerFuncs: handlerFuncs, cache: opts.Cache, @@ -162,7 +162,7 @@ func NewBusDispatcher(busRef BusReference, handlerFuncs EventHandlerFuncs, opts // Run starts the bus's processing. func (b bus) Run(threadiness int, stopCh <-chan struct{}) { - go b.reconciler.Run(b.busRef, threadiness, stopCh) + go b.reconciler.Run(b.ref, threadiness, stopCh) b.reconciler.WaitForCacheSync(stopCh) if b.receiver != nil { go b.receiver.Run(stopCh) @@ -171,20 +171,20 @@ func (b bus) Run(threadiness int, stopCh <-chan struct{}) { <-stopCh } -func (b *bus) receiveMessage(channelRef ChannelReference, message *Message) error { - _, err := b.cache.Channel(channelRef) +func (b *bus) receiveMessage(channel ChannelReference, message *Message) error { + _, err := b.cache.Channel(channel) if err != nil { return ErrUnknownChannel } - return b.handlerFuncs.onReceiveMessage(channelRef, message) + return b.handlerFuncs.onReceiveMessage(channel, message) } // DispatchMessage sends a message to a subscriber. This function is only // avilable for bus dispatchers. -func (b *bus) DispatchMessage(subscriptionRef SubscriptionReference, message *Message) error { - subscription, err := b.cache.Subscription(subscriptionRef) +func (b *bus) DispatchMessage(ref SubscriptionReference, message *Message) error { + subscription, err := b.cache.Subscription(ref) if err != nil { - return fmt.Errorf("unable to dispatch to unknown subscription %q", subscriptionRef.String()) + return fmt.Errorf("unable to dispatch to unknown subscription %q", ref.String()) } return b.dispatchMessage(subscription, message) } diff --git a/pkg/buses/cache.go b/pkg/buses/cache.go index b263d140745..430e209e5ad 100644 --- a/pkg/buses/cache.go +++ b/pkg/buses/cache.go @@ -41,20 +41,20 @@ type Cache struct { // Channel returns a cached channel for provided reference or an error if the // channel is not in the cache. -func (c *Cache) Channel(channelRef ChannelReference) (*channelsv1alpha1.Channel, error) { - channel, ok := c.channels[channelRef] +func (c *Cache) Channel(ref ChannelReference) (*channelsv1alpha1.Channel, error) { + channel, ok := c.channels[ref] if !ok { - return nil, fmt.Errorf("unknown channel %q", channelRef.String()) + return nil, fmt.Errorf("unknown channel %q", ref.String()) } return channel, nil } // Subscription returns a cached subscription for provided reference or an // error if the subscription is not in the cache. -func (c *Cache) Subscription(subscriptionRef SubscriptionReference) (*channelsv1alpha1.Subscription, error) { - subscription, ok := c.subscriptions[subscriptionRef] +func (c *Cache) Subscription(ref SubscriptionReference) (*channelsv1alpha1.Subscription, error) { + subscription, ok := c.subscriptions[ref] if !ok { - return nil, fmt.Errorf("unknown subscription %q", subscriptionRef.String()) + return nil, fmt.Errorf("unknown subscription %q", ref.String()) } return subscription, nil } @@ -65,8 +65,8 @@ func (c *Cache) AddChannel(channel *channelsv1alpha1.Channel) { if channel == nil { return } - channelRef := NewChannelReference(channel) - c.channels[channelRef] = channel + ref := NewChannelReference(channel) + c.channels[ref] = channel } // RemoveChannel removes the provided channel from the cache. @@ -74,8 +74,8 @@ func (c *Cache) RemoveChannel(channel *channelsv1alpha1.Channel) { if channel == nil { return } - channelRef := NewChannelReference(channel) - delete(c.channels, channelRef) + ref := NewChannelReference(channel) + delete(c.channels, ref) } // AddSubscription adds, or updates, the provided subscription to the cache for @@ -84,8 +84,8 @@ func (c *Cache) AddSubscription(subscription *channelsv1alpha1.Subscription) { if subscription == nil { return } - subscriptionRef := NewSubscriptionReference(subscription) - c.subscriptions[subscriptionRef] = subscription + ref := NewSubscriptionReference(subscription) + c.subscriptions[ref] = subscription } // RemoveSubscription removes the provided subscription from the cache. @@ -93,6 +93,6 @@ func (c *Cache) RemoveSubscription(subscription *channelsv1alpha1.Subscription) if subscription == nil { return } - subscriptionRef := NewSubscriptionReference(subscription) - delete(c.subscriptions, subscriptionRef) + ref := NewSubscriptionReference(subscription) + delete(c.subscriptions, ref) } diff --git a/pkg/buses/cache_test.go b/pkg/buses/cache_test.go index e6c38c8663e..5b2f8cee9e0 100644 --- a/pkg/buses/cache_test.go +++ b/pkg/buses/cache_test.go @@ -32,9 +32,9 @@ const ( func TestCacheErrsForUnknownChannel(t *testing.T) { cache := buses.NewCache() - channelRef := buses.NewChannelReferenceFromNames(cacheTestChannel, cacheDefaultNamespace) + ref := buses.NewChannelReferenceFromNames(cacheTestChannel, cacheDefaultNamespace) var expected *channelsv1alpha1.Channel - actual, err := cache.Channel(channelRef) + actual, err := cache.Channel(ref) if err == nil { t.Errorf("%s expected: %+v got: %+v", "Error", "", err) } @@ -45,10 +45,10 @@ func TestCacheErrsForUnknownChannel(t *testing.T) { func TestCacheRetrievesKnownChannel(t *testing.T) { cache := buses.NewCache() - channelRef := buses.NewChannelReferenceFromNames(cacheTestChannel, cacheDefaultNamespace) - expected := makeChannel(channelRef) + ref := buses.NewChannelReferenceFromNames(cacheTestChannel, cacheDefaultNamespace) + expected := makeChannel(ref) cache.AddChannel(expected) - actual, err := cache.Channel(channelRef) + actual, err := cache.Channel(ref) if err != nil { t.Errorf("%s expected: %+v got: %+v", "Unexpected error", nil, err) } @@ -59,12 +59,12 @@ func TestCacheRetrievesKnownChannel(t *testing.T) { func TestCacheRemovesKnownChannel(t *testing.T) { cache := buses.NewCache() - channelRef := buses.NewChannelReferenceFromNames(cacheTestChannel, cacheDefaultNamespace) - channel := makeChannel(channelRef) + ref := buses.NewChannelReferenceFromNames(cacheTestChannel, cacheDefaultNamespace) + channel := makeChannel(ref) cache.AddChannel(channel) cache.RemoveChannel(channel) var expected *channelsv1alpha1.Channel - actual, err := cache.Channel(channelRef) + actual, err := cache.Channel(ref) if err == nil { t.Errorf("%s expected: %+v got: %+v", "Unexpected error", nil, err) } @@ -82,9 +82,9 @@ func TestCacheNilChannel(t *testing.T) { func TestCacheErrsForUnknownSubscription(t *testing.T) { cache := buses.NewCache() - subscriptionRef := buses.NewSubscriptionReferenceFromNames(cacheTestSubscription, cacheDefaultNamespace) + ref := buses.NewSubscriptionReferenceFromNames(cacheTestSubscription, cacheDefaultNamespace) var expected *channelsv1alpha1.Subscription - actual, err := cache.Subscription(subscriptionRef) + actual, err := cache.Subscription(ref) if err == nil { t.Errorf("%s expected: %+v got: %+v", "Error", "", err) } @@ -95,10 +95,10 @@ func TestCacheErrsForUnknownSubscription(t *testing.T) { func TestCacheRetrievesKnownSubscription(t *testing.T) { cache := buses.NewCache() - subscriptionRef := buses.NewSubscriptionReferenceFromNames(cacheTestSubscription, cacheDefaultNamespace) - expected := makeSubscription(subscriptionRef) + ref := buses.NewSubscriptionReferenceFromNames(cacheTestSubscription, cacheDefaultNamespace) + expected := makeSubscription(ref) cache.AddSubscription(expected) - actual, err := cache.Subscription(subscriptionRef) + actual, err := cache.Subscription(ref) if err != nil { t.Errorf("%s expected: %+v got: %+v", "Unexpected error", nil, err) } @@ -109,12 +109,12 @@ func TestCacheRetrievesKnownSubscription(t *testing.T) { func TestCacheRemovesKnownSubscription(t *testing.T) { cache := buses.NewCache() - subscriptionRef := buses.NewSubscriptionReferenceFromNames(cacheTestSubscription, cacheDefaultNamespace) - subscription := makeSubscription(subscriptionRef) + ref := buses.NewSubscriptionReferenceFromNames(cacheTestSubscription, cacheDefaultNamespace) + subscription := makeSubscription(ref) cache.AddSubscription(subscription) cache.RemoveSubscription(subscription) var expected *channelsv1alpha1.Subscription - actual, err := cache.Subscription(subscriptionRef) + actual, err := cache.Subscription(ref) if err == nil { t.Errorf("%s expected: %+v got: %+v", "Unexpected error", nil, err) } @@ -130,20 +130,20 @@ func TestCacheNilSubscription(t *testing.T) { cache.RemoveSubscription(subscription) } -func makeChannel(channelRef buses.ChannelReference) *channelsv1alpha1.Channel { +func makeChannel(ref buses.ChannelReference) *channelsv1alpha1.Channel { return &channelsv1alpha1.Channel{ ObjectMeta: metav1.ObjectMeta{ - Name: channelRef.Name, - Namespace: channelRef.Namespace, + Name: ref.Name, + Namespace: ref.Namespace, }, } } -func makeSubscription(subscriptionRef buses.SubscriptionReference) *channelsv1alpha1.Subscription { +func makeSubscription(ref buses.SubscriptionReference) *channelsv1alpha1.Subscription { return &channelsv1alpha1.Subscription{ ObjectMeta: metav1.ObjectMeta{ - Name: subscriptionRef.Name, - Namespace: subscriptionRef.Namespace, + Name: ref.Name, + Namespace: ref.Namespace, }, } } diff --git a/pkg/buses/gcppubsub/bus.go b/pkg/buses/gcppubsub/bus.go index 79b6ae8fbc4..d8385b0e19d 100644 --- a/pkg/buses/gcppubsub/bus.go +++ b/pkg/buses/gcppubsub/bus.go @@ -29,7 +29,7 @@ import ( const BusType = "gcppubsub" type CloudPubSubBus struct { - busRef buses.BusReference + ref buses.BusReference reconciler *buses.Reconciler dispatcher buses.BusDispatcher provisioner buses.BusProvisioner @@ -40,7 +40,7 @@ type CloudPubSubBus struct { logger *zap.SugaredLogger } -func NewCloudPubSubBusDispatcher(busRef buses.BusReference, projectID string, opts *buses.BusOpts) (*CloudPubSubBus, error) { +func NewCloudPubSubBusDispatcher(ref buses.BusReference, projectID string, opts *buses.BusOpts) (*CloudPubSubBus, error) { ctx := context.Background() pubsubClient, err := pubsub.NewClient(ctx, projectID) if err != nil { @@ -48,29 +48,29 @@ func NewCloudPubSubBusDispatcher(busRef buses.BusReference, projectID string, op } bus := &CloudPubSubBus{ - busRef: busRef, + ref: ref, pubsubClient: pubsubClient, } eventHandlers := buses.EventHandlerFuncs{ - SubscribeFunc: func(channelRef buses.ChannelReference, subscriptionRef buses.SubscriptionReference, parameters buses.ResolvedParameters) error { - return bus.startReceivingEvents(subscriptionRef, parameters) + SubscribeFunc: func(channel buses.ChannelReference, subscription buses.SubscriptionReference, parameters buses.ResolvedParameters) error { + return bus.startReceivingEvents(subscription, parameters) }, - UnsubscribeFunc: func(channelRef buses.ChannelReference, subscriptionRef buses.SubscriptionReference) error { - bus.stopReceivingEvents(subscriptionRef) + UnsubscribeFunc: func(channel buses.ChannelReference, subscription buses.SubscriptionReference) error { + bus.stopReceivingEvents(subscription) return nil }, - ReceiveMessageFunc: func(channelRef buses.ChannelReference, messgae *buses.Message) error { - return bus.sendEventToTopic(channelRef, messgae) + ReceiveMessageFunc: func(channel buses.ChannelReference, messgae *buses.Message) error { + return bus.sendEventToTopic(channel, messgae) }, } - bus.dispatcher = buses.NewBusDispatcher(busRef, eventHandlers, opts) + bus.dispatcher = buses.NewBusDispatcher(ref, eventHandlers, opts) bus.logger = opts.Logger bus.reconciler = opts.Reconciler bus.receivers = make(map[string]context.CancelFunc) return bus, nil } -func NewCloudPubSubBusProvisioner(busRef buses.BusReference, projectID string, opts *buses.BusOpts) (*CloudPubSubBus, error) { +func NewCloudPubSubBusProvisioner(ref buses.BusReference, projectID string, opts *buses.BusOpts) (*CloudPubSubBus, error) { ctx := context.Background() pubsubClient, err := pubsub.NewClient(ctx, projectID) if err != nil { @@ -78,24 +78,24 @@ func NewCloudPubSubBusProvisioner(busRef buses.BusReference, projectID string, o } bus := &CloudPubSubBus{ - busRef: busRef, + ref: ref, pubsubClient: pubsubClient, } eventHandlers := buses.EventHandlerFuncs{ - ProvisionFunc: func(channelRef buses.ChannelReference, parameters buses.ResolvedParameters) error { - return bus.createTopic(channelRef, parameters) + ProvisionFunc: func(channel buses.ChannelReference, parameters buses.ResolvedParameters) error { + return bus.createTopic(channel, parameters) }, - UnprovisionFunc: func(channelRef buses.ChannelReference) error { - return bus.deleteTopic(channelRef) + UnprovisionFunc: func(channel buses.ChannelReference) error { + return bus.deleteTopic(channel) }, - SubscribeFunc: func(channelRef buses.ChannelReference, subscriptionRef buses.SubscriptionReference, parameters buses.ResolvedParameters) error { - return bus.createOrUpdateSubscription(channelRef, subscriptionRef, parameters) + SubscribeFunc: func(channel buses.ChannelReference, subscription buses.SubscriptionReference, parameters buses.ResolvedParameters) error { + return bus.createOrUpdateSubscription(channel, subscription, parameters) }, - UnsubscribeFunc: func(channelRef buses.ChannelReference, subscriptionRef buses.SubscriptionReference) error { - return bus.deleteSubscription(subscriptionRef) + UnsubscribeFunc: func(channel buses.ChannelReference, subscription buses.SubscriptionReference) error { + return bus.deleteSubscription(subscription) }, } - bus.provisioner = buses.NewBusProvisioner(busRef, eventHandlers, opts) + bus.provisioner = buses.NewBusProvisioner(ref, eventHandlers, opts) bus.logger = opts.Logger return bus, nil } @@ -113,14 +113,14 @@ func (b *CloudPubSubBus) Run(threadness int, stopCh <-chan struct{}) { // Knative Subscription. This method will not block, but will continue to // receive events until either this method or StopReceivingEvents is called for // the same Subscription. -func (b *CloudPubSubBus) startReceivingEvents(subscriptionRef buses.SubscriptionReference, parameters buses.ResolvedParameters) error { +func (b *CloudPubSubBus) startReceivingEvents(ref buses.SubscriptionReference, parameters buses.ResolvedParameters) error { ctx := context.Background() cctx, cancel := context.WithCancel(ctx) // cancel current subscription receiver, if any - b.stopReceivingEvents(subscriptionRef) + b.stopReceivingEvents(ref) - subscriptionID := b.subscriptionID(subscriptionRef) + subscriptionID := b.subscriptionID(ref) subscription := b.pubsubClient.Subscription(subscriptionID) b.receivers[subscriptionID] = cancel @@ -139,12 +139,12 @@ func (b *CloudPubSubBus) startReceivingEvents(subscriptionRef buses.Subscription Headers: pubsubMessage.Attributes, Payload: pubsubMessage.Data, } - err := b.dispatcher.DispatchMessage(subscriptionRef, message) + err := b.dispatcher.DispatchMessage(ref, message) if err != nil { - b.logger.Warnf("Unable to dispatch event %q to %q", pubsubMessage.ID, subscriptionRef.String()) + b.logger.Warnf("Unable to dispatch event %q to %q", pubsubMessage.ID, ref.String()) pubsubMessage.Nack() } else { - b.logger.Infof("Dispatched event %q to %q", pubsubMessage.ID, subscriptionRef.String()) + b.logger.Infof("Dispatched event %q to %q", pubsubMessage.ID, ref.String()) pubsubMessage.Ack() } }) @@ -152,7 +152,7 @@ func (b *CloudPubSubBus) startReceivingEvents(subscriptionRef buses.Subscription b.logger.Errorf("Error receiving messesages for %q: %v", subscriptionID, err) } delete(b.receivers, subscriptionID) - b.reconciler.RequeueSubscription(subscriptionRef) + b.reconciler.RequeueSubscription(ref) }() return nil @@ -161,8 +161,8 @@ func (b *CloudPubSubBus) startReceivingEvents(subscriptionRef buses.Subscription // stopReceivingEvents stops receiving events for a previous call to // StartReceivingEvents. Calls for a Subscription that is not not actively receiving // are ignored. -func (b *CloudPubSubBus) stopReceivingEvents(subscriptionRef buses.SubscriptionReference) { - subscriptionID := b.subscriptionID(subscriptionRef) +func (b *CloudPubSubBus) stopReceivingEvents(ref buses.SubscriptionReference) { + subscriptionID := b.subscriptionID(ref) if cancel, ok := b.receivers[subscriptionID]; ok { b.logger.Infof("Stop receiving events for subscription %q", subscriptionID) cancel() @@ -172,10 +172,10 @@ func (b *CloudPubSubBus) stopReceivingEvents(subscriptionRef buses.SubscriptionR // sendEventToTopic sends a message to the Cloud Pub/Sub Topic backing the // Channel. -func (b *CloudPubSubBus) sendEventToTopic(channelRef buses.ChannelReference, message *buses.Message) error { +func (b *CloudPubSubBus) sendEventToTopic(channel buses.ChannelReference, message *buses.Message) error { ctx := context.Background() - topicID := b.topicID(channelRef) + topicID := b.topicID(channel) topic := b.pubsubClient.Topic(topicID) result := topic.Publish(ctx, &pubsub.Message{ @@ -195,10 +195,10 @@ func (b *CloudPubSubBus) sendEventToTopic(channelRef buses.ChannelReference, mes } // createTopic creates a Topic in Cloud Pub/Sub for the Channel. -func (b *CloudPubSubBus) createTopic(channelRef buses.ChannelReference, parameters buses.ResolvedParameters) error { +func (b *CloudPubSubBus) createTopic(channel buses.ChannelReference, parameters buses.ResolvedParameters) error { ctx := context.Background() - topicID := b.topicID(channelRef) + topicID := b.topicID(channel) topic := b.pubsubClient.Topic(topicID) // check if topic exists before creating @@ -218,10 +218,10 @@ func (b *CloudPubSubBus) createTopic(channelRef buses.ChannelReference, paramete } // deleteTopic deletes the Topic in Cloud Pub/Sub for the Channel. -func (b *CloudPubSubBus) deleteTopic(channelRef buses.ChannelReference) error { +func (b *CloudPubSubBus) deleteTopic(channel buses.ChannelReference) error { ctx := context.Background() - topicID := b.topicID(channelRef) + topicID := b.topicID(channel) topic := b.pubsubClient.Topic(topicID) // check if topic exists before deleting @@ -238,10 +238,10 @@ func (b *CloudPubSubBus) deleteTopic(channelRef buses.ChannelReference) error { // createOrUpdateSubscription creates a Subscription in Cloud Pub/Sub for the // Knative Subscription, or idempotently updates a Subscription if it already // exists. -func (b *CloudPubSubBus) createOrUpdateSubscription(channelRef buses.ChannelReference, subscriptionRef buses.SubscriptionReference, parameters buses.ResolvedParameters) error { +func (b *CloudPubSubBus) createOrUpdateSubscription(channel buses.ChannelReference, ref buses.SubscriptionReference, parameters buses.ResolvedParameters) error { ctx := context.Background() - subscriptionID := b.subscriptionID(subscriptionRef) + subscriptionID := b.subscriptionID(ref) subscription := b.pubsubClient.Subscription(subscriptionID) // check if subscription exists before creating @@ -255,7 +255,7 @@ func (b *CloudPubSubBus) createOrUpdateSubscription(channelRef buses.ChannelRefe } // create subscription - topicID := b.topicID(channelRef) + topicID := b.topicID(channel) topic := b.pubsubClient.Topic(topicID) b.logger.Infof("Create subscription %q for topic %q", subscriptionID, topicID) subscription, err := b.pubsubClient.CreateSubscription(ctx, subscriptionID, pubsub.SubscriptionConfig{ @@ -266,10 +266,10 @@ func (b *CloudPubSubBus) createOrUpdateSubscription(channelRef buses.ChannelRefe // deleteSubscription removes a Subscription from Cloud Pub/Sub for a Knative // Subscription. -func (b *CloudPubSubBus) deleteSubscription(subscriptionRef buses.SubscriptionReference) error { +func (b *CloudPubSubBus) deleteSubscription(ref buses.SubscriptionReference) error { ctx := context.Background() - subscriptionID := b.subscriptionID(subscriptionRef) + subscriptionID := b.subscriptionID(ref) subscription := b.pubsubClient.Subscription(subscriptionID) // check if subscription exists before deleting @@ -283,10 +283,10 @@ func (b *CloudPubSubBus) deleteSubscription(subscriptionRef buses.SubscriptionRe return subscription.Delete(ctx) } -func (b *CloudPubSubBus) topicID(channelRef buses.ChannelReference) string { - return fmt.Sprintf("channel-%s-%s-%s", b.busRef.Name, channelRef.Namespace, channelRef.Name) +func (b *CloudPubSubBus) topicID(channel buses.ChannelReference) string { + return fmt.Sprintf("channel-%s-%s-%s", b.ref.Name, channel.Namespace, channel.Name) } -func (b *CloudPubSubBus) subscriptionID(subscriptionRef buses.SubscriptionReference) string { - return fmt.Sprintf("subscription-%s-%s-%s", b.busRef.Name, subscriptionRef.Namespace, subscriptionRef.Name) +func (b *CloudPubSubBus) subscriptionID(subscription buses.SubscriptionReference) string { + return fmt.Sprintf("subscription-%s-%s-%s", b.ref.Name, subscription.Namespace, subscription.Name) } diff --git a/pkg/buses/gcppubsub/dispatcher/main.go b/pkg/buses/gcppubsub/dispatcher/main.go index 230fd444249..8718b170e81 100644 --- a/pkg/buses/gcppubsub/dispatcher/main.go +++ b/pkg/buses/gcppubsub/dispatcher/main.go @@ -31,7 +31,7 @@ const ( ) func main() { - busRef := buses.NewBusReferenceFromNames( + ref := buses.NewBusReferenceFromNames( os.Getenv("BUS_NAME"), os.Getenv("BUS_NAMESPACE"), ) @@ -40,7 +40,7 @@ func main() { logger := buses.NewBusLoggerFromConfig(config) defer logger.Sync() logger = logger.With( - zap.String("channels.knative.dev/bus", busRef.String()), + zap.String("channels.knative.dev/bus", ref.String()), zap.String("channels.knative.dev/busType", gcppubsub.BusType), zap.String("channels.knative.dev/busComponent", buses.Dispatcher), ) @@ -58,7 +58,7 @@ func main() { flag.StringVar(&opts.MasterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.") flag.Parse() - bus, err := gcppubsub.NewCloudPubSubBusDispatcher(busRef, projectID, opts) + bus, err := gcppubsub.NewCloudPubSubBusDispatcher(ref, projectID, opts) if err != nil { logger.Fatalf("Error starting pub/sub bus dispatcher: %v", err) } diff --git a/pkg/buses/gcppubsub/provisioner/main.go b/pkg/buses/gcppubsub/provisioner/main.go index 56a817935b2..e2a4fd87384 100644 --- a/pkg/buses/gcppubsub/provisioner/main.go +++ b/pkg/buses/gcppubsub/provisioner/main.go @@ -31,7 +31,7 @@ const ( ) func main() { - busRef := buses.NewBusReferenceFromNames( + ref := buses.NewBusReferenceFromNames( os.Getenv("BUS_NAME"), os.Getenv("BUS_NAMESPACE"), ) @@ -40,7 +40,7 @@ func main() { logger := buses.NewBusLoggerFromConfig(config) defer logger.Sync() logger = logger.With( - zap.String("channels.knative.dev/bus", busRef.String()), + zap.String("channels.knative.dev/bus", ref.String()), zap.String("channels.knative.dev/busType", gcppubsub.BusType), zap.String("channels.knative.dev/busComponent", buses.Provisioner), ) @@ -57,7 +57,7 @@ func main() { flag.StringVar(&opts.MasterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.") flag.Parse() - bus, err := gcppubsub.NewCloudPubSubBusProvisioner(busRef, projectID, opts) + bus, err := gcppubsub.NewCloudPubSubBusProvisioner(ref, projectID, opts) if err != nil { logger.Fatalf("Error starting pub/sub bus provisioner: %v", err) } diff --git a/pkg/buses/handler_funcs.go b/pkg/buses/handler_funcs.go index 6ce19dc7149..75e899ea0fd 100644 --- a/pkg/buses/handler_funcs.go +++ b/pkg/buses/handler_funcs.go @@ -44,24 +44,24 @@ type ResolvedParameters = map[string]string // created or deleted, or if one of the relevant resources is changed. type EventHandlerFuncs struct { // BusFunc is invoked when the Bus requires sync. - BusFunc func(busRef BusReference) error + BusFunc func(bus BusReference) error // ProvisionFunc is invoked when a new Channel should be provisioned or when // the attributes change. - ProvisionFunc func(channelRef ChannelReference, parameters ResolvedParameters) error + ProvisionFunc func(channel ChannelReference, parameters ResolvedParameters) error // UnprovisionFunc in invoked when a Channel should be deleted. - UnprovisionFunc func(channelRef ChannelReference) error + UnprovisionFunc func(channel ChannelReference) error // SubscribeFunc is invoked when a new Subscription should be set up or when // the attributes change. - SubscribeFunc func(channelRef ChannelReference, subscriptionRef SubscriptionReference, parameters ResolvedParameters) error + SubscribeFunc func(channel ChannelReference, subscription SubscriptionReference, parameters ResolvedParameters) error // UnsubscribeFunc is invoked when a Subscription should be deleted. - UnsubscribeFunc func(channelRef ChannelReference, subscriptionRef SubscriptionReference) error + UnsubscribeFunc func(channel ChannelReference, subscription SubscriptionReference) error // ReceiveMessageFunc is invoked when a Message is received on a Channel - ReceiveMessageFunc func(channelRef ChannelReference, message *Message) error + ReceiveMessageFunc func(channel ChannelReference, message *Message) error logger *zap.SugaredLogger } @@ -70,8 +70,8 @@ func (h EventHandlerFuncs) onBus(bus channelsv1alpha1.GenericBus, reconciler *Re if h.BusFunc == nil { return nil } - busRef := NewBusReference(bus) - err := h.BusFunc(busRef) + ref := NewBusReference(bus) + err := h.BusFunc(ref) if err != nil { reconciler.RecordBusEventf(corev1.EventTypeWarning, errResourceSync, "Error syncing Bus: %s", err) } else { @@ -90,13 +90,13 @@ func (h EventHandlerFuncs) onProvision(channel *channelsv1alpha1.Channel, reconc } channelCopy := channel.DeepCopy() var cond *channelsv1alpha1.ChannelCondition - channelRef := NewChannelReference(channel) - err = h.ProvisionFunc(channelRef, parameters) + ref := NewChannelReference(channel) + err = h.ProvisionFunc(ref, parameters) if err != nil { - reconciler.RecordChannelEventf(channelRef, corev1.EventTypeWarning, errResourceSync, "Error provisioning channel: %s", err) + reconciler.RecordChannelEventf(ref, corev1.EventTypeWarning, errResourceSync, "Error provisioning channel: %s", err) cond = util.NewChannelCondition(channelsv1alpha1.ChannelProvisioned, corev1.ConditionFalse, errResourceSync, err.Error()) } else { - reconciler.RecordChannelEventf(channelRef, corev1.EventTypeNormal, successSynced, "Channel provisioned successfully") + reconciler.RecordChannelEventf(ref, corev1.EventTypeNormal, successSynced, "Channel provisioned successfully") cond = util.NewChannelCondition(channelsv1alpha1.ChannelProvisioned, corev1.ConditionTrue, successSynced, "Channel provisioned successfully") } util.SetChannelCondition(&channelCopy.Status, *cond) @@ -119,12 +119,12 @@ func (h EventHandlerFuncs) onUnprovision(channel *channelsv1alpha1.Channel, reco if h.UnprovisionFunc == nil { return nil } - channelRef := NewChannelReference(channel) - if err := h.UnprovisionFunc(channelRef); err != nil { - reconciler.RecordChannelEventf(channelRef, corev1.EventTypeWarning, errResourceSync, "Error unprovisioning channel: %s", err) + ref := NewChannelReference(channel) + if err := h.UnprovisionFunc(ref); err != nil { + reconciler.RecordChannelEventf(ref, corev1.EventTypeWarning, errResourceSync, "Error unprovisioning channel: %s", err) return err } - reconciler.RecordChannelEventf(channelRef, corev1.EventTypeNormal, successSynced, "Channel unprovisioned successfully") + reconciler.RecordChannelEventf(ref, corev1.EventTypeNormal, successSynced, "Channel unprovisioned successfully") // skip updating status conditions since the channel was deleted return nil } @@ -137,16 +137,16 @@ func (h EventHandlerFuncs) onSubscribe(subscription *channelsv1alpha1.Subscripti if err != nil { return err } - channelRef := NewChannelReferenceFromSubscription(subscription) - subscriptionRef := NewSubscriptionReference(subscription) + channel := NewChannelReferenceFromSubscription(subscription) + ref := NewSubscriptionReference(subscription) subscriptionCopy := subscription.DeepCopy() var cond *channelsv1alpha1.SubscriptionCondition - err = h.SubscribeFunc(channelRef, subscriptionRef, parameters) + err = h.SubscribeFunc(channel, ref, parameters) if err != nil { - reconciler.RecordSubscriptionEventf(subscriptionRef, corev1.EventTypeWarning, errResourceSync, "Error subscribing: %s", err) + reconciler.RecordSubscriptionEventf(ref, corev1.EventTypeWarning, errResourceSync, "Error subscribing: %s", err) cond = util.NewSubscriptionCondition(channelsv1alpha1.SubscriptionDispatching, corev1.ConditionFalse, errResourceSync, err.Error()) } else { - reconciler.RecordSubscriptionEventf(subscriptionRef, corev1.EventTypeNormal, successSynced, "Subscribed successfully") + reconciler.RecordSubscriptionEventf(ref, corev1.EventTypeNormal, successSynced, "Subscribed successfully") cond = util.NewSubscriptionCondition(channelsv1alpha1.SubscriptionDispatching, corev1.ConditionTrue, successSynced, "Subscription dispatcher successfully created") } util.SetSubscriptionCondition(&subscriptionCopy.Status, *cond) @@ -168,23 +168,23 @@ func (h EventHandlerFuncs) onUnsubscribe(subscription *channelsv1alpha1.Subscrip if h.UnsubscribeFunc == nil { return nil } - channelRef := NewChannelReferenceFromSubscription(subscription) - subscriptionRef := NewSubscriptionReference(subscription) - if err := h.UnsubscribeFunc(channelRef, subscriptionRef); err != nil { - reconciler.RecordSubscriptionEventf(subscriptionRef, corev1.EventTypeWarning, errResourceSync, "Error unsubscribing: %s", err) + channel := NewChannelReferenceFromSubscription(subscription) + ref := NewSubscriptionReference(subscription) + if err := h.UnsubscribeFunc(channel, ref); err != nil { + reconciler.RecordSubscriptionEventf(ref, corev1.EventTypeWarning, errResourceSync, "Error unsubscribing: %s", err) return err } - reconciler.RecordSubscriptionEventf(subscriptionRef, corev1.EventTypeNormal, successSynced, "Unsubscribed successfully") + reconciler.RecordSubscriptionEventf(ref, corev1.EventTypeNormal, successSynced, "Unsubscribed successfully") // skip updating status conditions since the subscription was deleted return nil } -func (h EventHandlerFuncs) onReceiveMessage(channelRef ChannelReference, message *Message) error { +func (h EventHandlerFuncs) onReceiveMessage(channel ChannelReference, message *Message) error { if h.ReceiveMessageFunc == nil { // TODO use a static error return fmt.Errorf("unable to dispatch message") } - return h.ReceiveMessageFunc(channelRef, message) + return h.ReceiveMessageFunc(channel, message) } // resolveChannelParameters resolves the given Channel Parameters and the Bus' diff --git a/pkg/buses/kafka/bus.go b/pkg/buses/kafka/bus.go index ae0b1d17607..3187eabf660 100644 --- a/pkg/buses/kafka/bus.go +++ b/pkg/buses/kafka/bus.go @@ -38,7 +38,7 @@ const ( ) type KafkaBus struct { - busRef buses.BusReference + ref buses.BusReference dispatcher buses.BusDispatcher provisioner buses.BusProvisioner @@ -50,30 +50,30 @@ type KafkaBus struct { logger *zap.SugaredLogger } -func NewKafkaBusDispatcher(busRef buses.BusReference, brokers []string, opts *buses.BusOpts) (*KafkaBus, error) { +func NewKafkaBusDispatcher(ref buses.BusReference, brokers []string, opts *buses.BusOpts) (*KafkaBus, error) { bus := &KafkaBus{ - busRef: busRef, + ref: ref, kafkaBrokers: brokers, } eventHandlers := buses.EventHandlerFuncs{ - SubscribeFunc: func(channelRef buses.ChannelReference, subscriptionRef buses.SubscriptionReference, parameters buses.ResolvedParameters) error { - return bus.subscribe(channelRef, subscriptionRef, parameters) + SubscribeFunc: func(channel buses.ChannelReference, subscription buses.SubscriptionReference, parameters buses.ResolvedParameters) error { + return bus.subscribe(channel, subscription, parameters) }, - UnsubscribeFunc: func(channelRef buses.ChannelReference, subscriptionRef buses.SubscriptionReference) error { - return bus.unsubscribe(channelRef, subscriptionRef) + UnsubscribeFunc: func(channel buses.ChannelReference, subscription buses.SubscriptionReference) error { + return bus.unsubscribe(channel, subscription) }, - ReceiveMessageFunc: func(channelRef buses.ChannelReference, message *buses.Message) error { - bus.kafkaAsyncProducer.Input() <- toKafkaMessage(channelRef, message) + ReceiveMessageFunc: func(channel buses.ChannelReference, message *buses.Message) error { + bus.kafkaAsyncProducer.Input() <- toKafkaMessage(channel, message) return nil }, } - bus.dispatcher = buses.NewBusDispatcher(busRef, eventHandlers, opts) + bus.dispatcher = buses.NewBusDispatcher(ref, eventHandlers, opts) bus.logger = opts.Logger bus.kafkaConsumers = make(map[buses.SubscriptionReference]*cluster.Consumer) conf := sarama.NewConfig() conf.Version = sarama.V1_1_0_0 - conf.ClientID = busRef.Name + "-dispatcher" + conf.ClientID = ref.Name + "-dispatcher" client, err := sarama.NewClient(brokers, conf) if err != nil { return nil, fmt.Errorf("unable to create kafka client: %v", err) @@ -87,25 +87,25 @@ func NewKafkaBusDispatcher(busRef buses.BusReference, brokers []string, opts *bu return bus, nil } -func NewKafkaBusProvisioner(busRef buses.BusReference, brokers []string, opts *buses.BusOpts) (*KafkaBus, error) { +func NewKafkaBusProvisioner(ref buses.BusReference, brokers []string, opts *buses.BusOpts) (*KafkaBus, error) { bus := &KafkaBus{ - busRef: busRef, + ref: ref, kafkaBrokers: brokers, } eventHandlers := buses.EventHandlerFuncs{ - ProvisionFunc: func(channelRef buses.ChannelReference, parameters buses.ResolvedParameters) error { - return bus.provision(channelRef, parameters) + ProvisionFunc: func(channel buses.ChannelReference, parameters buses.ResolvedParameters) error { + return bus.provision(channel, parameters) }, - UnprovisionFunc: func(channelRef buses.ChannelReference) error { - return bus.unprovision(channelRef) + UnprovisionFunc: func(channel buses.ChannelReference) error { + return bus.unprovision(channel) }, } - bus.provisioner = buses.NewBusProvisioner(busRef, eventHandlers, opts) + bus.provisioner = buses.NewBusProvisioner(ref, eventHandlers, opts) bus.logger = opts.Logger conf := sarama.NewConfig() conf.Version = sarama.V1_1_0_0 - conf.ClientID = busRef.Name + "-provisioner" + conf.ClientID = ref.Name + "-provisioner" clusterAdmin, err := sarama.NewClusterAdmin(brokers, conf) if err != nil { @@ -139,26 +139,26 @@ func (b *KafkaBus) Run(threadness int, stopCh <-chan struct{}) { } } -func (b *KafkaBus) subscribe(channelRef buses.ChannelReference, subscriptionRef buses.SubscriptionReference, parameters buses.ResolvedParameters) error { - if _, ok := b.kafkaConsumers[subscriptionRef]; ok { +func (b *KafkaBus) subscribe(channel buses.ChannelReference, subscription buses.SubscriptionReference, parameters buses.ResolvedParameters) error { + if _, ok := b.kafkaConsumers[subscription]; ok { // subscribe can be called multiple times for the same subscription, //unsubscribe before we resubscribe - err := b.unsubscribe(channelRef, subscriptionRef) + err := b.unsubscribe(channel, subscription) if err != nil { return err } } - b.logger.Infof("Subscribing %q to %q (%v)", subscriptionRef.String(), channelRef.String(), parameters) + b.logger.Infof("Subscribing %q to %q (%v)", subscription.String(), channel.String(), parameters) - topicName := topicName(channelRef) + topicName := topicName(channel) initialOffset, err := resolveInitialOffset(parameters) if err != nil { return err } - group := fmt.Sprintf("%s.%s.%s", b.busRef.Name, subscriptionRef.Namespace, subscriptionRef.Name) + group := fmt.Sprintf("%s.%s.%s", b.ref.Name, subscription.Namespace, subscription.Name) consumerConfig := cluster.NewConfig() consumerConfig.Version = sarama.V1_1_0_0 consumerConfig.Consumer.Offsets.Initial = initialOffset @@ -167,15 +167,15 @@ func (b *KafkaBus) subscribe(channelRef buses.ChannelReference, subscriptionRef return err } - b.kafkaConsumers[subscriptionRef] = consumer + b.kafkaConsumers[subscription] = consumer go func() { for { msg, more := <-consumer.Messages() if more { - b.logger.Infof("Dispatching a message for subscription %q", subscriptionRef.String()) + b.logger.Infof("Dispatching a message for subscription %q", subscription.String()) message := fromKafkaMessage(msg) - err := b.dispatcher.DispatchMessage(subscriptionRef, message) + err := b.dispatcher.DispatchMessage(subscription, message) if err != nil { b.logger.Warnf("Got error trying to dispatch message: %v", err) } @@ -185,23 +185,23 @@ func (b *KafkaBus) subscribe(channelRef buses.ChannelReference, subscriptionRef break } } - b.logger.Infof("Consumer for subscription %q stopped", subscriptionRef.String()) + b.logger.Infof("Consumer for subscription %q stopped", subscription.String()) }() return nil } -func (b *KafkaBus) unsubscribe(channelRef buses.ChannelReference, subscriptionRef buses.SubscriptionReference) error { - b.logger.Infof("Un-Subscribing %q from %q", subscriptionRef.String(), channelRef.String()) - if consumer, ok := b.kafkaConsumers[subscriptionRef]; ok { - delete(b.kafkaConsumers, subscriptionRef) +func (b *KafkaBus) unsubscribe(channel buses.ChannelReference, subscription buses.SubscriptionReference) error { + b.logger.Infof("Un-Subscribing %q from %q", subscription.String(), channel.String()) + if consumer, ok := b.kafkaConsumers[subscription]; ok { + delete(b.kafkaConsumers, subscription) return consumer.Close() } return nil } -func (b *KafkaBus) provision(channelRef buses.ChannelReference, parameters buses.ResolvedParameters) error { - topicName := topicName(channelRef) +func (b *KafkaBus) provision(channel buses.ChannelReference, parameters buses.ResolvedParameters) error { + topicName := topicName(channel) b.logger.Infof("Provisioning topic %s on bus backed by Kafka", topicName) partitions := 1 @@ -209,7 +209,7 @@ func (b *KafkaBus) provision(channelRef buses.ChannelReference, parameters buses var err error partitions, err = strconv.Atoi(p) if err != nil { - b.logger.Warnf("Could not parse partition count for %q: %s", channelRef.String(), p) + b.logger.Warnf("Could not parse partition count for %q: %s", channel.String(), p) } } @@ -227,8 +227,8 @@ func (b *KafkaBus) provision(channelRef buses.ChannelReference, parameters buses return err } -func (b *KafkaBus) unprovision(channelRef buses.ChannelReference) error { - topicName := topicName(channelRef) +func (b *KafkaBus) unprovision(channel buses.ChannelReference) error { + topicName := topicName(channel) b.logger.Infof("Un-provisioning topic %s from bus backed by Kafka", topicName) err := b.kafkaClusterAdmin.DeleteTopic(topicName) @@ -243,9 +243,9 @@ func (b *KafkaBus) unprovision(channelRef buses.ChannelReference) error { return err } -func toKafkaMessage(channelRef buses.ChannelReference, message *buses.Message) *sarama.ProducerMessage { +func toKafkaMessage(channel buses.ChannelReference, message *buses.Message) *sarama.ProducerMessage { kafkaMessage := sarama.ProducerMessage{ - Topic: topicName(channelRef), + Topic: topicName(channel), Value: sarama.ByteEncoder(message.Payload), } for h, v := range message.Headers { diff --git a/pkg/buses/kafka/dispatcher/main.go b/pkg/buses/kafka/dispatcher/main.go index a2f4bc069fb..b66e8de5f00 100644 --- a/pkg/buses/kafka/dispatcher/main.go +++ b/pkg/buses/kafka/dispatcher/main.go @@ -33,7 +33,7 @@ const ( ) func main() { - busRef := buses.NewBusReferenceFromNames( + ref := buses.NewBusReferenceFromNames( os.Getenv("BUS_NAME"), os.Getenv("BUS_NAMESPACE"), ) @@ -42,7 +42,7 @@ func main() { logger := buses.NewBusLoggerFromConfig(config) defer logger.Sync() logger = logger.With( - zap.String("channels.knative.dev/bus", busRef.String()), + zap.String("channels.knative.dev/bus", ref.String()), zap.String("channels.knative.dev/busType", kafka.BusType), zap.String("channels.knative.dev/busComponent", buses.Dispatcher), ) @@ -61,7 +61,7 @@ func main() { flag.StringVar(&opts.MasterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.") flag.Parse() - bus, err := kafka.NewKafkaBusDispatcher(busRef, brokers, opts) + bus, err := kafka.NewKafkaBusDispatcher(ref, brokers, opts) if err != nil { logger.Fatalf("Error starting kafka bus dispatcher: %v", err) } diff --git a/pkg/buses/kafka/provisioner/main.go b/pkg/buses/kafka/provisioner/main.go index aee166bee05..b5fa3172eff 100644 --- a/pkg/buses/kafka/provisioner/main.go +++ b/pkg/buses/kafka/provisioner/main.go @@ -33,7 +33,7 @@ const ( ) func main() { - busRef := buses.NewBusReferenceFromNames( + ref := buses.NewBusReferenceFromNames( os.Getenv("BUS_NAME"), os.Getenv("BUS_NAMESPACE"), ) @@ -42,7 +42,7 @@ func main() { logger := buses.NewBusLoggerFromConfig(config) defer logger.Sync() logger = logger.With( - zap.String("channels.knative.dev/bus", busRef.String()), + zap.String("channels.knative.dev/bus", ref.String()), zap.String("channels.knative.dev/busType", kafka.BusType), zap.String("channels.knative.dev/busComponent", buses.Provisioner), ) @@ -61,7 +61,7 @@ func main() { flag.StringVar(&opts.MasterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.") flag.Parse() - bus, err := kafka.NewKafkaBusProvisioner(busRef, brokers, opts) + bus, err := kafka.NewKafkaBusProvisioner(ref, brokers, opts) if err != nil { logger.Fatalf("Error starting kafka bus provisioner: %v", err) } diff --git a/pkg/buses/message_receiver.go b/pkg/buses/message_receiver.go index 5d95789578c..7639a7aec51 100644 --- a/pkg/buses/message_receiver.go +++ b/pkg/buses/message_receiver.go @@ -103,7 +103,7 @@ func (r *MessageReceiver) stop(srv *http.Server) { func (r *MessageReceiver) HandleRequest(res http.ResponseWriter, req *http.Request) { host := req.Host r.logger.Infof("Received request for %s", host) - channelReference := r.parseChannelReference(host) + channel := r.parseChannel(host) message, err := r.fromRequest(req) if err != nil { @@ -111,7 +111,7 @@ func (r *MessageReceiver) HandleRequest(res http.ResponseWriter, req *http.Reque return } - err = r.receiverFunc(channelReference, message) + err = r.receiverFunc(channel, message) if err != nil { if err == ErrUnknownChannel { res.WriteHeader(http.StatusNotFound) @@ -163,9 +163,9 @@ func (r *MessageReceiver) fromHTTPHeaders(headers http.Header) map[string]string return safe } -// parseChannelReference converts the channel's hostname into a channel +// parseChannel converts the channel's hostname into a channel // reference. -func (r *MessageReceiver) parseChannelReference(host string) ChannelReference { +func (r *MessageReceiver) parseChannel(host string) ChannelReference { chunks := strings.Split(host, ".") return ChannelReference{ Name: chunks[0], diff --git a/pkg/buses/reconciler.go b/pkg/buses/reconciler.go index 31cef13b30e..db9e1f4c00a 100644 --- a/pkg/buses/reconciler.go +++ b/pkg/buses/reconciler.go @@ -245,9 +245,9 @@ func NewReconciler( // RequeueSubscription will add the Subscription to the workqueue for future // processing. Reprocessing a Subscription is often used within a dispatcher // when a long lived receiver is interrupted by an asynchronous error. -func (r *Reconciler) RequeueSubscription(subscriptionRef SubscriptionReference) { - r.logger.Infof("Requeue subscription %q", subscriptionRef.String()) - r.workqueue.AddRateLimited(makeWorkqueueKey(subscriptionKind, subscriptionRef.Namespace, subscriptionRef.Name)) +func (r *Reconciler) RequeueSubscription(subscription SubscriptionReference) { + r.logger.Infof("Requeue subscription %q", subscription.String()) + r.workqueue.AddRateLimited(makeWorkqueueKey(subscriptionKind, subscription.Namespace, subscription.Name)) } // RecordBusEventf creates a new event for the reconciled bus and records it @@ -259,8 +259,8 @@ func (r *Reconciler) RecordBusEventf(eventtype, reason, messageFmt string, args // RecordChannelEventf creates a new event for the channel and records it with // the api server. Attempts to records an event for an unknown channel are // ignored. -func (r *Reconciler) RecordChannelEventf(channelRef ChannelReference, eventtype, reason, messageFmt string, args ...interface{}) { - channel, err := r.cache.Channel(channelRef) +func (r *Reconciler) RecordChannelEventf(ref ChannelReference, eventtype, reason, messageFmt string, args ...interface{}) { + channel, err := r.cache.Channel(ref) if err != nil { // TODO handle error return @@ -271,8 +271,8 @@ func (r *Reconciler) RecordChannelEventf(channelRef ChannelReference, eventtype, // RecordSubscriptionEventf creates a new event for the subscription and // records it with the api server. Attempts to records an event for an unknown // subscription are ignored. -func (r *Reconciler) RecordSubscriptionEventf(subscriptionRef SubscriptionReference, eventtype, reason, messageFmt string, args ...interface{}) { - subscription, err := r.cache.Subscription(subscriptionRef) +func (r *Reconciler) RecordSubscriptionEventf(ref SubscriptionReference, eventtype, reason, messageFmt string, args ...interface{}) { + subscription, err := r.cache.Subscription(ref) if err != nil { // TODO handle error return @@ -284,7 +284,7 @@ func (r *Reconciler) RecordSubscriptionEventf(subscriptionRef SubscriptionRefere // as syncing informer caches and starting workers. It will block until stopCh // is closed, at which point it will shutdown the workqueue and wait for // workers to finish processing their current work items. -func (r *Reconciler) Run(busRef BusReference, threadiness int, stopCh <-chan struct{}) error { +func (r *Reconciler) Run(ref BusReference, threadiness int, stopCh <-chan struct{}) error { defer runtime.HandleCrash() defer r.workqueue.ShutDown() @@ -298,18 +298,18 @@ func (r *Reconciler) Run(busRef BusReference, threadiness int, stopCh <-chan str return err } - if len(busRef.Namespace) == 0 { + if len(ref.Namespace) == 0 { // reconciler is for a ClusterBus - clusterBus, err := r.clusterBusesLister.Get(busRef.Name) + clusterBus, err := r.clusterBusesLister.Get(ref.Name) if err != nil { - r.logger.Fatalf("Unknown clusterbus %q: %v", busRef.Name, err) + r.logger.Fatalf("Unknown clusterbus %q: %v", ref.Name, err) } r.bus = clusterBus.DeepCopy() } else { // reconciler is for a namespaced Bus - bus, err := r.busesLister.Buses(busRef.Namespace).Get(busRef.Name) + bus, err := r.busesLister.Buses(ref.Namespace).Get(ref.Name) if err != nil { - r.logger.Fatalf("Unknown bus %q: %v", busRef, err) + r.logger.Fatalf("Unknown bus %q: %v", ref, err) } r.bus = bus.DeepCopy() } @@ -484,8 +484,8 @@ func (r *Reconciler) syncChannel(namespace string, name string) error { if err != nil { // The Channel resource may no longer exist if errors.IsNotFound(err) { - channelRef := NewChannelReferenceFromNames(name, namespace) - err = r.removeChannel(channelRef) + ref := NewChannelReferenceFromNames(name, namespace) + err = r.removeChannel(ref) if err != nil { return err } @@ -510,8 +510,8 @@ func (r *Reconciler) syncSubscription(namespace string, name string) error { if err != nil { // The Subscription resource may no longer exist if errors.IsNotFound(err) { - subscriptionRef := NewSubscriptionReferenceFromNames(name, namespace) - err = r.removeSubscription(subscriptionRef) + ref := NewSubscriptionReferenceFromNames(name, namespace) + err = r.removeSubscription(ref) if err != nil { return err } @@ -583,8 +583,8 @@ func (r *Reconciler) createOrUpdateChannel(channel *channelsv1alpha1.Channel) er return nil } -func (r *Reconciler) removeChannel(channelRef ChannelReference) error { - channel, err := r.cache.Channel(channelRef) +func (r *Reconciler) removeChannel(ref ChannelReference) error { + channel, err := r.cache.Channel(ref) if err != nil { // the channel isn't provisioned return nil @@ -600,11 +600,11 @@ func (r *Reconciler) removeChannel(channelRef ChannelReference) error { } func (r *Reconciler) createOrUpdateSubscription(subscription *channelsv1alpha1.Subscription) error { - channelRef := NewChannelReferenceFromSubscription(subscription) - _, err := r.cache.Channel(channelRef) + ref := NewChannelReferenceFromSubscription(subscription) + _, err := r.cache.Channel(ref) if err != nil { // channel is not provisioned, before erring we need to check if the channel is provionable - channel, errS := r.channelsLister.Channels(channelRef.Namespace).Get(channelRef.Name) + channel, errS := r.channelsLister.Channels(ref.Namespace).Get(ref.Name) if errS != nil { return err } @@ -623,8 +623,8 @@ func (r *Reconciler) createOrUpdateSubscription(subscription *channelsv1alpha1.S return nil } -func (r *Reconciler) removeSubscription(subscriptionRef SubscriptionReference) error { - subscription, err := r.cache.Subscription(subscriptionRef) +func (r *Reconciler) removeSubscription(ref SubscriptionReference) error { + subscription, err := r.cache.Subscription(ref) if err != nil { return nil } diff --git a/pkg/buses/references_test.go b/pkg/buses/references_test.go index 91d3f147fa0..84854f2adec 100644 --- a/pkg/buses/references_test.go +++ b/pkg/buses/references_test.go @@ -87,23 +87,23 @@ func TestNewBusReferenceFromNames_ClusterBus(t *testing.T) { } func TestBusReference_String(t *testing.T) { - busRef := buses.BusReference{ + ref := buses.BusReference{ Name: referencesTestBusName, Namespace: referencesTestNamespace, } expected := fmt.Sprintf("%s/%s", referencesTestNamespace, referencesTestBusName) - actual := busRef.String() + actual := ref.String() if expected != actual { t.Errorf("%s expected: %+v got: %+v", "BusReference", expected, actual) } } func TestBusReference_String_ClusterBus(t *testing.T) { - busRef := buses.BusReference{ + ref := buses.BusReference{ Name: referencesTestClusterBusName, } expected := referencesTestClusterBusName - actual := busRef.String() + actual := ref.String() if expected != actual { t.Errorf("%s expected: %+v got: %+v", "BusReference", expected, actual) } @@ -158,12 +158,12 @@ func TestNewChannelReferenceFromNames(t *testing.T) { } func TestChannelReference_String(t *testing.T) { - channelRef := buses.ChannelReference{ + ref := buses.ChannelReference{ Name: referencesTestChannelName, Namespace: referencesTestNamespace, } expected := fmt.Sprintf("%s/%s", referencesTestNamespace, referencesTestChannelName) - actual := channelRef.String() + actual := ref.String() if expected != actual { t.Errorf("%s expected: %+v got: %+v", "ChannelReference", expected, actual) } @@ -198,12 +198,12 @@ func TestNewSubscriptionReferenceFromNames(t *testing.T) { } func TestSubscriptionReference_String(t *testing.T) { - subscriptionRef := buses.SubscriptionReference{ + ref := buses.SubscriptionReference{ Name: referencesTestSubscriptionName, Namespace: referencesTestNamespace, } expected := fmt.Sprintf("%s/%s", referencesTestNamespace, referencesTestSubscriptionName) - actual := subscriptionRef.String() + actual := ref.String() if expected != actual { t.Errorf("%s expected: %+v got: %+v", "SubscriptionReference", expected, actual) } diff --git a/pkg/buses/stub/bus.go b/pkg/buses/stub/bus.go index 1e67fefaad8..0376364b17e 100644 --- a/pkg/buses/stub/bus.go +++ b/pkg/buses/stub/bus.go @@ -25,38 +25,38 @@ import ( // BusType is the type of the stub bus const BusType = "stub" -func NewStubBusDispatcher(busRef buses.BusReference, opts *buses.BusOpts) *StubBus { +func NewStubBusDispatcher(ref buses.BusReference, opts *buses.BusOpts) *StubBus { bus := &StubBus{ channels: make(map[buses.ChannelReference]*stubChannel), } handlerFuncs := buses.EventHandlerFuncs{ - ProvisionFunc: func(channelRef buses.ChannelReference, parameters buses.ResolvedParameters) error { - bus.logger.Infof("Provision channel %q", channelRef.String()) - bus.addChannel(channelRef, parameters) + ProvisionFunc: func(channel buses.ChannelReference, parameters buses.ResolvedParameters) error { + bus.logger.Infof("Provision channel %q", channel.String()) + bus.addChannel(channel, parameters) return nil }, - UnprovisionFunc: func(channelRef buses.ChannelReference) error { - bus.logger.Infof("Unprovision channel %q", channelRef.String()) - bus.removeChannel(channelRef) + UnprovisionFunc: func(channel buses.ChannelReference) error { + bus.logger.Infof("Unprovision channel %q", channel.String()) + bus.removeChannel(channel) return nil }, - SubscribeFunc: func(channelRef buses.ChannelReference, subscriptionRef buses.SubscriptionReference, parameters buses.ResolvedParameters) error { - bus.logger.Infof("Subscribe %q to %q channel", subscriptionRef.String(), channelRef.String()) - bus.channel(channelRef).addSubscription(subscriptionRef, parameters, bus.dispatcher) + SubscribeFunc: func(channel buses.ChannelReference, subscription buses.SubscriptionReference, parameters buses.ResolvedParameters) error { + bus.logger.Infof("Subscribe %q to %q channel", subscription.String(), channel.String()) + bus.channel(channel).addSubscription(subscription, parameters, bus.dispatcher) return nil }, - UnsubscribeFunc: func(channelRef buses.ChannelReference, subscriptionRef buses.SubscriptionReference) error { - bus.logger.Infof("Unsubscribe %q from %q channel", subscriptionRef.String(), channelRef.String()) - bus.channel(channelRef).removeSubscription(subscriptionRef) + UnsubscribeFunc: func(channel buses.ChannelReference, subscription buses.SubscriptionReference) error { + bus.logger.Infof("Unsubscribe %q from %q channel", subscription.String(), channel.String()) + bus.channel(channel).removeSubscription(subscription) return nil }, - ReceiveMessageFunc: func(channelRef buses.ChannelReference, message *buses.Message) error { - bus.logger.Infof("Recieved message for %q channel", channelRef.String()) - bus.channel(channelRef).receiveMessage(message) + ReceiveMessageFunc: func(channel buses.ChannelReference, message *buses.Message) error { + bus.logger.Infof("Recieved message for %q channel", channel.String()) + bus.channel(channel).receiveMessage(message) return nil }, } - bus.dispatcher = buses.NewBusDispatcher(busRef, handlerFuncs, opts) + bus.dispatcher = buses.NewBusDispatcher(ref, handlerFuncs, opts) bus.logger = opts.Logger return bus @@ -73,30 +73,31 @@ func (b *StubBus) Run(threadness int, stopCh <-chan struct{}) { b.dispatcher.Run(threadness, stopCh) } -func (b *StubBus) addChannel(channelRef buses.ChannelReference, parameters buses.ResolvedParameters) { - if channel, ok := b.channels[channelRef]; ok { +func (b *StubBus) addChannel(ref buses.ChannelReference, parameters buses.ResolvedParameters) { + if channel, ok := b.channels[ref]; ok { // update channel channel.parameters = parameters } else { // create channel - b.channels[channelRef] = &stubChannel{ + b.channels[ref] = &stubChannel{ + ref: ref, parameters: parameters, subscriptions: make(map[buses.SubscriptionReference]*stubSubscription), - logger: b.logger.With(zap.String("channels.knative.dev/channel", channelRef.String())), + logger: b.logger.With(zap.String("channels.knative.dev/channel", ref.String())), } } } -func (b *StubBus) removeChannel(channelRef buses.ChannelReference) { - delete(b.channels, channelRef) +func (b *StubBus) removeChannel(channel buses.ChannelReference) { + delete(b.channels, channel) } -func (b *StubBus) channel(channelRef buses.ChannelReference) *stubChannel { - return b.channels[channelRef] +func (b *StubBus) channel(channel buses.ChannelReference) *stubChannel { + return b.channels[channel] } type stubChannel struct { - channelRef buses.ChannelReference + ref buses.ChannelReference parameters buses.ResolvedParameters subscriptions map[buses.SubscriptionReference]*stubSubscription @@ -109,36 +110,36 @@ func (c *stubChannel) receiveMessage(message *buses.Message) { } } -func (c *stubChannel) addSubscription(subscriptionRef buses.SubscriptionReference, parameters buses.ResolvedParameters, bus buses.BusDispatcher) { - if subscription, ok := c.subscriptions[subscriptionRef]; ok { +func (c *stubChannel) addSubscription(ref buses.SubscriptionReference, parameters buses.ResolvedParameters, bus buses.BusDispatcher) { + if subscription, ok := c.subscriptions[ref]; ok { // update subscription subscription.parameters = parameters } else { // create subscription - c.subscriptions[subscriptionRef] = &stubSubscription{ - bus: bus, - parameters: parameters, - subscriptionRef: subscriptionRef, + c.subscriptions[ref] = &stubSubscription{ + ref: ref, + bus: bus, + parameters: parameters, - logger: c.logger.With(zap.String("channels.knative.dev/subscription", subscriptionRef.String())), + logger: c.logger.With(zap.String("channels.knative.dev/subscription", ref.String())), } } } -func (c *stubChannel) removeSubscription(subscriptionRef buses.SubscriptionReference) { - delete(c.subscriptions, subscriptionRef) +func (c *stubChannel) removeSubscription(subscription buses.SubscriptionReference) { + delete(c.subscriptions, subscription) } type stubSubscription struct { - bus buses.BusDispatcher - parameters buses.ResolvedParameters - subscriptionRef buses.SubscriptionReference + ref buses.SubscriptionReference + bus buses.BusDispatcher + parameters buses.ResolvedParameters logger *zap.SugaredLogger } func (s *stubSubscription) dispatchMessage(message *buses.Message) error { - err := s.bus.DispatchMessage(s.subscriptionRef, message) + err := s.bus.DispatchMessage(s.ref, message) if err != nil { s.logger.Warnf("Failed to dispatch message: %v", err) } diff --git a/pkg/buses/stub/dispatcher/main.go b/pkg/buses/stub/dispatcher/main.go index 9f188d5e365..4064c2c5e8e 100644 --- a/pkg/buses/stub/dispatcher/main.go +++ b/pkg/buses/stub/dispatcher/main.go @@ -31,7 +31,7 @@ const ( ) func main() { - busRef := buses.NewBusReferenceFromNames( + ref := buses.NewBusReferenceFromNames( os.Getenv("BUS_NAME"), os.Getenv("BUS_NAMESPACE"), ) @@ -40,7 +40,7 @@ func main() { logger := buses.NewBusLoggerFromConfig(config) defer logger.Sync() logger = logger.With( - zap.String("channels.knative.dev/bus", busRef.String()), + zap.String("channels.knative.dev/bus", ref.String()), zap.String("channels.knative.dev/busType", stub.BusType), zap.String("channels.knative.dev/busComponent", buses.Dispatcher), ) @@ -53,7 +53,7 @@ func main() { flag.StringVar(&opts.MasterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.") flag.Parse() - bus := stub.NewStubBusDispatcher(busRef, opts) + bus := stub.NewStubBusDispatcher(ref, opts) // set up signals so we handle the first shutdown signal gracefully stopCh := signals.SetupSignalHandler()