diff --git a/test/crd.go b/test/crd.go index 046532f5f7b..233fa9ab2d7 100644 --- a/test/crd.go +++ b/test/crd.go @@ -86,13 +86,17 @@ func Subscription(name string, namespace string, channel *corev1.ObjectReference } // Broker returns a Broker. -func Broker(name string, namespace string) *v1alpha1.Broker { +func Broker(name, namespace string, provisioner *corev1.ObjectReference) *v1alpha1.Broker { return &v1alpha1.Broker{ ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: namespace, }, - Spec: v1alpha1.BrokerSpec{}, + Spec: v1alpha1.BrokerSpec{ + ChannelTemplate: &v1alpha1.ChannelSpec{ + Provisioner: provisioner, + }, + }, } } @@ -179,7 +183,7 @@ func EventLoggerPod(name string, namespace string, selector map[string]string) * } // EventTransformationPod creates a Pod that transforms events received. -func EventTransformationPod(name string, namespace string, selector map[string]string, msgPostfix string) *corev1.Pod { +func EventTransformationPod(name string, namespace string, selector map[string]string, event *CloudEvent) *corev1.Pod { return &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: name, @@ -193,8 +197,12 @@ func EventTransformationPod(name string, namespace string, selector map[string]s Image: pkgTest.ImagePath("transformevents"), ImagePullPolicy: corev1.PullAlways, // TODO: this might not be wanted for local. Args: []string{ - "-msg-postfix", - msgPostfix, + "-event-type", + event.Type, + "-event-source", + event.Source, + "-event-data", + event.Data, }, }}, RestartPolicy: corev1.RestartPolicyAlways, diff --git a/test/e2e/broker_event_transformation_test.go b/test/e2e/broker_event_transformation_test.go new file mode 100644 index 00000000000..861d9b4e658 --- /dev/null +++ b/test/e2e/broker_event_transformation_test.go @@ -0,0 +1,157 @@ +// +build e2e + +/* +Copyright 2019 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + "fmt" + "testing" + + "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/eventing/test" + pkgTest "github.com/knative/pkg/test" + "k8s.io/apimachinery/pkg/util/uuid" +) + +/* +TestEventTransformationForTrigger tests the following scenario: + + 5 4 + ------------- ---------------------- + | | | | + 1 v 2 | v 3 | +EventSource ---> Broker ---> Trigger1 -------> Service(Transformation) + | + | 6 7 + |-------> Trigger2 -------> Service(Logger) + +Note: the number denotes the sequence of the event that flows in this test case. +*/ +func TestEventTransformationForTrigger(t *testing.T) { + const ( + brokerName = "e2e-eventtransformation-broker" + saName = "eventing-broker-filter" + // This ClusterRole is installed in Knative Eventing setup, see https://github.com/knative/eventing/tree/master/docs/broker#manual-setup. + crName = "eventing-broker-filter" + + any = v1alpha1.TriggerAnyFilter + eventType1 = "type1" + eventType2 = "type2" + eventSource1 = "source1" + eventSource2 = "source2" + eventBody = "e2e-eventtransformation-body" + + triggerName1 = "trigger1" + triggerName2 = "trigger2" + + transformationPodName = "trans-pod" + loggerPodName = "logger-pod" + ) + + clients, ns, provisioner, cleaner := Setup(t, true, t.Logf) + defer TearDown(clients, ns, cleaner, t.Logf) + + // creates ServiceAccount and ClusterRoleBinding with default cluster-admin role + err := CreateServiceAccountAndBinding(clients, saName, crName, ns, t.Logf, cleaner) + if err != nil { + t.Fatalf("Failed to create the ServiceAccount and ServiceAccountRoleBinding: %v", err) + } + + // create a new broker + broker := test.Broker(brokerName, ns, test.ClusterChannelProvisioner(provisioner)) + t.Logf("provisioner name is: %s", broker.Spec.ChannelTemplate.Provisioner.Name) + err = WithBrokerReady(clients, broker, t.Logf, cleaner) + if err != nil { + t.Fatalf("Error waiting for the broker to become ready: %v, %v", err, broker) + } + brokerUrl := fmt.Sprintf("http://%s", broker.Status.Address.Hostname) + t.Logf("The broker is ready with url: %q", brokerUrl) + + // create an event we want to send + eventToSend := &test.CloudEvent{ + Source: eventSource1, + Type: eventType1, + Data: fmt.Sprintf(`{"msg":%q}`, eventBody), + Encoding: test.CloudEventDefaultEncoding, + } + + // create the event we want to tranform to + transformedEventBody := fmt.Sprintf("%s %s", eventBody, string(uuid.NewUUID())) + eventAfterTransformation := &test.CloudEvent{ + Source: eventSource2, + Type: eventType2, + Data: fmt.Sprintf(`{"msg":%q}`, transformedEventBody), + Encoding: test.CloudEventDefaultEncoding, + } + + // create the transformation pod and service, and get them ready + transformationPodSelector := map[string]string{"e2etest": string(uuid.NewUUID())} + transformationPod := test.EventTransformationPod(transformationPodName, ns, transformationPodSelector, eventAfterTransformation) + transformationSvc := test.Service(transformationPodName, ns, transformationPodSelector) + transformationPod, err = CreatePodAndServiceReady(clients, transformationPod, transformationSvc, t.Logf, cleaner) + if err != nil { + t.Fatalf("Failed to create transformation pod and service, and get them ready: %v", err) + } + + trigger1 := test.NewTriggerBuilder(triggerName1, ns). + EventType(eventType1). + EventSource(eventSource1). + Broker(brokerName). + SubscriberSvc(transformationPodName). + Build() + err = CreateTrigger(clients, trigger1, t.Logf, cleaner) + if err != nil { + t.Fatalf("Error creating trigger1: %v", err) + } + + // create logger pod and service, and get them ready + loggerPodSelector := map[string]string{"e2etest": string(uuid.NewUUID())} + loggerPod := test.EventLoggerPod(loggerPodName, ns, loggerPodSelector) + loggerSvc := test.Service(loggerPodName, ns, loggerPodSelector) + loggerPod, err = CreatePodAndServiceReady(clients, loggerPod, loggerSvc, t.Logf, cleaner) + if err != nil { + t.Fatalf("Failed to create logger pod and service, and get them ready: %v", err) + } + + trigger2 := test.NewTriggerBuilder(triggerName2, ns). + EventType(eventType2). + EventSource(eventSource2). + Broker(brokerName). + SubscriberSvc(loggerPodName). + Build() + err = CreateTrigger(clients, trigger2, t.Logf, cleaner) + if err != nil { + t.Fatalf("Error creating trigger2: %v", err) + } + + // Wait for all of the triggers in the namespace to be ready. + if err := WaitForAllTriggersReady(clients, ns, t.Logf); err != nil { + t.Fatalf("Error waiting for triggers to become ready: %v", err) + } + + // send fake CloudEvent to the broker + if err := SendFakeEventToBroker(clients, eventToSend, broker, t.Logf, cleaner); err != nil { + t.Fatalf("Failed to send fake CloudEvent to the broker %q", broker.Name) + } + + if err := pkgTest.WaitForLogContent(clients.Kube, loggerPodName, loggerPod.Spec.Containers[0].Name, ns, transformedEventBody); err != nil { + logPodLogsForDebugging(clients, transformationPodName, transformationPod.Spec.Containers[0].Name, ns, t.Logf) + logPodLogsForDebugging(clients, loggerPodName, loggerPod.Spec.Containers[0].Name, ns, t.Logf) + logPodLogsForDebugging(clients, eventSource1, "sendevent", ns, t.Logf) + t.Fatalf("String %q not found in logs of logger pod %q: %v", transformedEventBody, loggerPodName, err) + } +} diff --git a/test/e2e/broker_trigger_test.go b/test/e2e/broker_trigger_test.go index 862f39a828b..bd19e0c3a16 100644 --- a/test/e2e/broker_trigger_test.go +++ b/test/e2e/broker_trigger_test.go @@ -70,7 +70,7 @@ func TestDefaultBrokerWithManyTriggers(t *testing.T) { // Wait for default broker ready. t.Logf("Waiting for default broker to be ready") - defaultBroker := test.Broker(brokerName, ns) + defaultBroker := test.Broker(brokerName, ns, test.ClusterChannelProvisioner("")) err = WaitForBrokerReady(clients, defaultBroker) if err != nil { t.Fatalf("Error waiting for default broker to become ready: %v", err) diff --git a/test/e2e/e2e.go b/test/e2e/e2e.go index f559c3935e6..2ee88c047c5 100644 --- a/test/e2e/e2e.go +++ b/test/e2e/e2e.go @@ -294,10 +294,10 @@ func CreateClusterRoleBinding(clients *test.Clients, crb *rbacv1.ClusterRoleBind // CreateServiceAccountAndBinding creates both ServiceAccount and ClusterRoleBinding with default // cluster-admin role. -func CreateServiceAccountAndBinding(clients *test.Clients, name string, namespace string, logf logging.FormatLogger, cleaner *test.Cleaner) error { +func CreateServiceAccountAndBinding(clients *test.Clients, saName, crName, namespace string, logf logging.FormatLogger, cleaner *test.Cleaner) error { sa := &corev1.ServiceAccount{ ObjectMeta: metav1.ObjectMeta{ - Name: name, + Name: saName, Namespace: namespace, }, } @@ -318,7 +318,7 @@ func CreateServiceAccountAndBinding(clients *test.Clients, name string, namespac }, RoleRef: rbacv1.RoleRef{ Kind: "ClusterRole", - Name: "cluster-admin", + Name: crName, APIGroup: "rbac.authorization.k8s.io", }, } @@ -377,10 +377,22 @@ func CreatePod(clients *test.Clients, pod *corev1.Pod, _ logging.FormatLogger, c // SendFakeEventToChannel will create fake CloudEvent and send it to the given channel. func SendFakeEventToChannel(clients *test.Clients, event *test.CloudEvent, channel *v1alpha1.Channel, logf logging.FormatLogger, cleaner *test.Cleaner) error { - logf("Sending fake CloudEvent") - logf("Creating event sender pod") namespace := channel.Namespace url := fmt.Sprintf("http://%s", channel.Status.Address.Hostname) + return sendFakeEventToAddress(clients, event, url, namespace, logf, cleaner) +} + +// SendFakeEventToBroker will create fake CloudEvent and send it to the given broker. +func SendFakeEventToBroker(clients *test.Clients, event *test.CloudEvent, broker *v1alpha1.Broker, logf logging.FormatLogger, cleaner *test.Cleaner) error { + namespace := broker.Namespace + url := fmt.Sprintf("http://%s", broker.Status.Address.Hostname) + return sendFakeEventToAddress(clients, event, url, namespace, logf, cleaner) +} + +func sendFakeEventToAddress(clients *test.Clients, event *test.CloudEvent, url, namespace string, logf logging.FormatLogger, cleaner *test.Cleaner) error { + logf("Sending fake CloudEvent") + logf("Creating event sender pod %q", event.Source) + pod := test.EventSenderPod(event.Source, namespace, url, event) if err := CreatePod(clients, pod, logf, cleaner); err != nil { return err diff --git a/test/e2e/event_transformation_test.go b/test/e2e/event_transformation_test.go index 7d5b1c87fd4..26353e8cba8 100644 --- a/test/e2e/event_transformation_test.go +++ b/test/e2e/event_transformation_test.go @@ -41,7 +41,6 @@ EventSource ---> Channel ---> Subscription ---> Channel ---> Subscription ----> */ func TestEventTransformation(t *testing.T) { senderName := "e2e-eventtransformation-sender" - msgPostfix := string(uuid.NewUUID()) channelNames := [2]string{"e2e-eventtransformation1", "e2e-eventtransformation2"} // subscriptionNames1 corresponds to Subscriptions on channelNames[0] subscriptionNames1 := []string{"e2e-eventtransformation-subs11", "e2e-eventtransformation-subs12"} @@ -49,6 +48,7 @@ func TestEventTransformation(t *testing.T) { subscriptionNames2 := []string{"e2e-eventtransformation-subs21", "e2e-eventtransformation-subs22"} transformationPodName := "e2e-eventtransformation-transformation-pod" loggerPodName := "e2e-eventtransformation-logger-pod" + eventBody := fmt.Sprintf("TestEventTransformation %s", uuid.NewUUID()) clients, ns, provisioner, cleaner := Setup(t, true, t.Logf) defer TearDown(clients, ns, cleaner, t.Logf) @@ -58,8 +58,15 @@ func TestEventTransformation(t *testing.T) { subscriberPods := make([]*corev1.Pod, 0) // create transformation pod and service + transformedEventBody := fmt.Sprintf("eventBody %s", uuid.NewUUID()) + eventAfterTransformation := &test.CloudEvent{ + Source: senderName, + Type: test.CloudEventDefaultType, + Data: fmt.Sprintf(`{"msg":%q}`, transformedEventBody), + Encoding: test.CloudEventDefaultEncoding, + } transformationPodSelector := map[string]string{"e2etest": string(uuid.NewUUID())} - transformationPod := test.EventTransformationPod(transformationPodName, ns, transformationPodSelector, msgPostfix) + transformationPod := test.EventTransformationPod(transformationPodName, ns, transformationPodSelector, eventAfterTransformation) transformationSvc := test.Service(transformationPodName, ns, transformationPodSelector) transformationPod, err := CreatePodAndServiceReady(clients, transformationPod, transformationSvc, t.Logf, cleaner) if err != nil { @@ -102,25 +109,24 @@ func TestEventTransformation(t *testing.T) { t.Fatalf("The Channels or Subscription were not marked as Ready: %v", err) } - // send fake CloudEvent to the first channel - body := fmt.Sprintf("TestEventTransformation %s", uuid.NewUUID()) - event := &test.CloudEvent{ + eventToSend := &test.CloudEvent{ Source: senderName, Type: test.CloudEventDefaultType, - Data: fmt.Sprintf(`{"msg":%q}`, body), + Data: fmt.Sprintf(`{"msg":%q}`, eventBody), Encoding: test.CloudEventDefaultEncoding, } - if err := SendFakeEventToChannel(clients, event, channels[0], t.Logf, cleaner); err != nil { + // send fake CloudEvent to the first channel + if err := SendFakeEventToChannel(clients, eventToSend, channels[0], t.Logf, cleaner); err != nil { t.Fatalf("Failed to send fake CloudEvent to the channel %q", channels[0].Name) } // check if the logging service receives the correct number of event messages - expectedContent := body + msgPostfix expectedContentCount := len(subscriptionNames1) * len(subscriptionNames2) - podName := loggerPod.Name - containerName := loggerPod.Spec.Containers[0].Name - if err := WaitForLogContentCount(clients, podName, containerName, ns, expectedContent, expectedContentCount); err != nil { - logPodLogsForDebugging(clients, podName, containerName, ns, t.Logf) - t.Fatalf("String %q does not appear %d times in logs of logger pod %q: %v", expectedContent, expectedContentCount, loggerPod.Name, err) + loggerContainerName := loggerPod.Spec.Containers[0].Name + if err := WaitForLogContentCount(clients, loggerPodName, loggerContainerName, ns, transformedEventBody, expectedContentCount); err != nil { + logPodLogsForDebugging(clients, transformationPodName, transformationPod.Spec.Containers[0].Name, ns, t.Logf) + logPodLogsForDebugging(clients, loggerPodName, loggerContainerName, ns, t.Logf) + logPodLogsForDebugging(clients, eventSource1, "sendevent", ns, t.Logf) + t.Fatalf("String %q does not appear %d times in logs of logger pod %q: %v", transformedEventBody, expectedContentCount, loggerPod.Name, err) } } diff --git a/test/e2e/main_test.go b/test/e2e/main_test.go index 8035bbcf009..0ea5854771b 100644 --- a/test/e2e/main_test.go +++ b/test/e2e/main_test.go @@ -34,12 +34,14 @@ var channelTestMap = map[string][]func(t *testing.T){ TestEventTransformation, TestChannelChain, TestDefaultBrokerWithManyTriggers, + TestEventTransformationForTrigger, }, test.InMemoryChannelProvisioner: { TestSingleBinaryEvent, TestSingleStructuredEvent, TestEventTransformation, TestChannelChain, + TestEventTransformationForTrigger, }, test.GCPPubSubProvisioner: { TestSingleBinaryEvent, @@ -52,6 +54,7 @@ var channelTestMap = map[string][]func(t *testing.T){ TestSingleStructuredEvent, TestEventTransformation, TestChannelChain, + TestEventTransformationForTrigger, }, } diff --git a/test/test_images/sendevent/main.go b/test/test_images/sendevent/main.go index 7650afd6290..45566571496 100644 --- a/test/test_images/sendevent/main.go +++ b/test/test_images/sendevent/main.go @@ -84,7 +84,7 @@ func main() { r := recover() if r != nil { err = r.(error) - fmt.Printf("recovered from panic: %v", err) + log.Printf("recovered from panic: %v", err) } }() @@ -105,7 +105,7 @@ func main() { case "structured": encodingOption = cloudevents.WithStructuredEncoding() default: - fmt.Printf("unsupported encoding option: %q\n", encoding) + log.Printf("unsupported encoding option: %q\n", encoding) os.Exit(1) } @@ -126,7 +126,7 @@ func main() { var untyped map[string]interface{} if err := json.Unmarshal([]byte(data), &untyped); err != nil { - fmt.Println("Currently sendevent only supports JSON event data") + log.Println("Currently sendevent only supports JSON event data") os.Exit(1) } @@ -148,9 +148,9 @@ func main() { } if resp, err := c.Send(context.Background(), event); err != nil { - fmt.Printf("send returned an error: %v\n", err) + log.Printf("send returned an error: %v\n", err) } else if resp != nil { - fmt.Printf("Got response from %s\n%s\n", sink, resp) + log.Printf("Got response from %s\n%s\n", sink, resp) } // Wait for next tick diff --git a/test/test_images/transformevents/main.go b/test/test_images/transformevents/main.go index 88e8db3ae6f..6698b665da5 100644 --- a/test/test_images/transformevents/main.go +++ b/test/test_images/transformevents/main.go @@ -21,6 +21,7 @@ import ( "flag" "fmt" "log" + "os" cloudevents "github.com/cloudevents/sdk-go" ) @@ -31,11 +32,15 @@ type example struct { } var ( - msgPostfix string + eventType string + eventSource string + eventData string ) func init() { - flag.StringVar(&msgPostfix, "msg-postfix", "", "The postfix we want to add for the message.") + flag.StringVar(&eventType, "event-type", "knative.eventing.test.e2e", "The Event Type to use.") + flag.StringVar(&eventSource, "event-source", "", "Source URI to use. Defaults to the current machine's hostname") + flag.StringVar(&eventData, "event-data", `{"hello": "world!"}`, "Cloudevent data body.") } func gotEvent(event cloudevents.Event, resp *cloudevents.EventResponse) error { @@ -47,13 +52,19 @@ func gotEvent(event cloudevents.Event, resp *cloudevents.EventResponse) error { return err } - data.Message += msgPostfix - log.Printf("[%s] %s %s: %+v", ctx.Time.String(), *ctx.ContentType, ctx.Source.String(), data) + log.Println("Received a new event: ") + log.Printf("[%s] %s %s: %+v", ctx.Time.String(), ctx.GetSource(), ctx.GetType(), data) + ctx.SetSource(eventSource) + ctx.SetType(eventType) r := cloudevents.Event{ - Context: event.Context, - Data: data, + Context: ctx, + Data: eventData, } + + log.Println("Transform the event to: ") + log.Printf("[%s] %s %s: %+v", ctx.Time.String(), ctx.GetSource(), ctx.GetType(), eventData) + resp.RespondWith(200, &r) return nil } @@ -61,12 +72,21 @@ func gotEvent(event cloudevents.Event, resp *cloudevents.EventResponse) error { func main() { // parse the command line flags flag.Parse() + + // default eventSource to the current machine's hostname + if eventSource == "" { + if hostname, err := os.Hostname(); err != nil { + eventSource = hostname + } else { + eventSource = "localhost" + } + } + c, err := cloudevents.NewDefaultClient() if err != nil { log.Fatalf("failed to create client, %v", err) } log.Printf("listening on 8080") - log.Printf("msgPostfix: %s", msgPostfix) log.Fatalf("failed to start receiver: %s", c.StartReceiver(context.Background(), gotEvent)) }