From 66265c1cbc3b1563131fc3eabf264377cbf522a8 Mon Sep 17 00:00:00 2001 From: nachocano Date: Mon, 26 Aug 2019 08:35:18 -0700 Subject: [PATCH 1/4] Stats reporter refactor --- cmd/broker/filter/main.go | 7 +- cmd/broker/ingress/main.go | 7 + pkg/broker/filter/filter_handler.go | 135 ++++++----- pkg/broker/filter/filter_handler_test.go | 23 +- pkg/broker/filter/metrics.go | 112 --------- pkg/broker/filter/stats_reporter.go | 270 +++++++++++++++++++++ pkg/broker/filter/stats_reporter_test.go | 154 ++++++++++++ pkg/broker/ingress/ingress_handler.go | 45 ++-- pkg/broker/ingress/ingress_handler_test.go | 16 ++ pkg/broker/ingress/metrics.go | 77 ------ pkg/broker/ingress/stats_reporter.go | 160 ++++++++++++ pkg/broker/ingress/stats_reporter_test.go | 86 +++++++ pkg/broker/metrics.go | 26 +- pkg/broker/ttl.go | 6 - pkg/metrics/metricskey/constants.go | 85 +++++++ 15 files changed, 908 insertions(+), 301 deletions(-) delete mode 100644 pkg/broker/filter/metrics.go create mode 100644 pkg/broker/filter/stats_reporter.go create mode 100644 pkg/broker/filter/stats_reporter_test.go delete mode 100644 pkg/broker/ingress/metrics.go create mode 100644 pkg/broker/ingress/stats_reporter.go create mode 100644 pkg/broker/ingress/stats_reporter_test.go create mode 100644 pkg/metrics/metricskey/constants.go diff --git a/cmd/broker/filter/main.go b/cmd/broker/filter/main.go index 89771f398ae..392d3f3ed70 100644 --- a/cmd/broker/filter/main.go +++ b/cmd/broker/filter/main.go @@ -76,9 +76,14 @@ func main() { logger.Fatal("Error setting up trace publishing", zap.Error(err)) } + reporter, err := filter.NewStatsReporter() + if err != nil { + logger.Fatal("Error creating stats reporter", zap.Error(err)) + } + // We are running both the receiver (takes messages in from the Broker) and the dispatcher (send // the messages to the triggers' subscribers) in this binary. - handler, err := filter.NewHandler(logger, mgr.GetClient()) + handler, err := filter.NewHandler(logger, mgr.GetClient(), reporter) if err != nil { logger.Fatal("Error creating Handler", zap.Error(err)) } diff --git a/cmd/broker/ingress/main.go b/cmd/broker/ingress/main.go index 3743eee84a9..81022c07b83 100644 --- a/cmd/broker/ingress/main.go +++ b/cmd/broker/ingress/main.go @@ -104,11 +104,18 @@ func main() { if err != nil { logger.Fatal("Unable to create CE client", zap.Error(err)) } + reporter, err := ingress.NewStatsReporter() + if err != nil { + logger.Fatal("Unable to create StatsReporter", zap.Error(err)) + } + h := &ingress.Handler{ Logger: logger, CeClient: ceClient, ChannelURI: channelURI, BrokerName: env.Broker, + Namespace: env.Namespace, + Reporter: reporter, } // Run the event handler with the manager. diff --git a/pkg/broker/filter/filter_handler.go b/pkg/broker/filter/filter_handler.go index 9c9ae63c0c0..a6c24abfd40 100644 --- a/pkg/broker/filter/filter_handler.go +++ b/pkg/broker/filter/filter_handler.go @@ -25,12 +25,8 @@ import ( "sync/atomic" "time" - "knative.dev/eventing/pkg/logging" - - cloudevents "github.com/cloudevents/sdk-go" + "github.com/cloudevents/sdk-go" cehttp "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http" - "go.opencensus.io/stats" - "go.opencensus.io/tag" "go.uber.org/zap" eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1" "knative.dev/eventing/pkg/broker" @@ -48,11 +44,12 @@ type Handler struct { logger *zap.Logger client client.Client ceClient cloudevents.Client + reporter StatsReporter } // NewHandler creates a new Handler and its associated MessageReceiver. The caller is responsible for // Start()ing the returned Handler. -func NewHandler(logger *zap.Logger, client client.Client) (*Handler, error) { +func NewHandler(logger *zap.Logger, client client.Client, reporter StatsReporter) (*Handler, error) { httpTransport, err := cloudevents.NewHTTPTransport(cloudevents.WithBinaryEncoding(), cehttp.WithMiddleware(tracing.HTTPSpanMiddleware)) if err != nil { return nil, err @@ -81,6 +78,7 @@ func NewHandler(logger *zap.Logger, client client.Client) (*Handler, error) { logger: logger, client: client, ceClient: ceClient, + reporter: reporter, } err = r.initClient() if err != nil { @@ -201,52 +199,56 @@ func (r *Handler) sendEvent(ctx context.Context, tctx cloudevents.HTTPTransportC return nil, err } - // Set up the metrics context - ctx, _ = tag.New(ctx, - tag.Insert(TagTrigger, trigger.String()), - tag.Insert(TagBroker, fmt.Sprintf("%s/%s", trigger.Namespace, t.Spec.Broker)), - ) - // Record event count and filtering time - startTS := time.Now() - defer func() { - var deliveryTime time.Time - now := time.Now() - dispatchTimeMS := int64(now.Sub(startTS) / time.Millisecond) - stats.Record(ctx, MeasureTriggerDispatchTime.M(dispatchTimeMS)) - stats.Record(ctx, MeasureTriggerEventsTotal.M(1)) - if err := event.ExtensionAs(broker.TimeInFlightMetadataName, &deliveryTime); err != nil { - return - } - timeInFlightMS := int64(now.Sub(deliveryTime) / time.Millisecond) - stats.Record(ctx, MeasureDeliveryTime.M(timeInFlightMS)) - }() + reportArgs := &ReportArgs{ + ns: t.Namespace, + trigger: t.Name, + broker: t.Spec.Broker, + eventType: triggerFilterAttribute(t.Spec.Filter, "type"), + eventSource: triggerFilterAttribute(t.Spec.Filter, "source"), + } subscriberURIString := t.Status.SubscriberURI if subscriberURIString == "" { - ctx, _ = tag.New(ctx, tag.Upsert(TagResult, "error")) - return nil, errors.New("unable to read subscriberURI") + err = errors.New("unable to read subscriberURI") + // Record the event count. + r.reporter.ReportEventCount(reportArgs, err) + return nil, err } // We could just send the request to this URI regardless, but let's just check to see if it well // formed first, that way we can generate better error message if it isn't. subscriberURI, err := url.Parse(subscriberURIString) if err != nil { r.logger.Error("Unable to parse subscriberURI", zap.Error(err), zap.String("subscriberURIString", subscriberURIString)) - ctx, _ = tag.New(ctx, tag.Upsert(TagResult, "error")) + // Record the event count. + r.reporter.ReportEventCount(reportArgs, err) return nil, err } - if !r.shouldSendMessage(ctx, &t.Spec, event) { - r.logger.Debug("Message did not pass filter", zap.Any("triggerRef", trigger)) - ctx, _ = tag.New(ctx, tag.Upsert(TagResult, "drop")) + // Check if the event should be sent, and record filtering time. + start := time.Now() + pass, filterResult := r.shouldSendEvent(ctx, &t.Spec, event) + r.reporter.ReportFilterTime(reportArgs, filterResult, time.Since(start)) + + if !pass { + r.logger.Debug("Event did not pass filter", zap.Any("triggerRef", trigger)) + // Record the event count. + r.reporter.ReportEventCount(reportArgs, errors.New("event did not pass filter")) return nil, nil } + start = time.Now() sendingCTX := broker.SendingContext(ctx, tctx, subscriberURI) + // TODO get HTTP status codes and use those. replyEvent, err := r.ceClient.Send(sendingCTX, *event) - if err == nil { - ctx, _ = tag.New(ctx, tag.Upsert(TagResult, "accept")) - } else { - ctx, _ = tag.New(ctx, tag.Upsert(TagResult, "error")) + // Record the dispatch time. + r.reporter.ReportDispatchTime(reportArgs, err, time.Since(start)) + // Record the event count. + r.reporter.ReportEventCount(reportArgs, err) + // Record the event latency. This might be off if the receiver and the filter pods are running in + // different nodes with different clocks. + var arrivalTime time.Time + if extErr := event.ExtensionAs(broker.EventArrivalTime, &arrivalTime); extErr == nil { + r.reporter.ReportEventDeliveryTime(reportArgs, err, time.Since(arrivalTime)) } return replyEvent, err } @@ -263,28 +265,13 @@ func (r *Handler) getTrigger(ctx context.Context, ref path.NamespacedNameUID) (* return t, nil } -// shouldSendMessage determines whether message 'm' should be sent based on the triggerSpec 'ts'. -// Currently it supports exact matching on event context attributes. -// If no filter is present, shouldSendMessage returns false. -// If no filter strategy is present, shouldSendMessage returns true. -func (r *Handler) shouldSendMessage(ctx context.Context, ts *eventingv1alpha1.TriggerSpec, event *cloudevents.Event) bool { - if ts.Filter == nil { - r.logger.Error("No filter specified") - ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "empty-fail")) - return false - } - - // Record event count and filtering time. - startTS := time.Now() - defer func() { - filterTimeMS := int64(time.Now().Sub(startTS) / time.Millisecond) - stats.Record(ctx, MeasureTriggerFilterTime.M(filterTimeMS)) - }() - +// shouldSendEvent determines whether event 'event' should be sent based on the triggerSpec 'ts'. +// Currently it supports exact matching on event context attributes and extension attributes. +// If no filter is present, shouldSendEvent returns true. +func (r *Handler) shouldSendEvent(ctx context.Context, ts *eventingv1alpha1.TriggerSpec, event *cloudevents.Event) (bool, string) { // No filter specified, default to passing everything. - if ts.Filter.DeprecatedSourceAndType == nil && ts.Filter.Attributes == nil { - ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "empty-pass")) - return true + if ts.Filter == nil || (ts.Filter.DeprecatedSourceAndType == nil && ts.Filter.Attributes == nil) { + return true, "no_filter" } attrs := map[string]string{} @@ -297,16 +284,15 @@ func (r *Handler) shouldSendMessage(ctx context.Context, ts *eventingv1alpha1.Tr attrs = map[string]string(*ts.Filter.Attributes) } - result := r.filterEventByAttributes(ctx, attrs, event) + result := r.filterEventByAttributes(attrs, event) + resultStr := "fail" if result { - ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "pass")) - } else { - ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "fail")) + resultStr = "pass" } - return result + return result, resultStr } -func (r *Handler) filterEventByAttributes(ctx context.Context, attrs map[string]string, event *cloudevents.Event) bool { +func (r *Handler) filterEventByAttributes(attrs map[string]string, event *cloudevents.Event) bool { // Set standard context attributes. The attributes available may not be // exactly the same as the attributes defined in the current version of the // CloudEvents spec. @@ -334,14 +320,35 @@ func (r *Handler) filterEventByAttributes(ctx context.Context, attrs map[string] value, ok := ce[k] // If the attribute does not exist in the event, return false. if !ok { - logging.FromContext(ctx).Debug("Attribute not found", zap.String("attribute", k)) + r.logger.Debug("Attribute not found", zap.String("attribute", k)) return false } // If the attribute is not set to any and is different than the one from the event, return false. if v != eventingv1alpha1.TriggerAnyFilter && v != value { - logging.FromContext(ctx).Debug("Attribute had non-matching value", zap.String("attribute", k), zap.String("filter", v), zap.Any("received", value)) + r.logger.Debug("Attribute had non-matching value", zap.String("attribute", k), zap.String("filter", v), zap.Any("received", value)) return false } } return true } + +// triggerFilterAttribute returns the filter attribute value for a given `attributeName`. If it doesn't not exist, +// returns the any value filter. +func triggerFilterAttribute(filter *eventingv1alpha1.TriggerFilter, attributeName string) string { + attributeValue := eventingv1alpha1.TriggerAnyFilter + if filter != nil { + if filter.DeprecatedSourceAndType != nil { + if attributeName == "type" { + attributeValue = filter.DeprecatedSourceAndType.Type + } else if attributeName == "source" { + attributeValue = filter.DeprecatedSourceAndType.Source + } + } else if filter.Attributes != nil { + attrs := map[string]string(*filter.Attributes) + if v, ok := attrs[attributeName]; ok { + attributeValue = v + } + } + } + return attributeValue +} diff --git a/pkg/broker/filter/filter_handler_test.go b/pkg/broker/filter/filter_handler_test.go index c56308f0e24..08832714ac6 100644 --- a/pkg/broker/filter/filter_handler_test.go +++ b/pkg/broker/filter/filter_handler_test.go @@ -25,6 +25,7 @@ import ( "net/url" "strings" "testing" + "time" cloudevents "github.com/cloudevents/sdk-go" cehttp "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http" @@ -147,6 +148,7 @@ func TestReceiver(t *testing.T) { triggers: []*eventingv1alpha1.Trigger{ makeTriggerWithoutFilter(), }, + expectedDispatch: true, }, "No TTL": { triggers: []*eventingv1alpha1.Trigger{ @@ -293,7 +295,8 @@ func TestReceiver(t *testing.T) { r, err := NewHandler( zap.NewNop(), - getClient(correctURI, tc.mocks)) + getClient(correctURI, tc.mocks), + &mockReporter{}) if tc.expectNewToFail { if err == nil { t.Fatal("Expected New to fail, it didn't") @@ -354,6 +357,24 @@ func TestReceiver(t *testing.T) { } } +type mockReporter struct{} + +func (r *mockReporter) ReportEventCount(args *ReportArgs, err error) error { + return nil +} + +func (r *mockReporter) ReportDispatchTime(args *ReportArgs, err error, d time.Duration) error { + return nil +} + +func (r *mockReporter) ReportFilterTime(args *ReportArgs, filterResult string, d time.Duration) error { + return nil +} + +func (r *mockReporter) ReportEventDeliveryTime(args *ReportArgs, err error, d time.Duration) error { + return nil +} + type fakeHandler struct { failRequest bool requestReceived bool diff --git a/pkg/broker/filter/metrics.go b/pkg/broker/filter/metrics.go deleted file mode 100644 index cf1a0bf189a..00000000000 --- a/pkg/broker/filter/metrics.go +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Copyright 2019 The Knative Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package filter - -import ( - "go.opencensus.io/stats" - "go.opencensus.io/stats/view" - "go.opencensus.io/tag" - utils "knative.dev/eventing/pkg/broker" -) - -var ( - // MeasureTriggerEventsTotal is a counter which records the number of events received - // by a Trigger. - MeasureTriggerEventsTotal = stats.Int64( - "knative.dev/eventing/trigger/measures/events_total", - "Total number of events received by a Trigger", - stats.UnitNone, - ) - - // MeasureTriggerDispatchTime records the time spent dispatching an event for - // a Trigger, in milliseconds. - MeasureTriggerDispatchTime = stats.Int64( - "knative.dev/eventing/trigger/measures/dispatch_time", - "Time spent dispatching an event to a Trigger", - stats.UnitMilliseconds, - ) - - // MeasureTriggerFilterTime records the time spent filtering a message for a - // Trigger, in milliseconds. - MeasureTriggerFilterTime = stats.Int64( - "knative.dev/eventing/trigger/measures/filter_time", - "Time spent filtering a message for a Trigger", - stats.UnitMilliseconds, - ) - - // MeasureDeliveryTime records the time spent between arrival at ingress - // and delivery to the trigger subscriber. - MeasureDeliveryTime = stats.Int64( - "knative.dev/eventing/trigger/measures/delivery_time", - "Time between an event arriving at ingress and delivery to the trigger subscriber", - stats.UnitMilliseconds, - ) - - // Tag keys must conform to the restrictions described in - // go.opencensus.io/tag/validate.go. Currently those restrictions are: - // - length between 1 and 255 inclusive - // - characters are printable US-ASCII - - // TagResult is a tag key referring to the observed result of an operation. - TagResult = utils.MustNewTagKey("result") - - // TagFilterResult is a tag key referring to the observed result of a filter - // operation. - TagFilterResult = utils.MustNewTagKey("filter_result") - - // TagBroker is a tag key referring to the Broker name serviced by this - // filter process. - TagBroker = utils.MustNewTagKey("broker") - - // TagTrigger is a tag key referring to the Trigger name serviced by this - // filter process. - TagTrigger = utils.MustNewTagKey("trigger") -) - -func init() { - // Create views for exporting measurements. This returns an error if a - // previously registered view has the same name with a different value. - err := view.Register( - &view.View{ - Name: "trigger_events_total", - Measure: MeasureTriggerEventsTotal, - Aggregation: view.Count(), - TagKeys: []tag.Key{TagResult, TagBroker, TagTrigger}, - }, - &view.View{ - Name: "trigger_dispatch_time", - Measure: MeasureTriggerDispatchTime, - Aggregation: view.Distribution(utils.Buckets125(1, 100)...), // 1, 2, 5, 10, 20, 50, 100, - TagKeys: []tag.Key{TagResult, TagBroker, TagTrigger}, - }, - &view.View{ - Name: "trigger_filter_time", - Measure: MeasureTriggerFilterTime, - Aggregation: view.Distribution(utils.Buckets125(0.1, 10)...), // 0.1, 0.2, 0.5, 1, 2, 5, 10 - TagKeys: []tag.Key{TagResult, TagFilterResult, TagBroker, TagTrigger}, - }, - &view.View{ - Name: "broker_to_function_delivery_time", - Measure: MeasureDeliveryTime, - Aggregation: view.Distribution(utils.Buckets125(1, 100)...), // 1, 2, 5, 10, 20, 50, 100 - TagKeys: []tag.Key{TagResult, TagBroker, TagTrigger}, - }, - ) - if err != nil { - panic(err) - } -} diff --git a/pkg/broker/filter/stats_reporter.go b/pkg/broker/filter/stats_reporter.go new file mode 100644 index 00000000000..ac50810874a --- /dev/null +++ b/pkg/broker/filter/stats_reporter.go @@ -0,0 +1,270 @@ +/* + * Copyright 2019 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package filter + +import ( + "context" + "fmt" + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" + utils "knative.dev/eventing/pkg/broker" + "knative.dev/eventing/pkg/metrics/metricskey" + "knative.dev/pkg/metrics" + "time" +) + +var ( + // eventCountM is a counter which records the number of events received + // by a Trigger. + eventCountM = stats.Int64( + "event_count", + "Number of events received by a Trigger", + stats.UnitDimensionless, + ) + + // dispatchTimeInMsecM records the time spent dispatching an event to + // a Trigger subscriber, in milliseconds. + dispatchTimeInMsecM = stats.Float64( + "dispatch_latencies", + "The time spent dispatching an event to a Trigger subscriber", + stats.UnitMilliseconds, + ) + + // filterTimeInMsecM records the time spent filtering an event for a + // Trigger, in milliseconds. + filterTimeInMsecM = stats.Float64( + "filter_latencies", + "The time spent filtering an event for a Trigger", + stats.UnitMilliseconds, + ) + + // deliveryTimeInMsecM records the time spent between arrival at the Broker + // and delivery to the Trigger subscriber. + deliveryTimeInMsecM = stats.Float64( + "event_latencies", + "The time spent routing an event from a Broker to a Trigger subscriber", + stats.UnitMilliseconds, + ) +) + +type ReportArgs struct { + ns string + trigger string + broker string + eventType string + eventSource string +} + +// StatsReporter defines the interface for sending filter metrics. +type StatsReporter interface { + ReportEventCount(args *ReportArgs, err error) error + ReportDispatchTime(args *ReportArgs, err error, d time.Duration) error + ReportFilterTime(args *ReportArgs, filterResult string, d time.Duration) error + ReportEventDeliveryTime(args *ReportArgs, err error, d time.Duration) error +} + +// Reporter holds cached metric objects to report filter metrics. +type Reporter struct { + initialized bool + namespaceTagKey tag.Key + triggerTagKey tag.Key + brokerTagKey tag.Key + triggerTypeKey tag.Key + triggerSourceKey tag.Key + resultKey tag.Key + filterResultKey tag.Key +} + +// NewStatsReporter creates a reporter that collects and reports filter metrics. +func NewStatsReporter() (*Reporter, error) { + var r = &Reporter{} + + // Create the tag keys that will be used to add tags to our measurements. + nsTag, err := tag.NewKey(metricskey.NamespaceName) + if err != nil { + return nil, err + } + r.namespaceTagKey = nsTag + triggerTag, err := tag.NewKey(metricskey.TriggerName) + if err != nil { + return nil, err + } + r.triggerTagKey = triggerTag + brokerTag, err := tag.NewKey(metricskey.BrokerName) + if err != nil { + return nil, err + } + r.brokerTagKey = brokerTag + triggerTypeTag, err := tag.NewKey(metricskey.TriggerType) + if err != nil { + return nil, err + } + r.triggerTypeKey = triggerTypeTag + triggerSourceKey, err := tag.NewKey(metricskey.TriggerSource) + if err != nil { + return nil, err + } + r.triggerSourceKey = triggerSourceKey + filterResultTag, err := tag.NewKey(metricskey.FilterResult) + if err != nil { + return nil, err + } + r.filterResultKey = filterResultTag + resultTag, err := tag.NewKey(metricskey.Result) + if err != nil { + return nil, err + } + r.resultKey = resultTag + + // Create view to see our measurements. + err = view.Register( + &view.View{ + Description: eventCountM.Description(), + Measure: eventCountM, + // TODO count or sum aggregation? + Aggregation: view.Count(), + TagKeys: []tag.Key{r.namespaceTagKey, r.triggerTagKey, r.brokerTagKey, r.triggerTypeKey, r.triggerSourceKey, r.resultKey}, + }, + &view.View{ + Description: dispatchTimeInMsecM.Description(), + Measure: dispatchTimeInMsecM, + Aggregation: view.Distribution(utils.Buckets125(1, 100)...), // 1, 2, 5, 10, 20, 50, 100 + TagKeys: []tag.Key{r.namespaceTagKey, r.triggerTagKey, r.brokerTagKey, r.triggerTypeKey, r.triggerSourceKey, r.resultKey}, + }, + &view.View{ + Description: filterTimeInMsecM.Description(), + Measure: filterTimeInMsecM, + Aggregation: view.Distribution(utils.Buckets125(0.1, 10)...), // 0.1, 0.2, 0.5, 1, 2, 5, 10 + TagKeys: []tag.Key{r.namespaceTagKey, r.triggerTagKey, r.brokerTagKey, r.triggerTypeKey, r.triggerSourceKey, r.filterResultKey}, + }, + &view.View{ + Description: deliveryTimeInMsecM.Description(), + Measure: deliveryTimeInMsecM, + Aggregation: view.Distribution(utils.Buckets125(1, 100)...), // 1, 2, 5, 10, 20, 50, 100 + TagKeys: []tag.Key{r.namespaceTagKey, r.triggerTagKey, r.brokerTagKey, r.triggerTypeKey, r.triggerSourceKey, r.resultKey}, + }, + ) + if err != nil { + return nil, err + } + + r.initialized = true + return r, nil +} + +// ReportEventCount captures the event count. +func (r *Reporter) ReportEventCount(args *ReportArgs, err error) error { + if !r.initialized { + return fmt.Errorf("StatsReporter is not initialized yet") + } + + // Note that eventType and eventSource can be empty strings, so they need a special treatment. + ctx, err := tag.New( + context.Background(), + tag.Insert(r.namespaceTagKey, args.ns), + tag.Insert(r.triggerTagKey, args.trigger), + tag.Insert(r.brokerTagKey, args.broker), + tag.Insert(r.triggerTypeKey, valueOrAny(args.eventType)), + tag.Insert(r.triggerSourceKey, valueOrAny(args.eventSource)), + tag.Insert(r.resultKey, utils.Result(err))) + if err != nil { + return err + } + + metrics.Record(ctx, eventCountM.M(1)) + return nil +} + +// ReportDispatchTime captures dispatch times. +func (r *Reporter) ReportDispatchTime(args *ReportArgs, err error, d time.Duration) error { + if !r.initialized { + return fmt.Errorf("StatsReporter is not initialized yet") + } + + // Note that eventType and eventSource can be empty strings, so they need a special treatment. + ctx, err := tag.New( + context.Background(), + tag.Insert(r.namespaceTagKey, args.ns), + tag.Insert(r.triggerTagKey, args.trigger), + tag.Insert(r.brokerTagKey, args.broker), + tag.Insert(r.triggerTypeKey, valueOrAny(args.eventType)), + tag.Insert(r.triggerSourceKey, valueOrAny(args.eventSource)), + tag.Insert(r.resultKey, utils.Result(err))) + if err != nil { + return err + } + + // convert time.Duration in nanoseconds to milliseconds. + metrics.Record(ctx, dispatchTimeInMsecM.M(float64(d/time.Millisecond))) + return nil +} + +// ReportFilterTime captures filtering times. +func (r *Reporter) ReportFilterTime(args *ReportArgs, filterResult string, d time.Duration) error { + if !r.initialized { + return fmt.Errorf("StatsReporter is not initialized yet") + } + + // Note that eventType and eventSource can be empty strings, so they need a special treatment. + ctx, err := tag.New( + context.Background(), + tag.Insert(r.namespaceTagKey, args.ns), + tag.Insert(r.triggerTagKey, args.trigger), + tag.Insert(r.brokerTagKey, args.broker), + tag.Insert(r.triggerTypeKey, valueOrAny(args.eventType)), + tag.Insert(r.triggerSourceKey, valueOrAny(args.eventSource)), + tag.Insert(r.filterResultKey, filterResult)) + if err != nil { + return err + } + + // convert time.Duration in nanoseconds to milliseconds. + metrics.Record(ctx, filterTimeInMsecM.M(float64(d/time.Millisecond))) + return nil +} + +// ReportEventDeliveryTime captures event delivery times. +func (r *Reporter) ReportEventDeliveryTime(args *ReportArgs, err error, d time.Duration) error { + if !r.initialized { + return fmt.Errorf("StatsReporter is not initialized yet") + } + + // Note that eventType and eventSource can be empty strings, so they need a special treatment. + ctx, err := tag.New( + context.Background(), + tag.Insert(r.namespaceTagKey, args.ns), + tag.Insert(r.triggerTagKey, args.trigger), + tag.Insert(r.brokerTagKey, args.broker), + tag.Insert(r.triggerTypeKey, valueOrAny(args.eventType)), + tag.Insert(r.triggerSourceKey, valueOrAny(args.eventSource)), + tag.Insert(r.resultKey, utils.Result(err))) + if err != nil { + return err + } + + // convert time.Duration in nanoseconds to milliseconds. + metrics.Record(ctx, deliveryTimeInMsecM.M(float64(d/time.Millisecond))) + return nil +} + +func valueOrAny(v string) string { + if v != "" { + return v + } + return metricskey.Any +} diff --git a/pkg/broker/filter/stats_reporter_test.go b/pkg/broker/filter/stats_reporter_test.go new file mode 100644 index 00000000000..4c0dac26d9b --- /dev/null +++ b/pkg/broker/filter/stats_reporter_test.go @@ -0,0 +1,154 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filter + +import ( + "github.com/pkg/errors" + "knative.dev/eventing/pkg/metrics/metricskey" + "knative.dev/pkg/metrics/metricstest" + "testing" + "time" +) + +// unregister, ehm, unregisters the metrics that were registered, by +// virtue of StatsReporter creation. +// Since golang executes test iterations within the same process, the stats reporter +// returns an error if the metric is already registered and the test panics. +func unregister() { + metricstest.Unregister("event_count", "dispatch_latencies", "filter_latencies", "event_latencies") +} + +func TestStatsReporter(t *testing.T) { + r := &Reporter{} + + args := &ReportArgs{ + ns: "testns", + trigger: "testtrigger", + broker: "testbroker", + eventType: "testeventtype", + eventSource: "testeventsource", + } + if err := r.ReportEventCount(args, errors.New("error")); err == nil { + t.Error("Reporter expected an error for Report call before init. Got success.") + } + + var err error + if r, err = NewStatsReporter(); err != nil { + t.Fatalf("Failed to create a new reporter: %v", err) + } + // Without this `go test ... -count=X`, where X > 1, fails, since + // we get an error about view already being registered. + defer unregister() + + wantTags := map[string]string{ + metricskey.NamespaceName: "testns", + metricskey.TriggerName: "testtrigger", + metricskey.BrokerName: "testbroker", + metricskey.TriggerType: "testeventtype", + metricskey.TriggerSource: "testeventsource", + } + + wantTags1 := map[string]string(wantTags) + wantTags1[metricskey.Result] = "success" + + // test ReportEventCount + expectSuccess(t, func() error { + return r.ReportEventCount(args, nil) + }) + expectSuccess(t, func() error { + return r.ReportEventCount(args, nil) + }) + metricstest.CheckCountData(t, "event_count", wantTags1, 2) + + // test ReportDispatchTime + expectSuccess(t, func() error { + return r.ReportDispatchTime(args, nil, 1100*time.Millisecond) + }) + expectSuccess(t, func() error { + return r.ReportDispatchTime(args, nil, 9100*time.Millisecond) + }) + metricstest.CheckDistributionData(t, "dispatch_latencies", wantTags1, 2, 1100.0, 9100.0) + + // test ReportEventDeliveryTime + expectSuccess(t, func() error { + return r.ReportEventDeliveryTime(args, nil, 1000*time.Millisecond) + }) + expectSuccess(t, func() error { + return r.ReportEventDeliveryTime(args, nil, 8000*time.Millisecond) + }) + metricstest.CheckDistributionData(t, "event_latencies", wantTags1, 2, 1000.0, 8000.0) + + wantTags2 := map[string]string(wantTags) + wantTags2[metricskey.FilterResult] = "pass" + + // test ReportFilterTime + expectSuccess(t, func() error { + return r.ReportFilterTime(args, "pass", 100*time.Millisecond) + }) + expectSuccess(t, func() error { + return r.ReportFilterTime(args, "pass", 500*time.Millisecond) + }) + metricstest.CheckDistributionData(t, "filter_latencies", wantTags1, 2, 100.0, 500.0) +} + +func TestReporterEmptySourceAndType(t *testing.T) { + r, err := NewStatsReporter() + defer unregister() + + if err != nil { + t.Fatalf("Failed to create a new reporter: %v", err) + } + + args := &ReportArgs{ + ns: "testns", + trigger: "testtrigger", + broker: "testbroker", + eventType: "", + eventSource: "", + } + + wantTags := map[string]string{ + metricskey.NamespaceName: "testns", + metricskey.TriggerName: "testtrigger", + metricskey.BrokerName: "testbroker", + metricskey.TriggerType: metricskey.Any, + metricskey.TriggerSource: metricskey.Any, + metricskey.Result: "success", + } + + // test ReportEventCount + expectSuccess(t, func() error { + return r.ReportEventCount(args, nil) + }) + expectSuccess(t, func() error { + return r.ReportEventCount(args, nil) + }) + expectSuccess(t, func() error { + return r.ReportEventCount(args, nil) + }) + expectSuccess(t, func() error { + return r.ReportEventCount(args, nil) + }) + metricstest.CheckCountData(t, "event_count", wantTags, 4) +} + +func expectSuccess(t *testing.T, f func() error) { + t.Helper() + if err := f(); err != nil { + t.Errorf("Reporter expected success but got error: %v", err) + } +} diff --git a/pkg/broker/ingress/ingress_handler.go b/pkg/broker/ingress/ingress_handler.go index 8dc0dcba003..4c0f0a6bc4d 100644 --- a/pkg/broker/ingress/ingress_handler.go +++ b/pkg/broker/ingress/ingress_handler.go @@ -8,9 +8,7 @@ import ( "reflect" "time" - cloudevents "github.com/cloudevents/sdk-go" - "go.opencensus.io/stats" - "go.opencensus.io/tag" + "github.com/cloudevents/sdk-go" "go.uber.org/zap" "knative.dev/eventing/pkg/broker" ) @@ -26,6 +24,8 @@ type Handler struct { CeClient cloudevents.Client ChannelURI *url.URL BrokerName string + Namespace string + Reporter StatsReporter } func (h *Handler) Start(stopCh <-chan struct{}) error { @@ -57,7 +57,7 @@ func (h *Handler) Start(stopCh <-chan struct{}) error { } func (h *Handler) serveHTTP(ctx context.Context, event cloudevents.Event, resp *cloudevents.EventResponse) error { - event.SetExtension(broker.TimeInFlightMetadataName, time.Now()) + event.SetExtension(broker.EventArrivalTime, time.Now()) tctx := cloudevents.HTTPTransportContextFrom(ctx) if tctx.Method != http.MethodPost { resp.Status = http.StatusMethodNotAllowed @@ -70,38 +70,27 @@ func (h *Handler) serveHTTP(ctx context.Context, event cloudevents.Event, resp * return nil } - ctx, _ = tag.New(ctx, tag.Insert(TagBroker, h.BrokerName)) - defer func() { - stats.Record(ctx, MeasureEventsTotal.M(1)) - }() + reporterArgs := &ReportArgs{ + ns: h.Namespace, + broker: h.BrokerName, + eventType: event.Type(), + } send := h.decrementTTL(&event) if !send { - ctx, _ = tag.New(ctx, tag.Insert(TagResult, "droppedDueToTTL")) + // Record the event count. + h.Reporter.ReportEventCount(reporterArgs, errors.New("dropped due to TTL")) return nil } - // TODO Filter. - - ctx, _ = tag.New(ctx, tag.Insert(TagResult, "dispatched")) - return h.sendEvent(ctx, tctx, event) -} - -func (h *Handler) sendEvent(ctx context.Context, tctx cloudevents.HTTPTransportContext, event cloudevents.Event) error { + start := time.Now() sendingCTX := broker.SendingContext(ctx, tctx, h.ChannelURI) - - startTS := time.Now() - defer func() { - dispatchTimeMS := int64(time.Now().Sub(startTS) / time.Millisecond) - stats.Record(sendingCTX, MeasureDispatchTime.M(dispatchTimeMS)) - }() - + // TODO get HTTP status codes and use those. _, err := h.CeClient.Send(sendingCTX, event) - if err != nil { - sendingCTX, _ = tag.New(sendingCTX, tag.Insert(TagResult, "error")) - } else { - sendingCTX, _ = tag.New(sendingCTX, tag.Insert(TagResult, "ok")) - } + // Record the dispatch time. + h.Reporter.ReportDispatchTime(reporterArgs, err, time.Since(start)) + // Record the event count. + h.Reporter.ReportEventCount(reporterArgs, err) return err } diff --git a/pkg/broker/ingress/ingress_handler_test.go b/pkg/broker/ingress/ingress_handler_test.go index 61e7f846005..85162e3a7da 100644 --- a/pkg/broker/ingress/ingress_handler_test.go +++ b/pkg/broker/ingress/ingress_handler_test.go @@ -5,6 +5,7 @@ import ( nethttp "net/http" "net/url" "testing" + "time" cloudevents "github.com/cloudevents/sdk-go" "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http" @@ -12,6 +13,7 @@ import ( ) const ( + namespace = "testNamespace" brokerName = "testBroker" validURI = "/" urlHost = "testHost" @@ -20,6 +22,16 @@ const ( validHTTPMethod = nethttp.MethodPost ) +type mockReporter struct{} + +func (r *mockReporter) ReportEventCount(args *ReportArgs, err error) error { + return nil +} + +func (r *mockReporter) ReportDispatchTime(args *ReportArgs, err error, d time.Duration) error { + return nil +} + type fakeClient struct{ sent bool } func (f *fakeClient) Send(ctx context.Context, event cloudevents.Event) (*cloudevents.Event, error) { @@ -61,6 +73,8 @@ func TestIngressHandler_ServeHTTP_FAIL(t *testing.T) { Path: urlPath, }, BrokerName: brokerName, + Namespace: namespace, + Reporter: &mockReporter{}, } event := cloudevents.NewEvent() resp := new(cloudevents.EventResponse) @@ -85,6 +99,8 @@ func TestIngressHandler_ServeHTTP_Succeed(t *testing.T) { Path: urlPath, }, BrokerName: brokerName, + Namespace: namespace, + Reporter: &mockReporter{}, } event := cloudevents.NewEvent() resp := new(cloudevents.EventResponse) diff --git a/pkg/broker/ingress/metrics.go b/pkg/broker/ingress/metrics.go deleted file mode 100644 index 67e266fe78e..00000000000 --- a/pkg/broker/ingress/metrics.go +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright 2019 The Knative Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package ingress - -import ( - "go.opencensus.io/stats" - "go.opencensus.io/stats/view" - "go.opencensus.io/tag" - utils "knative.dev/eventing/pkg/broker" -) - -var ( - // MeasureEventsTotal is a counter which records the number of events received - // by the ingress. The value of the Result tag indicates whether the event - // was filtered or dispatched by the ingress. - MeasureEventsTotal = stats.Int64( - "knative.dev/eventing/broker/measures/events_total", - "Total number of events received", - stats.UnitNone, - ) - - // MeasureDispatchTime records the time spent dispatching an event, in - // milliseconds. - MeasureDispatchTime = stats.Int64( - "knative.dev/eventing/broker/measures/dispatch_time", - "Time spent dispatching an event", - stats.UnitMilliseconds, - ) - - // Tag keys must conform to the restrictions described in - // go.opencensus.io/tag/validate.go. Currently those restrictions are: - // - length between 1 and 255 inclusive - // - characters are printable US-ASCII - - // TagResult is a tag key referring to the observed result of an operation. - TagResult = utils.MustNewTagKey("result") - - // TagBroker is a tag key referring to the Broker name serviced by this - // ingress process. - TagBroker = utils.MustNewTagKey("broker") -) - -func init() { - // Create views for exporting measurements. This returns an error if a - // previously registered view has the same name with a different value. - err := view.Register( - &view.View{ - Name: "broker_events_total", - Measure: MeasureEventsTotal, - Aggregation: view.Count(), - TagKeys: []tag.Key{TagResult, TagBroker}, - }, - &view.View{ - Name: "broker_dispatch_time", - Measure: MeasureDispatchTime, - Aggregation: view.Distribution(utils.Buckets125(1, 100)...), // 1, 2, 5, 10, 20, 50, 100 - TagKeys: []tag.Key{TagResult, TagBroker}, - }, - ) - if err != nil { - panic(err) - } -} diff --git a/pkg/broker/ingress/stats_reporter.go b/pkg/broker/ingress/stats_reporter.go new file mode 100644 index 00000000000..a8d22bdb7d0 --- /dev/null +++ b/pkg/broker/ingress/stats_reporter.go @@ -0,0 +1,160 @@ +/* + * Copyright 2019 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ingress + +import ( + "context" + "fmt" + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" + utils "knative.dev/eventing/pkg/broker" + "knative.dev/eventing/pkg/metrics/metricskey" + "knative.dev/pkg/metrics" + "time" +) + +var ( + // eventCountM is a counter which records the number of events received + // by the Broker. + eventCountM = stats.Int64( + "event_count", + "Number of events received by a Broker", + stats.UnitDimensionless, + ) + + // dispatchTimeInMsecM records the time spent dispatching an event to + // a Trigger, in milliseconds. + dispatchTimeInMsecM = stats.Float64( + "dispatch_latencies", + "The time spent dispatching an event to a Trigger", + stats.UnitMilliseconds, + ) +) + +type ReportArgs struct { + ns string + broker string + eventType string +} + +// StatsReporter defines the interface for sending ingress metrics. +type StatsReporter interface { + ReportEventCount(args *ReportArgs, err error) error + ReportDispatchTime(args *ReportArgs, err error, d time.Duration) error +} + +// Reporter holds cached metric objects to report ingress metrics. +type Reporter struct { + initialized bool + namespaceTagKey tag.Key + brokerTagKey tag.Key + eventTypeKey tag.Key + // TODO add support for EventSource + resultKey tag.Key +} + +// NewStatsReporter creates a reporter that collects and reports ingress metrics. +func NewStatsReporter() (*Reporter, error) { + var r = &Reporter{} + + // Create the tag keys that will be used to add tags to our measurements. + nsTag, err := tag.NewKey(metricskey.NamespaceName) + if err != nil { + return nil, err + } + r.namespaceTagKey = nsTag + brokerTag, err := tag.NewKey(metricskey.BrokerName) + if err != nil { + return nil, err + } + r.brokerTagKey = brokerTag + eventTypeTag, err := tag.NewKey(metricskey.EventType) + if err != nil { + return nil, err + } + r.eventTypeKey = eventTypeTag + resultTag, err := tag.NewKey(metricskey.Result) + if err != nil { + return nil, err + } + r.resultKey = resultTag + + // Create view to see our measurements. + err = view.Register( + &view.View{ + Description: eventCountM.Description(), + Measure: eventCountM, + // TODO count or sum aggregation? + Aggregation: view.Count(), + TagKeys: []tag.Key{r.namespaceTagKey, r.brokerTagKey, r.eventTypeKey, r.resultKey}, + }, + &view.View{ + Description: dispatchTimeInMsecM.Description(), + Measure: dispatchTimeInMsecM, + Aggregation: view.Distribution(utils.Buckets125(1, 100)...), // 1, 2, 5, 10, 20, 50, 100 + TagKeys: []tag.Key{r.namespaceTagKey, r.brokerTagKey, r.eventTypeKey, r.resultKey}, + }, + ) + if err != nil { + return nil, err + } + + r.initialized = true + return r, nil +} + +// ReportEventCount captures the event count. +func (r *Reporter) ReportEventCount(args *ReportArgs, err error) error { + if !r.initialized { + return fmt.Errorf("StatsReporter is not initialized yet") + } + + ctx, err := tag.New( + context.Background(), + tag.Insert(r.namespaceTagKey, args.ns), + tag.Insert(r.brokerTagKey, args.broker), + tag.Insert(r.eventTypeKey, args.eventType), + tag.Insert(r.resultKey, utils.Result(err))) + if err != nil { + return err + } + + metrics.Record(ctx, eventCountM.M(1)) + return nil +} + +// ReportDispatchTime captures dispatch times. +func (r *Reporter) ReportDispatchTime(args *ReportArgs, err error, d time.Duration) error { + if !r.initialized { + return fmt.Errorf("StatsReporter is not initialized yet") + } + + ctx, err := tag.New( + context.Background(), + tag.Insert(r.namespaceTagKey, args.ns), + tag.Insert(r.brokerTagKey, args.broker), + tag.Insert(r.eventTypeKey, args.eventType), + tag.Insert(r.resultKey, utils.Result(err))) + if err != nil { + return err + } + + // convert time.Duration in nanoseconds to milliseconds. + metrics.Record(ctx, dispatchTimeInMsecM.M(float64(d/time.Millisecond))) + return nil +} diff --git a/pkg/broker/ingress/stats_reporter_test.go b/pkg/broker/ingress/stats_reporter_test.go new file mode 100644 index 00000000000..845028193b6 --- /dev/null +++ b/pkg/broker/ingress/stats_reporter_test.go @@ -0,0 +1,86 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ingress + +import ( + "github.com/pkg/errors" + "knative.dev/eventing/pkg/metrics/metricskey" + "knative.dev/pkg/metrics/metricstest" + "testing" + "time" +) + +// unregister, ehm, unregisters the metrics that were registered, by +// virtue of StatsReporter creation. +// Since golang executes test iterations within the same process, the stats reporter +// returns an error if the metric is already registered and the test panics. +func unregister() { + metricstest.Unregister("event_count", "dispatch_latencies") +} + +func TestStatsReporter(t *testing.T) { + r := &Reporter{} + + args := &ReportArgs{ + ns: "testns", + broker: "testbroker", + eventType: "testeventtype", + } + if err := r.ReportEventCount(args, errors.New("error")); err == nil { + t.Error("Reporter expected an error for Report call before init. Got success.") + } + + var err error + if r, err = NewStatsReporter(); err != nil { + t.Fatalf("Failed to create a new reporter: %v", err) + } + // Without this `go test ... -count=X`, where X > 1, fails, since + // we get an error about view already being registered. + defer unregister() + + wantTags := map[string]string{ + metricskey.NamespaceName: "testns", + metricskey.BrokerName: "testbroker", + metricskey.EventType: "testeventtype", + metricskey.Result: "success", + } + + // test ReportEventCount + expectSuccess(t, func() error { + return r.ReportEventCount(args, nil) + }) + expectSuccess(t, func() error { + return r.ReportEventCount(args, nil) + }) + metricstest.CheckCountData(t, "event_count", wantTags, 2) + + // test ReportDispatchTime + expectSuccess(t, func() error { + return r.ReportDispatchTime(args, nil, 1100*time.Millisecond) + }) + expectSuccess(t, func() error { + return r.ReportDispatchTime(args, nil, 9100*time.Millisecond) + }) + metricstest.CheckDistributionData(t, "dispatch_latencies", wantTags, 2, 1100.0, 9100.0) +} + +func expectSuccess(t *testing.T, f func() error) { + t.Helper() + if err := f(); err != nil { + t.Errorf("Reporter expected success but got error: %v", err) + } +} diff --git a/pkg/broker/metrics.go b/pkg/broker/metrics.go index 9e3b7df4d8d..c3e9485a796 100644 --- a/pkg/broker/metrics.go +++ b/pkg/broker/metrics.go @@ -16,18 +16,12 @@ package broker -import "go.opencensus.io/tag" - -// MustNewTagKey creates a Tag or panics. This will only fail if the tag key -// doesn't conform to tag name validations. -// TODO OC library should provide this -func MustNewTagKey(k string) tag.Key { - tagKey, err := tag.NewKey(k) - if err != nil { - panic(err) - } - return tagKey -} +const ( + // EventArrivalTime is used to access the metadata stored on a + // CloudEvent to measure the time difference between when an event is + // received on a broker and when it is dispatched to the trigger function. + EventArrivalTime = "knativearrivaltime" +) // Buckets125 generates an array of buckets with approximate powers-of-two // buckets that also aligns with powers of 10 on every 3rd step. This can @@ -39,3 +33,11 @@ func Buckets125(low, high float64) []float64 { } return buckets } + +// Result converts an error to a result string (either "success" or "error"). +func Result(err error) string { + if err != nil { + return "error" + } + return "success" +} diff --git a/pkg/broker/ttl.go b/pkg/broker/ttl.go index ad6e459896f..33fbf4b89c3 100644 --- a/pkg/broker/ttl.go +++ b/pkg/broker/ttl.go @@ -27,12 +27,6 @@ const ( // Broker's TTL (number of times a single event can reply through a Broker continuously). All // interactions with the attribute should be done through the GetTTL and SetTTL functions. V03TTLAttribute = "knativebrokerttl" - - // TODO rename this extension, and place it somewhere else. - // TimeInFlightMetadataName is used to access the metadata stored on a - // CloudEvent to measure the time difference between when an event is - // received and when it is dispatched to the trigger function. - TimeInFlightMetadataName = "kn00timeinflight" ) // GetTTL finds the TTL in the EventContext using a case insensitive comparison diff --git a/pkg/metrics/metricskey/constants.go b/pkg/metrics/metricskey/constants.go new file mode 100644 index 00000000000..fbfe49b7173 --- /dev/null +++ b/pkg/metrics/metricskey/constants.go @@ -0,0 +1,85 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metricskey + +import "k8s.io/apimachinery/pkg/util/sets" + +const ( + // KnativeTrigger is the Stackdriver resource type for Triggers. + KnativeTrigger = "knative_trigger" + + // Project is the label for the project (e.g., GCP project ID). + Project = "project_id" + + // Location is the label for the location (e.g. GCE zone) where the cluster is deployed. + Location = "location" + + // ClusterName is the label for the immutable name of the cluster. + ClusterName = "cluster_name" + + // NamespaceName is the label for the immutable name of the namespace where the resource type exists. + NamespaceName = "namespace_name" + + // TriggerName is the label for the name of the Trigger. + TriggerName = "trigger_name" + + // BrokerName is the label for the name of the Broker. + BrokerName = "broker_name" + + // TriggerType is the label for the type attribute filter of the Trigger. + TriggerType = "trigger_type" + + // TriggerSource is the label for the source attribute filter of the Trigger. + TriggerSource = "trigger_source" + + // EventType is the label for the CloudEvents type context attribute. + EventType = "event_type" + + // FilterResult is the label for the Trigger filtering result. + FilterResult = "filter_result" + + // Unknown is the default value if the field is unknown, e.g., the project will be unknown if Knative + // is not running on GKE. + Unknown = "unknown" + + // Result is the label for the result of sending an event to a downstream consumer. One of "success", "error". + Result = "result" + + // Any is the default value if the trigger filter attributes are empty. + Any = "any" +) + +var ( + // KnativeTriggerLabels stores the set of resource labels for the resource type knative_trigger. + KnativeTriggerLabels = sets.NewString( + Project, + Location, + ClusterName, + NamespaceName, + TriggerName, + TriggerType, + TriggerSource, + ) + + // KnativeTriggerMetrics stores the set of metric types that are supported by the resource type knative_trigger. + KnativeTriggerMetrics = sets.NewString( + "knative.dev/eventing/trigger/event_count", + "knative.dev/eventing/trigger/dispatch_latencies", + "knative.dev/eventing/trigger/filter_latencies", + // TODO event_latencies should be associated with Broker. + ) +) From 51925250c334a0f397954fb0163031d23c0c0932 Mon Sep 17 00:00:00 2001 From: nachocano Date: Mon, 26 Aug 2019 09:00:45 -0700 Subject: [PATCH 2/4] updating lock --- Gopkg.lock | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Gopkg.lock b/Gopkg.lock index 5a5b5a50eff..185e16ad730 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1236,6 +1236,7 @@ "logging/testing", "metrics", "metrics/metricskey", + "metrics/metricstest", "reconciler/testing", "signals", "system", @@ -1413,6 +1414,7 @@ "knative.dev/pkg/logging/logkey", "knative.dev/pkg/logging/testing", "knative.dev/pkg/metrics", + "knative.dev/pkg/metrics/metricstest", "knative.dev/pkg/reconciler/testing", "knative.dev/pkg/signals", "knative.dev/pkg/system", From 213f19d88a767057729dd3a83c04ba48eaa9ced7 Mon Sep 17 00:00:00 2001 From: nachocano Date: Mon, 26 Aug 2019 16:45:50 -0700 Subject: [PATCH 3/4] updates after code review --- pkg/broker/filter/filter_handler.go | 32 +++++--- pkg/broker/filter/filter_handler_test.go | 2 +- pkg/broker/filter/stats_reporter.go | 93 +++++++---------------- pkg/broker/filter/stats_reporter_test.go | 10 +-- pkg/broker/ingress/stats_reporter.go | 50 +++++------- pkg/broker/ingress/stats_reporter_test.go | 10 +-- pkg/broker/metrics.go | 2 + pkg/metrics/metricskey/constants.go | 23 ------ 8 files changed, 73 insertions(+), 149 deletions(-) diff --git a/pkg/broker/filter/filter_handler.go b/pkg/broker/filter/filter_handler.go index a6c24abfd40..26f250f88a4 100644 --- a/pkg/broker/filter/filter_handler.go +++ b/pkg/broker/filter/filter_handler.go @@ -30,6 +30,7 @@ import ( "go.uber.org/zap" eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1" "knative.dev/eventing/pkg/broker" + "knative.dev/eventing/pkg/logging" "knative.dev/eventing/pkg/reconciler/trigger/path" "knative.dev/pkg/tracing" "sigs.k8s.io/controller-runtime/pkg/client" @@ -37,6 +38,10 @@ import ( const ( writeTimeout = 1 * time.Minute + + passFilter FilterResult = "pass" + failFilter FilterResult = "fail" + noFilter FilterResult = "no_filter" ) // Handler parses Cloud Events, determines if they pass a filter, and sends them to a subscriber. @@ -47,6 +52,9 @@ type Handler struct { reporter StatsReporter } +// FilterResult has the result of the filtering operation. +type FilterResult string + // NewHandler creates a new Handler and its associated MessageReceiver. The caller is responsible for // Start()ing the returned Handler. func NewHandler(logger *zap.Logger, client client.Client, reporter StatsReporter) (*Handler, error) { @@ -226,10 +234,10 @@ func (r *Handler) sendEvent(ctx context.Context, tctx cloudevents.HTTPTransportC // Check if the event should be sent, and record filtering time. start := time.Now() - pass, filterResult := r.shouldSendEvent(ctx, &t.Spec, event) + filterResult := r.shouldSendEvent(ctx, &t.Spec, event) r.reporter.ReportFilterTime(reportArgs, filterResult, time.Since(start)) - if !pass { + if filterResult == failFilter { r.logger.Debug("Event did not pass filter", zap.Any("triggerRef", trigger)) // Record the event count. r.reporter.ReportEventCount(reportArgs, errors.New("event did not pass filter")) @@ -267,11 +275,11 @@ func (r *Handler) getTrigger(ctx context.Context, ref path.NamespacedNameUID) (* // shouldSendEvent determines whether event 'event' should be sent based on the triggerSpec 'ts'. // Currently it supports exact matching on event context attributes and extension attributes. -// If no filter is present, shouldSendEvent returns true. -func (r *Handler) shouldSendEvent(ctx context.Context, ts *eventingv1alpha1.TriggerSpec, event *cloudevents.Event) (bool, string) { +// If no filter is present, shouldSendEvent returns passFilter. +func (r *Handler) shouldSendEvent(ctx context.Context, ts *eventingv1alpha1.TriggerSpec, event *cloudevents.Event) FilterResult { // No filter specified, default to passing everything. if ts.Filter == nil || (ts.Filter.DeprecatedSourceAndType == nil && ts.Filter.Attributes == nil) { - return true, "no_filter" + return noFilter } attrs := map[string]string{} @@ -284,15 +292,15 @@ func (r *Handler) shouldSendEvent(ctx context.Context, ts *eventingv1alpha1.Trig attrs = map[string]string(*ts.Filter.Attributes) } - result := r.filterEventByAttributes(attrs, event) - resultStr := "fail" + result := r.filterEventByAttributes(ctx, attrs, event) + filterResult := failFilter if result { - resultStr = "pass" + filterResult = passFilter } - return result, resultStr + return filterResult } -func (r *Handler) filterEventByAttributes(attrs map[string]string, event *cloudevents.Event) bool { +func (r *Handler) filterEventByAttributes(ctx context.Context, attrs map[string]string, event *cloudevents.Event) bool { // Set standard context attributes. The attributes available may not be // exactly the same as the attributes defined in the current version of the // CloudEvents spec. @@ -320,12 +328,12 @@ func (r *Handler) filterEventByAttributes(attrs map[string]string, event *cloude value, ok := ce[k] // If the attribute does not exist in the event, return false. if !ok { - r.logger.Debug("Attribute not found", zap.String("attribute", k)) + logging.FromContext(ctx).Debug("Attribute not found", zap.String("attribute", k)) return false } // If the attribute is not set to any and is different than the one from the event, return false. if v != eventingv1alpha1.TriggerAnyFilter && v != value { - r.logger.Debug("Attribute had non-matching value", zap.String("attribute", k), zap.String("filter", v), zap.Any("received", value)) + logging.FromContext(ctx).Debug("Attribute had non-matching value", zap.String("attribute", k), zap.String("filter", v), zap.Any("received", value)) return false } } diff --git a/pkg/broker/filter/filter_handler_test.go b/pkg/broker/filter/filter_handler_test.go index 08832714ac6..1a9bef36d55 100644 --- a/pkg/broker/filter/filter_handler_test.go +++ b/pkg/broker/filter/filter_handler_test.go @@ -367,7 +367,7 @@ func (r *mockReporter) ReportDispatchTime(args *ReportArgs, err error, d time.Du return nil } -func (r *mockReporter) ReportFilterTime(args *ReportArgs, filterResult string, d time.Duration) error { +func (r *mockReporter) ReportFilterTime(args *ReportArgs, filterResult FilterResult, d time.Duration) error { return nil } diff --git a/pkg/broker/filter/stats_reporter.go b/pkg/broker/filter/stats_reporter.go index ac50810874a..7b682519671 100644 --- a/pkg/broker/filter/stats_reporter.go +++ b/pkg/broker/filter/stats_reporter.go @@ -18,14 +18,14 @@ package filter import ( "context" - "fmt" + "time" + "go.opencensus.io/stats" "go.opencensus.io/stats/view" "go.opencensus.io/tag" utils "knative.dev/eventing/pkg/broker" "knative.dev/eventing/pkg/metrics/metricskey" "knative.dev/pkg/metrics" - "time" ) var ( @@ -74,13 +74,14 @@ type ReportArgs struct { type StatsReporter interface { ReportEventCount(args *ReportArgs, err error) error ReportDispatchTime(args *ReportArgs, err error, d time.Duration) error - ReportFilterTime(args *ReportArgs, filterResult string, d time.Duration) error + ReportFilterTime(args *ReportArgs, filterResult FilterResult, d time.Duration) error ReportEventDeliveryTime(args *ReportArgs, err error, d time.Duration) error } -// Reporter holds cached metric objects to report filter metrics. -type Reporter struct { - initialized bool +var _ StatsReporter = (*reporter)(nil) + +// reporter holds cached metric objects to report filter metrics. +type reporter struct { namespaceTagKey tag.Key triggerTagKey tag.Key brokerTagKey tag.Key @@ -91,8 +92,8 @@ type Reporter struct { } // NewStatsReporter creates a reporter that collects and reports filter metrics. -func NewStatsReporter() (*Reporter, error) { - var r = &Reporter{} +func NewStatsReporter() (StatsReporter, error) { + var r = &reporter{} // Create the tag keys that will be used to add tags to our measurements. nsTag, err := tag.NewKey(metricskey.NamespaceName) @@ -163,103 +164,63 @@ func NewStatsReporter() (*Reporter, error) { return nil, err } - r.initialized = true return r, nil } // ReportEventCount captures the event count. -func (r *Reporter) ReportEventCount(args *ReportArgs, err error) error { - if !r.initialized { - return fmt.Errorf("StatsReporter is not initialized yet") - } - - // Note that eventType and eventSource can be empty strings, so they need a special treatment. - ctx, err := tag.New( - context.Background(), - tag.Insert(r.namespaceTagKey, args.ns), - tag.Insert(r.triggerTagKey, args.trigger), - tag.Insert(r.brokerTagKey, args.broker), - tag.Insert(r.triggerTypeKey, valueOrAny(args.eventType)), - tag.Insert(r.triggerSourceKey, valueOrAny(args.eventSource)), - tag.Insert(r.resultKey, utils.Result(err))) +func (r *reporter) ReportEventCount(args *ReportArgs, err error) error { + ctx, err := r.generateTag(args, tag.Insert(r.resultKey, utils.Result(err))) if err != nil { return err } - metrics.Record(ctx, eventCountM.M(1)) return nil } // ReportDispatchTime captures dispatch times. -func (r *Reporter) ReportDispatchTime(args *ReportArgs, err error, d time.Duration) error { - if !r.initialized { - return fmt.Errorf("StatsReporter is not initialized yet") - } - - // Note that eventType and eventSource can be empty strings, so they need a special treatment. - ctx, err := tag.New( - context.Background(), - tag.Insert(r.namespaceTagKey, args.ns), - tag.Insert(r.triggerTagKey, args.trigger), - tag.Insert(r.brokerTagKey, args.broker), - tag.Insert(r.triggerTypeKey, valueOrAny(args.eventType)), - tag.Insert(r.triggerSourceKey, valueOrAny(args.eventSource)), - tag.Insert(r.resultKey, utils.Result(err))) +func (r *reporter) ReportDispatchTime(args *ReportArgs, err error, d time.Duration) error { + ctx, err := r.generateTag(args, tag.Insert(r.resultKey, utils.Result(err))) if err != nil { return err } - // convert time.Duration in nanoseconds to milliseconds. metrics.Record(ctx, dispatchTimeInMsecM.M(float64(d/time.Millisecond))) return nil } // ReportFilterTime captures filtering times. -func (r *Reporter) ReportFilterTime(args *ReportArgs, filterResult string, d time.Duration) error { - if !r.initialized { - return fmt.Errorf("StatsReporter is not initialized yet") - } - - // Note that eventType and eventSource can be empty strings, so they need a special treatment. - ctx, err := tag.New( - context.Background(), - tag.Insert(r.namespaceTagKey, args.ns), - tag.Insert(r.triggerTagKey, args.trigger), - tag.Insert(r.brokerTagKey, args.broker), - tag.Insert(r.triggerTypeKey, valueOrAny(args.eventType)), - tag.Insert(r.triggerSourceKey, valueOrAny(args.eventSource)), - tag.Insert(r.filterResultKey, filterResult)) +func (r *reporter) ReportFilterTime(args *ReportArgs, filterResult FilterResult, d time.Duration) error { + ctx, err := r.generateTag(args, tag.Insert(r.filterResultKey, string(filterResult))) if err != nil { return err } - // convert time.Duration in nanoseconds to milliseconds. metrics.Record(ctx, filterTimeInMsecM.M(float64(d/time.Millisecond))) return nil } // ReportEventDeliveryTime captures event delivery times. -func (r *Reporter) ReportEventDeliveryTime(args *ReportArgs, err error, d time.Duration) error { - if !r.initialized { - return fmt.Errorf("StatsReporter is not initialized yet") +func (r *reporter) ReportEventDeliveryTime(args *ReportArgs, err error, d time.Duration) error { + ctx, err := r.generateTag(args, tag.Insert(r.resultKey, utils.Result(err))) + if err != nil { + return err } + // convert time.Duration in nanoseconds to milliseconds. + metrics.Record(ctx, deliveryTimeInMsecM.M(float64(d/time.Millisecond))) + return nil +} + +func (r *reporter) generateTag(args *ReportArgs, t tag.Mutator) (context.Context, error) { // Note that eventType and eventSource can be empty strings, so they need a special treatment. - ctx, err := tag.New( + return tag.New( context.Background(), tag.Insert(r.namespaceTagKey, args.ns), tag.Insert(r.triggerTagKey, args.trigger), tag.Insert(r.brokerTagKey, args.broker), tag.Insert(r.triggerTypeKey, valueOrAny(args.eventType)), tag.Insert(r.triggerSourceKey, valueOrAny(args.eventSource)), - tag.Insert(r.resultKey, utils.Result(err))) - if err != nil { - return err - } - - // convert time.Duration in nanoseconds to milliseconds. - metrics.Record(ctx, deliveryTimeInMsecM.M(float64(d/time.Millisecond))) - return nil + t) } func valueOrAny(v string) string { diff --git a/pkg/broker/filter/stats_reporter_test.go b/pkg/broker/filter/stats_reporter_test.go index 4c0dac26d9b..eddae32f385 100644 --- a/pkg/broker/filter/stats_reporter_test.go +++ b/pkg/broker/filter/stats_reporter_test.go @@ -17,7 +17,6 @@ limitations under the License. package filter import ( - "github.com/pkg/errors" "knative.dev/eventing/pkg/metrics/metricskey" "knative.dev/pkg/metrics/metricstest" "testing" @@ -33,8 +32,6 @@ func unregister() { } func TestStatsReporter(t *testing.T) { - r := &Reporter{} - args := &ReportArgs{ ns: "testns", trigger: "testtrigger", @@ -42,12 +39,9 @@ func TestStatsReporter(t *testing.T) { eventType: "testeventtype", eventSource: "testeventsource", } - if err := r.ReportEventCount(args, errors.New("error")); err == nil { - t.Error("Reporter expected an error for Report call before init. Got success.") - } - var err error - if r, err = NewStatsReporter(); err != nil { + r, err := NewStatsReporter() + if err != nil { t.Fatalf("Failed to create a new reporter: %v", err) } // Without this `go test ... -count=X`, where X > 1, fails, since diff --git a/pkg/broker/ingress/stats_reporter.go b/pkg/broker/ingress/stats_reporter.go index a8d22bdb7d0..e250ab1674d 100644 --- a/pkg/broker/ingress/stats_reporter.go +++ b/pkg/broker/ingress/stats_reporter.go @@ -18,7 +18,6 @@ package ingress import ( "context" - "fmt" "go.opencensus.io/stats" "go.opencensus.io/stats/view" "go.opencensus.io/tag" @@ -38,10 +37,10 @@ var ( ) // dispatchTimeInMsecM records the time spent dispatching an event to - // a Trigger, in milliseconds. + // a Channel, in milliseconds. dispatchTimeInMsecM = stats.Float64( "dispatch_latencies", - "The time spent dispatching an event to a Trigger", + "The time spent dispatching an event to a Channel", stats.UnitMilliseconds, ) ) @@ -58,9 +57,10 @@ type StatsReporter interface { ReportDispatchTime(args *ReportArgs, err error, d time.Duration) error } +var _ StatsReporter = (*reporter)(nil) + // Reporter holds cached metric objects to report ingress metrics. -type Reporter struct { - initialized bool +type reporter struct { namespaceTagKey tag.Key brokerTagKey tag.Key eventTypeKey tag.Key @@ -69,8 +69,8 @@ type Reporter struct { } // NewStatsReporter creates a reporter that collects and reports ingress metrics. -func NewStatsReporter() (*Reporter, error) { - var r = &Reporter{} +func NewStatsReporter() (StatsReporter, error) { + var r = &reporter{} // Create the tag keys that will be used to add tags to our measurements. nsTag, err := tag.NewKey(metricskey.NamespaceName) @@ -114,47 +114,35 @@ func NewStatsReporter() (*Reporter, error) { return nil, err } - r.initialized = true return r, nil } // ReportEventCount captures the event count. -func (r *Reporter) ReportEventCount(args *ReportArgs, err error) error { - if !r.initialized { - return fmt.Errorf("StatsReporter is not initialized yet") - } - - ctx, err := tag.New( - context.Background(), - tag.Insert(r.namespaceTagKey, args.ns), - tag.Insert(r.brokerTagKey, args.broker), - tag.Insert(r.eventTypeKey, args.eventType), - tag.Insert(r.resultKey, utils.Result(err))) +func (r *reporter) ReportEventCount(args *ReportArgs, err error) error { + ctx, err := r.generateTag(args, err) if err != nil { return err } - metrics.Record(ctx, eventCountM.M(1)) return nil } // ReportDispatchTime captures dispatch times. -func (r *Reporter) ReportDispatchTime(args *ReportArgs, err error, d time.Duration) error { - if !r.initialized { - return fmt.Errorf("StatsReporter is not initialized yet") +func (r *reporter) ReportDispatchTime(args *ReportArgs, err error, d time.Duration) error { + ctx, err := r.generateTag(args, err) + if err != nil { + return err } + // convert time.Duration in nanoseconds to milliseconds. + metrics.Record(ctx, dispatchTimeInMsecM.M(float64(d/time.Millisecond))) + return nil +} - ctx, err := tag.New( +func (r *reporter) generateTag(args *ReportArgs, err error) (context.Context, error) { + return tag.New( context.Background(), tag.Insert(r.namespaceTagKey, args.ns), tag.Insert(r.brokerTagKey, args.broker), tag.Insert(r.eventTypeKey, args.eventType), tag.Insert(r.resultKey, utils.Result(err))) - if err != nil { - return err - } - - // convert time.Duration in nanoseconds to milliseconds. - metrics.Record(ctx, dispatchTimeInMsecM.M(float64(d/time.Millisecond))) - return nil } diff --git a/pkg/broker/ingress/stats_reporter_test.go b/pkg/broker/ingress/stats_reporter_test.go index 845028193b6..7a9e734cfb5 100644 --- a/pkg/broker/ingress/stats_reporter_test.go +++ b/pkg/broker/ingress/stats_reporter_test.go @@ -17,7 +17,6 @@ limitations under the License. package ingress import ( - "github.com/pkg/errors" "knative.dev/eventing/pkg/metrics/metricskey" "knative.dev/pkg/metrics/metricstest" "testing" @@ -33,19 +32,14 @@ func unregister() { } func TestStatsReporter(t *testing.T) { - r := &Reporter{} - args := &ReportArgs{ ns: "testns", broker: "testbroker", eventType: "testeventtype", } - if err := r.ReportEventCount(args, errors.New("error")); err == nil { - t.Error("Reporter expected an error for Report call before init. Got success.") - } - var err error - if r, err = NewStatsReporter(); err != nil { + r, err := NewStatsReporter() + if err != nil { t.Fatalf("Failed to create a new reporter: %v", err) } // Without this `go test ... -count=X`, where X > 1, fails, since diff --git a/pkg/broker/metrics.go b/pkg/broker/metrics.go index c3e9485a796..6b08e664ca1 100644 --- a/pkg/broker/metrics.go +++ b/pkg/broker/metrics.go @@ -20,6 +20,8 @@ const ( // EventArrivalTime is used to access the metadata stored on a // CloudEvent to measure the time difference between when an event is // received on a broker and when it is dispatched to the trigger function. + // Should be set using time.Now(), which returns the current local time. + // The format is: 2019-08-26T23:38:17.834384404Z. EventArrivalTime = "knativearrivaltime" ) diff --git a/pkg/metrics/metricskey/constants.go b/pkg/metrics/metricskey/constants.go index fbfe49b7173..c3bd4e9be74 100644 --- a/pkg/metrics/metricskey/constants.go +++ b/pkg/metrics/metricskey/constants.go @@ -16,8 +16,6 @@ limitations under the License. package metricskey -import "k8s.io/apimachinery/pkg/util/sets" - const ( // KnativeTrigger is the Stackdriver resource type for Triggers. KnativeTrigger = "knative_trigger" @@ -62,24 +60,3 @@ const ( // Any is the default value if the trigger filter attributes are empty. Any = "any" ) - -var ( - // KnativeTriggerLabels stores the set of resource labels for the resource type knative_trigger. - KnativeTriggerLabels = sets.NewString( - Project, - Location, - ClusterName, - NamespaceName, - TriggerName, - TriggerType, - TriggerSource, - ) - - // KnativeTriggerMetrics stores the set of metric types that are supported by the resource type knative_trigger. - KnativeTriggerMetrics = sets.NewString( - "knative.dev/eventing/trigger/event_count", - "knative.dev/eventing/trigger/dispatch_latencies", - "knative.dev/eventing/trigger/filter_latencies", - // TODO event_latencies should be associated with Broker. - ) -) From 94fdc0c48dd7b924c203bb93e66ed60efa8ca50d Mon Sep 17 00:00:00 2001 From: nachocano Date: Mon, 26 Aug 2019 16:47:35 -0700 Subject: [PATCH 4/4] go imports --- pkg/broker/filter/stats_reporter_test.go | 5 +++-- pkg/broker/ingress/stats_reporter.go | 3 ++- pkg/broker/ingress/stats_reporter_test.go | 5 +++-- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/pkg/broker/filter/stats_reporter_test.go b/pkg/broker/filter/stats_reporter_test.go index eddae32f385..71ab7e989f0 100644 --- a/pkg/broker/filter/stats_reporter_test.go +++ b/pkg/broker/filter/stats_reporter_test.go @@ -17,10 +17,11 @@ limitations under the License. package filter import ( - "knative.dev/eventing/pkg/metrics/metricskey" - "knative.dev/pkg/metrics/metricstest" "testing" "time" + + "knative.dev/eventing/pkg/metrics/metricskey" + "knative.dev/pkg/metrics/metricstest" ) // unregister, ehm, unregisters the metrics that were registered, by diff --git a/pkg/broker/ingress/stats_reporter.go b/pkg/broker/ingress/stats_reporter.go index e250ab1674d..a1335087e1c 100644 --- a/pkg/broker/ingress/stats_reporter.go +++ b/pkg/broker/ingress/stats_reporter.go @@ -18,13 +18,14 @@ package ingress import ( "context" + "time" + "go.opencensus.io/stats" "go.opencensus.io/stats/view" "go.opencensus.io/tag" utils "knative.dev/eventing/pkg/broker" "knative.dev/eventing/pkg/metrics/metricskey" "knative.dev/pkg/metrics" - "time" ) var ( diff --git a/pkg/broker/ingress/stats_reporter_test.go b/pkg/broker/ingress/stats_reporter_test.go index 7a9e734cfb5..6d713b8056b 100644 --- a/pkg/broker/ingress/stats_reporter_test.go +++ b/pkg/broker/ingress/stats_reporter_test.go @@ -17,10 +17,11 @@ limitations under the License. package ingress import ( - "knative.dev/eventing/pkg/metrics/metricskey" - "knative.dev/pkg/metrics/metricstest" "testing" "time" + + "knative.dev/eventing/pkg/metrics/metricskey" + "knative.dev/pkg/metrics/metricstest" ) // unregister, ehm, unregisters the metrics that were registered, by