From 96bbb491240ea2eda190fa31e4771846c46723bf Mon Sep 17 00:00:00 2001 From: Nicolas Lopez Date: Thu, 4 Jun 2020 16:01:29 -0400 Subject: [PATCH 1/2] port prallel e2e test to new test facilities --- test/e2e/helpers/parallel_test_helper.go | 51 +++++++++++++----------- 1 file changed, 27 insertions(+), 24 deletions(-) diff --git a/test/e2e/helpers/parallel_test_helper.go b/test/e2e/helpers/parallel_test_helper.go index 470481de5bb..24c1d12faa3 100644 --- a/test/e2e/helpers/parallel_test_helper.go +++ b/test/e2e/helpers/parallel_test_helper.go @@ -16,18 +16,17 @@ limitations under the License. package helpers import ( - "encoding/json" "fmt" "testing" + "github.com/google/uuid" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/uuid" + cloudevents "github.com/cloudevents/sdk-go/v2" "knative.dev/eventing/pkg/apis/flows/v1beta1" messagingv1beta1 "knative.dev/eventing/pkg/apis/messaging/v1beta1" eventingtesting "knative.dev/eventing/pkg/reconciler/testing/v1beta1" "knative.dev/eventing/test/lib" - "knative.dev/eventing/test/lib/cloudevents" "knative.dev/eventing/test/lib/resources" duckv1 "knative.dev/pkg/apis/duck/v1" ) @@ -87,10 +86,15 @@ func ParallelTestHelper(t *testing.T, TypeMeta: channel, } - // create logger service for global reply - loggerPodName := fmt.Sprintf("%s-logger", tc.name) - loggerPod := resources.EventLoggerPod(loggerPodName) - client.CreatePodOrFail(loggerPod, lib.WithService(loggerPodName)) + // create event logger pod and service + eventRecorder := fmt.Sprintf("%s-eventRecord", tc.name) + eventRecordPod := resources.EventRecordPod(eventRecorder) + client.CreatePodOrFail(eventRecordPod, lib.WithService(eventRecorder)) + eventTracker, err := client.NewEventInfoStore(eventRecorder, t.Logf) + if err != nil { + t.Fatalf("Pod tracker failed: %v", err) + } + defer eventTracker.Cleanup() // create channel as reply of the Parallel // TODO(chizhg): now we'll have to use a channel plus its subscription here, as reply of the Subscription @@ -102,7 +106,7 @@ func ParallelTestHelper(t *testing.T, replySubscriptionName, replyChannelName, &channel, - resources.WithSubscriberForSubscription(loggerPodName), + resources.WithSubscriberForSubscription(eventRecorder), ) parallel := eventingtesting.NewFlowsParallel(tc.name, client.Namespace, @@ -114,28 +118,27 @@ func ParallelTestHelper(t *testing.T, client.WaitForAllTestResourcesReadyOrFail() - // send fake CloudEvent to the Parallel - msg := fmt.Sprintf("TestFlowParallel %s - ", uuid.NewUUID()) - // NOTE: the eventData format must be BaseData, as it needs to be correctly parsed in the stepper service. - eventData := cloudevents.BaseData{Message: msg} - eventDataBytes, err := json.Marshal(eventData) - if err != nil { - st.Fatalf("Failed to convert %v to json: %v", eventData, err) + // send CloudEvent to the Parallel + event := cloudevents.NewEvent() + event.SetID("dummy") + + eventSource := fmt.Sprintf("http://%s.svc/", senderPodName) + event.SetSource(eventSource) + event.SetType(lib.DefaultEventType) + body := fmt.Sprintf(`{"msg":"TestFlowParallel %s"}`, uuid.New().String()) + if err := event.SetData(cloudevents.ApplicationJSON, []byte(body)); err != nil { + st.Fatalf("Cannot set the payload of the event: %s", err.Error()) } - event := cloudevents.New( - string(eventDataBytes), - cloudevents.WithSource(senderPodName), - ) - client.SendFakeEventToAddressableOrFail( + + client.SendEventToAddressable( senderPodName, tc.name, lib.FlowsParallelTypeMeta, - event) + event, + ) // verify the logger service receives the correct transformed event - if err := client.CheckLog(loggerPodName, lib.CheckerContains(tc.expected)); err != nil { - st.Fatalf("String %q not found in logs of logger pod %q: %v", tc.expected, loggerPodName, err) - } + eventTracker.AssertWaitMatchSourceData(t, eventRecorder, eventSource, body, 1, 1) } }) } From e2170b8915945d6f00bc875db7dc576a41ec31be Mon Sep 17 00:00:00 2001 From: Nicolas Lopez Date: Fri, 5 Jun 2020 09:47:58 -0400 Subject: [PATCH 2/2] fix details --- test/e2e/helpers/parallel_test_helper.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/test/e2e/helpers/parallel_test_helper.go b/test/e2e/helpers/parallel_test_helper.go index 24c1d12faa3..ed25ad176d8 100644 --- a/test/e2e/helpers/parallel_test_helper.go +++ b/test/e2e/helpers/parallel_test_helper.go @@ -87,7 +87,7 @@ func ParallelTestHelper(t *testing.T, } // create event logger pod and service - eventRecorder := fmt.Sprintf("%s-eventRecord", tc.name) + eventRecorder := fmt.Sprintf("%s-event-record-pod", tc.name) eventRecordPod := resources.EventRecordPod(eventRecorder) client.CreatePodOrFail(eventRecordPod, lib.WithService(eventRecorder)) eventTracker, err := client.NewEventInfoStore(eventRecorder, t.Logf) @@ -134,11 +134,10 @@ func ParallelTestHelper(t *testing.T, senderPodName, tc.name, lib.FlowsParallelTypeMeta, - event, - ) + event) // verify the logger service receives the correct transformed event - eventTracker.AssertWaitMatchSourceData(t, eventRecorder, eventSource, body, 1, 1) + eventTracker.AssertWaitMatchSourceData(t, eventRecorder, eventSource, tc.expected, 1, 1) } }) }