Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 36 additions & 2 deletions pkg/chunk/aws_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ type DynamoDBConfig struct {
DynamoDB util.URLValue
APILimit float64
ApplicationAutoScaling util.URLValue
DynamoDBChunkGangSize int
}

// RegisterFlags adds the flags required to config this to the given FlagSet
Expand All @@ -115,6 +116,7 @@ func (cfg *DynamoDBConfig) RegisterFlags(f *flag.FlagSet) {
"If only region is specified as a host, proper endpoint will be deduced. Use inmemory:///<table-name> to use a mock in-memory implementation.")
f.Float64Var(&cfg.APILimit, "dynamodb.api-limit", 2.0, "DynamoDB table management requests per second limit.")
f.Var(&cfg.ApplicationAutoScaling, "applicationautoscaling.url", "ApplicationAutoscaling endpoint URL with escaped Key and Secret encoded.")
f.IntVar(&cfg.DynamoDBChunkGangSize, "dynamodb.chunk.gang.size", 10, "Number of chunks to group together to parallelise fetches (zero to disable)")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm more familiar with choosing the level of parallelism (e.g. how many concurrent goroutines) than choosing the size of each concurrent job, as you're doing here. I don't have opinions on which is better. Why did you decide to do it this way?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Choosing the level of parallelism requires a global queue of work across all queries. I'd like to do that, but don't feel I can complete it in the current sprint.

Computing the "gang size" to target a certain parallelism per query is harder to tune (wanting to keep the batches sent to DynamoDB fairly large), and the end result, given many queries in parallel, will still have highly variable overall parallelism.

}

// AWSStorageConfig specifies config for storing data on AWS.
Expand Down Expand Up @@ -415,6 +417,11 @@ func (a dynamoDBRequestAdapter) Retryable() bool {
return *a.request.Retryable
}

type chunksPlusError struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Obligatory type theory nerdery: this is more accurately chunksTimesError. I'm not actually suggesting you change this.

chunks []Chunk
err error
}

func (a awsStorageClient) GetChunks(ctx context.Context, chunks []Chunk) ([]Chunk, error) {
sp, ctx := ot.StartSpanFromContext(ctx, "GetChunks")
defer sp.Finish()
Expand Down Expand Up @@ -443,10 +450,37 @@ func (a awsStorageClient) GetChunks(ctx context.Context, chunks []Chunk) ([]Chun
return s3Chunks, err
}

dynamoDBChunks, err = a.getDynamoDBChunks(ctx, dynamoDBChunks)
gangSize := a.cfg.DynamoDBChunkGangSize * dynamoDBMaxReadBatchSize
if gangSize == 0 { // zero means turn feature off
gangSize = len(dynamoDBChunks)
}

results := make(chan chunksPlusError)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have a few other implementations of "parallel map a function that returns (a, err)". Some of the others use a buffered channel. I don't think it's necessary here, because the loop immediately following will drain all of results as quickly as possible.

No action required, just flagging in case my reading is wrong and you notice something on a second glance.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, a buffer would be extra memory management overhead for no obvious benefit.

for i := 0; i < len(dynamoDBChunks); i += gangSize {
go func(start int) {
end := start + gangSize
if end > len(dynamoDBChunks) {
end = len(dynamoDBChunks)
}
outChunks, err := a.getDynamoDBChunks(ctx, dynamoDBChunks[start:end])
results <- chunksPlusError{outChunks, err}
}(i)
}
finalChunks := s3Chunks
for i := 0; i < len(dynamoDBChunks); i += gangSize {
in := <-results
if in.err != nil {
err = in.err // TODO: cancel other sub-queries at this point
} else {
finalChunks = append(finalChunks, in.chunks...)
}
}
if err != nil {
return nil, err
}

// Return any chunks we did receive: a partial result may be useful
return append(dynamoDBChunks, s3Chunks...), err
return finalChunks, err
}

func (a awsStorageClient) getS3Chunks(ctx context.Context, chunks []Chunk) ([]Chunk, error) {
Expand Down
9 changes: 7 additions & 2 deletions pkg/chunk/aws_storage_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,9 +479,11 @@ func TestAWSStorageClientChunks(t *testing.T) {
tests := []struct {
name string
provisionedErr int
gangSize int
}{
{"DynamoDB chunks", 0},
{"DynamoDB chunks retry logic", 2},
{"DynamoDB chunks", 0, 10},
{"DynamoDB chunks with parallel fetch disabled", 0, 0},
{"DynamoDB chunks retry logic", 2, 10},
}

for _, tt := range tests {
Expand All @@ -505,6 +507,9 @@ func TestAWSStorageClientChunks(t *testing.T) {
require.NoError(t, err)

client := awsStorageClient{
cfg: AWSStorageConfig{
DynamoDBConfig: DynamoDBConfig{DynamoDBChunkGangSize: tt.gangSize},
},
DynamoDB: dynamoDB,
schemaCfg: schemaConfig,
queryRequestFn: dynamoDB.queryRequest,
Expand Down