diff --git a/test/crd.go b/test/crd.go index e4a000b2c59..52acf69c6fb 100644 --- a/test/crd.go +++ b/test/crd.go @@ -31,7 +31,7 @@ const ( servingApiVersion = "serving.knative.dev/v1alpha1" ) -// Route returns a Route object in namespace +// Route returns a Route object in namespace. func Route(name string, namespace string, configName string) *servingv1alpha1.Route { return &servingv1alpha1.Route{ ObjectMeta: metav1.ObjectMeta{ @@ -72,17 +72,17 @@ func Configuration(name string, namespace string, imagePath string) *servingv1al } } -// ClusterChannelProvisioner returns a ClusterChannelProvisioner for a given name +// ClusterChannelProvisioner returns a ClusterChannelProvisioner for a given name. func ClusterChannelProvisioner(name string) *corev1.ObjectReference { return pkgTest.CoreV1ObjectReference("ClusterChannelProvisioner", eventsApiVersion, name) } -// ChannelRef returns an ObjectReference for a given Channel Name +// ChannelRef returns an ObjectReference for a given Channel Name. func ChannelRef(name string) *corev1.ObjectReference { return pkgTest.CoreV1ObjectReference("Channel", eventsApiVersion, name) } -// Channel returns a Channel with the specified provisioner +// Channel returns a Channel with the specified provisioner. func Channel(name string, namespace string, provisioner *corev1.ObjectReference) *v1alpha1.Channel { return &v1alpha1.Channel{ ObjectMeta: metav1.ObjectMeta{ @@ -95,7 +95,7 @@ func Channel(name string, namespace string, provisioner *corev1.ObjectReference) } } -// SubscriberSpecForRoute returns a SubscriberSpec for a given Knative Service. +// SubscriberSpecForRoute returns a SubscriberSpec for a given Knative Route. func SubscriberSpecForRoute(name string) *v1alpha1.SubscriberSpec { return &v1alpha1.SubscriberSpec{ Ref: pkgTest.CoreV1ObjectReference("Route", servingApiVersion, name), @@ -109,7 +109,14 @@ func SubscriberSpecForService(name string) *v1alpha1.SubscriberSpec { } } -// Subscription returns a Subscription +// ReplyStrategyForChannel returns a ReplyStrategy for a given Channel. +func ReplyStrategyForChannel(name string) *v1alpha1.ReplyStrategy { + return &v1alpha1.ReplyStrategy{ + Channel: pkgTest.CoreV1ObjectReference("Channel", eventsApiVersion, name), + } +} + +// Subscription returns a Subscription. func Subscription(name string, namespace string, channel *corev1.ObjectReference, subscriber *v1alpha1.SubscriberSpec, reply *v1alpha1.ReplyStrategy) *v1alpha1.Subscription { return &v1alpha1.Subscription{ ObjectMeta: metav1.ObjectMeta{ @@ -154,10 +161,12 @@ type TypeAndSource struct { const ( CloudEventEncodingBinary = "binary" CloudEventEncodingStructured = "structured" + CloudEventDefaultEncoding = CloudEventEncodingBinary + CloudEventDefaultType = "dev.knative.test.event" ) // EventSenderPod creates a Pod that sends a single event to the given address. -func EventSenderPod(name string, namespace string, sink string, event CloudEvent) *corev1.Pod { +func EventSenderPod(name string, namespace string, sink string, event *CloudEvent) *corev1.Pod { if event.Encoding == "" { event.Encoding = CloudEventEncodingBinary } @@ -214,6 +223,30 @@ func EventLoggerPod(name string, namespace string, selector map[string]string) * } } +// EventTransformationPod creates a Pod that transforms events received. +func EventTransformationPod(name string, namespace string, selector map[string]string, msgPostfix string) *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + Labels: selector, + Annotations: map[string]string{"sidecar.istio.io/inject": "true"}, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: "transformevents", + Image: pkgTest.ImagePath("transformevents"), + ImagePullPolicy: corev1.PullAlways, // TODO: this might not be wanted for local. + Args: []string{ + "-msg-postfix", + msgPostfix, + }, + }}, + RestartPolicy: corev1.RestartPolicyAlways, + }, + } +} + // Service creates a Kubernetes Service with the given name, namespace, and // selector. Port 8080 is assumed the target port. func Service(name string, namespace string, selector map[string]string) *corev1.Service { diff --git a/test/e2e-tests.sh b/test/e2e-tests.sh index 1903ce9c378..4051b5d0283 100755 --- a/test/e2e-tests.sh +++ b/test/e2e-tests.sh @@ -87,6 +87,6 @@ function dump_extra_cluster_state() { initialize $@ -go_test_e2e ./test/e2e || fail_test +go_test_e2e -timeout=20m ./test/e2e || fail_test success diff --git a/test/e2e/broker_trigger_test.go b/test/e2e/broker_trigger_test.go index 3a4a0b2f6ac..33464ac6e64 100644 --- a/test/e2e/broker_trigger_test.go +++ b/test/e2e/broker_trigger_test.go @@ -59,7 +59,7 @@ func TestDefaultBrokerWithManyTriggers(t *testing.T) { clients, cleaner := Setup(t, t.Logf) // Verify namespace exists. - ns, cleanupNS := NamespaceExists(t, clients, t.Logf) + ns, cleanupNS := CreateNamespaceIfNeeded(t, clients, t.Logf) defer cleanupNS() defer TearDown(clients, cleaner, t.Logf) @@ -186,7 +186,7 @@ func TestDefaultBrokerWithManyTriggers(t *testing.T) { // Create cloud event. // Using event type and source as part of the body for easier debugging. body := fmt.Sprintf("Body-%s-%s", eventToSend.Type, eventToSend.Source) - cloudEvent := test.CloudEvent{ + cloudEvent := &test.CloudEvent{ Source: eventToSend.Source, Type: eventToSend.Type, Data: fmt.Sprintf(`{"msg":%q}`, body), diff --git a/test/e2e/channel_chain_test.go b/test/e2e/channel_chain_test.go new file mode 100644 index 00000000000..0657f505188 --- /dev/null +++ b/test/e2e/channel_chain_test.go @@ -0,0 +1,115 @@ +// +build e2e + +/* +Copyright 2019 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + "fmt" + "testing" + + "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/eventing/test" + "k8s.io/apimachinery/pkg/util/uuid" +) + +/* +TestChannelChain tests the following scenario: + +EventSource ---> Channel ---> Subscriptions ---> Channel ---> Subscriptions ---> Service(Logger) + +*/ +func TestChannelChain(t *testing.T) { + if test.EventingFlags.Provisioner == "" { + t.Fatal("ClusterChannelProvisioner must be set to a non-empty string. Either do not specify --clusterChannelProvisioner or set to something other than the empty string") + } + + const ( + senderName = "e2e-channelchain-sender" + loggerPodName = "e2e-channelchain-logger-pod" + ) + channelNames := [2]string{"e2e-channelchain1", "e2e-channelchain2"} + // subscriptionNames1 corresponds to Subscriptions on channelNames[0] + subscriptionNames1 := [2]string{"e2e-complexscen-subs11", "e2e-complexscen-subs12"} + // subscriptionNames2 corresponds to Subscriptions on channelNames[1] + subscriptionNames2 := [1]string{"e2e-complexscen-subs21"} + + clients, cleaner := Setup(t, t.Logf) + // verify namespace + ns, cleanupNS := CreateNamespaceIfNeeded(t, clients, t.Logf) + defer cleanupNS() + + // TearDown() needs to be deferred after cleanupNS(). Otherwise the namespace is deleted and all + // resources in it. So when TearDown() runs, it spews a lot of not found errors. + defer TearDown(clients, cleaner, t.Logf) + + // create loggerPod and expose it as a service + t.Logf("creating logger pod") + selector := map[string]string{"e2etest": string(uuid.NewUUID())} + loggerPod := test.EventLoggerPod(loggerPodName, ns, selector) + loggerSvc := test.Service(loggerPodName, ns, selector) + loggerPod, err := CreatePodAndServiceReady(clients, loggerPod, loggerSvc, ns, t.Logf, cleaner) + if err != nil { + t.Fatalf("Failed to create logger pod and service, and get them ready: %v", err) + } + + // create channels + t.Logf("Creating Channel and Subscription") + channels := make([]*v1alpha1.Channel, 0) + for _, channelName := range channelNames { + channel := test.Channel(channelName, ns, test.ClusterChannelProvisioner(test.EventingFlags.Provisioner)) + t.Logf("channel: %#v", channel) + channels = append(channels, channel) + } + + // create subscriptions + subs := make([]*v1alpha1.Subscription, 0) + // create subscriptions that subscribe the first channel, and reply events directly to the second channel + for _, subscriptionName := range subscriptionNames1 { + sub := test.Subscription(subscriptionName, ns, test.ChannelRef(channelNames[0]), nil, test.ReplyStrategyForChannel(channelNames[1])) + t.Logf("sub: %#v", sub) + subs = append(subs, sub) + } + // create subscriptions that subscribe the second channel, and call the logging service + for _, subscriptionName := range subscriptionNames2 { + sub := test.Subscription(subscriptionName, ns, test.ChannelRef(channelNames[1]), test.SubscriberSpecForService(loggerPodName), nil) + t.Logf("sub: %#v", sub) + subs = append(subs, sub) + } + + // wait for all channels and subscriptions to become ready + if err := WithChannelsAndSubscriptionsReady(clients, &channels, &subs, t.Logf, cleaner); err != nil { + t.Fatalf("The Channel or Subscription were not marked as Ready: %v", err) + } + + // send fake CloudEvent to the first channel + body := fmt.Sprintf("TestChannelChainEvent %s", uuid.NewUUID()) + event := &test.CloudEvent{ + Source: senderName, + Type: test.CloudEventDefaultType, + Data: fmt.Sprintf(`{"msg":%q}`, body), + Encoding: test.CloudEventDefaultEncoding, + } + if err := SendFakeEventToChannel(clients, event, channels[0], ns, t.Logf, cleaner); err != nil { + t.Fatalf("Failed to send fake CloudEvent to the channel %q", channels[0].Name) + } + + // check if the logging service receives the correct number of event messages + expectedContentCount := len(subscriptionNames1) * len(subscriptionNames2) + if err := WaitForLogContentCount(clients, loggerPodName, loggerPod.Spec.Containers[0].Name, body, expectedContentCount); err != nil { + t.Fatalf("String %q does not appear %d times in logs of logger pod %q: %v", body, expectedContentCount, loggerPodName, err) + } +} diff --git a/test/e2e/e2e.go b/test/e2e/e2e.go index 700fa18ecd0..e679b4102e8 100644 --- a/test/e2e/e2e.go +++ b/test/e2e/e2e.go @@ -127,36 +127,48 @@ func CreateSubscription(clients *test.Clients, sub *v1alpha1.Subscription, _ log return nil } -// WithChannelAndSubscriptionReady creates a Channel and Subscription and waits until both are Ready. -func WithChannelAndSubscriptionReady(clients *test.Clients, channel *v1alpha1.Channel, sub *v1alpha1.Subscription, logf logging.FormatLogger, cleaner *test.Cleaner) error { - if err := CreateChannel(clients, channel, logf, cleaner); err != nil { - return err - } - if err := CreateSubscription(clients, sub, logf, cleaner); err != nil { - return err +// WithChannelsAndSubscriptionsReady creates Channels and Subscriptions and waits until all are Ready. +// When they are ready, chans and subs are altered to get the real Channels and Subscriptions. +func WithChannelsAndSubscriptionsReady(clients *test.Clients, chans *[]*v1alpha1.Channel, subs *[]*v1alpha1.Subscription, logf logging.FormatLogger, cleaner *test.Cleaner) error { + for _, channel := range *chans { + if err := CreateChannel(clients, channel, logf, cleaner); err != nil { + return err + } } channels := clients.Eventing.EventingV1alpha1().Channels(pkgTest.Flags.Namespace) - if err := test.WaitForChannelState(channels, channel.Name, test.IsChannelReady, "ChannelIsReady"); err != nil { - return err + for i, channel := range *chans { + if err := test.WaitForChannelState(channels, channel.Name, test.IsChannelReady, "ChannelIsReady"); err != nil { + return err + } + // Update the given object so they'll reflect the ready state. + updatedchannel, err := channels.Get(channel.Name, metav1.GetOptions{}) + if err != nil { + return err + } + updatedchannel.DeepCopyInto(channel) + (*chans)[i] = channel } - // Update the given object so they'll reflect the ready state - updatedchannel, err := channels.Get(channel.Name, metav1.GetOptions{}) - if err != nil { - return err + + for _, sub := range *subs { + if err := CreateSubscription(clients, sub, logf, cleaner); err != nil { + return err + } } - updatedchannel.DeepCopyInto(channel) subscriptions := clients.Eventing.EventingV1alpha1().Subscriptions(pkgTest.Flags.Namespace) - if err = test.WaitForSubscriptionState(subscriptions, sub.Name, test.IsSubscriptionReady, "SubscriptionIsReady"); err != nil { - return err - } - // Update the given object so they'll reflect the ready state - updatedsub, err := subscriptions.Get(sub.Name, metav1.GetOptions{}) - if err != nil { - return err + for i, sub := range *subs { + if err := test.WaitForSubscriptionState(subscriptions, sub.Name, test.IsSubscriptionReady, "SubscriptionIsReady"); err != nil { + return err + } + // Update the given object so they'll reflect the ready state. + updatedsub, err := subscriptions.Get(sub.Name, metav1.GetOptions{}) + if err != nil { + return err + } + updatedsub.DeepCopyInto(sub) + (*subs)[i] = sub } - updatedsub.DeepCopyInto(sub) return nil } @@ -285,6 +297,29 @@ func CreateServiceAccountAndBinding(clients *test.Clients, name string, logf log return nil } +// CreatePodAndServiceReady will create a Pod and Service, and wait for them to become ready +func CreatePodAndServiceReady(clients *test.Clients, pod *corev1.Pod, svc *corev1.Service, ns string, logf logging.FormatLogger, cleaner *test.Cleaner) (*corev1.Pod, error) { + if err := CreatePod(clients, pod, logf, cleaner); err != nil { + return nil, fmt.Errorf("Failed to create pod: %v", err) + } + if err := pkgTest.WaitForAllPodsRunning(clients.Kube, ns); err != nil { + return nil, fmt.Errorf("Error waiting for pod to become running: %v", err) + } + logf("Pod %q starts running", pod.Name) + + if err := CreateService(clients, svc, logf, cleaner); err != nil { + return nil, fmt.Errorf("Failed to create service: %v", err) + } + + // Reload pod to get IP + pod, err := clients.Kube.Kube.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("Failed to get pod: %v", err) + } + + return pod, nil +} + // CreateService will create a Service func CreateService(clients *test.Clients, svc *corev1.Service, _ logging.FormatLogger, cleaner *test.Cleaner) error { svcs := clients.Kube.Kube.CoreV1().Services(svc.GetNamespace()) @@ -306,6 +341,23 @@ func CreatePod(clients *test.Clients, pod *corev1.Pod, _ logging.FormatLogger, c return nil } +// SendFakeEventToChannel will create fake CloudEvent and send it to the given channel. +func SendFakeEventToChannel(clients *test.Clients, event *test.CloudEvent, channel *v1alpha1.Channel, ns string, logf logging.FormatLogger, cleaner *test.Cleaner) error { + logf("Sending fake CloudEvent") + logf("Creating event sender pod") + url := fmt.Sprintf("http://%s", channel.Status.Address.Hostname) + pod := test.EventSenderPod(event.Source, ns, url, event) + logf("Sender pod: %#v", pod) + if err := CreatePod(clients, pod, logf, cleaner); err != nil { + return err + } + if err := pkgTest.WaitForAllPodsRunning(clients.Kube, ns); err != nil { + return err + } + logf("Sender pod starts running") + return nil +} + // WaitForLogContents waits until logs for given Pod/Container include the given contents. // If the contents are not present within timeout it returns error. func WaitForLogContents(clients *test.Clients, logf logging.FormatLogger, podName string, containerName string, namespace string, contents []string) error { @@ -326,6 +378,19 @@ func WaitForLogContents(clients *test.Clients, logf logging.FormatLogger, podNam }) } +// WaitForLogContentCount checks if the number of substr occur times equals the given number. +// If the content does not appear the given times it returns error. +func WaitForLogContentCount(client *test.Clients, podName, containerName, content string, appearTimes int) error { + return wait.PollImmediate(interval, timeout, func() (bool, error) { + logs, err := client.Kube.PodLogs(podName, containerName) + if err != nil { + return true, err + } + + return strings.Count(string(logs), content) == appearTimes, nil + }) +} + // FindAnyLogContents attempts to find logs for given Pod/Container that has 'any' of the given contents. // It returns an error if it couldn't retrieve the logs. In case 'any' of the contents are there, it returns true. func FindAnyLogContents(clients *test.Clients, logf logging.FormatLogger, podName string, containerName string, namespace string, contents []string) (bool, error) { @@ -368,7 +433,8 @@ func LabelNamespace(clients *test.Clients, logf logging.FormatLogger, labels map return err } -func NamespaceExists(t *testing.T, clients *test.Clients, logf logging.FormatLogger) (string, func()) { +// CreateNamespaceIfNeeded creates a new namespace if it does not exist +func CreateNamespaceIfNeeded(t *testing.T, clients *test.Clients, logf logging.FormatLogger) (string, func()) { shutdown := func() {} ns := pkgTest.Flags.Namespace logf("Namespace: %s", ns) diff --git a/test/e2e/event_transformation_test.go b/test/e2e/event_transformation_test.go new file mode 100644 index 00000000000..deeb8a47e26 --- /dev/null +++ b/test/e2e/event_transformation_test.go @@ -0,0 +1,136 @@ +// +build e2e + +/* +Copyright 2019 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + "fmt" + "testing" + + "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/eventing/test" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/uuid" +) + +/* +TestEventTransformation tests the following scenario: + + 1 2 5 6 7 +EventSource ---> Channel ---> Subscription ---> Channel ---> Subscription ----> Service(Logger) + | ^ + 3 | | 4 + | | + | --------- + -----------> Service(Transformation) +*/ +func TestEventTransformation(t *testing.T) { + if test.EventingFlags.Provisioner == "" { + t.Fatal("ClusterChannelProvisioner must be set to a non-empty string. Either do not specify --clusterChannelProvisioner or set to something other than the empty string") + } + + senderName := "e2e-eventtransformation-sender" + msgPostfix := string(uuid.NewUUID()) + channelNames := [2]string{"e2e-eventtransformation1", "e2e-eventtransformation2"} + // subscriptionNames1 corresponds to Subscriptions on channelNames[0] + subscriptionNames1 := []string{"e2e-eventtransformation-subs11"} + // subscriptionNames2 corresponds to Subscriptions on channelNames[1] + subscriptionNames2 := []string{"e2e-eventtransformation-subs21", "e2e-eventtransformation-subs22"} + transformationPodName := "e2e-eventtransformation-transformation-pod" + loggerPodName := "e2e-eventtransformation-logger-pod" + + clients, cleaner := Setup(t, t.Logf) + // verify namespace + ns, cleanupNS := CreateNamespaceIfNeeded(t, clients, t.Logf) + defer cleanupNS() + + // TearDown() needs to be deferred after cleanupNS(). Otherwise the namespace is deleted and all + // resources in it. So when TearDown() runs, it spews a lot of not found errors. + defer TearDown(clients, cleaner, t.Logf) + + // create subscriberPods and expose them as services + t.Logf("creating subscriber pods") + subscriberPods := make([]*corev1.Pod, 0) + + // create transformation pod and service + transformationPodSelector := map[string]string{"e2etest": string(uuid.NewUUID())} + transformationPod := test.EventTransformationPod(transformationPodName, ns, transformationPodSelector, msgPostfix) + transformationSvc := test.Service(transformationPodName, ns, transformationPodSelector) + transformationPod, err := CreatePodAndServiceReady(clients, transformationPod, transformationSvc, ns, t.Logf, cleaner) + if err != nil { + t.Fatalf("Failed to create transformation pod and service, and get them ready: %v", err) + } + subscriberPods = append(subscriberPods, transformationPod) + // create logger pod and service + loggerPodSelector := map[string]string{"e2etest": string(uuid.NewUUID())} + loggerPod := test.EventLoggerPod(loggerPodName, ns, loggerPodSelector) + loggerSvc := test.Service(loggerPodName, ns, loggerPodSelector) + loggerPod, err = CreatePodAndServiceReady(clients, loggerPod, loggerSvc, ns, t.Logf, cleaner) + if err != nil { + t.Fatalf("Failed to create logger pod and service, and get them ready: %v", err) + } + subscriberPods = append(subscriberPods, loggerPod) + + // create channels + t.Logf("Creating Channel and Subscription") + channels := make([]*v1alpha1.Channel, 0) + for _, channelName := range channelNames { + channel := test.Channel(channelName, ns, test.ClusterChannelProvisioner(test.EventingFlags.Provisioner)) + t.Logf("channel: %#v", channel) + + channels = append(channels, channel) + } + + // create subscriptions + subs := make([]*v1alpha1.Subscription, 0) + // create subscriptions that subscribe the first channel, use the transformation service to transform the events and then forward the transformed events to the second channel + for _, subscriptionName := range subscriptionNames1 { + sub := test.Subscription(subscriptionName, ns, test.ChannelRef(channelNames[0]), test.SubscriberSpecForService(transformationPodName), test.ReplyStrategyForChannel(channelNames[1])) + t.Logf("sub: %#v", sub) + subs = append(subs, sub) + } + // create subscriptions that subscribe the second channel, and call the logging service + for _, subscriptionName := range subscriptionNames2 { + sub := test.Subscription(subscriptionName, ns, test.ChannelRef(channelNames[1]), test.SubscriberSpecForService(loggerPodName), nil) + t.Logf("sub: %#v", sub) + subs = append(subs, sub) + } + + // wait for all channels and subscriptions to become ready + if err := WithChannelsAndSubscriptionsReady(clients, &channels, &subs, t.Logf, cleaner); err != nil { + t.Fatalf("The Channels or Subscription were not marked as Ready: %v", err) + } + + // send fake CloudEvent to the first channel + body := fmt.Sprintf("TestEventTransformation %s", uuid.NewUUID()) + event := &test.CloudEvent{ + Source: senderName, + Type: test.CloudEventDefaultType, + Data: fmt.Sprintf(`{"msg":%q}`, body), + Encoding: test.CloudEventDefaultEncoding, + } + if err := SendFakeEventToChannel(clients, event, channels[0], ns, t.Logf, cleaner); err != nil { + t.Fatalf("Failed to send fake CloudEvent to the channel %q", channels[0].Name) + } + + // check if the logging service receives the correct number of event messages + expectedContent := body + msgPostfix + expectedContentCount := len(subscriptionNames1) * len(subscriptionNames2) + if err := WaitForLogContentCount(clients, loggerPod.Name, loggerPod.Spec.Containers[0].Name, expectedContent, expectedContentCount); err != nil { + t.Fatalf("String %q does not appear %d times in logs of logger pod %q: %v", expectedContent, expectedContentCount, loggerPod.Name, err) + } +} diff --git a/test/e2e/single_event_test.go b/test/e2e/single_event_test.go index 4e982ac5b00..07515cbbc3a 100644 --- a/test/e2e/single_event_test.go +++ b/test/e2e/single_event_test.go @@ -21,20 +21,12 @@ import ( "fmt" "testing" + "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/test" pkgTest "github.com/knative/pkg/test" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/uuid" ) -const ( - channelName = "e2e-singleevent" - subscriberName = "e2e-singleevent-subscriber" - senderName = "e2e-singleevent-sender" - subscriptionName = "e2e-singleevent-subscription" - routeName = "e2e-singleevent-route" -) - func TestSingleBinaryEvent(t *testing.T) { SingleEvent(t, test.CloudEventEncodingBinary) } @@ -43,11 +35,27 @@ func TestSingleStructuredEvent(t *testing.T) { SingleEvent(t, test.CloudEventEncodingStructured) } +/* +SingleEvent tests the following scenario: + +EventSource ---> Channel ---> Subscription ---> Service(Logger) + +*/ func SingleEvent(t *testing.T, encoding string) { - clients, cleaner := Setup(t, t.Logf) + if test.EventingFlags.Provisioner == "" { + t.Fatal("ClusterChannelProvisioner must be set to a non-empty string. Either do not specify --clusterChannelProvisioner or set to something other than the empty string") + } + + const ( + channelName = "e2e-singleevent" + senderName = "e2e-singleevent-sender" + subscriptionName = "e2e-singleevent-subscription" + loggerPodName = "e2e-singleevent-logger-pod" + ) + clients, cleaner := Setup(t, t.Logf) // verify namespace - ns, cleanupNS := NamespaceExists(t, clients, t.Logf) + ns, cleanupNS := CreateNamespaceIfNeeded(t, clients, t.Logf) defer cleanupNS() // TearDown() needs to be deferred after cleanupNS(). Otherwise the namespace is deleted and all @@ -55,63 +63,42 @@ func SingleEvent(t *testing.T, encoding string) { defer TearDown(clients, cleaner, t.Logf) // create logger pod - t.Logf("creating subscriber pod") + t.Logf("creating logger pod") selector := map[string]string{"e2etest": string(uuid.NewUUID())} - subscriberPod := test.EventLoggerPod(routeName, ns, selector) - if err := CreatePod(clients, subscriberPod, t.Logf, cleaner); err != nil { - t.Fatalf("Failed to create event logger pod: %v", err) - } - if err := pkgTest.WaitForAllPodsRunning(clients.Kube, ns); err != nil { - t.Fatalf("Error waiting for logger pod to become running: %v", err) - } - t.Logf("subscriber pod running") - - subscriberSvc := test.Service(routeName, ns, selector) - if err := CreateService(clients, subscriberSvc, t.Logf, cleaner); err != nil { - t.Fatalf("Failed to create event logger service: %v", err) - } - - // Reload subscriberPod to get IP - subscriberPod, err := clients.Kube.Kube.CoreV1().Pods(subscriberPod.Namespace).Get(subscriberPod.Name, metav1.GetOptions{}) + loggerPod := test.EventLoggerPod(loggerPodName, ns, selector) + loggerSvc := test.Service(loggerPodName, ns, selector) + loggerPod, err := CreatePodAndServiceReady(clients, loggerPod, loggerSvc, ns, t.Logf, cleaner) if err != nil { - t.Fatalf("Failed to get subscriber pod: %v", err) + t.Fatalf("Failed to create logger pod and service, and get them ready: %v", err) } // create channel t.Logf("Creating Channel and Subscription") - if test.EventingFlags.Provisioner == "" { - t.Fatal("ClusterChannelProvisioner must be set to a non-empty string. Either do not specify --clusterChannelProvisioner or set to something other than the empty string") - } channel := test.Channel(channelName, ns, test.ClusterChannelProvisioner(test.EventingFlags.Provisioner)) t.Logf("channel: %#v", channel) - sub := test.Subscription(subscriptionName, ns, test.ChannelRef(channelName), test.SubscriberSpecForService(routeName), nil) + sub := test.Subscription(subscriptionName, ns, test.ChannelRef(channelName), test.SubscriberSpecForService(loggerPodName), nil) t.Logf("sub: %#v", sub) - if err := WithChannelAndSubscriptionReady(clients, channel, sub, t.Logf, cleaner); err != nil { + if err := WithChannelsAndSubscriptionsReady(clients, &[]*v1alpha1.Channel{channel}, &[]*v1alpha1.Subscription{sub}, t.Logf, cleaner); err != nil { t.Fatalf("The Channel or Subscription were not marked as Ready: %v", err) } - // create sender pod - - t.Logf("Creating event sender") + // send fake CloudEvent to the first channel body := fmt.Sprintf("TestSingleEvent %s", uuid.NewUUID()) - event := test.CloudEvent{ + event := &test.CloudEvent{ Source: senderName, - Type: "test.eventing.knative.dev", + Type: test.CloudEventDefaultType, Data: fmt.Sprintf(`{"msg":%q}`, body), Encoding: encoding, } - url := fmt.Sprintf("http://%s", channel.Status.Address.Hostname) - pod := test.EventSenderPod(senderName, ns, url, event) - t.Logf("sender pod: %#v", pod) - if err := CreatePod(clients, pod, t.Logf, cleaner); err != nil { - t.Fatalf("Failed to create event sender pod: %v", err) + if err := SendFakeEventToChannel(clients, event, channel, ns, t.Logf, cleaner); err != nil { + t.Fatalf("Failed to send fake CloudEvent to the channel %q", channel.Name) } - if err := pkgTest.WaitForLogContent(clients.Kube, routeName, subscriberPod.Spec.Containers[0].Name, body); err != nil { + if err := pkgTest.WaitForLogContent(clients.Kube, loggerPodName, loggerPod.Spec.Containers[0].Name, body); err != nil { clients.Kube.PodLogs(senderName, "sendevent") clients.Kube.PodLogs(senderName, "istio-proxy") - t.Fatalf("String %q not found in logs of subscriber pod %q: %v", body, routeName, err) + t.Fatalf("String %q not found in logs of logger pod %q: %v", body, loggerPodName, err) } } diff --git a/test/test_images/logevents/main.go b/test/test_images/logevents/main.go index ef440024199..9e891509b50 100644 --- a/test/test_images/logevents/main.go +++ b/test/test_images/logevents/main.go @@ -3,7 +3,9 @@ Copyright 2019 The Knative Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - https://www.apache.org/licenses/LICENSE-2.0 + + https://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -23,7 +25,6 @@ import ( ) func handler(event cloudevents.Event) { - log.Printf("%s", event) // TODO: in version 0.5.0 of cloudevents, below can be deleted. ctx := event.Context.AsV02() diff --git a/test/test_images/transformevents/main.go b/test/test_images/transformevents/main.go new file mode 100644 index 00000000000..76e79e99612 --- /dev/null +++ b/test/test_images/transformevents/main.go @@ -0,0 +1,72 @@ +/* +Copyright 2019 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "context" + "flag" + "fmt" + "log" + + "github.com/cloudevents/sdk-go/pkg/cloudevents" + "github.com/cloudevents/sdk-go/pkg/cloudevents/client" +) + +type example struct { + ID string `json:"sequence"` + Message string `json:"msg"` +} + +var ( + msgPostfix string +) + +func init() { + flag.StringVar(&msgPostfix, "msg-postfix", "", "The postfix we want to add for the message.") +} + +func gotEvent(event cloudevents.Event, resp *cloudevents.EventResponse) error { + ctx := event.Context.AsV02() + + data := &example{} + if err := event.DataAs(data); err != nil { + fmt.Printf("Got Data Error: %s\n", err.Error()) + return err + } + + data.Message += msgPostfix + log.Printf("[%s] %s %s: %+v", ctx.Time.String(), *ctx.ContentType, ctx.Source.String(), data) + + r := cloudevents.Event{ + Context: event.Context, + Data: data, + } + resp.RespondWith(200, &r) + return nil +} + +func main() { + // parse the command line flags + flag.Parse() + c, err := client.NewDefault() + if err != nil { + log.Fatalf("failed to create client, %v", err) + } + + log.Printf("listening on 8080") + log.Printf("msgPostfix: %s", msgPostfix) + log.Fatalf("failed to start receiver: %s", c.StartReceiver(context.Background(), gotEvent)) +}