Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion test/conformance/helpers/broker_tracing_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func setupBrokerTracing(brokerClass string) SetupInfrastructureFunc {

// Create a transformer (EventTransfrmer) Pod that replies with the same event as the input,
// except the reply's event's type is changed to bar.
eventTransformerPod := resources.EventTransformationPod("transformer", &cloudevents.CloudEvent{
eventTransformerPod := resources.DeprecatedEventTransformationPod("transformer", &cloudevents.CloudEvent{
EventContextV1: ce.EventContextV1{
Type: etLogger,
},
Expand Down
2 changes: 1 addition & 1 deletion test/conformance/helpers/channel_tracing_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func setupChannelTracingWithReply(
client.CreatePodOrFail(loggerPod, lib.WithService(loggerPodName))

// Create the subscriber, a Pod that mutates the event.
transformerPod := resources.EventTransformationPod("transformer", &cloudevents.CloudEvent{
transformerPod := resources.DeprecatedEventTransformationPod("transformer", &cloudevents.CloudEvent{
EventContextV1: ce.EventContextV1{
Type: "mutated",
},
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/helpers/broker_channel_flow_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func BrokerChannelFlowTestHelper(t *testing.T,
)

// create the transformation service for trigger1
transformationPod := resources.EventTransformationPod(transformationPodName, eventAfterTransformation)
transformationPod := resources.DeprecatedEventTransformationPod(transformationPodName, eventAfterTransformation)
client.CreatePodOrFail(transformationPod, lib.WithService(transformationPodName))

// create trigger1 to receive the original event, and do event transformation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func EventTransformationForTriggerTestHelper(t *testing.T,
)

// create the transformation service
transformationPod := resources.EventTransformationPod(transformationPodName, eventAfterTransformation)
transformationPod := resources.DeprecatedEventTransformationPod(transformationPodName, eventAfterTransformation)
client.CreatePodOrFail(transformationPod, lib.WithService(transformationPodName))

// create trigger1 for event transformation
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/helpers/broker_with_config_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TestBrokerWithConfig(t *testing.T,
)

// create the transformation service for trigger1
transformationPod := resources.EventTransformationPod(transformationPodName, eventAfterTransformation)
transformationPod := resources.DeprecatedEventTransformationPod(transformationPodName, eventAfterTransformation)
client.CreatePodOrFail(transformationPod, lib.WithService(transformationPodName))

// create trigger1 to receive the original event, and do event transformation
Expand Down
61 changes: 39 additions & 22 deletions test/e2e/helpers/channel_event_tranformation_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ import (
"fmt"
"testing"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/google/uuid"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/uuid"

"knative.dev/eventing/test/lib"
"knative.dev/eventing/test/lib/cloudevents"
"knative.dev/eventing/test/lib/resources"
)

Expand All @@ -34,12 +34,13 @@ func EventTransformationForSubscriptionTestHelper(t *testing.T,
options ...lib.SetupClientOption) {
senderName := "e2e-eventtransformation-sender"
channelNames := []string{"e2e-eventtransformation1", "e2e-eventtransformation2"}
eventSource := fmt.Sprintf("http://%s.svc/", senderName)
// subscriptionNames1 corresponds to Subscriptions on channelNames[0]
subscriptionNames1 := []string{"e2e-eventtransformation-subs11", "e2e-eventtransformation-subs12"}
// subscriptionNames2 corresponds to Subscriptions on channelNames[1]
subscriptionNames2 := []string{"e2e-eventtransformation-subs21", "e2e-eventtransformation-subs22"}
transformationPodName := "e2e-eventtransformation-transformation-pod"
loggerPodName := "e2e-eventtransformation-logger-pod"
recordEventsPodName := "e2e-eventtransformation-recordevents-pod"

channelTestRunner.RunTests(t, lib.FeatureBasic, func(st *testing.T, channel metav1.TypeMeta) {
client := lib.Setup(st, true, options...)
Expand All @@ -50,17 +51,25 @@ func EventTransformationForSubscriptionTestHelper(t *testing.T,
client.WaitForResourcesReadyOrFail(&channel)

// create transformation pod and service
transformedEventBody := fmt.Sprintf("eventBody %s", uuid.NewUUID())
eventAfterTransformation := cloudevents.New(
fmt.Sprintf(`{"msg":%q}`, transformedEventBody),
cloudevents.WithSource(senderName),
)
eventAfterTransformation := cloudevents.NewEvent()
eventAfterTransformation.SetID("dummy-transformed")
eventAfterTransformation.SetSource(eventSource)
eventAfterTransformation.SetType(lib.DefaultEventType)
transformedEventBody := fmt.Sprintf(`{"msg":"eventBody %s"}`, uuid.New().String())
if err := eventAfterTransformation.SetData(cloudevents.ApplicationJSON, []byte(transformedEventBody)); err != nil {
t.Fatalf("Cannot set the payload of the event: %s", err.Error())
}
transformationPod := resources.EventTransformationPod(transformationPodName, eventAfterTransformation)
client.CreatePodOrFail(transformationPod, lib.WithService(transformationPodName))

// create logger pod and service
loggerPod := resources.EventLoggerPod(loggerPodName)
client.CreatePodOrFail(loggerPod, lib.WithService(loggerPodName))
// create event logger pod and service as the subscriber
recordEventsPod := resources.EventRecordPod(recordEventsPodName)
client.CreatePodOrFail(recordEventsPod, lib.WithService(recordEventsPodName))
eventTracker, err := client.NewEventInfoStore(recordEventsPodName, t.Logf)
if err != nil {
t.Fatalf("Pod tracker failed: %v", err)
}
defer eventTracker.Cleanup()

// create subscriptions that subscribe the first channel, use the transformation service to transform the events and then forward the transformed events to the second channel
client.CreateSubscriptionsOrFail(
Expand All @@ -75,24 +84,32 @@ func EventTransformationForSubscriptionTestHelper(t *testing.T,
subscriptionNames2,
channelNames[1],
&channel,
resources.WithSubscriberForSubscription(loggerPodName),
resources.WithSubscriberForSubscription(recordEventsPodName),
)

// wait for all test resources to be ready, so that we can start sending events
client.WaitForAllTestResourcesReadyOrFail()

// send fake CloudEvent to the first channel
eventBody := fmt.Sprintf("TestEventTransformation %s", uuid.NewUUID())
eventToSend := cloudevents.New(
fmt.Sprintf(`{"msg":%q}`, eventBody),
cloudevents.WithSource(senderName),
)
client.SendFakeEventToAddressableOrFail(senderName, channelNames[0], &channel, eventToSend)
// send CloudEvent to the first channel
eventToSend := cloudevents.NewEvent()
eventToSend.SetID("dummy")
eventToSend.SetSource(eventSource)
eventToSend.SetType(lib.DefaultEventType)
eventBody := fmt.Sprintf(`{"msg":"TestEventTransformation %s"}`, uuid.New().String())
if err := eventToSend.SetData(cloudevents.ApplicationJSON, []byte(eventBody)); err != nil {
t.Fatalf("Cannot set the payload of the event: %s", err.Error())
}
client.SendEventToAddressable(senderName, channelNames[0], &channel, eventToSend)

// check if the logging service receives the correct number of event messages
expectedContentCount := len(subscriptionNames1) * len(subscriptionNames2)
if err := client.CheckLog(loggerPodName, lib.CheckerContainsCount(transformedEventBody, expectedContentCount)); err != nil {
st.Fatalf("String %q does not appear %d times in logs of logger pod %q: %v", transformedEventBody, expectedContentCount, loggerPodName, err)
}
eventTracker.AssertWaitMatchSourceData(
t,
recordEventsPodName,
eventSource,
transformedEventBody,
expectedContentCount,
expectedContentCount,
)
})
}
33 changes: 31 additions & 2 deletions test/lib/resources/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"k8s.io/apimachinery/pkg/util/uuid"
pkgTest "knative.dev/pkg/test"

cloudevents "github.com/cloudevents/sdk-go/v2"
cetest "knative.dev/eventing/test/lib/cloudevents"
)

Expand Down Expand Up @@ -131,8 +132,8 @@ func eventLoggerPod(imageName string, name string) *corev1.Pod {
}
}

// EventTransformationPod creates a Pod that transforms events received.
func EventTransformationPod(name string, event *cetest.CloudEvent) *corev1.Pod {
// DeprecatedEventTransformationPod creates a Pod that transforms events received.
func DeprecatedEventTransformationPod(name string, event *cetest.CloudEvent) *corev1.Pod {
const imageName = "transformevents"
return &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -158,6 +159,34 @@ func EventTransformationPod(name string, event *cetest.CloudEvent) *corev1.Pod {
}
}

// EventTransformationPod creates a Pod that transforms events received receiving as arg a cloudevents sdk2 Event
// TODO(nlopezgi): remove DeprecatedEventTransformationPod above once other tests that use sdk1 and depend on this method are migrated.
func EventTransformationPod(name string, event cloudevents.Event) *corev1.Pod {
const imageName = "transformevents"
return &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Labels: map[string]string{"e2etest": string(uuid.NewUUID())},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{{
Name: imageName,
Image: pkgTest.ImagePath(imageName),
ImagePullPolicy: corev1.PullAlways,
Args: []string{
"-event-type",
event.Type(),
"-event-source",
event.Source(),
"-event-data",
string(event.Data()),
},
}},
RestartPolicy: corev1.RestartPolicyAlways,
},
}
}

// HelloWorldPod creates a Pod that logs "Hello, World!".
func HelloWorldPod(name string, options ...PodOption) *corev1.Pod {
const imageName = "print"
Expand Down