Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 205 additions & 0 deletions metrics/cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
package cloudwatch

import (
"fmt"
"sync"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/metrics/generic"
)

// CloudWatch receives metrics observations and forwards them to CloudWatch.
// Create a CloudWatch object, use it to create metrics, and pass those metrics as
// dependencies to the components that will use them.
//
// To regularly report metrics to CloudWatch, use the WriteLoop helper method.
type CloudWatch struct {
mtx sync.RWMutex
namespace string
svc cloudwatchiface.CloudWatchAPI
counters map[string]*counter
gauges map[string]*gauge
histograms map[string]*histogram
logger log.Logger
}

// New returns a CloudWatch object that may be used to create metrics.
// Namespace is applied to all created metrics and maps to the CloudWatch namespace.
// Callers must ensure that regular calls to Send are performed, either
// manually or with one of the helper methods.
func New(namespace string, svc cloudwatchiface.CloudWatchAPI, logger log.Logger) *CloudWatch {
return &CloudWatch{
namespace: namespace,
svc: svc,
counters: map[string]*counter{},
gauges: map[string]*gauge{},
histograms: map[string]*histogram{},
logger: logger,
}
}

// NewCounter returns a counter. Observations are aggregated and emitted once
// per write invocation.
func (cw *CloudWatch) NewCounter(name string) metrics.Counter {
cw.mtx.Lock()
defer cw.mtx.Unlock()
c := &counter{c: generic.NewCounter(name)}
cw.counters[name] = c
return c
}

// NewGauge returns a gauge. Observations are aggregated and emitted once per
// write invocation.
func (cw *CloudWatch) NewGauge(name string) metrics.Gauge {
cw.mtx.Lock()
defer cw.mtx.Unlock()
g := &gauge{g: generic.NewGauge(name)}
cw.gauges[name] = g
return g
}

// NewHistogram returns a histogram. Observations are aggregated and emitted as
// per-quantile gauges, once per write invocation. 50 is a good default value
// for buckets.
func (cw *CloudWatch) NewHistogram(name string, buckets int) metrics.Histogram {
cw.mtx.Lock()
defer cw.mtx.Unlock()
h := &histogram{h: generic.NewHistogram(name, buckets)}
cw.histograms[name] = h
return h
}

// WriteLoop is a helper method that invokes Send every time the passed
// channel fires. This method blocks until the channel is closed, so clients
// probably want to run it in its own goroutine. For typical usage, create a
// time.Ticker and pass its C channel to this method.
func (cw *CloudWatch) WriteLoop(c <-chan time.Time) {
for range c {
if err := cw.Send(); err != nil {
cw.logger.Log("during", "Send", "err", err)
}
}
}

// Send will fire an API request to CloudWatch with the latest stats for
// all metrics. It is preferred that the WriteLoop method is used.
func (cw *CloudWatch) Send() error {
cw.mtx.RLock()
defer cw.mtx.RUnlock()
now := time.Now()

var datums []*cloudwatch.MetricDatum

for name, c := range cw.counters {
datums = append(datums, &cloudwatch.MetricDatum{
MetricName: aws.String(name),
Dimensions: makeDimensions(c.c.LabelValues()...),
Value: aws.Float64(c.c.Value()),
Timestamp: aws.Time(now),
})
}

for name, g := range cw.gauges {
datums = append(datums, &cloudwatch.MetricDatum{
MetricName: aws.String(name),
Dimensions: makeDimensions(g.g.LabelValues()...),
Value: aws.Float64(g.g.Value()),
Timestamp: aws.Time(now),
})
}

for name, h := range cw.histograms {
for _, p := range []struct {
s string
f float64
}{
{"50", 0.50},
{"90", 0.90},
{"95", 0.95},
{"99", 0.99},
} {
datums = append(datums, &cloudwatch.MetricDatum{
MetricName: aws.String(fmt.Sprintf("%s_%s", name, p.s)),
Dimensions: makeDimensions(h.h.LabelValues()...),
Value: aws.Float64(h.h.Quantile(p.f)),
Timestamp: aws.Time(now),
})
}
}

_, err := cw.svc.PutMetricData(&cloudwatch.PutMetricDataInput{
Namespace: aws.String(cw.namespace),
MetricData: datums,
})
return err
}

// counter is a CloudWatch counter metric.
type counter struct {
c *generic.Counter
}

// With implements counter
func (c *counter) With(labelValues ...string) metrics.Counter {
c.c = c.c.With(labelValues...).(*generic.Counter)
return c
}

// Add implements counter.
func (c *counter) Add(delta float64) {
c.c.Add(delta)
}

// gauge is a CloudWatch gauge metric.
type gauge struct {
g *generic.Gauge
}

// With implements gauge
func (g *gauge) With(labelValues ...string) metrics.Gauge {
g.g = g.g.With(labelValues...).(*generic.Gauge)
return g
}

// Set implements gauge
func (g *gauge) Set(value float64) {
g.g.Set(value)
}

// Add implements gauge
func (g *gauge) Add(delta float64) {
g.g.Add(delta)
}

// histogram is a CloudWatch histogram metric
type histogram struct {
h *generic.Histogram
}

// With implements histogram
func (h *histogram) With(labelValues ...string) metrics.Histogram {
h.h = h.h.With(labelValues...).(*generic.Histogram)
return h
}

// Observe implements histogram
func (h *histogram) Observe(value float64) {
h.h.Observe(value)
}

func makeDimensions(labelValues ...string) []*cloudwatch.Dimension {
dimensions := make([]*cloudwatch.Dimension, len(labelValues)/2)
for i, j := 0, 0; i < len(labelValues); i, j = i+2, j+1 {
dimensions[j] = &cloudwatch.Dimension{
Name: aws.String(labelValues[i]),
Value: aws.String(labelValues[i+1]),
}
}
return dimensions
}
147 changes: 147 additions & 0 deletions metrics/cloudwatch/cloudwatch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package cloudwatch

import (
"errors"
"fmt"
"sync"
"testing"

"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/metrics/teststat"
)

type mockCloudWatch struct {
cloudwatchiface.CloudWatchAPI
mtx sync.RWMutex
valuesReceived map[string]float64
dimensionsReceived map[string][]*cloudwatch.Dimension
}

func newMockCloudWatch() *mockCloudWatch {
return &mockCloudWatch{
valuesReceived: map[string]float64{},
dimensionsReceived: map[string][]*cloudwatch.Dimension{},
}
}

func (mcw *mockCloudWatch) PutMetricData(input *cloudwatch.PutMetricDataInput) (*cloudwatch.PutMetricDataOutput, error) {
mcw.mtx.Lock()
defer mcw.mtx.Unlock()
for _, datum := range input.MetricData {
mcw.valuesReceived[*datum.MetricName] = *datum.Value
mcw.dimensionsReceived[*datum.MetricName] = datum.Dimensions
}
return nil, nil
}

func testDimensions(svc *mockCloudWatch, name string, labelValues ...string) error {
dimensions, ok := svc.dimensionsReceived[name]
if !ok {
if len(labelValues) > 0 {
return errors.New("Expected dimensions to be available, but none were")
}
}
LabelValues:
for i, j := 0, 0; i < len(labelValues); i, j = i+2, j+1 {
name, value := labelValues[i], labelValues[i+1]
for _, dimension := range dimensions {
if *dimension.Name == name {
if *dimension.Value == value {
break LabelValues
}
}
}
return fmt.Errorf("Could not find dimension with name %s and value %s", name, value)
}

return nil
}

func TestCounter(t *testing.T) {
namespace, name := "abc", "def"
label, value := "label", "value"
svc := newMockCloudWatch()
cw := New(namespace, svc, log.NewNopLogger())
counter := cw.NewCounter(name).With(label, value)
valuef := func() float64 {
err := cw.Send()
if err != nil {
t.Fatal(err)
}
svc.mtx.RLock()
defer svc.mtx.RUnlock()
return svc.valuesReceived[name]
}
if err := teststat.TestCounter(counter, valuef); err != nil {
t.Fatal(err)
}
if err := testDimensions(svc, name, label, value); err != nil {
t.Fatal(err)
}
}

func TestGauge(t *testing.T) {
namespace, name := "abc", "def"
label, value := "label", "value"
svc := newMockCloudWatch()
cw := New(namespace, svc, log.NewNopLogger())
gauge := cw.NewGauge(name).With(label, value)
valuef := func() float64 {
err := cw.Send()
if err != nil {
t.Fatal(err)
}
svc.mtx.RLock()
defer svc.mtx.RUnlock()
return svc.valuesReceived[name]
}
if err := teststat.TestGauge(gauge, valuef); err != nil {
t.Fatal(err)
}
if err := testDimensions(svc, name, label, value); err != nil {
t.Fatal(err)
}
}

func TestHistogram(t *testing.T) {
namespace, name := "abc", "def"
label, value := "label", "value"
svc := newMockCloudWatch()
cw := New(namespace, svc, log.NewNopLogger())
histogram := cw.NewHistogram(name, 50).With(label, value)
n50 := fmt.Sprintf("%s_50", name)
n90 := fmt.Sprintf("%s_90", name)
n95 := fmt.Sprintf("%s_95", name)
n99 := fmt.Sprintf("%s_99", name)
quantiles := func() (p50, p90, p95, p99 float64) {
err := cw.Send()
if err != nil {
t.Fatal(err)
}
svc.mtx.RLock()
defer svc.mtx.RUnlock()
p50 = svc.valuesReceived[n50]
p90 = svc.valuesReceived[n90]
p95 = svc.valuesReceived[n95]
p99 = svc.valuesReceived[n99]
return
}
if err := teststat.TestHistogram(histogram, quantiles, 0.01); err != nil {
t.Fatal(err)
}
if err := testDimensions(svc, n50, label, value); err != nil {
t.Fatal(err)
}
if err := testDimensions(svc, n90, label, value); err != nil {
t.Fatal(err)
}
if err := testDimensions(svc, n95, label, value); err != nil {
t.Fatal(err)
}
if err := testDimensions(svc, n99, label, value); err != nil {
t.Fatal(err)
}
}
1 change: 1 addition & 0 deletions metrics/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,5 +92,6 @@
// influx n custom custom custom
// prometheus n native native native
// pcp 1 native native native
// cloudwatch n batch push-aggregate batch push-aggregate synthetic, batch, push-aggregate
//
package metrics