From eb1f0a9b6d53e1502ccfcb57f640f4c7466532f2 Mon Sep 17 00:00:00 2001 From: nachocano Date: Wed, 21 Aug 2019 17:39:45 -0700 Subject: [PATCH 1/9] generic infra for metrics, either prometheus or stackdriver for now. --- cmd/broker/filter/main.go | 62 ++++++-------------------------------- cmd/broker/ingress/main.go | 62 ++++++-------------------------------- 2 files changed, 20 insertions(+), 104 deletions(-) diff --git a/cmd/broker/filter/main.go b/cmd/broker/filter/main.go index 1bd2ac23c8d..efb484e9d81 100644 --- a/cmd/broker/filter/main.go +++ b/cmd/broker/filter/main.go @@ -18,15 +18,7 @@ package main import ( "flag" - "fmt" - "log" - "net/http" - "sync" - "time" - - "contrib.go.opencensus.io/exporter/prometheus" "github.com/kelseyhightower/envconfig" - "go.opencensus.io/stats/view" "go.uber.org/zap" "go.uber.org/zap/zapcore" "k8s.io/client-go/kubernetes" @@ -34,8 +26,8 @@ import ( "knative.dev/eventing/pkg/broker" "knative.dev/eventing/pkg/provisioners" "knative.dev/eventing/pkg/tracing" - "knative.dev/eventing/pkg/utils" "knative.dev/pkg/configmap" + "knative.dev/pkg/metrics" "knative.dev/pkg/signals" "knative.dev/pkg/system" "sigs.k8s.io/controller-runtime/pkg/client/config" @@ -47,21 +39,11 @@ type envConfig struct { Namespace string `envconfig:"NAMESPACE" required:"true"` } -var ( - metricsPort = 9090 - - writeTimeout = 1 * time.Minute - shutdownTimeout = 1 * time.Minute - - wg sync.WaitGroup -) - func main() { logConfig := provisioners.NewLoggingConfig() logConfig.LoggingLevel["provisioner"] = zapcore.DebugLevel logger := provisioners.NewProvisionerLoggerFromConfig(logConfig).Desugar() - defer logger.Sync() - + defer flush(logger) flag.Parse() logger.Info("Starting...") @@ -101,32 +83,13 @@ func main() { } err = mgr.Add(receiver) if err != nil { - logger.Fatal("Unable to start the receiver", zap.Error(err), zap.Any("receiver", receiver)) + logger.Fatal("Unable to start the receiver", zap.Error(err), zap.Any("broker_receiver", receiver)) } - // Metrics - e, err := prometheus.NewExporter(prometheus.Options{}) - if err != nil { - logger.Fatal("Unable to create Prometheus exporter", zap.Error(err)) - } - view.RegisterExporter(e) - sm := http.NewServeMux() - sm.Handle("/metrics", e) - metricsSrv := &http.Server{ - Addr: fmt.Sprintf(":%d", metricsPort), - Handler: e, - ErrorLog: zap.NewStdLog(logger), - WriteTimeout: writeTimeout, - } + // TODO watch logging config map. - err = mgr.Add(&utils.RunnableServer{ - Server: metricsSrv, - ShutdownTimeout: shutdownTimeout, - WaitGroup: &wg, - }) - if err != nil { - logger.Fatal("Unable to add metrics runnableServer", zap.Error(err)) - } + // Watch the observability config map and dynamically update metrics exporter. + configMapWatcher.Watch(metrics.ConfigMapName(), metrics.UpdateExporterFromConfigMap("broker_receiver", logger.Sugar())) // Set up signals so we handle the first shutdown signal gracefully. stopCh := signals.SetupSignalHandler() @@ -143,14 +106,9 @@ func main() { logger.Fatal("Manager.Start() returned an error", zap.Error(err)) } logger.Info("Exiting...") +} - go func() { - <-time.After(shutdownTimeout) - log.Fatalf("Shutdown took longer than %v", shutdownTimeout) - }() - - // Wait for runnables to stop. This blocks indefinitely, but the above - // goroutine will exit the process if it takes longer than shutdownTimeout. - wg.Wait() - logger.Info("Done.") +func flush(logger *zap.Logger) { + logger.Sync() + metrics.FlushExporter() } diff --git a/cmd/broker/ingress/main.go b/cmd/broker/ingress/main.go index 5c5d8c4b5c3..6d96cfd07a5 100644 --- a/cmd/broker/ingress/main.go +++ b/cmd/broker/ingress/main.go @@ -18,28 +18,21 @@ package main import ( "flag" - "fmt" - "log" + "knative.dev/pkg/metrics" "net/http" "net/url" - "sync" - "time" - // Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters). // _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" - "contrib.go.opencensus.io/exporter/prometheus" - cloudevents "github.com/cloudevents/sdk-go" + "github.com/cloudevents/sdk-go" cehttp "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http" "github.com/kelseyhightower/envconfig" - "go.opencensus.io/stats/view" "go.uber.org/zap" "k8s.io/client-go/kubernetes" eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1" "knative.dev/eventing/pkg/broker/ingress" "knative.dev/eventing/pkg/provisioners" "knative.dev/eventing/pkg/tracing" - "knative.dev/eventing/pkg/utils" "knative.dev/pkg/configmap" "knative.dev/pkg/signals" "knative.dev/pkg/system" @@ -55,19 +48,10 @@ type envConfig struct { Namespace string `envconfig:"NAMESPACE" required:"true"` } -var ( - metricsPort = 9090 - - writeTimeout = 1 * time.Minute - shutdownTimeout = 1 * time.Minute - - wg sync.WaitGroup -) - func main() { logConfig := provisioners.NewLoggingConfig() logger := provisioners.NewProvisionerLoggerFromConfig(logConfig).Desugar() - defer logger.Sync() + defer flush(logger) flag.Parse() crlog.SetLogger(crlog.ZapLogger(false)) @@ -132,29 +116,10 @@ func main() { logger.Fatal("Unable to add handler", zap.Error(err)) } - // Metrics - e, err := prometheus.NewExporter(prometheus.Options{}) - if err != nil { - logger.Fatal("Unable to create Prometheus exporter", zap.Error(err)) - } - view.RegisterExporter(e) - sm := http.NewServeMux() - sm.Handle("/metrics", e) - metricsSrv := &http.Server{ - Addr: fmt.Sprintf(":%d", metricsPort), - Handler: e, - ErrorLog: zap.NewStdLog(logger), - WriteTimeout: writeTimeout, - } + // TODO watch logging config map. - err = mgr.Add(&utils.RunnableServer{ - Server: metricsSrv, - ShutdownTimeout: shutdownTimeout, - WaitGroup: &wg, - }) - if err != nil { - logger.Fatal("Unable to add metrics runnableServer", zap.Error(err)) - } + // Watch the observability config map and dynamically update metrics exporter. + configMapWatcher.Watch(metrics.ConfigMapName(), metrics.UpdateExporterFromConfigMap("broker_ingress", logger.Sugar())) // Set up signals so we handle the first shutdown signal gracefully. stopCh := signals.SetupSignalHandler() @@ -169,16 +134,9 @@ func main() { logger.Error("manager.Start() returned an error", zap.Error(err)) } logger.Info("Exiting...") +} - // TODO Gracefully shutdown the ingress server. CloudEvents SDK doesn't seem - // to let us do that today. - go func() { - <-time.After(shutdownTimeout) - log.Fatalf("Shutdown took longer than %v", shutdownTimeout) - }() - - // Wait for runnables to stop. This blocks indefinitely, but the above - // goroutine will exit the process if it takes longer than shutdownTimeout. - wg.Wait() - logger.Info("Done.") +func flush(logger *zap.Logger) { + logger.Sync() + metrics.FlushExporter() } From 9a8cbb40e94e26265c3ec51f91e89df6a347d0f6 Mon Sep 17 00:00:00 2001 From: Nacho Cano Date: Wed, 21 Aug 2019 22:11:16 -0700 Subject: [PATCH 2/9] adding metrics domain --- pkg/reconciler/broker/broker_test.go | 8 ++++++++ pkg/reconciler/broker/resources/filter.go | 4 ++++ pkg/reconciler/broker/resources/ingress.go | 4 ++++ 3 files changed, 16 insertions(+) diff --git a/pkg/reconciler/broker/broker_test.go b/pkg/reconciler/broker/broker_test.go index 28b56fa863d..4e35af68156 100644 --- a/pkg/reconciler/broker/broker_test.go +++ b/pkg/reconciler/broker/broker_test.go @@ -884,6 +884,10 @@ func envVars(containerName string) []corev1.EnvVar { Name: "BROKER", Value: brokerName, }, + { + Name: "METRICS_DOMAIN", + Value: "knative.dev/eventing", + }, } case ingressContainerName: return []corev1.EnvVar{ @@ -911,6 +915,10 @@ func envVars(containerName string) []corev1.EnvVar { Name: "BROKER", Value: brokerName, }, + { + Name: "METRICS_DOMAIN", + Value: "knative.dev/eventing", + }, } } return []corev1.EnvVar{} diff --git a/pkg/reconciler/broker/resources/filter.go b/pkg/reconciler/broker/resources/filter.go index 6fbb267e384..ecca79458ed 100644 --- a/pkg/reconciler/broker/resources/filter.go +++ b/pkg/reconciler/broker/resources/filter.go @@ -99,6 +99,10 @@ func MakeFilterDeployment(args *FilterArgs) *appsv1.Deployment { Name: "BROKER", Value: args.Broker.Name, }, + { + Name: "METRICS_DOMAIN", + Value: "knative.dev/eventing", + }, }, Ports: []corev1.ContainerPort{ { diff --git a/pkg/reconciler/broker/resources/ingress.go b/pkg/reconciler/broker/resources/ingress.go index 1245ca965df..b229634de7f 100644 --- a/pkg/reconciler/broker/resources/ingress.go +++ b/pkg/reconciler/broker/resources/ingress.go @@ -98,6 +98,10 @@ func MakeIngress(args *IngressArgs) *appsv1.Deployment { Name: "BROKER", Value: args.Broker.Name, }, + { + Name: "METRICS_DOMAIN", + Value: "knative.dev/eventing", + }, }, Ports: []corev1.ContainerPort{ { From e997cc9244957eb64ac9cea795a2efe1df1e9125 Mon Sep 17 00:00:00 2001 From: nachocano Date: Thu, 22 Aug 2019 14:47:46 -0700 Subject: [PATCH 3/9] done --- cmd/broker/filter/main.go | 1 + cmd/broker/ingress/main.go | 5 +++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/cmd/broker/filter/main.go b/cmd/broker/filter/main.go index efb484e9d81..8165f6bcd41 100644 --- a/cmd/broker/filter/main.go +++ b/cmd/broker/filter/main.go @@ -18,6 +18,7 @@ package main import ( "flag" + "github.com/kelseyhightower/envconfig" "go.uber.org/zap" "go.uber.org/zap/zapcore" diff --git a/cmd/broker/ingress/main.go b/cmd/broker/ingress/main.go index 6d96cfd07a5..78838cbc718 100644 --- a/cmd/broker/ingress/main.go +++ b/cmd/broker/ingress/main.go @@ -18,13 +18,13 @@ package main import ( "flag" - "knative.dev/pkg/metrics" "net/http" "net/url" + // Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters). // _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" - "github.com/cloudevents/sdk-go" + cloudevents "github.com/cloudevents/sdk-go" cehttp "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http" "github.com/kelseyhightower/envconfig" "go.uber.org/zap" @@ -34,6 +34,7 @@ import ( "knative.dev/eventing/pkg/provisioners" "knative.dev/eventing/pkg/tracing" "knative.dev/pkg/configmap" + "knative.dev/pkg/metrics" "knative.dev/pkg/signals" "knative.dev/pkg/system" pkgtracing "knative.dev/pkg/tracing" From 69e9ac0cc839f8296dbfa979cb8c93033dd75640 Mon Sep 17 00:00:00 2001 From: nachocano Date: Thu, 22 Aug 2019 15:31:23 -0700 Subject: [PATCH 4/9] updating lock --- Gopkg.lock | 1 - 1 file changed, 1 deletion(-) diff --git a/Gopkg.lock b/Gopkg.lock index f4d6f957870..5a5b5a50eff 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1304,7 +1304,6 @@ analyzer-name = "dep" analyzer-version = 1 input-imports = [ - "contrib.go.opencensus.io/exporter/prometheus", "github.com/cloudevents/sdk-go", "github.com/cloudevents/sdk-go/pkg/cloudevents", "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http", From c20d855fb1f44436b6752cd6ba9b9e62cd07df0f Mon Sep 17 00:00:00 2001 From: nachocano Date: Fri, 30 Aug 2019 11:47:40 -0700 Subject: [PATCH 5/9] updates for stackdriver integration --- pkg/broker/filter/filter_handler.go | 48 +++--- pkg/broker/filter/filter_handler_test.go | 8 +- pkg/broker/filter/stats_reporter.go | 116 +++++-------- pkg/broker/filter/stats_reporter_test.go | 83 ++++----- pkg/broker/ingress/ingress_handler.go | 11 +- pkg/broker/ingress/ingress_handler_test.go | 2 +- pkg/broker/ingress/stats_reporter.go | 44 +++-- pkg/broker/ingress/stats_reporter_test.go | 27 +-- pkg/metrics/metricskey/constants.go | 46 +---- .../knative.dev/pkg/metrics/gcp_metadata.go | 9 + .../pkg/metrics/metricskey/constants.go | 50 +----- .../metrics/metricskey/constants_eventing.go | 107 ++++++++++++ .../metrics/metricskey/constants_serving.go | 68 ++++++++ .../pkg/metrics/monitored_resources.go | 44 ++--- .../metrics/monitored_resources_eventing.go | 163 ++++++++++++++++++ .../metrics/monitored_resources_serving.go | 75 ++++++++ vendor/knative.dev/pkg/metrics/record.go | 13 +- .../pkg/metrics/stackdriver_exporter.go | 62 ++----- 18 files changed, 627 insertions(+), 349 deletions(-) create mode 100644 vendor/knative.dev/pkg/metrics/metricskey/constants_eventing.go create mode 100644 vendor/knative.dev/pkg/metrics/metricskey/constants_serving.go create mode 100644 vendor/knative.dev/pkg/metrics/monitored_resources_eventing.go create mode 100644 vendor/knative.dev/pkg/metrics/monitored_resources_serving.go diff --git a/pkg/broker/filter/filter_handler.go b/pkg/broker/filter/filter_handler.go index b5f6b1952cd..7727f6bbc6b 100644 --- a/pkg/broker/filter/filter_handler.go +++ b/pkg/broker/filter/filter_handler.go @@ -202,11 +202,11 @@ func (r *Handler) sendEvent(ctx context.Context, tctx cloudevents.HTTPTransportC } reportArgs := &ReportArgs{ - ns: t.Namespace, - trigger: t.Name, - broker: t.Spec.Broker, - eventType: triggerFilterAttribute(t.Spec.Filter, "type"), - eventSource: triggerFilterAttribute(t.Spec.Filter, "source"), + ns: t.Namespace, + trigger: t.Name, + broker: t.Spec.Broker, + filterType: triggerFilterAttribute(t.Spec.Filter, "type"), + filterSource: triggerFilterAttribute(t.Spec.Filter, "source"), } subscriberURIString := t.Status.SubscriberURI @@ -226,10 +226,8 @@ func (r *Handler) sendEvent(ctx context.Context, tctx cloudevents.HTTPTransportC return nil, err } - // Check if the event should be sent, and record filtering time. - start := time.Now() + // Check if the event should be sent. filterResult := r.shouldSendEvent(ctx, &t.Spec, event) - r.reporter.ReportFilterTime(reportArgs, filterResult, time.Since(start)) if filterResult == failFilter { r.logger.Debug("Event did not pass filter", zap.Any("triggerRef", trigger)) @@ -238,20 +236,21 @@ func (r *Handler) sendEvent(ctx context.Context, tctx cloudevents.HTTPTransportC return nil, nil } - start = time.Now() + // Record the event processing time. This might be off if the receiver and the filter pods are running in + // different nodes with different clocks. + var arrivalTime time.Time + if extErr := event.ExtensionAs(broker.EventArrivalTime, &arrivalTime); extErr == nil { + r.reporter.ReportEventProcessingTime(reportArgs, err, time.Since(arrivalTime)) + } + + start := time.Now() sendingCTX := broker.SendingContext(ctx, tctx, subscriberURI) - // TODO get HTTP status codes and use those. + // TODO use HTTP codes: https://github.com/cloudevents/sdk-go/pull/177 replyEvent, err := r.ceClient.Send(sendingCTX, *event) // Record the dispatch time. - r.reporter.ReportDispatchTime(reportArgs, err, time.Since(start)) + r.reporter.ReportEventDispatchTime(reportArgs, err, time.Since(start)) // Record the event count. r.reporter.ReportEventCount(reportArgs, err) - // Record the event latency. This might be off if the receiver and the filter pods are running in - // different nodes with different clocks. - var arrivalTime time.Time - if extErr := event.ExtensionAs(broker.EventArrivalTime, &arrivalTime); extErr == nil { - r.reporter.ReportEventDeliveryTime(reportArgs, err, time.Since(arrivalTime)) - } return replyEvent, err } @@ -285,15 +284,10 @@ func (r *Handler) shouldSendEvent(ctx context.Context, ts *eventingv1alpha1.Trig attrs = map[string]string(*ts.Filter.Attributes) } - result := r.filterEventByAttributes(ctx, attrs, event) - filterResult := failFilter - if result { - filterResult = passFilter - } - return filterResult + return r.filterEventByAttributes(ctx, attrs, event) } -func (r *Handler) filterEventByAttributes(ctx context.Context, attrs map[string]string, event *cloudevents.Event) bool { +func (r *Handler) filterEventByAttributes(ctx context.Context, attrs map[string]string, event *cloudevents.Event) FilterResult { // Set standard context attributes. The attributes available may not be // exactly the same as the attributes defined in the current version of the // CloudEvents spec. @@ -322,15 +316,15 @@ func (r *Handler) filterEventByAttributes(ctx context.Context, attrs map[string] // If the attribute does not exist in the event, return false. if !ok { logging.FromContext(ctx).Debug("Attribute not found", zap.String("attribute", k)) - return false + return failFilter } // If the attribute is not set to any and is different than the one from the event, return false. if v != eventingv1alpha1.TriggerAnyFilter && v != value { logging.FromContext(ctx).Debug("Attribute had non-matching value", zap.String("attribute", k), zap.String("filter", v), zap.Any("received", value)) - return false + return failFilter } } - return true + return passFilter } // triggerFilterAttribute returns the filter attribute value for a given `attributeName`. If it doesn't not exist, diff --git a/pkg/broker/filter/filter_handler_test.go b/pkg/broker/filter/filter_handler_test.go index 19fe912227e..61129340c9c 100644 --- a/pkg/broker/filter/filter_handler_test.go +++ b/pkg/broker/filter/filter_handler_test.go @@ -354,15 +354,11 @@ func (r *mockReporter) ReportEventCount(args *ReportArgs, err error) error { return nil } -func (r *mockReporter) ReportDispatchTime(args *ReportArgs, err error, d time.Duration) error { +func (r *mockReporter) ReportEventDispatchTime(args *ReportArgs, err error, d time.Duration) error { return nil } -func (r *mockReporter) ReportFilterTime(args *ReportArgs, filterResult FilterResult, d time.Duration) error { - return nil -} - -func (r *mockReporter) ReportEventDeliveryTime(args *ReportArgs, err error, d time.Duration) error { +func (r *mockReporter) ReportEventProcessingTime(args *ReportArgs, err error, d time.Duration) error { return nil } diff --git a/pkg/broker/filter/stats_reporter.go b/pkg/broker/filter/stats_reporter.go index 7b682519671..7c2163667cd 100644 --- a/pkg/broker/filter/stats_reporter.go +++ b/pkg/broker/filter/stats_reporter.go @@ -24,8 +24,9 @@ import ( "go.opencensus.io/stats/view" "go.opencensus.io/tag" utils "knative.dev/eventing/pkg/broker" - "knative.dev/eventing/pkg/metrics/metricskey" + . "knative.dev/eventing/pkg/metrics/metricskey" "knative.dev/pkg/metrics" + "knative.dev/pkg/metrics/metricskey" ) var ( @@ -40,55 +41,46 @@ var ( // dispatchTimeInMsecM records the time spent dispatching an event to // a Trigger subscriber, in milliseconds. dispatchTimeInMsecM = stats.Float64( - "dispatch_latencies", + "event_dispatch_latencies", "The time spent dispatching an event to a Trigger subscriber", stats.UnitMilliseconds, ) - // filterTimeInMsecM records the time spent filtering an event for a - // Trigger, in milliseconds. - filterTimeInMsecM = stats.Float64( - "filter_latencies", - "The time spent filtering an event for a Trigger", - stats.UnitMilliseconds, - ) - - // deliveryTimeInMsecM records the time spent between arrival at the Broker - // and delivery to the Trigger subscriber. - deliveryTimeInMsecM = stats.Float64( - "event_latencies", - "The time spent routing an event from a Broker to a Trigger subscriber", + // processingTimeInMsecM records the time spent between arrival at the Broker + // and the delivery to the Trigger subscriber. + processingTimeInMsecM = stats.Float64( + "event_processing_latencies", + "The time spent processing an event before it is dispatched to a Trigger subscriber", stats.UnitMilliseconds, ) ) type ReportArgs struct { - ns string - trigger string - broker string - eventType string - eventSource string + ns string + trigger string + broker string + filterType string + filterSource string } // StatsReporter defines the interface for sending filter metrics. type StatsReporter interface { ReportEventCount(args *ReportArgs, err error) error - ReportDispatchTime(args *ReportArgs, err error, d time.Duration) error - ReportFilterTime(args *ReportArgs, filterResult FilterResult, d time.Duration) error - ReportEventDeliveryTime(args *ReportArgs, err error, d time.Duration) error + ReportEventDispatchTime(args *ReportArgs, err error, d time.Duration) error + ReportEventProcessingTime(args *ReportArgs, err error, d time.Duration) error } var _ StatsReporter = (*reporter)(nil) // reporter holds cached metric objects to report filter metrics. type reporter struct { - namespaceTagKey tag.Key - triggerTagKey tag.Key - brokerTagKey tag.Key - triggerTypeKey tag.Key - triggerSourceKey tag.Key - resultKey tag.Key - filterResultKey tag.Key + namespaceTagKey tag.Key + triggerTagKey tag.Key + brokerTagKey tag.Key + triggerFilterTypeKey tag.Key + triggerFilterSourceKey tag.Key + resultKey tag.Key + filterResultKey tag.Key } // NewStatsReporter creates a reporter that collects and reports filter metrics. @@ -96,37 +88,37 @@ func NewStatsReporter() (StatsReporter, error) { var r = &reporter{} // Create the tag keys that will be used to add tags to our measurements. - nsTag, err := tag.NewKey(metricskey.NamespaceName) + nsTag, err := tag.NewKey(metricskey.LabelNamespaceName) if err != nil { return nil, err } r.namespaceTagKey = nsTag - triggerTag, err := tag.NewKey(metricskey.TriggerName) + triggerTag, err := tag.NewKey(metricskey.LabelTriggerName) if err != nil { return nil, err } r.triggerTagKey = triggerTag - brokerTag, err := tag.NewKey(metricskey.BrokerName) + brokerTag, err := tag.NewKey(metricskey.LabelBrokerName) if err != nil { return nil, err } r.brokerTagKey = brokerTag - triggerTypeTag, err := tag.NewKey(metricskey.TriggerType) + triggerFilterTypeTag, err := tag.NewKey(metricskey.LabelFilterType) if err != nil { return nil, err } - r.triggerTypeKey = triggerTypeTag - triggerSourceKey, err := tag.NewKey(metricskey.TriggerSource) + r.triggerFilterTypeKey = triggerFilterTypeTag + triggerFilterSourceKey, err := tag.NewKey(metricskey.LabelFilterSource) if err != nil { return nil, err } - r.triggerSourceKey = triggerSourceKey - filterResultTag, err := tag.NewKey(metricskey.FilterResult) + r.triggerFilterSourceKey = triggerFilterSourceKey + filterResultTag, err := tag.NewKey(LabelFilterResult) if err != nil { return nil, err } r.filterResultKey = filterResultTag - resultTag, err := tag.NewKey(metricskey.Result) + resultTag, err := tag.NewKey(LabelResult) if err != nil { return nil, err } @@ -137,27 +129,20 @@ func NewStatsReporter() (StatsReporter, error) { &view.View{ Description: eventCountM.Description(), Measure: eventCountM, - // TODO count or sum aggregation? Aggregation: view.Count(), - TagKeys: []tag.Key{r.namespaceTagKey, r.triggerTagKey, r.brokerTagKey, r.triggerTypeKey, r.triggerSourceKey, r.resultKey}, + TagKeys: []tag.Key{r.namespaceTagKey, r.triggerTagKey, r.brokerTagKey, r.triggerFilterTypeKey, r.triggerFilterSourceKey, r.resultKey}, }, &view.View{ Description: dispatchTimeInMsecM.Description(), Measure: dispatchTimeInMsecM, Aggregation: view.Distribution(utils.Buckets125(1, 100)...), // 1, 2, 5, 10, 20, 50, 100 - TagKeys: []tag.Key{r.namespaceTagKey, r.triggerTagKey, r.brokerTagKey, r.triggerTypeKey, r.triggerSourceKey, r.resultKey}, + TagKeys: []tag.Key{r.namespaceTagKey, r.triggerTagKey, r.brokerTagKey, r.triggerFilterTypeKey, r.triggerFilterSourceKey, r.resultKey}, }, &view.View{ - Description: filterTimeInMsecM.Description(), - Measure: filterTimeInMsecM, - Aggregation: view.Distribution(utils.Buckets125(0.1, 10)...), // 0.1, 0.2, 0.5, 1, 2, 5, 10 - TagKeys: []tag.Key{r.namespaceTagKey, r.triggerTagKey, r.brokerTagKey, r.triggerTypeKey, r.triggerSourceKey, r.filterResultKey}, - }, - &view.View{ - Description: deliveryTimeInMsecM.Description(), - Measure: deliveryTimeInMsecM, + Description: processingTimeInMsecM.Description(), + Measure: processingTimeInMsecM, Aggregation: view.Distribution(utils.Buckets125(1, 100)...), // 1, 2, 5, 10, 20, 50, 100 - TagKeys: []tag.Key{r.namespaceTagKey, r.triggerTagKey, r.brokerTagKey, r.triggerTypeKey, r.triggerSourceKey, r.resultKey}, + TagKeys: []tag.Key{r.namespaceTagKey, r.triggerTagKey, r.brokerTagKey, r.triggerFilterTypeKey, r.triggerFilterSourceKey, r.resultKey}, }, ) if err != nil { @@ -177,8 +162,8 @@ func (r *reporter) ReportEventCount(args *ReportArgs, err error) error { return nil } -// ReportDispatchTime captures dispatch times. -func (r *reporter) ReportDispatchTime(args *ReportArgs, err error, d time.Duration) error { +// ReportEventDispatchTime captures dispatch times. +func (r *reporter) ReportEventDispatchTime(args *ReportArgs, err error, d time.Duration) error { ctx, err := r.generateTag(args, tag.Insert(r.resultKey, utils.Result(err))) if err != nil { return err @@ -188,38 +173,27 @@ func (r *reporter) ReportDispatchTime(args *ReportArgs, err error, d time.Durati return nil } -// ReportFilterTime captures filtering times. -func (r *reporter) ReportFilterTime(args *ReportArgs, filterResult FilterResult, d time.Duration) error { - ctx, err := r.generateTag(args, tag.Insert(r.filterResultKey, string(filterResult))) - if err != nil { - return err - } - // convert time.Duration in nanoseconds to milliseconds. - metrics.Record(ctx, filterTimeInMsecM.M(float64(d/time.Millisecond))) - return nil -} - -// ReportEventDeliveryTime captures event delivery times. -func (r *reporter) ReportEventDeliveryTime(args *ReportArgs, err error, d time.Duration) error { +// ReportEventProcessingTime captures event processing times. +func (r *reporter) ReportEventProcessingTime(args *ReportArgs, err error, d time.Duration) error { ctx, err := r.generateTag(args, tag.Insert(r.resultKey, utils.Result(err))) if err != nil { return err } // convert time.Duration in nanoseconds to milliseconds. - metrics.Record(ctx, deliveryTimeInMsecM.M(float64(d/time.Millisecond))) + metrics.Record(ctx, processingTimeInMsecM.M(float64(d/time.Millisecond))) return nil } func (r *reporter) generateTag(args *ReportArgs, t tag.Mutator) (context.Context, error) { - // Note that eventType and eventSource can be empty strings, so they need a special treatment. + // Note that filterType and filterSource can be empty strings, so they need a special treatment. return tag.New( context.Background(), tag.Insert(r.namespaceTagKey, args.ns), tag.Insert(r.triggerTagKey, args.trigger), tag.Insert(r.brokerTagKey, args.broker), - tag.Insert(r.triggerTypeKey, valueOrAny(args.eventType)), - tag.Insert(r.triggerSourceKey, valueOrAny(args.eventSource)), + tag.Insert(r.triggerFilterTypeKey, valueOrAny(args.filterType)), + tag.Insert(r.triggerFilterSourceKey, valueOrAny(args.filterSource)), t) } @@ -227,5 +201,5 @@ func valueOrAny(v string) string { if v != "" { return v } - return metricskey.Any + return AnyValue } diff --git a/pkg/broker/filter/stats_reporter_test.go b/pkg/broker/filter/stats_reporter_test.go index 71ab7e989f0..6afb106e39d 100644 --- a/pkg/broker/filter/stats_reporter_test.go +++ b/pkg/broker/filter/stats_reporter_test.go @@ -20,7 +20,8 @@ import ( "testing" "time" - "knative.dev/eventing/pkg/metrics/metricskey" + . "knative.dev/eventing/pkg/metrics/metricskey" + "knative.dev/pkg/metrics/metricskey" "knative.dev/pkg/metrics/metricstest" ) @@ -29,16 +30,16 @@ import ( // Since golang executes test iterations within the same process, the stats reporter // returns an error if the metric is already registered and the test panics. func unregister() { - metricstest.Unregister("event_count", "dispatch_latencies", "filter_latencies", "event_latencies") + metricstest.Unregister("event_count", "event_dispatch_latencies", "event_processing_latencies") } func TestStatsReporter(t *testing.T) { args := &ReportArgs{ - ns: "testns", - trigger: "testtrigger", - broker: "testbroker", - eventType: "testeventtype", - eventSource: "testeventsource", + ns: "testns", + trigger: "testtrigger", + broker: "testbroker", + filterType: "testeventtype", + filterSource: "testeventsource", } r, err := NewStatsReporter() @@ -50,16 +51,14 @@ func TestStatsReporter(t *testing.T) { defer unregister() wantTags := map[string]string{ - metricskey.NamespaceName: "testns", - metricskey.TriggerName: "testtrigger", - metricskey.BrokerName: "testbroker", - metricskey.TriggerType: "testeventtype", - metricskey.TriggerSource: "testeventsource", + metricskey.LabelNamespaceName: "testns", + metricskey.LabelTriggerName: "testtrigger", + metricskey.LabelBrokerName: "testbroker", + metricskey.LabelFilterType: "testeventtype", + metricskey.LabelFilterSource: "testeventsource", + LabelResult: "success", } - wantTags1 := map[string]string(wantTags) - wantTags1[metricskey.Result] = "success" - // test ReportEventCount expectSuccess(t, func() error { return r.ReportEventCount(args, nil) @@ -67,40 +66,28 @@ func TestStatsReporter(t *testing.T) { expectSuccess(t, func() error { return r.ReportEventCount(args, nil) }) - metricstest.CheckCountData(t, "event_count", wantTags1, 2) + metricstest.CheckCountData(t, "event_count", wantTags, 2) - // test ReportDispatchTime - expectSuccess(t, func() error { - return r.ReportDispatchTime(args, nil, 1100*time.Millisecond) - }) + // test ReportEventDispatchTime expectSuccess(t, func() error { - return r.ReportDispatchTime(args, nil, 9100*time.Millisecond) + return r.ReportEventDispatchTime(args, nil, 1100*time.Millisecond) }) - metricstest.CheckDistributionData(t, "dispatch_latencies", wantTags1, 2, 1100.0, 9100.0) - - // test ReportEventDeliveryTime expectSuccess(t, func() error { - return r.ReportEventDeliveryTime(args, nil, 1000*time.Millisecond) + return r.ReportEventDispatchTime(args, nil, 9100*time.Millisecond) }) - expectSuccess(t, func() error { - return r.ReportEventDeliveryTime(args, nil, 8000*time.Millisecond) - }) - metricstest.CheckDistributionData(t, "event_latencies", wantTags1, 2, 1000.0, 8000.0) - - wantTags2 := map[string]string(wantTags) - wantTags2[metricskey.FilterResult] = "pass" + metricstest.CheckDistributionData(t, "event_dispatch_latencies", wantTags, 2, 1100.0, 9100.0) - // test ReportFilterTime + // test ReportEventProcessingTime expectSuccess(t, func() error { - return r.ReportFilterTime(args, "pass", 100*time.Millisecond) + return r.ReportEventProcessingTime(args, nil, 1000*time.Millisecond) }) expectSuccess(t, func() error { - return r.ReportFilterTime(args, "pass", 500*time.Millisecond) + return r.ReportEventProcessingTime(args, nil, 8000*time.Millisecond) }) - metricstest.CheckDistributionData(t, "filter_latencies", wantTags1, 2, 100.0, 500.0) + metricstest.CheckDistributionData(t, "event_processing_latencies", wantTags, 2, 1000.0, 8000.0) } -func TestReporterEmptySourceAndType(t *testing.T) { +func TestReporterEmptySourceAndTypeFilter(t *testing.T) { r, err := NewStatsReporter() defer unregister() @@ -109,20 +96,20 @@ func TestReporterEmptySourceAndType(t *testing.T) { } args := &ReportArgs{ - ns: "testns", - trigger: "testtrigger", - broker: "testbroker", - eventType: "", - eventSource: "", + ns: "testns", + trigger: "testtrigger", + broker: "testbroker", + filterType: "", + filterSource: "", } wantTags := map[string]string{ - metricskey.NamespaceName: "testns", - metricskey.TriggerName: "testtrigger", - metricskey.BrokerName: "testbroker", - metricskey.TriggerType: metricskey.Any, - metricskey.TriggerSource: metricskey.Any, - metricskey.Result: "success", + metricskey.LabelNamespaceName: "testns", + metricskey.LabelTriggerName: "testtrigger", + metricskey.LabelBrokerName: "testbroker", + metricskey.LabelFilterType: AnyValue, + metricskey.LabelFilterSource: AnyValue, + LabelResult: "success", } // test ReportEventCount diff --git a/pkg/broker/ingress/ingress_handler.go b/pkg/broker/ingress/ingress_handler.go index 5c4ee55496d..bca8814690b 100644 --- a/pkg/broker/ingress/ingress_handler.go +++ b/pkg/broker/ingress/ingress_handler.go @@ -78,9 +78,10 @@ func (h *Handler) serveHTTP(ctx context.Context, event cloudevents.Event, resp * tctx = addOutGoingTracing(ctx, event, tctx) reporterArgs := &ReportArgs{ - ns: h.Namespace, - broker: h.BrokerName, - eventType: event.Type(), + ns: h.Namespace, + broker: h.BrokerName, + eventType: event.Type(), + eventSource: event.Source(), } send := h.decrementTTL(&event) @@ -92,10 +93,10 @@ func (h *Handler) serveHTTP(ctx context.Context, event cloudevents.Event, resp * start := time.Now() sendingCTX := broker.SendingContext(ctx, tctx, h.ChannelURI) - // TODO get HTTP status codes and use those. + // TODO use HTTP codes: https://github.com/cloudevents/sdk-go/pull/177 _, err := h.CeClient.Send(sendingCTX, event) // Record the dispatch time. - h.Reporter.ReportDispatchTime(reporterArgs, err, time.Since(start)) + h.Reporter.ReportEventDispatchTime(reporterArgs, err, time.Since(start)) // Record the event count. h.Reporter.ReportEventCount(reporterArgs, err) return err diff --git a/pkg/broker/ingress/ingress_handler_test.go b/pkg/broker/ingress/ingress_handler_test.go index 0c9a59257ab..1f2228652a2 100644 --- a/pkg/broker/ingress/ingress_handler_test.go +++ b/pkg/broker/ingress/ingress_handler_test.go @@ -28,7 +28,7 @@ func (r *mockReporter) ReportEventCount(args *ReportArgs, err error) error { return nil } -func (r *mockReporter) ReportDispatchTime(args *ReportArgs, err error, d time.Duration) error { +func (r *mockReporter) ReportEventDispatchTime(args *ReportArgs, err error, d time.Duration) error { return nil } diff --git a/pkg/broker/ingress/stats_reporter.go b/pkg/broker/ingress/stats_reporter.go index a1335087e1c..f1a86017654 100644 --- a/pkg/broker/ingress/stats_reporter.go +++ b/pkg/broker/ingress/stats_reporter.go @@ -18,14 +18,14 @@ package ingress import ( "context" - "time" - "go.opencensus.io/stats" "go.opencensus.io/stats/view" "go.opencensus.io/tag" utils "knative.dev/eventing/pkg/broker" - "knative.dev/eventing/pkg/metrics/metricskey" + . "knative.dev/eventing/pkg/metrics/metricskey" "knative.dev/pkg/metrics" + "knative.dev/pkg/metrics/metricskey" + "time" ) var ( @@ -40,22 +40,23 @@ var ( // dispatchTimeInMsecM records the time spent dispatching an event to // a Channel, in milliseconds. dispatchTimeInMsecM = stats.Float64( - "dispatch_latencies", + "event_dispatch_latencies", "The time spent dispatching an event to a Channel", stats.UnitMilliseconds, ) ) type ReportArgs struct { - ns string - broker string - eventType string + ns string + broker string + eventType string + eventSource string } // StatsReporter defines the interface for sending ingress metrics. type StatsReporter interface { ReportEventCount(args *ReportArgs, err error) error - ReportDispatchTime(args *ReportArgs, err error, d time.Duration) error + ReportEventDispatchTime(args *ReportArgs, err error, d time.Duration) error } var _ StatsReporter = (*reporter)(nil) @@ -65,8 +66,8 @@ type reporter struct { namespaceTagKey tag.Key brokerTagKey tag.Key eventTypeKey tag.Key - // TODO add support for EventSource - resultKey tag.Key + eventSourceKey tag.Key + resultKey tag.Key } // NewStatsReporter creates a reporter that collects and reports ingress metrics. @@ -74,22 +75,27 @@ func NewStatsReporter() (StatsReporter, error) { var r = &reporter{} // Create the tag keys that will be used to add tags to our measurements. - nsTag, err := tag.NewKey(metricskey.NamespaceName) + nsTag, err := tag.NewKey(metricskey.LabelNamespaceName) if err != nil { return nil, err } r.namespaceTagKey = nsTag - brokerTag, err := tag.NewKey(metricskey.BrokerName) + brokerTag, err := tag.NewKey(metricskey.LabelBrokerName) if err != nil { return nil, err } r.brokerTagKey = brokerTag - eventTypeTag, err := tag.NewKey(metricskey.EventType) + eventTypeTag, err := tag.NewKey(metricskey.LabelEventType) if err != nil { return nil, err } r.eventTypeKey = eventTypeTag - resultTag, err := tag.NewKey(metricskey.Result) + eventSourceTag, err := tag.NewKey(metricskey.LabelEventSource) + if err != nil { + return nil, err + } + r.eventSourceKey = eventSourceTag + resultTag, err := tag.NewKey(LabelResult) if err != nil { return nil, err } @@ -100,15 +106,14 @@ func NewStatsReporter() (StatsReporter, error) { &view.View{ Description: eventCountM.Description(), Measure: eventCountM, - // TODO count or sum aggregation? Aggregation: view.Count(), - TagKeys: []tag.Key{r.namespaceTagKey, r.brokerTagKey, r.eventTypeKey, r.resultKey}, + TagKeys: []tag.Key{r.namespaceTagKey, r.brokerTagKey, r.eventTypeKey, r.eventSourceKey, r.resultKey}, }, &view.View{ Description: dispatchTimeInMsecM.Description(), Measure: dispatchTimeInMsecM, Aggregation: view.Distribution(utils.Buckets125(1, 100)...), // 1, 2, 5, 10, 20, 50, 100 - TagKeys: []tag.Key{r.namespaceTagKey, r.brokerTagKey, r.eventTypeKey, r.resultKey}, + TagKeys: []tag.Key{r.namespaceTagKey, r.brokerTagKey, r.eventTypeKey, r.eventSourceKey, r.resultKey}, }, ) if err != nil { @@ -128,8 +133,8 @@ func (r *reporter) ReportEventCount(args *ReportArgs, err error) error { return nil } -// ReportDispatchTime captures dispatch times. -func (r *reporter) ReportDispatchTime(args *ReportArgs, err error, d time.Duration) error { +// ReportEventDispatchTime captures dispatch times. +func (r *reporter) ReportEventDispatchTime(args *ReportArgs, err error, d time.Duration) error { ctx, err := r.generateTag(args, err) if err != nil { return err @@ -145,5 +150,6 @@ func (r *reporter) generateTag(args *ReportArgs, err error) (context.Context, er tag.Insert(r.namespaceTagKey, args.ns), tag.Insert(r.brokerTagKey, args.broker), tag.Insert(r.eventTypeKey, args.eventType), + tag.Insert(r.eventSourceKey, args.eventSource), tag.Insert(r.resultKey, utils.Result(err))) } diff --git a/pkg/broker/ingress/stats_reporter_test.go b/pkg/broker/ingress/stats_reporter_test.go index 6d713b8056b..9ce3c25f786 100644 --- a/pkg/broker/ingress/stats_reporter_test.go +++ b/pkg/broker/ingress/stats_reporter_test.go @@ -20,7 +20,8 @@ import ( "testing" "time" - "knative.dev/eventing/pkg/metrics/metricskey" + . "knative.dev/eventing/pkg/metrics/metricskey" + "knative.dev/pkg/metrics/metricskey" "knative.dev/pkg/metrics/metricstest" ) @@ -29,14 +30,15 @@ import ( // Since golang executes test iterations within the same process, the stats reporter // returns an error if the metric is already registered and the test panics. func unregister() { - metricstest.Unregister("event_count", "dispatch_latencies") + metricstest.Unregister("event_count", "event_dispatch_latencies") } func TestStatsReporter(t *testing.T) { args := &ReportArgs{ - ns: "testns", - broker: "testbroker", - eventType: "testeventtype", + ns: "testns", + broker: "testbroker", + eventType: "testeventtype", + eventSource: "testeventsource", } r, err := NewStatsReporter() @@ -48,10 +50,11 @@ func TestStatsReporter(t *testing.T) { defer unregister() wantTags := map[string]string{ - metricskey.NamespaceName: "testns", - metricskey.BrokerName: "testbroker", - metricskey.EventType: "testeventtype", - metricskey.Result: "success", + metricskey.LabelNamespaceName: "testns", + metricskey.LabelBrokerName: "testbroker", + metricskey.LabelEventType: "testeventtype", + metricskey.LabelEventSource: "testeventsource", + LabelResult: "success", } // test ReportEventCount @@ -65,12 +68,12 @@ func TestStatsReporter(t *testing.T) { // test ReportDispatchTime expectSuccess(t, func() error { - return r.ReportDispatchTime(args, nil, 1100*time.Millisecond) + return r.ReportEventDispatchTime(args, nil, 1100*time.Millisecond) }) expectSuccess(t, func() error { - return r.ReportDispatchTime(args, nil, 9100*time.Millisecond) + return r.ReportEventDispatchTime(args, nil, 9100*time.Millisecond) }) - metricstest.CheckDistributionData(t, "dispatch_latencies", wantTags, 2, 1100.0, 9100.0) + metricstest.CheckDistributionData(t, "event_dispatch_latencies", wantTags, 2, 1100.0, 9100.0) } func expectSuccess(t *testing.T, f func() error) { diff --git a/pkg/metrics/metricskey/constants.go b/pkg/metrics/metricskey/constants.go index c3bd4e9be74..7419bf71b57 100644 --- a/pkg/metrics/metricskey/constants.go +++ b/pkg/metrics/metricskey/constants.go @@ -17,46 +17,12 @@ limitations under the License. package metricskey const ( - // KnativeTrigger is the Stackdriver resource type for Triggers. - KnativeTrigger = "knative_trigger" + // LabelFilterResult is the label for the Trigger filtering result. + LabelFilterResult = "filter_result" - // Project is the label for the project (e.g., GCP project ID). - Project = "project_id" + // LabelResult is the label for the result of sending an event to a downstream consumer. One of "success", "error". + LabelResult = "result" - // Location is the label for the location (e.g. GCE zone) where the cluster is deployed. - Location = "location" - - // ClusterName is the label for the immutable name of the cluster. - ClusterName = "cluster_name" - - // NamespaceName is the label for the immutable name of the namespace where the resource type exists. - NamespaceName = "namespace_name" - - // TriggerName is the label for the name of the Trigger. - TriggerName = "trigger_name" - - // BrokerName is the label for the name of the Broker. - BrokerName = "broker_name" - - // TriggerType is the label for the type attribute filter of the Trigger. - TriggerType = "trigger_type" - - // TriggerSource is the label for the source attribute filter of the Trigger. - TriggerSource = "trigger_source" - - // EventType is the label for the CloudEvents type context attribute. - EventType = "event_type" - - // FilterResult is the label for the Trigger filtering result. - FilterResult = "filter_result" - - // Unknown is the default value if the field is unknown, e.g., the project will be unknown if Knative - // is not running on GKE. - Unknown = "unknown" - - // Result is the label for the result of sending an event to a downstream consumer. One of "success", "error". - Result = "result" - - // Any is the default value if the trigger filter attributes are empty. - Any = "any" + // AnyValue is the default value if the trigger filter attributes are empty. + AnyValue = "any" ) diff --git a/vendor/knative.dev/pkg/metrics/gcp_metadata.go b/vendor/knative.dev/pkg/metrics/gcp_metadata.go index 5f33e3ea49d..9cf9882e248 100644 --- a/vendor/knative.dev/pkg/metrics/gcp_metadata.go +++ b/vendor/knative.dev/pkg/metrics/gcp_metadata.go @@ -1,9 +1,12 @@ /* Copyright 2019 The Knative Authors + Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,6 +21,12 @@ import ( "knative.dev/pkg/metrics/metricskey" ) +type gcpMetadata struct { + project string + location string + cluster string +} + func retrieveGCPMetadata() *gcpMetadata { gm := gcpMetadata{ project: metricskey.ValueUnknown, diff --git a/vendor/knative.dev/pkg/metrics/metricskey/constants.go b/vendor/knative.dev/pkg/metrics/metricskey/constants.go index f941f222ae5..01e5adff7e8 100644 --- a/vendor/knative.dev/pkg/metrics/metricskey/constants.go +++ b/vendor/knative.dev/pkg/metrics/metricskey/constants.go @@ -1,9 +1,12 @@ /* Copyright 2018 The Knative Authors + Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -13,12 +16,7 @@ limitations under the License. package metricskey -import "k8s.io/apimachinery/pkg/util/sets" - const ( - // ResourceTypeKnativeRevision is the Stackdriver resource type for Knative revision - ResourceTypeKnativeRevision = "knative_revision" - // LabelProject is the label for project (e.g. GCP GAIA ID, AWS project name) LabelProject = "project_id" @@ -31,49 +29,7 @@ const ( // LabelNamespaceName is the label for immutable name of the namespace that the service is deployed LabelNamespaceName = "namespace_name" - // LabelServiceName is the label for the deployed service name - LabelServiceName = "service_name" - - // LabelRouteName is the label for immutable name of the route that receives the request - LabelRouteName = "route_name" - - // LabelConfigurationName is the label for the configuration which created the monitored revision - LabelConfigurationName = "configuration_name" - - // LabelRevisionName is the label for the monitored revision - LabelRevisionName = "revision_name" - // ValueUnknown is the default value if the field is unknown, e.g. project will be unknown if Knative // is not running on GKE. ValueUnknown = "unknown" ) - -var ( - // KnativeRevisionLabels stores the set of resource labels for resource type knative_revision. - // LabelRouteName is added as extra label since it is optional, not in this map. - KnativeRevisionLabels = sets.NewString( - LabelProject, - LabelLocation, - LabelClusterName, - LabelNamespaceName, - LabelServiceName, - LabelConfigurationName, - LabelRevisionName, - ) - - // KnativeRevisionMetrics stores a set of metric types which are supported - // by resource type knative_revision. - KnativeRevisionMetrics = sets.NewString( - "knative.dev/serving/activator/request_count", - "knative.dev/serving/activator/request_latencies", - "knative.dev/serving/autoscaler/desired_pods", - "knative.dev/serving/autoscaler/requested_pods", - "knative.dev/serving/autoscaler/actual_pods", - "knative.dev/serving/autoscaler/stable_request_concurrency", - "knative.dev/serving/autoscaler/panic_request_concurrency", - "knative.dev/serving/autoscaler/target_concurrency_per_pod", - "knative.dev/serving/autoscaler/panic_mode", - "knative.dev/serving/revision/request_count", - "knative.dev/serving/revision/request_latencies", - ) -) diff --git a/vendor/knative.dev/pkg/metrics/metricskey/constants_eventing.go b/vendor/knative.dev/pkg/metrics/metricskey/constants_eventing.go new file mode 100644 index 00000000000..249c582f48b --- /dev/null +++ b/vendor/knative.dev/pkg/metrics/metricskey/constants_eventing.go @@ -0,0 +1,107 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metricskey + +import "k8s.io/apimachinery/pkg/util/sets" + +// TODO should be moved to eventing. See https://github.com/knative/pkg/issues/608 + +const ( + // ResourceTypeKnativeTrigger is the Stackdriver resource type for Knative Triggers. + ResourceTypeKnativeTrigger = "knative_trigger" + + // ResourceTypeKnativeBroker is the Stackdriver resource type for Knative Brokers. + ResourceTypeKnativeBroker = "knative_broker" + + // ResourceTypeKnativeImporter is the Stackdriver resource type for Knative Importers. + ResourceTypeKnativeImporter = "knative_importer" + + // LabelTriggerName is the label for the name of the Trigger. + LabelTriggerName = "trigger_name" + + // LabelBrokerName is the label for the name of the Broker. + LabelBrokerName = "broker_name" + + // LabelEventType is the label for the name of the event type. + LabelEventType = "event_type" + + // LabelEventSource is the label for the name of the event source. + LabelEventSource = "event_source" + + // LabelFilterType is the label for the Trigger filter attribute "type". + LabelFilterType = "filter_type" + + // LabelFilterSource is the label for the Trigger filter attribute "source". + LabelFilterSource = "filter_source" + + // LabelImporterName is the label for the name of the Importer. + LabelImporterName = "importer_name" + + // LabelImporterResourceGroup is the name of the Importer CRD. + LabelImporterResourceGroup = "importer_resource_group" +) + +var ( + // KnativeTriggerLabels stores the set of resource labels for resource type knative_trigger. + KnativeTriggerLabels = sets.NewString( + LabelProject, + LabelLocation, + LabelClusterName, + LabelNamespaceName, + LabelTriggerName, + LabelBrokerName, + ) + + // KnativeTriggerMetrics stores a set of metric types which are supported + // by resource type knative_trigger. + KnativeTriggerMetrics = sets.NewString( + "knative.dev/eventing/trigger/event_count", + "knative.dev/eventing/trigger/event_processing_latencies", + "knative.dev/eventing/trigger/event_dispatch_latencies", + ) + + // KnativeBrokerLabels stores the set of resource labels for resource type knative_broker. + KnativeBrokerLabels = sets.NewString( + LabelProject, + LabelLocation, + LabelClusterName, + LabelNamespaceName, + LabelBrokerName, + ) + + // KnativeBrokerMetrics stores a set of metric types which are supported + // by resource type knative_trigger. + KnativeBrokerMetrics = sets.NewString( + "knative.dev/eventing/broker/event_count", + ) + + // KnativeImporterLabels stores the set of resource labels for resource type knative_importer. + KnativeImporterLabels = sets.NewString( + LabelProject, + LabelLocation, + LabelClusterName, + LabelNamespaceName, + LabelImporterName, + LabelImporterResourceGroup, + ) + + // KnativeImporterMetrics stores a set of metric types which are supported + // by resource type knative_importer. + KnativeImporterMetrics = sets.NewString( + "knative.dev/eventing/importer/event_count", + ) +) diff --git a/vendor/knative.dev/pkg/metrics/metricskey/constants_serving.go b/vendor/knative.dev/pkg/metrics/metricskey/constants_serving.go new file mode 100644 index 00000000000..2b50dda770d --- /dev/null +++ b/vendor/knative.dev/pkg/metrics/metricskey/constants_serving.go @@ -0,0 +1,68 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metricskey + +import "k8s.io/apimachinery/pkg/util/sets" + +// TODO should be moved to serving. See https://github.com/knative/pkg/issues/608 + +const ( + // ResourceTypeKnativeRevision is the Stackdriver resource type for Knative revision + ResourceTypeKnativeRevision = "knative_revision" + + // LabelServiceName is the label for the deployed service name + LabelServiceName = "service_name" + + // LabelRouteName is the label for immutable name of the route that receives the request + LabelRouteName = "route_name" + + // LabelConfigurationName is the label for the configuration which created the monitored revision + LabelConfigurationName = "configuration_name" + + // LabelRevisionName is the label for the monitored revision + LabelRevisionName = "revision_name" +) + +var ( + // KnativeRevisionLabels stores the set of resource labels for resource type knative_revision. + // LabelRouteName is added as extra label since it is optional, not in this map. + KnativeRevisionLabels = sets.NewString( + LabelProject, + LabelLocation, + LabelClusterName, + LabelNamespaceName, + LabelServiceName, + LabelConfigurationName, + LabelRevisionName, + ) + + // KnativeRevisionMetrics stores a set of metric types which are supported + // by resource type knative_revision. + KnativeRevisionMetrics = sets.NewString( + "knative.dev/serving/activator/request_count", + "knative.dev/serving/activator/request_latencies", + "knative.dev/serving/autoscaler/desired_pods", + "knative.dev/serving/autoscaler/requested_pods", + "knative.dev/serving/autoscaler/actual_pods", + "knative.dev/serving/autoscaler/stable_request_concurrency", + "knative.dev/serving/autoscaler/panic_request_concurrency", + "knative.dev/serving/autoscaler/target_concurrency_per_pod", + "knative.dev/serving/autoscaler/panic_mode", + "knative.dev/serving/revision/request_count", + "knative.dev/serving/revision/request_latencies", + ) +) diff --git a/vendor/knative.dev/pkg/metrics/monitored_resources.go b/vendor/knative.dev/pkg/metrics/monitored_resources.go index d8ab5d875d9..d8034efc925 100644 --- a/vendor/knative.dev/pkg/metrics/monitored_resources.go +++ b/vendor/knative.dev/pkg/metrics/monitored_resources.go @@ -1,9 +1,12 @@ /* Copyright 2018 The Knative Authors + Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -14,40 +17,27 @@ limitations under the License. package metrics import ( + "go.opencensus.io/tag" "knative.dev/pkg/metrics/metricskey" ) -type gcpMetadata struct { - project string - location string - cluster string -} +type Global struct{} -type KnativeRevision struct { - Project string - Location string - ClusterName string - NamespaceName string - ServiceName string - ConfigurationName string - RevisionName string +func (g *Global) MonitoredResource() (resType string, labels map[string]string) { + return "global", nil } -func (kr *KnativeRevision) MonitoredResource() (resType string, labels map[string]string) { - labels = map[string]string{ - metricskey.LabelProject: kr.Project, - metricskey.LabelLocation: kr.Location, - metricskey.LabelClusterName: kr.ClusterName, - metricskey.LabelNamespaceName: kr.NamespaceName, - metricskey.LabelServiceName: kr.ServiceName, - metricskey.LabelConfigurationName: kr.ConfigurationName, - metricskey.LabelRevisionName: kr.RevisionName, +func getTagsMap(tags []tag.Tag) map[string]string { + tagsMap := map[string]string{} + for _, t := range tags { + tagsMap[t.Key.Name()] = t.Value } - return "knative_revision", labels + return tagsMap } -type Global struct{} - -func (g *Global) MonitoredResource() (resType string, labels map[string]string) { - return "global", nil +func valueOrUnknown(key string, tagsMap map[string]string) string { + if value, ok := tagsMap[key]; ok { + return value + } + return metricskey.ValueUnknown } diff --git a/vendor/knative.dev/pkg/metrics/monitored_resources_eventing.go b/vendor/knative.dev/pkg/metrics/monitored_resources_eventing.go new file mode 100644 index 00000000000..30208ca9598 --- /dev/null +++ b/vendor/knative.dev/pkg/metrics/monitored_resources_eventing.go @@ -0,0 +1,163 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +// TODO should be moved to eventing. See https://github.com/knative/pkg/issues/608 + +import ( + "contrib.go.opencensus.io/exporter/stackdriver/monitoredresource" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" + "knative.dev/pkg/metrics/metricskey" +) + +type KnativeTrigger struct { + Project string + Location string + ClusterName string + NamespaceName string + TriggerName string + BrokerName string + TypeFilterAttribute string + SourceFilterAttribute string +} + +type KnativeBroker struct { + Project string + Location string + ClusterName string + NamespaceName string + BrokerName string +} + +type KnativeImporter struct { + Project string + Location string + ClusterName string + NamespaceName string + ImporterName string + ImporterResourceGroup string +} + +func (kt *KnativeTrigger) MonitoredResource() (resType string, labels map[string]string) { + labels = map[string]string{ + metricskey.LabelProject: kt.Project, + metricskey.LabelLocation: kt.Location, + metricskey.LabelClusterName: kt.ClusterName, + metricskey.LabelNamespaceName: kt.NamespaceName, + metricskey.LabelTriggerName: kt.TriggerName, + metricskey.LabelBrokerName: kt.BrokerName, + } + return metricskey.ResourceTypeKnativeTrigger, labels +} + +func (kb *KnativeBroker) MonitoredResource() (resType string, labels map[string]string) { + labels = map[string]string{ + metricskey.LabelProject: kb.Project, + metricskey.LabelLocation: kb.Location, + metricskey.LabelClusterName: kb.ClusterName, + metricskey.LabelNamespaceName: kb.NamespaceName, + metricskey.LabelBrokerName: kb.BrokerName, + } + return metricskey.ResourceTypeKnativeBroker, labels +} + +func (ki *KnativeImporter) MonitoredResource() (resType string, labels map[string]string) { + labels = map[string]string{ + metricskey.LabelProject: ki.Project, + metricskey.LabelLocation: ki.Location, + metricskey.LabelClusterName: ki.ClusterName, + metricskey.LabelNamespaceName: ki.NamespaceName, + metricskey.LabelImporterName: ki.ImporterName, + metricskey.LabelImporterResourceGroup: ki.ImporterResourceGroup, + } + return metricskey.ResourceTypeKnativeImporter, labels +} + +func GetKnativeBrokerMonitoredResource( + v *view.View, tags []tag.Tag, gm *gcpMetadata) ([]tag.Tag, monitoredresource.Interface) { + tagsMap := getTagsMap(tags) + kb := &KnativeBroker{ + // The first three resource labels are from metadata. + Project: gm.project, + Location: gm.location, + ClusterName: gm.cluster, + // The rest resource labels are from metrics labels. + NamespaceName: valueOrUnknown(metricskey.LabelNamespaceName, tagsMap), + BrokerName: valueOrUnknown(metricskey.LabelBrokerName, tagsMap), + } + + var newTags []tag.Tag + for _, t := range tags { + // Keep the metrics labels that are not resource labels + if !metricskey.KnativeBrokerLabels.Has(t.Key.Name()) { + newTags = append(newTags, t) + } + } + + return newTags, kb +} + +func GetKnativeTriggerMonitoredResource( + v *view.View, tags []tag.Tag, gm *gcpMetadata) ([]tag.Tag, monitoredresource.Interface) { + tagsMap := getTagsMap(tags) + kt := &KnativeTrigger{ + // The first three resource labels are from metadata. + Project: gm.project, + Location: gm.location, + ClusterName: gm.cluster, + // The rest resource labels are from metrics labels. + NamespaceName: valueOrUnknown(metricskey.LabelNamespaceName, tagsMap), + TriggerName: valueOrUnknown(metricskey.LabelTriggerName, tagsMap), + BrokerName: valueOrUnknown(metricskey.LabelBrokerName, tagsMap), + } + + var newTags []tag.Tag + for _, t := range tags { + // Keep the metrics labels that are not resource labels + if !metricskey.KnativeTriggerLabels.Has(t.Key.Name()) { + newTags = append(newTags, t) + } + } + + return newTags, kt +} + +func GetKnativeImporterMonitoredResource( + v *view.View, tags []tag.Tag, gm *gcpMetadata) ([]tag.Tag, monitoredresource.Interface) { + tagsMap := getTagsMap(tags) + ki := &KnativeImporter{ + // The first three resource labels are from metadata. + Project: gm.project, + Location: gm.location, + ClusterName: gm.cluster, + // The rest resource labels are from metrics labels. + NamespaceName: valueOrUnknown(metricskey.LabelNamespaceName, tagsMap), + ImporterName: valueOrUnknown(metricskey.LabelImporterName, tagsMap), + ImporterResourceGroup: valueOrUnknown(metricskey.LabelImporterResourceGroup, tagsMap), + } + + var newTags []tag.Tag + for _, t := range tags { + // Keep the metrics labels that are not resource labels + if !metricskey.KnativeImporterLabels.Has(t.Key.Name()) { + newTags = append(newTags, t) + } + } + + return newTags, ki +} diff --git a/vendor/knative.dev/pkg/metrics/monitored_resources_serving.go b/vendor/knative.dev/pkg/metrics/monitored_resources_serving.go new file mode 100644 index 00000000000..b2a1d33f5c6 --- /dev/null +++ b/vendor/knative.dev/pkg/metrics/monitored_resources_serving.go @@ -0,0 +1,75 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "contrib.go.opencensus.io/exporter/stackdriver/monitoredresource" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" + "knative.dev/pkg/metrics/metricskey" +) + +// TODO should be moved to serving. See https://github.com/knative/pkg/issues/608 + +type KnativeRevision struct { + Project string + Location string + ClusterName string + NamespaceName string + ServiceName string + ConfigurationName string + RevisionName string +} + +func (kr *KnativeRevision) MonitoredResource() (resType string, labels map[string]string) { + labels = map[string]string{ + metricskey.LabelProject: kr.Project, + metricskey.LabelLocation: kr.Location, + metricskey.LabelClusterName: kr.ClusterName, + metricskey.LabelNamespaceName: kr.NamespaceName, + metricskey.LabelServiceName: kr.ServiceName, + metricskey.LabelConfigurationName: kr.ConfigurationName, + metricskey.LabelRevisionName: kr.RevisionName, + } + return metricskey.ResourceTypeKnativeRevision, labels +} + +func GetKnativeRevisionMonitoredResource( + v *view.View, tags []tag.Tag, gm *gcpMetadata) ([]tag.Tag, monitoredresource.Interface) { + tagsMap := getTagsMap(tags) + kr := &KnativeRevision{ + // The first three resource labels are from metadata. + Project: gm.project, + Location: gm.location, + ClusterName: gm.cluster, + // The rest resource labels are from metrics labels. + NamespaceName: valueOrUnknown(metricskey.LabelNamespaceName, tagsMap), + ServiceName: valueOrUnknown(metricskey.LabelServiceName, tagsMap), + ConfigurationName: valueOrUnknown(metricskey.LabelConfigurationName, tagsMap), + RevisionName: valueOrUnknown(metricskey.LabelRevisionName, tagsMap), + } + + var newTags []tag.Tag + for _, t := range tags { + // Keep the metrics labels that are not resource labels + if !metricskey.KnativeRevisionLabels.Has(t.Key.Name()) { + newTags = append(newTags, t) + } + } + + return newTags, kr +} diff --git a/vendor/knative.dev/pkg/metrics/record.go b/vendor/knative.dev/pkg/metrics/record.go index 1b045ea0ab7..87e67fc3564 100644 --- a/vendor/knative.dev/pkg/metrics/record.go +++ b/vendor/knative.dev/pkg/metrics/record.go @@ -24,6 +24,9 @@ import ( "knative.dev/pkg/metrics/metricskey" ) +// TODO should be properly refactored and pieces should move to eventing and serving, as appropriate. +// See https://github.com/knative/pkg/issues/608 + // Record decides whether to record one measurement via OpenCensus based on the // following conditions: // 1) No package level metrics config. In this case it just proxies to OpenCensus @@ -32,7 +35,8 @@ import ( // using this function to get expected behavior. // 2) The backend is not Stackdriver. // 3) The backend is Stackdriver and it is allowed to use custom metrics. -// 4) The backend is Stackdriver and the metric is "knative_revison" built-in metric. +// 4) The backend is Stackdriver and the metric is one of the built-in metrics: "knative_revision", "knative_broker", +// "knative_trigger", "knative_importer". func Record(ctx context.Context, ms stats.Measurement) { mc := getCurMetricsConfig() @@ -50,7 +54,12 @@ func Record(ctx context.Context, ms stats.Measurement) { // Condition 4) metricType := path.Join(mc.stackdriverMetricTypePrefix, ms.Measure().Name()) - if metricskey.KnativeRevisionMetrics.Has(metricType) { + isServingBuiltIn := metricskey.KnativeRevisionMetrics.Has(metricType) + isEventingBuiltIn := metricskey.KnativeTriggerMetrics.Has(metricType) || + metricskey.KnativeBrokerMetrics.Has(metricType) || + metricskey.KnativeImporterMetrics.Has(metricType) + + if isServingBuiltIn || isEventingBuiltIn { stats.Record(ctx, ms) } } diff --git a/vendor/knative.dev/pkg/metrics/stackdriver_exporter.go b/vendor/knative.dev/pkg/metrics/stackdriver_exporter.go index 6d2a8cab8e4..80f830216a9 100644 --- a/vendor/knative.dev/pkg/metrics/stackdriver_exporter.go +++ b/vendor/knative.dev/pkg/metrics/stackdriver_exporter.go @@ -1,9 +1,12 @@ /* Copyright 2019 The Knative Authors + Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -55,6 +58,8 @@ func newOpencensusSDExporter(o stackdriver.Options) (view.Exporter, error) { return stackdriver.NewExporter(o) } +// TODO should be properly refactored to be able to inject the getMonitoredResourceFunc function. +// See https://github.com/knative/pkg/issues/608 func newStackdriverExporter(config *metricsConfig, logger *zap.SugaredLogger) (view.Exporter, error) { gm := gcpMetadataFunc() mtf := getMetricTypeFunc(config.stackdriverMetricTypePrefix, config.stackdriverCustomMetricTypePrefix) @@ -77,54 +82,19 @@ func getMonitoredResourceFunc(metricTypePrefix string, gm *gcpMetadata) func(v * return func(view *view.View, tags []tag.Tag) ([]tag.Tag, monitoredresource.Interface) { metricType := path.Join(metricTypePrefix, view.Measure.Name()) if metricskey.KnativeRevisionMetrics.Has(metricType) { - return getKnativeRevisionMonitoredResource(view, tags, gm) + return GetKnativeRevisionMonitoredResource(view, tags, gm) + } else if metricskey.KnativeBrokerMetrics.Has(metricType) { + return GetKnativeBrokerMonitoredResource(view, tags, gm) + } else if metricskey.KnativeTriggerMetrics.Has(metricType) { + return GetKnativeTriggerMonitoredResource(view, tags, gm) + } else if metricskey.KnativeImporterMetrics.Has(metricType) { + return GetKnativeImporterMonitoredResource(view, tags, gm) } - // Unsupported metric by knative_revision, use "global" resource type. + // Unsupported metric by knative_revision, knative_broker, knative_trigger, and knative_importer, use "global" resource type. return getGlobalMonitoredResource(view, tags) } } -func getKnativeRevisionMonitoredResource( - v *view.View, tags []tag.Tag, gm *gcpMetadata) ([]tag.Tag, monitoredresource.Interface) { - tagsMap := getTagsMap(tags) - kr := &KnativeRevision{ - // The first three resource labels are from metadata. - Project: gm.project, - Location: gm.location, - ClusterName: gm.cluster, - // The rest resource labels are from metrics labels. - NamespaceName: valueOrUnknown(metricskey.LabelNamespaceName, tagsMap), - ServiceName: valueOrUnknown(metricskey.LabelServiceName, tagsMap), - ConfigurationName: valueOrUnknown(metricskey.LabelConfigurationName, tagsMap), - RevisionName: valueOrUnknown(metricskey.LabelRevisionName, tagsMap), - } - - var newTags []tag.Tag - for _, t := range tags { - // Keep the metrics labels that are not resource labels - if !metricskey.KnativeRevisionLabels.Has(t.Key.Name()) { - newTags = append(newTags, t) - } - } - - return newTags, kr -} - -func getTagsMap(tags []tag.Tag) map[string]string { - tagsMap := map[string]string{} - for _, t := range tags { - tagsMap[t.Key.Name()] = t.Value - } - return tagsMap -} - -func valueOrUnknown(key string, tagsMap map[string]string) string { - if value, ok := tagsMap[key]; ok { - return value - } - return metricskey.ValueUnknown -} - func getGlobalMonitoredResource(v *view.View, tags []tag.Tag) ([]tag.Tag, monitoredresource.Interface) { return tags, &Global{} } @@ -132,7 +102,11 @@ func getGlobalMonitoredResource(v *view.View, tags []tag.Tag) ([]tag.Tag, monito func getMetricTypeFunc(metricTypePrefix, customMetricTypePrefix string) func(view *view.View) string { return func(view *view.View) string { metricType := path.Join(metricTypePrefix, view.Measure.Name()) - if metricskey.KnativeRevisionMetrics.Has(metricType) { + inServing := metricskey.KnativeRevisionMetrics.Has(metricType) + inEventing := metricskey.KnativeBrokerMetrics.Has(metricType) || + metricskey.KnativeTriggerMetrics.Has(metricType) || + metricskey.KnativeImporterMetrics.Has(metricType) + if inServing || inEventing { return metricType } // Unsupported metric by knative_revision, use custom domain. From 54553ce2f3efefda1a916bcd99f71fade7e21785 Mon Sep 17 00:00:00 2001 From: nachocano Date: Fri, 30 Aug 2019 11:49:38 -0700 Subject: [PATCH 6/9] updating master --- Gopkg.lock | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 30c77b7b198..91d73b1cc5c 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1174,7 +1174,7 @@ [[projects]] branch = "master" - digest = "1:c695f3415658f521f10bd3c3ed7f9a86433eafb2ca554ba64c2160d05c9633b0" + digest = "1:4dbfbd630beb854049c6d436ef47e1e6f07abb75cf66b130cc44dc4a7ce45078" name = "knative.dev/pkg" packages = [ "apis", @@ -1255,7 +1255,7 @@ "webhook", ] pruneopts = "T" - revision = "0c1c6db1df9ea2343a6dbf1290e4325682fcf85d" + revision = "ec2f20ae67fbbccfac425061f2c7c089d1932105" [[projects]] branch = "master" @@ -1386,6 +1386,7 @@ "knative.dev/pkg/logging/logkey", "knative.dev/pkg/logging/testing", "knative.dev/pkg/metrics", + "knative.dev/pkg/metrics/metricskey", "knative.dev/pkg/metrics/metricstest", "knative.dev/pkg/reconciler/testing", "knative.dev/pkg/signals", From 17d6a0088b76cd430d3b4c4788d1f58553a20abd Mon Sep 17 00:00:00 2001 From: nachocano Date: Fri, 30 Aug 2019 12:53:56 -0700 Subject: [PATCH 7/9] fixes https://github.com/knative/eventing/issues/1753 --- pkg/broker/filter/filter_handler.go | 9 ++++++--- pkg/broker/ingress/ingress_handler.go | 3 ++- pkg/broker/metrics.go | 5 ++--- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/pkg/broker/filter/filter_handler.go b/pkg/broker/filter/filter_handler.go index 7727f6bbc6b..9f26b0955b5 100644 --- a/pkg/broker/filter/filter_handler.go +++ b/pkg/broker/filter/filter_handler.go @@ -238,9 +238,12 @@ func (r *Handler) sendEvent(ctx context.Context, tctx cloudevents.HTTPTransportC // Record the event processing time. This might be off if the receiver and the filter pods are running in // different nodes with different clocks. - var arrivalTime time.Time - if extErr := event.ExtensionAs(broker.EventArrivalTime, &arrivalTime); extErr == nil { - r.reporter.ReportEventProcessingTime(reportArgs, err, time.Since(arrivalTime)) + var arrivalTimeStr string + if extErr := event.ExtensionAs(broker.EventArrivalTime, &arrivalTimeStr); extErr == nil { + arrivalTime, err := time.Parse(time.RFC3339, arrivalTimeStr) + if err != nil { + r.reporter.ReportEventProcessingTime(reportArgs, err, time.Since(arrivalTime)) + } } start := time.Now() diff --git a/pkg/broker/ingress/ingress_handler.go b/pkg/broker/ingress/ingress_handler.go index bca8814690b..1c52d7f9db9 100644 --- a/pkg/broker/ingress/ingress_handler.go +++ b/pkg/broker/ingress/ingress_handler.go @@ -62,7 +62,8 @@ func (h *Handler) Start(ctx context.Context) error { } func (h *Handler) serveHTTP(ctx context.Context, event cloudevents.Event, resp *cloudevents.EventResponse) error { - event.SetExtension(broker.EventArrivalTime, time.Now()) + // Setting the extension as a string as the CloudEvents sdk does not support non-string extensions. + event.SetExtension(broker.EventArrivalTime, time.Now().String()) tctx := cloudevents.HTTPTransportContextFrom(ctx) if tctx.Method != http.MethodPost { resp.Status = http.StatusMethodNotAllowed diff --git a/pkg/broker/metrics.go b/pkg/broker/metrics.go index db3ab6d3c22..74b6b423f7e 100644 --- a/pkg/broker/metrics.go +++ b/pkg/broker/metrics.go @@ -19,9 +19,8 @@ package broker const ( // EventArrivalTime is used to access the metadata stored on a // CloudEvent to measure the time difference between when an event is - // received on a broker and when it is dispatched to the trigger function. - // Should be set using time.Now(), which returns the current local time. - // The format is: 2019-08-26T23:38:17.834384404Z. + // received on a broker and before it is dispatched to the trigger function. + // The format is an RFC3339 time in string format. For example: 2019-08-26T23:38:17.834384404Z. EventArrivalTime = "knativearrivaltime" // TraceParent is a documented extension for CloudEvent to include traces. From c1d948a746498ddbdee3e21d9aee9befb622eb64 Mon Sep 17 00:00:00 2001 From: nachocano Date: Fri, 30 Aug 2019 12:55:03 -0700 Subject: [PATCH 8/9] updating TODOs --- cmd/broker/filter/main.go | 1 + cmd/broker/ingress/main.go | 1 + 2 files changed, 2 insertions(+) diff --git a/cmd/broker/filter/main.go b/cmd/broker/filter/main.go index a6a88f056b8..e7ef63af845 100644 --- a/cmd/broker/filter/main.go +++ b/cmd/broker/filter/main.go @@ -105,6 +105,7 @@ func main() { // TODO watch logging config map. + // TODO change the component name to trigger once Stackdriver metrics are approved. // Watch the observability config map and dynamically update metrics exporter. cmw.Watch(metrics.ConfigMapName(), metrics.UpdateExporterFromConfigMap("broker_filter", logger.Sugar())) diff --git a/cmd/broker/ingress/main.go b/cmd/broker/ingress/main.go index feb7407cdd7..9f3302dcd62 100644 --- a/cmd/broker/ingress/main.go +++ b/cmd/broker/ingress/main.go @@ -92,6 +92,7 @@ func main() { // TODO watch logging config map. + // TODO change the component name to broker once Stackdriver metrics are approved. // Watch the observability config map and dynamically update metrics exporter. cmw.Watch(metrics.ConfigMapName(), metrics.UpdateExporterFromConfigMap("broker_ingress", logger.Sugar())) From e8d3067b8205cde3afb4da0d0ecc9de9c950cea2 Mon Sep 17 00:00:00 2001 From: nachocano Date: Fri, 30 Aug 2019 12:58:01 -0700 Subject: [PATCH 9/9] update --- pkg/broker/ingress/ingress_handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/broker/ingress/ingress_handler.go b/pkg/broker/ingress/ingress_handler.go index 1c52d7f9db9..acd7639b549 100644 --- a/pkg/broker/ingress/ingress_handler.go +++ b/pkg/broker/ingress/ingress_handler.go @@ -63,7 +63,7 @@ func (h *Handler) Start(ctx context.Context) error { func (h *Handler) serveHTTP(ctx context.Context, event cloudevents.Event, resp *cloudevents.EventResponse) error { // Setting the extension as a string as the CloudEvents sdk does not support non-string extensions. - event.SetExtension(broker.EventArrivalTime, time.Now().String()) + event.SetExtension(broker.EventArrivalTime, time.Now().Format(time.RFC3339)) tctx := cloudevents.HTTPTransportContextFrom(ctx) if tctx.Method != http.MethodPost { resp.Status = http.StatusMethodNotAllowed