From 6dcfca7e0dd3493f7569c3b888ce8b744fc83590 Mon Sep 17 00:00:00 2001 From: Chi Zhang Date: Wed, 29 May 2019 18:51:47 -0700 Subject: [PATCH 01/15] support fail fast in all creation methods --- test/common/client.go | 11 +-- test/common/creation.go | 73 ++++++++----------- test/common/operation.go | 6 +- test/e2e/broker_channel_flow_test.go | 52 ++++--------- test/e2e/broker_default_test.go | 10 +-- test/e2e/broker_event_transformation_test.go | 32 +++----- test/e2e/channel_chain_test.go | 16 +--- test/e2e/channel_event_transformation_test.go | 24 ++---- test/e2e/channel_single_event_test.go | 14 +--- test/e2e/test_runner.go | 8 +- 10 files changed, 83 insertions(+), 163 deletions(-) diff --git a/test/common/client.go b/test/common/client.go index 17a4d4f49cb..53e8a612442 100644 --- a/test/common/client.go +++ b/test/common/client.go @@ -20,9 +20,10 @@ limitations under the License. package common import ( + "testing" + eventing "github.com/knative/eventing/pkg/client/clientset/versioned" "github.com/knative/pkg/test" - "github.com/knative/pkg/test/logging" "k8s.io/client-go/dynamic" ) @@ -33,13 +34,13 @@ type Client struct { Dynamic dynamic.Interface Namespace string - Logf logging.FormatLogger + T *testing.T Cleaner *Cleaner } // NewClient instantiates and returns several clientsets required for making request to the // cluster specified by the combination of clusterName and configPath. -func NewClient(configPath string, clusterName string, namespace string, logger logging.FormatLogger) (*Client, error) { +func NewClient(configPath string, clusterName string, namespace string, t *testing.T) (*Client, error) { client := &Client{} cfg, err := test.BuildClientConfig(configPath, clusterName) if err != nil { @@ -61,7 +62,7 @@ func NewClient(configPath string, clusterName string, namespace string, logger l } client.Namespace = namespace - client.Logf = logger - client.Cleaner = NewCleaner(logger, client.Dynamic) + client.T = t + client.Cleaner = NewCleaner(t.Logf, client.Dynamic) return client, nil } diff --git a/test/common/creation.go b/test/common/creation.go index 14472b8359c..4eca1a06dc5 100644 --- a/test/common/creation.go +++ b/test/common/creation.go @@ -28,8 +28,8 @@ var coreAPIVersion = corev1.SchemeGroupVersion.Version var rbacAPIGroup = rbacv1.SchemeGroupVersion.Group var rbacAPIVersion = rbacv1.SchemeGroupVersion.Version -// CreateChannel will create a Channel Resource in Eventing. -func (client *Client) CreateChannel(name, provisonerName string) error { +// CreateChannelOrFail will create a Channel Resource in Eventing. +func (client *Client) CreateChannelOrFail(name, provisonerName string) { namespace := client.Namespace channel := base.Channel(name, provisonerName) @@ -37,24 +37,20 @@ func (client *Client) CreateChannel(name, provisonerName string) error { // update channel with the new reference channel, err := channels.Create(channel) if err != nil { - return err + client.T.Fatalf("Failed to create channel %q: %v", name, err) } client.Cleaner.AddObj(channel) - return nil } -// CreateChannels will create a list of Channel Resources in Eventing. -func (client *Client) CreateChannels(names []string, provisionerName string) error { +// CreateChannelsOrFail will create a list of Channel Resources in Eventing. +func (client *Client) CreateChannelsOrFail(names []string, provisionerName string) { for _, name := range names { - if err := client.CreateChannel(name, provisionerName); err != nil { - return err - } + client.CreateChannelOrFail(name, provisionerName) } - return nil } -// CreateSubscription will create a Subscription. -func (client *Client) CreateSubscription(name, channelName string, options ...func(*v1alpha1.Subscription)) error { +// CreateSubscriptionOrFail will create a Subscription. +func (client *Client) CreateSubscriptionOrFail(name, channelName string, options ...func(*v1alpha1.Subscription)) { namespace := client.Namespace subscription := base.Subscription(name, channelName, options...) @@ -62,24 +58,20 @@ func (client *Client) CreateSubscription(name, channelName string, options ...fu // update subscription with the new reference subscription, err := subscriptions.Create(subscription) if err != nil { - return err + client.T.Fatalf("Failed to create subscription %q: %v", name, err) } client.Cleaner.AddObj(subscription) - return nil } -// CreateSubscriptions will create a list of Subscriptions. -func (client *Client) CreateSubscriptions(names []string, channelName string, options ...func(*v1alpha1.Subscription)) error { +// CreateSubscriptionsOrFail will create a list of Subscriptions with the same configuration except the name. +func (client *Client) CreateSubscriptionsOrFail(names []string, channelName string, options ...func(*v1alpha1.Subscription)) { for _, name := range names { - if err := client.CreateSubscription(name, channelName, options...); err != nil { - return err - } + client.CreateSubscriptionOrFail(name, channelName, options...) } - return nil } -// CreateBroker will create a Broker. -func (client *Client) CreateBroker(name, provisionerName string) error { +// CreateBrokerOrFail will create a Broker. +func (client *Client) CreateBrokerOrFail(name, provisionerName string) { namespace := client.Namespace broker := base.Broker(name, provisionerName) @@ -87,24 +79,20 @@ func (client *Client) CreateBroker(name, provisionerName string) error { // update broker with the new reference broker, err := brokers.Create(broker) if err != nil { - return err + client.T.Fatalf("Failed to create broker %q: %v", name, err) } client.Cleaner.AddObj(broker) - return nil } -// CreateBrokers will create a list of Brokers. -func (client *Client) CreateBrokers(names []string, provisionerName string) error { +// CreateBrokersOrFail will create a list of Brokers. +func (client *Client) CreateBrokersOrFail(names []string, provisionerName string) { for _, name := range names { - if err := client.CreateBroker(name, provisionerName); err != nil { - return err - } + client.CreateBrokerOrFail(name, provisionerName) } - return nil } -// CreateTrigger will create a Trigger. -func (client *Client) CreateTrigger(name string, options ...func(*v1alpha1.Trigger)) error { +// CreateTriggerOrFail will create a Trigger. +func (client *Client) CreateTriggerOrFail(name string, options ...func(*v1alpha1.Trigger)) { namespace := client.Namespace trigger := base.Trigger(name, options...) @@ -112,10 +100,9 @@ func (client *Client) CreateTrigger(name string, options ...func(*v1alpha1.Trigg // update trigger with the new reference trigger, err := triggers.Create(trigger) if err != nil { - return err + client.T.Fatalf("Failed to create trigger %q: %v", name, err) } client.Cleaner.AddObj(trigger) - return nil } // WithService returns an option that creates a Service binded with the given pod. @@ -133,40 +120,38 @@ func WithService(name string) func(*corev1.Pod, *Client) error { } } -// CreatePod will create a Pod. -func (client *Client) CreatePod(pod *corev1.Pod, options ...func(*corev1.Pod, *Client) error) error { +// CreatePodOrFail will create a Pod. +func (client *Client) CreatePodOrFail(pod *corev1.Pod, options ...func(*corev1.Pod, *Client) error) { // set namespace for the pod in case it's empty namespace := client.Namespace pod.Namespace = namespace // apply options on the pod before creation for _, option := range options { if err := option(pod, client); err != nil { - return err + client.T.Fatalf("Failed to configure pod %q: %v", pod.Name, err) } } if _, err := client.Kube.CreatePod(pod); err != nil { - return err + client.T.Fatalf("Failed to create pod %q: %v", pod.Name, err) } client.Cleaner.Add(coreAPIGroup, coreAPIVersion, "pods", namespace, pod.Name) - return nil } -// CreateServiceAccountAndBinding creates both ServiceAccount and ClusterRoleBinding with default +// CreateServiceAccountAndBindingOrFail creates both ServiceAccount and ClusterRoleBinding with default // cluster-admin role. -func (client *Client) CreateServiceAccountAndBinding(saName, crName string) error { +func (client *Client) CreateServiceAccountAndBindingOrFail(saName, crName string) { namespace := client.Namespace sa := base.ServiceAccount(saName, namespace) sas := client.Kube.Kube.CoreV1().ServiceAccounts(namespace) if _, err := sas.Create(sa); err != nil { - return err + client.T.Fatalf("Failed to create service account %q: %v", saName, err) } client.Cleaner.Add(coreAPIGroup, coreAPIVersion, "serviceaccounts", namespace, saName) crb := base.ClusterRoleBinding(saName, crName, namespace) crbs := client.Kube.Kube.RbacV1().ClusterRoleBindings() if _, err := crbs.Create(crb); err != nil { - return err + client.T.Fatalf("Failed to create cluster role binding %q: %v", crName, err) } client.Cleaner.Add(rbacAPIGroup, rbacAPIVersion, "clusterrolebindings", "", crb.GetName()) - return nil } diff --git a/test/common/operation.go b/test/common/operation.go index d2680685a72..f8eb07a5bd1 100644 --- a/test/common/operation.go +++ b/test/common/operation.go @@ -80,11 +80,9 @@ func (client *Client) sendFakeEventToAddress( event *base.CloudEvent, ) error { namespace := client.Namespace - client.Logf("Sending fake CloudEvent") + client.T.Logf("Sending fake CloudEvent") pod := base.EventSenderPod(senderName, url, event) - if err := client.CreatePod(pod); err != nil { - return err - } + client.CreatePodOrFail(pod) if err := pkgTest.WaitForPodRunning(client.Kube, senderName, namespace); err != nil { return err } diff --git a/test/e2e/broker_channel_flow_test.go b/test/e2e/broker_channel_flow_test.go index 55f44c4a8de..043ebffcca4 100644 --- a/test/e2e/broker_channel_flow_test.go +++ b/test/e2e/broker_channel_flow_test.go @@ -85,17 +85,11 @@ func testBrokerChannelFlow(t *testing.T, provisioner string) { defer TearDown(client) // creates ServiceAccount and ClusterRoleBinding with default cluster-admin role - if err := client.CreateServiceAccountAndBinding(saIngressName, crIngressName); err != nil { - t.Fatalf("Failed to create the Ingress ServiceAccount and ServiceAccountRoleBinding: %v", err) - } - if err := client.CreateServiceAccountAndBinding(saFilterName, crFilterName); err != nil { - t.Fatalf("Failed to create the Filter ServiceAccount and ServiceAccountRoleBinding: %v", err) - } + client.CreateServiceAccountAndBindingOrFail(saIngressName, crIngressName) + client.CreateServiceAccountAndBindingOrFail(saFilterName, crFilterName) // create a new broker - if err := client.CreateBroker(brokerName, provisioner); err != nil { - t.Fatalf("Failed to create the Broker: %q, %v", brokerName, err) - } + client.CreateBrokerOrFail(brokerName, provisioner) client.WaitForBrokerReady(brokerName) // create the event we want to transform to @@ -109,40 +103,30 @@ func testBrokerChannelFlow(t *testing.T, provisioner string) { // create the transformation service for trigger1 transformationPod := base.EventTransformationPod(transformationPodName, eventAfterTransformation) - if err := client.CreatePod(transformationPod, common.WithService(transformationPodName)); err != nil { - t.Fatalf("Failed to create transformation service %q: %v", transformationPodName, err) - } + client.CreatePodOrFail(transformationPod, common.WithService(transformationPodName)) // create trigger1 to receive the original event, and do event transformation - if err := client.CreateTrigger( + client.CreateTriggerOrFail( triggerName1, base.WithBroker(brokerName), base.WithTriggerFilter(eventSource1, eventType1), base.WithSubscriberRefForTrigger(transformationPodName), - ); err != nil { - t.Fatalf("Error creating trigger %q: %v", triggerName1, err) - } + ) // create logger pod and service for trigger2 loggerPod1 := base.EventLoggerPod(loggerPodName1) - if err := client.CreatePod(loggerPod1, common.WithService(loggerPodName1)); err != nil { - t.Fatalf("Failed to create logger service %q: %v", loggerPodName1, err) - } + client.CreatePodOrFail(loggerPod1, common.WithService(loggerPodName1)) // create trigger2 to receive all the events - if err := client.CreateTrigger( + client.CreateTriggerOrFail( triggerName2, base.WithBroker(brokerName), base.WithTriggerFilter(any, any), base.WithSubscriberRefForTrigger(loggerPodName1), - ); err != nil { - t.Fatalf("Error creating trigger %q: %v", triggerName2, err) - } + ) // create channel for trigger3 - if err := client.CreateChannel(channelName, provisioner); err != nil { - t.Fatalf("Failed to create channel %q: %v", channelName, err) - } + client.CreateChannelOrFail(channelName, provisioner) client.WaitForChannelReady(channelName) // create trigger3 to receive the transformed event, and send it to the channel @@ -150,29 +134,23 @@ func testBrokerChannelFlow(t *testing.T, provisioner string) { if err != nil { t.Fatalf("Failed to get the url for the channel %q: %v", channelName, err) } - if err := client.CreateTrigger( + client.CreateTriggerOrFail( triggerName3, base.WithBroker(brokerName), base.WithTriggerFilter(eventSource2, eventType2), base.WithSubscriberURIForTrigger(channelURL), - ); err != nil { - t.Fatalf("Error creating trigger %q: %v", triggerName3, err) - } + ) // create logger pod and service for subscription loggerPod2 := base.EventLoggerPod(loggerPodName2) - if err := client.CreatePod(loggerPod2, common.WithService(loggerPodName2)); err != nil { - t.Fatalf("Failed to create logger service %q: %v", loggerPodName2, err) - } + client.CreatePodOrFail(loggerPod2, common.WithService(loggerPodName2)) // create subscription - if err := client.CreateSubscription( + client.CreateSubscriptionOrFail( subscriptionName, channelName, base.WithSubscriberForSubscription(loggerPodName2), - ); err != nil { - t.Fatalf("Error creating subscription %q: %v", subscriptionName, err) - } + ) // wait for all test resources to be ready, so that we can start sending events if err := client.WaitForAllTestResourcesReady(); err != nil { diff --git a/test/e2e/broker_default_test.go b/test/e2e/broker_default_test.go index 5b7d82cec97..68a53791af5 100644 --- a/test/e2e/broker_default_test.go +++ b/test/e2e/broker_default_test.go @@ -81,21 +81,17 @@ func TestDefaultBrokerWithManyTriggers(t *testing.T) { for _, event := range eventsToReceive { subscriberName := name("dumper", event.typeAndSource.Type, event.typeAndSource.Source) pod := base.EventLoggerPod(subscriberName) - if err := client.CreatePod(pod, common.WithService(subscriberName)); err != nil { - t.Fatalf("Failed to create the subscriber %q: %v", subscriberName, err) - } + client.CreatePodOrFail(pod, common.WithService(subscriberName)) } // Create triggers. for _, event := range eventsToReceive { triggerName := name("trigger", event.typeAndSource.Type, event.typeAndSource.Source) subscriberName := name("dumper", event.typeAndSource.Type, event.typeAndSource.Source) - if err := client.CreateTrigger(triggerName, + client.CreateTriggerOrFail(triggerName, base.WithSubscriberRefForTrigger(subscriberName), base.WithTriggerFilter(event.typeAndSource.Source, event.typeAndSource.Type), - ); err != nil { - t.Fatalf("Failed to create the trigger %q: %v", triggerName, err) - } + ) } // Wait for all test resources to become ready before sending the events. diff --git a/test/e2e/broker_event_transformation_test.go b/test/e2e/broker_event_transformation_test.go index fb5f5c06c0d..7aba9f24f1a 100644 --- a/test/e2e/broker_event_transformation_test.go +++ b/test/e2e/broker_event_transformation_test.go @@ -74,17 +74,11 @@ func testEventTransformationForTrigger(t *testing.T, provisioner string) { defer TearDown(client) // creates ServiceAccount and ClusterRoleBinding with default cluster-admin role - if err := client.CreateServiceAccountAndBinding(saIngressName, crIngressName); err != nil { - t.Fatalf("Failed to create the Ingress ServiceAccount and ServiceAccountRoleBinding: %v", err) - } - if err := client.CreateServiceAccountAndBinding(saFilterName, crFilterName); err != nil { - t.Fatalf("Failed to create the Filter ServiceAccount and ServiceAccountRoleBinding: %v", err) - } + client.CreateServiceAccountAndBindingOrFail(saIngressName, crIngressName) + client.CreateServiceAccountAndBindingOrFail(saFilterName, crFilterName) // create a new broker - if err := client.CreateBroker(brokerName, provisioner); err != nil { - t.Fatalf("Failed to create the Broker: %q, %v", brokerName, err) - } + client.CreateBrokerOrFail(brokerName, provisioner) client.WaitForBrokerReady(brokerName) // create the event we want to transform to @@ -98,35 +92,27 @@ func testEventTransformationForTrigger(t *testing.T, provisioner string) { // create the transformation service transformationPod := base.EventTransformationPod(transformationPodName, eventAfterTransformation) - if err := client.CreatePod(transformationPod, common.WithService(transformationPodName)); err != nil { - t.Fatalf("Failed to create transformation service %q: %v", transformationPodName, err) - } + client.CreatePodOrFail(transformationPod, common.WithService(transformationPodName)) // create trigger1 for event transformation - if err := client.CreateTrigger( + client.CreateTriggerOrFail( triggerName1, base.WithBroker(brokerName), base.WithTriggerFilter(eventSource1, eventType1), base.WithSubscriberRefForTrigger(transformationPodName), - ); err != nil { - t.Fatalf("Error creating trigger %q: %v", triggerName1, err) - } + ) // create logger pod and service loggerPod := base.EventLoggerPod(loggerPodName) - if err := client.CreatePod(loggerPod, common.WithService(loggerPodName)); err != nil { - t.Fatalf("Failed to create logger service %q: %v", loggerPodName, err) - } + client.CreatePodOrFail(loggerPod, common.WithService(loggerPodName)) // create trigger2 for event receiving - if err := client.CreateTrigger( + client.CreateTriggerOrFail( triggerName2, base.WithBroker(brokerName), base.WithTriggerFilter(eventSource2, eventType2), base.WithSubscriberRefForTrigger(loggerPodName), - ); err != nil { - t.Fatalf("Error creating trigger %q: %v", triggerName2, err) - } + ) // wait for all test resources to be ready, so that we can start sending events if err := client.WaitForAllTestResourcesReady(); err != nil { diff --git a/test/e2e/channel_chain_test.go b/test/e2e/channel_chain_test.go index 861576a115f..6d96ac1e5da 100644 --- a/test/e2e/channel_chain_test.go +++ b/test/e2e/channel_chain_test.go @@ -52,25 +52,17 @@ func testChannelChain(t *testing.T, provisioner string) { defer TearDown(client) // create channels - if err := client.CreateChannels(channelNames, provisioner); err != nil { - t.Fatalf("Failed to create channels %q: %v", channelNames, err) - } + client.CreateChannelsOrFail(channelNames, provisioner) client.WaitForChannelsReady() // create loggerPod and expose it as a service pod := base.EventLoggerPod(loggerPodName) - if err := client.CreatePod(pod, common.WithService(loggerPodName)); err != nil { - t.Fatalf("Failed to create logger service: %v", err) - } + client.CreatePodOrFail(pod, common.WithService(loggerPodName)) // create subscriptions that subscribe the first channel, and reply events directly to the second channel - if err := client.CreateSubscriptions(subscriptionNames1, channelNames[0], base.WithReply(channelNames[1])); err != nil { - t.Fatalf("Failed to create subscriptions %q for channel %q: %v", subscriptionNames1, channelNames[0], err) - } + client.CreateSubscriptionsOrFail(subscriptionNames1, channelNames[0], base.WithReply(channelNames[1])) // create subscriptions that subscribe the second channel, and call the logging service - if err := client.CreateSubscriptions(subscriptionNames2, channelNames[1], base.WithSubscriberForSubscription(loggerPodName)); err != nil { - t.Fatalf("Failed to create subscriptions %q for channel %q: %v", subscriptionNames2, channelNames[1], err) - } + client.CreateSubscriptionsOrFail(subscriptionNames2, channelNames[1], base.WithSubscriberForSubscription(loggerPodName)) // wait for all test resources to be ready, so that we can start sending events if err := client.WaitForAllTestResourcesReady(); err != nil { diff --git a/test/e2e/channel_event_transformation_test.go b/test/e2e/channel_event_transformation_test.go index ff65988ab22..17466cb5546 100644 --- a/test/e2e/channel_event_transformation_test.go +++ b/test/e2e/channel_event_transformation_test.go @@ -53,9 +53,7 @@ func TestEventTransformationForSubscription(t *testing.T) { defer TearDown(client) // create channels - if err := client.CreateChannels(channelNames, provisioner); err != nil { - st.Fatalf("Failed to create channels %q: %v", channelNames, err) - } + client.CreateChannelsOrFail(channelNames, provisioner) client.WaitForChannelsReady() // create transformation pod and service @@ -67,33 +65,25 @@ func TestEventTransformationForSubscription(t *testing.T) { Encoding: base.CloudEventDefaultEncoding, } transformationPod := base.EventTransformationPod(transformationPodName, eventAfterTransformation) - if err := client.CreatePod(transformationPod, common.WithService(transformationPodName)); err != nil { - st.Fatalf("Failed to create transformation service %q: %v", transformationPodName, err) - } + client.CreatePodOrFail(transformationPod, common.WithService(transformationPodName)) // create logger pod and service loggerPod := base.EventLoggerPod(loggerPodName) - if err := client.CreatePod(loggerPod, common.WithService(loggerPodName)); err != nil { - st.Fatalf("Failed to create logger service %q: %v", loggerPodName, err) - } + client.CreatePodOrFail(loggerPod, common.WithService(loggerPodName)) // create subscriptions that subscribe the first channel, use the transformation service to transform the events and then forward the transformed events to the second channel - if err := client.CreateSubscriptions( + client.CreateSubscriptionsOrFail( subscriptionNames1, channelNames[0], base.WithSubscriberForSubscription(transformationPodName), base.WithReply(channelNames[1]), - ); err != nil { - st.Fatalf("Failed to create subscriptions %q for channel %q: %v", subscriptionNames1, channelNames[0], err) - } + ) // create subscriptions that subscribe the second channel, and forward the received events to the logger service - if err := client.CreateSubscriptions( + client.CreateSubscriptionsOrFail( subscriptionNames2, channelNames[1], base.WithSubscriberForSubscription(loggerPodName), - ); err != nil { - st.Fatalf("Failed to create subscriptions %q for channel %q: %v", subscriptionNames2, channelNames[1], err) - } + ) // wait for all test resources to be ready, so that we can start sending events if err := client.WaitForAllTestResourcesReady(); err != nil { diff --git a/test/e2e/channel_single_event_test.go b/test/e2e/channel_single_event_test.go index 4af98952e6c..b7641d87baa 100644 --- a/test/e2e/channel_single_event_test.go +++ b/test/e2e/channel_single_event_test.go @@ -53,24 +53,18 @@ func singleEvent(t *testing.T, encoding string) { defer TearDown(client) // create channel - if err := client.CreateChannel(channelName, provisioner); err != nil { - st.Fatalf("Failed to create channel: %v", err) - } + client.CreateChannelOrFail(channelName, provisioner) // create logger service as the subscriber pod := base.EventLoggerPod(loggerPodName) - if err := client.CreatePod(pod, common.WithService(loggerPodName)); err != nil { - st.Fatalf("Failed to create logger service: %v", err) - } + client.CreatePodOrFail(pod, common.WithService(loggerPodName)) // create subscription to subscribe the channel, and forward the received events to the logger service - if err := client.CreateSubscription( + client.CreateSubscriptionOrFail( subscriptionName, channelName, base.WithSubscriberForSubscription(loggerPodName), - ); err != nil { - st.Fatalf("Failed to create subscription: %v", err) - } + ) // wait for all test resources to be ready, so that we can start sending events if err := client.WaitForAllTestResourcesReady(); err != nil { diff --git a/test/e2e/test_runner.go b/test/e2e/test_runner.go index c1a53aaaac1..28f0b9e93fe 100644 --- a/test/e2e/test_runner.go +++ b/test/e2e/test_runner.go @@ -66,7 +66,7 @@ func Setup(t *testing.T, provisioner string, runInParallel bool) *common.Client pkgTest.Flags.Kubeconfig, pkgTest.Flags.Cluster, namespace, - t.Logf) + t) if err != nil { t.Fatalf("Couldn't initialize clients: %v", err) } @@ -95,7 +95,7 @@ func Setup(t *testing.T, provisioner string, runInParallel bool) *common.Client func TearDown(client *common.Client) { client.Cleaner.Clean(true) if err := DeleteNameSpace(client); err != nil { - client.Logf("Could not delete the namespace %q: %v", client.Namespace, err) + client.T.Logf("Could not delete the namespace %q: %v", client.Namespace, err) } } @@ -183,8 +183,8 @@ func logPodLogsForDebugging(client *common.Client, podName, containerName string namespace := client.Namespace logs, err := client.Kube.PodLogs(podName, containerName, namespace) if err != nil { - client.Logf("Failed to get the logs for container %q of the pod %q in namespace %q: %v", containerName, podName, namespace, err) + client.T.Logf("Failed to get the logs for container %q of the pod %q in namespace %q: %v", containerName, podName, namespace, err) } else { - client.Logf("Logs for the container %q of the pod %q in namespace %q:\n%s", containerName, podName, namespace, string(logs)) + client.T.Logf("Logs for the container %q of the pod %q in namespace %q:\n%s", containerName, podName, namespace, string(logs)) } } From bebf9d3ebe7b23301e8118a6623401430a061dd5 Mon Sep 17 00:00:00 2001 From: Chi Zhang Date: Wed, 29 May 2019 22:38:10 -0700 Subject: [PATCH 02/15] add resources --- test/base/generics.go | 2 +- test/base/resources.go | 106 +++++++++++++++++++++---------- test/e2e/source_cron_job_test.go | 42 ++++++++++++ 3 files changed, 114 insertions(+), 36 deletions(-) create mode 100644 test/e2e/source_cron_job_test.go diff --git a/test/base/generics.go b/test/base/generics.go index ede044bf141..0d17f6c43e0 100644 --- a/test/base/generics.go +++ b/test/base/generics.go @@ -40,7 +40,7 @@ func Meta(name, namespace, kind string) *MetaResource { }, TypeMeta: metav1.TypeMeta{ Kind: kind, - APIVersion: EventingAPIVersion, + APIVersion: eventingAPIVersion, }, } } diff --git a/test/base/resources.go b/test/base/resources.go index 5732d7b0cad..5bf23fbf7e1 100644 --- a/test/base/resources.go +++ b/test/base/resources.go @@ -21,7 +21,8 @@ package base import ( "fmt" - "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + sourcesv1alpha1 "github.com/knative/eventing/pkg/apis/sources/v1alpha1" pkgTest "github.com/knative/pkg/test" corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" @@ -30,38 +31,38 @@ import ( "k8s.io/apimachinery/pkg/util/uuid" ) -const EventingAPIVersion = "eventing.knative.dev/v1alpha1" +const eventingAPIVersion = "eventing.knative.dev/v1alpha1" // clusterChannelProvisioner returns a ClusterChannelProvisioner for a given name. func clusterChannelProvisioner(name string) *corev1.ObjectReference { if name == "" { return nil } - return pkgTest.CoreV1ObjectReference("ClusterChannelProvisioner", EventingAPIVersion, name) + return pkgTest.CoreV1ObjectReference("ClusterChannelProvisioner", eventingAPIVersion, name) } // channelRef returns an ObjectReference for a given Channel Name. func channelRef(name string) *corev1.ObjectReference { - return pkgTest.CoreV1ObjectReference("Channel", EventingAPIVersion, name) + return pkgTest.CoreV1ObjectReference("Channel", eventingAPIVersion, name) } // Channel returns a Channel with the specified provisioner. -func Channel(name, provisioner string) *v1alpha1.Channel { - return &v1alpha1.Channel{ +func Channel(name, provisioner string) *eventingv1alpha1.Channel { + return &eventingv1alpha1.Channel{ ObjectMeta: metav1.ObjectMeta{ Name: name, }, - Spec: v1alpha1.ChannelSpec{ + Spec: eventingv1alpha1.ChannelSpec{ Provisioner: clusterChannelProvisioner(provisioner), }, } } // WithSubscriberForSubscription returns an option that adds a Subscriber for the given Subscription. -func WithSubscriberForSubscription(name string) func(*v1alpha1.Subscription) { - return func(s *v1alpha1.Subscription) { +func WithSubscriberForSubscription(name string) func(*eventingv1alpha1.Subscription) { + return func(s *eventingv1alpha1.Subscription) { if name != "" { - s.Spec.Subscriber = &v1alpha1.SubscriberSpec{ + s.Spec.Subscriber = &eventingv1alpha1.SubscriberSpec{ Ref: pkgTest.CoreV1ObjectReference("Service", "v1", name), } } @@ -69,23 +70,27 @@ func WithSubscriberForSubscription(name string) func(*v1alpha1.Subscription) { } // WithReply returns an options that adds a ReplyStrategy for the given Subscription. -func WithReply(name string) func(*v1alpha1.Subscription) { - return func(s *v1alpha1.Subscription) { +func WithReply(name string) func(*eventingv1alpha1.Subscription) { + return func(s *eventingv1alpha1.Subscription) { if name != "" { - s.Spec.Reply = &v1alpha1.ReplyStrategy{ - Channel: pkgTest.CoreV1ObjectReference("Channel", EventingAPIVersion, name), + s.Spec.Reply = &eventingv1alpha1.ReplyStrategy{ + Channel: pkgTest.CoreV1ObjectReference("Channel", eventingAPIVersion, name), } } } } // Subscription returns a Subscription. -func Subscription(name, channelName string, options ...func(*v1alpha1.Subscription)) *v1alpha1.Subscription { - subscription := &v1alpha1.Subscription{ +func Subscription( + name, + channelName string, + options ...func(*eventingv1alpha1.Subscription), +) *eventingv1alpha1.Subscription { + subscription := &eventingv1alpha1.Subscription{ ObjectMeta: metav1.ObjectMeta{ Name: name, }, - Spec: v1alpha1.SubscriptionSpec{ + Spec: eventingv1alpha1.SubscriptionSpec{ Channel: *channelRef(channelName), }, } @@ -96,13 +101,13 @@ func Subscription(name, channelName string, options ...func(*v1alpha1.Subscripti } // Broker returns a Broker. -func Broker(name, provisioner string) *v1alpha1.Broker { - return &v1alpha1.Broker{ +func Broker(name, provisioner string) *eventingv1alpha1.Broker { + return &eventingv1alpha1.Broker{ ObjectMeta: metav1.ObjectMeta{ Name: name, }, - Spec: v1alpha1.BrokerSpec{ - ChannelTemplate: &v1alpha1.ChannelSpec{ + Spec: eventingv1alpha1.BrokerSpec{ + ChannelTemplate: &eventingv1alpha1.ChannelSpec{ Provisioner: clusterChannelProvisioner(provisioner), }, }, @@ -110,10 +115,10 @@ func Broker(name, provisioner string) *v1alpha1.Broker { } // WithTriggerFilter returns an option that adds a TriggerFilter for the given Trigger. -func WithTriggerFilter(eventSource, eventType string) func(*v1alpha1.Trigger) { - return func(t *v1alpha1.Trigger) { - triggerFilter := &v1alpha1.TriggerFilter{ - SourceAndType: &v1alpha1.TriggerFilterSourceAndType{ +func WithTriggerFilter(eventSource, eventType string) func(*eventingv1alpha1.Trigger) { + return func(t *eventingv1alpha1.Trigger) { + triggerFilter := &eventingv1alpha1.TriggerFilter{ + SourceAndType: &eventingv1alpha1.TriggerFilterSourceAndType{ Type: eventType, Source: eventSource, }, @@ -123,17 +128,17 @@ func WithTriggerFilter(eventSource, eventType string) func(*v1alpha1.Trigger) { } // WithBroker returns an option that adds a Broker for the given Trigger. -func WithBroker(brokerName string) func(*v1alpha1.Trigger) { - return func(t *v1alpha1.Trigger) { +func WithBroker(brokerName string) func(*eventingv1alpha1.Trigger) { + return func(t *eventingv1alpha1.Trigger) { t.Spec.Broker = brokerName } } // WithSubscriberRefForTrigger returns an option that adds a Subscriber Ref for the given Trigger. -func WithSubscriberRefForTrigger(name string) func(*v1alpha1.Trigger) { - return func(t *v1alpha1.Trigger) { +func WithSubscriberRefForTrigger(name string) func(*eventingv1alpha1.Trigger) { + return func(t *eventingv1alpha1.Trigger) { if name != "" { - t.Spec.Subscriber = &v1alpha1.SubscriberSpec{ + t.Spec.Subscriber = &eventingv1alpha1.SubscriberSpec{ Ref: pkgTest.CoreV1ObjectReference("Service", "v1", name), } } @@ -141,17 +146,17 @@ func WithSubscriberRefForTrigger(name string) func(*v1alpha1.Trigger) { } // WithSubscriberURIForTrigger returns an option that adds a Subscriber URI for the given Trigger. -func WithSubscriberURIForTrigger(uri string) func(*v1alpha1.Trigger) { - return func(t *v1alpha1.Trigger) { - t.Spec.Subscriber = &v1alpha1.SubscriberSpec{ +func WithSubscriberURIForTrigger(uri string) func(*eventingv1alpha1.Trigger) { + return func(t *eventingv1alpha1.Trigger) { + t.Spec.Subscriber = &eventingv1alpha1.SubscriberSpec{ URI: &uri, } } } // Trigger returns a Trigger. -func Trigger(name string, options ...func(*v1alpha1.Trigger)) *v1alpha1.Trigger { - trigger := &v1alpha1.Trigger{ +func Trigger(name string, options ...func(*eventingv1alpha1.Trigger)) *eventingv1alpha1.Trigger { + trigger := &eventingv1alpha1.Trigger{ ObjectMeta: metav1.ObjectMeta{ Name: name, }, @@ -162,6 +167,37 @@ func Trigger(name string, options ...func(*v1alpha1.Trigger)) *v1alpha1.Trigger return trigger } +// WithSinkServiceForCronJobSource returns an option that adds a Kubernetes Service sink for the given CronJobSource. +func WithSinkServiceForCronJobSource(name string) func(*sourcesv1alpha1.CronJobSource) { + return func(cjs *sourcesv1alpha1.CronJobSource) { + cjs.Spec.Sink = pkgTest.CoreV1ObjectReference("Service", "v1", name) + } +} + +// CronJobSource returns a CronJob EventSource. +func CronJobSource( + name, + schedule, + data, + serviceAccountName string, + options ...func(*sourcesv1alpha1.CronJobSource), +) *sourcesv1alpha1.CronJobSource { + cronJobSource := &sourcesv1alpha1.CronJobSource{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Spec: sourcesv1alpha1.CronJobSourceSpec{ + Schedule: schedule, + Data: data, + ServiceAccountName: serviceAccountName, + }, + } + for _, option := range options { + option(cronJobSource) + } + return cronJobSource +} + // CloudEvent specifies the arguments for a CloudEvent sent by the sendevent // binary. type CloudEvent struct { diff --git a/test/e2e/source_cron_job_test.go b/test/e2e/source_cron_job_test.go new file mode 100644 index 00000000000..1b7c5e4714a --- /dev/null +++ b/test/e2e/source_cron_job_test.go @@ -0,0 +1,42 @@ +// +build e2e + +/* +Copyright 2019 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import "testing" + +func TestCronJobSource(t *testing.T) { + const ( + cronJobSourceName = "e2e-cron-job-source" + schedule = "0/30 * * * * ? *" + + loggerPodName = "e2e-cron-job-source-logger-pod" + saIngressName = "eventing-broker-ingress" + crIngressName = "eventing-broker-ingress" + ) + + client := Setup(t, "", true) + defer TearDown(client) + + // creates ServiceAccount and ClusterRoleBinding with default cluster-admin role + if err := client.CreateServiceAccountAndBinding(saIngressName, crIngressName); err != nil { + t.Fatalf("Failed to create the Ingress ServiceAccount and ServiceAccountRoleBinding: %v", err) + } + if err := client.CreateServiceAccountAndBinding(saFilterName, crFilterName); err != nil { + t.Fatalf("Failed to create the Filter ServiceAccount and ServiceAccountRoleBinding: %v", err) + } +} From 2bace45478e206c0e8588129a16567c012a32f17 Mon Sep 17 00:00:00 2001 From: Chi Zhang Date: Wed, 29 May 2019 22:42:04 -0700 Subject: [PATCH 03/15] provisioner is not needed in Setup --- test/e2e/broker_channel_flow_test.go | 2 +- test/e2e/broker_default_test.go | 2 +- test/e2e/broker_event_transformation_test.go | 2 +- test/e2e/channel_chain_test.go | 2 +- test/e2e/channel_event_transformation_test.go | 2 +- test/e2e/channel_single_event_test.go | 2 +- test/e2e/test_runner.go | 3 +-- 7 files changed, 7 insertions(+), 8 deletions(-) diff --git a/test/e2e/broker_channel_flow_test.go b/test/e2e/broker_channel_flow_test.go index 043ebffcca4..f4807d59407 100644 --- a/test/e2e/broker_channel_flow_test.go +++ b/test/e2e/broker_channel_flow_test.go @@ -81,7 +81,7 @@ func testBrokerChannelFlow(t *testing.T, provisioner string) { subscriptionName = "e2e-brokerchannel-subscription" ) - client := Setup(t, provisioner, true) + client := Setup(t, true) defer TearDown(client) // creates ServiceAccount and ClusterRoleBinding with default cluster-admin role diff --git a/test/e2e/broker_default_test.go b/test/e2e/broker_default_test.go index 68a53791af5..f70b4ba5412 100644 --- a/test/e2e/broker_default_test.go +++ b/test/e2e/broker_default_test.go @@ -55,7 +55,7 @@ type eventReceiver struct { // and sends different events to the broker's address. Finally, it verifies that only // the appropriate events are routed to the subscribers. func TestDefaultBrokerWithManyTriggers(t *testing.T) { - client := Setup(t, common.DefaultClusterChannelProvisioner, true) + client := Setup(t, true) defer TearDown(client) // Label namespace so that it creates the default broker. diff --git a/test/e2e/broker_event_transformation_test.go b/test/e2e/broker_event_transformation_test.go index 7aba9f24f1a..d9ec7a52c90 100644 --- a/test/e2e/broker_event_transformation_test.go +++ b/test/e2e/broker_event_transformation_test.go @@ -70,7 +70,7 @@ func testEventTransformationForTrigger(t *testing.T, provisioner string) { loggerPodName = "logger-pod" ) - client := Setup(t, provisioner, true) + client := Setup(t, true) defer TearDown(client) // creates ServiceAccount and ClusterRoleBinding with default cluster-admin role diff --git a/test/e2e/channel_chain_test.go b/test/e2e/channel_chain_test.go index 6d96ac1e5da..2446ba68b29 100644 --- a/test/e2e/channel_chain_test.go +++ b/test/e2e/channel_chain_test.go @@ -48,7 +48,7 @@ func testChannelChain(t *testing.T, provisioner string) { // subscriptionNames2 corresponds to Subscriptions on channelNames[1] subscriptionNames2 := []string{"e2e-channelchain-subs21"} - client := Setup(t, provisioner, true) + client := Setup(t, true) defer TearDown(client) // create channels diff --git a/test/e2e/channel_event_transformation_test.go b/test/e2e/channel_event_transformation_test.go index 17466cb5546..e405f9dbba6 100644 --- a/test/e2e/channel_event_transformation_test.go +++ b/test/e2e/channel_event_transformation_test.go @@ -49,7 +49,7 @@ func TestEventTransformationForSubscription(t *testing.T) { loggerPodName := "e2e-eventtransformation-logger-pod" RunTests(t, common.FeatureBasic, func(st *testing.T, provisioner string) { - client := Setup(st, provisioner, true) + client := Setup(st, true) defer TearDown(client) // create channels diff --git a/test/e2e/channel_single_event_test.go b/test/e2e/channel_single_event_test.go index b7641d87baa..138a804683b 100644 --- a/test/e2e/channel_single_event_test.go +++ b/test/e2e/channel_single_event_test.go @@ -49,7 +49,7 @@ func singleEvent(t *testing.T, encoding string) { RunTests(t, common.FeatureBasic, func(st *testing.T, provisioner string) { st.Logf("Run test with provisioner %q", provisioner) - client := Setup(st, provisioner, true) + client := Setup(st, true) defer TearDown(client) // create channel diff --git a/test/e2e/test_runner.go b/test/e2e/test_runner.go index 28f0b9e93fe..b13e16b336a 100644 --- a/test/e2e/test_runner.go +++ b/test/e2e/test_runner.go @@ -56,9 +56,8 @@ func RunTests(t *testing.T, feature common.Feature, testFunc func(st *testing.T, // Setup creates the client objects needed in the e2e tests, // and does other setups, like creating namespaces, run the test case in parallel, etc. -func Setup(t *testing.T, provisioner string, runInParallel bool) *common.Client { +func Setup(t *testing.T, runInParallel bool) *common.Client { // Create a new namespace to run this test case. - // Combine the test name and CCP to avoid duplication. baseFuncName := getBaseFuncName(t.Name()) namespace := makeK8sNamePrefix(baseFuncName) t.Logf("namespace is : %q", namespace) From c750a41f01064b41d10f53dcd0d470794faa9b74 Mon Sep 17 00:00:00 2001 From: Chi Zhang Date: Wed, 29 May 2019 23:44:06 -0700 Subject: [PATCH 04/15] add test case for cronjobsource --- test/base/generics.go | 12 +++++++++- test/base/resources.go | 1 + test/common/creation.go | 29 +++++++++++++++++++---- test/common/operation.go | 40 +++++++++++++++++++++++++++----- test/common/validation.go | 9 ++++++- test/e2e/source_cron_job_test.go | 36 ++++++++++++++++++++++------ 6 files changed, 108 insertions(+), 19 deletions(-) diff --git a/test/base/generics.go b/test/base/generics.go index 0d17f6c43e0..f3a342e1f4b 100644 --- a/test/base/generics.go +++ b/test/base/generics.go @@ -31,8 +31,18 @@ type MetaResource struct { metav1.ObjectMeta `json:"metadata,omitempty"` } +// MetaEventing returns a MetaResource that can represent a Knative Eventing resource. +func MetaEventing(name, namespace, kind string) *MetaResource { + return Meta(name, namespace, kind, eventingAPIVersion) +} + +// MetaSource returns a MetaResource that can represent a Knative Sources resource. +func MetaSource(name, namespace, kind string) *MetaResource { + return Meta(name, namespace, kind, sourcesAPIVersion) +} + // Meta returns a MetaResource built from the given name, namespace and kind. -func Meta(name, namespace, kind string) *MetaResource { +func Meta(name, namespace, kind, apiVersion string) *MetaResource { return &MetaResource{ ObjectMeta: metav1.ObjectMeta{ Namespace: namespace, diff --git a/test/base/resources.go b/test/base/resources.go index 5bf23fbf7e1..9ec60959c02 100644 --- a/test/base/resources.go +++ b/test/base/resources.go @@ -32,6 +32,7 @@ import ( ) const eventingAPIVersion = "eventing.knative.dev/v1alpha1" +const sourcesAPIVersion = "sources.eventing.knative.dev/v1alpha1" // clusterChannelProvisioner returns a ClusterChannelProvisioner for a given name. func clusterChannelProvisioner(name string) *corev1.ObjectReference { diff --git a/test/common/creation.go b/test/common/creation.go index 4eca1a06dc5..427a4612dc5 100644 --- a/test/common/creation.go +++ b/test/common/creation.go @@ -17,7 +17,8 @@ limitations under the License. package common import ( - "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + sourcesv1alpha1 "github.com/knative/eventing/pkg/apis/sources/v1alpha1" "github.com/knative/eventing/test/base" corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" @@ -50,7 +51,7 @@ func (client *Client) CreateChannelsOrFail(names []string, provisionerName strin } // CreateSubscriptionOrFail will create a Subscription. -func (client *Client) CreateSubscriptionOrFail(name, channelName string, options ...func(*v1alpha1.Subscription)) { +func (client *Client) CreateSubscriptionOrFail(name, channelName string, options ...func(*eventingv1alpha1.Subscription)) { namespace := client.Namespace subscription := base.Subscription(name, channelName, options...) @@ -64,7 +65,7 @@ func (client *Client) CreateSubscriptionOrFail(name, channelName string, options } // CreateSubscriptionsOrFail will create a list of Subscriptions with the same configuration except the name. -func (client *Client) CreateSubscriptionsOrFail(names []string, channelName string, options ...func(*v1alpha1.Subscription)) { +func (client *Client) CreateSubscriptionsOrFail(names []string, channelName string, options ...func(*eventingv1alpha1.Subscription)) { for _, name := range names { client.CreateSubscriptionOrFail(name, channelName, options...) } @@ -92,7 +93,7 @@ func (client *Client) CreateBrokersOrFail(names []string, provisionerName string } // CreateTriggerOrFail will create a Trigger. -func (client *Client) CreateTriggerOrFail(name string, options ...func(*v1alpha1.Trigger)) { +func (client *Client) CreateTriggerOrFail(name string, options ...func(*eventingv1alpha1.Trigger)) { namespace := client.Namespace trigger := base.Trigger(name, options...) @@ -105,6 +106,26 @@ func (client *Client) CreateTriggerOrFail(name string, options ...func(*v1alpha1 client.Cleaner.AddObj(trigger) } +// CreateCronJobSourceOrFail will create a CronJobSource. +func (client *Client) CreateCronJobSourceOrFail( + name, + schedule, + data, + serviceAccountName string, + options ...func(*sourcesv1alpha1.CronJobSource), +) { + namespace := client.Namespace + cronJobSource := base.CronJobSource(name, schedule, data, serviceAccountName, options...) + + cronJobSources := client.Eventing.SourcesV1alpha1().CronJobSources(namespace) + // update cronJobSource with the new reference + cronJobSource, err := cronJobSources.Create(cronJobSource) + if err != nil { + client.T.Fatalf("Failed to create cronjobsource %q: %v", name, err) + } + client.Cleaner.AddObj(cronJobSource) +} + // WithService returns an option that creates a Service binded with the given pod. func WithService(name string) func(*corev1.Pod, *Client) error { return func(pod *corev1.Pod, client *Client) error { diff --git a/test/common/operation.go b/test/common/operation.go index f8eb07a5bd1..a7f6b300d84 100644 --- a/test/common/operation.go +++ b/test/common/operation.go @@ -53,7 +53,7 @@ func (client *Client) SendFakeEventToChannel(senderName, channelName string, eve // GetChannelURL will return the url for the given channel. func (client *Client) GetChannelURL(name string) (string, error) { namespace := client.Namespace - channelMeta := base.Meta(name, namespace, "Channel") + channelMeta := base.MetaEventing(name, namespace, "Channel") return base.GetAddressableURI(client.Dynamic, channelMeta) } @@ -69,7 +69,7 @@ func (client *Client) SendFakeEventToBroker(senderName, brokerName string, event // GetBrokerURL will return the url for the given broker. func (client *Client) GetBrokerURL(name string) (string, error) { namespace := client.Namespace - brokerMeta := base.Meta(name, namespace, "Broker") + brokerMeta := base.MetaEventing(name, namespace, "Broker") return base.GetAddressableURI(client.Dynamic, brokerMeta) } @@ -92,7 +92,7 @@ func (client *Client) sendFakeEventToAddress( // WaitForBrokerReady waits until the broker is Ready. func (client *Client) WaitForBrokerReady(name string) error { namespace := client.Namespace - brokerMeta := base.Meta(name, namespace, "Broker") + brokerMeta := base.MetaEventing(name, namespace, "Broker") if err := base.WaitForResourceReady(client.Dynamic, brokerMeta); err != nil { return err } @@ -117,7 +117,7 @@ func (client *Client) WaitForBrokersReady() error { // WaitForTriggerReady waits until the trigger is Ready. func (client *Client) WaitForTriggerReady(name string) error { namespace := client.Namespace - triggerMeta := base.Meta(name, namespace, "Trigger") + triggerMeta := base.MetaEventing(name, namespace, "Trigger") if err := base.WaitForResourceReady(client.Dynamic, triggerMeta); err != nil { return err } @@ -142,7 +142,7 @@ func (client *Client) WaitForTriggersReady() error { // WaitForChannelReady waits until the channel is Ready. func (client *Client) WaitForChannelReady(name string) error { namespace := client.Namespace - channelMeta := base.Meta(name, namespace, "Channel") + channelMeta := base.MetaEventing(name, namespace, "Channel") if err := base.WaitForResourceReady(client.Dynamic, channelMeta); err != nil { return err } @@ -167,7 +167,7 @@ func (client *Client) WaitForChannelsReady() error { // WaitForSubscriptionReady waits until the subscription is Ready. func (client *Client) WaitForSubscriptionReady(name string) error { namespace := client.Namespace - subscriptionMeta := base.Meta(name, namespace, "Subscription") + subscriptionMeta := base.MetaEventing(name, namespace, "Subscription") if err := base.WaitForResourceReady(client.Dynamic, subscriptionMeta); err != nil { return err } @@ -189,6 +189,31 @@ func (client *Client) WaitForSubscriptionsReady() error { return nil } +// WaitForCronJobSourceReady waits until the cronjobsource is Ready. +func (client *Client) WaitForCronJobSourceReady(name string) error { + namespace := client.Namespace + cronJobSourceMeta := base.MetaSource(name, namespace, "CronJobSource") + if err := base.WaitForResourceReady(client.Dynamic, cronJobSourceMeta); err != nil { + return err + } + return nil +} + +// WaitForCronJobSourcesReady waits until all cronjobsources in the namespace are Ready. +func (client *Client) WaitForCronJobSourcesReady() error { + namespace := client.Namespace + cronJobSources, err := client.Eventing.SourcesV1alpha1().CronJobSources(namespace).List(metav1.ListOptions{}) + if err != nil { + return err + } + for _, cronJobSource := range cronJobSources.Items { + if err := client.WaitForCronJobSourceReady(cronJobSource.Name); err != nil { + return err + } + } + return nil +} + // WaitForAllTestResourcesReady waits until all test resources in the namespace are Ready. // Currently the test resources include Pod, Channel, Subscription, Broker and Trigger. // If there are new resources, this function needs to be changed. @@ -205,6 +230,9 @@ func (client *Client) WaitForAllTestResourcesReady() error { if err := client.WaitForTriggersReady(); err != nil { return err } + // if err := client.WaitForCronJobSourcesReady(); err != nil { + // return err + // } if err := pkgTest.WaitForAllPodsRunning(client.Kube, client.Namespace); err != nil { return err } diff --git a/test/common/validation.go b/test/common/validation.go index 4d2cbaca62c..d16cd49b544 100644 --- a/test/common/validation.go +++ b/test/common/validation.go @@ -66,13 +66,20 @@ func CheckerContainsAll(contents []string) func(string) bool { } } -// CheckerContainsCount returns a checker functions to check if the log contains the count number of given content. +// CheckerContainsCount returns a checker function to check if the log contains the count number of given content. func CheckerContainsCount(content string, count int) func(string) bool { return func(log string) bool { return strings.Count(log, content) == count } } +// CheckerContainsAtLeast returns a checker function to check if the log contains at least the count number of given content. +func CheckerContainsAtLeast(content string, count int) func(string) bool { + return func(log string) bool { + return strings.Count(log, content) >= count + } +} + // FindAnyLogContents attempts to find logs for given Pod/Container that has 'any' of the given contents. // It returns an error if it couldn't retrieve the logs. In case 'any' of the contents are there, it returns true. func (client *Client) FindAnyLogContents(podName string, contents []string) (bool, error) { diff --git a/test/e2e/source_cron_job_test.go b/test/e2e/source_cron_job_test.go index 1b7c5e4714a..02095a983eb 100644 --- a/test/e2e/source_cron_job_test.go +++ b/test/e2e/source_cron_job_test.go @@ -17,26 +17,48 @@ limitations under the License. package e2e -import "testing" +import ( + "fmt" + "testing" + + "github.com/knative/eventing/test/base" + "github.com/knative/eventing/test/common" + "k8s.io/apimachinery/pkg/util/uuid" +) func TestCronJobSource(t *testing.T) { const ( cronJobSourceName = "e2e-cron-job-source" - schedule = "0/30 * * * * ? *" + // Every 10 seconds starting at :00 second after the minute + schedule = "0/10 * * * * ? *" loggerPodName = "e2e-cron-job-source-logger-pod" saIngressName = "eventing-broker-ingress" crIngressName = "eventing-broker-ingress" ) - client := Setup(t, "", true) + client := Setup(t, true) defer TearDown(client) // creates ServiceAccount and ClusterRoleBinding with default cluster-admin role - if err := client.CreateServiceAccountAndBinding(saIngressName, crIngressName); err != nil { - t.Fatalf("Failed to create the Ingress ServiceAccount and ServiceAccountRoleBinding: %v", err) + client.CreateServiceAccountAndBindingOrFail(saIngressName, crIngressName) + + // create event logger pod and service + loggerPod := base.EventLoggerPod(loggerPodName) + client.CreatePodOrFail(loggerPod, common.WithService(loggerPodName)) + + // create cron job source + data := fmt.Sprintf("TestCronJobSource %s", uuid.NewUUID()) + sinkOption := base.WithSinkServiceForCronJobSource(loggerPodName) + client.CreateCronJobSourceOrFail(cronJobSourceName, schedule, data, saIngressName, sinkOption) + + // wait for all test resources to be ready, so that we can start sending events + if err := client.WaitForAllTestResourcesReady(); err != nil { + t.Fatalf("Failed to get all test resources ready: %v", err) } - if err := client.CreateServiceAccountAndBinding(saFilterName, crFilterName); err != nil { - t.Fatalf("Failed to create the Filter ServiceAccount and ServiceAccountRoleBinding: %v", err) + + // verify the logger service receives the event + if err := client.CheckLog(loggerPodName, common.CheckerContainsAtLeast(data, 2)); err != nil { + t.Fatalf("String %q not found in logs of logger pod %q: %v", data, loggerPodName, err) } } From b5786792e025f7f6f83a091c358128f745b7918a Mon Sep 17 00:00:00 2001 From: Chi Zhang Date: Wed, 29 May 2019 23:45:07 -0700 Subject: [PATCH 05/15] fix README --- test/README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/README.md b/test/README.md index 274d9850da9..a05ffd3e432 100644 --- a/test/README.md +++ b/test/README.md @@ -93,11 +93,11 @@ go test -v -tags=e2e -count=1 ./test/e2e -clusterChannelProvisioners=in-memory,g #### One test case -To run one e2e test case, e.g. `TestSingleBinaryEvents`, use +To run one e2e test case, e.g. `TestSingleBinaryEventForChannel`, use [the `-run` flag with `go test`](https://golang.org/cmd/go/#hdr-Testing_flags): ```bash -go test -v -tags=e2e -count=1 ./test/e2e -run ^TestSingleBinaryEvent$ +go test -v -tags=e2e -count=1 ./test/e2e -run ^TestSingleBinaryEventForChannel$ ``` By default, it will run the test against the default @@ -107,7 +107,7 @@ If you want to run it against another `ClusterChannelProvisioner`, you can specify it through `-clusterChannelProvisioners`. ```bash -go test -v -tags=e2e -count=1 ./test/e2e -run ^TestSingleBinaryEvent$ -clusterChannelProvisioners=in-memory +go test -v -tags=e2e -count=1 ./test/e2e -run ^TestSingleBinaryEventForChannel$ -clusterChannelProvisioners=in-memory ``` ## Environment requirements From 6d6416f6e02709d3a347bc16145f3503a52bab74 Mon Sep 17 00:00:00 2001 From: Chi Zhang Date: Thu, 30 May 2019 11:46:59 -0700 Subject: [PATCH 06/15] cron job source test is now working --- test/base/generics.go | 2 +- test/common/operation.go | 6 +++--- test/common/validation.go | 9 +-------- test/e2e/source_cron_job_test.go | 6 +++--- 4 files changed, 8 insertions(+), 15 deletions(-) diff --git a/test/base/generics.go b/test/base/generics.go index f3a342e1f4b..93fe161fc4d 100644 --- a/test/base/generics.go +++ b/test/base/generics.go @@ -50,7 +50,7 @@ func Meta(name, namespace, kind, apiVersion string) *MetaResource { }, TypeMeta: metav1.TypeMeta{ Kind: kind, - APIVersion: eventingAPIVersion, + APIVersion: apiVersion, }, } } diff --git a/test/common/operation.go b/test/common/operation.go index a7f6b300d84..7e8fe951080 100644 --- a/test/common/operation.go +++ b/test/common/operation.go @@ -230,9 +230,9 @@ func (client *Client) WaitForAllTestResourcesReady() error { if err := client.WaitForTriggersReady(); err != nil { return err } - // if err := client.WaitForCronJobSourcesReady(); err != nil { - // return err - // } + if err := client.WaitForCronJobSourcesReady(); err != nil { + return err + } if err := pkgTest.WaitForAllPodsRunning(client.Kube, client.Namespace); err != nil { return err } diff --git a/test/common/validation.go b/test/common/validation.go index d16cd49b544..b46a636973b 100644 --- a/test/common/validation.go +++ b/test/common/validation.go @@ -27,7 +27,7 @@ import ( const ( // The interval and timeout used for polling pod logs. interval = 1 * time.Second - timeout = 2 * time.Minute + timeout = 4 * time.Minute ) // CheckLog waits until logs for the logger Pod satisfy the checker. @@ -73,13 +73,6 @@ func CheckerContainsCount(content string, count int) func(string) bool { } } -// CheckerContainsAtLeast returns a checker function to check if the log contains at least the count number of given content. -func CheckerContainsAtLeast(content string, count int) func(string) bool { - return func(log string) bool { - return strings.Count(log, content) >= count - } -} - // FindAnyLogContents attempts to find logs for given Pod/Container that has 'any' of the given contents. // It returns an error if it couldn't retrieve the logs. In case 'any' of the contents are there, it returns true. func (client *Client) FindAnyLogContents(podName string, contents []string) (bool, error) { diff --git a/test/e2e/source_cron_job_test.go b/test/e2e/source_cron_job_test.go index 02095a983eb..36d3c78be5a 100644 --- a/test/e2e/source_cron_job_test.go +++ b/test/e2e/source_cron_job_test.go @@ -29,8 +29,8 @@ import ( func TestCronJobSource(t *testing.T) { const ( cronJobSourceName = "e2e-cron-job-source" - // Every 10 seconds starting at :00 second after the minute - schedule = "0/10 * * * * ? *" + // Every 1 minute starting from now + schedule = "*/1 * * * *" loggerPodName = "e2e-cron-job-source-logger-pod" saIngressName = "eventing-broker-ingress" @@ -58,7 +58,7 @@ func TestCronJobSource(t *testing.T) { } // verify the logger service receives the event - if err := client.CheckLog(loggerPodName, common.CheckerContainsAtLeast(data, 2)); err != nil { + if err := client.CheckLog(loggerPodName, common.CheckerContains(data)); err != nil { t.Fatalf("String %q not found in logs of logger pod %q: %v", data, loggerPodName, err) } } From 34f41bb13347160e2741c9e08bce8720dd1c09ae Mon Sep 17 00:00:00 2001 From: Chi Zhang Date: Thu, 30 May 2019 13:54:05 -0700 Subject: [PATCH 07/15] add image for testing ContainerSource --- test/test_images/heartbeats/main.go | 119 ++++++++++++++++++++++++++++ 1 file changed, 119 insertions(+) create mode 100644 test/test_images/heartbeats/main.go diff --git a/test/test_images/heartbeats/main.go b/test/test_images/heartbeats/main.go new file mode 100644 index 00000000000..1ae333a8431 --- /dev/null +++ b/test/test_images/heartbeats/main.go @@ -0,0 +1,119 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "context" + "flag" + "fmt" + "log" + "os" + "strconv" + "time" + + "github.com/knative/eventing/pkg/kncloudevents" + + "github.com/cloudevents/sdk-go/pkg/cloudevents" + "github.com/cloudevents/sdk-go/pkg/cloudevents/types" + "github.com/kelseyhightower/envconfig" +) + +type Heartbeat struct { + Sequence int `json:"id"` + Msg string `json:"msg"` +} + +var ( + sink string + msg string + periodStr string +) + +func init() { + flag.StringVar(&sink, "sink", "", "the host url to heartbeat to") + flag.StringVar(&msg, "msg", "", "the data message") + flag.StringVar(&periodStr, "period", "5", "the number of seconds between heartbeats") +} + +type envConfig struct { + // Sink URL where to send heartbeat cloudevents + Sink string `envconfig:"SINK"` + + // Name of this pod. + Name string `envconfig:"POD_NAME" required:"true"` + + // Namespace this pod exists in. + Namespace string `envconfig:"POD_NAMESPACE" required:"true"` +} + +func main() { + flag.Parse() + + var env envConfig + if err := envconfig.Process("", &env); err != nil { + log.Printf("[ERROR] Failed to process env var: %s", err) + os.Exit(1) + } + + if env.Sink != "" { + sink = env.Sink + } + + c, err := kncloudevents.NewDefaultClient(sink) + if err != nil { + log.Fatalf("failed to create client: %s", err.Error()) + } + + var period time.Duration + if p, err := strconv.Atoi(periodStr); err != nil { + period = time.Duration(5) * time.Second + } else { + period = time.Duration(p) * time.Second + } + + source := types.ParseURLRef( + fmt.Sprintf("https://github.com/knative/eventing-sources/cmd/heartbeats/#%s/%s", env.Namespace, env.Name)) + log.Printf("Heartbeats Source: %s", source) + + hb := &Heartbeat{ + Sequence: 0, + Msg: msg, + } + ticker := time.NewTicker(period) + for { + hb.Sequence++ + + event := cloudevents.Event{ + Context: cloudevents.EventContextV02{ + Type: "dev.knative.eventing.samples.heartbeat", + Source: *source, + Extensions: map[string]interface{}{ + "the": 42, + "heart": "yes", + "beats": true, + }, + }.AsV02(), + Data: hb, + } + + if _, err := c.Send(context.Background(), event); err != nil { + log.Printf("failed to send cloudevent: %s", err.Error()) + } + // Wait for next tick + <-ticker.C + } +} From f7e5385740948b07be60efc7fe075775a0edf302 Mon Sep 17 00:00:00 2001 From: Chi Zhang Date: Thu, 30 May 2019 14:00:44 -0700 Subject: [PATCH 08/15] serviceaccount is not required for cronjobsource --- test/base/resources.go | 15 ++++++++++----- test/common/creation.go | 5 ++--- test/e2e/source_cron_job_test.go | 7 +------ 3 files changed, 13 insertions(+), 14 deletions(-) diff --git a/test/base/resources.go b/test/base/resources.go index 9ec60959c02..88591a805b4 100644 --- a/test/base/resources.go +++ b/test/base/resources.go @@ -175,12 +175,18 @@ func WithSinkServiceForCronJobSource(name string) func(*sourcesv1alpha1.CronJobS } } +// WithServiceAccountForCronJobSource returns an option that adds a ServiceAccount for the given CronJobSource. +func WithServiceAccountForCronJobSource(saName string) func(*sourcesv1alpha1.CronJobSource) { + return func(cjs *sourcesv1alpha1.CronJobSource) { + cjs.Spec.ServiceAccountName = saName + } +} + // CronJobSource returns a CronJob EventSource. func CronJobSource( name, schedule, - data, - serviceAccountName string, + data string, options ...func(*sourcesv1alpha1.CronJobSource), ) *sourcesv1alpha1.CronJobSource { cronJobSource := &sourcesv1alpha1.CronJobSource{ @@ -188,9 +194,8 @@ func CronJobSource( Name: name, }, Spec: sourcesv1alpha1.CronJobSourceSpec{ - Schedule: schedule, - Data: data, - ServiceAccountName: serviceAccountName, + Schedule: schedule, + Data: data, }, } for _, option := range options { diff --git a/test/common/creation.go b/test/common/creation.go index 427a4612dc5..bc287b6d37e 100644 --- a/test/common/creation.go +++ b/test/common/creation.go @@ -110,12 +110,11 @@ func (client *Client) CreateTriggerOrFail(name string, options ...func(*eventing func (client *Client) CreateCronJobSourceOrFail( name, schedule, - data, - serviceAccountName string, + data string, options ...func(*sourcesv1alpha1.CronJobSource), ) { namespace := client.Namespace - cronJobSource := base.CronJobSource(name, schedule, data, serviceAccountName, options...) + cronJobSource := base.CronJobSource(name, schedule, data, options...) cronJobSources := client.Eventing.SourcesV1alpha1().CronJobSources(namespace) // update cronJobSource with the new reference diff --git a/test/e2e/source_cron_job_test.go b/test/e2e/source_cron_job_test.go index 36d3c78be5a..f7300597c19 100644 --- a/test/e2e/source_cron_job_test.go +++ b/test/e2e/source_cron_job_test.go @@ -33,16 +33,11 @@ func TestCronJobSource(t *testing.T) { schedule = "*/1 * * * *" loggerPodName = "e2e-cron-job-source-logger-pod" - saIngressName = "eventing-broker-ingress" - crIngressName = "eventing-broker-ingress" ) client := Setup(t, true) defer TearDown(client) - // creates ServiceAccount and ClusterRoleBinding with default cluster-admin role - client.CreateServiceAccountAndBindingOrFail(saIngressName, crIngressName) - // create event logger pod and service loggerPod := base.EventLoggerPod(loggerPodName) client.CreatePodOrFail(loggerPod, common.WithService(loggerPodName)) @@ -50,7 +45,7 @@ func TestCronJobSource(t *testing.T) { // create cron job source data := fmt.Sprintf("TestCronJobSource %s", uuid.NewUUID()) sinkOption := base.WithSinkServiceForCronJobSource(loggerPodName) - client.CreateCronJobSourceOrFail(cronJobSourceName, schedule, data, saIngressName, sinkOption) + client.CreateCronJobSourceOrFail(cronJobSourceName, schedule, data, sinkOption) // wait for all test resources to be ready, so that we can start sending events if err := client.WaitForAllTestResourcesReady(); err != nil { From ca5fe7bb8296c67887ae4e516e6278ce47cf1240 Mon Sep 17 00:00:00 2001 From: Chi Zhang Date: Thu, 30 May 2019 15:28:23 -0700 Subject: [PATCH 09/15] add an e2e test case for container source, still in progress --- test/base/resources.go | 48 ++++++++++++++++++++++ test/common/creation.go | 18 +++++++++ test/common/operation.go | 28 +++++++++++++ test/common/validation.go | 7 ++++ test/e2e/source_container_test.go | 66 +++++++++++++++++++++++++++++++ 5 files changed, 167 insertions(+) create mode 100644 test/e2e/source_container_test.go diff --git a/test/base/resources.go b/test/base/resources.go index 88591a805b4..c9cd61f6f54 100644 --- a/test/base/resources.go +++ b/test/base/resources.go @@ -204,6 +204,54 @@ func CronJobSource( return cronJobSource } +// WithArgsForContainerSource returns an option that adds args for the given ContainerSource. +func WithArgsForContainerSource(args []string) func(*sourcesv1alpha1.ContainerSource) { + return func(cs *sourcesv1alpha1.ContainerSource) { + cs.Spec.Args = args + } +} + +// WithEnvVarsForContainerSource returns an option that adds environment vars for the given ContainerSource. +func WithEnvVarsForContainerSource(envVars []corev1.EnvVar) func(*sourcesv1alpha1.ContainerSource) { + return func(cs *sourcesv1alpha1.ContainerSource) { + cs.Spec.Env = envVars + } +} + +// WithServiceAccountForContainerSource returns an option that adds a ServiceAccount for the given ContainerSource. +func WithServiceAccountForContainerSource(saName string) func(*sourcesv1alpha1.ContainerSource) { + return func(cs *sourcesv1alpha1.ContainerSource) { + cs.Spec.ServiceAccountName = saName + } +} + +// WithSinkServiceForContainerSource returns an option that adds a Kubernetes Service sink for the given ContainerSource. +func WithSinkServiceForContainerSource(name string) func(*sourcesv1alpha1.ContainerSource) { + return func(cs *sourcesv1alpha1.ContainerSource) { + cs.Spec.Sink = pkgTest.CoreV1ObjectReference("Service", "v1", name) + } +} + +// ContainerSource returns a Container EventSource. +func ContainerSource( + name, + imageName string, + options ...func(*sourcesv1alpha1.ContainerSource), +) *sourcesv1alpha1.ContainerSource { + containerSource := &sourcesv1alpha1.ContainerSource{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Spec: sourcesv1alpha1.ContainerSourceSpec{ + Image: pkgTest.ImagePath(imageName), + }, + } + for _, option := range options { + option(containerSource) + } + return containerSource +} + // CloudEvent specifies the arguments for a CloudEvent sent by the sendevent // binary. type CloudEvent struct { diff --git a/test/common/creation.go b/test/common/creation.go index bc287b6d37e..b43f15de4cc 100644 --- a/test/common/creation.go +++ b/test/common/creation.go @@ -125,6 +125,24 @@ func (client *Client) CreateCronJobSourceOrFail( client.Cleaner.AddObj(cronJobSource) } +// CreateContainerSourceOrFail will create a ContainerSource. +func (client *Client) CreateContainerSourceOrFail( + name, + imageName string, + options ...func(*sourcesv1alpha1.ContainerSource), +) { + namespace := client.Namespace + containerSource := base.ContainerSource(name, imageName, options...) + + containerSources := client.Eventing.SourcesV1alpha1().ContainerSources(namespace) + // update containerSource with the new reference + containerSource, err := containerSources.Create(containerSource) + if err != nil { + client.T.Fatalf("Failed to create containersource %q: %v", name, err) + } + client.Cleaner.AddObj(containerSource) +} + // WithService returns an option that creates a Service binded with the given pod. func WithService(name string) func(*corev1.Pod, *Client) error { return func(pod *corev1.Pod, client *Client) error { diff --git a/test/common/operation.go b/test/common/operation.go index 7e8fe951080..ac0670dd01d 100644 --- a/test/common/operation.go +++ b/test/common/operation.go @@ -214,6 +214,31 @@ func (client *Client) WaitForCronJobSourcesReady() error { return nil } +// WaitForContainerSourceReady waits until the containersource is Ready. +func (client *Client) WaitForContainerSourceReady(name string) error { + namespace := client.Namespace + containerSourceMeta := base.MetaSource(name, namespace, "ContainerSource") + if err := base.WaitForResourceReady(client.Dynamic, containerSourceMeta); err != nil { + return err + } + return nil +} + +// WaitForContainerSourcesReady waits until all containersources in the namespace are Ready. +func (client *Client) WaitForContainerSourcesReady() error { + namespace := client.Namespace + containerSources, err := client.Eventing.SourcesV1alpha1().ContainerSources(namespace).List(metav1.ListOptions{}) + if err != nil { + return err + } + for _, containerSource := range containerSources.Items { + if err := client.WaitForContainerSourceReady(containerSource.Name); err != nil { + return err + } + } + return nil +} + // WaitForAllTestResourcesReady waits until all test resources in the namespace are Ready. // Currently the test resources include Pod, Channel, Subscription, Broker and Trigger. // If there are new resources, this function needs to be changed. @@ -233,6 +258,9 @@ func (client *Client) WaitForAllTestResourcesReady() error { if err := client.WaitForCronJobSourcesReady(); err != nil { return err } + if err := client.WaitForContainerSourcesReady(); err != nil { + return err + } if err := pkgTest.WaitForAllPodsRunning(client.Kube, client.Namespace); err != nil { return err } diff --git a/test/common/validation.go b/test/common/validation.go index b46a636973b..f375c86fc4d 100644 --- a/test/common/validation.go +++ b/test/common/validation.go @@ -73,6 +73,13 @@ func CheckerContainsCount(content string, count int) func(string) bool { } } +// CheckerContainsAtLeast returns a checker function to check if the log contains at least the count number of given content. +func CheckerContainsAtLeast(content string, count int) func(string) bool { + return func(log string) bool { + return strings.Count(log, content) >= count + } +} + // FindAnyLogContents attempts to find logs for given Pod/Container that has 'any' of the given contents. // It returns an error if it couldn't retrieve the logs. In case 'any' of the contents are there, it returns true. func (client *Client) FindAnyLogContents(podName string, contents []string) (bool, error) { diff --git a/test/e2e/source_container_test.go b/test/e2e/source_container_test.go new file mode 100644 index 00000000000..c29d53ee0f0 --- /dev/null +++ b/test/e2e/source_container_test.go @@ -0,0 +1,66 @@ +// +build e2e + +/* +Copyright 2019 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + "fmt" + "testing" + + "github.com/knative/eventing/test/base" + "github.com/knative/eventing/test/common" + "k8s.io/apimachinery/pkg/util/uuid" +) + +func TestContainerSource(t *testing.T) { + const ( + containerSourceName = "e2e-container-source" + // the heartbeats image is built from test_images/heartbeats + imageName = "heartbeats" + + loggerPodName = "e2e-container-source-logger-pod" + + saIngressName = "e2e-container-source-ingress" + crIngressName = "eventing-broker-ingress" + ) + data := fmt.Sprintf("TestContainerSource %s", uuid.NewUUID()) + // msg is an argument that is used in the heartbeats image + args := []string{"--msg=" + data} + + client := Setup(t, true) + defer TearDown(client) + + // create event logger pod and service + loggerPod := base.EventLoggerPod(loggerPodName) + client.CreatePodOrFail(loggerPod, common.WithService(loggerPodName)) + + // create container source + sinkOption := base.WithSinkServiceForContainerSource(loggerPodName) + argsOption := base.WithArgsForContainerSource(args) + client.CreateContainerSourceOrFail(containerSourceName, imageName, argsOption, sinkOption) + + // wait for all test resources to be ready + if err := client.WaitForAllTestResourcesReady(); err != nil { + t.Fatalf("Failed to get all test resources ready: %v", err) + } + + // verify the logger service receives the event + expectedCount := 2 + if err := client.CheckLog(loggerPodName, common.CheckerContainsAtLeast(data, expectedCount)); err != nil { + t.Fatalf("String %q does not appear at least %i times in logs of logger pod %q: %v", data, expectedCount, loggerPodName, err) + } +} From 249cfb963ea60b45f45035dfe84cf34ba501f4ec Mon Sep 17 00:00:00 2001 From: Chi Zhang Date: Thu, 30 May 2019 17:26:53 -0700 Subject: [PATCH 10/15] not sure why it's not working... --- test/common/validation.go | 2 ++ test/e2e/source_container_test.go | 9 ++++++--- test/test_images/heartbeats/main.go | 12 ++---------- 3 files changed, 10 insertions(+), 13 deletions(-) diff --git a/test/common/validation.go b/test/common/validation.go index f375c86fc4d..c902c7ba8af 100644 --- a/test/common/validation.go +++ b/test/common/validation.go @@ -17,6 +17,7 @@ limitations under the License. package common import ( + "fmt" "strings" "time" @@ -43,6 +44,7 @@ func (client *Client) CheckLog(podName string, checker func(string) bool) error if err != nil { return true, err } + fmt.Println(string(logs)) return checker(string(logs)), nil }) } diff --git a/test/e2e/source_container_test.go b/test/e2e/source_container_test.go index c29d53ee0f0..333c0b2695a 100644 --- a/test/e2e/source_container_test.go +++ b/test/e2e/source_container_test.go @@ -37,13 +37,15 @@ func TestContainerSource(t *testing.T) { saIngressName = "e2e-container-source-ingress" crIngressName = "eventing-broker-ingress" ) - data := fmt.Sprintf("TestContainerSource %s", uuid.NewUUID()) + data := fmt.Sprintf("TestContainerSource%s", uuid.NewUUID()) // msg is an argument that is used in the heartbeats image args := []string{"--msg=" + data} client := Setup(t, true) defer TearDown(client) + client.CreateServiceAccountAndBindingOrFail(saIngressName, crIngressName) + // create event logger pod and service loggerPod := base.EventLoggerPod(loggerPodName) client.CreatePodOrFail(loggerPod, common.WithService(loggerPodName)) @@ -51,7 +53,8 @@ func TestContainerSource(t *testing.T) { // create container source sinkOption := base.WithSinkServiceForContainerSource(loggerPodName) argsOption := base.WithArgsForContainerSource(args) - client.CreateContainerSourceOrFail(containerSourceName, imageName, argsOption, sinkOption) + saOption := base.WithServiceAccountForContainerSource(saIngressName) + client.CreateContainerSourceOrFail(containerSourceName, imageName, argsOption, sinkOption, saOption) // wait for all test resources to be ready if err := client.WaitForAllTestResourcesReady(); err != nil { @@ -61,6 +64,6 @@ func TestContainerSource(t *testing.T) { // verify the logger service receives the event expectedCount := 2 if err := client.CheckLog(loggerPodName, common.CheckerContainsAtLeast(data, expectedCount)); err != nil { - t.Fatalf("String %q does not appear at least %i times in logs of logger pod %q: %v", data, expectedCount, loggerPodName, err) + t.Fatalf("String %q does not appear at least %d times in logs of logger pod %q: %v", data, expectedCount, loggerPodName, err) } } diff --git a/test/test_images/heartbeats/main.go b/test/test_images/heartbeats/main.go index 1ae333a8431..dea5f4adb01 100644 --- a/test/test_images/heartbeats/main.go +++ b/test/test_images/heartbeats/main.go @@ -19,7 +19,6 @@ package main import ( "context" "flag" - "fmt" "log" "os" "strconv" @@ -44,7 +43,6 @@ var ( ) func init() { - flag.StringVar(&sink, "sink", "", "the host url to heartbeat to") flag.StringVar(&msg, "msg", "", "the data message") flag.StringVar(&periodStr, "period", "5", "the number of seconds between heartbeats") } @@ -52,12 +50,6 @@ func init() { type envConfig struct { // Sink URL where to send heartbeat cloudevents Sink string `envconfig:"SINK"` - - // Name of this pod. - Name string `envconfig:"POD_NAME" required:"true"` - - // Namespace this pod exists in. - Namespace string `envconfig:"POD_NAMESPACE" required:"true"` } func main() { @@ -85,8 +77,7 @@ func main() { period = time.Duration(p) * time.Second } - source := types.ParseURLRef( - fmt.Sprintf("https://github.com/knative/eventing-sources/cmd/heartbeats/#%s/%s", env.Namespace, env.Name)) + source := types.ParseURLRef("https://github.com/knative/eventing/test/test_images/heartbeats") log.Printf("Heartbeats Source: %s", source) hb := &Heartbeat{ @@ -110,6 +101,7 @@ func main() { Data: hb, } + log.Printf("sending cloudevent to %s", sink) if _, err := c.Send(context.Background(), event); err != nil { log.Printf("failed to send cloudevent: %s", err.Error()) } From c46916a3ae11bac60452001f6e3f32f1da5b9bbb Mon Sep 17 00:00:00 2001 From: Chi Zhang Date: Thu, 30 May 2019 17:27:39 -0700 Subject: [PATCH 11/15] fix minor commment error --- test/e2e/source_cron_job_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/e2e/source_cron_job_test.go b/test/e2e/source_cron_job_test.go index f7300597c19..70936a93fa8 100644 --- a/test/e2e/source_cron_job_test.go +++ b/test/e2e/source_cron_job_test.go @@ -47,7 +47,7 @@ func TestCronJobSource(t *testing.T) { sinkOption := base.WithSinkServiceForCronJobSource(loggerPodName) client.CreateCronJobSourceOrFail(cronJobSourceName, schedule, data, sinkOption) - // wait for all test resources to be ready, so that we can start sending events + // wait for all test resources to be ready if err := client.WaitForAllTestResourcesReady(); err != nil { t.Fatalf("Failed to get all test resources ready: %v", err) } From 33602f6c7d85d674448949dcb86c696f5987d4aa Mon Sep 17 00:00:00 2001 From: Chi Zhang Date: Thu, 30 May 2019 18:21:37 -0700 Subject: [PATCH 12/15] Update test/test_images/heartbeats/main.go Co-Authored-By: mattmoor-sockpuppet --- test/test_images/heartbeats/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test_images/heartbeats/main.go b/test/test_images/heartbeats/main.go index dea5f4adb01..7e66416dbc2 100644 --- a/test/test_images/heartbeats/main.go +++ b/test/test_images/heartbeats/main.go @@ -1,5 +1,5 @@ /* -Copyright 2018 The Knative Authors +Copyright 2019 The Knative Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. From 07abd21b018d7731496cb01da612ce11784d60af Mon Sep 17 00:00:00 2001 From: Chi Zhang Date: Thu, 30 May 2019 19:20:13 -0700 Subject: [PATCH 13/15] run update-codegen --- Gopkg.lock | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Gopkg.lock b/Gopkg.lock index 48f9a454d37..05ea7022b6b 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1420,7 +1420,9 @@ "github.com/Shopify/sarama", "github.com/bsm/sarama-cluster", "github.com/cloudevents/sdk-go", + "github.com/cloudevents/sdk-go/pkg/cloudevents", "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http", + "github.com/cloudevents/sdk-go/pkg/cloudevents/types", "github.com/google/go-cmp/cmp", "github.com/google/go-cmp/cmp/cmpopts", "github.com/google/uuid", From b6098dac8cbdea10f377b202cca1de032ef37769 Mon Sep 17 00:00:00 2001 From: Chi Zhang Date: Thu, 30 May 2019 19:41:14 -0700 Subject: [PATCH 14/15] remove debug print --- test/common/validation.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/test/common/validation.go b/test/common/validation.go index c902c7ba8af..f375c86fc4d 100644 --- a/test/common/validation.go +++ b/test/common/validation.go @@ -17,7 +17,6 @@ limitations under the License. package common import ( - "fmt" "strings" "time" @@ -44,7 +43,6 @@ func (client *Client) CheckLog(podName string, checker func(string) bool) error if err != nil { return true, err } - fmt.Println(string(logs)) return checker(string(logs)), nil }) } From 23e787bb8dc2404727418b2524a72982919bdb9f Mon Sep 17 00:00:00 2001 From: Chi Zhang Date: Thu, 30 May 2019 20:13:44 -0700 Subject: [PATCH 15/15] it's now working --- test/e2e/source_container_test.go | 27 +++++++++++++++++---------- test/test_images/heartbeats/main.go | 11 ++++++++++- 2 files changed, 27 insertions(+), 11 deletions(-) diff --git a/test/e2e/source_container_test.go b/test/e2e/source_container_test.go index 333c0b2695a..c42fc3b4281 100644 --- a/test/e2e/source_container_test.go +++ b/test/e2e/source_container_test.go @@ -23,6 +23,7 @@ import ( "github.com/knative/eventing/test/base" "github.com/knative/eventing/test/common" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/uuid" ) @@ -33,28 +34,34 @@ func TestContainerSource(t *testing.T) { imageName = "heartbeats" loggerPodName = "e2e-container-source-logger-pod" - - saIngressName = "e2e-container-source-ingress" - crIngressName = "eventing-broker-ingress" ) - data := fmt.Sprintf("TestContainerSource%s", uuid.NewUUID()) - // msg is an argument that is used in the heartbeats image - args := []string{"--msg=" + data} client := Setup(t, true) defer TearDown(client) - client.CreateServiceAccountAndBindingOrFail(saIngressName, crIngressName) - // create event logger pod and service loggerPod := base.EventLoggerPod(loggerPodName) client.CreatePodOrFail(loggerPod, common.WithService(loggerPodName)) + data := fmt.Sprintf("TestContainerSource%s", uuid.NewUUID()) + // args are the arguments passing to the container, msg is used in the heartbeats image + args := []string{"--msg=" + data} + // envVars are the environment variables passing to the container + envVars := []corev1.EnvVar{ + corev1.EnvVar{ + Name: "POD_NAME", + Value: "e2e-container-source-pod", + }, + corev1.EnvVar{ + Name: "POD_NAMESPACE", + Value: client.Namespace, + }, + } // create container source sinkOption := base.WithSinkServiceForContainerSource(loggerPodName) argsOption := base.WithArgsForContainerSource(args) - saOption := base.WithServiceAccountForContainerSource(saIngressName) - client.CreateContainerSourceOrFail(containerSourceName, imageName, argsOption, sinkOption, saOption) + envVarOption := base.WithEnvVarsForContainerSource(envVars) + client.CreateContainerSourceOrFail(containerSourceName, imageName, argsOption, sinkOption, envVarOption) // wait for all test resources to be ready if err := client.WaitForAllTestResourcesReady(); err != nil { diff --git a/test/test_images/heartbeats/main.go b/test/test_images/heartbeats/main.go index 7e66416dbc2..8c1b6d9abca 100644 --- a/test/test_images/heartbeats/main.go +++ b/test/test_images/heartbeats/main.go @@ -19,6 +19,7 @@ package main import ( "context" "flag" + "fmt" "log" "os" "strconv" @@ -43,6 +44,7 @@ var ( ) func init() { + flag.StringVar(&sink, "sink", "", "the host url to heartbeat to") flag.StringVar(&msg, "msg", "", "the data message") flag.StringVar(&periodStr, "period", "5", "the number of seconds between heartbeats") } @@ -50,6 +52,12 @@ func init() { type envConfig struct { // Sink URL where to send heartbeat cloudevents Sink string `envconfig:"SINK"` + + // Name of this pod. + Name string `envconfig:"POD_NAME" required:"true"` + + // Namespace this pod exists in. + Namespace string `envconfig:"POD_NAMESPACE" required:"true"` } func main() { @@ -77,7 +85,8 @@ func main() { period = time.Duration(p) * time.Second } - source := types.ParseURLRef("https://github.com/knative/eventing/test/test_images/heartbeats") + source := types.ParseURLRef( + fmt.Sprintf("https://github.com/knative/eventing/test/heartbeats/#%s/%s", env.Namespace, env.Name)) log.Printf("Heartbeats Source: %s", source) hb := &Heartbeat{