diff --git a/test/conformance/helpers/channel_tracing_test_helper.go b/test/conformance/helpers/channel_tracing_test_helper.go index 9d9c3380be2..4801abfee9f 100644 --- a/test/conformance/helpers/channel_tracing_test_helper.go +++ b/test/conformance/helpers/channel_tracing_test_helper.go @@ -20,24 +20,22 @@ import ( "context" "fmt" "net/http" - "strings" "testing" "time" - ce "github.com/cloudevents/sdk-go" ce2 "github.com/cloudevents/sdk-go/v2" cetest "github.com/cloudevents/sdk-go/v2/test" + "github.com/google/uuid" "github.com/openzipkin/zipkin-go/model" "go.opentelemetry.io/otel/api/trace" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/uuid" "knative.dev/pkg/test/zipkin" tracinghelper "knative.dev/eventing/test/conformance/helpers/tracing" "knative.dev/eventing/test/lib" - "knative.dev/eventing/test/lib/cloudevents" "knative.dev/eventing/test/lib/recordevents" "knative.dev/eventing/test/lib/resources" + "knative.dev/eventing/test/lib/resources/sender" ) // SetupInfrastructureFunc sets up the infrastructure for running tracing tests. It returns the @@ -95,7 +93,7 @@ func tracingTest( tc TracingTestCase, ) { const ( - loggerPodName = "logger" + recordEventsPodName = "recordevents" ) client := lib.Setup(t, true, setupClient) @@ -105,9 +103,18 @@ func tracingTest( // TestMain. tracinghelper.Setup(t, client) - expected, mustMatch := setupInfrastructure(t, &channel, client, loggerPodName, tc) - matches := assertEventMatch(t, client, loggerPodName, mustMatch) + // Setup the test infrastructure + expectedTestSpan, eventMatcher := setupInfrastructure(t, &channel, client, recordEventsPodName, tc) + // Start the event info store and assert the event was received correctly + targetTracker, err := recordevents.NewEventInfoStore(client, recordEventsPodName) + if err != nil { + t.Fatalf("Pod tracker failed: %v", err) + } + defer targetTracker.Cleanup() + matches := targetTracker.AssertAtLeast(1, recordevents.MatchEvent(eventMatcher)) + + // Match the trace traceID := getTraceIDHeader(t, matches) trace, err := zipkin.JSONTracePred(traceID, 5*time.Minute, func(trace []model.SpanModel) bool { tree, err := tracinghelper.GetTraceTree(trace) @@ -115,7 +122,7 @@ func tracingTest( return false } // Do not pass t to prevent unnecessary log output. - return len(expected.MatchesSubtree(nil, tree)) > 0 + return len(expectedTestSpan.MatchesSubtree(nil, tree)) > 0 }) if err != nil { t.Logf("Unable to get trace %q: %v. Trace so far %+v", traceID, err, tracinghelper.PrettyPrintTrace(trace)) @@ -123,24 +130,12 @@ func tracingTest( if err != nil { t.Fatal(err) } - if len(expected.MatchesSubtree(t, tree)) == 0 { - t.Fatalf("No matching subtree. want: %v got: %v", expected, tree) + if len(expectedTestSpan.MatchesSubtree(t, tree)) == 0 { + t.Fatalf("No matching subtree. want: %v got: %v", expectedTestSpan, tree) } } } -// assertEventMatch verifies that recorder pod contains at least one event that -// matches mustMatch. It is used to show that the expected event was sent to -// the logger Pod. It returns a list of the matching events. -func assertEventMatch(t *testing.T, client *lib.Client, recorderPodName string, mustMatch cetest.EventMatcher) []recordevents.EventInfo { - targetTracker, err := recordevents.NewEventInfoStore(client, recorderPodName) - if err != nil { - t.Fatalf("Pod tracker failed: %v", err) - } - defer targetTracker.Cleanup() - return targetTracker.AssertAtLeast(1, recordevents.MatchEvent(mustMatch)) -} - // getTraceIDHeader gets the TraceID from the passed in events. It returns the header from the // first matching event, but registers a fatal error if more than one traceid header is seen // in that message. @@ -167,9 +162,10 @@ func setupChannelTracingWithReply( t *testing.T, channel *metav1.TypeMeta, client *lib.Client, - loggerPodName string, + recordEventsPodName string, tc TracingTestCase, ) (tracinghelper.TestSpanTree, cetest.EventMatcher) { + eventSource := "sender" // Create the Channels. channelName := "ch" client.CreateChannelOrFail(channelName, channel) @@ -178,15 +174,16 @@ func setupChannelTracingWithReply( client.CreateChannelOrFail(replyChannelName, channel) // Create the 'sink', a LogEvents Pod and a K8s Service that points to it. - loggerPod := resources.EventRecordPod(loggerPodName) - client.CreatePodOrFail(loggerPod, lib.WithService(loggerPodName)) + recordEventsPod := resources.EventRecordPod(recordEventsPodName) + client.CreatePodOrFail(recordEventsPod, lib.WithService(recordEventsPodName)) // Create the subscriber, a Pod that mutates the event. - transformerPod := resources.DeprecatedEventTransformationPod("transformer", &cloudevents.CloudEvent{ - EventContextV1: ce.EventContextV1{ - Type: "mutated", - }, - }) + transformerPod := resources.EventTransformationPod( + "transformer", + "mutated", + eventSource, + nil, + ) client.CreatePodOrFail(transformerPod, lib.WithService(transformerPod.Name)) // Create the Subscription linking the Channel to the mutator. @@ -202,7 +199,7 @@ func setupChannelTracingWithReply( "reply-sub", replyChannelName, channel, - resources.WithSubscriberForSubscription(loggerPodName), + resources.WithSubscriberForSubscription(recordEventsPodName), ) // Wait for all test resources to be ready, so that we can start sending events. @@ -210,20 +207,22 @@ func setupChannelTracingWithReply( // Everything is setup to receive an event. Generate a CloudEvent. senderName := "sender" - eventID := string(uuid.NewUUID()) - body := fmt.Sprintf("TestChannelTracing %s", eventID) - event := cloudevents.New( - fmt.Sprintf(`{"msg":%q}`, body), - cloudevents.WithSource(senderName), - cloudevents.WithID(eventID), - ) + eventID := uuid.New().String() + event := ce2.NewEvent() + event.SetID(eventID) + event.SetSource(senderName) + event.SetType(lib.DefaultEventType) + body := fmt.Sprintf(`{"msg":"TestChannelTracing %s"}`, eventID) + if err := event.SetData(ce2.ApplicationJSON, []byte(body)); err != nil { + t.Fatalf("Cannot set the payload of the event: %s", err.Error()) + } // Send the CloudEvent (either with or without tracing inside the SendEvents Pod). - sendEvent := client.SendFakeEventToAddressableOrFail if tc.IncomingTraceId { - sendEvent = client.SendFakeEventWithTracingToAddressableOrFail + client.SendEventToAddressable(senderName, channelName, channel, event, sender.EnableTracing()) + } else { + client.SendEventToAddressable(senderName, channelName, channel, event) } - sendEvent(senderName, channelName, channel, event) // We expect the following spans: // 1. Sending pod sends event to Channel (only if the sending pod generates a span). @@ -293,7 +292,7 @@ func setupChannelTracingWithReply( Span: tracinghelper.MatchHTTPSpanNoReply( model.Client, tracinghelper.WithHTTPHostAndPath( - fmt.Sprintf("%s.%s.svc.cluster.local", loggerPod.Name, client.Namespace), + fmt.Sprintf("%s.%s.svc.cluster.local", recordEventsPod.Name, client.Namespace), "/", ), ), @@ -303,10 +302,10 @@ func setupChannelTracingWithReply( Span: tracinghelper.MatchHTTPSpanNoReply( model.Server, tracinghelper.WithHTTPHostAndPath( - fmt.Sprintf("%s.%s.svc.cluster.local", loggerPod.Name, client.Namespace), + fmt.Sprintf("%s.%s.svc.cluster.local", recordEventsPod.Name, client.Namespace), "/", ), - tracinghelper.WithLocalEndpointServiceName(loggerPod.Name), + tracinghelper.WithLocalEndpointServiceName(recordEventsPod.Name), ), }, }, @@ -333,19 +332,9 @@ func setupChannelTracingWithReply( } } - matchFunc := func(ev ce2.Event) error { - if ev.Source() != senderName { - return fmt.Errorf("expected source %s, saw %s", senderName, ev.Source()) - } - if ev.ID() != eventID { - return fmt.Errorf("expected id %s, saw %s", eventID, ev.ID()) - } - db := ev.Data() - if !strings.Contains(string(db), body) { - return fmt.Errorf("expected substring %s in %s", body, string(db)) - } - return nil - } - - return expected, matchFunc + return expected, cetest.AllOf( + cetest.HasSource(senderName), + cetest.HasId(eventID), + recordevents.DataContains(body), + ) } diff --git a/test/test_images/event-sender/main.go b/test/test_images/event-sender/main.go index a093cd6e3ec..c0b7264825f 100644 --- a/test/test_images/event-sender/main.go +++ b/test/test_images/event-sender/main.go @@ -22,10 +22,13 @@ import ( "flag" "fmt" "log" + nethttp "net/http" "strconv" "time" cloudevents "github.com/cloudevents/sdk-go/v2" + "go.opencensus.io/plugin/ochttp" + "go.opencensus.io/plugin/ochttp/propagation/tracecontext" "go.uber.org/zap" "knative.dev/eventing/pkg/tracing" @@ -98,23 +101,37 @@ func main() { log.Fatalf("unsupported encoding option: %q\n", eventEncoding) } - t, err := cloudevents.NewHTTP(cloudevents.WithTarget(sink)) + t, err := cloudevents.NewHTTP( + cloudevents.WithTarget(sink), + cloudevents.WithRoundTripper(&ochttp.Transport{ + Base: nethttp.DefaultTransport, + Propagation: &tracecontext.HTTPFormat{}, + }), + ) if err != nil { log.Fatalf("failed to create transport, %v", err) } + var c cloudevents.Client if addTracing { log.Println("Adding tracing") logger, _ := zap.NewDevelopment() if err := tracing.SetupStaticPublishing(logger.Sugar(), "", tracing.AlwaysSample); err != nil { log.Fatalf("Unable to setup trace publishing: %v", err) } + + c, err = cloudevents.NewClientObserved(t, + cloudevents.WithTimeNow(), + cloudevents.WithUUIDs(), + cloudevents.WithTracePropagation, + ) + } else { + c, err = cloudevents.NewClient(t, + cloudevents.WithTimeNow(), + cloudevents.WithUUIDs(), + ) } - c, err := cloudevents.NewClient(t, - cloudevents.WithTimeNow(), - cloudevents.WithUUIDs(), - ) if err != nil { log.Fatalf("failed to create client, %v", err) } diff --git a/test/test_images/transformevents/main.go b/test/test_images/transformevents/main.go index 2554a703a5e..55e712f61a7 100644 --- a/test/test_images/transformevents/main.go +++ b/test/test_images/transformevents/main.go @@ -77,9 +77,10 @@ func main() { log.Fatalf("failed to create transport, %v", err) } - c, err := cloudevents.NewClient(t, + c, err := cloudevents.NewClientObserved(t, cloudevents.WithTimeNow(), cloudevents.WithUUIDs(), + cloudevents.WithTracePropagation, ) if err != nil { log.Fatalf("failed to create client, %v", err)