diff --git a/metrics/cloudwatch/cloudwatch.go b/metrics/cloudwatch/cloudwatch.go new file mode 100644 index 000000000..728634bc1 --- /dev/null +++ b/metrics/cloudwatch/cloudwatch.go @@ -0,0 +1,205 @@ +package cloudwatch + +import ( + "fmt" + "sync" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/cloudwatch" + "github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/metrics" + "github.com/go-kit/kit/metrics/generic" +) + +// CloudWatch receives metrics observations and forwards them to CloudWatch. +// Create a CloudWatch object, use it to create metrics, and pass those metrics as +// dependencies to the components that will use them. +// +// To regularly report metrics to CloudWatch, use the WriteLoop helper method. +type CloudWatch struct { + mtx sync.RWMutex + namespace string + svc cloudwatchiface.CloudWatchAPI + counters map[string]*counter + gauges map[string]*gauge + histograms map[string]*histogram + logger log.Logger +} + +// New returns a CloudWatch object that may be used to create metrics. +// Namespace is applied to all created metrics and maps to the CloudWatch namespace. +// Callers must ensure that regular calls to Send are performed, either +// manually or with one of the helper methods. +func New(namespace string, svc cloudwatchiface.CloudWatchAPI, logger log.Logger) *CloudWatch { + return &CloudWatch{ + namespace: namespace, + svc: svc, + counters: map[string]*counter{}, + gauges: map[string]*gauge{}, + histograms: map[string]*histogram{}, + logger: logger, + } +} + +// NewCounter returns a counter. Observations are aggregated and emitted once +// per write invocation. +func (cw *CloudWatch) NewCounter(name string) metrics.Counter { + cw.mtx.Lock() + defer cw.mtx.Unlock() + c := &counter{c: generic.NewCounter(name)} + cw.counters[name] = c + return c +} + +// NewGauge returns a gauge. Observations are aggregated and emitted once per +// write invocation. +func (cw *CloudWatch) NewGauge(name string) metrics.Gauge { + cw.mtx.Lock() + defer cw.mtx.Unlock() + g := &gauge{g: generic.NewGauge(name)} + cw.gauges[name] = g + return g +} + +// NewHistogram returns a histogram. Observations are aggregated and emitted as +// per-quantile gauges, once per write invocation. 50 is a good default value +// for buckets. +func (cw *CloudWatch) NewHistogram(name string, buckets int) metrics.Histogram { + cw.mtx.Lock() + defer cw.mtx.Unlock() + h := &histogram{h: generic.NewHistogram(name, buckets)} + cw.histograms[name] = h + return h +} + +// WriteLoop is a helper method that invokes Send every time the passed +// channel fires. This method blocks until the channel is closed, so clients +// probably want to run it in its own goroutine. For typical usage, create a +// time.Ticker and pass its C channel to this method. +func (cw *CloudWatch) WriteLoop(c <-chan time.Time) { + for range c { + if err := cw.Send(); err != nil { + cw.logger.Log("during", "Send", "err", err) + } + } +} + +// Send will fire an API request to CloudWatch with the latest stats for +// all metrics. It is preferred that the WriteLoop method is used. +func (cw *CloudWatch) Send() error { + cw.mtx.RLock() + defer cw.mtx.RUnlock() + now := time.Now() + + var datums []*cloudwatch.MetricDatum + + for name, c := range cw.counters { + datums = append(datums, &cloudwatch.MetricDatum{ + MetricName: aws.String(name), + Dimensions: makeDimensions(c.c.LabelValues()...), + Value: aws.Float64(c.c.Value()), + Timestamp: aws.Time(now), + }) + } + + for name, g := range cw.gauges { + datums = append(datums, &cloudwatch.MetricDatum{ + MetricName: aws.String(name), + Dimensions: makeDimensions(g.g.LabelValues()...), + Value: aws.Float64(g.g.Value()), + Timestamp: aws.Time(now), + }) + } + + for name, h := range cw.histograms { + for _, p := range []struct { + s string + f float64 + }{ + {"50", 0.50}, + {"90", 0.90}, + {"95", 0.95}, + {"99", 0.99}, + } { + datums = append(datums, &cloudwatch.MetricDatum{ + MetricName: aws.String(fmt.Sprintf("%s_%s", name, p.s)), + Dimensions: makeDimensions(h.h.LabelValues()...), + Value: aws.Float64(h.h.Quantile(p.f)), + Timestamp: aws.Time(now), + }) + } + } + + _, err := cw.svc.PutMetricData(&cloudwatch.PutMetricDataInput{ + Namespace: aws.String(cw.namespace), + MetricData: datums, + }) + return err +} + +// counter is a CloudWatch counter metric. +type counter struct { + c *generic.Counter +} + +// With implements counter +func (c *counter) With(labelValues ...string) metrics.Counter { + c.c = c.c.With(labelValues...).(*generic.Counter) + return c +} + +// Add implements counter. +func (c *counter) Add(delta float64) { + c.c.Add(delta) +} + +// gauge is a CloudWatch gauge metric. +type gauge struct { + g *generic.Gauge +} + +// With implements gauge +func (g *gauge) With(labelValues ...string) metrics.Gauge { + g.g = g.g.With(labelValues...).(*generic.Gauge) + return g +} + +// Set implements gauge +func (g *gauge) Set(value float64) { + g.g.Set(value) +} + +// Add implements gauge +func (g *gauge) Add(delta float64) { + g.g.Add(delta) +} + +// histogram is a CloudWatch histogram metric +type histogram struct { + h *generic.Histogram +} + +// With implements histogram +func (h *histogram) With(labelValues ...string) metrics.Histogram { + h.h = h.h.With(labelValues...).(*generic.Histogram) + return h +} + +// Observe implements histogram +func (h *histogram) Observe(value float64) { + h.h.Observe(value) +} + +func makeDimensions(labelValues ...string) []*cloudwatch.Dimension { + dimensions := make([]*cloudwatch.Dimension, len(labelValues)/2) + for i, j := 0, 0; i < len(labelValues); i, j = i+2, j+1 { + dimensions[j] = &cloudwatch.Dimension{ + Name: aws.String(labelValues[i]), + Value: aws.String(labelValues[i+1]), + } + } + return dimensions +} diff --git a/metrics/cloudwatch/cloudwatch_test.go b/metrics/cloudwatch/cloudwatch_test.go new file mode 100644 index 000000000..c0cf1d0e9 --- /dev/null +++ b/metrics/cloudwatch/cloudwatch_test.go @@ -0,0 +1,147 @@ +package cloudwatch + +import ( + "errors" + "fmt" + "sync" + "testing" + + "github.com/aws/aws-sdk-go/service/cloudwatch" + "github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/metrics/teststat" +) + +type mockCloudWatch struct { + cloudwatchiface.CloudWatchAPI + mtx sync.RWMutex + valuesReceived map[string]float64 + dimensionsReceived map[string][]*cloudwatch.Dimension +} + +func newMockCloudWatch() *mockCloudWatch { + return &mockCloudWatch{ + valuesReceived: map[string]float64{}, + dimensionsReceived: map[string][]*cloudwatch.Dimension{}, + } +} + +func (mcw *mockCloudWatch) PutMetricData(input *cloudwatch.PutMetricDataInput) (*cloudwatch.PutMetricDataOutput, error) { + mcw.mtx.Lock() + defer mcw.mtx.Unlock() + for _, datum := range input.MetricData { + mcw.valuesReceived[*datum.MetricName] = *datum.Value + mcw.dimensionsReceived[*datum.MetricName] = datum.Dimensions + } + return nil, nil +} + +func testDimensions(svc *mockCloudWatch, name string, labelValues ...string) error { + dimensions, ok := svc.dimensionsReceived[name] + if !ok { + if len(labelValues) > 0 { + return errors.New("Expected dimensions to be available, but none were") + } + } +LabelValues: + for i, j := 0, 0; i < len(labelValues); i, j = i+2, j+1 { + name, value := labelValues[i], labelValues[i+1] + for _, dimension := range dimensions { + if *dimension.Name == name { + if *dimension.Value == value { + break LabelValues + } + } + } + return fmt.Errorf("Could not find dimension with name %s and value %s", name, value) + } + + return nil +} + +func TestCounter(t *testing.T) { + namespace, name := "abc", "def" + label, value := "label", "value" + svc := newMockCloudWatch() + cw := New(namespace, svc, log.NewNopLogger()) + counter := cw.NewCounter(name).With(label, value) + valuef := func() float64 { + err := cw.Send() + if err != nil { + t.Fatal(err) + } + svc.mtx.RLock() + defer svc.mtx.RUnlock() + return svc.valuesReceived[name] + } + if err := teststat.TestCounter(counter, valuef); err != nil { + t.Fatal(err) + } + if err := testDimensions(svc, name, label, value); err != nil { + t.Fatal(err) + } +} + +func TestGauge(t *testing.T) { + namespace, name := "abc", "def" + label, value := "label", "value" + svc := newMockCloudWatch() + cw := New(namespace, svc, log.NewNopLogger()) + gauge := cw.NewGauge(name).With(label, value) + valuef := func() float64 { + err := cw.Send() + if err != nil { + t.Fatal(err) + } + svc.mtx.RLock() + defer svc.mtx.RUnlock() + return svc.valuesReceived[name] + } + if err := teststat.TestGauge(gauge, valuef); err != nil { + t.Fatal(err) + } + if err := testDimensions(svc, name, label, value); err != nil { + t.Fatal(err) + } +} + +func TestHistogram(t *testing.T) { + namespace, name := "abc", "def" + label, value := "label", "value" + svc := newMockCloudWatch() + cw := New(namespace, svc, log.NewNopLogger()) + histogram := cw.NewHistogram(name, 50).With(label, value) + n50 := fmt.Sprintf("%s_50", name) + n90 := fmt.Sprintf("%s_90", name) + n95 := fmt.Sprintf("%s_95", name) + n99 := fmt.Sprintf("%s_99", name) + quantiles := func() (p50, p90, p95, p99 float64) { + err := cw.Send() + if err != nil { + t.Fatal(err) + } + svc.mtx.RLock() + defer svc.mtx.RUnlock() + p50 = svc.valuesReceived[n50] + p90 = svc.valuesReceived[n90] + p95 = svc.valuesReceived[n95] + p99 = svc.valuesReceived[n99] + return + } + if err := teststat.TestHistogram(histogram, quantiles, 0.01); err != nil { + t.Fatal(err) + } + if err := testDimensions(svc, n50, label, value); err != nil { + t.Fatal(err) + } + if err := testDimensions(svc, n90, label, value); err != nil { + t.Fatal(err) + } + if err := testDimensions(svc, n95, label, value); err != nil { + t.Fatal(err) + } + if err := testDimensions(svc, n99, label, value); err != nil { + t.Fatal(err) + } +} diff --git a/metrics/doc.go b/metrics/doc.go index 3bb7c1764..9be8e3017 100644 --- a/metrics/doc.go +++ b/metrics/doc.go @@ -92,5 +92,6 @@ // influx n custom custom custom // prometheus n native native native // pcp 1 native native native +// cloudwatch n batch push-aggregate batch push-aggregate synthetic, batch, push-aggregate // package metrics