diff --git a/chunk/aws_storage_client.go b/chunk/aws_storage_client.go index 4822ef38daa..b8a36cc83cc 100644 --- a/chunk/aws_storage_client.go +++ b/chunk/aws_storage_client.go @@ -240,6 +240,16 @@ func (a awsStorageClient) QueryPages(ctx context.Context, entry IndexEntry, call } } + // Filters + if entry.ValueEqual != nil { + input.KeyConditions[valueKey] = &dynamodb.Condition{ + AttributeValueList: []*dynamodb.AttributeValue{ + {B: entry.ValueEqual}, + }, + ComparisonOperator: aws.String(dynamodb.ComparisonOperatorEq), + } + } + request := a.queryRequestFn(input) backoff := minBackoff for page := request; page != nil; page = page.NextPage() { diff --git a/chunk/aws_storage_client_test.go b/chunk/aws_storage_client_test.go index 5861e9f5d38..4319f7a45f0 100644 --- a/chunk/aws_storage_client_test.go +++ b/chunk/aws_storage_client_test.go @@ -101,11 +101,43 @@ func (m *mockDynamoDBClient) queryRequest(input *dynamodb.QueryInput) dynamoDBRe Items: []map[string]*dynamodb.AttributeValue{}, } + // Required filters hashValue := *input.KeyConditions[hashKey].AttributeValueList[0].S - items := m.tables[*input.TableName].items[hashValue] - // TODO we should also filter by range value + // Optional filters + var ( + rangeValueFilter []byte + rangeValueFilterType string + valueFilter []byte + valueFilterType *string + ) + if c, ok := input.KeyConditions[rangeKey]; ok { + rangeValueFilter = c.AttributeValueList[0].B + rangeValueFilterType = *c.ComparisonOperator + } + if c, ok := input.KeyConditions[valueKey]; ok { + valueFilter = c.AttributeValueList[0].B + valueFilterType = c.ComparisonOperator + } + + // Filter by HashValue, RangeValue and Value if it exists + items := m.tables[*input.TableName].items[hashValue] for _, item := range items { + rangeValue := item[rangeKey].B + if rangeValueFilterType == dynamodb.ComparisonOperatorGe && bytes.Compare(rangeValue, rangeValueFilter) < 0 { + continue + } + if rangeValueFilterType == dynamodb.ComparisonOperatorBeginsWith && !bytes.HasPrefix(rangeValue, rangeValueFilter) { + continue + } + + if item[valueKey] != nil { + value := item[valueKey].B + if valueFilter != nil && *valueFilterType == dynamodb.ComparisonOperatorEq && !bytes.Equal(value, valueFilter) { + continue + } + } + result.Items = append(result.Items, item) } @@ -170,6 +202,136 @@ func TestDynamoDBClient(t *testing.T) { } } +func TestDynamoDBClientQueryPages(t *testing.T) { + dynamoDB := newMockDynamoDB(0, 0) + client := awsStorageClient{ + DynamoDB: dynamoDB, + queryRequestFn: dynamoDB.queryRequest, + } + + entries := []IndexEntry{ + { + TableName: "table", + HashValue: "foo", + RangeValue: []byte("bar:1"), + Value: []byte("10"), + }, + { + TableName: "table", + HashValue: "foo", + RangeValue: []byte("bar:2"), + Value: []byte("20"), + }, + { + TableName: "table", + HashValue: "foo", + RangeValue: []byte("bar:3"), + Value: []byte("30"), + }, + { + TableName: "table", + HashValue: "foo", + RangeValue: []byte("baz:1"), + Value: []byte("10"), + }, + { + TableName: "table", + HashValue: "foo", + RangeValue: []byte("baz:2"), + Value: []byte("20"), + }, + { + TableName: "table", + HashValue: "flip", + RangeValue: []byte("bar:1"), + Value: []byte("abc"), + }, + { + TableName: "table", + HashValue: "flip", + RangeValue: []byte("bar:2"), + Value: []byte("abc"), + }, + { + TableName: "table", + HashValue: "flip", + RangeValue: []byte("bar:3"), + Value: []byte("abc"), + }, + } + + tests := []struct { + name string + entry IndexEntry + want []IndexEntry + }{ + { + "check HashValue only", + IndexEntry{ + TableName: "table", + HashValue: "flip", + }, + []IndexEntry{entries[5], entries[6], entries[7]}, + }, + { + "check RangeValueStart", + IndexEntry{ + TableName: "table", + HashValue: "foo", + RangeValueStart: []byte("bar:2"), + }, + []IndexEntry{entries[1], entries[2], entries[3], entries[4]}, + }, + { + "check RangeValuePrefix", + IndexEntry{ + TableName: "table", + HashValue: "foo", + RangeValuePrefix: []byte("baz:"), + }, + []IndexEntry{entries[3], entries[4]}, + }, + { + "check ValueEqual", + IndexEntry{ + TableName: "table", + HashValue: "foo", + RangeValuePrefix: []byte("bar"), + ValueEqual: []byte("20"), + }, + []IndexEntry{entries[1]}, + }, + } + + batch := client.NewWriteBatch() + for _, entry := range entries { + batch.Add(entry.TableName, entry.HashValue, entry.RangeValue, entry.Value) + } + dynamoDB.createTable("table") + + err := client.BatchWrite(context.Background(), batch) + require.NoError(t, err) + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var have []IndexEntry + err := client.QueryPages(context.Background(), tt.entry, func(read ReadBatch, lastPage bool) bool { + for i := 0; i < read.Len(); i++ { + have = append(have, IndexEntry{ + TableName: tt.entry.TableName, + HashValue: tt.entry.HashValue, + RangeValue: read.RangeValue(i), + Value: read.Value(i), + }) + } + return !lastPage + }) + require.NoError(t, err) + require.Equal(t, tt.want, have) + }) + } +} + func TestAWSConfigFromURL(t *testing.T) { for _, tc := range []struct { url string diff --git a/chunk/inmemory_storage_client.go b/chunk/inmemory_storage_client.go index 5802215b2c2..1c087452a7b 100644 --- a/chunk/inmemory_storage_client.go +++ b/chunk/inmemory_storage_client.go @@ -193,6 +193,19 @@ func (m *MockStorage) QueryPages(_ context.Context, entry IndexEntry, callback f log.Debugf("Lookup %s/* (%d)", entry.HashValue, len(items)) } + // Filters + if entry.ValueEqual != nil { + log.Debugf("Filter Value EQ = %s", entry.ValueEqual) + + filtered := make([]mockItem, 0) + for _, v := range items { + if bytes.Equal(v.value, entry.ValueEqual) { + filtered = append(filtered, v) + } + } + items = filtered + } + result := mockReadBatch{} for _, item := range items { result = append(result, item) diff --git a/chunk/schema.go b/chunk/schema.go index bee4e3de345..d8bf4d2f4fd 100644 --- a/chunk/schema.go +++ b/chunk/schema.go @@ -52,6 +52,9 @@ type IndexEntry struct { // - If neither is set, must read all keys for that row. RangeValuePrefix []byte RangeValueStart []byte + + // Filters for querying + ValueEqual []byte } // v1Schema was: @@ -410,6 +413,7 @@ func (v6Entries) GetReadMetricLabelValueEntries(from, _ uint32, tableName, hashK TableName: tableName, HashValue: hashKey + ":" + string(labelName), RangeValueStart: buildRangeKey(encodedFromBytes), + ValueEqual: []byte(labelValue), }, }, nil }