Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions chunk/aws_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,16 @@ func (a awsStorageClient) QueryPages(ctx context.Context, entry IndexEntry, call
}
}

// Filters
if entry.ValueEqual != nil {
input.KeyConditions[valueKey] = &dynamodb.Condition{
AttributeValueList: []*dynamodb.AttributeValue{
{B: entry.ValueEqual},
},
ComparisonOperator: aws.String(dynamodb.ComparisonOperatorEq),
}
}

request := a.queryRequestFn(input)
backoff := minBackoff
for page := request; page != nil; page = page.NextPage() {
Expand Down
166 changes: 164 additions & 2 deletions chunk/aws_storage_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,43 @@ func (m *mockDynamoDBClient) queryRequest(input *dynamodb.QueryInput) dynamoDBRe
Items: []map[string]*dynamodb.AttributeValue{},
}

// Required filters
hashValue := *input.KeyConditions[hashKey].AttributeValueList[0].S
items := m.tables[*input.TableName].items[hashValue]

// TODO we should also filter by range value
// Optional filters
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for implementing this! I was being lazy.

var (
rangeValueFilter []byte
rangeValueFilterType string
valueFilter []byte
valueFilterType *string
)
if c, ok := input.KeyConditions[rangeKey]; ok {
rangeValueFilter = c.AttributeValueList[0].B
rangeValueFilterType = *c.ComparisonOperator
}
if c, ok := input.KeyConditions[valueKey]; ok {
valueFilter = c.AttributeValueList[0].B
valueFilterType = c.ComparisonOperator
}

// Filter by HashValue, RangeValue and Value if it exists
items := m.tables[*input.TableName].items[hashValue]
for _, item := range items {
rangeValue := item[rangeKey].B
if rangeValueFilterType == dynamodb.ComparisonOperatorGe && bytes.Compare(rangeValue, rangeValueFilter) < 0 {
continue
}
if rangeValueFilterType == dynamodb.ComparisonOperatorBeginsWith && !bytes.HasPrefix(rangeValue, rangeValueFilter) {
continue
}

if item[valueKey] != nil {
value := item[valueKey].B
if valueFilter != nil && *valueFilterType == dynamodb.ComparisonOperatorEq && !bytes.Equal(value, valueFilter) {
continue
}
}

result.Items = append(result.Items, item)
}

Expand Down Expand Up @@ -170,6 +202,136 @@ func TestDynamoDBClient(t *testing.T) {
}
}

func TestDynamoDBClientQueryPages(t *testing.T) {
dynamoDB := newMockDynamoDB(0, 0)
client := awsStorageClient{
DynamoDB: dynamoDB,
queryRequestFn: dynamoDB.queryRequest,
}

entries := []IndexEntry{
{
TableName: "table",
HashValue: "foo",
RangeValue: []byte("bar:1"),
Value: []byte("10"),
},
{
TableName: "table",
HashValue: "foo",
RangeValue: []byte("bar:2"),
Value: []byte("20"),
},
{
TableName: "table",
HashValue: "foo",
RangeValue: []byte("bar:3"),
Value: []byte("30"),
},
{
TableName: "table",
HashValue: "foo",
RangeValue: []byte("baz:1"),
Value: []byte("10"),
},
{
TableName: "table",
HashValue: "foo",
RangeValue: []byte("baz:2"),
Value: []byte("20"),
},
{
TableName: "table",
HashValue: "flip",
RangeValue: []byte("bar:1"),
Value: []byte("abc"),
},
{
TableName: "table",
HashValue: "flip",
RangeValue: []byte("bar:2"),
Value: []byte("abc"),
},
{
TableName: "table",
HashValue: "flip",
RangeValue: []byte("bar:3"),
Value: []byte("abc"),
},
}

tests := []struct {
name string
entry IndexEntry
want []IndexEntry
}{
{
"check HashValue only",
IndexEntry{
TableName: "table",
HashValue: "flip",
},
[]IndexEntry{entries[5], entries[6], entries[7]},
},
{
"check RangeValueStart",
IndexEntry{
TableName: "table",
HashValue: "foo",
RangeValueStart: []byte("bar:2"),
},
[]IndexEntry{entries[1], entries[2], entries[3], entries[4]},
},
{
"check RangeValuePrefix",
IndexEntry{
TableName: "table",
HashValue: "foo",
RangeValuePrefix: []byte("baz:"),
},
[]IndexEntry{entries[3], entries[4]},
},
{
"check ValueEqual",
IndexEntry{
TableName: "table",
HashValue: "foo",
RangeValuePrefix: []byte("bar"),
ValueEqual: []byte("20"),
},
[]IndexEntry{entries[1]},
},
}

batch := client.NewWriteBatch()
for _, entry := range entries {
batch.Add(entry.TableName, entry.HashValue, entry.RangeValue, entry.Value)
}
dynamoDB.createTable("table")

err := client.BatchWrite(context.Background(), batch)
require.NoError(t, err)

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var have []IndexEntry
err := client.QueryPages(context.Background(), tt.entry, func(read ReadBatch, lastPage bool) bool {
for i := 0; i < read.Len(); i++ {
have = append(have, IndexEntry{
TableName: tt.entry.TableName,
HashValue: tt.entry.HashValue,
RangeValue: read.RangeValue(i),
Value: read.Value(i),
})
}
return !lastPage
})
require.NoError(t, err)
require.Equal(t, tt.want, have)
})
}
}

func TestAWSConfigFromURL(t *testing.T) {
for _, tc := range []struct {
url string
Expand Down
13 changes: 13 additions & 0 deletions chunk/inmemory_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,19 @@ func (m *MockStorage) QueryPages(_ context.Context, entry IndexEntry, callback f
log.Debugf("Lookup %s/* (%d)", entry.HashValue, len(items))
}

// Filters
if entry.ValueEqual != nil {
log.Debugf("Filter Value EQ = %s", entry.ValueEqual)

filtered := make([]mockItem, 0)
for _, v := range items {
if bytes.Equal(v.value, entry.ValueEqual) {
filtered = append(filtered, v)
}
}
items = filtered
}

result := mockReadBatch{}
for _, item := range items {
result = append(result, item)
Expand Down
4 changes: 4 additions & 0 deletions chunk/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ type IndexEntry struct {
// - If neither is set, must read all keys for that row.
RangeValuePrefix []byte
RangeValueStart []byte

// Filters for querying
ValueEqual []byte
}

// v1Schema was:
Expand Down Expand Up @@ -410,6 +413,7 @@ func (v6Entries) GetReadMetricLabelValueEntries(from, _ uint32, tableName, hashK
TableName: tableName,
HashValue: hashKey + ":" + string(labelName),
RangeValueStart: buildRangeKey(encodedFromBytes),
ValueEqual: []byte(labelValue),
},
}, nil
}