diff --git a/test/conformance/helpers/broker_tracing_test_helper.go b/test/conformance/helpers/broker_tracing_test_helper.go index 9b5119fd2ab..ac090125df8 100644 --- a/test/conformance/helpers/broker_tracing_test_helper.go +++ b/test/conformance/helpers/broker_tracing_test_helper.go @@ -106,7 +106,7 @@ func setupBrokerTracing(brokerClass string) SetupInfrastructureFunc { // Create a transformer (EventTransfrmer) Pod that replies with the same event as the input, // except the reply's event's type is changed to bar. - eventTransformerPod := resources.EventTransformationPod("transformer", &cloudevents.CloudEvent{ + eventTransformerPod := resources.DeprecatedEventTransformationPod("transformer", &cloudevents.CloudEvent{ EventContextV1: ce.EventContextV1{ Type: etLogger, }, diff --git a/test/conformance/helpers/channel_tracing_test_helper.go b/test/conformance/helpers/channel_tracing_test_helper.go index a44a9c27e9c..aef6cb9bb43 100644 --- a/test/conformance/helpers/channel_tracing_test_helper.go +++ b/test/conformance/helpers/channel_tracing_test_helper.go @@ -185,7 +185,7 @@ func setupChannelTracingWithReply( client.CreatePodOrFail(loggerPod, lib.WithService(loggerPodName)) // Create the subscriber, a Pod that mutates the event. - transformerPod := resources.EventTransformationPod("transformer", &cloudevents.CloudEvent{ + transformerPod := resources.DeprecatedEventTransformationPod("transformer", &cloudevents.CloudEvent{ EventContextV1: ce.EventContextV1{ Type: "mutated", }, diff --git a/test/e2e/helpers/broker_channel_flow_test_helper.go b/test/e2e/helpers/broker_channel_flow_test_helper.go index b8bd3f242da..49d0d80eb67 100644 --- a/test/e2e/helpers/broker_channel_flow_test_helper.go +++ b/test/e2e/helpers/broker_channel_flow_test_helper.go @@ -78,7 +78,7 @@ func BrokerChannelFlowTestHelper(t *testing.T, ) // create the transformation service for trigger1 - transformationPod := resources.EventTransformationPod(transformationPodName, eventAfterTransformation) + transformationPod := resources.DeprecatedEventTransformationPod(transformationPodName, eventAfterTransformation) client.CreatePodOrFail(transformationPod, lib.WithService(transformationPodName)) // create trigger1 to receive the original event, and do event transformation diff --git a/test/e2e/helpers/broker_event_transformation_test_helper.go b/test/e2e/helpers/broker_event_transformation_test_helper.go index d63783e9964..5273828221b 100644 --- a/test/e2e/helpers/broker_event_transformation_test_helper.go +++ b/test/e2e/helpers/broker_event_transformation_test_helper.go @@ -71,7 +71,7 @@ func EventTransformationForTriggerTestHelper(t *testing.T, ) // create the transformation service - transformationPod := resources.EventTransformationPod(transformationPodName, eventAfterTransformation) + transformationPod := resources.DeprecatedEventTransformationPod(transformationPodName, eventAfterTransformation) client.CreatePodOrFail(transformationPod, lib.WithService(transformationPodName)) // create trigger1 for event transformation diff --git a/test/e2e/helpers/broker_with_config_helper.go b/test/e2e/helpers/broker_with_config_helper.go index a07a48688cb..ead00d6681a 100644 --- a/test/e2e/helpers/broker_with_config_helper.go +++ b/test/e2e/helpers/broker_with_config_helper.go @@ -76,7 +76,7 @@ func TestBrokerWithConfig(t *testing.T, ) // create the transformation service for trigger1 - transformationPod := resources.EventTransformationPod(transformationPodName, eventAfterTransformation) + transformationPod := resources.DeprecatedEventTransformationPod(transformationPodName, eventAfterTransformation) client.CreatePodOrFail(transformationPod, lib.WithService(transformationPodName)) // create trigger1 to receive the original event, and do event transformation diff --git a/test/e2e/helpers/channel_event_tranformation_test_helper.go b/test/e2e/helpers/channel_event_tranformation_test_helper.go index a03d1b31339..e8212066fd5 100644 --- a/test/e2e/helpers/channel_event_tranformation_test_helper.go +++ b/test/e2e/helpers/channel_event_tranformation_test_helper.go @@ -20,11 +20,11 @@ import ( "fmt" "testing" + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/google/uuid" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/uuid" "knative.dev/eventing/test/lib" - "knative.dev/eventing/test/lib/cloudevents" "knative.dev/eventing/test/lib/resources" ) @@ -34,12 +34,13 @@ func EventTransformationForSubscriptionTestHelper(t *testing.T, options ...lib.SetupClientOption) { senderName := "e2e-eventtransformation-sender" channelNames := []string{"e2e-eventtransformation1", "e2e-eventtransformation2"} + eventSource := fmt.Sprintf("http://%s.svc/", senderName) // subscriptionNames1 corresponds to Subscriptions on channelNames[0] subscriptionNames1 := []string{"e2e-eventtransformation-subs11", "e2e-eventtransformation-subs12"} // subscriptionNames2 corresponds to Subscriptions on channelNames[1] subscriptionNames2 := []string{"e2e-eventtransformation-subs21", "e2e-eventtransformation-subs22"} transformationPodName := "e2e-eventtransformation-transformation-pod" - loggerPodName := "e2e-eventtransformation-logger-pod" + recordEventsPodName := "e2e-eventtransformation-recordevents-pod" channelTestRunner.RunTests(t, lib.FeatureBasic, func(st *testing.T, channel metav1.TypeMeta) { client := lib.Setup(st, true, options...) @@ -50,17 +51,25 @@ func EventTransformationForSubscriptionTestHelper(t *testing.T, client.WaitForResourcesReadyOrFail(&channel) // create transformation pod and service - transformedEventBody := fmt.Sprintf("eventBody %s", uuid.NewUUID()) - eventAfterTransformation := cloudevents.New( - fmt.Sprintf(`{"msg":%q}`, transformedEventBody), - cloudevents.WithSource(senderName), - ) + eventAfterTransformation := cloudevents.NewEvent() + eventAfterTransformation.SetID("dummy-transformed") + eventAfterTransformation.SetSource(eventSource) + eventAfterTransformation.SetType(lib.DefaultEventType) + transformedEventBody := fmt.Sprintf(`{"msg":"eventBody %s"}`, uuid.New().String()) + if err := eventAfterTransformation.SetData(cloudevents.ApplicationJSON, []byte(transformedEventBody)); err != nil { + t.Fatalf("Cannot set the payload of the event: %s", err.Error()) + } transformationPod := resources.EventTransformationPod(transformationPodName, eventAfterTransformation) client.CreatePodOrFail(transformationPod, lib.WithService(transformationPodName)) - // create logger pod and service - loggerPod := resources.EventLoggerPod(loggerPodName) - client.CreatePodOrFail(loggerPod, lib.WithService(loggerPodName)) + // create event logger pod and service as the subscriber + recordEventsPod := resources.EventRecordPod(recordEventsPodName) + client.CreatePodOrFail(recordEventsPod, lib.WithService(recordEventsPodName)) + eventTracker, err := client.NewEventInfoStore(recordEventsPodName, t.Logf) + if err != nil { + t.Fatalf("Pod tracker failed: %v", err) + } + defer eventTracker.Cleanup() // create subscriptions that subscribe the first channel, use the transformation service to transform the events and then forward the transformed events to the second channel client.CreateSubscriptionsOrFail( @@ -75,24 +84,32 @@ func EventTransformationForSubscriptionTestHelper(t *testing.T, subscriptionNames2, channelNames[1], &channel, - resources.WithSubscriberForSubscription(loggerPodName), + resources.WithSubscriberForSubscription(recordEventsPodName), ) // wait for all test resources to be ready, so that we can start sending events client.WaitForAllTestResourcesReadyOrFail() - // send fake CloudEvent to the first channel - eventBody := fmt.Sprintf("TestEventTransformation %s", uuid.NewUUID()) - eventToSend := cloudevents.New( - fmt.Sprintf(`{"msg":%q}`, eventBody), - cloudevents.WithSource(senderName), - ) - client.SendFakeEventToAddressableOrFail(senderName, channelNames[0], &channel, eventToSend) + // send CloudEvent to the first channel + eventToSend := cloudevents.NewEvent() + eventToSend.SetID("dummy") + eventToSend.SetSource(eventSource) + eventToSend.SetType(lib.DefaultEventType) + eventBody := fmt.Sprintf(`{"msg":"TestEventTransformation %s"}`, uuid.New().String()) + if err := eventToSend.SetData(cloudevents.ApplicationJSON, []byte(eventBody)); err != nil { + t.Fatalf("Cannot set the payload of the event: %s", err.Error()) + } + client.SendEventToAddressable(senderName, channelNames[0], &channel, eventToSend) // check if the logging service receives the correct number of event messages expectedContentCount := len(subscriptionNames1) * len(subscriptionNames2) - if err := client.CheckLog(loggerPodName, lib.CheckerContainsCount(transformedEventBody, expectedContentCount)); err != nil { - st.Fatalf("String %q does not appear %d times in logs of logger pod %q: %v", transformedEventBody, expectedContentCount, loggerPodName, err) - } + eventTracker.AssertWaitMatchSourceData( + t, + recordEventsPodName, + eventSource, + transformedEventBody, + expectedContentCount, + expectedContentCount, + ) }) } diff --git a/test/lib/resources/kube.go b/test/lib/resources/kube.go index 78e267439f9..89c2368b3db 100644 --- a/test/lib/resources/kube.go +++ b/test/lib/resources/kube.go @@ -31,6 +31,7 @@ import ( "k8s.io/apimachinery/pkg/util/uuid" pkgTest "knative.dev/pkg/test" + cloudevents "github.com/cloudevents/sdk-go/v2" cetest "knative.dev/eventing/test/lib/cloudevents" ) @@ -131,8 +132,8 @@ func eventLoggerPod(imageName string, name string) *corev1.Pod { } } -// EventTransformationPod creates a Pod that transforms events received. -func EventTransformationPod(name string, event *cetest.CloudEvent) *corev1.Pod { +// DeprecatedEventTransformationPod creates a Pod that transforms events received. +func DeprecatedEventTransformationPod(name string, event *cetest.CloudEvent) *corev1.Pod { const imageName = "transformevents" return &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -158,6 +159,34 @@ func EventTransformationPod(name string, event *cetest.CloudEvent) *corev1.Pod { } } +// EventTransformationPod creates a Pod that transforms events received receiving as arg a cloudevents sdk2 Event +// TODO(nlopezgi): remove DeprecatedEventTransformationPod above once other tests that use sdk1 and depend on this method are migrated. +func EventTransformationPod(name string, event cloudevents.Event) *corev1.Pod { + const imageName = "transformevents" + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{"e2etest": string(uuid.NewUUID())}, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: imageName, + Image: pkgTest.ImagePath(imageName), + ImagePullPolicy: corev1.PullAlways, + Args: []string{ + "-event-type", + event.Type(), + "-event-source", + event.Source(), + "-event-data", + string(event.Data()), + }, + }}, + RestartPolicy: corev1.RestartPolicyAlways, + }, + } +} + // HelloWorldPod creates a Pod that logs "Hello, World!". func HelloWorldPod(name string, options ...PodOption) *corev1.Pod { const imageName = "print"