Skip to content
18 changes: 13 additions & 5 deletions test/crd.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,17 @@ func Subscription(name string, namespace string, channel *corev1.ObjectReference
}

// Broker returns a Broker.
func Broker(name string, namespace string) *v1alpha1.Broker {
func Broker(name, namespace string, provisioner *corev1.ObjectReference) *v1alpha1.Broker {
return &v1alpha1.Broker{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
Spec: v1alpha1.BrokerSpec{},
Spec: v1alpha1.BrokerSpec{
ChannelTemplate: &v1alpha1.ChannelSpec{
Provisioner: provisioner,
},
},
}
}

Expand Down Expand Up @@ -179,7 +183,7 @@ func EventLoggerPod(name string, namespace string, selector map[string]string) *
}

// EventTransformationPod creates a Pod that transforms events received.
func EventTransformationPod(name string, namespace string, selector map[string]string, msgPostfix string) *corev1.Pod {
func EventTransformationPod(name string, namespace string, selector map[string]string, event *CloudEvent) *corev1.Pod {
return &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Expand All @@ -193,8 +197,12 @@ func EventTransformationPod(name string, namespace string, selector map[string]s
Image: pkgTest.ImagePath("transformevents"),
ImagePullPolicy: corev1.PullAlways, // TODO: this might not be wanted for local.
Args: []string{
"-msg-postfix",
msgPostfix,
"-event-type",
event.Type,
"-event-source",
event.Source,
"-event-data",
event.Data,
},
}},
RestartPolicy: corev1.RestartPolicyAlways,
Expand Down
157 changes: 157 additions & 0 deletions test/e2e/broker_event_transformation_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
// +build e2e

/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package e2e

import (
"fmt"
"testing"

"github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"github.com/knative/eventing/test"
pkgTest "github.com/knative/pkg/test"
"k8s.io/apimachinery/pkg/util/uuid"
)

/*
TestEventTransformationForTrigger tests the following scenario:

5 4
------------- ----------------------
| | | |
1 v 2 | v 3 |
EventSource ---> Broker ---> Trigger1 -------> Service(Transformation)
|
| 6 7
|-------> Trigger2 -------> Service(Logger)

Note: the number denotes the sequence of the event that flows in this test case.
*/
func TestEventTransformationForTrigger(t *testing.T) {
const (
brokerName = "e2e-eventtransformation-broker"
saName = "eventing-broker-filter"
// This ClusterRole is installed in Knative Eventing setup, see https://github.com/knative/eventing/tree/master/docs/broker#manual-setup.
crName = "eventing-broker-filter"

any = v1alpha1.TriggerAnyFilter
eventType1 = "type1"
eventType2 = "type2"
eventSource1 = "source1"
eventSource2 = "source2"
eventBody = "e2e-eventtransformation-body"

triggerName1 = "trigger1"
triggerName2 = "trigger2"

transformationPodName = "trans-pod"
loggerPodName = "logger-pod"
)

clients, ns, provisioner, cleaner := Setup(t, true, t.Logf)
defer TearDown(clients, ns, cleaner, t.Logf)

// creates ServiceAccount and ClusterRoleBinding with default cluster-admin role
err := CreateServiceAccountAndBinding(clients, saName, crName, ns, t.Logf, cleaner)
if err != nil {
t.Fatalf("Failed to create the ServiceAccount and ServiceAccountRoleBinding: %v", err)
}

// create a new broker
broker := test.Broker(brokerName, ns, test.ClusterChannelProvisioner(provisioner))
t.Logf("provisioner name is: %s", broker.Spec.ChannelTemplate.Provisioner.Name)
err = WithBrokerReady(clients, broker, t.Logf, cleaner)
if err != nil {
Comment thread
chizhg marked this conversation as resolved.
t.Fatalf("Error waiting for the broker to become ready: %v, %v", err, broker)
}
brokerUrl := fmt.Sprintf("http://%s", broker.Status.Address.Hostname)
t.Logf("The broker is ready with url: %q", brokerUrl)

// create an event we want to send
eventToSend := &test.CloudEvent{
Source: eventSource1,
Type: eventType1,
Data: fmt.Sprintf(`{"msg":%q}`, eventBody),
Encoding: test.CloudEventDefaultEncoding,
}

// create the event we want to tranform to
transformedEventBody := fmt.Sprintf("%s %s", eventBody, string(uuid.NewUUID()))
eventAfterTransformation := &test.CloudEvent{
Source: eventSource2,
Type: eventType2,
Data: fmt.Sprintf(`{"msg":%q}`, transformedEventBody),
Encoding: test.CloudEventDefaultEncoding,
}

// create the transformation pod and service, and get them ready
transformationPodSelector := map[string]string{"e2etest": string(uuid.NewUUID())}
transformationPod := test.EventTransformationPod(transformationPodName, ns, transformationPodSelector, eventAfterTransformation)
transformationSvc := test.Service(transformationPodName, ns, transformationPodSelector)
transformationPod, err = CreatePodAndServiceReady(clients, transformationPod, transformationSvc, t.Logf, cleaner)
if err != nil {
t.Fatalf("Failed to create transformation pod and service, and get them ready: %v", err)
}

trigger1 := test.NewTriggerBuilder(triggerName1, ns).
Comment thread
chizhg marked this conversation as resolved.
EventType(eventType1).
EventSource(eventSource1).
Broker(brokerName).
SubscriberSvc(transformationPodName).
Build()
err = CreateTrigger(clients, trigger1, t.Logf, cleaner)
if err != nil {
t.Fatalf("Error creating trigger1: %v", err)
}

// create logger pod and service, and get them ready
loggerPodSelector := map[string]string{"e2etest": string(uuid.NewUUID())}
loggerPod := test.EventLoggerPod(loggerPodName, ns, loggerPodSelector)
loggerSvc := test.Service(loggerPodName, ns, loggerPodSelector)
loggerPod, err = CreatePodAndServiceReady(clients, loggerPod, loggerSvc, t.Logf, cleaner)
if err != nil {
t.Fatalf("Failed to create logger pod and service, and get them ready: %v", err)
}

trigger2 := test.NewTriggerBuilder(triggerName2, ns).
EventType(eventType2).
EventSource(eventSource2).
Broker(brokerName).
SubscriberSvc(loggerPodName).
Build()
err = CreateTrigger(clients, trigger2, t.Logf, cleaner)
if err != nil {
t.Fatalf("Error creating trigger2: %v", err)
}

// Wait for all of the triggers in the namespace to be ready.
if err := WaitForAllTriggersReady(clients, ns, t.Logf); err != nil {
t.Fatalf("Error waiting for triggers to become ready: %v", err)
}

// send fake CloudEvent to the broker
if err := SendFakeEventToBroker(clients, eventToSend, broker, t.Logf, cleaner); err != nil {
t.Fatalf("Failed to send fake CloudEvent to the broker %q", broker.Name)
}

if err := pkgTest.WaitForLogContent(clients.Kube, loggerPodName, loggerPod.Spec.Containers[0].Name, ns, transformedEventBody); err != nil {
logPodLogsForDebugging(clients, transformationPodName, transformationPod.Spec.Containers[0].Name, ns, t.Logf)
logPodLogsForDebugging(clients, loggerPodName, loggerPod.Spec.Containers[0].Name, ns, t.Logf)
logPodLogsForDebugging(clients, eventSource1, "sendevent", ns, t.Logf)
t.Fatalf("String %q not found in logs of logger pod %q: %v", transformedEventBody, loggerPodName, err)
}
}
2 changes: 1 addition & 1 deletion test/e2e/broker_trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestDefaultBrokerWithManyTriggers(t *testing.T) {

// Wait for default broker ready.
t.Logf("Waiting for default broker to be ready")
defaultBroker := test.Broker(brokerName, ns)
defaultBroker := test.Broker(brokerName, ns, test.ClusterChannelProvisioner(""))
err = WaitForBrokerReady(clients, defaultBroker)
if err != nil {
t.Fatalf("Error waiting for default broker to become ready: %v", err)
Expand Down
22 changes: 17 additions & 5 deletions test/e2e/e2e.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,10 +294,10 @@ func CreateClusterRoleBinding(clients *test.Clients, crb *rbacv1.ClusterRoleBind

// CreateServiceAccountAndBinding creates both ServiceAccount and ClusterRoleBinding with default
// cluster-admin role.
func CreateServiceAccountAndBinding(clients *test.Clients, name string, namespace string, logf logging.FormatLogger, cleaner *test.Cleaner) error {
func CreateServiceAccountAndBinding(clients *test.Clients, saName, crName, namespace string, logf logging.FormatLogger, cleaner *test.Cleaner) error {
sa := &corev1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Name: saName,
Namespace: namespace,
},
}
Expand All @@ -318,7 +318,7 @@ func CreateServiceAccountAndBinding(clients *test.Clients, name string, namespac
},
RoleRef: rbacv1.RoleRef{
Kind: "ClusterRole",
Name: "cluster-admin",
Name: crName,
APIGroup: "rbac.authorization.k8s.io",
},
}
Expand Down Expand Up @@ -377,10 +377,22 @@ func CreatePod(clients *test.Clients, pod *corev1.Pod, _ logging.FormatLogger, c

// SendFakeEventToChannel will create fake CloudEvent and send it to the given channel.
func SendFakeEventToChannel(clients *test.Clients, event *test.CloudEvent, channel *v1alpha1.Channel, logf logging.FormatLogger, cleaner *test.Cleaner) error {
logf("Sending fake CloudEvent")
logf("Creating event sender pod")
namespace := channel.Namespace
url := fmt.Sprintf("http://%s", channel.Status.Address.Hostname)
return sendFakeEventToAddress(clients, event, url, namespace, logf, cleaner)
}

// SendFakeEventToBroker will create fake CloudEvent and send it to the given broker.
func SendFakeEventToBroker(clients *test.Clients, event *test.CloudEvent, broker *v1alpha1.Broker, logf logging.FormatLogger, cleaner *test.Cleaner) error {
namespace := broker.Namespace
url := fmt.Sprintf("http://%s", broker.Status.Address.Hostname)
return sendFakeEventToAddress(clients, event, url, namespace, logf, cleaner)
}

func sendFakeEventToAddress(clients *test.Clients, event *test.CloudEvent, url, namespace string, logf logging.FormatLogger, cleaner *test.Cleaner) error {
logf("Sending fake CloudEvent")
logf("Creating event sender pod %q", event.Source)

pod := test.EventSenderPod(event.Source, namespace, url, event)
if err := CreatePod(clients, pod, logf, cleaner); err != nil {
return err
Expand Down
32 changes: 19 additions & 13 deletions test/e2e/event_transformation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ EventSource ---> Channel ---> Subscription ---> Channel ---> Subscription ---->
*/
func TestEventTransformation(t *testing.T) {
senderName := "e2e-eventtransformation-sender"
msgPostfix := string(uuid.NewUUID())
channelNames := [2]string{"e2e-eventtransformation1", "e2e-eventtransformation2"}
// subscriptionNames1 corresponds to Subscriptions on channelNames[0]
subscriptionNames1 := []string{"e2e-eventtransformation-subs11", "e2e-eventtransformation-subs12"}
// subscriptionNames2 corresponds to Subscriptions on channelNames[1]
subscriptionNames2 := []string{"e2e-eventtransformation-subs21", "e2e-eventtransformation-subs22"}
transformationPodName := "e2e-eventtransformation-transformation-pod"
loggerPodName := "e2e-eventtransformation-logger-pod"
eventBody := fmt.Sprintf("TestEventTransformation %s", uuid.NewUUID())

clients, ns, provisioner, cleaner := Setup(t, true, t.Logf)
defer TearDown(clients, ns, cleaner, t.Logf)
Expand All @@ -58,8 +58,15 @@ func TestEventTransformation(t *testing.T) {
subscriberPods := make([]*corev1.Pod, 0)

// create transformation pod and service
transformedEventBody := fmt.Sprintf("eventBody %s", uuid.NewUUID())
eventAfterTransformation := &test.CloudEvent{
Source: senderName,
Type: test.CloudEventDefaultType,
Data: fmt.Sprintf(`{"msg":%q}`, transformedEventBody),
Encoding: test.CloudEventDefaultEncoding,
}
transformationPodSelector := map[string]string{"e2etest": string(uuid.NewUUID())}
transformationPod := test.EventTransformationPod(transformationPodName, ns, transformationPodSelector, msgPostfix)
transformationPod := test.EventTransformationPod(transformationPodName, ns, transformationPodSelector, eventAfterTransformation)
transformationSvc := test.Service(transformationPodName, ns, transformationPodSelector)
transformationPod, err := CreatePodAndServiceReady(clients, transformationPod, transformationSvc, t.Logf, cleaner)
if err != nil {
Expand Down Expand Up @@ -102,25 +109,24 @@ func TestEventTransformation(t *testing.T) {
t.Fatalf("The Channels or Subscription were not marked as Ready: %v", err)
}

// send fake CloudEvent to the first channel
body := fmt.Sprintf("TestEventTransformation %s", uuid.NewUUID())
event := &test.CloudEvent{
eventToSend := &test.CloudEvent{
Source: senderName,
Type: test.CloudEventDefaultType,
Data: fmt.Sprintf(`{"msg":%q}`, body),
Data: fmt.Sprintf(`{"msg":%q}`, eventBody),
Encoding: test.CloudEventDefaultEncoding,
}
if err := SendFakeEventToChannel(clients, event, channels[0], t.Logf, cleaner); err != nil {
// send fake CloudEvent to the first channel
if err := SendFakeEventToChannel(clients, eventToSend, channels[0], t.Logf, cleaner); err != nil {
t.Fatalf("Failed to send fake CloudEvent to the channel %q", channels[0].Name)
}

// check if the logging service receives the correct number of event messages
expectedContent := body + msgPostfix
expectedContentCount := len(subscriptionNames1) * len(subscriptionNames2)
podName := loggerPod.Name
containerName := loggerPod.Spec.Containers[0].Name
if err := WaitForLogContentCount(clients, podName, containerName, ns, expectedContent, expectedContentCount); err != nil {
logPodLogsForDebugging(clients, podName, containerName, ns, t.Logf)
t.Fatalf("String %q does not appear %d times in logs of logger pod %q: %v", expectedContent, expectedContentCount, loggerPod.Name, err)
loggerContainerName := loggerPod.Spec.Containers[0].Name
if err := WaitForLogContentCount(clients, loggerPodName, loggerContainerName, ns, transformedEventBody, expectedContentCount); err != nil {
logPodLogsForDebugging(clients, transformationPodName, transformationPod.Spec.Containers[0].Name, ns, t.Logf)
logPodLogsForDebugging(clients, loggerPodName, loggerContainerName, ns, t.Logf)
logPodLogsForDebugging(clients, eventSource1, "sendevent", ns, t.Logf)
t.Fatalf("String %q does not appear %d times in logs of logger pod %q: %v", transformedEventBody, expectedContentCount, loggerPod.Name, err)
}
}
3 changes: 3 additions & 0 deletions test/e2e/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@ var channelTestMap = map[string][]func(t *testing.T){
TestEventTransformation,
TestChannelChain,
TestDefaultBrokerWithManyTriggers,
TestEventTransformationForTrigger,
},
test.InMemoryChannelProvisioner: {
TestSingleBinaryEvent,
TestSingleStructuredEvent,
TestEventTransformation,
TestChannelChain,
TestEventTransformationForTrigger,
},
test.GCPPubSubProvisioner: {
TestSingleBinaryEvent,
Expand All @@ -52,6 +54,7 @@ var channelTestMap = map[string][]func(t *testing.T){
TestSingleStructuredEvent,
TestEventTransformation,
TestChannelChain,
TestEventTransformationForTrigger,
},
}

Expand Down
10 changes: 5 additions & 5 deletions test/test_images/sendevent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func main() {
r := recover()
if r != nil {
err = r.(error)
fmt.Printf("recovered from panic: %v", err)
log.Printf("recovered from panic: %v", err)
}
}()

Expand All @@ -105,7 +105,7 @@ func main() {
case "structured":
encodingOption = cloudevents.WithStructuredEncoding()
default:
fmt.Printf("unsupported encoding option: %q\n", encoding)
log.Printf("unsupported encoding option: %q\n", encoding)
os.Exit(1)
}

Expand All @@ -126,7 +126,7 @@ func main() {

var untyped map[string]interface{}
if err := json.Unmarshal([]byte(data), &untyped); err != nil {
fmt.Println("Currently sendevent only supports JSON event data")
log.Println("Currently sendevent only supports JSON event data")
os.Exit(1)
}

Expand All @@ -148,9 +148,9 @@ func main() {
}

if resp, err := c.Send(context.Background(), event); err != nil {
fmt.Printf("send returned an error: %v\n", err)
log.Printf("send returned an error: %v\n", err)
} else if resp != nil {
fmt.Printf("Got response from %s\n%s\n", sink, resp)
log.Printf("Got response from %s\n%s\n", sink, resp)
Comment thread
chizhg marked this conversation as resolved.
}

// Wait for next tick
Expand Down
Loading