diff --git a/metrics/exporter.go b/metrics/exporter.go index b98a18de1f..3fc4966fdd 100644 --- a/metrics/exporter.go +++ b/metrics/exporter.go @@ -21,7 +21,6 @@ import ( "errors" "fmt" "strings" - "sync" "go.opencensus.io/resource" "go.opencensus.io/stats/view" @@ -32,9 +31,14 @@ import ( var ( curMetricsExporter view.Exporter curMetricsConfig *metricsConfig - metricsMux sync.RWMutex + mWorker *metricsWorker ) +func init() { + mWorker = newMetricsWorker() + go mWorker.start() +} + // SecretFetcher is a function (extracted from SecretNamespaceLister) for fetching // a specific Secret. This avoids requiring global or namespace list in controllers. type SecretFetcher func(string) (*corev1.Secret, error) @@ -154,28 +158,14 @@ func UpdateExporter(ctx context.Context, ops ExporterOptions, logger *zap.Sugare // Updating the metrics config and the metrics exporters needs to be atomic to // avoid using an outdated metrics config with new exporters. - metricsMux.Lock() - defer metricsMux.Unlock() - - if isNewExporterRequired(newConfig) { - logger.Info("Flushing the existing exporter before setting up the new exporter.") - flushGivenExporter(curMetricsExporter) - e, f, err := newMetricsExporter(newConfig, logger) - if err != nil { - logger.Errorw("Failed to update a new metrics exporter based on metric config", zap.Error(err), "config", newConfig) - return err - } - existingConfig := curMetricsConfig - curMetricsExporter = e - if err := setFactory(f); err != nil { - logger.Errorw("Failed to update metrics factory when loading metric config", zap.Error(err), "config", newConfig) - return err - } - logger.Infof("Successfully updated the metrics exporter; old config: %v; new config %v", existingConfig, newConfig) + updateCmd := &updateMetricsConfigWithExporter{ + ctx: ctx, + newConfig: newConfig, + done: make(chan error), } - - setCurMetricsConfigUnlocked(newConfig) - return nil + mWorker.c <- updateCmd + err = <-updateCmd.done + return err } // isNewExporterRequired compares the non-nil newConfig against curMetricsConfig. When backend changes, @@ -228,27 +218,35 @@ func newMetricsExporter(config *metricsConfig, logger *zap.SugaredLogger) (view. } func getCurMetricsExporter() view.Exporter { - metricsMux.RLock() - defer metricsMux.RUnlock() - return curMetricsExporter + readCmd := &readExporter{done: make(chan *view.Exporter)} + mWorker.c <- readCmd + e := <-readCmd.done + return *e } func setCurMetricsExporter(e view.Exporter) { - metricsMux.Lock() - defer metricsMux.Unlock() - curMetricsExporter = e + setCmd := &setExporter{ + newExporter: &e, + done: make(chan struct{}), + } + mWorker.c <- setCmd + <-setCmd.done } func getCurMetricsConfig() *metricsConfig { - metricsMux.RLock() - defer metricsMux.RUnlock() - return curMetricsConfig + readCmd := &readMetricsConfig{done: make(chan *metricsConfig)} + mWorker.c <- readCmd + cfg := <-readCmd.done + return cfg } func setCurMetricsConfig(c *metricsConfig) { - metricsMux.Lock() - defer metricsMux.Unlock() - setCurMetricsConfigUnlocked(c) + setCmd := &setMetricsConfig{ + newConfig: c, + done: make(chan struct{}), + } + mWorker.c <- setCmd + <-setCmd.done } func setCurMetricsConfigUnlocked(c *metricsConfig) { diff --git a/metrics/metrics_worker.go b/metrics/metrics_worker.go new file mode 100644 index 0000000000..0b7753acb2 --- /dev/null +++ b/metrics/metrics_worker.go @@ -0,0 +1,111 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "context" + + "go.opencensus.io/stats/view" + "go.uber.org/zap" + "knative.dev/pkg/logging" +) + +type metricsWorker struct { + c chan command +} + +func newMetricsWorker() *metricsWorker { + return &metricsWorker{c: make(chan command)} +} + +type command interface { + handleCommand(w *metricsWorker) +} + +type readExporter struct { + done chan *view.Exporter +} + +type setExporter struct { + newExporter *view.Exporter + done chan struct{} +} + +type readMetricsConfig struct { + done chan *metricsConfig +} + +type updateMetricsConfigWithExporter struct { + ctx context.Context + newConfig *metricsConfig + done chan error +} + +type setMetricsConfig struct { + newConfig *metricsConfig + done chan struct{} +} + +func (cmd *readMetricsConfig) handleCommand(w *metricsWorker) { + cmd.done <- curMetricsConfig +} + +func (cmd *setMetricsConfig) handleCommand(w *metricsWorker) { + setCurMetricsConfigUnlocked(cmd.newConfig) + cmd.done <- struct{}{} +} + +func (cmd *updateMetricsConfigWithExporter) handleCommand(w *metricsWorker) { + ctx := cmd.ctx + logger := logging.FromContext(ctx) + if isNewExporterRequired(cmd.newConfig) { + logger.Info("Flushing the existing exporter before setting up the new exporter.") + flushGivenExporter(curMetricsExporter) + e, f, err := newMetricsExporter(cmd.newConfig, logger) + if err != nil { + logger.Errorw("Failed to update a new metrics exporter based on metric config", zap.Error(err), "config", cmd.newConfig) + cmd.done <- err + return + } + existingConfig := curMetricsConfig + curMetricsExporter = e + if err := setFactory(f); err != nil { + logger.Errorw("Failed to update metrics factory when loading metric config", zap.Error(err), "config", cmd.newConfig) + cmd.done <- err + return + } + logger.Infof("Successfully updated the metrics exporter; old config: %v; new config %v", existingConfig, cmd.newConfig) + } + setCurMetricsConfigUnlocked(cmd.newConfig) + cmd.done <- nil +} + +func (w *metricsWorker) start() { + for { + cmd := <-w.c + cmd.handleCommand(w) + } +} + +func (cmd *setExporter) handleCommand(w *metricsWorker) { + curMetricsExporter = *cmd.newExporter + cmd.done <- struct{}{} +} + +func (cmd *readExporter) handleCommand(w *metricsWorker) { + cmd.done <- &curMetricsExporter +}