From c84b2a4be3b2326b44d1f833a9002e0b4443a5b3 Mon Sep 17 00:00:00 2001 From: Sameer Vohra Date: Mon, 14 Dec 2020 19:06:20 -0600 Subject: [PATCH 1/4] [#4578] Update imc-dispatcher - trace should include Open Telemetry Spec attributes (messaging.*) messaging.destination messaging.message_id messaging.protocol messaging.system --- pkg/channel/message_dispatcher.go | 3 ++- .../populate_span_transformer.go | 26 ++++++++++++------- .../populate_span_transformer_test.go | 12 ++++++--- 3 files changed, 27 insertions(+), 14 deletions(-) rename pkg/{kncloudevents => tracing}/populate_span_transformer.go (82%) rename pkg/{kncloudevents => tracing}/populate_span_transformer_test.go (91%) diff --git a/pkg/channel/message_dispatcher.go b/pkg/channel/message_dispatcher.go index 3765e5d2e1e..fe83e079a26 100644 --- a/pkg/channel/message_dispatcher.go +++ b/pkg/channel/message_dispatcher.go @@ -31,6 +31,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "knative.dev/eventing/pkg/kncloudevents" + "knative.dev/eventing/pkg/tracing" "knative.dev/eventing/pkg/utils" ) @@ -190,7 +191,7 @@ func (d *MessageDispatcherImpl) executeRequest(ctx context.Context, url *url.URL } if span.IsRecordingEvents() { - err = kncloudevents.WriteHTTPRequestWithAdditionalHeaders(ctx, message, req, additionalHeaders, kncloudevents.PopulateSpan(span)) + err = kncloudevents.WriteHTTPRequestWithAdditionalHeaders(ctx, message, req, additionalHeaders, tracing.PopulateSpan(span, url.String())) } else { err = kncloudevents.WriteHTTPRequestWithAdditionalHeaders(ctx, message, req, additionalHeaders) } diff --git a/pkg/kncloudevents/populate_span_transformer.go b/pkg/tracing/populate_span_transformer.go similarity index 82% rename from pkg/kncloudevents/populate_span_transformer.go rename to pkg/tracing/populate_span_transformer.go index da7b025469d..d00790956fe 100644 --- a/pkg/kncloudevents/populate_span_transformer.go +++ b/pkg/tracing/populate_span_transformer.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package kncloudevents +package tracing import ( "github.com/cloudevents/sdk-go/v2/binding" @@ -23,16 +23,12 @@ import ( "go.opencensus.io/trace" ) -func PopulateSpan(span *trace.Span) binding.TransformerFunc { +func PopulateSpan(span *trace.Span, destination string) binding.TransformerFunc { return func(reader binding.MessageMetadataReader, writer binding.MessageMetadataWriter) error { - _, specVersion := reader.GetAttribute(spec.SpecVersion) - if specVersion != nil { - specVersionParsed, err := types.Format(specVersion) - if err != nil { - return err - } - span.AddAttributes(trace.StringAttribute("cloudevents.specversion", specVersionParsed)) - } + + span.AddAttributes(MessagingProtocolHTTP) + span.AddAttributes(MessagingSystemAttribute) + span.AddAttributes(trace.StringAttribute(MessagingDestinationAttributeName, destination)) _, id := reader.GetAttribute(spec.ID) if id != nil { @@ -40,9 +36,19 @@ func PopulateSpan(span *trace.Span) binding.TransformerFunc { if err != nil { return err } + span.AddAttributes(trace.StringAttribute(MessagingMessageIDAttributeName, idParsed)) span.AddAttributes(trace.StringAttribute("cloudevents.id", idParsed)) } + _, specVersion := reader.GetAttribute(spec.SpecVersion) + if specVersion != nil { + specVersionParsed, err := types.Format(specVersion) + if err != nil { + return err + } + span.AddAttributes(trace.StringAttribute("cloudevents.specversion", specVersionParsed)) + } + _, ty := reader.GetAttribute(spec.Type) if ty != nil { tyParsed, err := types.Format(ty) diff --git a/pkg/kncloudevents/populate_span_transformer_test.go b/pkg/tracing/populate_span_transformer_test.go similarity index 91% rename from pkg/kncloudevents/populate_span_transformer_test.go rename to pkg/tracing/populate_span_transformer_test.go index 12f906fc522..4af52861e21 100644 --- a/pkg/kncloudevents/populate_span_transformer_test.go +++ b/pkg/tracing/populate_span_transformer_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package kncloudevents +package tracing import ( "context" @@ -46,7 +46,13 @@ func TestPopulateSpan(t *testing.T) { wantEvent.SetType("hello.world") wantEvent.SetSource("example.com") + destination := "some-url" + expectedAttributes := map[string]interface{}{ + "messaging.system": "knative", + "messaging.protocol": "HTTP", + "messaging.message_id": "aaa", + "messaging.destination": "some-url", "cloudevents.id": "aaa", "cloudevents.type": "hello.world", "cloudevents.source": "example.com", @@ -63,7 +69,7 @@ func TestPopulateSpan(t *testing.T) { spanData := <-mockExp require.Equal(t, expectedAttributes, spanData.Attributes) }, - Transformers: binding.Transformers{PopulateSpan(testSpanBinary)}, + Transformers: binding.Transformers{PopulateSpan(testSpanBinary, destination)}, }, { Name: "Populate span for event messages", @@ -74,7 +80,7 @@ func TestPopulateSpan(t *testing.T) { spanData := <-mockExp require.Equal(t, expectedAttributes, spanData.Attributes) }, - Transformers: binding.Transformers{PopulateSpan(testSpanEvent)}, + Transformers: binding.Transformers{PopulateSpan(testSpanEvent, destination)}, }, }) } From 3b83ae66a8a82f0233cdc7a2925437773c09b055 Mon Sep 17 00:00:00 2001 From: Sameer Vohra Date: Tue, 15 Dec 2020 14:30:03 -0600 Subject: [PATCH 2/4] [#4578] Update ../../docs/spec/channel.md - change Metrics -> Observability, to match spec/broker.md - relocate trace pass through & trace config information from Misc to Observability sections - add section specifying messaging.* attributes that should be included in the trace span Signed-off-by: Sameer Vohra --- docs/spec/channel.md | 26 ++++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/docs/spec/channel.md b/docs/spec/channel.md index 57d512da2f8..a5dea76470f 100644 --- a/docs/spec/channel.md +++ b/docs/spec/channel.md @@ -327,13 +327,6 @@ discretion (e.g. expose a gRPC endpoint to accept events). If a Channel receives an event queueing request and is unable to parse a valid CloudEvent, then it MUST reject the request. -The Channel MUST recognize and pass through all tracing information from sender -to subscribers using [W3C Tracecontext](https://w3c.github.io/trace-context/), -although internally it MAY use another mechanism(s) to propagate the tracing -information. The Channel SHOULD sample and write traces to the location -specified in -[`config-tracing`](https://github.com/knative/eventing/blob/master/config/config-tracing.yaml). - ##### HTTP Channels MUST reject all HTTP event queueing requests with a method other than @@ -388,7 +381,7 @@ not limited to: - the time in-between retries - the backoff rate -#### Metrics +#### Observability Channels SHOULD expose a variety of metrics, including, but not limited to: @@ -401,6 +394,23 @@ Channels SHOULD expose a variety of metrics, including, but not limited to: Metrics SHOULD be enabled by default, with a configuration parameter included to disable them if desired. +The Channel MUST recognize and pass through all tracing information from sender +to subscribers using [W3C Tracecontext](https://w3c.github.io/trace-context/), +although internally it MAY use another mechanism(s) to propagate the tracing +information. The Channel SHOULD sample and write traces to the location +specified in +[`config-tracing`](https://github.com/knative/eventing/blob/master/config/config-tracing.yaml). + +Spans emitted by the Channel SHOULD follow the +[OpenTelemetry Semantic Conventions for Messaging Systems](https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/trace/semantic_conventions/messaging.md) +whenever possible. In particular, spans emitted by the Channel SHOULD set the +following attributes: + +- messaging.system: "knative" +- messaging.destination: url to which the event is being routed +- messaging.protocol: the name of the underlying transport protocol +- messaging.message_id: the event ID + ## Changelog - `0.11.x release`: CloudEvents in 0.3 and 1.0 are supported. From 48153c4716bda4d90006e249498ce290f22e30a4 Mon Sep 17 00:00:00 2001 From: Sameer Vohra Date: Fri, 18 Dec 2020 14:23:04 -0600 Subject: [PATCH 3/4] [#4578] Update channel tracing conformance test - add check for Open Telemetry Spec tags (messaging.*) in spans emitted by message dispatcher - exported helpful regex host suffix - re-worked the channel trace tree to include all the missing dispatcher spans. Signed-off-by: Sameer Vohra --- .../helpers/channel_tracing_test_helper.go | 128 +++++++++++------- test/conformance/helpers/tracing/traces.go | 27 ++-- 2 files changed, 95 insertions(+), 60 deletions(-) diff --git a/test/conformance/helpers/channel_tracing_test_helper.go b/test/conformance/helpers/channel_tracing_test_helper.go index 2cecf0007ee..73c9418eb95 100644 --- a/test/conformance/helpers/channel_tracing_test_helper.go +++ b/test/conformance/helpers/channel_tracing_test_helper.go @@ -19,6 +19,7 @@ package helpers import ( "context" "fmt" + "regexp" "testing" cloudevents "github.com/cloudevents/sdk-go/v2" @@ -73,10 +74,10 @@ func setupChannelTracingWithReply( recordEventsPod := recordevents.DeployEventRecordOrFail(ctx, client, recordEventsPodName) // Create the subscriber, a Pod that mutates the event. - transformerPod := recordevents.DeployEventRecordOrFail( + mutatingPod := recordevents.DeployEventRecordOrFail( ctx, client, - "transformer", + "mutator", recordevents.ReplyWithTransformedEvent( "mutated", eventSource, @@ -89,7 +90,7 @@ func setupChannelTracingWithReply( "sub", channelName, channel, - resources.WithSubscriberForSubscription(transformerPod.Name), + resources.WithSubscriberForSubscription(mutatingPod.Name), resources.WithReplyForSubscription(replyChannelName, channel)) // Create the Subscription linking the reply Channel to the LogEvents K8s Service. @@ -125,12 +126,15 @@ func setupChannelTracingWithReply( // We expect the following spans: // 1. Sending pod sends event to Channel (only if the sending pod generates a span). // 2. Channel receives event from sending pod. - // 3. Channel sends event to transformer pod. - // 4. Transformer Pod receives event from Channel. - // 5. Channel sends reply from Transformer Pod to the reply Channel. - // 6. Reply Channel receives event from the original Channel's reply. - // 7. Reply Channel sends event to the logging Pod. - // 8. Logging pod receives event from Channel. + // 3. Channel Dispatcher span + // 4. Channel sends event to transformer pod. + // 5. Transformer Pod receives event from Channel. + // 6. Channel Dispatcher span + // 7. Channel sends reply from Transformer Pod to the reply Channel. + // 8. Reply Channel receives event from the original Channel's reply. + // 9. Channel Dispatcher span + // 10. Reply Channel sends event to the logging Pod. + // 11. Logging pod receives event from Channel. expected := tracinghelper.TestSpanTree{ // 1 is added below if it is needed. // 2. Channel receives event from sending pod. @@ -143,68 +147,86 @@ func setupChannelTracingWithReply( ), Children: []tracinghelper.TestSpanTree{ { - // 3. Channel sends event to transformer pod. - Span: tracinghelper.MatchHTTPSpanWithReply( - model.Client, - tracinghelper.WithHTTPHostAndPath( - fmt.Sprintf("%s.%s.svc", transformerPod.Name, client.Namespace), - "/", - ), - ), + // 3. Channel Dispatcher span + Span: channelSpan(eventID, fmt.Sprintf("%s.%s.svc", mutatingPod.Name, client.Namespace), "/"), Children: []tracinghelper.TestSpanTree{ { - // 4. Transformer Pod receives event from Channel. + // 4. Channel sends event to transformer pod. Span: tracinghelper.MatchHTTPSpanWithReply( - model.Server, + model.Client, tracinghelper.WithHTTPHostAndPath( - fmt.Sprintf("%s.%s.svc", transformerPod.Name, client.Namespace), + fmt.Sprintf("%s.%s.svc", mutatingPod.Name, client.Namespace), "/", ), - tracinghelper.WithLocalEndpointServiceName(transformerPod.Name), ), + Children: []tracinghelper.TestSpanTree{ + { + // 5. Transformer Pod receives event from Channel. + Span: tracinghelper.MatchHTTPSpanWithReply( + model.Server, + tracinghelper.WithHTTPHostAndPath( + fmt.Sprintf("%s.%s.svc", mutatingPod.Name, client.Namespace), + "/", + ), + tracinghelper.WithLocalEndpointServiceName(mutatingPod.Name), + ), + }, + }, }, - }, - }, - { - // 5. Channel sends reply from Transformer Pod to the reply Channel. - Span: tracinghelper.MatchHTTPSpanNoReply( - model.Client, - tracinghelper.WithHTTPHostAndPath( - fmt.Sprintf("%s-kn-channel.%s.svc", replyChannelName, client.Namespace), - "", - ), - ), - Children: []tracinghelper.TestSpanTree{ - // 6. Reply Channel receives event from the original Channel's reply. { - Span: tracinghelper.MatchHTTPSpanNoReply( - model.Server, - tracinghelper.WithHTTPHostAndPath( - fmt.Sprintf("%s-kn-channel.%s.svc", replyChannelName, client.Namespace), - "/", - ), - ), + // 6. Channel Dispatcher span + Span: channelSpan(eventID, fmt.Sprintf("%s-kn-channel.%s.svc", replyChannelName, client.Namespace), ""), Children: []tracinghelper.TestSpanTree{ { - // 7. Reply Channel sends event to the logging Pod. + // 7. Channel sends reply from Transformer Pod to the reply Channel. Span: tracinghelper.MatchHTTPSpanNoReply( model.Client, tracinghelper.WithHTTPHostAndPath( - fmt.Sprintf("%s.%s.svc", recordEventsPod.Name, client.Namespace), - "/", + fmt.Sprintf("%s-kn-channel.%s.svc", replyChannelName, client.Namespace), + "", ), ), Children: []tracinghelper.TestSpanTree{ { - // 8. Logging pod receives event from Channel. + // 8. Reply Channel receives event from the original Channel's reply. Span: tracinghelper.MatchHTTPSpanNoReply( model.Server, tracinghelper.WithHTTPHostAndPath( - fmt.Sprintf("%s.%s.svc", recordEventsPod.Name, client.Namespace), + fmt.Sprintf("%s-kn-channel.%s.svc", replyChannelName, client.Namespace), "/", ), - tracinghelper.WithLocalEndpointServiceName(recordEventsPod.Name), ), + Children: []tracinghelper.TestSpanTree{ + { + // 9. Channel Dispatcher span + Span: channelSpan(eventID, fmt.Sprintf("%s.%s.svc", recordEventsPod.Name, client.Namespace), "/"), + Children: []tracinghelper.TestSpanTree{ + { + // 10. Reply Channel sends event to the logging Pod. + Span: tracinghelper.MatchHTTPSpanNoReply( + model.Client, + tracinghelper.WithHTTPHostAndPath( + fmt.Sprintf("%s.%s.svc", recordEventsPod.Name, client.Namespace), + "/", + ), + ), + Children: []tracinghelper.TestSpanTree{ + { + // 11. Logging pod receives event from Channel. + Span: tracinghelper.MatchHTTPSpanNoReply( + model.Server, + tracinghelper.WithHTTPHostAndPath( + fmt.Sprintf("%s.%s.svc", recordEventsPod.Name, client.Namespace), + "/", + ), + tracinghelper.WithLocalEndpointServiceName(recordEventsPod.Name), + ), + }, + }, + }, + }, + }, + }, }, }, }, @@ -236,3 +258,15 @@ func setupChannelTracingWithReply( cetest.DataContains(body), ) } + +func channelSpan(eventID, host, path string) *tracinghelper.SpanMatcher { + k := model.Client + return &tracinghelper.SpanMatcher{ + Kind: &k, + Tags: map[string]*regexp.Regexp{ + "messaging.system": regexp.MustCompile("^knative$"), + "messaging.destination": regexp.MustCompile("^http://" + host + tracinghelper.HostSuffix + path + "$"), + "messaging.message_id": regexp.MustCompile("^" + eventID + "$"), + }, + } +} diff --git a/test/conformance/helpers/tracing/traces.go b/test/conformance/helpers/tracing/traces.go index f244f94cf9c..079039aeb61 100644 --- a/test/conformance/helpers/tracing/traces.go +++ b/test/conformance/helpers/tracing/traces.go @@ -28,6 +28,17 @@ import ( "github.com/openzipkin/zipkin-go/model" ) +// hostSuffix is an optional suffix that might appear at the end of hostnames. +// We supplement matches with this to allow matches for: +// foo.bar +// to match all of: +// foo.bar +// foo.bar.svc +// foo.bar.svc.cluster.local +// It's hardly perfect, but requires the suffix to start with the delimiter '.' +// and then match anything prior to the path starting, e.g. '/' +const HostSuffix = "[.][^/]+" + // PrettyPrintTrace pretty prints a Trace. func PrettyPrintTrace(trace []model.SpanModel) string { b, _ := json.Marshal(trace) @@ -61,23 +72,13 @@ func WithLocalEndpointServiceName(s string) SpanMatcherOption { } func WithHTTPHostAndPath(host, path string) SpanMatcherOption { - // hostSuffix is an optional suffix that might appear at the end of hostnames. - // We supplement matches with this to allow matches for: - // foo.bar - // to match all of: - // foo.bar - // foo.bar.svc - // foo.bar.svc.cluster.local - // It's hardly perfect, but requires the suffix to start with the delimiter '.' - // and then match anything prior to the path starting, e.g. '/' - hostSuffix := "[.][^/]+" return func(m *SpanMatcher) { if m.Kind != nil { if *m.Kind == model.Client { - m.Tags["http.url"] = regexp.MustCompile("^http://" + regexp.QuoteMeta(host) + hostSuffix + regexp.QuoteMeta(path) + "$") + m.Tags["http.url"] = regexp.MustCompile("^http://" + regexp.QuoteMeta(host) + HostSuffix + regexp.QuoteMeta(path) + "$") } else if *m.Kind == model.Server { - m.Tags["http.host"] = regexp.MustCompile("^" + regexp.QuoteMeta(host) + hostSuffix + "$") + m.Tags["http.host"] = regexp.MustCompile("^" + regexp.QuoteMeta(host) + HostSuffix + "$") m.Tags["http.path"] = regexp.MustCompile("^" + regexp.QuoteMeta(path) + "$") } } @@ -225,7 +226,7 @@ func (tt TestSpanTree) MatchesSubtree(t *testing.T, actual *SpanTree) (matches [ } // matchesSubtrees checks for a match of each TestSpanTree with a -// subtree of a distrinct actual SpanTree. +// subtree of a distinct actual SpanTree. func matchesSubtrees(t *testing.T, ts []TestSpanTree, as []SpanTree) error { if t != nil { t.Helper() From 9ed38dab5ce0c7137e1c9f121f73a5773a0efb16 Mon Sep 17 00:00:00 2001 From: Sameer Vohra Date: Fri, 18 Dec 2020 15:56:20 -0600 Subject: [PATCH 4/4] Update channel tracing conformance test - nit: change transformer -> mutator for consistency Signed-off-by: Sameer Vohra --- .../helpers/channel_tracing_test_helper.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/test/conformance/helpers/channel_tracing_test_helper.go b/test/conformance/helpers/channel_tracing_test_helper.go index 73c9418eb95..ad545b94161 100644 --- a/test/conformance/helpers/channel_tracing_test_helper.go +++ b/test/conformance/helpers/channel_tracing_test_helper.go @@ -127,10 +127,10 @@ func setupChannelTracingWithReply( // 1. Sending pod sends event to Channel (only if the sending pod generates a span). // 2. Channel receives event from sending pod. // 3. Channel Dispatcher span - // 4. Channel sends event to transformer pod. - // 5. Transformer Pod receives event from Channel. + // 4. Channel sends event to Mutator pod. + // 5. Mutator Pod receives event from Channel. // 6. Channel Dispatcher span - // 7. Channel sends reply from Transformer Pod to the reply Channel. + // 7. Channel sends reply from Mutator Pod to the reply Channel. // 8. Reply Channel receives event from the original Channel's reply. // 9. Channel Dispatcher span // 10. Reply Channel sends event to the logging Pod. @@ -151,7 +151,7 @@ func setupChannelTracingWithReply( Span: channelSpan(eventID, fmt.Sprintf("%s.%s.svc", mutatingPod.Name, client.Namespace), "/"), Children: []tracinghelper.TestSpanTree{ { - // 4. Channel sends event to transformer pod. + // 4. Channel sends event to Mutator pod. Span: tracinghelper.MatchHTTPSpanWithReply( model.Client, tracinghelper.WithHTTPHostAndPath( @@ -161,7 +161,7 @@ func setupChannelTracingWithReply( ), Children: []tracinghelper.TestSpanTree{ { - // 5. Transformer Pod receives event from Channel. + // 5. Mutator Pod receives event from Channel. Span: tracinghelper.MatchHTTPSpanWithReply( model.Server, tracinghelper.WithHTTPHostAndPath( @@ -178,7 +178,7 @@ func setupChannelTracingWithReply( Span: channelSpan(eventID, fmt.Sprintf("%s-kn-channel.%s.svc", replyChannelName, client.Namespace), ""), Children: []tracinghelper.TestSpanTree{ { - // 7. Channel sends reply from Transformer Pod to the reply Channel. + // 7. Channel sends reply from Mutator Pod to the reply Channel. Span: tracinghelper.MatchHTTPSpanNoReply( model.Client, tracinghelper.WithHTTPHostAndPath(