From 21512c5522860e22c5e921ea8a27c9d4a25972f6 Mon Sep 17 00:00:00 2001 From: Kyle Eckhart Date: Thu, 18 Aug 2022 09:38:24 -0400 Subject: [PATCH 1/8] Support delta metrics which are counters with an in-memory store Signed-off-by: Kyle Eckhart --- collectors/delta_counter_store.go | 23 +++ collectors/in_memory_delta_counter_store.go | 149 ++++++++++++++++++++ collectors/monitoring_collector.go | 42 ++++-- collectors/monitoring_metrics.go | 78 ++++++++-- stackdriver_exporter.go | 3 +- 5 files changed, 273 insertions(+), 22 deletions(-) create mode 100644 collectors/delta_counter_store.go create mode 100644 collectors/in_memory_delta_counter_store.go diff --git a/collectors/delta_counter_store.go b/collectors/delta_counter_store.go new file mode 100644 index 00000000..2263db01 --- /dev/null +++ b/collectors/delta_counter_store.go @@ -0,0 +1,23 @@ +package collectors + +import ( + "time" + + "google.golang.org/api/monitoring/v3" +) + +type CollectedMetric struct { + metric *ConstMetric + lastCollectedAt time.Time +} + +type MetricDescriptor struct { + name string + description string +} + +type DeltaCounterStore interface { + Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *ConstMetric) + ListMetricsByName(metricDescriptorName string) map[string][]*CollectedMetric + ListMetricDescriptorsNotCollected(since time.Time) []MetricDescriptor +} diff --git a/collectors/in_memory_delta_counter_store.go b/collectors/in_memory_delta_counter_store.go new file mode 100644 index 00000000..01092d57 --- /dev/null +++ b/collectors/in_memory_delta_counter_store.go @@ -0,0 +1,149 @@ +package collectors + +import ( + "fmt" + "sort" + "strings" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "google.golang.org/api/monitoring/v3" +) + +type metricEntry struct { + collectedMetrics map[uint64]*CollectedMetric + lastListedAt time.Time + description string +} + +type inMemoryDeltaCounterStore struct { + store map[string]*metricEntry + ttl 
time.Duration + storeMutex *sync.RWMutex + logger log.Logger +} + +func NewInMemoryDeltaCounterStore(logger log.Logger, ttl time.Duration) DeltaCounterStore { + return inMemoryDeltaCounterStore{ + store: map[string]*metricEntry{}, + storeMutex: &sync.RWMutex{}, + logger: logger, + ttl: ttl, + } +} + +func (s inMemoryDeltaCounterStore) Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *ConstMetric) { + if currentValue == nil { + return + } + key := toKey(currentValue) + + var metric *metricEntry + s.storeMutex.Lock() + if _, exists := s.store[metricDescriptor.Name]; !exists { + s.store[metricDescriptor.Name] = &metricEntry{ + collectedMetrics: map[uint64]*CollectedMetric{}, + lastListedAt: time.Time{}, + } + } + metric = s.store[metricDescriptor.Name] + s.storeMutex.Unlock() + + if _, exists := metric.collectedMetrics[key]; !exists { + level.Debug(s.logger).Log("msg", "Tracking new value", "fqName", currentValue.fqName, "key", key, "current_value", currentValue.value, "incoming_time", currentValue.reportTime) + metric.collectedMetrics[key] = &CollectedMetric{currentValue, time.Now()} + } else if metric.collectedMetrics[key].metric.reportTime.Before(currentValue.reportTime) { + level.Debug(s.logger).Log("msg", "Incrementing existing value", "fqName", currentValue.fqName, "key", key, "current_value", metric.collectedMetrics[key].metric.value, "adding", currentValue.value, "last_reported_time", metric.collectedMetrics[key].metric.reportTime, "incoming_time", currentValue.reportTime) + currentValue.value = currentValue.value + metric.collectedMetrics[key].metric.value + metric.collectedMetrics[key].metric = currentValue + metric.collectedMetrics[key].lastCollectedAt = time.Now() + } else { + level.Debug(s.logger).Log("msg", "Ignoring repeat sample", "fqName", currentValue.fqName, "key", key, "last_reported_time", metric.collectedMetrics[key].metric.reportTime, "incoming_time", currentValue.reportTime) + } +} + +func toKey(c *ConstMetric) uint64 { + 
labels := make(map[string]string) + keysCopy := append([]string{}, c.labelKeys...) + for i := range c.labelKeys { + labels[c.labelKeys[i]] = c.labelValues[i] + } + sort.Strings(keysCopy) + + var keyParts []string + for _, k := range keysCopy { + keyParts = append(keyParts, fmt.Sprintf("%s:%s", k, labels[k])) + } + hashText := fmt.Sprintf("%s|%s", c.fqName, strings.Join(keyParts, "|")) + h := hashNew() + h = hashAdd(h, hashText) + + return h +} + +func (s inMemoryDeltaCounterStore) ListMetricsByName(metricDescriptorName string) map[string][]*CollectedMetric { + output := map[string][]*CollectedMetric{} + now := time.Now() + ttlWindowStart := now.Add(-s.ttl) + + s.storeMutex.Lock() + metric := s.store[metricDescriptorName] + if metric == nil { + s.storeMutex.Unlock() + return output + } + metric.lastListedAt = now + s.storeMutex.Unlock() + + for key, collectedMetric := range metric.collectedMetrics { + //Scan and remove metrics which are outside the TTL + if ttlWindowStart.After(collectedMetric.lastCollectedAt) { + delete(metric.collectedMetrics, key) + continue + } + + metrics, exists := output[collectedMetric.metric.fqName] + if !exists { + metrics = make([]*CollectedMetric, 0) + } + thing := *collectedMetric.metric + outputCopy := CollectedMetric{ + metric: &thing, + lastCollectedAt: collectedMetric.lastCollectedAt, + } + output[collectedMetric.metric.fqName] = append(metrics, &outputCopy) + } + + return output +} + +func (s inMemoryDeltaCounterStore) ListMetricDescriptorsNotCollected(since time.Time) []MetricDescriptor { + level.Debug(s.logger).Log("msg", "Listing metrics not collected", "since", since) + var names []MetricDescriptor + ttlWindowStart := time.Now().Add(-s.ttl) + + s.storeMutex.Lock() + defer s.storeMutex.Unlock() + + for name, metrics := range s.store { + //Scan and remove metrics which are outside the TTL + for key, collectedMetric := range metrics.collectedMetrics { + if ttlWindowStart.After(collectedMetric.lastCollectedAt) { + 
delete(metrics.collectedMetrics, key) + } + } + + if len(metrics.collectedMetrics) == 0 { + delete(s.store, name) + continue + } + + if since.After(metrics.lastListedAt) { + names = append(names, MetricDescriptor{name: name, description: metrics.description}) + } + } + + return names +} diff --git a/collectors/monitoring_collector.go b/collectors/monitoring_collector.go index 80390e14..f09f43dd 100644 --- a/collectors/monitoring_collector.go +++ b/collectors/monitoring_collector.go @@ -52,6 +52,7 @@ type MonitoringCollector struct { collectorFillMissingLabels bool monitoringDropDelegatedProjects bool logger log.Logger + deltaMetricStore DeltaCounterStore } type MonitoringCollectorOptions struct { @@ -75,7 +76,7 @@ type MonitoringCollectorOptions struct { DropDelegatedProjects bool } -func NewMonitoringCollector(projectID string, monitoringService *monitoring.Service, opts MonitoringCollectorOptions, logger log.Logger) (*MonitoringCollector, error) { +func NewMonitoringCollector(projectID string, monitoringService *monitoring.Service, opts MonitoringCollectorOptions, logger log.Logger, deltaStore DeltaCounterStore) (*MonitoringCollector, error) { apiCallsTotalMetric := prometheus.NewCounter( prometheus.CounterOpts{ Namespace: "stackdriver", @@ -153,6 +154,7 @@ func NewMonitoringCollector(projectID string, monitoringService *monitoring.Serv collectorFillMissingLabels: opts.FillMissingLabels, monitoringDropDelegatedProjects: opts.DropDelegatedProjects, logger: logger, + deltaMetricStore: deltaStore, } return monitoringCollector, nil @@ -171,7 +173,7 @@ func (c *MonitoringCollector) Collect(ch chan<- prometheus.Metric) { var begun = time.Now() errorMetric := float64(0) - if err := c.reportMonitoringMetrics(ch); err != nil { + if err := c.reportMonitoringMetrics(ch, begun); err != nil { errorMetric = float64(1) c.scrapeErrorsTotalMetric.Inc() level.Error(c.logger).Log("msg", "Error while getting Google Stackdriver Monitoring metrics", "err", err) @@ -193,7 +195,7 @@ 
func (c *MonitoringCollector) Collect(ch chan<- prometheus.Metric) { c.lastScrapeDurationSecondsMetric.Collect(ch) } -func (c *MonitoringCollector) reportMonitoringMetrics(ch chan<- prometheus.Metric) error { +func (c *MonitoringCollector) reportMonitoringMetrics(ch chan<- prometheus.Metric, begun time.Time) error { metricDescriptorsFunction := func(page *monitoring.ListMetricDescriptorsResponse) error { var wg = &sync.WaitGroup{} @@ -270,8 +272,8 @@ func (c *MonitoringCollector) reportMonitoringMetrics(ch chan<- prometheus.Metri if page == nil { break } - if err := c.reportTimeSeriesMetrics(page, metricDescriptor, ch); err != nil { - level.Error(c.logger).Log("msg", "error reporting Time Series metrics for descripto", "descriptor", metricDescriptor.Type, "err", err) + if err := c.reportTimeSeriesMetrics(page, metricDescriptor, ch, c.deltaMetricStore, begun); err != nil { + level.Error(c.logger).Log("msg", "error reporting Time Series metrics for descriptor", "descriptor", metricDescriptor.Type, "err", err) errChannel <- err break } @@ -317,6 +319,21 @@ func (c *MonitoringCollector) reportMonitoringMetrics(ch chan<- prometheus.Metri wg.Wait() close(errChannel) + names := c.deltaMetricStore.ListMetricDescriptorsNotCollected(begun) + for _, descriptor := range names { + level.Debug(c.logger).Log("msg", "Exporting uncollected delta counter", "metric_descriptor_name", descriptor.name) + ts := TimeSeriesMetrics{ + metricDescriptor: &monitoring.MetricDescriptor{Name: descriptor.name, Description: descriptor.description}, + ch: ch, + fillMissingLabels: c.collectorFillMissingLabels, + constMetrics: nil, + histogramMetrics: nil, + deltaMetricStore: c.deltaMetricStore, + } + ts.Complete(begun) + } + + level.Debug(c.logger).Log("msg", "Done reporting monitoring metrics") return <-errChannel } @@ -324,6 +341,8 @@ func (c *MonitoringCollector) reportTimeSeriesMetrics( page *monitoring.ListTimeSeriesResponse, metricDescriptor *monitoring.MetricDescriptor, ch chan<- 
prometheus.Metric, + deltaMetricStore DeltaCounterStore, + begun time.Time, ) error { var metricValue float64 var metricValueType prometheus.ValueType @@ -333,8 +352,9 @@ func (c *MonitoringCollector) reportTimeSeriesMetrics( metricDescriptor: metricDescriptor, ch: ch, fillMissingLabels: c.collectorFillMissingLabels, - constMetrics: make(map[string][]ConstMetric), + constMetrics: make(map[string][]*ConstMetric), histogramMetrics: make(map[string][]HistogramMetric), + deltaMetricStore: deltaMetricStore, } for _, timeSeries := range page.TimeSeries { newestEndTime := time.Unix(0, 0) @@ -388,7 +408,7 @@ func (c *MonitoringCollector) reportTimeSeriesMetrics( case "GAUGE": metricValueType = prometheus.GaugeValue case "DELTA": - metricValueType = prometheus.GaugeValue + metricValueType = prometheus.CounterValue case "CUMULATIVE": metricValueType = prometheus.CounterValue default: @@ -419,9 +439,13 @@ func (c *MonitoringCollector) reportTimeSeriesMetrics( continue } - timeSeriesMetrics.CollectNewConstMetric(timeSeries, newestEndTime, labelKeys, metricValueType, metricValue, labelValues) + if timeSeries.MetricKind == "DELTA" { + timeSeriesMetrics.CollectNewDeltaMetric(timeSeries, newestEndTime, labelKeys, metricValue, labelValues) + } else { + timeSeriesMetrics.CollectNewConstMetric(timeSeries, newestEndTime, labelKeys, metricValueType, metricValue, labelValues) + } } - timeSeriesMetrics.Complete() + timeSeriesMetrics.Complete(begun) return nil } diff --git a/collectors/monitoring_metrics.go b/collectors/monitoring_metrics.go index 96aee6e6..18e89ad3 100644 --- a/collectors/monitoring_metrics.go +++ b/collectors/monitoring_metrics.go @@ -37,8 +37,10 @@ type TimeSeriesMetrics struct { ch chan<- prometheus.Metric fillMissingLabels bool - constMetrics map[string][]ConstMetric + constMetrics map[string][]*ConstMetric histogramMetrics map[string][]HistogramMetric + + deltaMetricStore DeltaCounterStore } func (t *TimeSeriesMetrics) newMetricDesc(fqName string, labelKeys 
[]string) *prometheus.Desc { @@ -109,13 +111,30 @@ func (t *TimeSeriesMetrics) newConstHistogram(fqName string, reportTime time.Tim ) } +func (t *TimeSeriesMetrics) CollectNewDeltaMetric(timeSeries *monitoring.TimeSeries, reportTime time.Time, labelKeys []string, metricValue float64, labelValues []string) { + fqName := buildFQName(timeSeries) + + v := ConstMetric{ + fqName: fqName, + labelKeys: labelKeys, + valueType: prometheus.CounterValue, + value: metricValue, + labelValues: labelValues, + reportTime: reportTime, + + keysHash: hashLabelKeys(labelKeys), + } + + t.deltaMetricStore.Increment(t.metricDescriptor, &v) +} + func (t *TimeSeriesMetrics) CollectNewConstMetric(timeSeries *monitoring.TimeSeries, reportTime time.Time, labelKeys []string, metricValueType prometheus.ValueType, metricValue float64, labelValues []string) { fqName := buildFQName(timeSeries) if t.fillMissingLabels { vs, ok := t.constMetrics[fqName] if !ok { - vs = make([]ConstMetric, 0) + vs = make([]*ConstMetric, 0) } v := ConstMetric{ fqName: fqName, @@ -127,7 +146,7 @@ func (t *TimeSeriesMetrics) CollectNewConstMetric(timeSeries *monitoring.TimeSer keysHash: hashLabelKeys(labelKeys), } - t.constMetrics[fqName] = append(vs, v) + t.constMetrics[fqName] = append(vs, &v) return } t.ch <- t.newConstMetric(fqName, reportTime, labelKeys, metricValueType, metricValue, labelValues) @@ -157,13 +176,14 @@ func hashLabelKeys(labelKeys []string) uint64 { return dh } -func (t *TimeSeriesMetrics) Complete() { - t.completeConstMetrics() +func (t *TimeSeriesMetrics) Complete(reportingStartTime time.Time) { + t.completeDeltaMetrics(reportingStartTime) + t.completeConstMetrics(t.constMetrics) t.completeHistogramMetrics() } -func (t *TimeSeriesMetrics) completeConstMetrics() { - for _, vs := range t.constMetrics { +func (t *TimeSeriesMetrics) completeConstMetrics(constMetrics map[string][]*ConstMetric) { + for _, vs := range constMetrics { if len(vs) > 1 { var needFill bool for i := 1; i < len(vs); i++ { @@ 
-201,15 +221,50 @@ func (t *TimeSeriesMetrics) completeHistogramMetrics() { } } -func fillConstMetricsLabels(metrics []ConstMetric) []ConstMetric { +func (t *TimeSeriesMetrics) completeDeltaMetrics(reportingStartTime time.Time) { + descriptorMetrics := t.deltaMetricStore.ListMetricsByName(t.metricDescriptor.Name) + now := time.Now().Truncate(time.Minute) + + constMetrics := map[string][]*ConstMetric{} + for _, metrics := range descriptorMetrics { + for _, collected := range metrics { + // If the metric wasn't collected we still need to export it to keep the counter from going stale + if reportingStartTime.After(collected.lastCollectedAt) { + reportingLag := collected.lastCollectedAt.Sub(collected.metric.reportTime).Truncate(time.Minute) + collected.metric.reportTime = now.Add(-reportingLag) + } + if t.fillMissingLabels { + if _, exists := constMetrics[collected.metric.fqName]; !exists { + constMetrics[collected.metric.fqName] = []*ConstMetric{} + } + constMetrics[collected.metric.fqName] = append(constMetrics[collected.metric.fqName], collected.metric) + } else { + t.ch <- t.newConstMetric( + collected.metric.fqName, + collected.metric.reportTime, + collected.metric.labelKeys, + collected.metric.valueType, + collected.metric.value, + collected.metric.labelValues, + ) + } + } + } + + if t.fillMissingLabels { + t.completeConstMetrics(constMetrics) + } +} + +func fillConstMetricsLabels(metrics []*ConstMetric) []*ConstMetric { allKeys := make(map[string]struct{}) for _, metric := range metrics { for _, key := range metric.labelKeys { allKeys[key] = struct{}{} } } - result := make([]ConstMetric, len(metrics)) - for i, metric := range metrics { + + for _, metric := range metrics { if len(metric.labelKeys) != len(allKeys) { metricKeys := make(map[string]struct{}) for _, key := range metric.labelKeys { @@ -222,10 +277,9 @@ func fillConstMetricsLabels(metrics []ConstMetric) []ConstMetric { } } } - result[i] = metric } - return result + return metrics } func 
fillHistogramMetricsLabels(metrics []HistogramMetric) []HistogramMetric { diff --git a/stackdriver_exporter.go b/stackdriver_exporter.go index 06b61311..f25dc137 100644 --- a/stackdriver_exporter.go +++ b/stackdriver_exporter.go @@ -18,6 +18,7 @@ import ( "net/http" "os" "strings" + "time" "github.com/PuerkitoBio/rehttp" "github.com/go-kit/log" @@ -191,7 +192,7 @@ func (h *handler) innerHandler(filters map[string]bool) http.Handler { IngestDelay: *monitoringMetricsIngestDelay, FillMissingLabels: *collectorFillMissingLabels, DropDelegatedProjects: *monitoringDropDelegatedProjects, - }, h.logger) + }, h.logger, collectors.NewInMemoryDeltaCounterStore(h.logger, time.Hour)) if err != nil { level.Error(h.logger).Log("err", err) os.Exit(1) From 9c541a8677124b1b5f3df07e945dddf7d50c4ce1 Mon Sep 17 00:00:00 2001 From: Kyle Eckhart Date: Thu, 18 Aug 2022 20:08:41 -0400 Subject: [PATCH 2/8] Introduce a delta distribution store and refactor existing store to counter store Signed-off-by: Kyle Eckhart --- collectors/delta_counter.go | 186 ++++++++++++++++++ collectors/delta_counter_store.go | 23 --- collectors/delta_distribution.go | 201 ++++++++++++++++++++ collectors/in_memory_delta_counter_store.go | 149 --------------- collectors/monitoring_collector.go | 52 +++-- collectors/monitoring_metrics.go | 131 ++++++++----- stackdriver_exporter.go | 4 +- 7 files changed, 506 insertions(+), 240 deletions(-) create mode 100644 collectors/delta_counter.go delete mode 100644 collectors/delta_counter_store.go create mode 100644 collectors/delta_distribution.go delete mode 100644 collectors/in_memory_delta_counter_store.go diff --git a/collectors/delta_counter.go b/collectors/delta_counter.go new file mode 100644 index 00000000..9db04d10 --- /dev/null +++ b/collectors/delta_counter.go @@ -0,0 +1,186 @@ +// Copyright 2020 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collectors + +import ( + "fmt" + "sort" + "strings" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "google.golang.org/api/monitoring/v3" +) + +type CollectedMetric struct { + metric *ConstMetric + lastCollectedAt time.Time +} + +type MetricDescriptor struct { + name string + description string +} + +type DeltaCounterStore interface { + Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *ConstMetric) + ListMetricsByName(metricDescriptorName string) map[string][]*CollectedMetric + ListMetricDescriptorsNotCollected(since time.Time) []MetricDescriptor +} + +type metricEntry struct { + collected map[uint64]*CollectedMetric + lastListedAt time.Time + description string +} + +type inMemoryDeltaCounterStore struct { + store map[string]*metricEntry + ttl time.Duration + storeMutex *sync.RWMutex + logger log.Logger +} + +func NewInMemoryDeltaCounterStore(logger log.Logger, ttl time.Duration) DeltaCounterStore { + return inMemoryDeltaCounterStore{ + store: map[string]*metricEntry{}, + storeMutex: &sync.RWMutex{}, + logger: logger, + ttl: ttl, + } +} + +func (s inMemoryDeltaCounterStore) Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *ConstMetric) { + if currentValue == nil { + return + } + + var metric *metricEntry + s.storeMutex.Lock() + if _, exists := s.store[metricDescriptor.Name]; !exists { + s.store[metricDescriptor.Name] = &metricEntry{ + collected: map[uint64]*CollectedMetric{}, + lastListedAt: time.Time{}, + } + } + metric = s.store[metricDescriptor.Name] + 
s.storeMutex.Unlock() + + key := toCounterKey(currentValue) + existing := metric.collected[key] + + if existing == nil { + level.Debug(s.logger).Log("msg", "Tracking new counter", "fqName", currentValue.fqName, "key", key, "current_value", currentValue.value, "incoming_time", currentValue.reportTime) + metric.collected[key] = &CollectedMetric{currentValue, time.Now()} + return + } + + if existing.metric.reportTime.Before(currentValue.reportTime) { + level.Debug(s.logger).Log("msg", "Incrementing existing counter", "fqName", currentValue.fqName, "key", key, "current_value", existing.metric.value, "adding", currentValue.value, "last_reported_time", metric.collected[key].metric.reportTime, "incoming_time", currentValue.reportTime) + currentValue.value = currentValue.value + existing.metric.value + existing.metric = currentValue + existing.lastCollectedAt = time.Now() + return + } + + level.Debug(s.logger).Log("msg", "Ignoring old sample for counter", "fqName", currentValue.fqName, "key", key, "last_reported_time", existing.metric.reportTime, "incoming_time", currentValue.reportTime) +} + +func toCounterKey(c *ConstMetric) uint64 { + labels := make(map[string]string) + keysCopy := append([]string{}, c.labelKeys...) 
+ for i := range c.labelKeys { + labels[c.labelKeys[i]] = c.labelValues[i] + } + sort.Strings(keysCopy) + + var keyParts []string + for _, k := range keysCopy { + keyParts = append(keyParts, fmt.Sprintf("%s:%s", k, labels[k])) + } + hashText := fmt.Sprintf("%s|%s", c.fqName, strings.Join(keyParts, "|")) + h := hashNew() + h = hashAdd(h, hashText) + + return h +} + +func (s inMemoryDeltaCounterStore) ListMetricsByName(metricDescriptorName string) map[string][]*CollectedMetric { + output := map[string][]*CollectedMetric{} + now := time.Now() + ttlWindowStart := now.Add(-s.ttl) + + s.storeMutex.Lock() + metric := s.store[metricDescriptorName] + if metric == nil { + s.storeMutex.Unlock() + return output + } + metric.lastListedAt = now + s.storeMutex.Unlock() + + for key, collected := range metric.collected { + //Scan and remove metrics which are outside the TTL + if ttlWindowStart.After(collected.lastCollectedAt) { + level.Debug(s.logger).Log("msg", "Deleting counter entry outside of TTL", "key", key, "fqName", collected.metric.fqName) + delete(metric.collected, key) + continue + } + + metrics, exists := output[collected.metric.fqName] + if !exists { + metrics = make([]*CollectedMetric, 0) + } + metricCopy := *collected.metric + outputEntry := CollectedMetric{ + metric: &metricCopy, + lastCollectedAt: collected.lastCollectedAt, + } + output[collected.metric.fqName] = append(metrics, &outputEntry) + } + + return output +} + +func (s inMemoryDeltaCounterStore) ListMetricDescriptorsNotCollected(since time.Time) []MetricDescriptor { + var names []MetricDescriptor + ttlWindowStart := time.Now().Add(-s.ttl) + + s.storeMutex.Lock() + defer s.storeMutex.Unlock() + + for name, metrics := range s.store { + //Scan and remove metrics which are outside the TTL + for key, collectedMetric := range metrics.collected { + if ttlWindowStart.After(collectedMetric.lastCollectedAt) { + level.Debug(s.logger).Log("msg", "Deleting counter entry outside of TTL", "key", key, "fqName", 
collectedMetric.metric.fqName) + delete(metrics.collected, key) + } + } + + if len(metrics.collected) == 0 { + level.Debug(s.logger).Log("msg", "Deleting empty descriptor store entry", "metric_descriptor_name", name) + delete(s.store, name) + continue + } + + if since.After(metrics.lastListedAt) { + names = append(names, MetricDescriptor{name: name, description: metrics.description}) + } + } + + return names +} diff --git a/collectors/delta_counter_store.go b/collectors/delta_counter_store.go deleted file mode 100644 index 2263db01..00000000 --- a/collectors/delta_counter_store.go +++ /dev/null @@ -1,23 +0,0 @@ -package collectors - -import ( - "time" - - "google.golang.org/api/monitoring/v3" -) - -type CollectedMetric struct { - metric *ConstMetric - lastCollectedAt time.Time -} - -type MetricDescriptor struct { - name string - description string -} - -type DeltaCounterStore interface { - Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *ConstMetric) - ListMetricsByName(metricDescriptorName string) map[string][]*CollectedMetric - ListMetricDescriptorsNotCollected(since time.Time) []MetricDescriptor -} diff --git a/collectors/delta_distribution.go b/collectors/delta_distribution.go new file mode 100644 index 00000000..f7055f19 --- /dev/null +++ b/collectors/delta_distribution.go @@ -0,0 +1,201 @@ +// Copyright 2020 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package collectors + +import ( + "fmt" + "sort" + "strings" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "google.golang.org/api/monitoring/v3" +) + +type CollectedHistogram struct { + histogram *HistogramMetric + lastCollectedAt time.Time +} + +type DeltaDistributionStore interface { + Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *HistogramMetric) + ListMetricsByName(metricDescriptorName string) map[string][]*CollectedHistogram + ListMetricDescriptorsNotCollected(since time.Time) []MetricDescriptor +} + +type histogramEntry struct { + collected map[uint64]*CollectedHistogram + lastListedAt time.Time + description string +} + +type inMemoryDeltaDistributionStore struct { + store map[string]*histogramEntry + ttl time.Duration + storeMutex *sync.RWMutex + logger log.Logger +} + +func NewInMemoryDeltaDistributionStore(logger log.Logger, ttl time.Duration) DeltaDistributionStore { + return inMemoryDeltaDistributionStore{ + store: map[string]*histogramEntry{}, + storeMutex: &sync.RWMutex{}, + logger: logger, + ttl: ttl, + } +} + +func (s inMemoryDeltaDistributionStore) Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *HistogramMetric) { + if currentValue == nil { + return + } + + var entry *histogramEntry + s.storeMutex.Lock() + if _, exists := s.store[metricDescriptor.Name]; !exists { + s.store[metricDescriptor.Name] = &histogramEntry{ + collected: map[uint64]*CollectedHistogram{}, + lastListedAt: time.Time{}, + } + } + entry = s.store[metricDescriptor.Name] + s.storeMutex.Unlock() + + key := toHistogramKey(currentValue) + existing := entry.collected[key] + + if existing == nil { + level.Debug(s.logger).Log("msg", "Tracking new histogram", "fqName", currentValue.fqName, "key", key, "incoming_time", currentValue.reportTime) + entry.collected[key] = &CollectedHistogram{histogram: currentValue, lastCollectedAt: time.Now()} + return + } + + if 
existing.histogram.reportTime.Before(currentValue.reportTime) { + level.Debug(s.logger).Log("msg", "Incrementing existing histogram", "fqName", currentValue.fqName, "key", key, "last_reported_time", existing.histogram.reportTime, "incoming_time", currentValue.reportTime) + existing.histogram = mergeHistograms(existing.histogram, currentValue) + existing.lastCollectedAt = time.Now() + return + } + + level.Debug(s.logger).Log("msg", "Ignoring old sample for histogram", "fqName", currentValue.fqName, "key", key, "last_reported_time", existing.histogram.reportTime, "incoming_time", currentValue.reportTime) +} + +func toHistogramKey(hist *HistogramMetric) uint64 { + labels := make(map[string]string) + keysCopy := append([]string{}, hist.labelKeys...) + for i := range hist.labelKeys { + labels[hist.labelKeys[i]] = hist.labelValues[i] + } + sort.Strings(keysCopy) + + var keyParts []string + for _, k := range keysCopy { + keyParts = append(keyParts, fmt.Sprintf("%s:%s", k, labels[k])) + } + hashText := fmt.Sprintf("%s|%s", hist.fqName, strings.Join(keyParts, "|")) + h := hashNew() + h = hashAdd(h, hashText) + + return h +} + +func mergeHistograms(existing *HistogramMetric, current *HistogramMetric) *HistogramMetric { + for key, value := range existing.buckets { + current.buckets[key] += value + } + + // Calculate a new mean and overall count + mean := existing.dist.Mean + mean += current.dist.Mean + mean /= 2 + + var count uint64 + for _, v := range current.buckets { + count += v + } + + current.dist.Mean = mean + current.dist.Count = int64(count) + + return current +} + +func (s inMemoryDeltaDistributionStore) ListMetricsByName(metricDescriptorName string) map[string][]*CollectedHistogram { + output := map[string][]*CollectedHistogram{} + now := time.Now() + ttlWindowStart := now.Add(-s.ttl) + + s.storeMutex.Lock() + metric := s.store[metricDescriptorName] + if metric == nil { + s.storeMutex.Unlock() + return output + } + metric.lastListedAt = now + s.storeMutex.Unlock() 
+ + for key, collected := range metric.collected { + //Scan and remove metrics which are outside the TTL + if ttlWindowStart.After(collected.lastCollectedAt) { + level.Debug(s.logger).Log("msg", "Deleting histogram entry outside of TTL", "key", key, "fqName", collected.histogram.fqName) + delete(metric.collected, key) + continue + } + + metrics, exists := output[collected.histogram.fqName] + if !exists { + metrics = make([]*CollectedHistogram, 0) + } + histCopy := *collected.histogram + outputEntry := CollectedHistogram{ + histogram: &histCopy, + lastCollectedAt: collected.lastCollectedAt, + } + output[collected.histogram.fqName] = append(metrics, &outputEntry) + } + + return output +} + +func (s inMemoryDeltaDistributionStore) ListMetricDescriptorsNotCollected(since time.Time) []MetricDescriptor { + var names []MetricDescriptor + ttlWindowStart := time.Now().Add(-s.ttl) + + s.storeMutex.Lock() + defer s.storeMutex.Unlock() + + for name, metrics := range s.store { + //Scan and remove metrics which are outside the TTL + for key, collectedMetric := range metrics.collected { + if ttlWindowStart.After(collectedMetric.lastCollectedAt) { + level.Debug(s.logger).Log("msg", "Deleting histogram entry outside of TTL", "key", key, "fqName", collectedMetric.histogram.fqName) + delete(metrics.collected, key) + } + } + + if len(metrics.collected) == 0 { + level.Debug(s.logger).Log("msg", "Deleting empty descriptor store entry", "metric_descriptor_name", name) + delete(s.store, name) + continue + } + + if since.After(metrics.lastListedAt) { + names = append(names, MetricDescriptor{name: name, description: metrics.description}) + } + } + + return names +} diff --git a/collectors/in_memory_delta_counter_store.go b/collectors/in_memory_delta_counter_store.go deleted file mode 100644 index 01092d57..00000000 --- a/collectors/in_memory_delta_counter_store.go +++ /dev/null @@ -1,149 +0,0 @@ -package collectors - -import ( - "fmt" - "sort" - "strings" - "sync" - "time" - - 
"github.com/go-kit/log" - "github.com/go-kit/log/level" - "google.golang.org/api/monitoring/v3" -) - -type metricEntry struct { - collectedMetrics map[uint64]*CollectedMetric - lastListedAt time.Time - description string -} - -type inMemoryDeltaCounterStore struct { - store map[string]*metricEntry - ttl time.Duration - storeMutex *sync.RWMutex - logger log.Logger -} - -func NewInMemoryDeltaCounterStore(logger log.Logger, ttl time.Duration) DeltaCounterStore { - return inMemoryDeltaCounterStore{ - store: map[string]*metricEntry{}, - storeMutex: &sync.RWMutex{}, - logger: logger, - ttl: ttl, - } -} - -func (s inMemoryDeltaCounterStore) Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *ConstMetric) { - if currentValue == nil { - return - } - key := toKey(currentValue) - - var metric *metricEntry - s.storeMutex.Lock() - if _, exists := s.store[metricDescriptor.Name]; !exists { - s.store[metricDescriptor.Name] = &metricEntry{ - collectedMetrics: map[uint64]*CollectedMetric{}, - lastListedAt: time.Time{}, - } - } - metric = s.store[metricDescriptor.Name] - s.storeMutex.Unlock() - - if _, exists := metric.collectedMetrics[key]; !exists { - level.Debug(s.logger).Log("msg", "Tracking new value", "fqName", currentValue.fqName, "key", key, "current_value", currentValue.value, "incoming_time", currentValue.reportTime) - metric.collectedMetrics[key] = &CollectedMetric{currentValue, time.Now()} - } else if metric.collectedMetrics[key].metric.reportTime.Before(currentValue.reportTime) { - level.Debug(s.logger).Log("msg", "Incrementing existing value", "fqName", currentValue.fqName, "key", key, "current_value", metric.collectedMetrics[key].metric.value, "adding", currentValue.value, "last_reported_time", metric.collectedMetrics[key].metric.reportTime, "incoming_time", currentValue.reportTime) - currentValue.value = currentValue.value + metric.collectedMetrics[key].metric.value - metric.collectedMetrics[key].metric = currentValue - 
metric.collectedMetrics[key].lastCollectedAt = time.Now() - } else { - level.Debug(s.logger).Log("msg", "Ignoring repeat sample", "fqName", currentValue.fqName, "key", key, "last_reported_time", metric.collectedMetrics[key].metric.reportTime, "incoming_time", currentValue.reportTime) - } -} - -func toKey(c *ConstMetric) uint64 { - labels := make(map[string]string) - keysCopy := append([]string{}, c.labelKeys...) - for i := range c.labelKeys { - labels[c.labelKeys[i]] = c.labelValues[i] - } - sort.Strings(keysCopy) - - var keyParts []string - for _, k := range keysCopy { - keyParts = append(keyParts, fmt.Sprintf("%s:%s", k, labels[k])) - } - hashText := fmt.Sprintf("%s|%s", c.fqName, strings.Join(keyParts, "|")) - h := hashNew() - h = hashAdd(h, hashText) - - return h -} - -func (s inMemoryDeltaCounterStore) ListMetricsByName(metricDescriptorName string) map[string][]*CollectedMetric { - output := map[string][]*CollectedMetric{} - now := time.Now() - ttlWindowStart := now.Add(-s.ttl) - - s.storeMutex.Lock() - metric := s.store[metricDescriptorName] - if metric == nil { - s.storeMutex.Unlock() - return output - } - metric.lastListedAt = now - s.storeMutex.Unlock() - - for key, collectedMetric := range metric.collectedMetrics { - //Scan and remove metrics which are outside the TTL - if ttlWindowStart.After(collectedMetric.lastCollectedAt) { - delete(metric.collectedMetrics, key) - continue - } - - metrics, exists := output[collectedMetric.metric.fqName] - if !exists { - metrics = make([]*CollectedMetric, 0) - } - thing := *collectedMetric.metric - outputCopy := CollectedMetric{ - metric: &thing, - lastCollectedAt: collectedMetric.lastCollectedAt, - } - output[collectedMetric.metric.fqName] = append(metrics, &outputCopy) - } - - return output -} - -func (s inMemoryDeltaCounterStore) ListMetricDescriptorsNotCollected(since time.Time) []MetricDescriptor { - level.Debug(s.logger).Log("msg", "Listing metrics not collected", "since", since) - var names []MetricDescriptor - 
ttlWindowStart := time.Now().Add(-s.ttl) - - s.storeMutex.Lock() - defer s.storeMutex.Unlock() - - for name, metrics := range s.store { - //Scan and remove metrics which are outside the TTL - for key, collectedMetric := range metrics.collectedMetrics { - if ttlWindowStart.After(collectedMetric.lastCollectedAt) { - delete(metrics.collectedMetrics, key) - } - } - - if len(metrics.collectedMetrics) == 0 { - delete(s.store, name) - continue - } - - if since.After(metrics.lastListedAt) { - names = append(names, MetricDescriptor{name: name, description: metrics.description}) - } - } - - return names -} diff --git a/collectors/monitoring_collector.go b/collectors/monitoring_collector.go index f09f43dd..d19e5ac5 100644 --- a/collectors/monitoring_collector.go +++ b/collectors/monitoring_collector.go @@ -52,7 +52,8 @@ type MonitoringCollector struct { collectorFillMissingLabels bool monitoringDropDelegatedProjects bool logger log.Logger - deltaMetricStore DeltaCounterStore + deltaCounterStore DeltaCounterStore + deltaDistributionStore DeltaDistributionStore } type MonitoringCollectorOptions struct { @@ -76,7 +77,7 @@ type MonitoringCollectorOptions struct { DropDelegatedProjects bool } -func NewMonitoringCollector(projectID string, monitoringService *monitoring.Service, opts MonitoringCollectorOptions, logger log.Logger, deltaStore DeltaCounterStore) (*MonitoringCollector, error) { +func NewMonitoringCollector(projectID string, monitoringService *monitoring.Service, opts MonitoringCollectorOptions, logger log.Logger, counterStore DeltaCounterStore, distributionStore DeltaDistributionStore) (*MonitoringCollector, error) { apiCallsTotalMetric := prometheus.NewCounter( prometheus.CounterOpts{ Namespace: "stackdriver", @@ -154,7 +155,8 @@ func NewMonitoringCollector(projectID string, monitoringService *monitoring.Serv collectorFillMissingLabels: opts.FillMissingLabels, monitoringDropDelegatedProjects: opts.DropDelegatedProjects, logger: logger, - deltaMetricStore: deltaStore, + 
deltaCounterStore: counterStore, + deltaDistributionStore: distributionStore, } return monitoringCollector, nil @@ -272,7 +274,7 @@ func (c *MonitoringCollector) reportMonitoringMetrics(ch chan<- prometheus.Metri if page == nil { break } - if err := c.reportTimeSeriesMetrics(page, metricDescriptor, ch, c.deltaMetricStore, begun); err != nil { + if err := c.reportTimeSeriesMetrics(page, metricDescriptor, ch, begun); err != nil { level.Error(c.logger).Log("msg", "error reporting Time Series metrics for descriptor", "descriptor", metricDescriptor.Type, "err", err) errChannel <- err break @@ -319,8 +321,18 @@ func (c *MonitoringCollector) reportMonitoringMetrics(ch chan<- prometheus.Metri wg.Wait() close(errChannel) - names := c.deltaMetricStore.ListMetricDescriptorsNotCollected(begun) - for _, descriptor := range names { + // Ensure any known descriptors which were not collected are exported to prevent them from going stale + uncollectedCounters := c.deltaCounterStore.ListMetricDescriptorsNotCollected(begun) + uncollectedHistograms := c.deltaDistributionStore.ListMetricDescriptorsNotCollected(begun) + uniqueDescriptors := map[MetricDescriptor]struct{}{} + for _, v := range uncollectedCounters { + uniqueDescriptors[v] = struct{}{} + } + for _, v := range uncollectedHistograms { + uniqueDescriptors[v] = struct{}{} + } + + for descriptor, _ := range uniqueDescriptors { level.Debug(c.logger).Log("msg", "Exporting uncollected delta counter", "metric_descriptor_name", descriptor.name) ts := TimeSeriesMetrics{ metricDescriptor: &monitoring.MetricDescriptor{Name: descriptor.name, Description: descriptor.description}, @@ -328,7 +340,7 @@ func (c *MonitoringCollector) reportMonitoringMetrics(ch chan<- prometheus.Metri fillMissingLabels: c.collectorFillMissingLabels, constMetrics: nil, histogramMetrics: nil, - deltaMetricStore: c.deltaMetricStore, + deltaCounterStore: c.deltaCounterStore, } ts.Complete(begun) } @@ -341,7 +353,6 @@ func (c *MonitoringCollector) 
reportTimeSeriesMetrics( page *monitoring.ListTimeSeriesResponse, metricDescriptor *monitoring.MetricDescriptor, ch chan<- prometheus.Metric, - deltaMetricStore DeltaCounterStore, begun time.Time, ) error { var metricValue float64 @@ -349,12 +360,13 @@ func (c *MonitoringCollector) reportTimeSeriesMetrics( var newestTSPoint *monitoring.Point timeSeriesMetrics := &TimeSeriesMetrics{ - metricDescriptor: metricDescriptor, - ch: ch, - fillMissingLabels: c.collectorFillMissingLabels, - constMetrics: make(map[string][]*ConstMetric), - histogramMetrics: make(map[string][]HistogramMetric), - deltaMetricStore: deltaMetricStore, + metricDescriptor: metricDescriptor, + ch: ch, + fillMissingLabels: c.collectorFillMissingLabels, + constMetrics: make(map[string][]*ConstMetric), + histogramMetrics: make(map[string][]*HistogramMetric), + deltaCounterStore: c.deltaCounterStore, + deltaDistributionStore: c.deltaDistributionStore, } for _, timeSeries := range page.TimeSeries { newestEndTime := time.Unix(0, 0) @@ -428,10 +440,12 @@ func (c *MonitoringCollector) reportTimeSeriesMetrics( case "DISTRIBUTION": dist := newestTSPoint.Value.DistributionValue buckets, err := c.generateHistogramBuckets(dist) + if err == nil { - timeSeriesMetrics.CollectNewConstHistogram(timeSeries, newestEndTime, labelKeys, dist, buckets, labelValues) + timeSeriesMetrics.CollectNewConstHistogram(timeSeries, newestEndTime, labelKeys, dist, buckets, labelValues, timeSeries.MetricKind) } else { - level.Debug(c.logger).Log("msg", "discarding", "resource", timeSeries.Resource.Type, "metric", timeSeries.Metric.Type, "err", err) + level.Debug(c.logger).Log("msg", "discarding", "resource", timeSeries.Resource.Type, "metric", + timeSeries.Metric.Type, "err", err) } continue default: @@ -439,11 +453,7 @@ func (c *MonitoringCollector) reportTimeSeriesMetrics( continue } - if timeSeries.MetricKind == "DELTA" { - timeSeriesMetrics.CollectNewDeltaMetric(timeSeries, newestEndTime, labelKeys, metricValue, labelValues) - } 
else { - timeSeriesMetrics.CollectNewConstMetric(timeSeries, newestEndTime, labelKeys, metricValueType, metricValue, labelValues) - } + timeSeriesMetrics.CollectNewConstMetric(timeSeries, newestEndTime, labelKeys, metricValueType, metricValue, labelValues, timeSeries.MetricKind) } timeSeriesMetrics.Complete(begun) return nil diff --git a/collectors/monitoring_metrics.go b/collectors/monitoring_metrics.go index 18e89ad3..03c9d4a2 100644 --- a/collectors/monitoring_metrics.go +++ b/collectors/monitoring_metrics.go @@ -38,9 +38,10 @@ type TimeSeriesMetrics struct { fillMissingLabels bool constMetrics map[string][]*ConstMetric - histogramMetrics map[string][]HistogramMetric + histogramMetrics map[string][]*HistogramMetric - deltaMetricStore DeltaCounterStore + deltaCounterStore DeltaCounterStore + deltaDistributionStore DeltaDistributionStore } func (t *TimeSeriesMetrics) newMetricDesc(fqName string, labelKeys []string) *prometheus.Desc { @@ -74,15 +75,12 @@ type HistogramMetric struct { keysHash uint64 } -func (t *TimeSeriesMetrics) CollectNewConstHistogram(timeSeries *monitoring.TimeSeries, reportTime time.Time, labelKeys []string, dist *monitoring.Distribution, buckets map[float64]uint64, labelValues []string) { +func (t *TimeSeriesMetrics) CollectNewConstHistogram(timeSeries *monitoring.TimeSeries, reportTime time.Time, labelKeys []string, dist *monitoring.Distribution, buckets map[float64]uint64, labelValues []string, metricKind string) { fqName := buildFQName(timeSeries) - if t.fillMissingLabels { - vs, ok := t.histogramMetrics[fqName] - if !ok { - vs = make([]HistogramMetric, 0) - } - v := HistogramMetric{ + var v HistogramMetric + if t.fillMissingLabels || metricKind == "DELTA" { + v = HistogramMetric{ fqName: fqName, labelKeys: labelKeys, dist: dist, @@ -92,9 +90,22 @@ func (t *TimeSeriesMetrics) CollectNewConstHistogram(timeSeries *monitoring.Time keysHash: hashLabelKeys(labelKeys), } - t.histogramMetrics[fqName] = append(vs, v) + } + + if metricKind == 
"DELTA" { + t.deltaDistributionStore.Increment(t.metricDescriptor, &v) return } + + if t.fillMissingLabels { + vs, ok := t.histogramMetrics[fqName] + if !ok { + vs = make([]*HistogramMetric, 0) + } + t.histogramMetrics[fqName] = append(vs, &v) + return + } + t.ch <- t.newConstHistogram(fqName, reportTime, labelKeys, dist, buckets, labelValues) } @@ -111,32 +122,12 @@ func (t *TimeSeriesMetrics) newConstHistogram(fqName string, reportTime time.Tim ) } -func (t *TimeSeriesMetrics) CollectNewDeltaMetric(timeSeries *monitoring.TimeSeries, reportTime time.Time, labelKeys []string, metricValue float64, labelValues []string) { +func (t *TimeSeriesMetrics) CollectNewConstMetric(timeSeries *monitoring.TimeSeries, reportTime time.Time, labelKeys []string, metricValueType prometheus.ValueType, metricValue float64, labelValues []string, metricKind string) { fqName := buildFQName(timeSeries) - v := ConstMetric{ - fqName: fqName, - labelKeys: labelKeys, - valueType: prometheus.CounterValue, - value: metricValue, - labelValues: labelValues, - reportTime: reportTime, - - keysHash: hashLabelKeys(labelKeys), - } - - t.deltaMetricStore.Increment(t.metricDescriptor, &v) -} - -func (t *TimeSeriesMetrics) CollectNewConstMetric(timeSeries *monitoring.TimeSeries, reportTime time.Time, labelKeys []string, metricValueType prometheus.ValueType, metricValue float64, labelValues []string) { - fqName := buildFQName(timeSeries) - - if t.fillMissingLabels { - vs, ok := t.constMetrics[fqName] - if !ok { - vs = make([]*ConstMetric, 0) - } - v := ConstMetric{ + var v ConstMetric + if t.fillMissingLabels || metricKind == "DELTA" { + v = ConstMetric{ fqName: fqName, labelKeys: labelKeys, valueType: metricValueType, @@ -146,9 +137,22 @@ func (t *TimeSeriesMetrics) CollectNewConstMetric(timeSeries *monitoring.TimeSer keysHash: hashLabelKeys(labelKeys), } + } + + if metricKind == "DELTA" { + t.deltaCounterStore.Increment(t.metricDescriptor, &v) + return + } + + if t.fillMissingLabels { + vs, ok := 
t.constMetrics[fqName] + if !ok { + vs = make([]*ConstMetric, 0) + } t.constMetrics[fqName] = append(vs, &v) return } + t.ch <- t.newConstMetric(fqName, reportTime, labelKeys, metricValueType, metricValue, labelValues) } @@ -177,9 +181,10 @@ func hashLabelKeys(labelKeys []string) uint64 { } func (t *TimeSeriesMetrics) Complete(reportingStartTime time.Time) { - t.completeDeltaMetrics(reportingStartTime) + t.completeDeltaConstMetrics(reportingStartTime) + t.completeDeltaHistogramMetrics(reportingStartTime) t.completeConstMetrics(t.constMetrics) - t.completeHistogramMetrics() + t.completeHistogramMetrics(t.histogramMetrics) } func (t *TimeSeriesMetrics) completeConstMetrics(constMetrics map[string][]*ConstMetric) { @@ -202,8 +207,8 @@ func (t *TimeSeriesMetrics) completeConstMetrics(constMetrics map[string][]*Cons } } -func (t *TimeSeriesMetrics) completeHistogramMetrics() { - for _, vs := range t.histogramMetrics { +func (t *TimeSeriesMetrics) completeHistogramMetrics(histograms map[string][]*HistogramMetric) { + for _, vs := range histograms { if len(vs) > 1 { var needFill bool for i := 1; i < len(vs); i++ { @@ -221,14 +226,14 @@ func (t *TimeSeriesMetrics) completeHistogramMetrics() { } } -func (t *TimeSeriesMetrics) completeDeltaMetrics(reportingStartTime time.Time) { - descriptorMetrics := t.deltaMetricStore.ListMetricsByName(t.metricDescriptor.Name) +func (t *TimeSeriesMetrics) completeDeltaConstMetrics(reportingStartTime time.Time) { + descriptorMetrics := t.deltaCounterStore.ListMetricsByName(t.metricDescriptor.Name) now := time.Now().Truncate(time.Minute) constMetrics := map[string][]*ConstMetric{} for _, metrics := range descriptorMetrics { for _, collected := range metrics { - // If the metric wasn't collected we still need to export it to keep the counter from going stale + // If the series wasn't collected we still need to export it to keep it from going stale if reportingStartTime.After(collected.lastCollectedAt) { reportingLag := 
collected.lastCollectedAt.Sub(collected.metric.reportTime).Truncate(time.Minute) collected.metric.reportTime = now.Add(-reportingLag) @@ -256,6 +261,41 @@ func (t *TimeSeriesMetrics) completeDeltaMetrics(reportingStartTime time.Time) { } } +func (t *TimeSeriesMetrics) completeDeltaHistogramMetrics(reportingStartTime time.Time) { + descriptorMetrics := t.deltaDistributionStore.ListMetricsByName(t.metricDescriptor.Name) + now := time.Now().Truncate(time.Minute) + + histograms := map[string][]*HistogramMetric{} + for _, metrics := range descriptorMetrics { + for _, collected := range metrics { + // If the series wasn't collected we still need to export it to keep it from going stale + if reportingStartTime.After(collected.lastCollectedAt) { + reportingLag := collected.lastCollectedAt.Sub(collected.histogram.reportTime).Truncate(time.Minute) + collected.histogram.reportTime = now.Add(-reportingLag) + } + if t.fillMissingLabels { + if _, exists := histograms[collected.histogram.fqName]; !exists { + histograms[collected.histogram.fqName] = []*HistogramMetric{} + } + histograms[collected.histogram.fqName] = append(histograms[collected.histogram.fqName], collected.histogram) + } else { + t.ch <- t.newConstHistogram( + collected.histogram.fqName, + collected.histogram.reportTime, + collected.histogram.labelKeys, + collected.histogram.dist, + collected.histogram.buckets, + collected.histogram.labelValues, + ) + } + } + } + + if t.fillMissingLabels { + t.completeHistogramMetrics(histograms) + } +} + func fillConstMetricsLabels(metrics []*ConstMetric) []*ConstMetric { allKeys := make(map[string]struct{}) for _, metric := range metrics { @@ -282,15 +322,15 @@ func fillConstMetricsLabels(metrics []*ConstMetric) []*ConstMetric { return metrics } -func fillHistogramMetricsLabels(metrics []HistogramMetric) []HistogramMetric { +func fillHistogramMetricsLabels(metrics []*HistogramMetric) []*HistogramMetric { allKeys := make(map[string]struct{}) for _, metric := range metrics { for _, 
key := range metric.labelKeys { allKeys[key] = struct{}{} } } - result := make([]HistogramMetric, len(metrics)) - for i, metric := range metrics { + + for _, metric := range metrics { if len(metric.labelKeys) != len(allKeys) { metricKeys := make(map[string]struct{}) for _, key := range metric.labelKeys { @@ -303,8 +343,7 @@ func fillHistogramMetricsLabels(metrics []HistogramMetric) []HistogramMetric { } } } - result[i] = metric } - return result + return metrics } diff --git a/stackdriver_exporter.go b/stackdriver_exporter.go index f25dc137..311b41e5 100644 --- a/stackdriver_exporter.go +++ b/stackdriver_exporter.go @@ -192,7 +192,9 @@ func (h *handler) innerHandler(filters map[string]bool) http.Handler { IngestDelay: *monitoringMetricsIngestDelay, FillMissingLabels: *collectorFillMissingLabels, DropDelegatedProjects: *monitoringDropDelegatedProjects, - }, h.logger, collectors.NewInMemoryDeltaCounterStore(h.logger, time.Hour)) + }, h.logger, collectors.NewInMemoryDeltaCounterStore(h.logger, time.Minute*30), collectors.NewInMemoryDeltaDistributionStore(h.logger, time.Minute*30)) + //TODO config flag for TTL value + //TODO config flag for aggregate deltas if err != nil { level.Error(h.logger).Log("err", err) os.Exit(1) From 61209c4caac8077faca8140c2820629ab3c62290 Mon Sep 17 00:00:00 2001 From: Kyle Eckhart Date: Fri, 26 Aug 2022 07:32:51 -0400 Subject: [PATCH 3/8] Only consider descriptors which were collected and add config flags Signed-off-by: Kyle Eckhart --- collectors/delta_counter.go | 66 +++++------------------------- collectors/delta_distribution.go | 61 +++++---------------------- collectors/monitoring_collector.go | 45 ++++++-------------- collectors/monitoring_metrics.go | 57 ++++++++++++++++++-------- stackdriver_exporter.go | 14 +++++-- 5 files changed, 84 insertions(+), 159 deletions(-) diff --git a/collectors/delta_counter.go b/collectors/delta_counter.go index 9db04d10..66e1ae2b 100644 --- a/collectors/delta_counter.go +++ 
b/collectors/delta_counter.go @@ -30,25 +30,15 @@ type CollectedMetric struct { lastCollectedAt time.Time } -type MetricDescriptor struct { - name string - description string -} - type DeltaCounterStore interface { Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *ConstMetric) ListMetricsByName(metricDescriptorName string) map[string][]*CollectedMetric - ListMetricDescriptorsNotCollected(since time.Time) []MetricDescriptor } -type metricEntry struct { - collected map[uint64]*CollectedMetric - lastListedAt time.Time - description string -} +type metricEntry = map[uint64]*CollectedMetric type inMemoryDeltaCounterStore struct { - store map[string]*metricEntry + store map[string]metricEntry ttl time.Duration storeMutex *sync.RWMutex logger log.Logger @@ -56,7 +46,7 @@ type inMemoryDeltaCounterStore struct { func NewInMemoryDeltaCounterStore(logger log.Logger, ttl time.Duration) DeltaCounterStore { return inMemoryDeltaCounterStore{ - store: map[string]*metricEntry{}, + store: map[string]metricEntry{}, storeMutex: &sync.RWMutex{}, logger: logger, ttl: ttl, @@ -68,28 +58,25 @@ func (s inMemoryDeltaCounterStore) Increment(metricDescriptor *monitoring.Metric return } - var metric *metricEntry + var entry metricEntry s.storeMutex.Lock() if _, exists := s.store[metricDescriptor.Name]; !exists { - s.store[metricDescriptor.Name] = &metricEntry{ - collected: map[uint64]*CollectedMetric{}, - lastListedAt: time.Time{}, - } + s.store[metricDescriptor.Name] = metricEntry{} } - metric = s.store[metricDescriptor.Name] + entry = s.store[metricDescriptor.Name] s.storeMutex.Unlock() key := toCounterKey(currentValue) - existing := metric.collected[key] + existing := entry[key] if existing == nil { level.Debug(s.logger).Log("msg", "Tracking new counter", "fqName", currentValue.fqName, "key", key, "current_value", currentValue.value, "incoming_time", currentValue.reportTime) - metric.collected[key] = &CollectedMetric{currentValue, time.Now()} + entry[key] = 
&CollectedMetric{currentValue, time.Now()} return } if existing.metric.reportTime.Before(currentValue.reportTime) { - level.Debug(s.logger).Log("msg", "Incrementing existing counter", "fqName", currentValue.fqName, "key", key, "current_value", existing.metric.value, "adding", currentValue.value, "last_reported_time", metric.collected[key].metric.reportTime, "incoming_time", currentValue.reportTime) + level.Debug(s.logger).Log("msg", "Incrementing existing counter", "fqName", currentValue.fqName, "key", key, "current_value", existing.metric.value, "adding", currentValue.value, "last_reported_time", entry[key].metric.reportTime, "incoming_time", currentValue.reportTime) currentValue.value = currentValue.value + existing.metric.value existing.metric = currentValue existing.lastCollectedAt = time.Now() @@ -129,14 +116,13 @@ func (s inMemoryDeltaCounterStore) ListMetricsByName(metricDescriptorName string s.storeMutex.Unlock() return output } - metric.lastListedAt = now s.storeMutex.Unlock() - for key, collected := range metric.collected { + for key, collected := range metric { //Scan and remove metrics which are outside the TTL if ttlWindowStart.After(collected.lastCollectedAt) { level.Debug(s.logger).Log("msg", "Deleting counter entry outside of TTL", "key", key, "fqName", collected.metric.fqName) - delete(metric.collected, key) + delete(metric, key) continue } @@ -154,33 +140,3 @@ func (s inMemoryDeltaCounterStore) ListMetricsByName(metricDescriptorName string return output } - -func (s inMemoryDeltaCounterStore) ListMetricDescriptorsNotCollected(since time.Time) []MetricDescriptor { - var names []MetricDescriptor - ttlWindowStart := time.Now().Add(-s.ttl) - - s.storeMutex.Lock() - defer s.storeMutex.Unlock() - - for name, metrics := range s.store { - //Scan and remove metrics which are outside the TTL - for key, collectedMetric := range metrics.collected { - if ttlWindowStart.After(collectedMetric.lastCollectedAt) { - level.Debug(s.logger).Log("msg", "Deleting 
counter entry outside of TTL", "key", key, "fqName", collectedMetric.metric.fqName) - delete(metrics.collected, key) - } - } - - if len(metrics.collected) == 0 { - level.Debug(s.logger).Log("msg", "Deleting empty descriptor store entry", "metric_descriptor_name", name) - delete(s.store, name) - continue - } - - if since.After(metrics.lastListedAt) { - names = append(names, MetricDescriptor{name: name, description: metrics.description}) - } - } - - return names -} diff --git a/collectors/delta_distribution.go b/collectors/delta_distribution.go index f7055f19..41e9d18f 100644 --- a/collectors/delta_distribution.go +++ b/collectors/delta_distribution.go @@ -33,17 +33,12 @@ type CollectedHistogram struct { type DeltaDistributionStore interface { Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *HistogramMetric) ListMetricsByName(metricDescriptorName string) map[string][]*CollectedHistogram - ListMetricDescriptorsNotCollected(since time.Time) []MetricDescriptor } -type histogramEntry struct { - collected map[uint64]*CollectedHistogram - lastListedAt time.Time - description string -} +type histogramEntry = map[uint64]*CollectedHistogram type inMemoryDeltaDistributionStore struct { - store map[string]*histogramEntry + store map[string]histogramEntry ttl time.Duration storeMutex *sync.RWMutex logger log.Logger @@ -51,7 +46,7 @@ type inMemoryDeltaDistributionStore struct { func NewInMemoryDeltaDistributionStore(logger log.Logger, ttl time.Duration) DeltaDistributionStore { return inMemoryDeltaDistributionStore{ - store: map[string]*histogramEntry{}, + store: map[string]histogramEntry{}, storeMutex: &sync.RWMutex{}, logger: logger, ttl: ttl, @@ -63,23 +58,20 @@ func (s inMemoryDeltaDistributionStore) Increment(metricDescriptor *monitoring.M return } - var entry *histogramEntry + var entry histogramEntry s.storeMutex.Lock() if _, exists := s.store[metricDescriptor.Name]; !exists { - s.store[metricDescriptor.Name] = &histogramEntry{ - collected: 
map[uint64]*CollectedHistogram{}, - lastListedAt: time.Time{}, - } + s.store[metricDescriptor.Name] = histogramEntry{} } entry = s.store[metricDescriptor.Name] s.storeMutex.Unlock() key := toHistogramKey(currentValue) - existing := entry.collected[key] + existing := entry[key] if existing == nil { level.Debug(s.logger).Log("msg", "Tracking new histogram", "fqName", currentValue.fqName, "key", key, "incoming_time", currentValue.reportTime) - entry.collected[key] = &CollectedHistogram{histogram: currentValue, lastCollectedAt: time.Now()} + entry[key] = &CollectedHistogram{histogram: currentValue, lastCollectedAt: time.Now()} return } @@ -139,19 +131,18 @@ func (s inMemoryDeltaDistributionStore) ListMetricsByName(metricDescriptorName s ttlWindowStart := now.Add(-s.ttl) s.storeMutex.Lock() - metric := s.store[metricDescriptorName] - if metric == nil { + entry := s.store[metricDescriptorName] + if entry == nil { s.storeMutex.Unlock() return output } - metric.lastListedAt = now s.storeMutex.Unlock() - for key, collected := range metric.collected { + for key, collected := range entry { //Scan and remove metrics which are outside the TTL if ttlWindowStart.After(collected.lastCollectedAt) { level.Debug(s.logger).Log("msg", "Deleting histogram entry outside of TTL", "key", key, "fqName", collected.histogram.fqName) - delete(metric.collected, key) + delete(entry, key) continue } @@ -169,33 +160,3 @@ func (s inMemoryDeltaDistributionStore) ListMetricsByName(metricDescriptorName s return output } - -func (s inMemoryDeltaDistributionStore) ListMetricDescriptorsNotCollected(since time.Time) []MetricDescriptor { - var names []MetricDescriptor - ttlWindowStart := time.Now().Add(-s.ttl) - - s.storeMutex.Lock() - defer s.storeMutex.Unlock() - - for name, metrics := range s.store { - //Scan and remove metrics which are outside the TTL - for key, collectedMetric := range metrics.collected { - if ttlWindowStart.After(collectedMetric.lastCollectedAt) { - level.Debug(s.logger).Log("msg", 
"Deleting histogram entry outside of TTL", "key", key, "fqName", collectedMetric.histogram.fqName) - delete(metrics.collected, key) - } - } - - if len(metrics.collected) == 0 { - level.Debug(s.logger).Log("msg", "Deleting empty descriptor store entry", "metric_descriptor_name", name) - delete(s.store, name) - continue - } - - if since.After(metrics.lastListedAt) { - names = append(names, MetricDescriptor{name: name, description: metrics.description}) - } - } - - return names -} diff --git a/collectors/monitoring_collector.go b/collectors/monitoring_collector.go index d19e5ac5..997652cf 100644 --- a/collectors/monitoring_collector.go +++ b/collectors/monitoring_collector.go @@ -54,6 +54,7 @@ type MonitoringCollector struct { logger log.Logger deltaCounterStore DeltaCounterStore deltaDistributionStore DeltaDistributionStore + aggregateDeltas bool } type MonitoringCollectorOptions struct { @@ -75,6 +76,8 @@ type MonitoringCollectorOptions struct { FillMissingLabels bool // DropDelegatedProjects decides if only metrics matching the collector's projectID should be retrieved. 
DropDelegatedProjects bool + // AggregateDeltas decides if DELTA metrics should be treated as a counter using the provided counterStore/distributionStore or a gauge + AggregateDeltas bool } func NewMonitoringCollector(projectID string, monitoringService *monitoring.Service, opts MonitoringCollectorOptions, logger log.Logger, counterStore DeltaCounterStore, distributionStore DeltaDistributionStore) (*MonitoringCollector, error) { @@ -157,6 +160,7 @@ func NewMonitoringCollector(projectID string, monitoringService *monitoring.Serv logger: logger, deltaCounterStore: counterStore, deltaDistributionStore: distributionStore, + aggregateDeltas: opts.AggregateDeltas, } return monitoringCollector, nil @@ -321,30 +325,6 @@ func (c *MonitoringCollector) reportMonitoringMetrics(ch chan<- prometheus.Metri wg.Wait() close(errChannel) - // Ensure any known descriptors which were not collected are exported to prevent them from going stale - uncollectedCounters := c.deltaCounterStore.ListMetricDescriptorsNotCollected(begun) - uncollectedHistograms := c.deltaDistributionStore.ListMetricDescriptorsNotCollected(begun) - uniqueDescriptors := map[MetricDescriptor]struct{}{} - for _, v := range uncollectedCounters { - uniqueDescriptors[v] = struct{}{} - } - for _, v := range uncollectedHistograms { - uniqueDescriptors[v] = struct{}{} - } - - for descriptor, _ := range uniqueDescriptors { - level.Debug(c.logger).Log("msg", "Exporting uncollected delta counter", "metric_descriptor_name", descriptor.name) - ts := TimeSeriesMetrics{ - metricDescriptor: &monitoring.MetricDescriptor{Name: descriptor.name, Description: descriptor.description}, - ch: ch, - fillMissingLabels: c.collectorFillMissingLabels, - constMetrics: nil, - histogramMetrics: nil, - deltaCounterStore: c.deltaCounterStore, - } - ts.Complete(begun) - } - level.Debug(c.logger).Log("msg", "Done reporting monitoring metrics") return <-errChannel } @@ -359,14 +339,15 @@ func (c *MonitoringCollector) reportTimeSeriesMetrics( var 
metricValueType prometheus.ValueType var newestTSPoint *monitoring.Point - timeSeriesMetrics := &TimeSeriesMetrics{ - metricDescriptor: metricDescriptor, - ch: ch, - fillMissingLabels: c.collectorFillMissingLabels, - constMetrics: make(map[string][]*ConstMetric), - histogramMetrics: make(map[string][]*HistogramMetric), - deltaCounterStore: c.deltaCounterStore, - deltaDistributionStore: c.deltaDistributionStore, + timeSeriesMetrics, err := NewTimeSeriesMetrics(metricDescriptor, + ch, + c.collectorFillMissingLabels, + c.deltaCounterStore, + c.deltaDistributionStore, + c.aggregateDeltas, + ) + if err != nil { + return fmt.Errorf("error creating the TimeSeriesMetrics %v", err) } for _, timeSeries := range page.TimeSeries { newestEndTime := time.Unix(0, 0) diff --git a/collectors/monitoring_metrics.go b/collectors/monitoring_metrics.go index 03c9d4a2..37add169 100644 --- a/collectors/monitoring_metrics.go +++ b/collectors/monitoring_metrics.go @@ -32,9 +32,10 @@ func buildFQName(timeSeries *monitoring.TimeSeries) string { return prometheus.BuildFQName("stackdriver", utils.NormalizeMetricName(timeSeries.Resource.Type), utils.NormalizeMetricName(timeSeries.Metric.Type)) } -type TimeSeriesMetrics struct { +type timeSeriesMetrics struct { metricDescriptor *monitoring.MetricDescriptor - ch chan<- prometheus.Metric + + ch chan<- prometheus.Metric fillMissingLabels bool constMetrics map[string][]*ConstMetric @@ -42,9 +43,29 @@ type TimeSeriesMetrics struct { deltaCounterStore DeltaCounterStore deltaDistributionStore DeltaDistributionStore + aggregateDeltas bool +} + +func NewTimeSeriesMetrics(descriptor *monitoring.MetricDescriptor, + ch chan<- prometheus.Metric, + fillMissingLabels bool, + deltaCounterStore DeltaCounterStore, + deltaDistributionStore DeltaDistributionStore, + aggregateDeltas bool) (*timeSeriesMetrics, error) { + + return &timeSeriesMetrics{ + metricDescriptor: descriptor, + ch: ch, + fillMissingLabels: fillMissingLabels, + constMetrics: 
make(map[string][]*ConstMetric), + histogramMetrics: make(map[string][]*HistogramMetric), + deltaCounterStore: deltaCounterStore, + deltaDistributionStore: deltaDistributionStore, + aggregateDeltas: aggregateDeltas, + }, nil } -func (t *TimeSeriesMetrics) newMetricDesc(fqName string, labelKeys []string) *prometheus.Desc { +func (t *timeSeriesMetrics) newMetricDesc(fqName string, labelKeys []string) *prometheus.Desc { return prometheus.NewDesc( fqName, t.metricDescriptor.Description, @@ -75,11 +96,11 @@ type HistogramMetric struct { keysHash uint64 } -func (t *TimeSeriesMetrics) CollectNewConstHistogram(timeSeries *monitoring.TimeSeries, reportTime time.Time, labelKeys []string, dist *monitoring.Distribution, buckets map[float64]uint64, labelValues []string, metricKind string) { +func (t *timeSeriesMetrics) CollectNewConstHistogram(timeSeries *monitoring.TimeSeries, reportTime time.Time, labelKeys []string, dist *monitoring.Distribution, buckets map[float64]uint64, labelValues []string, metricKind string) { fqName := buildFQName(timeSeries) var v HistogramMetric - if t.fillMissingLabels || metricKind == "DELTA" { + if t.fillMissingLabels || (metricKind == "DELTA" && t.aggregateDeltas) { v = HistogramMetric{ fqName: fqName, labelKeys: labelKeys, @@ -92,7 +113,7 @@ func (t *TimeSeriesMetrics) CollectNewConstHistogram(timeSeries *monitoring.Time } } - if metricKind == "DELTA" { + if metricKind == "DELTA" && t.aggregateDeltas { t.deltaDistributionStore.Increment(t.metricDescriptor, &v) return } @@ -109,7 +130,7 @@ func (t *TimeSeriesMetrics) CollectNewConstHistogram(timeSeries *monitoring.Time t.ch <- t.newConstHistogram(fqName, reportTime, labelKeys, dist, buckets, labelValues) } -func (t *TimeSeriesMetrics) newConstHistogram(fqName string, reportTime time.Time, labelKeys []string, dist *monitoring.Distribution, buckets map[float64]uint64, labelValues []string) prometheus.Metric { +func (t *timeSeriesMetrics) newConstHistogram(fqName string, reportTime time.Time, 
labelKeys []string, dist *monitoring.Distribution, buckets map[float64]uint64, labelValues []string) prometheus.Metric { return prometheus.NewMetricWithTimestamp( reportTime, prometheus.MustNewConstHistogram( @@ -122,11 +143,11 @@ func (t *TimeSeriesMetrics) newConstHistogram(fqName string, reportTime time.Tim ) } -func (t *TimeSeriesMetrics) CollectNewConstMetric(timeSeries *monitoring.TimeSeries, reportTime time.Time, labelKeys []string, metricValueType prometheus.ValueType, metricValue float64, labelValues []string, metricKind string) { +func (t *timeSeriesMetrics) CollectNewConstMetric(timeSeries *monitoring.TimeSeries, reportTime time.Time, labelKeys []string, metricValueType prometheus.ValueType, metricValue float64, labelValues []string, metricKind string) { fqName := buildFQName(timeSeries) var v ConstMetric - if t.fillMissingLabels || metricKind == "DELTA" { + if t.fillMissingLabels || (metricKind == "DELTA" && t.aggregateDeltas) { v = ConstMetric{ fqName: fqName, labelKeys: labelKeys, @@ -139,7 +160,7 @@ func (t *TimeSeriesMetrics) CollectNewConstMetric(timeSeries *monitoring.TimeSer } } - if metricKind == "DELTA" { + if metricKind == "DELTA" && t.aggregateDeltas { t.deltaCounterStore.Increment(t.metricDescriptor, &v) return } @@ -156,7 +177,7 @@ func (t *TimeSeriesMetrics) CollectNewConstMetric(timeSeries *monitoring.TimeSer t.ch <- t.newConstMetric(fqName, reportTime, labelKeys, metricValueType, metricValue, labelValues) } -func (t *TimeSeriesMetrics) newConstMetric(fqName string, reportTime time.Time, labelKeys []string, metricValueType prometheus.ValueType, metricValue float64, labelValues []string) prometheus.Metric { +func (t *timeSeriesMetrics) newConstMetric(fqName string, reportTime time.Time, labelKeys []string, metricValueType prometheus.ValueType, metricValue float64, labelValues []string) prometheus.Metric { return prometheus.NewMetricWithTimestamp( reportTime, prometheus.MustNewConstMetric( @@ -180,14 +201,14 @@ func hashLabelKeys(labelKeys 
[]string) uint64 { return dh } -func (t *TimeSeriesMetrics) Complete(reportingStartTime time.Time) { +func (t *timeSeriesMetrics) Complete(reportingStartTime time.Time) { t.completeDeltaConstMetrics(reportingStartTime) t.completeDeltaHistogramMetrics(reportingStartTime) t.completeConstMetrics(t.constMetrics) t.completeHistogramMetrics(t.histogramMetrics) } -func (t *TimeSeriesMetrics) completeConstMetrics(constMetrics map[string][]*ConstMetric) { +func (t *timeSeriesMetrics) completeConstMetrics(constMetrics map[string][]*ConstMetric) { for _, vs := range constMetrics { if len(vs) > 1 { var needFill bool @@ -207,7 +228,7 @@ func (t *TimeSeriesMetrics) completeConstMetrics(constMetrics map[string][]*Cons } } -func (t *TimeSeriesMetrics) completeHistogramMetrics(histograms map[string][]*HistogramMetric) { +func (t *timeSeriesMetrics) completeHistogramMetrics(histograms map[string][]*HistogramMetric) { for _, vs := range histograms { if len(vs) > 1 { var needFill bool @@ -226,14 +247,14 @@ func (t *TimeSeriesMetrics) completeHistogramMetrics(histograms map[string][]*Hi } } -func (t *TimeSeriesMetrics) completeDeltaConstMetrics(reportingStartTime time.Time) { +func (t *timeSeriesMetrics) completeDeltaConstMetrics(reportingStartTime time.Time) { descriptorMetrics := t.deltaCounterStore.ListMetricsByName(t.metricDescriptor.Name) now := time.Now().Truncate(time.Minute) constMetrics := map[string][]*ConstMetric{} for _, metrics := range descriptorMetrics { for _, collected := range metrics { - // If the series wasn't collected we still need to export it to keep it from going stale + // If the metric wasn't collected we should still export it at the next sample time to avoid staleness if reportingStartTime.After(collected.lastCollectedAt) { reportingLag := collected.lastCollectedAt.Sub(collected.metric.reportTime).Truncate(time.Minute) collected.metric.reportTime = now.Add(-reportingLag) @@ -261,14 +282,14 @@ func (t *TimeSeriesMetrics) 
completeDeltaConstMetrics(reportingStartTime time.Ti } } -func (t *TimeSeriesMetrics) completeDeltaHistogramMetrics(reportingStartTime time.Time) { +func (t *timeSeriesMetrics) completeDeltaHistogramMetrics(reportingStartTime time.Time) { descriptorMetrics := t.deltaDistributionStore.ListMetricsByName(t.metricDescriptor.Name) now := time.Now().Truncate(time.Minute) histograms := map[string][]*HistogramMetric{} for _, metrics := range descriptorMetrics { for _, collected := range metrics { - // If the series wasn't collected we still need to export it to keep it from going stale + // If the histogram wasn't collected we should still export it at the next sample time to avoid staleness if reportingStartTime.After(collected.lastCollectedAt) { reportingLag := collected.lastCollectedAt.Sub(collected.histogram.reportTime).Truncate(time.Minute) collected.histogram.reportTime = now.Add(-reportingLag) diff --git a/stackdriver_exporter.go b/stackdriver_exporter.go index 311b41e5..ad927d67 100644 --- a/stackdriver_exporter.go +++ b/stackdriver_exporter.go @@ -18,7 +18,6 @@ import ( "net/http" "os" "strings" - "time" "github.com/PuerkitoBio/rehttp" "github.com/go-kit/log" @@ -102,6 +101,14 @@ var ( monitoringMetricsExtraFilter = kingpin.Flag( "monitoring.filters", "Filters. i.e: pubsub.googleapis.com/subscription:resource.labels.subscription_id=monitoring.regex.full_match(\"my-subs-prefix.*\")").Strings() + + monitoringMetricsAggregateDeltas = kingpin.Flag( + "monitoring.aggregate-deltas", "If enabled will treat all DELTA metrics as an in-memory counter instead of a gauge", + ).Default("false").Bool() + + monitoringMetricsDeltasTTL = kingpin.Flag( + "monitoring.aggregate-deltas-ttl", "How long should a delta metric continue to be exported after GCP stops producing the metric. 
Adjusting this can help with memory utilization with delta metrics", + ).Default("30m").Duration() ) func init() { @@ -192,9 +199,8 @@ func (h *handler) innerHandler(filters map[string]bool) http.Handler { IngestDelay: *monitoringMetricsIngestDelay, FillMissingLabels: *collectorFillMissingLabels, DropDelegatedProjects: *monitoringDropDelegatedProjects, - }, h.logger, collectors.NewInMemoryDeltaCounterStore(h.logger, time.Minute*30), collectors.NewInMemoryDeltaDistributionStore(h.logger, time.Minute*30)) - //TODO config flag for TTL value - //TODO config flag for aggregate deltas + AggregateDeltas: *monitoringMetricsAggregateDeltas, + }, h.logger, collectors.NewInMemoryDeltaCounterStore(h.logger, *monitoringMetricsDeltasTTL), collectors.NewInMemoryDeltaDistributionStore(h.logger, *monitoringMetricsDeltasTTL)) if err != nil { level.Error(h.logger).Log("err", err) os.Exit(1) From d26d63e9311cfdd1f8e60dc97c1c59f87f3080ba Mon Sep 17 00:00:00 2001 From: Kyle Eckhart Date: Fri, 26 Aug 2022 14:37:06 -0400 Subject: [PATCH 4/8] Add go docs, a section in the readme, and make sure the aggregateDeltas flag is respected Signed-off-by: Kyle Eckhart --- README.md | 53 +++++++++++++++++++++++------- collectors/delta_counter.go | 12 +++++-- collectors/delta_distribution.go | 12 +++++-- collectors/monitoring_collector.go | 6 +++- collectors/monitoring_metrics.go | 10 ++++-- stackdriver_exporter.go | 2 +- 6 files changed, 75 insertions(+), 20 deletions(-) diff --git a/README.md b/README.md index e19150b1..078166eb 100644 --- a/README.md +++ b/README.md @@ -61,16 +61,18 @@ If you are still using the legacy [Access scopes][access-scopes], the `https://w ### Flags -| Flag | Required | Default | Description | -| --------------------------------- | -------- | ------- | ----------- | -| `google.project-id` | No | GCloud SDK auto-discovery | Comma seperated list of Google Project IDs | -| `monitoring.metrics-ingest-delay` | No | | Offsets metric collection by a delay appropriate for each 
metric type, e.g. because bigquery metrics are slow to appear | -| `monitoring.metrics-type-prefixes` | Yes | | Comma separated Google Stackdriver Monitoring Metric Type prefixes (see [example][metrics-prefix-example] and [available metrics][metrics-list]) | -| `monitoring.metrics-interval` | No | `5m` | Metric's timestamp interval to request from the Google Stackdriver Monitoring Metrics API. Only the most recent data point is used | -| `monitoring.metrics-offset` | No | `0s` | Offset (into the past) for the metric's timestamp interval to request from the Google Stackdriver Monitoring Metrics API, to handle latency in published metrics | -| `monitoring.filters` | No | | Formatted string to allow filtering on certain metrics type | -| `web.listen-address` | No | `:9255` | Address to listen on for web interface and telemetry | -| `web.telemetry-path` | No | `/metrics` | Path under which to expose Prometheus metrics | +| Flag | Required | Default | Description | +| --------------------------------- | -------- |---------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `google.project-id` | No | GCloud SDK auto-discovery | Comma seperated list of Google Project IDs | +| `monitoring.metrics-ingest-delay` | No | | Offsets metric collection by a delay appropriate for each metric type, e.g. because bigquery metrics are slow to appear | +| `monitoring.metrics-type-prefixes` | Yes | | Comma separated Google Stackdriver Monitoring Metric Type prefixes (see [example][metrics-prefix-example] and [available metrics][metrics-list]) | +| `monitoring.metrics-interval` | No | `5m` | Metric's timestamp interval to request from the Google Stackdriver Monitoring Metrics API. 
Only the most recent data point is used | +| `monitoring.metrics-offset` | No | `0s` | Offset (into the past) for the metric's timestamp interval to request from the Google Stackdriver Monitoring Metrics API, to handle latency in published metrics | +| `monitoring.filters` | No | | Formatted string to allow filtering on certain metrics type | +| `monitoring.aggregate-deltas` | No | | If enabled will treat all DELTA metrics as an in-memory counter instead of a gauge. Be sure to read [what to know about aggregating DELTA metrics](#what-to-know-about-aggregating-delta-metrics) | +| `monitoring.aggregate-deltas-ttl` | No | `30m` | How long should a delta metric continue to be exported and stored after GCP stops producing it. Read [slow moving metrics](#slow-moving-metrics) to understand the problem this attempts to solve | +| `web.listen-address` | No | `:9255` | Address to listen on for web interface and telemetry | +| `web.telemetry-path` | No | `/metrics` | Path under which to expose Prometheus metrics | ### Metrics @@ -95,7 +97,9 @@ Metrics gathered from Google Stackdriver Monitoring are converted to Prometheus 3. the metric type labels (see [Metrics List][metrics-list]) 4. the monitored resource labels (see [Monitored Resource Types][monitored-resources]) * For each timeseries, only the most recent data point is exported. -* Stackdriver `GAUGE` and `DELTA` metric kinds are reported as Prometheus `Gauge` metrics; Stackdriver `CUMULATIVE` metric kinds are reported as Prometheus `Counter` metrics. +* Stackdriver `GAUGE` metric kinds are reported as Prometheus `Gauge` metrics +* Stackdriver `CUMULATIVE` metric kinds are reported as Prometheus `Counter` metrics. +* Stackdriver `DELTA` metric kinds are reported as Prometheus `Gauge` metrics or an accumulating `Counter` if `monitoring.aggregate-deltas` is set * Only `BOOL`, `INT64`, `DOUBLE` and `DISTRIBUTION` metric types are supported, other types (`STRING` and `MONEY`) are discarded. 
* `DISTRIBUTION` metric type is reported as a Prometheus `Histogram`, except the `_sum` time series is not supported. @@ -118,7 +122,7 @@ stackdriver_exporter \ --monitoring.filters='pubsub.googleapis.com/subscription:resource.labels.subscription_id=monitoring.regex.full_match("us-west4.*my-team-subs.*")' ``` -## Filtering enabled collectors +### Filtering enabled collectors The `stackdriver_exporter` collects all metrics type prefixes by default. @@ -132,6 +136,31 @@ params: - compute.googleapis.com/instance/disk ``` +### What to know about Aggregating DELTA Metrics + +Treating DELTA Metrics as a gauge produces data which is wildly inaccurate/not very useful (see https://github.com/prometheus-community/stackdriver_exporter/issues/116). However, aggregating the DELTA metrics over time is not a perfect solution and is intended to produce data which mirrors GCP's data as closely as possible. + +The biggest challenge to producing a correct result is that a counter for prometheus does not start at 0, it starts at the first value which is exported. This can cause inconsistencies when the exporter first starts and for slow moving metrics which are described below. + +#### Start-up Delay + +When the exporter first starts it has no persisted counter information and the stores will be empty. When the first sample is received for a series it is intended to be a change from a previous value according to GCP, a delta. But the prometheus counter is not initialized to 0 so it does not export this as a change from 0, it exports that the counter started at the sample value. Since the series exported are dynamic it's impossible to export an initial 0 value in order to account for this issue. The end result is that it can take a few cycles for aggregated metrics to start showing rates exactly as GCP.
+ +As an example consider a prometheus query, `sum by(backend_target_name) (rate(stackdriver_https_lb_rule_loadbalancing_googleapis_com_https_request_bytes_count[1m]))` which is aggregating 5 series. All 5 series will need to have two samples from GCP in order for the query to produce the same result as GCP. + +#### Slow Moving Metrics + +A slow moving metric would be a metric which is not constantly changing with every sample from GCP. GCP does not consistently report slow moving DELTA metrics. If this occurs for too long (default 5m) prometheus will mark the series as [stale](https://prometheus.io/docs/prometheus/latest/querying/basics/#staleness). The end result is that the next reported sample will be treated as the start of a new series and not an increment from the previous value. Here's an example of this in action, ![](https://user-images.githubusercontent.com/4571540/184961445-ed40237b-108e-4177-9d06-aafe61f92430.png) + +There are two features which attempt to combat this issue, + +1. `monitoring.aggregate-deltas-ttl` which controls how long a metric is persisted in the data store after it's no longer being reported by GCP +1. Metrics which were not collected during a scrape are still exported at their current counter value + +The configuration when using `monitoring.aggregate-deltas` gives a 30 minute buffer to slower moving metrics and `monitoring.aggregate-deltas-ttl` can be adjusted to tune memory requirements vs correctness. Storing the data for longer results in a higher memory cost. + +The feature which continues to export metrics which are not collected can cause `the sample has been rejected because another sample with the same timestamp, but a different value, has already been ingested` if your [scrape config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config) for the exporter has `honor_timestamps` enabled (this is the default value).
This is caused by the fact that it's not possible to know the different between GCP having late arriving data and GCP not exporting a value. The underlying counter is still incremented when this happens so the next reported sample will show a higher rate than expected. + ## Contributing Refer to the [contributing guidelines][contributing]. diff --git a/collectors/delta_counter.go b/collectors/delta_counter.go index 66e1ae2b..4a5c6f60 100644 --- a/collectors/delta_counter.go +++ b/collectors/delta_counter.go @@ -30,9 +30,16 @@ type CollectedMetric struct { lastCollectedAt time.Time } +// DeltaCounterStore defines a set of functions which must be implemented in order to be used as a DeltaCounterStore +// which accumulates DELTA Counter metrics over time type DeltaCounterStore interface { + + // Increment will use the incoming metricDescriptor and currentValue to either create a new entry or add the incoming + // value to an existing entry in the underlying store Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *ConstMetric) - ListMetricsByName(metricDescriptorName string) map[string][]*CollectedMetric + + // ListMetrics will return all known entries in the store for a metricDescriptorName + ListMetrics(metricDescriptorName string) map[string][]*CollectedMetric } type metricEntry = map[uint64]*CollectedMetric @@ -44,6 +51,7 @@ type inMemoryDeltaCounterStore struct { logger log.Logger } +// NewInMemoryDeltaCounterStore returns an implementation of DeltaCounterStore which is persisted in-memory func NewInMemoryDeltaCounterStore(logger log.Logger, ttl time.Duration) DeltaCounterStore { return inMemoryDeltaCounterStore{ store: map[string]metricEntry{}, @@ -105,7 +113,7 @@ func toCounterKey(c *ConstMetric) uint64 { return h } -func (s inMemoryDeltaCounterStore) ListMetricsByName(metricDescriptorName string) map[string][]*CollectedMetric { +func (s inMemoryDeltaCounterStore) ListMetrics(metricDescriptorName string) map[string][]*CollectedMetric { output 
:= map[string][]*CollectedMetric{} now := time.Now() ttlWindowStart := now.Add(-s.ttl) diff --git a/collectors/delta_distribution.go b/collectors/delta_distribution.go index 41e9d18f..fe1b8db6 100644 --- a/collectors/delta_distribution.go +++ b/collectors/delta_distribution.go @@ -30,9 +30,16 @@ type CollectedHistogram struct { lastCollectedAt time.Time } +// DeltaDistributionStore defines a set of functions which must be implemented in order to be used as a DeltaDistributionStore +// which accumulates DELTA histogram metrics over time type DeltaDistributionStore interface { + + // Increment will use the incoming metricDescriptor and currentValue to either create a new entry or add the incoming + // value to an existing entry in the underlying store Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *HistogramMetric) - ListMetricsByName(metricDescriptorName string) map[string][]*CollectedHistogram + + // ListMetrics will return all known entries in the store for a metricDescriptorName + ListMetrics(metricDescriptorName string) map[string][]*CollectedHistogram } type histogramEntry = map[uint64]*CollectedHistogram @@ -44,6 +51,7 @@ type inMemoryDeltaDistributionStore struct { logger log.Logger } +// NewInMemoryDeltaDistributionStore returns an implementation of DeltaDistributionStore which is persisted in-memory func NewInMemoryDeltaDistributionStore(logger log.Logger, ttl time.Duration) DeltaDistributionStore { return inMemoryDeltaDistributionStore{ store: map[string]histogramEntry{}, @@ -125,7 +133,7 @@ func mergeHistograms(existing *HistogramMetric, current *HistogramMetric) *Histo return current } -func (s inMemoryDeltaDistributionStore) ListMetricsByName(metricDescriptorName string) map[string][]*CollectedHistogram { +func (s inMemoryDeltaDistributionStore) ListMetrics(metricDescriptorName string) map[string][]*CollectedHistogram { output := map[string][]*CollectedHistogram{} now := time.Now() ttlWindowStart := now.Add(-s.ttl) diff --git 
a/collectors/monitoring_collector.go b/collectors/monitoring_collector.go index 997652cf..f58d1369 100644 --- a/collectors/monitoring_collector.go +++ b/collectors/monitoring_collector.go @@ -401,7 +401,11 @@ func (c *MonitoringCollector) reportTimeSeriesMetrics( case "GAUGE": metricValueType = prometheus.GaugeValue case "DELTA": - metricValueType = prometheus.CounterValue + if c.aggregateDeltas { + metricValueType = prometheus.CounterValue + } else { + metricValueType = prometheus.GaugeValue + } case "CUMULATIVE": metricValueType = prometheus.CounterValue default: diff --git a/collectors/monitoring_metrics.go b/collectors/monitoring_metrics.go index 37add169..496765c1 100644 --- a/collectors/monitoring_metrics.go +++ b/collectors/monitoring_metrics.go @@ -248,7 +248,7 @@ func (t *timeSeriesMetrics) completeHistogramMetrics(histograms map[string][]*Hi } func (t *timeSeriesMetrics) completeDeltaConstMetrics(reportingStartTime time.Time) { - descriptorMetrics := t.deltaCounterStore.ListMetricsByName(t.metricDescriptor.Name) + descriptorMetrics := t.deltaCounterStore.ListMetrics(t.metricDescriptor.Name) now := time.Now().Truncate(time.Minute) constMetrics := map[string][]*ConstMetric{} @@ -256,6 +256,9 @@ func (t *timeSeriesMetrics) completeDeltaConstMetrics(reportingStartTime time.Ti for _, collected := range metrics { // If the metric wasn't collected we should still export it at the next sample time to avoid staleness if reportingStartTime.After(collected.lastCollectedAt) { + // Ideally we could use monitoring.MetricDescriptorMetadata.SamplePeriod to determine how many + // samples were missed to adjust this but monitoring.MetricDescriptorMetadata is viewed as optional + // for a monitoring.MetricDescriptor reportingLag := collected.lastCollectedAt.Sub(collected.metric.reportTime).Truncate(time.Minute) collected.metric.reportTime = now.Add(-reportingLag) } @@ -283,7 +286,7 @@ func (t *timeSeriesMetrics) completeDeltaConstMetrics(reportingStartTime time.Ti } func (t 
*timeSeriesMetrics) completeDeltaHistogramMetrics(reportingStartTime time.Time) { - descriptorMetrics := t.deltaDistributionStore.ListMetricsByName(t.metricDescriptor.Name) + descriptorMetrics := t.deltaDistributionStore.ListMetrics(t.metricDescriptor.Name) now := time.Now().Truncate(time.Minute) histograms := map[string][]*HistogramMetric{} @@ -291,6 +294,9 @@ func (t *timeSeriesMetrics) completeDeltaHistogramMetrics(reportingStartTime tim for _, collected := range metrics { // If the histogram wasn't collected we should still export it at the next sample time to avoid staleness if reportingStartTime.After(collected.lastCollectedAt) { + // Ideally we could use monitoring.MetricDescriptorMetadata.SamplePeriod to determine how many + // samples were missed to adjust this but monitoring.MetricDescriptorMetadata is viewed as optional + // for a monitoring.MetricDescriptor reportingLag := collected.lastCollectedAt.Sub(collected.histogram.reportTime).Truncate(time.Minute) collected.histogram.reportTime = now.Add(-reportingLag) } diff --git a/stackdriver_exporter.go b/stackdriver_exporter.go index ad927d67..ab8b49b9 100644 --- a/stackdriver_exporter.go +++ b/stackdriver_exporter.go @@ -107,7 +107,7 @@ var ( ).Default("false").Bool() monitoringMetricsDeltasTTL = kingpin.Flag( - "monitoring.aggregate-deltas-ttl", "How long should a delta metric continue to be exported after GCP stops producing the metric. 
Adjusting this can help with memory utilization with delta metrics", + "monitoring.aggregate-deltas-ttl", "How long should a delta metric continue to be exported after GCP stops producing a metric", ).Default("30m").Duration() ) From a28d78e805766ec142e7369f55c6f5ee8ca9b363 Mon Sep 17 00:00:00 2001 From: Kyle Eckhart Date: Fri, 26 Aug 2022 17:01:44 -0400 Subject: [PATCH 5/8] Minor clarification on where the initial 0 value comes from Signed-off-by: Kyle Eckhart --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 078166eb..c14ef6c1 100644 --- a/README.md +++ b/README.md @@ -144,7 +144,7 @@ The biggest challenge to producing a correct result is that a counter for promet #### Start-up Delay -When the exporter first starts it has no persisted counter information and the stores will be empty. When the first sample is received for a series it is intended to be a change from a previous value according to GCP, a delta. But the prometheus counter is not initialized to 0 so it does not export this as a change from 0, it exports that the counter started at the sample value. Since the series exported are dynamic it's impossible to export an initial 0 value in order to account for this issue. The end result is that it can take a few cycles for aggregated metrics to start showing rates exactly as GCP. +When the exporter first starts it has no persisted counter information and the stores will be empty. When the first sample is received for a series it is intended to be a change from a previous value according to GCP, a delta. But the prometheus counter is not initialized to 0 so it does not export this as a change from 0, it exports that the counter started at the sample value. Since the series exported are dynamic it's not possible to export an [initial 0 value](https://prometheus.io/docs/practices/instrumentation/#avoid-missing-metrics) in order to account for this issue. 
The end result is that it can take a few cycles for aggregated metrics to start showing rates exactly as GCP. As an example consider a prometheus query, `sum by(backend_target_name) (rate(stackdriver_https_lb_rule_loadbalancing_googleapis_com_https_request_bytes_count[1m]))` which is aggregating 5 series. All 5 series will need to have two samples from GCP in order for the query to produce the same result as GCP. From ffb190baae63fe747e62b6e0be63ef1930d7a941 Mon Sep 17 00:00:00 2001 From: Kyle Eckhart Date: Fri, 26 Aug 2022 17:02:35 -0400 Subject: [PATCH 6/8] Locking at the entry level just in case external implementation changes Signed-off-by: Kyle Eckhart --- collectors/delta_counter.go | 41 ++++++++++++++++++++------------ collectors/delta_distribution.go | 35 +++++++++++++++++---------- 2 files changed, 49 insertions(+), 27 deletions(-) diff --git a/collectors/delta_counter.go b/collectors/delta_counter.go index 4a5c6f60..6e72a30d 100644 --- a/collectors/delta_counter.go +++ b/collectors/delta_counter.go @@ -42,10 +42,13 @@ type DeltaCounterStore interface { ListMetrics(metricDescriptorName string) map[string][]*CollectedMetric } -type metricEntry = map[uint64]*CollectedMetric +type metricEntry struct { + collected map[uint64]*CollectedMetric + mutex *sync.RWMutex +} type inMemoryDeltaCounterStore struct { - store map[string]metricEntry + store map[string]*metricEntry ttl time.Duration storeMutex *sync.RWMutex logger log.Logger @@ -53,38 +56,44 @@ type inMemoryDeltaCounterStore struct { // NewInMemoryDeltaCounterStore returns an implementation of DeltaCounterStore which is persisted in-memory func NewInMemoryDeltaCounterStore(logger log.Logger, ttl time.Duration) DeltaCounterStore { - return inMemoryDeltaCounterStore{ - store: map[string]metricEntry{}, + return &inMemoryDeltaCounterStore{ + store: map[string]*metricEntry{}, storeMutex: &sync.RWMutex{}, logger: logger, ttl: ttl, } } -func (s inMemoryDeltaCounterStore) Increment(metricDescriptor 
*monitoring.MetricDescriptor, currentValue *ConstMetric) { +func (s *inMemoryDeltaCounterStore) Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *ConstMetric) { if currentValue == nil { return } - var entry metricEntry + var entry *metricEntry s.storeMutex.Lock() if _, exists := s.store[metricDescriptor.Name]; !exists { - s.store[metricDescriptor.Name] = metricEntry{} + s.store[metricDescriptor.Name] = &metricEntry{ + collected: map[uint64]*CollectedMetric{}, + mutex: &sync.RWMutex{}, + } } entry = s.store[metricDescriptor.Name] s.storeMutex.Unlock() key := toCounterKey(currentValue) - existing := entry[key] + + entry.mutex.Lock() + defer entry.mutex.Unlock() + existing := entry.collected[key] if existing == nil { level.Debug(s.logger).Log("msg", "Tracking new counter", "fqName", currentValue.fqName, "key", key, "current_value", currentValue.value, "incoming_time", currentValue.reportTime) - entry[key] = &CollectedMetric{currentValue, time.Now()} + entry.collected[key] = &CollectedMetric{currentValue, time.Now()} return } if existing.metric.reportTime.Before(currentValue.reportTime) { - level.Debug(s.logger).Log("msg", "Incrementing existing counter", "fqName", currentValue.fqName, "key", key, "current_value", existing.metric.value, "adding", currentValue.value, "last_reported_time", entry[key].metric.reportTime, "incoming_time", currentValue.reportTime) + level.Debug(s.logger).Log("msg", "Incrementing existing counter", "fqName", currentValue.fqName, "key", key, "current_value", existing.metric.value, "adding", currentValue.value, "last_reported_time", entry.collected[key].metric.reportTime, "incoming_time", currentValue.reportTime) currentValue.value = currentValue.value + existing.metric.value existing.metric = currentValue existing.lastCollectedAt = time.Now() @@ -113,24 +122,26 @@ func toCounterKey(c *ConstMetric) uint64 { return h } -func (s inMemoryDeltaCounterStore) ListMetrics(metricDescriptorName string) map[string][]*CollectedMetric 
{ +func (s *inMemoryDeltaCounterStore) ListMetrics(metricDescriptorName string) map[string][]*CollectedMetric { output := map[string][]*CollectedMetric{} now := time.Now() ttlWindowStart := now.Add(-s.ttl) s.storeMutex.Lock() - metric := s.store[metricDescriptorName] - if metric == nil { + entry := s.store[metricDescriptorName] + if entry == nil { s.storeMutex.Unlock() return output } s.storeMutex.Unlock() - for key, collected := range metric { + entry.mutex.Lock() + defer entry.mutex.Unlock() + for key, collected := range entry.collected { //Scan and remove metrics which are outside the TTL if ttlWindowStart.After(collected.lastCollectedAt) { level.Debug(s.logger).Log("msg", "Deleting counter entry outside of TTL", "key", key, "fqName", collected.metric.fqName) - delete(metric, key) + delete(entry.collected, key) continue } diff --git a/collectors/delta_distribution.go b/collectors/delta_distribution.go index fe1b8db6..7086b0c5 100644 --- a/collectors/delta_distribution.go +++ b/collectors/delta_distribution.go @@ -42,10 +42,13 @@ type DeltaDistributionStore interface { ListMetrics(metricDescriptorName string) map[string][]*CollectedHistogram } -type histogramEntry = map[uint64]*CollectedHistogram +type histogramEntry struct { + collected map[uint64]*CollectedHistogram + mutex *sync.RWMutex +} type inMemoryDeltaDistributionStore struct { - store map[string]histogramEntry + store map[string]*histogramEntry ttl time.Duration storeMutex *sync.RWMutex logger log.Logger @@ -53,33 +56,39 @@ type inMemoryDeltaDistributionStore struct { // NewInMemoryDeltaDistributionStore returns an implementation of DeltaDistributionStore which is persisted in-memory func NewInMemoryDeltaDistributionStore(logger log.Logger, ttl time.Duration) DeltaDistributionStore { - return inMemoryDeltaDistributionStore{ - store: map[string]histogramEntry{}, + return &inMemoryDeltaDistributionStore{ + store: map[string]*histogramEntry{}, storeMutex: &sync.RWMutex{}, logger: logger, ttl: ttl, } } 
-func (s inMemoryDeltaDistributionStore) Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *HistogramMetric) { +func (s *inMemoryDeltaDistributionStore) Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *HistogramMetric) { if currentValue == nil { return } - var entry histogramEntry + var entry *histogramEntry s.storeMutex.Lock() if _, exists := s.store[metricDescriptor.Name]; !exists { - s.store[metricDescriptor.Name] = histogramEntry{} + s.store[metricDescriptor.Name] = &histogramEntry{ + collected: map[uint64]*CollectedHistogram{}, + mutex: &sync.RWMutex{}, + } } entry = s.store[metricDescriptor.Name] s.storeMutex.Unlock() key := toHistogramKey(currentValue) - existing := entry[key] + + entry.mutex.Lock() + defer entry.mutex.Unlock() + existing := entry.collected[key] if existing == nil { level.Debug(s.logger).Log("msg", "Tracking new histogram", "fqName", currentValue.fqName, "key", key, "incoming_time", currentValue.reportTime) - entry[key] = &CollectedHistogram{histogram: currentValue, lastCollectedAt: time.Now()} + entry.collected[key] = &CollectedHistogram{histogram: currentValue, lastCollectedAt: time.Now()} return } @@ -133,7 +142,7 @@ func mergeHistograms(existing *HistogramMetric, current *HistogramMetric) *Histo return current } -func (s inMemoryDeltaDistributionStore) ListMetrics(metricDescriptorName string) map[string][]*CollectedHistogram { +func (s *inMemoryDeltaDistributionStore) ListMetrics(metricDescriptorName string) map[string][]*CollectedHistogram { output := map[string][]*CollectedHistogram{} now := time.Now() ttlWindowStart := now.Add(-s.ttl) @@ -146,11 +155,13 @@ func (s inMemoryDeltaDistributionStore) ListMetrics(metricDescriptorName string) } s.storeMutex.Unlock() - for key, collected := range entry { + entry.mutex.Lock() + defer entry.mutex.Unlock() + for key, collected := range entry.collected { //Scan and remove metrics which are outside the TTL if ttlWindowStart.After(collected.lastCollectedAt) 
{ level.Debug(s.logger).Log("msg", "Deleting histogram entry outside of TTL", "key", key, "fqName", collected.histogram.fqName) - delete(entry, key) + delete(entry.collected, key) continue } From aefd076d3ed8195cff7cb923521a0079cc431a61 Mon Sep 17 00:00:00 2001 From: Kyle Eckhart Date: Fri, 9 Sep 2022 07:48:23 -0400 Subject: [PATCH 7/8] Use sync.Map to fix concurrent read/write issue Signed-off-by: Kyle Eckhart --- collectors/delta_counter.go | 37 ++++++++++++-------------------- collectors/delta_distribution.go | 37 ++++++++++++-------------------- 2 files changed, 28 insertions(+), 46 deletions(-) diff --git a/collectors/delta_counter.go b/collectors/delta_counter.go index 6e72a30d..516e1767 100644 --- a/collectors/delta_counter.go +++ b/collectors/delta_counter.go @@ -48,19 +48,17 @@ type metricEntry struct { } type inMemoryDeltaCounterStore struct { - store map[string]*metricEntry - ttl time.Duration - storeMutex *sync.RWMutex - logger log.Logger + store *sync.Map + ttl time.Duration + logger log.Logger } // NewInMemoryDeltaCounterStore returns an implementation of DeltaCounterStore which is persisted in-memory func NewInMemoryDeltaCounterStore(logger log.Logger, ttl time.Duration) DeltaCounterStore { return &inMemoryDeltaCounterStore{ - store: map[string]*metricEntry{}, - storeMutex: &sync.RWMutex{}, - logger: logger, - ttl: ttl, + store: &sync.Map{}, + logger: logger, + ttl: ttl, } } @@ -69,16 +67,11 @@ func (s *inMemoryDeltaCounterStore) Increment(metricDescriptor *monitoring.Metri return } - var entry *metricEntry - s.storeMutex.Lock() - if _, exists := s.store[metricDescriptor.Name]; !exists { - s.store[metricDescriptor.Name] = &metricEntry{ - collected: map[uint64]*CollectedMetric{}, - mutex: &sync.RWMutex{}, - } - } - entry = s.store[metricDescriptor.Name] - s.storeMutex.Unlock() + tmp, _ := s.store.LoadOrStore(metricDescriptor.Name, &metricEntry{ + collected: map[uint64]*CollectedMetric{}, + mutex: &sync.RWMutex{}, + }) + entry := tmp.(*metricEntry) key 
:= toCounterKey(currentValue) @@ -127,13 +120,11 @@ func (s *inMemoryDeltaCounterStore) ListMetrics(metricDescriptorName string) map now := time.Now() ttlWindowStart := now.Add(-s.ttl) - s.storeMutex.Lock() - entry := s.store[metricDescriptorName] - if entry == nil { - s.storeMutex.Unlock() + tmp, exists := s.store.Load(metricDescriptorName) + if !exists { return output } - s.storeMutex.Unlock() + entry := tmp.(*metricEntry) entry.mutex.Lock() defer entry.mutex.Unlock() diff --git a/collectors/delta_distribution.go b/collectors/delta_distribution.go index 7086b0c5..c6e46b9f 100644 --- a/collectors/delta_distribution.go +++ b/collectors/delta_distribution.go @@ -48,19 +48,17 @@ type histogramEntry struct { } type inMemoryDeltaDistributionStore struct { - store map[string]*histogramEntry - ttl time.Duration - storeMutex *sync.RWMutex - logger log.Logger + store *sync.Map + ttl time.Duration + logger log.Logger } // NewInMemoryDeltaDistributionStore returns an implementation of DeltaDistributionStore which is persisted in-memory func NewInMemoryDeltaDistributionStore(logger log.Logger, ttl time.Duration) DeltaDistributionStore { return &inMemoryDeltaDistributionStore{ - store: map[string]*histogramEntry{}, - storeMutex: &sync.RWMutex{}, - logger: logger, - ttl: ttl, + store: &sync.Map{}, + logger: logger, + ttl: ttl, } } @@ -69,16 +67,11 @@ func (s *inMemoryDeltaDistributionStore) Increment(metricDescriptor *monitoring. 
return } - var entry *histogramEntry - s.storeMutex.Lock() - if _, exists := s.store[metricDescriptor.Name]; !exists { - s.store[metricDescriptor.Name] = &histogramEntry{ - collected: map[uint64]*CollectedHistogram{}, - mutex: &sync.RWMutex{}, - } - } - entry = s.store[metricDescriptor.Name] - s.storeMutex.Unlock() + tmp, _ := s.store.LoadOrStore(metricDescriptor.Name, &histogramEntry{ + collected: map[uint64]*CollectedHistogram{}, + mutex: &sync.RWMutex{}, + }) + entry := tmp.(*histogramEntry) key := toHistogramKey(currentValue) @@ -147,13 +140,11 @@ func (s *inMemoryDeltaDistributionStore) ListMetrics(metricDescriptorName string now := time.Now() ttlWindowStart := now.Add(-s.ttl) - s.storeMutex.Lock() - entry := s.store[metricDescriptorName] - if entry == nil { - s.storeMutex.Unlock() + tmp, exists := s.store.Load(metricDescriptorName) + if !exists { return output } - s.storeMutex.Unlock() + entry := tmp.(*histogramEntry) entry.mutex.Lock() defer entry.mutex.Unlock() From d070558b2e79cc5b358bd280421499257cde9d27 Mon Sep 17 00:00:00 2001 From: kgeckhart Date: Tue, 22 Nov 2022 13:24:28 -0500 Subject: [PATCH 8/8] Apply suggestions from code review Co-authored-by: Ben Kochie Signed-off-by: kgeckhart --- collectors/delta_counter.go | 2 +- collectors/delta_distribution.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/collectors/delta_counter.go b/collectors/delta_counter.go index 516e1767..c3798ab3 100644 --- a/collectors/delta_counter.go +++ b/collectors/delta_counter.go @@ -1,4 +1,4 @@ -// Copyright 2020 The Prometheus Authors +// Copyright 2022 The Prometheus Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
// You may obtain a copy of the License at diff --git a/collectors/delta_distribution.go b/collectors/delta_distribution.go index c6e46b9f..83bc96ae 100644 --- a/collectors/delta_distribution.go +++ b/collectors/delta_distribution.go @@ -1,4 +1,4 @@ -// Copyright 2020 The Prometheus Authors +// Copyright 2022 The Prometheus Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at