Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 18 additions & 8 deletions docs/spec/channel.md
Original file line number Diff line number Diff line change
Expand Up @@ -327,13 +327,6 @@ discretion (e.g. expose a gRPC endpoint to accept events).
If a Channel receives an event queueing request and is unable to parse a valid
CloudEvent, then it MUST reject the request.

The Channel MUST recognize and pass through all tracing information from sender
to subscribers using [W3C Tracecontext](https://w3c.github.io/trace-context/),
although internally it MAY use another mechanism(s) to propagate the tracing
information. The Channel SHOULD sample and write traces to the location
specified in
[`config-tracing`](https://github.com/knative/eventing/blob/master/config/config-tracing.yaml).

##### HTTP

Channels MUST reject all HTTP event queueing requests with a method other than
Expand Down Expand Up @@ -388,7 +381,7 @@ not limited to:
- the time in-between retries
- the backoff rate

#### Metrics
#### Observability
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This name change makes perfect sense. Thanks for catching that.


Channels SHOULD expose a variety of metrics, including, but not limited to:

Expand All @@ -401,6 +394,23 @@ Channels SHOULD expose a variety of metrics, including, but not limited to:
Metrics SHOULD be enabled by default, with a configuration parameter included to
disable them if desired.

The Channel MUST recognize and pass through all tracing information from sender
to subscribers using [W3C Tracecontext](https://w3c.github.io/trace-context/),
although internally it MAY use another mechanism(s) to propagate the tracing
information. The Channel SHOULD sample and write traces to the location
specified in
[`config-tracing`](https://github.com/knative/eventing/blob/master/config/config-tracing.yaml).
Comment on lines +397 to +402
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same applies for broker

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me the observabiliity parts of broker and channel should be exactly the same. I am wondering if it worths it is own spec and just reference the spec in both channel and broker. Adam also mentioned that whether the broker should have some common metrics, but I remembered the last conversation ended with not going to implement it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zhongduo thats a fair point.
In the current spec, the channel has an additional requirement of passing through tracing info from sender to subscribers. It sounds like that would be applicable for the Broker as well ?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It sounds like that would be applicable for the Broker as well ?

Yep, and similar to channel, there is no constraint on how to propagate tracing informations

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it will be very confusing if there are difference between Channel and broker, as you can build a "broker" using channels.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as you can build a "broker" using channels.

But that would be an implementation detail of how you build that particular broker

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't really thinking about "how to build the broker". My point is that in tracing side, most requirements in broker will apply in channel as well, so just wondering how we can avoid such an inconsistency. I do understand theoretically they can be very different, but at least now they looks related to me.


Spans emitted by the Channel SHOULD follow the
[OpenTelemetry Semantic Conventions for Messaging Systems](https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/trace/semantic_conventions/messaging.md)
whenever possible. In particular, spans emitted by the Channel SHOULD set the
following attributes:

- messaging.system: "knative"
- messaging.destination: url to which the event is being routed
- messaging.protocol: the name of the underlying transport protocol
- messaging.message_id: the event ID

## Changelog

- `0.11.x release`: CloudEvents in 0.3 and 1.0 are supported.
Expand Down
3 changes: 2 additions & 1 deletion pkg/channel/message_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"

"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/eventing/pkg/tracing"
"knative.dev/eventing/pkg/utils"
)

Expand Down Expand Up @@ -190,7 +191,7 @@ func (d *MessageDispatcherImpl) executeRequest(ctx context.Context, url *url.URL
}

if span.IsRecordingEvents() {
err = kncloudevents.WriteHTTPRequestWithAdditionalHeaders(ctx, message, req, additionalHeaders, kncloudevents.PopulateSpan(span))
err = kncloudevents.WriteHTTPRequestWithAdditionalHeaders(ctx, message, req, additionalHeaders, tracing.PopulateSpan(span, url.String()))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you're keeping kncloudevents.PopulateSpan (which is fine, to avoid breaking downstream), can you add the deprecated comment to such function to signal users to use this new one in tracing module?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The kncloudevents.PopulateSpan function & file was removed in the PR. However, I hadn't considered how this change might impact downstream.
The only references to PopulateSpan were from message_dispatcher.go, which I assumed would be shipped together. Are there downstream uses cases that would be affected ?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I honestly don't remember, try to remove the function and check if the downstream checks break 😄

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this PR removes the function I was looking at the downstream checks for the PR. It looks like all but the Kafka downstream checks are passing. I am not sure if the Kafka failure is related to the changes in the PR.

I also did a github search in knative org and knative-sandbox org and the only reference is message_dispatcher.go, so this change feels safe.

Any additional things I could check ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, rebased off master just now and now the downstream kafak test is passing but the rabbit and nats ones are failing due to go mod issues.

} else {
err = kncloudevents.WriteHTTPRequestWithAdditionalHeaders(ctx, message, req, additionalHeaders)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package kncloudevents
package tracing

import (
"github.com/cloudevents/sdk-go/v2/binding"
Expand All @@ -23,26 +23,32 @@ import (
"go.opencensus.io/trace"
)

func PopulateSpan(span *trace.Span) binding.TransformerFunc {
func PopulateSpan(span *trace.Span, destination string) binding.TransformerFunc {
return func(reader binding.MessageMetadataReader, writer binding.MessageMetadataWriter) error {
_, specVersion := reader.GetAttribute(spec.SpecVersion)
if specVersion != nil {
specVersionParsed, err := types.Format(specVersion)
if err != nil {
return err
}
span.AddAttributes(trace.StringAttribute("cloudevents.specversion", specVersionParsed))
}

span.AddAttributes(MessagingProtocolHTTP)
span.AddAttributes(MessagingSystemAttribute)
span.AddAttributes(trace.StringAttribute(MessagingDestinationAttributeName, destination))

_, id := reader.GetAttribute(spec.ID)
if id != nil {
idParsed, err := types.Format(id)
if err != nil {
return err
}
span.AddAttributes(trace.StringAttribute(MessagingMessageIDAttributeName, idParsed))
span.AddAttributes(trace.StringAttribute("cloudevents.id", idParsed))
}

_, specVersion := reader.GetAttribute(spec.SpecVersion)
if specVersion != nil {
specVersionParsed, err := types.Format(specVersion)
if err != nil {
return err
}
span.AddAttributes(trace.StringAttribute("cloudevents.specversion", specVersionParsed))
}

_, ty := reader.GetAttribute(spec.Type)
if ty != nil {
tyParsed, err := types.Format(ty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package kncloudevents
package tracing

import (
"context"
Expand Down Expand Up @@ -46,7 +46,13 @@ func TestPopulateSpan(t *testing.T) {
wantEvent.SetType("hello.world")
wantEvent.SetSource("example.com")

destination := "some-url"

expectedAttributes := map[string]interface{}{
"messaging.system": "knative",
"messaging.protocol": "HTTP",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that http is the protocol used, do we support https? How about protocol version maybe we should be explicit as there is wip (if I understood correctly) for http/2?

Copy link
Copy Markdown
Contributor Author

@xtreme-sameer-vohra xtreme-sameer-vohra Dec 22, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @skonto
If I understood correctly, once the http/2 feature is wrapped up, the protocol string could change and hence hardcoding a value in the test is not a good idea.

The current implementation has it hardcoded here which would need to change once the http/2 feature is completed.

If that change is imminent;

  • the assertion could be updated to match a regex
  • the string could be pulled out and put into an enum ( would still only be configured for HTTP for now )
  • and if the test assertion for protocol isn't very valuable, the assertion for protocol could be dropped

wdyt ?

"messaging.message_id": "aaa",
"messaging.destination": "some-url",
"cloudevents.id": "aaa",
"cloudevents.type": "hello.world",
"cloudevents.source": "example.com",
Expand All @@ -63,7 +69,7 @@ func TestPopulateSpan(t *testing.T) {
spanData := <-mockExp
require.Equal(t, expectedAttributes, spanData.Attributes)
},
Transformers: binding.Transformers{PopulateSpan(testSpanBinary)},
Transformers: binding.Transformers{PopulateSpan(testSpanBinary, destination)},
},
{
Name: "Populate span for event messages",
Expand All @@ -74,7 +80,7 @@ func TestPopulateSpan(t *testing.T) {
spanData := <-mockExp
require.Equal(t, expectedAttributes, spanData.Attributes)
},
Transformers: binding.Transformers{PopulateSpan(testSpanEvent)},
Transformers: binding.Transformers{PopulateSpan(testSpanEvent, destination)},
},
})
}
128 changes: 81 additions & 47 deletions test/conformance/helpers/channel_tracing_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package helpers
import (
"context"
"fmt"
"regexp"
"testing"

cloudevents "github.com/cloudevents/sdk-go/v2"
Expand Down Expand Up @@ -73,10 +74,10 @@ func setupChannelTracingWithReply(
recordEventsPod := recordevents.DeployEventRecordOrFail(ctx, client, recordEventsPodName)

// Create the subscriber, a Pod that mutates the event.
transformerPod := recordevents.DeployEventRecordOrFail(
mutatingPod := recordevents.DeployEventRecordOrFail(
ctx,
client,
"transformer",
"mutator",
recordevents.ReplyWithTransformedEvent(
"mutated",
eventSource,
Expand All @@ -89,7 +90,7 @@ func setupChannelTracingWithReply(
"sub",
channelName,
channel,
resources.WithSubscriberForSubscription(transformerPod.Name),
resources.WithSubscriberForSubscription(mutatingPod.Name),
resources.WithReplyForSubscription(replyChannelName, channel))

// Create the Subscription linking the reply Channel to the LogEvents K8s Service.
Expand Down Expand Up @@ -125,12 +126,15 @@ func setupChannelTracingWithReply(
// We expect the following spans:
// 1. Sending pod sends event to Channel (only if the sending pod generates a span).
// 2. Channel receives event from sending pod.
// 3. Channel sends event to transformer pod.
// 4. Transformer Pod receives event from Channel.
// 5. Channel sends reply from Transformer Pod to the reply Channel.
// 6. Reply Channel receives event from the original Channel's reply.
// 7. Reply Channel sends event to the logging Pod.
// 8. Logging pod receives event from Channel.
// 3. Channel Dispatcher span
// 4. Channel sends event to Mutator pod.
Comment on lines +129 to +130
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why 2 spans here? Are we sure this isn't an impl detail here and only 1 match should be done (the point 3)?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I discovered the the span containing the cloudevent & tracing tags wasn't being tested previously.

Removing the assertion for the span corresponding to Channel sends event to Mutator pod doesn't impact the issue this PR is attempting to address. However, It would just loose the test case to ensure the http status of the resultant request is not tested. I'd defer the importance of testing/not testing to folks with more context.

Looking at the broker tests, it doesn't appear that they are asserting for the presence of the span with the http status either.

So, I am happy to drop the assertions for spans corresponding to 4, 7 and 10.

Does that make sense @slinkydeveloper ?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I think i explained myself wrongly. Why do assert the span:

Channel Dispatcher span

Isn't that an implementation detail?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thats the span that contains the Open Telemetry Spec Attributes that this PR adds
Ditto for spans labeled 6 and 9

// 5. Mutator Pod receives event from Channel.
// 6. Channel Dispatcher span
// 7. Channel sends reply from Mutator Pod to the reply Channel.
// 8. Reply Channel receives event from the original Channel's reply.
// 9. Channel Dispatcher span
// 10. Reply Channel sends event to the logging Pod.
// 11. Logging pod receives event from Channel.
expected := tracinghelper.TestSpanTree{
// 1 is added below if it is needed.
// 2. Channel receives event from sending pod.
Expand All @@ -143,68 +147,86 @@ func setupChannelTracingWithReply(
),
Children: []tracinghelper.TestSpanTree{
{
// 3. Channel sends event to transformer pod.
Span: tracinghelper.MatchHTTPSpanWithReply(
model.Client,
tracinghelper.WithHTTPHostAndPath(
fmt.Sprintf("%s.%s.svc", transformerPod.Name, client.Namespace),
"/",
),
),
// 3. Channel Dispatcher span
Span: channelSpan(eventID, fmt.Sprintf("%s.%s.svc", mutatingPod.Name, client.Namespace), "/"),
Children: []tracinghelper.TestSpanTree{
{
// 4. Transformer Pod receives event from Channel.
// 4. Channel sends event to Mutator pod.
Span: tracinghelper.MatchHTTPSpanWithReply(
model.Server,
model.Client,
tracinghelper.WithHTTPHostAndPath(
fmt.Sprintf("%s.%s.svc", transformerPod.Name, client.Namespace),
fmt.Sprintf("%s.%s.svc", mutatingPod.Name, client.Namespace),
"/",
),
tracinghelper.WithLocalEndpointServiceName(transformerPod.Name),
),
Children: []tracinghelper.TestSpanTree{
{
// 5. Mutator Pod receives event from Channel.
Span: tracinghelper.MatchHTTPSpanWithReply(
model.Server,
tracinghelper.WithHTTPHostAndPath(
fmt.Sprintf("%s.%s.svc", mutatingPod.Name, client.Namespace),
"/",
),
tracinghelper.WithLocalEndpointServiceName(mutatingPod.Name),
),
},
},
},
},
},
{
// 5. Channel sends reply from Transformer Pod to the reply Channel.
Span: tracinghelper.MatchHTTPSpanNoReply(
model.Client,
tracinghelper.WithHTTPHostAndPath(
fmt.Sprintf("%s-kn-channel.%s.svc", replyChannelName, client.Namespace),
"",
),
),
Children: []tracinghelper.TestSpanTree{
// 6. Reply Channel receives event from the original Channel's reply.
{
Span: tracinghelper.MatchHTTPSpanNoReply(
model.Server,
tracinghelper.WithHTTPHostAndPath(
fmt.Sprintf("%s-kn-channel.%s.svc", replyChannelName, client.Namespace),
"/",
),
),
// 6. Channel Dispatcher span
Span: channelSpan(eventID, fmt.Sprintf("%s-kn-channel.%s.svc", replyChannelName, client.Namespace), ""),
Children: []tracinghelper.TestSpanTree{
{
// 7. Reply Channel sends event to the logging Pod.
// 7. Channel sends reply from Mutator Pod to the reply Channel.
Span: tracinghelper.MatchHTTPSpanNoReply(
model.Client,
tracinghelper.WithHTTPHostAndPath(
fmt.Sprintf("%s.%s.svc", recordEventsPod.Name, client.Namespace),
"/",
fmt.Sprintf("%s-kn-channel.%s.svc", replyChannelName, client.Namespace),
"",
),
),
Children: []tracinghelper.TestSpanTree{
{
// 8. Logging pod receives event from Channel.
// 8. Reply Channel receives event from the original Channel's reply.
Span: tracinghelper.MatchHTTPSpanNoReply(
model.Server,
tracinghelper.WithHTTPHostAndPath(
fmt.Sprintf("%s.%s.svc", recordEventsPod.Name, client.Namespace),
fmt.Sprintf("%s-kn-channel.%s.svc", replyChannelName, client.Namespace),
"/",
),
tracinghelper.WithLocalEndpointServiceName(recordEventsPod.Name),
),
Children: []tracinghelper.TestSpanTree{
{
// 9. Channel Dispatcher span
Span: channelSpan(eventID, fmt.Sprintf("%s.%s.svc", recordEventsPod.Name, client.Namespace), "/"),
Children: []tracinghelper.TestSpanTree{
{
// 10. Reply Channel sends event to the logging Pod.
Span: tracinghelper.MatchHTTPSpanNoReply(
model.Client,
tracinghelper.WithHTTPHostAndPath(
fmt.Sprintf("%s.%s.svc", recordEventsPod.Name, client.Namespace),
"/",
),
),
Children: []tracinghelper.TestSpanTree{
{
// 11. Logging pod receives event from Channel.
Span: tracinghelper.MatchHTTPSpanNoReply(
model.Server,
tracinghelper.WithHTTPHostAndPath(
fmt.Sprintf("%s.%s.svc", recordEventsPod.Name, client.Namespace),
"/",
),
tracinghelper.WithLocalEndpointServiceName(recordEventsPod.Name),
),
},
},
},
},
},
},
},
},
},
Expand Down Expand Up @@ -236,3 +258,15 @@ func setupChannelTracingWithReply(
cetest.DataContains(body),
)
}

func channelSpan(eventID, host, path string) *tracinghelper.SpanMatcher {
k := model.Client
return &tracinghelper.SpanMatcher{
Kind: &k,
Tags: map[string]*regexp.Regexp{
"messaging.system": regexp.MustCompile("^knative$"),
"messaging.destination": regexp.MustCompile("^http://" + host + tracinghelper.HostSuffix + path + "$"),
"messaging.message_id": regexp.MustCompile("^" + eventID + "$"),
},
}
}
Loading