diff --git a/cmd/controller/main.go b/cmd/controller/main.go index 79ad4434e70..a79b1cd7bc1 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -38,6 +38,7 @@ import ( "github.com/knative/eventing/pkg/controller/bind" "github.com/knative/eventing/pkg/controller/bus" "github.com/knative/eventing/pkg/controller/channel" + "github.com/knative/eventing/pkg/controller/clusterbus" "github.com/knative/eventing/pkg/signals" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -88,6 +89,7 @@ func main() { ctors := []controller.Constructor{ bind.NewController, bus.NewController, + clusterbus.NewController, channel.NewController, } diff --git a/config/clusterbus.yaml b/config/clusterbus.yaml new file mode 100644 index 00000000000..ab68f6e6fc3 --- /dev/null +++ b/config/clusterbus.yaml @@ -0,0 +1,25 @@ +# Copyright 2018 The Knative Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + name: clusterbuses.channels.knative.dev +spec: + scope: Cluster + group: channels.knative.dev + version: v1alpha1 + names: + kind: ClusterBus + plural: clusterbuses + singular: clusterbus diff --git a/config/clusterrole.yaml b/config/clusterrole.yaml index 2a64ff9c6c8..e17c9fc5576 100644 --- a/config/clusterrole.yaml +++ b/config/clusterrole.yaml @@ -17,7 +17,7 @@ metadata: name: knative-channels-bus rules: - apiGroups: ["channels.knative.dev"] - resources: ["buses", "channels", "subscriptions"] + resources: ["buses", "clusterbuses", "channels", "subscriptions"] verbs: ["get", "watch", "list"] - apiGroups: [""] resources: ["events"] diff --git a/config/clusterrolebinding.yaml b/config/clusterrolebinding.yaml index b6ef6ed08d0..5cb663a2fcf 100644 --- a/config/clusterrolebinding.yaml +++ b/config/clusterrolebinding.yaml @@ -23,3 +23,17 @@ roleRef: kind: ClusterRole name: cluster-admin apiGroup: rbac.authorization.k8s.io + +--- +apiVersion: rbac.authorization.k8s.io/v1beta1 +kind: ClusterRoleBinding +metadata: + name: clusterbus-controller-manage +subjects: + - kind: ServiceAccount + name: clusterbus-controller + namespace: knative-eventing +roleRef: + kind: ClusterRole + name: knative-channels-bus + apiGroup: rbac.authorization.k8s.io diff --git a/config/serviceaccount.yaml b/config/serviceaccount.yaml index c4c032a8beb..5facc458ea6 100644 --- a/config/serviceaccount.yaml +++ b/config/serviceaccount.yaml @@ -16,3 +16,10 @@ kind: ServiceAccount metadata: name: bind-controller namespace: knative-eventing + +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: clusterbus-controller + namespace: knative-eventing diff --git a/pkg/apis/channels/v1alpha1/bus_types.go b/pkg/apis/channels/v1alpha1/bus_types.go index e4137f3c800..96bc509247b 100644 --- a/pkg/apis/channels/v1alpha1/bus_types.go +++ b/pkg/apis/channels/v1alpha1/bus_types.go @@ -21,6 +21,7 @@ import ( kapi "k8s.io/api/core/v1" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + runtime "k8s.io/apimachinery/pkg/runtime" ) // +genclient @@ -32,8 +33,8 @@ import ( type Bus struct { meta_v1.TypeMeta `json:",inline"` meta_v1.ObjectMeta `json:"metadata"` - Spec BusSpec `json:"spec"` - Status *BusStatus `json:"status,omitempty"` + Spec BusSpec `json:"spec"` + Status BusStatus `json:"status,omitempty"` } // BusSpec (what the user wants) for a bus @@ -66,6 +67,14 @@ type BusParameters struct { type BusStatus struct { } +func (b *Bus) BacksChannel(channel *Channel) bool { + return b.Namespace == channel.Namespace && b.Name == channel.Spec.Bus +} + +func (b *Bus) GetSpec() *BusSpec { + return &b.Spec +} + func (b *Bus) GetSpecJSON() ([]byte, error) { return json.Marshal(b.Spec) } @@ -78,3 +87,11 @@ type BusList struct { meta_v1.ListMeta `json:"metadata"` Items []Bus `json:"items"` } + +// GenericBus may be backed by Bus or ClusterBus +type GenericBus interface { + runtime.Object + meta_v1.ObjectMetaAccessor + BacksChannel(channel *Channel) bool + GetSpec() *BusSpec +} diff --git a/pkg/apis/channels/v1alpha1/channel_types.go b/pkg/apis/channels/v1alpha1/channel_types.go index 01123e43733..18f53102d2f 100644 --- a/pkg/apis/channels/v1alpha1/channel_types.go +++ b/pkg/apis/channels/v1alpha1/channel_types.go @@ -31,15 +31,18 @@ import ( type Channel struct { meta_v1.TypeMeta `json:",inline"` meta_v1.ObjectMeta `json:"metadata"` - Spec ChannelSpec `json:"spec"` - Status *ChannelStatus `json:"status,omitempty"` + Spec ChannelSpec `json:"spec"` + Status ChannelStatus `json:"status,omitempty"` } // ChannelSpec (what the user wants) for a channel type ChannelSpec struct { - // Name of the bus backing this channel (optional) - Bus string `json:"bus` + // Bus name of the bus backing this channel (mutually exclusive with ClusterBus) + Bus string `json:"bus"` + + // ClusterBus name of the clusterbus backing this channel (mutually exclusive with Bus) + ClusterBus string `json:"clusterBus"` // Arguments configuration arguments for the channel Arguments *[]Argument `json:"arguments,omitempty"` diff --git a/pkg/apis/channels/v1alpha1/clusterbus_types.go b/pkg/apis/channels/v1alpha1/clusterbus_types.go new file mode 100644 index 00000000000..6b726e1fe15 --- /dev/null +++ b/pkg/apis/channels/v1alpha1/clusterbus_types.go @@ -0,0 +1,65 @@ +/* + * Copyright 2018 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package v1alpha1 + +import ( + "encoding/json" + + meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// +genclient +// +genclient:noStatus +// +genclient:nonNamespaced +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object +// +k8s:defaulter-gen=true + +// ClusterBus represents the clusterbuses.channels.knative.dev CRD +type ClusterBus struct { + meta_v1.TypeMeta `json:",inline"` + meta_v1.ObjectMeta `json:"metadata"` + Spec ClusterBusSpec `json:"spec"` + Status ClusterBusStatus `json:"status,omitempty"` +} + +// ClusterBusSpec (what the user wants) for a clusterbus +type ClusterBusSpec = BusSpec + +// ClusterBusStatus (computed) for a clusterbus +type ClusterBusStatus struct { +} + +func (b *ClusterBus) BacksChannel(channel *Channel) bool { + return len(b.Namespace) == 0 && b.Name == channel.Spec.ClusterBus +} + +func (b *ClusterBus) GetSpec() *BusSpec { + return &b.Spec +} + +func (b *ClusterBus) GetSpecJSON() ([]byte, error) { + return json.Marshal(b.Spec) +} + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// ClusterBusList returned in list operations +type ClusterBusList struct { + meta_v1.TypeMeta `json:",inline"` + meta_v1.ListMeta `json:"metadata"` + Items []ClusterBus `json:"items"` +} diff --git a/pkg/apis/channels/v1alpha1/register.go b/pkg/apis/channels/v1alpha1/register.go index b39481399cf..32ce1eb5370 100644 --- a/pkg/apis/channels/v1alpha1/register.go +++ b/pkg/apis/channels/v1alpha1/register.go @@ -47,6 +47,8 @@ func addKnownTypes(scheme *runtime.Scheme) error { scheme.AddKnownTypes(SchemeGroupVersion, &Bus{}, &BusList{}, + &ClusterBus{}, + &ClusterBusList{}, &Channel{}, &ChannelList{}, &Subscription{}, diff --git a/pkg/apis/channels/v1alpha1/subscription_types.go b/pkg/apis/channels/v1alpha1/subscription_types.go index 942202bb666..e3efb5de4f4 100644 --- a/pkg/apis/channels/v1alpha1/subscription_types.go +++ b/pkg/apis/channels/v1alpha1/subscription_types.go @@ -31,8 +31,8 @@ import ( type Subscription struct { meta_v1.TypeMeta `json:",inline"` meta_v1.ObjectMeta `json:"metadata"` - Spec SubscriptionSpec `json:"spec"` - Status *SubscriptionStatus `json:"status,omitempty"` + Spec SubscriptionSpec `json:"spec"` + Status SubscriptionStatus `json:"status,omitempty"` } // SubscriptionSpec (what the user wants) for a subscription diff --git a/pkg/apis/channels/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/channels/v1alpha1/zz_generated.deepcopy.go index a93df7a0c63..14bc4b7b849 100644 --- a/pkg/apis/channels/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/channels/v1alpha1/zz_generated.deepcopy.go @@ -47,15 +47,7 @@ func (in *Bus) DeepCopyInto(out *Bus) { out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) in.Spec.DeepCopyInto(&out.Spec) - if in.Status != nil { - in, out := &in.Status, &out.Status - if *in == nil { - *out = nil - } else { - *out = new(BusStatus) - **out = **in - } - } + out.Status = in.Status return } @@ -228,15 +220,7 @@ func (in *Channel) DeepCopyInto(out *Channel) { out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) in.Spec.DeepCopyInto(&out.Spec) - if in.Status != nil { - in, out := &in.Status, &out.Status - if *in == nil { - *out = nil - } else { - *out = new(ChannelStatus) - **out = **in - } - } + out.Status = in.Status return } @@ -336,6 +320,83 @@ func (in *ChannelStatus) DeepCopy() *ChannelStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ClusterBus) DeepCopyInto(out *ClusterBus) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + out.Status = in.Status + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterBus. +func (in *ClusterBus) DeepCopy() *ClusterBus { + if in == nil { + return nil + } + out := new(ClusterBus) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *ClusterBus) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ClusterBusList) DeepCopyInto(out *ClusterBusList) { + *out = *in + out.TypeMeta = in.TypeMeta + out.ListMeta = in.ListMeta + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]ClusterBus, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterBusList. +func (in *ClusterBusList) DeepCopy() *ClusterBusList { + if in == nil { + return nil + } + out := new(ClusterBusList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *ClusterBusList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ClusterBusStatus) DeepCopyInto(out *ClusterBusStatus) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterBusStatus. +func (in *ClusterBusStatus) DeepCopy() *ClusterBusStatus { + if in == nil { + return nil + } + out := new(ClusterBusStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Parameter) DeepCopyInto(out *Parameter) { *out = *in @@ -367,15 +428,7 @@ func (in *Subscription) DeepCopyInto(out *Subscription) { out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) in.Spec.DeepCopyInto(&out.Spec) - if in.Status != nil { - in, out := &in.Status, &out.Status - if *in == nil { - *out = nil - } else { - *out = new(SubscriptionStatus) - **out = **in - } - } + out.Status = in.Status return } diff --git a/pkg/buses/monitor.go b/pkg/buses/monitor.go index 16c1dfdc450..db5ada947ab 100644 --- a/pkg/buses/monitor.go +++ b/pkg/buses/monitor.go @@ -48,6 +48,7 @@ const ( Provisioner = "provisioner" busKind = "Bus" + clusterBusKind = "ClusterBus" channelKind = "Channel" subscriptionKind = "Subscription" @@ -58,13 +59,15 @@ const ( errResourceSync = "ErrResourceSync" ) -// Monitor utility to manage channels and subscriptions for a bus +// Monitor utility to manage channels and subscriptions for a GenericBus type Monitor struct { - bus *channelsv1alpha1.Bus + bus channelsv1alpha1.GenericBus handler MonitorEventHandlerFuncs informerFactory informers.SharedInformerFactory busesLister listers.BusLister busesSynced cache.InformerSynced + clusterBusesLister listers.ClusterBusLister + clusterBusesSynced cache.InformerSynced channelsLister listers.ChannelLister channelsSynced cache.InformerSynced subscriptionsLister listers.SubscriptionLister @@ -89,18 +92,18 @@ type Attributes = map[string]string // MonitorEventHandlerFuncs handler functions for channel and subscription provisioning type MonitorEventHandlerFuncs struct { - BusFunc func(bus *channelsv1alpha1.Bus) error + BusFunc func(bus channelsv1alpha1.GenericBus) error ProvisionFunc func(channel *channelsv1alpha1.Channel, attributes Attributes) error UnprovisionFunc func(channel *channelsv1alpha1.Channel) error SubscribeFunc func(subscription *channelsv1alpha1.Subscription, attributes Attributes) error UnsubscribeFunc func(subscription *channelsv1alpha1.Subscription) error } -func (h MonitorEventHandlerFuncs) onBus(bus *channelsv1alpha1.Bus, monitor *Monitor) error { +func (h MonitorEventHandlerFuncs) onBus(bus channelsv1alpha1.GenericBus, monitor *Monitor) error { if h.BusFunc != nil { err := h.BusFunc(bus) if err != nil { - monitor.recorder.Eventf(bus, corev1.EventTypeWarning, errResourceSync, "Error syncing bus: %s", err) + monitor.recorder.Eventf(bus, corev1.EventTypeWarning, errResourceSync, "Error syncing Bus: %s", err) } else { monitor.recorder.Event(bus, corev1.EventTypeNormal, successSynced, "Bus synched successfully") } @@ -178,7 +181,7 @@ type subscriptionSummary struct { Subscription channelsv1alpha1.SubscriptionSpec } -// NewMonitor creates a monitor for a bus +// NewMonitor creates a monitor for a GenericBus func NewMonitor( component, masterURL, kubeconfig string, handler MonitorEventHandlerFuncs, @@ -200,12 +203,12 @@ func NewMonitor( informerFactory := informers.NewSharedInformerFactory(client, time.Second*30) busInformer := informerFactory.Channels().V1alpha1().Buses() + clusterBusInformer := informerFactory.Channels().V1alpha1().ClusterBuses() channelInformer := informerFactory.Channels().V1alpha1().Channels() subscriptionInformer := informerFactory.Channels().V1alpha1().Subscriptions() // Create event broadcaster - // Add bus-controller types to the default Kubernetes Scheme so Events can be - // logged for bus-controller types. + // Add types to the default Kubernetes Scheme so Events can be logged for the component. channelscheme.AddToScheme(scheme.Scheme) glog.V(4).Info("Creating event broadcaster") eventBroadcaster := record.NewBroadcaster() @@ -220,6 +223,8 @@ func NewMonitor( informerFactory: informerFactory, busesLister: busInformer.Lister(), busesSynced: busInformer.Informer().HasSynced, + clusterBusesLister: clusterBusInformer.Lister(), + clusterBusesSynced: clusterBusInformer.Informer().HasSynced, channelsLister: channelInformer.Lister(), channelsSynced: channelInformer.Informer().HasSynced, subscriptionsLister: subscriptionInformer.Lister(), @@ -253,6 +258,25 @@ func NewMonitor( monitor.workqueue.AddRateLimited(makeWorkqueueKeyForBus(newBus)) }, }) + // Set up an event handler for when ClusterBus resources change + clusterBusInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + clusterBus := obj.(*channelsv1alpha1.ClusterBus) + monitor.workqueue.AddRateLimited(makeWorkqueueKeyForClusterBus(clusterBus)) + }, + UpdateFunc: func(old, new interface{}) { + oldClusterBus := old.(*channelsv1alpha1.ClusterBus) + newClusterBus := new.(*channelsv1alpha1.ClusterBus) + + if oldClusterBus.ResourceVersion == newClusterBus.ResourceVersion { + // Periodic resync will send update events for all known ClusterBuses. + // Two different versions of the same ClusterBus will always have different RVs. + return + } + + monitor.workqueue.AddRateLimited(makeWorkqueueKeyForClusterBus(newClusterBus)) + }, + }) // Set up an event handler for when Channel resources change channelInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { @@ -322,16 +346,17 @@ func (m *Monitor) Subscription(name string, namespace string) *channelsv1alpha1. } // Subscriptions for a channel name and namespace -func (m *Monitor) Subscriptions(channel string, namespace string) *[]channelsv1alpha1.SubscriptionSpec { - channelKey := makeChannelKeyWithNames(namespace, channel) +func (m *Monitor) Subscriptions(channelName string, namespace string) *[]channelsv1alpha1.SubscriptionSpec { + channelKey := makeChannelKeyWithNames(namespace, channelName) summary := m.getChannelSummary(channelKey) + channel := m.Channel(channelName, namespace) - if summary == nil || summary.Channel == nil { + if summary == nil || summary.Channel == nil || channel == nil { // the channel is unknown return nil } - if summary.Channel.Bus != m.bus.Name { + if !m.bus.BacksChannel(channel) { // the channel is not for this bus return nil } @@ -347,19 +372,19 @@ func (m *Monitor) Subscriptions(channel string, namespace string) *[]channelsv1a } func (m *Monitor) channelAttributes(channel channelsv1alpha1.ChannelSpec) (Attributes, error) { - busParameters := m.bus.Spec.Parameters + genericBusParameters := m.bus.GetSpec().Parameters var parameters *[]channelsv1alpha1.Parameter - if busParameters != nil { - parameters = busParameters.Channel + if genericBusParameters != nil { + parameters = genericBusParameters.Channel } return m.resolveAttributes(parameters, channel.Arguments) } func (m *Monitor) subscriptionAttributes(subscription channelsv1alpha1.SubscriptionSpec) (Attributes, error) { - busParameters := m.bus.Spec.Parameters + genericBusParameters := m.bus.GetSpec().Parameters var parameters *[]channelsv1alpha1.Parameter - if busParameters != nil { - parameters = busParameters.Subscription + if genericBusParameters != nil { + parameters = genericBusParameters.Subscription } return m.resolveAttributes(parameters, subscription.Arguments) } @@ -414,7 +439,7 @@ func (m *Monitor) RequeueSubscription(subscription *channelsv1alpha1.Subscriptio // as syncing informer caches and starting workers. It will block until stopCh // is closed, at which point it will shutdown the workqueue and wait for // workers to finish processing their current work items. -func (m *Monitor) Run(namespace, name string, threadiness int, stopCh <-chan struct{}) error { +func (m *Monitor) Run(busNamespace, busName string, threadiness int, stopCh <-chan struct{}) error { defer runtime.HandleCrash() defer m.workqueue.ShutDown() @@ -424,18 +449,28 @@ func (m *Monitor) Run(namespace, name string, threadiness int, stopCh <-chan str // Wait for the caches to be synced before starting workers glog.Info("Waiting for informer caches to sync") - if ok := cache.WaitForCacheSync(stopCh, m.busesSynced, m.channelsSynced, m.subscriptionsSynced); !ok { + if ok := cache.WaitForCacheSync(stopCh, m.busesSynced, m.clusterBusesSynced, m.channelsSynced, m.subscriptionsSynced); !ok { return fmt.Errorf("failed to wait for caches to sync") } - bus, err := m.busesLister.Buses(namespace).Get(name) - if err != nil { - glog.Fatalf("Unknown bus '%s/%s'", namespace, name) + if len(busNamespace) == 0 { + // monitor is for a ClusterBus + clusterBus, err := m.clusterBusesLister.Get(busName) + if err != nil { + glog.Fatalf("Unknown clusterbus %q", busName) + } + m.bus = clusterBus + } else { + // monitor is for a namespaced Bus + bus, err := m.busesLister.Buses(busNamespace).Get(busName) + if err != nil { + glog.Fatalf("Unknown bus '%s/%s'", busNamespace, busName) + } + m.bus = bus } - m.bus = bus glog.Info("Starting workers") - // Launch two workers to process Bus resources + // Launch workers to process resources for i := 0; i < threadiness; i++ { go wait.Until(m.runWorker, time.Second, stopCh) } @@ -488,8 +523,7 @@ func (m *Monitor) processNextWorkItem() bool { runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) return nil } - // Run the syncHandler, passing it the namespace/name string of the - // Bus resource to be synced. + // Run the syncHandler, passing it the name string of the resource to be synced. if err := m.syncHandler(key); err != nil { m.workqueue.AddRateLimited(obj) return fmt.Errorf("error syncing monitor '%s': %s", key, err.Error()) @@ -510,8 +544,8 @@ func (m *Monitor) processNextWorkItem() bool { } // syncHandler compares the actual state with the desired, and attempts to -// converge the two. It then updates the Status block of the Bus resource -// with the current status of the resource. +// converge the two. It then updates the Status block of the resource with the +// current status. func (m *Monitor) syncHandler(key string) error { // Convert the namespace/name string into a distinct namespace and name kind, namespace, name, err := splitWorkqueueKey(key) @@ -520,14 +554,16 @@ func (m *Monitor) syncHandler(key string) error { return nil } - if m.bus == nil && kind != busKind { - // don't attempt tp sync until we have seen the bus for this monitor + if m.bus == nil && !(kind == busKind || kind == clusterBusKind) { + // don't attempt to sync until we have seen the bus for this monitor return fmt.Errorf("Unknown bus for monitor") } switch kind { case busKind: err = m.syncBus(namespace, name) + case clusterBusKind: + err = m.syncClusterBus(name) case channelKind: err = m.syncChannel(namespace, name) case subscriptionKind: @@ -566,6 +602,28 @@ func (m *Monitor) syncBus(namespace string, name string) error { return nil } +func (m *Monitor) syncClusterBus(name string) error { + // Get the ClusterBus resource with this name + clusterBus, err := m.clusterBusesLister.Get(name) + if err != nil { + // The ClusterBus resource may no longer exist + if errors.IsNotFound(err) { + // nothing to do + return nil + } + + return err + } + + // Sync the ClusterBus + err = m.createOrUpdateClusterBus(clusterBus) + if err != nil { + return err + } + + return nil +} + func (m *Monitor) syncChannel(namespace string, name string) error { // Get the Channel resource with this namespace/name channel, err := m.channelsLister.Channels(namespace).Get(name) @@ -636,12 +694,13 @@ func (m *Monitor) getOrCreateChannelSummary(key channelKey) *channelSummary { } func (m *Monitor) createOrUpdateBus(bus *channelsv1alpha1.Bus) error { - if bus.Name != m.bus.Name { + if bus.Namespace != m.bus.GetObjectMeta().GetNamespace() || + bus.Name != m.bus.GetObjectMeta().GetName() { // this is not our bus return nil } - if !reflect.DeepEqual(m.bus.Spec, bus.Spec) { + if !reflect.DeepEqual(m.bus.GetSpec(), bus.Spec) { m.bus = bus err := m.handler.onBus(bus, m) if err != nil { @@ -652,8 +711,21 @@ func (m *Monitor) createOrUpdateBus(bus *channelsv1alpha1.Bus) error { return nil } -func (m *Monitor) isChannelForBus(channel *channelsv1alpha1.Channel) bool { - return channel.Spec.Bus == m.bus.Name +func (m *Monitor) createOrUpdateClusterBus(clusterBus *channelsv1alpha1.ClusterBus) error { + if clusterBus.Name != m.bus.GetObjectMeta().GetName() { + // this is not our clusterbus + return nil + } + + if !reflect.DeepEqual(m.bus.GetSpec(), clusterBus.Spec) { + m.bus = clusterBus + err := m.handler.onBus(clusterBus, m) + if err != nil { + return err + } + } + + return nil } func (m *Monitor) createOrUpdateChannel(channel *channelsv1alpha1.Channel) error { @@ -666,7 +738,7 @@ func (m *Monitor) createOrUpdateChannel(channel *channelsv1alpha1.Channel) error summary.Channel = new m.mutex.Unlock() - if m.isChannelForBus(channel) && !reflect.DeepEqual(old, new) { + if m.bus.BacksChannel(channel) && !reflect.DeepEqual(old, new) { err := m.handler.onProvision(channel, m) if err != nil { return err @@ -699,24 +771,12 @@ func (m *Monitor) removeChannel(namespace string, name string) error { return nil } -func (m *Monitor) isChannelKnown(subscription *channelsv1alpha1.Subscription) bool { - channelKey := makeChannelKeyFromSubscription(subscription) - summary := m.getChannelSummary(channelKey) - return summary != nil && summary.Channel != nil -} - func (m *Monitor) isSubscriptionProvisioned(subscription *channelsv1alpha1.Subscription) bool { subscriptionKey := makeSubscriptionKeyFromSubscription(subscription) _, ok := m.provisionedSubscriptions[subscriptionKey] return ok } -func (m *Monitor) isSubscriptionForBus(subscription *channelsv1alpha1.Subscription) bool { - channelKey := makeChannelKeyFromSubscription(subscription) - summary := m.getChannelSummary(channelKey) - return summary != nil && summary.Channel != nil && summary.Channel.Bus == m.bus.Name -} - func (m *Monitor) createOrUpdateSubscription(subscription *channelsv1alpha1.Subscription) error { subscriptionKey := makeSubscriptionKeyFromSubscription(subscription) channelKey := makeChannelKeyFromSubscription(subscription) @@ -730,10 +790,11 @@ func (m *Monitor) createOrUpdateSubscription(subscription *channelsv1alpha1.Subs summary.Subscriptions[subscriptionKey] = new m.mutex.Unlock() - if !m.isChannelKnown(subscription) { - return fmt.Errorf("Unknown channel %q for subscription", subscription.Spec.Channel) + channel := m.Channel(subscription.Spec.Channel, subscription.Namespace) + if channel == nil { + return fmt.Errorf("unknown channel %q for subscription", subscription.Spec.Channel) } - if !m.isSubscriptionForBus(subscription) { + if !m.bus.BacksChannel(channel) { return nil } @@ -811,6 +872,10 @@ func makeWorkqueueKeyForBus(bus *channelsv1alpha1.Bus) string { return makeWorkqueueKey(busKind, bus.Namespace, bus.Name) } +func makeWorkqueueKeyForClusterBus(clusterBus *channelsv1alpha1.ClusterBus) string { + return makeWorkqueueKey(clusterBusKind, "", clusterBus.Name) +} + func makeWorkqueueKeyForChannel(channel *channelsv1alpha1.Channel) string { return makeWorkqueueKey(channelKind, channel.Namespace, channel.Name) } diff --git a/pkg/buses/stub/main.go b/pkg/buses/stub/main.go index 6d877acc20a..019290a9daa 100644 --- a/pkg/buses/stub/main.go +++ b/pkg/buses/stub/main.go @@ -73,7 +73,7 @@ func (b *StubBus) handleEvent(res http.ResponseWriter, req *http.Request) { safeHeaders.Set("x-bus", b.name) safeHeaders.Set("x-channel", channel) for _, subscription := range *subscriptions { - subscriber := subscription.Subscriber + subscriber := b.resolveSubscriber(subscription, namespace) glog.Infof("Sending to %q for %q\n", subscriber, channel) go b.dispatchEvent(subscriber, body, safeHeaders) } @@ -96,6 +96,14 @@ func (b *StubBus) dispatchEvent(subscriber string, body []byte, headers http.Hea } } +func (b *StubBus) resolveSubscriber(subscription channelsv1alpha1.SubscriptionSpec, namespace string) string { + subscriber := subscription.Subscriber + if strings.Index(subscriber, ".") == -1 { + subscriber = fmt.Sprintf("%s.%s", subscriber, namespace) + } + return subscriber +} + func (b *StubBus) splitChannelName(host string) (string, string) { chunks := strings.Split(host, ".") channel := chunks[0] @@ -141,9 +149,9 @@ func main() { // set up signals so we handle the first shutdown signal gracefully stopCh := signals.SetupSignalHandler() - namespace := os.Getenv("BUS_NAMESPACE") - name := os.Getenv("BUS_NAME") - component := fmt.Sprintf("%s-%s", name, buses.Dispatcher) + busNamespace := os.Getenv("BUS_NAMESPACE") + busName := os.Getenv("BUS_NAME") + component := fmt.Sprintf("%s-%s", busName, buses.Dispatcher) monitor := buses.NewMonitor(component, masterURL, kubeconfig, buses.MonitorEventHandlerFuncs{ ProvisionFunc: func(channel *channelsv1alpha1.Channel, attributes buses.Attributes) error { @@ -163,10 +171,10 @@ func main() { return nil }, }) - bus := NewStubBus(name, monitor) + bus := NewStubBus(busName, monitor) go func() { - if err := monitor.Run(namespace, name, threadsPerMonitor, stopCh); err != nil { + if err := monitor.Run(busNamespace, busName, threadsPerMonitor, stopCh); err != nil { glog.Fatalf("Error running monitor: %s", err.Error()) } }() diff --git a/pkg/client/clientset/versioned/typed/channels/v1alpha1/channels_client.go b/pkg/client/clientset/versioned/typed/channels/v1alpha1/channels_client.go index cc83d1fd55e..d99d1390cef 100644 --- a/pkg/client/clientset/versioned/typed/channels/v1alpha1/channels_client.go +++ b/pkg/client/clientset/versioned/typed/channels/v1alpha1/channels_client.go @@ -29,6 +29,7 @@ type ChannelsV1alpha1Interface interface { RESTClient() rest.Interface BusesGetter ChannelsGetter + ClusterBusesGetter SubscriptionsGetter } @@ -45,6 +46,10 @@ func (c *ChannelsV1alpha1Client) Channels(namespace string) ChannelInterface { return newChannels(c, namespace) } +func (c *ChannelsV1alpha1Client) ClusterBuses() ClusterBusInterface { + return newClusterBuses(c) +} + func (c *ChannelsV1alpha1Client) Subscriptions(namespace string) SubscriptionInterface { return newSubscriptions(c, namespace) } diff --git a/pkg/client/clientset/versioned/typed/channels/v1alpha1/clusterbus.go b/pkg/client/clientset/versioned/typed/channels/v1alpha1/clusterbus.go new file mode 100644 index 00000000000..707d2e04e4e --- /dev/null +++ b/pkg/client/clientset/versioned/typed/channels/v1alpha1/clusterbus.go @@ -0,0 +1,147 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + v1alpha1 "github.com/knative/eventing/pkg/apis/channels/v1alpha1" + scheme "github.com/knative/eventing/pkg/client/clientset/versioned/scheme" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + rest "k8s.io/client-go/rest" +) + +// ClusterBusesGetter has a method to return a ClusterBusInterface. +// A group's client should implement this interface. +type ClusterBusesGetter interface { + ClusterBuses() ClusterBusInterface +} + +// ClusterBusInterface has methods to work with ClusterBus resources. +type ClusterBusInterface interface { + Create(*v1alpha1.ClusterBus) (*v1alpha1.ClusterBus, error) + Update(*v1alpha1.ClusterBus) (*v1alpha1.ClusterBus, error) + Delete(name string, options *v1.DeleteOptions) error + DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error + Get(name string, options v1.GetOptions) (*v1alpha1.ClusterBus, error) + List(opts v1.ListOptions) (*v1alpha1.ClusterBusList, error) + Watch(opts v1.ListOptions) (watch.Interface, error) + Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1alpha1.ClusterBus, err error) + ClusterBusExpansion +} + +// clusterBuses implements ClusterBusInterface +type clusterBuses struct { + client rest.Interface +} + +// newClusterBuses returns a ClusterBuses +func newClusterBuses(c *ChannelsV1alpha1Client) *clusterBuses { + return &clusterBuses{ + client: c.RESTClient(), + } +} + +// Get takes name of the clusterBus, and returns the corresponding clusterBus object, and an error if there is any. +func (c *clusterBuses) Get(name string, options v1.GetOptions) (result *v1alpha1.ClusterBus, err error) { + result = &v1alpha1.ClusterBus{} + err = c.client.Get(). + Resource("clusterbuses"). + Name(name). + VersionedParams(&options, scheme.ParameterCodec). + Do(). + Into(result) + return +} + +// List takes label and field selectors, and returns the list of ClusterBuses that match those selectors. +func (c *clusterBuses) List(opts v1.ListOptions) (result *v1alpha1.ClusterBusList, err error) { + result = &v1alpha1.ClusterBusList{} + err = c.client.Get(). + Resource("clusterbuses"). + VersionedParams(&opts, scheme.ParameterCodec). + Do(). + Into(result) + return +} + +// Watch returns a watch.Interface that watches the requested clusterBuses. +func (c *clusterBuses) Watch(opts v1.ListOptions) (watch.Interface, error) { + opts.Watch = true + return c.client.Get(). + Resource("clusterbuses"). + VersionedParams(&opts, scheme.ParameterCodec). + Watch() +} + +// Create takes the representation of a clusterBus and creates it. Returns the server's representation of the clusterBus, and an error, if there is any. +func (c *clusterBuses) Create(clusterBus *v1alpha1.ClusterBus) (result *v1alpha1.ClusterBus, err error) { + result = &v1alpha1.ClusterBus{} + err = c.client.Post(). + Resource("clusterbuses"). + Body(clusterBus). + Do(). + Into(result) + return +} + +// Update takes the representation of a clusterBus and updates it. Returns the server's representation of the clusterBus, and an error, if there is any. +func (c *clusterBuses) Update(clusterBus *v1alpha1.ClusterBus) (result *v1alpha1.ClusterBus, err error) { + result = &v1alpha1.ClusterBus{} + err = c.client.Put(). + Resource("clusterbuses"). + Name(clusterBus.Name). + Body(clusterBus). + Do(). + Into(result) + return +} + +// Delete takes name of the clusterBus and deletes it. Returns an error if one occurs. +func (c *clusterBuses) Delete(name string, options *v1.DeleteOptions) error { + return c.client.Delete(). + Resource("clusterbuses"). + Name(name). + Body(options). + Do(). + Error() +} + +// DeleteCollection deletes a collection of objects. +func (c *clusterBuses) DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error { + return c.client.Delete(). + Resource("clusterbuses"). + VersionedParams(&listOptions, scheme.ParameterCodec). + Body(options). + Do(). + Error() +} + +// Patch applies the patch and returns the patched clusterBus. +func (c *clusterBuses) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1alpha1.ClusterBus, err error) { + result = &v1alpha1.ClusterBus{} + err = c.client.Patch(pt). + Resource("clusterbuses"). + SubResource(subresources...). + Name(name). + Body(data). + Do(). + Into(result) + return +} diff --git a/pkg/client/clientset/versioned/typed/channels/v1alpha1/fake/fake_channels_client.go b/pkg/client/clientset/versioned/typed/channels/v1alpha1/fake/fake_channels_client.go index ad49ab6ad76..3c498bd6dcb 100644 --- a/pkg/client/clientset/versioned/typed/channels/v1alpha1/fake/fake_channels_client.go +++ b/pkg/client/clientset/versioned/typed/channels/v1alpha1/fake/fake_channels_client.go @@ -36,6 +36,10 @@ func (c *FakeChannelsV1alpha1) Channels(namespace string) v1alpha1.ChannelInterf return &FakeChannels{c, namespace} } +func (c *FakeChannelsV1alpha1) ClusterBuses() v1alpha1.ClusterBusInterface { + return &FakeClusterBuses{c} +} + func (c *FakeChannelsV1alpha1) Subscriptions(namespace string) v1alpha1.SubscriptionInterface { return &FakeSubscriptions{c, namespace} } diff --git a/pkg/client/clientset/versioned/typed/channels/v1alpha1/fake/fake_clusterbus.go b/pkg/client/clientset/versioned/typed/channels/v1alpha1/fake/fake_clusterbus.go new file mode 100644 index 00000000000..c73436022d6 --- /dev/null +++ b/pkg/client/clientset/versioned/typed/channels/v1alpha1/fake/fake_clusterbus.go @@ -0,0 +1,120 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package fake + +import ( + v1alpha1 "github.com/knative/eventing/pkg/apis/channels/v1alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + labels "k8s.io/apimachinery/pkg/labels" + schema "k8s.io/apimachinery/pkg/runtime/schema" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + testing "k8s.io/client-go/testing" +) + +// FakeClusterBuses implements ClusterBusInterface +type FakeClusterBuses struct { + Fake *FakeChannelsV1alpha1 +} + +var clusterbusesResource = schema.GroupVersionResource{Group: "channels.knative.dev", Version: "v1alpha1", Resource: "clusterbuses"} + +var clusterbusesKind = schema.GroupVersionKind{Group: "channels.knative.dev", Version: "v1alpha1", Kind: "ClusterBus"} + +// Get takes name of the clusterBus, and returns the corresponding clusterBus object, and an error if there is any. +func (c *FakeClusterBuses) Get(name string, options v1.GetOptions) (result *v1alpha1.ClusterBus, err error) { + obj, err := c.Fake. + Invokes(testing.NewRootGetAction(clusterbusesResource, name), &v1alpha1.ClusterBus{}) + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.ClusterBus), err +} + +// List takes label and field selectors, and returns the list of ClusterBuses that match those selectors. +func (c *FakeClusterBuses) List(opts v1.ListOptions) (result *v1alpha1.ClusterBusList, err error) { + obj, err := c.Fake. + Invokes(testing.NewRootListAction(clusterbusesResource, clusterbusesKind, opts), &v1alpha1.ClusterBusList{}) + if obj == nil { + return nil, err + } + + label, _, _ := testing.ExtractFromListOptions(opts) + if label == nil { + label = labels.Everything() + } + list := &v1alpha1.ClusterBusList{} + for _, item := range obj.(*v1alpha1.ClusterBusList).Items { + if label.Matches(labels.Set(item.Labels)) { + list.Items = append(list.Items, item) + } + } + return list, err +} + +// Watch returns a watch.Interface that watches the requested clusterBuses. +func (c *FakeClusterBuses) Watch(opts v1.ListOptions) (watch.Interface, error) { + return c.Fake. + InvokesWatch(testing.NewRootWatchAction(clusterbusesResource, opts)) +} + +// Create takes the representation of a clusterBus and creates it. Returns the server's representation of the clusterBus, and an error, if there is any. +func (c *FakeClusterBuses) Create(clusterBus *v1alpha1.ClusterBus) (result *v1alpha1.ClusterBus, err error) { + obj, err := c.Fake. + Invokes(testing.NewRootCreateAction(clusterbusesResource, clusterBus), &v1alpha1.ClusterBus{}) + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.ClusterBus), err +} + +// Update takes the representation of a clusterBus and updates it. Returns the server's representation of the clusterBus, and an error, if there is any. +func (c *FakeClusterBuses) Update(clusterBus *v1alpha1.ClusterBus) (result *v1alpha1.ClusterBus, err error) { + obj, err := c.Fake. + Invokes(testing.NewRootUpdateAction(clusterbusesResource, clusterBus), &v1alpha1.ClusterBus{}) + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.ClusterBus), err +} + +// Delete takes name of the clusterBus and deletes it. Returns an error if one occurs. +func (c *FakeClusterBuses) Delete(name string, options *v1.DeleteOptions) error { + _, err := c.Fake. + Invokes(testing.NewRootDeleteAction(clusterbusesResource, name), &v1alpha1.ClusterBus{}) + return err +} + +// DeleteCollection deletes a collection of objects. +func (c *FakeClusterBuses) DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error { + action := testing.NewRootDeleteCollectionAction(clusterbusesResource, listOptions) + + _, err := c.Fake.Invokes(action, &v1alpha1.ClusterBusList{}) + return err +} + +// Patch applies the patch and returns the patched clusterBus. +func (c *FakeClusterBuses) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1alpha1.ClusterBus, err error) { + obj, err := c.Fake. + Invokes(testing.NewRootPatchSubresourceAction(clusterbusesResource, name, data, subresources...), &v1alpha1.ClusterBus{}) + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.ClusterBus), err +} diff --git a/pkg/client/clientset/versioned/typed/channels/v1alpha1/generated_expansion.go b/pkg/client/clientset/versioned/typed/channels/v1alpha1/generated_expansion.go index 5d6be8ae3c0..8979577193d 100644 --- a/pkg/client/clientset/versioned/typed/channels/v1alpha1/generated_expansion.go +++ b/pkg/client/clientset/versioned/typed/channels/v1alpha1/generated_expansion.go @@ -22,4 +22,6 @@ type BusExpansion interface{} type ChannelExpansion interface{} +type ClusterBusExpansion interface{} + type SubscriptionExpansion interface{} diff --git a/pkg/client/informers/externalversions/channels/v1alpha1/clusterbus.go b/pkg/client/informers/externalversions/channels/v1alpha1/clusterbus.go new file mode 100644 index 00000000000..109af846e71 --- /dev/null +++ b/pkg/client/informers/externalversions/channels/v1alpha1/clusterbus.go @@ -0,0 +1,88 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by informer-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + time "time" + + channels_v1alpha1 "github.com/knative/eventing/pkg/apis/channels/v1alpha1" + versioned "github.com/knative/eventing/pkg/client/clientset/versioned" + internalinterfaces "github.com/knative/eventing/pkg/client/informers/externalversions/internalinterfaces" + v1alpha1 "github.com/knative/eventing/pkg/client/listers/channels/v1alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + runtime "k8s.io/apimachinery/pkg/runtime" + watch "k8s.io/apimachinery/pkg/watch" + cache "k8s.io/client-go/tools/cache" +) + +// ClusterBusInformer provides access to a shared informer and lister for +// ClusterBuses. +type ClusterBusInformer interface { + Informer() cache.SharedIndexInformer + Lister() v1alpha1.ClusterBusLister +} + +type clusterBusInformer struct { + factory internalinterfaces.SharedInformerFactory + tweakListOptions internalinterfaces.TweakListOptionsFunc +} + +// NewClusterBusInformer constructs a new informer for ClusterBus type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewClusterBusInformer(client versioned.Interface, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer { + return NewFilteredClusterBusInformer(client, resyncPeriod, indexers, nil) +} + +// NewFilteredClusterBusInformer constructs a new informer for ClusterBus type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewFilteredClusterBusInformer(client versioned.Interface, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { + return cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options v1.ListOptions) (runtime.Object, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.ChannelsV1alpha1().ClusterBuses().List(options) + }, + WatchFunc: func(options v1.ListOptions) (watch.Interface, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.ChannelsV1alpha1().ClusterBuses().Watch(options) + }, + }, + &channels_v1alpha1.ClusterBus{}, + resyncPeriod, + indexers, + ) +} + +func (f *clusterBusInformer) defaultInformer(client versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + return NewFilteredClusterBusInformer(client, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) +} + +func (f *clusterBusInformer) Informer() cache.SharedIndexInformer { + return f.factory.InformerFor(&channels_v1alpha1.ClusterBus{}, f.defaultInformer) +} + +func (f *clusterBusInformer) Lister() v1alpha1.ClusterBusLister { + return v1alpha1.NewClusterBusLister(f.Informer().GetIndexer()) +} diff --git a/pkg/client/informers/externalversions/channels/v1alpha1/interface.go b/pkg/client/informers/externalversions/channels/v1alpha1/interface.go index 7a34535020f..0831ac85fba 100644 --- a/pkg/client/informers/externalversions/channels/v1alpha1/interface.go +++ b/pkg/client/informers/externalversions/channels/v1alpha1/interface.go @@ -28,6 +28,8 @@ type Interface interface { Buses() BusInformer // Channels returns a ChannelInformer. Channels() ChannelInformer + // ClusterBuses returns a ClusterBusInformer. + ClusterBuses() ClusterBusInformer // Subscriptions returns a SubscriptionInformer. Subscriptions() SubscriptionInformer } @@ -53,6 +55,11 @@ func (v *version) Channels() ChannelInformer { return &channelInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} } +// ClusterBuses returns a ClusterBusInformer. +func (v *version) ClusterBuses() ClusterBusInformer { + return &clusterBusInformer{factory: v.factory, tweakListOptions: v.tweakListOptions} +} + // Subscriptions returns a SubscriptionInformer. func (v *version) Subscriptions() SubscriptionInformer { return &subscriptionInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} diff --git a/pkg/client/informers/externalversions/generic.go b/pkg/client/informers/externalversions/generic.go index b8680d85c8a..5c93b4c19cb 100644 --- a/pkg/client/informers/externalversions/generic.go +++ b/pkg/client/informers/externalversions/generic.go @@ -59,6 +59,8 @@ func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource return &genericInformer{resource: resource.GroupResource(), informer: f.Channels().V1alpha1().Buses().Informer()}, nil case v1alpha1.SchemeGroupVersion.WithResource("channels"): return &genericInformer{resource: resource.GroupResource(), informer: f.Channels().V1alpha1().Channels().Informer()}, nil + case v1alpha1.SchemeGroupVersion.WithResource("clusterbuses"): + return &genericInformer{resource: resource.GroupResource(), informer: f.Channels().V1alpha1().ClusterBuses().Informer()}, nil case v1alpha1.SchemeGroupVersion.WithResource("subscriptions"): return &genericInformer{resource: resource.GroupResource(), informer: f.Channels().V1alpha1().Subscriptions().Informer()}, nil diff --git a/pkg/client/listers/channels/v1alpha1/clusterbus.go b/pkg/client/listers/channels/v1alpha1/clusterbus.go new file mode 100644 index 00000000000..5415692f299 --- /dev/null +++ b/pkg/client/listers/channels/v1alpha1/clusterbus.go @@ -0,0 +1,65 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by lister-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + v1alpha1 "github.com/knative/eventing/pkg/apis/channels/v1alpha1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/tools/cache" +) + +// ClusterBusLister helps list ClusterBuses. +type ClusterBusLister interface { + // List lists all ClusterBuses in the indexer. + List(selector labels.Selector) (ret []*v1alpha1.ClusterBus, err error) + // Get retrieves the ClusterBus from the index for a given name. + Get(name string) (*v1alpha1.ClusterBus, error) + ClusterBusListerExpansion +} + +// clusterBusLister implements the ClusterBusLister interface. +type clusterBusLister struct { + indexer cache.Indexer +} + +// NewClusterBusLister returns a new ClusterBusLister. +func NewClusterBusLister(indexer cache.Indexer) ClusterBusLister { + return &clusterBusLister{indexer: indexer} +} + +// List lists all ClusterBuses in the indexer. +func (s *clusterBusLister) List(selector labels.Selector) (ret []*v1alpha1.ClusterBus, err error) { + err = cache.ListAll(s.indexer, selector, func(m interface{}) { + ret = append(ret, m.(*v1alpha1.ClusterBus)) + }) + return ret, err +} + +// Get retrieves the ClusterBus from the index for a given name. +func (s *clusterBusLister) Get(name string) (*v1alpha1.ClusterBus, error) { + obj, exists, err := s.indexer.GetByKey(name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(v1alpha1.Resource("clusterbus"), name) + } + return obj.(*v1alpha1.ClusterBus), nil +} diff --git a/pkg/client/listers/channels/v1alpha1/expansion_generated.go b/pkg/client/listers/channels/v1alpha1/expansion_generated.go index 1f925d7e5d6..96d1fd82104 100644 --- a/pkg/client/listers/channels/v1alpha1/expansion_generated.go +++ b/pkg/client/listers/channels/v1alpha1/expansion_generated.go @@ -34,6 +34,10 @@ type ChannelListerExpansion interface{} // ChannelNamespaceLister. type ChannelNamespaceListerExpansion interface{} +// ClusterBusListerExpansion allows custom methods to be added to +// ClusterBusLister. +type ClusterBusListerExpansion interface{} + // SubscriptionListerExpansion allows custom methods to be added to // SubscriptionLister. type SubscriptionListerExpansion interface{} diff --git a/pkg/controller/channel/controller.go b/pkg/controller/channel/controller.go index deb5dc2b492..ef521804901 100644 --- a/pkg/controller/channel/controller.go +++ b/pkg/controller/channel/controller.go @@ -38,6 +38,7 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" + "github.com/knative/eventing/pkg" clientset "github.com/knative/eventing/pkg/client/clientset/versioned" channelscheme "github.com/knative/eventing/pkg/client/clientset/versioned/scheme" informers "github.com/knative/eventing/pkg/client/informers/externalversions" @@ -455,9 +456,17 @@ func newService(channel *channelsv1alpha1.Channel) *corev1.Service { // the Channel resource that 'owns' it. func newVirtualService(channel *channelsv1alpha1.Channel) *istiov1alpha3.VirtualService { labels := map[string]string{ - "bus": channel.Spec.Bus, "channel": channel.Name, } + var destinationHost string + if len(channel.Spec.Bus) != 0 { + labels["bus"] = channel.Spec.Bus + destinationHost = controller.ServiceHostName(controller.BusDispatcherServiceName(channel.Spec.Bus), channel.Namespace) + } + if len(channel.Spec.ClusterBus) != 0 { + labels["clusterBus"] = channel.Spec.ClusterBus + destinationHost = controller.ServiceHostName(controller.ClusterBusDispatcherServiceName(channel.Spec.ClusterBus), pkg.GetEventingSystemNamespace()) + } return &istiov1alpha3.VirtualService{ ObjectMeta: metav1.ObjectMeta{ Name: controller.ChannelVirtualServiceName(channel.Name), @@ -484,7 +493,7 @@ func newVirtualService(channel *channelsv1alpha1.Channel) *istiov1alpha3.Virtual Route: []istiov1alpha3.DestinationWeight{ { Destination: istiov1alpha3.Destination{ - Host: controller.ServiceHostName(controller.BusDispatcherServiceName(channel.Spec.Bus), channel.Namespace), + Host: destinationHost, }, }, }, diff --git a/pkg/controller/clusterbus/controller.go b/pkg/controller/clusterbus/controller.go new file mode 100644 index 00000000000..643ab659ec5 --- /dev/null +++ b/pkg/controller/clusterbus/controller.go @@ -0,0 +1,646 @@ +/* +Copyright 2017 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package clusterbus + +import ( + "fmt" + "reflect" + "time" + + "github.com/golang/glog" + "github.com/knative/eventing/pkg/controller" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + kubeinformers "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" + appslisters "k8s.io/client-go/listers/apps/v1" + corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" + + "github.com/knative/eventing/pkg" + clientset "github.com/knative/eventing/pkg/client/clientset/versioned" + channelscheme "github.com/knative/eventing/pkg/client/clientset/versioned/scheme" + informers "github.com/knative/eventing/pkg/client/informers/externalversions" + listers "github.com/knative/eventing/pkg/client/listers/channels/v1alpha1" + + servinginformers "github.com/knative/serving/pkg/client/informers/externalversions" + + channelsv1alpha1 "github.com/knative/eventing/pkg/apis/channels/v1alpha1" +) + +const ( + controllerAgentName = "clusterbus-controller" + clusterBusControllerServiceAccountName = "clusterbus-controller" +) + +const ( + // SuccessSynced is used as part of the Event 'reason' when a ClusterBus is synced + SuccessSynced = "Synced" + // ErrResourceExists is used as part of the Event 'reason' when a ClusterBus fails + // to sync due to a Service of the same name already existing. + ErrResourceExists = "ErrResourceExists" + + // MessageResourceExists is the message used for Events when a resource + // fails to sync due to a Service already existing + MessageResourceExists = "Resource %q already exists and is not managed by ClusterBus" + // MessageResourceSynced is the message used for an Event fired when a ClusterBus + // is synced successfully + MessageResourceSynced = "ClusterBus synced successfully" +) + +// Controller is the controller implementation for ClusterBus resources +type Controller struct { + // kubeclientset is a standard kubernetes clientset + kubeclientset kubernetes.Interface + // clusterbusclientset is a clientset for our own API group + clusterbusclientset clientset.Interface + + deploymentsLister appslisters.DeploymentLister + deploymentsSynced cache.InformerSynced + servicesLister corelisters.ServiceLister + servicesSynced cache.InformerSynced + clusterBusesLister listers.ClusterBusLister + clusterBusesSynced cache.InformerSynced + + // workqueue is a rate limited work queue. This is used to queue work to be + // processed instead of performing it as soon as a change happens. This + // means we can ensure we only process a fixed amount of resources at a + // time, and makes it easy to ensure we are never processing the same item + // simultaneously in two different workers. + workqueue workqueue.RateLimitingInterface + // recorder is an event recorder for recording Event resources to the + // Kubernetes API. + recorder record.EventRecorder +} + +// NewController returns a new clusterbus controller +func NewController( + kubeclientset kubernetes.Interface, + clusterbusclientset clientset.Interface, + kubeInformerFactory kubeinformers.SharedInformerFactory, + clusterBusInformerFactory informers.SharedInformerFactory, + routeInformerFactory servinginformers.SharedInformerFactory) controller.Interface { + + // obtain references to shared index informers for the ClusterBus, Deployment and Service + // types. + clusterBusInformer := clusterBusInformerFactory.Channels().V1alpha1().ClusterBuses() + deploymentInformer := kubeInformerFactory.Apps().V1().Deployments() + serviceInformer := kubeInformerFactory.Core().V1().Services() + + // Create event broadcaster + // Add clusterbus-controller types to the default Kubernetes Scheme so Events can be + // logged for clusterbus-controller types. + channelscheme.AddToScheme(scheme.Scheme) + glog.V(4).Info("Creating event broadcaster") + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(glog.Infof) + eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")}) + recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) + + controller := &Controller{ + kubeclientset: kubeclientset, + clusterbusclientset: clusterbusclientset, + deploymentsLister: deploymentInformer.Lister(), + deploymentsSynced: deploymentInformer.Informer().HasSynced, + servicesLister: serviceInformer.Lister(), + servicesSynced: serviceInformer.Informer().HasSynced, + clusterBusesLister: clusterBusInformer.Lister(), + clusterBusesSynced: clusterBusInformer.Informer().HasSynced, + workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ClusterBuses"), + recorder: recorder, + } + + glog.Info("Setting up event handlers") + // Set up an event handler for when ClusterBus resources change + clusterBusInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: controller.enqueueClusterBus, + UpdateFunc: func(old, new interface{}) { + controller.enqueueClusterBus(new) + }, + }) + // Set up an event handler for when Service resources change. This + // handler will lookup the owner of the given Service, and if it is + // owned by a ClusterBus resource will enqueue that ClusterBus resource for + // processing. This way, we don't need to implement custom logic for + // handling Service resources. More info on this pattern: + // https://github.com/kubernetes/community/blob/8cafef897a22026d42f5e5bb3f104febe7e29830/contributors/devel/controllers.md + serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: controller.handleObject, + UpdateFunc: func(old, new interface{}) { + newService := new.(*corev1.Service) + oldService := old.(*corev1.Service) + if newService.ResourceVersion == oldService.ResourceVersion { + // Periodic resync will send update events for all known Services. + // Two different versions of the same Service will always have different RVs. + return + } + controller.handleObject(new) + }, + DeleteFunc: controller.handleObject, + }) + + return controller +} + +// Run will set up the event handlers for types we are interested in, as well +// as syncing informer caches and starting workers. It will block until stopCh +// is closed, at which point it will shutdown the workqueue and wait for +// workers to finish processing their current work items. +func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error { + defer runtime.HandleCrash() + defer c.workqueue.ShutDown() + + // Start the informer factories to begin populating the informer caches + glog.Info("Starting ClusterBus controller") + + // Wait for the caches to be synced before starting workers + glog.Info("Waiting for informer caches to sync") + if ok := cache.WaitForCacheSync(stopCh, c.deploymentsSynced, c.servicesSynced, c.clusterBusesSynced); !ok { + return fmt.Errorf("failed to wait for caches to sync") + } + + glog.Info("Starting workers") + // Launch two workers to process ClusterBus resources + for i := 0; i < threadiness; i++ { + go wait.Until(c.runWorker, time.Second, stopCh) + } + + glog.Info("Started workers") + <-stopCh + glog.Info("Shutting down workers") + + return nil +} + +// runWorker is a long-running function that will continually call the +// processNextWorkItem function in order to read and process a message on the +// workqueue. +func (c *Controller) runWorker() { + for c.processNextWorkItem() { + } +} + +// processNextWorkItem will read a single work item off the workqueue and +// attempt to process it, by calling the syncHandler. +func (c *Controller) processNextWorkItem() bool { + obj, shutdown := c.workqueue.Get() + + if shutdown { + return false + } + + // We wrap this block in a func so we can defer c.workqueue.Done. + err := func(obj interface{}) error { + // We call Done here so the workqueue knows we have finished + // processing this item. We also must remember to call Forget if we + // do not want this work item being re-queued. For example, we do + // not call Forget if a transient error occurs, instead the item is + // put back on the workqueue and attempted again after a back-off + // period. + defer c.workqueue.Done(obj) + var key string + var ok bool + // We expect strings to come off the workqueue. These are of the + // form namespace/name. We do this as the delayed nature of the + // workqueue means the items in the informer cache may actually be + // more up to date that when the item was initially put onto the + // workqueue. + if key, ok = obj.(string); !ok { + // As the item in the workqueue is actually invalid, we call + // Forget here else we'd go into a loop of attempting to + // process a work item that is invalid. + c.workqueue.Forget(obj) + runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) + return nil + } + // Run the syncHandler, passing it the namespace/name string of the + // ClusterBus resource to be synced. + if err := c.syncHandler(key); err != nil { + return fmt.Errorf("error syncing clusterbus '%s': %s", key, err.Error()) + } + // Finally, if no error occurs we Forget this item so it does not + // get queued again until another change happens. + c.workqueue.Forget(obj) + glog.Infof("Successfully synced clusterbus '%s'", key) + return nil + }(obj) + + if err != nil { + runtime.HandleError(err) + return true + } + + return true +} + +// syncHandler compares the actual state with the desired, and attempts to +// converge the two. It then updates the Status block of the ClusterBus resource +// with the current status of the resource. +func (c *Controller) syncHandler(key string) error { + // Convert the namespace/name string into a distinct namespace and name + _, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + runtime.HandleError(fmt.Errorf("invalid resource key: %s", key)) + return nil + } + + // Get the ClusterBus resource with this name + clusterBus, err := c.clusterBusesLister.Get(name) + if err != nil { + // The ClusterBus resource may no longer exist, in which case we stop + // processing. + if errors.IsNotFound(err) { + runtime.HandleError(fmt.Errorf("clusterbus '%s' in work queue no longer exists", key)) + return nil + } + + return err + } + + // Sync Service derived from the ClusterBus + dispatcherService, err := c.syncClusterBusDispatcherService(clusterBus) + if err != nil { + return err + } + + // Sync Deployment derived from the ClusterBus + dispatcherDeployment, err := c.syncClusterBusDispatcherDeployment(clusterBus) + if err != nil { + return err + } + + // Sync Deployment derived from the ClusterBus + provisionerDeployment, err := c.syncClusterBusProvisionerDeployment(clusterBus) + if err != nil { + return err + } + + // Finally, we update the status block of the ClusterBus resource to reflect the + // current state of the world + err = c.updateClusterBusStatus(clusterBus, dispatcherService, dispatcherDeployment, provisionerDeployment) + if err != nil { + return err + } + + c.recorder.Event(clusterBus, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced) + return nil +} + +func (c *Controller) syncClusterBusDispatcherService(clusterBus *channelsv1alpha1.ClusterBus) (*corev1.Service, error) { + // Get the service with the specified service name + serviceName := controller.ClusterBusDispatcherServiceName(clusterBus.ObjectMeta.Name) + service, err := c.servicesLister.Services(pkg.GetEventingSystemNamespace()).Get(serviceName) + // If the resource doesn't exist, we'll create it + if errors.IsNotFound(err) { + service, err = c.kubeclientset.CoreV1().Services(pkg.GetEventingSystemNamespace()).Create(newDispatcherService(clusterBus)) + } + + // If an error occurs during Get/Create, we'll requeue the item so we can + // attempt processing again later. This could have been caused by a + // temporary network failure, or any other transient reason. + if err != nil { + return nil, err + } + + // If the Service is not controlled by this ClusterBus resource, we should log + // a warning to the event recorder and return + if !metav1.IsControlledBy(service, clusterBus) { + msg := fmt.Sprintf(MessageResourceExists, service.Name) + c.recorder.Event(clusterBus, corev1.EventTypeWarning, ErrResourceExists, msg) + return nil, fmt.Errorf(msg) + } + + return service, nil +} + +func (c *Controller) syncClusterBusDispatcherDeployment(clusterBus *channelsv1alpha1.ClusterBus) (*appsv1.Deployment, error) { + // Get the deployment with the specified deployment name + deploymentName := controller.ClusterBusDispatcherDeploymentName(clusterBus.ObjectMeta.Name) + deployment, err := c.deploymentsLister.Deployments(pkg.GetEventingSystemNamespace()).Get(deploymentName) + // If the resource doesn't exist, we'll create it + if errors.IsNotFound(err) { + deployment, err = c.kubeclientset.AppsV1().Deployments(pkg.GetEventingSystemNamespace()).Create(newDispatcherDeployment(clusterBus)) + } + + // If an error occurs during Get/Create, we'll requeue the item so we can + // attempt processing again later. This could have been caused by a + // temporary network failure, or any other transient reason. + if err != nil { + return nil, err + } + + // If the Deployment is not controlled by this ClusterBus resource, we should log + // a warning to the event recorder and return + if !metav1.IsControlledBy(deployment, clusterBus) { + msg := fmt.Sprintf(MessageResourceExists, deployment.Name) + c.recorder.Event(clusterBus, corev1.EventTypeWarning, ErrResourceExists, msg) + return nil, fmt.Errorf(msg) + } + + // If the Deployment does not match the ClusterBus's proposed Deployment we should update + // the Deployment resource. + proposedDeployment := newDispatcherDeployment(clusterBus) + if !reflect.DeepEqual(proposedDeployment.Spec, deployment.Spec) { + glog.V(4).Infof("ClusterBus %s dispatcher spec updated", clusterBus.Name) + deployment, err = c.kubeclientset.AppsV1().Deployments(pkg.GetEventingSystemNamespace()).Update(proposedDeployment) + + if err != nil { + return nil, err + } + } + + return deployment, nil +} + +func (c *Controller) syncClusterBusProvisionerDeployment(clusterBus *channelsv1alpha1.ClusterBus) (*appsv1.Deployment, error) { + provisioner := clusterBus.Spec.Provisioner + + // Get the deployment with the specified deployment name + deploymentName := controller.ClusterBusProvisionerDeploymentName(clusterBus.ObjectMeta.Name) + deployment, err := c.deploymentsLister.Deployments(pkg.GetEventingSystemNamespace()).Get(deploymentName) + + // If the resource shouldn't exists + if provisioner == nil { + // If the resource exists, we'll delete it + if deployment != nil { + err = c.kubeclientset.AppsV1().Deployments(pkg.GetEventingSystemNamespace()).Delete(deploymentName, nil) + } + if errors.IsNotFound(err) { + return nil, nil + } + return nil, err + } + + // If the resource doesn't exist, we'll create it + if errors.IsNotFound(err) { + deployment, err = c.kubeclientset.AppsV1().Deployments(pkg.GetEventingSystemNamespace()).Create(newProvisionerDeployment(clusterBus)) + } + + // If an error occurs during Get/Create, we'll requeue the item so we can + // attempt processing again later. This could have been caused by a + // temporary network failure, or any other transient reason. + if err != nil { + return nil, err + } + + // If the Deployment is not controlled by this ClusterBus resource, we should log + // a warning to the event recorder and return + if !metav1.IsControlledBy(deployment, clusterBus) { + msg := fmt.Sprintf(MessageResourceExists, deployment.Name) + c.recorder.Event(clusterBus, corev1.EventTypeWarning, ErrResourceExists, msg) + return nil, fmt.Errorf(msg) + } + + // If the Deployment does not match the ClusterBus's proposed Deployment we should update + // the Deployment resource. + proposedDeployment := newProvisionerDeployment(clusterBus) + if !reflect.DeepEqual(proposedDeployment.Spec, deployment.Spec) { + glog.V(4).Infof("ClusterBus %s provisioner spec updated", clusterBus.Name) + deployment, err = c.kubeclientset.AppsV1().Deployments(pkg.GetEventingSystemNamespace()).Update(proposedDeployment) + + if err != nil { + return nil, err + } + } + + return deployment, nil +} + +func (c *Controller) updateClusterBusStatus( + clusterBus *channelsv1alpha1.ClusterBus, + dispatcherService *corev1.Service, + dispatcherDeployment *appsv1.Deployment, + provisionerDeployment *appsv1.Deployment, +) error { + // NEVER modify objects from the store. It's a read-only, local cache. + // You can use DeepCopy() to make a deep copy of original object and modify this copy + // Or create a copy manually for better performance + clusterBusCopy := clusterBus.DeepCopy() + // If the CustomResourceSubresources feature gate is not enabled, + // we must use Update instead of UpdateStatus to update the Status block of the ClusterBus resource. + // UpdateStatus will not allow changes to the Spec of the resource, + // which is ideal for ensuring nothing other than resource status has been updated. + _, err := c.clusterbusclientset.ChannelsV1alpha1().ClusterBuses().Update(clusterBusCopy) + return err +} + +// enqueueClusterBus takes a ClusterBus resource and converts it into a namespace/name +// string which is then put onto the work queue. This method should *not* be +// passed resources of any type other than ClusterBus. +func (c *Controller) enqueueClusterBus(obj interface{}) { + var key string + var err error + if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { + runtime.HandleError(err) + return + } + c.workqueue.AddRateLimited(key) +} + +// handleObject will take any resource implementing metav1.Object and attempt +// to find the ClusterBus resource that 'owns' it. It does this by looking at the +// objects metadata.ownerReferences field for an appropriate OwnerReference. +// It then enqueues that ClusterBus resource to be processed. If the object does not +// have an appropriate OwnerReference, it will simply be skipped. +func (c *Controller) handleObject(obj interface{}) { + var object metav1.Object + var ok bool + if object, ok = obj.(metav1.Object); !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + runtime.HandleError(fmt.Errorf("error decoding object, invalid type")) + return + } + object, ok = tombstone.Obj.(metav1.Object) + if !ok { + runtime.HandleError(fmt.Errorf("error decoding object tombstone, invalid type")) + return + } + glog.V(4).Infof("Recovered deleted object '%s' from tombstone", object.GetName()) + } + glog.V(4).Infof("Processing object: %s", object.GetName()) + if ownerRef := metav1.GetControllerOf(object); ownerRef != nil { + // If this object is not owned by a ClusterBus, we should not do anything more + // with it. + if ownerRef.Kind != "ClusterBus" { + return + } + + clusterBus, err := c.clusterBusesLister.Get(ownerRef.Name) + if err != nil { + glog.V(4).Infof("ignoring orphaned object '%s' of clusterbus '%s'", object.GetSelfLink(), ownerRef.Name) + return + } + + c.enqueueClusterBus(clusterBus) + return + } +} + +// newDispatcherService creates a new Service for a ClusterBus resource. It also sets +// the appropriate OwnerReferences on the resource so handleObject can discover +// the ClusterBus resource that 'owns' it. +func newDispatcherService(clusterBus *channelsv1alpha1.ClusterBus) *corev1.Service { + labels := map[string]string{ + "clusterBus": clusterBus.Name, + "role": "dispatcher", + } + return &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: controller.ClusterBusDispatcherServiceName(clusterBus.ObjectMeta.Name), + Namespace: pkg.GetEventingSystemNamespace(), + Labels: labels, + OwnerReferences: []metav1.OwnerReference{ + *metav1.NewControllerRef(clusterBus, schema.GroupVersionKind{ + Group: channelsv1alpha1.SchemeGroupVersion.Group, + Version: channelsv1alpha1.SchemeGroupVersion.Version, + Kind: "ClusterBus", + }), + }, + }, + Spec: corev1.ServiceSpec{ + Selector: labels, + Ports: []corev1.ServicePort{ + { + Name: "http", + Port: 80, + TargetPort: intstr.FromInt(8080), + }, + }, + }, + } +} + +// newDispatcherDeployment creates a new Deployment for a ClusterBus resource. It also sets +// the appropriate OwnerReferences on the resource so handleObject can discover +// the ClusterBus resource that 'owns' it. +func newDispatcherDeployment(clusterBus *channelsv1alpha1.ClusterBus) *appsv1.Deployment { + labels := map[string]string{ + "clusterBus": clusterBus.Name, + "role": "dispatcher", + } + one := int32(1) + container := clusterBus.Spec.Dispatcher.DeepCopy() + container.Env = append(container.Env, + corev1.EnvVar{ + Name: "PORT", + Value: "8080", + }, + corev1.EnvVar{ + Name: "BUS_NAME", + Value: clusterBus.Name, + }, + ) + volumes := []corev1.Volume{} + if clusterBus.Spec.Volumes != nil { + volumes = *clusterBus.Spec.Volumes + } + return &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: controller.ClusterBusDispatcherDeploymentName(clusterBus.ObjectMeta.Name), + Namespace: pkg.GetEventingSystemNamespace(), + OwnerReferences: []metav1.OwnerReference{ + *metav1.NewControllerRef(clusterBus, schema.GroupVersionKind{ + Group: channelsv1alpha1.SchemeGroupVersion.Group, + Version: channelsv1alpha1.SchemeGroupVersion.Version, + Kind: "ClusterBus", + }), + }, + }, + Spec: appsv1.DeploymentSpec{ + Replicas: &one, + Selector: &metav1.LabelSelector{ + MatchLabels: labels, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: labels, + }, + Spec: corev1.PodSpec{ + ServiceAccountName: clusterBusControllerServiceAccountName, + Containers: []corev1.Container{ + *container, + }, + Volumes: volumes, + }, + }, + }, + } +} + +// newProvisionerDeployment creates a new Deployment for a ClusterBus resource. It also sets +// the appropriate OwnerReferences on the resource so handleObject can discover +// the ClusterBus resource that 'owns' it. +func newProvisionerDeployment(clusterBus *channelsv1alpha1.ClusterBus) *appsv1.Deployment { + labels := map[string]string{ + "clusterBus": clusterBus.Name, + "role": "provisioner", + } + one := int32(1) + container := clusterBus.Spec.Provisioner.DeepCopy() + container.Env = append(container.Env, + corev1.EnvVar{ + Name: "BUS_NAME", + Value: clusterBus.Name, + }, + ) + volumes := []corev1.Volume{} + if clusterBus.Spec.Volumes != nil { + volumes = *clusterBus.Spec.Volumes + } + return &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: controller.ClusterBusProvisionerDeploymentName(clusterBus.ObjectMeta.Name), + Namespace: pkg.GetEventingSystemNamespace(), + OwnerReferences: []metav1.OwnerReference{ + *metav1.NewControllerRef(clusterBus, schema.GroupVersionKind{ + Group: channelsv1alpha1.SchemeGroupVersion.Group, + Version: channelsv1alpha1.SchemeGroupVersion.Version, + Kind: "ClusterBus", + }), + }, + }, + Spec: appsv1.DeploymentSpec{ + Replicas: &one, + Selector: &metav1.LabelSelector{ + MatchLabels: labels, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: labels, + }, + Spec: corev1.PodSpec{ + ServiceAccountName: clusterBusControllerServiceAccountName, + Containers: []corev1.Container{ + *container, + }, + Volumes: volumes, + }, + }, + }, + } +} diff --git a/pkg/controller/names.go b/pkg/controller/names.go index d59e99a05ef..e8deb1a68a9 100644 --- a/pkg/controller/names.go +++ b/pkg/controller/names.go @@ -38,6 +38,18 @@ func BusDispatcherServiceName(busName string) string { return fmt.Sprintf("%s-bus", busName) } +func ClusterBusProvisionerDeploymentName(clusterBusName string) string { + return fmt.Sprintf("%s-clusterbus-provisioner", clusterBusName) +} + +func ClusterBusDispatcherDeploymentName(clusterBusName string) string { + return fmt.Sprintf("%s-clusterbus", clusterBusName) +} + +func ClusterBusDispatcherServiceName(clusterBusName string) string { + return fmt.Sprintf("%s-clusterbus", clusterBusName) +} + func ChannelVirtualServiceName(channelName string) string { return fmt.Sprintf("%s-channel", channelName) } diff --git a/pkg/webhook/channel.go b/pkg/webhook/channel.go index 88879705580..8b16604289f 100644 --- a/pkg/webhook/channel.go +++ b/pkg/webhook/channel.go @@ -25,9 +25,11 @@ import ( ) var ( - errInvalidChannelInput = errors.New("failed to convert input into Channel") - errInvalidChannelBusMissing = errors.New("the Channel must reference a Bus") - errInvalidChannelBusMutation = errors.New("the Channel's Bus may not change") + errInvalidChannelInput = errors.New("failed to convert input into Channel") + errInvalidChannelBusMissing = errors.New("the Channel must reference a Bus or ClusterBus") + errInvalidChannelBusExclusivity = errors.New("the Channel must reference either a Bus or ClusterBus, not both") + errInvalidChannelBusMutation = errors.New("the Channel's Bus may not change") + errInvalidChannelClusterBusMutation = errors.New("the Channel's ClusterBus may not change") ) // ValidateChannel is Channel resource specific validation and mutation handler @@ -43,11 +45,20 @@ func ValidateChannel(ctx context.Context) ResourceCallback { } func validateChannel(old, new *v1alpha1.Channel) error { - if len(new.Spec.Bus) == 0 { + refsBus := len(new.Spec.Bus) != 0 + refsClusterBus := len(new.Spec.ClusterBus) != 0 + if !refsBus && !refsClusterBus { return errInvalidChannelBusMissing + } else if refsBus && refsClusterBus { + return errInvalidChannelBusExclusivity } - if old != nil && old.Spec.Bus != new.Spec.Bus { - return errInvalidChannelBusMutation + if old != nil { + if old.Spec.Bus != new.Spec.Bus { + return errInvalidChannelBusMutation + } + if old.Spec.ClusterBus != new.Spec.ClusterBus { + return errInvalidChannelClusterBusMutation + } } return nil } diff --git a/pkg/webhook/channel_test.go b/pkg/webhook/channel_test.go index 21fc229eb2e..11c2ec12ecb 100644 --- a/pkg/webhook/channel_test.go +++ b/pkg/webhook/channel_test.go @@ -19,15 +19,22 @@ import ( "testing" ) -func TestNewChannel(t *testing.T) { - c := createChannel(testChannelName, testBusName) +func TestNewChannelNSBus(t *testing.T) { + c := createChannel(testChannelName, testBusName, "") + if err := ValidateChannel(testCtx)(nil, nil, &c); err != nil { + t.Errorf("Expected success, but failed with: %s", err) + } +} + +func TestNewChannelClusterBus(t *testing.T) { + c := createChannel(testChannelName, "", testClusterBusName) if err := ValidateChannel(testCtx)(nil, nil, &c); err != nil { t.Errorf("Expected success, but failed with: %s", err) } } func TestNewEmptyChannel(t *testing.T) { - c := createChannel(testChannelName, "") + c := createChannel(testChannelName, "", "") err := ValidateChannel(testCtx)(nil, nil, &c) if err == nil { t.Errorf("Expected failure, but succeeded with: %+v", c) @@ -37,16 +44,27 @@ func TestNewEmptyChannel(t *testing.T) { } } -func TestChannelMutation(t *testing.T) { - c := createChannel(testChannelName, testBusName) +func TestNewExclusiveChannel(t *testing.T) { + c := createChannel(testChannelName, testBusName, testClusterBusName) + err := ValidateChannel(testCtx)(nil, nil, &c) + if err == nil { + t.Errorf("Expected failure, but succeeded with: %+v", c) + } + if e, a := errInvalidChannelBusExclusivity, err; e != a { + t.Errorf("Expected %s got %s", e, a) + } +} + +func TestChannelNoopMutation(t *testing.T) { + c := createChannel(testChannelName, testBusName, "") if err := ValidateChannel(testCtx)(nil, &c, &c); err != nil { t.Errorf("Expected success, but failed with: %s", err) } } -func TestChannelBusMutation(t *testing.T) { - old := createChannel(testChannelName, "stub") - new := createChannel(testChannelName, "pubsub") +func TestChannelNSBusMutation(t *testing.T) { + old := createChannel(testChannelName, "stub", "") + new := createChannel(testChannelName, "pubsub", "") err := ValidateChannel(testCtx)(nil, &old, &new) if err == nil { t.Errorf("Expected failure, but succeeded with: %+v %+v", old, new) @@ -55,3 +73,15 @@ func TestChannelBusMutation(t *testing.T) { t.Errorf("Expected %s got %s", e, a) } } + +func TestChannelClusterBusMutation(t *testing.T) { + old := createChannel(testChannelName, "", "stub") + new := createChannel(testChannelName, "", "pubsub") + err := ValidateChannel(testCtx)(nil, &old, &new) + if err == nil { + t.Errorf("Expected failure, but succeeded with: %+v %+v", old, new) + } + if e, a := errInvalidChannelClusterBusMutation, err; e != a { + t.Errorf("Expected %s got %s", e, a) + } +} diff --git a/pkg/webhook/webhook_test.go b/pkg/webhook/webhook_test.go index bd24d1d19e8..5a4c073ecb5 100644 --- a/pkg/webhook/webhook_test.go +++ b/pkg/webhook/webhook_test.go @@ -47,6 +47,7 @@ func newDefaultOptions() ControllerOptions { const ( testNamespace = "test-namespace" testBusName = "test-bus" + testClusterBusName = "test-clusterbus" testChannelName = "test-channel" testSubscriptionName = "test-subscription" ) @@ -133,7 +134,7 @@ func TestInvalidNewChannelNameFails(t *testing.T) { Kind: metav1.GroupVersionKind{Kind: "Channel"}, } invalidName := "channel.example" - channel := createChannel(invalidName, "bus-name") + channel := createChannel(invalidName, testBusName, "") marshaled, err := json.Marshal(channel) if err != nil { t.Fatalf("Failed to marshal channel: %s", err) @@ -142,7 +143,7 @@ func TestInvalidNewChannelNameFails(t *testing.T) { expectFailsWith(t, ac.admit(testCtx, req), "Invalid resource name") invalidName = strings.Repeat("a", 64) - channel = createChannel(invalidName, "bus-name") + channel = createChannel(invalidName, testBusName, "") marshaled, err = json.Marshal(channel) if err != nil { t.Fatalf("Failed to marshal channel: %s", err) @@ -158,10 +159,19 @@ func TestValidNewChannelObject(t *testing.T) { expectPatches(t, resp.Patch, []jsonpatch.JsonPatchOperation{}) } -func TestValidChannelNoChanges(t *testing.T) { +func TestValidChannelNSBusNoChanges(t *testing.T) { _, ac := newNonRunningTestAdmissionController(t, newDefaultOptions()) - old := createChannel(testChannelName, testBusName) - new := createChannel(testChannelName, testBusName) + old := createChannel(testChannelName, testBusName, "") + new := createChannel(testChannelName, testBusName, "") + resp := ac.admit(testCtx, createUpdateChannel(&old, &new)) + expectAllowed(t, resp) + expectPatches(t, resp.Patch, []jsonpatch.JsonPatchOperation{}) +} + +func TestValidChannelClusterBusNoChanges(t *testing.T) { + _, ac := newNonRunningTestAdmissionController(t, newDefaultOptions()) + old := createChannel(testChannelName, "", testClusterBusName) + new := createChannel(testChannelName, "", testClusterBusName) resp := ac.admit(testCtx, createUpdateChannel(&old, &new)) expectAllowed(t, resp) expectPatches(t, resp.Patch, []jsonpatch.JsonPatchOperation{}) @@ -345,17 +355,18 @@ func createCreateChannel(channel v1alpha1.Channel) *admissionv1beta1.AdmissionRe } func createValidCreateChannel() *admissionv1beta1.AdmissionRequest { - return createCreateChannel(createChannel(testChannelName, testBusName)) + return createCreateChannel(createChannel(testChannelName, testBusName, "")) } -func createChannel(channelName string, busName string) v1alpha1.Channel { +func createChannel(channelName string, busName, clusterBusName string) v1alpha1.Channel { return v1alpha1.Channel{ ObjectMeta: metav1.ObjectMeta{ Namespace: testNamespace, Name: channelName, }, Spec: v1alpha1.ChannelSpec{ - Bus: busName, + Bus: busName, + ClusterBus: clusterBusName, }, } } diff --git a/sample/hello/hello-channel.yaml b/sample/hello/hello-channel.yaml index 7791e7168ed..b7d8c7dbc49 100644 --- a/sample/hello/hello-channel.yaml +++ b/sample/hello/hello-channel.yaml @@ -4,4 +4,3 @@ metadata: name: aloha spec: bus: stub - \ No newline at end of file