diff --git a/README.md b/README.md index e19150b1..c14ef6c1 100644 --- a/README.md +++ b/README.md @@ -61,16 +61,18 @@ If you are still using the legacy [Access scopes][access-scopes], the `https://w ### Flags -| Flag | Required | Default | Description | -| --------------------------------- | -------- | ------- | ----------- | -| `google.project-id` | No | GCloud SDK auto-discovery | Comma seperated list of Google Project IDs | -| `monitoring.metrics-ingest-delay` | No | | Offsets metric collection by a delay appropriate for each metric type, e.g. because bigquery metrics are slow to appear | -| `monitoring.metrics-type-prefixes` | Yes | | Comma separated Google Stackdriver Monitoring Metric Type prefixes (see [example][metrics-prefix-example] and [available metrics][metrics-list]) | -| `monitoring.metrics-interval` | No | `5m` | Metric's timestamp interval to request from the Google Stackdriver Monitoring Metrics API. Only the most recent data point is used | -| `monitoring.metrics-offset` | No | `0s` | Offset (into the past) for the metric's timestamp interval to request from the Google Stackdriver Monitoring Metrics API, to handle latency in published metrics | -| `monitoring.filters` | No | | Formatted string to allow filtering on certain metrics type | -| `web.listen-address` | No | `:9255` | Address to listen on for web interface and telemetry | -| `web.telemetry-path` | No | `/metrics` | Path under which to expose Prometheus metrics | +| Flag | Required | Default | Description | +| --------------------------------- | -------- |---------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `google.project-id` | No | GCloud SDK auto-discovery | Comma separated list of Google Project IDs | +| `monitoring.metrics-ingest-delay` | No | | Offsets metric collection by a delay appropriate for each metric 
type, e.g. because bigquery metrics are slow to appear | +| `monitoring.metrics-type-prefixes` | Yes | | Comma separated Google Stackdriver Monitoring Metric Type prefixes (see [example][metrics-prefix-example] and [available metrics][metrics-list]) | +| `monitoring.metrics-interval` | No | `5m` | Metric's timestamp interval to request from the Google Stackdriver Monitoring Metrics API. Only the most recent data point is used | +| `monitoring.metrics-offset` | No | `0s` | Offset (into the past) for the metric's timestamp interval to request from the Google Stackdriver Monitoring Metrics API, to handle latency in published metrics | +| `monitoring.filters` | No | | Formatted string to allow filtering on certain metrics type | +| `monitoring.aggregate-deltas` | No | | If enabled will treat all DELTA metrics as an in-memory counter instead of a gauge. Be sure to read [what to know about aggregating DELTA metrics](#what-to-know-about-aggregating-delta-metrics) | +| `monitoring.aggregate-deltas-ttl` | No | `30m` | How long should a delta metric continue to be exported and stored after GCP stops producing it. Read [slow moving metrics](#slow-moving-metrics) to understand the problem this attempts to solve | +| `web.listen-address` | No | `:9255` | Address to listen on for web interface and telemetry | +| `web.telemetry-path` | No | `/metrics` | Path under which to expose Prometheus metrics | ### Metrics @@ -95,7 +97,9 @@ Metrics gathered from Google Stackdriver Monitoring are converted to Prometheus 3. the metric type labels (see [Metrics List][metrics-list]) 4. the monitored resource labels (see [Monitored Resource Types][monitored-resources]) * For each timeseries, only the most recent data point is exported. -* Stackdriver `GAUGE` and `DELTA` metric kinds are reported as Prometheus `Gauge` metrics; Stackdriver `CUMULATIVE` metric kinds are reported as Prometheus `Counter` metrics. 
+* Stackdriver `GAUGE` metric kinds are reported as Prometheus `Gauge` metrics +* Stackdriver `CUMULATIVE` metric kinds are reported as Prometheus `Counter` metrics. +* Stackdriver `DELTA` metric kinds are reported as Prometheus `Gauge` metrics, or as an accumulating `Counter` if `monitoring.aggregate-deltas` is set * Only `BOOL`, `INT64`, `DOUBLE` and `DISTRIBUTION` metric types are supported, other types (`STRING` and `MONEY`) are discarded. * `DISTRIBUTION` metric type is reported as a Prometheus `Histogram`, except the `_sum` time series is not supported. @@ -118,7 +122,7 @@ stackdriver_exporter \ --monitoring.filters='pubsub.googleapis.com/subscription:resource.labels.subscription_id=monitoring.regex.full_match("us-west4.*my-team-subs.*")' ``` -## Filtering enabled collectors +### Filtering enabled collectors The `stackdriver_exporter` collects all metrics type prefixes by default. @@ -132,6 +136,31 @@ params: - compute.googleapis.com/instance/disk ``` +### What to know about Aggregating DELTA Metrics + +Treating DELTA metrics as a gauge produces data which is wildly inaccurate and not very useful (see https://github.com/prometheus-community/stackdriver_exporter/issues/116). However, aggregating DELTA metrics over time is not a perfect solution either; it is intended to produce data which mirrors GCP's data as closely as possible. + +The biggest challenge to producing a correct result is that a Prometheus counter does not start at 0; it starts at the first value which is exported. This can cause inconsistencies when the exporter first starts and for slow-moving metrics, which are described below. + +#### Start-up Delay + +When the exporter first starts, it has no persisted counter information and the stores will be empty. When the first sample for a series is received, GCP intends it as a change from a previous value, a delta. 
But the Prometheus counter is not initialized to 0, so it does not export this as a change from 0; it exports that the counter started at the sample value. Since the series exported are dynamic, it's not possible to export an [initial 0 value](https://prometheus.io/docs/practices/instrumentation/#avoid-missing-metrics) to account for this issue. The end result is that it can take a few cycles for aggregated metrics to start showing rates that exactly match GCP's. + +As an example, consider the Prometheus query `sum by(backend_target_name) (rate(stackdriver_https_lb_rule_loadbalancing_googleapis_com_https_request_bytes_count[1m]))`, which aggregates 5 series. All 5 series will need two samples from GCP in order for the query to produce the same result as GCP. + +#### Slow Moving Metrics + +A slow-moving metric is one which does not change with every sample from GCP. GCP does not consistently report slow-moving DELTA metrics. If this goes on for too long (default 5m), Prometheus will mark the series as [stale](https://prometheus.io/docs/prometheus/latest/querying/basics/#staleness). The end result is that the next reported sample will be treated as the start of a new series and not an increment from the previous value. Here's an example of this in action: ![](https://user-images.githubusercontent.com/4571540/184961445-ed40237b-108e-4177-9d06-aafe61f92430.png) + +There are two features which attempt to combat this issue: + +1. `monitoring.aggregate-deltas-ttl`, which controls how long a metric is persisted in the data store after it's no longer being reported by GCP +1. Metrics which were not collected during a scrape are still exported at their current counter value + +When using `monitoring.aggregate-deltas`, the default TTL gives a 30 minute buffer to slower-moving metrics, and `monitoring.aggregate-deltas-ttl` can be adjusted to tune memory requirements vs correctness. Storing the data for longer results in a higher memory cost. 
+ +The feature which continues to export metrics which are not collected can cause `the sample has been rejected because another sample with the same timestamp, but a different value, has already been ingested` errors if your [scrape config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config) for the exporter has `honor_timestamps` enabled (this is the default value). This is caused by the fact that it's not possible to know the difference between GCP having late-arriving data and GCP not exporting a value. The underlying counter is still incremented when this happens, so the next reported sample will show a higher rate than expected. + ## Contributing Refer to the [contributing guidelines][contributing]. diff --git a/collectors/delta_counter.go b/collectors/delta_counter.go new file mode 100644 index 00000000..c3798ab3 --- /dev/null +++ b/collectors/delta_counter.go @@ -0,0 +1,152 @@ +// Copyright 2022 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package collectors + +import ( + "fmt" + "sort" + "strings" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "google.golang.org/api/monitoring/v3" +) + +type CollectedMetric struct { + metric *ConstMetric + lastCollectedAt time.Time +} + +// DeltaCounterStore defines a set of functions which must be implemented in order to be used as a DeltaCounterStore +// which accumulates DELTA Counter metrics over time +type DeltaCounterStore interface { + + // Increment will use the incoming metricDescriptor and currentValue to either create a new entry or add the incoming + // value to an existing entry in the underlying store + Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *ConstMetric) + + // ListMetrics will return all known entries in the store for a metricDescriptorName + ListMetrics(metricDescriptorName string) map[string][]*CollectedMetric +} + +type metricEntry struct { + collected map[uint64]*CollectedMetric + mutex *sync.RWMutex +} + +type inMemoryDeltaCounterStore struct { + store *sync.Map + ttl time.Duration + logger log.Logger +} + +// NewInMemoryDeltaCounterStore returns an implementation of DeltaCounterStore which is persisted in-memory +func NewInMemoryDeltaCounterStore(logger log.Logger, ttl time.Duration) DeltaCounterStore { + return &inMemoryDeltaCounterStore{ + store: &sync.Map{}, + logger: logger, + ttl: ttl, + } +} + +func (s *inMemoryDeltaCounterStore) Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *ConstMetric) { + if currentValue == nil { + return + } + + tmp, _ := s.store.LoadOrStore(metricDescriptor.Name, &metricEntry{ + collected: map[uint64]*CollectedMetric{}, + mutex: &sync.RWMutex{}, + }) + entry := tmp.(*metricEntry) + + key := toCounterKey(currentValue) + + entry.mutex.Lock() + defer entry.mutex.Unlock() + existing := entry.collected[key] + + if existing == nil { + level.Debug(s.logger).Log("msg", "Tracking new counter", "fqName", currentValue.fqName, "key", key, 
"current_value", currentValue.value, "incoming_time", currentValue.reportTime) + entry.collected[key] = &CollectedMetric{currentValue, time.Now()} + return + } + + if existing.metric.reportTime.Before(currentValue.reportTime) { + level.Debug(s.logger).Log("msg", "Incrementing existing counter", "fqName", currentValue.fqName, "key", key, "current_value", existing.metric.value, "adding", currentValue.value, "last_reported_time", entry.collected[key].metric.reportTime, "incoming_time", currentValue.reportTime) + currentValue.value = currentValue.value + existing.metric.value + existing.metric = currentValue + existing.lastCollectedAt = time.Now() + return + } + + level.Debug(s.logger).Log("msg", "Ignoring old sample for counter", "fqName", currentValue.fqName, "key", key, "last_reported_time", existing.metric.reportTime, "incoming_time", currentValue.reportTime) +} + +func toCounterKey(c *ConstMetric) uint64 { + labels := make(map[string]string) + keysCopy := append([]string{}, c.labelKeys...) 
+ for i := range c.labelKeys { + labels[c.labelKeys[i]] = c.labelValues[i] + } + sort.Strings(keysCopy) + + var keyParts []string + for _, k := range keysCopy { + keyParts = append(keyParts, fmt.Sprintf("%s:%s", k, labels[k])) + } + hashText := fmt.Sprintf("%s|%s", c.fqName, strings.Join(keyParts, "|")) + h := hashNew() + h = hashAdd(h, hashText) + + return h +} + +func (s *inMemoryDeltaCounterStore) ListMetrics(metricDescriptorName string) map[string][]*CollectedMetric { + output := map[string][]*CollectedMetric{} + now := time.Now() + ttlWindowStart := now.Add(-s.ttl) + + tmp, exists := s.store.Load(metricDescriptorName) + if !exists { + return output + } + entry := tmp.(*metricEntry) + + entry.mutex.Lock() + defer entry.mutex.Unlock() + for key, collected := range entry.collected { + //Scan and remove metrics which are outside the TTL + if ttlWindowStart.After(collected.lastCollectedAt) { + level.Debug(s.logger).Log("msg", "Deleting counter entry outside of TTL", "key", key, "fqName", collected.metric.fqName) + delete(entry.collected, key) + continue + } + + metrics, exists := output[collected.metric.fqName] + if !exists { + metrics = make([]*CollectedMetric, 0) + } + metricCopy := *collected.metric + outputEntry := CollectedMetric{ + metric: &metricCopy, + lastCollectedAt: collected.lastCollectedAt, + } + output[collected.metric.fqName] = append(metrics, &outputEntry) + } + + return output +} diff --git a/collectors/delta_distribution.go b/collectors/delta_distribution.go new file mode 100644 index 00000000..83bc96ae --- /dev/null +++ b/collectors/delta_distribution.go @@ -0,0 +1,172 @@ +// Copyright 2022 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collectors + +import ( + "fmt" + "sort" + "strings" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "google.golang.org/api/monitoring/v3" +) + +type CollectedHistogram struct { + histogram *HistogramMetric + lastCollectedAt time.Time +} + +// DeltaDistributionStore defines a set of functions which must be implemented in order to be used as a DeltaDistributionStore +// which accumulates DELTA histogram metrics over time +type DeltaDistributionStore interface { + + // Increment will use the incoming metricDescriptor and currentValue to either create a new entry or add the incoming + // value to an existing entry in the underlying store + Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *HistogramMetric) + + // ListMetrics will return all known entries in the store for a metricDescriptorName + ListMetrics(metricDescriptorName string) map[string][]*CollectedHistogram +} + +type histogramEntry struct { + collected map[uint64]*CollectedHistogram + mutex *sync.RWMutex +} + +type inMemoryDeltaDistributionStore struct { + store *sync.Map + ttl time.Duration + logger log.Logger +} + +// NewInMemoryDeltaDistributionStore returns an implementation of DeltaDistributionStore which is persisted in-memory +func NewInMemoryDeltaDistributionStore(logger log.Logger, ttl time.Duration) DeltaDistributionStore { + return &inMemoryDeltaDistributionStore{ + store: &sync.Map{}, + logger: logger, + ttl: ttl, + } +} + +func (s *inMemoryDeltaDistributionStore) Increment(metricDescriptor 
*monitoring.MetricDescriptor, currentValue *HistogramMetric) { + if currentValue == nil { + return + } + + tmp, _ := s.store.LoadOrStore(metricDescriptor.Name, &histogramEntry{ + collected: map[uint64]*CollectedHistogram{}, + mutex: &sync.RWMutex{}, + }) + entry := tmp.(*histogramEntry) + + key := toHistogramKey(currentValue) + + entry.mutex.Lock() + defer entry.mutex.Unlock() + existing := entry.collected[key] + + if existing == nil { + level.Debug(s.logger).Log("msg", "Tracking new histogram", "fqName", currentValue.fqName, "key", key, "incoming_time", currentValue.reportTime) + entry.collected[key] = &CollectedHistogram{histogram: currentValue, lastCollectedAt: time.Now()} + return + } + + if existing.histogram.reportTime.Before(currentValue.reportTime) { + level.Debug(s.logger).Log("msg", "Incrementing existing histogram", "fqName", currentValue.fqName, "key", key, "last_reported_time", existing.histogram.reportTime, "incoming_time", currentValue.reportTime) + existing.histogram = mergeHistograms(existing.histogram, currentValue) + existing.lastCollectedAt = time.Now() + return + } + + level.Debug(s.logger).Log("msg", "Ignoring old sample for histogram", "fqName", currentValue.fqName, "key", key, "last_reported_time", existing.histogram.reportTime, "incoming_time", currentValue.reportTime) +} + +func toHistogramKey(hist *HistogramMetric) uint64 { + labels := make(map[string]string) + keysCopy := append([]string{}, hist.labelKeys...) 
+ for i := range hist.labelKeys { + labels[hist.labelKeys[i]] = hist.labelValues[i] + } + sort.Strings(keysCopy) + + var keyParts []string + for _, k := range keysCopy { + keyParts = append(keyParts, fmt.Sprintf("%s:%s", k, labels[k])) + } + hashText := fmt.Sprintf("%s|%s", hist.fqName, strings.Join(keyParts, "|")) + h := hashNew() + h = hashAdd(h, hashText) + + return h +} + +func mergeHistograms(existing *HistogramMetric, current *HistogramMetric) *HistogramMetric { + for key, value := range existing.buckets { + current.buckets[key] += value + } + + // Compute the merged count, then a count-weighted mean so histograms with differing sample counts do not skew the result + var count uint64 + for _, v := range current.buckets { + count += v + } + mean := existing.dist.Mean + if count > 0 { + mean = (existing.dist.Mean*float64(existing.dist.Count) + current.dist.Mean*float64(current.dist.Count)) / float64(count) + } + + current.dist.Mean = mean + current.dist.Count = int64(count) + + return current +} + +func (s *inMemoryDeltaDistributionStore) ListMetrics(metricDescriptorName string) map[string][]*CollectedHistogram { + output := map[string][]*CollectedHistogram{} + now := time.Now() + ttlWindowStart := now.Add(-s.ttl) + + tmp, exists := s.store.Load(metricDescriptorName) + if !exists { + return output + } + entry := tmp.(*histogramEntry) + + entry.mutex.Lock() + defer entry.mutex.Unlock() + for key, collected := range entry.collected { + // Scan and remove metrics which are outside the TTL + if ttlWindowStart.After(collected.lastCollectedAt) { + level.Debug(s.logger).Log("msg", "Deleting histogram entry outside of TTL", "key", key, "fqName", collected.histogram.fqName) + delete(entry.collected, key) + continue + } + + metrics, exists := output[collected.histogram.fqName] + if !exists { + metrics = make([]*CollectedHistogram, 0) + } + histCopy := *collected.histogram + outputEntry := CollectedHistogram{ + histogram: &histCopy, + lastCollectedAt: collected.lastCollectedAt, + } + output[collected.histogram.fqName] = append(metrics, &outputEntry) + } + + return output +} diff --git a/collectors/monitoring_collector.go 
b/collectors/monitoring_collector.go index 80390e14..f58d1369 100644 --- a/collectors/monitoring_collector.go +++ b/collectors/monitoring_collector.go @@ -52,6 +52,9 @@ type MonitoringCollector struct { collectorFillMissingLabels bool monitoringDropDelegatedProjects bool logger log.Logger + deltaCounterStore DeltaCounterStore + deltaDistributionStore DeltaDistributionStore + aggregateDeltas bool } type MonitoringCollectorOptions struct { @@ -73,9 +76,11 @@ type MonitoringCollectorOptions struct { FillMissingLabels bool // DropDelegatedProjects decides if only metrics matching the collector's projectID should be retrieved. DropDelegatedProjects bool + // AggregateDeltas decides if DELTA metrics should be treated as a counter using the provided counterStore/distributionStore or a gauge + AggregateDeltas bool } -func NewMonitoringCollector(projectID string, monitoringService *monitoring.Service, opts MonitoringCollectorOptions, logger log.Logger) (*MonitoringCollector, error) { +func NewMonitoringCollector(projectID string, monitoringService *monitoring.Service, opts MonitoringCollectorOptions, logger log.Logger, counterStore DeltaCounterStore, distributionStore DeltaDistributionStore) (*MonitoringCollector, error) { apiCallsTotalMetric := prometheus.NewCounter( prometheus.CounterOpts{ Namespace: "stackdriver", @@ -153,6 +158,9 @@ func NewMonitoringCollector(projectID string, monitoringService *monitoring.Serv collectorFillMissingLabels: opts.FillMissingLabels, monitoringDropDelegatedProjects: opts.DropDelegatedProjects, logger: logger, + deltaCounterStore: counterStore, + deltaDistributionStore: distributionStore, + aggregateDeltas: opts.AggregateDeltas, } return monitoringCollector, nil @@ -171,7 +179,7 @@ func (c *MonitoringCollector) Collect(ch chan<- prometheus.Metric) { var begun = time.Now() errorMetric := float64(0) - if err := c.reportMonitoringMetrics(ch); err != nil { + if err := c.reportMonitoringMetrics(ch, begun); err != nil { errorMetric = float64(1) 
c.scrapeErrorsTotalMetric.Inc() level.Error(c.logger).Log("msg", "Error while getting Google Stackdriver Monitoring metrics", "err", err) @@ -193,7 +201,7 @@ func (c *MonitoringCollector) Collect(ch chan<- prometheus.Metric) { c.lastScrapeDurationSecondsMetric.Collect(ch) } -func (c *MonitoringCollector) reportMonitoringMetrics(ch chan<- prometheus.Metric) error { +func (c *MonitoringCollector) reportMonitoringMetrics(ch chan<- prometheus.Metric, begun time.Time) error { metricDescriptorsFunction := func(page *monitoring.ListMetricDescriptorsResponse) error { var wg = &sync.WaitGroup{} @@ -270,8 +278,8 @@ func (c *MonitoringCollector) reportMonitoringMetrics(ch chan<- prometheus.Metri if page == nil { break } - if err := c.reportTimeSeriesMetrics(page, metricDescriptor, ch); err != nil { - level.Error(c.logger).Log("msg", "error reporting Time Series metrics for descripto", "descriptor", metricDescriptor.Type, "err", err) + if err := c.reportTimeSeriesMetrics(page, metricDescriptor, ch, begun); err != nil { + level.Error(c.logger).Log("msg", "error reporting Time Series metrics for descriptor", "descriptor", metricDescriptor.Type, "err", err) errChannel <- err break } @@ -317,6 +325,7 @@ func (c *MonitoringCollector) reportMonitoringMetrics(ch chan<- prometheus.Metri wg.Wait() close(errChannel) + level.Debug(c.logger).Log("msg", "Done reporting monitoring metrics") return <-errChannel } @@ -324,17 +333,21 @@ func (c *MonitoringCollector) reportTimeSeriesMetrics( page *monitoring.ListTimeSeriesResponse, metricDescriptor *monitoring.MetricDescriptor, ch chan<- prometheus.Metric, + begun time.Time, ) error { var metricValue float64 var metricValueType prometheus.ValueType var newestTSPoint *monitoring.Point - timeSeriesMetrics := &TimeSeriesMetrics{ - metricDescriptor: metricDescriptor, - ch: ch, - fillMissingLabels: c.collectorFillMissingLabels, - constMetrics: make(map[string][]ConstMetric), - histogramMetrics: make(map[string][]HistogramMetric), + 
timeSeriesMetrics, err := NewTimeSeriesMetrics(metricDescriptor, + ch, + c.collectorFillMissingLabels, + c.deltaCounterStore, + c.deltaDistributionStore, + c.aggregateDeltas, + ) + if err != nil { + return fmt.Errorf("error creating the TimeSeriesMetrics %v", err) } for _, timeSeries := range page.TimeSeries { newestEndTime := time.Unix(0, 0) @@ -388,7 +401,11 @@ func (c *MonitoringCollector) reportTimeSeriesMetrics( case "GAUGE": metricValueType = prometheus.GaugeValue case "DELTA": - metricValueType = prometheus.GaugeValue + if c.aggregateDeltas { + metricValueType = prometheus.CounterValue + } else { + metricValueType = prometheus.GaugeValue + } case "CUMULATIVE": metricValueType = prometheus.CounterValue default: @@ -408,10 +425,12 @@ func (c *MonitoringCollector) reportTimeSeriesMetrics( case "DISTRIBUTION": dist := newestTSPoint.Value.DistributionValue buckets, err := c.generateHistogramBuckets(dist) + if err == nil { - timeSeriesMetrics.CollectNewConstHistogram(timeSeries, newestEndTime, labelKeys, dist, buckets, labelValues) + timeSeriesMetrics.CollectNewConstHistogram(timeSeries, newestEndTime, labelKeys, dist, buckets, labelValues, timeSeries.MetricKind) } else { - level.Debug(c.logger).Log("msg", "discarding", "resource", timeSeries.Resource.Type, "metric", timeSeries.Metric.Type, "err", err) + level.Debug(c.logger).Log("msg", "discarding", "resource", timeSeries.Resource.Type, "metric", + timeSeries.Metric.Type, "err", err) } continue default: @@ -419,9 +438,9 @@ func (c *MonitoringCollector) reportTimeSeriesMetrics( continue } - timeSeriesMetrics.CollectNewConstMetric(timeSeries, newestEndTime, labelKeys, metricValueType, metricValue, labelValues) + timeSeriesMetrics.CollectNewConstMetric(timeSeries, newestEndTime, labelKeys, metricValueType, metricValue, labelValues, timeSeries.MetricKind) } - timeSeriesMetrics.Complete() + timeSeriesMetrics.Complete(begun) return nil } diff --git a/collectors/monitoring_metrics.go b/collectors/monitoring_metrics.go 
index 96aee6e6..496765c1 100644 --- a/collectors/monitoring_metrics.go +++ b/collectors/monitoring_metrics.go @@ -32,16 +32,40 @@ func buildFQName(timeSeries *monitoring.TimeSeries) string { return prometheus.BuildFQName("stackdriver", utils.NormalizeMetricName(timeSeries.Resource.Type), utils.NormalizeMetricName(timeSeries.Metric.Type)) } -type TimeSeriesMetrics struct { +type timeSeriesMetrics struct { metricDescriptor *monitoring.MetricDescriptor - ch chan<- prometheus.Metric + + ch chan<- prometheus.Metric fillMissingLabels bool - constMetrics map[string][]ConstMetric - histogramMetrics map[string][]HistogramMetric + constMetrics map[string][]*ConstMetric + histogramMetrics map[string][]*HistogramMetric + + deltaCounterStore DeltaCounterStore + deltaDistributionStore DeltaDistributionStore + aggregateDeltas bool +} + +func NewTimeSeriesMetrics(descriptor *monitoring.MetricDescriptor, + ch chan<- prometheus.Metric, + fillMissingLabels bool, + deltaCounterStore DeltaCounterStore, + deltaDistributionStore DeltaDistributionStore, + aggregateDeltas bool) (*timeSeriesMetrics, error) { + + return &timeSeriesMetrics{ + metricDescriptor: descriptor, + ch: ch, + fillMissingLabels: fillMissingLabels, + constMetrics: make(map[string][]*ConstMetric), + histogramMetrics: make(map[string][]*HistogramMetric), + deltaCounterStore: deltaCounterStore, + deltaDistributionStore: deltaDistributionStore, + aggregateDeltas: aggregateDeltas, + }, nil } -func (t *TimeSeriesMetrics) newMetricDesc(fqName string, labelKeys []string) *prometheus.Desc { +func (t *timeSeriesMetrics) newMetricDesc(fqName string, labelKeys []string) *prometheus.Desc { return prometheus.NewDesc( fqName, t.metricDescriptor.Description, @@ -72,15 +96,12 @@ type HistogramMetric struct { keysHash uint64 } -func (t *TimeSeriesMetrics) CollectNewConstHistogram(timeSeries *monitoring.TimeSeries, reportTime time.Time, labelKeys []string, dist *monitoring.Distribution, buckets map[float64]uint64, labelValues []string) { 
+func (t *timeSeriesMetrics) CollectNewConstHistogram(timeSeries *monitoring.TimeSeries, reportTime time.Time, labelKeys []string, dist *monitoring.Distribution, buckets map[float64]uint64, labelValues []string, metricKind string) { fqName := buildFQName(timeSeries) - if t.fillMissingLabels { - vs, ok := t.histogramMetrics[fqName] - if !ok { - vs = make([]HistogramMetric, 0) - } - v := HistogramMetric{ + var v HistogramMetric + if t.fillMissingLabels || (metricKind == "DELTA" && t.aggregateDeltas) { + v = HistogramMetric{ fqName: fqName, labelKeys: labelKeys, dist: dist, @@ -90,13 +111,26 @@ func (t *TimeSeriesMetrics) CollectNewConstHistogram(timeSeries *monitoring.Time keysHash: hashLabelKeys(labelKeys), } - t.histogramMetrics[fqName] = append(vs, v) + } + + if metricKind == "DELTA" && t.aggregateDeltas { + t.deltaDistributionStore.Increment(t.metricDescriptor, &v) + return + } + + if t.fillMissingLabels { + vs, ok := t.histogramMetrics[fqName] + if !ok { + vs = make([]*HistogramMetric, 0) + } + t.histogramMetrics[fqName] = append(vs, &v) return } + t.ch <- t.newConstHistogram(fqName, reportTime, labelKeys, dist, buckets, labelValues) } -func (t *TimeSeriesMetrics) newConstHistogram(fqName string, reportTime time.Time, labelKeys []string, dist *monitoring.Distribution, buckets map[float64]uint64, labelValues []string) prometheus.Metric { +func (t *timeSeriesMetrics) newConstHistogram(fqName string, reportTime time.Time, labelKeys []string, dist *monitoring.Distribution, buckets map[float64]uint64, labelValues []string) prometheus.Metric { return prometheus.NewMetricWithTimestamp( reportTime, prometheus.MustNewConstHistogram( @@ -109,15 +143,12 @@ func (t *TimeSeriesMetrics) newConstHistogram(fqName string, reportTime time.Tim ) } -func (t *TimeSeriesMetrics) CollectNewConstMetric(timeSeries *monitoring.TimeSeries, reportTime time.Time, labelKeys []string, metricValueType prometheus.ValueType, metricValue float64, labelValues []string) { +func (t 
*timeSeriesMetrics) CollectNewConstMetric(timeSeries *monitoring.TimeSeries, reportTime time.Time, labelKeys []string, metricValueType prometheus.ValueType, metricValue float64, labelValues []string, metricKind string) { fqName := buildFQName(timeSeries) - if t.fillMissingLabels { - vs, ok := t.constMetrics[fqName] - if !ok { - vs = make([]ConstMetric, 0) - } - v := ConstMetric{ + var v ConstMetric + if t.fillMissingLabels || (metricKind == "DELTA" && t.aggregateDeltas) { + v = ConstMetric{ fqName: fqName, labelKeys: labelKeys, valueType: metricValueType, @@ -127,13 +158,26 @@ func (t *TimeSeriesMetrics) CollectNewConstMetric(timeSeries *monitoring.TimeSer keysHash: hashLabelKeys(labelKeys), } - t.constMetrics[fqName] = append(vs, v) + } + + if metricKind == "DELTA" && t.aggregateDeltas { + t.deltaCounterStore.Increment(t.metricDescriptor, &v) return } + + if t.fillMissingLabels { + vs, ok := t.constMetrics[fqName] + if !ok { + vs = make([]*ConstMetric, 0) + } + t.constMetrics[fqName] = append(vs, &v) + return + } + t.ch <- t.newConstMetric(fqName, reportTime, labelKeys, metricValueType, metricValue, labelValues) } -func (t *TimeSeriesMetrics) newConstMetric(fqName string, reportTime time.Time, labelKeys []string, metricValueType prometheus.ValueType, metricValue float64, labelValues []string) prometheus.Metric { +func (t *timeSeriesMetrics) newConstMetric(fqName string, reportTime time.Time, labelKeys []string, metricValueType prometheus.ValueType, metricValue float64, labelValues []string) prometheus.Metric { return prometheus.NewMetricWithTimestamp( reportTime, prometheus.MustNewConstMetric( @@ -157,13 +201,15 @@ func hashLabelKeys(labelKeys []string) uint64 { return dh } -func (t *TimeSeriesMetrics) Complete() { - t.completeConstMetrics() - t.completeHistogramMetrics() +func (t *timeSeriesMetrics) Complete(reportingStartTime time.Time) { + t.completeDeltaConstMetrics(reportingStartTime) + t.completeDeltaHistogramMetrics(reportingStartTime) + 
t.completeConstMetrics(t.constMetrics)
+	t.completeHistogramMetrics(t.histogramMetrics)
 }
 
-func (t *TimeSeriesMetrics) completeConstMetrics() {
-	for _, vs := range t.constMetrics {
+func (t *timeSeriesMetrics) completeConstMetrics(constMetrics map[string][]*ConstMetric) {
+	for _, vs := range constMetrics {
 		if len(vs) > 1 {
 			var needFill bool
 			for i := 1; i < len(vs); i++ {
@@ -182,8 +228,8 @@ func (t *TimeSeriesMetrics) completeConstMetrics() {
 	}
 }
 
-func (t *TimeSeriesMetrics) completeHistogramMetrics() {
-	for _, vs := range t.histogramMetrics {
+func (t *timeSeriesMetrics) completeHistogramMetrics(histograms map[string][]*HistogramMetric) {
+	for _, vs := range histograms {
 		if len(vs) > 1 {
 			var needFill bool
 			for i := 1; i < len(vs); i++ {
@@ -201,15 +247,91 @@ func (t *TimeSeriesMetrics) completeHistogramMetrics() {
 	}
 }
 
-func fillConstMetricsLabels(metrics []ConstMetric) []ConstMetric {
+func (t *timeSeriesMetrics) completeDeltaConstMetrics(reportingStartTime time.Time) {
+	descriptorMetrics := t.deltaCounterStore.ListMetrics(t.metricDescriptor.Name)
+	now := time.Now().Truncate(time.Minute)
+
+	constMetrics := map[string][]*ConstMetric{}
+	for _, metrics := range descriptorMetrics {
+		for _, collected := range metrics {
+			// If the metric wasn't collected, we should still export it at the next sample time to avoid staleness
+			if reportingStartTime.After(collected.lastCollectedAt) {
+				// Ideally we could use monitoring.MetricDescriptorMetadata.SamplePeriod to determine how many
+				// samples were missed and adjust for them, but monitoring.MetricDescriptorMetadata is optional
+				// for a monitoring.MetricDescriptor
+				reportingLag := collected.lastCollectedAt.Sub(collected.metric.reportTime).Truncate(time.Minute)
+				collected.metric.reportTime = now.Add(-reportingLag)
+			}
+			if t.fillMissingLabels {
+				if _, exists := constMetrics[collected.metric.fqName]; !exists {
+					constMetrics[collected.metric.fqName] = []*ConstMetric{}
+				}
+				constMetrics[collected.metric.fqName] = append(constMetrics[collected.metric.fqName], collected.metric)
+			} else {
+				t.ch <- t.newConstMetric(
+					collected.metric.fqName,
+					collected.metric.reportTime,
+					collected.metric.labelKeys,
+					collected.metric.valueType,
+					collected.metric.value,
+					collected.metric.labelValues,
+				)
+			}
+		}
+	}
+
+	if t.fillMissingLabels {
+		t.completeConstMetrics(constMetrics)
+	}
+}
+
+func (t *timeSeriesMetrics) completeDeltaHistogramMetrics(reportingStartTime time.Time) {
+	descriptorMetrics := t.deltaDistributionStore.ListMetrics(t.metricDescriptor.Name)
+	now := time.Now().Truncate(time.Minute)
+
+	histograms := map[string][]*HistogramMetric{}
+	for _, metrics := range descriptorMetrics {
+		for _, collected := range metrics {
+			// If the histogram wasn't collected, we should still export it at the next sample time to avoid staleness
+			if reportingStartTime.After(collected.lastCollectedAt) {
+				// Ideally we could use monitoring.MetricDescriptorMetadata.SamplePeriod to determine how many
+				// samples were missed and adjust for them, but monitoring.MetricDescriptorMetadata is optional
+				// for a monitoring.MetricDescriptor
+				reportingLag := collected.lastCollectedAt.Sub(collected.histogram.reportTime).Truncate(time.Minute)
+				collected.histogram.reportTime = now.Add(-reportingLag)
+			}
+			if t.fillMissingLabels {
+				if _, exists := histograms[collected.histogram.fqName]; !exists {
+					histograms[collected.histogram.fqName] = []*HistogramMetric{}
+				}
+				histograms[collected.histogram.fqName] = append(histograms[collected.histogram.fqName], collected.histogram)
+			} else {
+				t.ch <- t.newConstHistogram(
+					collected.histogram.fqName,
+					collected.histogram.reportTime,
+					collected.histogram.labelKeys,
+					collected.histogram.dist,
+					collected.histogram.buckets,
+					collected.histogram.labelValues,
+				)
+			}
+		}
+	}
+
+	if t.fillMissingLabels {
+		t.completeHistogramMetrics(histograms)
+	}
+}
+
+func fillConstMetricsLabels(metrics []*ConstMetric) []*ConstMetric {
 	allKeys := make(map[string]struct{})
 	for _, metric := range metrics {
 		for _, key := range metric.labelKeys {
 			allKeys[key] = struct{}{}
 		}
 	}
-	result := make([]ConstMetric, len(metrics))
-	for i, metric := range metrics {
+
+	for _, metric := range metrics {
 		if len(metric.labelKeys) != len(allKeys) {
 			metricKeys := make(map[string]struct{})
 			for _, key := range metric.labelKeys {
@@ -222,21 +344,20 @@ func fillConstMetricsLabels(metrics []ConstMetric) []ConstMetric {
 				}
 			}
 		}
-		result[i] = metric
 	}
-	return result
+	return metrics
 }
 
-func fillHistogramMetricsLabels(metrics []HistogramMetric) []HistogramMetric {
+func fillHistogramMetricsLabels(metrics []*HistogramMetric) []*HistogramMetric {
 	allKeys := make(map[string]struct{})
 	for _, metric := range metrics {
 		for _, key := range metric.labelKeys {
 			allKeys[key] = struct{}{}
 		}
 	}
-	result := make([]HistogramMetric, len(metrics))
-	for i, metric := range metrics {
+
+	for _, metric := range metrics {
 		if len(metric.labelKeys) != len(allKeys) {
 			metricKeys := make(map[string]struct{})
 			for _, key := range metric.labelKeys {
@@ -249,8 +370,7 @@ func fillHistogramMetricsLabels(metrics []HistogramMetric) []HistogramMetric {
 				}
 			}
 		}
-		result[i] = metric
 	}
-	return result
+	return metrics
 }
diff --git a/stackdriver_exporter.go b/stackdriver_exporter.go
index 06b61311..ab8b49b9 100644
--- a/stackdriver_exporter.go
+++ b/stackdriver_exporter.go
@@ -101,6 +101,14 @@ var (
 	monitoringMetricsExtraFilter = kingpin.Flag(
 		"monitoring.filters", "Filters. i.e: pubsub.googleapis.com/subscription:resource.labels.subscription_id=monitoring.regex.full_match(\"my-subs-prefix.*\")").Strings()
+
+	monitoringMetricsAggregateDeltas = kingpin.Flag(
+		"monitoring.aggregate-deltas", "If enabled, all DELTA metrics are treated as in-memory counters instead of gauges",
+	).Default("false").Bool()
+
+	monitoringMetricsDeltasTTL = kingpin.Flag(
+		"monitoring.aggregate-deltas-ttl", "How long a delta metric continues to be exported after GCP stops producing it",
+	).Default("30m").Duration()
 )
 
 func init() {
@@ -191,7 +199,8 @@ func (h *handler) innerHandler(filters map[string]bool) http.Handler {
 		IngestDelay:           *monitoringMetricsIngestDelay,
 		FillMissingLabels:     *collectorFillMissingLabels,
 		DropDelegatedProjects: *monitoringDropDelegatedProjects,
-	}, h.logger)
+		AggregateDeltas:       *monitoringMetricsAggregateDeltas,
+	}, h.logger, collectors.NewInMemoryDeltaCounterStore(h.logger, *monitoringMetricsDeltasTTL), collectors.NewInMemoryDeltaDistributionStore(h.logger, *monitoringMetricsDeltasTTL))
 	if err != nil {
 		level.Error(h.logger).Log("err", err)
 		os.Exit(1)