diff --git a/pkg/chunk/aws/dynamodb_table_client.go b/pkg/chunk/aws/dynamodb_table_client.go index b134965edb8..be443040ccf 100644 --- a/pkg/chunk/aws/dynamodb_table_client.go +++ b/pkg/chunk/aws/dynamodb_table_client.go @@ -41,6 +41,7 @@ type dynamoTableClient struct { DynamoDB dynamodbiface.DynamoDBAPI ApplicationAutoScaling applicationautoscalingiface.ApplicationAutoScalingAPI limiter *rate.Limiter + backoffConfig util.BackoffConfig } // NewDynamoDBTableClient makes a new DynamoTableClient. @@ -63,6 +64,7 @@ func NewDynamoDBTableClient(cfg DynamoDBConfig) (chunk.TableClient, error) { DynamoDB: dynamoDB, ApplicationAutoScaling: applicationAutoScaling, limiter: rate.NewLimiter(rate.Limit(cfg.APILimit), 1), + backoffConfig: cfg.backoffConfig, }, nil } @@ -71,7 +73,7 @@ func (d dynamoTableClient) backoffAndRetry(ctx context.Context, fn func(context. d.limiter.Wait(ctx) } - backoff := util.NewBackoff(ctx, backoffConfig) + backoff := util.NewBackoff(ctx, d.backoffConfig) for backoff.Ongoing() { if err := fn(ctx); err != nil { if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == "ThrottlingException" { diff --git a/pkg/chunk/aws/fixtures.go b/pkg/chunk/aws/fixtures.go index 7665b6d86aa..d38be932822 100644 --- a/pkg/chunk/aws/fixtures.go +++ b/pkg/chunk/aws/fixtures.go @@ -70,7 +70,15 @@ func dynamoDBFixture(provisionedErr, gangsize, maxParallelism int) chunk.Fixture } storage := &storageClient{ cfg: StorageConfig{ - DynamoDBConfig: DynamoDBConfig{ChunkGangSize: gangsize, ChunkGetMaxParallelism: maxParallelism}, + DynamoDBConfig: DynamoDBConfig{ + ChunkGangSize: gangsize, + ChunkGetMaxParallelism: maxParallelism, + backoffConfig: util.BackoffConfig{ + MinBackoff: 1 * time.Millisecond, + MaxBackoff: 5 * time.Millisecond, + MaxRetries: 20, + }, + }, }, DynamoDB: dynamoDB, S3: newMockS3(), diff --git a/pkg/chunk/aws/mock.go b/pkg/chunk/aws/mock.go index 19b820a01ea..974963143dc 100644 --- a/pkg/chunk/aws/mock.go +++ b/pkg/chunk/aws/mock.go @@ -28,6 +28,7 @@ type 
mockDynamoDBClient struct { mtx sync.RWMutex unprocessed int provisionedErr int + errAfter int tables map[string]*mockDynamoDBTable } @@ -47,6 +48,13 @@ func newMockDynamoDB(unprocessed int, provisionedErr int) *mockDynamoDBClient { } } +func (a storageClient) SetErrorParameters(provisionedErr, errAfter int) { + if m, ok := a.DynamoDB.(*mockDynamoDBClient); ok { + m.provisionedErr = provisionedErr + m.errAfter = errAfter + } +} + func (m *mockDynamoDBClient) createTable(name string) { m.mtx.Lock() defer m.mtx.Unlock() @@ -63,7 +71,9 @@ func (m *mockDynamoDBClient) batchWriteItemRequest(_ context.Context, input *dyn UnprocessedItems: map[string][]*dynamodb.WriteRequest{}, } - if m.provisionedErr > 0 { + if m.errAfter > 0 { + m.errAfter-- + } else if m.provisionedErr > 0 { m.provisionedErr-- return &dynamoDBMockRequest{ result: resp, @@ -122,7 +132,9 @@ func (m *mockDynamoDBClient) batchGetItemRequest(_ context.Context, input *dynam UnprocessedKeys: map[string]*dynamodb.KeysAndAttributes{}, } - if m.provisionedErr > 0 { + if m.errAfter > 0 { + m.errAfter-- + } else if m.provisionedErr > 0 { m.provisionedErr-- return &dynamoDBMockRequest{ result: resp, diff --git a/pkg/chunk/aws/storage_client.go b/pkg/chunk/aws/storage_client.go index b633104e5f4..d9542814599 100644 --- a/pkg/chunk/aws/storage_client.go +++ b/pkg/chunk/aws/storage_client.go @@ -46,14 +46,6 @@ const ( dynamoDBMaxReadBatchSize = 100 ) -var backoffConfig = util.BackoffConfig{ - // Backoff for dynamoDB requests, to match AWS lib - see: - // https://github.com/aws/aws-sdk-go/blob/master/service/dynamodb/customizations.go - MinBackoff: 100 * time.Millisecond, - MaxBackoff: 50 * time.Second, - MaxRetries: 20, -} - var ( dynamoRequestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: "cortex", @@ -112,6 +104,7 @@ type DynamoDBConfig struct { ApplicationAutoScaling util.URLValue ChunkGangSize int ChunkGetMaxParallelism int + backoffConfig util.BackoffConfig } // RegisterFlags adds the 
flags required to config this to the given FlagSet @@ -122,6 +115,9 @@ func (cfg *DynamoDBConfig) RegisterFlags(f *flag.FlagSet) { f.Var(&cfg.ApplicationAutoScaling, "applicationautoscaling.url", "ApplicationAutoscaling endpoint URL with escaped Key and Secret encoded.") f.IntVar(&cfg.ChunkGangSize, "dynamodb.chunk.gang.size", 10, "Number of chunks to group together to parallelise fetches (zero to disable)") f.IntVar(&cfg.ChunkGetMaxParallelism, "dynamodb.chunk.get.max.parallelism", 32, "Max number of chunk-get operations to start in parallel") + f.DurationVar(&cfg.backoffConfig.MinBackoff, "dynamodb.min-backoff", 100*time.Millisecond, "Minimum backoff time") + f.DurationVar(&cfg.backoffConfig.MaxBackoff, "dynamodb.max-backoff", 50*time.Second, "Maximum backoff time") + f.IntVar(&cfg.backoffConfig.MaxRetries, "dynamodb.max-retries", 20, "Maximum number of times to retry an operation") } // StorageConfig specifies config for storing data on AWS. @@ -213,7 +209,7 @@ func (a storageClient) BatchWrite(ctx context.Context, input chunk.WriteBatch) e outstanding := input.(dynamoDBWriteBatch) unprocessed := dynamoDBWriteBatch{} - backoff := util.NewBackoff(ctx, backoffConfig) + backoff := util.NewBackoff(ctx, a.cfg.backoffConfig) defer func() { dynamoQueryRetryCount.WithLabelValues("BatchWrite").Observe(float64(backoff.NumRetries())) }() @@ -346,7 +342,7 @@ func (a storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, c } func (a storageClient) queryPage(ctx context.Context, input *dynamodb.QueryInput, page dynamoDBRequest) (dynamoDBReadResponse, error) { - backoff := util.NewBackoff(ctx, backoffConfig) + backoff := util.NewBackoff(ctx, a.cfg.backoffConfig) defer func() { dynamoQueryRetryCount.WithLabelValues("queryPage").Observe(float64(backoff.NumRetries())) }() @@ -501,14 +497,12 @@ func (a storageClient) GetChunks(ctx context.Context, chunks []chunk.Chunk) ([]c in := <-results if in.err != nil { err = in.err // TODO: cancel other sub-queries at this 
point - } else { - finalChunks = append(finalChunks, in.chunks...) } + finalChunks = append(finalChunks, in.chunks...) } sp.LogFields(otlog.Int("chunks fetched", len(finalChunks))) if err != nil { sp.LogFields(otlog.String("error", err.Error())) - return nil, err } // Return any chunks we did receive: a partial result may be useful @@ -592,7 +586,7 @@ func (a storageClient) getDynamoDBChunks(ctx context.Context, chunks []chunk.Chu result := []chunk.Chunk{} unprocessed := dynamoDBReadRequest{} - backoff := util.NewBackoff(ctx, backoffConfig) + backoff := util.NewBackoff(ctx, a.cfg.backoffConfig) defer func() { dynamoQueryRetryCount.WithLabelValues("getDynamoDBChunks").Observe(float64(backoff.NumRetries())) }() diff --git a/pkg/chunk/storage/storage_client_test.go b/pkg/chunk/storage/storage_client_test.go index 34457ff992f..8307090097b 100644 --- a/pkg/chunk/storage/storage_client_test.go +++ b/pkg/chunk/storage/storage_client_test.go @@ -5,6 +5,7 @@ import ( "math/rand" "sort" "strconv" + "strings" "testing" "time" @@ -23,19 +24,10 @@ func TestChunksBasic(t *testing.T) { // Write a few batches of chunks. written := []string{} for i := 0; i < 50; i++ { - chunks := []chunk.Chunk{} - for j := 0; j < batchSize; j++ { - chunk := dummyChunkFor(model.Now(), model.Metric{ - model.MetricNameLabel: "foo", - "index": model.LabelValue(strconv.Itoa(i*batchSize + j)), - }) - chunks = append(chunks, chunk) - _, err := chunk.Encode() // Need to encode it, side effect calculates crc - require.NoError(t, err) - written = append(written, chunk.ExternalKey()) - } - - err := client.PutChunks(ctx, chunks) + keys, chunks, err := createChunks(i, batchSize) + require.NoError(t, err) + written = append(written, keys...) 
+ err = client.PutChunks(ctx, chunks) require.NoError(t, err) } @@ -66,3 +58,53 @@ func TestChunksBasic(t *testing.T) { } }) } + +func createChunks(startIndex, batchSize int) ([]string, []chunk.Chunk, error) { + keys := []string{} + chunks := []chunk.Chunk{} + for j := 0; j < batchSize; j++ { + chunk := dummyChunkFor(model.Now(), model.Metric{ + model.MetricNameLabel: "foo", + "index": model.LabelValue(strconv.Itoa(startIndex*batchSize + j)), + }) + chunks = append(chunks, chunk) + _, err := chunk.Encode() // Need to encode it, side effect calculates crc + if err != nil { + return nil, nil, err + } + keys = append(keys, chunk.ExternalKey()) + } + return keys, chunks, nil +} + +type clientWithErrorParameters interface { + SetErrorParameters(provisionedErr, errAfter int) +} + +func TestChunksPartialError(t *testing.T) { + forAllFixtures(t, func(t *testing.T, client chunk.StorageClient) { + // This test is currently very specialised for DynamoDB + if !strings.Contains(t.Name(), "DynamoDB") { + return + } + // We use some carefully-chosen numbers: + // Start with 150 chunks; DynamoDB writes in batches of 25, so 6 write batches. + // We tell the client to start erroring after 7 operations, so all 6 write batches + // and the 1st read batch succeed; the 2nd read batch then fails, so we read back only 100 chunks + if ep, ok := client.(clientWithErrorParameters); ok { + ep.SetErrorParameters(22, 7) + } else { + t.Error("DynamoDB test fixture does not support SetErrorParameters() call") + return + } + ctx := context.Background() + _, chunks, err := createChunks(0, 150) + require.NoError(t, err) + err = client.PutChunks(ctx, chunks) + require.NoError(t, err) + + chunksWeGot, err := client.GetChunks(ctx, chunks) + require.Error(t, err) + require.Equal(t, 100, len(chunksWeGot)) + }) +}