From dc881030eda3afc662fc87c301a753f26e0dbe5d Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Wed, 29 Aug 2018 16:01:13 +0100 Subject: [PATCH 1/4] Batch index lookups. Signed-off-by: Tom Wilkie --- pkg/chunk/aws/storage_client.go | 7 +- pkg/chunk/cassandra/storage_client.go | 7 +- pkg/chunk/chunk_store.go | 40 +------- pkg/chunk/gcp/storage_client.go | 90 ++++++++++++++++- pkg/chunk/inmemory_storage_client.go | 21 +++- pkg/chunk/storage/caching_storage_client.go | 104 ++++++-------------- pkg/chunk/storage/index_test.go | 12 ++- pkg/chunk/storage_client.go | 2 +- pkg/chunk/util/util.go | 75 ++++++++++++++ 9 files changed, 235 insertions(+), 123 deletions(-) create mode 100644 pkg/chunk/util/util.go diff --git a/pkg/chunk/aws/storage_client.go b/pkg/chunk/aws/storage_client.go index 3bf4b168694..739dbac066a 100644 --- a/pkg/chunk/aws/storage_client.go +++ b/pkg/chunk/aws/storage_client.go @@ -29,6 +29,7 @@ import ( "github.com/weaveworks/common/instrument" "github.com/weaveworks/common/user" "github.com/weaveworks/cortex/pkg/chunk" + chunk_util "github.com/weaveworks/cortex/pkg/chunk/util" "github.com/weaveworks/cortex/pkg/util" ) @@ -288,7 +289,11 @@ func (a storageClient) BatchWrite(ctx context.Context, input chunk.WriteBatch) e return backoff.Err() } -func (a storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error { +func (s storageClient) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) bool) error { + return chunk_util.DoParallelQueries(ctx, s.query, queries, callback) +} + +func (a storageClient) query(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error { sp, ctx := ot.StartSpanFromContext(ctx, "QueryPages", ot.Tag{Key: "tableName", Value: query.TableName}, ot.Tag{Key: "hashValue", Value: query.HashValue}) defer sp.Finish() diff --git 
a/pkg/chunk/cassandra/storage_client.go b/pkg/chunk/cassandra/storage_client.go index 54d40be2f39..7df057e61e7 100644 --- a/pkg/chunk/cassandra/storage_client.go +++ b/pkg/chunk/cassandra/storage_client.go @@ -11,6 +11,7 @@ import ( "github.com/pkg/errors" "github.com/weaveworks/cortex/pkg/chunk" + "github.com/weaveworks/cortex/pkg/chunk/util" ) const ( @@ -172,7 +173,11 @@ func (s *storageClient) BatchWrite(ctx context.Context, batch chunk.WriteBatch) return nil } -func (s *storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error { +func (s *storageClient) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) bool) error { + return util.DoParallelQueries(ctx, s.query, queries, callback) +} + +func (s *storageClient) query(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error { var q *gocql.Query switch { diff --git a/pkg/chunk/chunk_store.go b/pkg/chunk/chunk_store.go index 3edfeb9cdc0..d86366d1175 100644 --- a/pkg/chunk/chunk_store.go +++ b/pkg/chunk/chunk_store.go @@ -412,38 +412,8 @@ func (c *store) lookupChunksByMetricName(ctx context.Context, from, through mode } func (c *store) lookupEntriesByQueries(ctx context.Context, queries []IndexQuery) ([]IndexEntry, error) { - incomingEntries := make(chan []IndexEntry) - incomingErrors := make(chan error) - for _, query := range queries { - go func(query IndexQuery) { - entries, err := c.lookupEntriesByQuery(ctx, query) - if err != nil { - incomingErrors <- err - } else { - incomingEntries <- entries - } - }(query) - } - - // Combine the results into one slice - var entries []IndexEntry - var lastErr error - for i := 0; i < len(queries); i++ { - select { - case incoming := <-incomingEntries: - entries = append(entries, incoming...) 
- case err := <-incomingErrors: - lastErr = err - } - } - - return entries, lastErr -} - -func (c *store) lookupEntriesByQuery(ctx context.Context, query IndexQuery) ([]IndexEntry, error) { var entries []IndexEntry - - if err := c.storage.QueryPages(ctx, query, func(resp ReadBatch) (shouldContinue bool) { + err := c.storage.QueryPages(ctx, queries, func(query IndexQuery, resp ReadBatch) bool { for i := 0; i < resp.Len(); i++ { entries = append(entries, IndexEntry{ TableName: query.TableName, @@ -453,12 +423,8 @@ func (c *store) lookupEntriesByQuery(ctx context.Context, query IndexQuery) ([]I }) } return true - }); err != nil { - level.Error(util.WithContext(ctx, util.Logger)).Log("msg", "error querying storage", "err", err) - return nil, err - } - - return entries, nil + }) + return entries, err } func (c *store) parseIndexEntries(ctx context.Context, entries []IndexEntry, matcher *labels.Matcher) ([]string, error) { diff --git a/pkg/chunk/gcp/storage_client.go b/pkg/chunk/gcp/storage_client.go index 18b779c2e9f..888ca2bbe6c 100644 --- a/pkg/chunk/gcp/storage_client.go +++ b/pkg/chunk/gcp/storage_client.go @@ -13,6 +13,7 @@ import ( "github.com/pkg/errors" "github.com/weaveworks/cortex/pkg/chunk" + chunk_util "github.com/weaveworks/cortex/pkg/chunk/util" "github.com/weaveworks/cortex/pkg/util" ) @@ -162,7 +163,88 @@ func (s *storageClientColumnKey) BatchWrite(ctx context.Context, batch chunk.Wri return nil } -func (s *storageClientColumnKey) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error { +func (s *storageClientColumnKey) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) bool) error { + sp, ctx := ot.StartSpanFromContext(ctx, "QueryPages") + defer sp.Finish() + + // A limitation of this approach is that this only fetches whole rows; but + // whatever, we filter them in the cache on the client. 
But for unit tests to + // pass, we must do this. + callback = chunk_util.QueryFilter(callback) + + type tableQuery struct { + name string + queries map[string]chunk.IndexQuery + rows bigtable.RowList + } + + tableQueries := map[string]tableQuery{} + for _, query := range queries { + tq, ok := tableQueries[query.TableName] + if !ok { + tq = tableQuery{ + name: query.TableName, + queries: map[string]chunk.IndexQuery{}, + } + } + tq.queries[query.HashValue] = query + tq.rows = append(tq.rows, query.HashValue) + tableQueries[query.TableName] = tq + } + + errs := make(chan error) + + for _, tq := range tableQueries { + + table := s.client.Open(tq.name) + for i := 0; i < len(tq.rows); i += maxRowReads { + + page := tq.rows[i:util.Min(i+maxRowReads, len(tq.rows))] + go func(page bigtable.RowList, tq tableQuery) { + var processingErr error + // rows are returned in key order, not order in row list + err := table.ReadRows(ctx, page, func(row bigtable.Row) bool { + + query, ok := tq.queries[row.Key()] + if !ok { + processingErr = errors.WithStack(fmt.Errorf("Got row for unknown chunk: %s", row.Key())) + return false + } + + val, ok := row[columnFamily] + if !ok { + // There are no matching rows. 
+ return true + } + + return callback(query, bigtableReadBatchColumnKey{ + items: val, + columnPrefix: columnFamily + ":", + }) + }) + + if processingErr != nil { + errs <- processingErr + } else { + errs <- err + } + }(page, tq) + } + } + + var lastErr error + for _, tq := range tableQueries { + for i := 0; i < len(tq.rows); i += maxRowReads { + err := <-errs + if err != nil { + lastErr = err + } + } + } + return lastErr +} + +func (s *storageClientColumnKey) query(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error { const null = string('\xff') sp, ctx := ot.StartSpanFromContext(ctx, "QueryPages", ot.Tag{Key: "tableName", Value: query.TableName}, ot.Tag{Key: "hashValue", Value: query.HashValue}) @@ -343,7 +425,11 @@ func (s *storageClientColumnKey) GetChunks(ctx context.Context, input []chunk.Ch return output, nil } -func (s *storageClientV1) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error { +func (s *storageClientV1) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) bool) error { + return chunk_util.DoParallelQueries(ctx, s.query, queries, callback) +} + +func (s *storageClientV1) query(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error { const null = string('\xff') sp, ctx := ot.StartSpanFromContext(ctx, "QueryPages", ot.Tag{Key: "tableName", Value: query.TableName}, ot.Tag{Key: "hashValue", Value: query.HashValue}) diff --git a/pkg/chunk/inmemory_storage_client.go b/pkg/chunk/inmemory_storage_client.go index c95f7e5a4ea..452cb99eb17 100644 --- a/pkg/chunk/inmemory_storage_client.go +++ b/pkg/chunk/inmemory_storage_client.go @@ -158,13 +158,26 @@ func (m *MockStorage) BatchWrite(ctx context.Context, batch WriteBatch) error { } // QueryPages implements StorageClient. 
-func (m *MockStorage) QueryPages(ctx context.Context, query IndexQuery, callback func(result ReadBatch) (shouldContinue bool)) error { - logger := util.WithContext(ctx, util.Logger) - level.Debug(logger).Log("msg", "QueryPages", "query", query.HashValue) - +func (m *MockStorage) QueryPages(ctx context.Context, queries []IndexQuery, callback func(IndexQuery, ReadBatch) (shouldContinue bool)) error { m.mtx.RLock() defer m.mtx.RUnlock() + for _, query := range queries { + err := m.query(ctx, query, func(b ReadBatch) bool { + return callback(query, b) + }) + if err != nil { + return err + } + } + + return nil +} + +func (m *MockStorage) query(ctx context.Context, query IndexQuery, callback func(ReadBatch) (shouldContinue bool)) error { + logger := util.WithContext(ctx, util.Logger) + level.Debug(logger).Log("msg", "QueryPages", "query", query.HashValue) + table, ok := m.tables[query.TableName] if !ok { return fmt.Errorf("table not found") diff --git a/pkg/chunk/storage/caching_storage_client.go b/pkg/chunk/storage/caching_storage_client.go index 5b58078b2af..9b5b1f941fd 100644 --- a/pkg/chunk/storage/caching_storage_client.go +++ b/pkg/chunk/storage/caching_storage_client.go @@ -1,13 +1,12 @@ package storage import ( - "bytes" "context" - "strings" "time" "github.com/weaveworks/cortex/pkg/chunk" "github.com/weaveworks/cortex/pkg/chunk/cache" + chunk_util "github.com/weaveworks/cortex/pkg/chunk/util" ) type cachingStorageClient struct { @@ -27,87 +26,48 @@ func newCachingStorageClient(client chunk.StorageClient, size int, validity time } } -func (s *cachingStorageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error { - value, ok := s.cache.Get(ctx, queryKey(query)) - if ok { - batches := value.([]chunk.ReadBatch) - filteredBatch := filterBatchByQuery(query, batches) - callback(filteredBatch) +func (s *cachingStorageClient) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback 
func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error { + // We cache the entire row, so filter client side. + callback = chunk_util.QueryFilter(callback) + cacheableMissed := []chunk.IndexQuery{} + missed := map[string]chunk.IndexQuery{} + + for _, query := range queries { + value, ok := s.cache.Get(ctx, queryKey(query)) + if !ok { + cacheableMissed = append(cacheableMissed, chunk.IndexQuery{ + TableName: query.TableName, + HashValue: query.HashValue, + }) + missed[queryKey(query)] = query + continue + } - return nil + for _, batch := range value.([]chunk.ReadBatch) { + callback(query, batch) + } } - batches := []chunk.ReadBatch{} - cacheableQuery := chunk.IndexQuery{ - TableName: query.TableName, - HashValue: query.HashValue, - } // Just reads the entire row and caches it. - - err := s.StorageClient.QueryPages(ctx, cacheableQuery, copyingCallback(&batches)) + results := map[string][]chunk.ReadBatch{} + err := s.StorageClient.QueryPages(ctx, cacheableMissed, func(cacheableQuery chunk.IndexQuery, r chunk.ReadBatch) bool { + key := queryKey(cacheableQuery) + results[key] = append(results[key], r) + return true + }) if err != nil { return err } - filteredBatch := filterBatchByQuery(query, batches) - callback(filteredBatch) - - s.cache.Put(ctx, queryKey(query), batches) - - return nil -} - -type readBatch []cell - -func (b readBatch) Len() int { return len(b) } -func (b readBatch) RangeValue(i int) []byte { return b[i].column } -func (b readBatch) Value(i int) []byte { return b[i].value } - -type cell struct { - column []byte - value []byte -} - -func copyingCallback(readBatches *[]chunk.ReadBatch) func(chunk.ReadBatch) bool { - return func(result chunk.ReadBatch) bool { - *readBatches = append(*readBatches, result) - return true + for key, batches := range results { + query := missed[key] + for _, batch := range batches { + callback(query, batch) + } } + return nil } func queryKey(q chunk.IndexQuery) string { const sep = "\xff" return q.TableName + sep 
+ q.HashValue } - -func filterBatchByQuery(query chunk.IndexQuery, batches []chunk.ReadBatch) readBatch { - filter := func([]byte, []byte) bool { return true } - - if len(query.RangeValuePrefix) != 0 { - filter = func(rangeValue []byte, value []byte) bool { - return strings.HasPrefix(string(rangeValue), string(query.RangeValuePrefix)) - } - } - if len(query.RangeValueStart) != 0 { - filter = func(rangeValue []byte, value []byte) bool { - return string(rangeValue) >= string(query.RangeValueStart) - } - } - if len(query.ValueEqual) != 0 { - // This is on top of the existing filters. - existingFilter := filter - filter = func(rangeValue []byte, value []byte) bool { - return existingFilter(rangeValue, value) && bytes.Equal(value, query.ValueEqual) - } - } - - finalBatch := make(readBatch, 0, len(batches)) // On the higher side for most queries. On the lower side for column key schema. - for _, batch := range batches { - for i := 0; i < batch.Len(); i++ { - if filter(batch.RangeValue(i), batch.Value(i)) { - finalBatch = append(finalBatch, cell{column: batch.RangeValue(i), value: batch.Value(i)}) - } - } - } - - return finalBatch -} diff --git a/pkg/chunk/storage/index_test.go b/pkg/chunk/storage/index_test.go index 91d8532bb96..e8f8db00761 100644 --- a/pkg/chunk/storage/index_test.go +++ b/pkg/chunk/storage/index_test.go @@ -21,12 +21,14 @@ func TestIndexBasic(t *testing.T) { // Make sure we get back the correct entries by hash value. 
for i := 0; i < 30; i++ { - entry := chunk.IndexQuery{ - TableName: tableName, - HashValue: fmt.Sprintf("hash%d", i), + entries := []chunk.IndexQuery{ + { + TableName: tableName, + HashValue: fmt.Sprintf("hash%d", i), + }, } var have []chunk.IndexEntry - err := client.QueryPages(context.Background(), entry, func(read chunk.ReadBatch) bool { + err := client.QueryPages(context.Background(), entries, func(_ chunk.IndexQuery, read chunk.ReadBatch) bool { for j := 0; j < read.Len(); j++ { have = append(have, chunk.IndexEntry{ RangeValue: read.RangeValue(j), @@ -167,7 +169,7 @@ func TestQueryPages(t *testing.T) { run := true for run { var have []chunk.IndexEntry - err = client.QueryPages(context.Background(), tt.query, func(read chunk.ReadBatch) bool { + err = client.QueryPages(context.Background(), []chunk.IndexQuery{tt.query}, func(_ chunk.IndexQuery, read chunk.ReadBatch) bool { for i := 0; i < read.Len(); i++ { have = append(have, chunk.IndexEntry{ TableName: tt.query.TableName, diff --git a/pkg/chunk/storage_client.go b/pkg/chunk/storage_client.go index c86f573b77b..92ccd557582 100644 --- a/pkg/chunk/storage_client.go +++ b/pkg/chunk/storage_client.go @@ -9,7 +9,7 @@ type StorageClient interface { BatchWrite(context.Context, WriteBatch) error // For the read path. - QueryPages(ctx context.Context, query IndexQuery, callback func(result ReadBatch) (shouldContinue bool)) error + QueryPages(ctx context.Context, queries []IndexQuery, callback func(IndexQuery, ReadBatch) (shouldContinue bool)) error // For storing and retrieving chunks. PutChunks(ctx context.Context, chunks []Chunk) error diff --git a/pkg/chunk/util/util.go b/pkg/chunk/util/util.go new file mode 100644 index 00000000000..610255f9c7e --- /dev/null +++ b/pkg/chunk/util/util.go @@ -0,0 +1,75 @@ +package util + +import ( + "bytes" + "context" + "strings" + + "github.com/weaveworks/cortex/pkg/chunk" +) + +// DoSingleQuery is the interface for indexes that don't support batching yet. 
+type DoSingleQuery func( + ctx context.Context, query chunk.IndexQuery, + callback func(chunk.ReadBatch) bool, +) error + +// DoParallelQueries translates between our interface for query batching, +// and indexes that don't yet support batching. +func DoParallelQueries( + ctx context.Context, doSingleQuery DoSingleQuery, queries []chunk.IndexQuery, + callback func(chunk.IndexQuery, chunk.ReadBatch) bool, +) error { + incomingErrors := make(chan error) + for _, query := range queries { + go func(query chunk.IndexQuery) { + incomingErrors <- doSingleQuery(ctx, query, func(r chunk.ReadBatch) bool { + return callback(query, r) + }) + }(query) + } + var lastErr error + for i := 0; i < len(queries); i++ { + err := <-incomingErrors + if err != nil { + + lastErr = err + } + } + return lastErr +} + +type Callback func(chunk.IndexQuery, chunk.ReadBatch) bool + +type readBatch []cell + +func (b readBatch) Len() int { return len(b) } +func (b readBatch) RangeValue(i int) []byte { return b[i].column } +func (b readBatch) Value(i int) []byte { return b[i].value } + +type cell struct { + column []byte + value []byte +} + +func QueryFilter(callback Callback) Callback { + return func(query chunk.IndexQuery, batch chunk.ReadBatch) bool { + finalBatch := make(readBatch, 0, batch.Len()) + for i := 0; i < batch.Len(); i++ { + rangeValue, value := batch.RangeValue(i), batch.Value(i) + + if len(query.RangeValuePrefix) != 0 && !strings.HasPrefix(string(rangeValue), string(query.RangeValuePrefix)) { + continue + } + if len(query.RangeValueStart) != 0 && string(rangeValue) < string(query.RangeValueStart) { + continue + } + if len(query.ValueEqual) != 0 && !bytes.Equal(value, query.ValueEqual) { + continue + } + + finalBatch = append(finalBatch, cell{column: rangeValue, value: value}) + } + return callback(query, finalBatch) + } +} From 387a6f3af65380b2424cb288efb0000316cbd6e9 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Wed, 29 Aug 2018 21:33:43 +0100 Subject: [PATCH 2/4] Prevent 
concurrent modifications of the map. Signed-off-by: Tom Wilkie --- pkg/chunk/storage/caching_storage_client.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/chunk/storage/caching_storage_client.go b/pkg/chunk/storage/caching_storage_client.go index 9b5b1f941fd..b8ff6275d21 100644 --- a/pkg/chunk/storage/caching_storage_client.go +++ b/pkg/chunk/storage/caching_storage_client.go @@ -2,6 +2,7 @@ package storage import ( "context" + "sync" "time" "github.com/weaveworks/cortex/pkg/chunk" @@ -48,8 +49,11 @@ func (s *cachingStorageClient) QueryPages(ctx context.Context, queries []chunk.I } } + var resultsMtx sync.Mutex results := map[string][]chunk.ReadBatch{} err := s.StorageClient.QueryPages(ctx, cacheableMissed, func(cacheableQuery chunk.IndexQuery, r chunk.ReadBatch) bool { + resultsMtx.Lock() + defer resultsMtx.Unlock() key := queryKey(cacheableQuery) results[key] = append(results[key], r) return true @@ -58,6 +62,8 @@ func (s *cachingStorageClient) QueryPages(ctx context.Context, queries []chunk.I return err } + resultsMtx.Lock() + defer resultsMtx.Unlock() for key, batches := range results { query := missed[key] for _, batch := range batches { From 453785ab4e1587071353dbe72eba429d07793b4f Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Thu, 30 Aug 2018 17:30:29 +0100 Subject: [PATCH 3/4] Turn chunk.ReadBatch into an iterator style interface to reduce copying.
Signed-off-by: Tom Wilkie --- pkg/chunk/aws/storage_client.go | 29 ++++++++------ pkg/chunk/cassandra/storage_client.go | 21 +++++----- pkg/chunk/chunk_store.go | 6 +-- pkg/chunk/gcp/storage_client.go | 55 +++++++++++++++------------ pkg/chunk/inmemory_storage_client.go | 26 ++++++++----- pkg/chunk/storage/index_test.go | 10 ++--- pkg/chunk/storage_client.go | 6 +-- pkg/chunk/util/util.go | 50 ++++++++++++------------ 8 files changed, 112 insertions(+), 91 deletions(-) diff --git a/pkg/chunk/aws/storage_client.go b/pkg/chunk/aws/storage_client.go index 739dbac066a..0561b755b99 100644 --- a/pkg/chunk/aws/storage_client.go +++ b/pkg/chunk/aws/storage_client.go @@ -289,8 +289,8 @@ func (a storageClient) BatchWrite(ctx context.Context, input chunk.WriteBatch) e return backoff.Err() } -func (s storageClient) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) bool) error { - return chunk_util.DoParallelQueries(ctx, s.query, queries, callback) +func (a storageClient) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) bool) error { + return chunk_util.DoParallelQueries(ctx, a.query, queries, callback) } func (a storageClient) query(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error { @@ -363,7 +363,7 @@ func (a storageClient) query(ctx context.Context, query chunk.IndexQuery, callba return nil } -func (a storageClient) queryPage(ctx context.Context, input *dynamodb.QueryInput, page dynamoDBRequest) (dynamoDBReadResponse, error) { +func (a storageClient) queryPage(ctx context.Context, input *dynamodb.QueryInput, page dynamoDBRequest) (*dynamoDBReadResponse, error) { backoff := util.NewBackoff(ctx, a.cfg.backoffConfig) defer func() { dynamoQueryRetryCount.WithLabelValues("queryPage").Observe(float64(backoff.NumRetries())) @@ -393,7 +393,10 @@ func (a storageClient) queryPage(ctx context.Context, input 
*dynamodb.QueryInput } queryOutput := page.Data().(*dynamodb.QueryOutput) - return dynamoDBReadResponse(queryOutput.Items), nil + return &dynamoDBReadResponse{ + i: -1, + items: queryOutput.Items, + }, nil } return nil, fmt.Errorf("QueryPage error: %s for table %v, last error %v", backoff.Err(), *input.TableName, err) } @@ -777,18 +780,22 @@ func (a storageClient) putS3Chunk(ctx context.Context, key string, buf []byte) e } // Slice of values returned; map key is attribute name -type dynamoDBReadResponse []map[string]*dynamodb.AttributeValue +type dynamoDBReadResponse struct { + i int + items []map[string]*dynamodb.AttributeValue +} -func (b dynamoDBReadResponse) Len() int { - return len(b) +func (b *dynamoDBReadResponse) Next() bool { + b.i++ + return b.i < len(b.items) } -func (b dynamoDBReadResponse) RangeValue(i int) []byte { - return b[i][rangeKey].B +func (b *dynamoDBReadResponse) RangeValue() []byte { + return b.items[b.i][rangeKey].B } -func (b dynamoDBReadResponse) Value(i int) []byte { - chunkValue, ok := b[i][valueKey] +func (b *dynamoDBReadResponse) Value() []byte { + chunkValue, ok := b.items[b.i][valueKey] if !ok { return nil } diff --git a/pkg/chunk/cassandra/storage_client.go b/pkg/chunk/cassandra/storage_client.go index 7df057e61e7..7f8aa7830ae 100644 --- a/pkg/chunk/cassandra/storage_client.go +++ b/pkg/chunk/cassandra/storage_client.go @@ -210,7 +210,7 @@ func (s *storageClient) query(ctx context.Context, query chunk.IndexQuery, callb defer iter.Close() scanner := iter.Scanner() for scanner.Next() { - var b readBatch + b := &readBatch{} if err := scanner.Scan(&b.rangeValue, &b.value); err != nil { return errors.WithStack(err) } @@ -223,27 +223,26 @@ func (s *storageClient) query(ctx context.Context, query chunk.IndexQuery, callb // readBatch represents a batch of rows read from Cassandra. 
type readBatch struct { + consumed bool rangeValue []byte value []byte } // Len implements chunk.ReadBatch; in Cassandra we 'stream' results back // one-by-one, so this always returns 1. -func (readBatch) Len() int { - return 1 +func (b *readBatch) Next() bool { + if b.consumed { + return false + } + b.consumed = true + return true } -func (b readBatch) RangeValue(index int) []byte { - if index != 0 { - panic("index != 0") - } +func (b *readBatch) RangeValue() []byte { return b.rangeValue } -func (b readBatch) Value(index int) []byte { - if index != 0 { - panic("index != 0") - } +func (b *readBatch) Value() []byte { return b.value } diff --git a/pkg/chunk/chunk_store.go b/pkg/chunk/chunk_store.go index d86366d1175..853125d53bf 100644 --- a/pkg/chunk/chunk_store.go +++ b/pkg/chunk/chunk_store.go @@ -414,12 +414,12 @@ func (c *store) lookupChunksByMetricName(ctx context.Context, from, through mode func (c *store) lookupEntriesByQueries(ctx context.Context, queries []IndexQuery) ([]IndexEntry, error) { var entries []IndexEntry err := c.storage.QueryPages(ctx, queries, func(query IndexQuery, resp ReadBatch) bool { - for i := 0; i < resp.Len(); i++ { + for resp.Next() { entries = append(entries, IndexEntry{ TableName: query.TableName, HashValue: query.HashValue, - RangeValue: resp.RangeValue(i), - Value: resp.Value(i), + RangeValue: resp.RangeValue(), + Value: resp.Value(), }) } return true diff --git a/pkg/chunk/gcp/storage_client.go b/pkg/chunk/gcp/storage_client.go index 888ca2bbe6c..d287c826893 100644 --- a/pkg/chunk/gcp/storage_client.go +++ b/pkg/chunk/gcp/storage_client.go @@ -217,7 +217,8 @@ func (s *storageClientColumnKey) QueryPages(ctx context.Context, queries []chunk return true } - return callback(query, bigtableReadBatchColumnKey{ + return callback(query, &bigtableReadBatchColumnKey{ + i: -1, items: val, columnPrefix: columnFamily + ":", }) @@ -284,7 +285,8 @@ func (s *storageClientColumnKey) query(ctx context.Context, query chunk.IndexQue val = 
filteredItems } - callback(bigtableReadBatchColumnKey{ + callback(&bigtableReadBatchColumnKey{ + i: -1, items: val, columnPrefix: columnFamily + ":", }) @@ -293,22 +295,22 @@ func (s *storageClientColumnKey) query(ctx context.Context, query chunk.IndexQue // bigtableReadBatchColumnKey represents a batch of values read from Bigtable. type bigtableReadBatchColumnKey struct { + i int items []bigtable.ReadItem columnPrefix string } -func (b bigtableReadBatchColumnKey) Len() int { - return len(b.items) +func (b *bigtableReadBatchColumnKey) Next() bool { + b.i++ + return b.i < len(b.items) } -func (b bigtableReadBatchColumnKey) RangeValue(index int) []byte { - return []byte( - strings.TrimPrefix(b.items[index].Column, b.columnPrefix), - ) +func (b *bigtableReadBatchColumnKey) RangeValue() []byte { + return []byte(strings.TrimPrefix(b.items[b.i].Column, b.columnPrefix)) } -func (b bigtableReadBatchColumnKey) Value(index int) []byte { - return b.items[index].Value +func (b *bigtableReadBatchColumnKey) Value() []byte { + return b.items[b.i].Value } func (s *storageClientColumnKey) PutChunks(ctx context.Context, chunks []chunk.Chunk) error { @@ -459,7 +461,9 @@ func (s *storageClientV1) query(ctx context.Context, query chunk.IndexQuery, cal err := table.ReadRows(ctx, rowRange, func(r bigtable.Row) bool { if query.ValueEqual == nil || bytes.Equal(r[columnFamily][0].Value, query.ValueEqual) { - return callback(bigtableReadBatchV1(r)) + return callback(&bigtableReadBatchV1{ + row: r, + }) } return true @@ -474,24 +478,27 @@ func (s *storageClientV1) query(ctx context.Context, query chunk.IndexQuery, cal // bigtableReadBatchV1 represents a batch of rows read from Bigtable. As the // bigtable interface gives us rows one-by-one, a batch always only contains // a single row. 
-type bigtableReadBatchV1 bigtable.Row - -func (bigtableReadBatchV1) Len() int { - return 1 +type bigtableReadBatchV1 struct { + consumed bool + row bigtable.Row } -func (b bigtableReadBatchV1) RangeValue(index int) []byte { - if index != 0 { - panic("index != 0") + +func (b *bigtableReadBatchV1) Next() bool { + if b.consumed { + return false } + b.consumed = true + return true +} + +func (b *bigtableReadBatchV1) RangeValue() []byte { // String before the first separator is the hashkey - parts := strings.SplitN(bigtable.Row(b).Key(), separator, 2) + parts := strings.SplitN(b.row.Key(), separator, 2) return []byte(parts[1]) } -func (b bigtableReadBatchV1) Value(index int) []byte { - if index != 0 { - panic("index != 0") - } - cf, ok := b[columnFamily] + +func (b *bigtableReadBatchV1) Value() []byte { + cf, ok := b.row[columnFamily] if !ok || len(cf) != 1 { panic("bad response from bigtable") } diff --git a/pkg/chunk/inmemory_storage_client.go b/pkg/chunk/inmemory_storage_client.go index 452cb99eb17..df2f3971711 100644 --- a/pkg/chunk/inmemory_storage_client.go +++ b/pkg/chunk/inmemory_storage_client.go @@ -243,12 +243,14 @@ func (m *MockStorage) query(ctx context.Context, query IndexQuery, callback func items = filtered } - result := mockReadBatch{} + result := mockReadBatch{ + index: -1, + } for _, item := range items { - result = append(result, item) + result.items = append(result.items, item) } - callback(result) + callback(&result) return nil } @@ -302,16 +304,20 @@ func (b *mockWriteBatch) Add(tableName, hashValue string, rangeValue []byte, val }{tableName, hashValue, rangeValue, value}) } -type mockReadBatch []mockItem +type mockReadBatch struct { + index int + items []mockItem +} -func (b mockReadBatch) Len() int { - return len(b) +func (b *mockReadBatch) Next() bool { + b.index++ + return b.index < len(b.items) } -func (b mockReadBatch) RangeValue(i int) []byte { - return b[i].rangeValue +func (b *mockReadBatch) RangeValue() []byte { + return 
b.items[b.index].rangeValue } -func (b mockReadBatch) Value(i int) []byte { - return b[i].value +func (b *mockReadBatch) Value() []byte { + return b.items[b.index].value } diff --git a/pkg/chunk/storage/index_test.go b/pkg/chunk/storage/index_test.go index e8f8db00761..260d0fb5632 100644 --- a/pkg/chunk/storage/index_test.go +++ b/pkg/chunk/storage/index_test.go @@ -29,9 +29,9 @@ func TestIndexBasic(t *testing.T) { } var have []chunk.IndexEntry err := client.QueryPages(context.Background(), entries, func(_ chunk.IndexQuery, read chunk.ReadBatch) bool { - for j := 0; j < read.Len(); j++ { + for read.Next() { have = append(have, chunk.IndexEntry{ - RangeValue: read.RangeValue(j), + RangeValue: read.RangeValue(), }) } return true @@ -170,12 +170,12 @@ func TestQueryPages(t *testing.T) { for run { var have []chunk.IndexEntry err = client.QueryPages(context.Background(), []chunk.IndexQuery{tt.query}, func(_ chunk.IndexQuery, read chunk.ReadBatch) bool { - for i := 0; i < read.Len(); i++ { + for read.Next() { have = append(have, chunk.IndexEntry{ TableName: tt.query.TableName, HashValue: tt.query.HashValue, - RangeValue: read.RangeValue(i), - Value: read.Value(i), + RangeValue: read.RangeValue(), + Value: read.Value(), }) } return true diff --git a/pkg/chunk/storage_client.go b/pkg/chunk/storage_client.go index 92ccd557582..d867def5ab3 100644 --- a/pkg/chunk/storage_client.go +++ b/pkg/chunk/storage_client.go @@ -23,7 +23,7 @@ type WriteBatch interface { // ReadBatch represents the results of a QueryPages. type ReadBatch interface { - Len() int - RangeValue(index int) []byte - Value(index int) []byte + Next() bool + RangeValue() []byte + Value() []byte } diff --git a/pkg/chunk/util/util.go b/pkg/chunk/util/util.go index 610255f9c7e..6a6c412d303 100644 --- a/pkg/chunk/util/util.go +++ b/pkg/chunk/util/util.go @@ -39,37 +39,39 @@ func DoParallelQueries( return lastErr } +// Callback from an IndexQuery. 
type Callback func(chunk.IndexQuery, chunk.ReadBatch) bool -type readBatch []cell +type filteringBatch struct { + query chunk.IndexQuery + chunk.ReadBatch +} + +func (f *filteringBatch) Next() bool { + for f.ReadBatch.Next() { + rangeValue, value := f.ReadBatch.RangeValue(), f.ReadBatch.Value() + + if len(f.query.RangeValuePrefix) != 0 && !strings.HasPrefix(string(rangeValue), string(f.query.RangeValuePrefix)) { + continue + } + if len(f.query.RangeValueStart) != 0 && string(rangeValue) < string(f.query.RangeValueStart) { + continue + } + if len(f.query.ValueEqual) != 0 && !bytes.Equal(value, f.query.ValueEqual) { + continue + } -func (b readBatch) Len() int { return len(b) } -func (b readBatch) RangeValue(i int) []byte { return b[i].column } -func (b readBatch) Value(i int) []byte { return b[i].value } + return true + } -type cell struct { - column []byte - value []byte + return false } +// QueryFilter wraps a callback to ensure the results are filtered correctly; +// useful for the cache and BigTable backend, which only ever fetches the whole +// row. func QueryFilter(callback Callback) Callback { return func(query chunk.IndexQuery, batch chunk.ReadBatch) bool { - finalBatch := make(readBatch, 0, batch.Len()) - for i := 0; i < batch.Len(); i++ { - rangeValue, value := batch.RangeValue(i), batch.Value(i) - - if len(query.RangeValuePrefix) != 0 && !strings.HasPrefix(string(rangeValue), string(query.RangeValuePrefix)) { - continue - } - if len(query.RangeValueStart) != 0 && string(rangeValue) < string(query.RangeValueStart) { - continue - } - if len(query.ValueEqual) != 0 && !bytes.Equal(value, query.ValueEqual) { - continue - } - - finalBatch = append(finalBatch, cell{column: rangeValue, value: value}) - } - return callback(query, finalBatch) + return callback(query, &filteringBatch{query, batch}) } } From 3309d9c550f979f5198867f228408464548198e8 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Mon, 3 Sep 2018 11:03:25 +0100 Subject: [PATCH 4/4] Review feedback. 
Signed-off-by: Tom Wilkie --- pkg/chunk/chunk_store.go | 3 +++ pkg/chunk/gcp/storage_client.go | 18 ++++++++---------- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/pkg/chunk/chunk_store.go b/pkg/chunk/chunk_store.go index 853125d53bf..e0c84adb7b2 100644 --- a/pkg/chunk/chunk_store.go +++ b/pkg/chunk/chunk_store.go @@ -424,6 +424,9 @@ func (c *store) lookupEntriesByQueries(ctx context.Context, queries []IndexQuery } return true }) + if err != nil { + level.Error(util.WithContext(ctx, util.Logger)).Log("msg", "error querying storage", "err", err) + } return entries, err } diff --git a/pkg/chunk/gcp/storage_client.go b/pkg/chunk/gcp/storage_client.go index d287c826893..da6def35168 100644 --- a/pkg/chunk/gcp/storage_client.go +++ b/pkg/chunk/gcp/storage_client.go @@ -19,6 +19,7 @@ import ( const ( columnFamily = "f" + columnPrefix = columnFamily + ":" column = "c" separator = "\000" maxRowReads = 100 @@ -218,9 +219,8 @@ func (s *storageClientColumnKey) QueryPages(ctx context.Context, queries []chunk } return callback(query, &bigtableReadBatchColumnKey{ - i: -1, - items: val, - columnPrefix: columnFamily + ":", + i: -1, + items: val, }) }) @@ -286,18 +286,16 @@ func (s *storageClientColumnKey) query(ctx context.Context, query chunk.IndexQue val = filteredItems } callback(&bigtableReadBatchColumnKey{ - i: -1, - items: val, - columnPrefix: columnFamily + ":", + i: -1, + items: val, }) return nil } // bigtableReadBatchColumnKey represents a batch of values read from Bigtable. 
type bigtableReadBatchColumnKey struct { - i int - items []bigtable.ReadItem - columnPrefix string + i int + items []bigtable.ReadItem } func (b *bigtableReadBatchColumnKey) Next() bool { @@ -306,7 +304,7 @@ func (b *bigtableReadBatchColumnKey) Next() bool { } func (b *bigtableReadBatchColumnKey) RangeValue() []byte { - return []byte(strings.TrimPrefix(b.items[b.i].Column, b.columnPrefix)) + return []byte(strings.TrimPrefix(b.items[b.i].Column, columnPrefix)) } func (b *bigtableReadBatchColumnKey) Value() []byte {