From 5f24949a51550e5d18238948dbbccaa9c6cb4dd6 Mon Sep 17 00:00:00 2001 From: Cameron Stitt Date: Wed, 8 Mar 2017 21:59:34 +1000 Subject: [PATCH 01/10] Start investigating cloudwatch metrics. --- metrics/cloudwatch/cloudwatch.go | 62 ++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100644 metrics/cloudwatch/cloudwatch.go diff --git a/metrics/cloudwatch/cloudwatch.go b/metrics/cloudwatch/cloudwatch.go new file mode 100644 index 000000000..bf4723b30 --- /dev/null +++ b/metrics/cloudwatch/cloudwatch.go @@ -0,0 +1,62 @@ +package cloudwatch + +import ( + "sync" + + "github.com/aws/aws-sdk-go/service/cloudwatch" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/metrics" + "github.com/go-kit/kit/metrics/generic" +) + +// CloudWatch ... +type CloudWatch struct { + mtx sync.RWMutex + prefix string + svc *cloudwatch.CloudWatch + counters map[string]*Counter + //gauges map[string]*Gauge + //histograms map[string]*Histogram + logger log.Logger +} + +func New(prefix string, logger log.Logger, svc *cloudwatch.CloudWatch) *CloudWatch { + return &CloudWatch{ + prefix: prefix, + svc: svc, + counters: map[string]*Counter{}, + logger: logger, + } +} + +func (cw *CloudWatch) NewCounter(name string) *Counter { + c := NewCounter(cw.prefix, name) + cw.mtx.Lock() + cw.counters[cw.prefix+name] = c + cw.mtx.Unlock() + return c +} + +// Counter is a Graphite counter metric. +type Counter struct { + namespace string + c *generic.Counter +} + +// NewCounter returns a new usable counter metric. +func NewCounter(namespace, name string) *Counter { + return &Counter{ + namespace: namespace, + c: generic.NewCounter(name), + } +} + +// With is a no-op. +func (c *Counter) With(labelValues ...string) metrics.Counter { + return c.c.With(labelValues...) +} + +// Add implements counter. +func (c *Counter) Add(delta float64) { + c.c.Add(delta) +} From 3aad14801a280b668f23341215a50d60d173531a Mon Sep 17 00:00:00 2001 From: Cameron Stitt Date: Thu, 9 Mar 2017 07:27:58 +1000 Subject: [PATCH 02/10] Use WriteLoop approach to reduce api overhead. --- metrics/cloudwatch/cloudwatch.go | 60 ++++++++++++++++++++++++++------ 1 file changed, 50 insertions(+), 10 deletions(-) diff --git a/metrics/cloudwatch/cloudwatch.go b/metrics/cloudwatch/cloudwatch.go index bf4723b30..e1106a404 100644 --- a/metrics/cloudwatch/cloudwatch.go +++ b/metrics/cloudwatch/cloudwatch.go @@ -3,6 +3,9 @@ package cloudwatch import ( "sync" + "time" + + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/cloudwatch" "github.com/go-kit/kit/log" "github.com/go-kit/kit/metrics" @@ -15,11 +18,10 @@ type CloudWatch struct { prefix string svc *cloudwatch.CloudWatch counters map[string]*Counter - //gauges map[string]*Gauge - //histograms map[string]*Histogram - logger log.Logger + logger log.Logger } +// New ... func New(prefix string, logger log.Logger, svc *cloudwatch.CloudWatch) *CloudWatch { return &CloudWatch{ prefix: prefix, @@ -30,24 +32,51 @@ func New(prefix string, logger log.Logger, svc *cloudwatch.CloudWatch) *CloudWat } func (cw *CloudWatch) NewCounter(name string) *Counter { - c := NewCounter(cw.prefix, name) + c := NewCounter(name) cw.mtx.Lock() cw.counters[cw.prefix+name] = c cw.mtx.Unlock() return c } -// Counter is a Graphite counter metric. +// WriteLoop is a helper method that invokes WriteTo to the passed writer every +// time the passed channel fires. This method blocks until the channel is +// closed, so clients probably want to run it in its own goroutine. For typical +// usage, create a time.Ticker and pass its C channel to this method. +func (cw *CloudWatch) WriteLoop(c <-chan time.Time) { + for range c { + cw.mtx.RLock() + defer cw.mtx.RUnlock() + now := time.Now() + + for name, c := range cw.counters { + _, err := cw.svc.PutMetricData(&cloudwatch.PutMetricDataInput{ + Namespace: aws.String(cw.prefix), + MetricData: []*cloudwatch.MetricDatum{ + { + MetricName: aws.String(name), + Dimensions: makeDimensions(c.c.LabelValues()...), + Value: aws.Float64(c.c.Value()), + Timestamp: aws.Time(now), + }, + }, + }) + if err != nil { + cw.logger.Log("during", "WriteLoop", "err", err) + } + } + } +} + +// Counter is a CloudWatch counter metric. type Counter struct { - namespace string - c *generic.Counter + c *generic.Counter } // NewCounter returns a new usable counter metric. -func NewCounter(namespace, name string) *Counter { +func NewCounter(name string) *Counter { return &Counter{ - namespace: namespace, - c: generic.NewCounter(name), + c: generic.NewCounter(name), } } @@ -60,3 +89,14 @@ func (c *Counter) With(labelValues ...string) metrics.Counter { func (c *Counter) Add(delta float64) { c.c.Add(delta) } + +func makeDimensions(labelValues ...string) []*cloudwatch.Dimension { + dimensions := make([]*cloudwatch.Dimension, len(labelValues)/2) + for i, j := 0, 0; i < len(labelValues); i, j := i+2, j+1 { + dimensions[j] = &cloudwatch.Dimension{ + Name: aws.String(labelValues[i]), + Value: aws.String(labelValues[i+1]), + } + } + return dimensions +} From 3a00cf80b4b4c11719241e510becb565b0a1110e Mon Sep 17 00:00:00 2001 From: Cameron Stitt Date: Thu, 9 Mar 2017 07:54:29 +1000 Subject: [PATCH 03/10] Add gauge instrument. --- metrics/cloudwatch/cloudwatch.go | 53 ++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/metrics/cloudwatch/cloudwatch.go b/metrics/cloudwatch/cloudwatch.go index e1106a404..91055e8b5 100644 --- a/metrics/cloudwatch/cloudwatch.go +++ b/metrics/cloudwatch/cloudwatch.go @@ -18,6 +18,7 @@ type CloudWatch struct { prefix string svc *cloudwatch.CloudWatch counters map[string]*Counter + gauges map[string]*Gauge logger log.Logger } @@ -27,6 +28,7 @@ func New(prefix string, logger log.Logger, svc *cloudwatch.CloudWatch) *CloudWat prefix: prefix, svc: svc, counters: map[string]*Counter{}, + gauges: map[string]*Gauge{}, logger: logger, } } @@ -39,6 +41,14 @@ func (cw *CloudWatch) NewCounter(name string) *Counter { return c } +func (cw *CloudWatch) NewGauge(name string) *Gauge { + c := NewGauge(name) + cw.mtx.Lock() + cw.counters[cw.prefix+name] = c + cw.mtx.Unlock() + return c +} + // WriteLoop is a helper method that invokes WriteTo to the passed writer every // time the passed channel fires. This method blocks until the channel is // closed, so clients probably want to run it in its own goroutine. For typical @@ -65,6 +75,23 @@ func (cw *CloudWatch) WriteLoop(c <-chan time.Time) { cw.logger.Log("during", "WriteLoop", "err", err) } } + + for name, g := range cw.gauges { + _, err := cw.svc.PutMetricData(&cloudwatch.PutMetricDataInput{ + Namespace: aws.String(cw.prefix), + MetricData: []*cloudwatch.MetricDatum{ + { + MetricName: aws.String(name), + Dimensions: makeDimensions(g.g.LabelValues()...), + Value: aws.Float64(g.g.Value()), + Timestamp: aws.Time(now), + }, + }, + }) + if err != nil { + cw.logger.Log("during", "WriteLoop", "err", err) + } + } } } @@ -90,6 +117,32 @@ func (c *Counter) Add(delta float64) { c.c.Add(delta) } +// Gauge is a CloudWatch gauge metric. +type Gauge struct { + g *generic.Gauge +} + +// NewGauge returns a new usable gauge metric +func NewGauge(name string) *Gauge { + return &Gauge{ + g: generic.NewGauge(name), + } +} + +func (g *Gauge) With(labelValues ...string) metrics.Gauge { + return &Gauge{ + g: g.g.With(labelValues...), + } +} + +func (g *Gauge) Set(value float64) { + g.g.Set(value) +} + +func (g *Gauge) Add(delta float64) { + g.g.Add(delta) +} + func makeDimensions(labelValues ...string) []*cloudwatch.Dimension { dimensions := make([]*cloudwatch.Dimension, len(labelValues)/2) for i, j := 0, 0; i < len(labelValues); i, j := i+2, j+1 { From f7bcf6afc08c47d9af595bcdd95da91b2311ff18 Mon Sep 17 00:00:00 2001 From: Cameron Stitt Date: Thu, 9 Mar 2017 08:45:42 +1000 Subject: [PATCH 04/10] Add histogram instrument. Update WriteLoop to put all metrics at once. --- metrics/cloudwatch/cloudwatch.go | 128 +++++++++++++++++++++---------- 1 file changed, 86 insertions(+), 42 deletions(-) diff --git a/metrics/cloudwatch/cloudwatch.go b/metrics/cloudwatch/cloudwatch.go index 91055e8b5..8987dc1ad 100644 --- a/metrics/cloudwatch/cloudwatch.go +++ b/metrics/cloudwatch/cloudwatch.go @@ -5,6 +5,10 @@ import ( "time" + "strconv" + + "fmt" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/cloudwatch" "github.com/go-kit/kit/log" @@ -14,39 +18,49 @@ import ( // CloudWatch ... type CloudWatch struct { - mtx sync.RWMutex - prefix string - svc *cloudwatch.CloudWatch - counters map[string]*Counter - gauges map[string]*Gauge - logger log.Logger + mtx sync.RWMutex + namespace string + svc *cloudwatch.CloudWatch + counters map[string]*Counter + gauges map[string]*Gauge + histograms map[string]*Histogram + logger log.Logger } // New ... -func New(prefix string, logger log.Logger, svc *cloudwatch.CloudWatch) *CloudWatch { +func New(namespace string, logger log.Logger, svc *cloudwatch.CloudWatch) *CloudWatch { return &CloudWatch{ - prefix: prefix, - svc: svc, - counters: map[string]*Counter{}, - gauges: map[string]*Gauge{}, - logger: logger, + namespace: namespace, + svc: svc, + counters: map[string]*Counter{}, + gauges: map[string]*Gauge{}, + histograms: map[string]*Histogram{}, + logger: logger, } } func (cw *CloudWatch) NewCounter(name string) *Counter { c := NewCounter(name) cw.mtx.Lock() - cw.counters[cw.prefix+name] = c + cw.counters[name] = c cw.mtx.Unlock() return c } func (cw *CloudWatch) NewGauge(name string) *Gauge { - c := NewGauge(name) + g := NewGauge(name) cw.mtx.Lock() - cw.counters[cw.prefix+name] = c + cw.gauges[name] = g cw.mtx.Unlock() - return c + return g +} + +func (cw *CloudWatch) NewHistogram(name string, quantiles []float64, buckets int) *Histogram { + h := NewHistogram(name, quantiles, buckets) + cw.mtx.Lock() + cw.histograms[name] = h + cw.mtx.Unlock() + return h } // WriteLoop is a helper method that invokes WriteTo to the passed writer every @@ -59,39 +73,45 @@ func (cw *CloudWatch) WriteLoop(c <-chan time.Time) { defer cw.mtx.RUnlock() now := time.Now() + datums := []*cloudwatch.MetricDatum{} + for name, c := range cw.counters { - _, err := cw.svc.PutMetricData(&cloudwatch.PutMetricDataInput{ - Namespace: aws.String(cw.prefix), - MetricData: []*cloudwatch.MetricDatum{ - { - MetricName: aws.String(name), - Dimensions: makeDimensions(c.c.LabelValues()...), - Value: aws.Float64(c.c.Value()), - Timestamp: aws.Time(now), - }, - }, + datums = append(datums, &cloudwatch.MetricDatum{ + MetricName: aws.String(name), + Dimensions: makeDimensions(c.c.LabelValues()...), + Value: aws.Float64(c.c.Value()), + Timestamp: aws.Time(now), }) - if err != nil { - cw.logger.Log("during", "WriteLoop", "err", err) - } } for name, g := range cw.gauges { - _, err := cw.svc.PutMetricData(&cloudwatch.PutMetricDataInput{ - Namespace: aws.String(cw.prefix), - MetricData: []*cloudwatch.MetricDatum{ - { - MetricName: aws.String(name), - Dimensions: makeDimensions(g.g.LabelValues()...), - Value: aws.Float64(g.g.Value()), - Timestamp: aws.Time(now), - }, - }, + datums = append(datums, &cloudwatch.MetricDatum{ + MetricName: aws.String(name), + Dimensions: makeDimensions(g.g.LabelValues()...), + Value: aws.Float64(g.g.Value()), + Timestamp: aws.Time(now), }) - if err != nil { - cw.logger.Log("during", "WriteLoop", "err", err) + } + + for name, h := range cw.histograms { + for _, quantile := range h.quantiles { + quantileStr := strconv.FormatFloat(quantile, 'f', 2, 64) + datums = append(datums, &cloudwatch.MetricDatum{ + MetricName: aws.String(fmt.Sprintf("%s_%s", name, quantileStr)), + Dimensions: makeDimensions(h.h.LabelValues()...), + Value: aws.Float64(h.h.Quantile(quantile)), + Timestamp: aws.Time(now), + }) } } + + _, err := cw.svc.PutMetricData(&cloudwatch.PutMetricDataInput{ + Namespace: aws.String(cw.namespace), + MetricData: datums, + }) + if err != nil { + cw.logger.Log("during", "WriteLoop", "err", err) + } } } @@ -131,7 +151,7 @@ func NewGauge(name string) *Gauge { func (g *Gauge) With(labelValues ...string) metrics.Gauge { return &Gauge{ - g: g.g.With(labelValues...), + g: g.g.With(labelValues...).(*generic.Gauge), } } @@ -143,6 +163,30 @@ func (g *Gauge) Add(delta float64) { g.g.Add(delta) } +// Histogram is a CloudWatch histogram metric +type Histogram struct { + quantiles []float64 + h *generic.Histogram +} + +// NewHistogram returns a new usable histogram metric +func NewHistogram(name string, quantiles []float64, buckets int) *Histogram { + return &Histogram{ + quantiles: quantiles, + h: generic.NewHistogram(name, buckets), + } +} + +func (h *Histogram) With(labelValues ...string) metrics.Histogram { + return &Histogram{ + h: h.h.With(labelValues...).(*generic.Histogram), + } +} + +func (h *Histogram) Observe(value float64) { + h.h.Observe(value) +} + func makeDimensions(labelValues ...string) []*cloudwatch.Dimension { dimensions := make([]*cloudwatch.Dimension, len(labelValues)/2) for i, j := 0, 0; i < len(labelValues); i, j := i+2, j+1 { From 5616015c6949242c0adc5e2a27a48ebcd97f4c94 Mon Sep 17 00:00:00 2001 From: Cameron Stitt Date: Thu, 9 Mar 2017 09:19:44 +1000 Subject: [PATCH 05/10] Add comments and use default quantiles. --- metrics/cloudwatch/cloudwatch.go | 125 +++++++++++++++++++------------ 1 file changed, 76 insertions(+), 49 deletions(-) diff --git a/metrics/cloudwatch/cloudwatch.go b/metrics/cloudwatch/cloudwatch.go index 8987dc1ad..e9a7a9e77 100644 --- a/metrics/cloudwatch/cloudwatch.go +++ b/metrics/cloudwatch/cloudwatch.go @@ -5,8 +5,6 @@ import ( "time" - "strconv" - "fmt" "github.com/aws/aws-sdk-go/aws" @@ -16,7 +14,11 @@ import ( "github.com/go-kit/kit/metrics/generic" ) -// CloudWatch ... +// CloudWatch receives metrics observations and forwards them to CloudWatch. +// Create a CloudWatch object, use it to create metrics, and pass those metrics as +// dependencies to the components that will use them. +// +// To regularly report metrics to CloudWatch, use the WriteLoop helper method. type CloudWatch struct { mtx sync.RWMutex namespace string @@ -27,7 +29,9 @@ type CloudWatch struct { logger log.Logger } -// New ... +// New returns a CloudWatch object that may be used to create metrics. Namespace is +// applied to all created metrics and maps to the CloudWatch namespace. +// Callers must ensure that regular calls to Send are performed, either manually or with one of the helper methods. func New(namespace string, logger log.Logger, svc *cloudwatch.CloudWatch) *CloudWatch { return &CloudWatch{ namespace: namespace, @@ -39,6 +43,8 @@ func New(namespace string, logger log.Logger, svc *cloudwatch.CloudWatch) *Cloud } } +// NewCounter returns a counter. Observations are aggregated and emitted once +// per write invocation. func (cw *CloudWatch) NewCounter(name string) *Counter { c := NewCounter(name) cw.mtx.Lock() @@ -47,6 +53,8 @@ func (cw *CloudWatch) NewCounter(name string) *Counter { return c } +// NewGauge returns a gauge. Observations are aggregated and emitted once per +// write invocation. func (cw *CloudWatch) NewGauge(name string) *Gauge { g := NewGauge(name) cw.mtx.Lock() @@ -55,64 +63,80 @@ func (cw *CloudWatch) NewGauge(name string) *Gauge { return g } -func (cw *CloudWatch) NewHistogram(name string, quantiles []float64, buckets int) *Histogram { - h := NewHistogram(name, quantiles, buckets) +// NewHistogram returns a histogram. Observations are aggregated and emitted as +// per-quantile gauges, once per write invocation. 50 is a good default value +// for buckets. +func (cw *CloudWatch) NewHistogram(name string, buckets int) *Histogram { + h := NewHistogram(name, buckets) cw.mtx.Lock() cw.histograms[name] = h cw.mtx.Unlock() return h } -// WriteLoop is a helper method that invokes WriteTo to the passed writer every +// WriteLoop is a helper method that invokes Send every // time the passed channel fires. This method blocks until the channel is // closed, so clients probably want to run it in its own goroutine. For typical // usage, create a time.Ticker and pass its C channel to this method. func (cw *CloudWatch) WriteLoop(c <-chan time.Time) { for range c { - cw.mtx.RLock() - defer cw.mtx.RUnlock() - now := time.Now() + if err := cw.Send(); err != nil { + cw.logger.Log("during", "Send", "err", err) + } + } +} - datums := []*cloudwatch.MetricDatum{} +// Send will fire an api request to CloudWatch with the latest stats for +// all metrics. +func (cw *CloudWatch) Send() error { + cw.mtx.RLock() + defer cw.mtx.RUnlock() + now := time.Now() - for name, c := range cw.counters { - datums = append(datums, &cloudwatch.MetricDatum{ - MetricName: aws.String(name), - Dimensions: makeDimensions(c.c.LabelValues()...), - Value: aws.Float64(c.c.Value()), - Timestamp: aws.Time(now), - }) - } + datums := []*cloudwatch.MetricDatum{} + + for name, c := range cw.counters { + datums = append(datums, &cloudwatch.MetricDatum{ + MetricName: aws.String(name), + Dimensions: makeDimensions(c.c.LabelValues()...), + Value: aws.Float64(c.c.Value()), + Timestamp: aws.Time(now), + }) + } - for name, g := range cw.gauges { + for name, g := range cw.gauges { + datums = append(datums, &cloudwatch.MetricDatum{ + MetricName: aws.String(name), + Dimensions: makeDimensions(g.g.LabelValues()...), + Value: aws.Float64(g.g.Value()), + Timestamp: aws.Time(now), + }) + } + + for name, h := range cw.histograms { + for _, p := range []struct { + s string + f float64 + }{ + {"50", 0.50}, + {"90", 0.90}, + {"95", 0.95}, + {"99", 0.99}, + } { datums = append(datums, &cloudwatch.MetricDatum{ - MetricName: aws.String(name), - Dimensions: makeDimensions(g.g.LabelValues()...), - Value: aws.Float64(g.g.Value()), + MetricName: aws.String(fmt.Sprintf("%s_%s", name, p.s)), + Dimensions: makeDimensions(h.h.LabelValues()...), + Value: aws.Float64(h.h.Quantile(p.f)), Timestamp: aws.Time(now), }) } - - for name, h := range cw.histograms { - for _, quantile := range h.quantiles { - quantileStr := strconv.FormatFloat(quantile, 'f', 2, 64) - datums = append(datums, &cloudwatch.MetricDatum{ - MetricName: aws.String(fmt.Sprintf("%s_%s", name, quantileStr)), - Dimensions: makeDimensions(h.h.LabelValues()...), - Value: aws.Float64(h.h.Quantile(quantile)), - Timestamp: aws.Time(now), - }) - } - } - - _, err := cw.svc.PutMetricData(&cloudwatch.PutMetricDataInput{ - Namespace: aws.String(cw.namespace), - MetricData: datums, - }) - if err != nil { - cw.logger.Log("during", "WriteLoop", "err", err) - } } + + _, err := cw.svc.PutMetricData(&cloudwatch.PutMetricDataInput{ + Namespace: aws.String(cw.namespace), + MetricData: datums, + }) + return err } // Counter is a CloudWatch counter metric. @@ -127,7 +151,7 @@ func NewCounter(name string) *Counter { } } -// With is a no-op. +// With implements counter func (c *Counter) With(labelValues ...string) metrics.Counter { return c.c.With(labelValues...) } @@ -149,40 +173,43 @@ func NewGauge(name string) *Gauge { } } +// With implements gauge func (g *Gauge) With(labelValues ...string) metrics.Gauge { return &Gauge{ g: g.g.With(labelValues...).(*generic.Gauge), } } +// Set implements gauge func (g *Gauge) Set(value float64) { g.g.Set(value) } +// Add implements gauge func (g *Gauge) Add(delta float64) { g.g.Add(delta) } // Histogram is a CloudWatch histogram metric type Histogram struct { - quantiles []float64 - h *generic.Histogram + h *generic.Histogram } // NewHistogram returns a new usable histogram metric -func NewHistogram(name string, quantiles []float64, buckets int) *Histogram { +func NewHistogram(name string, buckets int) *Histogram { return &Histogram{ - quantiles: quantiles, - h: generic.NewHistogram(name, buckets), + h: generic.NewHistogram(name, buckets), } } +// With implements histogram func (h *Histogram) With(labelValues ...string) metrics.Histogram { return &Histogram{ h: h.h.With(labelValues...).(*generic.Histogram), } } +// Observe implements histogram func (h *Histogram) Observe(value float64) { h.h.Observe(value) } From 1223f9f1628da9273e72dbbfe214c539de78345b Mon Sep 17 00:00:00 2001 From: Cameron Stitt Date: Thu, 9 Mar 2017 09:42:41 +1000 Subject: [PATCH 06/10] Fix build error. --- metrics/cloudwatch/cloudwatch.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metrics/cloudwatch/cloudwatch.go b/metrics/cloudwatch/cloudwatch.go index e9a7a9e77..4a823eb3b 100644 --- a/metrics/cloudwatch/cloudwatch.go +++ b/metrics/cloudwatch/cloudwatch.go @@ -216,7 +216,7 @@ func (h *Histogram) Observe(value float64) { func makeDimensions(labelValues ...string) []*cloudwatch.Dimension { dimensions := make([]*cloudwatch.Dimension, len(labelValues)/2) - for i, j := 0, 0; i < len(labelValues); i, j := i+2, j+1 { + for i, j := 0, 0; i < len(labelValues); i, j = i+2, j+1 { dimensions[j] = &cloudwatch.Dimension{ Name: aws.String(labelValues[i]), Value: aws.String(labelValues[i+1]), From f99615ede4aed41ccdef49258034f2d65cf8484a Mon Sep 17 00:00:00 2001 From: Cameron Stitt Date: Thu, 9 Mar 2017 14:13:17 +1000 Subject: [PATCH 07/10] Use cloudwatch interface and add tests. --- metrics/cloudwatch/cloudwatch.go | 18 ++-- metrics/cloudwatch/cloudwatch_test.go | 148 ++++++++++++++++++++++++++ 2 files changed, 157 insertions(+), 9 deletions(-) create mode 100644 metrics/cloudwatch/cloudwatch_test.go diff --git a/metrics/cloudwatch/cloudwatch.go b/metrics/cloudwatch/cloudwatch.go index 4a823eb3b..b9baa7563 100644 --- a/metrics/cloudwatch/cloudwatch.go +++ b/metrics/cloudwatch/cloudwatch.go @@ -9,6 +9,7 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/cloudwatch" + "github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface" "github.com/go-kit/kit/log" "github.com/go-kit/kit/metrics" "github.com/go-kit/kit/metrics/generic" @@ -22,7 +23,7 @@ import ( type CloudWatch struct { mtx sync.RWMutex namespace string - svc *cloudwatch.CloudWatch + svc cloudwatchiface.CloudWatchAPI counters map[string]*Counter gauges map[string]*Gauge histograms map[string]*Histogram @@ -32,7 +33,7 @@ type CloudWatch struct { // New returns a CloudWatch object that may be used to create metrics. Namespace is // applied to all created metrics and maps to the CloudWatch namespace. // Callers must ensure that regular calls to Send are performed, either manually or with one of the helper methods. -func New(namespace string, logger log.Logger, svc *cloudwatch.CloudWatch) *CloudWatch { +func New(namespace string, logger log.Logger, svc cloudwatchiface.CloudWatchAPI) *CloudWatch { return &CloudWatch{ namespace: namespace, svc: svc, @@ -153,7 +154,8 @@ func NewCounter(name string) *Counter { // With implements counter func (c *Counter) With(labelValues ...string) metrics.Counter { - return c.c.With(labelValues...) + c.c = c.c.With(labelValues...).(*generic.Counter) + return c } // Add implements counter. @@ -175,9 +177,8 @@ func NewGauge(name string) *Gauge { // With implements gauge func (g *Gauge) With(labelValues ...string) metrics.Gauge { - return &Gauge{ - g: g.g.With(labelValues...).(*generic.Gauge), - } + g.g = g.g.With(labelValues...).(*generic.Gauge) + return g } // Set implements gauge @@ -204,9 +205,8 @@ func NewHistogram(name string, buckets int) *Histogram { // With implements histogram func (h *Histogram) With(labelValues ...string) metrics.Histogram { - return &Histogram{ - h: h.h.With(labelValues...).(*generic.Histogram), - } + h.h = h.h.With(labelValues...).(*generic.Histogram) + return h } // Observe implements histogram diff --git a/metrics/cloudwatch/cloudwatch_test.go b/metrics/cloudwatch/cloudwatch_test.go new file mode 100644 index 000000000..db68c3870 --- /dev/null +++ b/metrics/cloudwatch/cloudwatch_test.go @@ -0,0 +1,148 @@ +package cloudwatch + +import ( + "errors" + "testing" + + "sync" + + "fmt" + + "github.com/aws/aws-sdk-go/service/cloudwatch" + "github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/metrics/teststat" +) + +type mockCloudWatch struct { + cloudwatchiface.CloudWatchAPI + mtx sync.RWMutex + valuesReceived map[string]float64 + dimensionsReceived map[string][]*cloudwatch.Dimension +} + +func newMockCloudWatch() *mockCloudWatch { + return &mockCloudWatch{ + valuesReceived: map[string]float64{}, + dimensionsReceived: map[string][]*cloudwatch.Dimension{}, + } +} + +func (mcw *mockCloudWatch) PutMetricData(input *cloudwatch.PutMetricDataInput) (*cloudwatch.PutMetricDataOutput, error) { + mcw.mtx.Lock() + defer mcw.mtx.Unlock() + for _, datum := range input.MetricData { + mcw.valuesReceived[*datum.MetricName] = *datum.Value + mcw.dimensionsReceived[*datum.MetricName] = datum.Dimensions + } + return nil, nil +} + +func testDimensions(svc *mockCloudWatch, name string, labelValues ...string) error { + dimensions, ok := svc.dimensionsReceived[name] + if !ok { + if len(labelValues) > 0 { + return errors.New("Expected dimensions to be available, but none were") + } + } +LabelValues: + for i, j := 0, 0; i < len(labelValues); i, j = i+2, j+1 { + name, value := labelValues[i], labelValues[i+1] + for _, dimension := range dimensions { + if *dimension.Name == name { + if *dimension.Value == value { + break LabelValues + } + } + } + return fmt.Errorf("Could not find dimension with name %s and value %s", name, value) + } + + return nil +} + +func TestCounter(t *testing.T) { + namespace, name := "abc", "def" + label, value := "label", "value" + svc := newMockCloudWatch() + cw := New(namespace, log.NewNopLogger(), svc) + counter := cw.NewCounter(name).With(label, value) + valuef := func() float64 { + err := cw.Send() + if err != nil { + t.Fatal(err) + } + svc.mtx.RLock() + defer svc.mtx.RUnlock() + return svc.valuesReceived[name] + } + if err := teststat.TestCounter(counter, valuef); err != nil { + t.Fatal(err) + } + if err := testDimensions(svc, name, label, value); err != nil { + t.Fatal(err) + } +} + +func TestGauge(t *testing.T) { + namespace, name := "abc", "def" + label, value := "label", "value" + svc := newMockCloudWatch() + cw := New(namespace, log.NewNopLogger(), svc) + gauge := cw.NewGauge(name).With(label, value) + valuef := func() float64 { + err := cw.Send() + if err != nil { + t.Fatal(err) + } + svc.mtx.RLock() + defer svc.mtx.RUnlock() + return svc.valuesReceived[name] + } + if err := teststat.TestGauge(gauge, valuef); err != nil { + t.Fatal(err) + } + if err := testDimensions(svc, name, label, value); err != nil { + t.Fatal(err) + } +} + +func TestHistogram(t *testing.T) { + namespace, name := "abc", "def" + label, value := "label", "value" + svc := newMockCloudWatch() + cw := New(namespace, log.NewNopLogger(), svc) + histogram := cw.NewHistogram(name, 50).With(label, value) + n50 := fmt.Sprintf("%s_50", name) + n90 := fmt.Sprintf("%s_90", name) + n95 := fmt.Sprintf("%s_95", name) + n99 := fmt.Sprintf("%s_99", name) + quantiles := func() (p50, p90, p95, p99 float64) { + err := cw.Send() + if err != nil { + t.Fatal(err) + } + svc.mtx.RLock() + defer svc.mtx.RUnlock() + p50 = svc.valuesReceived[n50] + p90 = svc.valuesReceived[n90] + p95 = svc.valuesReceived[n95] + p99 = svc.valuesReceived[n99] + return + } + if err := teststat.TestHistogram(histogram, quantiles, 0.01); err != nil { + t.Fatal(err) + } + if err := testDimensions(svc, n50, label, value); err != nil { + t.Fatal(err) + } + if err := testDimensions(svc, n90, label, value); err != nil { + t.Fatal(err) + } + if err := testDimensions(svc, n95, label, value); err != nil { + t.Fatal(err) + } + if err := testDimensions(svc, n99, label, value); err != nil { + t.Fatal(err) + } +} From ead1f1c29ca42bfe0876f68823f837d3793cb7b0 Mon Sep 17 00:00:00 2001 From: Cameron Stitt Date: Thu, 16 Mar 2017 19:43:53 +1000 Subject: [PATCH 08/10] Fixes from PR. --- metrics/cloudwatch/cloudwatch.go | 90 ++++++++++----------------- metrics/cloudwatch/cloudwatch_test.go | 13 ++-- 2 files changed, 40 insertions(+), 63 deletions(-) diff --git a/metrics/cloudwatch/cloudwatch.go b/metrics/cloudwatch/cloudwatch.go index b9baa7563..ca0c2bd3d 100644 --- a/metrics/cloudwatch/cloudwatch.go +++ b/metrics/cloudwatch/cloudwatch.go @@ -1,15 +1,14 @@ package cloudwatch import ( + "fmt" "sync" - "time" - "fmt" - "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/cloudwatch" "github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface" + "github.com/go-kit/kit/log" "github.com/go-kit/kit/metrics" "github.com/go-kit/kit/metrics/generic" @@ -24,54 +23,54 @@ type CloudWatch struct { mtx sync.RWMutex namespace string svc cloudwatchiface.CloudWatchAPI - counters map[string]*Counter - gauges map[string]*Gauge - histograms map[string]*Histogram + counters map[string]*counter + gauges map[string]*gauge + histograms map[string]*histogram logger log.Logger } // New returns a CloudWatch object that may be used to create metrics. Namespace is // applied to all created metrics and maps to the CloudWatch namespace. // Callers must ensure that regular calls to Send are performed, either manually or with one of the helper methods. -func New(namespace string, logger log.Logger, svc cloudwatchiface.CloudWatchAPI) *CloudWatch { +func New(namespace string, svc cloudwatchiface.CloudWatchAPI, logger log.Logger) *CloudWatch { return &CloudWatch{ namespace: namespace, svc: svc, - counters: map[string]*Counter{}, - gauges: map[string]*Gauge{}, - histograms: map[string]*Histogram{}, + counters: map[string]*counter{}, + gauges: map[string]*gauge{}, + histograms: map[string]*histogram{}, logger: logger, } } // NewCounter returns a counter. Observations are aggregated and emitted once // per write invocation. -func (cw *CloudWatch) NewCounter(name string) *Counter { - c := NewCounter(name) +func (cw *CloudWatch) NewCounter(name string) metrics.Counter { cw.mtx.Lock() + defer cw.mtx.Unlock() + c := &counter{c: generic.NewCounter(name)} cw.counters[name] = c - cw.mtx.Unlock() return c } // NewGauge returns a gauge. Observations are aggregated and emitted once per // write invocation. -func (cw *CloudWatch) NewGauge(name string) *Gauge { - g := NewGauge(name) +func (cw *CloudWatch) NewGauge(name string) metrics.Gauge { cw.mtx.Lock() + defer cw.mtx.Unlock() + g := &gauge{g: generic.NewGauge(name)} cw.gauges[name] = g - cw.mtx.Unlock() return g } // NewHistogram returns a histogram. Observations are aggregated and emitted as // per-quantile gauges, once per write invocation. 50 is a good default value // for buckets. -func (cw *CloudWatch) NewHistogram(name string, buckets int) *Histogram { - h := NewHistogram(name, buckets) +func (cw *CloudWatch) NewHistogram(name string, buckets int) metrics.Histogram { cw.mtx.Lock() + defer cw.mtx.Unlock() + h := &histogram{h: generic.NewHistogram(name, buckets)} cw.histograms[name] = h - cw.mtx.Unlock() return h } @@ -87,14 +86,14 @@ func (cw *CloudWatch) WriteLoop(c <-chan time.Time) { } } -// Send will fire an api request to CloudWatch with the latest stats for -// all metrics. +// Send will fire an API request to CloudWatch with the latest stats for +// all metrics. It is preferred that the WriteLoop method is used. func (cw *CloudWatch) Send() error { cw.mtx.RLock() defer cw.mtx.RUnlock() now := time.Now() - datums := []*cloudwatch.MetricDatum{} + var datums []*cloudwatch.MetricDatum for name, c := range cw.counters { datums = append(datums, &cloudwatch.MetricDatum{ @@ -140,77 +139,56 @@ func (cw *CloudWatch) Send() error { return err } -// Counter is a CloudWatch counter metric. -type Counter struct { +// counter is a CloudWatch counter metric. +type counter struct { c *generic.Counter } -// NewCounter returns a new usable counter metric. -func NewCounter(name string) *Counter { - return &Counter{ - c: generic.NewCounter(name), - } -} - // With implements counter -func (c *Counter) With(labelValues ...string) metrics.Counter { +func (c *counter) With(labelValues ...string) metrics.Counter { c.c = c.c.With(labelValues...).(*generic.Counter) return c } // Add implements counter. -func (c *Counter) Add(delta float64) { +func (c *counter) Add(delta float64) { c.c.Add(delta) } -// Gauge is a CloudWatch gauge metric. -type Gauge struct { +// gauge is a CloudWatch gauge metric. +type gauge struct { g *generic.Gauge } -// NewGauge returns a new usable gauge metric -func NewGauge(name string) *Gauge { - return &Gauge{ - g: generic.NewGauge(name), - } -} - // With implements gauge -func (g *Gauge) With(labelValues ...string) metrics.Gauge { +func (g *gauge) With(labelValues ...string) metrics.Gauge { g.g = g.g.With(labelValues...).(*generic.Gauge) return g } // Set implements gauge -func (g *Gauge) Set(value float64) { +func (g *gauge) Set(value float64) { g.g.Set(value) } // Add implements gauge -func (g *Gauge) Add(delta float64) { +func (g *gauge) Add(delta float64) { g.g.Add(delta) } -// Histogram is a CloudWatch histogram metric -type Histogram struct { +// histogram is a CloudWatch histogram metric +type histogram struct { h *generic.Histogram } -// NewHistogram returns a new usable histogram metric -func NewHistogram(name string, buckets int) *Histogram { - return &Histogram{ - h: generic.NewHistogram(name, buckets), - } -} - // With implements histogram -func (h *Histogram) With(labelValues ...string) metrics.Histogram { +func (h *histogram) With(labelValues ...string) metrics.Histogram { h.h = h.h.With(labelValues...).(*generic.Histogram) return h } // Observe implements histogram -func (h *Histogram) Observe(value float64) { +func (h *histogram) Observe(value float64) { h.h.Observe(value) } diff --git a/metrics/cloudwatch/cloudwatch_test.go b/metrics/cloudwatch/cloudwatch_test.go index db68c3870..c0cf1d0e9 100644 --- a/metrics/cloudwatch/cloudwatch_test.go +++ b/metrics/cloudwatch/cloudwatch_test.go @@ -2,14 +2,13 @@ package cloudwatch import ( "errors" - "testing" - - "sync" - "fmt" + "sync" + "testing" "github.com/aws/aws-sdk-go/service/cloudwatch" "github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface" + "github.com/go-kit/kit/log" "github.com/go-kit/kit/metrics/teststat" ) @@ -65,7 +64,7 @@ func TestCounter(t *testing.T) { namespace, name := "abc", "def" label, value := "label", "value" svc := newMockCloudWatch() - cw := New(namespace, log.NewNopLogger(), svc) + cw := New(namespace, svc, log.NewNopLogger()) counter := cw.NewCounter(name).With(label, value) valuef := func() float64 { err := cw.Send() @@ -88,7 +87,7 @@ func TestGauge(t *testing.T) { namespace, name := "abc", "def" label, value := "label", "value" svc := newMockCloudWatch() - cw := New(namespace, log.NewNopLogger(), svc) + cw := New(namespace, svc, log.NewNopLogger()) gauge := cw.NewGauge(name).With(label, value) valuef := func() float64 { err := cw.Send() @@ -111,7 +110,7 @@ func TestHistogram(t *testing.T) { namespace, name := "abc", "def" label, value := "label", "value" svc := newMockCloudWatch() - cw := New(namespace, log.NewNopLogger(), svc) + cw := New(namespace, svc, log.NewNopLogger()) histogram := cw.NewHistogram(name, 50).With(label, value) n50 := fmt.Sprintf("%s_50", name) n90 := fmt.Sprintf("%s_90", name) From 0933e61d202773bfbb96c7008faf7c7469e6e57c Mon Sep 17 00:00:00 2001 From: Cameron Stitt Date: Thu, 16 Mar 2017 19:57:04 +1000 Subject: [PATCH 09/10] Fix comment wrapping. --- metrics/cloudwatch/cloudwatch.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/metrics/cloudwatch/cloudwatch.go b/metrics/cloudwatch/cloudwatch.go index ca0c2bd3d..728634bc1 100644 --- a/metrics/cloudwatch/cloudwatch.go +++ b/metrics/cloudwatch/cloudwatch.go @@ -29,9 +29,10 @@ type CloudWatch struct { logger log.Logger } -// New returns a CloudWatch object that may be used to create metrics. Namespace is -// applied to all created metrics and maps to the CloudWatch namespace. -// Callers must ensure that regular calls to Send are performed, either manually or with one of the helper methods. +// New returns a CloudWatch object that may be used to create metrics. +// Namespace is applied to all created metrics and maps to the CloudWatch namespace. +// Callers must ensure that regular calls to Send are performed, either +// manually or with one of the helper methods. func New(namespace string, svc cloudwatchiface.CloudWatchAPI, logger log.Logger) *CloudWatch { return &CloudWatch{ namespace: namespace, @@ -74,10 +75,10 @@ func (cw *CloudWatch) NewHistogram(name string, buckets int) metrics.Histogram { return h } -// WriteLoop is a helper method that invokes Send every -// time the passed channel fires. This method blocks until the channel is -// closed, so clients probably want to run it in its own goroutine. For typical -// usage, create a time.Ticker and pass its C channel to this method. +// WriteLoop is a helper method that invokes Send every time the passed +// channel fires. This method blocks until the channel is closed, so clients +// probably want to run it in its own goroutine. For typical usage, create a +// time.Ticker and pass its C channel to this method. func (cw *CloudWatch) WriteLoop(c <-chan time.Time) { for range c { if err := cw.Send(); err != nil { From bb839ba5fc5722552b0e402eccba0cea6e3b095c Mon Sep 17 00:00:00 2001 From: Cameron Stitt Date: Mon, 20 Mar 2017 14:37:56 +1000 Subject: [PATCH 10/10] Update metrics doc table for cloudwatch. --- metrics/doc.go | 1 + 1 file changed, 1 insertion(+) diff --git a/metrics/doc.go b/metrics/doc.go index 3bb7c1764..9be8e3017 100644 --- a/metrics/doc.go +++ b/metrics/doc.go @@ -92,5 +92,6 @@ // influx n custom custom custom // prometheus n native native native // pcp 1 native native native +// cloudwatch n batch push-aggregate batch push-aggregate synthetic, batch, push-aggregate // package metrics