From 362dc9ce7b78561369412588bc48b17857ba926a Mon Sep 17 00:00:00 2001 From: Scott Andrews Date: Tue, 12 Jun 2018 11:57:06 -0400 Subject: [PATCH 01/19] Allow montior handler funcs to return errors --- pkg/buses/monitor.go | 28 ++++++++++++++++------------ pkg/buses/stub/main.go | 12 ++++++++---- 2 files changed, 24 insertions(+), 16 deletions(-) diff --git a/pkg/buses/monitor.go b/pkg/buses/monitor.go index 220f1491e27..eca2f46dfad 100644 --- a/pkg/buses/monitor.go +++ b/pkg/buses/monitor.go @@ -38,10 +38,10 @@ type Monitor struct { // MonitorEventHandlerFuncs handler functions for channel and subscription provisioning type MonitorEventHandlerFuncs struct { - ProvisionFunc func(channel channelsv1alpha1.Channel) - UnprovisionFunc func(channel channelsv1alpha1.Channel) - SubscribeFunc func(subscription channelsv1alpha1.Subscription) - UnsubscribeFunc func(subscription channelsv1alpha1.Subscription) + ProvisionFunc func(channel channelsv1alpha1.Channel) error + UnprovisionFunc func(channel channelsv1alpha1.Channel) error + SubscribeFunc func(subscription channelsv1alpha1.Subscription) error + UnsubscribeFunc func(subscription channelsv1alpha1.Subscription) error } type channelSummary struct { @@ -259,7 +259,7 @@ func (m *Monitor) createOrUpdateBus(bus channelsv1alpha1.Bus) { } } -func (m *Monitor) createOrUpdateChannel(channel channelsv1alpha1.Channel) { +func (m *Monitor) createOrUpdateChannel(channel channelsv1alpha1.Channel) error { channelKey := makeChannelKeyFromChannel(channel) summary := m.getOrCreateChannelSummary(channelKey) @@ -270,11 +270,13 @@ func (m *Monitor) createOrUpdateChannel(channel channelsv1alpha1.Channel) { m.mutex.Unlock() if !reflect.DeepEqual(old, new) { - m.handler.ProvisionFunc(channel) + return m.handler.ProvisionFunc(channel) } + + return nil } -func (m *Monitor) removeChannel(channel channelsv1alpha1.Channel) { +func (m *Monitor) removeChannel(channel channelsv1alpha1.Channel) error { channelKey := makeChannelKeyFromChannel(channel) summary := m.getOrCreateChannelSummary(channelKey) @@ -282,10 +284,10 @@ func (m *Monitor) removeChannel(channel channelsv1alpha1.Channel) { summary.Channel = nil m.mutex.Unlock() - m.handler.UnprovisionFunc(channel) + return m.handler.UnprovisionFunc(channel) } -func (m *Monitor) createOrUpdateSubscription(subscription channelsv1alpha1.Subscription) { +func (m *Monitor) createOrUpdateSubscription(subscription channelsv1alpha1.Subscription) error { channelKey := makeChannelKeyFromSubscription(subscription) summary := m.getOrCreateChannelSummary(channelKey) subscriptionKey := makeSubscriptionKeyFromSubscription(subscription) @@ -299,11 +301,13 @@ func (m *Monitor) createOrUpdateSubscription(subscription channelsv1alpha1.Subsc m.mutex.Unlock() if !reflect.DeepEqual(old.Subscription, new.Subscription) { - m.handler.SubscribeFunc(subscription) + return m.handler.SubscribeFunc(subscription) } + + return nil } -func (m *Monitor) removeSubscription(subscription channelsv1alpha1.Subscription) { +func (m *Monitor) removeSubscription(subscription channelsv1alpha1.Subscription) error { channelKey := makeChannelKeyFromSubscription(subscription) summary := m.getOrCreateChannelSummary(channelKey) subscriptionKey := makeSubscriptionKeyFromSubscription(subscription) @@ -312,7 +316,7 @@ func (m *Monitor) removeSubscription(subscription channelsv1alpha1.Subscription) delete(summary.Subscriptions, subscriptionKey) m.mutex.Unlock() - m.handler.UnsubscribeFunc(subscription) + return m.handler.UnsubscribeFunc(subscription) } type channelKey struct { diff --git a/pkg/buses/stub/main.go b/pkg/buses/stub/main.go index b1c20eaa5df..cf66be8016f 100644 --- a/pkg/buses/stub/main.go +++ b/pkg/buses/stub/main.go @@ -154,17 +154,21 @@ func main() { informerFactory := informers.NewSharedInformerFactory(client, time.Second*30) monitor := buses.NewMonitor(name, informerFactory, buses.MonitorEventHandlerFuncs{ - ProvisionFunc: func(channel channelsv1alpha1.Channel) { + ProvisionFunc: func(channel channelsv1alpha1.Channel) error { glog.Infof("Provision channel %q\n", channel.Name) + return nil }, - UnprovisionFunc: func(channel channelsv1alpha1.Channel) { + UnprovisionFunc: func(channel channelsv1alpha1.Channel) error { glog.Infof("Unprovision channel %q\n", channel.Name) + return nil }, - SubscribeFunc: func(subscription channelsv1alpha1.Subscription) { + SubscribeFunc: func(subscription channelsv1alpha1.Subscription) error { glog.Infof("Subscribe %q to %q channel\n", subscription.Spec.Subscriber, subscription.Spec.Channel) + return nil }, - UnsubscribeFunc: func(subscription channelsv1alpha1.Subscription) { + UnsubscribeFunc: func(subscription channelsv1alpha1.Subscription) error { glog.Infof("Unubscribe %q from %q channel\n", subscription.Spec.Subscriber, subscription.Spec.Channel) + return nil }, }) bus := NewStubBus(name, monitor) From e17286ff5916f7690cfafbaafe540941367300c1 Mon Sep 17 00:00:00 2001 From: Scott Andrews Date: Tue, 12 Jun 2018 12:32:30 -0400 Subject: [PATCH 02/19] Filter handler funcs for channels and subscriptions on the monitored bus --- pkg/buses/monitor.go | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/pkg/buses/monitor.go b/pkg/buses/monitor.go index eca2f46dfad..151a6388365 100644 --- a/pkg/buses/monitor.go +++ b/pkg/buses/monitor.go @@ -259,6 +259,10 @@ func (m *Monitor) createOrUpdateBus(bus channelsv1alpha1.Bus) { } } +func (m *Monitor) isChannelForBus(channel channelsv1alpha1.Channel) bool { + return channel.Spec.Bus == m.busName +} + func (m *Monitor) createOrUpdateChannel(channel channelsv1alpha1.Channel) error { channelKey := makeChannelKeyFromChannel(channel) summary := m.getOrCreateChannelSummary(channelKey) @@ -269,7 +273,7 @@ func (m *Monitor) createOrUpdateChannel(channel channelsv1alpha1.Channel) error summary.Channel = new m.mutex.Unlock() - if !reflect.DeepEqual(old, new) { + if m.isChannelForBus(channel) && !reflect.DeepEqual(old, new) { return m.handler.ProvisionFunc(channel) } @@ -284,7 +288,17 @@ func (m *Monitor) removeChannel(channel channelsv1alpha1.Channel) error { summary.Channel = nil m.mutex.Unlock() - return m.handler.UnprovisionFunc(channel) + if m.isChannelForBus(channel) { + return m.handler.UnprovisionFunc(channel) + } + + return nil +} + +func (m *Monitor) isSubscriptionForBus(subscription channelsv1alpha1.Subscription) bool { + channelKey := makeChannelKeyFromSubscription(subscription) + summary := m.getChannelSummary(channelKey) + return summary != nil && summary.Channel.Bus == m.busName } func (m *Monitor) createOrUpdateSubscription(subscription channelsv1alpha1.Subscription) error { @@ -300,7 +314,7 @@ func (m *Monitor) createOrUpdateSubscription(subscription channelsv1alpha1.Subsc summary.Subscriptions[subscriptionKey] = new m.mutex.Unlock() - if !reflect.DeepEqual(old.Subscription, new.Subscription) { + if m.isSubscriptionForBus(subscription) && !reflect.DeepEqual(old.Subscription, new.Subscription) { return m.handler.SubscribeFunc(subscription) } @@ -316,7 +330,11 @@ func (m *Monitor) removeSubscription(subscription channelsv1alpha1.Subscription) delete(summary.Subscriptions, subscriptionKey) m.mutex.Unlock() - return m.handler.UnsubscribeFunc(subscription) + if m.isSubscriptionForBus(subscription) { + return m.handler.UnsubscribeFunc(subscription) + } + + return nil } type channelKey struct { From a3eb63f963c606d1cc954f5a4dab9f6a59d9d7a2 Mon Sep 17 00:00:00 2001 From: Scott Andrews Date: Tue, 12 Jun 2018 20:44:23 -0400 Subject: [PATCH 03/19] Requeue failed updates --- pkg/buses/monitor.go | 403 +++++++++++++++++++++++++++++++++++++---- pkg/buses/stub/main.go | 10 + 2 files changed, 380 insertions(+), 33 deletions(-) diff --git a/pkg/buses/monitor.go b/pkg/buses/monitor.go index 151a6388365..afabe062918 100644 --- a/pkg/buses/monitor.go +++ b/pkg/buses/monitor.go @@ -17,33 +17,91 @@ package buses import ( + "fmt" "reflect" + "strings" "sync" + "time" "github.com/golang/glog" channelsv1alpha1 "github.com/knative/eventing/pkg/apis/channels/v1alpha1" informers "github.com/knative/eventing/pkg/client/informers/externalversions" + listers "github.com/knative/eventing/pkg/client/listers/channels/v1alpha1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" +) + +const ( + busKind = "Bus" + channelKind = "Channel" + subscriptionKind = "Subscription" ) // Monitor utility to manage channels and subscriptions for a bus type Monitor struct { - busName string - handler MonitorEventHandlerFuncs - - bus *channelsv1alpha1.BusSpec - cache map[channelKey]*channelSummary - mutex *sync.Mutex + busName string + handler MonitorEventHandlerFuncs + bus *channelsv1alpha1.BusSpec + busesLister listers.BusLister + busesSynced cache.InformerSynced + channelsLister listers.ChannelLister + channelsSynced cache.InformerSynced + subscriptionsLister listers.SubscriptionLister + subscriptionsSynced cache.InformerSynced + workqueue workqueue.RateLimitingInterface + cache map[channelKey]*channelSummary + provisionedChannels map[resourceKey]channelsv1alpha1.Channel + provisionedSubscriptions map[resourceKey]channelsv1alpha1.Subscription + mutex *sync.Mutex } // MonitorEventHandlerFuncs handler functions for channel and subscription provisioning type MonitorEventHandlerFuncs struct { + BusFunc func(bus channelsv1alpha1.Bus) error ProvisionFunc func(channel channelsv1alpha1.Channel) error UnprovisionFunc func(channel channelsv1alpha1.Channel) error SubscribeFunc func(subscription channelsv1alpha1.Subscription) error UnsubscribeFunc func(subscription channelsv1alpha1.Subscription) error } +func (h MonitorEventHandlerFuncs) onBus(bus channelsv1alpha1.Bus) error { + if h.BusFunc != nil { + return h.BusFunc(bus) + } + return nil +} + +func (h MonitorEventHandlerFuncs) onProvision(channel channelsv1alpha1.Channel) error { + if h.ProvisionFunc != nil { + return h.ProvisionFunc(channel) + } + return nil +} + +func (h MonitorEventHandlerFuncs) onUnprovision(channel channelsv1alpha1.Channel) error { + if h.UnprovisionFunc != nil { + return h.UnprovisionFunc(channel) + } + return nil +} + +func (h MonitorEventHandlerFuncs) onSubscribe(subscription channelsv1alpha1.Subscription) error { + if h.SubscribeFunc != nil { + return h.SubscribeFunc(subscription) + } + return nil +} + +func (h MonitorEventHandlerFuncs) onUnsubscribe(subscription channelsv1alpha1.Subscription) error { + if h.UnsubscribeFunc != nil { + return h.UnsubscribeFunc(subscription) + } + return nil +} + type channelSummary struct { Channel *channelsv1alpha1.ChannelSpec Subscriptions map[subscriptionKey]subscriptionSummary @@ -64,8 +122,17 @@ func NewMonitor(busName string, informerFactory informers.SharedInformerFactory, busName: busName, handler: handler, - bus: nil, - cache: make(map[channelKey]*channelSummary), + bus: nil, + busesLister: busInformer.Lister(), + busesSynced: busInformer.Informer().HasSynced, + channelsLister: channelInformer.Lister(), + channelsSynced: channelInformer.Informer().HasSynced, + subscriptionsLister: subscriptionInformer.Lister(), + subscriptionsSynced: subscriptionInformer.Informer().HasSynced, + workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Monitor"), + cache: make(map[channelKey]*channelSummary), + provisionedChannels: make(map[resourceKey]channelsv1alpha1.Channel), + provisionedSubscriptions: make(map[resourceKey]channelsv1alpha1.Subscription), mutex: &sync.Mutex{}, } @@ -74,7 +141,7 @@ func NewMonitor(busName string, informerFactory informers.SharedInformerFactory, busInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { bus := obj.(*channelsv1alpha1.Bus) - monitor.createOrUpdateBus(*bus) + monitor.workqueue.AddRateLimited(makeWorkqueueKeyForBus(*bus)) }, UpdateFunc: func(old, new interface{}) { oldBus := old.(*channelsv1alpha1.Bus) @@ -86,14 +153,14 @@ func NewMonitor(busName string, informerFactory informers.SharedInformerFactory, return } - monitor.createOrUpdateBus(*newBus) + monitor.workqueue.AddRateLimited(makeWorkqueueKeyForBus(*newBus)) }, }) // Set up an event handler for when Channel resources change channelInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { channel := obj.(*channelsv1alpha1.Channel) - monitor.createOrUpdateChannel(*channel) + monitor.workqueue.AddRateLimited(makeWorkqueueKeyForChannel(*channel)) }, UpdateFunc: func(old, new interface{}) { oldChannel := old.(*channelsv1alpha1.Channel) @@ -105,21 +172,18 @@ func NewMonitor(busName string, informerFactory informers.SharedInformerFactory, return } - monitor.createOrUpdateChannel(*newChannel) - if oldChannel.Spec.Bus != newChannel.Spec.Bus { - monitor.removeChannel(*oldChannel) - } + monitor.workqueue.AddRateLimited(makeWorkqueueKeyForChannel(*newChannel)) }, DeleteFunc: func(obj interface{}) { channel := obj.(*channelsv1alpha1.Channel) - monitor.removeChannel(*channel) + monitor.workqueue.AddRateLimited(makeWorkqueueKeyForChannel(*channel)) }, }) // Set up an event handler for when Subscription resources change subscriptionInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { subscription := obj.(*channelsv1alpha1.Subscription) - monitor.createOrUpdateSubscription(*subscription) + monitor.workqueue.AddRateLimited(makeWorkqueueKeyForSubscription(*subscription)) }, UpdateFunc: func(old, new interface{}) { oldSubscription := old.(*channelsv1alpha1.Subscription) @@ -131,14 +195,11 @@ func NewMonitor(busName string, informerFactory informers.SharedInformerFactory, return } - monitor.createOrUpdateSubscription(*newSubscription) - if oldSubscription.Spec.Channel != newSubscription.Spec.Channel { - monitor.removeSubscription(*oldSubscription) - } + monitor.workqueue.AddRateLimited(makeWorkqueueKeyForSubscription(*newSubscription)) }, DeleteFunc: func(obj interface{}) { subscription := obj.(*channelsv1alpha1.Subscription) - monitor.removeSubscription(*subscription) + monitor.workqueue.AddRateLimited(makeWorkqueueKeyForSubscription(*subscription)) }, }) @@ -230,6 +291,193 @@ func (m *Monitor) SubscriptionParams( return params } +// Run will set up the event handlers for types we are interested in, as well +// as syncing informer caches and starting workers. It will block until stopCh +// is closed, at which point it will shutdown the workqueue and wait for +// workers to finish processing their current work items. +func (m *Monitor) Run(threadiness int, stopCh <-chan struct{}) error { + defer runtime.HandleCrash() + defer m.workqueue.ShutDown() + + // Start the informer factories to begin populating the informer caches + glog.Info("Starting Bus controller") + + // Wait for the caches to be synced before starting workers + glog.Info("Waiting for informer caches to sync") + if ok := cache.WaitForCacheSync(stopCh, m.busesSynced, m.channelsSynced, m.subscriptionsSynced); !ok { + return fmt.Errorf("failed to wait for caches to sync") + } + + glog.Info("Starting workers") + // Launch two workers to process Bus resources + for i := 0; i < threadiness; i++ { + go wait.Until(m.runWorker, time.Second, stopCh) + } + + glog.Info("Started workers") + <-stopCh + glog.Info("Shutting down workers") + + return nil +} + +// runWorker is a long-running function that will continually call the +// processNextWorkItem function in order to read and process a message on the +// workqueue. +func (m *Monitor) runWorker() { + for m.processNextWorkItem() { + } +} + +// processNextWorkItem will read a single work item off the workqueue and +// attempt to process it, by calling the syncHandler. +func (m *Monitor) processNextWorkItem() bool { + obj, shutdown := m.workqueue.Get() + + if shutdown { + return false + } + + // We wrap this block in a func so we can defer m.workqueue.Done. + err := func(obj interface{}) error { + // We call Done here so the workqueue knows we have finished + // processing this item. We also must remember to call Forget if we + // do not want this work item being re-queued. For example, we do + // not call Forget if a transient error occurs, instead the item is + // put back on the workqueue and attempted again after a back-off + // period. + defer m.workqueue.Done(obj) + var key string + var ok bool + // We expect strings to come off the workqueue. These are of the + // form namespace/name. We do this as the delayed nature of the + // workqueue means the items in the informer cache may actually be + // more up to date that when the item was initially put onto the + // workqueue. + if key, ok = obj.(string); !ok { + // As the item in the workqueue is actually invalid, we call + // Forget here else we'd go into a loop of attempting to + // process a work item that is invalid. + m.workqueue.Forget(obj) + runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) + return nil + } + // Run the syncHandler, passing it the namespace/name string of the + // Bus resource to be synced. + if err := m.syncHandler(key); err != nil { + return fmt.Errorf("error syncing monitor '%s': %s", key, err.Error()) + } + // Finally, if no error occurs we Forget this item so it does not + // get queued again until another change happens. + m.workqueue.Forget(obj) + glog.Infof("Successfully synced monitor '%s'", key) + return nil + }(obj) + + if err != nil { + runtime.HandleError(err) + return true + } + + return true +} + +// syncHandler compares the actual state with the desired, and attempts to +// converge the two. It then updates the Status block of the Bus resource +// with the current status of the resource. +func (m *Monitor) syncHandler(key string) error { + // Convert the namespace/name string into a distinct namespace and name + kind, namespace, name, err := splitWorkqueueKey(key) + if err != nil { + runtime.HandleError(fmt.Errorf("invalid resource key: %s", key)) + return nil + } + + switch kind { + case busKind: + err = m.syncBus(namespace, name) + case channelKind: + err = m.syncChannel(namespace, name) + case subscriptionKind: + err = m.syncSubscription(namespace, name) + default: + runtime.HandleError(fmt.Errorf("Unknown resource kind %s", kind)) + return nil + } + + if err != nil { + return err + } + + return nil +} + +func (m *Monitor) syncBus(namespace string, name string) error { + // Get the Bus resource with this namespace/name + bus, err := m.busesLister.Buses(namespace).Get(name) + if err != nil { + // The Bus resource may no longer exist + if errors.IsNotFound(err) { + m.removeBus(namespace, name) + return nil + } + + return err + } + + // Sync the Bus + err = m.createOrUpdateBus(*bus) + if err != nil { + return err + } + + return nil +} + +func (m *Monitor) syncChannel(namespace string, name string) error { + // Get the Channel resource with this namespace/name + channel, err := m.channelsLister.Channels(namespace).Get(name) + if err != nil { + // The Channel resource may no longer exist + if errors.IsNotFound(err) { + m.removeChannel(namespace, name) + return nil + } + + return err + } + + // Sync the Channel + err = m.createOrUpdateChannel(*channel) + if err != nil { + return err + } + + return nil +} + +func (m *Monitor) syncSubscription(namespace string, name string) error { + // Get the Subscription resource with this namespace/name + subscription, err := m.subscriptionsLister.Subscriptions(namespace).Get(name) + if err != nil { + // The Subscription resource may no longer exist + if errors.IsNotFound(err) { + m.removeSubscription(namespace, name) + return nil + } + + return err + } + + // Sync the Subscription + err = m.createOrUpdateSubscription(*subscription) + if err != nil { + return err + } + + return nil +} + func (m *Monitor) getChannelSummary(key channelKey) *channelSummary { return m.cache[key] } @@ -249,14 +497,26 @@ func (m *Monitor) getOrCreateChannelSummary(key channelKey) *channelSummary { return summary } -func (m *Monitor) createOrUpdateBus(bus channelsv1alpha1.Bus) { +func (m *Monitor) createOrUpdateBus(bus channelsv1alpha1.Bus) error { if bus.Name != m.busName { // this is not our bus - return + return nil } + if !reflect.DeepEqual(m.bus, bus.Spec) { m.bus = &bus.Spec + err := m.handler.onBus(bus) + if err != nil { + return err + } } + + return nil +} + +func (m *Monitor) removeBus(namespace string, name string) error { + // nothing to do + return nil } func (m *Monitor) isChannelForBus(channel channelsv1alpha1.Channel) bool { @@ -264,6 +524,7 @@ func (m *Monitor) isChannelForBus(channel channelsv1alpha1.Channel) bool { } func (m *Monitor) createOrUpdateChannel(channel channelsv1alpha1.Channel) error { + resourceKey := makeResourceKey(channelKind, channel.Namespace, channel.Name) channelKey := makeChannelKeyFromChannel(channel) summary := m.getOrCreateChannelSummary(channelKey) @@ -274,13 +535,23 @@ func (m *Monitor) createOrUpdateChannel(channel channelsv1alpha1.Channel) error m.mutex.Unlock() if m.isChannelForBus(channel) && !reflect.DeepEqual(old, new) { - return m.handler.ProvisionFunc(channel) + err := m.handler.onProvision(channel) + if err != nil { + return err + } + m.provisionedChannels[resourceKey] = channel } return nil } -func (m *Monitor) removeChannel(channel channelsv1alpha1.Channel) error { +func (m *Monitor) removeChannel(namespace string, name string) error { + resourceKey := makeResourceKey(channelKind, namespace, name) + channel, ok := m.provisionedChannels[resourceKey] + if !ok { + return nil + } + channelKey := makeChannelKeyFromChannel(channel) summary := m.getOrCreateChannelSummary(channelKey) @@ -288,20 +559,29 @@ func (m *Monitor) removeChannel(channel channelsv1alpha1.Channel) error { summary.Channel = nil m.mutex.Unlock() - if m.isChannelForBus(channel) { - return m.handler.UnprovisionFunc(channel) + err := m.handler.onUnprovision(channel) + if err != nil { + return err } + delete(m.provisionedChannels, resourceKey) return nil } +func (m *Monitor) isChannelKnown(subscription channelsv1alpha1.Subscription) bool { + channelKey := makeChannelKeyFromSubscription(subscription) + summary := m.getChannelSummary(channelKey) + return summary != nil && summary.Channel != nil +} + func (m *Monitor) isSubscriptionForBus(subscription channelsv1alpha1.Subscription) bool { channelKey := makeChannelKeyFromSubscription(subscription) summary := m.getChannelSummary(channelKey) - return summary != nil && summary.Channel.Bus == m.busName + return summary != nil && summary.Channel != nil && summary.Channel.Bus == m.busName } func (m *Monitor) createOrUpdateSubscription(subscription channelsv1alpha1.Subscription) error { + resourceKey := makeResourceKey(subscriptionKind, subscription.Namespace, subscription.Name) channelKey := makeChannelKeyFromSubscription(subscription) summary := m.getOrCreateChannelSummary(channelKey) subscriptionKey := makeSubscriptionKeyFromSubscription(subscription) @@ -314,14 +594,28 @@ func (m *Monitor) createOrUpdateSubscription(subscription channelsv1alpha1.Subsc summary.Subscriptions[subscriptionKey] = new m.mutex.Unlock() + if !m.isChannelKnown(subscription) { + return fmt.Errorf("Unknown channel %q for subscription", subscription.Spec.Channel) + } + if m.isSubscriptionForBus(subscription) && !reflect.DeepEqual(old.Subscription, new.Subscription) { - return m.handler.SubscribeFunc(subscription) + err := m.handler.onSubscribe(subscription) + if err != nil { + return err + } + m.provisionedSubscriptions[resourceKey] = subscription } return nil } -func (m *Monitor) removeSubscription(subscription channelsv1alpha1.Subscription) error { +func (m *Monitor) removeSubscription(namespace string, name string) error { + resourceKey := makeResourceKey(subscriptionKind, namespace, name) + subscription, ok := m.provisionedSubscriptions[resourceKey] + if !ok { + return nil + } + channelKey := makeChannelKeyFromSubscription(subscription) summary := m.getOrCreateChannelSummary(channelKey) subscriptionKey := makeSubscriptionKeyFromSubscription(subscription) @@ -330,13 +624,29 @@ func (m *Monitor) removeSubscription(subscription channelsv1alpha1.Subscription) delete(summary.Subscriptions, subscriptionKey) m.mutex.Unlock() - if m.isSubscriptionForBus(subscription) { - return m.handler.UnsubscribeFunc(subscription) + err := m.handler.onUnsubscribe(subscription) + if err != nil { + return err } + delete(m.provisionedSubscriptions, resourceKey) return nil } +type resourceKey struct { + Kind string + Namespace string + Name string +} + +func makeResourceKey(kind string, namespace string, name string) resourceKey { + return resourceKey{ + Kind: kind, + Namespace: namespace, + Name: name, + } +} + type channelKey struct { Name string Namespace string @@ -368,3 +678,30 @@ func makeSubscriptionKeyFromSubscription(subscription channelsv1alpha1.Subscript Namespace: subscription.Namespace, } } + +func makeWorkqueueKeyForBus(bus channelsv1alpha1.Bus) string { + return makeWorkqueueKey(busKind, bus.Namespace, bus.Name) +} + +func makeWorkqueueKeyForChannel(channel channelsv1alpha1.Channel) string { + return makeWorkqueueKey(channelKind, channel.Namespace, channel.Name) +} + +func makeWorkqueueKeyForSubscription(subscription channelsv1alpha1.Subscription) string { + return makeWorkqueueKey(subscriptionKind, subscription.Namespace, subscription.Name) +} + +func makeWorkqueueKey(kind string, namespace string, name string) string { + return fmt.Sprintf("%s/%s/%s", kind, namespace, name) +} + +func splitWorkqueueKey(key string) (string, string, string, error) { + chunks := strings.Split(key, "/") + if len(chunks) != 3 { + return "", "", "", fmt.Errorf("Unknown workqueue key %v", key) + } + kind := chunks[0] + namespace := chunks[1] + name := chunks[2] + return kind, namespace, name, nil +} diff --git a/pkg/buses/stub/main.go b/pkg/buses/stub/main.go index cf66be8016f..81a678e6650 100644 --- a/pkg/buses/stub/main.go +++ b/pkg/buses/stub/main.go @@ -35,6 +35,10 @@ import ( "k8s.io/client-go/tools/clientcmd" ) +const ( + threadsPerMonitor = 1 +) + var ( masterURL string kubeconfig string @@ -174,7 +178,13 @@ func main() { bus := NewStubBus(name, monitor) go informerFactory.Start(stopCh) + go func() { + if err := monitor.Run(threadsPerMonitor, stopCh); err != nil { + glog.Fatalf("Error running monitor: %s", err.Error()) + } + }() + glog.Infoln("Starting web server") http.HandleFunc("/", bus.handleEvent) glog.Fatal(http.ListenAndServe(":8080", nil)) From 72d39ae7caaf5e7aec9b58ba7dfef256b6823cd8 Mon Sep 17 00:00:00 2001 From: Scott Andrews Date: Wed, 13 Jun 2018 11:44:23 -0400 Subject: [PATCH 04/19] Return full Channel/Subscription not *Spec --- pkg/buses/monitor.go | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/pkg/buses/monitor.go b/pkg/buses/monitor.go index afabe062918..9573f7552b7 100644 --- a/pkg/buses/monitor.go +++ b/pkg/buses/monitor.go @@ -207,14 +207,21 @@ func NewMonitor(busName string, informerFactory informers.SharedInformerFactory, } // Channel for a channel name and namespace -func (m *Monitor) Channel(channel string, namespace string) *channelsv1alpha1.ChannelSpec { - channelKey := makeChannelKeyWithNames(channel, namespace) - summary := m.getChannelSummary(channelKey) +func (m *Monitor) Channel(name string, namespace string) *channelsv1alpha1.Channel { + resourceKey := makeResourceKey(channelKind, namespace, name) + if channel, ok := m.provisionedChannels[resourceKey]; ok { + return &channel + } + return nil +} - if summary == nil { - return nil +// Subscription for a subscription name and namespace +func (m *Monitor) Subscription(name string, namespace string) *channelsv1alpha1.Subscription { + resourceKey := makeResourceKey(subscriptionKind, namespace, name) + if subscription, ok := m.provisionedSubscriptions[resourceKey]; ok { + return &subscription } - return summary.Channel + return nil } // Subscriptions for a channel name and namespace From 62bd4e8dc5235b71db54477a6020a183725ce99c Mon Sep 17 00:00:00 2001 From: Scott Andrews Date: Wed, 13 Jun 2018 13:24:05 -0400 Subject: [PATCH 05/19] Bus arguments exposed as envvars --- Gopkg.lock | 2 +- pkg/apis/channels/v1alpha1/bus_types.go | 5 ++++- .../v1alpha1/zz_generated.deepcopy.go | 13 ++++++++++++ pkg/controller/bus/controller.go | 20 +++++++++++++++++++ 4 files changed, 38 insertions(+), 2 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 438b4dafc9e..9763ee40cef 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -861,6 +861,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "3cc97f3e7505f0e537f526b3f11472a99f515438f217963fd1d73894dc44f669" + inputs-digest = "7bf61109495259a8320f9520372403eba48314bd7354ac1fa204c238be9e3476" solver-name = "gps-cdcl" solver-version = 1 diff --git a/pkg/apis/channels/v1alpha1/bus_types.go b/pkg/apis/channels/v1alpha1/bus_types.go index de9635baa7f..892db901b6a 100644 --- a/pkg/apis/channels/v1alpha1/bus_types.go +++ b/pkg/apis/channels/v1alpha1/bus_types.go @@ -37,7 +37,10 @@ type Bus struct { // BusSpec (what the user wants) for a bus type BusSpec struct { - // Parameters configuration params for the bus + // Arguments to configure the bus + Arguments *[]Argument `json:"arguments,omitempty"` + + // Parameters configuration params for channels on the bus Parameters *[]Parameter `json:"parameters,omitempty"` // Provisioner container definition to manage channels on the bus. diff --git a/pkg/apis/channels/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/channels/v1alpha1/zz_generated.deepcopy.go index 33aac9e1ee5..4f905113020 100644 --- a/pkg/apis/channels/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/channels/v1alpha1/zz_generated.deepcopy.go @@ -113,6 +113,19 @@ func (in *BusList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *BusSpec) DeepCopyInto(out *BusSpec) { *out = *in + if in.Arguments != nil { + in, out := &in.Arguments, &out.Arguments + if *in == nil { + *out = nil + } else { + *out = new([]Argument) + if **in != nil { + in, out := *in, *out + *out = make([]Argument, len(*in)) + copy(*out, *in) + } + } + } if in.Parameters != nil { in, out := &in.Parameters, &out.Parameters if *in == nil { diff --git a/pkg/controller/bus/controller.go b/pkg/controller/bus/controller.go index 823d969f1e3..7fa9e1c8314 100644 --- a/pkg/controller/bus/controller.go +++ b/pkg/controller/bus/controller.go @@ -643,6 +643,16 @@ func newDispatcherDeployment(bus *channelsv1alpha1.Bus) *appsv1.Deployment { Value: bus.Name, }, ) + if bus.Spec.Arguments != nil { + for _, arg := range *bus.Spec.Arguments { + container.Env = append(container.Env, + corev1.EnvVar{ + Name: arg.Name, + Value: arg.Value, + }, + ) + } + } return &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Name: controller.BusDispatcherDeploymentName(bus.ObjectMeta.Name), @@ -741,6 +751,16 @@ func newProvisionerDeployment(bus *channelsv1alpha1.Bus) *appsv1.Deployment { Value: bus.Name, }, ) + if bus.Spec.Arguments != nil { + for _, arg := range *bus.Spec.Arguments { + container.Env = append(container.Env, + corev1.EnvVar{ + Name: arg.Name, + Value: arg.Value, + }, + ) + } + } return &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Name: controller.BusProvisionerDeploymentName(bus.ObjectMeta.Name), From baeb5412f06983d91e73311441fcc9eb8c33cbda Mon Sep 17 00:00:00 2001 From: Scott Andrews Date: Thu, 14 Jun 2018 13:17:48 -0400 Subject: [PATCH 06/19] Add volumes to bus --- pkg/apis/channels/v1alpha1/bus_types.go | 3 +++ .../channels/v1alpha1/zz_generated.deepcopy.go | 15 +++++++++++++++ pkg/controller/bus/controller.go | 10 ++++++++++ 3 files changed, 28 insertions(+) diff --git a/pkg/apis/channels/v1alpha1/bus_types.go b/pkg/apis/channels/v1alpha1/bus_types.go index 892db901b6a..7e479af0670 100644 --- a/pkg/apis/channels/v1alpha1/bus_types.go +++ b/pkg/apis/channels/v1alpha1/bus_types.go @@ -48,6 +48,9 @@ type BusSpec struct { // Dispatcher container definition to use for the bus data plane. Dispatcher kapi.Container `json:"dispatcher"` + + // Volumes to be mounted inside the provisioner or dispatcher containers + Volumes *[]kapi.Volume `json:"volumes,omitempty"` } // BusStatus (computed) for a bus diff --git a/pkg/apis/channels/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/channels/v1alpha1/zz_generated.deepcopy.go index 4f905113020..0958bda3639 100644 --- a/pkg/apis/channels/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/channels/v1alpha1/zz_generated.deepcopy.go @@ -151,6 +151,21 @@ func (in *BusSpec) DeepCopyInto(out *BusSpec) { } } in.Dispatcher.DeepCopyInto(&out.Dispatcher) + if in.Volumes != nil { + in, out := &in.Volumes, &out.Volumes + if *in == nil { + *out = nil + } else { + *out = new([]v1.Volume) + if **in != nil { + in, out := *in, *out + *out = make([]v1.Volume, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + } + } return } diff --git a/pkg/controller/bus/controller.go b/pkg/controller/bus/controller.go index 7fa9e1c8314..e07888318e7 100644 --- a/pkg/controller/bus/controller.go +++ b/pkg/controller/bus/controller.go @@ -653,6 +653,10 @@ func newDispatcherDeployment(bus *channelsv1alpha1.Bus) *appsv1.Deployment { ) } } + volumes := []corev1.Volume{} + if bus.Spec.Volumes != nil { + volumes = *bus.Spec.Volumes + } return &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Name: controller.BusDispatcherDeploymentName(bus.ObjectMeta.Name), @@ -679,6 +683,7 @@ func newDispatcherDeployment(bus *channelsv1alpha1.Bus) *appsv1.Deployment { Containers: []corev1.Container{ *container, }, + Volumes: volumes, }, }, }, @@ -761,6 +766,10 @@ func newProvisionerDeployment(bus *channelsv1alpha1.Bus) *appsv1.Deployment { ) } } + volumes := []corev1.Volume{} + if bus.Spec.Volumes != nil { + volumes = *bus.Spec.Volumes + } return &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Name: controller.BusProvisionerDeploymentName(bus.ObjectMeta.Name), @@ -787,6 +796,7 @@ func newProvisionerDeployment(bus *channelsv1alpha1.Bus) *appsv1.Deployment { Containers: []corev1.Container{ *container, }, + Volumes: volumes, }, }, }, From cef2d3b64f755eb0a8ac88e6288cc67c7a3bf4ac Mon Sep 17 00:00:00 2001 From: Scott Andrews Date: Fri, 15 Jun 2018 00:51:30 -0400 Subject: [PATCH 07/19] Cleanup race conditions --- pkg/buses/monitor.go | 34 +++++++++++++++++++++++++++++----- 1 file changed, 29 insertions(+), 5 deletions(-) diff --git a/pkg/buses/monitor.go b/pkg/buses/monitor.go index 9573f7552b7..2c6ebdcde35 100644 --- a/pkg/buses/monitor.go +++ b/pkg/buses/monitor.go @@ -298,6 +298,11 @@ func (m *Monitor) SubscriptionParams( return params } +func (m *Monitor) RequeueSubscription(subscription channelsv1alpha1.Subscription) { + glog.Infof("Requeue subscription %q\n", subscription.Name) + m.workqueue.AddRateLimited(makeWorkqueueKeyForSubscription(subscription)) +} + // Run will set up the event handlers for types we are interested in, as well // as syncing informer caches and starting workers. It will block until stopCh // is closed, at which point it will shutdown the workqueue and wait for @@ -307,7 +312,7 @@ func (m *Monitor) Run(threadiness int, stopCh <-chan struct{}) error { defer m.workqueue.ShutDown() // Start the informer factories to begin populating the informer caches - glog.Info("Starting Bus controller") + glog.Info("Starting monitor") // Wait for the caches to be synced before starting workers glog.Info("Waiting for informer caches to sync") @@ -372,6 +377,7 @@ func (m *Monitor) processNextWorkItem() bool { // Run the syncHandler, passing it the namespace/name string of the // Bus resource to be synced. if err := m.syncHandler(key); err != nil { + m.workqueue.AddRateLimited(obj) return fmt.Errorf("error syncing monitor '%s': %s", key, err.Error()) } // Finally, if no error occurs we Forget this item so it does not @@ -425,7 +431,10 @@ func (m *Monitor) syncBus(namespace string, name string) error { if err != nil { // The Bus resource may no longer exist if errors.IsNotFound(err) { - m.removeBus(namespace, name) + err = m.removeBus(namespace, name) + if err != nil { + return err + } return nil } @@ -447,7 +456,10 @@ func (m *Monitor) syncChannel(namespace string, name string) error { if err != nil { // The Channel resource may no longer exist if errors.IsNotFound(err) { - m.removeChannel(namespace, name) + err = m.removeChannel(namespace, name) + if err != nil { + return err + } return nil } @@ -469,7 +481,10 @@ func (m *Monitor) syncSubscription(namespace string, name string) error { if err != nil { // The Subscription resource may no longer exist if errors.IsNotFound(err) { - m.removeSubscription(namespace, name) + err = m.removeSubscription(namespace, name) + if err != nil { + return err + } return nil } @@ -581,6 +596,12 @@ func (m *Monitor) isChannelKnown(subscription channelsv1alpha1.Subscription) boo return summary != nil && summary.Channel != nil } +func (m *Monitor) isSubscriptionProvisioned(subscription channelsv1alpha1.Subscription) bool { + resourceKey := makeResourceKey(subscriptionKind, subscription.Namespace, subscription.Name) + _, ok := m.provisionedSubscriptions[resourceKey] + return ok +} + func (m *Monitor) isSubscriptionForBus(subscription channelsv1alpha1.Subscription) bool { channelKey := makeChannelKeyFromSubscription(subscription) summary := m.getChannelSummary(channelKey) @@ -604,8 +625,11 @@ func (m *Monitor) createOrUpdateSubscription(subscription channelsv1alpha1.Subsc if !m.isChannelKnown(subscription) { return fmt.Errorf("Unknown channel %q for subscription", subscription.Spec.Channel) } + if !m.isSubscriptionForBus(subscription) { + return nil + } - if m.isSubscriptionForBus(subscription) && !reflect.DeepEqual(old.Subscription, new.Subscription) { + if !m.isSubscriptionProvisioned(subscription) || !reflect.DeepEqual(old.Subscription, new.Subscription) { err := m.handler.onSubscribe(subscription) if err != nil { return err From 612a8e863f6a416153ce28c20b0e1c7836170fc1 Mon Sep 17 00:00:00 2001 From: Scott Andrews Date: Fri, 15 Jun 2018 10:35:46 -0400 Subject: [PATCH 08/19] Split hello sample channel and subscription into separate resources --- sample/hello/hello-channel.yaml | 10 +--------- sample/hello/hello-subscription.yaml | 7 +++++++ 2 files changed, 8 insertions(+), 9 deletions(-) create mode 100644 sample/hello/hello-subscription.yaml diff --git a/sample/hello/hello-channel.yaml b/sample/hello/hello-channel.yaml index 482f12da2de..7791e7168ed 100644 --- a/sample/hello/hello-channel.yaml +++ b/sample/hello/hello-channel.yaml @@ -4,12 +4,4 @@ metadata: name: aloha spec: bus: stub - ---- -apiVersion: channels.knative.dev/v1alpha1 -kind: Subscription -metadata: - name: aloha2hello -spec: - channel: aloha - subscriber: hello-00001-service + \ No newline at end of file diff --git a/sample/hello/hello-subscription.yaml b/sample/hello/hello-subscription.yaml new file mode 100644 index 00000000000..ed2cf6338cf --- /dev/null +++ b/sample/hello/hello-subscription.yaml @@ -0,0 +1,7 @@ +apiVersion: channels.knative.dev/v1alpha1 +kind: Subscription +metadata: + name: aloha2hello +spec: + channel: aloha + subscriber: hello-00001-service From 11ef741e410c97c27943b5a73bb89f2557bb0247 Mon Sep 17 00:00:00 2001 From: Scott Andrews Date: Fri, 15 Jun 2018 10:43:15 -0400 Subject: [PATCH 09/19] Revert bus arguments, use envvar instead --- pkg/apis/channels/v1alpha1/bus_types.go | 3 --- .../v1alpha1/zz_generated.deepcopy.go | 13 ------------ pkg/controller/bus/controller.go | 20 ------------------- 3 files changed, 36 deletions(-) diff --git a/pkg/apis/channels/v1alpha1/bus_types.go b/pkg/apis/channels/v1alpha1/bus_types.go index 7e479af0670..75e672eb76b 100644 --- a/pkg/apis/channels/v1alpha1/bus_types.go +++ b/pkg/apis/channels/v1alpha1/bus_types.go @@ -37,9 +37,6 @@ type Bus struct { // BusSpec (what the user wants) for a bus type BusSpec struct { - // Arguments to configure the bus - Arguments *[]Argument `json:"arguments,omitempty"` - // Parameters configuration params for channels on the bus Parameters *[]Parameter `json:"parameters,omitempty"` diff --git a/pkg/apis/channels/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/channels/v1alpha1/zz_generated.deepcopy.go index 0958bda3639..468851ee3c4 100644 --- a/pkg/apis/channels/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/channels/v1alpha1/zz_generated.deepcopy.go @@ -113,19 +113,6 @@ func (in *BusList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *BusSpec) DeepCopyInto(out *BusSpec) { *out = *in - if in.Arguments != nil { - in, out := &in.Arguments, &out.Arguments - if *in == nil { - *out = nil - } else { - *out = new([]Argument) - if **in != nil { - in, out := *in, *out - *out = make([]Argument, len(*in)) - copy(*out, *in) - } - } - } if in.Parameters != nil { in, out := &in.Parameters, &out.Parameters if *in == nil { diff --git a/pkg/controller/bus/controller.go b/pkg/controller/bus/controller.go index e07888318e7..ff5113a716f 100644 --- a/pkg/controller/bus/controller.go +++ b/pkg/controller/bus/controller.go @@ -643,16 +643,6 @@ func newDispatcherDeployment(bus *channelsv1alpha1.Bus) *appsv1.Deployment { Value: bus.Name, }, ) - if bus.Spec.Arguments != nil { - for _, arg := range *bus.Spec.Arguments { - container.Env = append(container.Env, - corev1.EnvVar{ - Name: arg.Name, - Value: arg.Value, - }, - ) - } - } volumes := []corev1.Volume{} if bus.Spec.Volumes != nil { volumes = *bus.Spec.Volumes @@ -756,16 +746,6 @@ func newProvisionerDeployment(bus *channelsv1alpha1.Bus) *appsv1.Deployment { Value: bus.Name, }, ) - if bus.Spec.Arguments != nil { - for _, arg := range *bus.Spec.Arguments { - container.Env = append(container.Env, - corev1.EnvVar{ - Name: arg.Name, - Value: arg.Value, - }, - ) - } - } volumes := []corev1.Volume{} if bus.Spec.Volumes != nil { volumes = *bus.Spec.Volumes From 9f261d1aae360a8f0fb9466540f31f961be1b56d Mon Sep 17 00:00:00 2001 From: Scott Andrews Date: Fri, 15 Jun 2018 10:48:01 -0400 Subject: [PATCH 10/19] Remove length check --- pkg/buses/monitor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/buses/monitor.go b/pkg/buses/monitor.go index 2c6ebdcde35..191b56458cf 100644 --- a/pkg/buses/monitor.go +++ b/pkg/buses/monitor.go @@ -240,7 +240,7 @@ func (m *Monitor) Subscriptions(channel string, namespace string) *[]channelsv1a } m.mutex.Lock() - subscriptions := make([]channelsv1alpha1.SubscriptionSpec, len(summary.Subscriptions)-1) + subscriptions := []channelsv1alpha1.SubscriptionSpec{} for _, subscription := range summary.Subscriptions { subscriptions = append(subscriptions, subscription.Subscription) } From a7449c1e4b23e5492ff804c72c06032480f091fc Mon Sep 17 00:00:00 2001 From: Scott Andrews Date: Fri, 15 Jun 2018 11:20:54 -0400 Subject: [PATCH 11/19] Cleanup Argument/Parameter relationship --- pkg/buses/monitor.go | 73 +++++++++++++++++++++++--------------------- 1 file changed, 39 insertions(+), 34 deletions(-) diff --git a/pkg/buses/monitor.go b/pkg/buses/monitor.go index 191b56458cf..965b5215bdd 100644 --- a/pkg/buses/monitor.go +++ b/pkg/buses/monitor.go @@ -250,52 +250,57 @@ func (m *Monitor) Subscriptions(channel string, namespace string) *[]channelsv1a } // ChannelParams resolve parameters for a channel -func (m *Monitor) ChannelParams(channel channelsv1alpha1.ChannelSpec) map[string]string { - params := make(map[string]string) - - // apply bus defaults - if m.bus.Parameters != nil { - for _, param := range *m.bus.Parameters { - if param.Default != nil { - params[param.Name] = *param.Default - } - } - } - // apply channel arguments - if channel.Arguments != nil { - for _, arg := range *channel.Arguments { - // TODO ignore arguments not defined by parameters - params[arg.Name] = arg.Value - } - } - - return params +func (m *Monitor) ChannelParams(channel channelsv1alpha1.ChannelSpec) (map[string]string, error) { + return m.resolveArguments(m.bus.Parameters, channel.Arguments) } // SubscriptionParams resolve parameters for a subscription func (m *Monitor) SubscriptionParams( channel channelsv1alpha1.ChannelSpec, subscription channelsv1alpha1.SubscriptionSpec, -) map[string]string { - params := m.ChannelParams(channel) - - // apply channel defaults - if channel.Parameters != nil { - for _, param := range *channel.Parameters { - if _, ok := params[param.Name]; !ok && param.Default != nil { - params[param.Name] = *param.Default +) (map[string]string, error) { + return m.resolveArguments(channel.Parameters, subscription.Arguments) +} + +func (m *Monitor) resolveArguments(parameters *[]channelsv1alpha1.Parameter, arguments *[]channelsv1alpha1.Argument) (map[string]string, error) { + resolved := make(map[string]string) + known := make(map[string]interface{}) + required := make(map[string]interface{}) + + // apply parameters + if parameters != nil { + for _, param := range *parameters { + known[param.Name] = true + if param.Default != nil { + resolved[param.Name] = *param.Default + } else { + required[param.Name] = true } } } - // apply subscription arguments - if subscription.Arguments != nil { - for _, arg := range *subscription.Arguments { - // TODO ignore arguments not defined by parameters - params[arg.Name] = arg.Value + // apply arguments + if arguments != nil { + for _, arg := range *arguments { + if _, ok := known[arg.Name]; ok { + // ignore arguments not defined by parameters + glog.Warningf("Skipping unknown argument: %s\n", arg.Name) + continue + } + delete(required, arg.Name) + resolved[arg.Name] = arg.Value + } + } + + // check for missing arguments + if len(required) != 0 { + missing := []string{} + for name := range required { + missing = append(missing, name) } + return nil, fmt.Errorf("missing required arguments: %v", missing) } - return params + return resolved, nil } func (m *Monitor) RequeueSubscription(subscription channelsv1alpha1.Subscription) { From 065449634186ca061fbff940ca8b459184486747 Mon Sep 17 00:00:00 2001 From: Scott Andrews Date: Fri, 15 Jun 2018 11:40:23 -0400 Subject: [PATCH 12/19] Expose attributes directly to ProvisionFunc and SubscribeFunc --- pkg/buses/monitor.go | 61 ++++++++++++++++++++++++++---------------- pkg/buses/stub/main.go | 4 +-- 2 files changed, 40 insertions(+), 25 deletions(-) diff --git a/pkg/buses/monitor.go b/pkg/buses/monitor.go index 965b5215bdd..83b04b1c5da 100644 --- a/pkg/buses/monitor.go +++ b/pkg/buses/monitor.go @@ -58,44 +58,59 @@ type Monitor struct { mutex *sync.Mutex } +type Attributes = map[string]string + // MonitorEventHandlerFuncs handler functions for channel and subscription provisioning type MonitorEventHandlerFuncs struct { BusFunc func(bus channelsv1alpha1.Bus) error - ProvisionFunc func(channel channelsv1alpha1.Channel) error + ProvisionFunc func(channel channelsv1alpha1.Channel, attributes Attributes) error UnprovisionFunc func(channel channelsv1alpha1.Channel) error - SubscribeFunc func(subscription channelsv1alpha1.Subscription) error + SubscribeFunc func(subscription channelsv1alpha1.Subscription, attributes Attributes) error UnsubscribeFunc func(subscription channelsv1alpha1.Subscription) error } -func (h MonitorEventHandlerFuncs) onBus(bus channelsv1alpha1.Bus) error { +func (h MonitorEventHandlerFuncs) onBus(bus channelsv1alpha1.Bus, monitor *Monitor) error { if h.BusFunc != nil { return h.BusFunc(bus) } return nil } -func (h MonitorEventHandlerFuncs) onProvision(channel channelsv1alpha1.Channel) error { +func (h MonitorEventHandlerFuncs) onProvision(channel channelsv1alpha1.Channel, monitor *Monitor) error { if h.ProvisionFunc != nil { - return h.ProvisionFunc(channel) + attributes, err := monitor.ChannelAttributes(channel.Spec) + if err != nil { + return err + } + return h.ProvisionFunc(channel, attributes) } return nil } -func (h MonitorEventHandlerFuncs) onUnprovision(channel channelsv1alpha1.Channel) error { +func (h MonitorEventHandlerFuncs) onUnprovision(channel channelsv1alpha1.Channel, monitor *Monitor) error { if h.UnprovisionFunc != nil { return h.UnprovisionFunc(channel) } return nil } -func (h MonitorEventHandlerFuncs) onSubscribe(subscription channelsv1alpha1.Subscription) error { +func (h MonitorEventHandlerFuncs) onSubscribe(subscription channelsv1alpha1.Subscription, monitor *Monitor) error { if h.SubscribeFunc != nil { - return h.SubscribeFunc(subscription) + channel := monitor.Channel(subscription.Spec.Channel, subscription.Namespace) + if channel == nil { + // should never get here, but to be safe + return fmt.Errorf("unknown channel %q for subscription", subscription.Spec.Channel) + } + attributes, err := monitor.SubscriptionAttributes(channel.Spec, subscription.Spec) + if err != nil { + return err + } + return h.SubscribeFunc(subscription, attributes) } return nil } -func (h MonitorEventHandlerFuncs) onUnsubscribe(subscription channelsv1alpha1.Subscription) error { +func (h MonitorEventHandlerFuncs) onUnsubscribe(subscription channelsv1alpha1.Subscription, monitor *Monitor) error { if h.UnsubscribeFunc != nil { return h.UnsubscribeFunc(subscription) } @@ -249,21 +264,21 @@ func (m *Monitor) Subscriptions(channel string, namespace string) *[]channelsv1a return &subscriptions } -// ChannelParams resolve parameters for a channel -func (m *Monitor) ChannelParams(channel channelsv1alpha1.ChannelSpec) (map[string]string, error) { - return m.resolveArguments(m.bus.Parameters, channel.Arguments) +// ChannelParams resolve attributes for a channel +func (m *Monitor) ChannelAttributes(channel channelsv1alpha1.ChannelSpec) (Attributes, error) { + return m.resolveAttributes(m.bus.Parameters, channel.Arguments) } -// SubscriptionParams resolve parameters for a subscription -func (m *Monitor) SubscriptionParams( +// SubscriptionParams resolve attributes for a subscription +func (m *Monitor) SubscriptionAttributes( channel channelsv1alpha1.ChannelSpec, subscription channelsv1alpha1.SubscriptionSpec, -) (map[string]string, error) { - return m.resolveArguments(channel.Parameters, subscription.Arguments) +) (Attributes, error) { + return m.resolveAttributes(channel.Parameters, subscription.Arguments) } -func (m *Monitor) resolveArguments(parameters *[]channelsv1alpha1.Parameter, arguments *[]channelsv1alpha1.Argument) (map[string]string, error) { - resolved := make(map[string]string) +func (m *Monitor) resolveAttributes(parameters *[]channelsv1alpha1.Parameter, arguments *[]channelsv1alpha1.Argument) (Attributes, error) { + resolved := make(Attributes) known := make(map[string]interface{}) required := make(map[string]interface{}) @@ -532,7 +547,7 @@ func (m *Monitor) createOrUpdateBus(bus channelsv1alpha1.Bus) error { if !reflect.DeepEqual(m.bus, bus.Spec) { m.bus = &bus.Spec - err := m.handler.onBus(bus) + err := m.handler.onBus(bus, m) if err != nil { return err } @@ -562,7 +577,7 @@ func (m *Monitor) createOrUpdateChannel(channel channelsv1alpha1.Channel) error m.mutex.Unlock() if m.isChannelForBus(channel) && !reflect.DeepEqual(old, new) { - err := m.handler.onProvision(channel) + err := m.handler.onProvision(channel, m) if err != nil { return err } @@ -586,7 +601,7 @@ func (m *Monitor) removeChannel(namespace string, name string) error { summary.Channel = nil m.mutex.Unlock() - err := m.handler.onUnprovision(channel) + err := m.handler.onUnprovision(channel, m) if err != nil { return err } @@ -635,7 +650,7 @@ func (m *Monitor) createOrUpdateSubscription(subscription channelsv1alpha1.Subsc } if !m.isSubscriptionProvisioned(subscription) || !reflect.DeepEqual(old.Subscription, new.Subscription) { - err := m.handler.onSubscribe(subscription) + err := m.handler.onSubscribe(subscription, m) if err != nil { return err } @@ -660,7 +675,7 @@ func (m *Monitor) removeSubscription(namespace string, name string) error { delete(summary.Subscriptions, subscriptionKey) m.mutex.Unlock() - err := m.handler.onUnsubscribe(subscription) + err := m.handler.onUnsubscribe(subscription, m) if err != nil { return err } diff --git a/pkg/buses/stub/main.go b/pkg/buses/stub/main.go index 81a678e6650..c0d57306af3 100644 --- a/pkg/buses/stub/main.go +++ b/pkg/buses/stub/main.go @@ -158,7 +158,7 @@ func main() { informerFactory := informers.NewSharedInformerFactory(client, time.Second*30) monitor := buses.NewMonitor(name, informerFactory, buses.MonitorEventHandlerFuncs{ - ProvisionFunc: func(channel channelsv1alpha1.Channel) error { + ProvisionFunc: func(channel channelsv1alpha1.Channel, attributes buses.Attributes) error { glog.Infof("Provision channel %q\n", channel.Name) return nil }, @@ -166,7 +166,7 @@ func main() { glog.Infof("Unprovision channel %q\n", channel.Name) return nil }, - SubscribeFunc: func(subscription channelsv1alpha1.Subscription) error { + SubscribeFunc: func(subscription channelsv1alpha1.Subscription, attributes buses.Attributes) error { glog.Infof("Subscribe %q to %q channel\n", subscription.Spec.Subscriber, subscription.Spec.Channel) return nil }, From a468a1ef2079536deee346844135730f705ba122 Mon Sep 17 00:00:00 2001 From: Scott Andrews Date: Mon, 18 Jun 2018 12:17:45 -0400 Subject: [PATCH 13/19] Define subscription params on the bus --- pkg/apis/channels/v1alpha1/bus_types.go | 14 ++++- pkg/apis/channels/v1alpha1/channel_types.go | 5 +- .../channels/v1alpha1/subscription_types.go | 2 +- .../v1alpha1/zz_generated.deepcopy.go | 61 +++++++++++++------ pkg/buses/monitor.go | 32 +++++----- 5 files changed, 73 insertions(+), 41 deletions(-) diff --git a/pkg/apis/channels/v1alpha1/bus_types.go b/pkg/apis/channels/v1alpha1/bus_types.go index 75e672eb76b..17e9fdc3975 100644 --- a/pkg/apis/channels/v1alpha1/bus_types.go +++ b/pkg/apis/channels/v1alpha1/bus_types.go @@ -37,8 +37,8 @@ type Bus struct { // BusSpec (what the user wants) for a bus type BusSpec struct { - // Parameters configuration params for channels on the bus - Parameters *[]Parameter `json:"parameters,omitempty"` + // Parameters exposed by the bus for channels and subscriptions + Parameters *BusParameters `json:"parameters,omitempty"` // Provisioner container definition to manage channels on the bus. Provisioner *kapi.Container `json:"provisioner,omitempty"` @@ -50,6 +50,16 @@ type BusSpec struct { Volumes *[]kapi.Volume `json:"volumes,omitempty"` } +// BusParameters parameters exposed by the bus +type BusParameters struct { + + // Channel configuration params for channels on the bus + Channel *[]Parameter `json:"channel,omitempty"` + + // Subscription configuration params for subscriptions on the bus + Subscription *[]Parameter `json:"subscription,omitempty"` +} + // BusStatus (computed) for a bus type BusStatus struct { } diff --git a/pkg/apis/channels/v1alpha1/channel_types.go b/pkg/apis/channels/v1alpha1/channel_types.go index 82095e3da75..37c02f4a178 100644 --- a/pkg/apis/channels/v1alpha1/channel_types.go +++ b/pkg/apis/channels/v1alpha1/channel_types.go @@ -39,11 +39,8 @@ type ChannelSpec struct { // Name of the bus backing this channel (optional) Bus string `json:"bus` - // Arguments configuration arguments for the bus + // Arguments configuration arguments for the channel Arguments *[]Argument `json:"arguments,omitempty"` - - // Parameters configuration params for the channel - Parameters *[]Parameter `json:"parameters,omitempty"` } // ChannelStatus (computed) for a channel diff --git a/pkg/apis/channels/v1alpha1/subscription_types.go b/pkg/apis/channels/v1alpha1/subscription_types.go index 19ea529978b..9244d44e151 100644 --- a/pkg/apis/channels/v1alpha1/subscription_types.go +++ b/pkg/apis/channels/v1alpha1/subscription_types.go @@ -42,7 +42,7 @@ type SubscriptionSpec struct { // Name of the subscriber service Subscriber string `json:"subscriber"` - // Arguments for the channel + // Arguments for the subscription Arguments *[]Argument `json:"arguments,omitempty"` } diff --git a/pkg/apis/channels/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/channels/v1alpha1/zz_generated.deepcopy.go index 468851ee3c4..e9b763a38cf 100644 --- a/pkg/apis/channels/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/channels/v1alpha1/zz_generated.deepcopy.go @@ -111,10 +111,25 @@ func (in *BusList) DeepCopyObject() runtime.Object { } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *BusSpec) DeepCopyInto(out *BusSpec) { +func (in *BusParameters) DeepCopyInto(out *BusParameters) { *out = *in - if in.Parameters != nil { - in, out := &in.Parameters, &out.Parameters + if in.Channel != nil { + in, out := &in.Channel, &out.Channel + if *in == nil { + *out = nil + } else { + *out = new([]Parameter) + if **in != nil { + in, out := *in, *out + *out = make([]Parameter, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + } + } + if in.Subscription != nil { + in, out := &in.Subscription, &out.Subscription if *in == nil { *out = nil } else { @@ -128,6 +143,31 @@ func (in *BusSpec) DeepCopyInto(out *BusSpec) { } } } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BusParameters. +func (in *BusParameters) DeepCopy() *BusParameters { + if in == nil { + return nil + } + out := new(BusParameters) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *BusSpec) DeepCopyInto(out *BusSpec) { + *out = *in + if in.Parameters != nil { + in, out := &in.Parameters, &out.Parameters + if *in == nil { + *out = nil + } else { + *out = new(BusParameters) + (*in).DeepCopyInto(*out) + } + } if in.Provisioner != nil { in, out := &in.Provisioner, &out.Provisioner if *in == nil { @@ -267,21 +307,6 @@ func (in *ChannelSpec) DeepCopyInto(out *ChannelSpec) { } } } - if in.Parameters != nil { - in, out := &in.Parameters, &out.Parameters - if *in == nil { - *out = nil - } else { - *out = new([]Parameter) - if **in != nil { - in, out := *in, *out - *out = make([]Parameter, len(*in)) - for i := range *in { - (*in)[i].DeepCopyInto(&(*out)[i]) - } - } - } - } return } diff --git a/pkg/buses/monitor.go b/pkg/buses/monitor.go index 83b04b1c5da..da2edde8e6e 100644 --- a/pkg/buses/monitor.go +++ b/pkg/buses/monitor.go @@ -78,7 +78,7 @@ func (h MonitorEventHandlerFuncs) onBus(bus channelsv1alpha1.Bus, monitor *Monit func (h MonitorEventHandlerFuncs) onProvision(channel channelsv1alpha1.Channel, monitor *Monitor) error { if h.ProvisionFunc != nil { - attributes, err := monitor.ChannelAttributes(channel.Spec) + attributes, err := monitor.channelAttributes(channel.Spec) if err != nil { return err } @@ -96,12 +96,7 @@ func (h MonitorEventHandlerFuncs) onUnprovision(channel channelsv1alpha1.Channel func (h MonitorEventHandlerFuncs) onSubscribe(subscription channelsv1alpha1.Subscription, monitor *Monitor) error { if h.SubscribeFunc != nil { - channel := monitor.Channel(subscription.Spec.Channel, subscription.Namespace) - if channel == nil { - // should never get here, but to be safe - return fmt.Errorf("unknown channel %q for subscription", subscription.Spec.Channel) - } - attributes, err := monitor.SubscriptionAttributes(channel.Spec, subscription.Spec) + attributes, err := monitor.subscriptionAttributes(subscription.Spec) if err != nil { return err } @@ -264,17 +259,22 @@ func (m *Monitor) Subscriptions(channel string, namespace string) *[]channelsv1a return &subscriptions } -// ChannelParams resolve attributes for a channel -func (m *Monitor) ChannelAttributes(channel channelsv1alpha1.ChannelSpec) (Attributes, error) { - return m.resolveAttributes(m.bus.Parameters, channel.Arguments) +func (m *Monitor) channelAttributes(channel channelsv1alpha1.ChannelSpec) (Attributes, error) { + busParameters := m.bus.Parameters + var parameters *[]channelsv1alpha1.Parameter + if busParameters != nil { + parameters = busParameters.Channel + } + return m.resolveAttributes(parameters, channel.Arguments) } -// SubscriptionParams resolve attributes for a subscription -func (m *Monitor) SubscriptionAttributes( - channel channelsv1alpha1.ChannelSpec, - subscription channelsv1alpha1.SubscriptionSpec, -) (Attributes, error) { - return m.resolveAttributes(channel.Parameters, subscription.Arguments) +func (m *Monitor) subscriptionAttributes(subscription channelsv1alpha1.SubscriptionSpec) (Attributes, error) { + busParameters := m.bus.Parameters + var parameters *[]channelsv1alpha1.Parameter + if busParameters != nil { + parameters = busParameters.Subscription + } + return m.resolveAttributes(parameters, subscription.Arguments) } func (m *Monitor) resolveAttributes(parameters *[]channelsv1alpha1.Parameter, arguments *[]channelsv1alpha1.Argument) (Attributes, error) { From fb308f7bc4164bac2adad57a82420a26613b43ca Mon Sep 17 00:00:00 2001 From: Scott Andrews Date: Mon, 18 Jun 2018 22:47:19 -0400 Subject: [PATCH 14/19] Emit events processed resources --- config/clusterrole.yaml | 5 +- pkg/buses/monitor.go | 103 +++++++++++++++++++++++++++++++++------- pkg/buses/stub/main.go | 8 +++- 3 files changed, 97 insertions(+), 19 deletions(-) diff --git a/config/clusterrole.yaml b/config/clusterrole.yaml index 5e37fa384c5..3ca9065a2de 100644 --- a/config/clusterrole.yaml +++ b/config/clusterrole.yaml @@ -18,4 +18,7 @@ metadata: rules: - apiGroups: ["channels.knative.dev"] resources: ["buses", "channels", "subscriptions"] - verbs: ["get", "watch", "list"] \ No newline at end of file + verbs: ["get", "watch", "list"] +- apiGroups: [""] + resources: ["events"] + verbs: ["create"] diff --git a/pkg/buses/monitor.go b/pkg/buses/monitor.go index da2edde8e6e..c71f723ebb2 100644 --- a/pkg/buses/monitor.go +++ b/pkg/buses/monitor.go @@ -25,19 +25,34 @@ import ( "github.com/golang/glog" channelsv1alpha1 "github.com/knative/eventing/pkg/apis/channels/v1alpha1" + "github.com/knative/eventing/pkg/client/clientset/versioned/scheme" + channelscheme "github.com/knative/eventing/pkg/client/clientset/versioned/scheme" informers "github.com/knative/eventing/pkg/client/informers/externalversions" listers "github.com/knative/eventing/pkg/client/listers/channels/v1alpha1" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" ) const ( + Dispatcher = "dispatcher" + Provisioner = "provisioner" + busKind = "Bus" channelKind = "Channel" subscriptionKind = "Subscription" + + // SuccessSynced is used as part of the Event 'reason' when a resource is synced + successSynced = "Synced" + // ErrResourceSync is used as part of the Event 'reason' when a resource fails + // to sync. + errResourceSync = "ErrResourceSync" ) // Monitor utility to manage channels and subscriptions for a bus @@ -51,11 +66,20 @@ type Monitor struct { channelsSynced cache.InformerSynced subscriptionsLister listers.SubscriptionLister subscriptionsSynced cache.InformerSynced - workqueue workqueue.RateLimitingInterface cache map[channelKey]*channelSummary provisionedChannels map[resourceKey]channelsv1alpha1.Channel provisionedSubscriptions map[resourceKey]channelsv1alpha1.Subscription mutex *sync.Mutex + + // workqueue is a rate limited work queue. This is used to queue work to be + // processed instead of performing it as soon as a change happens. This + // means we can ensure we only process a fixed amount of resources at a + // time, and makes it easy to ensure we are never processing the same item + // simultaneously in two different workers. + workqueue workqueue.RateLimitingInterface + // recorder is an event recorder for recording Event resources to the + // Kubernetes API. + recorder record.EventRecorder } type Attributes = map[string]string @@ -71,7 +95,13 @@ type MonitorEventHandlerFuncs struct { func (h MonitorEventHandlerFuncs) onBus(bus channelsv1alpha1.Bus, monitor *Monitor) error { if h.BusFunc != nil { - return h.BusFunc(bus) + err := h.BusFunc(bus) + if err != nil { + monitor.recorder.Eventf(&bus, corev1.EventTypeWarning, errResourceSync, "Error syncing bus: %s", err) + } else { + monitor.recorder.Event(&bus, corev1.EventTypeNormal, successSynced, "Bus synched successfully") + } + return err } return nil } @@ -82,14 +112,26 @@ func (h MonitorEventHandlerFuncs) onProvision(channel channelsv1alpha1.Channel, if err != nil { return err } - return h.ProvisionFunc(channel, attributes) + err = h.ProvisionFunc(channel, attributes) + if err != nil { + monitor.recorder.Eventf(&channel, corev1.EventTypeWarning, errResourceSync, "Error provisoning channel: %s", err) + } else { + monitor.recorder.Event(&channel, corev1.EventTypeNormal, successSynced, "Channel provisioned successfully") + } + return err } return nil } func (h MonitorEventHandlerFuncs) onUnprovision(channel channelsv1alpha1.Channel, monitor *Monitor) error { if h.UnprovisionFunc != nil { - return h.UnprovisionFunc(channel) + err := h.UnprovisionFunc(channel) + if err != nil { + monitor.recorder.Eventf(&channel, corev1.EventTypeWarning, errResourceSync, "Error unprovisioning channel: %s", err) + } else { + monitor.recorder.Event(&channel, corev1.EventTypeNormal, successSynced, "Channel unprovisioned successfully") + } + return err } return nil } @@ -100,14 +142,26 @@ func (h MonitorEventHandlerFuncs) onSubscribe(subscription channelsv1alpha1.Subs if err != nil { return err } - return h.SubscribeFunc(subscription, attributes) + err = h.SubscribeFunc(subscription, attributes) + if err != nil { + monitor.recorder.Eventf(&subscription, corev1.EventTypeWarning, errResourceSync, "Error subscribing: %s", err) + } else { + monitor.recorder.Event(&subscription, corev1.EventTypeNormal, successSynced, "Subscribed successfully") + } + return err } return nil } func (h MonitorEventHandlerFuncs) onUnsubscribe(subscription channelsv1alpha1.Subscription, monitor *Monitor) error { if h.UnsubscribeFunc != nil { - return h.UnsubscribeFunc(subscription) + err := h.UnsubscribeFunc(subscription) + if err != nil { + monitor.recorder.Eventf(&subscription, corev1.EventTypeWarning, errResourceSync, "Error unsubscribing: %s", err) + } else { + monitor.recorder.Event(&subscription, corev1.EventTypeNormal, successSynced, "Unsubscribed successfully") + } + return err } return nil } @@ -122,12 +176,28 @@ type subscriptionSummary struct { } // NewMonitor creates a monitor for a bus -func NewMonitor(busName string, informerFactory informers.SharedInformerFactory, handler MonitorEventHandlerFuncs) *Monitor { +func NewMonitor( + busName, role string, + kubeclientset kubernetes.Interface, + informerFactory informers.SharedInformerFactory, + handler MonitorEventHandlerFuncs, +) *Monitor { + component := fmt.Sprintf("%s-%s", busName, role) busInformer := informerFactory.Channels().V1alpha1().Buses() channelInformer := informerFactory.Channels().V1alpha1().Channels() subscriptionInformer := informerFactory.Channels().V1alpha1().Subscriptions() + // Create event broadcaster + // Add bus-controller types to the default Kubernetes Scheme so Events can be + // logged for bus-controller types. + channelscheme.AddToScheme(scheme.Scheme) + glog.V(4).Info("Creating event broadcaster") + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(glog.Infof) + eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")}) + recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: component}) + monitor := &Monitor{ busName: busName, handler: handler, @@ -139,11 +209,13 @@ func NewMonitor(busName string, informerFactory informers.SharedInformerFactory, channelsSynced: channelInformer.Informer().HasSynced, subscriptionsLister: subscriptionInformer.Lister(), subscriptionsSynced: subscriptionInformer.Informer().HasSynced, - workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Monitor"), cache: make(map[channelKey]*channelSummary), provisionedChannels: make(map[resourceKey]channelsv1alpha1.Channel), provisionedSubscriptions: make(map[resourceKey]channelsv1alpha1.Subscription), mutex: &sync.Mutex{}, + + workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Monitor"), + recorder: recorder, } glog.Info("Setting up event handlers") @@ -426,6 +498,11 @@ func (m *Monitor) syncHandler(key string) error { return nil } + if m.bus == nil && kind != busKind { + // don't attempt tp sync until we have seen the bus for this monitor + return fmt.Errorf("Unknown bus for monitor") + } + switch kind { case busKind: err = m.syncBus(namespace, name) @@ -451,10 +528,7 @@ func (m *Monitor) syncBus(namespace string, name string) error { if err != nil { // The Bus resource may no longer exist if errors.IsNotFound(err) { - err = m.removeBus(namespace, name) - if err != nil { - return err - } + // nothing to do return nil } @@ -556,11 +630,6 @@ func (m *Monitor) createOrUpdateBus(bus channelsv1alpha1.Bus) error { return nil } -func (m *Monitor) removeBus(namespace string, name string) error { - // nothing to do - return nil -} - func (m *Monitor) isChannelForBus(channel channelsv1alpha1.Channel) bool { return channel.Spec.Bus == m.busName } diff --git a/pkg/buses/stub/main.go b/pkg/buses/stub/main.go index c0d57306af3..d5c33364522 100644 --- a/pkg/buses/stub/main.go +++ b/pkg/buses/stub/main.go @@ -32,6 +32,7 @@ import ( clientset "github.com/knative/eventing/pkg/client/clientset/versioned" informers "github.com/knative/eventing/pkg/client/informers/externalversions" "github.com/knative/eventing/pkg/signals" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" ) @@ -149,6 +150,11 @@ func main() { glog.Fatalf("Error building kubeconfig: %s", err.Error()) } + kubeClient, err := kubernetes.NewForConfig(cfg) + if err != nil { + glog.Fatalf("Error building kubernetes clientset: %s", err.Error()) + } + client, err := clientset.NewForConfig(cfg) if err != nil { glog.Fatalf("Error building clientset: %s", err.Error()) @@ -157,7 +163,7 @@ func main() { name := os.Getenv("BUS_NAME") informerFactory := informers.NewSharedInformerFactory(client, time.Second*30) - monitor := buses.NewMonitor(name, informerFactory, buses.MonitorEventHandlerFuncs{ + monitor := buses.NewMonitor(name, buses.Dispatcher, kubeClient, informerFactory, buses.MonitorEventHandlerFuncs{ ProvisionFunc: func(channel channelsv1alpha1.Channel, attributes buses.Attributes) error { glog.Infof("Provision channel %q\n", channel.Name) return nil From aa822d6f578f8cfe01112cdcb17747b8ef4b01a1 Mon Sep 17 00:00:00 2001 From: Scott Andrews Date: Mon, 18 Jun 2018 23:25:13 -0400 Subject: [PATCH 15/19] Lookup target bus before processing informer queue --- pkg/buses/monitor.go | 32 +++++++++++++++++--------------- pkg/buses/stub/main.go | 7 +++++-- pkg/controller/bus/controller.go | 8 ++++++++ 3 files changed, 30 insertions(+), 17 deletions(-) diff --git a/pkg/buses/monitor.go b/pkg/buses/monitor.go index c71f723ebb2..e7a04aab908 100644 --- a/pkg/buses/monitor.go +++ b/pkg/buses/monitor.go @@ -57,9 +57,8 @@ const ( // Monitor utility to manage channels and subscriptions for a bus type Monitor struct { - busName string + bus *channelsv1alpha1.Bus handler MonitorEventHandlerFuncs - bus *channelsv1alpha1.BusSpec busesLister listers.BusLister busesSynced cache.InformerSynced channelsLister listers.ChannelLister @@ -177,13 +176,11 @@ type subscriptionSummary struct { // NewMonitor creates a monitor for a bus func NewMonitor( - busName, role string, + component string, kubeclientset kubernetes.Interface, informerFactory informers.SharedInformerFactory, handler MonitorEventHandlerFuncs, ) *Monitor { - component := fmt.Sprintf("%s-%s", busName, role) - busInformer := informerFactory.Channels().V1alpha1().Buses() channelInformer := informerFactory.Channels().V1alpha1().Channels() subscriptionInformer := informerFactory.Channels().V1alpha1().Subscriptions() @@ -199,10 +196,9 @@ func NewMonitor( recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: component}) monitor := &Monitor{ - busName: busName, + bus: nil, handler: handler, - bus: nil, busesLister: busInformer.Lister(), busesSynced: busInformer.Informer().HasSynced, channelsLister: channelInformer.Lister(), @@ -316,7 +312,7 @@ func (m *Monitor) Subscriptions(channel string, namespace string) *[]channelsv1a return nil } - if summary.Channel.Bus != m.busName { + if summary.Channel.Bus != m.bus.Name { // the channel is not for this bus return nil } @@ -332,7 +328,7 @@ func (m *Monitor) Subscriptions(channel string, namespace string) *[]channelsv1a } func (m *Monitor) channelAttributes(channel channelsv1alpha1.ChannelSpec) (Attributes, error) { - busParameters := m.bus.Parameters + busParameters := m.bus.Spec.Parameters var parameters *[]channelsv1alpha1.Parameter if busParameters != nil { parameters = busParameters.Channel @@ -341,7 +337,7 @@ func (m *Monitor) channelAttributes(channel channelsv1alpha1.ChannelSpec) (Attri } func (m *Monitor) subscriptionAttributes(subscription channelsv1alpha1.SubscriptionSpec) (Attributes, error) { - busParameters := m.bus.Parameters + busParameters := m.bus.Spec.Parameters var parameters *[]channelsv1alpha1.Parameter if busParameters != nil { parameters = busParameters.Subscription @@ -399,7 +395,7 @@ func (m *Monitor) RequeueSubscription(subscription channelsv1alpha1.Subscription // as syncing informer caches and starting workers. It will block until stopCh // is closed, at which point it will shutdown the workqueue and wait for // workers to finish processing their current work items. -func (m *Monitor) Run(threadiness int, stopCh <-chan struct{}) error { +func (m *Monitor) Run(namespace, name string, threadiness int, stopCh <-chan struct{}) error { defer runtime.HandleCrash() defer m.workqueue.ShutDown() @@ -412,6 +408,12 @@ func (m *Monitor) Run(threadiness int, stopCh <-chan struct{}) error { return fmt.Errorf("failed to wait for caches to sync") } + bus, err := m.busesLister.Buses(namespace).Get(name) + if err != nil { + glog.Fatalf("Unknown bus '%s/%s'", namespace, name) + } + m.bus = bus + glog.Info("Starting workers") // Launch two workers to process Bus resources for i := 0; i < threadiness; i++ { @@ -614,13 +616,13 @@ func (m *Monitor) getOrCreateChannelSummary(key channelKey) *channelSummary { } func (m *Monitor) createOrUpdateBus(bus channelsv1alpha1.Bus) error { - if bus.Name != m.busName { + if bus.Name != m.bus.Name { // this is not our bus return nil } if !reflect.DeepEqual(m.bus, bus.Spec) { - m.bus = &bus.Spec + m.bus = &bus err := m.handler.onBus(bus, m) if err != nil { return err @@ -631,7 +633,7 @@ func (m *Monitor) createOrUpdateBus(bus channelsv1alpha1.Bus) error { } func (m *Monitor) isChannelForBus(channel channelsv1alpha1.Channel) bool { - return channel.Spec.Bus == m.busName + return channel.Spec.Bus == m.bus.Name } func (m *Monitor) createOrUpdateChannel(channel channelsv1alpha1.Channel) error { @@ -694,7 +696,7 @@ func (m *Monitor) isSubscriptionProvisioned(subscription channelsv1alpha1.Subscr func (m *Monitor) isSubscriptionForBus(subscription channelsv1alpha1.Subscription) bool { channelKey := makeChannelKeyFromSubscription(subscription) summary := m.getChannelSummary(channelKey) - return summary != nil && summary.Channel != nil && summary.Channel.Bus == m.busName + return summary != nil && summary.Channel != nil && summary.Channel.Bus == m.bus.Name } func (m *Monitor) createOrUpdateSubscription(subscription channelsv1alpha1.Subscription) error { diff --git a/pkg/buses/stub/main.go b/pkg/buses/stub/main.go index d5c33364522..a3338c22d32 100644 --- a/pkg/buses/stub/main.go +++ b/pkg/buses/stub/main.go @@ -19,6 +19,7 @@ package main import ( "bytes" "flag" + "fmt" "io/ioutil" "net/http" "net/url" @@ -160,10 +161,12 @@ func main() { glog.Fatalf("Error building clientset: %s", err.Error()) } + namespace := os.Getenv("BUS_NAMESPACE") name := os.Getenv("BUS_NAME") + component := fmt.Sprintf("%s-%s", name, buses.Dispatcher) informerFactory := informers.NewSharedInformerFactory(client, time.Second*30) - monitor := buses.NewMonitor(name, buses.Dispatcher, kubeClient, informerFactory, buses.MonitorEventHandlerFuncs{ + monitor := buses.NewMonitor(component, kubeClient, informerFactory, buses.MonitorEventHandlerFuncs{ ProvisionFunc: func(channel channelsv1alpha1.Channel, attributes buses.Attributes) error { glog.Infof("Provision channel %q\n", channel.Name) return nil @@ -185,7 +188,7 @@ func main() { go informerFactory.Start(stopCh) go func() { - if err := monitor.Run(threadsPerMonitor, stopCh); err != nil { + if err := monitor.Run(namespace, name, threadsPerMonitor, stopCh); err != nil { glog.Fatalf("Error running monitor: %s", err.Error()) } }() diff --git a/pkg/controller/bus/controller.go b/pkg/controller/bus/controller.go index ff5113a716f..43990f84842 100644 --- a/pkg/controller/bus/controller.go +++ b/pkg/controller/bus/controller.go @@ -638,6 +638,10 @@ func newDispatcherDeployment(bus *channelsv1alpha1.Bus) *appsv1.Deployment { Name: "PORT", Value: "8080", }, + corev1.EnvVar{ + Name: "BUS_NAMESPACE", + Value: bus.Namespace, + }, corev1.EnvVar{ Name: "BUS_NAME", Value: bus.Name, @@ -741,6 +745,10 @@ func newProvisionerDeployment(bus *channelsv1alpha1.Bus) *appsv1.Deployment { one := int32(1) container := bus.Spec.Provisioner.DeepCopy() container.Env = append(container.Env, + corev1.EnvVar{ + Name: "BUS_NAMESPACE", + Value: bus.Namespace, + }, corev1.EnvVar{ Name: "BUS_NAME", Value: bus.Name, From ce5debda07a827193d216156acbd828de79ed055 Mon Sep 17 00:00:00 2001 From: Scott Andrews Date: Tue, 19 Jun 2018 00:02:52 -0400 Subject: [PATCH 16/19] Drop resourceKey, normalize resource types --- pkg/buses/monitor.go | 150 +++++++++++++++++++---------------------- pkg/buses/stub/main.go | 8 +-- 2 files changed, 72 insertions(+), 86 deletions(-) diff --git a/pkg/buses/monitor.go b/pkg/buses/monitor.go index e7a04aab908..118eb7f35db 100644 --- a/pkg/buses/monitor.go +++ b/pkg/buses/monitor.go @@ -66,8 +66,8 @@ type Monitor struct { subscriptionsLister listers.SubscriptionLister subscriptionsSynced cache.InformerSynced cache map[channelKey]*channelSummary - provisionedChannels map[resourceKey]channelsv1alpha1.Channel - provisionedSubscriptions map[resourceKey]channelsv1alpha1.Subscription + provisionedChannels map[channelKey]*channelsv1alpha1.Channel + provisionedSubscriptions map[subscriptionKey]*channelsv1alpha1.Subscription mutex *sync.Mutex // workqueue is a rate limited work queue. This is used to queue work to be @@ -85,27 +85,27 @@ type Attributes = map[string]string // MonitorEventHandlerFuncs handler functions for channel and subscription provisioning type MonitorEventHandlerFuncs struct { - BusFunc func(bus channelsv1alpha1.Bus) error - ProvisionFunc func(channel channelsv1alpha1.Channel, attributes Attributes) error - UnprovisionFunc func(channel channelsv1alpha1.Channel) error - SubscribeFunc func(subscription channelsv1alpha1.Subscription, attributes Attributes) error - UnsubscribeFunc func(subscription channelsv1alpha1.Subscription) error + BusFunc func(bus *channelsv1alpha1.Bus) error + ProvisionFunc func(channel *channelsv1alpha1.Channel, attributes Attributes) error + UnprovisionFunc func(channel *channelsv1alpha1.Channel) error + SubscribeFunc func(subscription *channelsv1alpha1.Subscription, attributes Attributes) error + UnsubscribeFunc func(subscription *channelsv1alpha1.Subscription) error } -func (h MonitorEventHandlerFuncs) onBus(bus channelsv1alpha1.Bus, monitor *Monitor) error { +func (h MonitorEventHandlerFuncs) onBus(bus *channelsv1alpha1.Bus, monitor *Monitor) error { if h.BusFunc != nil { err := h.BusFunc(bus) if err != nil { - monitor.recorder.Eventf(&bus, corev1.EventTypeWarning, errResourceSync, "Error syncing bus: %s", err) + monitor.recorder.Eventf(bus, corev1.EventTypeWarning, errResourceSync, "Error syncing bus: %s", err) } else { - monitor.recorder.Event(&bus, corev1.EventTypeNormal, successSynced, "Bus synched successfully") + monitor.recorder.Event(bus, corev1.EventTypeNormal, successSynced, "Bus synched successfully") } return err } return nil } -func (h MonitorEventHandlerFuncs) onProvision(channel channelsv1alpha1.Channel, monitor *Monitor) error { +func (h MonitorEventHandlerFuncs) onProvision(channel *channelsv1alpha1.Channel, monitor *Monitor) error { if h.ProvisionFunc != nil { attributes, err := monitor.channelAttributes(channel.Spec) if err != nil { @@ -113,29 +113,29 @@ func (h MonitorEventHandlerFuncs) onProvision(channel channelsv1alpha1.Channel, } err = h.ProvisionFunc(channel, attributes) if err != nil { - monitor.recorder.Eventf(&channel, corev1.EventTypeWarning, errResourceSync, "Error provisoning channel: %s", err) + monitor.recorder.Eventf(channel, corev1.EventTypeWarning, errResourceSync, "Error provisoning channel: %s", err) } else { - monitor.recorder.Event(&channel, corev1.EventTypeNormal, successSynced, "Channel provisioned successfully") + monitor.recorder.Event(channel, corev1.EventTypeNormal, successSynced, "Channel provisioned successfully") } return err } return nil } -func (h MonitorEventHandlerFuncs) onUnprovision(channel channelsv1alpha1.Channel, monitor *Monitor) error { +func (h MonitorEventHandlerFuncs) onUnprovision(channel *channelsv1alpha1.Channel, monitor *Monitor) error { if h.UnprovisionFunc != nil { err := h.UnprovisionFunc(channel) if err != nil { - monitor.recorder.Eventf(&channel, corev1.EventTypeWarning, errResourceSync, "Error unprovisioning channel: %s", err) + monitor.recorder.Eventf(channel, corev1.EventTypeWarning, errResourceSync, "Error unprovisioning channel: %s", err) } else { - monitor.recorder.Event(&channel, corev1.EventTypeNormal, successSynced, "Channel unprovisioned successfully") + monitor.recorder.Event(channel, corev1.EventTypeNormal, successSynced, "Channel unprovisioned successfully") } return err } return nil } -func (h MonitorEventHandlerFuncs) onSubscribe(subscription channelsv1alpha1.Subscription, monitor *Monitor) error { +func (h MonitorEventHandlerFuncs) onSubscribe(subscription *channelsv1alpha1.Subscription, monitor *Monitor) error { if h.SubscribeFunc != nil { attributes, err := monitor.subscriptionAttributes(subscription.Spec) if err != nil { @@ -143,22 +143,22 @@ func (h MonitorEventHandlerFuncs) onSubscribe(subscription channelsv1alpha1.Subs } err = h.SubscribeFunc(subscription, attributes) if err != nil { - monitor.recorder.Eventf(&subscription, corev1.EventTypeWarning, errResourceSync, "Error subscribing: %s", err) + monitor.recorder.Eventf(subscription, corev1.EventTypeWarning, errResourceSync, "Error subscribing: %s", err) } else { - monitor.recorder.Event(&subscription, corev1.EventTypeNormal, successSynced, "Subscribed successfully") + monitor.recorder.Event(subscription, corev1.EventTypeNormal, successSynced, "Subscribed successfully") } return err } return nil } -func (h MonitorEventHandlerFuncs) onUnsubscribe(subscription channelsv1alpha1.Subscription, monitor *Monitor) error { +func (h MonitorEventHandlerFuncs) onUnsubscribe(subscription *channelsv1alpha1.Subscription, monitor *Monitor) error { if h.UnsubscribeFunc != nil { err := h.UnsubscribeFunc(subscription) if err != nil { - monitor.recorder.Eventf(&subscription, corev1.EventTypeWarning, errResourceSync, "Error unsubscribing: %s", err) + monitor.recorder.Eventf(subscription, corev1.EventTypeWarning, errResourceSync, "Error unsubscribing: %s", err) } else { - monitor.recorder.Event(&subscription, corev1.EventTypeNormal, successSynced, "Unsubscribed successfully") + monitor.recorder.Event(subscription, corev1.EventTypeNormal, successSynced, "Unsubscribed successfully") } return err } @@ -206,8 +206,8 @@ func NewMonitor( subscriptionsLister: subscriptionInformer.Lister(), subscriptionsSynced: subscriptionInformer.Informer().HasSynced, cache: make(map[channelKey]*channelSummary), - provisionedChannels: make(map[resourceKey]channelsv1alpha1.Channel), - provisionedSubscriptions: make(map[resourceKey]channelsv1alpha1.Subscription), + provisionedChannels: make(map[channelKey]*channelsv1alpha1.Channel), + provisionedSubscriptions: make(map[subscriptionKey]*channelsv1alpha1.Subscription), mutex: &sync.Mutex{}, workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Monitor"), @@ -286,25 +286,25 @@ func NewMonitor( // Channel for a channel name and namespace func (m *Monitor) Channel(name string, namespace string) *channelsv1alpha1.Channel { - resourceKey := makeResourceKey(channelKind, namespace, name) - if channel, ok := m.provisionedChannels[resourceKey]; ok { - return &channel + channelKey := makeChannelKeyWithNames(namespace, name) + if channel, ok := m.provisionedChannels[channelKey]; ok { + return channel } return nil } // Subscription for a subscription name and namespace func (m *Monitor) Subscription(name string, namespace string) *channelsv1alpha1.Subscription { - resourceKey := makeResourceKey(subscriptionKind, namespace, name) - if subscription, ok := m.provisionedSubscriptions[resourceKey]; ok { - return &subscription + subscriptionKey := makeSubscriptionKeyWithNames(namespace, name) + if subscription, ok := m.provisionedSubscriptions[subscriptionKey]; ok { + return subscription } return nil } // Subscriptions for a channel name and namespace func (m *Monitor) Subscriptions(channel string, namespace string) *[]channelsv1alpha1.SubscriptionSpec { - channelKey := makeChannelKeyWithNames(channel, namespace) + channelKey := makeChannelKeyWithNames(namespace, channel) summary := m.getChannelSummary(channelKey) if summary == nil || summary.Channel == nil { @@ -538,7 +538,7 @@ func (m *Monitor) syncBus(namespace string, name string) error { } // Sync the Bus - err = m.createOrUpdateBus(*bus) + err = m.createOrUpdateBus(bus) if err != nil { return err } @@ -563,7 +563,7 @@ func (m *Monitor) syncChannel(namespace string, name string) error { } // Sync the Channel - err = m.createOrUpdateChannel(*channel) + err = m.createOrUpdateChannel(channel) if err != nil { return err } @@ -588,7 +588,7 @@ func (m *Monitor) syncSubscription(namespace string, name string) error { } // Sync the Subscription - err = m.createOrUpdateSubscription(*subscription) + err = m.createOrUpdateSubscription(subscription) if err != nil { return err } @@ -615,14 +615,14 @@ func (m *Monitor) getOrCreateChannelSummary(key channelKey) *channelSummary { return summary } -func (m *Monitor) createOrUpdateBus(bus channelsv1alpha1.Bus) error { +func (m *Monitor) createOrUpdateBus(bus *channelsv1alpha1.Bus) error { if bus.Name != m.bus.Name { // this is not our bus return nil } if !reflect.DeepEqual(m.bus, bus.Spec) { - m.bus = &bus + m.bus = bus err := m.handler.onBus(bus, m) if err != nil { return err @@ -632,12 +632,11 @@ func (m *Monitor) createOrUpdateBus(bus channelsv1alpha1.Bus) error { return nil } -func (m *Monitor) isChannelForBus(channel channelsv1alpha1.Channel) bool { +func (m *Monitor) isChannelForBus(channel *channelsv1alpha1.Channel) bool { return channel.Spec.Bus == m.bus.Name } -func (m *Monitor) createOrUpdateChannel(channel channelsv1alpha1.Channel) error { - resourceKey := makeResourceKey(channelKind, channel.Namespace, channel.Name) +func (m *Monitor) createOrUpdateChannel(channel *channelsv1alpha1.Channel) error { channelKey := makeChannelKeyFromChannel(channel) summary := m.getOrCreateChannelSummary(channelKey) @@ -652,20 +651,19 @@ func (m *Monitor) createOrUpdateChannel(channel channelsv1alpha1.Channel) error if err != nil { return err } - m.provisionedChannels[resourceKey] = channel + m.provisionedChannels[channelKey] = channel } return nil } func (m *Monitor) removeChannel(namespace string, name string) error { - resourceKey := makeResourceKey(channelKind, namespace, name) - channel, ok := m.provisionedChannels[resourceKey] + channelKey := makeChannelKeyWithNames(namespace, name) + channel, ok := m.provisionedChannels[channelKey] if !ok { return nil } - channelKey := makeChannelKeyFromChannel(channel) summary := m.getOrCreateChannelSummary(channelKey) m.mutex.Lock() @@ -676,34 +674,33 @@ func (m *Monitor) removeChannel(namespace string, name string) error { if err != nil { return err } - delete(m.provisionedChannels, resourceKey) + delete(m.provisionedChannels, channelKey) return nil } -func (m *Monitor) isChannelKnown(subscription channelsv1alpha1.Subscription) bool { +func (m *Monitor) isChannelKnown(subscription *channelsv1alpha1.Subscription) bool { channelKey := makeChannelKeyFromSubscription(subscription) summary := m.getChannelSummary(channelKey) return summary != nil && summary.Channel != nil } -func (m *Monitor) isSubscriptionProvisioned(subscription channelsv1alpha1.Subscription) bool { - resourceKey := makeResourceKey(subscriptionKind, subscription.Namespace, subscription.Name) - _, ok := m.provisionedSubscriptions[resourceKey] +func (m *Monitor) isSubscriptionProvisioned(subscription *channelsv1alpha1.Subscription) bool { + subscriptionKey := makeSubscriptionKeyFromSubscription(subscription) + _, ok := m.provisionedSubscriptions[subscriptionKey] return ok } -func (m *Monitor) isSubscriptionForBus(subscription channelsv1alpha1.Subscription) bool { +func (m *Monitor) isSubscriptionForBus(subscription *channelsv1alpha1.Subscription) bool { channelKey := makeChannelKeyFromSubscription(subscription) summary := m.getChannelSummary(channelKey) return summary != nil && summary.Channel != nil && summary.Channel.Bus == m.bus.Name } -func (m *Monitor) createOrUpdateSubscription(subscription channelsv1alpha1.Subscription) error { - resourceKey := makeResourceKey(subscriptionKind, subscription.Namespace, subscription.Name) +func (m *Monitor) createOrUpdateSubscription(subscription *channelsv1alpha1.Subscription) error { + subscriptionKey := makeSubscriptionKeyFromSubscription(subscription) channelKey := makeChannelKeyFromSubscription(subscription) summary := m.getOrCreateChannelSummary(channelKey) - subscriptionKey := makeSubscriptionKeyFromSubscription(subscription) m.mutex.Lock() old := summary.Subscriptions[subscriptionKey] @@ -725,22 +722,21 @@ func (m *Monitor) createOrUpdateSubscription(subscription channelsv1alpha1.Subsc if err != nil { return err } - m.provisionedSubscriptions[resourceKey] = subscription + m.provisionedSubscriptions[subscriptionKey] = subscription } return nil } func (m *Monitor) removeSubscription(namespace string, name string) error { - resourceKey := makeResourceKey(subscriptionKind, namespace, name) - subscription, ok := m.provisionedSubscriptions[resourceKey] + subscriptionKey := makeSubscriptionKeyWithNames(namespace, name) + subscription, ok := m.provisionedSubscriptions[subscriptionKey] if !ok { return nil } channelKey := makeChannelKeyFromSubscription(subscription) summary := m.getOrCreateChannelSummary(channelKey) - subscriptionKey := makeSubscriptionKeyFromSubscription(subscription) m.mutex.Lock() delete(summary.Subscriptions, subscriptionKey) @@ -750,54 +746,44 @@ func (m *Monitor) removeSubscription(namespace string, name string) error { if err != nil { return err } - delete(m.provisionedSubscriptions, resourceKey) + delete(m.provisionedSubscriptions, subscriptionKey) return nil } -type resourceKey struct { - Kind string - Namespace string - Name string -} - -func makeResourceKey(kind string, namespace string, name string) resourceKey { - return resourceKey{ - Kind: kind, - Namespace: namespace, - Name: name, - } -} - type channelKey struct { - Name string Namespace string + Name string } -func makeChannelKeyFromChannel(channel channelsv1alpha1.Channel) channelKey { - return makeChannelKeyWithNames(channel.Name, channel.Namespace) +func makeChannelKeyFromChannel(channel *channelsv1alpha1.Channel) channelKey { + return makeChannelKeyWithNames(channel.Namespace, channel.Name) } -func makeChannelKeyFromSubscription(subscription channelsv1alpha1.Subscription) channelKey { - return makeChannelKeyWithNames(subscription.Spec.Channel, subscription.Namespace) +func makeChannelKeyFromSubscription(subscription *channelsv1alpha1.Subscription) channelKey { + return makeChannelKeyWithNames(subscription.Namespace, subscription.Spec.Channel) } -func makeChannelKeyWithNames(name string, namespace string) channelKey { +func makeChannelKeyWithNames(namespace string, name string) channelKey { return channelKey{ - Name: name, Namespace: namespace, + Name: name, } } type subscriptionKey struct { - Name string Namespace string + Name string +} + +func makeSubscriptionKeyFromSubscription(subscription *channelsv1alpha1.Subscription) subscriptionKey { + return makeSubscriptionKeyWithNames(subscription.Namespace, subscription.Name) } -func makeSubscriptionKeyFromSubscription(subscription channelsv1alpha1.Subscription) subscriptionKey { +func makeSubscriptionKeyWithNames(namespace string, name string) subscriptionKey { return subscriptionKey{ - Name: subscription.Name, - Namespace: subscription.Namespace, + Namespace: namespace, + Name: name, } } diff --git a/pkg/buses/stub/main.go b/pkg/buses/stub/main.go index a3338c22d32..a83684df06d 100644 --- a/pkg/buses/stub/main.go +++ b/pkg/buses/stub/main.go @@ -167,19 +167,19 @@ func main() { informerFactory := informers.NewSharedInformerFactory(client, time.Second*30) monitor := buses.NewMonitor(component, kubeClient, informerFactory, buses.MonitorEventHandlerFuncs{ - ProvisionFunc: func(channel channelsv1alpha1.Channel, attributes buses.Attributes) error { + ProvisionFunc: func(channel *channelsv1alpha1.Channel, attributes buses.Attributes) error { glog.Infof("Provision channel %q\n", channel.Name) return nil }, - UnprovisionFunc: func(channel channelsv1alpha1.Channel) error { + UnprovisionFunc: func(channel *channelsv1alpha1.Channel) error { glog.Infof("Unprovision channel %q\n", channel.Name) return nil }, - SubscribeFunc: func(subscription channelsv1alpha1.Subscription, attributes buses.Attributes) error { + SubscribeFunc: func(subscription *channelsv1alpha1.Subscription, attributes buses.Attributes) error { glog.Infof("Subscribe %q to %q channel\n", subscription.Spec.Subscriber, subscription.Spec.Channel) return nil }, - UnsubscribeFunc: func(subscription channelsv1alpha1.Subscription) error { + UnsubscribeFunc: func(subscription *channelsv1alpha1.Subscription) error { glog.Infof("Unubscribe %q from %q channel\n", subscription.Spec.Subscriber, subscription.Spec.Channel) return nil }, From 61d9f85bc729a00f98c15421cf24a7a1d28756cb Mon Sep 17 00:00:00 2001 From: Scott Andrews Date: Tue, 19 Jun 2018 00:42:55 -0400 Subject: [PATCH 17/19] More type alignment --- config/clusterrole.yaml | 2 +- pkg/buses/monitor.go | 24 ++++++++++++------------ 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/config/clusterrole.yaml b/config/clusterrole.yaml index 3ca9065a2de..0778c2e456e 100644 --- a/config/clusterrole.yaml +++ b/config/clusterrole.yaml @@ -21,4 +21,4 @@ rules: verbs: ["get", "watch", "list"] - apiGroups: [""] resources: ["events"] - verbs: ["create"] + verbs: ["create", "patch"] diff --git a/pkg/buses/monitor.go b/pkg/buses/monitor.go index 118eb7f35db..666cd1821d5 100644 --- a/pkg/buses/monitor.go +++ b/pkg/buses/monitor.go @@ -219,7 +219,7 @@ func NewMonitor( busInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { bus := obj.(*channelsv1alpha1.Bus) - monitor.workqueue.AddRateLimited(makeWorkqueueKeyForBus(*bus)) + monitor.workqueue.AddRateLimited(makeWorkqueueKeyForBus(bus)) }, UpdateFunc: func(old, new interface{}) { oldBus := old.(*channelsv1alpha1.Bus) @@ -231,14 +231,14 @@ func NewMonitor( return } - monitor.workqueue.AddRateLimited(makeWorkqueueKeyForBus(*newBus)) + monitor.workqueue.AddRateLimited(makeWorkqueueKeyForBus(newBus)) }, }) // Set up an event handler for when Channel resources change channelInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { channel := obj.(*channelsv1alpha1.Channel) - monitor.workqueue.AddRateLimited(makeWorkqueueKeyForChannel(*channel)) + monitor.workqueue.AddRateLimited(makeWorkqueueKeyForChannel(channel)) }, UpdateFunc: func(old, new interface{}) { oldChannel := old.(*channelsv1alpha1.Channel) @@ -250,18 +250,18 @@ func NewMonitor( return } - monitor.workqueue.AddRateLimited(makeWorkqueueKeyForChannel(*newChannel)) + monitor.workqueue.AddRateLimited(makeWorkqueueKeyForChannel(newChannel)) }, DeleteFunc: func(obj interface{}) { channel := obj.(*channelsv1alpha1.Channel) - monitor.workqueue.AddRateLimited(makeWorkqueueKeyForChannel(*channel)) + monitor.workqueue.AddRateLimited(makeWorkqueueKeyForChannel(channel)) }, }) // Set up an event handler for when Subscription resources change subscriptionInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { subscription := obj.(*channelsv1alpha1.Subscription) - monitor.workqueue.AddRateLimited(makeWorkqueueKeyForSubscription(*subscription)) + monitor.workqueue.AddRateLimited(makeWorkqueueKeyForSubscription(subscription)) }, UpdateFunc: func(old, new interface{}) { oldSubscription := old.(*channelsv1alpha1.Subscription) @@ -273,11 +273,11 @@ func NewMonitor( return } - monitor.workqueue.AddRateLimited(makeWorkqueueKeyForSubscription(*newSubscription)) + monitor.workqueue.AddRateLimited(makeWorkqueueKeyForSubscription(newSubscription)) }, DeleteFunc: func(obj interface{}) { subscription := obj.(*channelsv1alpha1.Subscription) - monitor.workqueue.AddRateLimited(makeWorkqueueKeyForSubscription(*subscription)) + monitor.workqueue.AddRateLimited(makeWorkqueueKeyForSubscription(subscription)) }, }) @@ -386,7 +386,7 @@ func (m *Monitor) resolveAttributes(parameters *[]channelsv1alpha1.Parameter, ar return resolved, nil } -func (m *Monitor) RequeueSubscription(subscription channelsv1alpha1.Subscription) { +func (m *Monitor) RequeueSubscription(subscription *channelsv1alpha1.Subscription) { glog.Infof("Requeue subscription %q\n", subscription.Name) m.workqueue.AddRateLimited(makeWorkqueueKeyForSubscription(subscription)) } @@ -787,15 +787,15 @@ func makeSubscriptionKeyWithNames(namespace string, name string) subscriptionKey } } -func makeWorkqueueKeyForBus(bus channelsv1alpha1.Bus) string { +func makeWorkqueueKeyForBus(bus *channelsv1alpha1.Bus) string { return makeWorkqueueKey(busKind, bus.Namespace, bus.Name) } -func makeWorkqueueKeyForChannel(channel channelsv1alpha1.Channel) string { +func makeWorkqueueKeyForChannel(channel *channelsv1alpha1.Channel) string { return makeWorkqueueKey(channelKind, channel.Namespace, channel.Name) } -func makeWorkqueueKeyForSubscription(subscription channelsv1alpha1.Subscription) string { +func makeWorkqueueKeyForSubscription(subscription *channelsv1alpha1.Subscription) string { return makeWorkqueueKey(subscriptionKind, subscription.Namespace, subscription.Name) } From c2e4f4f7d736b647188cee8d222471e102dfd86c Mon Sep 17 00:00:00 2001 From: Scott Andrews Date: Tue, 19 Jun 2018 17:50:29 -0400 Subject: [PATCH 18/19] Move informers into monitor --- pkg/buses/monitor.go | 32 ++++++++++++++++++++++++++------ pkg/buses/stub/main.go | 24 +----------------------- 2 files changed, 27 insertions(+), 29 deletions(-) diff --git a/pkg/buses/monitor.go b/pkg/buses/monitor.go index 666cd1821d5..29b500673dc 100644 --- a/pkg/buses/monitor.go +++ b/pkg/buses/monitor.go @@ -27,8 +27,10 @@ import ( channelsv1alpha1 "github.com/knative/eventing/pkg/apis/channels/v1alpha1" "github.com/knative/eventing/pkg/client/clientset/versioned/scheme" channelscheme "github.com/knative/eventing/pkg/client/clientset/versioned/scheme" - informers "github.com/knative/eventing/pkg/client/informers/externalversions" listers "github.com/knative/eventing/pkg/client/listers/channels/v1alpha1" + + clientset "github.com/knative/eventing/pkg/client/clientset/versioned" + informers "github.com/knative/eventing/pkg/client/informers/externalversions" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/util/runtime" @@ -36,6 +38,7 @@ import ( "k8s.io/client-go/kubernetes" typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" ) @@ -59,6 +62,7 @@ const ( type Monitor struct { bus *channelsv1alpha1.Bus handler MonitorEventHandlerFuncs + informerFactory informers.SharedInformerFactory busesLister listers.BusLister busesSynced cache.InformerSynced channelsLister listers.ChannelLister @@ -176,11 +180,25 @@ type subscriptionSummary struct { // NewMonitor creates a monitor for a bus func NewMonitor( - component string, - kubeclientset kubernetes.Interface, - informerFactory informers.SharedInformerFactory, + component, masterURL, kubeconfig string, handler MonitorEventHandlerFuncs, ) *Monitor { + cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig) + if err != nil { + glog.Fatalf("Error building kubeconfig: %s", err.Error()) + } + + kubeClient, err := kubernetes.NewForConfig(cfg) + if err != nil { + glog.Fatalf("Error building kubernetes clientset: %s", err.Error()) + } + + client, err := clientset.NewForConfig(cfg) + if err != nil { + glog.Fatalf("Error building clientset: %s", err.Error()) + } + + informerFactory := informers.NewSharedInformerFactory(client, time.Second*30) busInformer := informerFactory.Channels().V1alpha1().Buses() channelInformer := informerFactory.Channels().V1alpha1().Channels() subscriptionInformer := informerFactory.Channels().V1alpha1().Subscriptions() @@ -192,13 +210,14 @@ func NewMonitor( glog.V(4).Info("Creating event broadcaster") eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) - eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")}) + eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: component}) monitor := &Monitor{ bus: nil, handler: handler, + informerFactory: informerFactory, busesLister: busInformer.Lister(), busesSynced: busInformer.Informer().HasSynced, channelsLister: channelInformer.Lister(), @@ -401,6 +420,7 @@ func (m *Monitor) Run(namespace, name string, threadiness int, stopCh <-chan str // Start the informer factories to begin populating the informer caches glog.Info("Starting monitor") + go m.informerFactory.Start(stopCh) // Wait for the caches to be synced before starting workers glog.Info("Waiting for informer caches to sync") @@ -621,7 +641,7 @@ func (m *Monitor) createOrUpdateBus(bus *channelsv1alpha1.Bus) error { return nil } - if !reflect.DeepEqual(m.bus, bus.Spec) { + if !reflect.DeepEqual(m.bus.Spec, bus.Spec) { m.bus = bus err := m.handler.onBus(bus, m) if err != nil { diff --git a/pkg/buses/stub/main.go b/pkg/buses/stub/main.go index a83684df06d..0e60c0ae710 100644 --- a/pkg/buses/stub/main.go +++ b/pkg/buses/stub/main.go @@ -25,16 +25,11 @@ import ( "net/url" "os" "strings" - "time" "github.com/golang/glog" channelsv1alpha1 "github.com/knative/eventing/pkg/apis/channels/v1alpha1" "github.com/knative/eventing/pkg/buses" - clientset "github.com/knative/eventing/pkg/client/clientset/versioned" - informers "github.com/knative/eventing/pkg/client/informers/externalversions" "github.com/knative/eventing/pkg/signals" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/clientcmd" ) const ( @@ -146,27 +141,11 @@ func main() { // set up signals so we handle the first shutdown signal gracefully stopCh := signals.SetupSignalHandler() - cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig) - if err != nil { - glog.Fatalf("Error building kubeconfig: %s", err.Error()) - } - - kubeClient, err := kubernetes.NewForConfig(cfg) - if err != nil { - glog.Fatalf("Error building kubernetes clientset: %s", err.Error()) - } - - client, err := clientset.NewForConfig(cfg) - if err != nil { - glog.Fatalf("Error building clientset: %s", err.Error()) - } - namespace := os.Getenv("BUS_NAMESPACE") name := os.Getenv("BUS_NAME") component := fmt.Sprintf("%s-%s", name, buses.Dispatcher) - informerFactory := informers.NewSharedInformerFactory(client, time.Second*30) - monitor := buses.NewMonitor(component, kubeClient, informerFactory, buses.MonitorEventHandlerFuncs{ + monitor := buses.NewMonitor(component, masterURL, kubeconfig, buses.MonitorEventHandlerFuncs{ ProvisionFunc: func(channel *channelsv1alpha1.Channel, attributes buses.Attributes) error { glog.Infof("Provision channel %q\n", channel.Name) return nil @@ -186,7 +165,6 @@ func main() { }) bus := NewStubBus(name, monitor) - go informerFactory.Start(stopCh) go func() { if err := monitor.Run(namespace, name, threadsPerMonitor, stopCh); err != nil { glog.Fatalf("Error running monitor: %s", err.Error()) From fe854e28b1c96e89e102ef80471ce313b79fa7b5 Mon Sep 17 00:00:00 2001 From: Scott Andrews Date: Fri, 22 Jun 2018 11:16:04 -0400 Subject: [PATCH 19/19] Fix unknown arguments check --- pkg/buses/monitor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/buses/monitor.go b/pkg/buses/monitor.go index 29b500673dc..c4f043ac793 100644 --- a/pkg/buses/monitor.go +++ b/pkg/buses/monitor.go @@ -383,7 +383,7 @@ func (m *Monitor) resolveAttributes(parameters *[]channelsv1alpha1.Parameter, ar // apply arguments if arguments != nil { for _, arg := range *arguments { - if _, ok := known[arg.Name]; ok { + if _, ok := known[arg.Name]; !ok { // ignore arguments not defined by parameters glog.Warningf("Skipping unknown argument: %s\n", arg.Name) continue