Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pkg/chunk/aws/dynamodb_table_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type dynamoTableClient struct {
DynamoDB dynamodbiface.DynamoDBAPI
ApplicationAutoScaling applicationautoscalingiface.ApplicationAutoScalingAPI
limiter *rate.Limiter
backoffConfig util.BackoffConfig
}

// NewDynamoDBTableClient makes a new DynamoTableClient.
Expand All @@ -63,6 +64,7 @@ func NewDynamoDBTableClient(cfg DynamoDBConfig) (chunk.TableClient, error) {
DynamoDB: dynamoDB,
ApplicationAutoScaling: applicationAutoScaling,
limiter: rate.NewLimiter(rate.Limit(cfg.APILimit), 1),
backoffConfig: cfg.backoffConfig,
}, nil
}

Expand All @@ -71,7 +73,7 @@ func (d dynamoTableClient) backoffAndRetry(ctx context.Context, fn func(context.
d.limiter.Wait(ctx)
}

backoff := util.NewBackoff(ctx, backoffConfig)
backoff := util.NewBackoff(ctx, d.backoffConfig)
for backoff.Ongoing() {
if err := fn(ctx); err != nil {
if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == "ThrottlingException" {
Expand Down
10 changes: 9 additions & 1 deletion pkg/chunk/aws/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,15 @@ func dynamoDBFixture(provisionedErr, gangsize, maxParallelism int) chunk.Fixture
}
storage := &storageClient{
cfg: StorageConfig{
DynamoDBConfig: DynamoDBConfig{ChunkGangSize: gangsize, ChunkGetMaxParallelism: maxParallelism},
DynamoDBConfig: DynamoDBConfig{
ChunkGangSize: gangsize,
ChunkGetMaxParallelism: maxParallelism,
backoffConfig: util.BackoffConfig{
MinBackoff: 1 * time.Millisecond,
MaxBackoff: 5 * time.Millisecond,
MaxRetries: 20,
},
},
},
DynamoDB: dynamoDB,
S3: newMockS3(),
Expand Down
16 changes: 14 additions & 2 deletions pkg/chunk/aws/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type mockDynamoDBClient struct {
mtx sync.RWMutex
unprocessed int
provisionedErr int
errAfter int
tables map[string]*mockDynamoDBTable
}

Expand All @@ -47,6 +48,13 @@ func newMockDynamoDB(unprocessed int, provisionedErr int) *mockDynamoDBClient {
}
}

// SetErrorParameters configures the mocked DynamoDB client to inject
// failures: after errAfter successful operations, the next
// provisionedErr operations return a provisioned-throughput error.
// It is a no-op when the underlying client is not the mock.
func (a storageClient) SetErrorParameters(provisionedErr, errAfter int) {
	if m, ok := a.DynamoDB.(*mockDynamoDBClient); ok {
		// The mock's counters appear to be guarded by m.mtx (createTable
		// locks it); take the write lock here so a test configuring errors
		// cannot race with in-flight mock requests.
		m.mtx.Lock()
		defer m.mtx.Unlock()
		m.provisionedErr = provisionedErr
		m.errAfter = errAfter
	}
}

func (m *mockDynamoDBClient) createTable(name string) {
m.mtx.Lock()
defer m.mtx.Unlock()
Expand All @@ -63,7 +71,9 @@ func (m *mockDynamoDBClient) batchWriteItemRequest(_ context.Context, input *dyn
UnprocessedItems: map[string][]*dynamodb.WriteRequest{},
}

if m.provisionedErr > 0 {
if m.errAfter > 0 {
m.errAfter--
} else if m.provisionedErr > 0 {
m.provisionedErr--
return &dynamoDBMockRequest{
result: resp,
Expand Down Expand Up @@ -122,7 +132,9 @@ func (m *mockDynamoDBClient) batchGetItemRequest(_ context.Context, input *dynam
UnprocessedKeys: map[string]*dynamodb.KeysAndAttributes{},
}

if m.provisionedErr > 0 {
if m.errAfter > 0 {
m.errAfter--
} else if m.provisionedErr > 0 {
m.provisionedErr--
return &dynamoDBMockRequest{
result: resp,
Expand Down
22 changes: 8 additions & 14 deletions pkg/chunk/aws/storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,6 @@ const (
dynamoDBMaxReadBatchSize = 100
)

var backoffConfig = util.BackoffConfig{
// Backoff for dynamoDB requests, to match AWS lib - see:
// https://github.com/aws/aws-sdk-go/blob/master/service/dynamodb/customizations.go
MinBackoff: 100 * time.Millisecond,
MaxBackoff: 50 * time.Second,
MaxRetries: 20,
}

var (
dynamoRequestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "cortex",
Expand Down Expand Up @@ -112,6 +104,7 @@ type DynamoDBConfig struct {
ApplicationAutoScaling util.URLValue
ChunkGangSize int
ChunkGetMaxParallelism int
backoffConfig util.BackoffConfig
}

// RegisterFlags adds the flags required to config this to the given FlagSet
Expand All @@ -122,6 +115,9 @@ func (cfg *DynamoDBConfig) RegisterFlags(f *flag.FlagSet) {
f.Var(&cfg.ApplicationAutoScaling, "applicationautoscaling.url", "ApplicationAutoscaling endpoint URL with escaped Key and Secret encoded.")
f.IntVar(&cfg.ChunkGangSize, "dynamodb.chunk.gang.size", 10, "Number of chunks to group together to parallelise fetches (zero to disable)")
f.IntVar(&cfg.ChunkGetMaxParallelism, "dynamodb.chunk.get.max.parallelism", 32, "Max number of chunk-get operations to start in parallel")
f.DurationVar(&cfg.backoffConfig.MinBackoff, "dynamodb.min-backoff", 100*time.Millisecond, "Minimum backoff time")
f.DurationVar(&cfg.backoffConfig.MaxBackoff, "dynamodb.max-backoff", 50*time.Second, "Maximum backoff time")
f.IntVar(&cfg.backoffConfig.MaxRetries, "dynamodb.max-retries", 20, "Maximum number of times to retry an operation")
}

// StorageConfig specifies config for storing data on AWS.
Expand Down Expand Up @@ -213,7 +209,7 @@ func (a storageClient) BatchWrite(ctx context.Context, input chunk.WriteBatch) e
outstanding := input.(dynamoDBWriteBatch)
unprocessed := dynamoDBWriteBatch{}

backoff := util.NewBackoff(ctx, backoffConfig)
backoff := util.NewBackoff(ctx, a.cfg.backoffConfig)
defer func() {
dynamoQueryRetryCount.WithLabelValues("BatchWrite").Observe(float64(backoff.NumRetries()))
}()
Expand Down Expand Up @@ -346,7 +342,7 @@ func (a storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, c
}

func (a storageClient) queryPage(ctx context.Context, input *dynamodb.QueryInput, page dynamoDBRequest) (dynamoDBReadResponse, error) {
backoff := util.NewBackoff(ctx, backoffConfig)
backoff := util.NewBackoff(ctx, a.cfg.backoffConfig)
defer func() {
dynamoQueryRetryCount.WithLabelValues("queryPage").Observe(float64(backoff.NumRetries()))
}()
Expand Down Expand Up @@ -501,14 +497,12 @@ func (a storageClient) GetChunks(ctx context.Context, chunks []chunk.Chunk) ([]c
in := <-results
if in.err != nil {
err = in.err // TODO: cancel other sub-queries at this point
} else {
finalChunks = append(finalChunks, in.chunks...)
}
finalChunks = append(finalChunks, in.chunks...)
}
sp.LogFields(otlog.Int("chunks fetched", len(finalChunks)))
if err != nil {
sp.LogFields(otlog.String("error", err.Error()))
return nil, err
}

// Return any chunks we did receive: a partial result may be useful
Expand Down Expand Up @@ -592,7 +586,7 @@ func (a storageClient) getDynamoDBChunks(ctx context.Context, chunks []chunk.Chu

result := []chunk.Chunk{}
unprocessed := dynamoDBReadRequest{}
backoff := util.NewBackoff(ctx, backoffConfig)
backoff := util.NewBackoff(ctx, a.cfg.backoffConfig)
defer func() {
dynamoQueryRetryCount.WithLabelValues("getDynamoDBChunks").Observe(float64(backoff.NumRetries()))
}()
Expand Down
68 changes: 55 additions & 13 deletions pkg/chunk/storage/storage_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"math/rand"
"sort"
"strconv"
"strings"
"testing"
"time"

Expand All @@ -23,19 +24,10 @@ func TestChunksBasic(t *testing.T) {
// Write a few batches of chunks.
written := []string{}
for i := 0; i < 50; i++ {
chunks := []chunk.Chunk{}
for j := 0; j < batchSize; j++ {
chunk := dummyChunkFor(model.Now(), model.Metric{
model.MetricNameLabel: "foo",
"index": model.LabelValue(strconv.Itoa(i*batchSize + j)),
})
chunks = append(chunks, chunk)
_, err := chunk.Encode() // Need to encode it, side effect calculates crc
require.NoError(t, err)
written = append(written, chunk.ExternalKey())
}

err := client.PutChunks(ctx, chunks)
keys, chunks, err := createChunks(i, batchSize)
require.NoError(t, err)
written = append(written, keys...)
err = client.PutChunks(ctx, chunks)
require.NoError(t, err)
}

Expand Down Expand Up @@ -66,3 +58,53 @@ func TestChunksBasic(t *testing.T) {
}
})
}

// createChunks builds batchSize dummy chunks whose "index" labels are
// offset by startIndex*batchSize, encodes each one (encoding computes
// the CRC as a side effect), and returns the chunks' external keys
// alongside the chunks themselves.
func createChunks(startIndex, batchSize int) ([]string, []chunk.Chunk, error) {
	// Pre-size both slices: the final length is known up front.
	keys := make([]string, 0, batchSize)
	chunks := make([]chunk.Chunk, 0, batchSize)
	for j := 0; j < batchSize; j++ {
		// Named `c` (not `chunk`) so the local does not shadow the
		// imported chunk package.
		c := dummyChunkFor(model.Now(), model.Metric{
			model.MetricNameLabel: "foo",
			"index":               model.LabelValue(strconv.Itoa(startIndex*batchSize + j)),
		})
		chunks = append(chunks, c)
		if _, err := c.Encode(); err != nil { // Need to encode it, side effect calculates crc
			return nil, nil, err
		}
		keys = append(keys, c.ExternalKey())
	}
	return keys, chunks, nil
}

// clientWithErrorParameters is implemented by storage clients (currently
// only the DynamoDB mock fixture) that let tests inject failures: after
// errAfter successful operations, the next provisionedErr operations
// return errors.
type clientWithErrorParameters interface {
	SetErrorParameters(provisionedErr, errAfter int)
}

func TestChunksPartialError(t *testing.T) {
forAllFixtures(t, func(t *testing.T, client chunk.StorageClient) {
// This test is currently very specialised for DynamoDB
if !strings.Contains(t.Name(), "DynamoDB") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps this test belongs in aws package instead then? We already put the dynamodb specific table manager tests there.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried. It would need more mechanism to get to the fixture stuff, while avoiding circular dependencies.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh well, never mind then.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had another go; after 2 hours it's still not passing tests so going to merge this one and post the refactor as a separate PR.

return
}
// We use some carefully-chosen numbers:
// Start with 150 chunks; DynamoDB writes batches in 25s so 6 batches.
// We tell the client to error after 7 operations so all writes succeed
// and then the 2nd read fails, so we read back only 100 chunks
if ep, ok := client.(clientWithErrorParameters); ok {
ep.SetErrorParameters(22, 7)
} else {
t.Error("DynamoDB test fixture does not support SetErrorParameters() call")
return
}
ctx := context.Background()
_, chunks, err := createChunks(0, 150)
require.NoError(t, err)
err = client.PutChunks(ctx, chunks)
require.NoError(t, err)

chunksWeGot, err := client.GetChunks(ctx, chunks)
require.Error(t, err)
require.Equal(t, 100, len(chunksWeGot))
})
}