diff --git a/test/e2e/helpers/channel_chain_test_helper.go b/test/e2e/helpers/channel_chain_test_helper.go index ec9f0c8591a..7f0629cbbe5 100644 --- a/test/e2e/helpers/channel_chain_test_helper.go +++ b/test/e2e/helpers/channel_chain_test_helper.go @@ -20,11 +20,11 @@ import ( "fmt" "testing" + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/google/uuid" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/uuid" "knative.dev/eventing/test/lib" - "knative.dev/eventing/test/lib/cloudevents" "knative.dev/eventing/test/lib/resources" ) @@ -33,14 +33,15 @@ func ChannelChainTestHelper(t *testing.T, channelTestRunner lib.ChannelTestRunner, options ...lib.SetupClientOption) { const ( - senderName = "e2e-channelchain-sender" - loggerPodName = "e2e-channelchain-logger-pod" + senderName = "e2e-channelchain-sender" + recordEventsPodName = "e2e-channelchain-recordevents-pod" ) channelNames := []string{"e2e-channelchain1", "e2e-channelchain2"} // subscriptionNames1 corresponds to Subscriptions on channelNames[0] subscriptionNames1 := []string{"e2e-channelchain-subs11", "e2e-channelchain-subs12"} // subscriptionNames2 corresponds to Subscriptions on channelNames[1] subscriptionNames2 := []string{"e2e-channelchain-subs21"} + eventSource := fmt.Sprintf("http://%s.svc/", senderName) channelTestRunner.RunTests(t, lib.FeatureBasic, func(st *testing.T, channel metav1.TypeMeta) { client := lib.Setup(st, true, options...) @@ -51,8 +52,13 @@ func ChannelChainTestHelper(t *testing.T, client.WaitForResourcesReadyOrFail(&channel) // create loggerPod and expose it as a service - pod := resources.EventLoggerPod(loggerPodName) - client.CreatePodOrFail(pod, lib.WithService(loggerPodName)) + recordEventsPod := resources.EventRecordPod(recordEventsPodName) + client.CreatePodOrFail(recordEventsPod, lib.WithService(recordEventsPodName)) + eventTracker, err := client.NewEventInfoStore(recordEventsPodName, t.Logf) + if err != nil { + t.Fatalf("Pod tracker failed: %v", err) + } + defer eventTracker.Cleanup() // create subscriptions that subscribe the first channel, and reply events directly to the second channel client.CreateSubscriptionsOrFail( @@ -66,24 +72,29 @@ func ChannelChainTestHelper(t *testing.T, subscriptionNames2, channelNames[1], &channel, - resources.WithSubscriberForSubscription(loggerPodName), + resources.WithSubscriberForSubscription(recordEventsPodName), ) // wait for all test resources to be ready, so that we can start sending events client.WaitForAllTestResourcesReadyOrFail() - // send fake CloudEvent to the first channel - body := fmt.Sprintf("TestChannelChainEvent %s", uuid.NewUUID()) - event := cloudevents.New( - fmt.Sprintf(`{"msg":%q}`, body), - cloudevents.WithSource(senderName), - ) - client.SendFakeEventToAddressableOrFail(senderName, channelNames[0], &channel, event) + // send CloudEvent to the first channel + event := cloudevents.NewEvent() + event.SetID("dummy") + event.SetSource(eventSource) + event.SetType(lib.DefaultEventType) + + body := fmt.Sprintf(`{"msg":"TestSingleEvent %s"}`, uuid.New().String()) + if err := event.SetData(cloudevents.ApplicationJSON, []byte(body)); err != nil { + st.Fatalf("Cannot set the payload of the event: %s", err.Error()) + } + + client.SendEventToAddressable(senderName, channelNames[0], &channel, event) // check if the logging service receives the correct number of event messages expectedContentCount := len(subscriptionNames1) * len(subscriptionNames2) - if err := client.CheckLog(loggerPodName, lib.CheckerContainsCount(body, expectedContentCount)); err != nil { - st.Fatalf("String %q does not appear %d times in logs of logger pod %q: %v", body, expectedContentCount, loggerPodName, err) - } + + // verify the logger service receives the event + eventTracker.AssertWaitMatchSourceData(t, recordEventsPodName, eventSource, body, expectedContentCount, expectedContentCount) }) } diff --git a/test/test_images/transformevents/main.go b/test/test_images/transformevents/main.go index 47723cebe93..2554a703a5e 100644 --- a/test/test_images/transformevents/main.go +++ b/test/test_images/transformevents/main.go @@ -21,10 +21,9 @@ import ( "flag" "log" - cloudevents "github.com/cloudevents/sdk-go" + cloudevents "github.com/cloudevents/sdk-go/v2" "go.uber.org/zap" - "knative.dev/eventing/pkg/kncloudevents" "knative.dev/eventing/pkg/tracing" ) @@ -40,37 +39,28 @@ func init() { flag.StringVar(&eventData, "event-data", "", "Cloudevent data body.") } -func gotEvent(event cloudevents.Event, resp *cloudevents.EventResponse) error { - ctx := event.Context.AsV1() - - dataBytes, err := event.DataBytes() - if err != nil { - log.Printf("Got Data Error: %s\n", err.Error()) - return err - } +func gotEvent(event cloudevents.Event) (*cloudevents.Event, error) { log.Println("Received a new event: ") - log.Printf("[%s] %s %s: %s", ctx.Time.String(), ctx.GetSource(), ctx.GetType(), dataBytes) + log.Printf("[%s] %s %s: %s", event.Time().String(), event.Source(), event.Type(), string(event.Data())) + + outputEvent := event.Clone() if eventSource != "" { - ctx.SetSource(eventSource) + outputEvent.SetSource(eventSource) } if eventType != "" { - ctx.SetType(eventType) + outputEvent.SetType(eventType) } if eventData != "" { - dataBytes = []byte(eventData) - } - r := cloudevents.Event{ - Context: ctx, - Data: string(dataBytes), + if err := outputEvent.SetData(cloudevents.ApplicationJSON, []byte(eventData)); err != nil { + return nil, err + } } - r.SetDataContentType(cloudevents.ApplicationJSON) log.Println("Transform the event to: ") - log.Printf("[%s] %s %s: %s", ctx.Time.String(), ctx.GetSource(), ctx.GetType(), dataBytes) + log.Printf("[%s] %s %s: %s", outputEvent.Time().String(), outputEvent.Source(), outputEvent.Type(), string(outputEvent.Data())) - resp.RespondWith(200, &r) - return nil + return &outputEvent, nil } func main() { @@ -82,11 +72,21 @@ func main() { log.Fatalf("Unable to setup trace publishing: %v", err) } - c, err := kncloudevents.NewDefaultClient() + t, err := cloudevents.NewHTTP(cloudevents.WithPort(8080)) + if err != nil { + log.Fatalf("failed to create transport, %v", err) + } + + c, err := cloudevents.NewClient(t, + cloudevents.WithTimeNow(), + cloudevents.WithUUIDs(), + ) if err != nil { log.Fatalf("failed to create client, %v", err) } log.Printf("listening on 8080") - log.Fatalf("failed to start receiver: %s", c.StartReceiver(context.Background(), gotEvent)) + if err := c.StartReceiver(context.Background(), gotEvent); err != nil { + log.Fatalf("failed to start receiver: %s", err) + } }