From 5d3c30c98e8900515979b8f78dc67e949242b965 Mon Sep 17 00:00:00 2001 From: Lionel Villard Date: Mon, 5 Aug 2019 13:28:20 -0400 Subject: [PATCH] Add e2e test for Choice --- test/base/resources/constants.go | 1 + test/base/resources/kube.go | 23 ++++ test/common/creation.go | 12 +++ test/common/typemeta.go | 3 + test/e2e/choice_test.go | 147 ++++++++++++++++++++++++++ test/test_images/filterevents/main.go | 67 ++++++++++++ 6 files changed, 253 insertions(+) create mode 100644 test/e2e/choice_test.go create mode 100644 test/test_images/filterevents/main.go diff --git a/test/base/resources/constants.go b/test/base/resources/constants.go index 5277172fa9c..2ac470cdf75 100644 --- a/test/base/resources/constants.go +++ b/test/base/resources/constants.go @@ -47,6 +47,7 @@ const ( NatssChannelKind string = "NatssChannel" SequenceKind string = "Sequence" + ChoiceKind string = "Choice" ) // Kind for sources resources. diff --git a/test/base/resources/kube.go b/test/base/resources/kube.go index 87b9817dcf5..24ba3c31929 100644 --- a/test/base/resources/kube.go +++ b/test/base/resources/kube.go @@ -154,6 +154,29 @@ func SequenceStepperPod(name, eventMsgAppender string) *corev1.Pod { } } +// EventFilteringPod creates a Pod that either filter or send the received CloudEvent +func EventFilteringPod(name string, filter bool) *corev1.Pod { + const imageName = "filterevents" + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{"e2etest": string(uuid.NewUUID())}, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: imageName, + Image: pkgTest.ImagePath(imageName), + ImagePullPolicy: corev1.PullAlways, + }}, + RestartPolicy: corev1.RestartPolicyAlways, + }, + } + if filter { + pod.Spec.Containers[0].Args = []string{"-filter"} + } + return pod +} + // EventLatencyPod creates a Pod that measures events transfer latency. func EventLatencyPod(name, sink string, eventCount int) *corev1.Pod { const imageName = "latency" diff --git a/test/common/creation.go b/test/common/creation.go index 392fd9a8498..85b1313058a 100644 --- a/test/common/creation.go +++ b/test/common/creation.go @@ -19,6 +19,7 @@ package common import ( eventingduckv1alpha1 "github.com/knative/eventing/pkg/apis/duck/v1alpha1" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + messagingv1alpha1 "github.com/knative/eventing/pkg/apis/messaging/v1alpha1" sourcesv1alpha1 "github.com/knative/eventing/pkg/apis/sources/v1alpha1" "github.com/knative/eventing/test/base/resources" corev1 "k8s.io/api/core/v1" @@ -155,6 +156,17 @@ func (client *Client) CreateSequenceOrFail( client.Tracker.AddObj(sequence) } +// CreateChoiceOrFail will create a Choice or fail the test if there is an error. +func (client *Client) CreateChoiceOrFail(choice *messagingv1alpha1.Choice) { + choices := client.Eventing.MessagingV1alpha1().Choices(client.Namespace) + // create choice with the new reference + created, err := choices.Create(choice) + if err != nil { + client.T.Fatalf("Failed to create choice %q: %v", choice.Name, err) + } + client.Tracker.AddObj(created) +} + // CreateCronJobSourceOrFail will create a CronJobSource or fail the test if there is an error. func (client *Client) CreateCronJobSourceOrFail( name, diff --git a/test/common/typemeta.go b/test/common/typemeta.go index ec65e79a5f0..b3b53a4492e 100644 --- a/test/common/typemeta.go +++ b/test/common/typemeta.go @@ -67,6 +67,9 @@ var NatssChannelTypeMeta = MessagingTypeMeta(resources.NatssChannelKind) // SequenceTypeMeta is the TypeMeta ref for Sequence. var SequenceTypeMeta = MessagingTypeMeta(resources.SequenceKind) +// ChoiceTypeMeta is the TypeMeta ref for Choice. +var ChoiceTypeMeta = MessagingTypeMeta(resources.ChoiceKind) + // MessagingTypeMeta returns the TypeMeta ref for an eventing messaing resource. func MessagingTypeMeta(kind string) *metav1.TypeMeta { return &metav1.TypeMeta{ diff --git a/test/e2e/choice_test.go b/test/e2e/choice_test.go new file mode 100644 index 00000000000..4da98b8daab --- /dev/null +++ b/test/e2e/choice_test.go @@ -0,0 +1,147 @@ +// +build e2e + +/* +Copyright 2019 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + "encoding/json" + "fmt" + "testing" + + eventingduckv1alpha1 "github.com/knative/eventing/pkg/apis/duck/v1alpha1" + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + v1alpha1 "github.com/knative/eventing/pkg/apis/messaging/v1alpha1" + eventingtesting "github.com/knative/eventing/pkg/reconciler/testing" + "github.com/knative/eventing/test/base/resources" + "github.com/knative/eventing/test/common" + "k8s.io/apimachinery/pkg/util/uuid" + pkgTest "knative.dev/pkg/test" +) + +type caseConfig struct { + filter bool +} + +func TestChoice(t *testing.T) { + const ( + senderPodName = "e2e-choice" + ) + table := []struct { + name string + casesConfig []caseConfig + expected string + }{ + { + name: "two-cases-pass-first-case-only", + casesConfig: []caseConfig{ + {filter: false}, + {filter: true}, + }, + expected: "choice-two-cases-pass-first-case-only-case-0-sub", + }, + } + channelTypeMeta := getChannelTypeMeta(common.DefaultChannel) + + client := setup(t, true) + defer tearDown(client) + + for _, tc := range table { + choiceCases := make([]v1alpha1.ChoiceCase, len(tc.casesConfig)) + for caseNumber, cse := range tc.casesConfig { + // construct filter services + filterPodName := fmt.Sprintf("choice-%s-case-%d-filter", tc.name, caseNumber) + filterPod := resources.EventFilteringPod(filterPodName, cse.filter) + client.CreatePodOrFail(filterPod, common.WithService(filterPodName)) + + // construct case subscriber + subPodName := fmt.Sprintf("choice-%s-case-%d-sub", tc.name, caseNumber) + subPod := resources.SequenceStepperPod(subPodName, subPodName) + client.CreatePodOrFail(subPod, common.WithService(subPodName)) + + choiceCases[caseNumber] = v1alpha1.ChoiceCase{ + Filter: &eventingv1alpha1.SubscriberSpec{ + Ref: resources.ServiceRef(filterPodName), + }, + Subscriber: eventingv1alpha1.SubscriberSpec{ + Ref: resources.ServiceRef(subPodName), + }, + } + } + + channelTemplate := &eventingduckv1alpha1.ChannelTemplateSpec{ + TypeMeta: *(channelTypeMeta), + } + + // create logger service for global reply + loggerPodName := fmt.Sprintf("%s-logger", tc.name) + loggerPod := resources.EventLoggerPod(loggerPodName) + client.CreatePodOrFail(loggerPod, common.WithService(loggerPodName)) + + // create channel as reply of the Choice + // TODO(Fredy-Z): now we'll have to use a channel plus its subscription here, as reply of the Subscription + // must be Addressable. + replyChannelName := fmt.Sprintf("reply-%s", tc.name) + client.CreateChannelOrFail(replyChannelName, channelTypeMeta) + replySubscriptionName := fmt.Sprintf("reply-%s", tc.name) + client.CreateSubscriptionOrFail( + replySubscriptionName, + replyChannelName, + channelTypeMeta, + resources.WithSubscriberForSubscription(loggerPodName), + ) + + choice := eventingtesting.NewChoice(tc.name, client.Namespace, + eventingtesting.WithChoiceChannelTemplateSpec(channelTemplate), + eventingtesting.WithChoiceCases(choiceCases), + eventingtesting.WithChoiceReply(pkgTest.CoreV1ObjectReference(channelTypeMeta.Kind, channelTypeMeta.APIVersion, replyChannelName))) + + client.CreateChoiceOrFail(choice) + + if err := client.WaitForAllTestResourcesReady(); err != nil { + t.Fatalf("Failed to get all test resources ready: %v", err) + } + + // send fake CloudEvent to the Choice + msg := fmt.Sprintf("TestChoice %s - ", uuid.NewUUID()) + // NOTE: the eventData format must be CloudEventBaseData, as it needs to be correctly parsed in the stepper service. + eventData := resources.CloudEventBaseData{Message: msg} + eventDataBytes, err := json.Marshal(eventData) + if err != nil { + t.Fatalf("Failed to convert %v to json: %v", eventData, err) + } + event := &resources.CloudEvent{ + Source: senderPodName, + Type: resources.CloudEventDefaultType, + Data: string(eventDataBytes), + Encoding: resources.CloudEventDefaultEncoding, + } + if err := client.SendFakeEventToAddressable( + senderPodName, + tc.name, + common.ChoiceTypeMeta, + event, + ); err != nil { + t.Fatalf("Failed to send fake CloudEvent to the choice %q", tc.name) + } + + // verify the logger service receives the correct transformed event + if err := client.CheckLog(loggerPodName, common.CheckerContains(tc.expected)); err != nil { + t.Fatalf("String %q not found in logs of logger pod %q: %v", tc.expected, loggerPodName, err) + } + } + +} diff --git a/test/test_images/filterevents/main.go b/test/test_images/filterevents/main.go new file mode 100644 index 00000000000..d6ee24c3c0b --- /dev/null +++ b/test/test_images/filterevents/main.go @@ -0,0 +1,67 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "context" + "flag" + "log" + + cloudevents "github.com/cloudevents/sdk-go" +) + +var ( + filter bool +) + +func init() { + flag.BoolVar(&filter, "filter", false, "Whether to filter the event") +} + +func gotEvent(event cloudevents.Event, resp *cloudevents.EventResponse) error { + ctx := event.Context.AsV03() + + dataBytes, err := event.DataBytes() + if err != nil { + log.Printf("Got Data Error: %s\n", err.Error()) + return err + } + log.Println("Received a new event: ") + log.Printf("[%s] %s %s: %s", ctx.Time.String(), ctx.GetSource(), ctx.GetType(), dataBytes) + + if filter { + log.Println("Filter event") + resp.Status = 200 + } else { + log.Println("Reply with event") + resp.RespondWith(200, &event) + } + return nil +} + +func main() { + // parse the command line flags + flag.Parse() + + c, err := cloudevents.NewDefaultClient() + if err != nil { + log.Fatalf("failed to create client, %v", err) + } + + log.Printf("listening on 8080") + log.Fatalf("failed to start receiver: %s", c.StartReceiver(context.Background(), gotEvent)) +}