diff --git a/Gopkg.lock b/Gopkg.lock index f4d6f957870..5a5b5a50eff 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1304,7 +1304,6 @@ analyzer-name = "dep" analyzer-version = 1 input-imports = [ - "contrib.go.opencensus.io/exporter/prometheus", "github.com/cloudevents/sdk-go", "github.com/cloudevents/sdk-go/pkg/cloudevents", "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http", diff --git a/cmd/broker/filter/main.go b/cmd/broker/filter/main.go index 1bd2ac23c8d..8165f6bcd41 100644 --- a/cmd/broker/filter/main.go +++ b/cmd/broker/filter/main.go @@ -18,15 +18,8 @@ package main import ( "flag" - "fmt" - "log" - "net/http" - "sync" - "time" - "contrib.go.opencensus.io/exporter/prometheus" "github.com/kelseyhightower/envconfig" - "go.opencensus.io/stats/view" "go.uber.org/zap" "go.uber.org/zap/zapcore" "k8s.io/client-go/kubernetes" @@ -34,8 +27,8 @@ import ( "knative.dev/eventing/pkg/broker" "knative.dev/eventing/pkg/provisioners" "knative.dev/eventing/pkg/tracing" - "knative.dev/eventing/pkg/utils" "knative.dev/pkg/configmap" + "knative.dev/pkg/metrics" "knative.dev/pkg/signals" "knative.dev/pkg/system" "sigs.k8s.io/controller-runtime/pkg/client/config" @@ -47,21 +40,11 @@ type envConfig struct { Namespace string `envconfig:"NAMESPACE" required:"true"` } -var ( - metricsPort = 9090 - - writeTimeout = 1 * time.Minute - shutdownTimeout = 1 * time.Minute - - wg sync.WaitGroup -) - func main() { logConfig := provisioners.NewLoggingConfig() logConfig.LoggingLevel["provisioner"] = zapcore.DebugLevel logger := provisioners.NewProvisionerLoggerFromConfig(logConfig).Desugar() - defer logger.Sync() - + defer flush(logger) flag.Parse() logger.Info("Starting...") @@ -101,32 +84,13 @@ func main() { } err = mgr.Add(receiver) if err != nil { - logger.Fatal("Unable to start the receiver", zap.Error(err), zap.Any("receiver", receiver)) + logger.Fatal("Unable to start the receiver", zap.Error(err), zap.Any("broker_receiver", receiver)) } - // Metrics - e, err := prometheus.NewExporter(prometheus.Options{}) - if err != nil { - logger.Fatal("Unable to create Prometheus exporter", zap.Error(err)) - } - view.RegisterExporter(e) - sm := http.NewServeMux() - sm.Handle("/metrics", e) - metricsSrv := &http.Server{ - Addr: fmt.Sprintf(":%d", metricsPort), - Handler: e, - ErrorLog: zap.NewStdLog(logger), - WriteTimeout: writeTimeout, - } + // TODO watch logging config map. - err = mgr.Add(&utils.RunnableServer{ - Server: metricsSrv, - ShutdownTimeout: shutdownTimeout, - WaitGroup: &wg, - }) - if err != nil { - logger.Fatal("Unable to add metrics runnableServer", zap.Error(err)) - } + // Watch the observability config map and dynamically update metrics exporter. + configMapWatcher.Watch(metrics.ConfigMapName(), metrics.UpdateExporterFromConfigMap("broker_receiver", logger.Sugar())) // Set up signals so we handle the first shutdown signal gracefully. stopCh := signals.SetupSignalHandler() @@ -143,14 +107,9 @@ func main() { logger.Fatal("Manager.Start() returned an error", zap.Error(err)) } logger.Info("Exiting...") +} - go func() { - <-time.After(shutdownTimeout) - log.Fatalf("Shutdown took longer than %v", shutdownTimeout) - }() - - // Wait for runnables to stop. This blocks indefinitely, but the above - // goroutine will exit the process if it takes longer than shutdownTimeout. - wg.Wait() - logger.Info("Done.") +func flush(logger *zap.Logger) { + logger.Sync() + metrics.FlushExporter() } diff --git a/cmd/broker/ingress/main.go b/cmd/broker/ingress/main.go index 5c5d8c4b5c3..78838cbc718 100644 --- a/cmd/broker/ingress/main.go +++ b/cmd/broker/ingress/main.go @@ -18,29 +18,23 @@ package main import ( "flag" - "fmt" - "log" "net/http" "net/url" - "sync" - "time" // Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters). // _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" - "contrib.go.opencensus.io/exporter/prometheus" cloudevents "github.com/cloudevents/sdk-go" cehttp "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http" "github.com/kelseyhightower/envconfig" - "go.opencensus.io/stats/view" "go.uber.org/zap" "k8s.io/client-go/kubernetes" eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1" "knative.dev/eventing/pkg/broker/ingress" "knative.dev/eventing/pkg/provisioners" "knative.dev/eventing/pkg/tracing" - "knative.dev/eventing/pkg/utils" "knative.dev/pkg/configmap" + "knative.dev/pkg/metrics" "knative.dev/pkg/signals" "knative.dev/pkg/system" pkgtracing "knative.dev/pkg/tracing" @@ -55,19 +49,10 @@ type envConfig struct { Namespace string `envconfig:"NAMESPACE" required:"true"` } -var ( - metricsPort = 9090 - - writeTimeout = 1 * time.Minute - shutdownTimeout = 1 * time.Minute - - wg sync.WaitGroup -) - func main() { logConfig := provisioners.NewLoggingConfig() logger := provisioners.NewProvisionerLoggerFromConfig(logConfig).Desugar() - defer logger.Sync() + defer flush(logger) flag.Parse() crlog.SetLogger(crlog.ZapLogger(false)) @@ -132,29 +117,10 @@ func main() { logger.Fatal("Unable to add handler", zap.Error(err)) } - // Metrics - e, err := prometheus.NewExporter(prometheus.Options{}) - if err != nil { - logger.Fatal("Unable to create Prometheus exporter", zap.Error(err)) - } - view.RegisterExporter(e) - sm := http.NewServeMux() - sm.Handle("/metrics", e) - metricsSrv := &http.Server{ - Addr: fmt.Sprintf(":%d", metricsPort), - Handler: e, - ErrorLog: zap.NewStdLog(logger), - WriteTimeout: writeTimeout, - } + // TODO watch logging config map. - err = mgr.Add(&utils.RunnableServer{ - Server: metricsSrv, - ShutdownTimeout: shutdownTimeout, - WaitGroup: &wg, - }) - if err != nil { - logger.Fatal("Unable to add metrics runnableServer", zap.Error(err)) - } + // Watch the observability config map and dynamically update metrics exporter. + configMapWatcher.Watch(metrics.ConfigMapName(), metrics.UpdateExporterFromConfigMap("broker_ingress", logger.Sugar())) // Set up signals so we handle the first shutdown signal gracefully. stopCh := signals.SetupSignalHandler() @@ -169,16 +135,9 @@ func main() { logger.Error("manager.Start() returned an error", zap.Error(err)) } logger.Info("Exiting...") +} - // TODO Gracefully shutdown the ingress server. CloudEvents SDK doesn't seem - // to let us do that today. - go func() { - <-time.After(shutdownTimeout) - log.Fatalf("Shutdown took longer than %v", shutdownTimeout) - }() - - // Wait for runnables to stop. This blocks indefinitely, but the above - // goroutine will exit the process if it takes longer than shutdownTimeout. - wg.Wait() - logger.Info("Done.") +func flush(logger *zap.Logger) { + logger.Sync() + metrics.FlushExporter() } diff --git a/pkg/reconciler/broker/broker_test.go b/pkg/reconciler/broker/broker_test.go index 28b56fa863d..4e35af68156 100644 --- a/pkg/reconciler/broker/broker_test.go +++ b/pkg/reconciler/broker/broker_test.go @@ -884,6 +884,10 @@ func envVars(containerName string) []corev1.EnvVar { Name: "BROKER", Value: brokerName, }, + { + Name: "METRICS_DOMAIN", + Value: "knative.dev/eventing", + }, } case ingressContainerName: return []corev1.EnvVar{ @@ -911,6 +915,10 @@ func envVars(containerName string) []corev1.EnvVar { Name: "BROKER", Value: brokerName, }, + { + Name: "METRICS_DOMAIN", + Value: "knative.dev/eventing", + }, } } return []corev1.EnvVar{} diff --git a/pkg/reconciler/broker/resources/filter.go b/pkg/reconciler/broker/resources/filter.go index 6fbb267e384..ecca79458ed 100644 --- a/pkg/reconciler/broker/resources/filter.go +++ b/pkg/reconciler/broker/resources/filter.go @@ -99,6 +99,10 @@ func MakeFilterDeployment(args *FilterArgs) *appsv1.Deployment { Name: "BROKER", Value: args.Broker.Name, }, + { + Name: "METRICS_DOMAIN", + Value: "knative.dev/eventing", + }, }, Ports: []corev1.ContainerPort{ { diff --git a/pkg/reconciler/broker/resources/ingress.go b/pkg/reconciler/broker/resources/ingress.go index 1245ca965df..b229634de7f 100644 --- a/pkg/reconciler/broker/resources/ingress.go +++ b/pkg/reconciler/broker/resources/ingress.go @@ -98,6 +98,10 @@ func MakeIngress(args *IngressArgs) *appsv1.Deployment { Name: "BROKER", Value: args.Broker.Name, }, + { + Name: "METRICS_DOMAIN", + Value: "knative.dev/eventing", + }, }, Ports: []corev1.ContainerPort{ {