-
Notifications
You must be signed in to change notification settings - Fork 630
Tracing works with Broker and Trigger #936
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
d7ed39b
080f5eb
f20a2df
163474d
5f53e05
2f6a03f
3ee0b8b
da9fe36
f8cca2e
d7ebdcb
c5f7a83
054c10d
942b979
afd71af
7a1f69b
2be56bd
10acfa3
2108d83
6aa7086
3cd236f
be350a0
0068232
b4e2358
3a9fd94
5ca1b9f
bd53cda
17889fb
582eb4c
4f803a2
eec0f8d
ac3587e
34d42e0
1788a9b
ad4e6f5
bc519d8
3f47238
3070157
0957fda
ea53702
f0106aa
509c8a1
d33308b
af83baa
8df699c
bd6a8fa
d2011e6
d567412
3b84c91
1d6c759
0e43a7f
2df6bdc
bfe61c1
22efbe0
3a3c13c
f834f89
698a40c
6b817ef
451b607
4097d0a
97e642d
5e0319f
4bb7fda
da32bae
7bc8b7a
8ccfcca
29acf36
e75141c
e79b8f7
ae665bd
af2f475
dbe650a
30f1c74
f363e02
e45d19a
1628333
a6d2650
434bf9d
cbbd066
c33c569
66f6f44
40ca8da
650ef47
d2c1ef8
ae04007
6d6c488
9c63dbc
3ad193f
d18acab
2a02ba5
54cdb3c
11a1734
d713eeb
89cdd58
bd4345a
49fd1a0
4cdfd58
b42d359
29be515
1edbf0a
9c415e6
ac9b485
2b39168
ed9fcbe
4848952
311e174
d361afb
d1198a9
6bd659c
0d80f9c
cd862ff
2ea7aa3
aa60a8a
6471b4f
6bc7f64
56ad43c
c4959e2
69ebf18
1dccad8
2b67eb8
a44d6d2
41ab0a8
6a832ea
446022a
7f24f14
2a3eb19
9debaf2
abf3427
a83ca82
fda6550
3ee6343
2aae7eb
e15431d
5f201fa
ba726b3
e7ff5b2
d640c7a
dc91427
0ea0402
dcb8774
df88eab
86f7330
b6fe6bd
71c5250
b12da7e
8d6df67
c27d283
d9a607c
77a69f9
fef7628
6dd814f
f8d606d
a3b7728
80005e8
bf335bc
a106553
bc05fdf
134198d
0bdac35
2e50060
f27c63a
747e290
2b9a2e0
f3121ae
50aa925
c63f23e
ad9bf7c
b314abd
a9d7111
bd47d55
36ef2a4
c66f21e
f94d5a8
a6f6bd4
44e4575
591fd60
f6ea5ac
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,6 +21,7 @@ import ( | |
| "errors" | ||
| "net/http" | ||
| "net/url" | ||
| "strings" | ||
| "time" | ||
|
|
||
| "github.com/cloudevents/sdk-go/pkg/cloudevents" | ||
|
|
@@ -31,6 +32,7 @@ import ( | |
| "github.com/knative/eventing/pkg/provisioners" | ||
| "go.uber.org/zap" | ||
| "k8s.io/apimachinery/pkg/types" | ||
| "k8s.io/apimachinery/pkg/util/sets" | ||
| "sigs.k8s.io/controller-runtime/pkg/client" | ||
| ) | ||
|
|
||
|
|
@@ -40,6 +42,22 @@ const ( | |
| writeTimeout = 1 * time.Minute | ||
| ) | ||
|
|
||
| var ( | ||
| // These MUST be lowercase strings, as they will be compared against lowercase strings. | ||
| forwardHeaders = sets.NewString( | ||
| // tracing | ||
| "x-request-id", | ||
| ) | ||
| // These MUST be lowercase strings, as they will be compared against lowercase strings. | ||
| forwardPrefixes = []string{ | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. use sets or an array, not both? why both?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I use a set for the first one so that I can easily do |
||
| // knative | ||
| "knative-", | ||
| // tracing | ||
| "x-b3-", | ||
| "x-ot-", | ||
| } | ||
| ) | ||
|
|
||
| // Receiver parses Cloud Events, determines if they pass a filter, and sends them to a subscriber. | ||
| type Receiver struct { | ||
| logger *zap.Logger | ||
|
|
@@ -142,18 +160,22 @@ func (r *Receiver) serveHTTP(ctx context.Context, event cloudevents.Event, resp | |
|
|
||
| r.logger.Debug("Received message", zap.Any("triggerRef", triggerRef)) | ||
|
|
||
| responseEvent, err := r.sendEvent(ctx, triggerRef, &event) | ||
| responseEvent, err := r.sendEvent(ctx, tctx, triggerRef, &event) | ||
| if err != nil { | ||
| r.logger.Error("Error sending the event", zap.Error(err)) | ||
| return err | ||
| } | ||
| resp.Status = http.StatusAccepted | ||
| resp.Event = responseEvent | ||
|
|
||
| // TODO Add filtered headers (mostly tracing) to the response. We are waiting for CloudEvents | ||
| // SDK to allow this. | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| // sendEvent sends an event to a subscriber if the trigger filter passes. | ||
| func (r *Receiver) sendEvent(ctx context.Context, trigger provisioners.ChannelReference, event *cloudevents.Event) (*cloudevents.Event, error) { | ||
| func (r *Receiver) sendEvent(ctx context.Context, tctx cehttp.TransportContext, trigger provisioners.ChannelReference, event *cloudevents.Event) (*cloudevents.Event, error) { | ||
| t, err := r.getTrigger(ctx, trigger) | ||
| if err != nil { | ||
| r.logger.Info("Unable to get the Trigger", zap.Error(err), zap.Any("triggerRef", trigger)) | ||
|
|
@@ -179,6 +201,7 @@ func (r *Receiver) sendEvent(ctx context.Context, trigger provisioners.ChannelRe | |
| } | ||
|
|
||
| sendingCtx := cecontext.WithTarget(ctx, subscriberURI.String()) | ||
| sendingCtx = addFilteredHeaders(sendingCtx, tctx) | ||
| return r.ceHttp.Send(sendingCtx, *event) | ||
| } | ||
|
|
||
|
|
@@ -214,3 +237,28 @@ func (r *Receiver) shouldSendMessage(ts *eventingv1alpha1.TriggerSpec, event *cl | |
| } | ||
| return true | ||
| } | ||
|
|
||
| func addFilteredHeaders(ctx context.Context, tctx cehttp.TransportContext) context.Context { | ||
| // Helper function that adds the header name and all its values. | ||
| addHeader := func(ctx context.Context, n string, v []string) context.Context { | ||
| for _, iv := range v { | ||
| ctx = cehttp.ContextWithHeader(ctx, n, iv) | ||
| } | ||
| return ctx | ||
| } | ||
|
|
||
| for n, v := range tctx.Header { | ||
| lower := strings.ToLower(n) | ||
| if forwardHeaders.Has(lower) { | ||
| ctx = addHeader(ctx, n, v) | ||
| continue | ||
| } | ||
| for _, prefix := range forwardPrefixes { | ||
| if strings.HasPrefix(lower, prefix) { | ||
| ctx = addHeader(ctx, n, v) | ||
| break | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do you break here?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems like this will only add one header?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This breaks out of the 'range over forwardPrefixes' for loop, not the 'range over all headers from the context' for loop. |
||
| } | ||
| } | ||
| } | ||
| return ctx | ||
| } | ||
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Uh oh!
There was an error while loading. Please reload this page.