From b1c29e97e7dd90896e32390336ef63e0fb842513 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Fri, 13 Apr 2018 13:50:27 +0000 Subject: [PATCH 1/5] Make backoff parameters configurable --- pkg/chunk/aws/dynamodb_table_client.go | 4 +++- pkg/chunk/aws/fixtures.go | 10 +++++++++- pkg/chunk/aws/storage_client.go | 18 +++++++----------- 3 files changed, 19 insertions(+), 13 deletions(-) diff --git a/pkg/chunk/aws/dynamodb_table_client.go b/pkg/chunk/aws/dynamodb_table_client.go index b134965edb8..be443040ccf 100644 --- a/pkg/chunk/aws/dynamodb_table_client.go +++ b/pkg/chunk/aws/dynamodb_table_client.go @@ -41,6 +41,7 @@ type dynamoTableClient struct { DynamoDB dynamodbiface.DynamoDBAPI ApplicationAutoScaling applicationautoscalingiface.ApplicationAutoScalingAPI limiter *rate.Limiter + backoffConfig util.BackoffConfig } // NewDynamoDBTableClient makes a new DynamoTableClient. @@ -63,6 +64,7 @@ func NewDynamoDBTableClient(cfg DynamoDBConfig) (chunk.TableClient, error) { DynamoDB: dynamoDB, ApplicationAutoScaling: applicationAutoScaling, limiter: rate.NewLimiter(rate.Limit(cfg.APILimit), 1), + backoffConfig: cfg.backoffConfig, }, nil } @@ -71,7 +73,7 @@ func (d dynamoTableClient) backoffAndRetry(ctx context.Context, fn func(context. 
d.limiter.Wait(ctx) } - backoff := util.NewBackoff(ctx, backoffConfig) + backoff := util.NewBackoff(ctx, d.backoffConfig) for backoff.Ongoing() { if err := fn(ctx); err != nil { if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == "ThrottlingException" { diff --git a/pkg/chunk/aws/fixtures.go b/pkg/chunk/aws/fixtures.go index 7665b6d86aa..d38be932822 100644 --- a/pkg/chunk/aws/fixtures.go +++ b/pkg/chunk/aws/fixtures.go @@ -70,7 +70,15 @@ func dynamoDBFixture(provisionedErr, gangsize, maxParallelism int) chunk.Fixture } storage := &storageClient{ cfg: StorageConfig{ - DynamoDBConfig: DynamoDBConfig{ChunkGangSize: gangsize, ChunkGetMaxParallelism: maxParallelism}, + DynamoDBConfig: DynamoDBConfig{ + ChunkGangSize: gangsize, + ChunkGetMaxParallelism: maxParallelism, + backoffConfig: util.BackoffConfig{ + MinBackoff: 1 * time.Millisecond, + MaxBackoff: 5 * time.Millisecond, + MaxRetries: 20, + }, + }, }, DynamoDB: dynamoDB, S3: newMockS3(), diff --git a/pkg/chunk/aws/storage_client.go b/pkg/chunk/aws/storage_client.go index b633104e5f4..8ff6f0407e4 100644 --- a/pkg/chunk/aws/storage_client.go +++ b/pkg/chunk/aws/storage_client.go @@ -46,14 +46,6 @@ const ( dynamoDBMaxReadBatchSize = 100 ) -var backoffConfig = util.BackoffConfig{ - // Backoff for dynamoDB requests, to match AWS lib - see: - // https://github.com/aws/aws-sdk-go/blob/master/service/dynamodb/customizations.go - MinBackoff: 100 * time.Millisecond, - MaxBackoff: 50 * time.Second, - MaxRetries: 20, -} - var ( dynamoRequestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: "cortex", @@ -112,6 +104,7 @@ type DynamoDBConfig struct { ApplicationAutoScaling util.URLValue ChunkGangSize int ChunkGetMaxParallelism int + backoffConfig util.BackoffConfig } // RegisterFlags adds the flags required to config this to the given FlagSet @@ -122,6 +115,9 @@ func (cfg *DynamoDBConfig) RegisterFlags(f *flag.FlagSet) { f.Var(&cfg.ApplicationAutoScaling, "applicationautoscaling.url", 
"ApplicationAutoscaling endpoint URL with escaped Key and Secret encoded.") f.IntVar(&cfg.ChunkGangSize, "dynamodb.chunk.gang.size", 10, "Number of chunks to group together to parallelise fetches (zero to disable)") f.IntVar(&cfg.ChunkGetMaxParallelism, "dynamodb.chunk.get.max.parallelism", 32, "Max number of chunk-get operations to start in parallel") + f.DurationVar(&cfg.backoffConfig.MinBackoff, "dynamodb.min-backoff", 100*time.Millisecond, "Minimum backoff time") + f.DurationVar(&cfg.backoffConfig.MaxBackoff, "dynamodb.max-backoff", 50*time.Second, "Maximum backoff time") + f.IntVar(&cfg.backoffConfig.MaxRetries, "dynamodb.max-retries", 20, "Maximum number of times to retry an operation") } // StorageConfig specifies config for storing data on AWS. @@ -213,7 +209,7 @@ func (a storageClient) BatchWrite(ctx context.Context, input chunk.WriteBatch) e outstanding := input.(dynamoDBWriteBatch) unprocessed := dynamoDBWriteBatch{} - backoff := util.NewBackoff(ctx, backoffConfig) + backoff := util.NewBackoff(ctx, a.cfg.backoffConfig) defer func() { dynamoQueryRetryCount.WithLabelValues("BatchWrite").Observe(float64(backoff.NumRetries())) }() @@ -346,7 +342,7 @@ func (a storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, c } func (a storageClient) queryPage(ctx context.Context, input *dynamodb.QueryInput, page dynamoDBRequest) (dynamoDBReadResponse, error) { - backoff := util.NewBackoff(ctx, backoffConfig) + backoff := util.NewBackoff(ctx, a.cfg.backoffConfig) defer func() { dynamoQueryRetryCount.WithLabelValues("queryPage").Observe(float64(backoff.NumRetries())) }() @@ -592,7 +588,7 @@ func (a storageClient) getDynamoDBChunks(ctx context.Context, chunks []chunk.Chu result := []chunk.Chunk{} unprocessed := dynamoDBReadRequest{} - backoff := util.NewBackoff(ctx, backoffConfig) + backoff := util.NewBackoff(ctx, a.cfg.backoffConfig) defer func() { dynamoQueryRetryCount.WithLabelValues("getDynamoDBChunks").Observe(float64(backoff.NumRetries())) }() From 
dbd4d789dcb3eff7390b2f589fbe323d0bde0e17 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Fri, 13 Apr 2018 13:53:17 +0000 Subject: [PATCH 2/5] Be able to defer mock error for a number of operations So we can run a test where some operations succeed and then subsequent ones fail. --- pkg/chunk/aws/mock.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/pkg/chunk/aws/mock.go b/pkg/chunk/aws/mock.go index 19b820a01ea..52072aa7888 100644 --- a/pkg/chunk/aws/mock.go +++ b/pkg/chunk/aws/mock.go @@ -28,6 +28,7 @@ type mockDynamoDBClient struct { mtx sync.RWMutex unprocessed int provisionedErr int + errAfter int tables map[string]*mockDynamoDBTable } @@ -63,7 +64,9 @@ func (m *mockDynamoDBClient) batchWriteItemRequest(_ context.Context, input *dyn UnprocessedItems: map[string][]*dynamodb.WriteRequest{}, } - if m.provisionedErr > 0 { + if m.errAfter > 0 { + m.errAfter-- + } else if m.provisionedErr > 0 { m.provisionedErr-- return &dynamoDBMockRequest{ result: resp, @@ -122,7 +125,9 @@ func (m *mockDynamoDBClient) batchGetItemRequest(_ context.Context, input *dynam UnprocessedKeys: map[string]*dynamodb.KeysAndAttributes{}, } - if m.provisionedErr > 0 { + if m.errAfter > 0 { + m.errAfter-- + } else if m.provisionedErr > 0 { m.provisionedErr-- return &dynamoDBMockRequest{ result: resp, From b71c876e530188f5130c9bb8bedd0ca338bde17b Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Fri, 13 Apr 2018 15:25:54 +0000 Subject: [PATCH 3/5] Refactor: extract function to create chunks for testing --- pkg/chunk/storage/storage_client_test.go | 35 +++++++++++++++--------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/pkg/chunk/storage/storage_client_test.go b/pkg/chunk/storage/storage_client_test.go index 34457ff992f..a583f2f889a 100644 --- a/pkg/chunk/storage/storage_client_test.go +++ b/pkg/chunk/storage/storage_client_test.go @@ -23,19 +23,10 @@ func TestChunksBasic(t *testing.T) { // Write a few batches of chunks. 
written := []string{} for i := 0; i < 50; i++ { - chunks := []chunk.Chunk{} - for j := 0; j < batchSize; j++ { - chunk := dummyChunkFor(model.Now(), model.Metric{ - model.MetricNameLabel: "foo", - "index": model.LabelValue(strconv.Itoa(i*batchSize + j)), - }) - chunks = append(chunks, chunk) - _, err := chunk.Encode() // Need to encode it, side effect calculates crc - require.NoError(t, err) - written = append(written, chunk.ExternalKey()) - } - - err := client.PutChunks(ctx, chunks) + keys, chunks, err := createChunks(i, batchSize) + require.NoError(t, err) + written = append(written, keys...) + err = client.PutChunks(ctx, chunks) require.NoError(t, err) } @@ -66,3 +57,21 @@ func TestChunksBasic(t *testing.T) { } }) } + +func createChunks(startIndex, batchSize int) ([]string, []chunk.Chunk, error) { + keys := []string{} + chunks := []chunk.Chunk{} + for j := 0; j < batchSize; j++ { + chunk := dummyChunkFor(model.Now(), model.Metric{ + model.MetricNameLabel: "foo", + "index": model.LabelValue(strconv.Itoa(startIndex*batchSize + j)), + }) + chunks = append(chunks, chunk) + _, err := chunk.Encode() // Need to encode it, side effect calculates crc + if err != nil { + return nil, nil, err + } + keys = append(keys, chunk.ExternalKey()) + } + return keys, chunks, nil +} From 20a3cedeb79bfddb1f2cfcc6993c92688613c6b3 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Fri, 13 Apr 2018 15:28:40 +0000 Subject: [PATCH 4/5] Add test for DynamoDB operation with partial result Some kludging required to (a) make sure it runs only on DynamoDB and (b) access the error parameters which are on a non-exported type. Also lots of magic numbers. It does the job. 
--- pkg/chunk/aws/mock.go | 7 +++++ pkg/chunk/storage/storage_client_test.go | 33 ++++++++++++++++++++++++ 2 files changed, 40 insertions(+) diff --git a/pkg/chunk/aws/mock.go b/pkg/chunk/aws/mock.go index 52072aa7888..974963143dc 100644 --- a/pkg/chunk/aws/mock.go +++ b/pkg/chunk/aws/mock.go @@ -48,6 +48,13 @@ func newMockDynamoDB(unprocessed int, provisionedErr int) *mockDynamoDBClient { } } +func (a storageClient) SetErrorParameters(provisionedErr, errAfter int) { + if m, ok := a.DynamoDB.(*mockDynamoDBClient); ok { + m.provisionedErr = provisionedErr + m.errAfter = errAfter + } +} + func (m *mockDynamoDBClient) createTable(name string) { m.mtx.Lock() defer m.mtx.Unlock() diff --git a/pkg/chunk/storage/storage_client_test.go b/pkg/chunk/storage/storage_client_test.go index a583f2f889a..8307090097b 100644 --- a/pkg/chunk/storage/storage_client_test.go +++ b/pkg/chunk/storage/storage_client_test.go @@ -5,6 +5,7 @@ import ( "math/rand" "sort" "strconv" + "strings" "testing" "time" @@ -75,3 +76,35 @@ func createChunks(startIndex, batchSize int) ([]string, []chunk.Chunk, error) { } return keys, chunks, nil } + +type clientWithErrorParameters interface { + SetErrorParameters(provisionedErr, errAfter int) +} + +func TestChunksPartialError(t *testing.T) { + forAllFixtures(t, func(t *testing.T, client chunk.StorageClient) { + // This test is currently very specialised for DynamoDB + if !strings.Contains(t.Name(), "DynamoDB") { + return + } + // We use some carefully-chosen numbers: + // Start with 150 chunks; DynamoDB writes batches in 25s so 6 batches. 
+ // We tell the client to error after 7 operations so all writes succeed + // and then the 2nd read fails, so we read back only 100 chunks + if ep, ok := client.(clientWithErrorParameters); ok { + ep.SetErrorParameters(22, 7) + } else { + t.Error("DynamoDB test fixture does not support SetErrorParameters() call") + return + } + ctx := context.Background() + _, chunks, err := createChunks(0, 150) + require.NoError(t, err) + err = client.PutChunks(ctx, chunks) + require.NoError(t, err) + + chunksWeGot, err := client.GetChunks(ctx, chunks) + require.Error(t, err) + require.Equal(t, 100, len(chunksWeGot)) + }) +} From 39debda6937f22b0a8eedc0fc89c14ca7741b0ad Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Tue, 10 Apr 2018 12:04:27 +0000 Subject: [PATCH 5/5] Return chunks fetched by GetChunks on error If DynamoDB is overloaded, GetChunks() may time out, but we should return the chunks fetched so they can be put into the cache. Then if the client retries we will get a bit further next time. --- pkg/chunk/aws/storage_client.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pkg/chunk/aws/storage_client.go b/pkg/chunk/aws/storage_client.go index 8ff6f0407e4..d9542814599 100644 --- a/pkg/chunk/aws/storage_client.go +++ b/pkg/chunk/aws/storage_client.go @@ -497,14 +497,12 @@ func (a storageClient) GetChunks(ctx context.Context, chunks []chunk.Chunk) ([]c in := <-results if in.err != nil { err = in.err // TODO: cancel other sub-queries at this point - } else { - finalChunks = append(finalChunks, in.chunks...) } + finalChunks = append(finalChunks, in.chunks...) } sp.LogFields(otlog.Int("chunks fetched", len(finalChunks))) if err != nil { sp.LogFields(otlog.String("error", err.Error())) - return nil, err } // Return any chunks we did receive: a partial result may be useful