diff --git a/metrics/cloudwatch/cloudwatch.go b/metrics/cloudwatch/cloudwatch.go index 728634bc1..e267e0302 100644 --- a/metrics/cloudwatch/cloudwatch.go +++ b/metrics/cloudwatch/cloudwatch.go @@ -14,28 +14,42 @@ import ( "github.com/go-kit/kit/metrics/generic" ) +const ( + maxConcurrentRequests = 20 +) + // CloudWatch receives metrics observations and forwards them to CloudWatch. // Create a CloudWatch object, use it to create metrics, and pass those metrics as // dependencies to the components that will use them. // // To regularly report metrics to CloudWatch, use the WriteLoop helper method. type CloudWatch struct { - mtx sync.RWMutex - namespace string - svc cloudwatchiface.CloudWatchAPI - counters map[string]*counter - gauges map[string]*gauge - histograms map[string]*histogram - logger log.Logger + mtx sync.RWMutex + sem chan struct{} + namespace string + numConcurrentRequests int + svc cloudwatchiface.CloudWatchAPI + counters map[string]*counter + gauges map[string]*gauge + histograms map[string]*histogram + logger log.Logger } // New returns a CloudWatch object that may be used to create metrics. // Namespace is applied to all created metrics and maps to the CloudWatch namespace. +// NumConcurrent sets the number of simultaneous requests to Amazon. +// A good default value is 10 and the maximum is 20. // Callers must ensure that regular calls to Send are performed, either // manually or with one of the helper methods. -func New(namespace string, svc cloudwatchiface.CloudWatchAPI, logger log.Logger) *CloudWatch { +func New(namespace string, svc cloudwatchiface.CloudWatchAPI, numConcurrent int, logger log.Logger) *CloudWatch { + if numConcurrent > maxConcurrentRequests { + numConcurrent = maxConcurrentRequests + } + return &CloudWatch{ - namespace: namespace, + sem: make(chan struct{}, numConcurrent), + namespace: namespace, + numConcurrentRequests: numConcurrent, svc: svc, counters: map[string]*counter{}, gauges: map[string]*gauge{}, @@ -133,11 +147,43 @@ func (cw *CloudWatch) Send() error { } } - _, err := cw.svc.PutMetricData(&cloudwatch.PutMetricDataInput{ - Namespace: aws.String(cw.namespace), - MetricData: datums, - }) - return err + var batches [][]*cloudwatch.MetricDatum + for len(datums) > 0 { + var batch []*cloudwatch.MetricDatum + lim := min(len(datums), maxConcurrentRequests) + batch, datums = datums[:lim], datums[lim:] + batches = append(batches, batch) + } + + var errors = make(chan error, len(batches)) + for _, batch := range batches { + go func(batch []*cloudwatch.MetricDatum) { + cw.sem <- struct{}{} + defer func() { + <-cw.sem + }() + _, err := cw.svc.PutMetricData(&cloudwatch.PutMetricDataInput{ + Namespace: aws.String(cw.namespace), + MetricData: batch, + }) + errors <- err + }(batch) + } + var firstErr error + for i := 0; i < cap(errors); i++ { + if err := <-errors; err != nil && firstErr != nil { + firstErr = err + } + } + + return firstErr +} + +func min(a, b int) int { + if a < b { + return a + } + return b } // counter is a CloudWatch counter metric. diff --git a/metrics/cloudwatch/cloudwatch_test.go b/metrics/cloudwatch/cloudwatch_test.go index c0cf1d0e9..c5f11db64 100644 --- a/metrics/cloudwatch/cloudwatch_test.go +++ b/metrics/cloudwatch/cloudwatch_test.go @@ -3,6 +3,7 @@ package cloudwatch import ( "errors" "fmt" + "strconv" "sync" "testing" @@ -10,6 +11,7 @@ import ( "github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface" "github.com/go-kit/kit/log" + "github.com/go-kit/kit/metrics" "github.com/go-kit/kit/metrics/teststat" ) @@ -64,7 +66,7 @@ func TestCounter(t *testing.T) { namespace, name := "abc", "def" label, value := "label", "value" svc := newMockCloudWatch() - cw := New(namespace, svc, log.NewNopLogger()) + cw := New(namespace, svc, 10, log.NewNopLogger()) counter := cw.NewCounter(name).With(label, value) valuef := func() float64 { err := cw.Send() @@ -83,11 +85,45 @@ func TestCounter(t *testing.T) { } } +func TestCounterLowSendConcurrency(t *testing.T) { + namespace := "abc" + var names, labels, values []string + for i := 1; i <= 45; i++ { + num := strconv.Itoa(i) + names = append(names, "name"+num) + labels = append(labels, "label"+num) + values = append(values, "value"+num) + } + svc := newMockCloudWatch() + cw := New(namespace, svc, 2, log.NewNopLogger()) + + counters := make(map[string]metrics.Counter) + var wants []float64 + for i, name := range names { + counters[name] = cw.NewCounter(name).With(labels[i], values[i]) + wants = append(wants, teststat.FillCounter(counters[name])) + } + + err := cw.Send() + if err != nil { + t.Fatal(err) + } + + for i, name := range names { + if svc.valuesReceived[name] != wants[i] { + t.Fatal("want %f, have %f", wants[i], svc.valuesReceived[name]) + } + if err := testDimensions(svc, name, labels[i], values[i]); err != nil { + t.Fatal(err) + } + } +} + func TestGauge(t *testing.T) { namespace, name := "abc", "def" label, value := "label", "value" svc := newMockCloudWatch() - cw := New(namespace, svc, log.NewNopLogger()) + cw := New(namespace, svc, 10, log.NewNopLogger()) gauge := cw.NewGauge(name).With(label, value) valuef := func() float64 { err := cw.Send() @@ -110,7 +146,7 @@ func TestHistogram(t *testing.T) { namespace, name := "abc", "def" label, value := "label", "value" svc := newMockCloudWatch() - cw := New(namespace, svc, log.NewNopLogger()) + cw := New(namespace, svc, 10, log.NewNopLogger()) histogram := cw.NewHistogram(name, 50).With(label, value) n50 := fmt.Sprintf("%s_50", name) n90 := fmt.Sprintf("%s_90", name) diff --git a/metrics/teststat/teststat.go b/metrics/teststat/teststat.go index 5a1885a66..74019f5b2 100644 --- a/metrics/teststat/teststat.go +++ b/metrics/teststat/teststat.go @@ -14,6 +14,16 @@ import ( // TestCounter puts some deltas through the counter, and then calls the value // func to check that the counter has the correct final value. func TestCounter(counter metrics.Counter, value func() float64) error { + want := FillCounter(counter) + if have := value(); want != have { + return fmt.Errorf("want %f, have %f", want, have) + } + + return nil +} + +// FillCounter puts some deltas through the counter and returns the total value. +func FillCounter(counter metrics.Counter) float64 { a := rand.Perm(100) n := rand.Intn(len(a)) @@ -23,12 +33,7 @@ func TestCounter(counter metrics.Counter, value func() float64) error { counter.Add(f) want += f } - - if have := value(); want != have { - return fmt.Errorf("want %f, have %f", want, have) - } - - return nil + return want } // TestGauge puts some values through the gauge, and then calls the value func