From dadcaf50de282663a05ebb41489205782ba3cd3a Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Tue, 9 Jan 2018 11:33:54 +0000 Subject: [PATCH] Limit parallelism when fetching chunks from DynamoDB If someone sends a query that requires 260,000 chunks we don't want to hit DynamoDB from 260 goroutines in parallel. --- pkg/chunk/aws_storage_client.go | 12 +++++++++--- pkg/chunk/aws_storage_client_test.go | 9 +++++---- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/pkg/chunk/aws_storage_client.go b/pkg/chunk/aws_storage_client.go index 59e3b6a60b2..a0072b3711d 100644 --- a/pkg/chunk/aws_storage_client.go +++ b/pkg/chunk/aws_storage_client.go @@ -109,7 +109,8 @@ type DynamoDBConfig struct { DynamoDB util.URLValue APILimit float64 ApplicationAutoScaling util.URLValue - DynamoDBChunkGangSize int + ChunkGangSize int + ChunkGetMaxParallelism int } // RegisterFlags adds the flags required to config this to the given FlagSet @@ -118,7 +119,8 @@ func (cfg *DynamoDBConfig) RegisterFlags(f *flag.FlagSet) { "If only region is specified as a host, proper endpoint will be deduced. Use inmemory:/// to use a mock in-memory implementation.") f.Float64Var(&cfg.APILimit, "dynamodb.api-limit", 2.0, "DynamoDB table management requests per second limit.") f.Var(&cfg.ApplicationAutoScaling, "applicationautoscaling.url", "ApplicationAutoscaling endpoint URL with escaped Key and Secret encoded.") - f.IntVar(&cfg.DynamoDBChunkGangSize, "dynamodb.chunk.gang.size", 10, "Number of chunks to group together to parallelise fetches (zero to disable)") + f.IntVar(&cfg.ChunkGangSize, "dynamodb.chunk.gang.size", 10, "Number of chunks to group together to parallelise fetches (zero to disable)") + f.IntVar(&cfg.ChunkGetMaxParallelism, "dynamodb.chunk.get.max.parallelism", 32, "Max number of chunk-get operations to start in parallel") } // AWSStorageConfig specifies config for storing data on AWS. @@ -450,9 +452,13 @@ func (a awsStorageClient) GetChunks(ctx context.Context, chunks []Chunk) ([]Chun return s3Chunks, err } - gangSize := a.cfg.DynamoDBChunkGangSize * dynamoDBMaxReadBatchSize + gangSize := a.cfg.ChunkGangSize * dynamoDBMaxReadBatchSize if gangSize == 0 { // zero means turn feature off gangSize = len(dynamoDBChunks) + } else { + if len(dynamoDBChunks)/gangSize > a.cfg.ChunkGetMaxParallelism { + gangSize = len(dynamoDBChunks)/a.cfg.ChunkGetMaxParallelism + 1 + } } results := make(chan chunksPlusError) diff --git a/pkg/chunk/aws_storage_client_test.go b/pkg/chunk/aws_storage_client_test.go index 80df93a1530..6af399337f9 100644 --- a/pkg/chunk/aws_storage_client_test.go +++ b/pkg/chunk/aws_storage_client_test.go @@ -480,10 +480,11 @@ func TestAWSStorageClientChunks(t *testing.T) { name string provisionedErr int gangSize int + maxParallelism int }{ - {"DynamoDB chunks", 0, 10}, - {"DynamoDB chunks with parallel fetch disabled", 0, 0}, - {"DynamoDB chunks retry logic", 2, 10}, + {"DynamoDB chunks", 0, 10, 20}, + {"DynamoDB chunks with parallel fetch disabled", 0, 0, 20}, + {"DynamoDB chunks retry logic", 2, 10, 20}, } for _, tt := range tests { @@ -508,7 +509,7 @@ func TestAWSStorageClientChunks(t *testing.T) { client := awsStorageClient{ cfg: AWSStorageConfig{ - DynamoDBConfig: DynamoDBConfig{DynamoDBChunkGangSize: tt.gangSize}, + DynamoDBConfig: DynamoDBConfig{ChunkGangSize: tt.gangSize, ChunkGetMaxParallelism: tt.maxParallelism}, }, DynamoDB: dynamoDB, schemaCfg: schemaConfig,