diff --git a/chunk/aws_storage_client.go b/chunk/aws_storage_client.go index a344604d4d4..cc2d9bb5c17 100644 --- a/chunk/aws_storage_client.go +++ b/chunk/aws_storage_client.go @@ -240,6 +240,16 @@ func (a awsStorageClient) QueryPages(ctx context.Context, query IndexQuery, call } } + // Filters + if query.ValueEqual != nil { + input.FilterExpression = aws.String(fmt.Sprintf("%s = :v", valueKey)) + input.ExpressionAttributeValues = map[string]*dynamodb.AttributeValue{ + ":v": { + B: query.ValueEqual, + }, + } + } + request := a.queryRequestFn(input) backoff := minBackoff for page := request; page != nil; page = page.NextPage() { diff --git a/chunk/aws_storage_client_test.go b/chunk/aws_storage_client_test.go index c3b4fc15e79..3d8627109a1 100644 --- a/chunk/aws_storage_client_test.go +++ b/chunk/aws_storage_client_test.go @@ -11,6 +11,7 @@ import ( "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/service/dynamodb" "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface" + "github.com/prometheus/common/log" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/net/context" @@ -101,11 +102,46 @@ func (m *mockDynamoDBClient) queryRequest(input *dynamodb.QueryInput) dynamoDBRe Items: []map[string]*dynamodb.AttributeValue{}, } + // Required filters hashValue := *input.KeyConditions[hashKey].AttributeValueList[0].S - items := m.tables[*input.TableName].items[hashValue] - // TODO we should also filter by range value + // Optional filters + var ( + rangeValueFilter []byte + rangeValueFilterType string + ) + if c, ok := input.KeyConditions[rangeKey]; ok { + rangeValueFilter = c.AttributeValueList[0].B + rangeValueFilterType = *c.ComparisonOperator + } + + // Filter by HashValue, RangeValue and Value if it exists + items := m.tables[*input.TableName].items[hashValue] for _, item := range items { + rangeValue := item[rangeKey].B + if rangeValueFilterType == dynamodb.ComparisonOperatorGe && bytes.Compare(rangeValue, rangeValueFilter) < 0 { + continue + } + if rangeValueFilterType == dynamodb.ComparisonOperatorBeginsWith && !bytes.HasPrefix(rangeValue, rangeValueFilter) { + continue + } + + if item[valueKey] != nil { + value := item[valueKey].B + + // Apply filterExpression if it exists (supporting only v = :v) + if input.FilterExpression != nil { + if *input.FilterExpression == fmt.Sprintf("%s = :v", valueKey) { + filterValue := input.ExpressionAttributeValues[":v"].B + if !bytes.Equal(value, filterValue) { + continue + } + } else { + log.Warnf("Unsupported FilterExpression: %s", *input.FilterExpression) + } + } + } + result.Items = append(result.Items, item) } @@ -170,6 +206,136 @@ func TestDynamoDBClient(t *testing.T) { } } +func TestDynamoDBClientQueryPages(t *testing.T) { + dynamoDB := newMockDynamoDB(0, 0) + client := awsStorageClient{ + DynamoDB: dynamoDB, + queryRequestFn: dynamoDB.queryRequest, + } + + entries := []IndexEntry{ + { + TableName: "table", + HashValue: "foo", + RangeValue: []byte("bar:1"), + Value: []byte("10"), + }, + { + TableName: "table", + HashValue: "foo", + RangeValue: []byte("bar:2"), + Value: []byte("20"), + }, + { + TableName: "table", + HashValue: "foo", + RangeValue: []byte("bar:3"), + Value: []byte("30"), + }, + { + TableName: "table", + HashValue: "foo", + RangeValue: []byte("baz:1"), + Value: []byte("10"), + }, + { + TableName: "table", + HashValue: "foo", + RangeValue: []byte("baz:2"), + Value: []byte("20"), + }, + { + TableName: "table", + HashValue: "flip", + RangeValue: []byte("bar:1"), + Value: []byte("abc"), + }, + { + TableName: "table", + HashValue: "flip", + RangeValue: []byte("bar:2"), + Value: []byte("abc"), + }, + { + TableName: "table", + HashValue: "flip", + RangeValue: []byte("bar:3"), + Value: []byte("abc"), + }, + } + + tests := []struct { + name string + query IndexQuery + want []IndexEntry + }{ + { + "check HashValue only", + IndexQuery{ + TableName: "table", + HashValue: "flip", + }, + []IndexEntry{entries[5], entries[6], entries[7]}, + }, + { + "check RangeValueStart", + IndexQuery{ + TableName: "table", + HashValue: "foo", + RangeValueStart: []byte("bar:2"), + }, + []IndexEntry{entries[1], entries[2], entries[3], entries[4]}, + }, + { + "check RangeValuePrefix", + IndexQuery{ + TableName: "table", + HashValue: "foo", + RangeValuePrefix: []byte("baz:"), + }, + []IndexEntry{entries[3], entries[4]}, + }, + { + "check ValueEqual", + IndexQuery{ + TableName: "table", + HashValue: "foo", + RangeValuePrefix: []byte("bar"), + ValueEqual: []byte("20"), + }, + []IndexEntry{entries[1]}, + }, + } + + batch := client.NewWriteBatch() + for _, entry := range entries { + batch.Add(entry.TableName, entry.HashValue, entry.RangeValue, entry.Value) + } + dynamoDB.createTable("table") + + err := client.BatchWrite(context.Background(), batch) + require.NoError(t, err) + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var have []IndexEntry + err := client.QueryPages(context.Background(), tt.query, func(read ReadBatch, lastPage bool) bool { + for i := 0; i < read.Len(); i++ { + have = append(have, IndexEntry{ + TableName: tt.query.TableName, + HashValue: tt.query.HashValue, + RangeValue: read.RangeValue(i), + Value: read.Value(i), + }) + } + return !lastPage + }) + require.NoError(t, err) + require.Equal(t, tt.want, have) + }) + } +} + func TestAWSConfigFromURL(t *testing.T) { for _, tc := range []struct { url string diff --git a/chunk/inmemory_storage_client.go b/chunk/inmemory_storage_client.go index 954905271fa..e1458f7c322 100644 --- a/chunk/inmemory_storage_client.go +++ b/chunk/inmemory_storage_client.go @@ -207,6 +207,19 @@ func (m *MockStorage) QueryPages(_ context.Context, query IndexQuery, callback f log.Debugf("Lookup %s/* (%d)", query.HashValue, len(items)) } + // Filters + if query.ValueEqual != nil { + log.Debugf("Filter Value EQ = %s", query.ValueEqual) + + filtered := make([]mockItem, 0) + for _, v := range items { + if bytes.Equal(v.value, query.ValueEqual) { + filtered = append(filtered, v) + } + } + items = filtered + } + result := mockReadBatch{} for _, item := range items { result = append(result, item) diff --git a/chunk/schema.go b/chunk/schema.go index 0f241dddfdb..225eda5bdbc 100644 --- a/chunk/schema.go +++ b/chunk/schema.go @@ -48,6 +48,9 @@ type IndexQuery struct { // - If neither is set, must read all keys for that row. RangeValuePrefix []byte RangeValueStart []byte + + // Filters for querying + ValueEqual []byte } // IndexEntry describes an entry in the chunk index @@ -492,6 +495,7 @@ func (v6Entries) GetReadMetricLabelValueQueries(bucket Bucket, metricName model. TableName: bucket.tableName, HashValue: fmt.Sprintf("%s:%s:%s", bucket.hashKey, metricName, labelName), RangeValueStart: encodeRangeKey(encodedFromBytes), + ValueEqual: []byte(labelValue), }, }, nil }