diff --git a/cmd/broker/filter/main.go b/cmd/broker/filter/main.go index c76cfab3049..89771f398ae 100644 --- a/cmd/broker/filter/main.go +++ b/cmd/broker/filter/main.go @@ -24,7 +24,7 @@ import ( "go.uber.org/zap/zapcore" "k8s.io/client-go/kubernetes" eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1" - "knative.dev/eventing/pkg/broker" + "knative.dev/eventing/pkg/broker/filter" "knative.dev/eventing/pkg/channel" "knative.dev/eventing/pkg/tracing" "knative.dev/pkg/configmap" @@ -78,19 +78,19 @@ func main() { // We are running both the receiver (takes messages in from the Broker) and the dispatcher (send // the messages to the triggers' subscribers) in this binary. - receiver, err := broker.New(logger, mgr.GetClient()) + handler, err := filter.NewHandler(logger, mgr.GetClient()) if err != nil { - logger.Fatal("Error creating Receiver", zap.Error(err)) + logger.Fatal("Error creating Handler", zap.Error(err)) } - err = mgr.Add(receiver) + err = mgr.Add(handler) if err != nil { - logger.Fatal("Unable to start the receiver", zap.Error(err), zap.Any("broker_receiver", receiver)) + logger.Fatal("Unable to start the handler", zap.Error(err), zap.Any("broker_filter", handler)) } // TODO watch logging config map. // Watch the observability config map and dynamically update metrics exporter. - configMapWatcher.Watch(metrics.ConfigMapName(), metrics.UpdateExporterFromConfigMap("broker_receiver", logger.Sugar())) + configMapWatcher.Watch(metrics.ConfigMapName(), metrics.UpdateExporterFromConfigMap("broker_filter", logger.Sugar())) // Set up signals so we handle the first shutdown signal gracefully. stopCh := signals.SetupSignalHandler() diff --git a/pkg/broker/context.go b/pkg/broker/context.go index fd336f5123e..4b768c869ca 100644 --- a/pkg/broker/context.go +++ b/pkg/broker/context.go @@ -50,7 +50,7 @@ var ( func SendingContext(ctx context.Context, tctx cloudevents.HTTPTransportContext, targetURI *url.URL) context.Context { sendingCTX := cloudevents.ContextWithTarget(ctx, targetURI.String()) - h := extractPassThroughHeaders(tctx) + h := ExtractPassThroughHeaders(tctx) for n, v := range h { for _, iv := range v { sendingCTX = cloudevents.ContextWithHeader(sendingCTX, n, iv) @@ -60,7 +60,9 @@ func SendingContext(ctx context.Context, tctx cloudevents.HTTPTransportContext, return sendingCTX } -func extractPassThroughHeaders(tctx cloudevents.HTTPTransportContext) http.Header { +// ExtractPassThroughHeaders extracts the headers that are in the `forwardHeaders` set +// or has any of the prefixes in `forwardPrefixes`. +func ExtractPassThroughHeaders(tctx cloudevents.HTTPTransportContext) http.Header { h := http.Header{} for n, v := range tctx.Header { diff --git a/pkg/broker/receiver.go b/pkg/broker/filter/filter_handler.go similarity index 85% rename from pkg/broker/receiver.go rename to pkg/broker/filter/filter_handler.go index 60c946853f3..9c9ae63c0c0 100644 --- a/pkg/broker/receiver.go +++ b/pkg/broker/filter/filter_handler.go @@ -14,7 +14,7 @@ * limitations under the License. */ -package broker +package filter import ( "context" @@ -33,6 +33,7 @@ import ( "go.opencensus.io/tag" "go.uber.org/zap" eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1" + "knative.dev/eventing/pkg/broker" "knative.dev/eventing/pkg/reconciler/trigger/path" "knative.dev/pkg/tracing" "sigs.k8s.io/controller-runtime/pkg/client" @@ -40,22 +41,18 @@ import ( const ( writeTimeout = 1 * time.Minute - // TimeInFlightMetadataName is used to access the metadata stored on a - // CloudEvent to measure the time difference between when an event is - // received and when it is dispatched to the trigger function. - TimeInFlightMetadataName = "kn00timeinflight" ) -// Receiver parses Cloud Events, determines if they pass a filter, and sends them to a subscriber. -type Receiver struct { +// Handler parses Cloud Events, determines if they pass a filter, and sends them to a subscriber. +type Handler struct { logger *zap.Logger client client.Client ceClient cloudevents.Client } -// New creates a new Receiver and its associated MessageReceiver. The caller is responsible for -// Start()ing the returned MessageReceiver. -func New(logger *zap.Logger, client client.Client) (*Receiver, error) { +// NewHandler creates a new Handler and its associated MessageReceiver. The caller is responsible for +// Start()ing the returned Handler. +func NewHandler(logger *zap.Logger, client client.Client) (*Handler, error) { httpTransport, err := cloudevents.NewHTTPTransport(cloudevents.WithBinaryEncoding(), cehttp.WithMiddleware(tracing.HTTPSpanMiddleware)) if err != nil { return nil, err @@ -80,7 +77,7 @@ func New(logger *zap.Logger, client client.Client) (*Receiver, error) { return nil, err } - r := &Receiver{ + r := &Handler{ logger: logger, client: client, ceClient: ceClient, @@ -95,7 +92,7 @@ func New(logger *zap.Logger, client client.Client) (*Receiver, error) { } // Initialize the client. Mainly intended to load stuff in its cache. -func (r *Receiver) initClient() error { +func (r *Handler) initClient() error { // We list triggers so that we do not drop messages. Otherwise, on receiving an event, it // may not find the Trigger and would return an error. opts := &client.ListOptions{} @@ -106,14 +103,14 @@ func (r *Receiver) initClient() error { return nil } -// Start begins to receive messages for the receiver. +// Start begins to receive messages for the handler. // // Only HTTP POST requests to the root path (/) are accepted. If other paths or // methods are needed, use the HandleRequest method directly with another HTTP // server. // // This method will block until a message is received on the stop channel. -func (r *Receiver) Start(stopCh <-chan struct{}) error { +func (r *Handler) Start(stopCh <-chan struct{}) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -141,7 +138,7 @@ func (r *Receiver) Start(stopCh <-chan struct{}) error { } } -func (r *Receiver) serveHTTP(ctx context.Context, event cloudevents.Event, resp *cloudevents.EventResponse) error { +func (r *Handler) serveHTTP(ctx context.Context, event cloudevents.Event, resp *cloudevents.EventResponse) error { tctx := cloudevents.HTTPTransportContextFrom(ctx) if tctx.Method != http.MethodPost { resp.Status = http.StatusMethodNotAllowed @@ -157,7 +154,7 @@ func (r *Receiver) serveHTTP(ctx context.Context, event cloudevents.Event, resp // Remove the TTL attribute that is used by the Broker. originalV3 := event.Context.AsV03() - ttl, ttlKey := GetTTL(event.Context) + ttl, ttlKey := broker.GetTTL(event.Context) if ttl == nil { // Only messages sent by the Broker should be here. If the attribute isn't here, then the // event wasn't sent by the Broker, so we can drop it. @@ -184,20 +181,20 @@ func (r *Receiver) serveHTTP(ctx context.Context, event cloudevents.Event, resp } // Reattach the TTL (with the same value) to the response event before sending it to the Broker. - responseEvent.Context, err = SetTTL(responseEvent.Context, ttl) + responseEvent.Context, err = broker.SetTTL(responseEvent.Context, ttl) if err != nil { return err } resp.Event = responseEvent resp.Context = &cloudevents.HTTPTransportResponseContext{ - Header: extractPassThroughHeaders(tctx), + Header: broker.ExtractPassThroughHeaders(tctx), } return nil } // sendEvent sends an event to a subscriber if the trigger filter passes. -func (r *Receiver) sendEvent(ctx context.Context, tctx cloudevents.HTTPTransportContext, trigger path.NamespacedNameUID, event *cloudevents.Event) (*cloudevents.Event, error) { +func (r *Handler) sendEvent(ctx context.Context, tctx cloudevents.HTTPTransportContext, trigger path.NamespacedNameUID, event *cloudevents.Event) (*cloudevents.Event, error) { t, err := r.getTrigger(ctx, trigger) if err != nil { r.logger.Info("Unable to get the Trigger", zap.Error(err), zap.Any("triggerRef", trigger)) @@ -217,7 +214,7 @@ func (r *Receiver) sendEvent(ctx context.Context, tctx cloudevents.HTTPTransport dispatchTimeMS := int64(now.Sub(startTS) / time.Millisecond) stats.Record(ctx, MeasureTriggerDispatchTime.M(dispatchTimeMS)) stats.Record(ctx, MeasureTriggerEventsTotal.M(1)) - if err := event.ExtensionAs(TimeInFlightMetadataName, &deliveryTime); err != nil { + if err := event.ExtensionAs(broker.TimeInFlightMetadataName, &deliveryTime); err != nil { return } timeInFlightMS := int64(now.Sub(deliveryTime) / time.Millisecond) @@ -244,7 +241,7 @@ func (r *Receiver) sendEvent(ctx context.Context, tctx cloudevents.HTTPTransport return nil, nil } - sendingCTX := SendingContext(ctx, tctx, subscriberURI) + sendingCTX := broker.SendingContext(ctx, tctx, subscriberURI) replyEvent, err := r.ceClient.Send(sendingCTX, *event) if err == nil { ctx, _ = tag.New(ctx, tag.Upsert(TagResult, "accept")) @@ -254,7 +251,7 @@ func (r *Receiver) sendEvent(ctx context.Context, tctx cloudevents.HTTPTransport return replyEvent, err } -func (r *Receiver) getTrigger(ctx context.Context, ref path.NamespacedNameUID) (*eventingv1alpha1.Trigger, error) { +func (r *Handler) getTrigger(ctx context.Context, ref path.NamespacedNameUID) (*eventingv1alpha1.Trigger, error) { t := &eventingv1alpha1.Trigger{} err := r.client.Get(ctx, ref.NamespacedName, t) if err != nil { @@ -270,7 +267,7 @@ func (r *Receiver) getTrigger(ctx context.Context, ref path.NamespacedNameUID) ( // Currently it supports exact matching on event context attributes. // If no filter is present, shouldSendMessage returns false. // If no filter strategy is present, shouldSendMessage returns true. -func (r *Receiver) shouldSendMessage(ctx context.Context, ts *eventingv1alpha1.TriggerSpec, event *cloudevents.Event) bool { +func (r *Handler) shouldSendMessage(ctx context.Context, ts *eventingv1alpha1.TriggerSpec, event *cloudevents.Event) bool { if ts.Filter == nil { r.logger.Error("No filter specified") ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "empty-fail")) @@ -309,7 +306,7 @@ func (r *Receiver) shouldSendMessage(ctx context.Context, ts *eventingv1alpha1.T return result } -func (r *Receiver) filterEventByAttributes(ctx context.Context, attrs map[string]string, event *cloudevents.Event) bool { +func (r *Handler) filterEventByAttributes(ctx context.Context, attrs map[string]string, event *cloudevents.Event) bool { // Set standard context attributes. The attributes available may not be // exactly the same as the attributes defined in the current version of the // CloudEvents spec. diff --git a/pkg/broker/receiver_test.go b/pkg/broker/filter/filter_handler_test.go similarity index 98% rename from pkg/broker/receiver_test.go rename to pkg/broker/filter/filter_handler_test.go index 22e6694b177..c56308f0e24 100644 --- a/pkg/broker/receiver_test.go +++ b/pkg/broker/filter/filter_handler_test.go @@ -14,7 +14,7 @@ * limitations under the License. */ -package broker +package filter import ( "context" @@ -34,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/scheme" eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1" + "knative.dev/eventing/pkg/broker" controllertesting "knative.dev/eventing/pkg/reconciler/testing" "knative.dev/eventing/pkg/utils" "sigs.k8s.io/controller-runtime/pkg/client" @@ -290,7 +291,7 @@ func TestReceiver(t *testing.T) { correctURI = append(correctURI, trig) } - r, err := New( + r, err := NewHandler( zap.NewNop(), getClient(correctURI, tc.mocks)) if tc.expectNewToFail { @@ -365,7 +366,7 @@ func (h *fakeHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { h.requestReceived = true for n, v := range h.headers { - if strings.Contains(strings.ToLower(n), strings.ToLower(v03TTLAttribute)) { + if strings.Contains(strings.ToLower(n), strings.ToLower(broker.V03TTLAttribute)) { h.t.Errorf("Broker TTL should not be seen by the subscriber: %s", n) } if diff := cmp.Diff(v, req.Header[n]); diff != "" { @@ -494,7 +495,7 @@ func makeEvent() *cloudevents.Event { } func addTTLToEvent(e cloudevents.Event) cloudevents.Event { - e.Context, _ = SetTTL(e.Context, 1) + e.Context, _ = broker.SetTTL(e.Context, 1) return e } diff --git a/pkg/broker/filter/metrics.go b/pkg/broker/filter/metrics.go new file mode 100644 index 00000000000..cf1a0bf189a --- /dev/null +++ b/pkg/broker/filter/metrics.go @@ -0,0 +1,112 @@ +/* + * Copyright 2019 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package filter + +import ( + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" + utils "knative.dev/eventing/pkg/broker" +) + +var ( + // MeasureTriggerEventsTotal is a counter which records the number of events received + // by a Trigger. + MeasureTriggerEventsTotal = stats.Int64( + "knative.dev/eventing/trigger/measures/events_total", + "Total number of events received by a Trigger", + stats.UnitNone, + ) + + // MeasureTriggerDispatchTime records the time spent dispatching an event for + // a Trigger, in milliseconds. + MeasureTriggerDispatchTime = stats.Int64( + "knative.dev/eventing/trigger/measures/dispatch_time", + "Time spent dispatching an event to a Trigger", + stats.UnitMilliseconds, + ) + + // MeasureTriggerFilterTime records the time spent filtering a message for a + // Trigger, in milliseconds. + MeasureTriggerFilterTime = stats.Int64( + "knative.dev/eventing/trigger/measures/filter_time", + "Time spent filtering a message for a Trigger", + stats.UnitMilliseconds, + ) + + // MeasureDeliveryTime records the time spent between arrival at ingress + // and delivery to the trigger subscriber. + MeasureDeliveryTime = stats.Int64( + "knative.dev/eventing/trigger/measures/delivery_time", + "Time between an event arriving at ingress and delivery to the trigger subscriber", + stats.UnitMilliseconds, + ) + + // Tag keys must conform to the restrictions described in + // go.opencensus.io/tag/validate.go. Currently those restrictions are: + // - length between 1 and 255 inclusive + // - characters are printable US-ASCII + + // TagResult is a tag key referring to the observed result of an operation. + TagResult = utils.MustNewTagKey("result") + + // TagFilterResult is a tag key referring to the observed result of a filter + // operation. + TagFilterResult = utils.MustNewTagKey("filter_result") + + // TagBroker is a tag key referring to the Broker name serviced by this + // filter process. + TagBroker = utils.MustNewTagKey("broker") + + // TagTrigger is a tag key referring to the Trigger name serviced by this + // filter process. + TagTrigger = utils.MustNewTagKey("trigger") +) + +func init() { + // Create views for exporting measurements. This returns an error if a + // previously registered view has the same name with a different value. + err := view.Register( + &view.View{ + Name: "trigger_events_total", + Measure: MeasureTriggerEventsTotal, + Aggregation: view.Count(), + TagKeys: []tag.Key{TagResult, TagBroker, TagTrigger}, + }, + &view.View{ + Name: "trigger_dispatch_time", + Measure: MeasureTriggerDispatchTime, + Aggregation: view.Distribution(utils.Buckets125(1, 100)...), // 1, 2, 5, 10, 20, 50, 100, + TagKeys: []tag.Key{TagResult, TagBroker, TagTrigger}, + }, + &view.View{ + Name: "trigger_filter_time", + Measure: MeasureTriggerFilterTime, + Aggregation: view.Distribution(utils.Buckets125(0.1, 10)...), // 0.1, 0.2, 0.5, 1, 2, 5, 10 + TagKeys: []tag.Key{TagResult, TagFilterResult, TagBroker, TagTrigger}, + }, + &view.View{ + Name: "broker_to_function_delivery_time", + Measure: MeasureDeliveryTime, + Aggregation: view.Distribution(utils.Buckets125(1, 100)...), // 1, 2, 5, 10, 20, 50, 100 + TagKeys: []tag.Key{TagResult, TagBroker, TagTrigger}, + }, + ) + if err != nil { + panic(err) + } +} diff --git a/pkg/broker/ingress/metrics.go b/pkg/broker/ingress/metrics.go index 553598a7bfa..67e266fe78e 100644 --- a/pkg/broker/ingress/metrics.go +++ b/pkg/broker/ingress/metrics.go @@ -20,7 +20,7 @@ import ( "go.opencensus.io/stats" "go.opencensus.io/stats/view" "go.opencensus.io/tag" - "knative.dev/eventing/pkg/broker" + utils "knative.dev/eventing/pkg/broker" ) var ( @@ -47,11 +47,11 @@ var ( // - characters are printable US-ASCII // TagResult is a tag key referring to the observed result of an operation. - TagResult = mustNewTagKey("result") + TagResult = utils.MustNewTagKey("result") // TagBroker is a tag key referring to the Broker name serviced by this // ingress process. - TagBroker = mustNewTagKey("broker") + TagBroker = utils.MustNewTagKey("broker") ) func init() { @@ -67,7 +67,7 @@ func init() { &view.View{ Name: "broker_dispatch_time", Measure: MeasureDispatchTime, - Aggregation: view.Distribution(broker.Buckets125(1, 100)...), // 1, 2, 5, 10, 20, 50, 100 + Aggregation: view.Distribution(utils.Buckets125(1, 100)...), // 1, 2, 5, 10, 20, 50, 100 TagKeys: []tag.Key{TagResult, TagBroker}, }, ) @@ -75,14 +75,3 @@ func init() { panic(err) } } - -// mustNewTagKey creates a Tag or panics. This will only fail if the tag key -// doesn't conform to tag name validations. -// TODO OC library should provide this -func mustNewTagKey(k string) tag.Key { - tagKey, err := tag.NewKey(k) - if err != nil { - panic(err) - } - return tagKey -} diff --git a/pkg/broker/metrics.go b/pkg/broker/metrics.go index 376d6b101d3..9e3b7df4d8d 100644 --- a/pkg/broker/metrics.go +++ b/pkg/broker/metrics.go @@ -16,104 +16,12 @@ package broker -import ( - "go.opencensus.io/stats" - "go.opencensus.io/stats/view" - "go.opencensus.io/tag" -) +import "go.opencensus.io/tag" -var ( - // MeasureTriggerEventsTotal is a counter which records the number of events received - // by a Trigger. - MeasureTriggerEventsTotal = stats.Int64( - "knative.dev/eventing/trigger/measures/events_total", - "Total number of events received by a Trigger", - stats.UnitNone, - ) - - // MeasureTriggerDispatchTime records the time spent dispatching an event for - // a Trigger, in milliseconds. - MeasureTriggerDispatchTime = stats.Int64( - "knative.dev/eventing/trigger/measures/dispatch_time", - "Time spent dispatching an event to a Trigger", - stats.UnitMilliseconds, - ) - - // MeasureTriggerFilterTime records the time spent filtering a message for a - // Trigger, in milliseconds. - MeasureTriggerFilterTime = stats.Int64( - "knative.dev/eventing/trigger/measures/filter_time", - "Time spent filtering a message for a Trigger", - stats.UnitMilliseconds, - ) - - // MeasureDeliveryTime records the time spent between arrival at ingress - // and delivery to the trigger subscriber. - MeasureDeliveryTime = stats.Int64( - "knative.dev/eventing/trigger/measures/delivery_time", - "Time between an event arriving at ingress and delivery to the trigger subscriber", - stats.UnitMilliseconds, - ) - - // Tag keys must conform to the restrictions described in - // go.opencensus.io/tag/validate.go. Currently those restrictions are: - // - length between 1 and 255 inclusive - // - characters are printable US-ASCII - - // TagResult is a tag key referring to the observed result of an operation. - TagResult = mustNewTagKey("result") - - // TagFilterResult is a tag key referring to the observed result of a filter - // operation. - TagFilterResult = mustNewTagKey("filter_result") - - // TagBroker is a tag key referring to the Broker name serviced by this - // filter process. - TagBroker = mustNewTagKey("broker") - - // TagTrigger is a tag key referring to the Trigger name serviced by this - // filter process. - TagTrigger = mustNewTagKey("trigger") -) - -func init() { - // Create views for exporting measurements. This returns an error if a - // previously registered view has the same name with a different value. - err := view.Register( - &view.View{ - Name: "trigger_events_total", - Measure: MeasureTriggerEventsTotal, - Aggregation: view.Count(), - TagKeys: []tag.Key{TagResult, TagBroker, TagTrigger}, - }, - &view.View{ - Name: "trigger_dispatch_time", - Measure: MeasureTriggerDispatchTime, - Aggregation: view.Distribution(Buckets125(1, 100)...), // 1, 2, 5, 10, 20, 50, 100, - TagKeys: []tag.Key{TagResult, TagBroker, TagTrigger}, - }, - &view.View{ - Name: "trigger_filter_time", - Measure: MeasureTriggerFilterTime, - Aggregation: view.Distribution(Buckets125(0.1, 10)...), // 0.1, 0.2, 0.5, 1, 2, 5, 10 - TagKeys: []tag.Key{TagResult, TagFilterResult, TagBroker, TagTrigger}, - }, - &view.View{ - Name: "broker_to_function_delivery_time", - Measure: MeasureDeliveryTime, - Aggregation: view.Distribution(Buckets125(1, 100)...), // 1, 2, 5, 10, 20, 50, 100 - TagKeys: []tag.Key{TagResult, TagBroker, TagTrigger}, - }, - ) - if err != nil { - panic(err) - } -} - -// mustNewTagKey creates a Tag or panics. This will only fail if the tag key +// MustNewTagKey creates a Tag or panics. This will only fail if the tag key // doesn't conform to tag name validations. // TODO OC library should provide this -func mustNewTagKey(k string) tag.Key { +func MustNewTagKey(k string) tag.Key { tagKey, err := tag.NewKey(k) if err != nil { panic(err) diff --git a/pkg/broker/ttl.go b/pkg/broker/ttl.go index d5c42881f41..ad6e459896f 100644 --- a/pkg/broker/ttl.go +++ b/pkg/broker/ttl.go @@ -23,10 +23,16 @@ import ( ) const ( - // v03TTLAttribute is the name of the CloudEvents 0.3 extension attribute used to store the + // V03TTLAttribute is the name of the CloudEvents 0.3 extension attribute used to store the // Broker's TTL (number of times a single event can reply through a Broker continuously). All // interactions with the attribute should be done through the GetTTL and SetTTL functions. - v03TTLAttribute = "knativebrokerttl" + V03TTLAttribute = "knativebrokerttl" + + // TODO rename this extension, and place it somewhere else. + // TimeInFlightMetadataName is used to access the metadata stored on a + // CloudEvent to measure the time difference between when an event is + // received and when it is dispatched to the trigger function. + TimeInFlightMetadataName = "kn00timeinflight" ) // GetTTL finds the TTL in the EventContext using a case insensitive comparison @@ -34,15 +40,15 @@ const ( // Depending on the encoding/transport, the extension case could be changed. func GetTTL(ctx cloudevents.EventContext) (interface{}, string) { for k, v := range ctx.AsV03().Extensions { - if lower := strings.ToLower(k); lower == v03TTLAttribute { + if lower := strings.ToLower(k); lower == V03TTLAttribute { return v, k } } - return nil, v03TTLAttribute + return nil, V03TTLAttribute } // SetTTL sets the TTL into the EventContext. ttl should be a positive integer. func SetTTL(ctx cloudevents.EventContext, ttl interface{}) (cloudevents.EventContext, error) { - err := ctx.SetExtension(v03TTLAttribute, ttl) + err := ctx.SetExtension(V03TTLAttribute, ttl) return ctx, err }