From 14e7d55ed21fd5be0f5e1056ce89705953933c62 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Mon, 3 Sep 2018 13:02:56 +0100 Subject: [PATCH 1/9] Revert "Revert "Merge pull request #971 from grafana/batch-index-lookups"" This reverts commit 8b74f9053037c4590ec1512b65288e4869d5e748. Signed-off-by: Tom Wilkie --- pkg/chunk/aws/storage_client.go | 32 +++-- pkg/chunk/cassandra/storage_client.go | 28 ++-- pkg/chunk/chunk_store.go | 45 +----- pkg/chunk/gcp/storage_client.go | 149 ++++++++++++++++---- pkg/chunk/inmemory_storage_client.go | 47 ++++-- pkg/chunk/storage/caching_storage_client.go | 127 +++++------------ pkg/chunk/storage/index_test.go | 22 +-- pkg/chunk/storage_client.go | 8 +- pkg/chunk/util/util.go | 77 ++++++++++ 9 files changed, 329 insertions(+), 206 deletions(-) create mode 100644 pkg/chunk/util/util.go diff --git a/pkg/chunk/aws/storage_client.go b/pkg/chunk/aws/storage_client.go index 9353280ead5..b8d5f7b3f92 100644 --- a/pkg/chunk/aws/storage_client.go +++ b/pkg/chunk/aws/storage_client.go @@ -30,6 +30,7 @@ import ( "github.com/weaveworks/common/instrument" "github.com/weaveworks/common/user" "github.com/weaveworks/cortex/pkg/chunk" + chunk_util "github.com/weaveworks/cortex/pkg/chunk/util" "github.com/weaveworks/cortex/pkg/util" ) @@ -301,7 +302,11 @@ func (a storageClient) BatchWrite(ctx context.Context, input chunk.WriteBatch) e return backoff.Err() } -func (a storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error { +func (a storageClient) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) bool) error { + return chunk_util.DoParallelQueries(ctx, a.query, queries, callback) +} + +func (a storageClient) query(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error { sp, ctx := ot.StartSpanFromContext(ctx, "QueryPages", ot.Tag{Key: "tableName", Value: query.TableName}, ot.Tag{Key: "hashValue", Value: query.HashValue}) defer sp.Finish() @@ -371,7 +376,7 @@ func (a storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, c return nil } -func (a storageClient) queryPage(ctx context.Context, input *dynamodb.QueryInput, page dynamoDBRequest) (dynamoDBReadResponse, error) { +func (a storageClient) queryPage(ctx context.Context, input *dynamodb.QueryInput, page dynamoDBRequest) (*dynamoDBReadResponse, error) { backoff := util.NewBackoff(ctx, a.cfg.backoffConfig) defer func() { dynamoQueryRetryCount.WithLabelValues("queryPage").Observe(float64(backoff.NumRetries())) @@ -401,7 +406,10 @@ func (a storageClient) queryPage(ctx context.Context, input *dynamodb.QueryInput } queryOutput := page.Data().(*dynamodb.QueryOutput) - return dynamoDBReadResponse(queryOutput.Items), nil + return &dynamoDBReadResponse{ + i: -1, + items: queryOutput.Items, + }, nil } return nil, fmt.Errorf("QueryPage error: %s for table %v, last error %v", backoff.Err(), *input.TableName, err) } @@ -785,18 +793,22 @@ func (a storageClient) putS3Chunk(ctx context.Context, key string, buf []byte) e } // Slice of values returned; map key is attribute name -type dynamoDBReadResponse []map[string]*dynamodb.AttributeValue +type dynamoDBReadResponse struct { + i int + items []map[string]*dynamodb.AttributeValue +} -func (b dynamoDBReadResponse) Len() int { - return len(b) +func (b *dynamoDBReadResponse) Next() bool { + b.i++ + return b.i < len(b.items) } -func (b dynamoDBReadResponse) RangeValue(i int) []byte { - return b[i][rangeKey].B +func (b *dynamoDBReadResponse) RangeValue() []byte { + return b.items[b.i][rangeKey].B } -func (b dynamoDBReadResponse) Value(i int) []byte { - chunkValue, ok := b[i][valueKey] +func (b *dynamoDBReadResponse) Value() []byte { + chunkValue, ok := b.items[b.i][valueKey] if !ok { return nil } diff --git a/pkg/chunk/cassandra/storage_client.go b/pkg/chunk/cassandra/storage_client.go index 88d443db8bf..13078b2e227 100644 --- a/pkg/chunk/cassandra/storage_client.go +++ b/pkg/chunk/cassandra/storage_client.go @@ -12,6 +12,7 @@ import ( "github.com/prometheus/common/model" "github.com/weaveworks/cortex/pkg/chunk" + "github.com/weaveworks/cortex/pkg/chunk/util" ) const ( @@ -185,7 +186,11 @@ func (s *storageClient) BatchWrite(ctx context.Context, batch chunk.WriteBatch) return nil } -func (s *storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error { +func (s *storageClient) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) bool) error { + return util.DoParallelQueries(ctx, s.query, queries, callback) +} + +func (s *storageClient) query(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error { var q *gocql.Query switch { @@ -218,7 +223,7 @@ func (s *storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, defer iter.Close() scanner := iter.Scanner() for scanner.Next() { - var b readBatch + b := &readBatch{} if err := scanner.Scan(&b.rangeValue, &b.value); err != nil { return errors.WithStack(err) } @@ -231,27 +236,26 @@ func (s *storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, // readBatch represents a batch of rows read from Cassandra. type readBatch struct { + consumed bool rangeValue []byte value []byte } // Len implements chunk.ReadBatch; in Cassandra we 'stream' results back // one-by-one, so this always returns 1. -func (readBatch) Len() int { - return 1 +func (b *readBatch) Next() bool { + if b.consumed { + return false + } + b.consumed = true + return true } -func (b readBatch) RangeValue(index int) []byte { - if index != 0 { - panic("index != 0") - } +func (b *readBatch) RangeValue() []byte { return b.rangeValue } -func (b readBatch) Value(index int) []byte { - if index != 0 { - panic("index != 0") - } +func (b *readBatch) Value() []byte { return b.value } diff --git a/pkg/chunk/chunk_store.go b/pkg/chunk/chunk_store.go index c8abb23441c..f811a9cf7b8 100644 --- a/pkg/chunk/chunk_store.go +++ b/pkg/chunk/chunk_store.go @@ -347,53 +347,22 @@ func (c *store) lookupChunksByMetricName(ctx context.Context, from, through mode } func (c *store) lookupEntriesByQueries(ctx context.Context, queries []IndexQuery) ([]IndexEntry, error) { - incomingEntries := make(chan []IndexEntry) - incomingErrors := make(chan error) - for _, query := range queries { - go func(query IndexQuery) { - entries, err := c.lookupEntriesByQuery(ctx, query) - if err != nil { - incomingErrors <- err - } else { - incomingEntries <- entries - } - }(query) - } - - // Combine the results into one slice - var entries []IndexEntry - var lastErr error - for i := 0; i < len(queries); i++ { - select { - case incoming := <-incomingEntries: - entries = append(entries, incoming...) - case err := <-incomingErrors: - lastErr = err - } - } - - return entries, lastErr -} - -func (c *store) lookupEntriesByQuery(ctx context.Context, query IndexQuery) ([]IndexEntry, error) { var entries []IndexEntry - - if err := c.storage.QueryPages(ctx, query, func(resp ReadBatch) (shouldContinue bool) { - for i := 0; i < resp.Len(); i++ { + err := c.storage.QueryPages(ctx, queries, func(query IndexQuery, resp ReadBatch) bool { + for resp.Next() { entries = append(entries, IndexEntry{ TableName: query.TableName, HashValue: query.HashValue, - RangeValue: resp.RangeValue(i), - Value: resp.Value(i), + RangeValue: resp.RangeValue(), + Value: resp.Value(), }) } return true - }); err != nil { + }) + if err != nil { level.Error(util.WithContext(ctx, util.Logger)).Log("msg", "error querying storage", "err", err) - return nil, err } - - return entries, nil + return entries, err } func (c *store) parseIndexEntries(ctx context.Context, entries []IndexEntry, matcher *labels.Matcher) ([]string, error) { diff --git a/pkg/chunk/gcp/storage_client.go b/pkg/chunk/gcp/storage_client.go index 9135236acc7..ec0597bc22a 100644 --- a/pkg/chunk/gcp/storage_client.go +++ b/pkg/chunk/gcp/storage_client.go @@ -14,11 +14,13 @@ import ( "github.com/pkg/errors" "github.com/weaveworks/cortex/pkg/chunk" + chunk_util "github.com/weaveworks/cortex/pkg/chunk/util" "github.com/weaveworks/cortex/pkg/util" ) const ( columnFamily = "f" + columnPrefix = columnFamily + ":" column = "c" separator = "\000" maxRowReads = 100 @@ -187,7 +189,88 @@ func (s *storageClientColumnKey) BatchWrite(ctx context.Context, batch chunk.Wri return nil } -func (s *storageClientColumnKey) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error { +func (s *storageClientColumnKey) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) bool) error { + sp, ctx := ot.StartSpanFromContext(ctx, "QueryPages") + defer sp.Finish() + + // A limitation of this approach is that this only fetches whole rows; but + // whatever, we filter them in the cache on the client. But for unit tests to + // pass, we must do this. + callback = chunk_util.QueryFilter(callback) + + type tableQuery struct { + name string + queries map[string]chunk.IndexQuery + rows bigtable.RowList + } + + tableQueries := map[string]tableQuery{} + for _, query := range queries { + tq, ok := tableQueries[query.TableName] + if !ok { + tq = tableQuery{ + name: query.TableName, + queries: map[string]chunk.IndexQuery{}, + } + } + tq.queries[query.HashValue] = query + tq.rows = append(tq.rows, query.HashValue) + tableQueries[query.TableName] = tq + } + + errs := make(chan error) + + for _, tq := range tableQueries { + + table := s.client.Open(tq.name) + for i := 0; i < len(tq.rows); i += maxRowReads { + + page := tq.rows[i:util.Min(i+maxRowReads, len(tq.rows))] + go func(page bigtable.RowList, tq tableQuery) { + var processingErr error + // rows are returned in key order, not order in row list + err := table.ReadRows(ctx, page, func(row bigtable.Row) bool { + + query, ok := tq.queries[row.Key()] + if !ok { + processingErr = errors.WithStack(fmt.Errorf("Got row for unknown chunk: %s", row.Key())) + return false + } + + val, ok := row[columnFamily] + if !ok { + // There are no matching rows. + return true + } + + return callback(query, &bigtableReadBatchColumnKey{ + i: -1, + items: val, + }) + }) + + if processingErr != nil { + errs <- processingErr + } else { + errs <- err + } + }(page, tq) + } + } + + var lastErr error + for _, tq := range tableQueries { + for i := 0; i < len(tq.rows); i += maxRowReads { + err := <-errs + if err != nil { + lastErr = err + } + } + } + return lastErr +} + +func (s *storageClientColumnKey) query(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error { const null = string('\xff') sp, ctx := ot.StartSpanFromContext(ctx, "QueryPages", ot.Tag{Key: "tableName", Value: query.TableName}, ot.Tag{Key: "hashValue", Value: query.HashValue}) @@ -227,31 +310,30 @@ func (s *storageClientColumnKey) QueryPages(ctx context.Context, query chunk.Ind val = filteredItems } - callback(bigtableReadBatchColumnKey{ - items: val, - columnPrefix: columnFamily + ":", + callback(&bigtableReadBatchColumnKey{ + i: -1, + items: val, }) return nil } // bigtableReadBatchColumnKey represents a batch of values read from Bigtable. type bigtableReadBatchColumnKey struct { - items []bigtable.ReadItem - columnPrefix string + i int + items []bigtable.ReadItem } -func (b bigtableReadBatchColumnKey) Len() int { - return len(b.items) +func (b *bigtableReadBatchColumnKey) Next() bool { + b.i++ + return b.i < len(b.items) } -func (b bigtableReadBatchColumnKey) RangeValue(index int) []byte { - return []byte( - strings.TrimPrefix(b.items[index].Column, b.columnPrefix), - ) +func (b *bigtableReadBatchColumnKey) RangeValue() []byte { + return []byte(strings.TrimPrefix(b.items[b.i].Column, columnPrefix)) } -func (b bigtableReadBatchColumnKey) Value(index int) []byte { - return b.items[index].Value +func (b *bigtableReadBatchColumnKey) Value() []byte { + return b.items[b.i].Value } func (s *storageClientColumnKey) PutChunks(ctx context.Context, chunks []chunk.Chunk) error { @@ -368,7 +450,11 @@ func (s *storageClientColumnKey) GetChunks(ctx context.Context, input []chunk.Ch return output, nil } -func (s *storageClientV1) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error { +func (s *storageClientV1) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) bool) error { + return chunk_util.DoParallelQueries(ctx, s.query, queries, callback) +} + +func (s *storageClientV1) query(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error { const null = string('\xff') sp, ctx := ot.StartSpanFromContext(ctx, "QueryPages", ot.Tag{Key: "tableName", Value: query.TableName}, ot.Tag{Key: "hashValue", Value: query.HashValue}) @@ -398,7 +484,9 @@ func (s *storageClientV1) QueryPages(ctx context.Context, query chunk.IndexQuery err := table.ReadRows(ctx, rowRange, func(r bigtable.Row) bool { if query.ValueEqual == nil || bytes.Equal(r[columnFamily][0].Value, query.ValueEqual) { - return callback(bigtableReadBatchV1(r)) + return callback(&bigtableReadBatchV1{ + row: r, + }) } return true @@ -413,24 +501,27 @@ func (s *storageClientV1) QueryPages(ctx context.Context, query chunk.IndexQuery // bigtableReadBatchV1 represents a batch of rows read from Bigtable. As the // bigtable interface gives us rows one-by-one, a batch always only contains // a single row. -type bigtableReadBatchV1 bigtable.Row - -func (bigtableReadBatchV1) Len() int { - return 1 +type bigtableReadBatchV1 struct { + consumed bool + row bigtable.Row } -func (b bigtableReadBatchV1) RangeValue(index int) []byte { - if index != 0 { - panic("index != 0") + +func (b *bigtableReadBatchV1) Next() bool { + if b.consumed { + return false } + b.consumed = true + return true +} + +func (b *bigtableReadBatchV1) RangeValue() []byte { // String before the first separator is the hashkey - parts := strings.SplitN(bigtable.Row(b).Key(), separator, 2) + parts := strings.SplitN(b.row.Key(), separator, 2) return []byte(parts[1]) } -func (b bigtableReadBatchV1) Value(index int) []byte { - if index != 0 { - panic("index != 0") - } - cf, ok := b[columnFamily] + +func (b *bigtableReadBatchV1) Value() []byte { + cf, ok := b.row[columnFamily] if !ok || len(cf) != 1 { panic("bad response from bigtable") } diff --git a/pkg/chunk/inmemory_storage_client.go b/pkg/chunk/inmemory_storage_client.go index 0add546b2a4..a47c32601a9 100644 --- a/pkg/chunk/inmemory_storage_client.go +++ b/pkg/chunk/inmemory_storage_client.go @@ -169,13 +169,26 @@ func (m *MockStorage) BatchWrite(ctx context.Context, batch WriteBatch) error { } // QueryPages implements StorageClient. -func (m *MockStorage) QueryPages(ctx context.Context, query IndexQuery, callback func(result ReadBatch) (shouldContinue bool)) error { - logger := util.WithContext(ctx, util.Logger) - level.Debug(logger).Log("msg", "QueryPages", "query", query.HashValue) - +func (m *MockStorage) QueryPages(ctx context.Context, queries []IndexQuery, callback func(IndexQuery, ReadBatch) (shouldContinue bool)) error { m.mtx.RLock() defer m.mtx.RUnlock() + for _, query := range queries { + err := m.query(ctx, query, func(b ReadBatch) bool { + return callback(query, b) + }) + if err != nil { + return err + } + } + + return nil +} + +func (m *MockStorage) query(ctx context.Context, query IndexQuery, callback func(ReadBatch) (shouldContinue bool)) error { + logger := util.WithContext(ctx, util.Logger) + level.Debug(logger).Log("msg", "QueryPages", "query", query.HashValue) + table, ok := m.tables[query.TableName] if !ok { return fmt.Errorf("table not found") @@ -241,12 +254,14 @@ func (m *MockStorage) QueryPages(ctx context.Context, query IndexQuery, callback items = filtered } - result := mockReadBatch{} + result := mockReadBatch{ + index: -1, + } for _, item := range items { - result = append(result, item) + result.items = append(result.items, item) } - callback(result) + callback(&result) return nil } @@ -300,16 +315,20 @@ func (b *mockWriteBatch) Add(tableName, hashValue string, rangeValue []byte, val }{tableName, hashValue, rangeValue, value}) } -type mockReadBatch []mockItem +type mockReadBatch struct { + index int + items []mockItem +} -func (b mockReadBatch) Len() int { - return len(b) +func (b *mockReadBatch) Next() bool { + b.index++ + return b.index < len(b.items) } -func (b mockReadBatch) RangeValue(i int) []byte { - return b[i].rangeValue +func (b *mockReadBatch) RangeValue() []byte { + return b.items[b.index].rangeValue } -func (b mockReadBatch) Value(i int) []byte { - return b[i].value +func (b *mockReadBatch) Value() []byte { + return b.items[b.index].value } diff --git a/pkg/chunk/storage/caching_storage_client.go b/pkg/chunk/storage/caching_storage_client.go index 00e9ed922e4..c7f7ecb3b37 100644 --- a/pkg/chunk/storage/caching_storage_client.go +++ b/pkg/chunk/storage/caching_storage_client.go @@ -1,11 +1,8 @@ package storage import ( - "bytes" "context" - "encoding/hex" - "hash/fnv" - "strings" + "sync" "time" proto "github.com/golang/protobuf/proto" @@ -14,6 +11,7 @@ import ( "github.com/weaveworks/cortex/pkg/chunk" "github.com/weaveworks/cortex/pkg/chunk/cache" + chunk_util "github.com/weaveworks/cortex/pkg/chunk/util" ) var ( @@ -104,102 +102,53 @@ func newCachingStorageClient(client chunk.StorageClient, cache cache.Cache, vali } } -func (s *cachingStorageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error { - value, ok, err := s.cache.Fetch(ctx, queryKey(query)) - if err != nil { - cacheCorruptErrs.Inc() - } - - if ok && err == nil { - filteredBatch, _ := filterBatchByQuery(query, []chunk.ReadBatch{value}) - callback(filteredBatch) +func (s *cachingStorageClient) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error { + // We cache the entire row, so filter client side. + callback = chunk_util.QueryFilter(callback) + cacheableMissed := []chunk.IndexQuery{} + missed := map[string]chunk.IndexQuery{} + + for _, query := range queries { + value, ok := s.cache.Get(ctx, queryKey(query)) + if !ok { + cacheableMissed = append(cacheableMissed, chunk.IndexQuery{ + TableName: query.TableName, + HashValue: query.HashValue, + }) + missed[queryKey(query)] = query + continue + } - return nil + for _, batch := range value.([]chunk.ReadBatch) { + callback(query, batch) + } } - batches := []chunk.ReadBatch{} - cacheableQuery := chunk.IndexQuery{ - TableName: query.TableName, - HashValue: query.HashValue, - } // Just reads the entire row and caches it. - - expiryTime := time.Now().Add(s.validity) - err = s.StorageClient.QueryPages(ctx, cacheableQuery, copyingCallback(&batches)) + var resultsMtx sync.Mutex + results := map[string][]chunk.ReadBatch{} + err := s.StorageClient.QueryPages(ctx, cacheableMissed, func(cacheableQuery chunk.IndexQuery, r chunk.ReadBatch) bool { + resultsMtx.Lock() + defer resultsMtx.Unlock() + key := queryKey(cacheableQuery) + results[key] = append(results[key], r) + return true + }) if err != nil { return err } - filteredBatch, totalBatches := filterBatchByQuery(query, batches) - callback(filteredBatch) - - totalBatches.Key = queryKey(query) - totalBatches.Expiry = expiryTime.UnixNano() - - s.cache.Store(ctx, totalBatches.Key, totalBatches) - return nil -} - -// Len implements chunk.ReadBatch. -func (b ReadBatch) Len() int { return len(b.Entries) } - -// RangeValue implements chunk.ReadBatch. -func (b ReadBatch) RangeValue(i int) []byte { return b.Entries[i].Column } - -// Value implements chunk.ReadBatch. -func (b ReadBatch) Value(i int) []byte { return b.Entries[i].Value } - -func copyingCallback(readBatches *[]chunk.ReadBatch) func(chunk.ReadBatch) bool { - return func(result chunk.ReadBatch) bool { - *readBatches = append(*readBatches, result) - return true + resultsMtx.Lock() + defer resultsMtx.Unlock() + for key, batches := range results { + query := missed[key] + for _, batch := range batches { + callback(query, batch) + } } + return nil } func queryKey(q chunk.IndexQuery) string { const sep = "\xff" return q.TableName + sep + q.HashValue } - -func filterBatchByQuery(query chunk.IndexQuery, batches []chunk.ReadBatch) (filteredBatch, totalBatch ReadBatch) { - filter := func([]byte, []byte) bool { return true } - - if len(query.RangeValuePrefix) != 0 { - filter = func(rangeValue []byte, value []byte) bool { - return strings.HasPrefix(string(rangeValue), string(query.RangeValuePrefix)) - } - } - if len(query.RangeValueStart) != 0 { - filter = func(rangeValue []byte, value []byte) bool { - return string(rangeValue) >= string(query.RangeValueStart) - } - } - if len(query.ValueEqual) != 0 { - // This is on top of the existing filters. - existingFilter := filter - filter = func(rangeValue []byte, value []byte) bool { - return existingFilter(rangeValue, value) && bytes.Equal(value, query.ValueEqual) - } - } - - filteredBatch.Entries = make([]*Entry, 0, len(batches)) // On the higher side for most queries. On the lower side for column key schema. - totalBatch.Entries = make([]*Entry, 0, len(batches)) - for _, batch := range batches { - for i := 0; i < batch.Len(); i++ { - totalBatch.Entries = append(totalBatch.Entries, &Entry{Column: batch.RangeValue(i), Value: batch.Value(i)}) - - if filter(batch.RangeValue(i), batch.Value(i)) { - filteredBatch.Entries = append(filteredBatch.Entries, &Entry{Column: batch.RangeValue(i), Value: batch.Value(i)}) - } - } - } - - return -} - -func hashKey(key string) string { - hasher := fnv.New64a() - hasher.Write([]byte(key)) // This'll never error. - - // Hex because memcache errors for the bytes produced by the hash. - return hex.EncodeToString(hasher.Sum(nil)) -} diff --git a/pkg/chunk/storage/index_test.go b/pkg/chunk/storage/index_test.go index 91d8532bb96..260d0fb5632 100644 --- a/pkg/chunk/storage/index_test.go +++ b/pkg/chunk/storage/index_test.go @@ -21,15 +21,17 @@ func TestIndexBasic(t *testing.T) { // Make sure we get back the correct entries by hash value. for i := 0; i < 30; i++ { - entry := chunk.IndexQuery{ - TableName: tableName, - HashValue: fmt.Sprintf("hash%d", i), + entries := []chunk.IndexQuery{ + { + TableName: tableName, + HashValue: fmt.Sprintf("hash%d", i), + }, } var have []chunk.IndexEntry - err := client.QueryPages(context.Background(), entry, func(read chunk.ReadBatch) bool { - for j := 0; j < read.Len(); j++ { + err := client.QueryPages(context.Background(), entries, func(_ chunk.IndexQuery, read chunk.ReadBatch) bool { + for read.Next() { have = append(have, chunk.IndexEntry{ - RangeValue: read.RangeValue(j), + RangeValue: read.RangeValue(), }) } return true @@ -167,13 +169,13 @@ func TestQueryPages(t *testing.T) { run := true for run { var have []chunk.IndexEntry - err = client.QueryPages(context.Background(), tt.query, func(read chunk.ReadBatch) bool { - for i := 0; i < read.Len(); i++ { + err = client.QueryPages(context.Background(), []chunk.IndexQuery{tt.query}, func(_ chunk.IndexQuery, read chunk.ReadBatch) bool { + for read.Next() { have = append(have, chunk.IndexEntry{ TableName: tt.query.TableName, HashValue: tt.query.HashValue, - RangeValue: read.RangeValue(i), - Value: read.Value(i), + RangeValue: read.RangeValue(), + Value: read.Value(), }) } return true diff --git a/pkg/chunk/storage_client.go b/pkg/chunk/storage_client.go index c86f573b77b..d867def5ab3 100644 --- a/pkg/chunk/storage_client.go +++ b/pkg/chunk/storage_client.go @@ -9,7 +9,7 @@ type StorageClient interface { BatchWrite(context.Context, WriteBatch) error // For the read path. - QueryPages(ctx context.Context, query IndexQuery, callback func(result ReadBatch) (shouldContinue bool)) error + QueryPages(ctx context.Context, queries []IndexQuery, callback func(IndexQuery, ReadBatch) (shouldContinue bool)) error // For storing and retrieving chunks. PutChunks(ctx context.Context, chunks []Chunk) error @@ -23,7 +23,7 @@ type WriteBatch interface { // ReadBatch represents the results of a QueryPages. type ReadBatch interface { - Len() int - RangeValue(index int) []byte - Value(index int) []byte + Next() bool + RangeValue() []byte + Value() []byte } diff --git a/pkg/chunk/util/util.go b/pkg/chunk/util/util.go new file mode 100644 index 00000000000..6a6c412d303 --- /dev/null +++ b/pkg/chunk/util/util.go @@ -0,0 +1,77 @@ +package util + +import ( + "bytes" + "context" + "strings" + + "github.com/weaveworks/cortex/pkg/chunk" +) + +// DoSingleQuery is the interface for indexes that don't support batching yet. +type DoSingleQuery func( + ctx context.Context, query chunk.IndexQuery, + callback func(chunk.ReadBatch) bool, +) error + +// DoParallelQueries translates between our interface for query batching, +// and indexes that don't yet support batching. +func DoParallelQueries( + ctx context.Context, doSingleQuery DoSingleQuery, queries []chunk.IndexQuery, + callback func(chunk.IndexQuery, chunk.ReadBatch) bool, +) error { + incomingErrors := make(chan error) + for _, query := range queries { + go func(query chunk.IndexQuery) { + incomingErrors <- doSingleQuery(ctx, query, func(r chunk.ReadBatch) bool { + return callback(query, r) + }) + }(query) + } + var lastErr error + for i := 0; i < len(queries); i++ { + err := <-incomingErrors + if err != nil { + + lastErr = err + } + } + return lastErr +} + +// Callback from an IndexQuery. +type Callback func(chunk.IndexQuery, chunk.ReadBatch) bool + +type filteringBatch struct { + query chunk.IndexQuery + chunk.ReadBatch +} + +func (f *filteringBatch) Next() bool { + for f.ReadBatch.Next() { + rangeValue, value := f.ReadBatch.RangeValue(), f.ReadBatch.Value() + + if len(f.query.RangeValuePrefix) != 0 && !strings.HasPrefix(string(rangeValue), string(f.query.RangeValuePrefix)) { + continue + } + if len(f.query.RangeValueStart) != 0 && string(rangeValue) < string(f.query.RangeValueStart) { + continue + } + if len(f.query.ValueEqual) != 0 && !bytes.Equal(value, f.query.ValueEqual) { + continue + } + + return true + } + + return false +} + +// QueryFilter wraps a callback to ensure the results are filtered correctly; +// useful for the cache and BigTable backend, which only ever fetches the whole +// row. +func QueryFilter(callback Callback) Callback { + return func(query chunk.IndexQuery, batch chunk.ReadBatch) bool { + return callback(query, &filteringBatch{query, batch}) + } +} From c19ae4f2c3b2a187dd5d20dfcb7a135dfecef792 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Mon, 3 Sep 2018 13:02:56 +0100 Subject: [PATCH 2/9] Fix index caching with batch index accessors. - Use iterators on the batches, so the batches themselves aren't mutated and can be cached. - Actually write back to the cache in the index caching! - Add a test that we've written back to the cache. - Use bytes.Compare, bytes.Equal etc in the filteringBatchIter to reduce copies. This reverts commit 8b74f9053037c4590ec1512b65288e4869d5e748. Signed-off-by: Tom Wilkie --- pkg/chunk/aws/storage_client.go | 20 +++-- pkg/chunk/cassandra/storage_client.go | 19 ++-- pkg/chunk/chunk_store.go | 7 +- pkg/chunk/gcp/storage_client.go | 61 ++++++++----- pkg/chunk/inmemory_storage_client.go | 23 +++-- pkg/chunk/storage/caching_storage_client.go | 88 +++++++++++++++---- .../storage/caching_storage_client_test.go | 72 +++++++++++++++ pkg/chunk/storage/index_test.go | 12 +-- pkg/chunk/storage_client.go | 5 ++ pkg/chunk/util/util.go | 23 +++-- 10 files changed, 262 insertions(+), 68 deletions(-) create mode 100644 pkg/chunk/storage/caching_storage_client_test.go diff --git a/pkg/chunk/aws/storage_client.go b/pkg/chunk/aws/storage_client.go index b8d5f7b3f92..e47b9a666f6 100644 --- a/pkg/chunk/aws/storage_client.go +++ b/pkg/chunk/aws/storage_client.go @@ -407,7 +407,6 @@ func (a storageClient) queryPage(ctx context.Context, input *dynamodb.QueryInput queryOutput := page.Data().(*dynamodb.QueryOutput) return &dynamoDBReadResponse{ - i: -1, items: queryOutput.Items, }, nil } @@ -794,20 +793,31 @@ func (a storageClient) putS3Chunk(ctx context.Context, key string, buf []byte) e // Slice of values returned; map key is attribute name type dynamoDBReadResponse struct { - i int items []map[string]*dynamodb.AttributeValue } -func (b *dynamoDBReadResponse) Next() bool { +func (b *dynamoDBReadResponse) Iterator() chunk.ReadBatchIterator { + return &dynamoDBReadResponseIterator{ + i: -1, + dynamoDBReadResponse: b, + } +} + +type dynamoDBReadResponseIterator struct { + i int + *dynamoDBReadResponse +} + +func (b *dynamoDBReadResponseIterator) Next() bool { b.i++ return b.i < len(b.items) } -func (b *dynamoDBReadResponse) RangeValue() []byte { +func (b *dynamoDBReadResponseIterator) RangeValue() []byte { return b.items[b.i][rangeKey].B } -func (b *dynamoDBReadResponse) Value() []byte { +func (b *dynamoDBReadResponseIterator) Value() []byte { chunkValue, ok := b.items[b.i][valueKey] if !ok { return nil diff --git a/pkg/chunk/cassandra/storage_client.go b/pkg/chunk/cassandra/storage_client.go index 13078b2e227..b0a6922770b 100644 --- a/pkg/chunk/cassandra/storage_client.go +++ b/pkg/chunk/cassandra/storage_client.go @@ -241,9 +241,18 @@ type readBatch struct { value []byte } -// Len implements chunk.ReadBatch; in Cassandra we 'stream' results back -// one-by-one, so this always returns 1. -func (b *readBatch) Next() bool { +func (r *readBatch) Iterator() chunk.ReadBatchIterator { + return &readBatchIter{ + readBatch: r, + } +} + +type readBatchIter struct { + consumed bool + *readBatch +} + +func (b *readBatchIter) Next() bool { if b.consumed { return false } @@ -251,11 +260,11 @@ func (b *readBatch) Next() bool { return true } -func (b *readBatch) RangeValue() []byte { +func (b *readBatchIter) RangeValue() []byte { return b.rangeValue } -func (b *readBatch) Value() []byte { +func (b *readBatchIter) Value() []byte { return b.value } diff --git a/pkg/chunk/chunk_store.go b/pkg/chunk/chunk_store.go index f811a9cf7b8..a0d0819e3f4 100644 --- a/pkg/chunk/chunk_store.go +++ b/pkg/chunk/chunk_store.go @@ -349,12 +349,13 @@ func (c *store) lookupChunksByMetricName(ctx context.Context, from, through mode func (c *store) lookupEntriesByQueries(ctx context.Context, queries []IndexQuery) ([]IndexEntry, error) { var entries []IndexEntry err := c.storage.QueryPages(ctx, queries, func(query IndexQuery, resp ReadBatch) bool { - for resp.Next() { + iter := resp.Iterator() + for iter.Next() { entries = append(entries, IndexEntry{ TableName: query.TableName, HashValue: query.HashValue, - RangeValue: resp.RangeValue(), - Value: resp.Value(), + RangeValue: iter.RangeValue(), + Value: iter.Value(), }) } return true diff --git a/pkg/chunk/gcp/storage_client.go b/pkg/chunk/gcp/storage_client.go index ec0597bc22a..13728294ee4 100644 --- a/pkg/chunk/gcp/storage_client.go +++ b/pkg/chunk/gcp/storage_client.go @@ -243,8 +243,7 @@ func (s *storageClientColumnKey) QueryPages(ctx context.Context, queries []chunk return true } - return callback(query, &bigtableReadBatchColumnKey{ - i: -1, + return callback(query, &columnKeyBatch{ items: val, }) }) @@ -310,30 +309,40 @@ func (s *storageClientColumnKey) query(ctx context.Context, query chunk.IndexQue val = filteredItems } - callback(&bigtableReadBatchColumnKey{ - i: -1, + callback(&columnKeyBatch{ items: val, }) return nil } -// bigtableReadBatchColumnKey represents a batch of values read from Bigtable. -type bigtableReadBatchColumnKey struct { - i int +// columnKeyBatch represents a batch of values read from Bigtable. +type columnKeyBatch struct { items []bigtable.ReadItem } -func (b *bigtableReadBatchColumnKey) Next() bool { - b.i++ - return b.i < len(b.items) +func (c *columnKeyBatch) Iterator() chunk.ReadBatchIterator { + return &columnKeyIterator{ + i: -1, + columnKeyBatch: c, + } +} + +type columnKeyIterator struct { + i int + *columnKeyBatch +} + +func (c *columnKeyIterator) Next() bool { + c.i++ + return c.i < len(c.items) } -func (b *bigtableReadBatchColumnKey) RangeValue() []byte { - return []byte(strings.TrimPrefix(b.items[b.i].Column, columnPrefix)) +func (c *columnKeyIterator) RangeValue() []byte { + return []byte(strings.TrimPrefix(c.items[c.i].Column, columnPrefix)) } -func (b *bigtableReadBatchColumnKey) Value() []byte { - return b.items[b.i].Value +func (c *columnKeyIterator) Value() []byte { + return c.items[c.i].Value } func (s *storageClientColumnKey) PutChunks(ctx context.Context, chunks []chunk.Chunk) error { @@ -484,7 +493,7 @@ func (s *storageClientV1) query(ctx context.Context, query chunk.IndexQuery, cal err := table.ReadRows(ctx, rowRange, func(r bigtable.Row) bool { if query.ValueEqual == nil || bytes.Equal(r[columnFamily][0].Value, query.ValueEqual) { - return callback(&bigtableReadBatchV1{ + return callback(&rowBatch{ row: r, }) } @@ -498,15 +507,25 @@ func (s *storageClientV1) query(ctx context.Context, query chunk.IndexQuery, cal return nil } -// bigtableReadBatchV1 represents a batch of rows read from Bigtable. As the +// rowBatch represents a batch of rows read from Bigtable. As the // bigtable interface gives us rows one-by-one, a batch always only contains // a single row. -type bigtableReadBatchV1 struct { +type rowBatch struct { + row bigtable.Row +} + +func (b *rowBatch) Iterator() chunk.ReadBatchIterator { + return &rowBatchIterator{ + rowBatch: b, + } +} + +type rowBatchIterator struct { consumed bool - row bigtable.Row + *rowBatch } -func (b *bigtableReadBatchV1) Next() bool { +func (b *rowBatchIterator) Next() bool { if b.consumed { return false } @@ -514,13 +533,13 @@ func (b *bigtableReadBatchV1) Next() bool { return true } -func (b *bigtableReadBatchV1) RangeValue() []byte { +func (b *rowBatchIterator) RangeValue() []byte { // String before the first separator is the hashkey parts := strings.SplitN(b.row.Key(), separator, 2) return []byte(parts[1]) } -func (b *bigtableReadBatchV1) Value() []byte { +func (b *rowBatchIterator) Value() []byte { cf, ok := b.row[columnFamily] if !ok || len(cf) != 1 { panic("bad response from bigtable") diff --git a/pkg/chunk/inmemory_storage_client.go b/pkg/chunk/inmemory_storage_client.go index a47c32601a9..4978dede575 100644 --- a/pkg/chunk/inmemory_storage_client.go +++ b/pkg/chunk/inmemory_storage_client.go @@ -254,9 +254,7 @@ func (m *MockStorage) query(ctx context.Context, query IndexQuery, callback func items = filtered } - result := mockReadBatch{ - index: -1, - } + result := mockReadBatch{} for _, item := range items { result.items = append(result.items, item) } @@ -316,19 +314,30 @@ func (b *mockWriteBatch) Add(tableName, hashValue string, rangeValue []byte, val } type mockReadBatch struct { - index int items []mockItem } -func (b *mockReadBatch) Next() bool { +func (b *mockReadBatch) Iterator() ReadBatchIterator { + return &mockReadBatchIter{ + index: -1, + mockReadBatch: b, + } +} + +type mockReadBatchIter struct { + index int + *mockReadBatch +} + +func (b *mockReadBatchIter) Next() bool { b.index++ return b.index < len(b.items) } -func (b *mockReadBatch) RangeValue() []byte { +func (b *mockReadBatchIter) RangeValue() []byte { return b.items[b.index].rangeValue } -func (b *mockReadBatch) Value() []byte { +func (b *mockReadBatchIter) Value() []byte { return b.items[b.index].value } diff --git a/pkg/chunk/storage/caching_storage_client.go b/pkg/chunk/storage/caching_storage_client.go index c7f7ecb3b37..05fc67d5d53 100644 --- a/pkg/chunk/storage/caching_storage_client.go +++ b/pkg/chunk/storage/caching_storage_client.go @@ -2,6 +2,8 @@ package storage import ( "context" + "encoding/hex" + "hash/fnv" "sync" "time" @@ -109,28 +111,45 @@ func (s *cachingStorageClient) QueryPages(ctx context.Context, queries []chunk.I missed := map[string]chunk.IndexQuery{} for _, query := range queries { - value, ok := s.cache.Get(ctx, queryKey(query)) - if !ok { - cacheableMissed = append(cacheableMissed, chunk.IndexQuery{ - TableName: query.TableName, - HashValue: query.HashValue, - }) - missed[queryKey(query)] = query + key := queryKey(query) + batch, ok, err := s.cache.Fetch(ctx, key) + if err != nil { + cacheCorruptErrs.Inc() + } else if ok { + callback(query, batch) continue } - for _, batch := range value.([]chunk.ReadBatch) { - callback(query, batch) - } + // Just reads the entire row and caches it; filter client side. + cacheableMissed = append(cacheableMissed, chunk.IndexQuery{ + TableName: query.TableName, + HashValue: query.HashValue, + }) + missed[key] = query + } + + if len(cacheableMissed) == 0 { + return nil } var resultsMtx sync.Mutex - results := map[string][]chunk.ReadBatch{} + results := map[string]ReadBatch{} + expiryTime := time.Now().Add(s.validity) err := s.StorageClient.QueryPages(ctx, cacheableMissed, func(cacheableQuery chunk.IndexQuery, r chunk.ReadBatch) bool { resultsMtx.Lock() defer resultsMtx.Unlock() key := queryKey(cacheableQuery) - results[key] = append(results[key], r) + existing, ok := results[key] + if !ok { + existing = ReadBatch{ + Key: key, + Expiry: expiryTime.UnixNano(), + } + } + for iter := r.Iterator(); iter.Next(); { + existing.Entries = append(existing.Entries, &Entry{Column: iter.RangeValue(), Value: iter.Value()}) + } + results[key] = existing return true }) if err != nil { @@ -139,16 +158,53 @@ func (s *cachingStorageClient) QueryPages(ctx context.Context, queries []chunk.I resultsMtx.Lock() defer resultsMtx.Unlock() - for key, batches := range results { + for key, batch := range results { query := missed[key] - for _, batch := range batches { - callback(query, batch) - } + callback(query, batch) + s.cache.Store(ctx, queryKey(query), batch) + } + return nil +} + +// Iter implements chunk.ReadBatch. +func (b ReadBatch) Iterator() chunk.ReadBatchIterator { + return &readBatchIterator{ + index: -1, + readBatch: b, } return nil } +type readBatchIterator struct { + index int + readBatch ReadBatch +} + +// Len implements chunk.ReadBatchIterator. +func (b *readBatchIterator) Next() bool { + b.index++ + return b.index < len(b.readBatch.Entries) +} + +// RangeValue implements chunk.ReadBatchIterator. +func (b *readBatchIterator) RangeValue() []byte { + return b.readBatch.Entries[b.index].Column +} + +// Value implements chunk.ReadBatchIterator. +func (b *readBatchIterator) Value() []byte { + return b.readBatch.Entries[b.index].Value +} + func queryKey(q chunk.IndexQuery) string { const sep = "\xff" return q.TableName + sep + q.HashValue } + +func hashKey(key string) string { + hasher := fnv.New64a() + hasher.Write([]byte(key)) // This'll never error. + + // Hex because memcache errors for the bytes produced by the hash. + return hex.EncodeToString(hasher.Sum(nil)) +} diff --git a/pkg/chunk/storage/caching_storage_client_test.go b/pkg/chunk/storage/caching_storage_client_test.go new file mode 100644 index 00000000000..00be68793da --- /dev/null +++ b/pkg/chunk/storage/caching_storage_client_test.go @@ -0,0 +1,72 @@ +package storage + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/weaveworks/cortex/pkg/chunk" + "github.com/weaveworks/cortex/pkg/chunk/cache" +) + +type mockStore struct { + chunk.StorageClient + queries int +} + +func (m *mockStore) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error { + m.queries++ + for _, query := range queries { + callback(query, mockReadBatch{}) + } + return nil +} + +type mockReadBatch struct{} + +func (mockReadBatch) Iterator() chunk.ReadBatchIterator { + return &mockReadBatchIterator{} +} + +type mockReadBatchIterator struct { + consumed bool +} + +func (m *mockReadBatchIterator) Next() bool { + if m.consumed { + return false + } + m.consumed = true + return true +} + +func (mockReadBatchIterator) RangeValue() []byte { + return []byte("foo") +} + +func (mockReadBatchIterator) Value() []byte { + return []byte("bar") +} + +func TestCachingStorageClient(t *testing.T) { + mock := &mockStore{} + cache := cache.NewFifoCache("test", 10, 10*time.Second) + client := newCachingStorageClient(mock, cache, 1*time.Second) + queries := []chunk.IndexQuery{{ + TableName: "table", + HashValue: "baz", + }} + err := client.QueryPages(context.Background(), queries, func(_ chunk.IndexQuery, _ chunk.ReadBatch) bool { + return true + }) + require.NoError(t, err) + require.EqualValues(t, 1, mock.queries) + + // If we do the query to the cache again, the underlying store shouldn't see it. + err = client.QueryPages(context.Background(), queries, func(_ chunk.IndexQuery, _ chunk.ReadBatch) bool { + return true + }) + require.NoError(t, err) + require.EqualValues(t, 1, mock.queries) +} diff --git a/pkg/chunk/storage/index_test.go b/pkg/chunk/storage/index_test.go index 260d0fb5632..24bca687ed2 100644 --- a/pkg/chunk/storage/index_test.go +++ b/pkg/chunk/storage/index_test.go @@ -29,9 +29,10 @@ func TestIndexBasic(t *testing.T) { } var have []chunk.IndexEntry err := client.QueryPages(context.Background(), entries, func(_ chunk.IndexQuery, read chunk.ReadBatch) bool { - for read.Next() { + iter := read.Iterator() + for iter.Next() { have = append(have, chunk.IndexEntry{ - RangeValue: read.RangeValue(), + RangeValue: iter.RangeValue(), }) } return true @@ -170,12 +171,13 @@ func TestQueryPages(t *testing.T) { for run { var have []chunk.IndexEntry err = client.QueryPages(context.Background(), []chunk.IndexQuery{tt.query}, func(_ chunk.IndexQuery, read chunk.ReadBatch) bool { - for read.Next() { + iter := read.Iterator() + for iter.Next() { have = append(have, chunk.IndexEntry{ TableName: tt.query.TableName, HashValue: tt.query.HashValue, - RangeValue: read.RangeValue(), - Value: read.Value(), + RangeValue: iter.RangeValue(), + Value: iter.Value(), }) } return true diff --git a/pkg/chunk/storage_client.go b/pkg/chunk/storage_client.go index d867def5ab3..ecf83f7d622 100644 --- a/pkg/chunk/storage_client.go +++ b/pkg/chunk/storage_client.go @@ -23,6 +23,11 @@ type WriteBatch interface { // ReadBatch represents the results of a QueryPages. type ReadBatch interface { + Iterator() ReadBatchIterator +} + +// ReadBatchIterator is an iterator over a ReadBatch. +type ReadBatchIterator interface { Next() bool RangeValue() []byte Value() []byte diff --git a/pkg/chunk/util/util.go b/pkg/chunk/util/util.go index 6a6c412d303..4470de21b37 100644 --- a/pkg/chunk/util/util.go +++ b/pkg/chunk/util/util.go @@ -3,7 +3,6 @@ package util import ( "bytes" "context" - "strings" "github.com/weaveworks/cortex/pkg/chunk" ) @@ -47,14 +46,26 @@ type filteringBatch struct { chunk.ReadBatch } -func (f *filteringBatch) Next() bool { - for f.ReadBatch.Next() { - rangeValue, value := f.ReadBatch.RangeValue(), f.ReadBatch.Value() +func (f filteringBatch) Iterator() chunk.ReadBatchIterator { + return &filteringBatchIter{ + query: f.query, + ReadBatchIterator: f.ReadBatch.Iterator(), + } +} + +type filteringBatchIter struct { + query chunk.IndexQuery + chunk.ReadBatchIterator +} + +func (f *filteringBatchIter) Next() bool { + for f.ReadBatchIterator.Next() { + rangeValue, value := f.ReadBatchIterator.RangeValue(), f.ReadBatchIterator.Value() - if len(f.query.RangeValuePrefix) != 0 && !strings.HasPrefix(string(rangeValue), string(f.query.RangeValuePrefix)) { + if len(f.query.RangeValuePrefix) != 0 && !bytes.HasPrefix(rangeValue, f.query.RangeValuePrefix) { continue } - if len(f.query.RangeValueStart) != 0 && string(rangeValue) < string(f.query.RangeValueStart) { + if len(f.query.RangeValueStart) != 0 && bytes.Compare(f.query.RangeValueStart, rangeValue) > 0 { continue } if len(f.query.ValueEqual) != 0 && !bytes.Equal(value, f.query.ValueEqual) { From 6aa76092e2464bef3772c0c8d4b6654ccd408b03 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 11 Sep 2018 15:29:45 +0100 Subject: [PATCH 3/9] Don't commit generated code; use gogoproto optimisations on cache proto. Signed-off-by: Tom Wilkie --- .gitignore | 1 + pkg/chunk/storage/caching_storage_client.go | 18 +- .../storage/caching_storage_client.pb.go | 714 ------------------ .../storage/caching_storage_client.proto | 16 +- 4 files changed, 21 insertions(+), 728 deletions(-) delete mode 100644 pkg/chunk/storage/caching_storage_client.pb.go diff --git a/.gitignore b/.gitignore index 12df738aa72..e43b6b51f57 100644 --- a/.gitignore +++ b/.gitignore @@ -13,4 +13,5 @@ cmd/lite/lite pkg/ingester/client/cortex.pb.go pkg/querier/frontend/frontend.pb.go pkg/ring/ring.pb.go +pkg/chunk/storage/caching_storage_client.pb.go images/ diff --git a/pkg/chunk/storage/caching_storage_client.go b/pkg/chunk/storage/caching_storage_client.go index 05fc67d5d53..597a128a007 100644 --- a/pkg/chunk/storage/caching_storage_client.go +++ b/pkg/chunk/storage/caching_storage_client.go @@ -86,6 +86,14 @@ func (c *indexCache) Fetch(ctx context.Context, key string) (ReadBatch, bool, er return ReadBatch{}, false, nil } +func hashKey(key string) string { + hasher := fnv.New64a() + hasher.Write([]byte(key)) // This'll never error. + + // Hex because memcache errors for the bytes produced by the hash. + return hex.EncodeToString(hasher.Sum(nil)) +} + type cachingStorageClient struct { chunk.StorageClient cache IndexCache @@ -147,7 +155,7 @@ func (s *cachingStorageClient) QueryPages(ctx context.Context, queries []chunk.I } } for iter := r.Iterator(); iter.Next(); { - existing.Entries = append(existing.Entries, &Entry{Column: iter.RangeValue(), Value: iter.Value()}) + existing.Entries = append(existing.Entries, Entry{Column: iter.RangeValue(), Value: iter.Value()}) } results[key] = existing return true @@ -200,11 +208,3 @@ func queryKey(q chunk.IndexQuery) string { const sep = "\xff" return q.TableName + sep + q.HashValue } - -func hashKey(key string) string { - hasher := fnv.New64a() - hasher.Write([]byte(key)) // This'll never error. - - // Hex because memcache errors for the bytes produced by the hash. - return hex.EncodeToString(hasher.Sum(nil)) -} diff --git a/pkg/chunk/storage/caching_storage_client.pb.go b/pkg/chunk/storage/caching_storage_client.pb.go deleted file mode 100644 index 47c09c47d91..00000000000 --- a/pkg/chunk/storage/caching_storage_client.pb.go +++ /dev/null @@ -1,714 +0,0 @@ -// Code generated by protoc-gen-gogo. DO NOT EDIT. -// source: caching_storage_client.proto - -/* - Package storage is a generated protocol buffer package. - - It is generated from these files: - caching_storage_client.proto - - It has these top-level messages: - Entry - ReadBatch -*/ -package storage - -import proto "github.com/gogo/protobuf/proto" -import fmt "fmt" -import math "math" - -import bytes "bytes" - -import strings "strings" -import reflect "reflect" - -import io "io" - -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = fmt.Errorf -var _ = math.Inf - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the proto package it is being compiled against. -// A compilation error at this line likely means your copy of the -// proto package needs to be updated. -const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package - -type Entry struct { - Column []byte `protobuf:"bytes,1,opt,name=Column,json=column,proto3" json:"Column,omitempty"` - Value []byte `protobuf:"bytes,2,opt,name=Value,json=value,proto3" json:"Value,omitempty"` -} - -func (m *Entry) Reset() { *m = Entry{} } -func (*Entry) ProtoMessage() {} -func (*Entry) Descriptor() ([]byte, []int) { return fileDescriptorCachingStorageClient, []int{0} } - -func (m *Entry) GetColumn() []byte { - if m != nil { - return m.Column - } - return nil -} - -func (m *Entry) GetValue() []byte { - if m != nil { - return m.Value - } - return nil -} - -type ReadBatch struct { - Entries []*Entry `protobuf:"bytes,1,rep,name=entries" json:"entries,omitempty"` - Key string `protobuf:"bytes,2,opt,name=key,proto3" json:"key,omitempty"` - // The time at which the key expires. - Expiry int64 `protobuf:"varint,3,opt,name=expiry,proto3" json:"expiry,omitempty"` -} - -func (m *ReadBatch) Reset() { *m = ReadBatch{} } -func (*ReadBatch) ProtoMessage() {} -func (*ReadBatch) Descriptor() ([]byte, []int) { return fileDescriptorCachingStorageClient, []int{1} } - -func (m *ReadBatch) GetEntries() []*Entry { - if m != nil { - return m.Entries - } - return nil -} - -func (m *ReadBatch) GetKey() string { - if m != nil { - return m.Key - } - return "" -} - -func (m *ReadBatch) GetExpiry() int64 { - if m != nil { - return m.Expiry - } - return 0 -} - -func init() { - proto.RegisterType((*Entry)(nil), "storage.Entry") - proto.RegisterType((*ReadBatch)(nil), "storage.ReadBatch") -} -func (this *Entry) Equal(that interface{}) bool { - if that == nil { - return this == nil - } - - that1, ok := that.(*Entry) - if !ok { - that2, ok := that.(Entry) - if ok { - that1 = &that2 - } else { - return false - } - } - if that1 == nil { - return this == nil - } else if this == nil { - return false - } - if !bytes.Equal(this.Column, that1.Column) { - return false - } - if !bytes.Equal(this.Value, that1.Value) { - return false - } - return true -} -func (this *ReadBatch) Equal(that interface{}) bool { - if that == nil { - return this == nil - } - - that1, ok := that.(*ReadBatch) - if !ok { - that2, ok := that.(ReadBatch) - if ok { - that1 = &that2 - } else { - return false - } - } - if that1 == nil { - return this == nil - } else if this == nil { - return false - } - if len(this.Entries) != len(that1.Entries) { - return false - } - for i := range this.Entries { - if !this.Entries[i].Equal(that1.Entries[i]) { - return false - } - } - if this.Key != that1.Key { - return false - } - if this.Expiry != that1.Expiry { - return false - } - return true -} -func (this *Entry) GoString() string { - if this == nil { - return "nil" - } - s := make([]string, 0, 6) - s = append(s, "&storage.Entry{") - s = append(s, "Column: "+fmt.Sprintf("%#v", this.Column)+",\n") - s = append(s, "Value: "+fmt.Sprintf("%#v", this.Value)+",\n") - s = append(s, "}") - return strings.Join(s, "") -} -func (this *ReadBatch) GoString() string { - if this == nil { - return "nil" - } - s := make([]string, 0, 7) - s = append(s, "&storage.ReadBatch{") - if this.Entries != nil { - s = append(s, "Entries: "+fmt.Sprintf("%#v", this.Entries)+",\n") - } - s = append(s, "Key: "+fmt.Sprintf("%#v", this.Key)+",\n") - s = append(s, "Expiry: "+fmt.Sprintf("%#v", this.Expiry)+",\n") - s = append(s, "}") - return strings.Join(s, "") -} -func valueToGoStringCachingStorageClient(v interface{}, typ string) string { - rv := reflect.ValueOf(v) - if rv.IsNil() { - return "nil" - } - pv := reflect.Indirect(rv).Interface() - return fmt.Sprintf("func(v %v) *%v { return &v } ( %#v )", typ, typ, pv) -} -func (m *Entry) Marshal() (dAtA []byte, err error) { - size := m.Size() - dAtA = make([]byte, size) - n, err := m.MarshalTo(dAtA) - if err != nil { - return nil, err - } - return dAtA[:n], nil -} - -func (m *Entry) MarshalTo(dAtA []byte) (int, error) { - var i int - _ = i - var l int - _ = l - if len(m.Column) > 0 { - dAtA[i] = 0xa - i++ - i = encodeVarintCachingStorageClient(dAtA, i, uint64(len(m.Column))) - i += copy(dAtA[i:], m.Column) - } - if len(m.Value) > 0 { - dAtA[i] = 0x12 - i++ - i = encodeVarintCachingStorageClient(dAtA, i, uint64(len(m.Value))) - i += copy(dAtA[i:], m.Value) - } - return i, nil -} - -func (m *ReadBatch) Marshal() (dAtA []byte, err error) { - size := m.Size() - dAtA = make([]byte, size) - n, err := m.MarshalTo(dAtA) - if err != nil { - return nil, err - } - return dAtA[:n], nil -} - -func (m *ReadBatch) MarshalTo(dAtA []byte) (int, error) { - var i int - _ = i - var l int - _ = l - if len(m.Entries) > 0 { - for _, msg := range m.Entries { - dAtA[i] = 0xa - i++ - i = encodeVarintCachingStorageClient(dAtA, i, uint64(msg.Size())) - n, err := msg.MarshalTo(dAtA[i:]) - if err != nil { - return 0, err - } - i += n - } - } - if len(m.Key) > 0 { - dAtA[i] = 0x12 - i++ - i = encodeVarintCachingStorageClient(dAtA, i, uint64(len(m.Key))) - i += copy(dAtA[i:], m.Key) - } - if m.Expiry != 0 { - dAtA[i] = 0x18 - i++ - i = encodeVarintCachingStorageClient(dAtA, i, uint64(m.Expiry)) - } - return i, nil -} - -func encodeVarintCachingStorageClient(dAtA []byte, offset int, v uint64) int { - for v >= 1<<7 { - dAtA[offset] = uint8(v&0x7f | 0x80) - v >>= 7 - offset++ - } - dAtA[offset] = uint8(v) - return offset + 1 -} -func (m *Entry) Size() (n int) { - var l int - _ = l - l = len(m.Column) - if l > 0 { - n += 1 + l + sovCachingStorageClient(uint64(l)) - } - l = len(m.Value) - if l > 0 { - n += 1 + l + sovCachingStorageClient(uint64(l)) - } - return n -} - -func (m *ReadBatch) Size() (n int) { - var l int - _ = l - if len(m.Entries) > 0 { - for _, e := range m.Entries { - l = e.Size() - n += 1 + l + sovCachingStorageClient(uint64(l)) - } - } - l = len(m.Key) - if l > 0 { - n += 1 + l + sovCachingStorageClient(uint64(l)) - } - if m.Expiry != 0 { - n += 1 + sovCachingStorageClient(uint64(m.Expiry)) - } - return n -} - -func sovCachingStorageClient(x uint64) (n int) { - for { - n++ - x >>= 7 - if x == 0 { - break - } - } - return n -} -func sozCachingStorageClient(x uint64) (n int) { - return sovCachingStorageClient(uint64((x << 1) ^ uint64((int64(x) >> 63)))) -} -func (this *Entry) String() string { - if this == nil { - return "nil" - } - s := strings.Join([]string{`&Entry{`, - `Column:` + fmt.Sprintf("%v", this.Column) + `,`, - `Value:` + fmt.Sprintf("%v", this.Value) + `,`, - `}`, - }, "") - return s -} -func (this *ReadBatch) String() string { - if this == nil { - return "nil" - } - s := strings.Join([]string{`&ReadBatch{`, - `Entries:` + strings.Replace(fmt.Sprintf("%v", this.Entries), "Entry", "Entry", 1) + `,`, - `Key:` + fmt.Sprintf("%v", this.Key) + `,`, - `Expiry:` + fmt.Sprintf("%v", this.Expiry) + `,`, - `}`, - }, "") - return s -} -func valueToStringCachingStorageClient(v interface{}) string { - rv := reflect.ValueOf(v) - if rv.IsNil() { - return "nil" - } - pv := reflect.Indirect(rv).Interface() - return fmt.Sprintf("*%v", pv) -} -func (m *Entry) Unmarshal(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowCachingStorageClient - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= (uint64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: Entry: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: Entry: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Column", wireType) - } - var byteLen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowCachingStorageClient - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - byteLen |= (int(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - if byteLen < 0 { - return ErrInvalidLengthCachingStorageClient - } - postIndex := iNdEx + byteLen - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Column = append(m.Column[:0], dAtA[iNdEx:postIndex]...) - if m.Column == nil { - m.Column = []byte{} - } - iNdEx = postIndex - case 2: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType) - } - var byteLen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowCachingStorageClient - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - byteLen |= (int(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - if byteLen < 0 { - return ErrInvalidLengthCachingStorageClient - } - postIndex := iNdEx + byteLen - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Value = append(m.Value[:0], dAtA[iNdEx:postIndex]...) - if m.Value == nil { - m.Value = []byte{} - } - iNdEx = postIndex - default: - iNdEx = preIndex - skippy, err := skipCachingStorageClient(dAtA[iNdEx:]) - if err != nil { - return err - } - if skippy < 0 { - return ErrInvalidLengthCachingStorageClient - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} -func (m *ReadBatch) Unmarshal(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowCachingStorageClient - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= (uint64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: ReadBatch: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: ReadBatch: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Entries", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowCachingStorageClient - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - msglen |= (int(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - if msglen < 0 { - return ErrInvalidLengthCachingStorageClient - } - postIndex := iNdEx + msglen - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Entries = append(m.Entries, &Entry{}) - if err := m.Entries[len(m.Entries)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { - return err - } - iNdEx = postIndex - case 2: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Key", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowCachingStorageClient - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= (uint64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLengthCachingStorageClient - } - postIndex := iNdEx + intStringLen - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Key = string(dAtA[iNdEx:postIndex]) - iNdEx = postIndex - case 3: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Expiry", wireType) - } - m.Expiry = 0 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowCachingStorageClient - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - m.Expiry |= (int64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - default: - iNdEx = preIndex - skippy, err := skipCachingStorageClient(dAtA[iNdEx:]) - if err != nil { - return err - } - if skippy < 0 { - return ErrInvalidLengthCachingStorageClient - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} -func skipCachingStorageClient(dAtA []byte) (n int, err error) { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return 0, ErrIntOverflowCachingStorageClient - } - if iNdEx >= l { - return 0, io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= (uint64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - wireType := int(wire & 0x7) - switch wireType { - case 0: - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return 0, ErrIntOverflowCachingStorageClient - } - if iNdEx >= l { - return 0, io.ErrUnexpectedEOF - } - iNdEx++ - if dAtA[iNdEx-1] < 0x80 { - break - } - } - return iNdEx, nil - case 1: - iNdEx += 8 - return iNdEx, nil - case 2: - var length int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return 0, ErrIntOverflowCachingStorageClient - } - if iNdEx >= l { - return 0, io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - length |= (int(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - iNdEx += length - if length < 0 { - return 0, ErrInvalidLengthCachingStorageClient - } - return iNdEx, nil - case 3: - for { - var innerWire uint64 - var start int = iNdEx - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return 0, ErrIntOverflowCachingStorageClient - } - if iNdEx >= l { - return 0, io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - innerWire |= (uint64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - innerWireType := int(innerWire & 0x7) - if innerWireType == 4 { - break - } - next, err := skipCachingStorageClient(dAtA[start:]) - if err != nil { - return 0, err - } - iNdEx = start + next - } - return iNdEx, nil - case 4: - return iNdEx, nil - case 5: - iNdEx += 4 - return iNdEx, nil - default: - return 0, fmt.Errorf("proto: illegal wireType %d", wireType) - } - } - panic("unreachable") -} - -var ( - ErrInvalidLengthCachingStorageClient = fmt.Errorf("proto: negative length found during unmarshaling") - ErrIntOverflowCachingStorageClient = fmt.Errorf("proto: integer overflow") -) - -func init() { proto.RegisterFile("caching_storage_client.proto", fileDescriptorCachingStorageClient) } - -var fileDescriptorCachingStorageClient = []byte{ - // 229 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x49, 0x4e, 0x4c, 0xce, - 0xc8, 0xcc, 0x4b, 0x8f, 0x2f, 0x2e, 0xc9, 0x2f, 0x4a, 0x4c, 0x4f, 0x8d, 0x4f, 0xce, 0xc9, 0x4c, - 0xcd, 0x2b, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x87, 0x8a, 0x2a, 0x99, 0x72, 0xb1, - 0xba, 0xe6, 0x95, 0x14, 0x55, 0x0a, 0x89, 0x71, 0xb1, 0x39, 0xe7, 0xe7, 0x94, 0xe6, 0xe6, 0x49, - 0x30, 0x2a, 0x30, 0x6a, 0xf0, 0x04, 0xb1, 0x25, 0x83, 0x79, 0x42, 0x22, 0x5c, 0xac, 0x61, 0x89, - 0x39, 0xa5, 0xa9, 0x12, 0x4c, 0x60, 0x61, 0xd6, 0x32, 0x10, 0x47, 0x29, 0x9e, 0x8b, 0x33, 0x28, - 0x35, 0x31, 0xc5, 0x29, 0xb1, 0x24, 0x39, 0x43, 0x48, 0x83, 0x8b, 0x3d, 0x35, 0xaf, 0xa4, 0x28, - 0x33, 0xb5, 0x58, 0x82, 0x51, 0x81, 0x59, 0x83, 0xdb, 0x88, 0x4f, 0x0f, 0x6a, 0xbc, 0x1e, 0xd8, - 0xec, 0x20, 0x98, 0xb4, 0x90, 0x00, 0x17, 0x73, 0x76, 0x6a, 0x25, 0xd8, 0x28, 0xce, 0x20, 0x10, - 0x13, 0x64, 0x6d, 0x6a, 0x45, 0x41, 0x66, 0x51, 0xa5, 0x04, 0xb3, 0x02, 0xa3, 0x06, 0x73, 0x10, - 0x94, 0xe7, 0xa4, 0x73, 0xe1, 0xa1, 0x1c, 0xc3, 0x8d, 0x87, 0x72, 0x0c, 0x1f, 0x1e, 0xca, 0x31, - 0x36, 0x3c, 0x92, 0x63, 0x5c, 0xf1, 0x48, 0x8e, 0xf1, 0xc4, 0x23, 0x39, 0xc6, 0x0b, 0x8f, 0xe4, - 0x18, 0x1f, 0x3c, 0x92, 0x63, 0x7c, 0xf1, 0x48, 0x8e, 0xe1, 0xc3, 0x23, 0x39, 0xc6, 0x09, 0x8f, - 0xe5, 0x18, 0x92, 0xd8, 0xc0, 0xbe, 0x32, 0x06, 0x04, 0x00, 0x00, 0xff, 0xff, 0x00, 0x1b, 0x46, - 0xe1, 0xf5, 0x00, 0x00, 0x00, -} diff --git a/pkg/chunk/storage/caching_storage_client.proto b/pkg/chunk/storage/caching_storage_client.proto index f10dbe34437..cc133b82884 100644 --- a/pkg/chunk/storage/caching_storage_client.proto +++ b/pkg/chunk/storage/caching_storage_client.proto @@ -1,15 +1,21 @@ syntax = "proto3"; + package storage; +import "github.com/gogo/protobuf/gogoproto/gogo.proto"; + +option (gogoproto.marshaler_all) = true; +option (gogoproto.unmarshaler_all) = true; + message Entry { - bytes Column = 1; - bytes Value = 2; + bytes Column = 1 [(gogoproto.customtype) = "github.com/weaveworks/cortex/pkg/util/wire.Bytes", (gogoproto.nullable) = false]; + bytes Value = 2 [(gogoproto.customtype) = "github.com/weaveworks/cortex/pkg/util/wire.Bytes", (gogoproto.nullable) = false]; } message ReadBatch { - repeated Entry entries = 1; + repeated Entry entries = 1 [(gogoproto.nullable) = false]; string key = 2; - // The time at which the key expires. + // The time at which the key expires. int64 expiry = 3; -} \ No newline at end of file +} From 5665e44594cbef3847c939faf0a7f3318048ac9b Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 11 Sep 2018 15:35:03 +0100 Subject: [PATCH 4/9] Review feedback & fix lint. Signed-off-by: Tom Wilkie --- pkg/chunk/gcp/storage_client.go | 5 +---- pkg/chunk/storage/caching_storage_client.go | 3 +-- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/pkg/chunk/gcp/storage_client.go b/pkg/chunk/gcp/storage_client.go index 13728294ee4..aca2b4198ed 100644 --- a/pkg/chunk/gcp/storage_client.go +++ b/pkg/chunk/gcp/storage_client.go @@ -219,18 +219,15 @@ func (s *storageClientColumnKey) QueryPages(ctx context.Context, queries []chunk } errs := make(chan error) - for _, tq := range tableQueries { - table := s.client.Open(tq.name) - for i := 0; i < len(tq.rows); i += maxRowReads { + for i := 0; i < len(tq.rows); i += maxRowReads { page := tq.rows[i:util.Min(i+maxRowReads, len(tq.rows))] go func(page bigtable.RowList, tq tableQuery) { var processingErr error // rows are returned in key order, not order in row list err := table.ReadRows(ctx, page, func(row bigtable.Row) bool { - query, ok := tq.queries[row.Key()] if !ok { processingErr = errors.WithStack(fmt.Errorf("Got row for unknown chunk: %s", row.Key())) diff --git a/pkg/chunk/storage/caching_storage_client.go b/pkg/chunk/storage/caching_storage_client.go index 597a128a007..0b7991ba13f 100644 --- a/pkg/chunk/storage/caching_storage_client.go +++ b/pkg/chunk/storage/caching_storage_client.go @@ -174,13 +174,12 @@ func (s *cachingStorageClient) QueryPages(ctx context.Context, queries []chunk.I return nil } -// Iter implements chunk.ReadBatch. +// Iterator implements chunk.ReadBatch. func (b ReadBatch) Iterator() chunk.ReadBatchIterator { return &readBatchIterator{ index: -1, readBatch: b, } - return nil } type readBatchIterator struct { From 4e1ccd913e1b27c1f45dc31b11db725ca1e41229 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Thu, 30 Aug 2018 18:12:37 +0100 Subject: [PATCH 5/9] Adjust the histogram buckets for the query path so they're not crazy wrong. Signed-off-by: Tom Wilkie --- pkg/chunk/series_store.go | 12 ++++++------ pkg/ingester/ingester.go | 12 ++++++------ 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/pkg/chunk/series_store.go b/pkg/chunk/series_store.go index 082381237f1..03bb07b576a 100644 --- a/pkg/chunk/series_store.go +++ b/pkg/chunk/series_store.go @@ -30,22 +30,22 @@ var ( Namespace: "cortex", Name: "chunk_store_series_pre_intersection_per_query", Help: "Distribution of #series (pre intersection) per query.", - // A reasonable upper bound is around 100k - 10*(8^8) = 167k. - Buckets: prometheus.ExponentialBuckets(10, 8, 8), + // A reasonable upper bound is around 100k - 10*(8^5) = 327k. + Buckets: prometheus.ExponentialBuckets(10, 8, 5), }) postIntersectionPerQuery = promauto.NewHistogram(prometheus.HistogramOpts{ Namespace: "cortex", Name: "chunk_store_series_post_intersection_per_query", Help: "Distribution of #series (post intersection) per query.", - // A reasonable upper bound is around 100k - 10*(8^8) = 167k. - Buckets: prometheus.ExponentialBuckets(10, 8, 8), + // A reasonable upper bound is around 100k - 10*(8^5) = 327k. + Buckets: prometheus.ExponentialBuckets(10, 8, 5), }) chunksPerQuery = promauto.NewHistogram(prometheus.HistogramOpts{ Namespace: "cortex", Name: "chunk_store_chunks_per_query", Help: "Distribution of #chunks per query.", - // For v. high cardinality could go upto 1m chunks per query - 10*(8^9) = 1.3m. - Buckets: prometheus.ExponentialBuckets(10, 8, 9), + // For 100k series for 7 week, could be 1.2m - 10*(8^6) = 2.6m. + Buckets: prometheus.ExponentialBuckets(10, 8, 6), }) ) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 8d960b77cbb..e4a76157068 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -55,20 +55,20 @@ var ( queriedSamples = promauto.NewHistogram(prometheus.HistogramOpts{ Name: "cortex_ingester_queried_samples", Help: "The total number of samples returned from queries.", - // Could easily return 10m samples per query - 80*(8^9) = 10.7m. - Buckets: prometheus.ExponentialBuckets(80, 8, 9), + // Could easily return 10m samples per query - 10*(8^7) = 20.9m. + Buckets: prometheus.ExponentialBuckets(10, 8, 7), }) queriedSeries = promauto.NewHistogram(prometheus.HistogramOpts{ Name: "cortex_ingester_queried_series", Help: "The total number of series returned from queries.", - // A reasonable upper bound is around 100k - 10*(8^8) = 167k. - Buckets: prometheus.ExponentialBuckets(10, 8, 8), + // A reasonable upper bound is around 100k - 10*(8^5) = 327k. + Buckets: prometheus.ExponentialBuckets(10, 8, 5), }) queriedChunks = promauto.NewHistogram(prometheus.HistogramOpts{ Name: "cortex_ingester_queried_chunks", Help: "The total number of chunks returned from queries.", - // A small number of chunks per series - 10*(4^8) = 655k. - Buckets: prometheus.DefBuckets, + // A small number of chunks per series - 10*(8^6) = 2.6m. + Buckets: prometheus.ExponentialBuckets(10, 8, 6), }) ) From 52ae81300e980e5b9b1a05bbc751f36765606687 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Thu, 30 Aug 2018 18:57:06 +0100 Subject: [PATCH 6/9] Don't trace every cache lookup, thats too many Signed-off-by: Tom Wilkie --- pkg/chunk/cache/fifo_cache.go | 12 +----------- pkg/chunk/storage/caching_storage_client.go | 6 +++++- 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/pkg/chunk/cache/fifo_cache.go b/pkg/chunk/cache/fifo_cache.go index a5a2f8218f7..9d88e357bb3 100644 --- a/pkg/chunk/cache/fifo_cache.go +++ b/pkg/chunk/cache/fifo_cache.go @@ -5,8 +5,6 @@ import ( "sync" "time" - ot "github.com/opentracing/opentracing-go" - otlog "github.com/opentracing/opentracing-go/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" ) @@ -132,9 +130,6 @@ func (c *FifoCache) Stop() error { // Put stores the value against the key. func (c *FifoCache) Put(ctx context.Context, key string, value interface{}) { - span, ctx := ot.StartSpanFromContext(ctx, c.name+"-cache-put") - defer span.Finish() - c.entriesAdded.Inc() if c.size == 0 { return @@ -202,9 +197,6 @@ func (c *FifoCache) Put(ctx context.Context, key string, value interface{}) { // Get returns the stored value against the key and when the key was last updated. func (c *FifoCache) Get(ctx context.Context, key string) (interface{}, bool) { - span, ctx := ot.StartSpanFromContext(ctx, c.name+"-cache-get") - defer span.Finish() - c.totalGets.Inc() if c.size == 0 { return nil, false @@ -217,17 +209,15 @@ func (c *FifoCache) Get(ctx context.Context, key string) (interface{}, bool) { if ok { updated := c.entries[index].updated if time.Now().Sub(updated) < c.validity { - span.LogFields(otlog.Bool("hit", true)) + return c.entries[index].value, true } c.totalMisses.Inc() c.staleGets.Inc() - span.LogFields(otlog.Bool("hit", false), otlog.Bool("stale", true)) return nil, false } - span.LogFields(otlog.Bool("hit", false), otlog.Bool("stale", false)) c.totalMisses.Inc() return nil, false } diff --git a/pkg/chunk/storage/caching_storage_client.go b/pkg/chunk/storage/caching_storage_client.go index 0b7991ba13f..939d70abf5a 100644 --- a/pkg/chunk/storage/caching_storage_client.go +++ b/pkg/chunk/storage/caching_storage_client.go @@ -8,9 +8,10 @@ import ( "time" proto "github.com/golang/protobuf/proto" + ot "github.com/opentracing/opentracing-go" + otlog "github.com/opentracing/opentracing-go/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/weaveworks/cortex/pkg/chunk" "github.com/weaveworks/cortex/pkg/chunk/cache" chunk_util "github.com/weaveworks/cortex/pkg/chunk/util" @@ -118,6 +119,7 @@ func (s *cachingStorageClient) QueryPages(ctx context.Context, queries []chunk.I cacheableMissed := []chunk.IndexQuery{} missed := map[string]chunk.IndexQuery{} + span, ctx := ot.StartSpanFromContext(ctx, "Index cache lookups") for _, query := range queries { key := queryKey(query) batch, ok, err := s.cache.Fetch(ctx, key) @@ -139,6 +141,8 @@ func (s *cachingStorageClient) QueryPages(ctx context.Context, queries []chunk.I if len(cacheableMissed) == 0 { return nil } + span.LogFields(otlog.Int("queries", len(queries)), otlog.Int("hits", len(queries)-len(missed)), otlog.Int("misses", len(missed))) + span.Finish() var resultsMtx sync.Mutex results := map[string]ReadBatch{} From 6e15c23f768e21f1b0511fe4c5ef56afff81208a Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Thu, 13 Sep 2018 19:09:43 +0100 Subject: [PATCH 7/9] Batch cache lookups too, and be more careful about collisions. --- pkg/chunk/storage/caching_storage_client.go | 109 +++++++++++++----- .../storage/caching_storage_client_test.go | 74 +++++++++--- 2 files changed, 138 insertions(+), 45 deletions(-) diff --git a/pkg/chunk/storage/caching_storage_client.go b/pkg/chunk/storage/caching_storage_client.go index 939d70abf5a..2d162ff9bea 100644 --- a/pkg/chunk/storage/caching_storage_client.go +++ b/pkg/chunk/storage/caching_storage_client.go @@ -7,14 +7,14 @@ import ( "sync" "time" + "github.com/go-kit/kit/log/level" proto "github.com/golang/protobuf/proto" - ot "github.com/opentracing/opentracing-go" - otlog "github.com/opentracing/opentracing-go/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/weaveworks/cortex/pkg/chunk" "github.com/weaveworks/cortex/pkg/chunk/cache" chunk_util "github.com/weaveworks/cortex/pkg/chunk/util" + "github.com/weaveworks/cortex/pkg/util" ) var ( @@ -43,7 +43,7 @@ var ( // IndexCache describes the cache for the Index. type IndexCache interface { Store(ctx context.Context, key string, val ReadBatch) - Fetch(ctx context.Context, key string) (val ReadBatch, ok bool, err error) + Fetch(ctx context.Context, keys []string) (batches []ReadBatch, misses []string) Stop() error } @@ -65,26 +65,69 @@ func (c *indexCache) Store(ctx context.Context, key string, val ReadBatch) { return } -func (c *indexCache) Fetch(ctx context.Context, key string) (ReadBatch, bool, error) { +func (c *indexCache) Fetch(ctx context.Context, keys []string) (batches []ReadBatch, missed []string) { cacheGets.Inc() - found, valBytes, _, err := c.Cache.Fetch(ctx, []string{hashKey(key)}) - if len(found) != 1 || err != nil { - return ReadBatch{}, false, err + // Build a map from hash -> key; NB there can be collisions here; we'll fetch + // the last hash. + hashedKeys := make(map[string]string, len(keys)) + for _, key := range keys { + hashedKeys[hashKey(key)] = key } - var rb ReadBatch - if err := proto.Unmarshal(valBytes[0], &rb); err != nil { - return rb, false, err + // Build a list of hashes; could be less than keys due to collisions. + hashes := make([]string, 0, len(keys)) + for hash := range hashedKeys { + hashes = append(hashes, hash) } - // Make sure the hash(key) is not a collision by looking at the key in the value. - if key == rb.Key && time.Now().Before(time.Unix(0, rb.Expiry)) { + // Look up the hashes in a single batch. If we get an error, we just "miss" all + // of the keys. Eventually I want to push all the errors to the leafs of the cache + // tree, to the caches only return found & missed. + foundHashes, bufs, _, err := c.Cache.Fetch(ctx, hashes) + if err != nil { + level.Warn(util.Logger).Log("msg", "error fetching index entries", "err", err) + return nil, keys + } + + // Reverse the hash, unmarshal the index entries, check we got what we expected + // and that its still valid. + batches = make([]ReadBatch, 0, len(foundHashes)) + for j, foundHash := range foundHashes { + key := hashedKeys[foundHash] + var readBatch ReadBatch + + if err := proto.Unmarshal(bufs[j], &readBatch); err != nil { + level.Warn(util.Logger).Log("msg", "error unmarshalling index entry from cache", "err", err) + cacheCorruptErrs.Inc() + continue + } + + // Make sure the hash(key) is not a collision in the cache by looking at the + // key in the value. + if key != readBatch.Key || time.Now().After(time.Unix(0, readBatch.Expiry)) { + cacheCorruptErrs.Inc() + continue + } + cacheHits.Inc() - return rb, true, nil + batches = append(batches, readBatch) + } + + // Finally work out what we're missing. + misses := make(map[string]struct{}, len(keys)) + for _, key := range keys { + misses[key] = struct{}{} + } + for i := range batches { + delete(misses, batches[i].Key) + } + missed = make([]string, 0, len(misses)) + for miss := range misses { + missed = append(missed, miss) } - return ReadBatch{}, false, nil + return batches, missed } func hashKey(key string) string { @@ -116,21 +159,31 @@ func newCachingStorageClient(client chunk.StorageClient, cache cache.Cache, vali func (s *cachingStorageClient) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error { // We cache the entire row, so filter client side. callback = chunk_util.QueryFilter(callback) - cacheableMissed := []chunk.IndexQuery{} - missed := map[string]chunk.IndexQuery{} - span, ctx := ot.StartSpanFromContext(ctx, "Index cache lookups") + // Build list of keys to lookup in the cache. + keys := make([]string, 0, len(queries)) + queriesByKey := make(map[string]chunk.IndexQuery, len(queries)) for _, query := range queries { key := queryKey(query) - batch, ok, err := s.cache.Fetch(ctx, key) - if err != nil { - cacheCorruptErrs.Inc() - } else if ok { - callback(query, batch) - continue - } + keys = append(keys, key) + queriesByKey[key] = query + } + + batches, misses := s.cache.Fetch(ctx, keys) + for _, batch := range batches { + query := queriesByKey[batch.Key] + callback(query, batch) + } - // Just reads the entire row and caches it; filter client side. + if len(misses) == 0 { + return nil + } + + // Build list of cachable queries for the queries that missed the cache. + cacheableMissed := []chunk.IndexQuery{} + missed := map[string]chunk.IndexQuery{} + for _, key := range misses { + query := queriesByKey[key] cacheableMissed = append(cacheableMissed, chunk.IndexQuery{ TableName: query.TableName, HashValue: query.HashValue, @@ -138,12 +191,6 @@ func (s *cachingStorageClient) QueryPages(ctx context.Context, queries []chunk.I missed[key] = query } - if len(cacheableMissed) == 0 { - return nil - } - span.LogFields(otlog.Int("queries", len(queries)), otlog.Int("hits", len(queries)-len(missed)), otlog.Int("misses", len(missed))) - span.Finish() - var resultsMtx sync.Mutex results := map[string]ReadBatch{} expiryTime := time.Now().Add(s.validity) diff --git a/pkg/chunk/storage/caching_storage_client_test.go b/pkg/chunk/storage/caching_storage_client_test.go index 00be68793da..b252545c1ee 100644 --- a/pkg/chunk/storage/caching_storage_client_test.go +++ b/pkg/chunk/storage/caching_storage_client_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/weaveworks/cortex/pkg/chunk" "github.com/weaveworks/cortex/pkg/chunk/cache" @@ -16,21 +17,29 @@ type mockStore struct { } func (m *mockStore) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error { - m.queries++ for _, query := range queries { - callback(query, mockReadBatch{}) + m.queries++ + callback(query, mockReadBatch{ + rangeValue: []byte(query.HashValue), + value: []byte(query.HashValue), + }) } return nil } -type mockReadBatch struct{} +type mockReadBatch struct { + rangeValue, value []byte +} -func (mockReadBatch) Iterator() chunk.ReadBatchIterator { - return &mockReadBatchIterator{} +func (m mockReadBatch) Iterator() chunk.ReadBatchIterator { + return &mockReadBatchIterator{ + mockReadBatch: m, + } } type mockReadBatchIterator struct { consumed bool + mockReadBatch } func (m *mockReadBatchIterator) Next() bool { @@ -41,18 +50,18 @@ func (m *mockReadBatchIterator) Next() bool { return true } -func (mockReadBatchIterator) RangeValue() []byte { - return []byte("foo") +func (m *mockReadBatchIterator) RangeValue() []byte { + return m.mockReadBatch.rangeValue } -func (mockReadBatchIterator) Value() []byte { - return []byte("bar") +func (m *mockReadBatchIterator) Value() []byte { + return m.mockReadBatch.value } -func TestCachingStorageClient(t *testing.T) { - mock := &mockStore{} +func TestCachingStorageClientBasic(t *testing.T) { + store := &mockStore{} cache := cache.NewFifoCache("test", 10, 10*time.Second) - client := newCachingStorageClient(mock, cache, 1*time.Second) + client := newCachingStorageClient(store, cache, 1*time.Second) queries := []chunk.IndexQuery{{ TableName: "table", HashValue: "baz", @@ -61,12 +70,49 @@ func TestCachingStorageClient(t *testing.T) { return true }) require.NoError(t, err) - require.EqualValues(t, 1, mock.queries) + assert.EqualValues(t, 1, store.queries) // If we do the query to the cache again, the underlying store shouldn't see it. err = client.QueryPages(context.Background(), queries, func(_ chunk.IndexQuery, _ chunk.ReadBatch) bool { return true }) require.NoError(t, err) - require.EqualValues(t, 1, mock.queries) + assert.EqualValues(t, 1, store.queries) +} + +func TestCachingStorageClient(t *testing.T) { + store := &mockStore{} + cache := cache.NewFifoCache("test", 10, 10*time.Second) + client := newCachingStorageClient(store, cache, 1*time.Second) + queries := []chunk.IndexQuery{ + {TableName: "table", HashValue: "foo"}, + {TableName: "table", HashValue: "bar"}, + {TableName: "table", HashValue: "baz"}, + } + results := 0 + err := client.QueryPages(context.Background(), queries, func(query chunk.IndexQuery, batch chunk.ReadBatch) bool { + iter := batch.Iterator() + for iter.Next() { + assert.Equal(t, query.HashValue, string(iter.RangeValue())) + results++ + } + return true + }) + require.NoError(t, err) + assert.EqualValues(t, len(queries), store.queries) + assert.EqualValues(t, len(queries), results) + + // If we do the query to the cache again, the underlying store shouldn't see it. + results = 0 + err = client.QueryPages(context.Background(), queries, func(query chunk.IndexQuery, batch chunk.ReadBatch) bool { + iter := batch.Iterator() + for iter.Next() { + assert.Equal(t, query.HashValue, string(iter.RangeValue())) + results++ + } + return true + }) + require.NoError(t, err) + assert.EqualValues(t, len(queries), store.queries) + assert.EqualValues(t, len(queries), results) } From fae388abc2071dc3603e720aa6edcfdcaee710a5 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Fri, 14 Sep 2018 11:06:25 +0100 Subject: [PATCH 8/9] Test for query cache key collisions. --- .../storage/caching_storage_client_test.go | 113 ++++++++++++------ 1 file changed, 74 insertions(+), 39 deletions(-) diff --git a/pkg/chunk/storage/caching_storage_client_test.go b/pkg/chunk/storage/caching_storage_client_test.go index b252545c1ee..1cc1f1391e5 100644 --- a/pkg/chunk/storage/caching_storage_client_test.go +++ b/pkg/chunk/storage/caching_storage_client_test.go @@ -14,52 +14,26 @@ import ( type mockStore struct { chunk.StorageClient queries int + results ReadBatch } func (m *mockStore) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error { for _, query := range queries { m.queries++ - callback(query, mockReadBatch{ - rangeValue: []byte(query.HashValue), - value: []byte(query.HashValue), - }) + callback(query, m.results) } return nil } -type mockReadBatch struct { - rangeValue, value []byte -} - -func (m mockReadBatch) Iterator() chunk.ReadBatchIterator { - return &mockReadBatchIterator{ - mockReadBatch: m, - } -} - -type mockReadBatchIterator struct { - consumed bool - mockReadBatch -} - -func (m *mockReadBatchIterator) Next() bool { - if m.consumed { - return false - } - m.consumed = true - return true -} - -func (m *mockReadBatchIterator) RangeValue() []byte { - return m.mockReadBatch.rangeValue -} - -func (m *mockReadBatchIterator) Value() []byte { - return m.mockReadBatch.value -} - func TestCachingStorageClientBasic(t *testing.T) { - store := &mockStore{} + store := &mockStore{ + results: ReadBatch{ + Entries: []Entry{{ + Column: []byte("foo"), + Value: []byte("bar"), + }}, + }, + } cache := cache.NewFifoCache("test", 10, 10*time.Second) client := newCachingStorageClient(store, cache, 1*time.Second) queries := []chunk.IndexQuery{{ @@ -81,7 +55,14 @@ func TestCachingStorageClientBasic(t *testing.T) { } func TestCachingStorageClient(t *testing.T) { - store := &mockStore{} + store := &mockStore{ + results: ReadBatch{ + Entries: []Entry{{ + Column: []byte("foo"), + Value: []byte("bar"), + }}, + }, + } cache := cache.NewFifoCache("test", 10, 10*time.Second) client := newCachingStorageClient(store, cache, 1*time.Second) queries := []chunk.IndexQuery{ @@ -93,7 +74,6 @@ func TestCachingStorageClient(t *testing.T) { err := client.QueryPages(context.Background(), queries, func(query chunk.IndexQuery, batch chunk.ReadBatch) bool { iter := batch.Iterator() for iter.Next() { - assert.Equal(t, query.HashValue, string(iter.RangeValue())) results++ } return true @@ -107,7 +87,6 @@ func TestCachingStorageClient(t *testing.T) { err = client.QueryPages(context.Background(), queries, func(query chunk.IndexQuery, batch chunk.ReadBatch) bool { iter := batch.Iterator() for iter.Next() { - assert.Equal(t, query.HashValue, string(iter.RangeValue())) results++ } return true @@ -116,3 +95,59 @@ func TestCachingStorageClient(t *testing.T) { assert.EqualValues(t, len(queries), store.queries) assert.EqualValues(t, len(queries), results) } + +func TestCachingStorageClientCollision(t *testing.T) { + // These two queries should result in one query to the cache & index, but + // two results, as we cache entire rows. + store := &mockStore{ + results: ReadBatch{ + Entries: []Entry{ + { + Column: []byte("bar"), + Value: []byte("bar"), + }, + { + Column: []byte("baz"), + Value: []byte("baz"), + }, + }, + }, + } + cache := cache.NewFifoCache("test", 10, 10*time.Second) + client := newCachingStorageClient(store, cache, 1*time.Second) + queries := []chunk.IndexQuery{ + {TableName: "table", HashValue: "foo", RangeValuePrefix: []byte("bar")}, + {TableName: "table", HashValue: "foo", RangeValuePrefix: []byte("baz")}, + } + + var results ReadBatch + err := client.QueryPages(context.Background(), queries, func(query chunk.IndexQuery, batch chunk.ReadBatch) bool { + iter := batch.Iterator() + for iter.Next() { + results.Entries = append(results.Entries, Entry{ + Column: iter.RangeValue(), + Value: iter.Value(), + }) + } + return true + }) + require.NoError(t, err) + assert.EqualValues(t, 1, store.queries) + assert.EqualValues(t, store.results, results) + + // If we do the query to the cache again, the underlying store shouldn't see it. + results = ReadBatch{} + err = client.QueryPages(context.Background(), queries, func(query chunk.IndexQuery, batch chunk.ReadBatch) bool { + iter := batch.Iterator() + for iter.Next() { + results.Entries = append(results.Entries, Entry{ + Column: iter.RangeValue(), + Value: iter.Value(), + }) + } + return true + }) + require.NoError(t, err) + assert.EqualValues(t, 1, store.queries) + assert.EqualValues(t, store.results, results) +} From f4405617b37443ff1b66e0eeab1357b309124a25 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Fri, 14 Sep 2018 11:07:25 +0100 Subject: [PATCH 9/9] Deal with collisions for the query cache key. --- pkg/chunk/storage/caching_storage_client.go | 27 ++++++++++++--------- 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/pkg/chunk/storage/caching_storage_client.go b/pkg/chunk/storage/caching_storage_client.go index 2d162ff9bea..770e68c0a0b 100644 --- a/pkg/chunk/storage/caching_storage_client.go +++ b/pkg/chunk/storage/caching_storage_client.go @@ -162,17 +162,19 @@ func (s *cachingStorageClient) QueryPages(ctx context.Context, queries []chunk.I // Build list of keys to lookup in the cache. keys := make([]string, 0, len(queries)) - queriesByKey := make(map[string]chunk.IndexQuery, len(queries)) + queriesByKey := make(map[string][]chunk.IndexQuery, len(queries)) for _, query := range queries { key := queryKey(query) keys = append(keys, key) - queriesByKey[key] = query + queriesByKey[key] = append(queriesByKey[key], query) } batches, misses := s.cache.Fetch(ctx, keys) for _, batch := range batches { - query := queriesByKey[batch.Key] - callback(query, batch) + queries := queriesByKey[batch.Key] + for _, query := range queries { + callback(query, batch) + } } if len(misses) == 0 { @@ -181,14 +183,13 @@ func (s *cachingStorageClient) QueryPages(ctx context.Context, queries []chunk.I // Build list of cachable queries for the queries that missed the cache. cacheableMissed := []chunk.IndexQuery{} - missed := map[string]chunk.IndexQuery{} for _, key := range misses { - query := queriesByKey[key] + // Only need to consider one of the queries as they have the same table & hash. + queries := queriesByKey[key] cacheableMissed = append(cacheableMissed, chunk.IndexQuery{ - TableName: query.TableName, - HashValue: query.HashValue, + TableName: queries[0].TableName, + HashValue: queries[0].HashValue, }) - missed[key] = query } var resultsMtx sync.Mutex @@ -218,9 +219,11 @@ func (s *cachingStorageClient) QueryPages(ctx context.Context, queries []chunk.I resultsMtx.Lock() defer resultsMtx.Unlock() for key, batch := range results { - query := missed[key] - callback(query, batch) - s.cache.Store(ctx, queryKey(query), batch) + queries := queriesByKey[key] + for _, query := range queries { + callback(query, batch) + } + s.cache.Store(ctx, key, batch) } return nil }