From 8b74f9053037c4590ec1512b65288e4869d5e748 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Mon, 3 Sep 2018 13:01:16 +0100 Subject: [PATCH] Revert "Merge pull request #971 from grafana/batch-index-lookups" This reverts commit 0d275f02acedb80b6f78d8cdfcef72b0534c4cac, reversing changes made to 1fffffffa71d93f89a0e96f26b7495dbee8851ae. Signed-off-by: Tom Wilkie --- pkg/chunk/aws/storage_client.go | 32 ++--- pkg/chunk/cassandra/storage_client.go | 28 ++-- pkg/chunk/chunk_store.go | 45 +++++- pkg/chunk/gcp/storage_client.go | 149 ++++---------------- pkg/chunk/inmemory_storage_client.go | 47 ++---- pkg/chunk/storage/caching_storage_client.go | 110 ++++++++++----- pkg/chunk/storage/index_test.go | 22 ++- pkg/chunk/storage_client.go | 8 +- pkg/chunk/util/util.go | 77 ---------- 9 files changed, 189 insertions(+), 329 deletions(-) delete mode 100644 pkg/chunk/util/util.go diff --git a/pkg/chunk/aws/storage_client.go b/pkg/chunk/aws/storage_client.go index 0561b755b99..3bf4b168694 100644 --- a/pkg/chunk/aws/storage_client.go +++ b/pkg/chunk/aws/storage_client.go @@ -29,7 +29,6 @@ import ( "github.com/weaveworks/common/instrument" "github.com/weaveworks/common/user" "github.com/weaveworks/cortex/pkg/chunk" - chunk_util "github.com/weaveworks/cortex/pkg/chunk/util" "github.com/weaveworks/cortex/pkg/util" ) @@ -289,11 +288,7 @@ func (a storageClient) BatchWrite(ctx context.Context, input chunk.WriteBatch) e return backoff.Err() } -func (a storageClient) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) bool) error { - return chunk_util.DoParallelQueries(ctx, a.query, queries, callback) -} - -func (a storageClient) query(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error { +func (a storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error { sp, ctx := ot.StartSpanFromContext(ctx, "QueryPages", ot.Tag{Key: "tableName", Value: query.TableName}, ot.Tag{Key: "hashValue", Value: query.HashValue}) defer sp.Finish() @@ -363,7 +358,7 @@ func (a storageClient) query(ctx context.Context, query chunk.IndexQuery, callba return nil } -func (a storageClient) queryPage(ctx context.Context, input *dynamodb.QueryInput, page dynamoDBRequest) (*dynamoDBReadResponse, error) { +func (a storageClient) queryPage(ctx context.Context, input *dynamodb.QueryInput, page dynamoDBRequest) (dynamoDBReadResponse, error) { backoff := util.NewBackoff(ctx, a.cfg.backoffConfig) defer func() { dynamoQueryRetryCount.WithLabelValues("queryPage").Observe(float64(backoff.NumRetries())) @@ -393,10 +388,7 @@ func (a storageClient) queryPage(ctx context.Context, input *dynamodb.QueryInput } queryOutput := page.Data().(*dynamodb.QueryOutput) - return &dynamoDBReadResponse{ - i: -1, - items: queryOutput.Items, - }, nil + return dynamoDBReadResponse(queryOutput.Items), nil } return nil, fmt.Errorf("QueryPage error: %s for table %v, last error %v", backoff.Err(), *input.TableName, err) } @@ -780,22 +772,18 @@ func (a storageClient) putS3Chunk(ctx context.Context, key string, buf []byte) e } // Slice of values returned; map key is attribute name -type dynamoDBReadResponse struct { - i int - items []map[string]*dynamodb.AttributeValue -} +type dynamoDBReadResponse []map[string]*dynamodb.AttributeValue -func (b *dynamoDBReadResponse) Next() bool { - b.i++ - return b.i < len(b.items) +func (b dynamoDBReadResponse) Len() int { + return len(b) } -func (b *dynamoDBReadResponse) RangeValue() []byte { - return b.items[b.i][rangeKey].B +func (b dynamoDBReadResponse) RangeValue(i int) []byte { + return b[i][rangeKey].B } -func (b *dynamoDBReadResponse) Value() []byte { - chunkValue, ok := b.items[b.i][valueKey] +func (b dynamoDBReadResponse) Value(i int) []byte { + chunkValue, ok := b[i][valueKey] if !ok { return nil } diff --git a/pkg/chunk/cassandra/storage_client.go b/pkg/chunk/cassandra/storage_client.go index 7f8aa7830ae..54d40be2f39 100644 --- a/pkg/chunk/cassandra/storage_client.go +++ b/pkg/chunk/cassandra/storage_client.go @@ -11,7 +11,6 @@ import ( "github.com/pkg/errors" "github.com/weaveworks/cortex/pkg/chunk" - "github.com/weaveworks/cortex/pkg/chunk/util" ) const ( @@ -173,11 +172,7 @@ func (s *storageClient) BatchWrite(ctx context.Context, batch chunk.WriteBatch) return nil } -func (s *storageClient) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) bool) error { - return util.DoParallelQueries(ctx, s.query, queries, callback) -} - -func (s *storageClient) query(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error { +func (s *storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error { var q *gocql.Query switch { @@ -210,7 +205,7 @@ func (s *storageClient) query(ctx context.Context, query chunk.IndexQuery, callb defer iter.Close() scanner := iter.Scanner() for scanner.Next() { - b := &readBatch{} + var b readBatch if err := scanner.Scan(&b.rangeValue, &b.value); err != nil { return errors.WithStack(err) } @@ -223,26 +218,27 @@ func (s *storageClient) query(ctx context.Context, query chunk.IndexQuery, callb // readBatch represents a batch of rows read from Cassandra. type readBatch struct { - consumed bool rangeValue []byte value []byte } // Len implements chunk.ReadBatch; in Cassandra we 'stream' results back // one-by-one, so this always returns 1. -func (b *readBatch) Next() bool { - if b.consumed { - return false - } - b.consumed = true - return true +func (readBatch) Len() int { + return 1 } -func (b *readBatch) RangeValue() []byte { +func (b readBatch) RangeValue(index int) []byte { + if index != 0 { + panic("index != 0") + } return b.rangeValue } -func (b *readBatch) Value() []byte { +func (b readBatch) Value(index int) []byte { + if index != 0 { + panic("index != 0") + } return b.value } diff --git a/pkg/chunk/chunk_store.go b/pkg/chunk/chunk_store.go index f811a9cf7b8..c8abb23441c 100644 --- a/pkg/chunk/chunk_store.go +++ b/pkg/chunk/chunk_store.go @@ -347,22 +347,53 @@ func (c *store) lookupChunksByMetricName(ctx context.Context, from, through mode } func (c *store) lookupEntriesByQueries(ctx context.Context, queries []IndexQuery) ([]IndexEntry, error) { + incomingEntries := make(chan []IndexEntry) + incomingErrors := make(chan error) + for _, query := range queries { + go func(query IndexQuery) { + entries, err := c.lookupEntriesByQuery(ctx, query) + if err != nil { + incomingErrors <- err + } else { + incomingEntries <- entries + } + }(query) + } + + // Combine the results into one slice + var entries []IndexEntry + var lastErr error + for i := 0; i < len(queries); i++ { + select { + case incoming := <-incomingEntries: + entries = append(entries, incoming...) + case err := <-incomingErrors: + lastErr = err + } + } + + return entries, lastErr +} + +func (c *store) lookupEntriesByQuery(ctx context.Context, query IndexQuery) ([]IndexEntry, error) { var entries []IndexEntry - err := c.storage.QueryPages(ctx, queries, func(query IndexQuery, resp ReadBatch) bool { - for resp.Next() { + + if err := c.storage.QueryPages(ctx, query, func(resp ReadBatch) (shouldContinue bool) { + for i := 0; i < resp.Len(); i++ { entries = append(entries, IndexEntry{ TableName: query.TableName, HashValue: query.HashValue, - RangeValue: resp.RangeValue(), - Value: resp.Value(), + RangeValue: resp.RangeValue(i), + Value: resp.Value(i), }) } return true - }) - if err != nil { + }); err != nil { level.Error(util.WithContext(ctx, util.Logger)).Log("msg", "error querying storage", "err", err) + return nil, err } - return entries, err + + return entries, nil } func (c *store) parseIndexEntries(ctx context.Context, entries []IndexEntry, matcher *labels.Matcher) ([]string, error) { diff --git a/pkg/chunk/gcp/storage_client.go b/pkg/chunk/gcp/storage_client.go index da6def35168..18b779c2e9f 100644 --- a/pkg/chunk/gcp/storage_client.go +++ b/pkg/chunk/gcp/storage_client.go @@ -13,13 +13,11 @@ import ( "github.com/pkg/errors" "github.com/weaveworks/cortex/pkg/chunk" - chunk_util "github.com/weaveworks/cortex/pkg/chunk/util" "github.com/weaveworks/cortex/pkg/util" ) const ( columnFamily = "f" - columnPrefix = columnFamily + ":" column = "c" separator = "\000" maxRowReads = 100 @@ -164,88 +162,7 @@ func (s *storageClientColumnKey) BatchWrite(ctx context.Context, batch chunk.Wri return nil } -func (s *storageClientColumnKey) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) bool) error { - sp, ctx := ot.StartSpanFromContext(ctx, "QueryPages") - defer sp.Finish() - - // A limitation of this approach is that this only fetches whole rows; but - // whatever, we filter them in the cache on the client. But for unit tests to - // pass, we must do this. - callback = chunk_util.QueryFilter(callback) - - type tableQuery struct { - name string - queries map[string]chunk.IndexQuery - rows bigtable.RowList - } - - tableQueries := map[string]tableQuery{} - for _, query := range queries { - tq, ok := tableQueries[query.TableName] - if !ok { - tq = tableQuery{ - name: query.TableName, - queries: map[string]chunk.IndexQuery{}, - } - } - tq.queries[query.HashValue] = query - tq.rows = append(tq.rows, query.HashValue) - tableQueries[query.TableName] = tq - } - - errs := make(chan error) - - for _, tq := range tableQueries { - - table := s.client.Open(tq.name) - for i := 0; i < len(tq.rows); i += maxRowReads { - - page := tq.rows[i:util.Min(i+maxRowReads, len(tq.rows))] - go func(page bigtable.RowList, tq tableQuery) { - var processingErr error - // rows are returned in key order, not order in row list - err := table.ReadRows(ctx, page, func(row bigtable.Row) bool { - - query, ok := tq.queries[row.Key()] - if !ok { - processingErr = errors.WithStack(fmt.Errorf("Got row for unknown chunk: %s", row.Key())) - return false - } - - val, ok := row[columnFamily] - if !ok { - // There are no matching rows. - return true - } - - return callback(query, &bigtableReadBatchColumnKey{ - i: -1, - items: val, - }) - }) - - if processingErr != nil { - errs <- processingErr - } else { - errs <- err - } - }(page, tq) - } - } - - var lastErr error - for _, tq := range tableQueries { - for i := 0; i < len(tq.rows); i += maxRowReads { - err := <-errs - if err != nil { - lastErr = err - } - } - } - return lastErr -} - -func (s *storageClientColumnKey) query(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error { +func (s *storageClientColumnKey) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error { const null = string('\xff') sp, ctx := ot.StartSpanFromContext(ctx, "QueryPages", ot.Tag{Key: "tableName", Value: query.TableName}, ot.Tag{Key: "hashValue", Value: query.HashValue}) @@ -285,30 +202,31 @@ func (s *storageClientColumnKey) query(ctx context.Context, query chunk.IndexQue val = filteredItems } - callback(&bigtableReadBatchColumnKey{ - i: -1, - items: val, + callback(bigtableReadBatchColumnKey{ + items: val, + columnPrefix: columnFamily + ":", }) return nil } // bigtableReadBatchColumnKey represents a batch of values read from Bigtable. type bigtableReadBatchColumnKey struct { - i int - items []bigtable.ReadItem + items []bigtable.ReadItem + columnPrefix string } -func (b *bigtableReadBatchColumnKey) Next() bool { - b.i++ - return b.i < len(b.items) +func (b bigtableReadBatchColumnKey) Len() int { + return len(b.items) } -func (b *bigtableReadBatchColumnKey) RangeValue() []byte { - return []byte(strings.TrimPrefix(b.items[b.i].Column, columnPrefix)) +func (b bigtableReadBatchColumnKey) RangeValue(index int) []byte { + return []byte( + strings.TrimPrefix(b.items[index].Column, b.columnPrefix), + ) } -func (b *bigtableReadBatchColumnKey) Value() []byte { - return b.items[b.i].Value +func (b bigtableReadBatchColumnKey) Value(index int) []byte { + return b.items[index].Value } func (s *storageClientColumnKey) PutChunks(ctx context.Context, chunks []chunk.Chunk) error { @@ -425,11 +343,7 @@ func (s *storageClientColumnKey) GetChunks(ctx context.Context, input []chunk.Ch return output, nil } -func (s *storageClientV1) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) bool) error { - return chunk_util.DoParallelQueries(ctx, s.query, queries, callback) -} - -func (s *storageClientV1) query(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error { +func (s *storageClientV1) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error { const null = string('\xff') sp, ctx := ot.StartSpanFromContext(ctx, "QueryPages", ot.Tag{Key: "tableName", Value: query.TableName}, ot.Tag{Key: "hashValue", Value: query.HashValue}) @@ -459,9 +373,7 @@ func (s *storageClientV1) query(ctx context.Context, query chunk.IndexQuery, cal err := table.ReadRows(ctx, rowRange, func(r bigtable.Row) bool { if query.ValueEqual == nil || bytes.Equal(r[columnFamily][0].Value, query.ValueEqual) { - return callback(&bigtableReadBatchV1{ - row: r, - }) + return callback(bigtableReadBatchV1(r)) } return true @@ -476,27 +388,24 @@ func (s *storageClientV1) query(ctx context.Context, query chunk.IndexQuery, cal // bigtableReadBatchV1 represents a batch of rows read from Bigtable. As the // bigtable interface gives us rows one-by-one, a batch always only contains // a single row. -type bigtableReadBatchV1 struct { - consumed bool - row bigtable.Row -} +type bigtableReadBatchV1 bigtable.Row -func (b *bigtableReadBatchV1) Next() bool { - if b.consumed { - return false - } - b.consumed = true - return true +func (bigtableReadBatchV1) Len() int { + return 1 } - -func (b *bigtableReadBatchV1) RangeValue() []byte { +func (b bigtableReadBatchV1) RangeValue(index int) []byte { + if index != 0 { + panic("index != 0") + } // String before the first separator is the hashkey - parts := strings.SplitN(b.row.Key(), separator, 2) + parts := strings.SplitN(bigtable.Row(b).Key(), separator, 2) return []byte(parts[1]) } - -func (b *bigtableReadBatchV1) Value() []byte { - cf, ok := b.row[columnFamily] +func (b bigtableReadBatchV1) Value(index int) []byte { + if index != 0 { + panic("index != 0") + } + cf, ok := b[columnFamily] if !ok || len(cf) != 1 { panic("bad response from bigtable") } diff --git a/pkg/chunk/inmemory_storage_client.go b/pkg/chunk/inmemory_storage_client.go index df2f3971711..c95f7e5a4ea 100644 --- a/pkg/chunk/inmemory_storage_client.go +++ b/pkg/chunk/inmemory_storage_client.go @@ -158,26 +158,13 @@ func (m *MockStorage) BatchWrite(ctx context.Context, batch WriteBatch) error { } // QueryPages implements StorageClient. -func (m *MockStorage) QueryPages(ctx context.Context, queries []IndexQuery, callback func(IndexQuery, ReadBatch) (shouldContinue bool)) error { - m.mtx.RLock() - defer m.mtx.RUnlock() - - for _, query := range queries { - err := m.query(ctx, query, func(b ReadBatch) bool { - return callback(query, b) - }) - if err != nil { - return err - } - } - - return nil -} - -func (m *MockStorage) query(ctx context.Context, query IndexQuery, callback func(ReadBatch) (shouldContinue bool)) error { +func (m *MockStorage) QueryPages(ctx context.Context, query IndexQuery, callback func(result ReadBatch) (shouldContinue bool)) error { logger := util.WithContext(ctx, util.Logger) level.Debug(logger).Log("msg", "QueryPages", "query", query.HashValue) + m.mtx.RLock() + defer m.mtx.RUnlock() + table, ok := m.tables[query.TableName] if !ok { return fmt.Errorf("table not found") @@ -243,14 +230,12 @@ func (m *MockStorage) query(ctx context.Context, query IndexQuery, callback func items = filtered } - result := mockReadBatch{ - index: -1, - } + result := mockReadBatch{} for _, item := range items { - result.items = append(result.items, item) + result = append(result, item) } - callback(&result) + callback(result) return nil } @@ -304,20 +289,16 @@ func (b *mockWriteBatch) Add(tableName, hashValue string, rangeValue []byte, val }{tableName, hashValue, rangeValue, value}) } -type mockReadBatch struct { - index int - items []mockItem -} +type mockReadBatch []mockItem -func (b *mockReadBatch) Next() bool { - b.index++ - return b.index < len(b.items) +func (b mockReadBatch) Len() int { + return len(b) } -func (b *mockReadBatch) RangeValue() []byte { - return b.items[b.index].rangeValue +func (b mockReadBatch) RangeValue(i int) []byte { + return b[i].rangeValue } -func (b *mockReadBatch) Value() []byte { - return b.items[b.index].value +func (b mockReadBatch) Value(i int) []byte { + return b[i].value } diff --git a/pkg/chunk/storage/caching_storage_client.go b/pkg/chunk/storage/caching_storage_client.go index b8ff6275d21..5b58078b2af 100644 --- a/pkg/chunk/storage/caching_storage_client.go +++ b/pkg/chunk/storage/caching_storage_client.go @@ -1,13 +1,13 @@ package storage import ( + "bytes" "context" - "sync" + "strings" "time" "github.com/weaveworks/cortex/pkg/chunk" "github.com/weaveworks/cortex/pkg/chunk/cache" - chunk_util "github.com/weaveworks/cortex/pkg/chunk/util" ) type cachingStorageClient struct { @@ -27,53 +27,87 @@ func newCachingStorageClient(client chunk.StorageClient, size int, validity time } } -func (s *cachingStorageClient) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error { - // We cache the entire row, so filter client side. - callback = chunk_util.QueryFilter(callback) - cacheableMissed := []chunk.IndexQuery{} - missed := map[string]chunk.IndexQuery{} - - for _, query := range queries { - value, ok := s.cache.Get(ctx, queryKey(query)) - if !ok { - cacheableMissed = append(cacheableMissed, chunk.IndexQuery{ - TableName: query.TableName, - HashValue: query.HashValue, - }) - missed[queryKey(query)] = query - continue - } +func (s *cachingStorageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error { + value, ok := s.cache.Get(ctx, queryKey(query)) + if ok { + batches := value.([]chunk.ReadBatch) + filteredBatch := filterBatchByQuery(query, batches) + callback(filteredBatch) - for _, batch := range value.([]chunk.ReadBatch) { - callback(query, batch) - } + return nil } - var resultsMtx sync.Mutex - results := map[string][]chunk.ReadBatch{} - err := s.StorageClient.QueryPages(ctx, cacheableMissed, func(cacheableQuery chunk.IndexQuery, r chunk.ReadBatch) bool { - resultsMtx.Lock() - defer resultsMtx.Unlock() - key := queryKey(cacheableQuery) - results[key] = append(results[key], r) - return true - }) + batches := []chunk.ReadBatch{} + cacheableQuery := chunk.IndexQuery{ + TableName: query.TableName, + HashValue: query.HashValue, + } // Just reads the entire row and caches it. + + err := s.StorageClient.QueryPages(ctx, cacheableQuery, copyingCallback(&batches)) if err != nil { return err } - resultsMtx.Lock() - defer resultsMtx.Unlock() - for key, batches := range results { - query := missed[key] - for _, batch := range batches { - callback(query, batch) - } - } + filteredBatch := filterBatchByQuery(query, batches) + callback(filteredBatch) + + s.cache.Put(ctx, queryKey(query), batches) + return nil } +type readBatch []cell + +func (b readBatch) Len() int { return len(b) } +func (b readBatch) RangeValue(i int) []byte { return b[i].column } +func (b readBatch) Value(i int) []byte { return b[i].value } + +type cell struct { + column []byte + value []byte +} + +func copyingCallback(readBatches *[]chunk.ReadBatch) func(chunk.ReadBatch) bool { + return func(result chunk.ReadBatch) bool { + *readBatches = append(*readBatches, result) + return true + } +} + func queryKey(q chunk.IndexQuery) string { const sep = "\xff" return q.TableName + sep + q.HashValue } + +func filterBatchByQuery(query chunk.IndexQuery, batches []chunk.ReadBatch) readBatch { + filter := func([]byte, []byte) bool { return true } + + if len(query.RangeValuePrefix) != 0 { + filter = func(rangeValue []byte, value []byte) bool { + return strings.HasPrefix(string(rangeValue), string(query.RangeValuePrefix)) + } + } + if len(query.RangeValueStart) != 0 { + filter = func(rangeValue []byte, value []byte) bool { + return string(rangeValue) >= string(query.RangeValueStart) + } + } + if len(query.ValueEqual) != 0 { + // This is on top of the existing filters. + existingFilter := filter + filter = func(rangeValue []byte, value []byte) bool { + return existingFilter(rangeValue, value) && bytes.Equal(value, query.ValueEqual) + } + } + + finalBatch := make(readBatch, 0, len(batches)) // On the higher side for most queries. On the lower side for column key schema. + for _, batch := range batches { + for i := 0; i < batch.Len(); i++ { + if filter(batch.RangeValue(i), batch.Value(i)) { + finalBatch = append(finalBatch, cell{column: batch.RangeValue(i), value: batch.Value(i)}) + } + } + } + + return finalBatch +} diff --git a/pkg/chunk/storage/index_test.go b/pkg/chunk/storage/index_test.go index 260d0fb5632..91d8532bb96 100644 --- a/pkg/chunk/storage/index_test.go +++ b/pkg/chunk/storage/index_test.go @@ -21,17 +21,15 @@ func TestIndexBasic(t *testing.T) { // Make sure we get back the correct entries by hash value. for i := 0; i < 30; i++ { - entries := []chunk.IndexQuery{ - { - TableName: tableName, - HashValue: fmt.Sprintf("hash%d", i), - }, + entry := chunk.IndexQuery{ + TableName: tableName, + HashValue: fmt.Sprintf("hash%d", i), } var have []chunk.IndexEntry - err := client.QueryPages(context.Background(), entries, func(_ chunk.IndexQuery, read chunk.ReadBatch) bool { - for read.Next() { + err := client.QueryPages(context.Background(), entry, func(read chunk.ReadBatch) bool { + for j := 0; j < read.Len(); j++ { have = append(have, chunk.IndexEntry{ - RangeValue: read.RangeValue(), + RangeValue: read.RangeValue(j), }) } return true @@ -169,13 +167,13 @@ func TestQueryPages(t *testing.T) { run := true for run { var have []chunk.IndexEntry - err = client.QueryPages(context.Background(), []chunk.IndexQuery{tt.query}, func(_ chunk.IndexQuery, read chunk.ReadBatch) bool { - for read.Next() { + err = client.QueryPages(context.Background(), tt.query, func(read chunk.ReadBatch) bool { + for i := 0; i < read.Len(); i++ { have = append(have, chunk.IndexEntry{ TableName: tt.query.TableName, HashValue: tt.query.HashValue, - RangeValue: read.RangeValue(), - Value: read.Value(), + RangeValue: read.RangeValue(i), + Value: read.Value(i), }) } return true diff --git a/pkg/chunk/storage_client.go b/pkg/chunk/storage_client.go index d867def5ab3..c86f573b77b 100644 --- a/pkg/chunk/storage_client.go +++ b/pkg/chunk/storage_client.go @@ -9,7 +9,7 @@ type StorageClient interface { BatchWrite(context.Context, WriteBatch) error // For the read path. - QueryPages(ctx context.Context, queries []IndexQuery, callback func(IndexQuery, ReadBatch) (shouldContinue bool)) error + QueryPages(ctx context.Context, query IndexQuery, callback func(result ReadBatch) (shouldContinue bool)) error // For storing and retrieving chunks. PutChunks(ctx context.Context, chunks []Chunk) error @@ -23,7 +23,7 @@ type WriteBatch interface { // ReadBatch represents the results of a QueryPages. type ReadBatch interface { - Next() bool - RangeValue() []byte - Value() []byte + Len() int + RangeValue(index int) []byte + Value(index int) []byte } diff --git a/pkg/chunk/util/util.go b/pkg/chunk/util/util.go deleted file mode 100644 index 6a6c412d303..00000000000 --- a/pkg/chunk/util/util.go +++ /dev/null @@ -1,77 +0,0 @@ -package util - -import ( - "bytes" - "context" - "strings" - - "github.com/weaveworks/cortex/pkg/chunk" -) - -// DoSingleQuery is the interface for indexes that don't support batching yet. -type DoSingleQuery func( - ctx context.Context, query chunk.IndexQuery, - callback func(chunk.ReadBatch) bool, -) error - -// DoParallelQueries translates between our interface for query batching, -// and indexes that don't yet support batching. -func DoParallelQueries( - ctx context.Context, doSingleQuery DoSingleQuery, queries []chunk.IndexQuery, - callback func(chunk.IndexQuery, chunk.ReadBatch) bool, -) error { - incomingErrors := make(chan error) - for _, query := range queries { - go func(query chunk.IndexQuery) { - incomingErrors <- doSingleQuery(ctx, query, func(r chunk.ReadBatch) bool { - return callback(query, r) - }) - }(query) - } - var lastErr error - for i := 0; i < len(queries); i++ { - err := <-incomingErrors - if err != nil { - - lastErr = err - } - } - return lastErr -} - -// Callback from an IndexQuery. -type Callback func(chunk.IndexQuery, chunk.ReadBatch) bool - -type filteringBatch struct { - query chunk.IndexQuery - chunk.ReadBatch -} - -func (f *filteringBatch) Next() bool { - for f.ReadBatch.Next() { - rangeValue, value := f.ReadBatch.RangeValue(), f.ReadBatch.Value() - - if len(f.query.RangeValuePrefix) != 0 && !strings.HasPrefix(string(rangeValue), string(f.query.RangeValuePrefix)) { - continue - } - if len(f.query.RangeValueStart) != 0 && string(rangeValue) < string(f.query.RangeValueStart) { - continue - } - if len(f.query.ValueEqual) != 0 && !bytes.Equal(value, f.query.ValueEqual) { - continue - } - - return true - } - - return false -} - -// QueryFilter wraps a callback to ensure the results are filtered correctly; -// useful for the cache and BigTable backend, which only ever fetches the whole -// row. -func QueryFilter(callback Callback) Callback { - return func(query chunk.IndexQuery, batch chunk.ReadBatch) bool { - return callback(query, &filteringBatch{query, batch}) - } -}