From 0408940e72a832c090e138d0bdfa0b112302077c Mon Sep 17 00:00:00 2001 From: Chi Zhang Date: Mon, 25 Mar 2019 21:06:59 -0700 Subject: [PATCH 01/20] add two e2e test case for Channel and Subscription in Eventing --- test/crd.go | 33 +++++- test/e2e/channel_chain_test.go | 123 ++++++++++++++++++++ test/e2e/complex_scenario_test.go | 140 +++++++++++++++++++++++ test/e2e/e2e.go | 86 ++++++++++---- test/e2e/single_event_test.go | 19 +-- test/test_images/logevents/main.go | 6 +- test/test_images/transformevents/main.go | 70 ++++++++++++ 7 files changed, 443 insertions(+), 34 deletions(-) create mode 100644 test/e2e/channel_chain_test.go create mode 100644 test/e2e/complex_scenario_test.go create mode 100644 test/test_images/transformevents/main.go diff --git a/test/crd.go b/test/crd.go index e4a000b2c59..40a5710ca75 100644 --- a/test/crd.go +++ b/test/crd.go @@ -95,7 +95,7 @@ func Channel(name string, namespace string, provisioner *corev1.ObjectReference) } } -// SubscriberSpecForRoute returns a SubscriberSpec for a given Knative Service. +// SubscriberSpecForRoute returns a SubscriberSpec for a given Knative Route. func SubscriberSpecForRoute(name string) *v1alpha1.SubscriberSpec { return &v1alpha1.SubscriberSpec{ Ref: pkgTest.CoreV1ObjectReference("Route", servingApiVersion, name), @@ -109,6 +109,13 @@ func SubscriberSpecForService(name string) *v1alpha1.SubscriberSpec { } } +// ReplyStrategyForChannel returns a ReplyStrategy for a given Channel +func ReplyStrategyForChannel(name string) *v1alpha1.ReplyStrategy { + return &v1alpha1.ReplyStrategy{ + Channel: pkgTest.CoreV1ObjectReference("Channel", eventsApiVersion, name), + } +} + // Subscription returns a Subscription func Subscription(name string, namespace string, channel *corev1.ObjectReference, subscriber *v1alpha1.SubscriberSpec, reply *v1alpha1.ReplyStrategy) *v1alpha1.Subscription { return &v1alpha1.Subscription{ @@ -214,6 +221,30 @@ func EventLoggerPod(name string, namespace string, selector map[string]string) * } } +// EventTransformationPod creates a Pod that transforms events received. +func EventTransformationPod(name string, namespace string, selector map[string]string, msgPostfix string) *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + Labels: selector, + Annotations: map[string]string{"sidecar.istio.io/inject": "true"}, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: "transformevents", + Image: pkgTest.ImagePath("transformevents"), + ImagePullPolicy: corev1.PullAlways, // TODO: this might not be wanted for local. + Args: []string{ + "-msg-postfix", + msgPostfix, + }, + }}, + RestartPolicy: corev1.RestartPolicyAlways, + }, + } +} + // Service creates a Kubernetes Service with the given name, namespace, and // selector. Port 8080 is assumed the target port. func Service(name string, namespace string, selector map[string]string) *corev1.Service { diff --git a/test/e2e/channel_chain_test.go b/test/e2e/channel_chain_test.go new file mode 100644 index 00000000000..9b9ec1aa5d8 --- /dev/null +++ b/test/e2e/channel_chain_test.go @@ -0,0 +1,123 @@ +/* +Copyright 2019 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + "fmt" + "testing" + + "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/eventing/test" + pkgTest "github.com/knative/pkg/test" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/uuid" +) + +func TestChannelChain(t *testing.T) { + const ( + senderName = "e2e-channelchain-sender" + routeName = "e2e-channelchain-route" + ) + var channelNames = [2]string{"e2e-channelchain1", "e2e-channelchain2"} + var subscriptionNames1 = CreateRandomSubscriptionNames("e2e-complexscen-subs1") + var subscriptionNames2 = CreateRandomSubscriptionNames("e2e-complexscen-subs2") + + clients, cleaner := Setup(t, t.Logf) + + // verify namespace + ns, cleanupNS := NamespaceExists(t, clients, t.Logf) + defer cleanupNS() + + // TearDown() needs to be deferred after cleanupNS(). Otherwise the namespace is deleted and all + // resources in it. So when TearDown() runs, it spews a lot of not found errors. + defer TearDown(clients, cleaner, t.Logf) + + // create subscriberPod and expose it as a service + t.Logf("creating subscriber pod") + selector := map[string]string{"e2etest": string(uuid.NewUUID())} + subscriberPod := test.EventLoggerPod(routeName, ns, selector) + if err := CreatePod(clients, subscriberPod, t.Logf, cleaner); err != nil { + t.Fatalf("Failed to create event logger pod: %v", err) + } + if err := pkgTest.WaitForAllPodsRunning(clients.Kube, ns); err != nil { + t.Fatalf("Error waiting for logger pod to become running: %v", err) + } + t.Logf("subscriber pod running") + + subscriberSvc := test.Service(routeName, ns, selector) + if err := CreateService(clients, subscriberSvc, t.Logf, cleaner); err != nil { + t.Fatalf("Failed to create event logger service: %v", err) + } + + // Reload subscriberPod to get IP + subscriberPod, err := clients.Kube.Kube.CoreV1().Pods(subscriberPod.Namespace).Get(subscriberPod.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Failed to get subscriber pod: %v", err) + } + + // create channels + t.Logf("Creating Channel and Subscription") + if test.EventingFlags.Provisioner == "" { + t.Fatal("ClusterChannelProvisioner must be set to a non-empty string. Either do not specify --clusterChannelProvisioner or set to something other than the empty string") + } + channels := make([]*v1alpha1.Channel, 0) + for _, channelName := range channelNames { + channel := test.Channel(channelName, ns, test.ClusterChannelProvisioner(test.EventingFlags.Provisioner)) + t.Logf("channel: %#v", channel) + channels = append(channels, channel) + } + + // create subscriptions + subs := make([]*v1alpha1.Subscription, 0) + // create subscriptions that subscribe the first channel, and reply events directly to the second channel + for _, subscriptionName := range subscriptionNames1 { + sub := test.Subscription(subscriptionName, ns, test.ChannelRef(channelNames[0]), nil, test.ReplyStrategyForChannel(channelNames[1])) + t.Logf("sub: %#v", sub) + subs = append(subs, sub) + } + // create subscriptions that subscribe the second channel, and call the logging service + for _, subscriptionName := range subscriptionNames2 { + sub := test.Subscription(subscriptionName, ns, test.ChannelRef(channelNames[1]), test.SubscriberSpecForService(routeName), nil) + t.Logf("sub: %#v", sub) + subs = append(subs, sub) + } + + // wait for all channels and subscriptions to become ready + if err := WithChannelsAndSubscriptionsReady(clients, &channels, &subs, t.Logf, cleaner); err != nil { + t.Fatalf("The Channel or Subscription were not marked as Ready: %v", err) + } + + // create sender pod + t.Logf("Creating event sender") + body := fmt.Sprintf("TestChannelChainEvent %s", uuid.NewUUID()) + event := test.CloudEvent{ + Source: senderName, + Type: "test.eventing.knative.dev", + Data: fmt.Sprintf(`{"msg":%q}`, body), + Encoding: test.CloudEventEncodingBinary, + } + url := fmt.Sprintf("http://%s", channels[0].Status.Address.Hostname) + pod := test.EventSenderPod(senderName, ns, url, event) + t.Logf("sender pod: %#v", pod) + if err := CreatePod(clients, pod, t.Logf, cleaner); err != nil { + t.Fatalf("Failed to create event sender pod: %v", err) + } + + // check if the logging service receives the correct number of event messages + if err := WaitForLogContentCount(clients, routeName, subscriberPod.Spec.Containers[0].Name, body, len(subscriptionNames1)*len(subscriptionNames2)); err != nil { + t.Fatalf("String %q not found in logs of subscriber pod %q: %v", body, routeName, err) + } +} diff --git a/test/e2e/complex_scenario_test.go b/test/e2e/complex_scenario_test.go new file mode 100644 index 00000000000..18c03ee5c4a --- /dev/null +++ b/test/e2e/complex_scenario_test.go @@ -0,0 +1,140 @@ +/* +Copyright 2019 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + "fmt" + "testing" + + "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/eventing/test" + pkgTest "github.com/knative/pkg/test" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/uuid" +) + +func TestComplexScenario(t *testing.T) { + const ( + senderName = "e2e-complexscen-sender" + msgPostfix = "######" + ) + + var channelNames = [2]string{"e2e-complexscen1", "e2e-complexscen2"} + var routeNames = [2]string{"e2e-complexscen-route1", "e2e-complexscen-route2"} + var subscriptionNames1 = CreateRandomSubscriptionNames("e2e-complexscen-subs1") + var subscriptionNames2 = CreateRandomSubscriptionNames("e2e-complexscen-subs2") + + clients, cleaner := Setup(t, t.Logf) + + // verify namespace + ns, cleanupNS := NamespaceExists(t, clients, t.Logf) + defer cleanupNS() + + // TearDown() needs to be deferred after cleanupNS(). Otherwise the namespace is deleted and all + // resources in it. So when TearDown() runs, it spews a lot of not found errors. + defer TearDown(clients, cleaner, t.Logf) + + // create subscriberPods and expose them as services + t.Logf("creating subscriber pods") + + subscriberPods := make([]*corev1.Pod, 0) + for i, routeName := range routeNames { + selector := map[string]string{"e2etest": string(uuid.NewUUID())} + var subscriberPod *corev1.Pod + // create the first subscriber for the event transformation, and the second for the logging + if i == 0 { + subscriberPod = test.EventTransformationPod(routeName, ns, selector, msgPostfix) + } else if i == 1 { + subscriberPod = test.EventLoggerPod(routeName, ns, selector) + } + + if err := CreatePod(clients, subscriberPod, t.Logf, cleaner); err != nil { + t.Fatalf("Failed to create event logger pod: %v", err) + } + if err := pkgTest.WaitForAllPodsRunning(clients.Kube, ns); err != nil { + t.Fatalf("Error waiting for logger pod to become running: %v", err) + } + t.Logf("subscriber pod running") + + subscriberSvc := test.Service(routeName, ns, selector) + if err := CreateService(clients, subscriberSvc, t.Logf, cleaner); err != nil { + t.Fatalf("Failed to create event logger service: %v", err) + } + + // Reload subscriberPod to get IP + subscriberPod, err := clients.Kube.Kube.CoreV1().Pods(subscriberPod.Namespace).Get(subscriberPod.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Failed to get subscriber pod: %v", err) + } + + subscriberPods = append(subscriberPods, subscriberPod) + } + + // create channels + t.Logf("Creating Channel and Subscription") + if test.EventingFlags.Provisioner == "" { + t.Fatal("ClusterChannelProvisioner must be set to a non-empty string. Either do not specify --clusterChannelProvisioner or set to something other than the empty string") + } + channels := make([]*v1alpha1.Channel, 0) + for _, channelName := range channelNames { + channel := test.Channel(channelName, ns, test.ClusterChannelProvisioner(test.EventingFlags.Provisioner)) + t.Logf("channel: %#v", channel) + + channels = append(channels, channel) + } + + // create subscriptions + subs := make([]*v1alpha1.Subscription, 0) + // create subscriptions that subscribe the first channel, use the transformation service to transform the events and then forward to the second channel + for _, subscriptionName := range subscriptionNames1 { + sub := test.Subscription(subscriptionName, ns, test.ChannelRef(channelNames[0]), test.SubscriberSpecForService(routeNames[0]), test.ReplyStrategyForChannel(channelNames[1])) + t.Logf("sub: %#v", sub) + subs = append(subs, sub) + } + // create subscriptions that subscribe the second channel, and call the logging service + for _, subscriptionName := range subscriptionNames2 { + sub := test.Subscription(subscriptionName, ns, test.ChannelRef(channelNames[1]), test.SubscriberSpecForService(routeNames[1]), nil) + t.Logf("sub: %#v", sub) + subs = append(subs, sub) + } + + // wait for all channels and subscriptions to become ready + if err := WithChannelsAndSubscriptionsReady(clients, &channels, &subs, t.Logf, cleaner); err != nil { + t.Fatalf("The Channels or Subscription were not marked as Ready: %v", err) + } + + // create sender pod + t.Logf("Creating event sender") + body := fmt.Sprintf("TestComplexScenario %s", uuid.NewUUID()) + event := test.CloudEvent{ + Source: senderName, + Type: "test.eventing.knative.dev", + Data: fmt.Sprintf(`{"msg":%q}`, body), + Encoding: test.CloudEventEncodingBinary, + } + url := fmt.Sprintf("http://%s", channels[0].Status.Address.Hostname) + pod := test.EventSenderPod(senderName, ns, url, event) + t.Logf("sender pod: %#v", pod) + if err := CreatePod(clients, pod, t.Logf, cleaner); err != nil { + t.Fatalf("Failed to create event sender pod: %v", err) + } + + // check if the logging service receives the correct number of event messages + if err := WaitForLogContentCount(clients, subscriberPods[1].Name, subscriberPods[1].Spec.Containers[0].Name, body+msgPostfix, len(subscriptionNames1)*len(subscriptionNames2)); err != nil { + t.Fatalf("String %q not found in logs of subscriber pod %q: %v", body, subscriberPods[1].Name, err) + } +} diff --git a/test/e2e/e2e.go b/test/e2e/e2e.go index 700fa18ecd0..f072ea40303 100644 --- a/test/e2e/e2e.go +++ b/test/e2e/e2e.go @@ -17,6 +17,7 @@ package e2e import ( "fmt" + "math/rand" "strings" "testing" "time" @@ -44,6 +45,10 @@ const ( interval = 1 * time.Second timeout = 1 * time.Minute + + // the minimum and maxmium number of subscriptions we generate in the e2e tests + minSubCount = 1 + maxSubCount = 5 ) // Setup creates the client objects needed in the e2e tests. @@ -69,6 +74,18 @@ func TearDown(clients *test.Clients, cleaner *test.Cleaner, _ logging.FormatLogg cleaner.Clean(true) } +// CreateRandomSubscriptionNames will create random number of subscription names +func CreateRandomSubscriptionNames(randSubNamePrefix string) []string { + rand.Seed(time.Now().UnixNano()) + count := rand.Intn(maxSubCount-minSubCount) + minSubCount + var subscriptionNames []string + for i := 0; i < count; i++ { + subscriptionNames = append(subscriptionNames, fmt.Sprintf("%s-%d", randSubNamePrefix, i)) + } + + return subscriptionNames +} + // CreateRouteAndConfig will create Route and Config objects using clients. // The Config object will serve requests to a container started from the image at imagePath. func CreateRouteAndConfig(clients *test.Clients, logf logging.FormatLogger, cleaner *test.Cleaner, name string, imagePath string) error { @@ -127,36 +144,47 @@ func CreateSubscription(clients *test.Clients, sub *v1alpha1.Subscription, _ log return nil } -// WithChannelAndSubscriptionReady creates a Channel and Subscription and waits until both are Ready. -func WithChannelAndSubscriptionReady(clients *test.Clients, channel *v1alpha1.Channel, sub *v1alpha1.Subscription, logf logging.FormatLogger, cleaner *test.Cleaner) error { - if err := CreateChannel(clients, channel, logf, cleaner); err != nil { - return err - } - if err := CreateSubscription(clients, sub, logf, cleaner); err != nil { - return err +// WithChannelsAndSubscriptionsReady creates Channels and Subscriptions and waits until both are Ready. +func WithChannelsAndSubscriptionsReady(clients *test.Clients, chans *[]*v1alpha1.Channel, subs *[]*v1alpha1.Subscription, logf logging.FormatLogger, cleaner *test.Cleaner) error { + for _, channel := range *chans { + if err := CreateChannel(clients, channel, logf, cleaner); err != nil { + return err + } } channels := clients.Eventing.EventingV1alpha1().Channels(pkgTest.Flags.Namespace) - if err := test.WaitForChannelState(channels, channel.Name, test.IsChannelReady, "ChannelIsReady"); err != nil { - return err + for i, channel := range *chans { + if err := test.WaitForChannelState(channels, channel.Name, test.IsChannelReady, "ChannelIsReady"); err != nil { + return err + } + // Update the given object so they'll reflect the ready state + updatedchannel, err := channels.Get(channel.Name, metav1.GetOptions{}) + if err != nil { + return err + } + updatedchannel.DeepCopyInto(channel) + (*chans)[i] = channel } - // Update the given object so they'll reflect the ready state - updatedchannel, err := channels.Get(channel.Name, metav1.GetOptions{}) - if err != nil { - return err + + for _, sub := range *subs { + if err := CreateSubscription(clients, sub, logf, cleaner); err != nil { + return err + } } - updatedchannel.DeepCopyInto(channel) subscriptions := clients.Eventing.EventingV1alpha1().Subscriptions(pkgTest.Flags.Namespace) - if err = test.WaitForSubscriptionState(subscriptions, sub.Name, test.IsSubscriptionReady, "SubscriptionIsReady"); err != nil { - return err - } - // Update the given object so they'll reflect the ready state - updatedsub, err := subscriptions.Get(sub.Name, metav1.GetOptions{}) - if err != nil { - return err + for i, sub := range *subs { + if err := test.WaitForSubscriptionState(subscriptions, sub.Name, test.IsSubscriptionReady, "SubscriptionIsReady"); err != nil { + return err + } + // Update the given object so they'll reflect the ready state + updatedsub, err := subscriptions.Get(sub.Name, metav1.GetOptions{}) + if err != nil { + return err + } + updatedsub.DeepCopyInto(sub) + (*subs)[i] = sub } - updatedsub.DeepCopyInto(sub) return nil } @@ -326,6 +354,19 @@ func WaitForLogContents(clients *test.Clients, logf logging.FormatLogger, podNam }) } +// WaitForLogContentCount checks if the number of substr occur times equals the given number. +// If the content does not appear the given times it returns error. +func WaitForLogContentCount(client *test.Clients, podName, containerName, content string, appearTimes int) error { + return wait.PollImmediate(interval, timeout, func() (bool, error) { + logs, err := client.Kube.PodLogs(podName, containerName) + if err != nil { + return true, err + } + + return strings.Count(string(logs), content) == appearTimes, nil + }) +} + // FindAnyLogContents attempts to find logs for given Pod/Container that has 'any' of the given contents. // It returns an error if it couldn't retrieve the logs. In case 'any' of the contents are there, it returns true. func FindAnyLogContents(clients *test.Clients, logf logging.FormatLogger, podName string, containerName string, namespace string, contents []string) (bool, error) { @@ -368,6 +409,7 @@ func LabelNamespace(clients *test.Clients, logf logging.FormatLogger, labels map return err } +// NamespaceExists creates a new namespace if it does not exist func NamespaceExists(t *testing.T, clients *test.Clients, logf logging.FormatLogger) (string, func()) { shutdown := func() {} ns := pkgTest.Flags.Namespace diff --git a/test/e2e/single_event_test.go b/test/e2e/single_event_test.go index 4e982ac5b00..bede07618c4 100644 --- a/test/e2e/single_event_test.go +++ b/test/e2e/single_event_test.go @@ -21,20 +21,13 @@ import ( "fmt" "testing" + "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/test" pkgTest "github.com/knative/pkg/test" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/uuid" ) -const ( - channelName = "e2e-singleevent" - subscriberName = "e2e-singleevent-subscriber" - senderName = "e2e-singleevent-sender" - subscriptionName = "e2e-singleevent-subscription" - routeName = "e2e-singleevent-route" -) - func TestSingleBinaryEvent(t *testing.T) { SingleEvent(t, test.CloudEventEncodingBinary) } @@ -44,6 +37,14 @@ func TestSingleStructuredEvent(t *testing.T) { } func SingleEvent(t *testing.T, encoding string) { + const ( + channelName = "e2e-singleevent" + subscriberName = "e2e-singleevent-subscriber" + senderName = "e2e-singleevent-sender" + subscriptionName = "e2e-singleevent-subscription" + routeName = "e2e-singleevent-route" + ) + clients, cleaner := Setup(t, t.Logf) // verify namespace @@ -88,7 +89,7 @@ func SingleEvent(t *testing.T, encoding string) { sub := test.Subscription(subscriptionName, ns, test.ChannelRef(channelName), test.SubscriberSpecForService(routeName), nil) t.Logf("sub: %#v", sub) - if err := WithChannelAndSubscriptionReady(clients, channel, sub, t.Logf, cleaner); err != nil { + if err := WithChannelsAndSubscriptionsReady(clients, &[]*v1alpha1.Channel{channel}, &[]*v1alpha1.Subscription{sub}, t.Logf, cleaner); err != nil { t.Fatalf("The Channel or Subscription were not marked as Ready: %v", err) } diff --git a/test/test_images/logevents/main.go b/test/test_images/logevents/main.go index 06a1c70b3b8..0f12cfd62b6 100644 --- a/test/test_images/logevents/main.go +++ b/test/test_images/logevents/main.go @@ -16,13 +16,15 @@ package main import ( "context" "fmt" + "log" + "github.com/cloudevents/sdk-go/pkg/cloudevents" "github.com/cloudevents/sdk-go/pkg/cloudevents/client" - "log" ) func handler(event cloudevents.Event) { - log.Printf("%s", event) + // MEMO(Chi): Comment out since we want to count the number of event messages in the log, and the following code also prints it out + // log.Printf("%s", event) // TODO: in version 0.5.0 of cloudevents, below can be deleted. ctx := event.Context.AsV02() diff --git a/test/test_images/transformevents/main.go b/test/test_images/transformevents/main.go new file mode 100644 index 00000000000..5e7bfdece3c --- /dev/null +++ b/test/test_images/transformevents/main.go @@ -0,0 +1,70 @@ +/* +Copyright 2019 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + https://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "context" + "flag" + "fmt" + "log" + + "github.com/cloudevents/sdk-go/pkg/cloudevents" + "github.com/cloudevents/sdk-go/pkg/cloudevents/client" +) + +type example struct { + ID string `json:"sequence"` + Message string `json:"msg"` +} + +var ( + msgPostfix string +) + +func init() { + flag.StringVar(&msgPostfix, "msg-postfix", "", "The postfix we want to add for the message.") +} + +func gotEvent(event cloudevents.Event, resp *cloudevents.EventResponse) error { + ctx := event.Context.AsV02() + + data := &example{} + if err := event.DataAs(data); err != nil { + fmt.Printf("Got Data Error: %s\n", err.Error()) + return err + } + + data.Message += msgPostfix + log.Printf("[%s] %s %s: %+v", ctx.Time.String(), *ctx.ContentType, ctx.Source.String(), data) + + r := cloudevents.Event{ + Context: event.Context, + Data: data, + } + resp.RespondWith(200, &r) + return nil +} + +func main() { + // parse the command line flags + flag.Parse() + c, err := client.NewDefault() + if err != nil { + log.Fatalf("failed to create client, %v", err) + } + + log.Printf("listening on 8080") + log.Printf("msgPostfix: %s", msgPostfix) + log.Fatalf("failed to start receiver: %s", c.StartReceiver(context.Background(), gotEvent)) +} From 4546739ce684dc81a5fc3caa71d1adca5618b792 Mon Sep 17 00:00:00 2001 From: Chi Zhang Date: Mon, 25 Mar 2019 21:37:18 -0700 Subject: [PATCH 02/20] change comment --- test/test_images/logevents/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test_images/logevents/main.go b/test/test_images/logevents/main.go index 0f12cfd62b6..65368d9c5d1 100644 --- a/test/test_images/logevents/main.go +++ b/test/test_images/logevents/main.go @@ -23,7 +23,7 @@ import ( ) func handler(event cloudevents.Event) { - // MEMO(Chi): Comment out since we want to count the number of event messages in the log, and the following code also prints it out + // NOTE(Chi): Comment out since we want to count the number of event messages in the log, and the following code also prints it out // log.Printf("%s", event) // TODO: in version 0.5.0 of cloudevents, below can be deleted. From 13475d2007af116f008650a88b070dd32a0e51f6 Mon Sep 17 00:00:00 2001 From: Chi Zhang Date: Tue, 26 Mar 2019 10:03:07 -0700 Subject: [PATCH 03/20] add default encoding and some comments --- test/crd.go | 1 + test/e2e/channel_chain_test.go | 8 +++++++- test/e2e/complex_scenario_test.go | 11 ++++++++++- test/e2e/single_event_test.go | 6 ++++++ 4 files changed, 24 insertions(+), 2 deletions(-) diff --git a/test/crd.go b/test/crd.go index 40a5710ca75..e5195e82d07 100644 --- a/test/crd.go +++ b/test/crd.go @@ -161,6 +161,7 @@ type TypeAndSource struct { const ( CloudEventEncodingBinary = "binary" CloudEventEncodingStructured = "structured" + CloudEventDefaultEncoding = CloudEventEncodingBinary ) // EventSenderPod creates a Pod that sends a single event to the given address. diff --git a/test/e2e/channel_chain_test.go b/test/e2e/channel_chain_test.go index 9b9ec1aa5d8..05c67416c36 100644 --- a/test/e2e/channel_chain_test.go +++ b/test/e2e/channel_chain_test.go @@ -26,6 +26,12 @@ import ( "k8s.io/apimachinery/pkg/util/uuid" ) +/* +TestChannelChain tests the following scenario: + +EventSource ---> Channel ---> Subscriptions ---> Channel ---> Subscriptions ---> Service(Logger) + +*/ func TestChannelChain(t *testing.T) { const ( senderName = "e2e-channelchain-sender" @@ -107,7 +113,7 @@ func TestChannelChain(t *testing.T) { Source: senderName, Type: "test.eventing.knative.dev", Data: fmt.Sprintf(`{"msg":%q}`, body), - Encoding: test.CloudEventEncodingBinary, + Encoding: test.CloudEventDefaultEncoding, } url := fmt.Sprintf("http://%s", channels[0].Status.Address.Hostname) pod := test.EventSenderPod(senderName, ns, url, event) diff --git a/test/e2e/complex_scenario_test.go b/test/e2e/complex_scenario_test.go index 18c03ee5c4a..d73f9124957 100644 --- a/test/e2e/complex_scenario_test.go +++ b/test/e2e/complex_scenario_test.go @@ -27,6 +27,15 @@ import ( "k8s.io/apimachinery/pkg/util/uuid" ) +/* +TestComplexScenario tests the following scenario: + +EventSource ---> Channel ---> Subscriptions ---> Channel ---> Subscriptions ----> Service(Logger) + ^ + | + | + |-----------> Service(Transformation) +*/ func TestComplexScenario(t *testing.T) { const ( senderName = "e2e-complexscen-sender" @@ -124,7 +133,7 @@ func TestComplexScenario(t *testing.T) { Source: senderName, Type: "test.eventing.knative.dev", Data: fmt.Sprintf(`{"msg":%q}`, body), - Encoding: test.CloudEventEncodingBinary, + Encoding: test.CloudEventDefaultEncoding, } url := fmt.Sprintf("http://%s", channels[0].Status.Address.Hostname) pod := test.EventSenderPod(senderName, ns, url, event) diff --git a/test/e2e/single_event_test.go b/test/e2e/single_event_test.go index bede07618c4..86d81103cf7 100644 --- a/test/e2e/single_event_test.go +++ b/test/e2e/single_event_test.go @@ -36,6 +36,12 @@ func TestSingleStructuredEvent(t *testing.T) { SingleEvent(t, test.CloudEventEncodingStructured) } +/* +SingleEvent tests the following scenario: + +EventSource ---> Channel ---> Subscriptions ---> Service(Logger) + +*/ func SingleEvent(t *testing.T, encoding string) { const ( channelName = "e2e-singleevent" From 3542880ff8bb063e2239e54e64c1851312a2bd42 Mon Sep 17 00:00:00 2001 From: Chi Zhang Date: Tue, 26 Mar 2019 10:10:30 -0700 Subject: [PATCH 04/20] fix comment error --- test/e2e/complex_scenario_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/e2e/complex_scenario_test.go b/test/e2e/complex_scenario_test.go index d73f9124957..14d0d5202fc 100644 --- a/test/e2e/complex_scenario_test.go +++ b/test/e2e/complex_scenario_test.go @@ -32,8 +32,8 @@ TestComplexScenario tests the following scenario: EventSource ---> Channel ---> Subscriptions ---> Channel ---> Subscriptions ----> Service(Logger) ^ - | - | + | + | |-----------> Service(Transformation) */ func TestComplexScenario(t *testing.T) { From ad91d202a71bc0a25f1a23844c31ea812701d120 Mon Sep 17 00:00:00 2001 From: Chi Zhang Date: Tue, 26 Mar 2019 10:15:36 -0700 Subject: [PATCH 05/20] rename the test case --- ...{complex_scenario_test.go => event_transformation_test.go} | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) rename test/e2e/{complex_scenario_test.go => event_transformation_test.go} (98%) diff --git a/test/e2e/complex_scenario_test.go b/test/e2e/event_transformation_test.go similarity index 98% rename from test/e2e/complex_scenario_test.go rename to test/e2e/event_transformation_test.go index 14d0d5202fc..8e9c32de27e 100644 --- a/test/e2e/complex_scenario_test.go +++ b/test/e2e/event_transformation_test.go @@ -28,7 +28,7 @@ import ( ) /* -TestComplexScenario tests the following scenario: +TestEventTransformation tests the following scenario: EventSource ---> Channel ---> Subscriptions ---> Channel ---> Subscriptions ----> Service(Logger) ^ @@ -36,7 +36,7 @@ EventSource ---> Channel ---> Subscriptions ---> Channel ---> Subscriptions ---- | |-----------> Service(Transformation) */ -func TestComplexScenario(t *testing.T) { +func TestEventTransformation(t *testing.T) { const ( senderName = "e2e-complexscen-sender" msgPostfix = "######" From 53283c7a0833d2d1c7446edef4ffd21c77ed4592 Mon Sep 17 00:00:00 2001 From: Chi Zhang Date: Tue, 26 Mar 2019 13:25:12 -0700 Subject: [PATCH 06/20] set up in the first fine --- test/e2e/channel_chain_test.go | 4 ++-- test/e2e/event_transformation_test.go | 4 ++-- test/e2e/single_event_test.go | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/test/e2e/channel_chain_test.go b/test/e2e/channel_chain_test.go index 05c67416c36..7bdd4ef1f15 100644 --- a/test/e2e/channel_chain_test.go +++ b/test/e2e/channel_chain_test.go @@ -33,6 +33,8 @@ EventSource ---> Channel ---> Subscriptions ---> Channel ---> Subscriptions ---> */ func TestChannelChain(t *testing.T) { + clients, cleaner := Setup(t, t.Logf) + const ( senderName = "e2e-channelchain-sender" routeName = "e2e-channelchain-route" @@ -41,8 +43,6 @@ func TestChannelChain(t *testing.T) { var subscriptionNames1 = CreateRandomSubscriptionNames("e2e-complexscen-subs1") var subscriptionNames2 = CreateRandomSubscriptionNames("e2e-complexscen-subs2") - clients, cleaner := Setup(t, t.Logf) - // verify namespace ns, cleanupNS := NamespaceExists(t, clients, t.Logf) defer cleanupNS() diff --git a/test/e2e/event_transformation_test.go b/test/e2e/event_transformation_test.go index 8e9c32de27e..fe5c6f53901 100644 --- a/test/e2e/event_transformation_test.go +++ b/test/e2e/event_transformation_test.go @@ -37,6 +37,8 @@ EventSource ---> Channel ---> Subscriptions ---> Channel ---> Subscriptions ---- |-----------> Service(Transformation) */ func TestEventTransformation(t *testing.T) { + clients, cleaner := Setup(t, t.Logf) + const ( senderName = "e2e-complexscen-sender" msgPostfix = "######" @@ -47,8 +49,6 @@ func TestEventTransformation(t *testing.T) { var subscriptionNames1 = CreateRandomSubscriptionNames("e2e-complexscen-subs1") var subscriptionNames2 = CreateRandomSubscriptionNames("e2e-complexscen-subs2") - clients, cleaner := Setup(t, t.Logf) - // verify namespace ns, cleanupNS := NamespaceExists(t, clients, t.Logf) defer cleanupNS() diff --git a/test/e2e/single_event_test.go b/test/e2e/single_event_test.go index 86d81103cf7..f455deb7aa6 100644 --- a/test/e2e/single_event_test.go +++ b/test/e2e/single_event_test.go @@ -43,6 +43,8 @@ EventSource ---> Channel ---> Subscriptions ---> Service(Logger) */ func SingleEvent(t *testing.T, encoding string) { + clients, cleaner := Setup(t, t.Logf) + const ( channelName = "e2e-singleevent" subscriberName = "e2e-singleevent-subscriber" @@ -51,8 +53,6 @@ func SingleEvent(t *testing.T, encoding string) { routeName = "e2e-singleevent-route" ) - clients, cleaner := Setup(t, t.Logf) - // verify namespace ns, cleanupNS := NamespaceExists(t, clients, t.Logf) defer cleanupNS() From 15ee257c26200a32405501fdfd9c92187a684696 Mon Sep 17 00:00:00 2001 From: Chi Zhang Date: Tue, 26 Mar 2019 14:22:45 -0700 Subject: [PATCH 07/20] try to find presubmit tests fail reason --- test/e2e/channel_chain_test.go | 129 ---------------------- test/e2e/event_transformation_test.go | 149 -------------------------- 2 files changed, 278 deletions(-) delete mode 100644 test/e2e/channel_chain_test.go delete mode 100644 test/e2e/event_transformation_test.go diff --git a/test/e2e/channel_chain_test.go b/test/e2e/channel_chain_test.go deleted file mode 100644 index 7bdd4ef1f15..00000000000 --- a/test/e2e/channel_chain_test.go +++ /dev/null @@ -1,129 +0,0 @@ -/* -Copyright 2019 The Knative Authors -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package e2e - -import ( - "fmt" - "testing" - - "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" - "github.com/knative/eventing/test" - pkgTest "github.com/knative/pkg/test" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/uuid" -) - -/* -TestChannelChain tests the following scenario: - -EventSource ---> Channel ---> Subscriptions ---> Channel ---> Subscriptions ---> Service(Logger) - -*/ -func TestChannelChain(t *testing.T) { - clients, cleaner := Setup(t, t.Logf) - - const ( - senderName = "e2e-channelchain-sender" - routeName = "e2e-channelchain-route" - ) - var channelNames = [2]string{"e2e-channelchain1", "e2e-channelchain2"} - var subscriptionNames1 = CreateRandomSubscriptionNames("e2e-complexscen-subs1") - var subscriptionNames2 = CreateRandomSubscriptionNames("e2e-complexscen-subs2") - - // verify namespace - ns, cleanupNS := NamespaceExists(t, clients, t.Logf) - defer cleanupNS() - - // TearDown() needs to be deferred after cleanupNS(). Otherwise the namespace is deleted and all - // resources in it. So when TearDown() runs, it spews a lot of not found errors. - defer TearDown(clients, cleaner, t.Logf) - - // create subscriberPod and expose it as a service - t.Logf("creating subscriber pod") - selector := map[string]string{"e2etest": string(uuid.NewUUID())} - subscriberPod := test.EventLoggerPod(routeName, ns, selector) - if err := CreatePod(clients, subscriberPod, t.Logf, cleaner); err != nil { - t.Fatalf("Failed to create event logger pod: %v", err) - } - if err := pkgTest.WaitForAllPodsRunning(clients.Kube, ns); err != nil { - t.Fatalf("Error waiting for logger pod to become running: %v", err) - } - t.Logf("subscriber pod running") - - subscriberSvc := test.Service(routeName, ns, selector) - if err := CreateService(clients, subscriberSvc, t.Logf, cleaner); err != nil { - t.Fatalf("Failed to create event logger service: %v", err) - } - - // Reload subscriberPod to get IP - subscriberPod, err := clients.Kube.Kube.CoreV1().Pods(subscriberPod.Namespace).Get(subscriberPod.Name, metav1.GetOptions{}) - if err != nil { - t.Fatalf("Failed to get subscriber pod: %v", err) - } - - // create channels - t.Logf("Creating Channel and Subscription") - if test.EventingFlags.Provisioner == "" { - t.Fatal("ClusterChannelProvisioner must be set to a non-empty string. Either do not specify --clusterChannelProvisioner or set to something other than the empty string") - } - channels := make([]*v1alpha1.Channel, 0) - for _, channelName := range channelNames { - channel := test.Channel(channelName, ns, test.ClusterChannelProvisioner(test.EventingFlags.Provisioner)) - t.Logf("channel: %#v", channel) - channels = append(channels, channel) - } - - // create subscriptions - subs := make([]*v1alpha1.Subscription, 0) - // create subscriptions that subscribe the first channel, and reply events directly to the second channel - for _, subscriptionName := range subscriptionNames1 { - sub := test.Subscription(subscriptionName, ns, test.ChannelRef(channelNames[0]), nil, test.ReplyStrategyForChannel(channelNames[1])) - t.Logf("sub: %#v", sub) - subs = append(subs, sub) - } - // create subscriptions that subscribe the second channel, and call the logging service - for _, subscriptionName := range subscriptionNames2 { - sub := test.Subscription(subscriptionName, ns, test.ChannelRef(channelNames[1]), test.SubscriberSpecForService(routeName), nil) - t.Logf("sub: %#v", sub) - subs = append(subs, sub) - } - - // wait for all channels and subscriptions to become ready - if err := WithChannelsAndSubscriptionsReady(clients, &channels, &subs, t.Logf, cleaner); err != nil { - t.Fatalf("The Channel or Subscription were not marked as Ready: %v", err) - } - - // create sender pod - t.Logf("Creating event sender") - body := fmt.Sprintf("TestChannelChainEvent %s", uuid.NewUUID()) - event := test.CloudEvent{ - Source: senderName, - Type: "test.eventing.knative.dev", - Data: fmt.Sprintf(`{"msg":%q}`, body), - Encoding: test.CloudEventDefaultEncoding, - } - url := fmt.Sprintf("http://%s", channels[0].Status.Address.Hostname) - pod := test.EventSenderPod(senderName, ns, url, event) - t.Logf("sender pod: %#v", pod) - if err := CreatePod(clients, pod, t.Logf, cleaner); err != nil { - t.Fatalf("Failed to create event sender pod: %v", err) - } - - // check if the logging service receives the correct number of event messages - if err := WaitForLogContentCount(clients, routeName, subscriberPod.Spec.Containers[0].Name, body, len(subscriptionNames1)*len(subscriptionNames2)); err != nil { - t.Fatalf("String %q not found in logs of subscriber pod %q: %v", body, routeName, err) - } -} diff --git a/test/e2e/event_transformation_test.go b/test/e2e/event_transformation_test.go deleted file mode 100644 index fe5c6f53901..00000000000 --- a/test/e2e/event_transformation_test.go +++ /dev/null @@ -1,149 +0,0 @@ -/* -Copyright 2019 The Knative Authors -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package e2e - -import ( - "fmt" - "testing" - - "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" - "github.com/knative/eventing/test" - pkgTest "github.com/knative/pkg/test" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/uuid" -) - -/* -TestEventTransformation tests the following scenario: - -EventSource ---> Channel ---> Subscriptions ---> Channel ---> Subscriptions ----> Service(Logger) - ^ - | - | - |-----------> Service(Transformation) -*/ -func TestEventTransformation(t *testing.T) { - clients, cleaner := Setup(t, t.Logf) - - const ( - senderName = "e2e-complexscen-sender" - msgPostfix = "######" - ) - - var channelNames = [2]string{"e2e-complexscen1", "e2e-complexscen2"} - var routeNames = [2]string{"e2e-complexscen-route1", "e2e-complexscen-route2"} - var subscriptionNames1 = CreateRandomSubscriptionNames("e2e-complexscen-subs1") - var subscriptionNames2 = CreateRandomSubscriptionNames("e2e-complexscen-subs2") - - // verify namespace - ns, cleanupNS := NamespaceExists(t, clients, t.Logf) - defer cleanupNS() - - // TearDown() needs to be deferred after cleanupNS(). Otherwise the namespace is deleted and all - // resources in it. So when TearDown() runs, it spews a lot of not found errors. - defer TearDown(clients, cleaner, t.Logf) - - // create subscriberPods and expose them as services - t.Logf("creating subscriber pods") - - subscriberPods := make([]*corev1.Pod, 0) - for i, routeName := range routeNames { - selector := map[string]string{"e2etest": string(uuid.NewUUID())} - var subscriberPod *corev1.Pod - // create the first subscriber for the event transformation, and the second for the logging - if i == 0 { - subscriberPod = test.EventTransformationPod(routeName, ns, selector, msgPostfix) - } else if i == 1 { - subscriberPod = test.EventLoggerPod(routeName, ns, selector) - } - - if err := CreatePod(clients, subscriberPod, t.Logf, cleaner); err != nil { - t.Fatalf("Failed to create event logger pod: %v", err) - } - if err := pkgTest.WaitForAllPodsRunning(clients.Kube, ns); err != nil { - t.Fatalf("Error waiting for logger pod to become running: %v", err) - } - t.Logf("subscriber pod running") - - subscriberSvc := test.Service(routeName, ns, selector) - if err := CreateService(clients, subscriberSvc, t.Logf, cleaner); err != nil { - t.Fatalf("Failed to create event logger service: %v", err) - } - - // Reload subscriberPod to get IP - subscriberPod, err := clients.Kube.Kube.CoreV1().Pods(subscriberPod.Namespace).Get(subscriberPod.Name, metav1.GetOptions{}) - if err != nil { - t.Fatalf("Failed to get subscriber pod: %v", err) - } - - subscriberPods = append(subscriberPods, subscriberPod) - } - - // create channels - t.Logf("Creating Channel and Subscription") - if test.EventingFlags.Provisioner == "" { - t.Fatal("ClusterChannelProvisioner must be set to a non-empty string. Either do not specify --clusterChannelProvisioner or set to something other than the empty string") - } - channels := make([]*v1alpha1.Channel, 0) - for _, channelName := range channelNames { - channel := test.Channel(channelName, ns, test.ClusterChannelProvisioner(test.EventingFlags.Provisioner)) - t.Logf("channel: %#v", channel) - - channels = append(channels, channel) - } - - // create subscriptions - subs := make([]*v1alpha1.Subscription, 0) - // create subscriptions that subscribe the first channel, use the transformation service to transform the events and then forward to the second channel - for _, subscriptionName := range subscriptionNames1 { - sub := test.Subscription(subscriptionName, ns, test.ChannelRef(channelNames[0]), test.SubscriberSpecForService(routeNames[0]), test.ReplyStrategyForChannel(channelNames[1])) - t.Logf("sub: %#v", sub) - subs = append(subs, sub) - } - // create subscriptions that subscribe the second channel, and call the logging service - for _, subscriptionName := range subscriptionNames2 { - sub := test.Subscription(subscriptionName, ns, test.ChannelRef(channelNames[1]), test.SubscriberSpecForService(routeNames[1]), nil) - t.Logf("sub: %#v", sub) - subs = append(subs, sub) - } - - // wait for all channels and subscriptions to become ready - if err := WithChannelsAndSubscriptionsReady(clients, &channels, &subs, t.Logf, cleaner); err != nil { - t.Fatalf("The Channels or Subscription were not marked as Ready: %v", err) - } - - // create sender pod - t.Logf("Creating event sender") - body := fmt.Sprintf("TestComplexScenario %s", uuid.NewUUID()) - event := test.CloudEvent{ - Source: senderName, - Type: "test.eventing.knative.dev", - Data: fmt.Sprintf(`{"msg":%q}`, body), - Encoding: test.CloudEventDefaultEncoding, - } - url := fmt.Sprintf("http://%s", channels[0].Status.Address.Hostname) - pod := test.EventSenderPod(senderName, ns, url, event) - t.Logf("sender pod: %#v", pod) - if err := CreatePod(clients, pod, t.Logf, cleaner); err != nil { - t.Fatalf("Failed to create event sender pod: %v", err) - } - - // check if the logging service receives the correct number of event messages - if err := WaitForLogContentCount(clients, subscriberPods[1].Name, subscriberPods[1].Spec.Containers[0].Name, body+msgPostfix, len(subscriptionNames1)*len(subscriptionNames2)); err != nil { - t.Fatalf("String %q not found in logs of subscriber pod %q: %v", body, subscriberPods[1].Name, err) - } -} From 3b1acb2eec508a7489569fc1e7862544909c2468 Mon Sep 17 00:00:00 2001 From: Chi Zhang Date: Tue, 26 Mar 2019 14:52:20 -0700 Subject: [PATCH 08/20] add back the e2e test cases --- test/e2e/channel_chain_test.go | 129 ++++++++++++++++++++++ test/e2e/event_transformation_test.go | 149 ++++++++++++++++++++++++++ 2 files changed, 278 insertions(+) create mode 100644 test/e2e/channel_chain_test.go create mode 100644 test/e2e/event_transformation_test.go diff --git a/test/e2e/channel_chain_test.go b/test/e2e/channel_chain_test.go new file mode 100644 index 00000000000..7bdd4ef1f15 --- /dev/null +++ b/test/e2e/channel_chain_test.go @@ -0,0 +1,129 @@ +/* +Copyright 2019 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + "fmt" + "testing" + + "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/eventing/test" + pkgTest "github.com/knative/pkg/test" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/uuid" +) + +/* +TestChannelChain tests the following scenario: + +EventSource ---> Channel ---> Subscriptions ---> Channel ---> Subscriptions ---> Service(Logger) + +*/ +func TestChannelChain(t *testing.T) { + clients, cleaner := Setup(t, t.Logf) + + const ( + senderName = "e2e-channelchain-sender" + routeName = "e2e-channelchain-route" + ) + var channelNames = [2]string{"e2e-channelchain1", "e2e-channelchain2"} + var subscriptionNames1 = CreateRandomSubscriptionNames("e2e-complexscen-subs1") + var subscriptionNames2 = CreateRandomSubscriptionNames("e2e-complexscen-subs2") + + // verify namespace + ns, cleanupNS := NamespaceExists(t, clients, t.Logf) + defer cleanupNS() + + // TearDown() needs to be deferred after cleanupNS(). Otherwise the namespace is deleted and all + // resources in it. So when TearDown() runs, it spews a lot of not found errors. + defer TearDown(clients, cleaner, t.Logf) + + // create subscriberPod and expose it as a service + t.Logf("creating subscriber pod") + selector := map[string]string{"e2etest": string(uuid.NewUUID())} + subscriberPod := test.EventLoggerPod(routeName, ns, selector) + if err := CreatePod(clients, subscriberPod, t.Logf, cleaner); err != nil { + t.Fatalf("Failed to create event logger pod: %v", err) + } + if err := pkgTest.WaitForAllPodsRunning(clients.Kube, ns); err != nil { + t.Fatalf("Error waiting for logger pod to become running: %v", err) + } + t.Logf("subscriber pod running") + + subscriberSvc := test.Service(routeName, ns, selector) + if err := CreateService(clients, subscriberSvc, t.Logf, cleaner); err != nil { + t.Fatalf("Failed to create event logger service: %v", err) + } + + // Reload subscriberPod to get IP + subscriberPod, err := clients.Kube.Kube.CoreV1().Pods(subscriberPod.Namespace).Get(subscriberPod.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Failed to get subscriber pod: %v", err) + } + + // create channels + t.Logf("Creating Channel and Subscription") + if test.EventingFlags.Provisioner == "" { + t.Fatal("ClusterChannelProvisioner must be set to a non-empty string. Either do not specify --clusterChannelProvisioner or set to something other than the empty string") + } + channels := make([]*v1alpha1.Channel, 0) + for _, channelName := range channelNames { + channel := test.Channel(channelName, ns, test.ClusterChannelProvisioner(test.EventingFlags.Provisioner)) + t.Logf("channel: %#v", channel) + channels = append(channels, channel) + } + + // create subscriptions + subs := make([]*v1alpha1.Subscription, 0) + // create subscriptions that subscribe the first channel, and reply events directly to the second channel + for _, subscriptionName := range subscriptionNames1 { + sub := test.Subscription(subscriptionName, ns, test.ChannelRef(channelNames[0]), nil, test.ReplyStrategyForChannel(channelNames[1])) + t.Logf("sub: %#v", sub) + subs = append(subs, sub) + } + // create subscriptions that subscribe the second channel, and call the logging service + for _, subscriptionName := range subscriptionNames2 { + sub := test.Subscription(subscriptionName, ns, test.ChannelRef(channelNames[1]), test.SubscriberSpecForService(routeName), nil) + t.Logf("sub: %#v", sub) + subs = append(subs, sub) + } + + // wait for all channels and subscriptions to become ready + if err := WithChannelsAndSubscriptionsReady(clients, &channels, &subs, t.Logf, cleaner); err != nil { + t.Fatalf("The Channel or Subscription were not marked as Ready: %v", err) + } + + // create sender pod + t.Logf("Creating event sender") + body := fmt.Sprintf("TestChannelChainEvent %s", uuid.NewUUID()) + event := test.CloudEvent{ + Source: senderName, + Type: "test.eventing.knative.dev", + Data: fmt.Sprintf(`{"msg":%q}`, body), + Encoding: test.CloudEventDefaultEncoding, + } + url := fmt.Sprintf("http://%s", channels[0].Status.Address.Hostname) + pod := test.EventSenderPod(senderName, ns, url, event) + t.Logf("sender pod: %#v", pod) + if err := CreatePod(clients, pod, t.Logf, cleaner); err != nil { + t.Fatalf("Failed to create event sender pod: %v", err) + } + + // check if the logging service receives the correct number of event messages + if err := WaitForLogContentCount(clients, routeName, subscriberPod.Spec.Containers[0].Name, body, len(subscriptionNames1)*len(subscriptionNames2)); err != nil { + t.Fatalf("String %q not found in logs of subscriber pod %q: %v", body, routeName, err) + } +} diff --git a/test/e2e/event_transformation_test.go b/test/e2e/event_transformation_test.go new file mode 100644 index 00000000000..fe5c6f53901 --- /dev/null +++ b/test/e2e/event_transformation_test.go @@ -0,0 +1,149 @@ +/* +Copyright 2019 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + "fmt" + "testing" + + "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/eventing/test" + pkgTest "github.com/knative/pkg/test" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/uuid" +) + +/* +TestEventTransformation tests the following scenario: + +EventSource ---> Channel ---> Subscriptions ---> Channel ---> Subscriptions ----> Service(Logger) + ^ + | + | + |-----------> Service(Transformation) +*/ +func TestEventTransformation(t *testing.T) { + clients, cleaner := Setup(t, t.Logf) + + const ( + senderName = "e2e-complexscen-sender" + msgPostfix = "######" + ) + + var channelNames = [2]string{"e2e-complexscen1", "e2e-complexscen2"} + var routeNames = [2]string{"e2e-complexscen-route1", "e2e-complexscen-route2"} + var subscriptionNames1 = CreateRandomSubscriptionNames("e2e-complexscen-subs1") + var subscriptionNames2 = CreateRandomSubscriptionNames("e2e-complexscen-subs2") + + // verify namespace + ns, cleanupNS := NamespaceExists(t, clients, t.Logf) + defer cleanupNS() + + // TearDown() needs to be deferred after cleanupNS(). Otherwise the namespace is deleted and all + // resources in it. So when TearDown() runs, it spews a lot of not found errors. + defer TearDown(clients, cleaner, t.Logf) + + // create subscriberPods and expose them as services + t.Logf("creating subscriber pods") + + subscriberPods := make([]*corev1.Pod, 0) + for i, routeName := range routeNames { + selector := map[string]string{"e2etest": string(uuid.NewUUID())} + var subscriberPod *corev1.Pod + // create the first subscriber for the event transformation, and the second for the logging + if i == 0 { + subscriberPod = test.EventTransformationPod(routeName, ns, selector, msgPostfix) + } else if i == 1 { + subscriberPod = test.EventLoggerPod(routeName, ns, selector) + } + + if err := CreatePod(clients, subscriberPod, t.Logf, cleaner); err != nil { + t.Fatalf("Failed to create event logger pod: %v", err) + } + if err := pkgTest.WaitForAllPodsRunning(clients.Kube, ns); err != nil { + t.Fatalf("Error waiting for logger pod to become running: %v", err) + } + t.Logf("subscriber pod running") + + subscriberSvc := test.Service(routeName, ns, selector) + if err := CreateService(clients, subscriberSvc, t.Logf, cleaner); err != nil { + t.Fatalf("Failed to create event logger service: %v", err) + } + + // Reload subscriberPod to get IP + subscriberPod, err := clients.Kube.Kube.CoreV1().Pods(subscriberPod.Namespace).Get(subscriberPod.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Failed to get subscriber pod: %v", err) + } + + subscriberPods = append(subscriberPods, subscriberPod) + } + + // create channels + t.Logf("Creating Channel and Subscription") + if test.EventingFlags.Provisioner == "" { + t.Fatal("ClusterChannelProvisioner must be set to a non-empty string. Either do not specify --clusterChannelProvisioner or set to something other than the empty string") + } + channels := make([]*v1alpha1.Channel, 0) + for _, channelName := range channelNames { + channel := test.Channel(channelName, ns, test.ClusterChannelProvisioner(test.EventingFlags.Provisioner)) + t.Logf("channel: %#v", channel) + + channels = append(channels, channel) + } + + // create subscriptions + subs := make([]*v1alpha1.Subscription, 0) + // create subscriptions that subscribe the first channel, use the transformation service to transform the events and then forward to the second channel + for _, subscriptionName := range subscriptionNames1 { + sub := test.Subscription(subscriptionName, ns, test.ChannelRef(channelNames[0]), test.SubscriberSpecForService(routeNames[0]), test.ReplyStrategyForChannel(channelNames[1])) + t.Logf("sub: %#v", sub) + subs = append(subs, sub) + } + // create subscriptions that subscribe the second channel, and call the logging service + for _, subscriptionName := range subscriptionNames2 { + sub := test.Subscription(subscriptionName, ns, test.ChannelRef(channelNames[1]), test.SubscriberSpecForService(routeNames[1]), nil) + t.Logf("sub: %#v", sub) + subs = append(subs, sub) + } + + // wait for all channels and subscriptions to become ready + if err := WithChannelsAndSubscriptionsReady(clients, &channels, &subs, t.Logf, cleaner); err != nil { + t.Fatalf("The Channels or Subscription were not marked as Ready: %v", err) + } + + // create sender pod + t.Logf("Creating event sender") + body := fmt.Sprintf("TestComplexScenario %s", uuid.NewUUID()) + event := test.CloudEvent{ + Source: senderName, + Type: "test.eventing.knative.dev", + Data: fmt.Sprintf(`{"msg":%q}`, body), + Encoding: test.CloudEventDefaultEncoding, + } + url := fmt.Sprintf("http://%s", channels[0].Status.Address.Hostname) + pod := test.EventSenderPod(senderName, ns, url, event) + t.Logf("sender pod: %#v", pod) + if err := CreatePod(clients, pod, t.Logf, cleaner); err != nil { + t.Fatalf("Failed to create event sender pod: %v", err) + } + + // check if the logging service receives the correct number of event messages + if err := WaitForLogContentCount(clients, subscriberPods[1].Name, subscriberPods[1].Spec.Containers[0].Name, body+msgPostfix, len(subscriptionNames1)*len(subscriptionNames2)); err != nil { + t.Fatalf("String %q not found in logs of subscriber pod %q: %v", body, subscriberPods[1].Name, err) + } +} From 94ff59cd264a75ab83414fe39f4cbbbca14933a3 Mon Sep 17 00:00:00 2001 From: Chi Zhang Date: Tue, 26 Mar 2019 15:07:07 -0700 Subject: [PATCH 09/20] add +build e2e for the test cases --- test/e2e/channel_chain_test.go | 2 ++ test/e2e/event_transformation_test.go | 2 ++ 2 files changed, 4 insertions(+) diff --git a/test/e2e/channel_chain_test.go b/test/e2e/channel_chain_test.go index 7bdd4ef1f15..8c2833cbaba 100644 --- a/test/e2e/channel_chain_test.go +++ b/test/e2e/channel_chain_test.go @@ -1,3 +1,5 @@ +// +build e2e + /* Copyright 2019 The Knative Authors Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/test/e2e/event_transformation_test.go b/test/e2e/event_transformation_test.go index fe5c6f53901..6e95aab2732 100644 --- a/test/e2e/event_transformation_test.go +++ b/test/e2e/event_transformation_test.go @@ -1,3 +1,5 @@ +// +build e2e + /* Copyright 2019 The Knative Authors Licensed under the Apache License, Version 2.0 (the "License"); From dc3aaef216fa316366445d7e8346acbf0b0579fb Mon Sep 17 00:00:00 2001 From: Chi Zhang Date: Tue, 26 Mar 2019 16:03:01 -0700 Subject: [PATCH 10/20] change timeout to 2mins --- test/e2e/e2e.go | 2 +- test/e2e/event_transformation_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/test/e2e/e2e.go b/test/e2e/e2e.go index f072ea40303..3364edc8ad2 100644 --- a/test/e2e/e2e.go +++ b/test/e2e/e2e.go @@ -44,7 +44,7 @@ const ( DefaultTestNamespace = "e2etest-knative-eventing" interval = 1 * time.Second - timeout = 1 * time.Minute + timeout = 2 * time.Minute // the minimum and maxmium number of subscriptions we generate in the e2e tests minSubCount = 1 diff --git a/test/e2e/event_transformation_test.go b/test/e2e/event_transformation_test.go index 6e95aab2732..34f6600b330 100644 --- a/test/e2e/event_transformation_test.go +++ b/test/e2e/event_transformation_test.go @@ -146,6 +146,6 @@ func TestEventTransformation(t *testing.T) { // check if the logging service receives the correct number of event messages if err := WaitForLogContentCount(clients, subscriberPods[1].Name, subscriberPods[1].Spec.Containers[0].Name, body+msgPostfix, len(subscriptionNames1)*len(subscriptionNames2)); err != nil { - t.Fatalf("String %q not found in logs of subscriber pod %q: %v", body, subscriberPods[1].Name, err) + t.Fatalf("String %q not found in logs of subscriber pod %q: %v", body+msgPostfix, subscriberPods[1].Name, err) } } From 5c5475d6e4d0915e82038424e872087c9decdfb5 Mon Sep 17 00:00:00 2001 From: Chi Zhang Date: Fri, 29 Mar 2019 12:28:32 -0700 Subject: [PATCH 11/20] fix code review problems --- default.profraw | Bin 0 -> 2056 bytes test/e2e/broker_trigger_test.go | 2 +- test/e2e/channel_chain_test.go | 10 +++++++--- test/e2e/e2e.go | 23 +++-------------------- test/e2e/event_transformation_test.go | 18 ++++++++++++------ test/e2e/single_event_test.go | 2 +- test/test_images/logevents/main.go | 6 +++--- test/test_images/transformevents/main.go | 4 +++- 8 files changed, 30 insertions(+), 35 deletions(-) create mode 100644 default.profraw diff --git a/default.profraw b/default.profraw new file mode 100644 index 0000000000000000000000000000000000000000..ccf66ff616ed0dcb3b35f3be1a585e079e19f2af GIT binary patch literal 2056 zcmZoHO3N=Q$obF000E*8oX!mff-#9il}-S~Az_DR1x zk}!1wCJ=QHRSXQAP=q5h`* zjH9VG56w;_#Vm6_awXUSs$Rex)jhEA{3Di<*2a$+tj8g^`es-IwmYCb#E0T&kR*-^>w ze6=ZE<7&lkk?Sv@>K$ybt6%^3sMy&huf%KGrZ#wQz1yi!22qz_i>jUvYQC42g7o75 zhh1R)od8v@;EJjq=3k!eb`M;Ry`O&d?Z1}|^TRb~K-D*(sfUH*0(VV!%gNxw+RQ&>XRP`|P1qJS2^!-%eyf+~FQPVT0#s^UK z3?8WJg`p0R@jZOd;Hr6`1e9M<4pDc(8&y5bzuhyMx>}!io`$J!fU5t1L%oDc<>sx9 zlRv=JcR3h=VCpA8)f?bY-{SBx=7H%PGno1rQ1t;g)WiG%3x9MP zCQpb)*9YUn^uhQrnhDB)$)odO;^;I?KROLlhb}%;_~`D3@nJN~ALx9TIH@$u9CR9H z4on=zCq$#Ghw;&Aa?OG1gP8~86QW`1ijcZx%x4%YZcXhw?Z<4$<2qG-m4K*p*ntfT8<+-FE5!*aO_uSzYFl>DFy|qHN*Zx(< zBEyQK@0soLJF@TZU;AluasBUi#(^_pbc}s>c*oh=-F?1Z{){;DU8T2*Q@aj?%%A&Y q_Q7K;QWEW}g_?HRq~tbeol@#_(dun=X{s- Channel ---> Subscriptions ---> Channel ---> Subscriptions ----> Service(Logger) ^ + | e2 | - | - |-----------> Service(Transformation) + |-----------> Service(Transformation) + e1 */ func TestEventTransformation(t *testing.T) { clients, cleaner := Setup(t, t.Logf) @@ -48,11 +50,11 @@ func TestEventTransformation(t *testing.T) { var channelNames = [2]string{"e2e-complexscen1", "e2e-complexscen2"} var routeNames = [2]string{"e2e-complexscen-route1", "e2e-complexscen-route2"} - var subscriptionNames1 = CreateRandomSubscriptionNames("e2e-complexscen-subs1") - var subscriptionNames2 = CreateRandomSubscriptionNames("e2e-complexscen-subs2") + var subscriptionNames1 = []string{"e2e-complexscen-subs11"} + var subscriptionNames2 = []string{"e2e-complexscen-subs21", "e2e-complexscen-subs22"} // verify namespace - ns, cleanupNS := NamespaceExists(t, clients, t.Logf) + ns, cleanupNS := CreateNamespaceIfNeeded(t, clients, t.Logf) defer cleanupNS() // TearDown() needs to be deferred after cleanupNS(). Otherwise the namespace is deleted and all @@ -110,7 +112,7 @@ func TestEventTransformation(t *testing.T) { // create subscriptions subs := make([]*v1alpha1.Subscription, 0) - // create subscriptions that subscribe the first channel, use the transformation service to transform the events and then forward to the second channel + // create subscriptions that subscribe the first channel, use the transformation service to transform the events and then forward the transformed events to the second channel for _, subscriptionName := range subscriptionNames1 { sub := test.Subscription(subscriptionName, ns, test.ChannelRef(channelNames[0]), test.SubscriberSpecForService(routeNames[0]), test.ReplyStrategyForChannel(channelNames[1])) t.Logf("sub: %#v", sub) @@ -143,6 +145,10 @@ func TestEventTransformation(t *testing.T) { if err := CreatePod(clients, pod, t.Logf, cleaner); err != nil { t.Fatalf("Failed to create event sender pod: %v", err) } + if err := pkgTest.WaitForAllPodsRunning(clients.Kube, ns); err != nil { + t.Fatalf("Error waiting for sender pod to become running: %v", err) + } + t.Logf("sender pod running") // check if the logging service receives the correct number of event messages if err := WaitForLogContentCount(clients, subscriberPods[1].Name, subscriberPods[1].Spec.Containers[0].Name, body+msgPostfix, len(subscriptionNames1)*len(subscriptionNames2)); err != nil { diff --git a/test/e2e/single_event_test.go b/test/e2e/single_event_test.go index f455deb7aa6..672566550e2 100644 --- a/test/e2e/single_event_test.go +++ b/test/e2e/single_event_test.go @@ -54,7 +54,7 @@ func SingleEvent(t *testing.T, encoding string) { ) // verify namespace - ns, cleanupNS := NamespaceExists(t, clients, t.Logf) + ns, cleanupNS := CreateNamespaceIfNeeded(t, clients, t.Logf) defer cleanupNS() // TearDown() needs to be deferred after cleanupNS(). Otherwise the namespace is deleted and all diff --git a/test/test_images/logevents/main.go b/test/test_images/logevents/main.go index 65368d9c5d1..9e891509b50 100644 --- a/test/test_images/logevents/main.go +++ b/test/test_images/logevents/main.go @@ -3,7 +3,9 @@ Copyright 2019 The Knative Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - https://www.apache.org/licenses/LICENSE-2.0 + + https://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -23,8 +25,6 @@ import ( ) func handler(event cloudevents.Event) { - // NOTE(Chi): Comment out since we want to count the number of event messages in the log, and the following code also prints it out - // log.Printf("%s", event) // TODO: in version 0.5.0 of cloudevents, below can be deleted. ctx := event.Context.AsV02() diff --git a/test/test_images/transformevents/main.go b/test/test_images/transformevents/main.go index 5e7bfdece3c..76e79e99612 100644 --- a/test/test_images/transformevents/main.go +++ b/test/test_images/transformevents/main.go @@ -3,7 +3,9 @@ Copyright 2019 The Knative Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - https://www.apache.org/licenses/LICENSE-2.0 + + https://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. From 7d059a0a2f571b99c9290116552666eab7cec44d Mon Sep 17 00:00:00 2001 From: Chi Zhang Date: Fri, 29 Mar 2019 14:11:37 -0700 Subject: [PATCH 12/20] refactor common functions like CreatePodAndServiceReady and SendFakeEventToChannel --- test/crd.go | 1 + test/e2e/channel_chain_test.go | 40 ++----------------- test/e2e/e2e.go | 48 +++++++++++++++++++++++ test/e2e/event_transformation_test.go | 56 ++++++--------------------- test/e2e/single_event_test.go | 36 ++--------------- 5 files changed, 69 insertions(+), 112 deletions(-) diff --git a/test/crd.go b/test/crd.go index e5195e82d07..4d593db428f 100644 --- a/test/crd.go +++ b/test/crd.go @@ -162,6 +162,7 @@ const ( CloudEventEncodingBinary = "binary" CloudEventEncodingStructured = "structured" CloudEventDefaultEncoding = CloudEventEncodingBinary + CloudEventDefaultType = "test.eventing.knative.dev" ) // EventSenderPod creates a Pod that sends a single event to the given address. diff --git a/test/e2e/channel_chain_test.go b/test/e2e/channel_chain_test.go index 9076a6c8af9..4bac4c2dd3c 100644 --- a/test/e2e/channel_chain_test.go +++ b/test/e2e/channel_chain_test.go @@ -23,8 +23,6 @@ import ( "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/test" - pkgTest "github.com/knative/pkg/test" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/uuid" ) @@ -57,23 +55,9 @@ func TestChannelChain(t *testing.T) { t.Logf("creating subscriber pod") selector := map[string]string{"e2etest": string(uuid.NewUUID())} subscriberPod := test.EventLoggerPod(routeName, ns, selector) - if err := CreatePod(clients, subscriberPod, t.Logf, cleaner); err != nil { - t.Fatalf("Failed to create event logger pod: %v", err) - } - if err := pkgTest.WaitForAllPodsRunning(clients.Kube, ns); err != nil { - t.Fatalf("Error waiting for logger pod to become running: %v", err) - } - t.Logf("subscriber pod running") - - subscriberSvc := test.Service(routeName, ns, selector) - if err := CreateService(clients, subscriberSvc, t.Logf, cleaner); err != nil { - t.Fatalf("Failed to create event logger service: %v", err) - } - - // Reload subscriberPod to get IP - subscriberPod, err := clients.Kube.Kube.CoreV1().Pods(subscriberPod.Namespace).Get(subscriberPod.Name, metav1.GetOptions{}) + subscriberPod, err := CreatePodAndServiceReady(clients, subscriberPod, routeName, ns, selector, t.Logf, cleaner) if err != nil { - t.Fatalf("Failed to get subscriber pod: %v", err) + t.Fatalf("Failed to create subscriber pod and service, and get them ready: %v", err) } // create channels @@ -108,25 +92,9 @@ func TestChannelChain(t *testing.T) { t.Fatalf("The Channel or Subscription were not marked as Ready: %v", err) } - // create sender pod - t.Logf("Creating event sender") + // send fake CloudEvent to the first channel body := fmt.Sprintf("TestChannelChainEvent %s", uuid.NewUUID()) - event := test.CloudEvent{ - Source: senderName, - Type: "test.eventing.knative.dev", - Data: fmt.Sprintf(`{"msg":%q}`, body), - Encoding: test.CloudEventDefaultEncoding, - } - url := fmt.Sprintf("http://%s", channels[0].Status.Address.Hostname) - pod := test.EventSenderPod(senderName, ns, url, event) - t.Logf("sender pod: %#v", pod) - if err := CreatePod(clients, pod, t.Logf, cleaner); err != nil { - t.Fatalf("Failed to create event sender pod: %v", err) - } - if err := pkgTest.WaitForAllPodsRunning(clients.Kube, ns); err != nil { - t.Fatalf("Error waiting for sender pod to become running: %v", err) - } - t.Logf("sender pod running") + SendFakeEventToChannel(clients, senderName, body, test.CloudEventDefaultType, test.CloudEventDefaultEncoding, channels[0], ns, t.Logf, cleaner) // check if the logging service receives the correct number of event messages if err := WaitForLogContentCount(clients, routeName, subscriberPod.Spec.Containers[0].Name, body, len(subscriptionNames1)*len(subscriptionNames2)); err != nil { diff --git a/test/e2e/e2e.go b/test/e2e/e2e.go index 47d1d5905c1..596ba338a02 100644 --- a/test/e2e/e2e.go +++ b/test/e2e/e2e.go @@ -29,6 +29,7 @@ import ( "github.com/knative/pkg/test/logging" servingV1alpha1 "github.com/knative/serving/pkg/apis/serving/v1alpha1" corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" @@ -296,6 +297,30 @@ func CreateServiceAccountAndBinding(clients *test.Clients, name string, logf log return nil } +// CreatePodAndServiceReady will create a Pod and Service, and wait for them to become ready +func CreatePodAndServiceReady(clients *test.Clients, pod *corev1.Pod, routeName string, ns string, selector map[string]string, logf logging.FormatLogger, cleaner *test.Cleaner) (*v1.Pod, error) { + if err := CreatePod(clients, pod, logf, cleaner); err != nil { + return nil, fmt.Errorf("Failed to create pod: %v", err) + } + if err := pkgTest.WaitForAllPodsRunning(clients.Kube, ns); err != nil { + return nil, fmt.Errorf("Error waiting for pod to become running: %v", err) + } + logf("pod running") + + svc := test.Service(routeName, ns, selector) + if err := CreateService(clients, svc, logf, cleaner); err != nil { + return nil, fmt.Errorf("Failed to create service: %v", err) + } + + // Reload pod to get IP + pod, err := clients.Kube.Kube.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("Failed to get pod: %v", err) + } + + return pod, nil +} + // CreateService will create a Service func CreateService(clients *test.Clients, svc *corev1.Service, _ logging.FormatLogger, cleaner *test.Cleaner) error { svcs := clients.Kube.Kube.CoreV1().Services(svc.GetNamespace()) @@ -317,6 +342,29 @@ func CreatePod(clients *test.Clients, pod *corev1.Pod, _ logging.FormatLogger, c return nil } +// SendFakeEventToChannel will create fake CloudEvent and send it to the given channel +func SendFakeEventToChannel(clients *test.Clients, senderName string, body string, eventType string, encoding string, channel *v1alpha1.Channel, ns string, logf logging.FormatLogger, cleaner *test.Cleaner) error { + logf("Sending fake CloudEvent") + logf("Creating event sender pod") + event := test.CloudEvent{ + Source: senderName, + Type: eventType, + Data: fmt.Sprintf(`{"msg":%q}`, body), + Encoding: test.CloudEventDefaultEncoding, + } + url := fmt.Sprintf("http://%s", channel.Status.Address.Hostname) + pod := test.EventSenderPod(senderName, ns, url, event) + logf("sender pod: %#v", pod) + if err := CreatePod(clients, pod, logf, cleaner); err != nil { + return err + } + if err := pkgTest.WaitForAllPodsRunning(clients.Kube, ns); err != nil { + return err + } + logf("sender pod running") + return nil +} + // WaitForLogContents waits until logs for given Pod/Container include the given contents. // If the contents are not present within timeout it returns error. func WaitForLogContents(clients *test.Clients, logf logging.FormatLogger, podName string, containerName string, namespace string, contents []string) error { diff --git a/test/e2e/event_transformation_test.go b/test/e2e/event_transformation_test.go index 7d02614fbf5..f8bcd943572 100644 --- a/test/e2e/event_transformation_test.go +++ b/test/e2e/event_transformation_test.go @@ -23,22 +23,20 @@ import ( "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/test" - pkgTest "github.com/knative/pkg/test" corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/uuid" ) /* TestEventTransformation tests the following scenario: - e1 e2 -EventSource ---> Channel ---> Subscriptions ---> Channel ---> Subscriptions ----> Service(Logger) - ^ - | e2 - | - |-----------> Service(Transformation) - e1 + 1 2 5 6 7 +EventSource ---> Channel ---> Subscription ---> Channel ---> Subscription ----> Service(Logger) + | ^ + 3 | | 4 + | | + | --------- + -----------> Service(Transformation) */ func TestEventTransformation(t *testing.T) { clients, cleaner := Setup(t, t.Logf) @@ -75,23 +73,9 @@ func TestEventTransformation(t *testing.T) { subscriberPod = test.EventLoggerPod(routeName, ns, selector) } - if err := CreatePod(clients, subscriberPod, t.Logf, cleaner); err != nil { - t.Fatalf("Failed to create event logger pod: %v", err) - } - if err := pkgTest.WaitForAllPodsRunning(clients.Kube, ns); err != nil { - t.Fatalf("Error waiting for logger pod to become running: %v", err) - } - t.Logf("subscriber pod running") - - subscriberSvc := test.Service(routeName, ns, selector) - if err := CreateService(clients, subscriberSvc, t.Logf, cleaner); err != nil { - t.Fatalf("Failed to create event logger service: %v", err) - } - - // Reload subscriberPod to get IP - subscriberPod, err := clients.Kube.Kube.CoreV1().Pods(subscriberPod.Namespace).Get(subscriberPod.Name, metav1.GetOptions{}) + subscriberPod, err := CreatePodAndServiceReady(clients, subscriberPod, routeName, ns, selector, t.Logf, cleaner) if err != nil { - t.Fatalf("Failed to get subscriber pod: %v", err) + t.Fatalf("Failed to create subscriber pod and service, and get them ready: %v", err) } subscriberPods = append(subscriberPods, subscriberPod) @@ -130,25 +114,9 @@ func TestEventTransformation(t *testing.T) { t.Fatalf("The Channels or Subscription were not marked as Ready: %v", err) } - // create sender pod - t.Logf("Creating event sender") - body := fmt.Sprintf("TestComplexScenario %s", uuid.NewUUID()) - event := test.CloudEvent{ - Source: senderName, - Type: "test.eventing.knative.dev", - Data: fmt.Sprintf(`{"msg":%q}`, body), - Encoding: test.CloudEventDefaultEncoding, - } - url := fmt.Sprintf("http://%s", channels[0].Status.Address.Hostname) - pod := test.EventSenderPod(senderName, ns, url, event) - t.Logf("sender pod: %#v", pod) - if err := CreatePod(clients, pod, t.Logf, cleaner); err != nil { - t.Fatalf("Failed to create event sender pod: %v", err) - } - if err := pkgTest.WaitForAllPodsRunning(clients.Kube, ns); err != nil { - t.Fatalf("Error waiting for sender pod to become running: %v", err) - } - t.Logf("sender pod running") + // send fake CloudEvent to the first channel + body := fmt.Sprintf("TestEventTransformation %s", uuid.NewUUID()) + SendFakeEventToChannel(clients, senderName, body, test.CloudEventDefaultType, test.CloudEventDefaultEncoding, channels[0], ns, t.Logf, cleaner) // check if the logging service receives the correct number of event messages if err := WaitForLogContentCount(clients, subscriberPods[1].Name, subscriberPods[1].Spec.Containers[0].Name, body+msgPostfix, len(subscriptionNames1)*len(subscriptionNames2)); err != nil { diff --git a/test/e2e/single_event_test.go b/test/e2e/single_event_test.go index 672566550e2..ed1b88d2a36 100644 --- a/test/e2e/single_event_test.go +++ b/test/e2e/single_event_test.go @@ -24,7 +24,6 @@ import ( "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/test" pkgTest "github.com/knative/pkg/test" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/uuid" ) @@ -65,23 +64,9 @@ func SingleEvent(t *testing.T, encoding string) { t.Logf("creating subscriber pod") selector := map[string]string{"e2etest": string(uuid.NewUUID())} subscriberPod := test.EventLoggerPod(routeName, ns, selector) - if err := CreatePod(clients, subscriberPod, t.Logf, cleaner); err != nil { - t.Fatalf("Failed to create event logger pod: %v", err) - } - if err := pkgTest.WaitForAllPodsRunning(clients.Kube, ns); err != nil { - t.Fatalf("Error waiting for logger pod to become running: %v", err) - } - t.Logf("subscriber pod running") - - subscriberSvc := test.Service(routeName, ns, selector) - if err := CreateService(clients, subscriberSvc, t.Logf, cleaner); err != nil { - t.Fatalf("Failed to create event logger service: %v", err) - } - - // Reload subscriberPod to get IP - subscriberPod, err := clients.Kube.Kube.CoreV1().Pods(subscriberPod.Namespace).Get(subscriberPod.Name, metav1.GetOptions{}) + subscriberPod, err := CreatePodAndServiceReady(clients, subscriberPod, routeName, ns, selector, t.Logf, cleaner) if err != nil { - t.Fatalf("Failed to get subscriber pod: %v", err) + t.Fatalf("Failed to create subscriber pod and service, and get them ready: %v", err) } // create channel @@ -99,22 +84,9 @@ func SingleEvent(t *testing.T, encoding string) { t.Fatalf("The Channel or Subscription were not marked as Ready: %v", err) } - // create sender pod - - t.Logf("Creating event sender") + // send fake CloudEvent to the first channel body := fmt.Sprintf("TestSingleEvent %s", uuid.NewUUID()) - event := test.CloudEvent{ - Source: senderName, - Type: "test.eventing.knative.dev", - Data: fmt.Sprintf(`{"msg":%q}`, body), - Encoding: encoding, - } - url := fmt.Sprintf("http://%s", channel.Status.Address.Hostname) - pod := test.EventSenderPod(senderName, ns, url, event) - t.Logf("sender pod: %#v", pod) - if err := CreatePod(clients, pod, t.Logf, cleaner); err != nil { - t.Fatalf("Failed to create event sender pod: %v", err) - } + SendFakeEventToChannel(clients, senderName, body, test.CloudEventDefaultType, test.CloudEventDefaultEncoding, channel, ns, t.Logf, cleaner) if err := pkgTest.WaitForLogContent(clients.Kube, routeName, subscriberPod.Spec.Containers[0].Name, body); err != nil { clients.Kube.PodLogs(senderName, "sendevent") From 7a7ad80b175fed7ab2459af2d1520c59f501bf7c Mon Sep 17 00:00:00 2001 From: Chi Zhang Date: Mon, 1 Apr 2019 14:10:24 -0700 Subject: [PATCH 13/20] modify the initialization sequence a little bit --- test/e2e/channel_chain_test.go | 5 ++--- test/e2e/event_transformation_test.go | 4 +--- test/e2e/single_event_test.go | 3 +-- 3 files changed, 4 insertions(+), 8 deletions(-) diff --git a/test/e2e/channel_chain_test.go b/test/e2e/channel_chain_test.go index 4bac4c2dd3c..622ad16e5e8 100644 --- a/test/e2e/channel_chain_test.go +++ b/test/e2e/channel_chain_test.go @@ -32,9 +32,7 @@ TestChannelChain tests the following scenario: EventSource ---> Channel ---> Subscriptions ---> Channel ---> Subscriptions ---> Service(Logger) */ -func TestChannelChain(t *testing.T) { - clients, cleaner := Setup(t, t.Logf) - +func TestChannelChain(t *testing.T) {\ const ( senderName = "e2e-channelchain-sender" routeName = "e2e-channelchain-route" @@ -43,6 +41,7 @@ func TestChannelChain(t *testing.T) { var subscriptionNames1 = [2]string{"e2e-complexscen-subs11", "e2e-complexscen-subs12"} var subscriptionNames2 = [1]string{"e2e-complexscen-subs21"} + clients, cleaner := Setup(t, t.Logf) // verify namespace ns, cleanupNS := CreateNamespaceIfNeeded(t, clients, t.Logf) defer cleanupNS() diff --git a/test/e2e/event_transformation_test.go b/test/e2e/event_transformation_test.go index f8bcd943572..5fe6a5a6116 100644 --- a/test/e2e/event_transformation_test.go +++ b/test/e2e/event_transformation_test.go @@ -39,18 +39,16 @@ EventSource ---> Channel ---> Subscription ---> Channel ---> Subscription ----> -----------> Service(Transformation) */ func TestEventTransformation(t *testing.T) { - clients, cleaner := Setup(t, t.Logf) - const ( senderName = "e2e-complexscen-sender" msgPostfix = "######" ) - var channelNames = [2]string{"e2e-complexscen1", "e2e-complexscen2"} var routeNames = [2]string{"e2e-complexscen-route1", "e2e-complexscen-route2"} var subscriptionNames1 = []string{"e2e-complexscen-subs11"} var subscriptionNames2 = []string{"e2e-complexscen-subs21", "e2e-complexscen-subs22"} + clients, cleaner := Setup(t, t.Logf) // verify namespace ns, cleanupNS := CreateNamespaceIfNeeded(t, clients, t.Logf) defer cleanupNS() diff --git a/test/e2e/single_event_test.go b/test/e2e/single_event_test.go index ed1b88d2a36..9fc465f80ea 100644 --- a/test/e2e/single_event_test.go +++ b/test/e2e/single_event_test.go @@ -42,8 +42,6 @@ EventSource ---> Channel ---> Subscriptions ---> Service(Logger) */ func SingleEvent(t *testing.T, encoding string) { - clients, cleaner := Setup(t, t.Logf) - const ( channelName = "e2e-singleevent" subscriberName = "e2e-singleevent-subscriber" @@ -52,6 +50,7 @@ func SingleEvent(t *testing.T, encoding string) { routeName = "e2e-singleevent-route" ) + clients, cleaner := Setup(t, t.Logf) // verify namespace ns, cleanupNS := CreateNamespaceIfNeeded(t, clients, t.Logf) defer cleanupNS() From 94398030b4ef9e3ae5c267dc36f84e12c0416290 Mon Sep 17 00:00:00 2001 From: Chi Zhang Date: Mon, 1 Apr 2019 14:24:09 -0700 Subject: [PATCH 14/20] resolve build failure --- test/e2e/channel_chain_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/e2e/channel_chain_test.go b/test/e2e/channel_chain_test.go index 622ad16e5e8..93e6e2deb5b 100644 --- a/test/e2e/channel_chain_test.go +++ b/test/e2e/channel_chain_test.go @@ -32,7 +32,7 @@ TestChannelChain tests the following scenario: EventSource ---> Channel ---> Subscriptions ---> Channel ---> Subscriptions ---> Service(Logger) */ -func TestChannelChain(t *testing.T) {\ +func TestChannelChain(t *testing.T) { const ( senderName = "e2e-channelchain-sender" routeName = "e2e-channelchain-route" From 5c157a4f54d448834167a3bb9f48836f8401749f Mon Sep 17 00:00:00 2001 From: Chi Zhang Date: Mon, 1 Apr 2019 21:13:41 -0700 Subject: [PATCH 15/20] solve code review issues --- default.profraw | Bin 2056 -> 0 bytes test/e2e/channel_chain_test.go | 9 ++++++--- test/e2e/e2e.go | 8 ++++---- test/e2e/event_transformation_test.go | 10 +++++++--- test/e2e/single_event_test.go | 6 ++++-- 5 files changed, 21 insertions(+), 12 deletions(-) delete mode 100644 default.profraw diff --git a/default.profraw b/default.profraw deleted file mode 100644 index ccf66ff616ed0dcb3b35f3be1a585e079e19f2af..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 2056 zcmZoHO3N=Q$obF000E*8oX!mff-#9il}-S~Az_DR1x zk}!1wCJ=QHRSXQAP=q5h`* zjH9VG56w;_#Vm6_awXUSs$Rex)jhEA{3Di<*2a$+tj8g^`es-IwmYCb#E0T&kR*-^>w ze6=ZE<7&lkk?Sv@>K$ybt6%^3sMy&huf%KGrZ#wQz1yi!22qz_i>jUvYQC42g7o75 zhh1R)od8v@;EJjq=3k!eb`M;Ry`O&d?Z1}|^TRb~K-D*(sfUH*0(VV!%gNxw+RQ&>XRP`|P1qJS2^!-%eyf+~FQPVT0#s^UK z3?8WJg`p0R@jZOd;Hr6`1e9M<4pDc(8&y5bzuhyMx>}!io`$J!fU5t1L%oDc<>sx9 zlRv=JcR3h=VCpA8)f?bY-{SBx=7H%PGno1rQ1t;g)WiG%3x9MP zCQpb)*9YUn^uhQrnhDB)$)odO;^;I?KROLlhb}%;_~`D3@nJN~ALx9TIH@$u9CR9H z4on=zCq$#Ghw;&Aa?OG1gP8~86QW`1ijcZx%x4%YZcXhw?Z<4$<2qG-m4K*p*ntfT8<+-FE5!*aO_uSzYFl>DFy|qHN*Zx(< zBEyQK@0soLJF@TZU;AluasBUi#(^_pbc}s>c*oh=-F?1Z{){;DU8T2*Q@aj?%%A&Y q_Q7K;QWEW}g_?HRq~tbeol@#_(dun=X{s- Channel ---> Subscriptions ---> Service(Logger) +EventSource ---> Channel ---> Subscription ---> Service(Logger) */ func SingleEvent(t *testing.T, encoding string) { @@ -85,7 +85,9 @@ func SingleEvent(t *testing.T, encoding string) { // send fake CloudEvent to the first channel body := fmt.Sprintf("TestSingleEvent %s", uuid.NewUUID()) - SendFakeEventToChannel(clients, senderName, body, test.CloudEventDefaultType, test.CloudEventDefaultEncoding, channel, ns, t.Logf, cleaner) + if err := SendFakeEventToChannel(clients, senderName, body, test.CloudEventDefaultType, test.CloudEventDefaultEncoding, channel, ns, t.Logf, cleaner); err != nil { + t.Fatalf("Failed to send fake CloudEvent to the channel %q", channel.Name) + } if err := pkgTest.WaitForLogContent(clients.Kube, routeName, subscriberPod.Spec.Containers[0].Name, body); err != nil { clients.Kube.PodLogs(senderName, "sendevent") From 24a4146866a868e3d5bdb637fcf51d10434cbac8 Mon Sep 17 00:00:00 2001 From: Chi Zhang Date: Mon, 1 Apr 2019 21:26:46 -0700 Subject: [PATCH 16/20] move provisioner check ahead --- test/e2e/channel_chain_test.go | 7 ++++--- test/e2e/event_transformation_test.go | 7 ++++--- test/e2e/single_event_test.go | 7 ++++--- 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/test/e2e/channel_chain_test.go b/test/e2e/channel_chain_test.go index b9dc4228af1..a0f2cf792c5 100644 --- a/test/e2e/channel_chain_test.go +++ b/test/e2e/channel_chain_test.go @@ -33,6 +33,10 @@ EventSource ---> Channel ---> Subscriptions ---> Channel ---> Subscriptions ---> */ func TestChannelChain(t *testing.T) { + if test.EventingFlags.Provisioner == "" { + t.Fatal("ClusterChannelProvisioner must be set to a non-empty string. Either do not specify --clusterChannelProvisioner or set to something other than the empty string") + } + const ( senderName = "e2e-channelchain-sender" routeName = "e2e-channelchain-route" @@ -61,9 +65,6 @@ func TestChannelChain(t *testing.T) { // create channels t.Logf("Creating Channel and Subscription") - if test.EventingFlags.Provisioner == "" { - t.Fatal("ClusterChannelProvisioner must be set to a non-empty string. Either do not specify --clusterChannelProvisioner or set to something other than the empty string") - } channels := make([]*v1alpha1.Channel, 0) for _, channelName := range channelNames { channel := test.Channel(channelName, ns, test.ClusterChannelProvisioner(test.EventingFlags.Provisioner)) diff --git a/test/e2e/event_transformation_test.go b/test/e2e/event_transformation_test.go index 967a85577fa..945013087c7 100644 --- a/test/e2e/event_transformation_test.go +++ b/test/e2e/event_transformation_test.go @@ -39,6 +39,10 @@ EventSource ---> Channel ---> Subscription ---> Channel ---> Subscription ----> -----------> Service(Transformation) */ func TestEventTransformation(t *testing.T) { + if test.EventingFlags.Provisioner == "" { + t.Fatal("ClusterChannelProvisioner must be set to a non-empty string. Either do not specify --clusterChannelProvisioner or set to something other than the empty string") + } + const ( senderName = "e2e-complexscen-sender" msgPostfix = "######" @@ -81,9 +85,6 @@ func TestEventTransformation(t *testing.T) { // create channels t.Logf("Creating Channel and Subscription") - if test.EventingFlags.Provisioner == "" { - t.Fatal("ClusterChannelProvisioner must be set to a non-empty string. Either do not specify --clusterChannelProvisioner or set to something other than the empty string") - } channels := make([]*v1alpha1.Channel, 0) for _, channelName := range channelNames { channel := test.Channel(channelName, ns, test.ClusterChannelProvisioner(test.EventingFlags.Provisioner)) diff --git a/test/e2e/single_event_test.go b/test/e2e/single_event_test.go index e9ef8f3fdb9..a7babca99fb 100644 --- a/test/e2e/single_event_test.go +++ b/test/e2e/single_event_test.go @@ -42,6 +42,10 @@ EventSource ---> Channel ---> Subscription ---> Service(Logger) */ func SingleEvent(t *testing.T, encoding string) { + if test.EventingFlags.Provisioner == "" { + t.Fatal("ClusterChannelProvisioner must be set to a non-empty string. Either do not specify --clusterChannelProvisioner or set to something other than the empty string") + } + const ( channelName = "e2e-singleevent" subscriberName = "e2e-singleevent-subscriber" @@ -71,9 +75,6 @@ func SingleEvent(t *testing.T, encoding string) { // create channel t.Logf("Creating Channel and Subscription") - if test.EventingFlags.Provisioner == "" { - t.Fatal("ClusterChannelProvisioner must be set to a non-empty string. Either do not specify --clusterChannelProvisioner or set to something other than the empty string") - } channel := test.Channel(channelName, ns, test.ClusterChannelProvisioner(test.EventingFlags.Provisioner)) t.Logf("channel: %#v", channel) sub := test.Subscription(subscriptionName, ns, test.ChannelRef(channelName), test.SubscriberSpecForService(routeName), nil) From bef2c439c76e6d132675c03c7b6c3dce546c9cca Mon Sep 17 00:00:00 2001 From: Chi Zhang Date: Mon, 1 Apr 2019 21:40:33 -0700 Subject: [PATCH 17/20] remove duplicate import --- test/e2e/e2e.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/test/e2e/e2e.go b/test/e2e/e2e.go index 811298c7a0c..1c7b4d1c14f 100644 --- a/test/e2e/e2e.go +++ b/test/e2e/e2e.go @@ -29,7 +29,6 @@ import ( "github.com/knative/pkg/test/logging" servingV1alpha1 "github.com/knative/serving/pkg/apis/serving/v1alpha1" corev1 "k8s.io/api/core/v1" - v1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" @@ -298,7 +297,7 @@ func CreateServiceAccountAndBinding(clients *test.Clients, name string, logf log } // CreatePodAndServiceReady will create a Pod and Service, and wait for them to become ready -func CreatePodAndServiceReady(clients *test.Clients, pod *corev1.Pod, routeName string, ns string, selector map[string]string, logf logging.FormatLogger, cleaner *test.Cleaner) (*v1.Pod, error) { +func CreatePodAndServiceReady(clients *test.Clients, pod *corev1.Pod, routeName string, ns string, selector map[string]string, logf logging.FormatLogger, cleaner *test.Cleaner) (*corev1.Pod, error) { if err := CreatePod(clients, pod, logf, cleaner); err != nil { return nil, fmt.Errorf("Failed to create pod: %v", err) } From deb6b65334849cfb575326f818539cb369acc1e3 Mon Sep 17 00:00:00 2001 From: Chi Zhang Date: Tue, 2 Apr 2019 15:00:04 -0700 Subject: [PATCH 18/20] minor comment change --- test/e2e/event_transformation_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/e2e/event_transformation_test.go b/test/e2e/event_transformation_test.go index 945013087c7..9185d8b3551 100644 --- a/test/e2e/event_transformation_test.go +++ b/test/e2e/event_transformation_test.go @@ -30,7 +30,7 @@ import ( /* TestEventTransformation tests the following scenario: - 1 2 5 6 7 + 1 2 5 6 7 EventSource ---> Channel ---> Subscription ---> Channel ---> Subscription ----> Service(Logger) | ^ 3 | | 4 From 52db4720bdfafdf90c65a561fb0a3ae6348c9603 Mon Sep 17 00:00:00 2001 From: Chi Zhang Date: Tue, 2 Apr 2019 18:34:50 -0700 Subject: [PATCH 19/20] fix the code review issues --- test/crd.go | 16 +++---- test/e2e/broker_trigger_test.go | 2 +- test/e2e/channel_chain_test.go | 37 +++++++++------ test/e2e/e2e.go | 20 +++----- test/e2e/event_transformation_test.go | 68 +++++++++++++++------------ test/e2e/single_event_test.go | 26 ++++++---- 6 files changed, 93 insertions(+), 76 deletions(-) diff --git a/test/crd.go b/test/crd.go index 4d593db428f..52acf69c6fb 100644 --- a/test/crd.go +++ b/test/crd.go @@ -31,7 +31,7 @@ const ( servingApiVersion = "serving.knative.dev/v1alpha1" ) -// Route returns a Route object in namespace +// Route returns a Route object in namespace. func Route(name string, namespace string, configName string) *servingv1alpha1.Route { return &servingv1alpha1.Route{ ObjectMeta: metav1.ObjectMeta{ @@ -72,17 +72,17 @@ func Configuration(name string, namespace string, imagePath string) *servingv1al } } -// ClusterChannelProvisioner returns a ClusterChannelProvisioner for a given name +// ClusterChannelProvisioner returns a ClusterChannelProvisioner for a given name. func ClusterChannelProvisioner(name string) *corev1.ObjectReference { return pkgTest.CoreV1ObjectReference("ClusterChannelProvisioner", eventsApiVersion, name) } -// ChannelRef returns an ObjectReference for a given Channel Name +// ChannelRef returns an ObjectReference for a given Channel Name. func ChannelRef(name string) *corev1.ObjectReference { return pkgTest.CoreV1ObjectReference("Channel", eventsApiVersion, name) } -// Channel returns a Channel with the specified provisioner +// Channel returns a Channel with the specified provisioner. func Channel(name string, namespace string, provisioner *corev1.ObjectReference) *v1alpha1.Channel { return &v1alpha1.Channel{ ObjectMeta: metav1.ObjectMeta{ @@ -109,14 +109,14 @@ func SubscriberSpecForService(name string) *v1alpha1.SubscriberSpec { } } -// ReplyStrategyForChannel returns a ReplyStrategy for a given Channel +// ReplyStrategyForChannel returns a ReplyStrategy for a given Channel. func ReplyStrategyForChannel(name string) *v1alpha1.ReplyStrategy { return &v1alpha1.ReplyStrategy{ Channel: pkgTest.CoreV1ObjectReference("Channel", eventsApiVersion, name), } } -// Subscription returns a Subscription +// Subscription returns a Subscription. func Subscription(name string, namespace string, channel *corev1.ObjectReference, subscriber *v1alpha1.SubscriberSpec, reply *v1alpha1.ReplyStrategy) *v1alpha1.Subscription { return &v1alpha1.Subscription{ ObjectMeta: metav1.ObjectMeta{ @@ -162,11 +162,11 @@ const ( CloudEventEncodingBinary = "binary" CloudEventEncodingStructured = "structured" CloudEventDefaultEncoding = CloudEventEncodingBinary - CloudEventDefaultType = "test.eventing.knative.dev" + CloudEventDefaultType = "dev.knative.test.event" ) // EventSenderPod creates a Pod that sends a single event to the given address. -func EventSenderPod(name string, namespace string, sink string, event CloudEvent) *corev1.Pod { +func EventSenderPod(name string, namespace string, sink string, event *CloudEvent) *corev1.Pod { if event.Encoding == "" { event.Encoding = CloudEventEncodingBinary } diff --git a/test/e2e/broker_trigger_test.go b/test/e2e/broker_trigger_test.go index b6d8c811328..d6b917c63bf 100644 --- a/test/e2e/broker_trigger_test.go +++ b/test/e2e/broker_trigger_test.go @@ -186,7 +186,7 @@ func TestDefaultBrokerWithManyTriggers(t *testing.T) { // Create cloud event. // Using event type and source as part of the body for easier debugging. body := fmt.Sprintf("Body-%s-%s", eventToSend.Type, eventToSend.Source) - cloudEvent := test.CloudEvent{ + cloudEvent := &test.CloudEvent{ Source: eventToSend.Source, Type: eventToSend.Type, Data: fmt.Sprintf(`{"msg":%q}`, body), diff --git a/test/e2e/channel_chain_test.go b/test/e2e/channel_chain_test.go index a0f2cf792c5..0657f505188 100644 --- a/test/e2e/channel_chain_test.go +++ b/test/e2e/channel_chain_test.go @@ -38,12 +38,14 @@ func TestChannelChain(t *testing.T) { } const ( - senderName = "e2e-channelchain-sender" - routeName = "e2e-channelchain-route" + senderName = "e2e-channelchain-sender" + loggerPodName = "e2e-channelchain-logger-pod" ) - var channelNames = [2]string{"e2e-channelchain1", "e2e-channelchain2"} - var subscriptionNames1 = [2]string{"e2e-complexscen-subs11", "e2e-complexscen-subs12"} - var subscriptionNames2 = [1]string{"e2e-complexscen-subs21"} + channelNames := [2]string{"e2e-channelchain1", "e2e-channelchain2"} + // subscriptionNames1 corresponds to Subscriptions on channelNames[0] + subscriptionNames1 := [2]string{"e2e-complexscen-subs11", "e2e-complexscen-subs12"} + // subscriptionNames2 corresponds to Subscriptions on channelNames[1] + subscriptionNames2 := [1]string{"e2e-complexscen-subs21"} clients, cleaner := Setup(t, t.Logf) // verify namespace @@ -54,13 +56,14 @@ func TestChannelChain(t *testing.T) { // resources in it. So when TearDown() runs, it spews a lot of not found errors. defer TearDown(clients, cleaner, t.Logf) - // create subscriberPod and expose it as a service - t.Logf("creating subscriber pod") + // create loggerPod and expose it as a service + t.Logf("creating logger pod") selector := map[string]string{"e2etest": string(uuid.NewUUID())} - subscriberPod := test.EventLoggerPod(routeName, ns, selector) - subscriberPod, err := CreatePodAndServiceReady(clients, subscriberPod, routeName, ns, selector, t.Logf, cleaner) + loggerPod := test.EventLoggerPod(loggerPodName, ns, selector) + loggerSvc := test.Service(loggerPodName, ns, selector) + loggerPod, err := CreatePodAndServiceReady(clients, loggerPod, loggerSvc, ns, t.Logf, cleaner) if err != nil { - t.Fatalf("Failed to create subscriber pod and service, and get them ready: %v", err) + t.Fatalf("Failed to create logger pod and service, and get them ready: %v", err) } // create channels @@ -82,7 +85,7 @@ func TestChannelChain(t *testing.T) { } // create subscriptions that subscribe the second channel, and call the logging service for _, subscriptionName := range subscriptionNames2 { - sub := test.Subscription(subscriptionName, ns, test.ChannelRef(channelNames[1]), test.SubscriberSpecForService(routeName), nil) + sub := test.Subscription(subscriptionName, ns, test.ChannelRef(channelNames[1]), test.SubscriberSpecForService(loggerPodName), nil) t.Logf("sub: %#v", sub) subs = append(subs, sub) } @@ -94,13 +97,19 @@ func TestChannelChain(t *testing.T) { // send fake CloudEvent to the first channel body := fmt.Sprintf("TestChannelChainEvent %s", uuid.NewUUID()) - if err := SendFakeEventToChannel(clients, senderName, body, test.CloudEventDefaultType, test.CloudEventDefaultEncoding, channels[0], ns, t.Logf, cleaner); err != nil { + event := &test.CloudEvent{ + Source: senderName, + Type: test.CloudEventDefaultType, + Data: fmt.Sprintf(`{"msg":%q}`, body), + Encoding: test.CloudEventDefaultEncoding, + } + if err := SendFakeEventToChannel(clients, event, channels[0], ns, t.Logf, cleaner); err != nil { t.Fatalf("Failed to send fake CloudEvent to the channel %q", channels[0].Name) } // check if the logging service receives the correct number of event messages expectedContentCount := len(subscriptionNames1) * len(subscriptionNames2) - if err := WaitForLogContentCount(clients, routeName, subscriberPod.Spec.Containers[0].Name, body, expectedContentCount); err != nil { - t.Fatalf("String %q does not appear %d times in logs of subscriber pod %q: %v", body, expectedContentCount, subscriberPod.Name, err) + if err := WaitForLogContentCount(clients, loggerPodName, loggerPod.Spec.Containers[0].Name, body, expectedContentCount); err != nil { + t.Fatalf("String %q does not appear %d times in logs of logger pod %q: %v", body, expectedContentCount, loggerPodName, err) } } diff --git a/test/e2e/e2e.go b/test/e2e/e2e.go index 1c7b4d1c14f..e679b4102e8 100644 --- a/test/e2e/e2e.go +++ b/test/e2e/e2e.go @@ -128,6 +128,7 @@ func CreateSubscription(clients *test.Clients, sub *v1alpha1.Subscription, _ log } // WithChannelsAndSubscriptionsReady creates Channels and Subscriptions and waits until all are Ready. +// When they are ready, chans and subs are altered to get the real Channels and Subscriptions. func WithChannelsAndSubscriptionsReady(clients *test.Clients, chans *[]*v1alpha1.Channel, subs *[]*v1alpha1.Subscription, logf logging.FormatLogger, cleaner *test.Cleaner) error { for _, channel := range *chans { if err := CreateChannel(clients, channel, logf, cleaner); err != nil { @@ -140,7 +141,7 @@ func WithChannelsAndSubscriptionsReady(clients *test.Clients, chans *[]*v1alpha1 if err := test.WaitForChannelState(channels, channel.Name, test.IsChannelReady, "ChannelIsReady"); err != nil { return err } - // Update the given object so they'll reflect the ready state + // Update the given object so they'll reflect the ready state. updatedchannel, err := channels.Get(channel.Name, metav1.GetOptions{}) if err != nil { return err @@ -160,7 +161,7 @@ func WithChannelsAndSubscriptionsReady(clients *test.Clients, chans *[]*v1alpha1 if err := test.WaitForSubscriptionState(subscriptions, sub.Name, test.IsSubscriptionReady, "SubscriptionIsReady"); err != nil { return err } - // Update the given object so they'll reflect the ready state + // Update the given object so they'll reflect the ready state. updatedsub, err := subscriptions.Get(sub.Name, metav1.GetOptions{}) if err != nil { return err @@ -297,7 +298,7 @@ func CreateServiceAccountAndBinding(clients *test.Clients, name string, logf log } // CreatePodAndServiceReady will create a Pod and Service, and wait for them to become ready -func CreatePodAndServiceReady(clients *test.Clients, pod *corev1.Pod, routeName string, ns string, selector map[string]string, logf logging.FormatLogger, cleaner *test.Cleaner) (*corev1.Pod, error) { +func CreatePodAndServiceReady(clients *test.Clients, pod *corev1.Pod, svc *corev1.Service, ns string, logf logging.FormatLogger, cleaner *test.Cleaner) (*corev1.Pod, error) { if err := CreatePod(clients, pod, logf, cleaner); err != nil { return nil, fmt.Errorf("Failed to create pod: %v", err) } @@ -306,7 +307,6 @@ func CreatePodAndServiceReady(clients *test.Clients, pod *corev1.Pod, routeName } logf("Pod %q starts running", pod.Name) - svc := test.Service(routeName, ns, selector) if err := CreateService(clients, svc, logf, cleaner); err != nil { return nil, fmt.Errorf("Failed to create service: %v", err) } @@ -341,18 +341,12 @@ func CreatePod(clients *test.Clients, pod *corev1.Pod, _ logging.FormatLogger, c return nil } -// SendFakeEventToChannel will create fake CloudEvent and send it to the given channel -func SendFakeEventToChannel(clients *test.Clients, senderName string, body string, eventType string, encoding string, channel *v1alpha1.Channel, ns string, logf logging.FormatLogger, cleaner *test.Cleaner) error { +// SendFakeEventToChannel will create fake CloudEvent and send it to the given channel. +func SendFakeEventToChannel(clients *test.Clients, event *test.CloudEvent, channel *v1alpha1.Channel, ns string, logf logging.FormatLogger, cleaner *test.Cleaner) error { logf("Sending fake CloudEvent") logf("Creating event sender pod") - event := test.CloudEvent{ - Source: senderName, - Type: eventType, - Data: fmt.Sprintf(`{"msg":%q}`, body), - Encoding: test.CloudEventDefaultEncoding, - } url := fmt.Sprintf("http://%s", channel.Status.Address.Hostname) - pod := test.EventSenderPod(senderName, ns, url, event) + pod := test.EventSenderPod(event.Source, ns, url, event) logf("Sender pod: %#v", pod) if err := CreatePod(clients, pod, logf, cleaner); err != nil { return err diff --git a/test/e2e/event_transformation_test.go b/test/e2e/event_transformation_test.go index 9185d8b3551..deeb8a47e26 100644 --- a/test/e2e/event_transformation_test.go +++ b/test/e2e/event_transformation_test.go @@ -43,14 +43,15 @@ func TestEventTransformation(t *testing.T) { t.Fatal("ClusterChannelProvisioner must be set to a non-empty string. Either do not specify --clusterChannelProvisioner or set to something other than the empty string") } - const ( - senderName = "e2e-complexscen-sender" - msgPostfix = "######" - ) - var channelNames = [2]string{"e2e-complexscen1", "e2e-complexscen2"} - var routeNames = [2]string{"e2e-complexscen-route1", "e2e-complexscen-route2"} - var subscriptionNames1 = []string{"e2e-complexscen-subs11"} - var subscriptionNames2 = []string{"e2e-complexscen-subs21", "e2e-complexscen-subs22"} + senderName := "e2e-eventtransformation-sender" + msgPostfix := string(uuid.NewUUID()) + channelNames := [2]string{"e2e-eventtransformation1", "e2e-eventtransformation2"} + // subscriptionNames1 corresponds to Subscriptions on channelNames[0] + subscriptionNames1 := []string{"e2e-eventtransformation-subs11"} + // subscriptionNames2 corresponds to Subscriptions on channelNames[1] + subscriptionNames2 := []string{"e2e-eventtransformation-subs21", "e2e-eventtransformation-subs22"} + transformationPodName := "e2e-eventtransformation-transformation-pod" + loggerPodName := "e2e-eventtransformation-logger-pod" clients, cleaner := Setup(t, t.Logf) // verify namespace @@ -63,25 +64,26 @@ func TestEventTransformation(t *testing.T) { // create subscriberPods and expose them as services t.Logf("creating subscriber pods") - subscriberPods := make([]*corev1.Pod, 0) - for i, routeName := range routeNames { - selector := map[string]string{"e2etest": string(uuid.NewUUID())} - var subscriberPod *corev1.Pod - // create the first subscriber for the event transformation, and the second for the logging - if i == 0 { - subscriberPod = test.EventTransformationPod(routeName, ns, selector, msgPostfix) - } else if i == 1 { - subscriberPod = test.EventLoggerPod(routeName, ns, selector) - } - - subscriberPod, err := CreatePodAndServiceReady(clients, subscriberPod, routeName, ns, selector, t.Logf, cleaner) - if err != nil { - t.Fatalf("Failed to create subscriber pod and service, and get them ready: %v", err) - } - - subscriberPods = append(subscriberPods, subscriberPod) + + // create transformation pod and service + transformationPodSelector := map[string]string{"e2etest": string(uuid.NewUUID())} + transformationPod := test.EventTransformationPod(transformationPodName, ns, transformationPodSelector, msgPostfix) + transformationSvc := test.Service(transformationPodName, ns, transformationPodSelector) + transformationPod, err := CreatePodAndServiceReady(clients, transformationPod, transformationSvc, ns, t.Logf, cleaner) + if err != nil { + t.Fatalf("Failed to create transformation pod and service, and get them ready: %v", err) } + subscriberPods = append(subscriberPods, transformationPod) + // create logger pod and service + loggerPodSelector := map[string]string{"e2etest": string(uuid.NewUUID())} + loggerPod := test.EventLoggerPod(loggerPodName, ns, loggerPodSelector) + loggerSvc := test.Service(loggerPodName, ns, loggerPodSelector) + loggerPod, err = CreatePodAndServiceReady(clients, loggerPod, loggerSvc, ns, t.Logf, cleaner) + if err != nil { + t.Fatalf("Failed to create logger pod and service, and get them ready: %v", err) + } + subscriberPods = append(subscriberPods, loggerPod) // create channels t.Logf("Creating Channel and Subscription") @@ -97,13 +99,13 @@ func TestEventTransformation(t *testing.T) { subs := make([]*v1alpha1.Subscription, 0) // create subscriptions that subscribe the first channel, use the transformation service to transform the events and then forward the transformed events to the second channel for _, subscriptionName := range subscriptionNames1 { - sub := test.Subscription(subscriptionName, ns, test.ChannelRef(channelNames[0]), test.SubscriberSpecForService(routeNames[0]), test.ReplyStrategyForChannel(channelNames[1])) + sub := test.Subscription(subscriptionName, ns, test.ChannelRef(channelNames[0]), test.SubscriberSpecForService(transformationPodName), test.ReplyStrategyForChannel(channelNames[1])) t.Logf("sub: %#v", sub) subs = append(subs, sub) } // create subscriptions that subscribe the second channel, and call the logging service for _, subscriptionName := range subscriptionNames2 { - sub := test.Subscription(subscriptionName, ns, test.ChannelRef(channelNames[1]), test.SubscriberSpecForService(routeNames[1]), nil) + sub := test.Subscription(subscriptionName, ns, test.ChannelRef(channelNames[1]), test.SubscriberSpecForService(loggerPodName), nil) t.Logf("sub: %#v", sub) subs = append(subs, sub) } @@ -115,14 +117,20 @@ func TestEventTransformation(t *testing.T) { // send fake CloudEvent to the first channel body := fmt.Sprintf("TestEventTransformation %s", uuid.NewUUID()) - if err := SendFakeEventToChannel(clients, senderName, body, test.CloudEventDefaultType, test.CloudEventDefaultEncoding, channels[0], ns, t.Logf, cleaner); err != nil { + event := &test.CloudEvent{ + Source: senderName, + Type: test.CloudEventDefaultType, + Data: fmt.Sprintf(`{"msg":%q}`, body), + Encoding: test.CloudEventDefaultEncoding, + } + if err := SendFakeEventToChannel(clients, event, channels[0], ns, t.Logf, cleaner); err != nil { t.Fatalf("Failed to send fake CloudEvent to the channel %q", channels[0].Name) } // check if the logging service receives the correct number of event messages expectedContent := body + msgPostfix expectedContentCount := len(subscriptionNames1) * len(subscriptionNames2) - if err := WaitForLogContentCount(clients, subscriberPods[1].Name, subscriberPods[1].Spec.Containers[0].Name, expectedContent, expectedContentCount); err != nil { - t.Fatalf("String %q does not appear %d times in logs of subscriber pod %q: %v", expectedContent, expectedContentCount, subscriberPods[1].Name, err) + if err := WaitForLogContentCount(clients, loggerPod.Name, loggerPod.Spec.Containers[0].Name, expectedContent, expectedContentCount); err != nil { + t.Fatalf("String %q does not appear %d times in logs of logger pod %q: %v", expectedContent, expectedContentCount, loggerPod.Name, err) } } diff --git a/test/e2e/single_event_test.go b/test/e2e/single_event_test.go index a7babca99fb..07515cbbc3a 100644 --- a/test/e2e/single_event_test.go +++ b/test/e2e/single_event_test.go @@ -48,10 +48,9 @@ func SingleEvent(t *testing.T, encoding string) { const ( channelName = "e2e-singleevent" - subscriberName = "e2e-singleevent-subscriber" senderName = "e2e-singleevent-sender" subscriptionName = "e2e-singleevent-subscription" - routeName = "e2e-singleevent-route" + loggerPodName = "e2e-singleevent-logger-pod" ) clients, cleaner := Setup(t, t.Logf) @@ -64,12 +63,13 @@ func SingleEvent(t *testing.T, encoding string) { defer TearDown(clients, cleaner, t.Logf) // create logger pod - t.Logf("creating subscriber pod") + t.Logf("creating logger pod") selector := map[string]string{"e2etest": string(uuid.NewUUID())} - subscriberPod := test.EventLoggerPod(routeName, ns, selector) - subscriberPod, err := CreatePodAndServiceReady(clients, subscriberPod, routeName, ns, selector, t.Logf, cleaner) + loggerPod := test.EventLoggerPod(loggerPodName, ns, selector) + loggerSvc := test.Service(loggerPodName, ns, selector) + loggerPod, err := CreatePodAndServiceReady(clients, loggerPod, loggerSvc, ns, t.Logf, cleaner) if err != nil { - t.Fatalf("Failed to create subscriber pod and service, and get them ready: %v", err) + t.Fatalf("Failed to create logger pod and service, and get them ready: %v", err) } // create channel @@ -77,7 +77,7 @@ func SingleEvent(t *testing.T, encoding string) { t.Logf("Creating Channel and Subscription") channel := test.Channel(channelName, ns, test.ClusterChannelProvisioner(test.EventingFlags.Provisioner)) t.Logf("channel: %#v", channel) - sub := test.Subscription(subscriptionName, ns, test.ChannelRef(channelName), test.SubscriberSpecForService(routeName), nil) + sub := test.Subscription(subscriptionName, ns, test.ChannelRef(channelName), test.SubscriberSpecForService(loggerPodName), nil) t.Logf("sub: %#v", sub) if err := WithChannelsAndSubscriptionsReady(clients, &[]*v1alpha1.Channel{channel}, &[]*v1alpha1.Subscription{sub}, t.Logf, cleaner); err != nil { @@ -86,13 +86,19 @@ func SingleEvent(t *testing.T, encoding string) { // send fake CloudEvent to the first channel body := fmt.Sprintf("TestSingleEvent %s", uuid.NewUUID()) - if err := SendFakeEventToChannel(clients, senderName, body, test.CloudEventDefaultType, test.CloudEventDefaultEncoding, channel, ns, t.Logf, cleaner); err != nil { + event := &test.CloudEvent{ + Source: senderName, + Type: test.CloudEventDefaultType, + Data: fmt.Sprintf(`{"msg":%q}`, body), + Encoding: encoding, + } + if err := SendFakeEventToChannel(clients, event, channel, ns, t.Logf, cleaner); err != nil { t.Fatalf("Failed to send fake CloudEvent to the channel %q", channel.Name) } - if err := pkgTest.WaitForLogContent(clients.Kube, routeName, subscriberPod.Spec.Containers[0].Name, body); err != nil { + if err := pkgTest.WaitForLogContent(clients.Kube, loggerPodName, loggerPod.Spec.Containers[0].Name, body); err != nil { clients.Kube.PodLogs(senderName, "sendevent") clients.Kube.PodLogs(senderName, "istio-proxy") - t.Fatalf("String %q not found in logs of subscriber pod %q: %v", body, routeName, err) + t.Fatalf("String %q not found in logs of logger pod %q: %v", body, loggerPodName, err) } } From 36264fca90294d6135499bed4aa7b5d89731dca6 Mon Sep 17 00:00:00 2001 From: Chi Zhang Date: Fri, 5 Apr 2019 15:53:43 -0700 Subject: [PATCH 20/20] change the timeout to 20 mins, by default it is 10 mins --- test/e2e-tests.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/e2e-tests.sh b/test/e2e-tests.sh index 1903ce9c378..4051b5d0283 100755 --- a/test/e2e-tests.sh +++ b/test/e2e-tests.sh @@ -87,6 +87,6 @@ function dump_extra_cluster_state() { initialize $@ -go_test_e2e ./test/e2e || fail_test +go_test_e2e -timeout=20m ./test/e2e || fail_test success