From bec45d9945fc10b8c4eff934e475ce4320fb0367 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Wed, 15 Nov 2017 22:14:57 +0000 Subject: [PATCH] Simple parallel chunk fetching from DynamoDB Run multiple goroutines in parallel to drive DynamoDB harder. Group chunk fetches into "gangs" to allow some configuration over how hard we hit DynamoDB --- pkg/chunk/aws_storage_client.go | 38 ++++++++++++++++++++++++++-- pkg/chunk/aws_storage_client_test.go | 9 +++++-- 2 files changed, 43 insertions(+), 4 deletions(-) diff --git a/pkg/chunk/aws_storage_client.go b/pkg/chunk/aws_storage_client.go index 7bf8d25cb47..c7948d42555 100644 --- a/pkg/chunk/aws_storage_client.go +++ b/pkg/chunk/aws_storage_client.go @@ -107,6 +107,7 @@ type DynamoDBConfig struct { DynamoDB util.URLValue APILimit float64 ApplicationAutoScaling util.URLValue + DynamoDBChunkGangSize int } // RegisterFlags adds the flags required to config this to the given FlagSet @@ -115,6 +116,7 @@ func (cfg *DynamoDBConfig) RegisterFlags(f *flag.FlagSet) { "If only region is specified as a host, proper endpoint will be deduced. Use inmemory:/// to use a mock in-memory implementation.") f.Float64Var(&cfg.APILimit, "dynamodb.api-limit", 2.0, "DynamoDB table management requests per second limit.") f.Var(&cfg.ApplicationAutoScaling, "applicationautoscaling.url", "ApplicationAutoscaling endpoint URL with escaped Key and Secret encoded.") + f.IntVar(&cfg.DynamoDBChunkGangSize, "dynamodb.chunk.gang.size", 10, "Number of chunks to group together to parallelise fetches (zero to disable)") } // AWSStorageConfig specifies config for storing data on AWS. @@ -415,6 +417,11 @@ func (a dynamoDBRequestAdapter) Retryable() bool { return *a.request.Retryable } +type chunksPlusError struct { + chunks []Chunk + err error +} + func (a awsStorageClient) GetChunks(ctx context.Context, chunks []Chunk) ([]Chunk, error) { sp, ctx := ot.StartSpanFromContext(ctx, "GetChunks") defer sp.Finish() @@ -443,10 +450,37 @@ func (a awsStorageClient) GetChunks(ctx context.Context, chunks []Chunk) ([]Chun return s3Chunks, err } - dynamoDBChunks, err = a.getDynamoDBChunks(ctx, dynamoDBChunks) + gangSize := a.cfg.DynamoDBChunkGangSize * dynamoDBMaxReadBatchSize + if gangSize == 0 { // zero means turn feature off + gangSize = len(dynamoDBChunks) + } + + results := make(chan chunksPlusError) + for i := 0; i < len(dynamoDBChunks); i += gangSize { + go func(start int) { + end := start + gangSize + if end > len(dynamoDBChunks) { + end = len(dynamoDBChunks) + } + outChunks, err := a.getDynamoDBChunks(ctx, dynamoDBChunks[start:end]) + results <- chunksPlusError{outChunks, err} + }(i) + } + finalChunks := s3Chunks + for i := 0; i < len(dynamoDBChunks); i += gangSize { + in := <-results + if in.err != nil { + err = in.err // TODO: cancel other sub-queries at this point + } else { + finalChunks = append(finalChunks, in.chunks...) + } + } + if err != nil { + return nil, err + } // Return any chunks we did receive: a partial result may be useful - return append(dynamoDBChunks, s3Chunks...), err + return finalChunks, err } func (a awsStorageClient) getS3Chunks(ctx context.Context, chunks []Chunk) ([]Chunk, error) { diff --git a/pkg/chunk/aws_storage_client_test.go b/pkg/chunk/aws_storage_client_test.go index 76cee508660..6a077f6db2d 100644 --- a/pkg/chunk/aws_storage_client_test.go +++ b/pkg/chunk/aws_storage_client_test.go @@ -479,9 +479,11 @@ func TestAWSStorageClientChunks(t *testing.T) { tests := []struct { name string provisionedErr int + gangSize int }{ - {"DynamoDB chunks", 0}, - {"DynamoDB chunks retry logic", 2}, + {"DynamoDB chunks", 0, 10}, + {"DynamoDB chunks with parallel fetch disabled", 0, 0}, + {"DynamoDB chunks retry logic", 2, 10}, } for _, tt := range tests { @@ -505,6 +507,9 @@ func TestAWSStorageClientChunks(t *testing.T) { require.NoError(t, err) client := awsStorageClient{ + cfg: AWSStorageConfig{ + DynamoDBConfig: DynamoDBConfig{DynamoDBChunkGangSize: tt.gangSize}, + }, DynamoDB: dynamoDB, schemaCfg: schemaConfig, queryRequestFn: dynamoDB.queryRequest,