From 1b870e9e2dc5ae3590ec8b57f739385a32a9d967 Mon Sep 17 00:00:00 2001 From: Aaron Kirkbride Date: Tue, 18 Apr 2017 14:56:48 +0100 Subject: [PATCH 1/7] Filter equality matchers on the dynamo side --- chunk/aws_storage_client.go | 10 ++ chunk/aws_storage_client_test.go | 166 ++++++++++++++++++++++++++++++- chunk/inmemory_storage_client.go | 13 +++ chunk/schema.go | 4 + 4 files changed, 191 insertions(+), 2 deletions(-) diff --git a/chunk/aws_storage_client.go b/chunk/aws_storage_client.go index a344604d4d4..53758ccb710 100644 --- a/chunk/aws_storage_client.go +++ b/chunk/aws_storage_client.go @@ -240,6 +240,16 @@ func (a awsStorageClient) QueryPages(ctx context.Context, query IndexQuery, call } } + // Filters + if entry.ValueEq != nil { + input.KeyConditions[valueKey] = &dynamodb.Condition{ + AttributeValueList: []*dynamodb.AttributeValue{ + {B: entry.ValueEq}, + }, + ComparisonOperator: aws.String(dynamodb.ComparisonOperatorEq), + } + } + request := a.queryRequestFn(input) backoff := minBackoff for page := request; page != nil; page = page.NextPage() { diff --git a/chunk/aws_storage_client_test.go b/chunk/aws_storage_client_test.go index c3b4fc15e79..0a8b87014ce 100644 --- a/chunk/aws_storage_client_test.go +++ b/chunk/aws_storage_client_test.go @@ -101,11 +101,43 @@ func (m *mockDynamoDBClient) queryRequest(input *dynamodb.QueryInput) dynamoDBRe Items: []map[string]*dynamodb.AttributeValue{}, } + // Required filters hashValue := *input.KeyConditions[hashKey].AttributeValueList[0].S - items := m.tables[*input.TableName].items[hashValue] - // TODO we should also filter by range value + // Optional filters + var ( + rangeValueFilter []byte + rangeValueFilterType string + valueFilter []byte + valueFilterType *string + ) + if input.KeyConditions[rangeKey] != nil { + rangeValueFilter = input.KeyConditions[rangeKey].AttributeValueList[0].B + rangeValueFilterType = *input.KeyConditions[rangeKey].ComparisonOperator + } + if input.KeyConditions[valueKey] != nil { + valueFilter = input.KeyConditions[valueKey].AttributeValueList[0].B + valueFilterType = input.KeyConditions[valueKey].ComparisonOperator + } + + // Filter by HashValue, RangeValue and Value if it exists + items := m.tables[*input.TableName].items[hashValue] for _, item := range items { + rangeValue := item[rangeKey].B + if rangeValueFilterType == "GE" && bytes.Compare(rangeValue, rangeValueFilter) < 0 { + continue + } + if rangeValueFilterType == "BEGINS_WITH" && !bytes.HasPrefix(rangeValue, rangeValueFilter) { + continue + } + + if item[valueKey] != nil { + value := item[valueKey].B + if valueFilter != nil && *valueFilterType == "EQ" && !bytes.Equal(value, valueFilter) { + continue + } + } + result.Items = append(result.Items, item) } @@ -170,6 +202,136 @@ func TestDynamoDBClient(t *testing.T) { } } +func TestDynamoDBClientQueryPages(t *testing.T) { + dynamoDB := newMockDynamoDB(0, 0) + client := awsStorageClient{ + DynamoDB: dynamoDB, + queryRequestFn: dynamoDB.queryRequest, + } + + entries := []IndexEntry{ + { + TableName: "table", + HashValue: "foo", + RangeValue: []byte("bar:1"), + Value: []byte("10"), + }, + { + TableName: "table", + HashValue: "foo", + RangeValue: []byte("bar:2"), + Value: []byte("20"), + }, + { + TableName: "table", + HashValue: "foo", + RangeValue: []byte("bar:3"), + Value: []byte("30"), + }, + { + TableName: "table", + HashValue: "foo", + RangeValue: []byte("baz:1"), + Value: []byte("10"), + }, + { + TableName: "table", + HashValue: "foo", + RangeValue: []byte("baz:2"), + Value: []byte("20"), + }, + { + TableName: "table", + HashValue: "flip", + RangeValue: []byte("bar:1"), + Value: []byte("abc"), + }, + { + TableName: "table", + HashValue: "flip", + RangeValue: []byte("bar:2"), + Value: []byte("abc"), + }, + { + TableName: "table", + HashValue: "flip", + RangeValue: []byte("bar:3"), + Value: []byte("abc"), + }, + } + + tests := []struct { + name string + entry IndexEntry + want []IndexEntry + }{ + { + "check HashValue only", + IndexEntry{ + TableName: "table", + HashValue: "flip", + }, + []IndexEntry{entries[5], entries[6], entries[7]}, + }, + { + "check RangeValueStart", + IndexEntry{ + TableName: "table", + HashValue: "foo", + RangeValueStart: []byte("bar:2"), + }, + []IndexEntry{entries[1], entries[2], entries[3], entries[4]}, + }, + { + "check RangeValuePrefix", + IndexEntry{ + TableName: "table", + HashValue: "foo", + RangeValuePrefix: []byte("baz:"), + }, + []IndexEntry{entries[3], entries[4]}, + }, + { + "check ValueEq", + IndexEntry{ + TableName: "table", + HashValue: "foo", + RangeValuePrefix: []byte("bar"), + ValueEq: []byte("20"), + }, + []IndexEntry{entries[1]}, + }, + } + + batch := client.NewWriteBatch() + for _, entry := range entries { + batch.Add(entry.TableName, entry.HashValue, entry.RangeValue, entry.Value) + } + dynamoDB.createTable("table") + + err := client.BatchWrite(context.Background(), batch) + require.NoError(t, err) + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var have []IndexEntry + err := client.QueryPages(context.Background(), tt.entry, func(read ReadBatch, lastPage bool) bool { + for i := 0; i < read.Len(); i++ { + have = append(have, IndexEntry{ + TableName: tt.entry.TableName, + HashValue: tt.entry.HashValue, + RangeValue: read.RangeValue(i), + Value: read.Value(i), + }) + } + return !lastPage + }) + require.NoError(t, err) + require.Equal(t, tt.want, have) + }) + } +} + func TestAWSConfigFromURL(t *testing.T) { for _, tc := range []struct { url string diff --git a/chunk/inmemory_storage_client.go b/chunk/inmemory_storage_client.go index 954905271fa..21ae14a8a2c 100644 --- a/chunk/inmemory_storage_client.go +++ b/chunk/inmemory_storage_client.go @@ -207,6 +207,19 @@ func (m *MockStorage) QueryPages(_ context.Context, query IndexQuery, callback f log.Debugf("Lookup %s/* (%d)", query.HashValue, len(items)) } + // Filters + if entry.ValueEq != nil { + log.Debugf("Filter Value EQ = %s", entry.ValueEq) + + filtered := make([]mockItem, 0) + for _, v := range items { + if bytes.Equal(v.value, entry.ValueEq) { + filtered = append(filtered, v) + } + } + items = filtered + } + result := mockReadBatch{} for _, item := range items { result = append(result, item) diff --git a/chunk/schema.go b/chunk/schema.go index 0f241dddfdb..499a22e9172 100644 --- a/chunk/schema.go +++ b/chunk/schema.go @@ -48,6 +48,9 @@ type IndexQuery struct { // - If neither is set, must read all keys for that row. RangeValuePrefix []byte RangeValueStart []byte + + // Filters for querying + ValueEq []byte } // IndexEntry describes an entry in the chunk index @@ -492,6 +495,7 @@ func (v6Entries) GetReadMetricLabelValueQueries(bucket Bucket, metricName model. TableName: bucket.tableName, HashValue: fmt.Sprintf("%s:%s:%s", bucket.hashKey, metricName, labelName), RangeValueStart: encodeRangeKey(encodedFromBytes), + ValueEq: []byte(labelValue), }, }, nil } From feb7134ce6bc013597f31302712cbcdf5ca5c301 Mon Sep 17 00:00:00 2001 From: Aaron Kirkbride Date: Wed, 19 Apr 2017 14:52:16 +0100 Subject: [PATCH 2/7] Rename ValueEq to ValueEqual --- chunk/aws_storage_client.go | 4 ++-- chunk/aws_storage_client_test.go | 4 ++-- chunk/inmemory_storage_client.go | 6 +++--- chunk/schema.go | 4 ++-- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/chunk/aws_storage_client.go b/chunk/aws_storage_client.go index 53758ccb710..8f3d088b3f2 100644 --- a/chunk/aws_storage_client.go +++ b/chunk/aws_storage_client.go @@ -241,10 +241,10 @@ func (a awsStorageClient) QueryPages(ctx context.Context, query IndexQuery, call } // Filters - if entry.ValueEq != nil { + if entry.ValueEqual != nil { input.KeyConditions[valueKey] = &dynamodb.Condition{ AttributeValueList: []*dynamodb.AttributeValue{ - {B: entry.ValueEq}, + {B: entry.ValueEqual}, }, ComparisonOperator: aws.String(dynamodb.ComparisonOperatorEq), } diff --git a/chunk/aws_storage_client_test.go b/chunk/aws_storage_client_test.go index 0a8b87014ce..fc2695f3681 100644 --- a/chunk/aws_storage_client_test.go +++ b/chunk/aws_storage_client_test.go @@ -292,12 +292,12 @@ func TestDynamoDBClientQueryPages(t *testing.T) { []IndexEntry{entries[3], entries[4]}, }, { - "check ValueEq", + "check ValueEqual", IndexEntry{ TableName: "table", HashValue: "foo", RangeValuePrefix: []byte("bar"), - ValueEq: []byte("20"), + ValueEqual: []byte("20"), }, []IndexEntry{entries[1]}, }, diff --git a/chunk/inmemory_storage_client.go b/chunk/inmemory_storage_client.go index 21ae14a8a2c..aa508c6b460 100644 --- a/chunk/inmemory_storage_client.go +++ b/chunk/inmemory_storage_client.go @@ -208,12 +208,12 @@ func (m *MockStorage) QueryPages(_ context.Context, query IndexQuery, callback f } // Filters - if entry.ValueEq != nil { - log.Debugf("Filter Value EQ = %s", entry.ValueEq) + if entry.ValueEqual != nil { + log.Debugf("Filter Value EQ = %s", entry.ValueEqual) filtered := make([]mockItem, 0) for _, v := range items { - if bytes.Equal(v.value, entry.ValueEq) { + if bytes.Equal(v.value, entry.ValueEqual) { filtered = append(filtered, v) } } diff --git a/chunk/schema.go b/chunk/schema.go index 499a22e9172..225eda5bdbc 100644 --- a/chunk/schema.go +++ b/chunk/schema.go @@ -50,7 +50,7 @@ type IndexQuery struct { RangeValueStart []byte // Filters for querying - ValueEq []byte + ValueEqual []byte } // IndexEntry describes an entry in the chunk index @@ -495,7 +495,7 @@ func (v6Entries) GetReadMetricLabelValueQueries(bucket Bucket, metricName model. TableName: bucket.tableName, HashValue: fmt.Sprintf("%s:%s:%s", bucket.hashKey, metricName, labelName), RangeValueStart: encodeRangeKey(encodedFromBytes), - ValueEq: []byte(labelValue), + ValueEqual: []byte(labelValue), }, }, nil } From c1998ed72361e6163d141bfd28a25573d32eb5e5 Mon Sep 17 00:00:00 2001 From: Aaron Kirkbride Date: Wed, 19 Apr 2017 14:55:24 +0100 Subject: [PATCH 3/7] Tidy up filter code --- chunk/aws_storage_client_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/chunk/aws_storage_client_test.go b/chunk/aws_storage_client_test.go index fc2695f3681..592d01284d7 100644 --- a/chunk/aws_storage_client_test.go +++ b/chunk/aws_storage_client_test.go @@ -111,13 +111,13 @@ func (m *mockDynamoDBClient) queryRequest(input *dynamodb.QueryInput) dynamoDBRe valueFilter []byte valueFilterType *string ) - if input.KeyConditions[rangeKey] != nil { - rangeValueFilter = input.KeyConditions[rangeKey].AttributeValueList[0].B - rangeValueFilterType = *input.KeyConditions[rangeKey].ComparisonOperator + if c, ok := input.KeyConditions[rangeKey]; ok { + rangeValueFilter = c.AttributeValueList[0].B + rangeValueFilterType = *c.ComparisonOperator } - if input.KeyConditions[valueKey] != nil { - valueFilter = input.KeyConditions[valueKey].AttributeValueList[0].B - valueFilterType = input.KeyConditions[valueKey].ComparisonOperator + if c, ok := input.KeyConditions[valueKey]; ok { + valueFilter = c.AttributeValueList[0].B + valueFilterType = c.ComparisonOperator } // Filter by HashValue, RangeValue and Value if it exists From 7f54d04d7b09175694e4f8961063d277c096ece3 Mon Sep 17 00:00:00 2001 From: Aaron Kirkbride Date: Wed, 19 Apr 2017 14:58:16 +0100 Subject: [PATCH 4/7] Use dynamoDB constants --- chunk/aws_storage_client_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/chunk/aws_storage_client_test.go b/chunk/aws_storage_client_test.go index 592d01284d7..b0c3084b5f6 100644 --- a/chunk/aws_storage_client_test.go +++ b/chunk/aws_storage_client_test.go @@ -124,16 +124,16 @@ func (m *mockDynamoDBClient) queryRequest(input *dynamodb.QueryInput) dynamoDBRe items := m.tables[*input.TableName].items[hashValue] for _, item := range items { rangeValue := item[rangeKey].B - if rangeValueFilterType == "GE" && bytes.Compare(rangeValue, rangeValueFilter) < 0 { + if rangeValueFilterType == dynamodb.ComparisonOperatorGe && bytes.Compare(rangeValue, rangeValueFilter) < 0 { continue } - if rangeValueFilterType == "BEGINS_WITH" && !bytes.HasPrefix(rangeValue, rangeValueFilter) { + if rangeValueFilterType == dynamodb.ComparisonOperatorBeginsWith && !bytes.HasPrefix(rangeValue, rangeValueFilter) { continue } if item[valueKey] != nil { value := item[valueKey].B - if valueFilter != nil && *valueFilterType == "EQ" && !bytes.Equal(value, valueFilter) { + if valueFilter != nil && *valueFilterType == dynamodb.ComparisonOperatorEq && !bytes.Equal(value, valueFilter) { continue } } From e5c1bc49e3c657d19db1ee52341386321abb83f8 Mon Sep 17 00:00:00 2001 From: Aaron Kirkbride Date: Fri, 21 Apr 2017 10:14:11 +0100 Subject: [PATCH 5/7] Use filterExpression --- chunk/aws_storage_client.go | 8 ++++---- chunk/aws_storage_client_test.go | 15 +++++++-------- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/chunk/aws_storage_client.go b/chunk/aws_storage_client.go index 8f3d088b3f2..c6c25437065 100644 --- a/chunk/aws_storage_client.go +++ b/chunk/aws_storage_client.go @@ -242,11 +242,11 @@ func (a awsStorageClient) QueryPages(ctx context.Context, query IndexQuery, call // Filters if entry.ValueEqual != nil { - input.KeyConditions[valueKey] = &dynamodb.Condition{ - AttributeValueList: []*dynamodb.AttributeValue{ - {B: entry.ValueEqual}, + input.FilterExpression = aws.String(fmt.Sprintf("%s = :v", valueKey)) + input.ExpressionAttributeValues = map[string]*dynamodb.AttributeValue{ + ":v": { + B: entry.ValueEqual, }, - ComparisonOperator: aws.String(dynamodb.ComparisonOperatorEq), } } diff --git a/chunk/aws_storage_client_test.go b/chunk/aws_storage_client_test.go index b0c3084b5f6..ee8a9428ae4 100644 --- a/chunk/aws_storage_client_test.go +++ b/chunk/aws_storage_client_test.go @@ -108,17 +108,11 @@ func (m *mockDynamoDBClient) queryRequest(input *dynamodb.QueryInput) dynamoDBRe var ( rangeValueFilter []byte rangeValueFilterType string - valueFilter []byte - valueFilterType *string ) if c, ok := input.KeyConditions[rangeKey]; ok { rangeValueFilter = c.AttributeValueList[0].B rangeValueFilterType = *c.ComparisonOperator } - if c, ok := input.KeyConditions[valueKey]; ok { - valueFilter = c.AttributeValueList[0].B - valueFilterType = c.ComparisonOperator - } // Filter by HashValue, RangeValue and Value if it exists items := m.tables[*input.TableName].items[hashValue] @@ -133,8 +127,13 @@ func (m *mockDynamoDBClient) queryRequest(input *dynamodb.QueryInput) dynamoDBRe if item[valueKey] != nil { value := item[valueKey].B - if valueFilter != nil && *valueFilterType == dynamodb.ComparisonOperatorEq && !bytes.Equal(value, valueFilter) { - continue + + // Apply filterExpression if it exists (supporting only v = :v) + if input.FilterExpression != nil && *input.FilterExpression == fmt.Sprintf("%s = :v", valueKey) { + filterValue := input.ExpressionAttributeValues[":v"].B + if !bytes.Equal(value, filterValue) { + continue + } } } From caa1ad0e342eca19f2203f4656f014d8a20e773c Mon Sep 17 00:00:00 2001 From: Aaron Kirkbride Date: Fri, 21 Apr 2017 12:04:49 +0100 Subject: [PATCH 6/7] Warn if unsupported FilterExpression used in tests --- chunk/aws_storage_client_test.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/chunk/aws_storage_client_test.go b/chunk/aws_storage_client_test.go index ee8a9428ae4..105651fcbbb 100644 --- a/chunk/aws_storage_client_test.go +++ b/chunk/aws_storage_client_test.go @@ -11,6 +11,7 @@ import ( "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/service/dynamodb" "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface" + "github.com/prometheus/common/log" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/net/context" @@ -129,10 +130,14 @@ func (m *mockDynamoDBClient) queryRequest(input *dynamodb.QueryInput) dynamoDBRe value := item[valueKey].B // Apply filterExpression if it exists (supporting only v = :v) - if input.FilterExpression != nil && *input.FilterExpression == fmt.Sprintf("%s = :v", valueKey) { - filterValue := input.ExpressionAttributeValues[":v"].B - if !bytes.Equal(value, filterValue) { - continue + if input.FilterExpression != nil { + if *input.FilterExpression == fmt.Sprintf("%s = :v", valueKey) { + filterValue := input.ExpressionAttributeValues[":v"].B + if !bytes.Equal(value, filterValue) { + continue + } + } else { + log.Warnf("Unsupported FilterExpression: %s", *input.FilterExpression) } } } From 6cf3480c6c7ff438a0738010aa977422d75e452b Mon Sep 17 00:00:00 2001 From: Aaron Kirkbride Date: Fri, 21 Apr 2017 12:46:58 +0100 Subject: [PATCH 7/7] Fix rebase --- chunk/aws_storage_client.go | 4 ++-- chunk/aws_storage_client_test.go | 16 ++++++++-------- chunk/inmemory_storage_client.go | 6 +++--- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/chunk/aws_storage_client.go b/chunk/aws_storage_client.go index c6c25437065..cc2d9bb5c17 100644 --- a/chunk/aws_storage_client.go +++ b/chunk/aws_storage_client.go @@ -241,11 +241,11 @@ func (a awsStorageClient) QueryPages(ctx context.Context, query IndexQuery, call } // Filters - if entry.ValueEqual != nil { + if query.ValueEqual != nil { input.FilterExpression = aws.String(fmt.Sprintf("%s = :v", valueKey)) input.ExpressionAttributeValues = map[string]*dynamodb.AttributeValue{ ":v": { - B: entry.ValueEqual, + B: query.ValueEqual, }, } } diff --git a/chunk/aws_storage_client_test.go b/chunk/aws_storage_client_test.go index 105651fcbbb..3d8627109a1 100644 --- a/chunk/aws_storage_client_test.go +++ b/chunk/aws_storage_client_test.go @@ -266,12 +266,12 @@ func TestDynamoDBClientQueryPages(t *testing.T) { tests := []struct { name string - entry IndexEntry + query IndexQuery want []IndexEntry }{ { "check HashValue only", - IndexEntry{ + IndexQuery{ TableName: "table", HashValue: "flip", }, @@ -279,7 +279,7 @@ func TestDynamoDBClientQueryPages(t *testing.T) { }, { "check RangeValueStart", - IndexEntry{ + IndexQuery{ TableName: "table", HashValue: "foo", RangeValueStart: []byte("bar:2"), @@ -288,7 +288,7 @@ func TestDynamoDBClientQueryPages(t *testing.T) { }, { "check RangeValuePrefix", - IndexEntry{ + IndexQuery{ TableName: "table", HashValue: "foo", RangeValuePrefix: []byte("baz:"), @@ -297,7 +297,7 @@ func TestDynamoDBClientQueryPages(t *testing.T) { }, { "check ValueEqual", - IndexEntry{ + IndexQuery{ TableName: "table", HashValue: "foo", RangeValuePrefix: []byte("bar"), @@ -319,11 +319,11 @@ func TestDynamoDBClientQueryPages(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { var have []IndexEntry - err := client.QueryPages(context.Background(), tt.entry, func(read ReadBatch, lastPage bool) bool { + err := client.QueryPages(context.Background(), tt.query, func(read ReadBatch, lastPage bool) bool { for i := 0; i < read.Len(); i++ { have = append(have, IndexEntry{ - TableName: tt.entry.TableName, - HashValue: tt.entry.HashValue, + TableName: tt.query.TableName, + HashValue: tt.query.HashValue, RangeValue: read.RangeValue(i), Value: read.Value(i), }) diff --git a/chunk/inmemory_storage_client.go b/chunk/inmemory_storage_client.go index aa508c6b460..e1458f7c322 100644 --- a/chunk/inmemory_storage_client.go +++ b/chunk/inmemory_storage_client.go @@ -208,12 +208,12 @@ func (m *MockStorage) QueryPages(_ context.Context, query IndexQuery, callback f } // Filters - if entry.ValueEqual != nil { - log.Debugf("Filter Value EQ = %s", entry.ValueEqual) + if query.ValueEqual != nil { + log.Debugf("Filter Value EQ = %s", query.ValueEqual) filtered := make([]mockItem, 0) for _, v := range items { - if bytes.Equal(v.value, entry.ValueEqual) { + if bytes.Equal(v.value, query.ValueEqual) { filtered = append(filtered, v) } }