diff --git a/metrics/cloudwatch/cloudwatch.go b/metrics/cloudwatch/cloudwatch.go index e267e0302..61cc502f1 100644 --- a/metrics/cloudwatch/cloudwatch.go +++ b/metrics/cloudwatch/cloudwatch.go @@ -2,6 +2,7 @@ package cloudwatch import ( "fmt" + "os" "sync" "time" @@ -12,12 +13,18 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/metrics" "github.com/go-kit/kit/metrics/generic" + "github.com/go-kit/kit/metrics/internal/lv" ) const ( maxConcurrentRequests = 20 ) +type Percentiles []struct { + s string + f float64 +} + // CloudWatch receives metrics observations and forwards them to CloudWatch. // Create a CloudWatch object, use it to create metrics, and pass those metrics as // dependencies to the components that will use them. @@ -27,66 +34,106 @@ type CloudWatch struct { mtx sync.RWMutex sem chan struct{} namespace string - numConcurrentRequests int svc cloudwatchiface.CloudWatchAPI - counters map[string]*counter - gauges map[string]*gauge - histograms map[string]*histogram + counters *lv.Space + gauges *lv.Space + histograms *lv.Space + percentiles Percentiles logger log.Logger + numConcurrentRequests int +} + +type option func(*CloudWatch) + +func (s *CloudWatch) apply(opt option) { + if opt != nil { + opt(s) + } +} + +func WithLogger(logger log.Logger) option { + return func(c *CloudWatch) { + c.logger = logger + } +} + +func WithPercentiles(p Percentiles) option { + return func(c *CloudWatch) { + validated := Percentiles{} + for _, entry := range p { + if entry.f < 0 || entry.f > 1 { + continue // illegal entry + } + validated = append(validated, entry) + } + c.percentiles = validated + } +} + +func WithConcurrentRequests(n int) option { + return func(c *CloudWatch) { + if n > maxConcurrentRequests { + n = maxConcurrentRequests + } + c.numConcurrentRequests = n + } } // New returns a CloudWatch object that may be used to create metrics. // Namespace is applied to all created metrics and maps to the CloudWatch namespace. -// NumConcurrent sets the number of simultaneous requests to Amazon. -// A good default value is 10 and the maximum is 20. // Callers must ensure that regular calls to Send are performed, either // manually or with one of the helper methods. -func New(namespace string, svc cloudwatchiface.CloudWatchAPI, numConcurrent int, logger log.Logger) *CloudWatch { - if numConcurrent > maxConcurrentRequests { - numConcurrent = maxConcurrentRequests +func New(namespace string, svc cloudwatchiface.CloudWatchAPI, options ...option) *CloudWatch { + cw := &CloudWatch{ + sem: nil, // set below + namespace: namespace, + svc: svc, + counters: lv.NewSpace(), + gauges: lv.NewSpace(), + histograms: lv.NewSpace(), + numConcurrentRequests: 10, + logger: log.NewLogfmtLogger(os.Stderr), + percentiles: Percentiles{ + {"50", 0.50}, + {"90", 0.90}, + {"95", 0.95}, + {"99", 0.99}, + }, } - return &CloudWatch{ - sem: make(chan struct{}, numConcurrent), - namespace: namespace, - numConcurrentRequests: numConcurrent, - svc: svc, - counters: map[string]*counter{}, - gauges: map[string]*gauge{}, - histograms: map[string]*histogram{}, - logger: logger, + for _, optFunc := range options { + optFunc(cw) } + + cw.sem = make(chan struct{}, cw.numConcurrentRequests) + + return cw } // NewCounter returns a counter. Observations are aggregated and emitted once // per write invocation. func (cw *CloudWatch) NewCounter(name string) metrics.Counter { - cw.mtx.Lock() - defer cw.mtx.Unlock() - c := &counter{c: generic.NewCounter(name)} - cw.counters[name] = c - return c + return &Counter{ + name: name, + obs: cw.counters.Observe, + } } -// NewGauge returns a gauge. Observations are aggregated and emitted once per -// write invocation. +// NewGauge returns an gauge. func (cw *CloudWatch) NewGauge(name string) metrics.Gauge { - cw.mtx.Lock() - defer cw.mtx.Unlock() - g := &gauge{g: generic.NewGauge(name)} - cw.gauges[name] = g - return g + return &Gauge{ + name: name, + obs: cw.gauges.Observe, + add: cw.gauges.Add, + } } -// NewHistogram returns a histogram. Observations are aggregated and emitted as -// per-quantile gauges, once per write invocation. 50 is a good default value -// for buckets. -func (cw *CloudWatch) NewHistogram(name string, buckets int) metrics.Histogram { - cw.mtx.Lock() - defer cw.mtx.Unlock() - h := &histogram{h: generic.NewHistogram(name, buckets)} - cw.histograms[name] = h - return h +// NewHistogram returns a histogram. +func (cw *CloudWatch) NewHistogram(name string) metrics.Histogram { + return &Histogram{ + name: name, + obs: cw.histograms.Observe, + } } // WriteLoop is a helper method that invokes Send every time the passed @@ -110,42 +157,46 @@ func (cw *CloudWatch) Send() error { var datums []*cloudwatch.MetricDatum - for name, c := range cw.counters { + cw.counters.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool { + value := sum(values) datums = append(datums, &cloudwatch.MetricDatum{ MetricName: aws.String(name), - Dimensions: makeDimensions(c.c.LabelValues()...), - Value: aws.Float64(c.c.Value()), + Dimensions: makeDimensions(lvs...), + Value: aws.Float64(value), Timestamp: aws.Time(now), }) - } + return true + }) - for name, g := range cw.gauges { + cw.gauges.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool { + value := last(values) datums = append(datums, &cloudwatch.MetricDatum{ MetricName: aws.String(name), - Dimensions: makeDimensions(g.g.LabelValues()...), - Value: aws.Float64(g.g.Value()), + Dimensions: makeDimensions(lvs...), + Value: aws.Float64(value), Timestamp: aws.Time(now), }) - } + return true + }) - for name, h := range cw.histograms { - for _, p := range []struct { - s string - f float64 - }{ - {"50", 0.50}, - {"90", 0.90}, - {"95", 0.95}, - {"99", 0.99}, - } { + cw.histograms.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool { + histogram := generic.NewHistogram(name, 50) + + for _, v := range values { + histogram.Observe(v) + } + + for _, p := range cw.percentiles { + value := histogram.Quantile(p.f) datums = append(datums, &cloudwatch.MetricDatum{ MetricName: aws.String(fmt.Sprintf("%s_%s", name, p.s)), - Dimensions: makeDimensions(h.h.LabelValues()...), - Value: aws.Float64(h.h.Quantile(p.f)), + Dimensions: makeDimensions(lvs...), + Value: aws.Float64(value), Timestamp: aws.Time(now), }) } - } + return true + }) var batches [][]*cloudwatch.MetricDatum for len(datums) > 0 { @@ -179,6 +230,18 @@ func (cw *CloudWatch) Send() error { return firstErr } +func sum(a []float64) float64 { + var v float64 + for _, f := range a { + v += f + } + return v +} + +func last(a []float64) float64 { + return a[len(a)-1] +} + func min(a, b int) int { if a < b { return a @@ -186,57 +249,79 @@ func min(a, b int) int { return b } -// counter is a CloudWatch counter metric. -type counter struct { - c *generic.Counter +type observeFunc func(name string, lvs lv.LabelValues, value float64) + +// Counter is a counter. Observations are forwarded to a node +// object, and aggregated (summed) per timeseries. +type Counter struct { + name string + lvs lv.LabelValues + obs observeFunc } -// With implements counter -func (c *counter) With(labelValues ...string) metrics.Counter { - c.c = c.c.With(labelValues...).(*generic.Counter) - return c +// With implements metrics.Counter. +func (c *Counter) With(labelValues ...string) metrics.Counter { + return &Counter{ + name: c.name, + lvs: c.lvs.With(labelValues...), + obs: c.obs, + } } -// Add implements counter. -func (c *counter) Add(delta float64) { - c.c.Add(delta) +// Add implements metrics.Counter. +func (c *Counter) Add(delta float64) { + c.obs(c.name, c.lvs, delta) } -// gauge is a CloudWatch gauge metric. -type gauge struct { - g *generic.Gauge +// Gauge is a gauge. Observations are forwarded to a node +// object, and aggregated (the last observation selected) per timeseries. +type Gauge struct { + name string + lvs lv.LabelValues + obs observeFunc + add observeFunc } -// With implements gauge -func (g *gauge) With(labelValues ...string) metrics.Gauge { - g.g = g.g.With(labelValues...).(*generic.Gauge) - return g +// With implements metrics.Gauge. +func (g *Gauge) With(labelValues ...string) metrics.Gauge { + return &Gauge{ + name: g.name, + lvs: g.lvs.With(labelValues...), + obs: g.obs, + add: g.add, + } } -// Set implements gauge -func (g *gauge) Set(value float64) { - g.g.Set(value) +// Set implements metrics.Gauge. +func (g *Gauge) Set(value float64) { + g.obs(g.name, g.lvs, value) } -// Add implements gauge -func (g *gauge) Add(delta float64) { - g.g.Add(delta) +// Add implements metrics.Gauge. +func (g *Gauge) Add(delta float64) { + g.add(g.name, g.lvs, delta) } -// histogram is a CloudWatch histogram metric -type histogram struct { - h *generic.Histogram +// Histogram is an Influx histrogram. Observations are aggregated into a +// generic.Histogram and emitted as per-quantile gauges to the Influx server. +type Histogram struct { + name string + lvs lv.LabelValues + obs observeFunc } -// With implements histogram -func (h *histogram) With(labelValues ...string) metrics.Histogram { - h.h = h.h.With(labelValues...).(*generic.Histogram) - return h +// With implements metrics.Histogram. +func (h *Histogram) With(labelValues ...string) metrics.Histogram { + return &Histogram{ + name: h.name, + lvs: h.lvs.With(labelValues...), + obs: h.obs, + } } -// Observe implements histogram -func (h *histogram) Observe(value float64) { - h.h.Observe(value) +// Observe implements metrics.Histogram. +func (h *Histogram) Observe(value float64) { + h.obs(h.name, h.lvs, value) } func makeDimensions(labelValues ...string) []*cloudwatch.Dimension { diff --git a/metrics/cloudwatch/cloudwatch_test.go b/metrics/cloudwatch/cloudwatch_test.go index d36d9b2aa..8ed93ace3 100644 --- a/metrics/cloudwatch/cloudwatch_test.go +++ b/metrics/cloudwatch/cloudwatch_test.go @@ -39,8 +39,15 @@ func (mcw *mockCloudWatch) PutMetricData(input *cloudwatch.PutMetricDataInput) ( return nil, nil } -func testDimensions(svc *mockCloudWatch, name string, labelValues ...string) error { - dimensions, ok := svc.dimensionsReceived[name] +func (mcw *mockCloudWatch) testDimensions(name string, labelValues ...string) error { + mcw.mtx.RLock() + _, hasValue := mcw.valuesReceived[name] + if !hasValue { + return nil // nothing to check; 0 samples were received + } + dimensions, ok := mcw.dimensionsReceived[name] + mcw.mtx.RUnlock() + if !ok { if len(labelValues) > 0 { return errors.New("Expected dimensions to be available, but none were") @@ -66,7 +73,7 @@ func TestCounter(t *testing.T) { namespace, name := "abc", "def" label, value := "label", "value" svc := newMockCloudWatch() - cw := New(namespace, svc, 10, log.NewNopLogger()) + cw := New(namespace, svc, WithLogger(log.NewNopLogger())) counter := cw.NewCounter(name).With(label, value) valuef := func() float64 { err := cw.Send() @@ -80,7 +87,10 @@ func TestCounter(t *testing.T) { if err := teststat.TestCounter(counter, valuef); err != nil { t.Fatal(err) } - if err := testDimensions(svc, name, label, value); err != nil { + if err := teststat.TestCounter(counter, valuef); err != nil { + t.Fatal("Fill and flush counter 2nd time: ", err) + } + if err := svc.testDimensions(name, label, value); err != nil { t.Fatal(err) } } @@ -95,7 +105,10 @@ func TestCounterLowSendConcurrency(t *testing.T) { values = append(values, "value"+num) } svc := newMockCloudWatch() - cw := New(namespace, svc, 2, log.NewNopLogger()) + cw := New(namespace, svc, + WithLogger(log.NewNopLogger()), + WithConcurrentRequests(2), + ) counters := make(map[string]metrics.Counter) var wants []float64 @@ -113,7 +126,7 @@ func TestCounterLowSendConcurrency(t *testing.T) { if svc.valuesReceived[name] != wants[i] { t.Fatalf("want %f, have %f", wants[i], svc.valuesReceived[name]) } - if err := testDimensions(svc, name, labels[i], values[i]); err != nil { + if err := svc.testDimensions(name, labels[i], values[i]); err != nil { t.Fatal(err) } } @@ -123,7 +136,7 @@ func TestGauge(t *testing.T) { namespace, name := "abc", "def" label, value := "label", "value" svc := newMockCloudWatch() - cw := New(namespace, svc, 10, log.NewNopLogger()) + cw := New(namespace, svc, WithLogger(log.NewNopLogger())) gauge := cw.NewGauge(name).With(label, value) valuef := func() float64 { err := cw.Send() @@ -137,7 +150,7 @@ func TestGauge(t *testing.T) { if err := teststat.TestGauge(gauge, valuef); err != nil { t.Fatal(err) } - if err := testDimensions(svc, name, label, value); err != nil { + if err := svc.testDimensions(name, label, value); err != nil { t.Fatal(err) } } @@ -146,8 +159,8 @@ func TestHistogram(t *testing.T) { namespace, name := "abc", "def" label, value := "label", "value" svc := newMockCloudWatch() - cw := New(namespace, svc, 10, log.NewNopLogger()) - histogram := cw.NewHistogram(name, 50).With(label, value) + cw := New(namespace, svc, WithLogger(log.NewNopLogger())) + histogram := cw.NewHistogram(name).With(label, value) n50 := fmt.Sprintf("%s_50", name) n90 := fmt.Sprintf("%s_90", name) n95 := fmt.Sprintf("%s_95", name) @@ -168,16 +181,16 @@ func TestHistogram(t *testing.T) { if err := teststat.TestHistogram(histogram, quantiles, 0.01); err != nil { t.Fatal(err) } - if err := testDimensions(svc, n50, label, value); err != nil { + if err := svc.testDimensions(n50, label, value); err != nil { t.Fatal(err) } - if err := testDimensions(svc, n90, label, value); err != nil { + if err := svc.testDimensions(n90, label, value); err != nil { t.Fatal(err) } - if err := testDimensions(svc, n95, label, value); err != nil { + if err := svc.testDimensions(n95, label, value); err != nil { t.Fatal(err) } - if err := testDimensions(svc, n99, label, value); err != nil { + if err := svc.testDimensions(n99, label, value); err != nil { t.Fatal(err) } }