diff --git a/cmd/fanoutsidecar/main.go b/cmd/fanoutsidecar/main.go index 1ece5804172..629babe71e2 100644 --- a/cmd/fanoutsidecar/main.go +++ b/cmd/fanoutsidecar/main.go @@ -21,9 +21,13 @@ package main import ( "context" - "errors" "flag" "fmt" + "log" + "net/http" + "strings" + "time" + "github.com/knative/eventing/pkg/sidecar/configmap/filesystem" "github.com/knative/eventing/pkg/sidecar/configmap/watcher" "github.com/knative/eventing/pkg/sidecar/swappable" @@ -31,17 +35,13 @@ import ( "go.uber.org/zap" "golang.org/x/sync/errgroup" "k8s.io/client-go/kubernetes" - "log" - "net/http" "sigs.k8s.io/controller-runtime/pkg/client/config" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/runtime/signals" - "strings" - "time" ) const ( - defaultConfigMapName = "in-memory-bus-config" + defaultConfigMapName = "in-memory-channel-dispatcher-config-map" // The following are the only valid values of the config_map_noticer flag. cmnfVolume = "volume" @@ -60,12 +60,12 @@ var ( func init() { flag.IntVar(&port, "sidecar_port", -1, "The port to run the sidecar on.") - flag.StringVar(&configMapNoticer, "config_map_noticer", "", fmt.Sprintf("The system to notice changes to the ConfigMap. Valid values are: %s", configMapNoticerFlags())) + flag.StringVar(&configMapNoticer, "config_map_noticer", "", fmt.Sprintf("The system to notice changes to the ConfigMap. 
Valid values are: %s", configMapNoticerValues())) flag.StringVar(&configMapNamespace, "config_map_namespace", system.Namespace, "The namespace of the ConfigMap that is watched for configuration.") flag.StringVar(&configMapName, "config_map_name", defaultConfigMapName, "The name of the ConfigMap that is watched for configuration.") } -func configMapNoticerFlags() string { +func configMapNoticerValues() string { return strings.Join([]string{cmnfVolume, cmnfWatcher}, ", ") } @@ -132,7 +132,7 @@ func setupConfigMapNoticer(logger *zap.Logger, configUpdated swappable.UpdateCon case cmnfWatcher: err = setupConfigMapWatcher(logger, mgr, configUpdated) default: - err = errors.New("need to provide the --config_map_noticer flag") + err = fmt.Errorf("need to provide the --config_map_noticer flag (valid values are %s)", configMapNoticerValues()) } if err != nil { return nil, err diff --git a/config/provisioners/in-memory-channel/README.md b/config/provisioners/in-memory-channel/README.md new file mode 100644 index 00000000000..d895f9b32c4 --- /dev/null +++ b/config/provisioners/in-memory-channel/README.md @@ -0,0 +1,63 @@ +# In-Memory Channels + +In-memory channels are a best effort Channel. They should **NOT** be used in Production. They are +useful for development. + +They differ from most Channels in that they have: +* No persistence. + - If a Pod goes down, messages go with it. +* No ordering guarantee. + - There is nothing enforcing an ordering, so two messages that arrive at the same time may + go downstream in any order. + - Different downstream subscribers may see different orders. +* No redelivery attempts. + - If downstream rejects a request, a log message is written, but that request is never sent + again. + + +### Deployment steps: + +1. Setup [Knative Eventing](../../../DEVELOPMENT.md). +1. Apply the 'in-memory-channel' ClusterProvisioner, Controller, and Dispatcher. + ```shell + ko apply -f config/providers/in-memory-channel/in-memory-channel.yaml + ```` +1. 
Create Channels that reference the 'in-memory-channel'. + + ```yaml + apiVersion: eventing.knative.dev/v1alpha1 + kind: Channel + metadata: + name: foo + spec: + provisioner: + ref: + apiVersion: eventing.knative.dev/v1alpha1 + kind: ClusterProvisioner + name: in-memory-channel + ``` + +### Components + +The major components are: +* ClusterProvisioner Controller +* Channel Controller +* Channel Dispatcher +* Channel Dispatcher Config Map. + +The ClusterProvisioner Controller and the Channel Controller are colocated in one Pod. +```shell +kubectl get deployment -n knative-eventing in-memory-channel-controller +``` + +The Channel Dispatcher receives and distributes all events. There is a single Dispatcher for all +in-memory Channels. +```shell +kubectl get deployment -n knative-eventing in-memory-channel-dispatcher +``` + +The Channel Dispatcher Config Map is used to send information about Channels and Subscriptions from +the Channel Controller to the Channel Dispatcher. +```shell +kubectl get configmap -n knative-eventing in-memory-channel-dispatcher-config-map +``` diff --git a/config/provisioners/in-memory-channel/in-memory-channel.yaml b/config/provisioners/in-memory-channel/in-memory-channel.yaml new file mode 100644 index 00000000000..44a15c66175 --- /dev/null +++ b/config/provisioners/in-memory-channel/in-memory-channel.yaml @@ -0,0 +1,183 @@ +# Copyright 2018 The Knative Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +apiVersion: eventing.knative.dev/v1alpha1 +kind: ClusterProvisioner +metadata: + name: in-memory-channel +spec: + reconciles: + group: eventing.knative.dev/v1alpha1 + kind: Channel + +--- + +apiVersion: v1 +kind: ServiceAccount +metadata: + name: in-memory-channel-controller + namespace: knative-eventing + +--- + +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: in-memory-channel-controller +rules: + - apiGroups: + - eventing.knative.dev + resources: + - channels + - clusterprovisioners + verbs: + - get + - list + - watch + - update + - apiGroups: + - "" # Core API group. + resources: + - configmaps + - services + verbs: + - get + - list + - watch + - create + - apiGroups: + - "" # Core API Group. + resources: + - configmaps + resourceNames: + - in-memory-channel-dispatcher-config-map + verbs: + - update + - apiGroups: + - networking.istio.io + resources: + - virtualservices + verbs: + - get + - list + - watch + - create + +--- + +apiVersion: rbac.authorization.k8s.io/v1beta1 +kind: ClusterRoleBinding +metadata: + name: in-memory-channel-controller + namespace: knative-eventing +subjects: + - kind: ServiceAccount + name: in-memory-channel-controller + namespace: knative-eventing +roleRef: + kind: ClusterRole + name: in-memory-channel-controller + apiGroup: rbac.authorization.k8s.io + +--- + +apiVersion: apps/v1beta1 +kind: Deployment +metadata: + name: in-memory-channel-controller + namespace: knative-eventing +spec: + replicas: 1 + selector: + matchLabels: &labels + clusterProvisioner: in-memory-channel + role: controller + template: + metadata: + labels: *labels + spec: + serviceAccountName: in-memory-channel-controller + containers: + - name: controller + image: github.com/knative/eventing/pkg/controller/eventing/inmemory/controller + +--- + +apiVersion: v1 +kind: ServiceAccount +metadata: + name: in-memory-channel-dispatcher + namespace: knative-eventing + +--- + +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole 
+metadata: + name: in-memory-channel-dispatcher + namespace: knative-eventing +rules: + - apiGroups: + - "" # Core API group. + resources: + - configmaps + verbs: + - get + - list + - watch + +--- + +apiVersion: rbac.authorization.k8s.io/v1beta1 +kind: ClusterRoleBinding +metadata: + name: in-memory-channel-dispatcher + namespace: knative-eventing +subjects: + - kind: ServiceAccount + name: in-memory-channel-dispatcher + namespace: knative-eventing +roleRef: + kind: ClusterRole + name: in-memory-channel-dispatcher + apiGroup: rbac.authorization.k8s.io + +--- + +apiVersion: apps/v1beta1 +kind: Deployment +metadata: + name: in-memory-channel-dispatcher + namespace: knative-eventing +spec: + replicas: 1 + selector: + matchLabels: &labels + clusterProvisioner: in-memory-channel + role: dispatcher + template: + metadata: + annotations: + sidecar.istio.io/inject: "true" + labels: *labels + spec: + serviceAccountName: in-memory-channel-dispatcher + containers: + - name: dispatcher + image: github.com/knative/eventing/cmd/fanoutsidecar + args: + - --sidecar_port=8080 + - --config_map_noticer=watcher + - --config_map_namespace=knative-eventing + - --config_map_name=in-memory-channel-dispatcher-config-map diff --git a/config/provisioners/in-memory-channel/tmp/bus-with-subs.yaml b/config/provisioners/in-memory-channel/tmp/bus-with-subs.yaml new file mode 100644 index 00000000000..771d8cc7520 --- /dev/null +++ b/config/provisioners/in-memory-channel/tmp/bus-with-subs.yaml @@ -0,0 +1,104 @@ +# This sets up two channels and some subscriptions: +# qux-1 -> event-changer -> qux-2 -> message-dumper-foo +# V L> message-dumper-bar +# message-dumper-foo +# +# So any message going to qux-1 is expected to appear in message-dumper-foo twice (one of which is +# altered) and message-dumper-bar once. 
+ +apiVersion: eventing.knative.dev/v1alpha1 +kind: Channel +metadata: + name: qux-1 +spec: + provisioner: + ref: + apiVersion: eventing.knative.dev/v1alpha1 + kind: ClusterProvisioner + name: in-memory-channel + +--- + +apiVersion: eventing.knative.dev/v1alpha1 +kind: Channel +metadata: + name: qux-2 +spec: + provisioner: + ref: + apiVersion: eventing.knative.dev/v1alpha1 + kind: ClusterProvisioner + name: in-memory-channel + +--- + +apiVersion: eventing.knative.dev/v1alpha1 +kind: Subscription +metadata: + name: qux-1-dumper-foo +spec: + from: + apiVersion: eventing.knative.dev/v1alpha1 + kind: Channel + name: qux-1 + call: + target: + apiVersion: v1 + kind: Service + name: message-dumper-foo + +--- + +apiVersion: eventing.knative.dev/v1alpha1 +kind: Subscription +metadata: + name: qux-changer +spec: + from: + apiVersion: eventing.knative.dev/v1alpha1 + kind: Channel + name: qux-1 + call: + target: + apiVersion: v1 + kind: Service + name: event-changer + result: + target: + apiVersion: eventing.knative.dev/v1alpha1 + kind: Channel + name: qux-2 + +--- + +apiVersion: eventing.knative.dev/v1alpha1 +kind: Subscription +metadata: + name: qux-2-dumper-foo +spec: + from: + apiVersion: eventing.knative.dev/v1alpha1 + kind: Channel + name: qux-2 + call: + target: + apiVersion: v1 + kind: Service + name: message-dumper-foo + +--- + +apiVersion: eventing.knative.dev/v1alpha1 +kind: Subscription +metadata: + name: qux-2-dumper-bar +spec: + from: + apiVersion: eventing.knative.dev/v1alpha1 + kind: Channel + name: qux-2 + call: + target: + apiVersion: v1 + kind: Service + name: message-dumper-bar diff --git a/config/provisioners/in-memory-channel/tmp/bus2.yaml b/config/provisioners/in-memory-channel/tmp/bus2.yaml new file mode 100644 index 00000000000..78d2852e2f2 --- /dev/null +++ b/config/provisioners/in-memory-channel/tmp/bus2.yaml @@ -0,0 +1,34 @@ + +apiVersion: eventing.knative.dev/v1alpha1 +kind: Channel +metadata: + name: foo +spec: + provisioner: + ref: + apiVersion: 
eventing.knative.dev/v1alpha1 + kind: ClusterProvisioner + name: in-memory-channel + channelable: + subscribers: + - sinkableDomain: message-dumper-foo.default.svc.cluster.local # Already exists + callableDomain: event-changer.default.svc.cluster.local + - sinkableDomain: message-dumper-bar.default.svc.cluster.local # Already exists + +--- + +apiVersion: eventing.knative.dev/v1alpha1 +kind: Channel +metadata: + name: bar +spec: + provisioner: + ref: + apiVersion: eventing.knative.dev/v1alpha1 + kind: ClusterProvisioner + name: in-memory-channel + channelable: + subscribers: + - sinkableDomain: message-dumper-bar.default.svc.cluster.local # Already exists + callableDomain: event-changer.default.svc.cluster.local + diff --git a/pkg/apis/eventing/v1alpha1/channel_types.go b/pkg/apis/eventing/v1alpha1/channel_types.go index 8c69a2124fa..d5122b08344 100644 --- a/pkg/apis/eventing/v1alpha1/channel_types.go +++ b/pkg/apis/eventing/v1alpha1/channel_types.go @@ -20,6 +20,7 @@ import ( "github.com/knative/pkg/apis" duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" "github.com/knative/pkg/webhook" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" ) @@ -75,7 +76,7 @@ type ChannelSpec struct { Channelable *duckv1alpha1.Channelable `json:"channelable,omitempty"` } -var chanCondSet = duckv1alpha1.NewLivingConditionSet(ChannelConditionProvisioned) +var chanCondSet = duckv1alpha1.NewLivingConditionSet(ChannelConditionProvisioned, ChannelConditionSinkable, ChannelConditionSubscribable) // ChannelStatus represents the current state of a Channel. type ChannelStatus struct { @@ -110,6 +111,14 @@ const ( // ChannelConditionProvisioned has status True when the Channel's backing // resources have been provisioned. ChannelConditionProvisioned duckv1alpha1.ConditionType = "Provisioned" + + // ChannelConditionSinkable has status true when this Channel meets the Sinkable contract and + // has a non-empty domainInternal. 
+ ChannelConditionSinkable duckv1alpha1.ConditionType = "Sinkable" + + // ChannelConditionSubscribable has status true when this Channel meets the Subscribable + // contract and has a non-empty Channelable object reference. + ChannelConditionSubscribable duckv1alpha1.ConditionType = "Subscribable" ) // GetCondition returns the condition currently associated with the given type, or nil. @@ -132,6 +141,36 @@ func (cs *ChannelStatus) MarkProvisioned() { chanCondSet.Manage(cs).MarkTrue(ChannelConditionProvisioned) } +// SetSubscribable makes this Channel Subscribable, by having it point at itself. The 'name' and +// 'namespace' should be the name and namespace of the Channel this ChannelStatus is on. It also +// sets the ChannelConditionSubscribable to true. +func (cs *ChannelStatus) SetSubscribable(namespace, name string) { + if namespace != "" || name != "" { + cs.Subscribable.Channelable = corev1.ObjectReference{ + Kind: "Channel", + APIVersion: SchemeGroupVersion.String(), + Namespace: namespace, + Name: name, + } + chanCondSet.Manage(cs).MarkTrue(ChannelConditionSubscribable) + } else { + cs.Subscribable.Channelable = corev1.ObjectReference{} + chanCondSet.Manage(cs).MarkFalse(ChannelConditionSubscribable, "notSubscribable", "not Subscribable") + } + +} + +// SetSinkable makes this Channel sinkable by setting the domainInternal. It also sets the +// ChannelConditionSinkable to true. +func (cs *ChannelStatus) SetSinkable(domainInternal string) { + cs.Sinkable.DomainInternal = domainInternal + if domainInternal != "" { + chanCondSet.Manage(cs).MarkTrue(ChannelConditionSinkable) + } else { + chanCondSet.Manage(cs).MarkFalse(ChannelConditionSinkable, "emptyDomainInternal", "domainInternal is the empty string") + } +} + // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // ChannelList is a collection of Channels. 
diff --git a/pkg/apis/eventing/v1alpha1/channel_types_test.go b/pkg/apis/eventing/v1alpha1/channel_types_test.go index 2f208018207..e2e6cac4104 100644 --- a/pkg/apis/eventing/v1alpha1/channel_types_test.go +++ b/pkg/apis/eventing/v1alpha1/channel_types_test.go @@ -35,6 +35,8 @@ var condUnprovisioned = duckv1alpha1.Condition{ Status: corev1.ConditionFalse, } +var ignoreTransitionTimeMessageAndReason = cmpopts.IgnoreFields(duckv1alpha1.Condition{}, "LastTransitionTime", "Message", "Reason") + func TestChannelGetCondition(t *testing.T) { tests := []struct { name string @@ -97,6 +99,12 @@ func TestChannelInitializeConditions(t *testing.T) { }, { Type: ChannelConditionReady, Status: corev1.ConditionUnknown, + }, { + Type: ChannelConditionSinkable, + Status: corev1.ConditionUnknown, + }, { + Type: ChannelConditionSubscribable, + Status: corev1.ConditionUnknown, }}, }, }, { @@ -114,6 +122,12 @@ func TestChannelInitializeConditions(t *testing.T) { }, { Type: ChannelConditionReady, Status: corev1.ConditionUnknown, + }, { + Type: ChannelConditionSinkable, + Status: corev1.ConditionUnknown, + }, { + Type: ChannelConditionSubscribable, + Status: corev1.ConditionUnknown, }}, }, }, { @@ -131,6 +145,12 @@ func TestChannelInitializeConditions(t *testing.T) { }, { Type: ChannelConditionReady, Status: corev1.ConditionUnknown, + }, { + Type: ChannelConditionSinkable, + Status: corev1.ConditionUnknown, + }, { + Type: ChannelConditionSubscribable, + Status: corev1.ConditionUnknown, }}}, }, } @@ -150,14 +170,20 @@ func TestChannelIsReady(t *testing.T) { tests := []struct { name string markProvisioned bool + setSubscribable bool + setSinkable bool wantReady bool }{{ name: "all happy", markProvisioned: true, + setSubscribable: true, + setSinkable: true, wantReady: true, }, { name: "one sad", markProvisioned: false, + setSubscribable: true, + setSinkable: true, wantReady: false, }} for _, test := range tests { @@ -166,6 +192,12 @@ func TestChannelIsReady(t *testing.T) { if 
test.markProvisioned { cs.MarkProvisioned() } + if test.setSubscribable { + cs.SetSubscribable("foo", "bar") + } + if test.setSinkable { + cs.SetSinkable("foo.bar") + } got := cs.IsReady() if test.wantReady != got { t.Errorf("unexpected readiness: want %v, got %v", test.wantReady, got) @@ -173,3 +205,122 @@ func TestChannelIsReady(t *testing.T) { }) } } + +func TestChannelStatus_SetSubscribable(t *testing.T) { + testCases := map[string]struct { + namespace string + name string + want *ChannelStatus + }{ + "empty namespace and name": { + want: &ChannelStatus{ + Conditions: []duckv1alpha1.Condition{ + // Note that Ready is here because when the condition is marked False, duck + // automatically sets Ready to false. + { + Type: ChannelConditionReady, + Status: corev1.ConditionFalse, + }, + { + Type: ChannelConditionSubscribable, + Status: corev1.ConditionFalse, + }, + }, + }, + }, + "empty namespace": { + name: "foobar", + want: &ChannelStatus{ + Subscribable: duckv1alpha1.Subscribable{ + Channelable: corev1.ObjectReference{ + APIVersion: SchemeGroupVersion.String(), + Kind: "Channel", + Name: "foobar", + }, + }, + Conditions: []duckv1alpha1.Condition{ + { + Type: ChannelConditionSubscribable, + Status: corev1.ConditionTrue, + }, + }, + }, + }, + "subscribable": { + namespace: "test-namespace", + name: "test-name", + want: &ChannelStatus{ + Subscribable: duckv1alpha1.Subscribable{ + Channelable: corev1.ObjectReference{ + APIVersion: SchemeGroupVersion.String(), + Kind: "Channel", + Namespace: "test-namespace", + Name: "test-name", + }, + }, + Conditions: []duckv1alpha1.Condition{ + { + Type: ChannelConditionSubscribable, + Status: corev1.ConditionTrue, + }, + }, + }, + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + cs := &ChannelStatus{} + cs.SetSubscribable(tc.namespace, tc.name) + if diff := cmp.Diff(tc.want, cs, ignoreTransitionTimeMessageAndReason); diff != "" { + t.Errorf("unexpected conditions (-want, +got) = %v", diff) + } + }) + } +} 
+ +func TestChannelStatus_SetSinkable(t *testing.T) { + testCases := map[string]struct { + domainInternal string + want *ChannelStatus + }{ + "empty string": { + want: &ChannelStatus{ + Conditions: []duckv1alpha1.Condition{ + // Note that Ready is here because when the condition is marked False, duck + // automatically sets Ready to false. + { + Type: ChannelConditionReady, + Status: corev1.ConditionFalse, + }, + { + Type: ChannelConditionSinkable, + Status: corev1.ConditionFalse, + }, + }, + }, + }, + "has domain": { + domainInternal: "test-domain", + want: &ChannelStatus{ + Sinkable: duckv1alpha1.Sinkable{ + DomainInternal: "test-domain", + }, + Conditions: []duckv1alpha1.Condition{ + { + Type: ChannelConditionSinkable, + Status: corev1.ConditionTrue, + }, + }, + }, + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + cs := &ChannelStatus{} + cs.SetSinkable(tc.domainInternal) + if diff := cmp.Diff(tc.want, cs, ignoreTransitionTimeMessageAndReason); diff != "" { + t.Errorf("unexpected conditions (-want, +got) = %v", diff) + } + }) + } +} diff --git a/pkg/apis/eventing/v1alpha1/cluster_provisioner_types.go b/pkg/apis/eventing/v1alpha1/cluster_provisioner_types.go index d093f450208..6a0a32ecb0b 100644 --- a/pkg/apis/eventing/v1alpha1/cluster_provisioner_types.go +++ b/pkg/apis/eventing/v1alpha1/cluster_provisioner_types.go @@ -82,6 +82,12 @@ type ClusterProvisionerStatus struct { ObservedGeneration int64 `json:"observedGeneration,omitempty"` } +const ( + // ClusterProvisionerConditionReady has status True when the Controller reconciling objects + // controlled by it is ready to control them. + ClusterProvisionerConditionReady = duckv1alpha1.ConditionReady +) + // GetCondition returns the condition currently associated with the given type, or nil. 
func (ps *ClusterProvisionerStatus) GetCondition(t duckv1alpha1.ConditionType) *duckv1alpha1.Condition { return cProvCondSet.Manage(ps).GetCondition(t) @@ -97,6 +103,14 @@ func (ps *ClusterProvisionerStatus) InitializeConditions() { cProvCondSet.Manage(ps).InitializeConditions() } +// MarkReady marks this ClusterProvisioner as Ready=true. +// +// Note that this is not the normal pattern for duck conditions, but because there is (currently) +// no other condition on ClusterProvisioners, the normal IsReady() logic doesn't work well. +func (ps *ClusterProvisionerStatus) MarkReady() { + cProvCondSet.Manage(ps).MarkTrue(ClusterProvisionerConditionReady) +} + // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // ClusterProvisionerList is a list of ClusterProvisioner resources diff --git a/pkg/apis/eventing/v1alpha1/cluster_provisioner_types_test.go b/pkg/apis/eventing/v1alpha1/cluster_provisioner_types_test.go index 8a9ab747aa0..afb7e8a0ff9 100644 --- a/pkg/apis/eventing/v1alpha1/cluster_provisioner_types_test.go +++ b/pkg/apis/eventing/v1alpha1/cluster_provisioner_types_test.go @@ -116,3 +116,15 @@ func TestClusterProvisionerStatusGetCondition(t *testing.T) { }) } } + +func TestClusterProvisionerStatus_MarkReady(t *testing.T) { + ps := ClusterProvisionerStatus{} + ps.InitializeConditions() + if ps.IsReady() { + t.Errorf("Should not be ready when initialized.") + } + ps.MarkReady() + if !ps.IsReady() { + t.Errorf("Should be ready after MarkReady() was called.") + } +} diff --git a/pkg/buses/message_receiver.go b/pkg/buses/message_receiver.go index efe38c7dae1..a4aa93f4435 100644 --- a/pkg/buses/message_receiver.go +++ b/pkg/buses/message_receiver.go @@ -168,7 +168,7 @@ func (r *MessageReceiver) fromHTTPHeaders(headers http.Header) map[string]string return safe } -// parseChannel converts the channel's hostname into a channel +// ParseChannel converts the channel's hostname into a channel // reference. 
func ParseChannel(host string) ChannelReference { chunks := strings.Split(host, ".") diff --git a/pkg/controller/eventing/inmemory/channel/controller.go b/pkg/controller/eventing/inmemory/channel/controller.go new file mode 100644 index 00000000000..b139d622f2a --- /dev/null +++ b/pkg/controller/eventing/inmemory/channel/controller.go @@ -0,0 +1,95 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package channel + +import ( + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/eventing/pkg/system" + istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3" + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/source" +) + +const ( + // controllerAgentName is the string used by this controller to identify + // itself when creating events. + controllerAgentName = "in-memory-channel-controller" + + // ConfigMapName is the name of the ConfigMap in the knative-eventing namespace that contains + // the subscription information for all in-memory Channels. The Provisioner writes to it and the + // Dispatcher reads from it. 
+ ConfigMapName = "in-memory-channel-dispatcher-config-map" +) + +var ( + defaultConfigMapKey = types.NamespacedName{ + Namespace: system.Namespace, + Name: ConfigMapName, + } +) + +// ProvideController returns a Controller that represents the in-memory-channel Provisioner. +func ProvideController(mgr manager.Manager, logger *zap.Logger) (controller.Controller, error) { + // Setup a new controller to Reconcile Channels that belong to this Cluster Provisioner + // (in-memory channels). + r := &reconciler{ + configMapKey: defaultConfigMapKey, + recorder: mgr.GetRecorder(controllerAgentName), + logger: logger, + } + c, err := controller.New(controllerAgentName, mgr, controller.Options{ + Reconciler: r, + }) + if err != nil { + logger.Error("Unable to create controller.", zap.Error(err)) + return nil, err + } + + // Watch Channels. + err = c.Watch(&source.Kind{ + Type: &eventingv1alpha1.Channel{}, + }, &handler.EnqueueRequestForObject{}) + if err != nil { + logger.Error("Unable to watch Channels.", zap.Error(err), zap.Any("type", &eventingv1alpha1.Channel{})) + return nil, err + } + + // Watch the K8s Services that are owned by Channels. + err = c.Watch(&source.Kind{ + Type: &corev1.Service{}, + }, &handler.EnqueueRequestForOwner{OwnerType: &eventingv1alpha1.Channel{}, IsController: true}) + if err != nil { + logger.Error("Unable to watch K8s Services.", zap.Error(err)) + return nil, err + } + + // Watch the VirtualServices that are owned by Channels. 
+ err = c.Watch(&source.Kind{ + Type: &istiov1alpha3.VirtualService{}, + }, &handler.EnqueueRequestForOwner{OwnerType: &eventingv1alpha1.Channel{}, IsController: true}) + if err != nil { + logger.Error("Unable to watch VirtualServices.", zap.Error(err)) + return nil, err + } + + return c, nil +} diff --git a/pkg/controller/eventing/inmemory/channel/reconcile.go b/pkg/controller/eventing/inmemory/channel/reconcile.go new file mode 100644 index 00000000000..439361e8978 --- /dev/null +++ b/pkg/controller/eventing/inmemory/channel/reconcile.go @@ -0,0 +1,435 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package channel + +import ( + "context" + + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/eventing/pkg/controller" + cpcontroller "github.com/knative/eventing/pkg/controller/eventing/inmemory/clusterprovisioner" + "github.com/knative/eventing/pkg/sidecar/configmap" + "github.com/knative/eventing/pkg/sidecar/fanout" + "github.com/knative/eventing/pkg/sidecar/multichannelfanout" + "github.com/knative/eventing/pkg/system" + istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3" + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +const ( + portName = "http" + portNumber = 80 + finalizerName = controllerAgentName +) + +type reconciler struct { + client client.Client + recorder record.EventRecorder + logger *zap.Logger + + configMapKey client.ObjectKey +} + +// Verify the struct implements reconcile.Reconciler +var _ reconcile.Reconciler = &reconciler{} + +func (r *reconciler) InjectClient(c client.Client) error { + r.client = c + return nil +} + +func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, error) { + // TODO: use this to store the logger and set a deadline + ctx := context.TODO() + logger := r.logger.With(zap.Any("request", request)) + + c := &eventingv1alpha1.Channel{} + err := r.client.Get(ctx, request.NamespacedName, c) + + // The Channel may have been deleted since it was added to the workqueue. If so, there is + // nothing to be done. 
+ if errors.IsNotFound(err) { + logger.Info("Could not find Channel", zap.Error(err)) + return reconcile.Result{}, nil + } + + // Any other error should be retried in another reconciliation. + if err != nil { + logger.Error("Unable to Get Channel", zap.Error(err)) + return reconcile.Result{}, err + } + + // Does this Controller control this Channel? + if !r.shouldReconcile(c) { + logger.Info("Not reconciling Channel, it is not controlled by this Controller", zap.Any("ref", c.Spec)) + return reconcile.Result{}, nil + } + logger.Info("Reconciling Channel") + + // Modify a copy, not the original. + c = c.DeepCopy() + + err = r.reconcile(ctx, c) + if err != nil { + logger.Info("Error reconciling Channel", zap.Error(err)) + // Note that we do not return the error here, because we want to update the Status + // regardless of the error. + } + + if updateStatusErr := r.updateChannel(ctx, c); updateStatusErr != nil { + logger.Info("Error updating Channel Status", zap.Error(updateStatusErr)) + return reconcile.Result{}, updateStatusErr + } + + return reconcile.Result{}, err +} + +// shouldReconcile determines if this Controller should control (and therefore reconcile) a given +// ClusterProvisioner. This Controller only handles in-memory channels. +func (r *reconciler) shouldReconcile(c *eventingv1alpha1.Channel) bool { + if c.Spec.Provisioner != nil { + return cpcontroller.IsControlled(c.Spec.Provisioner, cpcontroller.Channel) + } + return false +} + +func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel) error { + logger := r.logger.With(zap.Any("channel", c)) + + c.Status.InitializeConditions() + + // We are syncing three things: + // 1. The K8s Service to talk to this Channel. + // 2. The Istio VirtualService to talk to this Channel. + // 3. The configuration of all Channel subscriptions. + + // We always need to sync the Channel config, so do it first. 
+ if err := r.syncChannelConfig(ctx); err != nil { + logger.Info("Error updating syncing the Channel config", zap.Error(err)) + return err + } + + if c.DeletionTimestamp != nil { + // K8s garbage collection will delete the K8s service and VirtualService for this channel. + // We use a finalizer to ensure the channel config has been synced. + r.removeFinalizer(c) + return nil + } + + r.addFinalizer(c) + c.Status.SetSubscribable(c.Namespace, c.Name) + + if svc, err := r.createK8sService(ctx, c); err != nil { + logger.Info("Error creating the Channel's K8s Service", zap.Error(err)) + return err + } else { + c.Status.SetSinkable(controller.ServiceHostName(svc.Name, svc.Namespace)) + } + + if err := r.createVirtualService(ctx, c); err != nil { + logger.Info("Error creating the Virtual Service for the Channel", zap.Error(err)) + return err + } + + c.Status.MarkProvisioned() + return nil +} + +func (r *reconciler) addFinalizer(c *eventingv1alpha1.Channel) { + finalizers := sets.NewString(c.Finalizers...) + finalizers.Insert(finalizerName) + c.Finalizers = finalizers.List() +} + +func (r *reconciler) removeFinalizer(c *eventingv1alpha1.Channel) { + finalizers := sets.NewString(c.Finalizers...) + finalizers.Delete(finalizerName) + c.Finalizers = finalizers.List() +} + +func (r *reconciler) getK8sService(ctx context.Context, c *eventingv1alpha1.Channel) (*corev1.Service, error) { + svcKey := types.NamespacedName{ + Namespace: c.Namespace, + Name: controller.ChannelServiceName(c.Name), + } + svc := &corev1.Service{} + err := r.client.Get(ctx, svcKey, svc) + return svc, err +} + +func (r *reconciler) createK8sService(ctx context.Context, c *eventingv1alpha1.Channel) (*corev1.Service, error) { + svc, err := r.getK8sService(ctx, c) + + if errors.IsNotFound(err) { + svc = newK8sService(c) + err = r.client.Create(ctx, svc) + } + + // If an error occurred in either Get or Create, we need to reconcile again. 
+ if err != nil { + return nil, err + } + + // Check if this Channel is the owner of the K8s service. + if !metav1.IsControlledBy(svc, c) { + r.logger.Warn("Channel's K8s Service is not owned by the Channel", zap.Any("channel", c), zap.Any("service", svc)) + } + return svc, nil +} + +func (r *reconciler) getVirtualService(ctx context.Context, c *eventingv1alpha1.Channel) (*istiov1alpha3.VirtualService, error) { + vsk := client.ObjectKey{ + Namespace: c.Namespace, + Name: controller.ChannelVirtualServiceName(c.ObjectMeta.Name), + } + vs := &istiov1alpha3.VirtualService{} + err := r.client.Get(ctx, vsk, vs) + return vs, err +} + +func (r *reconciler) createVirtualService(ctx context.Context, c *eventingv1alpha1.Channel) error { + virtualService, err := r.getVirtualService(ctx, c) + + // If the resource doesn't exist, we'll create it + if errors.IsNotFound(err) { + virtualService = newVirtualService(c) + err = r.client.Create(ctx, virtualService) + } + + // If an error occurs during Get/Create, we'll requeue the item so we can + // attempt processing again later. This could have been caused by a + // temporary network failure, or any other transient reason. + if err != nil { + return err + } + + // If the Virtual Service is not controlled by this Channel, we should log a warning, but don't + // consider it an error. + if !metav1.IsControlledBy(virtualService, c) { + r.logger.Warn("VirtualService not owned by Channel", zap.Any("channel", c), zap.Any("virtualService", virtualService)) + } + return nil +} + +// newK8sService creates a new Service for a Channel resource. It also sets the appropriate +// OwnerReferences on the resource so handleObject can discover the Channel resource that 'owns' it. +// As well as being garbage collected when the Channel is deleted. 
+func newK8sService(c *eventingv1alpha1.Channel) *corev1.Service { + labels := map[string]string{ + "channel": c.Name, + "provisioner": c.Spec.Provisioner.Ref.Name, + } + return &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: controller.ChannelServiceName(c.ObjectMeta.Name), + Namespace: c.Namespace, + Labels: labels, + OwnerReferences: []metav1.OwnerReference{ + *metav1.NewControllerRef(c, schema.GroupVersionKind{ + Group: eventingv1alpha1.SchemeGroupVersion.Group, + Version: eventingv1alpha1.SchemeGroupVersion.Version, + Kind: "Channel", + }), + }, + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + { + Name: portName, + Port: portNumber, + }, + }, + }, + } +} + +// newVirtualService creates a new VirtualService for a Channel resource. It also sets the +// appropriate OwnerReferences on the resource so handleObject can discover the Channel resource +// that 'owns' it. As well as being garbage collected when the Channel is deleted. +func newVirtualService(channel *eventingv1alpha1.Channel) *istiov1alpha3.VirtualService { + labels := map[string]string{ + "channel": channel.Name, + "provisioner": channel.Spec.Provisioner.Ref.Name, + } + destinationHost := controller.ServiceHostName(controller.ClusterBusDispatcherServiceName(channel.Spec.Provisioner.Ref.Name), system.Namespace) + return &istiov1alpha3.VirtualService{ + ObjectMeta: metav1.ObjectMeta{ + Name: controller.ChannelVirtualServiceName(channel.Name), + Namespace: channel.Namespace, + Labels: labels, + OwnerReferences: []metav1.OwnerReference{ + *metav1.NewControllerRef(channel, schema.GroupVersionKind{ + Group: eventingv1alpha1.SchemeGroupVersion.Group, + Version: eventingv1alpha1.SchemeGroupVersion.Version, + Kind: "Channel", + }), + }, + }, + Spec: istiov1alpha3.VirtualServiceSpec{ + Hosts: []string{ + controller.ServiceHostName(controller.ChannelServiceName(channel.Name), channel.Namespace), + controller.ChannelHostName(channel.Name, channel.Namespace), + }, + Http: 
[]istiov1alpha3.HTTPRoute{{ + Rewrite: &istiov1alpha3.HTTPRewrite{ + Authority: controller.ChannelHostName(channel.Name, channel.Namespace), + }, + Route: []istiov1alpha3.DestinationWeight{{ + Destination: istiov1alpha3.Destination{ + Host: destinationHost, + Port: istiov1alpha3.PortSelector{ + Number: portNumber, + }, + }}, + }}, + }, + }, + } +} + +func (r *reconciler) updateChannel(ctx context.Context, u *eventingv1alpha1.Channel) error { + o := &eventingv1alpha1.Channel{} + if err := r.client.Get(ctx, client.ObjectKey{Namespace: u.Namespace, Name: u.Name}, o); err != nil { + r.logger.Info("Error getting Channel for status update", zap.Error(err), zap.Any("updatedChannel", u)) + return err + } + + updated := false + if !equality.Semantic.DeepEqual(o.Finalizers, u.Finalizers) { + updated = true + o.SetFinalizers(u.Finalizers) + } + if !equality.Semantic.DeepEqual(o.Status, u.Status) { + updated = true + o.Status = u.Status + } + + if updated { + return r.client.Update(ctx, o) + } + return nil +} + +func (r *reconciler) syncChannelConfig(ctx context.Context) error { + channels, err := r.listAllChannels(ctx) + if err != nil { + r.logger.Info("Unable to list channels", zap.Error(err)) + return err + } + config := multiChannelFanoutConfig(channels) + return r.writeConfigMap(ctx, config) +} + +func (r *reconciler) writeConfigMap(ctx context.Context, config *multichannelfanout.Config) error { + logger := r.logger.With(zap.Any("configMap", r.configMapKey)) + + updated, err := configmap.SerializeConfig(*config) + if err != nil { + r.logger.Error("Unable to serialize config", zap.Error(err), zap.Any("config", config)) + return err + } + + cm := &corev1.ConfigMap{} + err = r.client.Get(ctx, r.configMapKey, cm) + if errors.IsNotFound(err) { + cm = r.createNewConfigMap(updated) + err = r.client.Create(ctx, cm) + } + if err != nil { + logger.Info("Unable to get/create ConfigMap", zap.Error(err)) + return err + } + + if equality.Semantic.DeepEqual(cm.Data, updated) { + // 
Nothing to update.
		return nil
	}

	cm.Data = updated
	return r.client.Update(ctx, cm)
}

// createNewConfigMap returns a ConfigMap at r.configMapKey populated with the given serialized
// fanout config.
func (r *reconciler) createNewConfigMap(data map[string]string) *corev1.ConfigMap {
	return &corev1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: r.configMapKey.Namespace,
			Name:      r.configMapKey.Name,
		},
		Data: data,
	}
}

// multiChannelFanoutConfig translates the subscriptions of every given Channel into the
// dispatcher's multi-channel fanout configuration. Channels without a Channelable are skipped.
func multiChannelFanoutConfig(channels []eventingv1alpha1.Channel) *multichannelfanout.Config {
	cc := make([]multichannelfanout.ChannelConfig, 0)
	for _, c := range channels {
		channelable := c.Spec.Channelable
		if channelable == nil {
			continue
		}
		cc = append(cc, multichannelfanout.ChannelConfig{
			Namespace: c.Namespace,
			Name:      c.Name,
			FanoutConfig: fanout.Config{
				// Use the nil-checked local, rather than dereferencing c.Spec.Channelable again.
				Subscriptions: channelable.Subscribers,
			},
		})
	}
	return &multichannelfanout.Config{
		ChannelConfigs: cc,
	}
}

// listAllChannels returns every Channel in the cluster that this Controller controls, following
// list pagination via Continue tokens.
func (r *reconciler) listAllChannels(ctx context.Context) ([]eventingv1alpha1.Channel, error) {
	channels := make([]eventingv1alpha1.Channel, 0)

	opts := &client.ListOptions{
		// TODO this is here because the fake client needs it. Remove this when it's no longer
		// needed.
+ Raw: &metav1.ListOptions{ + TypeMeta: metav1.TypeMeta{ + APIVersion: eventingv1alpha1.SchemeGroupVersion.String(), + Kind: "Channel", + }, + }, + } + for { + cl := &eventingv1alpha1.ChannelList{} + if err := r.client.List(ctx, opts, cl); err != nil { + return nil, err + } + + for _, c := range cl.Items { + if r.shouldReconcile(&c) { + channels = append(channels, c) + } + } + if cl.Continue != "" { + opts.Raw.Continue = cl.Continue + } else { + return channels, nil + } + } +} diff --git a/pkg/controller/eventing/inmemory/channel/reconcile_test.go b/pkg/controller/eventing/inmemory/channel/reconcile_test.go new file mode 100644 index 00000000000..13466bad4ae --- /dev/null +++ b/pkg/controller/eventing/inmemory/channel/reconcile_test.go @@ -0,0 +1,823 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package channel + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "testing" + + "github.com/google/go-cmp/cmp" + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + controllertesting "github.com/knative/eventing/pkg/controller/testing" + "github.com/knative/eventing/pkg/sidecar/configmap" + "github.com/knative/eventing/pkg/sidecar/fanout" + "github.com/knative/eventing/pkg/sidecar/multichannelfanout" + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3" + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" +) + +const ( + cpName = "in-memory-channel" + + cNamespace = "test-namespace" + cName = "test-channel" + cUID = "test-uid" + + cmNamespace = cNamespace + cmName = "test-config-map" + + testErrorMessage = "test induced error" + + insertedByVerifyConfigMapData = "data inserted by verifyConfigMapData so that it can be WantPresent" +) + +var ( + // deletionTime is used when objects are marked as deleted. Rfc3339Copy() + // truncates to seconds to match the loss of precision during serialization. + deletionTime = metav1.Now().Rfc3339Copy() + + truePointer = true + + // channelsConfig and channels are linked together. A change to one, will likely require a + // change to the other. channelsConfig is the serialized config of channels for everything + // provisioned by the in-memory-provisioner. 
+ channelsConfig = multichannelfanout.Config{ + ChannelConfigs: []multichannelfanout.ChannelConfig{ + { + Namespace: cNamespace, + Name: "c1", + FanoutConfig: fanout.Config{ + Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + { + CallableDomain: "foo", + }, + { + SinkableDomain: "bar", + }, + { + CallableDomain: "baz", + SinkableDomain: "qux", + }, + }, + }, + }, + { + Namespace: cNamespace, + Name: "c3", + FanoutConfig: fanout.Config{ + Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + { + CallableDomain: "steve", + }, + }, + }, + }, + }, + } + + channels = []eventingv1alpha1.Channel{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: cNamespace, + Name: "c1", + }, + TypeMeta: metav1.TypeMeta{ + Kind: "Channel", + }, + Spec: eventingv1alpha1.ChannelSpec{ + Provisioner: &eventingv1alpha1.ProvisionerReference{ + Ref: &corev1.ObjectReference{ + Name: cpName, + }, + }, + Channelable: &duckv1alpha1.Channelable{ + Subscribers: []duckv1alpha1.ChannelSubscriberSpec{ + { + CallableDomain: "foo", + }, + { + SinkableDomain: "bar", + }, + { + CallableDomain: "baz", + SinkableDomain: "qux", + }, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: cNamespace, + Name: "c2", + }, + TypeMeta: metav1.TypeMeta{ + Kind: "Channel", + }, + Spec: eventingv1alpha1.ChannelSpec{ + Provisioner: &eventingv1alpha1.ProvisionerReference{ + Ref: &corev1.ObjectReference{ + Name: "some-other-provisioner", + }, + }, + Channelable: &duckv1alpha1.Channelable{ + Subscribers: []duckv1alpha1.ChannelSubscriberSpec{ + { + CallableDomain: "anything", + }, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: cNamespace, + Name: "c3", + }, + TypeMeta: metav1.TypeMeta{ + Kind: "Channel", + }, + Spec: eventingv1alpha1.ChannelSpec{ + Provisioner: &eventingv1alpha1.ProvisionerReference{ + Ref: &corev1.ObjectReference{ + Name: cpName, + }, + }, + Channelable: &duckv1alpha1.Channelable{ + Subscribers: []duckv1alpha1.ChannelSubscriberSpec{ + { + CallableDomain: "steve", + 
}, + }, + }, + }, + }, + } +) + +func init() { + // Add types to scheme. + eventingv1alpha1.AddToScheme(scheme.Scheme) + corev1.AddToScheme(scheme.Scheme) + istiov1alpha3.AddToScheme(scheme.Scheme) +} + +func TestInjectClient(t *testing.T) { + r := &reconciler{} + orig := r.client + n := fake.NewFakeClient() + if orig == n { + t.Errorf("Original and new clients are identical: %v", orig) + } + err := r.InjectClient(n) + if err != nil { + t.Errorf("Unexpected error injecting the client: %v", err) + } + if n != r.client { + t.Errorf("Unexpected client. Expected: '%v'. Actual: '%v'", n, r.client) + } +} + +func TestReconcile(t *testing.T) { + testCases := []controllertesting.TestCase{ + { + Name: "Channel not found", + }, + { + Name: "Error getting Channel", + Mocks: controllertesting.Mocks{ + MockGets: errorGettingChannel(), + }, + WantErrMsg: testErrorMessage, + }, + { + Name: "Channel not reconciled - nil provisioner", + InitialState: []runtime.Object{ + makeChannelNilProvisioner(), + }, + }, + { + Name: "Channel not reconciled - nil ref", + InitialState: []runtime.Object{ + makeChannelNilRef(), + }, + }, + { + Name: "Channel not reconciled - namespace", + InitialState: []runtime.Object{ + makeChannelWithWrongProvisionerNamespace(), + }, + }, + { + Name: "Channel not reconciled - name", + InitialState: []runtime.Object{ + makeChannelWithWrongProvisionerName(), + }, + }, + { + Name: "Channel deleted - Channel config sync fails", + InitialState: []runtime.Object{ + makeDeletingChannel(), + }, + Mocks: controllertesting.Mocks{ + MockLists: errorListingChannels(), + }, + WantPresent: []runtime.Object{ + // Finalizer has not been removed. 
+ makeDeletingChannel(), + }, + WantErrMsg: testErrorMessage, + }, + { + Name: "Channel deleted - finalizer removed", + InitialState: []runtime.Object{ + makeDeletingChannel(), + }, + WantPresent: []runtime.Object{ + makeDeletingChannelWithoutFinalizer(), + }, + }, + { + Name: "Channel config sync fails - can't list Channels", + InitialState: []runtime.Object{ + makeChannel(), + }, + Mocks: controllertesting.Mocks{ + MockLists: errorListingChannels(), + }, + WantErrMsg: testErrorMessage, + }, + { + Name: "Channel config sync fails - can't get ConfigMap", + InitialState: []runtime.Object{ + makeChannel(), + }, + Mocks: controllertesting.Mocks{ + MockGets: errorGettingConfigMap(), + }, + WantErrMsg: testErrorMessage, + }, + { + Name: "Channel config sync fails - can't create ConfigMap", + InitialState: []runtime.Object{ + makeChannel(), + }, + Mocks: controllertesting.Mocks{ + MockCreates: errorCreatingConfigMap(), + }, + WantErrMsg: testErrorMessage, + }, + { + Name: "Channel config sync fails - can't update ConfigMap", + InitialState: []runtime.Object{ + makeChannel(), + makeConfigMap(), + }, + Mocks: controllertesting.Mocks{ + MockUpdates: errorUpdatingConfigMap(), + }, + WantErrMsg: testErrorMessage, + }, + { + Name: "K8s service get fails", + InitialState: []runtime.Object{ + makeChannel(), + makeConfigMap(), + }, + Mocks: controllertesting.Mocks{ + MockGets: errorGettingK8sService(), + }, + WantPresent: []runtime.Object{ + makeChannelWithFinalizerAndSubscribable(), + }, + WantErrMsg: testErrorMessage, + }, + { + Name: "K8s service creation fails", + InitialState: []runtime.Object{ + makeChannel(), + makeConfigMap(), + }, + Mocks: controllertesting.Mocks{ + MockCreates: errorCreatingK8sService(), + }, + WantPresent: []runtime.Object{ + // TODO: This should have a useful error message saying that the K8s Service failed. 
+ makeChannelWithFinalizerAndSubscribable(), + }, + WantErrMsg: testErrorMessage, + }, + { + Name: "K8s service already exists - not owned by Channel", + InitialState: []runtime.Object{ + makeChannel(), + makeConfigMap(), + makeK8sServiceNotOwnedByChannel(), + }, + WantPresent: []runtime.Object{ + makeReadyChannel(), + }, + }, + { + Name: "Virtual service get fails", + InitialState: []runtime.Object{ + makeChannel(), + makeConfigMap(), + makeK8sService(), + makeVirtualService(), + }, + Mocks: controllertesting.Mocks{ + MockGets: errorGettingVirtualService(), + }, + WantPresent: []runtime.Object{ + // TODO: This should have a useful error message saying that the VirtualService + // failed. + makeChannelWithFinalizerAndSubscribableAndSinkable(), + }, + WantErrMsg: testErrorMessage, + }, + { + Name: "Virtual service creation fails", + InitialState: []runtime.Object{ + makeChannel(), + makeConfigMap(), + makeK8sService(), + }, + Mocks: controllertesting.Mocks{ + MockCreates: errorCreatingVirtualService(), + }, + WantPresent: []runtime.Object{ + // TODO: This should have a useful error message saying that the VirtualService + // failed. 
+ makeChannelWithFinalizerAndSubscribableAndSinkable(), + }, + WantErrMsg: testErrorMessage, + }, + { + Name: "VirtualService already exists - not owned by Channel", + InitialState: []runtime.Object{ + makeChannel(), + makeConfigMap(), + makeK8sService(), + makeVirtualServiceNowOwnedByChannel(), + }, + WantPresent: []runtime.Object{ + makeReadyChannel(), + }, + }, + { + Name: "Channel get for update fails", + InitialState: []runtime.Object{ + makeChannel(), + makeConfigMap(), + makeK8sService(), + makeVirtualService(), + }, + Mocks: controllertesting.Mocks{ + MockGets: errorOnSecondChannelGet(), + }, + WantErrMsg: testErrorMessage, + }, + { + Name: "Channel update fails", + InitialState: []runtime.Object{ + makeChannel(), + makeConfigMap(), + makeK8sService(), + makeVirtualService(), + }, + Mocks: controllertesting.Mocks{ + MockUpdates: errorUpdatingChannel(), + }, + WantErrMsg: testErrorMessage, + }, + { + Name: "Channel reconcile successful - Channel list follows pagination", + InitialState: []runtime.Object{ + makeChannel(), + makeConfigMap(), + }, + Mocks: controllertesting.Mocks{ + MockLists: (&paginatedChannelsListStruct{channels: channels}).MockLists(), + // This is more accurate to be in WantPresent, but we need to check JSON equality, + // not string equality, so it can't be done in WantPresent. Instead, we verify + // during the update call, swapping out the data and WantPresent with that inserted + // data. 
+ MockUpdates: verifyConfigMapData(), + }, + WantPresent: []runtime.Object{ + makeReadyChannel(), + makeK8sService(), + makeVirtualService(), + makeConfigMapWithVerifyConfigMapData(), + }, + }, + } + recorder := record.NewBroadcaster().NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) + for _, tc := range testCases { + configMapKey := types.NamespacedName{ + Namespace: cmNamespace, + Name: cmName, + } + c := tc.GetClient() + r := &reconciler{ + client: c, + recorder: recorder, + logger: zap.NewNop(), + configMapKey: configMapKey, + } + if tc.ReconcileKey == "" { + tc.ReconcileKey = fmt.Sprintf("/%s", cName) + } + tc.IgnoreTimes = true + t.Run(tc.Name, tc.Runner(t, r, c)) + } +} + +func makeChannel() *eventingv1alpha1.Channel { + c := &eventingv1alpha1.Channel{ + TypeMeta: metav1.TypeMeta{ + APIVersion: eventingv1alpha1.SchemeGroupVersion.String(), + Kind: "Channel", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: cNamespace, + Name: cName, + UID: cUID, + }, + Spec: eventingv1alpha1.ChannelSpec{ + Provisioner: &eventingv1alpha1.ProvisionerReference{ + Ref: &corev1.ObjectReference{ + Name: cpName, + }, + }, + }, + } + c.Status.InitializeConditions() + return c +} + +func makeChannelWithFinalizerAndSubscribable() *eventingv1alpha1.Channel { + c := makeChannelWithFinalizer() + c.Status.SetSubscribable(c.Namespace, c.Name) + return c +} + +func makeChannelWithFinalizerAndSubscribableAndSinkable() *eventingv1alpha1.Channel { + c := makeChannelWithFinalizerAndSubscribable() + c.Status.SetSinkable(fmt.Sprintf("%s-channel.%s.svc.cluster.local", c.Name, c.Namespace)) + return c +} + +func makeReadyChannel() *eventingv1alpha1.Channel { + // Ready channels have the finalizer and are Subscribable and Sinkable. 
+ c := makeChannelWithFinalizerAndSubscribableAndSinkable() + c.Status.MarkProvisioned() + return c +} + +func makeChannelNilProvisioner() *eventingv1alpha1.Channel { + c := makeChannel() + c.Spec.Provisioner = nil + return c +} + +func makeChannelNilRef() *eventingv1alpha1.Channel { + c := makeChannel() + c.Spec.Provisioner.Ref = nil + return c +} + +func makeChannelWithWrongProvisionerNamespace() *eventingv1alpha1.Channel { + c := makeChannel() + c.Spec.Provisioner.Ref.Namespace = "wrong-namespace" + return c +} + +func makeChannelWithWrongProvisionerName() *eventingv1alpha1.Channel { + c := makeChannel() + c.Spec.Provisioner.Ref.Name = "wrong-name" + return c +} + +func makeChannelWithFinalizer() *eventingv1alpha1.Channel { + c := makeChannel() + c.Finalizers = []string{finalizerName} + return c +} + +func makeDeletingChannel() *eventingv1alpha1.Channel { + c := makeChannelWithFinalizer() + c.DeletionTimestamp = &deletionTime + return c +} + +func makeDeletingChannelWithoutFinalizer() *eventingv1alpha1.Channel { + c := makeDeletingChannel() + c.Finalizers = nil + return c +} + +func makeConfigMap() *corev1.ConfigMap { + return &corev1.ConfigMap{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "ConfigMap", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: cmNamespace, + Name: cmName, + }, + } +} + +func makeConfigMapWithVerifyConfigMapData() *corev1.ConfigMap { + cm := makeConfigMap() + cm.Data = map[string]string{} + cm.Data[configmap.MultiChannelFanoutConfigKey] = insertedByVerifyConfigMapData + return cm +} + +func makeK8sService() *corev1.Service { + return &corev1.Service{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Service", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-channel", cName), + Namespace: cNamespace, + Labels: map[string]string{ + "channel": cName, + "provisioner": cpName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: eventingv1alpha1.SchemeGroupVersion.String(), + Kind: "Channel", + 
Name: cName, + UID: cUID, + Controller: &truePointer, + BlockOwnerDeletion: &truePointer, + }, + }, + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + { + Name: portName, + Port: portNumber, + }, + }, + }, + } +} + +func makeK8sServiceNotOwnedByChannel() *corev1.Service { + svc := makeK8sService() + svc.OwnerReferences = nil + return svc +} + +func makeVirtualService() *istiov1alpha3.VirtualService { + return &istiov1alpha3.VirtualService{ + TypeMeta: metav1.TypeMeta{ + APIVersion: istiov1alpha3.SchemeGroupVersion.String(), + Kind: "VirtualService", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-channel", cName), + Namespace: cNamespace, + Labels: map[string]string{ + "channel": cName, + "provisioner": cpName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: eventingv1alpha1.SchemeGroupVersion.String(), + Kind: "Channel", + Name: cName, + UID: cUID, + Controller: &truePointer, + BlockOwnerDeletion: &truePointer, + }, + }, + }, + Spec: istiov1alpha3.VirtualServiceSpec{ + Hosts: []string{ + fmt.Sprintf("%s-channel.%s.svc.cluster.local", cName, cNamespace), + fmt.Sprintf("%s.%s.channels.cluster.local", cName, cNamespace), + }, + Http: []istiov1alpha3.HTTPRoute{{ + Rewrite: &istiov1alpha3.HTTPRewrite{ + Authority: fmt.Sprintf("%s.%s.channels.cluster.local", cName, cNamespace), + }, + Route: []istiov1alpha3.DestinationWeight{{ + Destination: istiov1alpha3.Destination{ + Host: "in-memory-channel-clusterbus.knative-eventing.svc.cluster.local", + Port: istiov1alpha3.PortSelector{ + Number: portNumber, + }, + }}, + }}, + }, + }, + } +} + +func makeVirtualServiceNowOwnedByChannel() *istiov1alpha3.VirtualService { + vs := makeVirtualService() + vs.OwnerReferences = nil + return vs +} + +func errorOnSecondChannelGet() []controllertesting.MockGet { + passThrough := []controllertesting.MockGet{ + func(innerClient client.Client, ctx context.Context, key client.ObjectKey, obj runtime.Object) (controllertesting.MockHandled, error) { + 
return controllertesting.Handled, innerClient.Get(ctx, key, obj) + }, + } + return append(passThrough, errorGettingChannel()...) +} + +func errorGettingChannel() []controllertesting.MockGet { + return []controllertesting.MockGet{ + func(_ client.Client, _ context.Context, _ client.ObjectKey, obj runtime.Object) (controllertesting.MockHandled, error) { + if _, ok := obj.(*eventingv1alpha1.Channel); ok { + return controllertesting.Handled, errors.New(testErrorMessage) + } + return controllertesting.Unhandled, nil + }, + } +} + +func errorGettingConfigMap() []controllertesting.MockGet { + return []controllertesting.MockGet{ + func(_ client.Client, _ context.Context, _ client.ObjectKey, obj runtime.Object) (controllertesting.MockHandled, error) { + if _, ok := obj.(*corev1.ConfigMap); ok { + return controllertesting.Handled, errors.New(testErrorMessage) + } + return controllertesting.Unhandled, nil + }, + } +} + +func errorGettingK8sService() []controllertesting.MockGet { + return []controllertesting.MockGet{ + func(_ client.Client, _ context.Context, _ client.ObjectKey, obj runtime.Object) (controllertesting.MockHandled, error) { + if _, ok := obj.(*corev1.Service); ok { + return controllertesting.Handled, errors.New(testErrorMessage) + } + return controllertesting.Unhandled, nil + }, + } +} + +func errorGettingVirtualService() []controllertesting.MockGet { + return []controllertesting.MockGet{ + func(_ client.Client, _ context.Context, _ client.ObjectKey, obj runtime.Object) (controllertesting.MockHandled, error) { + if _, ok := obj.(*istiov1alpha3.VirtualService); ok { + return controllertesting.Handled, errors.New(testErrorMessage) + } + return controllertesting.Unhandled, nil + }, + } +} + +func errorListingChannels() []controllertesting.MockList { + return []controllertesting.MockList{ + func(client.Client, context.Context, *client.ListOptions, runtime.Object) (controllertesting.MockHandled, error) { + return controllertesting.Handled, 
errors.New(testErrorMessage) + }, + } +} + +func errorCreatingConfigMap() []controllertesting.MockCreate { + return []controllertesting.MockCreate{ + func(_ client.Client, _ context.Context, obj runtime.Object) (controllertesting.MockHandled, error) { + if _, ok := obj.(*corev1.ConfigMap); ok { + return controllertesting.Handled, errors.New(testErrorMessage) + } + return controllertesting.Unhandled, nil + }, + } +} + +func errorCreatingK8sService() []controllertesting.MockCreate { + return []controllertesting.MockCreate{ + func(_ client.Client, _ context.Context, obj runtime.Object) (controllertesting.MockHandled, error) { + if _, ok := obj.(*corev1.Service); ok { + return controllertesting.Handled, errors.New(testErrorMessage) + } + return controllertesting.Unhandled, nil + }, + } +} + +func errorCreatingVirtualService() []controllertesting.MockCreate { + return []controllertesting.MockCreate{ + func(_ client.Client, _ context.Context, obj runtime.Object) (controllertesting.MockHandled, error) { + if _, ok := obj.(*istiov1alpha3.VirtualService); ok { + return controllertesting.Handled, errors.New(testErrorMessage) + } + return controllertesting.Unhandled, nil + }, + } +} + +func errorUpdatingChannel() []controllertesting.MockUpdate { + return []controllertesting.MockUpdate{ + func(_ client.Client, _ context.Context, obj runtime.Object) (controllertesting.MockHandled, error) { + if _, ok := obj.(*eventingv1alpha1.Channel); ok { + return controllertesting.Handled, errors.New(testErrorMessage) + } + return controllertesting.Unhandled, nil + }, + } +} + +func errorUpdatingConfigMap() []controllertesting.MockUpdate { + return []controllertesting.MockUpdate{ + func(_ client.Client, _ context.Context, obj runtime.Object) (controllertesting.MockHandled, error) { + if _, ok := obj.(*corev1.ConfigMap); ok { + return controllertesting.Handled, errors.New(testErrorMessage) + } + return controllertesting.Unhandled, nil + }, + } +} + +type paginatedChannelsListStruct struct { + 
channels []eventingv1alpha1.Channel +} + +func (p *paginatedChannelsListStruct) MockLists() []controllertesting.MockList { + return []controllertesting.MockList{ + func(_ client.Client, _ context.Context, _ *client.ListOptions, list runtime.Object) (controllertesting.MockHandled, error) { + if l, ok := list.(*eventingv1alpha1.ChannelList); ok { + + if len(p.channels) > 0 { + c := p.channels[0] + p.channels = p.channels[1:] + l.Continue = "yes" + l.Items = []eventingv1alpha1.Channel{ + c, + } + } + return controllertesting.Handled, nil + } + return controllertesting.Unhandled, nil + }, + } +} + +func verifyConfigMapData() []controllertesting.MockUpdate { + return []controllertesting.MockUpdate{ + func(innerClient client.Client, ctx context.Context, obj runtime.Object) (controllertesting.MockHandled, error) { + if cm, ok := obj.(*corev1.ConfigMap); ok { + s := cm.Data[configmap.MultiChannelFanoutConfigKey] + c := multichannelfanout.Config{} + err := json.Unmarshal([]byte(s), &c) + if err != nil { + return controllertesting.Handled, + fmt.Errorf("test is unable to unmarshal ConfigMap data: %v", err) + } + if diff := cmp.Diff(c, channelsConfig); diff != "" { + return controllertesting.Handled, + fmt.Errorf("test got unwanted ChannelsConfig (-want +got) %s", diff) + } + // Verified it is correct, now so that we can verify this actually occurred, swap + // out the data with a known value for later comparison. 
+ cm.Data[configmap.MultiChannelFanoutConfigKey] = insertedByVerifyConfigMapData + return controllertesting.Handled, innerClient.Update(ctx, obj) + } + return controllertesting.Unhandled, nil + }, + } +} diff --git a/pkg/controller/eventing/inmemory/clusterprovisioner/controller.go b/pkg/controller/eventing/inmemory/clusterprovisioner/controller.go new file mode 100644 index 00000000000..3296a144733 --- /dev/null +++ b/pkg/controller/eventing/inmemory/clusterprovisioner/controller.go @@ -0,0 +1,71 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package clusterprovisioner + +import ( + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/source" +) + +const ( + // controllerAgentName is the string used by this controller to identify + // itself when creating events. + controllerAgentName = "in-memory-channel-controller" +) + +// ProvideController returns a flow controller. +func ProvideController(mgr manager.Manager, logger *zap.Logger) (controller.Controller, error) { + logger = logger.With(zap.String("controller", controllerAgentName)) + + // Setup a new controller to Reconcile ClusterProvisioners that are in-memory channels. 
+ r := &reconciler{ + recorder: mgr.GetRecorder(controllerAgentName), + logger: logger, + } + c, err := controller.New(controllerAgentName, mgr, controller.Options{ + Reconciler: r, + }) + if err != nil { + logger.Error("Unable to create controller.", zap.Error(err)) + return nil, err + } + + // Watch ClusterProvisioners. + err = c.Watch(&source.Kind{ + Type: &eventingv1alpha1.ClusterProvisioner{}, + }, &handler.EnqueueRequestForObject{}) + if err != nil { + logger.Error("Unable to watch ClusterProvisioners.", zap.Error(err), zap.Any("type", &eventingv1alpha1.ClusterProvisioner{})) + return nil, err + } + + // Watch the K8s Services that are owned by ClusterProvisioners. + err = c.Watch(&source.Kind{ + Type: &corev1.Service{}, + }, &handler.EnqueueRequestForOwner{OwnerType: &eventingv1alpha1.ClusterProvisioner{}, IsController: true}) + if err != nil { + logger.Error("Unable to watch K8s Services.", zap.Error(err)) + return nil, err + } + + return c, nil +} diff --git a/pkg/controller/eventing/inmemory/clusterprovisioner/reconcile.go b/pkg/controller/eventing/inmemory/clusterprovisioner/reconcile.go new file mode 100644 index 00000000000..a11cb53ca2f --- /dev/null +++ b/pkg/controller/eventing/inmemory/clusterprovisioner/reconcile.go @@ -0,0 +1,220 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package clusterprovisioner + +import ( + "context" + + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/eventing/pkg/controller" + "github.com/knative/eventing/pkg/system" + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +const ( + // Name is the name of the in-memory channel ClusterProvisioner. + Name = "in-memory-channel" + + // Channel is the name of the Channel resource in eventing.knative.dev/v1alpha1. + Channel = "Channel" +) + +type reconciler struct { + client client.Client + recorder record.EventRecorder + logger *zap.Logger +} + +// Verify the struct implements reconcile.Reconciler +var _ reconcile.Reconciler = &reconciler{} + +func (r *reconciler) InjectClient(c client.Client) error { + r.client = c + return nil +} + +func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, error) { + //TODO use this to store the logger and set a deadline + ctx := context.TODO() + logger := r.logger.With(zap.Any("request", request)) + + cp := &eventingv1alpha1.ClusterProvisioner{} + err := r.client.Get(ctx, request.NamespacedName, cp) + + // The ClusterProvisioner may have been deleted since it was added to the workqueue. If so, + // there is nothing to be done. + if errors.IsNotFound(err) { + logger.Info("Could not find ClusterProvisioner", zap.Error(err)) + return reconcile.Result{}, nil + } + + // Any other error should be retried in another reconciliation. 
+ if err != nil { + logger.Error("Unable to Get ClusterProvisioner", zap.Error(err)) + return reconcile.Result{}, err + } + + // Does this Controller control this ClusterProvisioner? + if !shouldReconcile(cp.Namespace, cp.Name, cp.Spec.Reconciles.Kind) { + logger.Info("Not reconciling ClusterProvisioner, it is not controlled by this Controller", zap.String("APIVersion", cp.APIVersion), zap.String("Kind", cp.Kind), zap.String("Namespace", cp.Namespace), zap.String("name", cp.Name)) + return reconcile.Result{}, nil + } + logger.Info("Reconciling ClusterProvisioner.") + + // Modify a copy of this object, rather than the original. + cp = cp.DeepCopy() + + err = r.reconcile(ctx, cp) + if err != nil { + logger.Info("Error reconciling ClusterProvisioner", zap.Error(err)) + // Note that we do not return the error here, because we want to update the Status + // regardless of the error. + } + + if updateStatusErr := r.updateClusterProvisionerStatus(ctx, cp); updateStatusErr != nil { + logger.Info("Error updating ClusterProvisioner Status", zap.Error(updateStatusErr)) + return reconcile.Result{}, updateStatusErr + } + + return reconcile.Result{}, err +} + +// IsControlled determines if the in-memory Channel Controller should control (and therefore +// reconcile) a given object, based on that object's ClusterProvisioner reference. kind is the kind +// of that object. +func IsControlled(ref *eventingv1alpha1.ProvisionerReference, kind string) bool { + if ref != nil && ref.Ref != nil { + return shouldReconcile(ref.Ref.Namespace, ref.Ref.Name, kind) + } + return false +} + +// shouldReconcile determines if this Controller should control (and therefore reconcile) a given +// ClusterProvisioner. This Controller only handles in-memory channels. 
+func shouldReconcile(namespace, name, kind string) bool { + return namespace == "" && name == Name && kind == Channel +} + +func (r *reconciler) reconcile(ctx context.Context, cp *eventingv1alpha1.ClusterProvisioner) error { + logger := r.logger.With(zap.Any("clusterProvisioner", cp)) + + // We are syncing one thing. + // 1. The K8s Service to talk to all in-memory Channels. + // - There is a single K8s Service for all requests going any in-memory Channel. + + if cp.DeletionTimestamp != nil { + // K8s garbage collection will delete the dispatcher service, once this ClusterProvisioner + // is deleted, so we don't need to do anything. + return nil + } + + if err := r.createDispatcherService(ctx, cp); err != nil { + logger.Info("Error creating the ClusterProvisioner's K8s Service", zap.Error(err)) + return err + } + + cp.Status.MarkReady() + return nil +} + +func (r *reconciler) createDispatcherService(ctx context.Context, cp *eventingv1alpha1.ClusterProvisioner) error { + svcName := controller.ClusterBusDispatcherServiceName(cp.Name) + svcKey := types.NamespacedName{ + Namespace: system.Namespace, + Name: svcName, + } + svc := &corev1.Service{} + err := r.client.Get(ctx, svcKey, svc) + + if errors.IsNotFound(err) { + svc = newDispatcherService(cp) + err = r.client.Create(ctx, svc) + } + + // If an error occurred in either Get or Create, we need to reconcile again. + if err != nil { + return err + } + + // Check if this ClusterProvisioner is the owner of the K8s service. 
+ if !metav1.IsControlledBy(svc, cp) { + r.logger.Warn("ClusterProvisioner's K8s Service is not owned by the ClusterProvisioner", zap.Any("clusterProvisioner", cp), zap.Any("service", svc)) + } + return nil +} + +func (r *reconciler) updateClusterProvisionerStatus(ctx context.Context, u *eventingv1alpha1.ClusterProvisioner) error { + o := &eventingv1alpha1.ClusterProvisioner{} + if err := r.client.Get(ctx, client.ObjectKey{Namespace: u.Namespace, Name: u.Name}, o); err != nil { + r.logger.Info("Error getting ClusterProvisioner for status update", zap.Error(err), zap.Any("updatedClusterProvisioner", u)) + return err + } + + if !equality.Semantic.DeepEqual(o.Status, u.Status) { + o.Status = u.Status + return r.client.Update(ctx, o) + } + return nil +} + +// newDispatcherService creates a new Service for a ClusterBus resource. It also sets +// the appropriate OwnerReferences on the resource so handleObject can discover +// the ClusterBus resource that 'owns' it. +func newDispatcherService(cp *eventingv1alpha1.ClusterProvisioner) *corev1.Service { + labels := dispatcherLabels(cp.Name) + return &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: controller.ClusterBusDispatcherServiceName(cp.Name), + Namespace: system.Namespace, + Labels: labels, + OwnerReferences: []metav1.OwnerReference{ + *metav1.NewControllerRef(cp, schema.GroupVersionKind{ + Group: eventingv1alpha1.SchemeGroupVersion.Group, + Version: eventingv1alpha1.SchemeGroupVersion.Version, + Kind: "ClusterProvisioner", + }), + }, + }, + Spec: corev1.ServiceSpec{ + Selector: labels, + Ports: []corev1.ServicePort{ + { + Name: "http", + Port: 80, + TargetPort: intstr.FromInt(8080), + }, + }, + }, + } +} + +func dispatcherLabels(cpName string) map[string]string { + return map[string]string{ + "clusterProvisioner": cpName, + "role": "dispatcher", + } +} diff --git a/pkg/controller/eventing/inmemory/clusterprovisioner/reconcile_test.go b/pkg/controller/eventing/inmemory/clusterprovisioner/reconcile_test.go new 
file mode 100644 index 00000000000..f0bd62dbdce --- /dev/null +++ b/pkg/controller/eventing/inmemory/clusterprovisioner/reconcile_test.go @@ -0,0 +1,382 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package clusterprovisioner + +import ( + "context" + "errors" + "fmt" + "testing" + + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/eventing/pkg/system" + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + controllertesting "github.com/knative/eventing/pkg/controller/testing" +) + +const ( + cpUid = "test-uid" + testErrorMessage = "test-induced-error" +) + +var ( + // deletionTime is used when objects are marked as deleted. Rfc3339Copy() + // truncates to seconds to match the loss of precision during serialization. 
+ deletionTime = metav1.Now().Rfc3339Copy() + + truePointer = true +) + +func init() { + // Add types to scheme + eventingv1alpha1.AddToScheme(scheme.Scheme) + corev1.AddToScheme(scheme.Scheme) +} + +func TestInjectClient(t *testing.T) { + r := &reconciler{} + orig := r.client + n := fake.NewFakeClient() + if orig == n { + t.Errorf("Original and new clients are identical: %v", orig) + } + err := r.InjectClient(n) + if err != nil { + t.Errorf("Unexpected error injecting the client: %v", err) + } + if n != r.client { + t.Errorf("Unexpected client. Expected: '%v'. Actual: '%v'", n, r.client) + } +} + +func TestIsControlled(t *testing.T) { + testCases := map[string]struct { + ref *eventingv1alpha1.ProvisionerReference + kind string + isControlled bool + }{ + "nil": { + ref: nil, + kind: "Channel", + isControlled: false, + }, + "ref nil": { + ref: &eventingv1alpha1.ProvisionerReference{ + Ref: nil, + }, + kind: "Channel", + isControlled: false, + }, + "wrong namespace": { + ref: &eventingv1alpha1.ProvisionerReference{ + Ref: &corev1.ObjectReference{ + Namespace: "other", + Name: Name, + }, + }, + kind: "Channel", + isControlled: false, + }, + "wrong name": { + ref: &eventingv1alpha1.ProvisionerReference{ + Ref: &corev1.ObjectReference{ + Name: "other-name", + }, + }, + kind: "Channel", + isControlled: false, + }, + "wrong kind": { + ref: &eventingv1alpha1.ProvisionerReference{ + Ref: &corev1.ObjectReference{ + Name: Name, + }, + }, + kind: "Source", + isControlled: false, + }, + "is controlled": { + ref: &eventingv1alpha1.ProvisionerReference{ + Ref: &corev1.ObjectReference{ + Name: Name, + }, + }, + kind: "Channel", + isControlled: true, + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + isControlled := IsControlled(tc.ref, tc.kind) + if isControlled != tc.isControlled { + t.Errorf("Expected: %v. 
Actual: %v", tc.isControlled, isControlled) + } + }) + } +} + +func TestReconcile(t *testing.T) { + testCases := []controllertesting.TestCase{ + { + Name: "CP not found", + }, + { + Name: "Unable to get CP", + Mocks: controllertesting.Mocks{ + MockGets: []controllertesting.MockGet{ + errorGettingClusterProvisioner(), + }, + }, + WantErrMsg: testErrorMessage, + }, + { + Name: "Should not reconcile - namespace", + InitialState: []runtime.Object{ + &eventingv1alpha1.ClusterProvisioner{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "not empty string", + Name: Name, + }, + }, + }, + }, + { + Name: "Should not reconcile - name", + InitialState: []runtime.Object{ + &eventingv1alpha1.ClusterProvisioner{ + ObjectMeta: metav1.ObjectMeta{ + Name: "wrong-name", + }, + }, + }, + ReconcileKey: "/wrong-name", + }, + { + Name: "Delete succeeds", + // Deleting does nothing. + InitialState: []runtime.Object{ + makeDeletingClusterProvisioner(), + }, + }, + { + Name: "Create dispatcher fails", + InitialState: []runtime.Object{ + makeClusterProvisioner(), + }, + Mocks: controllertesting.Mocks{ + MockGets: []controllertesting.MockGet{ + errorGettingK8sService(), + }, + }, + WantErrMsg: testErrorMessage, + }, + { + Name: "Create dispatcher - already exists", + InitialState: []runtime.Object{ + makeClusterProvisioner(), + makeK8sService(), + }, + WantPresent: []runtime.Object{ + makeReadyClusterProvisioner(), + }, + }, + { + Name: "Create dispatcher - not owned by CP", + InitialState: []runtime.Object{ + makeClusterProvisioner(), + makeK8sServiceNotOwnedByClusterProvisioner(), + }, + WantPresent: []runtime.Object{ + makeReadyClusterProvisioner(), + }, + }, + { + Name: "Create dispatcher succeeds", + InitialState: []runtime.Object{ + makeClusterProvisioner(), + }, + WantPresent: []runtime.Object{ + makeReadyClusterProvisioner(), + makeK8sService(), + }, + }, + { + Name: "Error getting CP for updating Status", + // Nothing to create or update other than the status of CP itself. 
+ InitialState: []runtime.Object{ + makeClusterProvisioner(), + makeK8sService(), + }, + Mocks: controllertesting.Mocks{ + MockGets: oneSuccessfulClusterProvisionerGet(), + }, + WantErrMsg: testErrorMessage, + }, + { + Name: "Error updating Status", + // Nothing to create or update other than the status of CP itself. + InitialState: []runtime.Object{ + makeClusterProvisioner(), + makeK8sService(), + }, + Mocks: controllertesting.Mocks{ + MockUpdates: []controllertesting.MockUpdate{ + errorUpdating(), + }, + }, + WantErrMsg: testErrorMessage, + }, + } + recorder := record.NewBroadcaster().NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) + for _, tc := range testCases { + c := tc.GetClient() + r := &reconciler{ + client: c, + recorder: recorder, + logger: zap.NewNop(), + } + if tc.ReconcileKey == "" { + tc.ReconcileKey = fmt.Sprintf("/%s", Name) + } + tc.IgnoreTimes = true + t.Run(tc.Name, tc.Runner(t, r, c)) + } +} + +func makeClusterProvisioner() *eventingv1alpha1.ClusterProvisioner { + return &eventingv1alpha1.ClusterProvisioner{ + TypeMeta: metav1.TypeMeta{ + APIVersion: eventingv1alpha1.SchemeGroupVersion.String(), + Kind: "ClusterProvisioner", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: Name, + UID: cpUid, + }, + Spec: eventingv1alpha1.ClusterProvisionerSpec{ + Reconciles: metav1.GroupKind{ + Group: "eventing.knative.dev/v1alpha1", + Kind: "Channel", + }, + }, + } +} + +func makeReadyClusterProvisioner() *eventingv1alpha1.ClusterProvisioner { + cp := makeClusterProvisioner() + cp.Status.Conditions = []duckv1alpha1.Condition{ + { + Type: duckv1alpha1.ConditionReady, + Status: corev1.ConditionTrue, + }, + } + return cp +} + +func makeDeletingClusterProvisioner() *eventingv1alpha1.ClusterProvisioner { + cp := makeClusterProvisioner() + cp.DeletionTimestamp = &deletionTime + return cp +} + +func makeK8sService() *corev1.Service { + return &corev1.Service{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Service", + }, + 
ObjectMeta: metav1.ObjectMeta{ + Namespace: system.Namespace, + Name: fmt.Sprintf("%s-clusterbus", Name), + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: eventingv1alpha1.SchemeGroupVersion.String(), + Kind: "ClusterProvisioner", + Name: Name, + UID: cpUid, + Controller: &truePointer, + BlockOwnerDeletion: &truePointer, + }, + }, + Labels: dispatcherLabels(Name), + }, + Spec: corev1.ServiceSpec{ + Selector: dispatcherLabels(Name), + Ports: []corev1.ServicePort{ + { + Name: "http", + Port: 80, + TargetPort: intstr.FromInt(8080), + }, + }, + }, + } +} + +func makeK8sServiceNotOwnedByClusterProvisioner() *corev1.Service { + svc := makeK8sService() + svc.OwnerReferences = nil + return svc +} + +func errorGettingClusterProvisioner() controllertesting.MockGet { + return func(client.Client, context.Context, client.ObjectKey, runtime.Object) (controllertesting.MockHandled, error) { + return controllertesting.Handled, errors.New(testErrorMessage) + } +} + +func errorGettingK8sService() controllertesting.MockGet { + return func(_ client.Client, _ context.Context, _ client.ObjectKey, obj runtime.Object) (controllertesting.MockHandled, error) { + if _, ok := obj.(*corev1.Service); ok { + return controllertesting.Handled, errors.New(testErrorMessage) + } + return controllertesting.Unhandled, nil + } +} + +func oneSuccessfulClusterProvisionerGet() []controllertesting.MockGet { + return []controllertesting.MockGet{ + // The first one is a pass through. + func(innerClient client.Client, ctx context.Context, key client.ObjectKey, obj runtime.Object) (controllertesting.MockHandled, error) { + err := innerClient.Get(ctx, key, obj) + return controllertesting.Handled, err + }, + // All subsequent ClusterProvisioner Gets fail. 
+ func(_ client.Client, _ context.Context, _ client.ObjectKey, obj runtime.Object) (controllertesting.MockHandled, error) { + if _, ok := obj.(*eventingv1alpha1.ClusterProvisioner); ok { + return controllertesting.Handled, errors.New(testErrorMessage) + } + return controllertesting.Unhandled, nil + }, + } +} + +func errorUpdating() controllertesting.MockUpdate { + return func(client.Client, context.Context, runtime.Object) (controllertesting.MockHandled, error) { + return controllertesting.Handled, errors.New(testErrorMessage) + } +} diff --git a/pkg/controller/eventing/inmemory/controller/main.go b/pkg/controller/eventing/inmemory/controller/main.go new file mode 100644 index 00000000000..c5485a9e719 --- /dev/null +++ b/pkg/controller/eventing/inmemory/controller/main.go @@ -0,0 +1,70 @@ +/* + * Copyright 2018 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package main + +import ( + "flag" + + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/eventing/pkg/buses" + "github.com/knative/eventing/pkg/controller/eventing/inmemory/channel" + "github.com/knative/eventing/pkg/controller/eventing/inmemory/clusterprovisioner" + istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3" + "github.com/knative/pkg/signals" + "go.uber.org/zap" + "sigs.k8s.io/controller-runtime/pkg/client/config" + "sigs.k8s.io/controller-runtime/pkg/manager" +) + +func main() { + logConfig := buses.NewLoggingConfig() + logger := buses.NewBusLoggerFromConfig(logConfig) + defer logger.Sync() + logger = logger.With( + zap.String("eventing.knative.dev/clusterProvisioner", clusterprovisioner.Name), + zap.String("eventing.knative.dev/clusterProvisionerComponent", "Controller"), + ) + flag.Parse() + + mgr, err := manager.New(config.GetConfigOrDie(), manager.Options{}) + if err != nil { + logger.Fatal("Error starting up.", zap.Error(err)) + } + + // Add custom types to this array to get them into the manager's scheme. + eventingv1alpha1.AddToScheme(mgr.GetScheme()) + istiov1alpha3.AddToScheme(mgr.GetScheme()) + + // The controllers for both the ClusterProvisioner and the Channels created by that + // ClusterProvisioner run in this process. + _, err = clusterprovisioner.ProvideController(mgr, logger.Desugar()) + if err != nil { + logger.Fatal("Unable to create Provisioner controller", zap.Error(err)) + } + _, err = channel.ProvideController(mgr, logger.Desugar()) + if err != nil { + logger.Fatal("Unable to create Channel controller", zap.Error(err)) + } + + // set up signals so we handle the first shutdown signal gracefully + stopCh := signals.SetupSignalHandler() + // Start blocks forever. 
+ err = mgr.Start(stopCh) + if err != nil { + logger.Fatal("Manager.Start() returned an error", zap.Error(err)) + } +} diff --git a/pkg/sidecar/configmap/parse.go b/pkg/sidecar/configmap/parse.go index 8661dd2bf8e..a6f479fe6a5 100644 --- a/pkg/sidecar/configmap/parse.go +++ b/pkg/sidecar/configmap/parse.go @@ -17,6 +17,7 @@ limitations under the License. package configmap import ( + "encoding/json" "fmt" "github.com/knative/eventing/pkg/sidecar/multichannelfanout" "go.uber.org/zap" @@ -29,6 +30,7 @@ const ( ) // ConfigMapData attempts to parse the config map's data into a multichannelfanout.Config. +// orig == NewFanoutConfig(SerializeConfig(orig)) func NewFanoutConfig(logger *zap.Logger, data map[string]string) (*multichannelfanout.Config, error) { str, present := data[MultiChannelFanoutConfigKey] if !present { @@ -37,3 +39,15 @@ func NewFanoutConfig(logger *zap.Logger, data map[string]string) (*multichannelf } return multichannelfanout.Parse(logger, str) } + +// SerializeConfig takes in a multichannelfanout.Config and generates the ConfigMap equivalent. 
+// orig == NewFanoutConfig(SerializeConfig(orig)) +func SerializeConfig(config multichannelfanout.Config) (map[string]string, error) { + jb, err := json.Marshal(config) + if err != nil { + return nil, err + } + return map[string]string{ + MultiChannelFanoutConfigKey: string(jb), + }, nil +} diff --git a/pkg/sidecar/configmap/parse_test.go b/pkg/sidecar/configmap/parse_test.go index a621ad510a8..5cddb280b89 100644 --- a/pkg/sidecar/configmap/parse_test.go +++ b/pkg/sidecar/configmap/parse_test.go @@ -38,7 +38,7 @@ func TestNewFanoutConfig(t *testing.T) { expectedErr: true, }, { - name: "invalid YAML", + name: "invalid YAML", config: ` key: - value @@ -57,7 +57,7 @@ func TestNewFanoutConfig(t *testing.T) { expectedErr: true, }, { - name: "valid", + name: "valid", config: ` channelConfigs: - namespace: default @@ -142,6 +142,64 @@ func TestNewFanoutConfig(t *testing.T) { } } +func TestSerializeConfig(t *testing.T) { + testCases := map[string]struct { + config *multichannelfanout.Config + }{ + "empty config": { + config: &multichannelfanout.Config{}, + }, + "full config": { + config: &multichannelfanout.Config{ + ChannelConfigs: []multichannelfanout.ChannelConfig{ + { + Namespace: "default", + Name: "c1", + FanoutConfig: fanout.Config{ + Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + { + CallableDomain: "foo.example.com", + SinkableDomain: "bar.example.com", + }, + { + SinkableDomain: "qux.example.com", + }, + { + CallableDomain: "baz.example.com", + }, + {}, + }, + }, + }, + { + Namespace: "other", + Name: "no-subs", + FanoutConfig: fanout.Config{ + Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{}, + }, + }, + }, + }, + }, + } + + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + s, err := SerializeConfig(*tc.config) + if err != nil { + t.Errorf("Unexpected error serializing config: %v", err) + } + rt, err := NewFanoutConfig(zap.NewNop(), s) + if err != nil { + t.Errorf("Unexpected error deserializing: %v", err) + } + if diff := 
cmp.Diff(tc.config, rt); diff != "" { + t.Errorf("Unexpected error roundtripping the config (-want, +got): %v", diff) + } + }) + } +} + func formatData(config string) map[string]string { data := make(map[string]string) if config != "" {