From b04b2e2128fd62e048a6efaf426610316209ed74 Mon Sep 17 00:00:00 2001 From: Eric Feliksik Date: Mon, 31 Jul 2017 13:06:33 +0200 Subject: [PATCH 1/4] add failing test for cloudwatch metrics, that are not reset --- metrics/cloudwatch/cloudwatch_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/metrics/cloudwatch/cloudwatch_test.go b/metrics/cloudwatch/cloudwatch_test.go index d36d9b2aa..025242a84 100644 --- a/metrics/cloudwatch/cloudwatch_test.go +++ b/metrics/cloudwatch/cloudwatch_test.go @@ -80,6 +80,9 @@ func TestCounter(t *testing.T) { if err := teststat.TestCounter(counter, valuef); err != nil { t.Fatal(err) } + if err := teststat.TestCounter(counter, valuef); err != nil { + t.Fatal("Fill and flush counter 2nd time: ", err) + } if err := testDimensions(svc, name, label, value); err != nil { t.Fatal(err) } From 2a3c64e00d64908242454c8a55f6826e30f1ba12 Mon Sep 17 00:00:00 2001 From: Eric Feliksik Date: Mon, 31 Jul 2017 16:20:51 +0200 Subject: [PATCH 2/4] Refactor cloudwatch: Reset().Walk() on every Send(), like influx impl does. Note there is a breaking API change, as the cloudwatch object now has optional parameters. --- metrics/cloudwatch/cloudwatch.go | 279 +++++++++++++++++--------- metrics/cloudwatch/cloudwatch_test.go | 34 ++-- 2 files changed, 204 insertions(+), 109 deletions(-) diff --git a/metrics/cloudwatch/cloudwatch.go b/metrics/cloudwatch/cloudwatch.go index e267e0302..8336dbc80 100644 --- a/metrics/cloudwatch/cloudwatch.go +++ b/metrics/cloudwatch/cloudwatch.go @@ -2,6 +2,7 @@ package cloudwatch import ( "fmt" + "os" "sync" "time" @@ -12,81 +13,131 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/metrics" "github.com/go-kit/kit/metrics/generic" + "github.com/go-kit/kit/metrics/internal/lv" ) const ( maxConcurrentRequests = 20 ) +type Percentiles []struct { + s string + f float64 +} + // CloudWatch receives metrics observations and forwards them to CloudWatch. 
// Create a CloudWatch object, use it to create metrics, and pass those metrics as // dependencies to the components that will use them. // // To regularly report metrics to CloudWatch, use the WriteLoop helper method. type CloudWatch struct { - mtx sync.RWMutex - sem chan struct{} - namespace string - numConcurrentRequests int - svc cloudwatchiface.CloudWatchAPI - counters map[string]*counter - gauges map[string]*gauge - histograms map[string]*histogram + mtx sync.RWMutex + sem chan struct{} + namespace string + svc cloudwatchiface.CloudWatchAPI + counters *lv.Space + gauges *lv.Space + histograms *lv.Space + *cwoptions +} + +type cwoptions struct { + percentiles Percentiles logger log.Logger + numConcurrentRequests int +} + +type option func(*cwoptions) + +func (s *cwoptions) apply(opt option) { + if opt != nil { + opt(s) + } +} + +func WithLogger(logger log.Logger) option { + return func(o *cwoptions) { + o.logger = logger + } +} + +func WithPercentiles(p Percentiles) option { + return func(o *cwoptions) { + validated := Percentiles{} + for _, entry := range p { + if entry.f < 0 || entry.f > 1 { + continue // illegal entry + } + validated = append(validated, entry) + } + o.percentiles = validated + } +} + +func WithConcurrentRequests(n int) option { + return func(o *cwoptions) { + if n > maxConcurrentRequests { + n = maxConcurrentRequests + } + o.numConcurrentRequests = n + } } // New returns a CloudWatch object that may be used to create metrics. // Namespace is applied to all created metrics and maps to the CloudWatch namespace. -// NumConcurrent sets the number of simultaneous requests to Amazon. -// A good default value is 10 and the maximum is 20. // Callers must ensure that regular calls to Send are performed, either // manually or with one of the helper methods. 
-func New(namespace string, svc cloudwatchiface.CloudWatchAPI, numConcurrent int, logger log.Logger) *CloudWatch { - if numConcurrent > maxConcurrentRequests { - numConcurrent = maxConcurrentRequests +func New(namespace string, svc cloudwatchiface.CloudWatchAPI, options ...option) *CloudWatch { + useOptions := &cwoptions{ + numConcurrentRequests: 10, + logger: log.NewLogfmtLogger(os.Stderr), + percentiles: Percentiles{ + {"50", 0.50}, + {"90", 0.90}, + {"95", 0.95}, + {"99", 0.99}, + }, + } + + for _, opt := range options { + useOptions.apply(opt) } return &CloudWatch{ - sem: make(chan struct{}, numConcurrent), - namespace: namespace, - numConcurrentRequests: numConcurrent, + sem: make(chan struct{}, useOptions.numConcurrentRequests), + namespace: namespace, svc: svc, - counters: map[string]*counter{}, - gauges: map[string]*gauge{}, - histograms: map[string]*histogram{}, - logger: logger, + counters: lv.NewSpace(), + gauges: lv.NewSpace(), + histograms: lv.NewSpace(), + cwoptions: useOptions, } } // NewCounter returns a counter. Observations are aggregated and emitted once // per write invocation. func (cw *CloudWatch) NewCounter(name string) metrics.Counter { - cw.mtx.Lock() - defer cw.mtx.Unlock() - c := &counter{c: generic.NewCounter(name)} - cw.counters[name] = c - return c + return &Counter{ + name: name, + obs: cw.counters.Observe, + } } -// NewGauge returns a gauge. Observations are aggregated and emitted once per -// write invocation. +// NewGauge returns a gauge. func (cw *CloudWatch) NewGauge(name string) metrics.Gauge { - cw.mtx.Lock() - defer cw.mtx.Unlock() - g := &gauge{g: generic.NewGauge(name)} - cw.gauges[name] = g - return g + return &Gauge{ + name: name, + obs: cw.gauges.Observe, + add: cw.gauges.Add, + } } -// NewHistogram returns a histogram. Observations are aggregated and emitted as -// per-quantile gauges, once per write invocation. 50 is a good default value -// for buckets. 
-func (cw *CloudWatch) NewHistogram(name string, buckets int) metrics.Histogram { - cw.mtx.Lock() - defer cw.mtx.Unlock() - h := &histogram{h: generic.NewHistogram(name, buckets)} - cw.histograms[name] = h - return h +// NewHistogram returns a histogram. +func (cw *CloudWatch) NewHistogram(name string) metrics.Histogram { + return &Histogram{ + name: name, + obs: cw.histograms.Observe, + } } // WriteLoop is a helper method that invokes Send every time the passed @@ -110,42 +161,46 @@ func (cw *CloudWatch) Send() error { var datums []*cloudwatch.MetricDatum - for name, c := range cw.counters { + cw.counters.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool { + value := sum(values) datums = append(datums, &cloudwatch.MetricDatum{ MetricName: aws.String(name), - Dimensions: makeDimensions(c.c.LabelValues()...), - Value: aws.Float64(c.c.Value()), + Dimensions: makeDimensions(lvs...), + Value: aws.Float64(value), Timestamp: aws.Time(now), }) - } + return true + }) - for name, g := range cw.gauges { + cw.gauges.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool { + value := last(values) datums = append(datums, &cloudwatch.MetricDatum{ MetricName: aws.String(name), - Dimensions: makeDimensions(g.g.LabelValues()...), - Value: aws.Float64(g.g.Value()), + Dimensions: makeDimensions(lvs...), + Value: aws.Float64(value), Timestamp: aws.Time(now), }) - } + return true + }) - for name, h := range cw.histograms { - for _, p := range []struct { - s string - f float64 - }{ - {"50", 0.50}, - {"90", 0.90}, - {"95", 0.95}, - {"99", 0.99}, - } { + cw.histograms.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool { + histogram := generic.NewHistogram(name, 50) + + for _, v := range values { + histogram.Observe(v) + } + + for _, p := range cw.percentiles { + value := histogram.Quantile(p.f) datums = append(datums, &cloudwatch.MetricDatum{ MetricName: aws.String(fmt.Sprintf("%s_%s", name, p.s)), - Dimensions: 
makeDimensions(h.h.LabelValues()...), - Value: aws.Float64(h.h.Quantile(p.f)), + Dimensions: makeDimensions(lvs...), + Value: aws.Float64(value), Timestamp: aws.Time(now), }) } - } + return true + }) var batches [][]*cloudwatch.MetricDatum for len(datums) > 0 { @@ -179,6 +234,18 @@ func (cw *CloudWatch) Send() error { return firstErr } +func sum(a []float64) float64 { + var v float64 + for _, f := range a { + v += f + } + return v +} + +func last(a []float64) float64 { + return a[len(a)-1] +} + func min(a, b int) int { if a < b { return a @@ -186,57 +253,79 @@ func min(a, b int) int { return b } -// counter is a CloudWatch counter metric. -type counter struct { - c *generic.Counter +type observeFunc func(name string, lvs lv.LabelValues, value float64) + +// Counter is a counter. Observations are forwarded to a node +// object, and aggregated (summed) per timeseries. +type Counter struct { + name string + lvs lv.LabelValues + obs observeFunc } -// With implements counter -func (c *counter) With(labelValues ...string) metrics.Counter { - c.c = c.c.With(labelValues...).(*generic.Counter) - return c +// With implements metrics.Counter. +func (c *Counter) With(labelValues ...string) metrics.Counter { + return &Counter{ + name: c.name, + lvs: c.lvs.With(labelValues...), + obs: c.obs, + } } -// Add implements counter. -func (c *counter) Add(delta float64) { - c.c.Add(delta) +// Add implements metrics.Counter. +func (c *Counter) Add(delta float64) { + c.obs(c.name, c.lvs, delta) } -// gauge is a CloudWatch gauge metric. -type gauge struct { - g *generic.Gauge +// Gauge is a gauge. Observations are forwarded to a node +// object, and aggregated (the last observation selected) per timeseries. +type Gauge struct { + name string + lvs lv.LabelValues + obs observeFunc + add observeFunc } -// With implements gauge -func (g *gauge) With(labelValues ...string) metrics.Gauge { - g.g = g.g.With(labelValues...).(*generic.Gauge) - return g +// With implements metrics.Gauge. 
+func (g *Gauge) With(labelValues ...string) metrics.Gauge { + return &Gauge{ + name: g.name, + lvs: g.lvs.With(labelValues...), + obs: g.obs, + add: g.add, + } } -// Set implements gauge -func (g *gauge) Set(value float64) { - g.g.Set(value) +// Set implements metrics.Gauge. +func (g *Gauge) Set(value float64) { + g.obs(g.name, g.lvs, value) } -// Add implements gauge -func (g *gauge) Add(delta float64) { - g.g.Add(delta) +// Add implements metrics.Gauge. +func (g *Gauge) Add(delta float64) { + g.add(g.name, g.lvs, delta) } -// histogram is a CloudWatch histogram metric -type histogram struct { - h *generic.Histogram +// Histogram is a CloudWatch histogram. Observations are aggregated into a +// generic.Histogram and emitted as per-quantile gauges to CloudWatch. +type Histogram struct { + name string + lvs lv.LabelValues + obs observeFunc } -// With implements histogram -func (h *histogram) With(labelValues ...string) metrics.Histogram { - h.h = h.h.With(labelValues...).(*generic.Histogram) - return h +// With implements metrics.Histogram. +func (h *Histogram) With(labelValues ...string) metrics.Histogram { + return &Histogram{ + name: h.name, + lvs: h.lvs.With(labelValues...), + obs: h.obs, + } } -// Observe implements histogram -func (h *histogram) Observe(value float64) { - h.h.Observe(value) +// Observe implements metrics.Histogram. 
+func (h *Histogram) Observe(value float64) { + h.obs(h.name, h.lvs, value) } func makeDimensions(labelValues ...string) []*cloudwatch.Dimension { diff --git a/metrics/cloudwatch/cloudwatch_test.go b/metrics/cloudwatch/cloudwatch_test.go index 025242a84..2135a9119 100644 --- a/metrics/cloudwatch/cloudwatch_test.go +++ b/metrics/cloudwatch/cloudwatch_test.go @@ -39,8 +39,11 @@ func (mcw *mockCloudWatch) PutMetricData(input *cloudwatch.PutMetricDataInput) ( return nil, nil } -func testDimensions(svc *mockCloudWatch, name string, labelValues ...string) error { - dimensions, ok := svc.dimensionsReceived[name] +func (mcw *mockCloudWatch) testDimensions(name string, labelValues ...string) error { + mcw.mtx.RLock() + dimensions, ok := mcw.dimensionsReceived[name] + mcw.mtx.RUnlock() + if !ok { if len(labelValues) > 0 { return errors.New("Expected dimensions to be available, but none were") @@ -66,7 +69,7 @@ func TestCounter(t *testing.T) { namespace, name := "abc", "def" label, value := "label", "value" svc := newMockCloudWatch() - cw := New(namespace, svc, 10, log.NewNopLogger()) + cw := New(namespace, svc, WithLogger(log.NewNopLogger())) counter := cw.NewCounter(name).With(label, value) valuef := func() float64 { err := cw.Send() @@ -83,7 +86,7 @@ func TestCounter(t *testing.T) { if err := teststat.TestCounter(counter, valuef); err != nil { t.Fatal("Fill and flush counter 2nd time: ", err) } - if err := testDimensions(svc, name, label, value); err != nil { + if err := svc.testDimensions(name, label, value); err != nil { t.Fatal(err) } } @@ -98,7 +101,10 @@ func TestCounterLowSendConcurrency(t *testing.T) { values = append(values, "value"+num) } svc := newMockCloudWatch() - cw := New(namespace, svc, 2, log.NewNopLogger()) + cw := New(namespace, svc, + WithLogger(log.NewNopLogger()), + WithConcurrentRequests(2), + ) counters := make(map[string]metrics.Counter) var wants []float64 @@ -116,7 +122,7 @@ func TestCounterLowSendConcurrency(t *testing.T) { if 
svc.valuesReceived[name] != wants[i] { t.Fatalf("want %f, have %f", wants[i], svc.valuesReceived[name]) } - if err := testDimensions(svc, name, labels[i], values[i]); err != nil { + if err := svc.testDimensions(name, labels[i], values[i]); err != nil { t.Fatal(err) } } @@ -126,7 +132,7 @@ func TestGauge(t *testing.T) { namespace, name := "abc", "def" label, value := "label", "value" svc := newMockCloudWatch() - cw := New(namespace, svc, 10, log.NewNopLogger()) + cw := New(namespace, svc, WithLogger(log.NewNopLogger())) gauge := cw.NewGauge(name).With(label, value) valuef := func() float64 { err := cw.Send() @@ -140,7 +146,7 @@ func TestGauge(t *testing.T) { if err := teststat.TestGauge(gauge, valuef); err != nil { t.Fatal(err) } - if err := testDimensions(svc, name, label, value); err != nil { + if err := svc.testDimensions(name, label, value); err != nil { t.Fatal(err) } } @@ -149,8 +155,8 @@ func TestHistogram(t *testing.T) { namespace, name := "abc", "def" label, value := "label", "value" svc := newMockCloudWatch() - cw := New(namespace, svc, 10, log.NewNopLogger()) - histogram := cw.NewHistogram(name, 50).With(label, value) + cw := New(namespace, svc, WithLogger(log.NewNopLogger())) + histogram := cw.NewHistogram(name).With(label, value) n50 := fmt.Sprintf("%s_50", name) n90 := fmt.Sprintf("%s_90", name) n95 := fmt.Sprintf("%s_95", name) @@ -171,16 +177,16 @@ func TestHistogram(t *testing.T) { if err := teststat.TestHistogram(histogram, quantiles, 0.01); err != nil { t.Fatal(err) } - if err := testDimensions(svc, n50, label, value); err != nil { + if err := svc.testDimensions(n50, label, value); err != nil { t.Fatal(err) } - if err := testDimensions(svc, n90, label, value); err != nil { + if err := svc.testDimensions(n90, label, value); err != nil { t.Fatal(err) } - if err := testDimensions(svc, n95, label, value); err != nil { + if err := svc.testDimensions(n95, label, value); err != nil { t.Fatal(err) } - if err := testDimensions(svc, n99, label, value); err 
!= nil { + if err := svc.testDimensions(n99, label, value); err != nil { t.Fatal(err) } } From 1f238bf46e286175c2ee193d80fa751556552820 Mon Sep 17 00:00:00 2001 From: Eric Feliksik Date: Mon, 31 Jul 2017 16:56:15 +0200 Subject: [PATCH 3/4] Tolerate that there may not be any labels, if the teststat.FillCounter() did not add any samples. --- metrics/cloudwatch/cloudwatch_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/metrics/cloudwatch/cloudwatch_test.go b/metrics/cloudwatch/cloudwatch_test.go index 2135a9119..8ed93ace3 100644 --- a/metrics/cloudwatch/cloudwatch_test.go +++ b/metrics/cloudwatch/cloudwatch_test.go @@ -41,6 +41,10 @@ func (mcw *mockCloudWatch) PutMetricData(input *cloudwatch.PutMetricDataInput) ( func (mcw *mockCloudWatch) testDimensions(name string, labelValues ...string) error { mcw.mtx.RLock() + _, hasValue := mcw.valuesReceived[name] + if !hasValue { + return nil // nothing to check; 0 samples were received + } dimensions, ok := mcw.dimensionsReceived[name] mcw.mtx.RUnlock() From 002c376f61b4a4c0c285dea212f0f008d3104702 Mon Sep 17 00:00:00 2001 From: Eric Feliksik Date: Tue, 1 Aug 2017 13:15:45 +0200 Subject: [PATCH 4/4] Use Cloudwatch options in the struct itself, which is cleaner --- metrics/cloudwatch/cloudwatch.go | 58 +++++++++++++++----------------- 1 file changed, 27 insertions(+), 31 deletions(-) diff --git a/metrics/cloudwatch/cloudwatch.go b/metrics/cloudwatch/cloudwatch.go index 8336dbc80..61cc502f1 100644 --- a/metrics/cloudwatch/cloudwatch.go +++ b/metrics/cloudwatch/cloudwatch.go @@ -31,38 +31,34 @@ type Percentiles []struct { // // To regularly report metrics to CloudWatch, use the WriteLoop helper method. 
type CloudWatch struct { - mtx sync.RWMutex - sem chan struct{} - namespace string - svc cloudwatchiface.CloudWatchAPI - counters *lv.Space - gauges *lv.Space - histograms *lv.Space - *cwoptions -} - -type cwoptions struct { + mtx sync.RWMutex + sem chan struct{} + namespace string + svc cloudwatchiface.CloudWatchAPI + counters *lv.Space + gauges *lv.Space + histograms *lv.Space percentiles Percentiles logger log.Logger numConcurrentRequests int } -type option func(*cwoptions) +type option func(*CloudWatch) -func (s *cwoptions) apply(opt option) { +func (s *CloudWatch) apply(opt option) { if opt != nil { opt(s) } } func WithLogger(logger log.Logger) option { - return func(o *cwoptions) { - o.logger = logger + return func(c *CloudWatch) { + c.logger = logger } } func WithPercentiles(p Percentiles) option { - return func(o *cwoptions) { + return func(c *CloudWatch) { validated := Percentiles{} for _, entry := range p { if entry.f < 0 || entry.f > 1 { @@ -70,16 +66,16 @@ func WithPercentiles(p Percentiles) option { } validated = append(validated, entry) } - o.percentiles = validated + c.percentiles = validated } } func WithConcurrentRequests(n int) option { - return func(o *cwoptions) { + return func(c *CloudWatch) { if n > maxConcurrentRequests { n = maxConcurrentRequests } - o.numConcurrentRequests = n + c.numConcurrentRequests = n } } @@ -88,7 +84,13 @@ func WithConcurrentRequests(n int) option { // Callers must ensure that regular calls to Send are performed, either // manually or with one of the helper methods. 
func New(namespace string, svc cloudwatchiface.CloudWatchAPI, options ...option) *CloudWatch { - useOptions := &cwoptions{ + cw := &CloudWatch{ + sem: nil, // set below + namespace: namespace, + svc: svc, + counters: lv.NewSpace(), + gauges: lv.NewSpace(), + histograms: lv.NewSpace(), numConcurrentRequests: 10, logger: log.NewLogfmtLogger(os.Stderr), percentiles: Percentiles{ @@ -99,19 +101,13 @@ func New(namespace string, svc cloudwatchiface.CloudWatchAPI, options ...option) }, } - for _, opt := range options { - useOptions.apply(opt) + for _, optFunc := range options { + optFunc(cw) } - return &CloudWatch{ - sem: make(chan struct{}, useOptions.numConcurrentRequests), - namespace: namespace, - svc: svc, - counters: lv.NewSpace(), - gauges: lv.NewSpace(), - histograms: lv.NewSpace(), - cwoptions: useOptions, - } + cw.sem = make(chan struct{}, cw.numConcurrentRequests) + + return cw } // NewCounter returns a counter. Observations are aggregated and emitted once