From 97330a798c3ae5be1d5e69da647ed9a67ddef4db Mon Sep 17 00:00:00 2001 From: Aaron Kirkbride Date: Tue, 18 Apr 2017 14:56:48 +0100 Subject: [PATCH 1/4] Filter equality matchers on the dynamo side --- chunk/aws_storage_client.go | 10 ++ chunk/aws_storage_client_test.go | 166 ++++++++++++++++++++++++++++++- chunk/inmemory_storage_client.go | 13 +++ chunk/schema.go | 4 + 4 files changed, 191 insertions(+), 2 deletions(-) diff --git a/chunk/aws_storage_client.go b/chunk/aws_storage_client.go index 4822ef38daa..15e4be85e66 100644 --- a/chunk/aws_storage_client.go +++ b/chunk/aws_storage_client.go @@ -240,6 +240,16 @@ func (a awsStorageClient) QueryPages(ctx context.Context, entry IndexEntry, call } } + // Filters + if entry.ValueEq != nil { + input.KeyConditions[valueKey] = &dynamodb.Condition{ + AttributeValueList: []*dynamodb.AttributeValue{ + {B: entry.ValueEq}, + }, + ComparisonOperator: aws.String(dynamodb.ComparisonOperatorEq), + } + } + request := a.queryRequestFn(input) backoff := minBackoff for page := request; page != nil; page = page.NextPage() { diff --git a/chunk/aws_storage_client_test.go b/chunk/aws_storage_client_test.go index 5861e9f5d38..e67c0e1d51e 100644 --- a/chunk/aws_storage_client_test.go +++ b/chunk/aws_storage_client_test.go @@ -101,11 +101,43 @@ func (m *mockDynamoDBClient) queryRequest(input *dynamodb.QueryInput) dynamoDBRe Items: []map[string]*dynamodb.AttributeValue{}, } + // Required filters hashValue := *input.KeyConditions[hashKey].AttributeValueList[0].S - items := m.tables[*input.TableName].items[hashValue] - // TODO we should also filter by range value + // Optional filters + var ( + rangeValueFilter []byte + rangeValueFilterType string + valueFilter []byte + valueFilterType *string + ) + if input.KeyConditions[rangeKey] != nil { + rangeValueFilter = input.KeyConditions[rangeKey].AttributeValueList[0].B + rangeValueFilterType = *input.KeyConditions[rangeKey].ComparisonOperator + } + if input.KeyConditions[valueKey] != nil { + valueFilter = input.KeyConditions[valueKey].AttributeValueList[0].B + valueFilterType = input.KeyConditions[valueKey].ComparisonOperator + } + + // Filter by HashValue, RangeValue and Value if it exists + items := m.tables[*input.TableName].items[hashValue] for _, item := range items { + rangeValue := item[rangeKey].B + if rangeValueFilterType == "GE" && bytes.Compare(rangeValue, rangeValueFilter) < 0 { + continue + } + if rangeValueFilterType == "BEGINS_WITH" && !bytes.HasPrefix(rangeValue, rangeValueFilter) { + continue + } + + if item[valueKey] != nil { + value := item[valueKey].B + if valueFilter != nil && *valueFilterType == "EQ" && !bytes.Equal(value, valueFilter) { + continue + } + } + result.Items = append(result.Items, item) } @@ -170,6 +202,136 @@ func TestDynamoDBClient(t *testing.T) { } } +func TestDynamoDBClientQueryPages(t *testing.T) { + dynamoDB := newMockDynamoDB(0, 0) + client := awsStorageClient{ + DynamoDB: dynamoDB, + queryRequestFn: dynamoDB.queryRequest, + } + + entries := []IndexEntry{ + { + TableName: "table", + HashValue: "foo", + RangeValue: []byte("bar:1"), + Value: []byte("10"), + }, + { + TableName: "table", + HashValue: "foo", + RangeValue: []byte("bar:2"), + Value: []byte("20"), + }, + { + TableName: "table", + HashValue: "foo", + RangeValue: []byte("bar:3"), + Value: []byte("30"), + }, + { + TableName: "table", + HashValue: "foo", + RangeValue: []byte("baz:1"), + Value: []byte("10"), + }, + { + TableName: "table", + HashValue: "foo", + RangeValue: []byte("baz:2"), + Value: []byte("20"), + }, + { + TableName: "table", + HashValue: "flip", + RangeValue: []byte("bar:1"), + Value: []byte("abc"), + }, + { + TableName: "table", + HashValue: "flip", + RangeValue: []byte("bar:2"), + Value: []byte("abc"), + }, + { + TableName: "table", + HashValue: "flip", + RangeValue: []byte("bar:3"), + Value: []byte("abc"), + }, + } + + tests := []struct { + name string + entry IndexEntry + want []IndexEntry + }{ + { + "check HashValue only", + IndexEntry{ + TableName: "table", + HashValue: "flip", + }, + []IndexEntry{entries[5], entries[6], entries[7]}, + }, + { + "check RangeValueStart", + IndexEntry{ + TableName: "table", + HashValue: "foo", + RangeValueStart: []byte("bar:2"), + }, + []IndexEntry{entries[1], entries[2], entries[3], entries[4]}, + }, + { + "check RangeValuePrefix", + IndexEntry{ + TableName: "table", + HashValue: "foo", + RangeValuePrefix: []byte("baz:"), + }, + []IndexEntry{entries[3], entries[4]}, + }, + { + "check ValueEq", + IndexEntry{ + TableName: "table", + HashValue: "foo", + RangeValuePrefix: []byte("bar"), + ValueEq: []byte("20"), + }, + []IndexEntry{entries[1]}, + }, + } + + batch := client.NewWriteBatch() + for _, entry := range entries { + batch.Add(entry.TableName, entry.HashValue, entry.RangeValue, entry.Value) + } + dynamoDB.createTable("table") + + err := client.BatchWrite(context.Background(), batch) + require.NoError(t, err) + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var have []IndexEntry + err := client.QueryPages(context.Background(), tt.entry, func(read ReadBatch, lastPage bool) bool { + for i := 0; i < read.Len(); i++ { + have = append(have, IndexEntry{ + TableName: tt.entry.TableName, + HashValue: tt.entry.HashValue, + RangeValue: read.RangeValue(i), + Value: read.Value(i), + }) + } + return !lastPage + }) + require.NoError(t, err) + require.Equal(t, tt.want, have) + }) + } +} + func TestAWSConfigFromURL(t *testing.T) { for _, tc := range []struct { url string diff --git a/chunk/inmemory_storage_client.go b/chunk/inmemory_storage_client.go index 5802215b2c2..3f17b233a86 100644 --- a/chunk/inmemory_storage_client.go +++ b/chunk/inmemory_storage_client.go @@ -193,6 +193,19 @@ func (m *MockStorage) QueryPages(_ context.Context, entry IndexEntry, callback f log.Debugf("Lookup %s/* (%d)", entry.HashValue, len(items)) } + // Filters + if entry.ValueEq != nil { + log.Debugf("Filter Value EQ = %s", entry.ValueEq) + + filtered := make([]mockItem, 0) + for _, v := range items { + if bytes.Equal(v.value, entry.ValueEq) { + filtered = append(filtered, v) + } + } + items = filtered + } + result := mockReadBatch{} for _, item := range items { result = append(result, item) diff --git a/chunk/schema.go b/chunk/schema.go index bee4e3de345..c0ad741e3cd 100644 --- a/chunk/schema.go +++ b/chunk/schema.go @@ -52,6 +52,9 @@ type IndexEntry struct { // - If neither is set, must read all keys for that row. RangeValuePrefix []byte RangeValueStart []byte + + // Filters for querying + ValueEq []byte } // v1Schema was: @@ -410,6 +413,7 @@ func (v6Entries) GetReadMetricLabelValueEntries(from, _ uint32, tableName, hashK TableName: tableName, HashValue: hashKey + ":" + string(labelName), RangeValueStart: buildRangeKey(encodedFromBytes), + ValueEq: []byte(labelValue), }, }, nil } From a6bad8f5821c7799d3cb38baca55373f5a035d04 Mon Sep 17 00:00:00 2001 From: Aaron Kirkbride Date: Wed, 19 Apr 2017 14:52:16 +0100 Subject: [PATCH 2/4] Rename ValueEq to ValueEqual --- chunk/aws_storage_client.go | 4 ++-- chunk/aws_storage_client_test.go | 4 ++-- chunk/inmemory_storage_client.go | 6 +++--- chunk/schema.go | 4 ++-- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/chunk/aws_storage_client.go b/chunk/aws_storage_client.go index 15e4be85e66..b8a36cc83cc 100644 --- a/chunk/aws_storage_client.go +++ b/chunk/aws_storage_client.go @@ -241,10 +241,10 @@ func (a awsStorageClient) QueryPages(ctx context.Context, entry IndexEntry, call } // Filters - if entry.ValueEq != nil { + if entry.ValueEqual != nil { input.KeyConditions[valueKey] = &dynamodb.Condition{ AttributeValueList: []*dynamodb.AttributeValue{ - {B: entry.ValueEq}, + {B: entry.ValueEqual}, }, ComparisonOperator: aws.String(dynamodb.ComparisonOperatorEq), } diff --git a/chunk/aws_storage_client_test.go b/chunk/aws_storage_client_test.go index e67c0e1d51e..8a544b43101 100644 --- a/chunk/aws_storage_client_test.go +++ b/chunk/aws_storage_client_test.go @@ -292,12 +292,12 @@ func TestDynamoDBClientQueryPages(t *testing.T) { []IndexEntry{entries[3], entries[4]}, }, { - "check ValueEq", + "check ValueEqual", IndexEntry{ TableName: "table", HashValue: "foo", RangeValuePrefix: []byte("bar"), - ValueEq: []byte("20"), + ValueEqual: []byte("20"), }, []IndexEntry{entries[1]}, }, diff --git a/chunk/inmemory_storage_client.go b/chunk/inmemory_storage_client.go index 3f17b233a86..1c087452a7b 100644 --- a/chunk/inmemory_storage_client.go +++ b/chunk/inmemory_storage_client.go @@ -194,12 +194,12 @@ func (m *MockStorage) QueryPages(_ context.Context, entry IndexEntry, callback f } // Filters - if entry.ValueEq != nil { - log.Debugf("Filter Value EQ = %s", entry.ValueEq) + if entry.ValueEqual != nil { + log.Debugf("Filter Value EQ = %s", entry.ValueEqual) filtered := make([]mockItem, 0) for _, v := range items { - if bytes.Equal(v.value, entry.ValueEq) { + if bytes.Equal(v.value, entry.ValueEqual) { filtered = append(filtered, v) } } diff --git a/chunk/schema.go b/chunk/schema.go index c0ad741e3cd..d8bf4d2f4fd 100644 --- a/chunk/schema.go +++ b/chunk/schema.go @@ -54,7 +54,7 @@ type IndexEntry struct { RangeValueStart []byte // Filters for querying - ValueEq []byte + ValueEqual []byte } // v1Schema was: @@ -413,7 +413,7 @@ func (v6Entries) GetReadMetricLabelValueEntries(from, _ uint32, tableName, hashK TableName: tableName, HashValue: hashKey + ":" + string(labelName), RangeValueStart: buildRangeKey(encodedFromBytes), - ValueEq: []byte(labelValue), + ValueEqual: []byte(labelValue), }, }, nil } From 84dffe22f7d0277ccbf8d94ed7b68a183373deb6 Mon Sep 17 00:00:00 2001 From: Aaron Kirkbride Date: Wed, 19 Apr 2017 14:55:24 +0100 Subject: [PATCH 3/4] Tidy up filter code --- chunk/aws_storage_client_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/chunk/aws_storage_client_test.go b/chunk/aws_storage_client_test.go index 8a544b43101..4e9161e0ae7 100644 --- a/chunk/aws_storage_client_test.go +++ b/chunk/aws_storage_client_test.go @@ -111,13 +111,13 @@ func (m *mockDynamoDBClient) queryRequest(input *dynamodb.QueryInput) dynamoDBRe valueFilter []byte valueFilterType *string ) - if input.KeyConditions[rangeKey] != nil { - rangeValueFilter = input.KeyConditions[rangeKey].AttributeValueList[0].B - rangeValueFilterType = *input.KeyConditions[rangeKey].ComparisonOperator + if c, ok := input.KeyConditions[rangeKey]; ok { + rangeValueFilter = c.AttributeValueList[0].B + rangeValueFilterType = *c.ComparisonOperator } - if input.KeyConditions[valueKey] != nil { - valueFilter = input.KeyConditions[valueKey].AttributeValueList[0].B - valueFilterType = input.KeyConditions[valueKey].ComparisonOperator + if c, ok := input.KeyConditions[valueKey]; ok { + valueFilter = c.AttributeValueList[0].B + valueFilterType = c.ComparisonOperator } // Filter by HashValue, RangeValue and Value if it exists From f0225f06f7125e66c9718f5771e17f3dcc66661c Mon Sep 17 00:00:00 2001 From: Aaron Kirkbride Date: Wed, 19 Apr 2017 14:58:16 +0100 Subject: [PATCH 4/4] Use dynamoDB constants --- chunk/aws_storage_client_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/chunk/aws_storage_client_test.go b/chunk/aws_storage_client_test.go index 4e9161e0ae7..4319f7a45f0 100644 --- a/chunk/aws_storage_client_test.go +++ b/chunk/aws_storage_client_test.go @@ -124,16 +124,16 @@ func (m *mockDynamoDBClient) queryRequest(input *dynamodb.QueryInput) dynamoDBRe items := m.tables[*input.TableName].items[hashValue] for _, item := range items { rangeValue := item[rangeKey].B - if rangeValueFilterType == "GE" && bytes.Compare(rangeValue, rangeValueFilter) < 0 { + if rangeValueFilterType == dynamodb.ComparisonOperatorGe && bytes.Compare(rangeValue, rangeValueFilter) < 0 { continue } - if rangeValueFilterType == "BEGINS_WITH" && !bytes.HasPrefix(rangeValue, rangeValueFilter) { + if rangeValueFilterType == dynamodb.ComparisonOperatorBeginsWith && !bytes.HasPrefix(rangeValue, rangeValueFilter) { continue } if item[valueKey] != nil { value := item[valueKey].B - if valueFilter != nil && *valueFilterType == "EQ" && !bytes.Equal(value, valueFilter) { + if valueFilter != nil && *valueFilterType == dynamodb.ComparisonOperatorEq && !bytes.Equal(value, valueFilter) { continue } }