From c2a9e3e13bd963967a6eeccb3f7c924699baef6f Mon Sep 17 00:00:00 2001 From: Stavros Kontopoulos Date: Fri, 19 Feb 2021 16:21:11 +0200 Subject: [PATCH 1/5] remove global metrics config lock --- metrics/exporter.go | 75 ++++++++++++++++-------------- metrics/metrics_worker.go | 97 +++++++++++++++++++++++++++++++++++++++ metrics/testing.go | 1 + 3 files changed, 138 insertions(+), 35 deletions(-) create mode 100644 metrics/metrics_worker.go diff --git a/metrics/exporter.go b/metrics/exporter.go index b98a18de1f..81bc7b667f 100644 --- a/metrics/exporter.go +++ b/metrics/exporter.go @@ -21,7 +21,6 @@ import ( "errors" "fmt" "strings" - "sync" "go.opencensus.io/resource" "go.opencensus.io/stats/view" @@ -32,9 +31,21 @@ import ( var ( curMetricsExporter view.Exporter curMetricsConfig *metricsConfig - metricsMux sync.RWMutex + mWorker *metricsWorker ) +func init() { + setupMetricsWorker() +} + +func setupMetricsWorker() { + // No lock is needed because we only need this for the testing initialization + if mWorker == nil { + mWorker = newMetricsWorker() + go mWorker.start() + } +} + // SecretFetcher is a function (extracted from SecretNamespaceLister) for fetching // a specific Secret. This avoids requiring global or namespace list in controllers. type SecretFetcher func(string) (*corev1.Secret, error) @@ -154,28 +165,14 @@ func UpdateExporter(ctx context.Context, ops ExporterOptions, logger *zap.Sugare // Updating the metrics config and the metrics exporters needs to be atomic to // avoid using an outdated metrics config with new exporters. - metricsMux.Lock() - defer metricsMux.Unlock() - - if isNewExporterRequired(newConfig) { - logger.Info("Flushing the existing exporter before setting up the new exporter.") - flushGivenExporter(curMetricsExporter) - e, f, err := newMetricsExporter(newConfig, logger) - if err != nil { - logger.Errorw("Failed to update a new metrics exporter based on metric config", zap.Error(err), "config", newConfig) - return err - } - existingConfig := curMetricsConfig - curMetricsExporter = e - if err := setFactory(f); err != nil { - logger.Errorw("Failed to update metrics factory when loading metric config", zap.Error(err), "config", newConfig) - return err - } - logger.Infof("Successfully updated the metrics exporter; old config: %v; new config %v", existingConfig, newConfig) + updateCmd := &updateMetricsConfigWithExporter{ + ctx: ctx, + newConfig: newConfig, + done: make(chan error), } - - setCurMetricsConfigUnlocked(newConfig) - return nil + mWorker.c <- updateCmd + err = <-updateCmd.done + return err } // isNewExporterRequired compares the non-nil newConfig against curMetricsConfig. When backend changes, @@ -228,27 +225,35 @@ func newMetricsExporter(config *metricsConfig, logger *zap.SugaredLogger) (view. } func getCurMetricsExporter() view.Exporter { - metricsMux.RLock() - defer metricsMux.RUnlock() - return curMetricsExporter + readCmd := &readExporter{done: make(chan *view.Exporter)} + mWorker.c <- readCmd + e := <-readCmd.done + return *e } func setCurMetricsExporter(e view.Exporter) { - metricsMux.Lock() - defer metricsMux.Unlock() - curMetricsExporter = e + setCmd := &setExporter{ + newExporter: &e, + done: make(chan struct{}), + } + mWorker.c <- setCmd + <-setCmd.done } func getCurMetricsConfig() *metricsConfig { - metricsMux.RLock() - defer metricsMux.RUnlock() - return curMetricsConfig + readCmd := &readMetricsConfig{done: make(chan *metricsConfig)} + mWorker.c <- readCmd + cfg := <-readCmd.done + return cfg } func setCurMetricsConfig(c *metricsConfig) { - metricsMux.Lock() - defer metricsMux.Unlock() - setCurMetricsConfigUnlocked(c) + setCmd := &setMetricsConfig{ + newConfig: c, + done: make(chan struct{}), + } + mWorker.c <- setCmd + <-setCmd.done } func setCurMetricsConfigUnlocked(c *metricsConfig) { diff --git a/metrics/metrics_worker.go b/metrics/metrics_worker.go new file mode 100644 index 0000000000..b9a77ccb6f --- /dev/null +++ b/metrics/metrics_worker.go @@ -0,0 +1,97 @@ +package metrics + +import ( + "context" + + "go.opencensus.io/stats/view" + "go.uber.org/zap" + "knative.dev/pkg/logging" +) + +type metricsWorker struct { + c chan command +} + +func newMetricsWorker() *metricsWorker { + return &metricsWorker{c: make(chan command)} +} + +type command interface { + handleCommand(w *metricsWorker) +} + +type readExporter struct { + done chan *view.Exporter +} + +type setExporter struct { + newExporter *view.Exporter + done chan struct{} +} + +type readMetricsConfig struct { + done chan *metricsConfig +} + +type updateMetricsConfigWithExporter struct { + ctx context.Context + newConfig *metricsConfig + done chan error +} + +type setMetricsConfig struct { + newConfig *metricsConfig + done chan struct{} +} + +func (cmd *readMetricsConfig) handleCommand(w *metricsWorker) { + cmd.done <- curMetricsConfig +} + +func (cmd *setMetricsConfig) handleCommand(w *metricsWorker) { + setCurMetricsConfigUnlocked(cmd.newConfig) + cmd.done <- struct{}{} +} + +func (cmd *updateMetricsConfigWithExporter) handleCommand(w *metricsWorker) { + ctx := cmd.ctx + logger := logging.FromContext(ctx) + if isNewExporterRequired(cmd.newConfig) { + logger.Info("Flushing the existing exporter before setting up the new exporter.") + flushGivenExporter(curMetricsExporter) + e, f, err := newMetricsExporter(cmd.newConfig, logger) + if err != nil { + logger.Errorw("Failed to update a new metrics exporter based on metric config", zap.Error(err), "config", cmd.newConfig) + cmd.done <- err + return + } + existingConfig := curMetricsConfig + curMetricsExporter = e + if err := setFactory(f); err != nil { + logger.Errorw("Failed to update metrics factory when loading metric config", zap.Error(err), "config", cmd.newConfig) + cmd.done <- err + return + } + logger.Infof("Successfully updated the metrics exporter; old config: %v; new config %v", existingConfig, cmd.newConfig) + } + setCurMetricsConfigUnlocked(cmd.newConfig) + cmd.done <- nil +} + +func (w *metricsWorker) start() { + for { + select { + case cmd := <-w.c: + cmd.handleCommand(w) + } + } +} + +func (cmd *setExporter) handleCommand(w *metricsWorker) { + curMetricsExporter = *cmd.newExporter + cmd.done <- struct{}{} +} + +func (cmd *readExporter) handleCommand(w *metricsWorker) { + cmd.done <- &curMetricsExporter +} diff --git a/metrics/testing.go b/metrics/testing.go index 28b2e58780..2e7328b765 100644 --- a/metrics/testing.go +++ b/metrics/testing.go @@ -22,6 +22,7 @@ const ( // InitForTesting initialize the necessary global variables for unit tests. func InitForTesting() { + setupMetricsWorker() setCurMetricsConfig(&metricsConfig{ backendDestination: prometheus, component: "test", From 4d0c2985861a8fb6564caf2add114f351db83228 Mon Sep 17 00:00:00 2001 From: Stavros Kontopoulos Date: Fri, 19 Feb 2021 16:27:23 +0200 Subject: [PATCH 2/5] add header --- metrics/metrics_worker.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/metrics/metrics_worker.go b/metrics/metrics_worker.go index b9a77ccb6f..b532991ef3 100644 --- a/metrics/metrics_worker.go +++ b/metrics/metrics_worker.go @@ -1,3 +1,18 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ package metrics import ( From 9f0785f20eb044034174458a070e59ded79c819d Mon Sep 17 00:00:00 2001 From: Stavros Kontopoulos Date: Fri, 19 Feb 2021 16:33:30 +0200 Subject: [PATCH 3/5] add space --- metrics/metrics_worker.go | 1 + 1 file changed, 1 insertion(+) diff --git a/metrics/metrics_worker.go b/metrics/metrics_worker.go index b532991ef3..14859a4332 100644 --- a/metrics/metrics_worker.go +++ b/metrics/metrics_worker.go @@ -13,6 +13,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ + package metrics import ( From 3ca75a4ce21924b0693b63212e5dd36b08862987 Mon Sep 17 00:00:00 2001 From: Stavros Kontopoulos Date: Fri, 19 Feb 2021 16:58:21 +0200 Subject: [PATCH 4/5] lint --- metrics/metrics_worker.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/metrics/metrics_worker.go b/metrics/metrics_worker.go index 14859a4332..0b7753acb2 100644 --- a/metrics/metrics_worker.go +++ b/metrics/metrics_worker.go @@ -96,10 +96,8 @@ func (cmd *updateMetricsConfigWithExporter) handleCommand(w *metricsWorker) { func (w *metricsWorker) start() { for { - select { - case cmd := <-w.c: - cmd.handleCommand(w) - } + cmd := <-w.c + cmd.handleCommand(w) } } From 908eed3c54455b53bcd848c73ba4ac6f7fef6ab5 Mon Sep 17 00:00:00 2001 From: Stavros Kontopoulos Date: Fri, 19 Feb 2021 20:29:38 +0200 Subject: [PATCH 5/5] remove metrics worker setup func --- metrics/exporter.go | 11 ++--------- metrics/testing.go | 1 - 2 files changed, 2 insertions(+), 10 deletions(-) diff --git a/metrics/exporter.go b/metrics/exporter.go index 81bc7b667f..3fc4966fdd 100644 --- a/metrics/exporter.go +++ b/metrics/exporter.go @@ -35,15 +35,8 @@ var ( ) func init() { - setupMetricsWorker() -} - -func setupMetricsWorker() { - // No lock is needed because we only need this for the testing initialization - if mWorker == nil { - mWorker = newMetricsWorker() - go mWorker.start() - } + mWorker = newMetricsWorker() + go mWorker.start() } // SecretFetcher is a function (extracted from SecretNamespaceLister) for fetching diff --git a/metrics/testing.go b/metrics/testing.go index 2e7328b765..28b2e58780 100644 --- a/metrics/testing.go +++ b/metrics/testing.go @@ -22,7 +22,6 @@ const ( // InitForTesting initialize the necessary global variables for unit tests. func InitForTesting() { - setupMetricsWorker() setCurMetricsConfig(&metricsConfig{ backendDestination: prometheus, component: "test",