diff --git a/test/conformance/helpers/channel_tracing_test_helper.go b/test/conformance/helpers/channel_tracing_test_helper.go index 9d9c3380be2..6638e3e270c 100644 --- a/test/conformance/helpers/channel_tracing_test_helper.go +++ b/test/conformance/helpers/channel_tracing_test_helper.go @@ -24,18 +24,16 @@ import ( "testing" "time" - ce "github.com/cloudevents/sdk-go" ce2 "github.com/cloudevents/sdk-go/v2" cetest "github.com/cloudevents/sdk-go/v2/test" + "github.com/google/uuid" "github.com/openzipkin/zipkin-go/model" "go.opentelemetry.io/otel/api/trace" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/uuid" "knative.dev/pkg/test/zipkin" tracinghelper "knative.dev/eventing/test/conformance/helpers/tracing" "knative.dev/eventing/test/lib" - "knative.dev/eventing/test/lib/cloudevents" "knative.dev/eventing/test/lib/recordevents" "knative.dev/eventing/test/lib/resources" ) @@ -170,6 +168,7 @@ func setupChannelTracingWithReply( loggerPodName string, tc TracingTestCase, ) (tracinghelper.TestSpanTree, cetest.EventMatcher) { + eventSource := "sender" // Create the Channels. channelName := "ch" client.CreateChannelOrFail(channelName, channel) @@ -182,11 +181,12 @@ func setupChannelTracingWithReply( client.CreatePodOrFail(loggerPod, lib.WithService(loggerPodName)) // Create the subscriber, a Pod that mutates the event. - transformerPod := resources.DeprecatedEventTransformationPod("transformer", &cloudevents.CloudEvent{ - EventContextV1: ce.EventContextV1{ - Type: "mutated", - }, - }) + transformerPod := resources.EventTransformationPod( + "transformer", + "mutated", + eventSource, + nil, + ) client.CreatePodOrFail(transformerPod, lib.WithService(transformerPod.Name)) // Create the Subscription linking the Channel to the mutator. @@ -210,18 +210,20 @@ func setupChannelTracingWithReply( // Everything is setup to receive an event. Generate a CloudEvent. senderName := "sender" - eventID := string(uuid.NewUUID()) - body := fmt.Sprintf("TestChannelTracing %s", eventID) - event := cloudevents.New( - fmt.Sprintf(`{"msg":%q}`, body), - cloudevents.WithSource(senderName), - cloudevents.WithID(eventID), - ) + eventID := uuid.New().String() + event := ce2.NewEvent() + event.SetID(eventID) + event.SetSource(senderName) + event.SetType(lib.DefaultEventType) + body := fmt.Sprintf(`{"msg":"TestChannelTracing %s"}`, eventID) + if err := event.SetData(ce2.ApplicationJSON, []byte(body)); err != nil { + t.Fatalf("Cannot set the payload of the event: %s", err.Error()) + } // Send the CloudEvent (either with or without tracing inside the SendEvents Pod). - sendEvent := client.SendFakeEventToAddressableOrFail + sendEvent := client.SendEventToAddressable if tc.IncomingTraceId { - sendEvent = client.SendFakeEventWithTracingToAddressableOrFail + sendEvent = client.SendEventWithTracingToAddressable } sendEvent(senderName, channelName, channel, event) @@ -332,7 +334,6 @@ func setupChannelTracingWithReply( Children: []tracinghelper.TestSpanTree{expected}, } } - matchFunc := func(ev ce2.Event) error { if ev.Source() != senderName { return fmt.Errorf("expected source %s, saw %s", senderName, ev.Source()) diff --git a/test/lib/event_sender.go b/test/lib/event_sender.go index a51c210d39d..ea8701ab5dc 100644 --- a/test/lib/event_sender.go +++ b/test/lib/event_sender.go @@ -42,6 +42,23 @@ func (c *Client) SendEventToAddressable( } } +// SendEventToAddressable will send the given event to the given Addressable. +func (c *Client) SendEventWithTracingToAddressable( + senderName, + addressableName string, + typemeta *metav1.TypeMeta, + event cloudevents.Event, + option ...sender.EventSenderOption, +) { + uri, err := c.GetAddressableURI(addressableName, typemeta) + if err != nil { + c.T.Fatalf("Failed to get the URI for %v-%s", typemeta, addressableName) + } + if err = c.SendEventWithTracing(senderName, uri, event, option...); err != nil { + c.T.Fatalf("Failed to send event %v with tracing to %s: %v", event, uri, err) + } +} + // SendEvent will create a sender pod, which will send the given event to the given url. func (c *Client) SendEvent( senderName string, @@ -60,3 +77,23 @@ func (c *Client) SendEvent( } return nil } + +// SendEvent will create a sender pod, which will send the given event to the given url. +func (c *Client) SendEventWithTracing( + senderName string, + uri string, + event cloudevents.Event, + option ...sender.EventSenderOption, +) error { + namespace := c.Namespace + pod, err := sender.EventSenderPod("event-sender", senderName, uri, event, option...) + sender.EnableTracing() + if err != nil { + return err + } + c.CreatePodOrFail(pod) + if err := pkgTest.WaitForPodRunning(c.Kube, senderName, namespace); err != nil { + return err + } + return nil +}