From f8754a04c332eb97141d31001b63fe5d25bb5a25 Mon Sep 17 00:00:00 2001 From: Nicolas Lopez Date: Fri, 5 Jun 2020 16:46:05 -0400 Subject: [PATCH 1/4] Port Channel event transformation for subscription Test to new test images --- test/e2e/helpers/channel_event_tranformation_test_helper.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/test/e2e/helpers/channel_event_tranformation_test_helper.go b/test/e2e/helpers/channel_event_tranformation_test_helper.go index 7bdacccc650..8bfc0146f1a 100644 --- a/test/e2e/helpers/channel_event_tranformation_test_helper.go +++ b/test/e2e/helpers/channel_event_tranformation_test_helper.go @@ -26,7 +26,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "knative.dev/eventing/test/lib" +<<<<<<< HEAD "knative.dev/eventing/test/lib/recordevents" +======= +>>>>>>> 785b7b17... Port Channel event transformation for subscription Test to new test images "knative.dev/eventing/test/lib/resources" ) From c2c4bc4701e3d5c128069375c7fd5e1e8f88b8d9 Mon Sep 17 00:00:00 2001 From: Nicolas Lopez Date: Tue, 9 Jun 2020 11:32:58 -0400 Subject: [PATCH 2/4] channel tracing test --- .../helpers/channel_tracing_test_helper.go | 37 ++++++++++--------- ...channel_event_tranformation_test_helper.go | 3 -- test/lib/event_sender.go | 37 +++++++++++++++++++ 3 files changed, 56 insertions(+), 21 deletions(-) diff --git a/test/conformance/helpers/channel_tracing_test_helper.go b/test/conformance/helpers/channel_tracing_test_helper.go index 9d9c3380be2..6638e3e270c 100644 --- a/test/conformance/helpers/channel_tracing_test_helper.go +++ b/test/conformance/helpers/channel_tracing_test_helper.go @@ -24,18 +24,16 @@ import ( "testing" "time" - ce "github.com/cloudevents/sdk-go" ce2 "github.com/cloudevents/sdk-go/v2" cetest "github.com/cloudevents/sdk-go/v2/test" + "github.com/google/uuid" "github.com/openzipkin/zipkin-go/model" "go.opentelemetry.io/otel/api/trace" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/uuid" "knative.dev/pkg/test/zipkin" tracinghelper "knative.dev/eventing/test/conformance/helpers/tracing" "knative.dev/eventing/test/lib" - "knative.dev/eventing/test/lib/cloudevents" "knative.dev/eventing/test/lib/recordevents" "knative.dev/eventing/test/lib/resources" ) @@ -170,6 +168,7 @@ func setupChannelTracingWithReply( loggerPodName string, tc TracingTestCase, ) (tracinghelper.TestSpanTree, cetest.EventMatcher) { + eventSource := "sender" // Create the Channels. channelName := "ch" client.CreateChannelOrFail(channelName, channel) @@ -182,11 +181,12 @@ func setupChannelTracingWithReply( client.CreatePodOrFail(loggerPod, lib.WithService(loggerPodName)) // Create the subscriber, a Pod that mutates the event. - transformerPod := resources.DeprecatedEventTransformationPod("transformer", &cloudevents.CloudEvent{ - EventContextV1: ce.EventContextV1{ - Type: "mutated", - }, - }) + transformerPod := resources.EventTransformationPod( + "transformer", + "mutated", + eventSource, + nil, + ) client.CreatePodOrFail(transformerPod, lib.WithService(transformerPod.Name)) // Create the Subscription linking the Channel to the mutator. @@ -210,18 +210,20 @@ func setupChannelTracingWithReply( // Everything is setup to receive an event. Generate a CloudEvent. senderName := "sender" - eventID := string(uuid.NewUUID()) - body := fmt.Sprintf("TestChannelTracing %s", eventID) - event := cloudevents.New( - fmt.Sprintf(`{"msg":%q}`, body), - cloudevents.WithSource(senderName), - cloudevents.WithID(eventID), - ) + eventID := uuid.New().String() + event := ce2.NewEvent() + event.SetID(eventID) + event.SetSource(senderName) + event.SetType(lib.DefaultEventType) + body := fmt.Sprintf(`{"msg":"TestChannelTracing %s"}`, eventID) + if err := event.SetData(ce2.ApplicationJSON, []byte(body)); err != nil { + t.Fatalf("Cannot set the payload of the event: %s", err.Error()) + } // Send the CloudEvent (either with or without tracing inside the SendEvents Pod). - sendEvent := client.SendFakeEventToAddressableOrFail + sendEvent := client.SendEventToAddressable if tc.IncomingTraceId { - sendEvent = client.SendFakeEventWithTracingToAddressableOrFail + sendEvent = client.SendEventWithTracingToAddressable } sendEvent(senderName, channelName, channel, event) @@ -332,7 +334,6 @@ func setupChannelTracingWithReply( Children: []tracinghelper.TestSpanTree{expected}, } } - matchFunc := func(ev ce2.Event) error { if ev.Source() != senderName { return fmt.Errorf("expected source %s, saw %s", senderName, ev.Source()) diff --git a/test/e2e/helpers/channel_event_tranformation_test_helper.go b/test/e2e/helpers/channel_event_tranformation_test_helper.go index 8bfc0146f1a..7bdacccc650 100644 --- a/test/e2e/helpers/channel_event_tranformation_test_helper.go +++ b/test/e2e/helpers/channel_event_tranformation_test_helper.go @@ -26,10 +26,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "knative.dev/eventing/test/lib" -<<<<<<< HEAD "knative.dev/eventing/test/lib/recordevents" -======= ->>>>>>> 785b7b17... Port Channel event transformation for subscription Test to new test images "knative.dev/eventing/test/lib/resources" ) diff --git a/test/lib/event_sender.go b/test/lib/event_sender.go index a51c210d39d..ea8701ab5dc 100644 --- a/test/lib/event_sender.go +++ b/test/lib/event_sender.go @@ -42,6 +42,23 @@ func (c *Client) SendEventToAddressable( } } +// SendEventToAddressable will send the given event to the given Addressable. +func (c *Client) SendEventWithTracingToAddressable( + senderName, + addressableName string, + typemeta *metav1.TypeMeta, + event cloudevents.Event, + option ...sender.EventSenderOption, +) { + uri, err := c.GetAddressableURI(addressableName, typemeta) + if err != nil { + c.T.Fatalf("Failed to get the URI for %v-%s", typemeta, addressableName) + } + if err = c.SendEventWithTracing(senderName, uri, event, option...); err != nil { + c.T.Fatalf("Failed to send event %v with tracing to %s: %v", event, uri, err) + } +} + // SendEvent will create a sender pod, which will send the given event to the given url. func (c *Client) SendEvent( senderName string, @@ -60,3 +77,23 @@ func (c *Client) SendEvent( } return nil } + +// SendEvent will create a sender pod, which will send the given event to the given url. +func (c *Client) SendEventWithTracing( + senderName string, + uri string, + event cloudevents.Event, + option ...sender.EventSenderOption, +) error { + namespace := c.Namespace + pod, err := sender.EventSenderPod("event-sender", senderName, uri, event, option...) + sender.EnableTracing() + if err != nil { + return err + } + c.CreatePodOrFail(pod) + if err := pkgTest.WaitForPodRunning(c.Kube, senderName, namespace); err != nil { + return err + } + return nil +} From ba50b17f7efd587e6092b5a075cf3d2642caffd0 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Tue, 16 Jun 2020 09:45:06 +0200 Subject: [PATCH 3/4] Removed wrong SendEventWithTracing* Now tracing tests should just work Signed-off-by: Francesco Guardiani --- .../helpers/channel_tracing_test_helper.go | 7 ++-- test/lib/event_sender.go | 37 ------------------- test/test_images/event-sender/main.go | 27 +++++++++++--- test/test_images/transformevents/main.go | 3 +- 4 files changed, 28 insertions(+), 46 deletions(-) diff --git a/test/conformance/helpers/channel_tracing_test_helper.go b/test/conformance/helpers/channel_tracing_test_helper.go index 6638e3e270c..fe3a4c7fbae 100644 --- a/test/conformance/helpers/channel_tracing_test_helper.go +++ b/test/conformance/helpers/channel_tracing_test_helper.go @@ -36,6 +36,7 @@ import ( "knative.dev/eventing/test/lib" "knative.dev/eventing/test/lib/recordevents" "knative.dev/eventing/test/lib/resources" + "knative.dev/eventing/test/lib/resources/sender" ) // SetupInfrastructureFunc sets up the infrastructure for running tracing tests. It returns the @@ -221,11 +222,11 @@ func setupChannelTracingWithReply( } // Send the CloudEvent (either with or without tracing inside the SendEvents Pod). - sendEvent := client.SendEventToAddressable if tc.IncomingTraceId { - sendEvent = client.SendEventWithTracingToAddressable + client.SendEventToAddressable(senderName, channelName, channel, event, sender.EnableTracing()) + } else { + client.SendEventToAddressable(senderName, channelName, channel, event) } - sendEvent(senderName, channelName, channel, event) // We expect the following spans: // 1. Sending pod sends event to Channel (only if the sending pod generates a span). diff --git a/test/lib/event_sender.go b/test/lib/event_sender.go index ea8701ab5dc..a51c210d39d 100644 --- a/test/lib/event_sender.go +++ b/test/lib/event_sender.go @@ -42,23 +42,6 @@ func (c *Client) SendEventToAddressable( } } -// SendEventToAddressable will send the given event to the given Addressable. -func (c *Client) SendEventWithTracingToAddressable( - senderName, - addressableName string, - typemeta *metav1.TypeMeta, - event cloudevents.Event, - option ...sender.EventSenderOption, -) { - uri, err := c.GetAddressableURI(addressableName, typemeta) - if err != nil { - c.T.Fatalf("Failed to get the URI for %v-%s", typemeta, addressableName) - } - if err = c.SendEventWithTracing(senderName, uri, event, option...); err != nil { - c.T.Fatalf("Failed to send event %v with tracing to %s: %v", event, uri, err) - } -} - // SendEvent will create a sender pod, which will send the given event to the given url. func (c *Client) SendEvent( senderName string, @@ -77,23 +60,3 @@ func (c *Client) SendEvent( } return nil } - -// SendEvent will create a sender pod, which will send the given event to the given url. -func (c *Client) SendEventWithTracing( - senderName string, - uri string, - event cloudevents.Event, - option ...sender.EventSenderOption, -) error { - namespace := c.Namespace - pod, err := sender.EventSenderPod("event-sender", senderName, uri, event, option...) - sender.EnableTracing() - if err != nil { - return err - } - c.CreatePodOrFail(pod) - if err := pkgTest.WaitForPodRunning(c.Kube, senderName, namespace); err != nil { - return err - } - return nil -} diff --git a/test/test_images/event-sender/main.go b/test/test_images/event-sender/main.go index a093cd6e3ec..c0b7264825f 100644 --- a/test/test_images/event-sender/main.go +++ b/test/test_images/event-sender/main.go @@ -22,10 +22,13 @@ import ( "flag" "fmt" "log" + nethttp "net/http" "strconv" "time" cloudevents "github.com/cloudevents/sdk-go/v2" + "go.opencensus.io/plugin/ochttp" + "go.opencensus.io/plugin/ochttp/propagation/tracecontext" "go.uber.org/zap" "knative.dev/eventing/pkg/tracing" @@ -98,23 +101,37 @@ func main() { log.Fatalf("unsupported encoding option: %q\n", eventEncoding) } - t, err := cloudevents.NewHTTP(cloudevents.WithTarget(sink)) + t, err := cloudevents.NewHTTP( + cloudevents.WithTarget(sink), + cloudevents.WithRoundTripper(&ochttp.Transport{ + Base: nethttp.DefaultTransport, + Propagation: &tracecontext.HTTPFormat{}, + }), + ) if err != nil { log.Fatalf("failed to create transport, %v", err) } + var c cloudevents.Client if addTracing { log.Println("Adding tracing") logger, _ := zap.NewDevelopment() if err := tracing.SetupStaticPublishing(logger.Sugar(), "", tracing.AlwaysSample); err != nil { log.Fatalf("Unable to setup trace publishing: %v", err) } + + c, err = cloudevents.NewClientObserved(t, + cloudevents.WithTimeNow(), + cloudevents.WithUUIDs(), + cloudevents.WithTracePropagation, + ) + } else { + c, err = cloudevents.NewClient(t, + cloudevents.WithTimeNow(), + cloudevents.WithUUIDs(), + ) } - c, err := cloudevents.NewClient(t, - cloudevents.WithTimeNow(), - cloudevents.WithUUIDs(), - ) if err != nil { log.Fatalf("failed to create client, %v", err) } diff --git a/test/test_images/transformevents/main.go b/test/test_images/transformevents/main.go index 2554a703a5e..55e712f61a7 100644 --- a/test/test_images/transformevents/main.go +++ b/test/test_images/transformevents/main.go @@ -77,9 +77,10 @@ func main() { log.Fatalf("failed to create transport, %v", err) } - c, err := cloudevents.NewClient(t, + c, err := cloudevents.NewClientObserved(t, cloudevents.WithTimeNow(), cloudevents.WithUUIDs(), + cloudevents.WithTracePropagation, ) if err != nil { log.Fatalf("failed to create client, %v", err) From 5bd7e7d94a6c3ca094521004d89cce325dcfe435 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Tue, 16 Jun 2020 09:59:28 +0200 Subject: [PATCH 4/4] Now using recordevents Signed-off-by: Francesco Guardiani --- .../helpers/channel_tracing_test_helper.go | 67 ++++++++----------- 1 file changed, 27 insertions(+), 40 deletions(-) diff --git a/test/conformance/helpers/channel_tracing_test_helper.go b/test/conformance/helpers/channel_tracing_test_helper.go index fe3a4c7fbae..4801abfee9f 100644 --- a/test/conformance/helpers/channel_tracing_test_helper.go +++ b/test/conformance/helpers/channel_tracing_test_helper.go @@ -20,7 +20,6 @@ import ( "context" "fmt" "net/http" - "strings" "testing" "time" @@ -94,7 +93,7 @@ func tracingTest( tc TracingTestCase, ) { const ( - loggerPodName = "logger" + recordEventsPodName = "recordevents" ) client := lib.Setup(t, true, setupClient) @@ -104,9 +103,18 @@ func tracingTest( // TestMain. tracinghelper.Setup(t, client) - expected, mustMatch := setupInfrastructure(t, &channel, client, loggerPodName, tc) - matches := assertEventMatch(t, client, loggerPodName, mustMatch) + // Setup the test infrastructure + expectedTestSpan, eventMatcher := setupInfrastructure(t, &channel, client, recordEventsPodName, tc) + // Start the event info store and assert the event was received correctly + targetTracker, err := recordevents.NewEventInfoStore(client, recordEventsPodName) + if err != nil { + t.Fatalf("Pod tracker failed: %v", err) + } + defer targetTracker.Cleanup() + matches := targetTracker.AssertAtLeast(1, recordevents.MatchEvent(eventMatcher)) + + // Match the trace traceID := getTraceIDHeader(t, matches) trace, err := zipkin.JSONTracePred(traceID, 5*time.Minute, func(trace []model.SpanModel) bool { tree, err := tracinghelper.GetTraceTree(trace) @@ -114,7 +122,7 @@ func tracingTest( return false } // Do not pass t to prevent unnecessary log output. - return len(expected.MatchesSubtree(nil, tree)) > 0 + return len(expectedTestSpan.MatchesSubtree(nil, tree)) > 0 }) if err != nil { t.Logf("Unable to get trace %q: %v. Trace so far %+v", traceID, err, tracinghelper.PrettyPrintTrace(trace)) @@ -122,24 +130,12 @@ func tracingTest( if err != nil { t.Fatal(err) } - if len(expected.MatchesSubtree(t, tree)) == 0 { - t.Fatalf("No matching subtree. want: %v got: %v", expected, tree) + if len(expectedTestSpan.MatchesSubtree(t, tree)) == 0 { + t.Fatalf("No matching subtree. want: %v got: %v", expectedTestSpan, tree) } } } -// assertEventMatch verifies that recorder pod contains at least one event that -// matches mustMatch. It is used to show that the expected event was sent to -// the logger Pod. It returns a list of the matching events. -func assertEventMatch(t *testing.T, client *lib.Client, recorderPodName string, mustMatch cetest.EventMatcher) []recordevents.EventInfo { - targetTracker, err := recordevents.NewEventInfoStore(client, recorderPodName) - if err != nil { - t.Fatalf("Pod tracker failed: %v", err) - } - defer targetTracker.Cleanup() - return targetTracker.AssertAtLeast(1, recordevents.MatchEvent(mustMatch)) -} - // getTraceIDHeader gets the TraceID from the passed in events. It returns the header from the // first matching event, but registers a fatal error if more than one traceid header is seen // in that message. @@ -166,7 +162,7 @@ func setupChannelTracingWithReply( t *testing.T, channel *metav1.TypeMeta, client *lib.Client, - loggerPodName string, + recordEventsPodName string, tc TracingTestCase, ) (tracinghelper.TestSpanTree, cetest.EventMatcher) { eventSource := "sender" @@ -178,8 +174,8 @@ func setupChannelTracingWithReply( client.CreateChannelOrFail(replyChannelName, channel) // Create the 'sink', a LogEvents Pod and a K8s Service that points to it. - loggerPod := resources.EventRecordPod(loggerPodName) - client.CreatePodOrFail(loggerPod, lib.WithService(loggerPodName)) + recordEventsPod := resources.EventRecordPod(recordEventsPodName) + client.CreatePodOrFail(recordEventsPod, lib.WithService(recordEventsPodName)) // Create the subscriber, a Pod that mutates the event. transformerPod := resources.EventTransformationPod( @@ -203,7 +199,7 @@ func setupChannelTracingWithReply( "reply-sub", replyChannelName, channel, - resources.WithSubscriberForSubscription(loggerPodName), + resources.WithSubscriberForSubscription(recordEventsPodName), ) // Wait for all test resources to be ready, so that we can start sending events. @@ -296,7 +292,7 @@ func setupChannelTracingWithReply( Span: tracinghelper.MatchHTTPSpanNoReply( model.Client, tracinghelper.WithHTTPHostAndPath( - fmt.Sprintf("%s.%s.svc.cluster.local", loggerPod.Name, client.Namespace), + fmt.Sprintf("%s.%s.svc.cluster.local", recordEventsPod.Name, client.Namespace), "/", ), ), @@ -306,10 +302,10 @@ func setupChannelTracingWithReply( Span: tracinghelper.MatchHTTPSpanNoReply( model.Server, tracinghelper.WithHTTPHostAndPath( - fmt.Sprintf("%s.%s.svc.cluster.local", loggerPod.Name, client.Namespace), + fmt.Sprintf("%s.%s.svc.cluster.local", recordEventsPod.Name, client.Namespace), "/", ), - tracinghelper.WithLocalEndpointServiceName(loggerPod.Name), + tracinghelper.WithLocalEndpointServiceName(recordEventsPod.Name), ), }, }, @@ -335,19 +331,10 @@ func setupChannelTracingWithReply( Children: []tracinghelper.TestSpanTree{expected}, } } - matchFunc := func(ev ce2.Event) error { - if ev.Source() != senderName { - return fmt.Errorf("expected source %s, saw %s", senderName, ev.Source()) - } - if ev.ID() != eventID { - return fmt.Errorf("expected id %s, saw %s", eventID, ev.ID()) - } - db := ev.Data() - if !strings.Contains(string(db), body) { - return fmt.Errorf("expected substring %s in %s", body, string(db)) - } - return nil - } - return expected, matchFunc + return expected, cetest.AllOf( + cetest.HasSource(senderName), + cetest.HasId(eventID), + recordevents.DataContains(body), + ) }