Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions chunk/aws_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,16 @@ func (a awsStorageClient) QueryPages(ctx context.Context, query IndexQuery, call
}
}

// Filters
if query.ValueEqual != nil {
input.FilterExpression = aws.String(fmt.Sprintf("%s = :v", valueKey))
input.ExpressionAttributeValues = map[string]*dynamodb.AttributeValue{
":v": {
B: query.ValueEqual,
},
}
}

request := a.queryRequestFn(input)
backoff := minBackoff
for page := request; page != nil; page = page.NextPage() {
Expand Down
170 changes: 168 additions & 2 deletions chunk/aws_storage_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
"github.com/prometheus/common/log"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"
Expand Down Expand Up @@ -101,11 +102,46 @@ func (m *mockDynamoDBClient) queryRequest(input *dynamodb.QueryInput) dynamoDBRe
Items: []map[string]*dynamodb.AttributeValue{},
}

// Required filters
hashValue := *input.KeyConditions[hashKey].AttributeValueList[0].S
items := m.tables[*input.TableName].items[hashValue]

// TODO we should also filter by range value
// Optional filters
var (
rangeValueFilter []byte
rangeValueFilterType string
)
if c, ok := input.KeyConditions[rangeKey]; ok {
rangeValueFilter = c.AttributeValueList[0].B
rangeValueFilterType = *c.ComparisonOperator
}

// Filter by HashValue, RangeValue and Value if it exists
items := m.tables[*input.TableName].items[hashValue]
for _, item := range items {
rangeValue := item[rangeKey].B
if rangeValueFilterType == dynamodb.ComparisonOperatorGe && bytes.Compare(rangeValue, rangeValueFilter) < 0 {
continue
}
if rangeValueFilterType == dynamodb.ComparisonOperatorBeginsWith && !bytes.HasPrefix(rangeValue, rangeValueFilter) {
continue
}

if item[valueKey] != nil {
value := item[valueKey].B

// Apply filterExpression if it exists (supporting only v = :v)
if input.FilterExpression != nil {
if *input.FilterExpression == fmt.Sprintf("%s = :v", valueKey) {
filterValue := input.ExpressionAttributeValues[":v"].B
if !bytes.Equal(value, filterValue) {
continue
}
} else {
log.Warnf("Unsupported FilterExpression: %s", *input.FilterExpression)
}
}
}

result.Items = append(result.Items, item)
}

Expand Down Expand Up @@ -170,6 +206,136 @@ func TestDynamoDBClient(t *testing.T) {
}
}

func TestDynamoDBClientQueryPages(t *testing.T) {
dynamoDB := newMockDynamoDB(0, 0)
client := awsStorageClient{
DynamoDB: dynamoDB,
queryRequestFn: dynamoDB.queryRequest,
}

entries := []IndexEntry{
{
TableName: "table",
HashValue: "foo",
RangeValue: []byte("bar:1"),
Value: []byte("10"),
},
{
TableName: "table",
HashValue: "foo",
RangeValue: []byte("bar:2"),
Value: []byte("20"),
},
{
TableName: "table",
HashValue: "foo",
RangeValue: []byte("bar:3"),
Value: []byte("30"),
},
{
TableName: "table",
HashValue: "foo",
RangeValue: []byte("baz:1"),
Value: []byte("10"),
},
{
TableName: "table",
HashValue: "foo",
RangeValue: []byte("baz:2"),
Value: []byte("20"),
},
{
TableName: "table",
HashValue: "flip",
RangeValue: []byte("bar:1"),
Value: []byte("abc"),
},
{
TableName: "table",
HashValue: "flip",
RangeValue: []byte("bar:2"),
Value: []byte("abc"),
},
{
TableName: "table",
HashValue: "flip",
RangeValue: []byte("bar:3"),
Value: []byte("abc"),
},
}

tests := []struct {
name string
query IndexQuery
want []IndexEntry
}{
{
"check HashValue only",
IndexQuery{
TableName: "table",
HashValue: "flip",
},
[]IndexEntry{entries[5], entries[6], entries[7]},
},
{
"check RangeValueStart",
IndexQuery{
TableName: "table",
HashValue: "foo",
RangeValueStart: []byte("bar:2"),
},
[]IndexEntry{entries[1], entries[2], entries[3], entries[4]},
},
{
"check RangeValuePrefix",
IndexQuery{
TableName: "table",
HashValue: "foo",
RangeValuePrefix: []byte("baz:"),
},
[]IndexEntry{entries[3], entries[4]},
},
{
"check ValueEqual",
IndexQuery{
TableName: "table",
HashValue: "foo",
RangeValuePrefix: []byte("bar"),
ValueEqual: []byte("20"),
},
[]IndexEntry{entries[1]},
},
}

batch := client.NewWriteBatch()
for _, entry := range entries {
batch.Add(entry.TableName, entry.HashValue, entry.RangeValue, entry.Value)
}
dynamoDB.createTable("table")

err := client.BatchWrite(context.Background(), batch)
require.NoError(t, err)

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var have []IndexEntry
err := client.QueryPages(context.Background(), tt.query, func(read ReadBatch, lastPage bool) bool {
for i := 0; i < read.Len(); i++ {
have = append(have, IndexEntry{
TableName: tt.query.TableName,
HashValue: tt.query.HashValue,
RangeValue: read.RangeValue(i),
Value: read.Value(i),
})
}
return !lastPage
})
require.NoError(t, err)
require.Equal(t, tt.want, have)
})
}
}

func TestAWSConfigFromURL(t *testing.T) {
for _, tc := range []struct {
url string
Expand Down
13 changes: 13 additions & 0 deletions chunk/inmemory_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,19 @@ func (m *MockStorage) QueryPages(_ context.Context, query IndexQuery, callback f
log.Debugf("Lookup %s/* (%d)", query.HashValue, len(items))
}

// Filters
if query.ValueEqual != nil {
log.Debugf("Filter Value EQ = %s", query.ValueEqual)

filtered := make([]mockItem, 0)
for _, v := range items {
if bytes.Equal(v.value, query.ValueEqual) {
filtered = append(filtered, v)
}
}
items = filtered
}

result := mockReadBatch{}
for _, item := range items {
result = append(result, item)
Expand Down
4 changes: 4 additions & 0 deletions chunk/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ type IndexQuery struct {
// - If neither is set, must read all keys for that row.
RangeValuePrefix []byte
RangeValueStart []byte

// Filters for querying
ValueEqual []byte
}

// IndexEntry describes an entry in the chunk index
Expand Down Expand Up @@ -492,6 +495,7 @@ func (v6Entries) GetReadMetricLabelValueQueries(bucket Bucket, metricName model.
TableName: bucket.tableName,
HashValue: fmt.Sprintf("%s:%s:%s", bucket.hashKey, metricName, labelName),
RangeValueStart: encodeRangeKey(encodedFromBytes),
ValueEqual: []byte(labelValue),
},
}, nil
}
Expand Down