Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 40 additions & 7 deletions test/crd.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const (
servingApiVersion = "serving.knative.dev/v1alpha1"
)

// Route returns a Route object in namespace
// Route returns a Route object in namespace.
func Route(name string, namespace string, configName string) *servingv1alpha1.Route {
return &servingv1alpha1.Route{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -72,17 +72,17 @@ func Configuration(name string, namespace string, imagePath string) *servingv1al
}
}

// ClusterChannelProvisioner returns a ClusterChannelProvisioner for a given name
// ClusterChannelProvisioner returns a ClusterChannelProvisioner for a given name.
func ClusterChannelProvisioner(name string) *corev1.ObjectReference {
return pkgTest.CoreV1ObjectReference("ClusterChannelProvisioner", eventsApiVersion, name)
}

// ChannelRef returns an ObjectReference for a given Channel Name
// ChannelRef returns an ObjectReference for a given Channel Name.
func ChannelRef(name string) *corev1.ObjectReference {
return pkgTest.CoreV1ObjectReference("Channel", eventsApiVersion, name)
}

// Channel returns a Channel with the specified provisioner
// Channel returns a Channel with the specified provisioner.
func Channel(name string, namespace string, provisioner *corev1.ObjectReference) *v1alpha1.Channel {
return &v1alpha1.Channel{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -95,7 +95,7 @@ func Channel(name string, namespace string, provisioner *corev1.ObjectReference)
}
}

// SubscriberSpecForRoute returns a SubscriberSpec for a given Knative Service.
// SubscriberSpecForRoute returns a SubscriberSpec for a given Knative Route.
func SubscriberSpecForRoute(name string) *v1alpha1.SubscriberSpec {
return &v1alpha1.SubscriberSpec{
Ref: pkgTest.CoreV1ObjectReference("Route", servingApiVersion, name),
Expand All @@ -109,7 +109,14 @@ func SubscriberSpecForService(name string) *v1alpha1.SubscriberSpec {
}
}

// Subscription returns a Subscription
// ReplyStrategyForChannel returns a ReplyStrategy for a given Channel.
func ReplyStrategyForChannel(name string) *v1alpha1.ReplyStrategy {
return &v1alpha1.ReplyStrategy{
Channel: pkgTest.CoreV1ObjectReference("Channel", eventsApiVersion, name),
}
}

// Subscription returns a Subscription.
func Subscription(name string, namespace string, channel *corev1.ObjectReference, subscriber *v1alpha1.SubscriberSpec, reply *v1alpha1.ReplyStrategy) *v1alpha1.Subscription {
return &v1alpha1.Subscription{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -154,10 +161,12 @@ type TypeAndSource struct {
const (
CloudEventEncodingBinary = "binary"
CloudEventEncodingStructured = "structured"
CloudEventDefaultEncoding = CloudEventEncodingBinary
CloudEventDefaultType = "dev.knative.test.event"
)

// EventSenderPod creates a Pod that sends a single event to the given address.
func EventSenderPod(name string, namespace string, sink string, event CloudEvent) *corev1.Pod {
func EventSenderPod(name string, namespace string, sink string, event *CloudEvent) *corev1.Pod {
if event.Encoding == "" {
event.Encoding = CloudEventEncodingBinary
}
Expand Down Expand Up @@ -214,6 +223,30 @@ func EventLoggerPod(name string, namespace string, selector map[string]string) *
}
}

// EventTransformationPod creates a Pod that transforms events received.
func EventTransformationPod(name string, namespace string, selector map[string]string, msgPostfix string) *corev1.Pod {
return &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
Labels: selector,
Annotations: map[string]string{"sidecar.istio.io/inject": "true"},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{{
Name: "transformevents",
Image: pkgTest.ImagePath("transformevents"),
ImagePullPolicy: corev1.PullAlways, // TODO: this might not be wanted for local.
Args: []string{
"-msg-postfix",
msgPostfix,
},
}},
RestartPolicy: corev1.RestartPolicyAlways,
},
}
}

// Service creates a Kubernetes Service with the given name, namespace, and
// selector. Port 8080 is assumed the target port.
func Service(name string, namespace string, selector map[string]string) *corev1.Service {
Expand Down
2 changes: 1 addition & 1 deletion test/e2e-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,6 @@ function dump_extra_cluster_state() {

initialize $@

go_test_e2e ./test/e2e || fail_test
go_test_e2e -timeout=20m ./test/e2e || fail_test

success
4 changes: 2 additions & 2 deletions test/e2e/broker_trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestDefaultBrokerWithManyTriggers(t *testing.T) {
clients, cleaner := Setup(t, t.Logf)

// Verify namespace exists.
ns, cleanupNS := NamespaceExists(t, clients, t.Logf)
ns, cleanupNS := CreateNamespaceIfNeeded(t, clients, t.Logf)

defer cleanupNS()
defer TearDown(clients, cleaner, t.Logf)
Expand Down Expand Up @@ -186,7 +186,7 @@ func TestDefaultBrokerWithManyTriggers(t *testing.T) {
// Create cloud event.
// Using event type and source as part of the body for easier debugging.
body := fmt.Sprintf("Body-%s-%s", eventToSend.Type, eventToSend.Source)
cloudEvent := test.CloudEvent{
cloudEvent := &test.CloudEvent{
Source: eventToSend.Source,
Type: eventToSend.Type,
Data: fmt.Sprintf(`{"msg":%q}`, body),
Expand Down
115 changes: 115 additions & 0 deletions test/e2e/channel_chain_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// +build e2e

/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package e2e

import (
"fmt"
"testing"

"github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"github.com/knative/eventing/test"
"k8s.io/apimachinery/pkg/util/uuid"
)

/*
TestChannelChain tests the following scenario:

EventSource ---> Channel ---> Subscriptions ---> Channel ---> Subscriptions ---> Service(Logger)

*/
func TestChannelChain(t *testing.T) {
if test.EventingFlags.Provisioner == "" {
t.Fatal("ClusterChannelProvisioner must be set to a non-empty string. Either do not specify --clusterChannelProvisioner or set to something other than the empty string")
}

const (
senderName = "e2e-channelchain-sender"
loggerPodName = "e2e-channelchain-logger-pod"
)
channelNames := [2]string{"e2e-channelchain1", "e2e-channelchain2"}
// subscriptionNames1 corresponds to Subscriptions on channelNames[0]
subscriptionNames1 := [2]string{"e2e-complexscen-subs11", "e2e-complexscen-subs12"}
// subscriptionNames2 corresponds to Subscriptions on channelNames[1]
subscriptionNames2 := [1]string{"e2e-complexscen-subs21"}

clients, cleaner := Setup(t, t.Logf)
// verify namespace
ns, cleanupNS := CreateNamespaceIfNeeded(t, clients, t.Logf)
defer cleanupNS()

// TearDown() needs to be deferred after cleanupNS(). Otherwise the namespace is deleted and all
// resources in it. So when TearDown() runs, it spews a lot of not found errors.
defer TearDown(clients, cleaner, t.Logf)

// create loggerPod and expose it as a service
t.Logf("creating logger pod")
selector := map[string]string{"e2etest": string(uuid.NewUUID())}
loggerPod := test.EventLoggerPod(loggerPodName, ns, selector)
loggerSvc := test.Service(loggerPodName, ns, selector)
loggerPod, err := CreatePodAndServiceReady(clients, loggerPod, loggerSvc, ns, t.Logf, cleaner)
if err != nil {
t.Fatalf("Failed to create logger pod and service, and get them ready: %v", err)
}

// create channels
t.Logf("Creating Channel and Subscription")
channels := make([]*v1alpha1.Channel, 0)
for _, channelName := range channelNames {
channel := test.Channel(channelName, ns, test.ClusterChannelProvisioner(test.EventingFlags.Provisioner))
t.Logf("channel: %#v", channel)
channels = append(channels, channel)
}

// create subscriptions
subs := make([]*v1alpha1.Subscription, 0)
// create subscriptions that subscribe the first channel, and reply events directly to the second channel
for _, subscriptionName := range subscriptionNames1 {
sub := test.Subscription(subscriptionName, ns, test.ChannelRef(channelNames[0]), nil, test.ReplyStrategyForChannel(channelNames[1]))
t.Logf("sub: %#v", sub)
subs = append(subs, sub)
}
// create subscriptions that subscribe the second channel, and call the logging service
for _, subscriptionName := range subscriptionNames2 {
sub := test.Subscription(subscriptionName, ns, test.ChannelRef(channelNames[1]), test.SubscriberSpecForService(loggerPodName), nil)
t.Logf("sub: %#v", sub)
subs = append(subs, sub)
}

// wait for all channels and subscriptions to become ready
if err := WithChannelsAndSubscriptionsReady(clients, &channels, &subs, t.Logf, cleaner); err != nil {
t.Fatalf("The Channel or Subscription were not marked as Ready: %v", err)
}

// send fake CloudEvent to the first channel
body := fmt.Sprintf("TestChannelChainEvent %s", uuid.NewUUID())
event := &test.CloudEvent{
Source: senderName,
Type: test.CloudEventDefaultType,
Data: fmt.Sprintf(`{"msg":%q}`, body),
Encoding: test.CloudEventDefaultEncoding,
}
if err := SendFakeEventToChannel(clients, event, channels[0], ns, t.Logf, cleaner); err != nil {
t.Fatalf("Failed to send fake CloudEvent to the channel %q", channels[0].Name)
}

// check if the logging service receives the correct number of event messages
expectedContentCount := len(subscriptionNames1) * len(subscriptionNames2)
if err := WaitForLogContentCount(clients, loggerPodName, loggerPod.Spec.Containers[0].Name, body, expectedContentCount); err != nil {
t.Fatalf("String %q does not appear %d times in logs of logger pod %q: %v", body, expectedContentCount, loggerPodName, err)
}
}
112 changes: 89 additions & 23 deletions test/e2e/e2e.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,36 +127,48 @@ func CreateSubscription(clients *test.Clients, sub *v1alpha1.Subscription, _ log
return nil
}

// WithChannelAndSubscriptionReady creates a Channel and Subscription and waits until both are Ready.
func WithChannelAndSubscriptionReady(clients *test.Clients, channel *v1alpha1.Channel, sub *v1alpha1.Subscription, logf logging.FormatLogger, cleaner *test.Cleaner) error {
if err := CreateChannel(clients, channel, logf, cleaner); err != nil {
return err
}
if err := CreateSubscription(clients, sub, logf, cleaner); err != nil {
return err
// WithChannelsAndSubscriptionsReady creates Channels and Subscriptions and waits until all are Ready.
Comment thread
chizhg marked this conversation as resolved.
// When they are ready, chans and subs are altered to get the real Channels and Subscriptions.
func WithChannelsAndSubscriptionsReady(clients *test.Clients, chans *[]*v1alpha1.Channel, subs *[]*v1alpha1.Subscription, logf logging.FormatLogger, cleaner *test.Cleaner) error {
for _, channel := range *chans {
if err := CreateChannel(clients, channel, logf, cleaner); err != nil {
return err
}
}

channels := clients.Eventing.EventingV1alpha1().Channels(pkgTest.Flags.Namespace)
if err := test.WaitForChannelState(channels, channel.Name, test.IsChannelReady, "ChannelIsReady"); err != nil {
return err
for i, channel := range *chans {
if err := test.WaitForChannelState(channels, channel.Name, test.IsChannelReady, "ChannelIsReady"); err != nil {
return err
}
// Update the given object so they'll reflect the ready state.
updatedchannel, err := channels.Get(channel.Name, metav1.GetOptions{})
if err != nil {
return err
}
updatedchannel.DeepCopyInto(channel)
(*chans)[i] = channel
}
// Update the given object so they'll reflect the ready state
updatedchannel, err := channels.Get(channel.Name, metav1.GetOptions{})
if err != nil {
return err

for _, sub := range *subs {
if err := CreateSubscription(clients, sub, logf, cleaner); err != nil {
return err
}
}
updatedchannel.DeepCopyInto(channel)

subscriptions := clients.Eventing.EventingV1alpha1().Subscriptions(pkgTest.Flags.Namespace)
if err = test.WaitForSubscriptionState(subscriptions, sub.Name, test.IsSubscriptionReady, "SubscriptionIsReady"); err != nil {
return err
}
// Update the given object so they'll reflect the ready state
updatedsub, err := subscriptions.Get(sub.Name, metav1.GetOptions{})
if err != nil {
return err
for i, sub := range *subs {
if err := test.WaitForSubscriptionState(subscriptions, sub.Name, test.IsSubscriptionReady, "SubscriptionIsReady"); err != nil {
return err
}
// Update the given object so they'll reflect the ready state.
updatedsub, err := subscriptions.Get(sub.Name, metav1.GetOptions{})
if err != nil {
return err
}
updatedsub.DeepCopyInto(sub)
(*subs)[i] = sub
}
updatedsub.DeepCopyInto(sub)

return nil
}
Expand Down Expand Up @@ -285,6 +297,29 @@ func CreateServiceAccountAndBinding(clients *test.Clients, name string, logf log
return nil
}

// CreatePodAndServiceReady will create a Pod and Service, and wait for them to become ready
func CreatePodAndServiceReady(clients *test.Clients, pod *corev1.Pod, svc *corev1.Service, ns string, logf logging.FormatLogger, cleaner *test.Cleaner) (*corev1.Pod, error) {
if err := CreatePod(clients, pod, logf, cleaner); err != nil {
return nil, fmt.Errorf("Failed to create pod: %v", err)
}
if err := pkgTest.WaitForAllPodsRunning(clients.Kube, ns); err != nil {
return nil, fmt.Errorf("Error waiting for pod to become running: %v", err)
}
logf("Pod %q starts running", pod.Name)

if err := CreateService(clients, svc, logf, cleaner); err != nil {
return nil, fmt.Errorf("Failed to create service: %v", err)
}

// Reload pod to get IP
pod, err := clients.Kube.Kube.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("Failed to get pod: %v", err)
}

return pod, nil
}

// CreateService will create a Service
func CreateService(clients *test.Clients, svc *corev1.Service, _ logging.FormatLogger, cleaner *test.Cleaner) error {
svcs := clients.Kube.Kube.CoreV1().Services(svc.GetNamespace())
Expand All @@ -306,6 +341,23 @@ func CreatePod(clients *test.Clients, pod *corev1.Pod, _ logging.FormatLogger, c
return nil
}

// SendFakeEventToChannel will create fake CloudEvent and send it to the given channel.
func SendFakeEventToChannel(clients *test.Clients, event *test.CloudEvent, channel *v1alpha1.Channel, ns string, logf logging.FormatLogger, cleaner *test.Cleaner) error {
logf("Sending fake CloudEvent")
logf("Creating event sender pod")
url := fmt.Sprintf("http://%s", channel.Status.Address.Hostname)
pod := test.EventSenderPod(event.Source, ns, url, event)
logf("Sender pod: %#v", pod)
if err := CreatePod(clients, pod, logf, cleaner); err != nil {
return err
}
if err := pkgTest.WaitForAllPodsRunning(clients.Kube, ns); err != nil {
return err
}
logf("Sender pod starts running")
return nil
}

// WaitForLogContents waits until logs for given Pod/Container include the given contents.
// If the contents are not present within timeout it returns error.
func WaitForLogContents(clients *test.Clients, logf logging.FormatLogger, podName string, containerName string, namespace string, contents []string) error {
Expand All @@ -326,6 +378,19 @@ func WaitForLogContents(clients *test.Clients, logf logging.FormatLogger, podNam
})
}

// WaitForLogContentCount checks if the number of substr occur times equals the given number.
// If the content does not appear the given times it returns error.
func WaitForLogContentCount(client *test.Clients, podName, containerName, content string, appearTimes int) error {
Comment thread
chizhg marked this conversation as resolved.
return wait.PollImmediate(interval, timeout, func() (bool, error) {
logs, err := client.Kube.PodLogs(podName, containerName)
if err != nil {
return true, err
}

return strings.Count(string(logs), content) == appearTimes, nil
})
}

// FindAnyLogContents attempts to find logs for given Pod/Container that has 'any' of the given contents.
// It returns an error if it couldn't retrieve the logs. In case 'any' of the contents are there, it returns true.
func FindAnyLogContents(clients *test.Clients, logf logging.FormatLogger, podName string, containerName string, namespace string, contents []string) (bool, error) {
Expand Down Expand Up @@ -368,7 +433,8 @@ func LabelNamespace(clients *test.Clients, logf logging.FormatLogger, labels map
return err
}

func NamespaceExists(t *testing.T, clients *test.Clients, logf logging.FormatLogger) (string, func()) {
// CreateNamespaceIfNeeded creates a new namespace if it does not exist
func CreateNamespaceIfNeeded(t *testing.T, clients *test.Clients, logf logging.FormatLogger) (string, func()) {
shutdown := func() {}
ns := pkgTest.Flags.Namespace
logf("Namespace: %s", ns)
Expand Down
Loading