Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
271 changes: 178 additions & 93 deletions metrics/cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cloudwatch

import (
"fmt"
"os"
"sync"
"time"

Expand All @@ -12,12 +13,18 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/metrics/generic"
"github.com/go-kit/kit/metrics/internal/lv"
)

const (
maxConcurrentRequests = 20
)

type Percentiles []struct {
s string
f float64
}

// CloudWatch receives metrics observations and forwards them to CloudWatch.
// Create a CloudWatch object, use it to create metrics, and pass those metrics as
// dependencies to the components that will use them.
Expand All @@ -27,66 +34,106 @@ type CloudWatch struct {
mtx sync.RWMutex
sem chan struct{}
namespace string
numConcurrentRequests int
svc cloudwatchiface.CloudWatchAPI
counters map[string]*counter
gauges map[string]*gauge
histograms map[string]*histogram
counters *lv.Space
gauges *lv.Space
histograms *lv.Space
percentiles Percentiles
logger log.Logger
numConcurrentRequests int
}

type option func(*CloudWatch)

func (s *CloudWatch) apply(opt option) {
if opt != nil {
opt(s)
}
}

func WithLogger(logger log.Logger) option {
return func(c *CloudWatch) {
c.logger = logger
}
}

func WithPercentiles(p Percentiles) option {
return func(c *CloudWatch) {
validated := Percentiles{}
for _, entry := range p {
if entry.f < 0 || entry.f > 1 {
continue // illegal entry
}
validated = append(validated, entry)
}
c.percentiles = validated
}
}

func WithConcurrentRequests(n int) option {
return func(c *CloudWatch) {
if n > maxConcurrentRequests {
n = maxConcurrentRequests
}
c.numConcurrentRequests = n
}
}

// New returns a CloudWatch object that may be used to create metrics.
// Namespace is applied to all created metrics and maps to the CloudWatch namespace.
// NumConcurrent sets the number of simultaneous requests to Amazon.
// A good default value is 10 and the maximum is 20.
// Callers must ensure that regular calls to Send are performed, either
// manually or with one of the helper methods.
func New(namespace string, svc cloudwatchiface.CloudWatchAPI, numConcurrent int, logger log.Logger) *CloudWatch {
if numConcurrent > maxConcurrentRequests {
numConcurrent = maxConcurrentRequests
func New(namespace string, svc cloudwatchiface.CloudWatchAPI, options ...option) *CloudWatch {
cw := &CloudWatch{
sem: nil, // set below
namespace: namespace,
svc: svc,
counters: lv.NewSpace(),
gauges: lv.NewSpace(),
histograms: lv.NewSpace(),
numConcurrentRequests: 10,
logger: log.NewLogfmtLogger(os.Stderr),
percentiles: Percentiles{
{"50", 0.50},
{"90", 0.90},
{"95", 0.95},
{"99", 0.99},
},
}

return &CloudWatch{
sem: make(chan struct{}, numConcurrent),
namespace: namespace,
numConcurrentRequests: numConcurrent,
svc: svc,
counters: map[string]*counter{},
gauges: map[string]*gauge{},
histograms: map[string]*histogram{},
logger: logger,
for _, optFunc := range options {
optFunc(cw)
}

cw.sem = make(chan struct{}, cw.numConcurrentRequests)

return cw
}

// NewCounter returns a counter. Observations are aggregated and emitted once
// per write invocation.
func (cw *CloudWatch) NewCounter(name string) metrics.Counter {
cw.mtx.Lock()
defer cw.mtx.Unlock()
c := &counter{c: generic.NewCounter(name)}
cw.counters[name] = c
return c
return &Counter{
name: name,
obs: cw.counters.Observe,
}
}

// NewGauge returns a gauge. Observations are aggregated and emitted once per
// write invocation.
// NewGauge returns an gauge.
func (cw *CloudWatch) NewGauge(name string) metrics.Gauge {
cw.mtx.Lock()
defer cw.mtx.Unlock()
g := &gauge{g: generic.NewGauge(name)}
cw.gauges[name] = g
return g
return &Gauge{
name: name,
obs: cw.gauges.Observe,
add: cw.gauges.Add,
}
}

// NewHistogram returns a histogram. Observations are aggregated and emitted as
// per-quantile gauges, once per write invocation. 50 is a good default value
// for buckets.
func (cw *CloudWatch) NewHistogram(name string, buckets int) metrics.Histogram {
cw.mtx.Lock()
defer cw.mtx.Unlock()
h := &histogram{h: generic.NewHistogram(name, buckets)}
cw.histograms[name] = h
return h
// NewHistogram returns a histogram.
func (cw *CloudWatch) NewHistogram(name string) metrics.Histogram {
return &Histogram{
name: name,
obs: cw.histograms.Observe,
}
}

// WriteLoop is a helper method that invokes Send every time the passed
Expand All @@ -110,42 +157,46 @@ func (cw *CloudWatch) Send() error {

var datums []*cloudwatch.MetricDatum

for name, c := range cw.counters {
cw.counters.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
value := sum(values)
datums = append(datums, &cloudwatch.MetricDatum{
MetricName: aws.String(name),
Dimensions: makeDimensions(c.c.LabelValues()...),
Value: aws.Float64(c.c.Value()),
Dimensions: makeDimensions(lvs...),
Value: aws.Float64(value),
Timestamp: aws.Time(now),
})
}
return true
})

for name, g := range cw.gauges {
cw.gauges.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
value := last(values)
datums = append(datums, &cloudwatch.MetricDatum{
MetricName: aws.String(name),
Dimensions: makeDimensions(g.g.LabelValues()...),
Value: aws.Float64(g.g.Value()),
Dimensions: makeDimensions(lvs...),
Value: aws.Float64(value),
Timestamp: aws.Time(now),
})
}
return true
})

for name, h := range cw.histograms {
for _, p := range []struct {
s string
f float64
}{
{"50", 0.50},
{"90", 0.90},
{"95", 0.95},
{"99", 0.99},
} {
cw.histograms.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
histogram := generic.NewHistogram(name, 50)

for _, v := range values {
histogram.Observe(v)
}

for _, p := range cw.percentiles {
value := histogram.Quantile(p.f)
datums = append(datums, &cloudwatch.MetricDatum{
MetricName: aws.String(fmt.Sprintf("%s_%s", name, p.s)),
Dimensions: makeDimensions(h.h.LabelValues()...),
Value: aws.Float64(h.h.Quantile(p.f)),
Dimensions: makeDimensions(lvs...),
Value: aws.Float64(value),
Timestamp: aws.Time(now),
})
}
}
return true
})

var batches [][]*cloudwatch.MetricDatum
for len(datums) > 0 {
Expand Down Expand Up @@ -179,64 +230,98 @@ func (cw *CloudWatch) Send() error {
return firstErr
}

func sum(a []float64) float64 {
var v float64
for _, f := range a {
v += f
}
return v
}

func last(a []float64) float64 {
return a[len(a)-1]
}

func min(a, b int) int {
if a < b {
return a
}
return b
}

// counter is a CloudWatch counter metric.
type counter struct {
c *generic.Counter
type observeFunc func(name string, lvs lv.LabelValues, value float64)

// Counter is a counter. Observations are forwarded to a node
// object, and aggregated (summed) per timeseries.
type Counter struct {
name string
lvs lv.LabelValues
obs observeFunc
}

// With implements counter
func (c *counter) With(labelValues ...string) metrics.Counter {
c.c = c.c.With(labelValues...).(*generic.Counter)
return c
// With implements metrics.Counter.
func (c *Counter) With(labelValues ...string) metrics.Counter {
return &Counter{
name: c.name,
lvs: c.lvs.With(labelValues...),
obs: c.obs,
}
}

// Add implements counter.
func (c *counter) Add(delta float64) {
c.c.Add(delta)
// Add implements metrics.Counter.
func (c *Counter) Add(delta float64) {
c.obs(c.name, c.lvs, delta)
}

// gauge is a CloudWatch gauge metric.
type gauge struct {
g *generic.Gauge
// Gauge is a gauge. Observations are forwarded to a node
// object, and aggregated (the last observation selected) per timeseries.
type Gauge struct {
name string
lvs lv.LabelValues
obs observeFunc
add observeFunc
}

// With implements gauge
func (g *gauge) With(labelValues ...string) metrics.Gauge {
g.g = g.g.With(labelValues...).(*generic.Gauge)
return g
// With implements metrics.Gauge.
func (g *Gauge) With(labelValues ...string) metrics.Gauge {
return &Gauge{
name: g.name,
lvs: g.lvs.With(labelValues...),
obs: g.obs,
add: g.add,
}
}

// Set implements gauge
func (g *gauge) Set(value float64) {
g.g.Set(value)
// Set implements metrics.Gauge.
func (g *Gauge) Set(value float64) {
g.obs(g.name, g.lvs, value)
}

// Add implements gauge
func (g *gauge) Add(delta float64) {
g.g.Add(delta)
// Add implements metrics.Gauge.
func (g *Gauge) Add(delta float64) {
g.add(g.name, g.lvs, delta)
}

// histogram is a CloudWatch histogram metric
type histogram struct {
h *generic.Histogram
// Histogram is an Influx histrogram. Observations are aggregated into a
// generic.Histogram and emitted as per-quantile gauges to the Influx server.
type Histogram struct {
name string
lvs lv.LabelValues
obs observeFunc
}

// With implements histogram
func (h *histogram) With(labelValues ...string) metrics.Histogram {
h.h = h.h.With(labelValues...).(*generic.Histogram)
return h
// With implements metrics.Histogram.
func (h *Histogram) With(labelValues ...string) metrics.Histogram {
return &Histogram{
name: h.name,
lvs: h.lvs.With(labelValues...),
obs: h.obs,
}
}

// Observe implements histogram
func (h *histogram) Observe(value float64) {
h.h.Observe(value)
// Observe implements metrics.Histogram.
func (h *Histogram) Observe(value float64) {
h.obs(h.name, h.lvs, value)
}

func makeDimensions(labelValues ...string) []*cloudwatch.Dimension {
Expand Down
Loading