diff --git a/cmd/controller/main.go b/cmd/controller/main.go index 40c6898a912..8e454daf84e 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -24,6 +24,7 @@ import ( "github.com/knative/eventing/pkg/reconciler/broker" "github.com/knative/eventing/pkg/reconciler/channel" + "github.com/knative/eventing/pkg/reconciler/choice" "github.com/knative/eventing/pkg/reconciler/eventingchannel" "github.com/knative/eventing/pkg/reconciler/eventtype" "github.com/knative/eventing/pkg/reconciler/namespace" @@ -42,5 +43,6 @@ func main() { broker.NewController, eventtype.NewController, sequence.NewController, + choice.NewController, ) } diff --git a/config/200-controller-clusterrole.yaml b/config/200-controller-clusterrole.yaml index d08a231b870..f9bc305d71a 100644 --- a/config/200-controller-clusterrole.yaml +++ b/config/200-controller-clusterrole.yaml @@ -86,6 +86,8 @@ rules: - "sequences/status" - "channels" - "channels/status" + - "choices" + - "choices/status" verbs: *everything # Messaging resources and finalizers we care about. @@ -93,6 +95,7 @@ rules: - "messaging.knative.dev" resources: - "sequences/finalizers" + - "choices/finalizers" verbs: - "update" diff --git a/config/300-choice.yaml b/config/300-choice.yaml new file mode 100644 index 00000000000..bc06825f0b1 --- /dev/null +++ b/config/300-choice.yaml @@ -0,0 +1,133 @@ +# Copyright 2019 The Knative Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + name: choices.messaging.knative.dev + labels: + eventing.knative.dev/release: devel + knative.dev/crd-install: "true" +spec: + group: messaging.knative.dev + version: v1alpha1 + names: + kind: Choice + plural: choices + singular: choice + categories: + - all + - knative + - eventing + - messaging + scope: Namespaced + subresources: + status: {} + additionalPrinterColumns: + - name: Ready + type: string + JSONPath: ".status.conditions[?(@.type==\"Ready\")].status" + - name: Reason + type: string + JSONPath: ".status.conditions[?(@.type==\"Ready\")].reason" + - name: URL + type: string + JSONPath: .status.address.url + - name: Age + type: date + JSONPath: .metadata.creationTimestamp + validation: + openAPIV3Schema: + properties: + spec: + required: + - cases + - channelTemplate + properties: + cases: + type: array + items: + type: object + required: + - subscriber + properties: + filter: + type: object + properties: + dnsName: + type: string + minLength: 1 + uri: + type: string + minLength: 1 + ref: + type: object + required: + - apiVersion + - kind + - name + properties: + apiVersion: + type: string + minLength: 1 + kind: + type: string + minLength: 1 + name: + type: string + minLength: 1 + namespace: + type: string + maxLength: 0 + subscriber: + type: object + properties: + dnsName: + type: string + minLength: 1 + uri: + type: string + minLength: 1 + ref: + type: object + required: + - apiVersion + - kind + - name + properties: + apiVersion: + type: string + minLength: 1 + kind: + type: string + minLength: 1 + name: + type: string + minLength: 1 + namespace: + type: string + maxLength: 0 + channelTemplate: + type: object + required: + - apiVersion + - kind + properties: + apiVersion: + type: string + minLength: 1 + kind: + type: string + minLength: 1 + spec: + type: object diff --git a/pkg/apis/messaging/v1alpha1/choice_defaults.go b/pkg/apis/messaging/v1alpha1/choice_defaults.go new file mode 100644 index 00000000000..e7c9ada26fd --- /dev/null +++ b/pkg/apis/messaging/v1alpha1/choice_defaults.go @@ -0,0 +1,37 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "context" + + eventingduckv1alpha1 "github.com/knative/eventing/pkg/apis/duck/v1alpha1" +) + +func (s *Choice) SetDefaults(ctx context.Context) { + if s != nil && s.Spec.ChannelTemplate == nil { + // The singleton may not have been set, if so ignore it and validation will reject the + // Channel. + if cd := eventingduckv1alpha1.ChannelDefaulterSingleton; cd != nil { + channelTemplate := cd.GetDefault(s.Namespace) + s.Spec.ChannelTemplate = channelTemplate + } + } + s.Spec.SetDefaults(ctx) +} + +func (ss *ChoiceSpec) SetDefaults(ctx context.Context) {} diff --git a/pkg/apis/messaging/v1alpha1/choice_defaults_test.go b/pkg/apis/messaging/v1alpha1/choice_defaults_test.go new file mode 100644 index 00000000000..cca02d83bdd --- /dev/null +++ b/pkg/apis/messaging/v1alpha1/choice_defaults_test.go @@ -0,0 +1,97 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "context" + "testing" + + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/google/go-cmp/cmp" + eventingduckv1alpha1 "github.com/knative/eventing/pkg/apis/duck/v1alpha1" +) + +func TestChoiceSetDefaults(t *testing.T) { + testCases := map[string]struct { + nilChannelDefaulter bool + channelTemplate *eventingduckv1alpha1.ChannelTemplateSpec + initial Choice + expected Choice + }{ + "nil ChannelDefaulter": { + nilChannelDefaulter: true, + expected: Choice{}, + }, + "unset ChannelDefaulter": { + expected: Choice{}, + }, + "set ChannelDefaulter": { + channelTemplate: defaultChannelTemplate, + expected: Choice{ + Spec: ChoiceSpec{ + ChannelTemplate: defaultChannelTemplate, + }, + }, + }, + "template already specified": { + channelTemplate: defaultChannelTemplate, + initial: Choice{ + Spec: ChoiceSpec{ + ChannelTemplate: &eventingduckv1alpha1.ChannelTemplateSpec{ + TypeMeta: v1.TypeMeta{ + APIVersion: SchemeGroupVersion.String(), + Kind: "OtherChannel", + }, + }, + }, + }, + expected: Choice{ + Spec: ChoiceSpec{ + ChannelTemplate: &eventingduckv1alpha1.ChannelTemplateSpec{ + TypeMeta: v1.TypeMeta{ + APIVersion: SchemeGroupVersion.String(), + Kind: "OtherChannel", + }, + }, + }, + }, + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + if !tc.nilChannelDefaulter { + eventingduckv1alpha1.ChannelDefaulterSingleton = &choiceChannelDefaulter{ + channelTemplate: tc.channelTemplate, + } + defer func() { eventingduckv1alpha1.ChannelDefaulterSingleton = nil }() + } + tc.initial.SetDefaults(context.TODO()) + if diff := cmp.Diff(tc.expected, tc.initial); diff != "" { + t.Fatalf("Unexpected defaults (-want, +got): %s", diff) + } + }) + } +} + +type choiceChannelDefaulter struct { + channelTemplate *eventingduckv1alpha1.ChannelTemplateSpec +} + +func (cd *choiceChannelDefaulter) GetDefault(_ string) *eventingduckv1alpha1.ChannelTemplateSpec { + return cd.channelTemplate +} diff --git a/pkg/apis/messaging/v1alpha1/choice_lifecycle.go b/pkg/apis/messaging/v1alpha1/choice_lifecycle.go new file mode 100644 index 00000000000..7cd6f98fd92 --- /dev/null +++ b/pkg/apis/messaging/v1alpha1/choice_lifecycle.go @@ -0,0 +1,200 @@ +/* + * Copyright 2019 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package v1alpha1 + +import ( + duckv1alpha1 "github.com/knative/eventing/pkg/apis/duck/v1alpha1" + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + corev1 "k8s.io/api/core/v1" + "knative.dev/pkg/apis" + pkgduckv1alpha1 "knative.dev/pkg/apis/duck/v1alpha1" +) + +var pChoiceCondSet = apis.NewLivingConditionSet(ChoiceConditionReady, ChoiceConditionChannelsReady, ChoiceConditionSubscriptionsReady, ChoiceConditionAddressable) + +const ( + // ChoiceConditionReady has status True when all subconditions below have been set to True. + ChoiceConditionReady = apis.ConditionReady + + // ChoiceConditionChannelsReady has status True when all the channels created as part of + // this choice are ready. + ChoiceConditionChannelsReady apis.ConditionType = "ChannelsReady" + + // ChoiceConditionSubscriptionsReady has status True when all the subscriptions created as part of + // this choice are ready. + ChoiceConditionSubscriptionsReady apis.ConditionType = "SubscriptionsReady" + + // ChoiceConditionAddressable has status true when this Choice meets + // the Addressable contract and has a non-empty hostname. + ChoiceConditionAddressable apis.ConditionType = "Addressable" +) + +// GetCondition returns the condition currently associated with the given type, or nil. +func (ps *ChoiceStatus) GetCondition(t apis.ConditionType) *apis.Condition { + return pChoiceCondSet.Manage(ps).GetCondition(t) +} + +// IsReady returns true if the resource is ready overall. +func (ps *ChoiceStatus) IsReady() bool { + return pChoiceCondSet.Manage(ps).IsHappy() +} + +// InitializeConditions sets relevant unset conditions to Unknown state. +func (ps *ChoiceStatus) InitializeConditions() { + pChoiceCondSet.Manage(ps).InitializeConditions() +} + +// PropagateSubscriptionStatuses sets the ChoiceConditionSubscriptionsReady based on +// the status of the incoming subscriptions. +func (ps *ChoiceStatus) PropagateSubscriptionStatuses(filterSubscriptions []*eventingv1alpha1.Subscription, subscriptions []*eventingv1alpha1.Subscription) { + if ps.CaseStatuses == nil { + ps.CaseStatuses = make([]ChoiceCaseStatus, len(subscriptions)) + } + allReady := true + // If there are no subscriptions, treat that as a False case. Could go either way, but this seems right. + if len(subscriptions) == 0 { + allReady = false + } + + for i, s := range subscriptions { + ps.CaseStatuses[i].SubscriptionStatus = ChoiceSubscriptionStatus{ + Subscription: corev1.ObjectReference{ + APIVersion: s.APIVersion, + Kind: s.Kind, + Name: s.Name, + Namespace: s.Namespace, + }, + } + + readyCondition := s.Status.GetCondition(eventingv1alpha1.SubscriptionConditionReady) + if readyCondition != nil { + ps.CaseStatuses[i].SubscriptionStatus.ReadyCondition = *readyCondition + if readyCondition.Status != corev1.ConditionTrue { + allReady = false + } + } else { + allReady = false + } + + fs := filterSubscriptions[i] + ps.CaseStatuses[i].FilterSubscriptionStatus = ChoiceSubscriptionStatus{ + Subscription: corev1.ObjectReference{ + APIVersion: fs.APIVersion, + Kind: fs.Kind, + Name: fs.Name, + Namespace: fs.Namespace, + }, + } + readyCondition = fs.Status.GetCondition(eventingv1alpha1.SubscriptionConditionReady) + if readyCondition != nil { + ps.CaseStatuses[i].FilterSubscriptionStatus.ReadyCondition = *readyCondition + if readyCondition.Status != corev1.ConditionTrue { + allReady = false + } + } else { + allReady = false + } + + } + if allReady { + pChoiceCondSet.Manage(ps).MarkTrue(ChoiceConditionSubscriptionsReady) + } else { + ps.MarkSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none") + } +} + +// PropagateChannelStatuses sets the ChannelStatuses and ChoiceConditionChannelsReady based on the +// status of the incoming channels. +func (ps *ChoiceStatus) PropagateChannelStatuses(ingressChannel *duckv1alpha1.Channelable, channels []*duckv1alpha1.Channelable) { + if ps.CaseStatuses == nil { + ps.CaseStatuses = make([]ChoiceCaseStatus, len(channels)) + } + allReady := true + + ps.IngressChannelStatus.Channel = corev1.ObjectReference{ + APIVersion: ingressChannel.APIVersion, + Kind: ingressChannel.Kind, + Name: ingressChannel.Name, + Namespace: ingressChannel.Namespace, + } + + address := ingressChannel.Status.AddressStatus.Address + if address != nil { + ps.IngressChannelStatus.ReadyCondition = apis.Condition{Type: apis.ConditionReady, Status: corev1.ConditionTrue} + } else { + ps.IngressChannelStatus.ReadyCondition = apis.Condition{Type: apis.ConditionReady, Status: corev1.ConditionFalse, Reason: "NotAddressable", Message: "Channel is not addressable"} + allReady = false + } + + for i, c := range channels { + ps.CaseStatuses[i].FilterChannelStatus = ChoiceChannelStatus{ + Channel: corev1.ObjectReference{ + APIVersion: c.APIVersion, + Kind: c.Kind, + Name: c.Name, + Namespace: c.Namespace, + }, + } + // TODO: Once the addressable has a real status to dig through, use that here instead of + // addressable, because it might be addressable but not ready. + address := c.Status.AddressStatus.Address + if address != nil { + ps.CaseStatuses[i].FilterChannelStatus.ReadyCondition = apis.Condition{Type: apis.ConditionReady, Status: corev1.ConditionTrue} + } else { + ps.CaseStatuses[i].FilterChannelStatus.ReadyCondition = apis.Condition{Type: apis.ConditionReady, Status: corev1.ConditionFalse, Reason: "NotAddressable", Message: "Channel is not addressable"} + allReady = false + } + + // Mark the Choice address as the Address of the first channel. + if i == 0 { + ps.setAddress(address) + } + } + if allReady { + pChoiceCondSet.Manage(ps).MarkTrue(ChoiceConditionChannelsReady) + } else { + ps.MarkChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none") + } +} + +func (ps *ChoiceStatus) MarkChannelsNotReady(reason, messageFormat string, messageA ...interface{}) { + pChoiceCondSet.Manage(ps).MarkFalse(ChoiceConditionChannelsReady, reason, messageFormat, messageA...) +} + +func (ps *ChoiceStatus) MarkSubscriptionsNotReady(reason, messageFormat string, messageA ...interface{}) { + pChoiceCondSet.Manage(ps).MarkFalse(ChoiceConditionSubscriptionsReady, reason, messageFormat, messageA...) +} + +func (ps *ChoiceStatus) MarkAddressableNotReady(reason, messageFormat string, messageA ...interface{}) { + pChoiceCondSet.Manage(ps).MarkFalse(ChoiceConditionAddressable, reason, messageFormat, messageA...) +} + +func (ps *ChoiceStatus) setAddress(address *pkgduckv1alpha1.Addressable) { + ps.Address = address + + if address == nil { + pChoiceCondSet.Manage(ps).MarkFalse(ChoiceConditionAddressable, "emptyHostname", "hostname is the empty string") + return + } + if address.URL != nil || address.Hostname != "" { + pChoiceCondSet.Manage(ps).MarkTrue(ChoiceConditionAddressable) + } else { + ps.Address.Hostname = "" + ps.Address.URL = nil + pChoiceCondSet.Manage(ps).MarkFalse(ChoiceConditionAddressable, "emptyHostname", "hostname is the empty string") + } +} diff --git a/pkg/apis/messaging/v1alpha1/choice_lifecycle_test.go b/pkg/apis/messaging/v1alpha1/choice_lifecycle_test.go new file mode 100644 index 00000000000..29c8a0d4ccb --- /dev/null +++ b/pkg/apis/messaging/v1alpha1/choice_lifecycle_test.go @@ -0,0 +1,441 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "testing" + + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "knative.dev/pkg/apis" + + "github.com/google/go-cmp/cmp" + duckv1alpha1 "github.com/knative/eventing/pkg/apis/duck/v1alpha1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + pkgduckv1alpha1 "knative.dev/pkg/apis/duck/v1alpha1" + duckv1beta1 "knative.dev/pkg/apis/duck/v1beta1" +) + +var choiceConditionReady = apis.Condition{ + Type: ChoiceConditionReady, + Status: corev1.ConditionTrue, +} + +var choiceConditionChannelsReady = apis.Condition{ + Type: ChoiceConditionChannelsReady, + Status: corev1.ConditionTrue, +} + +var choiceConditionSubscriptionsReady = apis.Condition{ + Type: ChoiceConditionSubscriptionsReady, + Status: corev1.ConditionTrue, +} + +func TestChoiceGetCondition(t *testing.T) { + tests := []struct { + name string + ss *ChoiceStatus + condQuery apis.ConditionType + want *apis.Condition + }{{ + name: "single condition", + ss: &ChoiceStatus{ + Status: duckv1beta1.Status{ + Conditions: []apis.Condition{ + choiceConditionReady, + }, + }, + }, + condQuery: apis.ConditionReady, + want: &choiceConditionReady, + }} + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + got := test.ss.GetCondition(test.condQuery) + if diff := cmp.Diff(test.want, got); diff != "" { + t.Errorf("unexpected condition (-want, +got) = %v", diff) + } + }) + } +} + +func TestChoiceInitializeConditions(t *testing.T) { + tests := []struct { + name string + ts *ChoiceStatus + want *ChoiceStatus + }{{ + name: "empty", + ts: &ChoiceStatus{}, + want: &ChoiceStatus{ + Status: duckv1beta1.Status{ + Conditions: []apis.Condition{{ + Type: ChoiceConditionAddressable, + Status: corev1.ConditionUnknown, + }, { + Type: ChoiceConditionChannelsReady, + Status: corev1.ConditionUnknown, + }, { + Type: ChoiceConditionReady, + Status: corev1.ConditionUnknown, + }, { + Type: ChoiceConditionSubscriptionsReady, + Status: corev1.ConditionUnknown, + }}, + }, + }, + }, { + name: "one false", + ts: &ChoiceStatus{ + Status: duckv1beta1.Status{ + Conditions: []apis.Condition{{ + Type: ChoiceConditionChannelsReady, + Status: corev1.ConditionFalse, + }}, + }, + }, + want: &ChoiceStatus{ + Status: duckv1beta1.Status{ + Conditions: []apis.Condition{{ + Type: ChoiceConditionAddressable, + Status: corev1.ConditionUnknown, + }, { + Type: ChoiceConditionChannelsReady, + Status: corev1.ConditionFalse, + }, { + Type: ChoiceConditionReady, + Status: corev1.ConditionUnknown, + }, { + Type: ChoiceConditionSubscriptionsReady, + Status: corev1.ConditionUnknown, + }}, + }, + }, + }, { + name: "one true", + ts: &ChoiceStatus{ + Status: duckv1beta1.Status{ + Conditions: []apis.Condition{{ + Type: ChoiceConditionSubscriptionsReady, + Status: corev1.ConditionTrue, + }}, + }, + }, + want: &ChoiceStatus{ + Status: duckv1beta1.Status{ + Conditions: []apis.Condition{{ + Type: ChoiceConditionAddressable, + Status: corev1.ConditionUnknown, + }, { + Type: ChoiceConditionChannelsReady, + Status: corev1.ConditionUnknown, + }, { + Type: ChoiceConditionReady, + Status: corev1.ConditionUnknown, + }, { + Type: ChoiceConditionSubscriptionsReady, + Status: corev1.ConditionTrue, + }}, + }, + }, + }} + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + test.ts.InitializeConditions() + if diff := cmp.Diff(test.want, test.ts, ignoreAllButTypeAndStatus); diff != "" { + t.Errorf("unexpected conditions (-want, +got) = %v", diff) + } + }) + } +} + +func TestChoicePropagateSubscriptionStatuses(t *testing.T) { + tests := []struct { + name string + fsubs []*eventingv1alpha1.Subscription + subs []*eventingv1alpha1.Subscription + want corev1.ConditionStatus + }{{ + name: "empty", + fsubs: []*eventingv1alpha1.Subscription{}, + subs: []*eventingv1alpha1.Subscription{}, + want: corev1.ConditionFalse, + }, { + name: "empty status", + fsubs: []*eventingv1alpha1.Subscription{{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "eventing.knative.dev/v1alpha1", + Kind: "Subscription", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "sub", + Namespace: "testns", + }, + Status: eventingv1alpha1.SubscriptionStatus{}, + }}, subs: []*eventingv1alpha1.Subscription{{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "eventing.knative.dev/v1alpha1", + Kind: "Subscription", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "sub", + Namespace: "testns", + }, + Status: eventingv1alpha1.SubscriptionStatus{}, + }}, + want: corev1.ConditionFalse, + }, { + name: "one filter and subscriber subscription not ready", + fsubs: []*eventingv1alpha1.Subscription{getSubscription("fsub0", false)}, + subs: []*eventingv1alpha1.Subscription{getSubscription("sub0", false)}, + want: corev1.ConditionFalse, + }, { + name: "one filter and one subscription ready", + fsubs: []*eventingv1alpha1.Subscription{getSubscription("fsub0", true)}, + subs: []*eventingv1alpha1.Subscription{getSubscription("sub0", true)}, + want: corev1.ConditionTrue, + }, { + name: "one filter subscription not ready and one subscription ready", + fsubs: []*eventingv1alpha1.Subscription{getSubscription("fsub0", false)}, + subs: []*eventingv1alpha1.Subscription{getSubscription("sub0", true)}, + want: corev1.ConditionFalse, + }, { + name: "one subscription ready, one not", + fsubs: []*eventingv1alpha1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", false)}, + subs: []*eventingv1alpha1.Subscription{getSubscription("sub0", true), getSubscription("sub1", false)}, + want: corev1.ConditionFalse, + }, { + name: "two subscriptions ready", + fsubs: []*eventingv1alpha1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)}, + subs: []*eventingv1alpha1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)}, + want: corev1.ConditionTrue, + }} + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ps := ChoiceStatus{} + ps.PropagateSubscriptionStatuses(test.fsubs, test.subs) + got := ps.GetCondition(ChoiceConditionSubscriptionsReady).Status + want := test.want + if want != got { + t.Errorf("unexpected conditions (-want, +got) = %v %v", want, got) + } + }) + } +} + +func TestChoicePropagateChannelStatuses(t *testing.T) { + tests := []struct { + name string + ichannel *duckv1alpha1.Channelable + channels []*duckv1alpha1.Channelable + want corev1.ConditionStatus + }{{ + name: "ingress false, empty", + ichannel: getChannelable(false), + channels: []*duckv1alpha1.Channelable{}, + want: corev1.ConditionFalse, + }, { + name: "ingress false, one channelable not ready", + ichannel: getChannelable(false), + channels: []*duckv1alpha1.Channelable{getChannelable(false)}, + want: corev1.ConditionFalse, + }, { + name: "ingress true, one channelable not ready", + ichannel: getChannelable(true), + channels: []*duckv1alpha1.Channelable{getChannelable(false)}, + want: corev1.ConditionFalse, + }, { + name: "ingress false, one channelable ready", + ichannel: getChannelable(false), + channels: []*duckv1alpha1.Channelable{getChannelable(true)}, + want: corev1.ConditionFalse, + }, { + name: "ingress true, one channelable ready", + ichannel: getChannelable(true), + channels: []*duckv1alpha1.Channelable{getChannelable(true)}, + want: corev1.ConditionTrue, + }, { + name: "ingress true, one channelable ready, one not", + ichannel: getChannelable(true), + channels: []*duckv1alpha1.Channelable{getChannelable(true), getChannelable(false)}, + want: corev1.ConditionFalse, + }, { + name: "ingress true, two channelables ready", + ichannel: getChannelable(true), + channels: []*duckv1alpha1.Channelable{getChannelable(true), getChannelable(true)}, + want: corev1.ConditionTrue, + }} + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ps := ChoiceStatus{} + ps.PropagateChannelStatuses(test.ichannel, test.channels) + got := ps.GetCondition(ChoiceConditionChannelsReady).Status + want := test.want + if want != got { + t.Errorf("unexpected conditions (-want, +got) = %v %v", want, got) + } + }) + } +} + +func TestChoiceReady(t *testing.T) { + tests := []struct { + name string + fsubs []*eventingv1alpha1.Subscription + subs []*eventingv1alpha1.Subscription + ichannel *duckv1alpha1.Channelable + channels []*duckv1alpha1.Channelable + want bool + }{{ + name: "ingress false, empty", + fsubs: []*eventingv1alpha1.Subscription{}, + subs: []*eventingv1alpha1.Subscription{}, + ichannel: getChannelable(false), + channels: []*duckv1alpha1.Channelable{}, + want: false, + }, { + name: "ingress true, empty", + fsubs: []*eventingv1alpha1.Subscription{}, + subs: []*eventingv1alpha1.Subscription{}, + ichannel: getChannelable(true), + channels: []*duckv1alpha1.Channelable{}, + want: false, + }, { + name: "ingress true, one channelable not ready, one subscription ready", + ichannel: getChannelable(true), + channels: []*duckv1alpha1.Channelable{getChannelable(false)}, + fsubs: []*eventingv1alpha1.Subscription{getSubscription("fsub0", true)}, + subs: []*eventingv1alpha1.Subscription{getSubscription("sub0", true)}, + want: false, + }, { + name: "ingress true, one channelable ready, one subscription not ready", + ichannel: getChannelable(true), + channels: []*duckv1alpha1.Channelable{getChannelable(true)}, + fsubs: []*eventingv1alpha1.Subscription{getSubscription("fsub0", false)}, + subs: []*eventingv1alpha1.Subscription{getSubscription("sub0", false)}, + want: false, + }, { + name: "ingress false, one channelable ready, one subscription ready", + ichannel: getChannelable(false), + channels: []*duckv1alpha1.Channelable{getChannelable(true)}, + fsubs: []*eventingv1alpha1.Subscription{getSubscription("fsub0", true)}, + subs: []*eventingv1alpha1.Subscription{getSubscription("sub0", true)}, + want: false, + }, { + name: "ingress true, one channelable ready, one subscription ready", + ichannel: getChannelable(true), + channels: []*duckv1alpha1.Channelable{getChannelable(true)}, + fsubs: []*eventingv1alpha1.Subscription{getSubscription("fsub0", true)}, + subs: []*eventingv1alpha1.Subscription{getSubscription("sub0", true)}, + want: true, + }, { + name: "ingress true, one channelable ready, one not, two subsriptions ready", + ichannel: getChannelable(true), + channels: []*duckv1alpha1.Channelable{getChannelable(true), getChannelable(false)}, + fsubs: []*eventingv1alpha1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)}, + subs: []*eventingv1alpha1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)}, + want: false, + }, { + name: "ingress true, two channelables ready, one subscription ready, one not", + ichannel: getChannelable(true), + channels: []*duckv1alpha1.Channelable{getChannelable(true), getChannelable(true)}, + fsubs: []*eventingv1alpha1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", false)}, + subs: []*eventingv1alpha1.Subscription{getSubscription("sub0", true), getSubscription("sub1", false)}, + want: false, + }, { + name: "ingress false, two channelables ready, two subscriptions ready", + ichannel: getChannelable(false), + channels: []*duckv1alpha1.Channelable{getChannelable(true), getChannelable(true)}, + fsubs: []*eventingv1alpha1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)}, + subs: []*eventingv1alpha1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)}, + want: false, + }, { + name: "ingress true, two channelables ready, two subscriptions ready", + ichannel: getChannelable(true), + channels: []*duckv1alpha1.Channelable{getChannelable(true), getChannelable(true)}, + fsubs: []*eventingv1alpha1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)}, + subs: []*eventingv1alpha1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)}, + want: true, + }} + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ps := ChoiceStatus{} + ps.PropagateChannelStatuses(test.ichannel, test.channels) + ps.PropagateSubscriptionStatuses(test.fsubs, test.subs) + got := ps.IsReady() + want := test.want + if want != got { + t.Errorf("unexpected conditions (-want, +got) = %v %v", want, got) + } + }) + } +} + +func TestChoicePropagateSetAddress(t *testing.T) { + URL, _ := apis.ParseURL("http://example.com") + tests := []struct { + name string + address *pkgduckv1alpha1.Addressable + want *pkgduckv1alpha1.Addressable + wantStatus corev1.ConditionStatus + }{{ + name: "nil", + address: nil, + want: nil, + wantStatus: corev1.ConditionFalse, + }, { + name: "empty", + address: &pkgduckv1alpha1.Addressable{}, + want: &pkgduckv1alpha1.Addressable{}, + wantStatus: corev1.ConditionFalse, + }, { + name: "URL", + address: &pkgduckv1alpha1.Addressable{duckv1beta1.Addressable{URL}, ""}, + want: &pkgduckv1alpha1.Addressable{duckv1beta1.Addressable{URL}, ""}, + wantStatus: corev1.ConditionTrue, + }, { + name: "hostname", + address: &pkgduckv1alpha1.Addressable{duckv1beta1.Addressable{}, "myhostname"}, + want: &pkgduckv1alpha1.Addressable{duckv1beta1.Addressable{}, "myhostname"}, + wantStatus: corev1.ConditionTrue, + }, { + name: "nil", + address: &pkgduckv1alpha1.Addressable{duckv1beta1.Addressable{nil}, ""}, + want: &pkgduckv1alpha1.Addressable{duckv1beta1.Addressable{}, ""}, + wantStatus: corev1.ConditionFalse, + }} + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ps := ChoiceStatus{} + ps.setAddress(test.address) + got := ps.Address + if diff := cmp.Diff(test.want, got, ignoreAllButTypeAndStatus); diff != "" { + t.Errorf("unexpected address (-want, +got) = %v", diff) + } + gotStatus := ps.GetCondition(ChoiceConditionAddressable).Status + if test.wantStatus != gotStatus { + t.Errorf("unexpected conditions (-want, +got) = %v %v", test.wantStatus, gotStatus) + } + }) + } +} diff --git a/pkg/apis/messaging/v1alpha1/choice_types.go b/pkg/apis/messaging/v1alpha1/choice_types.go new file mode 100644 index 00000000000..a3c9c6497fd --- /dev/null +++ b/pkg/apis/messaging/v1alpha1/choice_types.go @@ -0,0 +1,165 @@ +/* + * Copyright 2019 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package v1alpha1 + +import ( + eventingduckv1alpha1 "github.com/knative/eventing/pkg/apis/duck/v1alpha1" + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "knative.dev/pkg/apis" + duckv1alpha1 "knative.dev/pkg/apis/duck/v1alpha1" + duckv1beta1 "knative.dev/pkg/apis/duck/v1beta1" + "knative.dev/pkg/webhook" +) + +// +genclient +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object +// Choice defines conditional branches that will be wired in +// series through Channels and Subscriptions. +type Choice struct { + metav1.TypeMeta `json:",inline"` + // +optional + metav1.ObjectMeta `json:"metadata,omitempty"` + + // Spec defines the desired state of the Choice. + Spec ChoiceSpec `json:"spec,omitempty"` + + // Status represents the current state of the Choice. This data may be out of + // date. + // +optional + Status ChoiceStatus `json:"status,omitempty"` +} + +// Check that Choice can be validated, can be defaulted, and has immutable fields. +var _ apis.Validatable = (*Choice)(nil) +var _ apis.Defaultable = (*Choice)(nil) + +// TODO: make appropriate fields immutable. +//var _ apis.Immutable = (*Choice)(nil) +var _ runtime.Object = (*Choice)(nil) +var _ webhook.GenericCRD = (*Choice)(nil) + +type ChoiceSpec struct { + // Cases is the list of Filter/Subscribers pairs. Filters are evaluated in the order + // provided, until one pass (returns true) + Cases []ChoiceCase `json:"cases"` + + // ChannelTemplate specifies which Channel CRD to use + ChannelTemplate *eventingduckv1alpha1.ChannelTemplateSpec `json:"channelTemplate"` + + // Reply is a Reference to where the result of a case Subscriber gets sent to + // when the case does not have a Reply + // + // You can specify only the following fields of the ObjectReference: + // - Kind + // - APIVersion + // - Name + // + // The resource pointed by this ObjectReference must meet the Addressable contract + // with a reference to the Addressable duck type. If the resource does not meet this contract, + // it will be reflected in the Subscription's status. + // +optional + Reply *corev1.ObjectReference `json:"reply,omitempty"` +} + +type ChoiceCase struct { + // Filter is the expression guarding the branch/case + Filter *eventingv1alpha1.SubscriberSpec `json:"filter,omitempty"` + + // Subscriber receiving the event when the filter passes + Subscriber eventingv1alpha1.SubscriberSpec `json:"subscriber"` + + // Reply is a Reference to where the result of Subscriber of this case gets sent to. + // If not specified, sent the result to the Choice Reply + // + // You can specify only the following fields of the ObjectReference: + // - Kind + // - APIVersion + // - Name + // + // The resource pointed by this ObjectReference must meet the Addressable contract + // with a reference to the Addressable duck type. If the resource does not meet this contract, + // it will be reflected in the Subscription's status. + // +optional + Reply *corev1.ObjectReference `json:"reply,omitempty"` +} + +// ChoiceStatus represents the current state of a Choice. +type ChoiceStatus struct { + // inherits duck/v1alpha1 Status, which currently provides: + // * ObservedGeneration - the 'Generation' of the Service that was last processed by the controller. + // * Conditions - the latest available observations of a resource's current state. + duckv1beta1.Status `json:",inline"` + + // IngressChannelStatus corresponds to the ingress channel status. + IngressChannelStatus ChoiceChannelStatus `json:"ingressChannelStatus"` + + // CaseStatuses is an array of corresponding to cases status. + // Matches the Spec.Cases array in the order. + CaseStatuses []ChoiceCaseStatus `json:"caseStatuses"` + + // AddressStatus is the starting point to this Choice. Sending to this + // will target the first subscriber. + // It generally has the form {channel}.{namespace}.svc.{cluster domain name} + duckv1alpha1.AddressStatus `json:",inline"` +} + +// ChoiceCaseStatus represents the current state of a Choice case +type ChoiceCaseStatus struct { + // FilterSubscriptionStatus corresponds to the filter subscription status. + FilterSubscriptionStatus ChoiceSubscriptionStatus `json:"filterSubscriptionStatus"` + + // FilterChannelStatus corresponds to the filter channel status. + FilterChannelStatus ChoiceChannelStatus `json:"filterChannelStatus"` + + // SubscriptionStatus corresponds to the subscriber subscription status. + SubscriptionStatus ChoiceSubscriptionStatus `json:"subscriberSubscriptionStatus"` +} + +type ChoiceChannelStatus struct { + // Channel is the reference to the underlying channel. + Channel corev1.ObjectReference `json:"channel"` + + // ReadyCondition indicates whether the Channel is ready or not. + ReadyCondition apis.Condition `json:"ready"` +} + +type ChoiceSubscriptionStatus struct { + // Subscription is the reference to the underlying Subscription. + Subscription corev1.ObjectReference `json:"subscription"` + + // ReadyCondition indicates whether the Subscription is ready or not. + ReadyCondition apis.Condition `json:"ready"` +} + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// ChoiceList is a collection of Choices. +type ChoiceList struct { + metav1.TypeMeta `json:",inline"` + // +optional + metav1.ListMeta `json:"metadata,omitempty"` + Items []Choice `json:"items"` +} + +// GetGroupVersionKind returns GroupVersionKind for Choice +func (p *Choice) GetGroupVersionKind() schema.GroupVersionKind { + return SchemeGroupVersion.WithKind("Choice") +} diff --git a/pkg/apis/messaging/v1alpha1/choice_types_test.go b/pkg/apis/messaging/v1alpha1/choice_types_test.go new file mode 100644 index 00000000000..8a52b161b5a --- /dev/null +++ b/pkg/apis/messaging/v1alpha1/choice_types_test.go @@ -0,0 +1,26 @@ +/* + * Copyright 2019 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package v1alpha1 + +import "testing" + +func TestChoiceKind(t *testing.T) { + choice := Choice{} + if choice.GetGroupVersionKind().String() != "messaging.knative.dev/v1alpha1, Kind=Choice" { + t.Errorf("unexpected gvk: %v", choice.GetGroupVersionKind()) + } +} diff --git a/pkg/apis/messaging/v1alpha1/choice_validation.go b/pkg/apis/messaging/v1alpha1/choice_validation.go new file mode 100644 index 00000000000..79983182dde --- /dev/null +++ b/pkg/apis/messaging/v1alpha1/choice_validation.go @@ -0,0 +1,67 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "context" + + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "knative.dev/pkg/apis" +) + +func (p *Choice) Validate(ctx context.Context) *apis.FieldError { + return p.Spec.Validate(ctx).ViaField("spec") +} + +func (ps *ChoiceSpec) Validate(ctx context.Context) *apis.FieldError { + var errs *apis.FieldError + + if len(ps.Cases) == 0 { + errs = errs.Also(apis.ErrMissingField("cases")) + } + + for i, s := range ps.Cases { + if s.Filter != nil { + if err := eventingv1alpha1.IsValidSubscriberSpec(*s.Filter); err != nil { + errs = errs.Also(err.ViaField("filter")) + } + } + + if e := eventingv1alpha1.IsValidSubscriberSpec(s.Subscriber); e != nil { + errs = errs.Also(apis.ErrInvalidArrayValue(s, "cases", i)) + } + } + + if ps.ChannelTemplate == nil { + errs = errs.Also(apis.ErrMissingField("channelTemplate")) + return errs + } + + if len(ps.ChannelTemplate.APIVersion) == 0 { + errs = errs.Also(apis.ErrMissingField("channelTemplate.apiVersion")) + } + + if len(ps.ChannelTemplate.Kind) == 0 { + errs = errs.Also(apis.ErrMissingField("channelTemplate.kind")) + } + if ps.Reply != nil { + if err := eventingv1alpha1.IsValidObjectReference(*ps.Reply); err != nil { + errs = errs.Also(err.ViaField("reply")) + } + } + return errs +} diff --git a/pkg/apis/messaging/v1alpha1/choice_validation_test.go b/pkg/apis/messaging/v1alpha1/choice_validation_test.go new file mode 100644 index 00000000000..e3608496e35 --- /dev/null +++ b/pkg/apis/messaging/v1alpha1/choice_validation_test.go @@ -0,0 +1,162 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "context" + "testing" + + "github.com/google/go-cmp/cmp" + eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1" + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "knative.dev/pkg/apis" +) + +func TestChoiceValidation(t *testing.T) { + name := "invalid choice spec" + choice := &Choice{Spec: ChoiceSpec{}} + + want := &apis.FieldError{ + Paths: []string{"spec.channelTemplate", "spec.cases"}, + Message: "missing field(s)", + } + + t.Run(name, func(t *testing.T) { + got := choice.Validate(context.TODO()) + if diff := cmp.Diff(want.Error(), got.Error()); diff != "" { + t.Errorf("Choice.Validate (-want, +got) = %v", diff) + } + }) +} + +func TestChoiceSpecValidation(t *testing.T) { + subscriberURI := "http://example.com" + validChannelTemplate := &eventingduck.ChannelTemplateSpec{ + metav1.TypeMeta{ + Kind: "mykind", + APIVersion: "myapiversion", + }, + &runtime.RawExtension{}, + } + tests := []struct { + name string + ts *ChoiceSpec + want *apis.FieldError + }{{ + name: "invalid choice spec - empty", + ts: &ChoiceSpec{}, + want: func() *apis.FieldError { + fe := apis.ErrMissingField("channelTemplate", "cases") + return fe + }(), + }, { + name: "invalid choice spec - empty cases", + ts: &ChoiceSpec{ + ChannelTemplate: validChannelTemplate, + }, + want: func() *apis.FieldError { + fe := apis.ErrMissingField("cases") + return fe + }(), + }, { + name: "missing channeltemplatespec", + ts: &ChoiceSpec{ + Cases: []ChoiceCase{{Subscriber: eventingv1alpha1.SubscriberSpec{URI: &subscriberURI}}}, + }, + want: func() *apis.FieldError { + fe := apis.ErrMissingField("channelTemplate") + return fe + }(), + }, { + name: "invalid channeltemplatespec missing APIVersion", + ts: &ChoiceSpec{ + ChannelTemplate: &eventingduck.ChannelTemplateSpec{metav1.TypeMeta{Kind: "mykind"}, &runtime.RawExtension{}}, + Cases: []ChoiceCase{{Subscriber: eventingv1alpha1.SubscriberSpec{URI: &subscriberURI}}}, + }, + want: func() *apis.FieldError { + fe := apis.ErrMissingField("channelTemplate.apiVersion") + return fe + }(), + }, { + name: "invalid channeltemplatespec missing Kind", + ts: &ChoiceSpec{ + ChannelTemplate: &eventingduck.ChannelTemplateSpec{metav1.TypeMeta{APIVersion: "myapiversion"}, &runtime.RawExtension{}}, + Cases: []ChoiceCase{{Subscriber: eventingv1alpha1.SubscriberSpec{URI: &subscriberURI}}}, + }, + want: func() *apis.FieldError { + fe := apis.ErrMissingField("channelTemplate.kind") + return fe + }(), + }, { + name: "valid choice", + ts: &ChoiceSpec{ + ChannelTemplate: validChannelTemplate, + Cases: []ChoiceCase{{Subscriber: eventingv1alpha1.SubscriberSpec{URI: &subscriberURI}}}, + }, + want: func() *apis.FieldError { + return nil + }(), + }, { + name: "valid choice with valid reply", + ts: &ChoiceSpec{ + ChannelTemplate: validChannelTemplate, + Cases: []ChoiceCase{{Subscriber: eventingv1alpha1.SubscriberSpec{URI: &subscriberURI}}}, + Reply: makeValidReply("reply-channel"), + }, + want: func() *apis.FieldError { + return nil + }(), + }, { + name: "valid choice with invalid missing name", + ts: &ChoiceSpec{ + ChannelTemplate: validChannelTemplate, + Cases: []ChoiceCase{{Subscriber: eventingv1alpha1.SubscriberSpec{URI: &subscriberURI}}}, + Reply: &corev1.ObjectReference{ + APIVersion: "messaging.knative.dev/v1alpha1", + Kind: "inmemorychannel", + }, + }, + want: func() *apis.FieldError { + fe := apis.ErrMissingField("reply.name") + return fe + }(), + }, { + name: "valid choice with invalid reply", + ts: &ChoiceSpec{ + ChannelTemplate: validChannelTemplate, + Cases: []ChoiceCase{{Subscriber: eventingv1alpha1.SubscriberSpec{URI: &subscriberURI}}}, + Reply: makeInvalidReply("reply-channel"), + }, + want: func() *apis.FieldError { + fe := apis.ErrDisallowedFields("reply.Namespace") + fe.Details = "only name, apiVersion and kind are supported fields" + return fe + }(), + }} + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + got := test.ts.Validate(context.TODO()) + if diff := cmp.Diff(test.want.Error(), got.Error()); diff != "" { + t.Errorf("%s: Validate ChoiceSpec (-want, +got) = %v", test.name, diff) + } + }) + } +} diff --git a/pkg/apis/messaging/v1alpha1/register.go b/pkg/apis/messaging/v1alpha1/register.go index 4f44a546034..786550068e0 100644 --- a/pkg/apis/messaging/v1alpha1/register.go +++ b/pkg/apis/messaging/v1alpha1/register.go @@ -51,6 +51,8 @@ func addKnownTypes(scheme *runtime.Scheme) error { &SequenceList{}, &Channel{}, &ChannelList{}, + &Choice{}, + &ChoiceList{}, ) metav1.AddToGroupVersion(scheme, SchemeGroupVersion) return nil diff --git a/pkg/apis/messaging/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/messaging/v1alpha1/zz_generated.deepcopy.go index 5b9d0c92e26..c156ddcbacb 100644 --- a/pkg/apis/messaging/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/messaging/v1alpha1/zz_generated.deepcopy.go @@ -138,6 +138,208 @@ func (in *ChannelStatus) DeepCopy() *ChannelStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Choice) DeepCopyInto(out *Choice) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + in.Status.DeepCopyInto(&out.Status) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Choice. +func (in *Choice) DeepCopy() *Choice { + if in == nil { + return nil + } + out := new(Choice) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *Choice) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ChoiceCase) DeepCopyInto(out *ChoiceCase) { + *out = *in + if in.Filter != nil { + in, out := &in.Filter, &out.Filter + *out = new(eventingv1alpha1.SubscriberSpec) + (*in).DeepCopyInto(*out) + } + in.Subscriber.DeepCopyInto(&out.Subscriber) + if in.Reply != nil { + in, out := &in.Reply, &out.Reply + *out = new(v1.ObjectReference) + **out = **in + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ChoiceCase. +func (in *ChoiceCase) DeepCopy() *ChoiceCase { + if in == nil { + return nil + } + out := new(ChoiceCase) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ChoiceCaseStatus) DeepCopyInto(out *ChoiceCaseStatus) { + *out = *in + in.FilterSubscriptionStatus.DeepCopyInto(&out.FilterSubscriptionStatus) + in.FilterChannelStatus.DeepCopyInto(&out.FilterChannelStatus) + in.SubscriptionStatus.DeepCopyInto(&out.SubscriptionStatus) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ChoiceCaseStatus. +func (in *ChoiceCaseStatus) DeepCopy() *ChoiceCaseStatus { + if in == nil { + return nil + } + out := new(ChoiceCaseStatus) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ChoiceChannelStatus) DeepCopyInto(out *ChoiceChannelStatus) { + *out = *in + out.Channel = in.Channel + in.ReadyCondition.DeepCopyInto(&out.ReadyCondition) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ChoiceChannelStatus. +func (in *ChoiceChannelStatus) DeepCopy() *ChoiceChannelStatus { + if in == nil { + return nil + } + out := new(ChoiceChannelStatus) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ChoiceList) DeepCopyInto(out *ChoiceList) { + *out = *in + out.TypeMeta = in.TypeMeta + out.ListMeta = in.ListMeta + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]Choice, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ChoiceList. +func (in *ChoiceList) DeepCopy() *ChoiceList { + if in == nil { + return nil + } + out := new(ChoiceList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *ChoiceList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ChoiceSpec) DeepCopyInto(out *ChoiceSpec) { + *out = *in + if in.Cases != nil { + in, out := &in.Cases, &out.Cases + *out = make([]ChoiceCase, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + if in.ChannelTemplate != nil { + in, out := &in.ChannelTemplate, &out.ChannelTemplate + *out = new(duckv1alpha1.ChannelTemplateSpec) + (*in).DeepCopyInto(*out) + } + if in.Reply != nil { + in, out := &in.Reply, &out.Reply + *out = new(v1.ObjectReference) + **out = **in + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ChoiceSpec. +func (in *ChoiceSpec) DeepCopy() *ChoiceSpec { + if in == nil { + return nil + } + out := new(ChoiceSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ChoiceStatus) DeepCopyInto(out *ChoiceStatus) { + *out = *in + in.Status.DeepCopyInto(&out.Status) + in.IngressChannelStatus.DeepCopyInto(&out.IngressChannelStatus) + if in.CaseStatuses != nil { + in, out := &in.CaseStatuses, &out.CaseStatuses + *out = make([]ChoiceCaseStatus, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + in.AddressStatus.DeepCopyInto(&out.AddressStatus) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ChoiceStatus. +func (in *ChoiceStatus) DeepCopy() *ChoiceStatus { + if in == nil { + return nil + } + out := new(ChoiceStatus) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ChoiceSubscriptionStatus) DeepCopyInto(out *ChoiceSubscriptionStatus) { + *out = *in + out.Subscription = in.Subscription + in.ReadyCondition.DeepCopyInto(&out.ReadyCondition) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ChoiceSubscriptionStatus. +func (in *ChoiceSubscriptionStatus) DeepCopy() *ChoiceSubscriptionStatus { + if in == nil { + return nil + } + out := new(ChoiceSubscriptionStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *InMemoryChannel) DeepCopyInto(out *InMemoryChannel) { *out = *in diff --git a/pkg/client/clientset/versioned/typed/messaging/v1alpha1/choice.go b/pkg/client/clientset/versioned/typed/messaging/v1alpha1/choice.go new file mode 100644 index 00000000000..ea84c5d08d6 --- /dev/null +++ b/pkg/client/clientset/versioned/typed/messaging/v1alpha1/choice.go @@ -0,0 +1,174 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + v1alpha1 "github.com/knative/eventing/pkg/apis/messaging/v1alpha1" + scheme "github.com/knative/eventing/pkg/client/clientset/versioned/scheme" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + rest "k8s.io/client-go/rest" +) + +// ChoicesGetter has a method to return a ChoiceInterface. +// A group's client should implement this interface. +type ChoicesGetter interface { + Choices(namespace string) ChoiceInterface +} + +// ChoiceInterface has methods to work with Choice resources. +type ChoiceInterface interface { + Create(*v1alpha1.Choice) (*v1alpha1.Choice, error) + Update(*v1alpha1.Choice) (*v1alpha1.Choice, error) + UpdateStatus(*v1alpha1.Choice) (*v1alpha1.Choice, error) + Delete(name string, options *v1.DeleteOptions) error + DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error + Get(name string, options v1.GetOptions) (*v1alpha1.Choice, error) + List(opts v1.ListOptions) (*v1alpha1.ChoiceList, error) + Watch(opts v1.ListOptions) (watch.Interface, error) + Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1alpha1.Choice, err error) + ChoiceExpansion +} + +// choices implements ChoiceInterface +type choices struct { + client rest.Interface + ns string +} + +// newChoices returns a Choices +func newChoices(c *MessagingV1alpha1Client, namespace string) *choices { + return &choices{ + client: c.RESTClient(), + ns: namespace, + } +} + +// Get takes name of the choice, and returns the corresponding choice object, and an error if there is any. +func (c *choices) Get(name string, options v1.GetOptions) (result *v1alpha1.Choice, err error) { + result = &v1alpha1.Choice{} + err = c.client.Get(). + Namespace(c.ns). + Resource("choices"). + Name(name). + VersionedParams(&options, scheme.ParameterCodec). + Do(). + Into(result) + return +} + +// List takes label and field selectors, and returns the list of Choices that match those selectors. +func (c *choices) List(opts v1.ListOptions) (result *v1alpha1.ChoiceList, err error) { + result = &v1alpha1.ChoiceList{} + err = c.client.Get(). + Namespace(c.ns). + Resource("choices"). + VersionedParams(&opts, scheme.ParameterCodec). + Do(). + Into(result) + return +} + +// Watch returns a watch.Interface that watches the requested choices. +func (c *choices) Watch(opts v1.ListOptions) (watch.Interface, error) { + opts.Watch = true + return c.client.Get(). + Namespace(c.ns). + Resource("choices"). + VersionedParams(&opts, scheme.ParameterCodec). + Watch() +} + +// Create takes the representation of a choice and creates it. Returns the server's representation of the choice, and an error, if there is any. +func (c *choices) Create(choice *v1alpha1.Choice) (result *v1alpha1.Choice, err error) { + result = &v1alpha1.Choice{} + err = c.client.Post(). + Namespace(c.ns). + Resource("choices"). + Body(choice). + Do(). + Into(result) + return +} + +// Update takes the representation of a choice and updates it. Returns the server's representation of the choice, and an error, if there is any. +func (c *choices) Update(choice *v1alpha1.Choice) (result *v1alpha1.Choice, err error) { + result = &v1alpha1.Choice{} + err = c.client.Put(). + Namespace(c.ns). + Resource("choices"). + Name(choice.Name). + Body(choice). + Do(). + Into(result) + return +} + +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). + +func (c *choices) UpdateStatus(choice *v1alpha1.Choice) (result *v1alpha1.Choice, err error) { + result = &v1alpha1.Choice{} + err = c.client.Put(). + Namespace(c.ns). + Resource("choices"). + Name(choice.Name). + SubResource("status"). + Body(choice). + Do(). + Into(result) + return +} + +// Delete takes name of the choice and deletes it. Returns an error if one occurs. +func (c *choices) Delete(name string, options *v1.DeleteOptions) error { + return c.client.Delete(). + Namespace(c.ns). + Resource("choices"). + Name(name). + Body(options). + Do(). + Error() +} + +// DeleteCollection deletes a collection of objects. +func (c *choices) DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error { + return c.client.Delete(). + Namespace(c.ns). + Resource("choices"). + VersionedParams(&listOptions, scheme.ParameterCodec). + Body(options). + Do(). + Error() +} + +// Patch applies the patch and returns the patched choice. +func (c *choices) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1alpha1.Choice, err error) { + result = &v1alpha1.Choice{} + err = c.client.Patch(pt). + Namespace(c.ns). + Resource("choices"). + SubResource(subresources...). + Name(name). + Body(data). + Do(). + Into(result) + return +} diff --git a/pkg/client/clientset/versioned/typed/messaging/v1alpha1/fake/fake_choice.go b/pkg/client/clientset/versioned/typed/messaging/v1alpha1/fake/fake_choice.go new file mode 100644 index 00000000000..30c8ec25bb5 --- /dev/null +++ b/pkg/client/clientset/versioned/typed/messaging/v1alpha1/fake/fake_choice.go @@ -0,0 +1,140 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package fake + +import ( + v1alpha1 "github.com/knative/eventing/pkg/apis/messaging/v1alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + labels "k8s.io/apimachinery/pkg/labels" + schema "k8s.io/apimachinery/pkg/runtime/schema" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + testing "k8s.io/client-go/testing" +) + +// FakeChoices implements ChoiceInterface +type FakeChoices struct { + Fake *FakeMessagingV1alpha1 + ns string +} + +var choicesResource = schema.GroupVersionResource{Group: "messaging.knative.dev", Version: "v1alpha1", Resource: "choices"} + +var choicesKind = schema.GroupVersionKind{Group: "messaging.knative.dev", Version: "v1alpha1", Kind: "Choice"} + +// Get takes name of the choice, and returns the corresponding choice object, and an error if there is any. +func (c *FakeChoices) Get(name string, options v1.GetOptions) (result *v1alpha1.Choice, err error) { + obj, err := c.Fake. + Invokes(testing.NewGetAction(choicesResource, c.ns, name), &v1alpha1.Choice{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.Choice), err +} + +// List takes label and field selectors, and returns the list of Choices that match those selectors. +func (c *FakeChoices) List(opts v1.ListOptions) (result *v1alpha1.ChoiceList, err error) { + obj, err := c.Fake. + Invokes(testing.NewListAction(choicesResource, choicesKind, c.ns, opts), &v1alpha1.ChoiceList{}) + + if obj == nil { + return nil, err + } + + label, _, _ := testing.ExtractFromListOptions(opts) + if label == nil { + label = labels.Everything() + } + list := &v1alpha1.ChoiceList{ListMeta: obj.(*v1alpha1.ChoiceList).ListMeta} + for _, item := range obj.(*v1alpha1.ChoiceList).Items { + if label.Matches(labels.Set(item.Labels)) { + list.Items = append(list.Items, item) + } + } + return list, err +} + +// Watch returns a watch.Interface that watches the requested choices. +func (c *FakeChoices) Watch(opts v1.ListOptions) (watch.Interface, error) { + return c.Fake. + InvokesWatch(testing.NewWatchAction(choicesResource, c.ns, opts)) + +} + +// Create takes the representation of a choice and creates it. Returns the server's representation of the choice, and an error, if there is any. +func (c *FakeChoices) Create(choice *v1alpha1.Choice) (result *v1alpha1.Choice, err error) { + obj, err := c.Fake. + Invokes(testing.NewCreateAction(choicesResource, c.ns, choice), &v1alpha1.Choice{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.Choice), err +} + +// Update takes the representation of a choice and updates it. Returns the server's representation of the choice, and an error, if there is any. +func (c *FakeChoices) Update(choice *v1alpha1.Choice) (result *v1alpha1.Choice, err error) { + obj, err := c.Fake. + Invokes(testing.NewUpdateAction(choicesResource, c.ns, choice), &v1alpha1.Choice{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.Choice), err +} + +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). +func (c *FakeChoices) UpdateStatus(choice *v1alpha1.Choice) (*v1alpha1.Choice, error) { + obj, err := c.Fake. + Invokes(testing.NewUpdateSubresourceAction(choicesResource, "status", c.ns, choice), &v1alpha1.Choice{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.Choice), err +} + +// Delete takes name of the choice and deletes it. Returns an error if one occurs. +func (c *FakeChoices) Delete(name string, options *v1.DeleteOptions) error { + _, err := c.Fake. + Invokes(testing.NewDeleteAction(choicesResource, c.ns, name), &v1alpha1.Choice{}) + + return err +} + +// DeleteCollection deletes a collection of objects. +func (c *FakeChoices) DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error { + action := testing.NewDeleteCollectionAction(choicesResource, c.ns, listOptions) + + _, err := c.Fake.Invokes(action, &v1alpha1.ChoiceList{}) + return err +} + +// Patch applies the patch and returns the patched choice. +func (c *FakeChoices) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1alpha1.Choice, err error) { + obj, err := c.Fake. + Invokes(testing.NewPatchSubresourceAction(choicesResource, c.ns, name, data, subresources...), &v1alpha1.Choice{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.Choice), err +} diff --git a/pkg/client/clientset/versioned/typed/messaging/v1alpha1/fake/fake_messaging_client.go b/pkg/client/clientset/versioned/typed/messaging/v1alpha1/fake/fake_messaging_client.go index 90a70c27b96..b93702d704e 100644 --- a/pkg/client/clientset/versioned/typed/messaging/v1alpha1/fake/fake_messaging_client.go +++ b/pkg/client/clientset/versioned/typed/messaging/v1alpha1/fake/fake_messaging_client.go @@ -32,6 +32,10 @@ func (c *FakeMessagingV1alpha1) Channels(namespace string) v1alpha1.ChannelInter return &FakeChannels{c, namespace} } +func (c *FakeMessagingV1alpha1) Choices(namespace string) v1alpha1.ChoiceInterface { + return &FakeChoices{c, namespace} +} + func (c *FakeMessagingV1alpha1) InMemoryChannels(namespace string) v1alpha1.InMemoryChannelInterface { return &FakeInMemoryChannels{c, namespace} } diff --git a/pkg/client/clientset/versioned/typed/messaging/v1alpha1/generated_expansion.go b/pkg/client/clientset/versioned/typed/messaging/v1alpha1/generated_expansion.go index c3eadee3c40..72b52cbe160 100644 --- a/pkg/client/clientset/versioned/typed/messaging/v1alpha1/generated_expansion.go +++ b/pkg/client/clientset/versioned/typed/messaging/v1alpha1/generated_expansion.go @@ -20,6 +20,8 @@ package v1alpha1 type ChannelExpansion interface{} +type ChoiceExpansion interface{} + type InMemoryChannelExpansion interface{} type SequenceExpansion interface{} diff --git a/pkg/client/clientset/versioned/typed/messaging/v1alpha1/messaging_client.go b/pkg/client/clientset/versioned/typed/messaging/v1alpha1/messaging_client.go index 6ede09a4b2b..311835d649e 100644 --- a/pkg/client/clientset/versioned/typed/messaging/v1alpha1/messaging_client.go +++ b/pkg/client/clientset/versioned/typed/messaging/v1alpha1/messaging_client.go @@ -28,6 +28,7 @@ import ( type MessagingV1alpha1Interface interface { RESTClient() rest.Interface ChannelsGetter + ChoicesGetter InMemoryChannelsGetter SequencesGetter } @@ -41,6 +42,10 @@ func (c *MessagingV1alpha1Client) Channels(namespace string) ChannelInterface { return newChannels(c, namespace) } +func (c *MessagingV1alpha1Client) Choices(namespace string) ChoiceInterface { + return newChoices(c, namespace) +} + func (c *MessagingV1alpha1Client) InMemoryChannels(namespace string) InMemoryChannelInterface { return newInMemoryChannels(c, namespace) } diff --git a/pkg/client/informers/externalversions/generic.go b/pkg/client/informers/externalversions/generic.go index ad007df212f..e18d7904109 100644 --- a/pkg/client/informers/externalversions/generic.go +++ b/pkg/client/informers/externalversions/generic.go @@ -71,6 +71,8 @@ func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource // Group=messaging.knative.dev, Version=v1alpha1 case messagingv1alpha1.SchemeGroupVersion.WithResource("channels"): return &genericInformer{resource: resource.GroupResource(), informer: f.Messaging().V1alpha1().Channels().Informer()}, nil + case messagingv1alpha1.SchemeGroupVersion.WithResource("choices"): + return &genericInformer{resource: resource.GroupResource(), informer: f.Messaging().V1alpha1().Choices().Informer()}, nil case messagingv1alpha1.SchemeGroupVersion.WithResource("inmemorychannels"): return &genericInformer{resource: resource.GroupResource(), informer: f.Messaging().V1alpha1().InMemoryChannels().Informer()}, nil case messagingv1alpha1.SchemeGroupVersion.WithResource("sequences"): diff --git a/pkg/client/informers/externalversions/messaging/v1alpha1/choice.go b/pkg/client/informers/externalversions/messaging/v1alpha1/choice.go new file mode 100644 index 00000000000..8f9429ac691 --- /dev/null +++ b/pkg/client/informers/externalversions/messaging/v1alpha1/choice.go @@ -0,0 +1,89 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by informer-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + time "time" + + messagingv1alpha1 "github.com/knative/eventing/pkg/apis/messaging/v1alpha1" + versioned "github.com/knative/eventing/pkg/client/clientset/versioned" + internalinterfaces "github.com/knative/eventing/pkg/client/informers/externalversions/internalinterfaces" + v1alpha1 "github.com/knative/eventing/pkg/client/listers/messaging/v1alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + runtime "k8s.io/apimachinery/pkg/runtime" + watch "k8s.io/apimachinery/pkg/watch" + cache "k8s.io/client-go/tools/cache" +) + +// ChoiceInformer provides access to a shared informer and lister for +// Choices. +type ChoiceInformer interface { + Informer() cache.SharedIndexInformer + Lister() v1alpha1.ChoiceLister +} + +type choiceInformer struct { + factory internalinterfaces.SharedInformerFactory + tweakListOptions internalinterfaces.TweakListOptionsFunc + namespace string +} + +// NewChoiceInformer constructs a new informer for Choice type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewChoiceInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer { + return NewFilteredChoiceInformer(client, namespace, resyncPeriod, indexers, nil) +} + +// NewFilteredChoiceInformer constructs a new informer for Choice type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewFilteredChoiceInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { + return cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options v1.ListOptions) (runtime.Object, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.MessagingV1alpha1().Choices(namespace).List(options) + }, + WatchFunc: func(options v1.ListOptions) (watch.Interface, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.MessagingV1alpha1().Choices(namespace).Watch(options) + }, + }, + &messagingv1alpha1.Choice{}, + resyncPeriod, + indexers, + ) +} + +func (f *choiceInformer) defaultInformer(client versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + return NewFilteredChoiceInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) +} + +func (f *choiceInformer) Informer() cache.SharedIndexInformer { + return f.factory.InformerFor(&messagingv1alpha1.Choice{}, f.defaultInformer) +} + +func (f *choiceInformer) Lister() v1alpha1.ChoiceLister { + return v1alpha1.NewChoiceLister(f.Informer().GetIndexer()) +} diff --git a/pkg/client/informers/externalversions/messaging/v1alpha1/interface.go b/pkg/client/informers/externalversions/messaging/v1alpha1/interface.go index 6a61f938e30..ddc52e95c00 100644 --- a/pkg/client/informers/externalversions/messaging/v1alpha1/interface.go +++ b/pkg/client/informers/externalversions/messaging/v1alpha1/interface.go @@ -26,6 +26,8 @@ import ( type Interface interface { // Channels returns a ChannelInformer. Channels() ChannelInformer + // Choices returns a ChoiceInformer. + Choices() ChoiceInformer // InMemoryChannels returns a InMemoryChannelInformer. InMemoryChannels() InMemoryChannelInformer // Sequences returns a SequenceInformer. @@ -48,6 +50,11 @@ func (v *version) Channels() ChannelInformer { return &channelInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} } +// Choices returns a ChoiceInformer. +func (v *version) Choices() ChoiceInformer { + return &choiceInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} +} + // InMemoryChannels returns a InMemoryChannelInformer. func (v *version) InMemoryChannels() InMemoryChannelInformer { return &inMemoryChannelInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} diff --git a/pkg/client/injection/informers/messaging/v1alpha1/choice/choice.go b/pkg/client/injection/informers/messaging/v1alpha1/choice/choice.go new file mode 100644 index 00000000000..92f7f0ab284 --- /dev/null +++ b/pkg/client/injection/informers/messaging/v1alpha1/choice/choice.go @@ -0,0 +1,52 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by injection-gen. DO NOT EDIT. + +package choice + +import ( + "context" + + v1alpha1 "github.com/knative/eventing/pkg/client/informers/externalversions/messaging/v1alpha1" + factory "github.com/knative/eventing/pkg/client/injection/informers/messaging/factory" + controller "knative.dev/pkg/controller" + injection "knative.dev/pkg/injection" + logging "knative.dev/pkg/logging" +) + +func init() { + injection.Default.RegisterInformer(withInformer) +} + +// Key is used for associating the Informer inside the context.Context. +type Key struct{} + +func withInformer(ctx context.Context) (context.Context, controller.Informer) { + f := factory.Get(ctx) + inf := f.Messaging().V1alpha1().Choices() + return context.WithValue(ctx, Key{}, inf), inf.Informer() +} + +// Get extracts the typed informer from the context. +func Get(ctx context.Context) v1alpha1.ChoiceInformer { + untyped := ctx.Value(Key{}) + if untyped == nil { + logging.FromContext(ctx).Fatalf( + "Unable to fetch %T from context.", (v1alpha1.ChoiceInformer)(nil)) + } + return untyped.(v1alpha1.ChoiceInformer) +} diff --git a/pkg/client/injection/informers/messaging/v1alpha1/choice/fake/fake.go b/pkg/client/injection/informers/messaging/v1alpha1/choice/fake/fake.go new file mode 100644 index 00000000000..17aa2044258 --- /dev/null +++ b/pkg/client/injection/informers/messaging/v1alpha1/choice/fake/fake.go @@ -0,0 +1,40 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by injection-gen. DO NOT EDIT. + +package fake + +import ( + "context" + + fake "github.com/knative/eventing/pkg/client/injection/informers/messaging/factory/fake" + choice "github.com/knative/eventing/pkg/client/injection/informers/messaging/v1alpha1/choice" + controller "knative.dev/pkg/controller" + injection "knative.dev/pkg/injection" +) + +var Get = choice.Get + +func init() { + injection.Fake.RegisterInformer(withInformer) +} + +func withInformer(ctx context.Context) (context.Context, controller.Informer) { + f := fake.Get(ctx) + inf := f.Messaging().V1alpha1().Choices() + return context.WithValue(ctx, choice.Key{}, inf), inf.Informer() +} diff --git a/pkg/client/listers/messaging/v1alpha1/choice.go b/pkg/client/listers/messaging/v1alpha1/choice.go new file mode 100644 index 00000000000..03b3170717c --- /dev/null +++ b/pkg/client/listers/messaging/v1alpha1/choice.go @@ -0,0 +1,94 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by lister-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + v1alpha1 "github.com/knative/eventing/pkg/apis/messaging/v1alpha1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/tools/cache" +) + +// ChoiceLister helps list Choices. +type ChoiceLister interface { + // List lists all Choices in the indexer. + List(selector labels.Selector) (ret []*v1alpha1.Choice, err error) + // Choices returns an object that can list and get Choices. + Choices(namespace string) ChoiceNamespaceLister + ChoiceListerExpansion +} + +// choiceLister implements the ChoiceLister interface. +type choiceLister struct { + indexer cache.Indexer +} + +// NewChoiceLister returns a new ChoiceLister. +func NewChoiceLister(indexer cache.Indexer) ChoiceLister { + return &choiceLister{indexer: indexer} +} + +// List lists all Choices in the indexer. +func (s *choiceLister) List(selector labels.Selector) (ret []*v1alpha1.Choice, err error) { + err = cache.ListAll(s.indexer, selector, func(m interface{}) { + ret = append(ret, m.(*v1alpha1.Choice)) + }) + return ret, err +} + +// Choices returns an object that can list and get Choices. +func (s *choiceLister) Choices(namespace string) ChoiceNamespaceLister { + return choiceNamespaceLister{indexer: s.indexer, namespace: namespace} +} + +// ChoiceNamespaceLister helps list and get Choices. +type ChoiceNamespaceLister interface { + // List lists all Choices in the indexer for a given namespace. + List(selector labels.Selector) (ret []*v1alpha1.Choice, err error) + // Get retrieves the Choice from the indexer for a given namespace and name. + Get(name string) (*v1alpha1.Choice, error) + ChoiceNamespaceListerExpansion +} + +// choiceNamespaceLister implements the ChoiceNamespaceLister +// interface. +type choiceNamespaceLister struct { + indexer cache.Indexer + namespace string +} + +// List lists all Choices in the indexer for a given namespace. +func (s choiceNamespaceLister) List(selector labels.Selector) (ret []*v1alpha1.Choice, err error) { + err = cache.ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) { + ret = append(ret, m.(*v1alpha1.Choice)) + }) + return ret, err +} + +// Get retrieves the Choice from the indexer for a given namespace and name. +func (s choiceNamespaceLister) Get(name string) (*v1alpha1.Choice, error) { + obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(v1alpha1.Resource("choice"), name) + } + return obj.(*v1alpha1.Choice), nil +} diff --git a/pkg/client/listers/messaging/v1alpha1/expansion_generated.go b/pkg/client/listers/messaging/v1alpha1/expansion_generated.go index 2d242f716bb..61615bd665a 100644 --- a/pkg/client/listers/messaging/v1alpha1/expansion_generated.go +++ b/pkg/client/listers/messaging/v1alpha1/expansion_generated.go @@ -26,6 +26,14 @@ type ChannelListerExpansion interface{} // ChannelNamespaceLister. type ChannelNamespaceListerExpansion interface{} +// ChoiceListerExpansion allows custom methods to be added to +// ChoiceLister. +type ChoiceListerExpansion interface{} + +// ChoiceNamespaceListerExpansion allows custom methods to be added to +// ChoiceNamespaceLister. +type ChoiceNamespaceListerExpansion interface{} + // InMemoryChannelListerExpansion allows custom methods to be added to // InMemoryChannelLister. type InMemoryChannelListerExpansion interface{} diff --git a/pkg/reconciler/choice/choice.go b/pkg/reconciler/choice/choice.go new file mode 100644 index 00000000000..cd7d196cc6f --- /dev/null +++ b/pkg/reconciler/choice/choice.go @@ -0,0 +1,279 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package choice + +import ( + "context" + "errors" + "fmt" + "reflect" + + duckv1alpha1 "github.com/knative/eventing/pkg/apis/duck/v1alpha1" + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/eventing/pkg/apis/messaging/v1alpha1" + eventinglisters "github.com/knative/eventing/pkg/client/listers/eventing/v1alpha1" + listers "github.com/knative/eventing/pkg/client/listers/messaging/v1alpha1" + "github.com/knative/eventing/pkg/duck" + "github.com/knative/eventing/pkg/logging" + "github.com/knative/eventing/pkg/reconciler" + "github.com/knative/eventing/pkg/reconciler/choice/resources" + "github.com/knative/eventing/pkg/utils" + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + apierrs "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/tools/cache" + duckroot "knative.dev/pkg/apis" + duckapis "knative.dev/pkg/apis/duck" + "knative.dev/pkg/controller" + "knative.dev/pkg/tracker" +) + +const ( + reconciled = "Reconciled" + reconcileFailed = "ReconcileFailed" + updateStatusFailed = "UpdateStatusFailed" +) + +type Reconciler struct { + *reconciler.Base + + // listers index properties about resources + choiceLister listers.ChoiceLister + tracker tracker.Interface + resourceTracker duck.ResourceTracker + subscriptionLister eventinglisters.SubscriptionLister +} + +// Check that our Reconciler implements controller.Reconciler +var _ controller.Reconciler = (*Reconciler)(nil) + +// Reconcile compares the actual state with the desired, and attempts to +// reconcile the two. It then updates the Status block of the Choice resource +// with the current Status of the resource. +func (r *Reconciler) Reconcile(ctx context.Context, key string) error { + logging.FromContext(ctx).Debug("reconciling", zap.String("key", key)) + // Convert the namespace/name string into a distinct namespace and name + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + logging.FromContext(ctx).Error("invalid resource key", zap.Error(err)) + return nil + } + + // Get the Choice resource with this namespace/name + original, err := r.choiceLister.Choices(namespace).Get(name) + if apierrs.IsNotFound(err) { + // The resource may no longer exist, in which case we stop processing. + logging.FromContext(ctx).Error("Choice key in work queue no longer exists") + return nil + } else if err != nil { + return err + } + + // Don't modify the informers copy + choice := original.DeepCopy() + + // Reconcile this copy of the Choice and then write back any status + // updates regardless of whether the reconcile error out. + reconcileErr := r.reconcile(ctx, choice) + if reconcileErr != nil { + logging.FromContext(ctx).Error("Error reconciling Choice", zap.Error(reconcileErr)) + r.Recorder.Eventf(choice, corev1.EventTypeWarning, reconcileFailed, "Choice reconciliation failed: %v", reconcileErr) + } else { + logging.FromContext(ctx).Debug("Successfully reconciled Choice") + r.Recorder.Eventf(choice, corev1.EventTypeNormal, reconciled, "Choice reconciled") + } + + if _, updateStatusErr := r.updateStatus(ctx, choice); updateStatusErr != nil { + logging.FromContext(ctx).Warn("Error updating Choice status", zap.Error(updateStatusErr)) + r.Recorder.Eventf(choice, corev1.EventTypeWarning, updateStatusFailed, "Failed to update choice status: %s", key) + return updateStatusErr + } + + // Requeue if the resource is not ready: + return reconcileErr +} + +func (r *Reconciler) reconcile(ctx context.Context, p *v1alpha1.Choice) error { + p.Status.InitializeConditions() + + // Reconciling choice is pretty straightforward, it does the following things: + // 1. Create a channel fronting the whole choice and one filter channel per case. + // 2. For each of the Cases: + // 2.1 create a Subscription to the fronting Channel, subscribe the filter and send reply to the filter Channel + // 2.2 create a Subscription to the filter Channel, subcribe the subscriber and send reply to + // either the case Reply. If not present, send reply to the global Reply. If not present, do not send reply. + // 3. Rinse and repeat step #2 above for each Case in the list + if p.DeletionTimestamp != nil { + // Everything is cleaned up by the garbage collector. + return nil + } + + channelResourceInterface := r.DynamicClientSet.Resource(duckroot.KindToResource(p.Spec.ChannelTemplate.GetObjectKind().GroupVersionKind())).Namespace(p.Namespace) + + if channelResourceInterface == nil { + msg := fmt.Sprintf("Unable to create dynamic client for: %+v", p.Spec.ChannelTemplate) + logging.FromContext(ctx).Error(msg) + return errors.New(msg) + } + + // Tell tracker to reconcile this Choice whenever my channels change. + track := r.resourceTracker.TrackInNamespace(p) + + var ingressChannel *duckv1alpha1.Channelable + channels := make([]*duckv1alpha1.Channelable, 0, len(p.Spec.Cases)) + for i := -1; i < len(p.Spec.Cases); i++ { + var channelName string + if i == -1 { + channelName = resources.ChoiceChannelName(p.Name) + } else { + channelName = resources.ChoiceCaseChannelName(p.Name, i) + } + + c, err := r.reconcileChannel(ctx, channelName, channelResourceInterface, p) + if err != nil { + logging.FromContext(ctx).Error(fmt.Sprintf("Failed to reconcile Channel Object: %s/%s", p.Namespace, channelName), zap.Error(err)) + return err + + } + // Convert to Channel duck so that we can treat all Channels the same. + channelable := &duckv1alpha1.Channelable{} + err = duckapis.FromUnstructured(c, channelable) + if err != nil { + logging.FromContext(ctx).Error(fmt.Sprintf("Failed to convert to Channelable Object: %s/%s", p.Namespace, channelName), zap.Error(err)) + return err + + } + // Track channels and enqueue choice when they change. + if err = track(utils.ObjectRef(channelable, channelable.GroupVersionKind())); err != nil { + logging.FromContext(ctx).Error("Unable to track changes to Channel", zap.Error(err)) + return err + } + logging.FromContext(ctx).Info(fmt.Sprintf("Reconciled Channel Object: %s/%s %+v", p.Namespace, channelName, c)) + + if i == -1 { + ingressChannel = channelable + } else { + channels = append(channels, channelable) + } + } + p.Status.PropagateChannelStatuses(ingressChannel, channels) + + filterSubs := make([]*eventingv1alpha1.Subscription, 0, len(p.Spec.Cases)) + subs := make([]*eventingv1alpha1.Subscription, 0, len(p.Spec.Cases)) + for i := 0; i < len(p.Spec.Cases); i++ { + filterSub, sub, err := r.reconcileCase(ctx, i, p) + if err != nil { + return fmt.Errorf("Failed to reconcile Subscription Objects for case: %d : %s", i, err) + } + subs = append(subs, sub) + filterSubs = append(filterSubs, filterSub) + logging.FromContext(ctx).Debug(fmt.Sprintf("Reconciled Subscription Objects for case: %d: %+v, %+v", i, filterSub, sub)) + } + p.Status.PropagateSubscriptionStatuses(filterSubs, subs) + + return nil +} + +func (r *Reconciler) updateStatus(ctx context.Context, desired *v1alpha1.Choice) (*v1alpha1.Choice, error) { + p, err := r.choiceLister.Choices(desired.Namespace).Get(desired.Name) + if err != nil { + return nil, err + } + + // If there's nothing to update, just return. + if reflect.DeepEqual(p.Status, desired.Status) { + return p, nil + } + + // Don't modify the informers copy. + existing := p.DeepCopy() + existing.Status = desired.Status + + return r.EventingClientSet.MessagingV1alpha1().Choices(desired.Namespace).UpdateStatus(existing) +} + +func (r *Reconciler) reconcileChannel(ctx context.Context, channelName string, channelResourceInterface dynamic.ResourceInterface, p *v1alpha1.Choice) (*unstructured.Unstructured, error) { + c, err := channelResourceInterface.Get(channelName, metav1.GetOptions{}) + if err != nil { + if apierrs.IsNotFound(err) { + newChannel, err := resources.NewChannel(channelName, p) + logging.FromContext(ctx).Error(fmt.Sprintf("Creating Channel Object: %+v", newChannel)) + if err != nil { + logging.FromContext(ctx).Error(fmt.Sprintf("Failed to create Channel resource object: %s/%s", p.Namespace, channelName), zap.Error(err)) + return nil, err + } + created, err := channelResourceInterface.Create(newChannel, metav1.CreateOptions{}) + if err != nil { + logging.FromContext(ctx).Error(fmt.Sprintf("Failed to create Channel: %s/%s", p.Namespace, channelName), zap.Error(err)) + return nil, err + } + logging.FromContext(ctx).Info(fmt.Sprintf("Created Channel: %s/%s", p.Namespace, channelName), zap.Any("NewChannel", newChannel)) + return created, nil + } + + logging.FromContext(ctx).Error(fmt.Sprintf("Failed to get Channel: %s/%s", p.Namespace, channelName), zap.Error(err)) + return nil, err + } + logging.FromContext(ctx).Debug(fmt.Sprintf("Found Channel: %s/%s", p.Namespace, channelName), zap.Any("NewChannel", c)) + return c, nil +} + +func (r *Reconciler) reconcileCase(ctx context.Context, caseNumber int, p *v1alpha1.Choice) (*eventingv1alpha1.Subscription, *eventingv1alpha1.Subscription, error) { + filterExpected := resources.NewFilterSubscription(caseNumber, p) + filterSubName := resources.ChoiceFilterSubscriptionName(p.Name, caseNumber) + + filterSub, err := r.reconcileSubscription(ctx, caseNumber, filterExpected, filterSubName, p.Namespace) + if err != nil { + return nil, nil, err + } + + expected := resources.NewSubscription(caseNumber, p) + subName := resources.ChoiceSubscriptionName(p.Name, caseNumber) + + sub, err := r.reconcileSubscription(ctx, caseNumber, expected, subName, p.Namespace) + if err != nil { + return nil, nil, err + } + + return filterSub, sub, nil +} + +func (r *Reconciler) reconcileSubscription(ctx context.Context, caseNumber int, expected *eventingv1alpha1.Subscription, subName, ns string) (*eventingv1alpha1.Subscription, error) { + sub, err := r.subscriptionLister.Subscriptions(ns).Get(subName) + + // If the resource doesn't exist, we'll create it. + if apierrs.IsNotFound(err) { + sub = expected + logging.FromContext(ctx).Info(fmt.Sprintf("Creating subscription: %+v", sub)) + newSub, err := r.EventingClientSet.EventingV1alpha1().Subscriptions(sub.Namespace).Create(sub) + if err != nil { + // TODO: Send events here, or elsewhere? + //r.Recorder.Eventf(p, corev1.EventTypeWarning, subscriptionCreateFailed, "Create Choice's subscription failed: %v", err) + return nil, fmt.Errorf("Failed to create Subscription Object for case: %d : %s", caseNumber, err) + } + return newSub, nil + } else if err != nil { + logging.FromContext(ctx).Error("Failed to get subscription", zap.Error(err)) + // TODO: Send events here, or elsewhere? + //r.Recorder.Eventf(p, corev1.EventTypeWarning, subscriptionCreateFailed, "Create Choices's subscription failed: %v", err) + return nil, fmt.Errorf("Failed to get subscription: %s", err) + } + return sub, nil +} diff --git a/pkg/reconciler/choice/choice_test.go b/pkg/reconciler/choice/choice_test.go new file mode 100644 index 00000000000..fc287886453 --- /dev/null +++ b/pkg/reconciler/choice/choice_test.go @@ -0,0 +1,554 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package choice + +import ( + "context" + "fmt" + "testing" + "time" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes/scheme" + clientgotesting "k8s.io/client-go/testing" + "knative.dev/pkg/apis" + duckv1alpha1 "knative.dev/pkg/apis/duck/v1alpha1" + "knative.dev/pkg/configmap" + "knative.dev/pkg/controller" + logtesting "knative.dev/pkg/logging/testing" + . "knative.dev/pkg/reconciler/testing" + + eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1" + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/eventing/pkg/apis/messaging/v1alpha1" + "github.com/knative/eventing/pkg/duck" + "github.com/knative/eventing/pkg/reconciler" + "github.com/knative/eventing/pkg/reconciler/choice/resources" + . "github.com/knative/eventing/pkg/reconciler/testing" + reconciletesting "github.com/knative/eventing/pkg/reconciler/testing" +) + +const ( + testNS = "test-namespace" + choiceName = "test-choice" + choiceUID = "test-choice-uid" + replyChannelName = "reply-channel" +) + +func init() { + // Add types to scheme + _ = v1alpha1.AddToScheme(scheme.Scheme) + _ = duckv1alpha1.AddToScheme(scheme.Scheme) +} + +type fakeAddressableInformer struct{} + +func (*fakeAddressableInformer) NewTracker(callback func(string), lease time.Duration) duck.ResourceTracker { + return fakeResourceTracker{} +} + +type fakeResourceTracker struct{} + +func (fakeResourceTracker) TrackInNamespace(metav1.Object) func(corev1.ObjectReference) error { + return func(corev1.ObjectReference) error { return nil } +} + +func (fakeResourceTracker) Track(ref corev1.ObjectReference, obj interface{}) error { + return nil +} + +func (fakeResourceTracker) OnChanged(obj interface{}) { +} + +func TestAllCases(t *testing.T) { + pKey := testNS + "/" + choiceName + imc := &eventingduck.ChannelTemplateSpec{ + metav1.TypeMeta{ + APIVersion: "messaging.knative.dev/v1alpha1", + Kind: "inmemorychannel", + }, + &runtime.RawExtension{Raw: []byte("{}")}, + } + + table := TableTest{ + { + Name: "bad workqueue key", + // Make sure Reconcile handles bad keys. + Key: "too/many/parts", + }, { + Name: "key not found", + // Make sure Reconcile handles good keys that don't exist. + Key: "foo/not-found", + }, { // TODO: there is a bug in the controller, it will query for "" + // Name: "trigger key not found ", + // Objects: []runtime.Object{ + // reconciletesting.NewTrigger(triggerName, testNS), + // }, + // Key: "foo/incomplete", + // WantErr: true, + // WantEvents: []string{ + // Eventf(corev1.EventTypeWarning, "ChannelReferenceFetchFailed", "Failed to validate spec.channel exists: s \"\" not found"), + // }, + }, { + Name: "deleting", + Key: pKey, + Objects: []runtime.Object{ + reconciletesting.NewChoice(choiceName, testNS, + reconciletesting.WithInitChoiceConditions, + reconciletesting.WithChoiceDeleted)}, + WantErr: false, + WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "Reconciled", "Choice reconciled"), + }, + }, { + Name: "singlecase, no filter", + Key: pKey, + Objects: []runtime.Object{ + reconciletesting.NewChoice(choiceName, testNS, + reconciletesting.WithInitChoiceConditions, + reconciletesting.WithChoiceChannelTemplateSpec(imc), + reconciletesting.WithChoiceCases([]v1alpha1.ChoiceCase{ + {Subscriber: createSubscriber(0)}, + }))}, + WantErr: false, + WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "Reconciled", "Choice reconciled"), + }, + WantCreates: []runtime.Object{ + createChannel(choiceName), + createCaseChannel(choiceName, 0), + resources.NewFilterSubscription(0, reconciletesting.NewChoice(choiceName, testNS, reconciletesting.WithChoiceChannelTemplateSpec(imc), reconciletesting.WithChoiceCases([]v1alpha1.ChoiceCase{ + {Subscriber: createSubscriber(0)}, + }))), + resources.NewSubscription(0, reconciletesting.NewChoice(choiceName, testNS, reconciletesting.WithChoiceChannelTemplateSpec(imc), reconciletesting.WithChoiceCases([]v1alpha1.ChoiceCase{ + {Subscriber: createSubscriber(0)}, + }))), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: reconciletesting.NewChoice(choiceName, testNS, + reconciletesting.WithInitChoiceConditions, + reconciletesting.WithChoiceChannelTemplateSpec(imc), + reconciletesting.WithChoiceCases([]v1alpha1.ChoiceCase{{Subscriber: createSubscriber(0)}}), + reconciletesting.WithChoiceChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), + reconciletesting.WithChoiceAddressableNotReady("emptyHostname", "hostname is the empty string"), + reconciletesting.WithChoiceSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), + reconciletesting.WithChoiceIngressChannelStatus(createChoiceChannelStatus(choiceName, corev1.ConditionFalse)), + reconciletesting.WithChoiceCaseStatuses([]v1alpha1.ChoiceCaseStatus{{ + FilterSubscriptionStatus: createChoiceFilterSubscriptionStatus(choiceName, 0, corev1.ConditionFalse), + FilterChannelStatus: createChoiceCaseChannelStatus(choiceName, 0, corev1.ConditionFalse), + SubscriptionStatus: createChoiceSubscriptionStatus(choiceName, 0, corev1.ConditionFalse), + }})), + }}, + }, { + Name: "singlecase, with filter", + Key: pKey, + Objects: []runtime.Object{ + reconciletesting.NewChoice(choiceName, testNS, + reconciletesting.WithInitChoiceConditions, + reconciletesting.WithChoiceChannelTemplateSpec(imc), + reconciletesting.WithChoiceCases([]v1alpha1.ChoiceCase{ + {Filter: createFilter(0), Subscriber: createSubscriber(0)}, + }))}, + WantErr: false, + WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "Reconciled", "Choice reconciled"), + }, + WantCreates: []runtime.Object{ + createChannel(choiceName), + createCaseChannel(choiceName, 0), + resources.NewFilterSubscription(0, reconciletesting.NewChoice(choiceName, testNS, reconciletesting.WithChoiceChannelTemplateSpec(imc), reconciletesting.WithChoiceCases([]v1alpha1.ChoiceCase{ + {Filter: createFilter(0), Subscriber: createSubscriber(0)}, + }))), + resources.NewSubscription(0, reconciletesting.NewChoice(choiceName, testNS, reconciletesting.WithChoiceChannelTemplateSpec(imc), reconciletesting.WithChoiceCases([]v1alpha1.ChoiceCase{ + {Filter: createFilter(0), Subscriber: createSubscriber(0)}, + }))), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: reconciletesting.NewChoice(choiceName, testNS, + reconciletesting.WithInitChoiceConditions, + reconciletesting.WithChoiceChannelTemplateSpec(imc), + reconciletesting.WithChoiceCases([]v1alpha1.ChoiceCase{{Filter: createFilter(0), Subscriber: createSubscriber(0)}}), + reconciletesting.WithChoiceChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), + reconciletesting.WithChoiceAddressableNotReady("emptyHostname", "hostname is the empty string"), + reconciletesting.WithChoiceSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), + reconciletesting.WithChoiceIngressChannelStatus(createChoiceChannelStatus(choiceName, corev1.ConditionFalse)), + reconciletesting.WithChoiceCaseStatuses([]v1alpha1.ChoiceCaseStatus{{ + FilterSubscriptionStatus: createChoiceFilterSubscriptionStatus(choiceName, 0, corev1.ConditionFalse), + FilterChannelStatus: createChoiceCaseChannelStatus(choiceName, 0, corev1.ConditionFalse), + SubscriptionStatus: createChoiceSubscriptionStatus(choiceName, 0, corev1.ConditionFalse), + }})), + }}, + }, { + Name: "singlecase, no filter, with global reply", + Key: pKey, + Objects: []runtime.Object{ + reconciletesting.NewChoice(choiceName, testNS, + reconciletesting.WithInitChoiceConditions, + reconciletesting.WithChoiceChannelTemplateSpec(imc), + reconciletesting.WithChoiceReply(createReplyChannel(replyChannelName)), + reconciletesting.WithChoiceCases([]v1alpha1.ChoiceCase{ + {Subscriber: createSubscriber(0)}, + }))}, + WantErr: false, + WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "Reconciled", "Choice reconciled"), + }, + WantCreates: []runtime.Object{ + createChannel(choiceName), + createCaseChannel(choiceName, 0), + resources.NewFilterSubscription(0, reconciletesting.NewChoice(choiceName, testNS, reconciletesting.WithChoiceChannelTemplateSpec(imc), reconciletesting.WithChoiceCases([]v1alpha1.ChoiceCase{ + {Subscriber: createSubscriber(0)}, + }))), + resources.NewSubscription(0, reconciletesting.NewChoice(choiceName, testNS, reconciletesting.WithChoiceChannelTemplateSpec(imc), reconciletesting.WithChoiceCases([]v1alpha1.ChoiceCase{ + {Subscriber: createSubscriber(0)}, + }), reconciletesting.WithChoiceReply(createReplyChannel(replyChannelName)))), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: reconciletesting.NewChoice(choiceName, testNS, + reconciletesting.WithInitChoiceConditions, + reconciletesting.WithChoiceChannelTemplateSpec(imc), + reconciletesting.WithChoiceCases([]v1alpha1.ChoiceCase{ + {Subscriber: createSubscriber(0)}, + }), + reconciletesting.WithChoiceReply(createReplyChannel(replyChannelName)), + reconciletesting.WithChoiceAddressableNotReady("emptyHostname", "hostname is the empty string"), + reconciletesting.WithChoiceChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), + reconciletesting.WithChoiceSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), + reconciletesting.WithChoiceIngressChannelStatus(createChoiceChannelStatus(choiceName, corev1.ConditionFalse)), + reconciletesting.WithChoiceCaseStatuses([]v1alpha1.ChoiceCaseStatus{{ + FilterSubscriptionStatus: createChoiceFilterSubscriptionStatus(choiceName, 0, corev1.ConditionFalse), + FilterChannelStatus: createChoiceCaseChannelStatus(choiceName, 0, corev1.ConditionFalse), + SubscriptionStatus: createChoiceSubscriptionStatus(choiceName, 0, corev1.ConditionFalse), + }})), + }}, + }, { + Name: "singlecase, no filter, with case and global reply", + Key: pKey, + Objects: []runtime.Object{ + reconciletesting.NewChoice(choiceName, testNS, + reconciletesting.WithInitChoiceConditions, + reconciletesting.WithChoiceChannelTemplateSpec(imc), + reconciletesting.WithChoiceReply(createReplyChannel(replyChannelName)), + reconciletesting.WithChoiceCases([]v1alpha1.ChoiceCase{ + {Subscriber: createSubscriber(0), Reply: createCaseReplyChannel(0)}, + }))}, + WantErr: false, + WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "Reconciled", "Choice reconciled"), + }, + WantCreates: []runtime.Object{ + createChannel(choiceName), + createCaseChannel(choiceName, 0), + resources.NewFilterSubscription(0, reconciletesting.NewChoice(choiceName, testNS, reconciletesting.WithChoiceChannelTemplateSpec(imc), reconciletesting.WithChoiceCases([]v1alpha1.ChoiceCase{ + {Subscriber: createSubscriber(0)}, + }))), + resources.NewSubscription(0, reconciletesting.NewChoice(choiceName, testNS, reconciletesting.WithChoiceChannelTemplateSpec(imc), reconciletesting.WithChoiceCases([]v1alpha1.ChoiceCase{ + {Subscriber: createSubscriber(0), Reply: createCaseReplyChannel(0)}, + }), reconciletesting.WithChoiceReply(createReplyChannel(replyChannelName)))), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: reconciletesting.NewChoice(choiceName, testNS, + reconciletesting.WithInitChoiceConditions, + reconciletesting.WithChoiceChannelTemplateSpec(imc), + reconciletesting.WithChoiceCases([]v1alpha1.ChoiceCase{ + {Subscriber: createSubscriber(0), Reply: createCaseReplyChannel(0)}, + }), + reconciletesting.WithChoiceReply(createReplyChannel(replyChannelName)), + reconciletesting.WithChoiceAddressableNotReady("emptyHostname", "hostname is the empty string"), + reconciletesting.WithChoiceChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), + reconciletesting.WithChoiceSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), + reconciletesting.WithChoiceIngressChannelStatus(createChoiceChannelStatus(choiceName, corev1.ConditionFalse)), + reconciletesting.WithChoiceCaseStatuses([]v1alpha1.ChoiceCaseStatus{{ + FilterSubscriptionStatus: createChoiceFilterSubscriptionStatus(choiceName, 0, corev1.ConditionFalse), + FilterChannelStatus: createChoiceCaseChannelStatus(choiceName, 0, corev1.ConditionFalse), + SubscriptionStatus: createChoiceSubscriptionStatus(choiceName, 0, corev1.ConditionFalse), + }})), + }}, + }, + { + Name: "two cases, no filters", + Key: pKey, + Objects: []runtime.Object{ + reconciletesting.NewChoice(choiceName, testNS, + reconciletesting.WithInitChoiceConditions, + reconciletesting.WithChoiceChannelTemplateSpec(imc), + reconciletesting.WithChoiceCases([]v1alpha1.ChoiceCase{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + }))}, + WantErr: false, + WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "Reconciled", "Choice reconciled"), + }, + WantCreates: []runtime.Object{ + createChannel(choiceName), + createCaseChannel(choiceName, 0), + createCaseChannel(choiceName, 1), + resources.NewFilterSubscription(0, reconciletesting.NewChoice(choiceName, testNS, reconciletesting.WithChoiceChannelTemplateSpec(imc), reconciletesting.WithChoiceCases([]v1alpha1.ChoiceCase{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + }))), + resources.NewSubscription(0, reconciletesting.NewChoice(choiceName, testNS, reconciletesting.WithChoiceChannelTemplateSpec(imc), reconciletesting.WithChoiceCases([]v1alpha1.ChoiceCase{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + }))), + resources.NewFilterSubscription(1, reconciletesting.NewChoice(choiceName, testNS, reconciletesting.WithChoiceChannelTemplateSpec(imc), reconciletesting.WithChoiceCases([]v1alpha1.ChoiceCase{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + }))), + resources.NewSubscription(1, reconciletesting.NewChoice(choiceName, testNS, reconciletesting.WithChoiceChannelTemplateSpec(imc), reconciletesting.WithChoiceCases([]v1alpha1.ChoiceCase{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + })))}, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: reconciletesting.NewChoice(choiceName, testNS, + reconciletesting.WithInitChoiceConditions, + reconciletesting.WithChoiceChannelTemplateSpec(imc), + reconciletesting.WithChoiceCases([]v1alpha1.ChoiceCase{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + }), + reconciletesting.WithChoiceChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), + reconciletesting.WithChoiceAddressableNotReady("emptyHostname", "hostname is the empty string"), + reconciletesting.WithChoiceSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), + reconciletesting.WithChoiceIngressChannelStatus(createChoiceChannelStatus(choiceName, corev1.ConditionFalse)), + reconciletesting.WithChoiceCaseStatuses([]v1alpha1.ChoiceCaseStatus{ + { + FilterSubscriptionStatus: createChoiceFilterSubscriptionStatus(choiceName, 0, corev1.ConditionFalse), + FilterChannelStatus: createChoiceCaseChannelStatus(choiceName, 0, corev1.ConditionFalse), + SubscriptionStatus: createChoiceSubscriptionStatus(choiceName, 0, corev1.ConditionFalse), + }, + { + FilterSubscriptionStatus: createChoiceFilterSubscriptionStatus(choiceName, 1, corev1.ConditionFalse), + FilterChannelStatus: createChoiceCaseChannelStatus(choiceName, 1, corev1.ConditionFalse), + SubscriptionStatus: createChoiceSubscriptionStatus(choiceName, 1, corev1.ConditionFalse), + }})), + }}, + }, { + Name: "two cases with global reply", + Key: pKey, + Objects: []runtime.Object{ + reconciletesting.NewChoice(choiceName, testNS, + reconciletesting.WithInitChoiceConditions, + reconciletesting.WithChoiceChannelTemplateSpec(imc), + reconciletesting.WithChoiceReply(createReplyChannel(replyChannelName)), + reconciletesting.WithChoiceCases([]v1alpha1.ChoiceCase{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + }))}, + WantErr: false, + WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "Reconciled", "Choice reconciled"), + }, + WantCreates: []runtime.Object{ + createChannel(choiceName), + createCaseChannel(choiceName, 0), + createCaseChannel(choiceName, 1), + resources.NewFilterSubscription(0, reconciletesting.NewChoice(choiceName, testNS, reconciletesting.WithChoiceChannelTemplateSpec(imc), reconciletesting.WithChoiceCases([]v1alpha1.ChoiceCase{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + }))), + resources.NewSubscription(0, reconciletesting.NewChoice(choiceName, testNS, reconciletesting.WithChoiceChannelTemplateSpec(imc), reconciletesting.WithChoiceCases([]v1alpha1.ChoiceCase{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + }), reconciletesting.WithChoiceReply(createReplyChannel(replyChannelName)))), + resources.NewFilterSubscription(1, reconciletesting.NewChoice(choiceName, testNS, reconciletesting.WithChoiceChannelTemplateSpec(imc), reconciletesting.WithChoiceCases([]v1alpha1.ChoiceCase{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + }))), + resources.NewSubscription(1, reconciletesting.NewChoice(choiceName, testNS, reconciletesting.WithChoiceChannelTemplateSpec(imc), reconciletesting.WithChoiceCases([]v1alpha1.ChoiceCase{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + }), reconciletesting.WithChoiceReply(createReplyChannel(replyChannelName)))), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: reconciletesting.NewChoice(choiceName, testNS, + reconciletesting.WithInitChoiceConditions, + reconciletesting.WithChoiceReply(createReplyChannel(replyChannelName)), + reconciletesting.WithChoiceChannelTemplateSpec(imc), + reconciletesting.WithChoiceCases([]v1alpha1.ChoiceCase{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + }), + reconciletesting.WithChoiceChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), + reconciletesting.WithChoiceAddressableNotReady("emptyHostname", "hostname is the empty string"), + reconciletesting.WithChoiceSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), + reconciletesting.WithChoiceIngressChannelStatus(createChoiceChannelStatus(choiceName, corev1.ConditionFalse)), + reconciletesting.WithChoiceCaseStatuses([]v1alpha1.ChoiceCaseStatus{ + { + FilterSubscriptionStatus: createChoiceFilterSubscriptionStatus(choiceName, 0, corev1.ConditionFalse), + FilterChannelStatus: createChoiceCaseChannelStatus(choiceName, 0, corev1.ConditionFalse), + SubscriptionStatus: createChoiceSubscriptionStatus(choiceName, 0, corev1.ConditionFalse), + }, + { + FilterSubscriptionStatus: createChoiceFilterSubscriptionStatus(choiceName, 1, corev1.ConditionFalse), + FilterChannelStatus: createChoiceCaseChannelStatus(choiceName, 1, corev1.ConditionFalse), + SubscriptionStatus: createChoiceSubscriptionStatus(choiceName, 1, corev1.ConditionFalse), + }})), + }}, + }, + } + + defer logtesting.ClearAll() + table.Test(t, MakeFactory(func(ctx context.Context, listers *Listers, cmw configmap.Watcher) controller.Reconciler { + return &Reconciler{ + Base: reconciler.NewBase(ctx, controllerAgentName, cmw), + choiceLister: listers.GetChoiceLister(), + resourceTracker: fakeResourceTracker{}, + subscriptionLister: listers.GetSubscriptionLister(), + } + }, false)) +} + +func createCaseReplyChannel(caseNumber int) *corev1.ObjectReference { + return &corev1.ObjectReference{ + APIVersion: "messaging.knative.dev/v1alpha1", + Kind: "inmemorychannel", + Name: fmt.Sprintf("%s-case-%d", replyChannelName, caseNumber), + } +} + +func createReplyChannel(channelName string) *corev1.ObjectReference { + return &corev1.ObjectReference{ + APIVersion: "messaging.knative.dev/v1alpha1", + Kind: "inmemorychannel", + Name: channelName, + } +} + +func createChannel(choiceName string) *unstructured.Unstructured { + return &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "messaging.knative.dev/v1alpha1", + "kind": "inmemorychannel", + "metadata": map[string]interface{}{ + "creationTimestamp": nil, + "namespace": testNS, + "name": resources.ChoiceChannelName(choiceName), + "ownerReferences": []interface{}{ + map[string]interface{}{ + "apiVersion": "messaging.knative.dev/v1alpha1", + "blockOwnerDeletion": true, + "controller": true, + "kind": "Choice", + "name": choiceName, + "uid": "", + }, + }, + }, + "spec": map[string]interface{}{}, + }, + } +} + +func createCaseChannel(choiceName string, caseNumber int) *unstructured.Unstructured { + return &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "messaging.knative.dev/v1alpha1", + "kind": "inmemorychannel", + "metadata": map[string]interface{}{ + "creationTimestamp": nil, + "namespace": testNS, + "name": resources.ChoiceCaseChannelName(choiceName, caseNumber), + "ownerReferences": []interface{}{ + map[string]interface{}{ + "apiVersion": "messaging.knative.dev/v1alpha1", + "blockOwnerDeletion": true, + "controller": true, + "kind": "Choice", + "name": choiceName, + "uid": "", + }, + }, + }, + "spec": map[string]interface{}{}, + }, + } +} + +func createChoiceCaseChannelStatus(choiceName string, caseNumber int, status corev1.ConditionStatus) v1alpha1.ChoiceChannelStatus { + return v1alpha1.ChoiceChannelStatus{ + Channel: corev1.ObjectReference{ + APIVersion: "messaging.knative.dev/v1alpha1", + Kind: "inmemorychannel", + Name: resources.ChoiceCaseChannelName(choiceName, caseNumber), + Namespace: testNS, + }, + ReadyCondition: apis.Condition{ + Type: apis.ConditionReady, + Status: status, + Reason: "NotAddressable", + Message: "Channel is not addressable", + }, + } +} + +func createChoiceChannelStatus(choiceName string, status corev1.ConditionStatus) v1alpha1.ChoiceChannelStatus { + return v1alpha1.ChoiceChannelStatus{ + Channel: corev1.ObjectReference{ + APIVersion: "messaging.knative.dev/v1alpha1", + Kind: "inmemorychannel", + Name: resources.ChoiceChannelName(choiceName), + Namespace: testNS, + }, + ReadyCondition: apis.Condition{ + Type: apis.ConditionReady, + Status: status, + Reason: "NotAddressable", + Message: "Channel is not addressable", + }, + } +} + +func createChoiceFilterSubscriptionStatus(choiceName string, caseNumber int, status corev1.ConditionStatus) v1alpha1.ChoiceSubscriptionStatus { + return v1alpha1.ChoiceSubscriptionStatus{ + Subscription: corev1.ObjectReference{ + APIVersion: "eventing.knative.dev/v1alpha1", + Kind: "Subscription", + Name: resources.ChoiceFilterSubscriptionName(choiceName, caseNumber), + Namespace: testNS, + }, + } +} + +func createChoiceSubscriptionStatus(choiceName string, caseNumber int, status corev1.ConditionStatus) v1alpha1.ChoiceSubscriptionStatus { + return v1alpha1.ChoiceSubscriptionStatus{ + Subscription: corev1.ObjectReference{ + APIVersion: "eventing.knative.dev/v1alpha1", + Kind: "Subscription", + Name: resources.ChoiceSubscriptionName(choiceName, caseNumber), + Namespace: testNS, + }, + } +} + +func createSubscriber(caseNumber int) eventingv1alpha1.SubscriberSpec { + uriString := fmt.Sprintf("http://example.com/%d", caseNumber) + return eventingv1alpha1.SubscriberSpec{ + URI: &uriString, + } +} + +func createFilter(caseNumber int) *eventingv1alpha1.SubscriberSpec { + uriString := fmt.Sprintf("http://example.com/filter-%d", caseNumber) + return &eventingv1alpha1.SubscriberSpec{ + URI: &uriString, + } +} diff --git a/pkg/reconciler/choice/controller.go b/pkg/reconciler/choice/controller.go new file mode 100644 index 00000000000..8ca579b0b87 --- /dev/null +++ b/pkg/reconciler/choice/controller.go @@ -0,0 +1,72 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package choice + +import ( + "context" + + "github.com/knative/eventing/pkg/apis/messaging/v1alpha1" + "github.com/knative/eventing/pkg/duck" + "github.com/knative/eventing/pkg/reconciler" + "k8s.io/client-go/tools/cache" + "knative.dev/pkg/configmap" + "knative.dev/pkg/controller" + + "github.com/knative/eventing/pkg/client/injection/informers/eventing/v1alpha1/subscription" + "github.com/knative/eventing/pkg/client/injection/informers/messaging/v1alpha1/choice" +) + +const ( + // ReconcilerName is the name of the reconciler + ReconcilerName = "Choices" + // controllerAgentName is the string used by this controller to identify + // itself when creating events. + controllerAgentName = "choice-controller" +) + +// NewController initializes the controller and is called by the generated code +// Registers event handlers to enqueue events +func NewController( + ctx context.Context, + cmw configmap.Watcher, +) *controller.Impl { + + choiceInformer := choice.Get(ctx) + subscriptionInformer := subscription.Get(ctx) + resourceInformer := duck.NewResourceInformer(ctx) + + r := &Reconciler{ + Base: reconciler.NewBase(ctx, controllerAgentName, cmw), + choiceLister: choiceInformer.Lister(), + subscriptionLister: subscriptionInformer.Lister(), + } + impl := controller.NewImpl(r, r.Logger, ReconcilerName) + + r.Logger.Info("Setting up event handlers") + + r.resourceTracker = resourceInformer.NewTracker(impl.EnqueueKey, controller.GetTrackerLease(ctx)) + choiceInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue)) + + // Register handler for Subscriptions that are owned by Choice, so that + // we get notified if they change. + subscriptionInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: controller.Filter(v1alpha1.SchemeGroupVersion.WithKind("Choice")), + Handler: controller.HandleAll(impl.EnqueueControllerOf), + }) + + return impl +} diff --git a/pkg/reconciler/choice/controller_test.go b/pkg/reconciler/choice/controller_test.go new file mode 100644 index 00000000000..7f9f3689438 --- /dev/null +++ b/pkg/reconciler/choice/controller_test.go @@ -0,0 +1,40 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package choice + +import ( + "testing" + + "knative.dev/pkg/configmap" + logtesting "knative.dev/pkg/logging/testing" + . "knative.dev/pkg/reconciler/testing" + + // Fake injection informers + _ "github.com/knative/eventing/pkg/client/injection/informers/eventing/v1alpha1/subscription/fake" + _ "github.com/knative/eventing/pkg/client/injection/informers/messaging/v1alpha1/choice/fake" +) + +func TestNew(t *testing.T) { + defer logtesting.ClearAll() + ctx, _ := SetupFakeContext(t) + + c := NewController(ctx, configmap.NewFixedWatcher()) + + if c == nil { + t.Fatal("Expected NewController to return a non-nil value") + } +} diff --git a/pkg/reconciler/choice/resources/channel.go b/pkg/reconciler/choice/resources/channel.go new file mode 100644 index 00000000000..9ea10f7729d --- /dev/null +++ b/pkg/reconciler/choice/resources/channel.go @@ -0,0 +1,69 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resources + +import ( + "encoding/json" + "fmt" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "knative.dev/pkg/kmeta" + + eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1" + v1alpha1 "github.com/knative/eventing/pkg/apis/messaging/v1alpha1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// ChoiceChannelName creates a name for the Channel fronting choice. +func ChoiceChannelName(choiceName string) string { + return fmt.Sprintf("%s-kn-choice", choiceName) +} + +// ChoiceCaseChannelName creates a name for the Channel fronting a specific case +func ChoiceCaseChannelName(choiceName string, caseNumber int) string { + return fmt.Sprintf("%s-kn-choice-%d", choiceName, caseNumber) +} + +// NewChannel returns an unstructured.Unstructured based on the ChannelTemplateSpec +// for a given choice. +func NewChannel(name string, p *v1alpha1.Choice) (*unstructured.Unstructured, error) { + // Set the name of the resource we're creating as well as the namespace, etc. + template := eventingduck.ChannelTemplateSpecInternal{ + TypeMeta: metav1.TypeMeta{ + Kind: p.Spec.ChannelTemplate.Kind, + APIVersion: p.Spec.ChannelTemplate.APIVersion, + }, + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: []metav1.OwnerReference{ + *kmeta.NewControllerRef(p), + }, + Name: name, + Namespace: p.Namespace, + }, + Spec: p.Spec.ChannelTemplate.Spec, + } + raw, err := json.Marshal(template) + if err != nil { + return nil, err + } + u := &unstructured.Unstructured{} + err = json.Unmarshal(raw, u) + if err != nil { + return nil, err + } + return u, nil +} diff --git a/pkg/reconciler/choice/resources/subscription.go b/pkg/reconciler/choice/resources/subscription.go new file mode 100644 index 00000000000..e2b7b26ed52 --- /dev/null +++ b/pkg/reconciler/choice/resources/subscription.go @@ -0,0 +1,100 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resources + +import ( + "fmt" + + "knative.dev/pkg/kmeta" + + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + v1alpha1 "github.com/knative/eventing/pkg/apis/messaging/v1alpha1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func ChoiceFilterSubscriptionName(choiceName string, caseNumber int) string { + return fmt.Sprintf("%s-kn-choice-filter-%d", choiceName, caseNumber) +} + +func ChoiceSubscriptionName(choiceName string, caseNumber int) string { + return fmt.Sprintf("%s-kn-choice-%d", choiceName, caseNumber) +} + +func NewFilterSubscription(caseNumber int, p *v1alpha1.Choice) *eventingv1alpha1.Subscription { + r := &eventingv1alpha1.Subscription{ + TypeMeta: metav1.TypeMeta{ + Kind: "Subscription", + APIVersion: "eventing.knative.dev/v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: p.Namespace, + Name: ChoiceFilterSubscriptionName(p.Name, caseNumber), + + OwnerReferences: []metav1.OwnerReference{ + *kmeta.NewControllerRef(p), + }, + }, + Spec: eventingv1alpha1.SubscriptionSpec{ + Channel: corev1.ObjectReference{ + APIVersion: p.Spec.ChannelTemplate.APIVersion, + Kind: p.Spec.ChannelTemplate.Kind, + Name: ChoiceChannelName(p.Name), + }, + Subscriber: p.Spec.Cases[caseNumber].Filter, + }, + } + r.Spec.Reply = &eventingv1alpha1.ReplyStrategy{ + Channel: &corev1.ObjectReference{ + APIVersion: p.Spec.ChannelTemplate.APIVersion, + Kind: p.Spec.ChannelTemplate.Kind, + Name: ChoiceCaseChannelName(p.Name, caseNumber), + }} + return r +} + +func NewSubscription(caseNumber int, p *v1alpha1.Choice) *eventingv1alpha1.Subscription { + r := &eventingv1alpha1.Subscription{ + TypeMeta: metav1.TypeMeta{ + Kind: "Subscription", + APIVersion: "eventing.knative.dev/v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: p.Namespace, + Name: ChoiceSubscriptionName(p.Name, caseNumber), + + OwnerReferences: []metav1.OwnerReference{ + *kmeta.NewControllerRef(p), + }, + }, + Spec: eventingv1alpha1.SubscriptionSpec{ + Channel: corev1.ObjectReference{ + APIVersion: p.Spec.ChannelTemplate.APIVersion, + Kind: p.Spec.ChannelTemplate.Kind, + Name: ChoiceCaseChannelName(p.Name, caseNumber), + }, + Subscriber: &p.Spec.Cases[caseNumber].Subscriber, + }, + } + + if p.Spec.Cases[caseNumber].Reply != nil { + r.Spec.Reply = &eventingv1alpha1.ReplyStrategy{Channel: p.Spec.Cases[caseNumber].Reply} + } else if p.Spec.Reply != nil { + r.Spec.Reply = &eventingv1alpha1.ReplyStrategy{Channel: p.Spec.Reply} + } + return r +} diff --git a/pkg/reconciler/testing/choice.go b/pkg/reconciler/testing/choice.go new file mode 100644 index 00000000000..3f09ce2de8e --- /dev/null +++ b/pkg/reconciler/testing/choice.go @@ -0,0 +1,103 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + "context" + "time" + + eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1" + "github.com/knative/eventing/pkg/apis/messaging/v1alpha1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// ChoiceOption enables further configuration of a Choice. +type ChoiceOption func(*v1alpha1.Choice) + +// NewChoice creates an Choice with ChoiceOptions. +func NewChoice(name, namespace string, popt ...ChoiceOption) *v1alpha1.Choice { + p := &v1alpha1.Choice{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Spec: v1alpha1.ChoiceSpec{}, + } + for _, opt := range popt { + opt(p) + } + p.SetDefaults(context.Background()) + return p +} + +func WithInitChoiceConditions(p *v1alpha1.Choice) { + p.Status.InitializeConditions() +} + +func WithChoiceDeleted(p *v1alpha1.Choice) { + deleteTime := metav1.NewTime(time.Unix(1e9, 0)) + p.ObjectMeta.SetDeletionTimestamp(&deleteTime) +} + +func WithChoiceChannelTemplateSpec(cts *eventingduck.ChannelTemplateSpec) ChoiceOption { + return func(p *v1alpha1.Choice) { + p.Spec.ChannelTemplate = cts + } +} + +func WithChoiceCases(cases []v1alpha1.ChoiceCase) ChoiceOption { + return func(p *v1alpha1.Choice) { + p.Spec.Cases = cases + } +} + +func WithChoiceReply(reply *corev1.ObjectReference) ChoiceOption { + return func(p *v1alpha1.Choice) { + p.Spec.Reply = reply + } +} + +func WithChoiceCaseStatuses(caseStatuses []v1alpha1.ChoiceCaseStatus) ChoiceOption { + return func(p *v1alpha1.Choice) { + p.Status.CaseStatuses = caseStatuses + } +} + +func WithChoiceIngressChannelStatus(status v1alpha1.ChoiceChannelStatus) ChoiceOption { + return func(p *v1alpha1.Choice) { + p.Status.IngressChannelStatus = status + } +} + +func WithChoiceChannelsNotReady(reason, message string) ChoiceOption { + return func(p *v1alpha1.Choice) { + p.Status.MarkChannelsNotReady(reason, message) + } +} + +func WithChoiceSubscriptionsNotReady(reason, message string) ChoiceOption { + return func(p *v1alpha1.Choice) { + p.Status.MarkSubscriptionsNotReady(reason, message) + } +} + +func WithChoiceAddressableNotReady(reason, message string) ChoiceOption { + return func(p *v1alpha1.Choice) { + p.Status.MarkAddressableNotReady(reason, message) + } +} diff --git a/pkg/reconciler/testing/listers.go b/pkg/reconciler/testing/listers.go index 0acde9142dd..550ba8bc437 100644 --- a/pkg/reconciler/testing/listers.go +++ b/pkg/reconciler/testing/listers.go @@ -143,6 +143,10 @@ func (l *Listers) GetSequenceLister() messaginglisters.SequenceLister { return messaginglisters.NewSequenceLister(l.indexerFor(&messagingv1alpha1.Sequence{})) } +func (l *Listers) GetChoiceLister() messaginglisters.ChoiceLister { + return messaginglisters.NewChoiceLister(l.indexerFor(&messagingv1alpha1.Choice{})) +} + func (l *Listers) GetCronJobSourceLister() sourcelisters.CronJobSourceLister { return sourcelisters.NewCronJobSourceLister(l.indexerFor(&sourcesv1alpha1.CronJobSource{})) }