From b073c026e1e4e68505fd408f2ec1786775dac8fb Mon Sep 17 00:00:00 2001 From: Ville Aikas Date: Mon, 11 Nov 2019 20:56:28 +0200 Subject: [PATCH] wire up controllers / webhooks and add e2e tests --- cmd/controller/main.go | 4 + cmd/webhook/main.go | 9 +- ...200-addressable-resolvers-clusterrole.yaml | 25 ++- config/200-controller-clusterrole.yaml | 10 + config/300-flows-parallel.yaml | 209 ++++++++++++++++++ config/300-flows-sequence.yaml | 142 ++++++++++++ test/base/resources/constants.go | 7 + test/common/creation.go | 23 ++ test/common/typemeta.go | 16 +- test/e2e/flows_parallel_test.go | 143 ++++++++++++ test/e2e/flows_sequence_test.go | 150 +++++++++++++ 11 files changed, 734 insertions(+), 4 deletions(-) create mode 100644 config/300-flows-parallel.yaml create mode 100644 config/300-flows-sequence.yaml create mode 100644 test/e2e/flows_parallel_test.go create mode 100644 test/e2e/flows_sequence_test.go diff --git a/cmd/controller/main.go b/cmd/controller/main.go index 898ff9fef3e..b67125e476d 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -25,6 +25,8 @@ import ( "knative.dev/eventing/pkg/reconciler/broker" "knative.dev/eventing/pkg/reconciler/channel" "knative.dev/eventing/pkg/reconciler/eventtype" + flowsparallel "knative.dev/eventing/pkg/reconciler/flowsparallel" + flowssequence "knative.dev/eventing/pkg/reconciler/flowssequence" "knative.dev/eventing/pkg/reconciler/namespace" "knative.dev/eventing/pkg/reconciler/parallel" "knative.dev/eventing/pkg/reconciler/sequence" @@ -42,5 +44,7 @@ func main() { eventtype.NewController, sequence.NewController, parallel.NewController, + flowsparallel.NewController, + flowssequence.NewController, ) } diff --git a/cmd/webhook/main.go b/cmd/webhook/main.go index 610e26db877..a959581457c 100644 --- a/cmd/webhook/main.go +++ b/cmd/webhook/main.go @@ -22,6 +22,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" eventingduckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1" + flowsv1alpha1 "knative.dev/eventing/pkg/apis/flows/v1alpha1" messagingv1alpha1 "knative.dev/eventing/pkg/apis/messaging/v1alpha1" sourcesv1alpha1 "knative.dev/eventing/pkg/apis/sources/v1alpha1" "knative.dev/eventing/pkg/defaultchannel" @@ -66,7 +67,7 @@ func NewResourceAdmissionController(ctx context.Context, cmw configmap.Watcher) // The resources to validate and default. map[schema.GroupVersionKind]resourcesemantics.GenericCRD{ - // For group eventing.knative.dev, + // For group eventing.knative.dev. eventingv1alpha1.SchemeGroupVersion.WithKind("Broker"): &eventingv1alpha1.Broker{}, eventingv1alpha1.SchemeGroupVersion.WithKind("Trigger"): &eventingv1alpha1.Trigger{}, eventingv1alpha1.SchemeGroupVersion.WithKind("EventType"): &eventingv1alpha1.EventType{}, @@ -78,10 +79,14 @@ func NewResourceAdmissionController(ctx context.Context, cmw configmap.Watcher) messagingv1alpha1.SchemeGroupVersion.WithKind("Channel"): &messagingv1alpha1.Channel{}, messagingv1alpha1.SchemeGroupVersion.WithKind("Subscription"): &messagingv1alpha1.Subscription{}, - // For group sources.eventing.knative.dev + // For group sources.eventing.knative.dev. sourcesv1alpha1.SchemeGroupVersion.WithKind("ApiServerSource"): &sourcesv1alpha1.ApiServerSource{}, sourcesv1alpha1.SchemeGroupVersion.WithKind("ContainerSource"): &sourcesv1alpha1.ContainerSource{}, sourcesv1alpha1.SchemeGroupVersion.WithKind("CronJobSource"): &sourcesv1alpha1.CronJobSource{}, + + // For group flows.knative.dev + flowsv1alpha1.SchemeGroupVersion.WithKind("Parallel"): &flowsv1alpha1.Parallel{}, + flowsv1alpha1.SchemeGroupVersion.WithKind("Sequence"): &flowsv1alpha1.Sequence{}, }, // A function that infuses the context passed to Validate/SetDefaults with custom metadata. diff --git a/config/200-addressable-resolvers-clusterrole.yaml b/config/200-addressable-resolvers-clusterrole.yaml index 3654ef08ff5..b1d66f06edb 100644 --- a/config/200-addressable-resolvers-clusterrole.yaml +++ b/config/200-addressable-resolvers-clusterrole.yaml @@ -131,4 +131,27 @@ rules: verbs: - get - list - - watch \ No newline at end of file + - watch + +--- + +kind: ClusterRole +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: flows-addressable-resolver + labels: + eventing.knative.dev/release: devel + duck.knative.dev/addressable: "true" +# Do not use this role directly. These rules will be added to the "addressable-resolver" role. +rules: +- apiGroups: + - flows.knative.dev + resources: + - sequences + - sequences/status + - parallels + - parallels/status + verbs: + - get + - list + - watch diff --git a/config/200-controller-clusterrole.yaml b/config/200-controller-clusterrole.yaml index 1a1c7c4fd01..d8e8dcd8e84 100644 --- a/config/200-controller-clusterrole.yaml +++ b/config/200-controller-clusterrole.yaml @@ -86,6 +86,16 @@ rules: - "subscriptions/status" verbs: *everything + # Flow resources and statuses we care about. + - apiGroups: + - "flows.knative.dev" + resources: + - "sequences" + - "sequences/status" + - "parallels" + - "parallels/status" + verbs: *everything + # Messaging resources and finalizers we care about. - apiGroups: - "messaging.knative.dev" diff --git a/config/300-flows-parallel.yaml b/config/300-flows-parallel.yaml new file mode 100644 index 00000000000..185d4490012 --- /dev/null +++ b/config/300-flows-parallel.yaml @@ -0,0 +1,209 @@ +# Copyright 2019 The Knative Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + name: parallels.flows.knative.dev + labels: + eventing.knative.dev/release: devel + knative.dev/crd-install: "true" + duck.knative.dev/addressable: "true" +spec: + group: flows.knative.dev + version: v1alpha1 + names: + kind: Parallel + plural: parallels + singular: parallel + categories: + - all + - knative + - eventing + - flows + scope: Namespaced + subresources: + status: {} + additionalPrinterColumns: + - name: Ready + type: string + JSONPath: ".status.conditions[?(@.type==\"Ready\")].status" + - name: Reason + type: string + JSONPath: ".status.conditions[?(@.type==\"Ready\")].reason" + - name: URL + type: string + JSONPath: .status.address.url + - name: Age + type: date + JSONPath: .metadata.creationTimestamp + validation: + openAPIV3Schema: + properties: + spec: + required: + - branches + - channelTemplate + properties: + branches: + type: array + description: "the list of filter/subscribers pairs." + items: + type: object + required: + - subscriber + properties: + filter: + type: object + description: "the destination of the filter expression that is guarding the branch." + properties: + ref: + type: object + description: "a reference to a Kubernetes object from which to retrieve the target URI." + required: + - apiVersion + - kind + - name + properties: + apiVersion: + type: string + minLength: 1 + kind: + type: string + minLength: 1 + name: + type: string + minLength: 1 + uri: + type: string + description: "the target URI or, if ref is provided, a relative URI reference that will be combined with ref to produce a target URI." + subscriber: + type: object + description: "the destination of the events if the filter passes." + properties: + ref: + type: object + description: "a reference to a Kubernetes object from which to retrieve the target URI." + required: + - apiVersion + - kind + - name + properties: + apiVersion: + type: string + minLength: 1 + kind: + type: string + minLength: 1 + name: + type: string + minLength: 1 + uri: + type: string + description: "the target URI or, if ref is provided, a relative URI reference that will be combined with ref to produce a target URI." + reply: + description: "a reference to where the result of the subscriber of this branch gets sent to. If not specified, the result is sent to the Parallel reply." + anyOf: + - type: object + properties: + uri: + type: string + description: "the target URI or, if ref is provided, a relative URI reference that will be combined with ref to produce a target URI." + minLength: 1 + - type: object + description: "a reference to a Kubernetes object from which to retrieve the target URI." + properties: + ref: + type: object + required: + - apiVersion + - kind + - name + properties: + apiVersion: + type: string + minLength: 1 + kind: + type: string + minLength: 1 + name: + type: string + minLength: 1 + - type: object + description: "a reference to a Kubernetes object from which to retrieve the target URI." + properties: + apiVersion: + type: string + minLength: 1 + kind: + type: string + minLength: 1 + name: + type: string + minLength: 1 + channelTemplate: + type: object + description: "specifies which Channel to use. If left unspecified, it is set to the default Channel for the namespace (or cluster, in case there are no defaults for the namespace)." + required: + - apiVersion + - kind + properties: + apiVersion: + type: string + minLength: 1 + kind: + type: string + minLength: 1 + spec: + type: object + reply: + type: object + description: "a reference to where the result of a branch subscriber gets sent to when the branch does not have a reply." + anyOf: + - type: object + properties: + uri: + type: string + description: "the target URI or, if ref is provided, a relative URI reference that will be combined with ref to produce a target URI." + minLength: 1 + - type: object + description: "a reference to a Kubernetes object from which to retrieve the target URI." + properties: + ref: + type: object + required: + - apiVersion + - kind + - name + properties: + apiVersion: + type: string + minLength: 1 + kind: + type: string + minLength: 1 + name: + type: string + minLength: 1 + - type: object + description: "a reference to a Kubernetes object from which to retrieve the target URI." + properties: + apiVersion: + type: string + minLength: 1 + kind: + type: string + minLength: 1 + name: + type: string + minLength: 1 diff --git a/config/300-flows-sequence.yaml b/config/300-flows-sequence.yaml new file mode 100644 index 00000000000..f4ca9c8d4d7 --- /dev/null +++ b/config/300-flows-sequence.yaml @@ -0,0 +1,142 @@ +# Copyright 2019 The Knative Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + name: sequences.flows.knative.dev + labels: + eventing.knative.dev/release: devel + knative.dev/crd-install: "true" + duck.knative.dev/addressable: "true" +spec: + group: flows.knative.dev + versions: + - name: v1alpha1 + served: true + storage: true + names: + kind: Sequence + plural: sequences + singular: sequence + categories: + - all + - knative + - eventing + - flows + scope: Namespaced + subresources: + status: {} + additionalPrinterColumns: + - name: Ready + type: string + JSONPath: ".status.conditions[?(@.type==\"Ready\")].status" + - name: Reason + type: string + JSONPath: ".status.conditions[?(@.type==\"Ready\")].reason" + - name: URL + type: string + JSONPath: .status.address.url + - name: Age + type: date + JSONPath: .metadata.creationTimestamp + validation: + openAPIV3Schema: + properties: + spec: + required: + - steps + - channelTemplate + properties: + steps: + type: array + description: "the list of Destinations (processors / functions) that will be called in the order provided." + items: + type: object + description: "a processor / function in the Sequence." + properties: + ref: + type: object + description: "a reference to a Kubernetes object from which to retrieve the target URI." + required: + - apiVersion + - kind + - name + properties: + apiVersion: + type: string + minLength: 1 + kind: + type: string + minLength: 1 + name: + type: string + minLength: 1 + uri: + type: string + description: "the target URI or, if ref is provided, a relative URI reference that will be combined with ref to produce a target URI." + channelTemplate: + type: object + description: "specifies which Channel to use. If left unspecified, it is set to the default Channel for the namespace (or cluster, in case there are no defaults for the namespace)." + required: + - apiVersion + - kind + properties: + apiVersion: + type: string + minLength: 1 + kind: + type: string + minLength: 1 + spec: + type: object + reply: + description: "a reference to where the result of the last subscriber gets sent to." + anyOf: + - type: object + properties: + uri: + type: string + description: "the target URI or, if ref is provided, a relative URI reference that will be combined with ref to produce a target URI." + minLength: 1 + - type: object + description: "a reference to a Kubernetes object from which to retrieve the target URI." + properties: + ref: + type: object + required: + - apiVersion + - kind + - name + properties: + apiVersion: + type: string + minLength: 1 + kind: + type: string + minLength: 1 + name: + type: string + minLength: 1 + - type: object + description: "a reference to a Kubernetes object from which to retrieve the target URI." + properties: + apiVersion: + type: string + minLength: 1 + kind: + type: string + minLength: 1 + name: + type: string + minLength: 1 diff --git a/test/base/resources/constants.go b/test/base/resources/constants.go index a83bbf1a8bc..557cd978ea6 100644 --- a/test/base/resources/constants.go +++ b/test/base/resources/constants.go @@ -25,6 +25,7 @@ const ( EventingAPIVersion = "eventing.knative.dev/v1alpha1" SourcesAPIVersion = "sources.eventing.knative.dev/v1alpha1" MessagingAPIVersion = "messaging.knative.dev/v1alpha1" + FlowsAPIVersion = "flows.knative.dev/v1alpha1" ) // Kind for core Kubernetes resources. @@ -49,6 +50,12 @@ const ( ParallelKind string = "Parallel" ) +// Kind for flows resources. +const ( + FlowsSequenceKind string = "Sequence" + FlowsParallelKind string = "Parallel" +) + // Kind for sources resources. const ( CronJobSourceKind string = "CronJobSource" diff --git a/test/common/creation.go b/test/common/creation.go index 13c5a8a48e0..b8ee446dd05 100644 --- a/test/common/creation.go +++ b/test/common/creation.go @@ -24,6 +24,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "knative.dev/eventing/pkg/apis/eventing/v1alpha1" + flowsv1alpha1 "knative.dev/eventing/pkg/apis/flows/v1alpha1" messagingv1alpha1 "knative.dev/eventing/pkg/apis/messaging/v1alpha1" sourcesv1alpha1 "knative.dev/eventing/pkg/apis/sources/v1alpha1" "knative.dev/eventing/test/base" @@ -143,6 +144,17 @@ func (client *Client) CreateSequenceOrFail(sequence *messagingv1alpha1.Sequence) client.Tracker.AddObj(sequence) } +// CreateFlowsSequenceOrFail will create a Sequence (in flows.knative.dev api group) or +// fail the test if there is an error. +func (client *Client) CreateFlowsSequenceOrFail(sequence *flowsv1alpha1.Sequence) { + sequences := client.Eventing.FlowsV1alpha1().Sequences(client.Namespace) + _, err := sequences.Create(sequence) + if err != nil { + client.T.Fatalf("Failed to create sequence %q: %v", sequence.Name, err) + } + client.Tracker.AddObj(sequence) +} + // CreateParallelOrFail will create a Parallel or fail the test if there is an error. func (client *Client) CreateParallelOrFail(parallel *messagingv1alpha1.Parallel) { parallels := client.Eventing.MessagingV1alpha1().Parallels(client.Namespace) @@ -153,6 +165,17 @@ func (client *Client) CreateParallelOrFail(parallel *messagingv1alpha1.Parallel) client.Tracker.AddObj(parallel) } +// CreateFlowsParallelOrFail will create a Parallel (in flows.knative.dev api group) or +// fail the test if there is an error. +func (client *Client) CreateFlowsParallelOrFail(parallel *flowsv1alpha1.Parallel) { + parallels := client.Eventing.FlowsV1alpha1().Parallels(client.Namespace) + _, err := parallels.Create(parallel) + if err != nil { + client.T.Fatalf("Failed to create flows parallel %q: %v", parallel.Name, err) + } + client.Tracker.AddObj(parallel) +} + // CreateCronJobSourceOrFail will create a CronJobSource or fail the test if there is an error. func (client *Client) CreateCronJobSourceOrFail(cronJobSource *sourcesv1alpha1.CronJobSource) { cronJobSourceInterface := client.Eventing.SourcesV1alpha1().CronJobSources(client.Namespace) diff --git a/test/common/typemeta.go b/test/common/typemeta.go index 6c501e27196..595e9524540 100644 --- a/test/common/typemeta.go +++ b/test/common/typemeta.go @@ -69,10 +69,24 @@ var SequenceTypeMeta = MessagingTypeMeta(resources.SequenceKind) // ParallelTypeMeta is the TypeMeta ref for Parallel. var ParallelTypeMeta = MessagingTypeMeta(resources.ParallelKind) -// MessagingTypeMeta returns the TypeMeta ref for an eventing messaing resource. +// MessagingTypeMeta returns the TypeMeta ref for an eventing messaging resource. func MessagingTypeMeta(kind string) *metav1.TypeMeta { return &metav1.TypeMeta{ Kind: kind, APIVersion: resources.MessagingAPIVersion, } } + +// FlowsParallelTypeMeta is the TypeMeta ref for Parallel (in flows.knative.dev). +var FlowsParallelTypeMeta = FlowsTypeMeta(resources.FlowsParallelKind) + +// FlowsSequenceTypeMeta is the TypeMeta ref for Sequence (in flows.knative.dev). +var FlowsSequenceTypeMeta = FlowsTypeMeta(resources.FlowsSequenceKind) + +// FlowsTypeMeta returns the TypeMeta ref for an eventing messaing resource. +func FlowsTypeMeta(kind string) *metav1.TypeMeta { + return &metav1.TypeMeta{ + Kind: kind, + APIVersion: resources.FlowsAPIVersion, + } +} diff --git a/test/e2e/flows_parallel_test.go b/test/e2e/flows_parallel_test.go new file mode 100644 index 00000000000..562140aba01 --- /dev/null +++ b/test/e2e/flows_parallel_test.go @@ -0,0 +1,143 @@ +// +build e2e + +/* +Copyright 2019 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + "encoding/json" + "fmt" + "testing" + + "k8s.io/apimachinery/pkg/util/uuid" + eventingduckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" + "knative.dev/eventing/pkg/apis/flows/v1alpha1" + eventingtesting "knative.dev/eventing/pkg/reconciler/testing" + "knative.dev/eventing/test/base/resources" + "knative.dev/eventing/test/common" + duckv1 "knative.dev/pkg/apis/duck/v1" + pkgTest "knative.dev/pkg/test" +) + +func TestFlowsParallel(t *testing.T) { + const ( + senderPodName = "e2e-parallel" + ) + table := []struct { + name string + branchesConfig []branchConfig + expected string + }{ + { + name: "two-branches-pass-first-branch-only", + branchesConfig: []branchConfig{ + {filter: false}, + {filter: true}, + }, + expected: "parallel-two-branches-pass-first-branch-only-branch-0-sub", + }, + } + channelTypeMeta := getChannelTypeMeta(common.DefaultChannel) + + client := setup(t, true) + defer tearDown(client) + + for _, tc := range table { + parallelBranches := make([]v1alpha1.ParallelBranch, len(tc.branchesConfig)) + for branchNumber, cse := range tc.branchesConfig { + // construct filter services + filterPodName := fmt.Sprintf("parallel-%s-branch-%d-filter", tc.name, branchNumber) + filterPod := resources.EventFilteringPod(filterPodName, cse.filter) + client.CreatePodOrFail(filterPod, common.WithService(filterPodName)) + + // construct branch subscriber + subPodName := fmt.Sprintf("parallel-%s-branch-%d-sub", tc.name, branchNumber) + subPod := resources.SequenceStepperPod(subPodName, subPodName) + client.CreatePodOrFail(subPod, common.WithService(subPodName)) + + parallelBranches[branchNumber] = v1alpha1.ParallelBranch{ + Filter: &duckv1.Destination{ + Ref: resources.ServiceRef(filterPodName), + }, + Subscriber: duckv1.Destination{ + Ref: resources.ServiceRef(subPodName), + }, + } + } + + channelTemplate := &eventingduckv1alpha1.ChannelTemplateSpec{ + TypeMeta: *(channelTypeMeta), + } + + // create logger service for global reply + loggerPodName := fmt.Sprintf("%s-logger", tc.name) + loggerPod := resources.EventLoggerPod(loggerPodName) + client.CreatePodOrFail(loggerPod, common.WithService(loggerPodName)) + + // create channel as reply of the Parallel + // TODO(Fredy-Z): now we'll have to use a channel plus its subscription here, as reply of the Subscription + // must be Addressable. + replyChannelName := fmt.Sprintf("reply-%s", tc.name) + client.CreateChannelOrFail(replyChannelName, channelTypeMeta) + replySubscriptionName := fmt.Sprintf("reply-%s", tc.name) + client.CreateSubscriptionOrFail( + replySubscriptionName, + replyChannelName, + channelTypeMeta, + resources.WithSubscriberForSubscription(loggerPodName), + ) + + parallel := eventingtesting.NewFlowsParallel(tc.name, client.Namespace, + eventingtesting.WithFlowsParallelChannelTemplateSpec(channelTemplate), + eventingtesting.WithFlowsParallelBranches(parallelBranches), + eventingtesting.WithFlowsParallelReply(&duckv1.Destination{Ref: pkgTest.CoreV1ObjectReference(channelTypeMeta.Kind, channelTypeMeta.APIVersion, replyChannelName)})) + + client.CreateFlowsParallelOrFail(parallel) + + if err := client.WaitForAllTestResourcesReady(); err != nil { + t.Fatalf("Failed to get all test resources ready: %v", err) + } + + // send fake CloudEvent to the Parallel + msg := fmt.Sprintf("TestFlowParallel %s - ", uuid.NewUUID()) + // NOTE: the eventData format must be CloudEventBaseData, as it needs to be correctly parsed in the stepper service. + eventData := resources.CloudEventBaseData{Message: msg} + eventDataBytes, err := json.Marshal(eventData) + if err != nil { + t.Fatalf("Failed to convert %v to json: %v", eventData, err) + } + event := &resources.CloudEvent{ + Source: senderPodName, + Type: resources.CloudEventDefaultType, + Data: string(eventDataBytes), + Encoding: resources.CloudEventDefaultEncoding, + } + if err := client.SendFakeEventToAddressable( + senderPodName, + tc.name, + common.FlowsParallelTypeMeta, + event, + ); err != nil { + t.Fatalf("Failed to send fake CloudEvent to the parallel %q : %s", tc.name, err) + } + + // verify the logger service receives the correct transformed event + if err := client.CheckLog(loggerPodName, common.CheckerContains(tc.expected)); err != nil { + t.Fatalf("String %q not found in logs of logger pod %q: %v", tc.expected, loggerPodName, err) + } + } + +} diff --git a/test/e2e/flows_sequence_test.go b/test/e2e/flows_sequence_test.go new file mode 100644 index 00000000000..21f705de68e --- /dev/null +++ b/test/e2e/flows_sequence_test.go @@ -0,0 +1,150 @@ +// +build e2e + +/* +Copyright 2019 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + "encoding/json" + "fmt" + "testing" + + "k8s.io/apimachinery/pkg/util/uuid" + + eventingduckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" + eventingtesting "knative.dev/eventing/pkg/reconciler/testing" + "knative.dev/eventing/test/base/resources" + "knative.dev/eventing/test/common" + + duckv1 "knative.dev/pkg/apis/duck/v1" + pkgTest "knative.dev/pkg/test" +) + +func TestFlowsSequence(t *testing.T) { + const ( + sequenceName = "e2e-sequence" + senderPodName = "e2e-sequence-sender-pod" + + channelName = "e2e-sequence-channel" + subscriptionName = "e2e-sequence-subscription" + loggerPodName = "e2e-sequence-logger-pod" + ) + stepSubscriberConfigs := []struct { + podName string + msgAppender string + }{{ + podName: "e2e-stepper1", + msgAppender: "-step1", + }, { + podName: "e2e-stepper2", + msgAppender: "-step2", + }, { + podName: "e2e-stepper3", + msgAppender: "-step3", + }} + channelTypeMeta := getChannelTypeMeta(common.DefaultChannel) + + client := setup(t, true) + defer tearDown(client) + + // construct steps for the sequence + steps := make([]duckv1.Destination, 0) + for _, config := range stepSubscriberConfigs { + // create a stepper Pod with Service + podName := config.podName + msgAppender := config.msgAppender + stepperPod := resources.SequenceStepperPod(podName, msgAppender) + + client.CreatePodOrFail(stepperPod, common.WithService(podName)) + // create a new step + step := duckv1.Destination{ + Ref: resources.ServiceRef(podName), + } + // add the step into steps + steps = append(steps, step) + } + + // create channelTemplate for the Sequence + channelTemplate := &eventingduckv1alpha1.ChannelTemplateSpec{ + TypeMeta: *(channelTypeMeta), + } + + // create channel as reply of the Sequence + // TODO(Fredy-Z): now we'll have to use a channel plus its subscription here, as reply of the Sequence + // must be Addressable. In the future if we use Knative Serving in the tests, we can + // make the logger service as a Knative service, and remove the channel and subscription. + client.CreateChannelOrFail(channelName, channelTypeMeta) + // create logger service as the subscriber + loggerPod := resources.EventLoggerPod(loggerPodName) + client.CreatePodOrFail(loggerPod, common.WithService(loggerPodName)) + // create subscription to subscribe the channel, and forward the received events to the logger service + client.CreateSubscriptionOrFail( + subscriptionName, + channelName, + channelTypeMeta, + resources.WithSubscriberForSubscription(loggerPodName), + ) + replyRef := pkgTest.CoreV1ObjectReference(channelTypeMeta.Kind, channelTypeMeta.APIVersion, channelName) + + // create the sequence object + sequence := eventingtesting.NewFlowsSequence( + sequenceName, + client.Namespace, + eventingtesting.WithFlowsSequenceSteps(steps), + eventingtesting.WithFlowsSequenceChannelTemplateSpec(channelTemplate), + eventingtesting.WithFlowsSequenceReply(&duckv1.Destination{Ref: replyRef}), + ) + + // create Sequence or fail the test if there is an error + client.CreateFlowsSequenceOrFail(sequence) + + // wait for all test resources to be ready, so that we can start sending events + if err := client.WaitForAllTestResourcesReady(); err != nil { + t.Fatalf("Failed to get all test resources ready: %v", err) + } + + // send fake CloudEvent to the Sequence + msg := fmt.Sprintf("TestSequence %s", uuid.NewUUID()) + // NOTE: the eventData format must be CloudEventBaseData, as it needs to be correctly parsed in the stepper service. + eventData := resources.CloudEventBaseData{Message: msg} + eventDataBytes, err := json.Marshal(eventData) + if err != nil { + t.Fatalf("Failed to convert %v to json: %v", eventData, err) + } + event := &resources.CloudEvent{ + Source: senderPodName, + Type: resources.CloudEventDefaultType, + Data: string(eventDataBytes), + Encoding: resources.CloudEventDefaultEncoding, + } + if err := client.SendFakeEventToAddressable( + senderPodName, + sequenceName, + common.FlowsSequenceTypeMeta, + event, + ); err != nil { + t.Fatalf("Failed to send fake CloudEvent to the sequence %q : %s", sequenceName, err) + } + + // verify the logger service receives the correct transformed event + expectedMsg := msg + for _, config := range stepSubscriberConfigs { + expectedMsg += config.msgAppender + } + if err := client.CheckLog(loggerPodName, common.CheckerContains(expectedMsg)); err != nil { + t.Fatalf("String %q not found in logs of logger pod %q: %v", expectedMsg, loggerPodName, err) + } +}