From eb1f0a9b6d53e1502ccfcb57f640f4c7466532f2 Mon Sep 17 00:00:00 2001 From: nachocano Date: Wed, 21 Aug 2019 17:39:45 -0700 Subject: [PATCH 1/4] generic infra for metrics, either prometheus or stackdriver for now. --- cmd/broker/filter/main.go | 62 ++++++-------------------------------- cmd/broker/ingress/main.go | 62 ++++++-------------------------------- 2 files changed, 20 insertions(+), 104 deletions(-) diff --git a/cmd/broker/filter/main.go b/cmd/broker/filter/main.go index 1bd2ac23c8d..efb484e9d81 100644 --- a/cmd/broker/filter/main.go +++ b/cmd/broker/filter/main.go @@ -18,15 +18,7 @@ package main import ( "flag" - "fmt" - "log" - "net/http" - "sync" - "time" - - "contrib.go.opencensus.io/exporter/prometheus" "github.com/kelseyhightower/envconfig" - "go.opencensus.io/stats/view" "go.uber.org/zap" "go.uber.org/zap/zapcore" "k8s.io/client-go/kubernetes" @@ -34,8 +26,8 @@ import ( "knative.dev/eventing/pkg/broker" "knative.dev/eventing/pkg/provisioners" "knative.dev/eventing/pkg/tracing" - "knative.dev/eventing/pkg/utils" "knative.dev/pkg/configmap" + "knative.dev/pkg/metrics" "knative.dev/pkg/signals" "knative.dev/pkg/system" "sigs.k8s.io/controller-runtime/pkg/client/config" @@ -47,21 +39,11 @@ type envConfig struct { Namespace string `envconfig:"NAMESPACE" required:"true"` } -var ( - metricsPort = 9090 - - writeTimeout = 1 * time.Minute - shutdownTimeout = 1 * time.Minute - - wg sync.WaitGroup -) - func main() { logConfig := provisioners.NewLoggingConfig() logConfig.LoggingLevel["provisioner"] = zapcore.DebugLevel logger := provisioners.NewProvisionerLoggerFromConfig(logConfig).Desugar() - defer logger.Sync() - + defer flush(logger) flag.Parse() logger.Info("Starting...") @@ -101,32 +83,13 @@ func main() { } err = mgr.Add(receiver) if err != nil { - logger.Fatal("Unable to start the receiver", zap.Error(err), zap.Any("receiver", receiver)) + logger.Fatal("Unable to start the receiver", zap.Error(err), zap.Any("broker_receiver", receiver)) } - // Metrics - e, err := prometheus.NewExporter(prometheus.Options{}) - if err != nil { - logger.Fatal("Unable to create Prometheus exporter", zap.Error(err)) - } - view.RegisterExporter(e) - sm := http.NewServeMux() - sm.Handle("/metrics", e) - metricsSrv := &http.Server{ - Addr: fmt.Sprintf(":%d", metricsPort), - Handler: e, - ErrorLog: zap.NewStdLog(logger), - WriteTimeout: writeTimeout, - } + // TODO watch logging config map. - err = mgr.Add(&utils.RunnableServer{ - Server: metricsSrv, - ShutdownTimeout: shutdownTimeout, - WaitGroup: &wg, - }) - if err != nil { - logger.Fatal("Unable to add metrics runnableServer", zap.Error(err)) - } + // Watch the observability config map and dynamically update metrics exporter. + configMapWatcher.Watch(metrics.ConfigMapName(), metrics.UpdateExporterFromConfigMap("broker_receiver", logger.Sugar())) // Set up signals so we handle the first shutdown signal gracefully. stopCh := signals.SetupSignalHandler() @@ -143,14 +106,9 @@ func main() { logger.Fatal("Manager.Start() returned an error", zap.Error(err)) } logger.Info("Exiting...") +} - go func() { - <-time.After(shutdownTimeout) - log.Fatalf("Shutdown took longer than %v", shutdownTimeout) - }() - - // Wait for runnables to stop. This blocks indefinitely, but the above - // goroutine will exit the process if it takes longer than shutdownTimeout. - wg.Wait() - logger.Info("Done.") +func flush(logger *zap.Logger) { + logger.Sync() + metrics.FlushExporter() } diff --git a/cmd/broker/ingress/main.go b/cmd/broker/ingress/main.go index 5c5d8c4b5c3..6d96cfd07a5 100644 --- a/cmd/broker/ingress/main.go +++ b/cmd/broker/ingress/main.go @@ -18,28 +18,21 @@ package main import ( "flag" - "fmt" - "log" + "knative.dev/pkg/metrics" "net/http" "net/url" - "sync" - "time" - // Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters). // _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" - "contrib.go.opencensus.io/exporter/prometheus" - cloudevents "github.com/cloudevents/sdk-go" + "github.com/cloudevents/sdk-go" cehttp "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http" "github.com/kelseyhightower/envconfig" - "go.opencensus.io/stats/view" "go.uber.org/zap" "k8s.io/client-go/kubernetes" eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1" "knative.dev/eventing/pkg/broker/ingress" "knative.dev/eventing/pkg/provisioners" "knative.dev/eventing/pkg/tracing" - "knative.dev/eventing/pkg/utils" "knative.dev/pkg/configmap" "knative.dev/pkg/signals" "knative.dev/pkg/system" @@ -55,19 +48,10 @@ type envConfig struct { Namespace string `envconfig:"NAMESPACE" required:"true"` } -var ( - metricsPort = 9090 - - writeTimeout = 1 * time.Minute - shutdownTimeout = 1 * time.Minute - - wg sync.WaitGroup -) - func main() { logConfig := provisioners.NewLoggingConfig() logger := provisioners.NewProvisionerLoggerFromConfig(logConfig).Desugar() - defer logger.Sync() + defer flush(logger) flag.Parse() crlog.SetLogger(crlog.ZapLogger(false)) @@ -132,29 +116,10 @@ func main() { logger.Fatal("Unable to add handler", zap.Error(err)) } - // Metrics - e, err := prometheus.NewExporter(prometheus.Options{}) - if err != nil { - logger.Fatal("Unable to create Prometheus exporter", zap.Error(err)) - } - view.RegisterExporter(e) - sm := http.NewServeMux() - sm.Handle("/metrics", e) - metricsSrv := &http.Server{ - Addr: fmt.Sprintf(":%d", metricsPort), - Handler: e, - ErrorLog: zap.NewStdLog(logger), - WriteTimeout: writeTimeout, - } + // TODO watch logging config map. - err = mgr.Add(&utils.RunnableServer{ - Server: metricsSrv, - ShutdownTimeout: shutdownTimeout, - WaitGroup: &wg, - }) - if err != nil { - logger.Fatal("Unable to add metrics runnableServer", zap.Error(err)) - } + // Watch the observability config map and dynamically update metrics exporter. + configMapWatcher.Watch(metrics.ConfigMapName(), metrics.UpdateExporterFromConfigMap("broker_ingress", logger.Sugar())) // Set up signals so we handle the first shutdown signal gracefully. stopCh := signals.SetupSignalHandler() @@ -169,16 +134,9 @@ func main() { logger.Error("manager.Start() returned an error", zap.Error(err)) } logger.Info("Exiting...") +} - // TODO Gracefully shutdown the ingress server. CloudEvents SDK doesn't seem - // to let us do that today. - go func() { - <-time.After(shutdownTimeout) - log.Fatalf("Shutdown took longer than %v", shutdownTimeout) - }() - - // Wait for runnables to stop. This blocks indefinitely, but the above - // goroutine will exit the process if it takes longer than shutdownTimeout. - wg.Wait() - logger.Info("Done.") +func flush(logger *zap.Logger) { + logger.Sync() + metrics.FlushExporter() } From 9a8cbb40e94e26265c3ec51f91e89df6a347d0f6 Mon Sep 17 00:00:00 2001 From: Nacho Cano Date: Wed, 21 Aug 2019 22:11:16 -0700 Subject: [PATCH 2/4] adding metrics domain --- pkg/reconciler/broker/broker_test.go | 8 ++++++++ pkg/reconciler/broker/resources/filter.go | 4 ++++ pkg/reconciler/broker/resources/ingress.go | 4 ++++ 3 files changed, 16 insertions(+) diff --git a/pkg/reconciler/broker/broker_test.go b/pkg/reconciler/broker/broker_test.go index 28b56fa863d..4e35af68156 100644 --- a/pkg/reconciler/broker/broker_test.go +++ b/pkg/reconciler/broker/broker_test.go @@ -884,6 +884,10 @@ func envVars(containerName string) []corev1.EnvVar { Name: "BROKER", Value: brokerName, }, + { + Name: "METRICS_DOMAIN", + Value: "knative.dev/eventing", + }, } case ingressContainerName: return []corev1.EnvVar{ @@ -911,6 +915,10 @@ func envVars(containerName string) []corev1.EnvVar { Name: "BROKER", Value: brokerName, }, + { + Name: "METRICS_DOMAIN", + Value: "knative.dev/eventing", + }, } } return []corev1.EnvVar{} diff --git a/pkg/reconciler/broker/resources/filter.go b/pkg/reconciler/broker/resources/filter.go index 6fbb267e384..ecca79458ed 100644 --- a/pkg/reconciler/broker/resources/filter.go +++ b/pkg/reconciler/broker/resources/filter.go @@ -99,6 +99,10 @@ func MakeFilterDeployment(args *FilterArgs) *appsv1.Deployment { Name: "BROKER", Value: args.Broker.Name, }, + { + Name: "METRICS_DOMAIN", + Value: "knative.dev/eventing", + }, }, Ports: []corev1.ContainerPort{ { diff --git a/pkg/reconciler/broker/resources/ingress.go b/pkg/reconciler/broker/resources/ingress.go index 1245ca965df..b229634de7f 100644 --- a/pkg/reconciler/broker/resources/ingress.go +++ b/pkg/reconciler/broker/resources/ingress.go @@ -98,6 +98,10 @@ func MakeIngress(args *IngressArgs) *appsv1.Deployment { Name: "BROKER", Value: args.Broker.Name, }, + { + Name: "METRICS_DOMAIN", + Value: "knative.dev/eventing", + }, }, Ports: []corev1.ContainerPort{ { From e997cc9244957eb64ac9cea795a2efe1df1e9125 Mon Sep 17 00:00:00 2001 From: nachocano Date: Thu, 22 Aug 2019 14:47:46 -0700 Subject: [PATCH 3/4] done --- cmd/broker/filter/main.go | 1 + cmd/broker/ingress/main.go | 5 +++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/cmd/broker/filter/main.go b/cmd/broker/filter/main.go index efb484e9d81..8165f6bcd41 100644 --- a/cmd/broker/filter/main.go +++ b/cmd/broker/filter/main.go @@ -18,6 +18,7 @@ package main import ( "flag" + "github.com/kelseyhightower/envconfig" "go.uber.org/zap" "go.uber.org/zap/zapcore" diff --git a/cmd/broker/ingress/main.go b/cmd/broker/ingress/main.go index 6d96cfd07a5..78838cbc718 100644 --- a/cmd/broker/ingress/main.go +++ b/cmd/broker/ingress/main.go @@ -18,13 +18,13 @@ package main import ( "flag" - "knative.dev/pkg/metrics" "net/http" "net/url" + // Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters). // _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" - "github.com/cloudevents/sdk-go" + cloudevents "github.com/cloudevents/sdk-go" cehttp "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http" "github.com/kelseyhightower/envconfig" "go.uber.org/zap" @@ -34,6 +34,7 @@ import ( "knative.dev/eventing/pkg/provisioners" "knative.dev/eventing/pkg/tracing" "knative.dev/pkg/configmap" + "knative.dev/pkg/metrics" "knative.dev/pkg/signals" "knative.dev/pkg/system" pkgtracing "knative.dev/pkg/tracing" From 69e9ac0cc839f8296dbfa979cb8c93033dd75640 Mon Sep 17 00:00:00 2001 From: nachocano Date: Thu, 22 Aug 2019 15:31:23 -0700 Subject: [PATCH 4/4] updating lock --- Gopkg.lock | 1 - 1 file changed, 1 deletion(-) diff --git a/Gopkg.lock b/Gopkg.lock index f4d6f957870..5a5b5a50eff 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1304,7 +1304,6 @@ analyzer-name = "dep" analyzer-version = 1 input-imports = [ - "contrib.go.opencensus.io/exporter/prometheus", "github.com/cloudevents/sdk-go", "github.com/cloudevents/sdk-go/pkg/cloudevents", "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http",