From 4a0b600913ddfb34b520a686cafc45ffa4cd0fff Mon Sep 17 00:00:00 2001 From: Mike Helmick Date: Sat, 22 Jun 2019 13:13:53 -0700 Subject: [PATCH 1/3] Fix issue with broker dropping messages with "No TTL Seen" --- pkg/broker/receiver_test.go | 4 ++++ pkg/broker/ttl.go | 4 +++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/pkg/broker/receiver_test.go b/pkg/broker/receiver_test.go index 5a473570717..eb349caa6a0 100644 --- a/pkg/broker/receiver_test.go +++ b/pkg/broker/receiver_test.go @@ -209,6 +209,8 @@ func TestReceiver(t *testing.T) { "Knative-Foo": []string{"baz", "qux"}, // X-Ot-Foo will pass as a prefix match. "X-Ot-Foo": []string{"haden"}, + // knative-importer will be canonicalized. + "knative-importer": []string{"exporter"}, }, }, expectedHeaders: http.Header{ @@ -222,6 +224,8 @@ func TestReceiver(t *testing.T) { "Knative-Foo": []string{"baz", "qux"}, // X-Ot-Foo will pass as a prefix match. "X-Ot-Foo": []string{"haden"}, + // Passes prefix match, but gets case changed. + "Knative-Importer": []string{"exporter"}, }, expectedDispatch: true, returnedEvent: makeDifferentEvent(), diff --git a/pkg/broker/ttl.go b/pkg/broker/ttl.go index d71f425b8a7..ee7db26e8b3 100644 --- a/pkg/broker/ttl.go +++ b/pkg/broker/ttl.go @@ -23,7 +23,9 @@ import ( const ( // V02TTLAttribute is the name of the CloudEvents 0.2 extension attribute used to store the // Broker's TTL (number of times a single event can reply through a Broker continuously). - V02TTLAttribute = "knativebrokerttl" + // Extensions should be formatted as described in net/textproto.CanonicalMIMEHeaderKey, as + // they may have their case changed in transit. + V02TTLAttribute = "Knativebrokerttl" ) // SetTTL sets the TTL into the EventContext. ttl should be a positive integer. From 48bb3d3c77789f4878b60c71f3951e1c7ad31034 Mon Sep 17 00:00:00 2001 From: Mike Helmick Date: Mon, 24 Jun 2019 10:20:14 -0700 Subject: [PATCH 2/3] Treat the broker TTL attribute as case insensitive, in line with how cloudevents attributes should be handled. See https://github.com/cloudevents/spec/pull/321 for context. --- pkg/broker/receiver.go | 6 +++--- pkg/broker/ttl.go | 18 +++++++++++++++--- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/pkg/broker/receiver.go b/pkg/broker/receiver.go index 315bd57d3d8..0f2dc0f985e 100644 --- a/pkg/broker/receiver.go +++ b/pkg/broker/receiver.go @@ -134,8 +134,8 @@ func (r *Receiver) serveHTTP(ctx context.Context, event cloudevents.Event, resp // Remove the TTL attribute that is used by the Broker. originalV2 := event.Context.AsV02() - ttl, present := originalV2.Extensions[V02TTLAttribute] - if !present { + ttl, ttlKey := GetTTL(event.Context) + if ttl == nil { // Only messages sent by the Broker should be here. If the attribute isn't here, then the // event wasn't sent by the Broker, so we can drop it. r.logger.Warn("No TTL seen, dropping", zap.Any("triggerRef", triggerRef), zap.Any("event", event)) @@ -144,7 +144,7 @@ func (r *Receiver) serveHTTP(ctx context.Context, event cloudevents.Event, resp // framework returns a 500 to the caller, so the Channel would send this repeatedly. return nil } - delete(originalV2.Extensions, V02TTLAttribute) + delete(originalV2.Extensions, ttlKey) event.Context = originalV2 r.logger.Debug("Received message", zap.Any("triggerRef", triggerRef)) diff --git a/pkg/broker/ttl.go b/pkg/broker/ttl.go index ee7db26e8b3..f84e17a6129 100644 --- a/pkg/broker/ttl.go +++ b/pkg/broker/ttl.go @@ -17,17 +17,29 @@ package broker import ( + "strings" + cloudevents "github.com/cloudevents/sdk-go" ) const ( // V02TTLAttribute is the name of the CloudEvents 0.2 extension attribute used to store the // Broker's TTL (number of times a single event can reply through a Broker continuously). - // Extensions should be formatted as described in net/textproto.CanonicalMIMEHeaderKey, as - // they may have their case changed in transit. - V02TTLAttribute = "Knativebrokerttl" + V02TTLAttribute = "knativebrokerttl" ) +// GetTTL finds the TTL in the EventContext using a case insensitive comparison +// for the key. The second return param, is the case preserved key that matched. +// Depending on the encoding/transport, the extension case could be changed. +func GetTTL(ctx cloudevents.EventContext) (interface{}, string) { + for k, v := range ctx.AsV02().Extensions { + if lower := strings.ToLower(k); lower == V02TTLAttribute { + return v, k + } + } + return nil, V02TTLAttribute +} + // SetTTL sets the TTL into the EventContext. ttl should be a positive integer. func SetTTL(ctx cloudevents.EventContext, ttl interface{}) (cloudevents.EventContext, error) { err := ctx.SetExtension(V02TTLAttribute, ttl) From 879365b7f834f570b32ec713842bd60e927bafc9 Mon Sep 17 00:00:00 2001 From: Mike Helmick Date: Mon, 24 Jun 2019 10:27:41 -0700 Subject: [PATCH 3/3] Remove unnecessary test change. --- pkg/broker/receiver_test.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pkg/broker/receiver_test.go b/pkg/broker/receiver_test.go index eb349caa6a0..5a473570717 100644 --- a/pkg/broker/receiver_test.go +++ b/pkg/broker/receiver_test.go @@ -209,8 +209,6 @@ func TestReceiver(t *testing.T) { "Knative-Foo": []string{"baz", "qux"}, // X-Ot-Foo will pass as a prefix match. "X-Ot-Foo": []string{"haden"}, - // knative-importer will be canonicalized. - "knative-importer": []string{"exporter"}, }, }, expectedHeaders: http.Header{ @@ -224,8 +222,6 @@ func TestReceiver(t *testing.T) { "Knative-Foo": []string{"baz", "qux"}, // X-Ot-Foo will pass as a prefix match. "X-Ot-Foo": []string{"haden"}, - // Passes prefix match, but gets case changed. - "Knative-Importer": []string{"exporter"}, }, expectedDispatch: true, returnedEvent: makeDifferentEvent(),