Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions pkg/chunk/aws_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ type DynamoDBConfig struct {
DynamoDB util.URLValue
APILimit float64
ApplicationAutoScaling util.URLValue
DynamoDBChunkGangSize int
ChunkGangSize int
ChunkGetMaxParallelism int
}

// RegisterFlags adds the flags required to config this to the given FlagSet
Expand All @@ -118,7 +119,8 @@ func (cfg *DynamoDBConfig) RegisterFlags(f *flag.FlagSet) {
"If only region is specified as a host, proper endpoint will be deduced. Use inmemory:///<table-name> to use a mock in-memory implementation.")
f.Float64Var(&cfg.APILimit, "dynamodb.api-limit", 2.0, "DynamoDB table management requests per second limit.")
f.Var(&cfg.ApplicationAutoScaling, "applicationautoscaling.url", "ApplicationAutoscaling endpoint URL with escaped Key and Secret encoded.")
f.IntVar(&cfg.DynamoDBChunkGangSize, "dynamodb.chunk.gang.size", 10, "Number of chunks to group together to parallelise fetches (zero to disable)")
f.IntVar(&cfg.ChunkGangSize, "dynamodb.chunk.gang.size", 10, "Number of chunks to group together to parallelise fetches (zero to disable)")
f.IntVar(&cfg.ChunkGetMaxParallelism, "dynamodb.chunk.get.max.parallelism", 32, "Max number of chunk-get operations to start in parallel")
}

// AWSStorageConfig specifies config for storing data on AWS.
Expand Down Expand Up @@ -450,9 +452,13 @@ func (a awsStorageClient) GetChunks(ctx context.Context, chunks []Chunk) ([]Chun
return s3Chunks, err
}

gangSize := a.cfg.DynamoDBChunkGangSize * dynamoDBMaxReadBatchSize
gangSize := a.cfg.ChunkGangSize * dynamoDBMaxReadBatchSize
if gangSize == 0 { // zero means turn feature off
gangSize = len(dynamoDBChunks)
} else {
if len(dynamoDBChunks)/gangSize > a.cfg.ChunkGetMaxParallelism {
gangSize = len(dynamoDBChunks)/a.cfg.ChunkGetMaxParallelism + 1
}
}

results := make(chan chunksPlusError)
Expand Down
9 changes: 5 additions & 4 deletions pkg/chunk/aws_storage_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,10 +480,11 @@ func TestAWSStorageClientChunks(t *testing.T) {
name string
provisionedErr int
gangSize int
maxParallelism int
}{
{"DynamoDB chunks", 0, 10},
{"DynamoDB chunks with parallel fetch disabled", 0, 0},
{"DynamoDB chunks retry logic", 2, 10},
{"DynamoDB chunks", 0, 10, 20},
{"DynamoDB chunks with parallel fetch disabled", 0, 0, 20},
{"DynamoDB chunks retry logic", 2, 10, 20},
}

for _, tt := range tests {
Expand All @@ -508,7 +509,7 @@ func TestAWSStorageClientChunks(t *testing.T) {

client := awsStorageClient{
cfg: AWSStorageConfig{
DynamoDBConfig: DynamoDBConfig{DynamoDBChunkGangSize: tt.gangSize},
DynamoDBConfig: DynamoDBConfig{ChunkGangSize: tt.gangSize, ChunkGetMaxParallelism: tt.maxParallelism},
},
DynamoDB: dynamoDB,
schemaCfg: schemaConfig,
Expand Down