diff --git a/test/conformance/helpers/broker_tracing_test_helper.go b/test/conformance/helpers/broker_tracing_test_helper.go index f034c3ce576..848b63917d3 100644 --- a/test/conformance/helpers/broker_tracing_test_helper.go +++ b/test/conformance/helpers/broker_tracing_test_helper.go @@ -18,22 +18,20 @@ package helpers import ( "fmt" - "strings" "testing" - ce "github.com/cloudevents/sdk-go" - ce2 "github.com/cloudevents/sdk-go/v2" + cloudevents "github.com/cloudevents/sdk-go/v2" cetest "github.com/cloudevents/sdk-go/v2/test" "github.com/openzipkin/zipkin-go/model" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/uuid" "knative.dev/eventing/pkg/apis/eventing/v1beta1" "knative.dev/eventing/pkg/utils" tracinghelper "knative.dev/eventing/test/conformance/helpers/tracing" "knative.dev/eventing/test/lib" - "knative.dev/eventing/test/lib/cloudevents" + "knative.dev/eventing/test/lib/recordevents" "knative.dev/eventing/test/lib/resources" + "knative.dev/eventing/test/lib/resources/sender" ) // BrokerTracingTestHelperWithChannelTestRunner runs the Broker tracing tests for all Channels in @@ -45,25 +43,10 @@ func BrokerTracingTestHelperWithChannelTestRunner( setupClient lib.SetupClientOption, ) { channelTestRunner.RunTests(t, lib.FeatureBasic, func(t *testing.T, channel metav1.TypeMeta) { - BrokerTracingTestHelper(t, brokerClass, channel, setupClient) + tracingTest(t, setupClient, setupBrokerTracing(brokerClass), channel) }) } -// BrokerTracingTestHelper runs the Broker tracing test using the given TypeMeta. -func BrokerTracingTestHelper(t *testing.T, brokerClass string, channel metav1.TypeMeta, setupClient lib.SetupClientOption) { - testCases := map[string]TracingTestCase{ - "includes incoming trace id": { - IncomingTraceId: true, - }, - } - - for n, tc := range testCases { - t.Run(n, func(t *testing.T) { - tracingTest(t, setupClient, setupBrokerTracing(brokerClass), channel, tc) - }) - } -} - // setupBrokerTracing is the general setup for TestBrokerTracing. It creates the following: // 1. Broker. // 2. Trigger on 'foo' events -> K8s Service -> transformer Pod (which replies with a 'bar' event). @@ -71,18 +54,20 @@ func BrokerTracingTestHelper(t *testing.T, brokerClass string, channel metav1.Ty // 4. Sender Pod which sends a 'foo' event. // It returns a string that is expected to be sent by the SendEvents Pod and should be present in // the LogEvents Pod logs. -func setupBrokerTracing(brokerClass string) SetupInfrastructureFunc { +func setupBrokerTracing(brokerClass string) SetupTracingTestInfrastructureFunc { const ( - etTransformer = "transformer" - etLogger = "logger" - defaultCMPName = "eventing" + etTransformer = "transformer" + etLogger = "logger" + senderName = "sender" + eventID = "event-1" + eventBody = `{"msg":"TestBrokerTracing event-1"}` ) return func( t *testing.T, channel *metav1.TypeMeta, client *lib.Client, loggerPodName string, - tc TracingTestCase, + senderPublishTrace bool, ) (tracinghelper.TestSpanTree, cetest.EventMatcher) { // Create a configmap used by the broker. client.CreateBrokerConfigMapOrFail("br", channel) @@ -107,11 +92,12 @@ func setupBrokerTracing(brokerClass string) SetupInfrastructureFunc { // Create a transformer (EventTransfrmer) Pod that replies with the same event as the input, // except the reply's event's type is changed to bar. - eventTransformerPod := resources.DeprecatedEventTransformationPod("transformer", &cloudevents.CloudEvent{ - EventContextV1: ce.EventContextV1{ - Type: etLogger, - }, - }) + eventTransformerPod := resources.EventTransformationPod( + "transformer", + etLogger, + senderName, + []byte(eventBody), + ) client.CreatePodOrFail(eventTransformerPod, lib.WithService(eventTransformerPod.Name)) // Create a Trigger that receives events (type=foo) and sends them to the transformer Pod. @@ -126,22 +112,20 @@ func setupBrokerTracing(brokerClass string) SetupInfrastructureFunc { client.WaitForAllTestResourcesReadyOrFail() // Everything is setup to receive an event. Generate a CloudEvent. - senderName := "sender" - eventID := string(uuid.NewUUID()) - body := fmt.Sprintf("TestBrokerTracing %s", eventID) - event := cloudevents.New( - fmt.Sprintf(`{"msg":%q}`, body), - cloudevents.WithSource(senderName), - cloudevents.WithID(eventID), - cloudevents.WithType(etTransformer), - ) + event := cloudevents.NewEvent() + event.SetID(eventID) + event.SetSource(senderName) + event.SetType(etTransformer) + if err := event.SetData(cloudevents.ApplicationJSON, []byte(eventBody)); err != nil { + t.Fatalf("Cannot set the payload of the event: %s", err.Error()) + } // Send the CloudEvent (either with or without tracing inside the SendEvents Pod). - sendEvent := client.SendFakeEventToAddressableOrFail - if tc.IncomingTraceId { - sendEvent = client.SendFakeEventWithTracingToAddressableOrFail + if senderPublishTrace { + client.SendEventToAddressable(senderName, broker.Name, lib.BrokerTypeMeta, event, sender.EnableTracing()) + } else { + client.SendEventToAddressable(senderName, broker.Name, lib.BrokerTypeMeta, event) } - sendEvent(senderName, broker.Name, lib.BrokerTypeMeta, event) domain := utils.GetClusterDomainName() @@ -207,7 +191,7 @@ func setupBrokerTracing(brokerClass string) SetupInfrastructureFunc { }, } - if tc.IncomingTraceId { + if senderPublishTrace { expected = tracinghelper.TestSpanTree{ Note: "1. Send pod sends event to the Broker Ingress (only if the sending pod generates a span).", Span: tracinghelper.MatchHTTPSpanNoReply( @@ -217,21 +201,12 @@ func setupBrokerTracing(brokerClass string) SetupInfrastructureFunc { Children: []tracinghelper.TestSpanTree{expected}, } } - matchFunc := func(ev ce2.Event) error { - if ev.Source() != senderName { - return fmt.Errorf("expected source %s, saw %s", senderName, ev.Source()) - } - if ev.ID() != eventID { - return fmt.Errorf("expected id %s, saw %s", eventID, ev.ID()) - } - db := ev.Data() - if !strings.Contains(string(db), body) { - return fmt.Errorf("expected substring %s in data %s", body, string(db)) - } - return nil - } - return expected, matchFunc + return expected, cetest.AllOf( + cetest.HasSource(senderName), + cetest.HasId(eventID), + recordevents.DataContains(eventBody), + ) } } diff --git a/test/conformance/helpers/channel_tracing_test_helper.go b/test/conformance/helpers/channel_tracing_test_helper.go index 4801abfee9f..1cdd6d7936d 100644 --- a/test/conformance/helpers/channel_tracing_test_helper.go +++ b/test/conformance/helpers/channel_tracing_test_helper.go @@ -17,19 +17,14 @@ limitations under the License. package helpers import ( - "context" "fmt" - "net/http" "testing" - "time" - ce2 "github.com/cloudevents/sdk-go/v2" + cloudevents "github.com/cloudevents/sdk-go/v2" cetest "github.com/cloudevents/sdk-go/v2/test" "github.com/google/uuid" "github.com/openzipkin/zipkin-go/model" - "go.opentelemetry.io/otel/api/trace" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "knative.dev/pkg/test/zipkin" tracinghelper "knative.dev/eventing/test/conformance/helpers/tracing" "knative.dev/eventing/test/lib" @@ -38,26 +33,6 @@ import ( "knative.dev/eventing/test/lib/resources/sender" ) -// SetupInfrastructureFunc sets up the infrastructure for running tracing tests. It returns the -// expected trace as well as a string that is expected to be in the logger Pod's logs. -type SetupInfrastructureFunc func( - t *testing.T, - channel *metav1.TypeMeta, - client *lib.Client, - loggerPodName string, - tc TracingTestCase, -) (tracinghelper.TestSpanTree, cetest.EventMatcher) - -// TracingTestCase is the test case information for tracing tests. -type TracingTestCase struct { - // IncomingTraceId controls whether the original request is sent to the Broker/Channel already - // has a trace ID associated with it by the sender. - IncomingTraceId bool - // Istio controls whether the Pods being created for the test (sender, transformer, logger, - // etc.) have Istio sidecars. It does not affect the Channel Pods. - Istio bool -} - // ChannelTracingTestHelperWithChannelTestRunner runs the Channel tracing tests for all Channels in // the ChannelTestRunner. func ChannelTracingTestHelperWithChannelTestRunner( @@ -66,92 +41,10 @@ func ChannelTracingTestHelperWithChannelTestRunner( setupClient lib.SetupClientOption, ) { channelTestRunner.RunTests(t, lib.FeatureBasic, func(t *testing.T, channel metav1.TypeMeta) { - ChannelTracingTestHelper(t, channel, setupClient) + tracingTest(t, setupClient, setupChannelTracingWithReply, channel) }) } -// ChannelTracingTestHelper runs the Channel tracing test using the given TypeMeta. -func ChannelTracingTestHelper(t *testing.T, channel metav1.TypeMeta, setupClient lib.SetupClientOption) { - testCases := map[string]TracingTestCase{ - "includes incoming trace id": { - IncomingTraceId: true, - }, - } - - for n, tc := range testCases { - t.Run(n, func(t *testing.T) { - tracingTest(t, setupClient, setupChannelTracingWithReply, channel, tc) - }) - } -} - -func tracingTest( - t *testing.T, - setupClient lib.SetupClientOption, - setupInfrastructure SetupInfrastructureFunc, - channel metav1.TypeMeta, - tc TracingTestCase, -) { - const ( - recordEventsPodName = "recordevents" - ) - - client := lib.Setup(t, true, setupClient) - defer lib.TearDown(client) - - // Do NOT call zipkin.CleanupZipkinTracingSetup. That will be called exactly once in - // TestMain. - tracinghelper.Setup(t, client) - - // Setup the test infrastructure - expectedTestSpan, eventMatcher := setupInfrastructure(t, &channel, client, recordEventsPodName, tc) - - // Start the event info store and assert the event was received correctly - targetTracker, err := recordevents.NewEventInfoStore(client, recordEventsPodName) - if err != nil { - t.Fatalf("Pod tracker failed: %v", err) - } - defer targetTracker.Cleanup() - matches := targetTracker.AssertAtLeast(1, recordevents.MatchEvent(eventMatcher)) - - // Match the trace - traceID := getTraceIDHeader(t, matches) - trace, err := zipkin.JSONTracePred(traceID, 5*time.Minute, func(trace []model.SpanModel) bool { - tree, err := tracinghelper.GetTraceTree(trace) - if err != nil { - return false - } - // Do not pass t to prevent unnecessary log output. - return len(expectedTestSpan.MatchesSubtree(nil, tree)) > 0 - }) - if err != nil { - t.Logf("Unable to get trace %q: %v. Trace so far %+v", traceID, err, tracinghelper.PrettyPrintTrace(trace)) - tree, err := tracinghelper.GetTraceTree(trace) - if err != nil { - t.Fatal(err) - } - if len(expectedTestSpan.MatchesSubtree(t, tree)) == 0 { - t.Fatalf("No matching subtree. want: %v got: %v", expectedTestSpan, tree) - } - } -} - -// getTraceIDHeader gets the TraceID from the passed in events. It returns the header from the -// first matching event, but registers a fatal error if more than one traceid header is seen -// in that message. -func getTraceIDHeader(t *testing.T, evInfos []recordevents.EventInfo) string { - for i := range evInfos { - if nil != evInfos[i].HTTPHeaders { - sc := trace.RemoteSpanContextFromContext(trace.DefaultHTTPPropagator().Extract(context.TODO(), http.Header(evInfos[i].HTTPHeaders))) - if sc.HasTraceID() { - return sc.TraceIDString() - } - } - } - t.Fatalf("FAIL: No traceid in %d messages: (%s)", len(evInfos), evInfos) - return "" -} - // setupChannelTracing is the general setup for TestChannelTracing. It creates the following: // SendEvents (Pod) -> Channel -> Subscription -> K8s Service -> Mutate (Pod) // v @@ -163,7 +56,7 @@ func setupChannelTracingWithReply( channel *metav1.TypeMeta, client *lib.Client, recordEventsPodName string, - tc TracingTestCase, + senderPublishTrace bool, ) (tracinghelper.TestSpanTree, cetest.EventMatcher) { eventSource := "sender" // Create the Channels. @@ -208,17 +101,17 @@ func setupChannelTracingWithReply( // Everything is setup to receive an event. Generate a CloudEvent. senderName := "sender" eventID := uuid.New().String() - event := ce2.NewEvent() + event := cloudevents.NewEvent() event.SetID(eventID) event.SetSource(senderName) event.SetType(lib.DefaultEventType) body := fmt.Sprintf(`{"msg":"TestChannelTracing %s"}`, eventID) - if err := event.SetData(ce2.ApplicationJSON, []byte(body)); err != nil { + if err := event.SetData(cloudevents.ApplicationJSON, []byte(body)); err != nil { t.Fatalf("Cannot set the payload of the event: %s", err.Error()) } // Send the CloudEvent (either with or without tracing inside the SendEvents Pod). - if tc.IncomingTraceId { + if senderPublishTrace { client.SendEventToAddressable(senderName, channelName, channel, event, sender.EnableTracing()) } else { client.SendEventToAddressable(senderName, channelName, channel, event) @@ -317,7 +210,7 @@ func setupChannelTracingWithReply( }, } - if tc.IncomingTraceId { + if senderPublishTrace { expected = tracinghelper.TestSpanTree{ // 1. Sending pod sends event to Channel (only if the sending pod generates a span). Span: tracinghelper.MatchHTTPSpanNoReply( diff --git a/test/conformance/helpers/tracing_test_helper.go b/test/conformance/helpers/tracing_test_helper.go new file mode 100644 index 00000000000..6bda47e5be5 --- /dev/null +++ b/test/conformance/helpers/tracing_test_helper.go @@ -0,0 +1,111 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package helpers + +import ( + "context" + "net/http" + "testing" + "time" + + cetest "github.com/cloudevents/sdk-go/v2/test" + "github.com/openzipkin/zipkin-go/model" + "go.opentelemetry.io/otel/api/trace" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "knative.dev/pkg/test/zipkin" + + tracinghelper "knative.dev/eventing/test/conformance/helpers/tracing" + "knative.dev/eventing/test/lib" + "knative.dev/eventing/test/lib/recordevents" +) + +// SetupTracingTestInfrastructureFunc sets up the infrastructure for running tracing tests. It returns the +// expected trace as well as a string that is expected to be in the logger Pod's logs. +type SetupTracingTestInfrastructureFunc func( + t *testing.T, + channel *metav1.TypeMeta, + client *lib.Client, + loggerPodName string, + senderPublishTrace bool, +) (tracinghelper.TestSpanTree, cetest.EventMatcher) + +// tracingTest bootstraps the test and then executes the assertions on the received event and on the spans +func tracingTest( + t *testing.T, + setupClient lib.SetupClientOption, + setupInfrastructure SetupTracingTestInfrastructureFunc, + channel metav1.TypeMeta, +) { + const ( + recordEventsPodName = "recordevents" + ) + + client := lib.Setup(t, true, setupClient) + defer lib.TearDown(client) + + // Do NOT call zipkin.CleanupZipkinTracingSetup. That will be called exactly once in + // TestMain. + tracinghelper.Setup(t, client) + + // Setup the test infrastructure + expectedTestSpan, eventMatcher := setupInfrastructure(t, &channel, client, recordEventsPodName, true) + + // Start the event info store and assert the event was received correctly + targetTracker, err := recordevents.NewEventInfoStore(client, recordEventsPodName) + if err != nil { + t.Fatalf("Pod tracker failed: %v", err) + } + defer targetTracker.Cleanup() + matches := targetTracker.AssertAtLeast(1, recordevents.MatchEvent(eventMatcher)) + + // Match the trace + traceID := getTraceIDHeader(t, matches) + trace, err := zipkin.JSONTracePred(traceID, 5*time.Minute, func(trace []model.SpanModel) bool { + tree, err := tracinghelper.GetTraceTree(trace) + if err != nil { + return false + } + // Do not pass t to prevent unnecessary log output. + return len(expectedTestSpan.MatchesSubtree(nil, tree)) > 0 + }) + if err != nil { + t.Logf("Unable to get trace %q: %v. Trace so far %+v", traceID, err, tracinghelper.PrettyPrintTrace(trace)) + tree, err := tracinghelper.GetTraceTree(trace) + if err != nil { + t.Fatal(err) + } + if len(expectedTestSpan.MatchesSubtree(t, tree)) == 0 { + t.Fatalf("No matching subtree. want: %v got: %v", expectedTestSpan, tree) + } + } +} + +// getTraceIDHeader gets the TraceID from the passed in events. It returns the header from the +// first matching event, but registers a fatal error if more than one traceid header is seen +// in that message. +func getTraceIDHeader(t *testing.T, evInfos []recordevents.EventInfo) string { + for i := range evInfos { + if nil != evInfos[i].HTTPHeaders { + sc := trace.RemoteSpanContextFromContext(trace.DefaultHTTPPropagator().Extract(context.TODO(), http.Header(evInfos[i].HTTPHeaders))) + if sc.HasTraceID() { + return sc.TraceIDString() + } + } + } + t.Fatalf("FAIL: No traceid in %d messages: (%s)", len(evInfos), evInfos) + return "" +}