diff --git a/pkg/chunk/aws/storage_client.go b/pkg/chunk/aws/storage_client.go index 1c8138931d5..b633104e5f4 100644 --- a/pkg/chunk/aws/storage_client.go +++ b/pkg/chunk/aws/storage_client.go @@ -275,7 +275,7 @@ func (a storageClient) BatchWrite(ctx context.Context, input chunk.WriteBatch) e return backoff.Err() } -func (a storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch, lastPage bool) (shouldContinue bool)) error { +func (a storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error { sp, ctx := ot.StartSpanFromContext(ctx, "QueryPages", ot.Tag{Key: "tableName", Value: query.TableName}, ot.Tag{Key: "hashValue", Value: query.HashValue}) defer sp.Finish() @@ -332,12 +332,15 @@ func (a storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, c return err } - if getNextPage := callback(response, !page.HasNextPage()); !getNextPage { + if !callback(response) { if err != nil { return fmt.Errorf("QueryPages error: table=%v, err=%v", *input.TableName, page.Error()) } return nil } + if !page.HasNextPage() { + return nil + } } return nil } diff --git a/pkg/chunk/aws/storage_client_test.go b/pkg/chunk/aws/storage_client_test.go deleted file mode 100644 index e8376b8e83c..00000000000 --- a/pkg/chunk/aws/storage_client_test.go +++ /dev/null @@ -1,197 +0,0 @@ -package aws - -import ( - "context" - "fmt" - "testing" - - "github.com/stretchr/testify/require" - - "github.com/weaveworks/cortex/pkg/chunk" -) - -func TestAWSStorageClient(t *testing.T) { - mockDB := newMockDynamoDB(0, 0) - client := storageClient{ - DynamoDB: mockDB, - queryRequestFn: mockDB.queryRequest, - batchGetItemRequestFn: mockDB.batchGetItemRequest, - batchWriteItemRequestFn: mockDB.batchWriteItemRequest, - } - batch := client.NewWriteBatch() - for i := 0; i < 30; i++ { - batch.Add("table", fmt.Sprintf("hash%d", i), []byte(fmt.Sprintf("range%d", i)), nil) - } - 
mockDB.createTable("table") - - err := client.BatchWrite(context.Background(), batch) - require.NoError(t, err) - - for i := 0; i < 30; i++ { - entry := chunk.IndexQuery{ - TableName: "table", - HashValue: fmt.Sprintf("hash%d", i), - } - var have []chunk.IndexEntry - err := client.QueryPages(context.Background(), entry, func(read chunk.ReadBatch, lastPage bool) bool { - for j := 0; j < read.Len(); j++ { - have = append(have, chunk.IndexEntry{ - RangeValue: read.RangeValue(j), - }) - } - return !lastPage - }) - require.NoError(t, err) - require.Equal(t, []chunk.IndexEntry{ - {RangeValue: []byte(fmt.Sprintf("range%d", i))}, - }, have) - } -} - -func TestAWSStorageClientQueryPages(t *testing.T) { - entries := []chunk.IndexEntry{ - { - TableName: "table", - HashValue: "foo", - RangeValue: []byte("bar:1"), - Value: []byte("10"), - }, - { - TableName: "table", - HashValue: "foo", - RangeValue: []byte("bar:2"), - Value: []byte("20"), - }, - { - TableName: "table", - HashValue: "foo", - RangeValue: []byte("bar:3"), - Value: []byte("30"), - }, - { - TableName: "table", - HashValue: "foo", - RangeValue: []byte("baz:1"), - Value: []byte("10"), - }, - { - TableName: "table", - HashValue: "foo", - RangeValue: []byte("baz:2"), - Value: []byte("20"), - }, - { - TableName: "table", - HashValue: "flip", - RangeValue: []byte("bar:1"), - Value: []byte("abc"), - }, - { - TableName: "table", - HashValue: "flip", - RangeValue: []byte("bar:2"), - Value: []byte("abc"), - }, - { - TableName: "table", - HashValue: "flip", - RangeValue: []byte("bar:3"), - Value: []byte("abc"), - }, - } - - tests := []struct { - name string - query chunk.IndexQuery - provisionedErr int - want []chunk.IndexEntry - }{ - { - "check HashValue only", - chunk.IndexQuery{ - TableName: "table", - HashValue: "flip", - }, - 0, - []chunk.IndexEntry{entries[5], entries[6], entries[7]}, - }, - { - "check RangeValueStart", - chunk.IndexQuery{ - TableName: "table", - HashValue: "foo", - RangeValueStart: []byte("bar:2"), - 
}, - 0, - []chunk.IndexEntry{entries[1], entries[2], entries[3], entries[4]}, - }, - { - "check RangeValuePrefix", - chunk.IndexQuery{ - TableName: "table", - HashValue: "foo", - RangeValuePrefix: []byte("baz:"), - }, - 0, - []chunk.IndexEntry{entries[3], entries[4]}, - }, - { - "check ValueEqual", - chunk.IndexQuery{ - TableName: "table", - HashValue: "foo", - RangeValuePrefix: []byte("bar"), - ValueEqual: []byte("20"), - }, - 0, - []chunk.IndexEntry{entries[1]}, - }, - { - "check retry logic", - chunk.IndexQuery{ - TableName: "table", - HashValue: "foo", - RangeValuePrefix: []byte("bar"), - ValueEqual: []byte("20"), - }, - 2, - []chunk.IndexEntry{entries[1]}, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - dynamoDB := newMockDynamoDB(0, tt.provisionedErr) - client := storageClient{ - DynamoDB: dynamoDB, - queryRequestFn: dynamoDB.queryRequest, - batchGetItemRequestFn: dynamoDB.batchGetItemRequest, - batchWriteItemRequestFn: dynamoDB.batchWriteItemRequest, - } - - batch := client.NewWriteBatch() - for _, entry := range entries { - batch.Add(entry.TableName, entry.HashValue, entry.RangeValue, entry.Value) - } - dynamoDB.createTable("table") - - err := client.BatchWrite(context.Background(), batch) - require.NoError(t, err) - - var have []chunk.IndexEntry - err = client.QueryPages(context.Background(), tt.query, func(read chunk.ReadBatch, lastPage bool) bool { - for i := 0; i < read.Len(); i++ { - have = append(have, chunk.IndexEntry{ - TableName: tt.query.TableName, - HashValue: tt.query.HashValue, - RangeValue: read.RangeValue(i), - Value: read.Value(i), - }) - } - return !lastPage - }) - require.NoError(t, err) - require.Equal(t, tt.want, have) - }) - } -} diff --git a/pkg/chunk/cassandra/fixtures.go b/pkg/chunk/cassandra/fixtures.go new file mode 100644 index 00000000000..a70132b6072 --- /dev/null +++ b/pkg/chunk/cassandra/fixtures.go @@ -0,0 +1,80 @@ +package cassandra + +import ( + "context" + "flag" + "os" + + 
"github.com/prometheus/common/model" + "github.com/weaveworks/cortex/pkg/chunk" + "github.com/weaveworks/cortex/pkg/util" +) + +// GOCQL doesn't provide nice mocks, so we use a real Cassandra instance. +// To enable these tests: +// $ docker run --name cassandra --rm -p 9042:9042 cassandra:3.11 +// $ CASSANDRA_TEST_ADDRESSES=localhost:9042 go test ./pkg/chunk/storage + +type fixture struct { + name string + storageClient chunk.StorageClient + tableClient chunk.TableClient + schemaConfig chunk.SchemaConfig +} + +func (f fixture) Name() string { + return f.name +} + +func (f fixture) Clients() (chunk.StorageClient, chunk.TableClient, chunk.SchemaConfig, error) { + return f.storageClient, f.tableClient, f.schemaConfig, nil +} + +func (f fixture) Teardown() error { + return nil +} + +// Fixtures for unit testing Cassandra integration. +func Fixtures() ([]chunk.Fixture, error) { + addresses := os.Getenv("CASSANDRA_TEST_ADDRESSES") + if addresses == "" { + return nil, nil + } + + cfg := Config{ + addresses: addresses, + keyspace: "test", + consistency: "QUORUM", + replicationFactor: 1, + } + + // Get a SchemaConfig with the defaults. 
+ flagSet := flag.NewFlagSet("flags", flag.PanicOnError) + schemaConfig := chunk.SchemaConfig{} + schemaConfig.RegisterFlags(flagSet) + err := flagSet.Parse([]string{}) + if err != nil { + return nil, err + } + schemaConfig.IndexTables.From = util.NewDayValue(model.Now()) + schemaConfig.ChunkTables.From = util.NewDayValue(model.Now()) + + storageClient, err := NewStorageClient(cfg, schemaConfig) + if err != nil { + return nil, err + } + + tableClient, err := NewTableClient(context.Background(), cfg) + if err != nil { + return nil, err + } + + return []chunk.Fixture{ + fixture{ + name: "Cassandra", + storageClient: storageClient, + tableClient: tableClient, + schemaConfig: schemaConfig, + }, + }, nil +} diff --git a/pkg/chunk/cassandra/storage_client.go b/pkg/chunk/cassandra/storage_client.go index fe180b856ea..e24c5d09d87 100644 --- a/pkg/chunk/cassandra/storage_client.go +++ b/pkg/chunk/cassandra/storage_client.go @@ -8,6 +8,7 @@ import ( "time" "github.com/gocql/gocql" + "github.com/pkg/errors" "github.com/weaveworks/cortex/pkg/chunk" ) @@ -35,11 +36,11 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { func (cfg *Config) session() (*gocql.Session, error) { consistency, err := gocql.ParseConsistencyWrapper(cfg.consistency) if err != nil { - return nil, err + return nil, errors.WithStack(err) } if err := cfg.createKeyspace(); err != nil { - return nil, err + return nil, errors.WithStack(err) } cluster := gocql.NewCluster(strings.Split(cfg.addresses, ",")...) 
@@ -58,17 +59,18 @@ func (cfg *Config) createKeyspace() error { cluster.Timeout = 20 * time.Second session, err := cluster.CreateSession() if err != nil { - return err + return errors.WithStack(err) } defer session.Close() - return session.Query(fmt.Sprintf( + err = session.Query(fmt.Sprintf( `CREATE KEYSPACE IF NOT EXISTS %s WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor' : %d }`, cfg.keyspace, cfg.replicationFactor)).Exec() + return errors.WithStack(err) } // storageClient implements chunk.storageClient for GCP. @@ -82,7 +84,7 @@ type storageClient struct { func NewStorageClient(cfg Config, schemaCfg chunk.SchemaConfig) (chunk.StorageClient, error) { session, err := cfg.session() if err != nil { - return nil, err + return nil, errors.WithStack(err) } return &storageClient{ @@ -96,37 +98,66 @@ func (s *storageClient) Close() { s.session.Close() } +// Cassandra batching isn't really useful in this case; it's more for doing +// multiple atomic writes, so we just issue the index writes one at a time.
type writeBatch struct { - b *gocql.Batch + entries []chunk.IndexEntry } func (s *storageClient) NewWriteBatch() chunk.WriteBatch { - return writeBatch{ - b: s.session.NewBatch(gocql.UnloggedBatch), - } + return &writeBatch{} } -func (b writeBatch) Add(tableName, hashValue string, rangeValue []byte, value []byte) { - b.b.Query(fmt.Sprintf("INSERT INTO %s (hash, range, value) VALUES (?, ?, ?)", tableName), - hashValue, rangeValue, value) +func (b *writeBatch) Add(tableName, hashValue string, rangeValue []byte, value []byte) { + b.entries = append(b.entries, chunk.IndexEntry{ + TableName: tableName, + HashValue: hashValue, + RangeValue: rangeValue, + Value: value, + }) } func (s *storageClient) BatchWrite(ctx context.Context, batch chunk.WriteBatch) error { - cassandraBatch := batch.(writeBatch) - return s.session.ExecuteBatch(cassandraBatch.b.WithContext(ctx)) + b := batch.(*writeBatch) + + for _, entry := range b.entries { + err := s.session.Query(fmt.Sprintf("INSERT INTO %s (hash, range, value) VALUES (?, ?, ?)", + entry.TableName), entry.HashValue, entry.RangeValue, entry.Value).WithContext(ctx).Exec() + if err != nil { + return errors.WithStack(err) + } + } + + return nil } -func (s *storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch, lastPage bool) (shouldContinue bool)) error { +func (s *storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error { var q *gocql.Query - if len(query.RangeValuePrefix) > 0 { - q = s.session.Query(fmt.Sprintf("SELECT range, value FROM %s WHERE hash = ? AND range >= ?", query.TableName), - query.HashValue, query.RangeValuePrefix) - } else if len(query.RangeValueStart) > 0 { - q = s.session.Query(fmt.Sprintf("SELECT range, value FROM %s WHERE hash = ? 
AND range >= ?", query.TableName), - query.HashValue, query.RangeValueStart) - } else { - q = s.session.Query(fmt.Sprintf("SELECT range, value FROM %s WHERE hash = ?", query.TableName), - query.HashValue) + + switch { + case len(query.RangeValuePrefix) > 0 && query.ValueEqual == nil: + q = s.session.Query(fmt.Sprintf("SELECT range, value FROM %s WHERE hash = ? AND range >= ? AND range < ?", + query.TableName), query.HashValue, query.RangeValuePrefix, append(query.RangeValuePrefix, '\xff')) + + case len(query.RangeValuePrefix) > 0 && query.ValueEqual != nil: + q = s.session.Query(fmt.Sprintf("SELECT range, value FROM %s WHERE hash = ? AND range >= ? AND range < ? AND value = ? ALLOW FILTERING", + query.TableName), query.HashValue, query.RangeValuePrefix, append(query.RangeValuePrefix, '\xff'), query.ValueEqual) + + case len(query.RangeValueStart) > 0 && query.ValueEqual == nil: + q = s.session.Query(fmt.Sprintf("SELECT range, value FROM %s WHERE hash = ? AND range >= ?", + query.TableName), query.HashValue, query.RangeValueStart) + + case len(query.RangeValueStart) > 0 && query.ValueEqual != nil: + q = s.session.Query(fmt.Sprintf("SELECT range, value FROM %s WHERE hash = ? AND range >= ? AND value = ? ALLOW FILTERING", + query.TableName), query.HashValue, query.RangeValueStart, query.ValueEqual) + + case query.ValueEqual == nil: + q = s.session.Query(fmt.Sprintf("SELECT range, value FROM %s WHERE hash = ?", + query.TableName), query.HashValue) + + case query.ValueEqual != nil: + q = s.session.Query(fmt.Sprintf("SELECT range, value FROM %s WHERE hash = ? AND value = ? 
ALLOW FILTERING", + query.TableName), query.HashValue, query.ValueEqual) } iter := q.WithContext(ctx).Iter() @@ -135,13 +166,13 @@ func (s *storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, for scanner.Next() { var b readBatch if err := scanner.Scan(&b.rangeValue, &b.value); err != nil { - return err + return errors.WithStack(err) } - if callback(b, false) { + if !callback(b) { return nil } } - return scanner.Err() + return errors.WithStack(scanner.Err()) } // readBatch represents a batch of rows read from Cassandra. @@ -171,22 +202,24 @@ func (b readBatch) Value(index int) []byte { } func (s *storageClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) error { - b := gocql.NewBatch(gocql.UnloggedBatch).WithContext(ctx) - for i := range chunks { // Encode the chunk first - checksum is calculated as a side effect. buf, err := chunks[i].Encode() if err != nil { - return err + return errors.WithStack(err) } key := chunks[i].ExternalKey() tableName := s.schemaCfg.ChunkTables.TableFor(chunks[i].From) // Must provide a range key, even though its not useds - hence 0x00. - b.Query(fmt.Sprintf("INSERT INTO %s (hash, range, value) VALUES (?, 0x00, ?)", tableName), key, buf) + q := s.session.Query(fmt.Sprintf("INSERT INTO %s (hash, range, value) VALUES (?, 0x00, ?)", + tableName), key, buf) + if err := q.WithContext(ctx).Exec(); err != nil { + return errors.WithStack(err) + } } - return s.session.ExecuteBatch(b) + return nil } func (s *storageClient) GetChunks(ctx context.Context, input []chunk.Chunk) ([]chunk.Chunk, error) { @@ -223,7 +256,7 @@ func (s *storageClient) getChunk(ctx context.Context, input chunk.Chunk) (chunk. var buf []byte if err := s.session.Query(fmt.Sprintf("SELECT value FROM %s WHERE hash = ?", tableName), input.ExternalKey()). 
WithContext(ctx).Scan(&buf); err != nil { - return input, err + return input, errors.WithStack(err) } decodeContext := chunk.NewDecodeContext() err := input.Decode(decodeContext, buf) diff --git a/pkg/chunk/cassandra/table_client.go b/pkg/chunk/cassandra/table_client.go index 3feafd0047a..45865d2bba8 100644 --- a/pkg/chunk/cassandra/table_client.go +++ b/pkg/chunk/cassandra/table_client.go @@ -6,6 +6,7 @@ import ( "github.com/aws/aws-sdk-go/service/dynamodb" "github.com/gocql/gocql" + "github.com/pkg/errors" "github.com/weaveworks/cortex/pkg/chunk" ) @@ -19,7 +20,7 @@ type tableClient struct { func NewTableClient(ctx context.Context, cfg Config) (chunk.TableClient, error) { session, err := cfg.session() if err != nil { - return nil, err + return nil, errors.WithStack(err) } return &tableClient{ cfg: cfg, @@ -30,7 +31,7 @@ func NewTableClient(ctx context.Context, cfg Config) (chunk.TableClient, error) func (c *tableClient) ListTables(ctx context.Context) ([]string, error) { md, err := c.session.KeyspaceMetadata(c.cfg.keyspace) if err != nil { - return nil, err + return nil, errors.WithStack(err) } result := []string{} for name := range md.Tables { @@ -40,13 +41,14 @@ func (c *tableClient) ListTables(ctx context.Context) ([]string, error) { } func (c *tableClient) CreateTable(ctx context.Context, desc chunk.TableDesc) error { - return c.session.Query(fmt.Sprintf(` + err := c.session.Query(fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( hash text, range blob, value blob, PRIMARY KEY (hash, range) )`, desc.Name)).WithContext(ctx).Exec() + return errors.WithStack(err) } func (c *tableClient) DescribeTable(ctx context.Context, name string) (desc chunk.TableDesc, status string, err error) { diff --git a/pkg/chunk/chunk_store.go b/pkg/chunk/chunk_store.go index 6dedf14d4b0..d277c9c9522 100644 --- a/pkg/chunk/chunk_store.go +++ b/pkg/chunk/chunk_store.go @@ -466,7 +466,7 @@ func (c *Store) lookupEntriesByQueries(ctx context.Context, queries []IndexQuery func (c *Store) 
lookupEntriesByQuery(ctx context.Context, query IndexQuery) ([]IndexEntry, error) { var entries []IndexEntry - if err := c.storage.QueryPages(ctx, query, func(resp ReadBatch, lastPage bool) (shouldContinue bool) { + if err := c.storage.QueryPages(ctx, query, func(resp ReadBatch) (shouldContinue bool) { for i := 0; i < resp.Len(); i++ { entries = append(entries, IndexEntry{ TableName: query.TableName, @@ -475,7 +475,7 @@ func (c *Store) lookupEntriesByQuery(ctx context.Context, query IndexQuery) ([]I Value: resp.Value(i), }) } - return !lastPage + return true }); err != nil { level.Error(util.WithContext(ctx, util.Logger)).Log("msg", "error querying storage", "err", err) return nil, err diff --git a/pkg/chunk/gcp/storage_client.go b/pkg/chunk/gcp/storage_client.go index d3666fc474a..b2d256b760a 100644 --- a/pkg/chunk/gcp/storage_client.go +++ b/pkg/chunk/gcp/storage_client.go @@ -1,6 +1,7 @@ package gcp import ( + "bytes" "context" "flag" "fmt" @@ -109,13 +110,24 @@ func (s *storageClient) BatchWrite(ctx context.Context, batch chunk.WriteBatch) return nil } -func (s *storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch, lastPage bool) (shouldContinue bool)) error { +func (s *storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error { sp, ctx := ot.StartSpanFromContext(ctx, "QueryPages", ot.Tag{Key: "tableName", Value: query.TableName}, ot.Tag{Key: "hashValue", Value: query.HashValue}) defer sp.Finish() table := s.client.Open(query.TableName) var rowRange bigtable.RowRange + + /* BigTable only seems to support regex match on cell values, so doing it + client side for now + readOpts := []bigtable.ReadOption{ + bigtable.RowFilter(bigtable.FamilyFilter(columnFamily)), + } + if query.ValueEqual != nil { + readOpts = append(readOpts, bigtable.RowFilter(bigtable.ValueFilter(string(query.ValueEqual)))) + } + */ + if 
len(query.RangeValuePrefix) > 0 { rowRange = bigtable.PrefixRange(query.HashValue + separator + string(query.RangeValuePrefix)) } else if len(query.RangeValueStart) > 0 { @@ -125,8 +137,11 @@ func (s *storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, } err := table.ReadRows(ctx, rowRange, func(r bigtable.Row) bool { - return callback(bigtableReadBatch(r), false) - }, bigtable.RowFilter(bigtable.FamilyFilter(columnFamily))) + if query.ValueEqual == nil || bytes.Equal(r[columnFamily][0].Value, query.ValueEqual) { + return callback(bigtableReadBatch(r)) + } + return true + }) if err != nil { sp.LogFields(otlog.String("error", err.Error())) return errors.WithStack(err) diff --git a/pkg/chunk/inmemory_storage_client.go b/pkg/chunk/inmemory_storage_client.go index 443c9f12b08..3dea4728db9 100644 --- a/pkg/chunk/inmemory_storage_client.go +++ b/pkg/chunk/inmemory_storage_client.go @@ -158,7 +158,7 @@ func (m *MockStorage) BatchWrite(ctx context.Context, batch WriteBatch) error { } // QueryPages implements StorageClient. 
-func (m *MockStorage) QueryPages(ctx context.Context, query IndexQuery, callback func(result ReadBatch, lastPage bool) (shouldContinue bool)) error { +func (m *MockStorage) QueryPages(ctx context.Context, query IndexQuery, callback func(result ReadBatch) (shouldContinue bool)) error { logger := util.WithContext(ctx, util.Logger) m.mtx.RLock() defer m.mtx.RUnlock() @@ -232,7 +232,7 @@ func (m *MockStorage) QueryPages(ctx context.Context, query IndexQuery, callback result = append(result, item) } - callback(result, true) + callback(result) return nil } diff --git a/pkg/chunk/storage/index_test.go b/pkg/chunk/storage/index_test.go new file mode 100644 index 00000000000..b6cdc0f172c --- /dev/null +++ b/pkg/chunk/storage/index_test.go @@ -0,0 +1,184 @@ +package storage + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + "github.com/weaveworks/cortex/pkg/chunk" +) + +func TestIndexBasic(t *testing.T) { + forAllFixtures(t, func(t *testing.T, client chunk.StorageClient) { + // Write out 30 entries, into different hash and range values. + batch := client.NewWriteBatch() + for i := 0; i < 30; i++ { + batch.Add(tableName, fmt.Sprintf("hash%d", i), []byte(fmt.Sprintf("range%d", i)), nil) + } + err := client.BatchWrite(context.Background(), batch) + require.NoError(t, err) + + // Make sure we get back the correct entries by hash value. 
+ for i := 0; i < 30; i++ { + entry := chunk.IndexQuery{ + TableName: tableName, + HashValue: fmt.Sprintf("hash%d", i), + } + var have []chunk.IndexEntry + err := client.QueryPages(context.Background(), entry, func(read chunk.ReadBatch) bool { + for j := 0; j < read.Len(); j++ { + have = append(have, chunk.IndexEntry{ + RangeValue: read.RangeValue(j), + }) + } + return true + }) + require.NoError(t, err) + require.Equal(t, []chunk.IndexEntry{ + {RangeValue: []byte(fmt.Sprintf("range%d", i))}, + }, have) + } + }) +} + +var entries = []chunk.IndexEntry{ + { + TableName: tableName, + HashValue: "foo", + RangeValue: []byte("bar:1"), + Value: []byte("10"), + }, + { + TableName: tableName, + HashValue: "foo", + RangeValue: []byte("bar:2"), + Value: []byte("20"), + }, + { + TableName: tableName, + HashValue: "foo", + RangeValue: []byte("bar:3"), + Value: []byte("30"), + }, + { + TableName: tableName, + HashValue: "foo", + RangeValue: []byte("baz:1"), + Value: []byte("10"), + }, + { + TableName: tableName, + HashValue: "foo", + RangeValue: []byte("baz:2"), + Value: []byte("20"), + }, + { + TableName: tableName, + HashValue: "flip", + RangeValue: []byte("bar:1"), + Value: []byte("abc"), + }, + { + TableName: tableName, + HashValue: "flip", + RangeValue: []byte("bar:2"), + Value: []byte("abc"), + }, + { + TableName: tableName, + HashValue: "flip", + RangeValue: []byte("bar:3"), + Value: []byte("abc"), + }, +} + +func TestQueryPages(t *testing.T) { + forAllFixtures(t, func(t *testing.T, client chunk.StorageClient) { + batch := client.NewWriteBatch() + for _, entry := range entries { + batch.Add(entry.TableName, entry.HashValue, entry.RangeValue, entry.Value) + } + + err := client.BatchWrite(context.Background(), batch) + require.NoError(t, err) + + tests := []struct { + name string + query chunk.IndexQuery + provisionedErr int + want []chunk.IndexEntry + }{ + { + "check HashValue only", + chunk.IndexQuery{ + TableName: tableName, + HashValue: "flip", + }, + 0, + 
[]chunk.IndexEntry{entries[5], entries[6], entries[7]}, + }, + { + "check RangeValueStart", + chunk.IndexQuery{ + TableName: tableName, + HashValue: "foo", + RangeValueStart: []byte("bar:2"), + }, + 0, + []chunk.IndexEntry{entries[1], entries[2], entries[3], entries[4]}, + }, + { + "check RangeValuePrefix", + chunk.IndexQuery{ + TableName: tableName, + HashValue: "foo", + RangeValuePrefix: []byte("baz:"), + }, + 0, + []chunk.IndexEntry{entries[3], entries[4]}, + }, + { + "check ValueEqual", + chunk.IndexQuery{ + TableName: tableName, + HashValue: "foo", + RangeValuePrefix: []byte("bar"), + ValueEqual: []byte("20"), + }, + 0, + []chunk.IndexEntry{entries[1]}, + }, + { + "check retry logic", + chunk.IndexQuery{ + TableName: tableName, + HashValue: "foo", + RangeValuePrefix: []byte("bar"), + ValueEqual: []byte("20"), + }, + 2, + []chunk.IndexEntry{entries[1]}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var have []chunk.IndexEntry + err = client.QueryPages(context.Background(), tt.query, func(read chunk.ReadBatch) bool { + for i := 0; i < read.Len(); i++ { + have = append(have, chunk.IndexEntry{ + TableName: tt.query.TableName, + HashValue: tt.query.HashValue, + RangeValue: read.RangeValue(i), + Value: read.Value(i), + }) + } + return true + }) + require.NoError(t, err) + require.Equal(t, tt.want, have) + }) + } + }) +} diff --git a/pkg/chunk/storage/storage_client_test.go b/pkg/chunk/storage/storage_client_test.go index 7691ae41aec..34457ff992f 100644 --- a/pkg/chunk/storage/storage_client_test.go +++ b/pkg/chunk/storage/storage_client_test.go @@ -12,106 +12,57 @@ import ( "github.com/stretchr/testify/require" "github.com/weaveworks/cortex/pkg/chunk" - "github.com/weaveworks/cortex/pkg/chunk/aws" - "github.com/weaveworks/cortex/pkg/chunk/gcp" - promchunk "github.com/weaveworks/cortex/pkg/prom1/storage/local/chunk" ) -var fixtures = append(aws.Fixtures, gcp.Fixtures...) 
- -func TestStoreChunks(t *testing.T) { - for _, fixture := range fixtures { - t.Run(fixture.Name(), func(t *testing.T) { - storageClient, tableClient, schemaConfig, err := fixture.Clients() - require.NoError(t, err) - defer fixture.Teardown() - - tableManager, err := chunk.NewTableManager(schemaConfig, 12*time.Hour, tableClient) - require.NoError(t, err) - - err = tableManager.SyncTables(context.Background()) - require.NoError(t, err) - - testStorageClientChunks(t, storageClient) - }) - } -} - -func testStorageClientChunks(t *testing.T, client chunk.StorageClient) { - const batchSize = 50 - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() +func TestChunksBasic(t *testing.T) { + forAllFixtures(t, func(t *testing.T, client chunk.StorageClient) { + const batchSize = 50 + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // Write a few batches of chunks. + written := []string{} + for i := 0; i < 50; i++ { + chunks := []chunk.Chunk{} + for j := 0; j < batchSize; j++ { + chunk := dummyChunkFor(model.Now(), model.Metric{ + model.MetricNameLabel: "foo", + "index": model.LabelValue(strconv.Itoa(i*batchSize + j)), + }) + chunks = append(chunks, chunk) + _, err := chunk.Encode() // Need to encode it, side effect calculates crc + require.NoError(t, err) + written = append(written, chunk.ExternalKey()) + } - // Write a few batches of chunks. 
- written := []string{} - for i := 0; i < 50; i++ { - chunks := []chunk.Chunk{} - for j := 0; j < batchSize; j++ { - chunk := dummyChunkFor(model.Now(), model.Metric{ - model.MetricNameLabel: "foo", - "index": model.LabelValue(strconv.Itoa(i*batchSize + j)), - }) - chunks = append(chunks, chunk) - _, err := chunk.Encode() // Need to encode it, side effect calculates crc + err := client.PutChunks(ctx, chunks) require.NoError(t, err) - written = append(written, chunk.ExternalKey()) } - err := client.PutChunks(ctx, chunks) - require.NoError(t, err) - } - - // Get a few batches of chunks. - for i := 0; i < 50; i++ { - keysToGet := map[string]struct{}{} - chunksToGet := []chunk.Chunk{} - for len(chunksToGet) < batchSize { - key := written[rand.Intn(len(written))] - if _, ok := keysToGet[key]; ok { - continue + // Get a few batches of chunks. + for i := 0; i < 50; i++ { + keysToGet := map[string]struct{}{} + chunksToGet := []chunk.Chunk{} + for len(chunksToGet) < batchSize { + key := written[rand.Intn(len(written))] + if _, ok := keysToGet[key]; ok { + continue + } + keysToGet[key] = struct{}{} + chunk, err := chunk.ParseExternalKey(userID, key) + require.NoError(t, err) + chunksToGet = append(chunksToGet, chunk) } - keysToGet[key] = struct{}{} - chunk, err := chunk.ParseExternalKey(userID, key) - require.NoError(t, err) - chunksToGet = append(chunksToGet, chunk) - } - chunksWeGot, err := client.GetChunks(ctx, chunksToGet) - require.NoError(t, err) - require.Equal(t, len(chunksToGet), len(chunksWeGot)) + chunksWeGot, err := client.GetChunks(ctx, chunksToGet) + require.NoError(t, err) + require.Equal(t, len(chunksToGet), len(chunksWeGot)) - sort.Sort(chunk.ByKey(chunksToGet)) - sort.Sort(chunk.ByKey(chunksWeGot)) - for j := 0; j < len(chunksWeGot); j++ { - require.Equal(t, chunksToGet[i].ExternalKey(), chunksWeGot[i].ExternalKey(), strconv.Itoa(i)) + sort.Sort(chunk.ByKey(chunksToGet)) + sort.Sort(chunk.ByKey(chunksWeGot)) + for j := 0; j < len(chunksWeGot); j++ { + 
require.Equal(t, chunksToGet[i].ExternalKey(), chunksWeGot[i].ExternalKey(), strconv.Itoa(i)) + } } - } -} - -const userID = "userID" - -func dummyChunk(now model.Time) chunk.Chunk { - return dummyChunkFor(now, model.Metric{ - model.MetricNameLabel: "foo", - "bar": "baz", - "toms": "code", }) } - -func dummyChunkFor(now model.Time, metric model.Metric) chunk.Chunk { - cs, _ := promchunk.New().Add(model.SamplePair{Timestamp: now, Value: 0}) - chunk := chunk.NewChunk( - userID, - metric.Fingerprint(), - metric, - cs[0], - now.Add(-time.Hour), - now, - ) - // Force checksum calculation. - _, err := chunk.Encode() - if err != nil { - panic(err) - } - return chunk -} diff --git a/pkg/chunk/storage/utils_test.go b/pkg/chunk/storage/utils_test.go new file mode 100644 index 00000000000..989a6222fc5 --- /dev/null +++ b/pkg/chunk/storage/utils_test.go @@ -0,0 +1,77 @@ +package storage + +import ( + "context" + "testing" + "time" + + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + "github.com/weaveworks/cortex/pkg/chunk" + "github.com/weaveworks/cortex/pkg/chunk/aws" + "github.com/weaveworks/cortex/pkg/chunk/cassandra" + "github.com/weaveworks/cortex/pkg/chunk/gcp" + promchunk "github.com/weaveworks/cortex/pkg/prom1/storage/local/chunk" +) + +const ( + userID = "userID" + tableName = "test" +) + +type storageClientTest func(*testing.T, chunk.StorageClient) + +func forAllFixtures(t *testing.T, storageClientTest storageClientTest) { + fixtures := append(aws.Fixtures, gcp.Fixtures...) + + cassandraFixtures, err := cassandra.Fixtures() + require.NoError(t, err) + fixtures = append(fixtures, cassandraFixtures...) 
+ + for _, fixture := range fixtures { + t.Run(fixture.Name(), func(t *testing.T) { + storageClient, tableClient, schemaConfig, err := fixture.Clients() + require.NoError(t, err) + defer fixture.Teardown() + + tableManager, err := chunk.NewTableManager(schemaConfig, 12*time.Hour, tableClient) + require.NoError(t, err) + + err = tableManager.SyncTables(context.Background()) + require.NoError(t, err) + + err = tableClient.CreateTable(context.Background(), chunk.TableDesc{ + Name: tableName, + }) + require.NoError(t, err) + + storageClientTest(t, storageClient) + }) + } +} + +func dummyChunk(now model.Time) chunk.Chunk { + return dummyChunkFor(now, model.Metric{ + model.MetricNameLabel: "foo", + "bar": "baz", + "toms": "code", + }) +} + +func dummyChunkFor(now model.Time, metric model.Metric) chunk.Chunk { + cs, _ := promchunk.New().Add(model.SamplePair{Timestamp: now, Value: 0}) + chunk := chunk.NewChunk( + userID, + metric.Fingerprint(), + metric, + cs[0], + now.Add(-time.Hour), + now, + ) + // Force checksum calculation. + _, err := chunk.Encode() + if err != nil { + panic(err) + } + return chunk +} diff --git a/pkg/chunk/storage_client.go b/pkg/chunk/storage_client.go index 7ba0eace983..662fe62afe1 100644 --- a/pkg/chunk/storage_client.go +++ b/pkg/chunk/storage_client.go @@ -9,7 +9,7 @@ type StorageClient interface { BatchWrite(context.Context, WriteBatch) error // For the read path. - QueryPages(ctx context.Context, query IndexQuery, callback func(result ReadBatch, lastPage bool) (shouldContinue bool)) error + QueryPages(ctx context.Context, query IndexQuery, callback func(result ReadBatch) (shouldContinue bool)) error // For storing and retrieving chunks. PutChunks(ctx context.Context, chunks []Chunk) error