diff --git a/config/clusterrole.yaml b/config/clusterrole.yaml index 5e37fa384c5..0778c2e456e 100644 --- a/config/clusterrole.yaml +++ b/config/clusterrole.yaml @@ -18,4 +18,7 @@ metadata: rules: - apiGroups: ["channels.knative.dev"] resources: ["buses", "channels", "subscriptions"] - verbs: ["get", "watch", "list"] \ No newline at end of file + verbs: ["get", "watch", "list"] +- apiGroups: [""] + resources: ["events"] + verbs: ["create", "patch"] diff --git a/pkg/apis/channels/v1alpha1/bus_types.go b/pkg/apis/channels/v1alpha1/bus_types.go index 7cc89b856f1..9ebf8f2478d 100644 --- a/pkg/apis/channels/v1alpha1/bus_types.go +++ b/pkg/apis/channels/v1alpha1/bus_types.go @@ -39,14 +39,27 @@ type Bus struct { // BusSpec (what the user wants) for a bus type BusSpec struct { - // Parameters configuration params for the bus - Parameters *[]Parameter `json:"parameters,omitempty"` + // Parameters exposed by the bus for channels and subscriptions + Parameters *BusParameters `json:"parameters,omitempty"` // Provisioner container definition to manage channels on the bus. Provisioner *kapi.Container `json:"provisioner,omitempty"` // Dispatcher container definition to use for the bus data plane. Dispatcher kapi.Container `json:"dispatcher"` + + // Volumes to be mounted inside the provisioner or dispatcher containers + Volumes *[]kapi.Volume `json:"volumes,omitempty"` +} + +// BusParameters parameters exposed by the bus +type BusParameters struct { + + // Channel configuration params for channels on the bus + Channel *[]Parameter `json:"channel,omitempty"` + + // Subscription configuration params for subscriptions on the bus + Subscription *[]Parameter `json:"subscription,omitempty"` } // BusStatus (computed) for a bus diff --git a/pkg/apis/channels/v1alpha1/channel_types.go b/pkg/apis/channels/v1alpha1/channel_types.go index c3fabda2fa5..db558595882 100644 --- a/pkg/apis/channels/v1alpha1/channel_types.go +++ b/pkg/apis/channels/v1alpha1/channel_types.go @@ -41,11 +41,8 @@ type ChannelSpec struct { // Name of the bus backing this channel (optional) Bus string `json:"bus` - // Arguments configuration arguments for the bus + // Arguments configuration arguments for the channel Arguments *[]Argument `json:"arguments,omitempty"` - - // Parameters configuration params for the channel - Parameters *[]Parameter `json:"parameters,omitempty"` } // ChannelStatus (computed) for a channel diff --git a/pkg/apis/channels/v1alpha1/subscription_types.go b/pkg/apis/channels/v1alpha1/subscription_types.go index 1c92c479d60..3cc91c6f965 100644 --- a/pkg/apis/channels/v1alpha1/subscription_types.go +++ b/pkg/apis/channels/v1alpha1/subscription_types.go @@ -44,7 +44,7 @@ type SubscriptionSpec struct { // Name of the subscriber service Subscriber string `json:"subscriber"` - // Arguments for the channel + // Arguments for the subscription Arguments *[]Argument `json:"arguments,omitempty"` } diff --git a/pkg/apis/channels/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/channels/v1alpha1/zz_generated.deepcopy.go index 33aac9e1ee5..e9b763a38cf 100644 --- a/pkg/apis/channels/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/channels/v1alpha1/zz_generated.deepcopy.go @@ -111,10 +111,25 @@ func (in *BusList) DeepCopyObject() runtime.Object { } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *BusSpec) DeepCopyInto(out *BusSpec) { +func (in *BusParameters) DeepCopyInto(out *BusParameters) { *out = *in - if in.Parameters != nil { - in, out := &in.Parameters, &out.Parameters + if in.Channel != nil { + in, out := &in.Channel, &out.Channel + if *in == nil { + *out = nil + } else { + *out = new([]Parameter) + if **in != nil { + in, out := *in, *out + *out = make([]Parameter, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + } + } + if in.Subscription != nil { + in, out := &in.Subscription, &out.Subscription if *in == nil { *out = nil } else { @@ -128,6 +143,31 @@ func (in *BusSpec) DeepCopyInto(out *BusSpec) { } } } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BusParameters. +func (in *BusParameters) DeepCopy() *BusParameters { + if in == nil { + return nil + } + out := new(BusParameters) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *BusSpec) DeepCopyInto(out *BusSpec) { + *out = *in + if in.Parameters != nil { + in, out := &in.Parameters, &out.Parameters + if *in == nil { + *out = nil + } else { + *out = new(BusParameters) + (*in).DeepCopyInto(*out) + } + } if in.Provisioner != nil { in, out := &in.Provisioner, &out.Provisioner if *in == nil { @@ -138,6 +178,21 @@ func (in *BusSpec) DeepCopyInto(out *BusSpec) { } } in.Dispatcher.DeepCopyInto(&out.Dispatcher) + if in.Volumes != nil { + in, out := &in.Volumes, &out.Volumes + if *in == nil { + *out = nil + } else { + *out = new([]v1.Volume) + if **in != nil { + in, out := *in, *out + *out = make([]v1.Volume, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + } + } return } @@ -252,21 +307,6 @@ func (in *ChannelSpec) DeepCopyInto(out *ChannelSpec) { } } } - if in.Parameters != nil { - in, out := &in.Parameters, &out.Parameters - if *in == nil { - *out = nil - } else { - *out = new([]Parameter) - if **in != nil { - in, out := *in, *out - *out = make([]Parameter, len(*in)) - for i := range *in { - (*in)[i].DeepCopyInto(&(*out)[i]) - } - } - } - } return } diff --git a/pkg/buses/monitor.go b/pkg/buses/monitor.go index 220f1491e27..c4f043ac793 100644 --- a/pkg/buses/monitor.go +++ b/pkg/buses/monitor.go @@ -17,31 +17,156 @@ package buses import ( + "fmt" "reflect" + "strings" "sync" + "time" "github.com/golang/glog" channelsv1alpha1 "github.com/knative/eventing/pkg/apis/channels/v1alpha1" + "github.com/knative/eventing/pkg/client/clientset/versioned/scheme" + channelscheme "github.com/knative/eventing/pkg/client/clientset/versioned/scheme" + listers "github.com/knative/eventing/pkg/client/listers/channels/v1alpha1" + + clientset "github.com/knative/eventing/pkg/client/clientset/versioned" informers "github.com/knative/eventing/pkg/client/informers/externalversions" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" +) + +const ( + Dispatcher = "dispatcher" + Provisioner = "provisioner" + + busKind = "Bus" + channelKind = "Channel" + subscriptionKind = "Subscription" + + // SuccessSynced is used as part of the Event 'reason' when a resource is synced + successSynced = "Synced" + // ErrResourceSync is used as part of the Event 'reason' when a resource fails + // to sync. + errResourceSync = "ErrResourceSync" ) // Monitor utility to manage channels and subscriptions for a bus type Monitor struct { - busName string - handler MonitorEventHandlerFuncs - - bus *channelsv1alpha1.BusSpec - cache map[channelKey]*channelSummary - mutex *sync.Mutex + bus *channelsv1alpha1.Bus + handler MonitorEventHandlerFuncs + informerFactory informers.SharedInformerFactory + busesLister listers.BusLister + busesSynced cache.InformerSynced + channelsLister listers.ChannelLister + channelsSynced cache.InformerSynced + subscriptionsLister listers.SubscriptionLister + subscriptionsSynced cache.InformerSynced + cache map[channelKey]*channelSummary + provisionedChannels map[channelKey]*channelsv1alpha1.Channel + provisionedSubscriptions map[subscriptionKey]*channelsv1alpha1.Subscription + mutex *sync.Mutex + + // workqueue is a rate limited work queue. This is used to queue work to be + // processed instead of performing it as soon as a change happens. This + // means we can ensure we only process a fixed amount of resources at a + // time, and makes it easy to ensure we are never processing the same item + // simultaneously in two different workers. + workqueue workqueue.RateLimitingInterface + // recorder is an event recorder for recording Event resources to the + // Kubernetes API. + recorder record.EventRecorder } +type Attributes = map[string]string + // MonitorEventHandlerFuncs handler functions for channel and subscription provisioning type MonitorEventHandlerFuncs struct { - ProvisionFunc func(channel channelsv1alpha1.Channel) - UnprovisionFunc func(channel channelsv1alpha1.Channel) - SubscribeFunc func(subscription channelsv1alpha1.Subscription) - UnsubscribeFunc func(subscription channelsv1alpha1.Subscription) + BusFunc func(bus *channelsv1alpha1.Bus) error + ProvisionFunc func(channel *channelsv1alpha1.Channel, attributes Attributes) error + UnprovisionFunc func(channel *channelsv1alpha1.Channel) error + SubscribeFunc func(subscription *channelsv1alpha1.Subscription, attributes Attributes) error + UnsubscribeFunc func(subscription *channelsv1alpha1.Subscription) error +} + +func (h MonitorEventHandlerFuncs) onBus(bus *channelsv1alpha1.Bus, monitor *Monitor) error { + if h.BusFunc != nil { + err := h.BusFunc(bus) + if err != nil { + monitor.recorder.Eventf(bus, corev1.EventTypeWarning, errResourceSync, "Error syncing bus: %s", err) + } else { + monitor.recorder.Event(bus, corev1.EventTypeNormal, successSynced, "Bus synched successfully") + } + return err + } + return nil +} + +func (h MonitorEventHandlerFuncs) onProvision(channel *channelsv1alpha1.Channel, monitor *Monitor) error { + if h.ProvisionFunc != nil { + attributes, err := monitor.channelAttributes(channel.Spec) + if err != nil { + return err + } + err = h.ProvisionFunc(channel, attributes) + if err != nil { + monitor.recorder.Eventf(channel, corev1.EventTypeWarning, errResourceSync, "Error provisoning channel: %s", err) + } else { + monitor.recorder.Event(channel, corev1.EventTypeNormal, successSynced, "Channel provisioned successfully") + } + return err + } + return nil +} + +func (h MonitorEventHandlerFuncs) onUnprovision(channel *channelsv1alpha1.Channel, monitor *Monitor) error { + if h.UnprovisionFunc != nil { + err := h.UnprovisionFunc(channel) + if err != nil { + monitor.recorder.Eventf(channel, corev1.EventTypeWarning, errResourceSync, "Error unprovisioning channel: %s", err) + } else { + monitor.recorder.Event(channel, corev1.EventTypeNormal, successSynced, "Channel unprovisioned successfully") + } + return err + } + return nil +} + +func (h MonitorEventHandlerFuncs) onSubscribe(subscription *channelsv1alpha1.Subscription, monitor *Monitor) error { + if h.SubscribeFunc != nil { + attributes, err := monitor.subscriptionAttributes(subscription.Spec) + if err != nil { + return err + } + err = h.SubscribeFunc(subscription, attributes) + if err != nil { + monitor.recorder.Eventf(subscription, corev1.EventTypeWarning, errResourceSync, "Error subscribing: %s", err) + } else { + monitor.recorder.Event(subscription, corev1.EventTypeNormal, successSynced, "Subscribed successfully") + } + return err + } + return nil +} + +func (h MonitorEventHandlerFuncs) onUnsubscribe(subscription *channelsv1alpha1.Subscription, monitor *Monitor) error { + if h.UnsubscribeFunc != nil { + err := h.UnsubscribeFunc(subscription) + if err != nil { + monitor.recorder.Eventf(subscription, corev1.EventTypeWarning, errResourceSync, "Error unsubscribing: %s", err) + } else { + monitor.recorder.Event(subscription, corev1.EventTypeNormal, successSynced, "Unsubscribed successfully") + } + return err + } + return nil } type channelSummary struct { @@ -54,19 +179,58 @@ type subscriptionSummary struct { } // NewMonitor creates a monitor for a bus -func NewMonitor(busName string, informerFactory informers.SharedInformerFactory, handler MonitorEventHandlerFuncs) *Monitor { +func NewMonitor( + component, masterURL, kubeconfig string, + handler MonitorEventHandlerFuncs, +) *Monitor { + cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig) + if err != nil { + glog.Fatalf("Error building kubeconfig: %s", err.Error()) + } + + kubeClient, err := kubernetes.NewForConfig(cfg) + if err != nil { + glog.Fatalf("Error building kubernetes clientset: %s", err.Error()) + } + client, err := clientset.NewForConfig(cfg) + if err != nil { + glog.Fatalf("Error building clientset: %s", err.Error()) + } + + informerFactory := informers.NewSharedInformerFactory(client, time.Second*30) busInformer := informerFactory.Channels().V1alpha1().Buses() channelInformer := informerFactory.Channels().V1alpha1().Channels() subscriptionInformer := informerFactory.Channels().V1alpha1().Subscriptions() + // Create event broadcaster + // Add bus-controller types to the default Kubernetes Scheme so Events can be + // logged for bus-controller types. + channelscheme.AddToScheme(scheme.Scheme) + glog.V(4).Info("Creating event broadcaster") + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(glog.Infof) + eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) + recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: component}) + monitor := &Monitor{ - busName: busName, + bus: nil, handler: handler, - bus: nil, - cache: make(map[channelKey]*channelSummary), + informerFactory: informerFactory, + busesLister: busInformer.Lister(), + busesSynced: busInformer.Informer().HasSynced, + channelsLister: channelInformer.Lister(), + channelsSynced: channelInformer.Informer().HasSynced, + subscriptionsLister: subscriptionInformer.Lister(), + subscriptionsSynced: subscriptionInformer.Informer().HasSynced, + cache: make(map[channelKey]*channelSummary), + provisionedChannels: make(map[channelKey]*channelsv1alpha1.Channel), + provisionedSubscriptions: make(map[subscriptionKey]*channelsv1alpha1.Subscription), mutex: &sync.Mutex{}, + + workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Monitor"), + recorder: recorder, } glog.Info("Setting up event handlers") @@ -74,7 +238,7 @@ func NewMonitor(busName string, informerFactory informers.SharedInformerFactory, busInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { bus := obj.(*channelsv1alpha1.Bus) - monitor.createOrUpdateBus(*bus) + monitor.workqueue.AddRateLimited(makeWorkqueueKeyForBus(bus)) }, UpdateFunc: func(old, new interface{}) { oldBus := old.(*channelsv1alpha1.Bus) @@ -86,14 +250,14 @@ func NewMonitor(busName string, informerFactory informers.SharedInformerFactory, return } - monitor.createOrUpdateBus(*newBus) + monitor.workqueue.AddRateLimited(makeWorkqueueKeyForBus(newBus)) }, }) // Set up an event handler for when Channel resources change channelInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { channel := obj.(*channelsv1alpha1.Channel) - monitor.createOrUpdateChannel(*channel) + monitor.workqueue.AddRateLimited(makeWorkqueueKeyForChannel(channel)) }, UpdateFunc: func(old, new interface{}) { oldChannel := old.(*channelsv1alpha1.Channel) @@ -105,21 +269,18 @@ func NewMonitor(busName string, informerFactory informers.SharedInformerFactory, return } - monitor.createOrUpdateChannel(*newChannel) - if oldChannel.Spec.Bus != newChannel.Spec.Bus { - monitor.removeChannel(*oldChannel) - } + monitor.workqueue.AddRateLimited(makeWorkqueueKeyForChannel(newChannel)) }, DeleteFunc: func(obj interface{}) { channel := obj.(*channelsv1alpha1.Channel) - monitor.removeChannel(*channel) + monitor.workqueue.AddRateLimited(makeWorkqueueKeyForChannel(channel)) }, }) // Set up an event handler for when Subscription resources change subscriptionInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { subscription := obj.(*channelsv1alpha1.Subscription) - monitor.createOrUpdateSubscription(*subscription) + monitor.workqueue.AddRateLimited(makeWorkqueueKeyForSubscription(subscription)) }, UpdateFunc: func(old, new interface{}) { oldSubscription := old.(*channelsv1alpha1.Subscription) @@ -131,14 +292,11 @@ func NewMonitor(busName string, informerFactory informers.SharedInformerFactory, return } - monitor.createOrUpdateSubscription(*newSubscription) - if oldSubscription.Spec.Channel != newSubscription.Spec.Channel { - monitor.removeSubscription(*oldSubscription) - } + monitor.workqueue.AddRateLimited(makeWorkqueueKeyForSubscription(newSubscription)) }, DeleteFunc: func(obj interface{}) { subscription := obj.(*channelsv1alpha1.Subscription) - monitor.removeSubscription(*subscription) + monitor.workqueue.AddRateLimited(makeWorkqueueKeyForSubscription(subscription)) }, }) @@ -146,19 +304,26 @@ func NewMonitor(busName string, informerFactory informers.SharedInformerFactory, } // Channel for a channel name and namespace -func (m *Monitor) Channel(channel string, namespace string) *channelsv1alpha1.ChannelSpec { - channelKey := makeChannelKeyWithNames(channel, namespace) - summary := m.getChannelSummary(channelKey) +func (m *Monitor) Channel(name string, namespace string) *channelsv1alpha1.Channel { + channelKey := makeChannelKeyWithNames(namespace, name) + if channel, ok := m.provisionedChannels[channelKey]; ok { + return channel + } + return nil +} - if summary == nil { - return nil +// Subscription for a subscription name and namespace +func (m *Monitor) Subscription(name string, namespace string) *channelsv1alpha1.Subscription { + subscriptionKey := makeSubscriptionKeyWithNames(namespace, name) + if subscription, ok := m.provisionedSubscriptions[subscriptionKey]; ok { + return subscription } - return summary.Channel + return nil } // Subscriptions for a channel name and namespace func (m *Monitor) Subscriptions(channel string, namespace string) *[]channelsv1alpha1.SubscriptionSpec { - channelKey := makeChannelKeyWithNames(channel, namespace) + channelKey := makeChannelKeyWithNames(namespace, channel) summary := m.getChannelSummary(channelKey) if summary == nil || summary.Channel == nil { @@ -166,13 +331,13 @@ func (m *Monitor) Subscriptions(channel string, namespace string) *[]channelsv1a return nil } - if summary.Channel.Bus != m.busName { + if summary.Channel.Bus != m.bus.Name { // the channel is not for this bus return nil } m.mutex.Lock() - subscriptions := make([]channelsv1alpha1.SubscriptionSpec, len(summary.Subscriptions)-1) + subscriptions := []channelsv1alpha1.SubscriptionSpec{} for _, subscription := range summary.Subscriptions { subscriptions = append(subscriptions, subscription.Subscription) } @@ -181,53 +346,274 @@ func (m *Monitor) Subscriptions(channel string, namespace string) *[]channelsv1a return &subscriptions } -// ChannelParams resolve parameters for a channel -func (m *Monitor) ChannelParams(channel channelsv1alpha1.ChannelSpec) map[string]string { - params := make(map[string]string) +func (m *Monitor) channelAttributes(channel channelsv1alpha1.ChannelSpec) (Attributes, error) { + busParameters := m.bus.Spec.Parameters + var parameters *[]channelsv1alpha1.Parameter + if busParameters != nil { + parameters = busParameters.Channel + } + return m.resolveAttributes(parameters, channel.Arguments) +} + +func (m *Monitor) subscriptionAttributes(subscription channelsv1alpha1.SubscriptionSpec) (Attributes, error) { + busParameters := m.bus.Spec.Parameters + var parameters *[]channelsv1alpha1.Parameter + if busParameters != nil { + parameters = busParameters.Subscription + } + return m.resolveAttributes(parameters, subscription.Arguments) +} + +func (m *Monitor) resolveAttributes(parameters *[]channelsv1alpha1.Parameter, arguments *[]channelsv1alpha1.Argument) (Attributes, error) { + resolved := make(Attributes) + known := make(map[string]interface{}) + required := make(map[string]interface{}) - // apply bus defaults - if m.bus.Parameters != nil { - for _, param := range *m.bus.Parameters { + // apply parameters + if parameters != nil { + for _, param := range *parameters { + known[param.Name] = true if param.Default != nil { - params[param.Name] = *param.Default + resolved[param.Name] = *param.Default + } else { + required[param.Name] = true } } } - // apply channel arguments - if channel.Arguments != nil { - for _, arg := range *channel.Arguments { - // TODO ignore arguments not defined by parameters - params[arg.Name] = arg.Value + // apply arguments + if arguments != nil { + for _, arg := range *arguments { + if _, ok := known[arg.Name]; !ok { + // ignore arguments not defined by parameters + glog.Warningf("Skipping unknown argument: %s\n", arg.Name) + continue + } + delete(required, arg.Name) + resolved[arg.Name] = arg.Value + } + } + + // check for missing arguments + if len(required) != 0 { + missing := []string{} + for name := range required { + missing = append(missing, name) + } + return nil, fmt.Errorf("missing required arguments: %v", missing) + } + + return resolved, nil +} + +func (m *Monitor) RequeueSubscription(subscription *channelsv1alpha1.Subscription) { + glog.Infof("Requeue subscription %q\n", subscription.Name) + m.workqueue.AddRateLimited(makeWorkqueueKeyForSubscription(subscription)) +} + +// Run will set up the event handlers for types we are interested in, as well +// as syncing informer caches and starting workers. It will block until stopCh +// is closed, at which point it will shutdown the workqueue and wait for +// workers to finish processing their current work items. +func (m *Monitor) Run(namespace, name string, threadiness int, stopCh <-chan struct{}) error { + defer runtime.HandleCrash() + defer m.workqueue.ShutDown() + + // Start the informer factories to begin populating the informer caches + glog.Info("Starting monitor") + go m.informerFactory.Start(stopCh) + + // Wait for the caches to be synced before starting workers + glog.Info("Waiting for informer caches to sync") + if ok := cache.WaitForCacheSync(stopCh, m.busesSynced, m.channelsSynced, m.subscriptionsSynced); !ok { + return fmt.Errorf("failed to wait for caches to sync") + } + + bus, err := m.busesLister.Buses(namespace).Get(name) + if err != nil { + glog.Fatalf("Unknown bus '%s/%s'", namespace, name) + } + m.bus = bus + + glog.Info("Starting workers") + // Launch two workers to process Bus resources + for i := 0; i < threadiness; i++ { + go wait.Until(m.runWorker, time.Second, stopCh) + } + + glog.Info("Started workers") + <-stopCh + glog.Info("Shutting down workers") + + return nil +} + +// runWorker is a long-running function that will continually call the +// processNextWorkItem function in order to read and process a message on the +// workqueue. +func (m *Monitor) runWorker() { + for m.processNextWorkItem() { + } +} + +// processNextWorkItem will read a single work item off the workqueue and +// attempt to process it, by calling the syncHandler. +func (m *Monitor) processNextWorkItem() bool { + obj, shutdown := m.workqueue.Get() + + if shutdown { + return false + } + + // We wrap this block in a func so we can defer m.workqueue.Done. + err := func(obj interface{}) error { + // We call Done here so the workqueue knows we have finished + // processing this item. We also must remember to call Forget if we + // do not want this work item being re-queued. For example, we do + // not call Forget if a transient error occurs, instead the item is + // put back on the workqueue and attempted again after a back-off + // period. + defer m.workqueue.Done(obj) + var key string + var ok bool + // We expect strings to come off the workqueue. These are of the + // form namespace/name. We do this as the delayed nature of the + // workqueue means the items in the informer cache may actually be + // more up to date that when the item was initially put onto the + // workqueue. + if key, ok = obj.(string); !ok { + // As the item in the workqueue is actually invalid, we call + // Forget here else we'd go into a loop of attempting to + // process a work item that is invalid. + m.workqueue.Forget(obj) + runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) + return nil } + // Run the syncHandler, passing it the namespace/name string of the + // Bus resource to be synced. + if err := m.syncHandler(key); err != nil { + m.workqueue.AddRateLimited(obj) + return fmt.Errorf("error syncing monitor '%s': %s", key, err.Error()) + } + // Finally, if no error occurs we Forget this item so it does not + // get queued again until another change happens. + m.workqueue.Forget(obj) + glog.Infof("Successfully synced monitor '%s'", key) + return nil + }(obj) + + if err != nil { + runtime.HandleError(err) + return true + } + + return true +} + +// syncHandler compares the actual state with the desired, and attempts to +// converge the two. It then updates the Status block of the Bus resource +// with the current status of the resource. +func (m *Monitor) syncHandler(key string) error { + // Convert the namespace/name string into a distinct namespace and name + kind, namespace, name, err := splitWorkqueueKey(key) + if err != nil { + runtime.HandleError(fmt.Errorf("invalid resource key: %s", key)) + return nil + } + + if m.bus == nil && kind != busKind { + // don't attempt tp sync until we have seen the bus for this monitor + return fmt.Errorf("Unknown bus for monitor") } - return params + switch kind { + case busKind: + err = m.syncBus(namespace, name) + case channelKind: + err = m.syncChannel(namespace, name) + case subscriptionKind: + err = m.syncSubscription(namespace, name) + default: + runtime.HandleError(fmt.Errorf("Unknown resource kind %s", kind)) + return nil + } + + if err != nil { + return err + } + + return nil } -// SubscriptionParams resolve parameters for a subscription -func (m *Monitor) SubscriptionParams( - channel channelsv1alpha1.ChannelSpec, - subscription channelsv1alpha1.SubscriptionSpec, -) map[string]string { - params := m.ChannelParams(channel) +func (m *Monitor) syncBus(namespace string, name string) error { + // Get the Bus resource with this namespace/name + bus, err := m.busesLister.Buses(namespace).Get(name) + if err != nil { + // The Bus resource may no longer exist + if errors.IsNotFound(err) { + // nothing to do + return nil + } - // apply channel defaults - if channel.Parameters != nil { - for _, param := range *channel.Parameters { - if _, ok := params[param.Name]; !ok && param.Default != nil { - params[param.Name] = *param.Default + return err + } + + // Sync the Bus + err = m.createOrUpdateBus(bus) + if err != nil { + return err + } + + return nil +} + +func (m *Monitor) syncChannel(namespace string, name string) error { + // Get the Channel resource with this namespace/name + channel, err := m.channelsLister.Channels(namespace).Get(name) + if err != nil { + // The Channel resource may no longer exist + if errors.IsNotFound(err) { + err = m.removeChannel(namespace, name) + if err != nil { + return err } + return nil } + + return err } - // apply subscription arguments - if subscription.Arguments != nil { - for _, arg := range *subscription.Arguments { - // TODO ignore arguments not defined by parameters - params[arg.Name] = arg.Value + + // Sync the Channel + err = m.createOrUpdateChannel(channel) + if err != nil { + return err + } + + return nil +} + +func (m *Monitor) syncSubscription(namespace string, name string) error { + // Get the Subscription resource with this namespace/name + subscription, err := m.subscriptionsLister.Subscriptions(namespace).Get(name) + if err != nil { + // The Subscription resource may no longer exist + if errors.IsNotFound(err) { + err = m.removeSubscription(namespace, name) + if err != nil { + return err + } + return nil } + + return err + } + + // Sync the Subscription + err = m.createOrUpdateSubscription(subscription) + if err != nil { + return err } - return params + return nil } func (m *Monitor) getChannelSummary(key channelKey) *channelSummary { @@ -249,17 +635,28 @@ func (m *Monitor) getOrCreateChannelSummary(key channelKey) *channelSummary { return summary } -func (m *Monitor) createOrUpdateBus(bus channelsv1alpha1.Bus) { - if bus.Name != m.busName { +func (m *Monitor) createOrUpdateBus(bus *channelsv1alpha1.Bus) error { + if bus.Name != m.bus.Name { // this is not our bus - return + return nil } - if !reflect.DeepEqual(m.bus, bus.Spec) { - m.bus = &bus.Spec + + if !reflect.DeepEqual(m.bus.Spec, bus.Spec) { + m.bus = bus + err := m.handler.onBus(bus, m) + if err != nil { + return err + } } + + return nil } -func (m *Monitor) createOrUpdateChannel(channel channelsv1alpha1.Channel) { +func (m *Monitor) isChannelForBus(channel *channelsv1alpha1.Channel) bool { + return channel.Spec.Bus == m.bus.Name +} + +func (m *Monitor) createOrUpdateChannel(channel *channelsv1alpha1.Channel) error { channelKey := makeChannelKeyFromChannel(channel) summary := m.getOrCreateChannelSummary(channelKey) @@ -269,26 +666,61 @@ func (m *Monitor) createOrUpdateChannel(channel channelsv1alpha1.Channel) { summary.Channel = new m.mutex.Unlock() - if !reflect.DeepEqual(old, new) { - m.handler.ProvisionFunc(channel) + if m.isChannelForBus(channel) && !reflect.DeepEqual(old, new) { + err := m.handler.onProvision(channel, m) + if err != nil { + return err + } + m.provisionedChannels[channelKey] = channel } + + return nil } -func (m *Monitor) removeChannel(channel channelsv1alpha1.Channel) { - channelKey := makeChannelKeyFromChannel(channel) +func (m *Monitor) removeChannel(namespace string, name string) error { + channelKey := makeChannelKeyWithNames(namespace, name) + channel, ok := m.provisionedChannels[channelKey] + if !ok { + return nil + } + summary := m.getOrCreateChannelSummary(channelKey) m.mutex.Lock() summary.Channel = nil m.mutex.Unlock() - m.handler.UnprovisionFunc(channel) + err := m.handler.onUnprovision(channel, m) + if err != nil { + return err + } + delete(m.provisionedChannels, channelKey) + + return nil } -func (m *Monitor) createOrUpdateSubscription(subscription channelsv1alpha1.Subscription) { +func (m *Monitor) isChannelKnown(subscription *channelsv1alpha1.Subscription) bool { channelKey := makeChannelKeyFromSubscription(subscription) - summary := m.getOrCreateChannelSummary(channelKey) + summary := m.getChannelSummary(channelKey) + return summary != nil && summary.Channel != nil +} + +func (m *Monitor) isSubscriptionProvisioned(subscription *channelsv1alpha1.Subscription) bool { + subscriptionKey := makeSubscriptionKeyFromSubscription(subscription) + _, ok := m.provisionedSubscriptions[subscriptionKey] + return ok +} + +func (m *Monitor) isSubscriptionForBus(subscription *channelsv1alpha1.Subscription) bool { + channelKey := makeChannelKeyFromSubscription(subscription) + summary := m.getChannelSummary(channelKey) + return summary != nil && summary.Channel != nil && summary.Channel.Bus == m.bus.Name +} + +func (m *Monitor) createOrUpdateSubscription(subscription *channelsv1alpha1.Subscription) error { subscriptionKey := makeSubscriptionKeyFromSubscription(subscription) + channelKey := makeChannelKeyFromSubscription(subscription) + summary := m.getOrCreateChannelSummary(channelKey) m.mutex.Lock() old := summary.Subscriptions[subscriptionKey] @@ -298,51 +730,106 @@ func (m *Monitor) createOrUpdateSubscription(subscription channelsv1alpha1.Subsc summary.Subscriptions[subscriptionKey] = new m.mutex.Unlock() - if !reflect.DeepEqual(old.Subscription, new.Subscription) { - m.handler.SubscribeFunc(subscription) + if !m.isChannelKnown(subscription) { + return fmt.Errorf("Unknown channel %q for subscription", subscription.Spec.Channel) } + if !m.isSubscriptionForBus(subscription) { + return nil + } + + if !m.isSubscriptionProvisioned(subscription) || !reflect.DeepEqual(old.Subscription, new.Subscription) { + err := m.handler.onSubscribe(subscription, m) + if err != nil { + return err + } + m.provisionedSubscriptions[subscriptionKey] = subscription + } + + return nil } -func (m *Monitor) removeSubscription(subscription channelsv1alpha1.Subscription) { +func (m *Monitor) removeSubscription(namespace string, name string) error { + subscriptionKey := makeSubscriptionKeyWithNames(namespace, name) + subscription, ok := m.provisionedSubscriptions[subscriptionKey] + if !ok { + return nil + } + channelKey := makeChannelKeyFromSubscription(subscription) summary := m.getOrCreateChannelSummary(channelKey) - subscriptionKey := makeSubscriptionKeyFromSubscription(subscription) m.mutex.Lock() delete(summary.Subscriptions, subscriptionKey) m.mutex.Unlock() - m.handler.UnsubscribeFunc(subscription) + err := m.handler.onUnsubscribe(subscription, m) + if err != nil { + return err + } + delete(m.provisionedSubscriptions, subscriptionKey) + + return nil } type channelKey struct { - Name string Namespace string + Name string } -func makeChannelKeyFromChannel(channel channelsv1alpha1.Channel) channelKey { - return makeChannelKeyWithNames(channel.Name, channel.Namespace) +func makeChannelKeyFromChannel(channel *channelsv1alpha1.Channel) channelKey { + return makeChannelKeyWithNames(channel.Namespace, channel.Name) } -func makeChannelKeyFromSubscription(subscription channelsv1alpha1.Subscription) channelKey { - return makeChannelKeyWithNames(subscription.Spec.Channel, subscription.Namespace) +func makeChannelKeyFromSubscription(subscription *channelsv1alpha1.Subscription) channelKey { + return makeChannelKeyWithNames(subscription.Namespace, subscription.Spec.Channel) } -func makeChannelKeyWithNames(name string, namespace string) channelKey { +func makeChannelKeyWithNames(namespace string, name string) channelKey { return channelKey{ - Name: name, Namespace: namespace, + Name: name, } } type subscriptionKey struct { - Name string Namespace string + Name string } -func makeSubscriptionKeyFromSubscription(subscription channelsv1alpha1.Subscription) subscriptionKey { +func makeSubscriptionKeyFromSubscription(subscription *channelsv1alpha1.Subscription) subscriptionKey { + return makeSubscriptionKeyWithNames(subscription.Namespace, subscription.Name) +} + +func makeSubscriptionKeyWithNames(namespace string, name string) subscriptionKey { return subscriptionKey{ - Name: subscription.Name, - Namespace: subscription.Namespace, + Namespace: namespace, + Name: name, + } +} + +func makeWorkqueueKeyForBus(bus *channelsv1alpha1.Bus) string { + return makeWorkqueueKey(busKind, bus.Namespace, bus.Name) +} + +func makeWorkqueueKeyForChannel(channel *channelsv1alpha1.Channel) string { + return makeWorkqueueKey(channelKind, channel.Namespace, channel.Name) +} + +func makeWorkqueueKeyForSubscription(subscription *channelsv1alpha1.Subscription) string { + return makeWorkqueueKey(subscriptionKind, subscription.Namespace, subscription.Name) +} + +func makeWorkqueueKey(kind string, namespace string, name string) string { + return fmt.Sprintf("%s/%s/%s", kind, namespace, name) +} + +func splitWorkqueueKey(key string) (string, string, string, error) { + chunks := strings.Split(key, "/") + if len(chunks) != 3 { + return "", "", "", fmt.Errorf("Unknown workqueue key %v", key) } + kind := chunks[0] + namespace := chunks[1] + name := chunks[2] + return kind, namespace, name, nil } diff --git a/pkg/buses/stub/main.go b/pkg/buses/stub/main.go index b1c20eaa5df..0e60c0ae710 100644 --- a/pkg/buses/stub/main.go +++ b/pkg/buses/stub/main.go @@ -19,20 +19,21 @@ package main import ( "bytes" "flag" + "fmt" "io/ioutil" "net/http" "net/url" "os" "strings" - "time" "github.com/golang/glog" channelsv1alpha1 "github.com/knative/eventing/pkg/apis/channels/v1alpha1" "github.com/knative/eventing/pkg/buses" - clientset "github.com/knative/eventing/pkg/client/clientset/versioned" - informers "github.com/knative/eventing/pkg/client/informers/externalversions" "github.com/knative/eventing/pkg/signals" - "k8s.io/client-go/tools/clientcmd" +) + +const ( + threadsPerMonitor = 1 ) var ( @@ -140,37 +141,37 @@ func main() { // set up signals so we handle the first shutdown signal gracefully stopCh := signals.SetupSignalHandler() - cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig) - if err != nil { - glog.Fatalf("Error building kubeconfig: %s", err.Error()) - } - - client, err := clientset.NewForConfig(cfg) - if err != nil { - glog.Fatalf("Error building clientset: %s", err.Error()) - } - + namespace := os.Getenv("BUS_NAMESPACE") name := os.Getenv("BUS_NAME") + component := fmt.Sprintf("%s-%s", name, buses.Dispatcher) - informerFactory := informers.NewSharedInformerFactory(client, time.Second*30) - monitor := buses.NewMonitor(name, informerFactory, buses.MonitorEventHandlerFuncs{ - ProvisionFunc: func(channel channelsv1alpha1.Channel) { + monitor := buses.NewMonitor(component, masterURL, kubeconfig, buses.MonitorEventHandlerFuncs{ + ProvisionFunc: func(channel *channelsv1alpha1.Channel, attributes buses.Attributes) error { glog.Infof("Provision channel %q\n", channel.Name) + return nil }, - UnprovisionFunc: func(channel channelsv1alpha1.Channel) { + UnprovisionFunc: func(channel *channelsv1alpha1.Channel) error { glog.Infof("Unprovision channel %q\n", channel.Name) + return nil }, - SubscribeFunc: func(subscription channelsv1alpha1.Subscription) { + SubscribeFunc: func(subscription *channelsv1alpha1.Subscription, attributes buses.Attributes) error { glog.Infof("Subscribe %q to %q channel\n", subscription.Spec.Subscriber, subscription.Spec.Channel) + return nil }, - UnsubscribeFunc: func(subscription channelsv1alpha1.Subscription) { + UnsubscribeFunc: func(subscription *channelsv1alpha1.Subscription) error { glog.Infof("Unubscribe %q from %q channel\n", subscription.Spec.Subscriber, subscription.Spec.Channel) + return nil }, }) bus := NewStubBus(name, monitor) - go informerFactory.Start(stopCh) + go func() { + if err := monitor.Run(namespace, name, threadsPerMonitor, stopCh); err != nil { + glog.Fatalf("Error running monitor: %s", err.Error()) + } + }() + glog.Infoln("Starting web server") http.HandleFunc("/", bus.handleEvent) glog.Fatal(http.ListenAndServe(":8080", nil)) diff --git a/pkg/controller/bus/controller.go b/pkg/controller/bus/controller.go index acde824c29b..95752985d68 100644 --- a/pkg/controller/bus/controller.go +++ b/pkg/controller/bus/controller.go @@ -639,11 +639,19 @@ func newDispatcherDeployment(bus *channelsv1alpha1.Bus) *appsv1.Deployment { Name: "PORT", Value: "8080", }, + corev1.EnvVar{ + Name: "BUS_NAMESPACE", + Value: bus.Namespace, + }, corev1.EnvVar{ Name: "BUS_NAME", Value: bus.Name, }, ) + volumes := []corev1.Volume{} + if bus.Spec.Volumes != nil { + volumes = *bus.Spec.Volumes + } return &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Name: controller.BusDispatcherDeploymentName(bus.ObjectMeta.Name), @@ -670,6 +678,7 @@ func newDispatcherDeployment(bus *channelsv1alpha1.Bus) *appsv1.Deployment { Containers: []corev1.Container{ *container, }, + Volumes: volumes, }, }, }, @@ -737,11 +746,19 @@ func newProvisionerDeployment(bus *channelsv1alpha1.Bus) *appsv1.Deployment { one := int32(1) container := bus.Spec.Provisioner.DeepCopy() container.Env = append(container.Env, + corev1.EnvVar{ + Name: "BUS_NAMESPACE", + Value: bus.Namespace, + }, corev1.EnvVar{ Name: "BUS_NAME", Value: bus.Name, }, ) + volumes := []corev1.Volume{} + if bus.Spec.Volumes != nil { + volumes = *bus.Spec.Volumes + } return &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Name: controller.BusProvisionerDeploymentName(bus.ObjectMeta.Name), @@ -768,6 +785,7 @@ func newProvisionerDeployment(bus *channelsv1alpha1.Bus) *appsv1.Deployment { Containers: []corev1.Container{ *container, }, + Volumes: volumes, }, }, }, diff --git a/sample/hello/hello-channel.yaml b/sample/hello/hello-channel.yaml index 482f12da2de..7791e7168ed 100644 --- a/sample/hello/hello-channel.yaml +++ b/sample/hello/hello-channel.yaml @@ -4,12 +4,4 @@ metadata: name: aloha spec: bus: stub - ---- -apiVersion: channels.knative.dev/v1alpha1 -kind: Subscription -metadata: - name: aloha2hello -spec: - channel: aloha - subscriber: hello-00001-service + \ No newline at end of file diff --git a/sample/hello/hello-subscription.yaml b/sample/hello/hello-subscription.yaml new file mode 100644 index 00000000000..ed2cf6338cf --- /dev/null +++ b/sample/hello/hello-subscription.yaml @@ -0,0 +1,7 @@ +apiVersion: channels.knative.dev/v1alpha1 +kind: Subscription +metadata: + name: aloha2hello +spec: + channel: aloha + subscriber: hello-00001-service